python 多进程 协程里并发执行协程时部分阻塞超时怎么办

主题信息(必填)
主题描述(最多限制在50个字符)
申请人信息(必填)
申请信息已提交审核,请注意查收邮件,我们会尽快给您反馈。
如有疑问,请联系
CSDN &《程序员》研发主编,投稿&纠错等事宜请致邮
你只管努力,剩下的交给时光!
如今的编程是一场程序员和上帝的竞赛,程序员要开发出更大更好、傻瓜都会用到软件。而上帝在努力创造出更大更傻的傻瓜。目前为止,上帝是赢的。个人网站:。个人QQ群:、
个人大数据技术博客:
随着node.js的盛行,相信大家今年多多少少都听到了异步编程这个概念。Python社区虽然对于异步编程的支持相比其他语言稍显迟缓,但是也在Python3.4中加入了asyncio,在Python3.5上又提供了async/await语法层面的支持,刚正式发布的Python3.6中asyncio也已经由临时版改为了稳定版。下面我们就基于Python3.4+来了解一下异步编程的概念以及asyncio的用法。一、多线程
  多线程就是允许一个进程内存在多个控制权,以便让多个函数同时处于激活状态,从而让多个函数的操作同时运行。即使是单CPU的计算机,也可以通过不停地在不同线程的指令间切换,从而造成多线程同时运行的效果。
  多线程相当于一个并发(concunrrency)系统。并发系统一般同时执行多个任务。如果多个任务可以共享资源,特别是同时写入某个变量的时候,就需要解决同步的问题,比如多线程火车售票系统:两个指令,一个指令检查票是否卖完,另一个指令,多个窗口同时卖票,可能出现卖出不存在的票。
  在并发情况下,指令执行的先后顺序由内核决定。同一个线程内部,指令按照先后顺序执行,但不同线程之间的指令很难说清除哪一个会先执行。因此要考虑多线程同步的问题。同步(synchronization)是指在一定的时间内只允许某一个线程访问某个资源。&
详情请看:
1、thread模块
2、threading模块
threading.Thread 创建一个线程。
给判断是否有余票和卖票,加上互斥锁,这样就不会造成一个线程刚判断没有余票,而另外一个线程就执行卖票操作。
#! /usr/bin/python
#-* coding: utf-8 -*
# __author__ ="tyomcat"
import threading
import time
def booth(tid):
global lock
while True:
lock.acquire()
print "窗口:",tid,",剩余票数:",i
time.sleep(1)
print "Thread_id",tid,"No more tickets"
os._exit(0)
lock.release()
time.sleep(1)
lock=threading.Lock()
for k in range(10):
new_thread = threading.Thread(target=booth,args=(k,))
new_thread.start()
二、协程(又称微线程,纤程)
  协程,与线程的抢占式调度不同,它是协作式调度。协程也是单线程,但是它能让原来要使用异步+回调方式写的非人类代码,可以用看似同步的方式写出来。
1、协程在python中可以由生成器(generator)来实现。
  首先要对生成器和yield有一个扎实的理解.
  调用一个普通的python函数,一般是从函数的第一行代码开始执行,结束于return语句、异常或者函数执行(也可以认为是隐式地返回了None)。
一旦函数将控制权交还给调用者,就意味着全部结束。而有时可以创建能产生一个序列的函数,来&保存自己的工作&,这就是生成器(使用了yield关键字的函数)。
  能够&产生一个序列&是因为函数并没有像通常意义那样返回。return隐含的意思是函数正将执行代码的控制权返回给函数被调用的地方。而"yield"的隐含意思是控制权的转移是临时和自愿的,我们的函数将来还会收回控制权。
&详情请看:
看一下生产者/消费者的例子:
#! /usr/bin/python
#-* coding: utf-8 -*
# __author__ ="tyomcat"
import time
import sys
def produce(l):
if i & 10:
l.append(i)
time.sleep(1)
def consume(l):
p = produce(l)
while len(l) & 0:
print l.pop()
except StopIteration:
sys.exit(0)
if __name__ == "__main__":
consume(l)
当程序执行到produce的yield i时,返回了一个generator并暂停执行,当我们在custom中调用p.next(),程序又返回到produce的yield i 继续执行,这样 l 中又append了元素,然后我们print l.pop(),直到p.next()引发了StopIteration异常。
2、Stackless Python
3、greenlet模块
  基于greenlet的实现则性能仅次于Stackless Python,大致比Stackless Python慢一倍,比其他方案快接近一个数量级。其实greenlet不是一种真正的并发机制,而是在同一线程内,在不同函数的执行代码块之间切换,实施&你运行一会、我运行一会&,并且在进行切换时必须指定何时切换以及切换到哪。
4、eventlet模块
详情请看:
三、多进程
1、子进程(subprocess包)
  在python中,通过subprocess包,fork一个子进程,并运行外部程序。
  调用系统的命令的时候,最先考虑的os模块。用os.system()和os.popen()来进行操作。但是这两个命令过于简单,不能完成一些复杂的操作,如给运行的命令提供输入或者读取命令的输出,判断该命令的运行状态,管理多个命令的并行等等。这时subprocess中的Popen命令就能有效的完成我们需要的操作
&&&import subprocess
&&&command_line=raw_input()
ping -c 10
&&&args=shlex.split(command_line)
&&&p=subprocess.Popen(args)
  利用subprocess.PIPE将多个子进程的输入和输出连接在一起,构成管道(pipe):
import subprocess
child1 = subprocess.Popen(["ls","-l"], stdout=subprocess.PIPE)
child2 = subprocess.Popen(["wc"], stdin=child1.stdout,stdout=subprocess.PIPE)
out = municate()
print(out)
communicate() 方法从stdout和stderr中读出数据,并输入到stdin中。
2、多进程(multiprocessing包)
详情请看:
  (1)、multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。
  进程池&(Process Pool)可以创建多个进程。
  apply_async(func,args)& 从进程池中取出一个进程执行func,args为func的参数。它将返回一个AsyncResult的对象,你可以对该对象调用get()方法以获得结果。
  close()& 进程池不再创建新的进程
  join()&& wait进程池中的全部进程。必须对Pool先调用close()方法才能join。
#! /usr/bin/env python
# -*- coding:utf-8
# __author__ == "tyomcat"
# "我的电脑有4个cpu"
from multiprocessing import Pool
import os, time
def long_time_task(name):
print 'Run task %s (%s)...' % (name, os.getpid())
start = time.time()
time.sleep(3)
end = time.time()
print 'Task %s runs %0.2f seconds.' % (name, (end - start))
if __name__=='__main__':
print 'Parent process %s.' % os.getpid()
p = Pool()
for i in range(4):
p.apply_async(long_time_task, args=(i,))
print 'Waiting for all subprocesses done...'
print 'All subprocesses done.'
(2)、多进程共享资源
通过共享内存和Manager对象:用一个进程作为服务器,建立Manager来真正存放资源。其它的进程可以通过参数传递或者根据地址来访问Manager,建立连接后,操作服务器上的资源。
#! /usr/bin/env python# -*- coding:utf-8
-*-# __author__ == "tyomcat"from multiprocessing import Queue,Poolimport multiprocessing,time,randomdef write(q):
for value in
['A','B','C','D']:
print "Put %s to Queue!" % value
q.put(value)
time.sleep(random.random())def read(q,lock):
while True:
lock.acquire()
if not q.empty():
value=q.get(True)
print "Get %s from Queue" % value
time.sleep(random.random())
lock.release()if __name__ == "__main__":
manager=multiprocessing.Manager()
q=manager.Queue()
lock=manager.Lock()
pw=p.apply_async(write,args=(q,))
pr=p.apply_async(read,args=(q,lock))
print "所有数据都写入并且读完"
  无论是线程还是进程,使用的都是同步进制,当发生阻塞时,性能会大幅度降低,无法充分利用CPU潜力,浪费硬件投资,更重要造成软件模块的铁板化,紧耦合,无法切割,不利于日后扩展和变化。
  不管是进程还是线程,每次阻塞、切换都需要陷入系统调用(system call),先让CPU跑操作系统的调度程序,然后再由调度程序决定该跑哪一个进程(线程)。多个线程之间在一些访问互斥的代码时还需要加上锁,
  现下流行的异步server都是基于事件驱动的(如nginx)。
  异步事件驱动模型中,把会导致阻塞的操作转化为一个异步操作,主线程负责发起这个异步操作,并处理这个异步操作的结果。由于所有阻塞的操作都转化为异步操作,理论上主线程的大部分时间都是在处理实际的计算任务,少了多线程的调度时间,所以这种模型的性能通常会比较好。
阅读(...) 评论()介绍了 Python 中的 yield 关键字。此篇介绍如何使用 yield 表达式,在 Python 中实现一个最基本的协程调度示例,避免 I/O 操作占用大量 CPU 计算时间。
协程及其特点协程是一种特殊的子程序,它可以在特定的位置暂停/恢复(而不是像普通函数那样在逻辑上顺序执行);并且每当协程暂停时,调用者可以从协程中获取状态,决定调用者接下来的走向;以及每当协程恢复时,调用者可以传递信息给协程,影响协程的行为。
从「可以暂停/恢复」来看,协程类似于 Python 中的迭代器。不过,迭代器仅只是将值返回给调用者,其内部的逻辑是确定的,无法与调用者做更多的交互。
因为协程可以暂停/恢复,所以,我们可以在多个协程中分别执行不同的任务;然后由调度器管理协程之间的执行,实现多任务并发。
此外,协程和调用者在同一线程中执行;考虑到,协程和调用者之间的切换,没有 CPU 上下文切换的开销。因此,相对使用多线程、多进程实现多任务并发,协程在这方面的开销非常小。
同样由于协程之间共享线程,所以使用协程实现的多任务并发,无法实现。因此,显而易见,协程适合 I/O 密集型的任务并发,而不适合 CPU 密集型的任务并发。
协程调度基础最简单的协程的例子,我们实际上已经见过了。在「使用 send() 方法与生成器函数通信」一节中,func 就扮演了协程函数的角色。每当协程函数在 yield 表达式处暂停,调用者就收到上一步计算的结果;每当协程函数自 yield 表达式处恢复,协程函数就用接收到的数进行下一轮计算。
在见识过最简单的协程示例之后,我们试着看看在调度协程的过程中,需要怎样处理。
coroutine_basic.py1234567891011121314151617181920212223242526from collections import deque
class Dispatcher(object):
def __init__(self, tasks):
self.tasks = deque(tasks)
def next(self):
return self.tasks.pop()
def run(self):
while len(self.tasks):
task = self.next()
next(task)
except StopIteration:
self.tasks.appendleft(task) def greeting(name, times):
for i in range(times):
print("Hello, %s.%d!" % (name, i))dispatcher = Dispatcher([greeting('Liam', 5), greeting('Sophia', 4),
greeting('Cancan', 6)])dispatcher.run()
这段代码中,有两个主要角色:调度器 (2) 和任务 (9)。
从调度器的角度来说,我们自 collections 模块引入了 deque 容器 (1),用于在 (3) 处保存任务。而后,我们在 (4) 定义了调度器 Dispatcher 的轮询函数 next(),它返回下一个尚未终止的任务。在调度器的 run() 函数中,(5) 和 (8) 保证了循环处理所有尚未完成的任务并清理已完成的任务,(6) 和 (7) 则负责触发每个任务的下一步动作。
从任务的角度来说,greeting 是一个生成器函数,是具体的协程任务。在 (10) 处,yield 表达式标记了函数暂停/恢复的位置;它将逻辑上连续的任务,在时间上切分成了若干段。
这段代码执行起来结果大致是这样:
123456789101112131415Hello, Cancan.0!Hello, Sophia.0!Hello, Liam.0!Hello, Cancan.1!Hello, Sophia.1!Hello, Liam.1!Hello, Cancan.2!Hello, Sophia.2!Hello, Liam.2!Hello, Cancan.3!Hello, Sophia.3!Hello, Liam.3!Hello, Cancan.4!Hello, Liam.4!Hello, Cancan.5!
看起来和多线程那种乱七八糟的输出顺序有点像,不是吗?当然,此处由于使用 deque.pop() 轮询任务队列,所以输出顺序大致是有迹可循的。不过,这并不影响我们将其作为协程调度的示例。
在这个例子中,尽管调用者和协程之间没有其他的通信,协程函数内也没有真正意义上的 I/O 操作,但我们仍可以进行一些总结。
首先,生成器函数充当了协程函数,实现了协程。
其次,协程任务在逻辑上是连续的,但是我们可以用 yield 表达式在时间上把协程任务分成若干部分。
再次,用 yield 分割的任务,需要有一个机制控制器暂停/恢复。这个机制此处由调度器提供。
再者,对于调度器来说,它需要知道「有哪些协程任务需要恢复」。因此,它必然直接或间接地维护一个事件队列。此处,我们用 Dispatcher.tasks 完成了这一工作。
最后,对于每个协程(任务)来说,一旦被暂停,其恢复就必须依赖主动唤起。因此,调度器必须「恰到好处」地反复唤起线程——不能多也不能少:多则浪费执行时间,甚至抛出异常;少则留下未能完成的任务。因此,调度器必须恰当地维护上述队列,确定何时从队列中移除已完成的任务。在我们的例子中,(6) 和 (7) 协同完成了这一工作。
异步 I/O 任务模拟回顾一下刚才的协程任务。
1234def greeting(name, times):
for i in range(times):
print("Hello, %s.%d!" % (name, i))
在这个任务里,yield 表达式将原本在逻辑上连续的循环,人为地在时间上切分成了若干份。然而,除了用于演示暂停/恢复的携程调度之外,这个例子实际上没有必要使用协程实现。这是因为,在协程任务中,去掉 yield 表达式之后,所有的操作都是立即完成的;不存在需要阻塞以等待 I/O 的空耗 CPU 的情况。
下列代码模拟了一个需要阻塞等待 I/O 的任务。
1234567from time import sleepfrom random import random as rddef greeting(name, times, duration = 1):
for i in range(times):
sleep(2 * duration * rd())
print("Hello, %s.%d!" % (name, i))
此处,新定义的 greeting 函数 (1) 有一个新的参数:duration。而后,在每次循环打印招呼信息的之前,会现行阻塞一段时间 (2)。这一阻塞就模拟了实际情况中的 I/O 类操作:空占 CPU 资源,但不进行任何计算。阻塞的时间是 2 * duration * rd(),这是一个一 duration 为期望的随机变量,用来模拟预计阻塞 duration 秒但实际情况会有波动的 I/O 任务。
假设 duration 设置为定值 1 而 times 设置为定值 3,那么执行一次 greeting 函数,平均需要耗时 3 秒。如若顺序执行 3 个这样的函数,平均下来,一共需要耗费 9 秒的时间。而这 9 秒之中,大多数时间 CPU 都仅只在空耗,没有执行实际的计算任务。因此,我们可以考虑用协程将它们并发起来执行,降低总的空耗的时间。为此,我们有如下思路。
将每个 I/O 任务理解为一个事件;
维护一个队列,用于记录尚在进行中的事件,以便后续操作;
当事件生成时,向上述队列注册(即将事件添加进队列);
使用轮询(polling)等方式,捕获完成的事件;
对已完成的事件,进行后续操作(特别地,恢复协程函数),而后从队列中删除该事件。
现在,我们开始逐步在这一思路的指导下,实现协程并发。
引出休眠事件(SleepEvent)回顾一下新版的 greeting 函数。若要通过生成器实现协程,就必然要添加 yield 表达式。
1234567from time import sleepfrom random import random as rddef greeting(name, times, duration = 1):
for i in range(times):
yield sleep(2 * duration * rd())
print("Hello, %s.%d!" % (name, i))
简单粗暴地以 (1) 的方式加上 yield 表达式是不行的。这是因为,yield 表达式会对 sleep 函数求值,而后将该值返回给调用者并暂停。但是,对 sleep 函数求值的过程,就是模拟的 I/O 操作,会阻塞执行线程。在阻塞完毕之后,再通过 yield 暂停,这就没有意义了。
1234567def coroutine_sleep(duration):
return SleepEvent(duration)
def greeting(name, times, duration = 1):
for i in range(times):
yield coroutine_sleep(duration)
print("Hello, %s.%d!" % (name, i))
因此,我们需要定义新的 coroutine_sleep 函数 (1)。这个函数会生成一个事件(SleepEvent),然后不阻塞地立即返回 (2)。因此,在 (3) 处,yield 表达式会将 coroutine_sleep 返回的 SleepEvent 对象传递给协程函数的调用者,并暂停当前协程函数。
定义事件框架接下来,我们需要定义事件框架。在实际动手之前,我们应该先分析一下一个事件类需要有哪些功能。
首先,事件应该有能力让外部知道自身存在。因此事件类应该伴随一个队列;并且在生成事件对象时,将自身注册进这个队列。
其次,事件应该有能力让外部知道自身状态,以便检查事件状态,进而进行下一步操作。因此,事件类应该是一个闭包,保存生成事件时的一些状态;并提供一个接口,利用这些状态检查事件是否完成。
最后,事件应当提供一个接口,记录在事件完成之后应当做什么;并且在事件完成之后执行这些操作。
据此,我们应该有如下代码。
12345678910111213events_list = list()
class Event(object):
def __init__(self, *args, **kwargs):
events_list.append(self)
self._callback = lambda:None
def is_ready(self):
ready = self._is_ready()
self._callback()
return ready
def set_callback(self, callback):
self._callback = callback
这里,(1) 处我们定义了一个全局的队列,用于记录尚在进行中的事件;与此同时,每当生成事件类对象时,(2) 会将当前事件对象注册到队列中。(3) 则定义了回调函数,用于记录事件完成之后执行什么操作。
(4) 和 (6) 分别是对外的接口。(4) 让外部有能力知道自身状态,其中 _is_ready() 需要在子类中实现;而 (6) 允许外部记录在事件完成之后应当做什么。(5) 则保证了当事件完成之后,(6) 中的设置会被正确执行。
至此,我们可以定义出 SleepEvent 类。
12345678910from time import time as current_timefrom random import random as rdclass SleepEvent(Event):
def __init__(self, duration):
super(SleepEvent, self).__init__(duration)
self._duration = 2 * rd() * duration
self._start_time = current_time()
def _is_ready(self):
return (current_time() - self._start_time &= self._duration)
这里,(1) 处定义了 SleepEvent 事件类,用来模拟 I/O 事件;模拟的核心在于 (2) 处定义的睡眠时长。(3) 则记录了事件诞生时的状态,用在 (4) 处确认事件是否已完成。
至此,协程函数这一侧的代码我们已经完成了,接下来我们看看调度器一侧的代码如何实现。
用轮询捕捉已完成的事件因为我们在 events_list 中保存了所有尚在执行中的事件。这是相当简单的工作,所以不作过多的解释。
12345while len(events_list):
for event in events_list:
if event.is_ready():
events_list.remove(event)
唤醒逻辑在 Event 类的定义中,is_ready() 函数会在事件完成后调用 _callback 函数。而对于协程函数来说,一个事件完成后,需要做的事情无非是:唤醒,恢复执行到下一个暂停点。因此可以有这样的唤醒逻辑。
123456def _next(gen_task):
yielded_event = next(gen_task)
yielded_event.set_callback(lambda: _next(gen_task))
except StopIteration:
这里,(1) 调用 Python 内建的 next 函数,唤醒协程函数,执行到下一个暂停点,并接受其返回值,保存在 yielded_event 当中。而后,在 (2) 处将该 Event 对象设置为 Lambda 函数 lambda: _next(gen_task)。显然,这是一个递归调用 _next 函数自身的闭包——捕获了需要继续唤醒的生成器 gen_task。若生成器执行完毕,则无需继续唤醒。因此在 (3) 处,直接 pass 即可。
完整实验将上述代码整合起来,就可以做实验了。
coroutine_async.py12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364from time import time as current_timefrom random import random as rdevents_list = list()class Event(object):
def __init__(self, *args, **kwargs):
events_list.append(self)
self._callback = lambda:None
def is_ready(self):
ready = self._is_ready()
self._callback()
return ready
def set_callback(self, callback):
self._callback = callbackclass SleepEvent(Event):
def __init__(self, duration):
super(SleepEvent, self).__init__(duration)
self._duration = 2 * rd() * duration
self._start_time = current_time()
def _is_ready(self):
return (current_time() - self._start_time &= self._duration)class Dispatcher(object):
def __init__(self, tasks):
self.tasks = tasks
self._start()
def _next(self, gen_task):
yielded_event = next(gen_task)
yielded_event.set_callback(lambda: self._next(gen_task))
except StopIteration:
def _start(self):
for task in self.tasks:
self._next(task)
def polling(self):
while len(events_list):
for event in events_list:
if event.is_ready():
events_list.remove(event)
breakdef coroutine_sleep(duration):
return SleepEvent(duration)def greeting(name, times, duration = 1):
for i in range(times):
yield coroutine_sleep(duration)
print("Hello, %s.%d!" % (name, i))if __name__ == '__main__':
def test():
dispatcher = Dispatcher([greeting('Liam', 3), greeting('Sophia', 3), greeting('Cancan', 3)])
dispatcher.polling()
import timeit
timeit_times = 10
avg_cost = timeit.timeit(lambda: test(), number = timeit_times) / timeit_times
print('%.3f' % (avg_cost))
可能的执行结果是:
123456789101112131415161718192021$ python coroutine_async.pyHello, Liam.0!Hello, Liam.1!Hello, Liam.2!Hello, Cancan.0!Hello, Sophia.0!Hello, Cancan.1!Hello, Sophia.1!Hello, Cancan.2!Hello, Sophia.2!......Hello, Liam.0!Hello, Sophia.0!Hello, Sophia.1!Hello, Cancan.0!Hello, Liam.1!Hello, Sophia.2!Hello, Liam.2!Hello, Cancan.1!Hello, Cancan.2!3.400
可以看到,平均下来,使用协程并发地执行三个 greeting 任务(times = 3, duration = 1)只需要 3.4 秒;耗时远低于顺序执行所需的 9 秒。Python中如何实现协程
1.1协程的概念
  协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。(其实并没有说明白~)
  我觉得单说协程,比较抽象,如果对线程有一定了解的话,应该就比较好理解了。
  那么这么来理解协程比较容易:
  线程是系统级别的,它们是由操作系统调度;协程是程序级别的,由程序员根据需要自己调度。我们把一个线程中的一个个函数叫做子程序,那么子程序在执行过程中可以中断去执行别的子程序;别的子程序也可以中断回来继续执行之前的子程序,这就是协程。也就是说同一线程下的一段代码&1&执行着执行着就可以中断,然后跳去执行另一段代码,当再次回来执行代码块&1&的时候,接着从之前中断的地方开始执行。
  比较专业的理解是:
  协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
1.2 协程的优缺点
协程的优点:
  (1)无需线程上下文切换的开销,协程避免了无意义的调度,由此可以提高性能(但也因此,程序员必须自己承担调度的责任,同时,协程也失去了标准线程使用多CPU的能力)
  (2)无需原子操作锁定及同步的开销
  (3)方便切换控制流,简化编程模型
  (4)高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
协程的缺点:
  (1)无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
  (2)进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
2 Python中如何实现协程
2.1 yield实现协程  
  前文所述&子程序(函数)在执行过程中可以中断去执行别的子程序;别的子程序也可以中断回来继续执行之前的子程序&,那么很容易想到Python的yield,显然yield是可以实现这种切换的。
使用yield实现协程操作例子:
1 #! /usr/bin/env python
2 # -*- coding:utf-8 -*-
3 # Author: "Zing-p"
7 def consumer(name):
print("要开始啃骨头了...")
while True:
print("\033[31;1m[consumer] %s\033[0m " % name)
bone = yield
print("[%s] 正在啃骨头 %s" % (name, bone))
15 def producer(obj1, obj2):
obj1.send(None)
# 启动obj1这个生成器,第一次必须用None
&==& obj1.__next__()
obj2.send(None)
# 启动obj2这个生成器,第一次必须用None
&==& obj2.__next__()
while n & 5:
print("\033[32;1m[producer]\033[0m 正在生产骨头 %s" % n)
obj1.send(n)
obj2.send(n)
26 if __name__ == '__main__':
con1 = consumer("消费者A")
con2 = consumer("消费者B")
producer(con1, con2)
运行的结果:
2.2&greenlet实现协程
  Python的 greenlet就相当于手动切换,去执行别的子程序,在&别的子程序&中又主动切换回来。。。
greenlet协程例子:
1 #! /usr/bin/env python
2 # -*- coding:utf-8 -*-
4 from greenlet import greenlet
5 # greenlet 其实就是手动切换;gevent是对greenlet的封装,可以实现自动切换
7 def test1():
print("123")
gr2.switch()
# 切换去执行test2
print("456")
gr2.switch()
# 切换回test2之前执行到的位置,接着执行
13 def test2():
print("789")
gr1.switch()
# 切换回test1之前执行到的位置,接着执行
print("666")
19 gr1 = greenlet(test1)
# 启动一个协程 注意test1不要加()
20 gr2 = greenlet(test2)
21 gr1.switch()
2.3 &gevent 实现协程
  Gevent 是一个第三方库,可以轻松通过gevent实现协程程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
  gevent会主动识别程序内部的IO操作,当子程序遇到IO后,切换到别的子程序。如果所有的子程序都进入IO,则阻塞。
协程之gevent例子:
1 #! /usr/bin/env python3
2 # -*- coding:utf-8 -*-
4 import gevent
6 def func1():
print("func1 running")
gevent.sleep(2)
# 内部函数实现io操作
print("switch func1")
11 def func2():
print("func2 running")
gevent.sleep(1)
print("switch func2")
16 def func3():
print("func3
gevent.sleep(0)
print("func3 done..")
21 gevent.joinall([gevent.spawn(func1),
gevent.spawn(func2),
gevent.spawn(func3),
同步与异步性能区别:
1 import gevent
3 def task(pid):
Some non-deterministic task
gevent.sleep(0.5)
print('Task %s done' % pid)
10 def synchronous():
for i in range(1,10):
14 def asynchronous():
threads = [gevent.spawn(task, i) for i in range(10)]
gevent.joinall(threads)
18 print('Synchronous:')
19 synchronous()
21 print('Asynchronous:')
22 asynchronous()
  上面程序的重要部分是将task函数封装到greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall&函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。
遇到Io阻塞时会切换任务之【爬虫版】
1 #! /usr/bin/env python3
2 # -*- coding:utf-8 -*-
4 from urllib import request
5 import gevent,time
6 from gevent import monkey
8 monkey.patch_all()
# 把当前程序中的所有io操作都做上标记
10 def spider(url):
print("GET:%s" % url)
resp = request.urlopen(url)
data = resp.read()
print("%s bytes received from %s.." % (len(data), url))
16 urls = [
"https://www.python.org/",
22 start_time = time.time()
23 for url in urls:
spider(url)
25 print("同步耗时:",time.time() - start_time)
27 async_time_start = time.time()
28 gevent.joinall([
gevent.spawn(spider,"https://www.python.org/"),
gevent.spawn(spider,"/"),
gevent.spawn(spider,"/"),
33 print("异步耗时:",time.time() - async_time_start)
35 # 最好爬国外网站吧
通过gevent实现【单线程】下的多socket并发
server端:
1 import sys
2 import socket
3 import time
4 import gevent
6 from gevent import socket,monkey
7 monkey.patch_all()
10 def server(port):
s = socket.socket()
s.bind(('0.0.0.0', port))
s.listen(500)
while True:
cli, addr = s.accept()
gevent.spawn(handle_request, cli)
19 def handle_request(conn):
while True:
data = conn.recv(1024)
print("recv:", data)
conn.send(data)
if not data:
conn.shutdown(socket.SHUT_WR)
except Exception as
conn.close()
32 if __name__ == '__main__':
server(9999)
client端:
1 import socket
3 HOST = 'localhost'
# The remote host
4 PORT = 9999
# The same port as used by the server
5 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
6 s.connect((HOST, PORT))
7 while True:
msg = bytes(input("&&:"),encoding="utf8")
s.sendall(msg)
data = s.recv(1024)
#print(data)
print('Received', repr(data))
14 s.close()
  觉得将就点个赞~~
阅读(...) 评论()}

我要回帖

更多关于 python 协程 并发 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信