17 并发编程
编程中(不使用线程时),一个进程中只有一个线程(主线程)来进行代码的解析执行。在使用threading时,主线程会创建并调用子线程来进行某些代码的执行,实现多个任务的并行执行。
threading.Thread(target=None, name=None, args=(), kwargs={}, daemon=false)
target : 函数
name:是线程名称。默认情况下,唯一名称的格式为“ Thread-N”,其中N是一个小十进制数字。
args、kwargs:函数的参数
daemon:是否设置线程为守护线程。如果是守护线程,主线程可以不再理会它是否执行完毕。
import threading
import time
def f1():
pass
def f2(a1,a2,id):
time.sleep(2)
f1()
print("t{}_end".format(id),'@',time.ctime())
print("t1_start @{}".format(time.ctime()))
t1=threading.Thread(target=f2,args=(111,222,1)) #创建一个线程,注意args参数是一个元组
t1.start() #运行线程
print("t2_start @{}".format(time.ctime()))
t2=threading.Thread(target=f2,args=(111,222,2))
t2.start()
print("t3_start @{}".format(time.ctime()))
t3=threading.Thread(target=f2,args=(111,222,3))
t3.start()
print("master finish @{}".format(time.ctime()))
以上示例中,主线程启动了3个子线程,在默认值t.daemon=false的情况下,主线程会等待所有的线程都执行完毕退出后再退出。
避免使用thread模块而使用threading模块的一个原因是该模块不支持守护线程这个概念。即当主线程退出时,所有子线程都将终止,不管它们是否仍在工作。如果你不希望发生这种行为,就要引入守护线程的概念了。
threading 模块支持守护线程,其工作方式是:守护线程一般是一个等待客户端请求服务的服务器。如果没有客户端请求,守护线程就是空闲的。如果把一个线程设置为守护线程,就表示这个线程是不重要的,进程退出时不需要等待这个线程执行完成。
如果主线程准备退出时,不需要等待某些子线程完成,就可以为这些子线程设置守护线程标记。该标记值为真时,表示该线程是不重要的,或者说该线程只是用来等待客户端请求而不做任何其他事情。
整个Python 程序(可以解读为:主线程)将在所有普通线程(非守护线程)退出之后才退出,换句话说,就是没有剩下存活的普通线程时。
如何设置子线程为守护线程?
定义时设置:t1=threading.Thread(target=fun,args=(999,),daemon=True)
运行前设置:
t1=threading.Thread(target=fun,args=(999,)) t1.daemon=True 或 t1.setDaemon(True) t1.start()要将一个线程设置为守护线程,需要在启动线程之前执行如下赋值语句:thread.daemon = True(调用~~thread.setDaemon(True)~~的旧方法已经弃用了)。同样,要检查线程的守护状态,也只需要检查这个值即可(对比过去调用thread.isDaemon()的方法)。一个新的子线程会继承父线程的守护标记。
示例:主线程不等待子线程
import threading
import time
def fun(id):
print("start_sleep:",id,time.ctime())
time.sleep(3)
print("end_sleep:",id,time.ctime())
print("id:",id)
if __name__ == "__main__":
t1=threading.Thread(target=fun,args=(999,))
t1.daemon=True #将线程程设置为了守护线程,守护线程可以理解为是一个不重要的线程,主线程退出时,无需等待它完成任务
t1.start()
time.sleep(0.1)
print("当前时间:",time.ctime())
for i in range(5):
print(i)
join(timeout=None)
父线程等待直到子线程终止。join()将阻塞调用线程,直到被调用join()方法的线程终止或者直到超时。如果超时,可以通过在join()之后执行is_alive()来判断是否超时,如果线程是alive的,则表示join()操作超时。》
join ()方法:父线程A创建了子线程B,如果父线程A中调用了B.join(),那么,父线程A就会join(链接)在B线程之后,会在调用的地方阻塞等待,直到子线程B完成操作(或超时)后,父线程才可以接着往下执行。
import threading
import time
def fun(id):
print("start_sleep",id,time.ctime(time.time()))
time.sleep(3)
print("end_sleep",id,time.ctime(time.time()))
print("id:",id)
if __name__ == "__main__":
t1=threading.Thread(target=fun,args=(999,))
t1.start()
t1.join(1) #序列执行而不是并行执行.通俗讲,主线程等待子线程t1.start()执行完毕后再执行后面的主线程代码
print(time.ctime())
for i in range(5):
print(i)
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
事件处理的机制:全局定义了一个“Flag”,例如threading.Event()对象,表示线程畅通与否,默认值为"Fasle"。
如果“Flag”值为 False,那么当程序执行 event.wait()方法时就会阻塞;
如果“Flag”值为True,那么执行event.wait() 方法时便不再阻塞。
wait():根据Flag判断是否进行阻塞
clear():将“Flag”设置为默认值:False,表示线程阻塞
set():将“Flag”设置为True,表示线程不阻塞。
import threading
def do(event):
print('start')
event.wait()
print('execute')
event_obj = threading.Event() #创建一个event对象
for i in range(10):
t = threading.Thread(target=do, args=(event_obj,))
t.start()
event_obj.clear() #阻塞
inp = input('input:')
if inp == 'true':
event_obj.set() #非阻塞
针对一些公共资源,如全局变量,在多线程并行操作时,可是会引发数据安全问题,此时就需要锁机制来保证数据安全。
递归锁(可重入锁):必须由获取它的线程释放。一旦一个线程A获得了可重入锁,同一线程A就可以再次获得它而不会阻塞,且将递归级别增加一。此时,如果线程B想获取该锁,则必须等待该所被完全解锁(不属于任何线程),然后才能获取所有权,并将递归级别设置为1。如果一个锁阻塞了多个线程,则一次只能由一个线程获取所有权。线程必须在每次获取它后释放一次。每次获取,递归级别+1,每次释放,递归级别-1。
import threading
import time
gl_num = 0
lock = threading.RLock() #创建递归锁(可重入锁)
def Func():
lock.acquire() #获取锁(持锁)
global gl_num
gl_num += 1
time.sleep(1)
print(gl_num)
lock.release() #释放锁
for i in range(10):
t = threading.Thread(target=Func)
t.start()
is_alive()t.name=
python为什么要使用多进程?多进程为什么能提高效率?
Python 解释器有一个全局解释器锁(PIL),导致每个Python进程中最多同时运行一个线程,因此Python多线程程序并不能改善程序性能,不能发挥多核系统的优势。
multiprocessing.Process(target=None, name=None, args=(), kwargs={}, daemon=false)
进程创建与线程类似,参数不再解释。
import multiprocessing
import time
def func(id):
time.sleep(2)
print(id)
if __name__=="__main__":
t1=multiprocessing.Process(target=func,args=(1,))
t1.start()
t2=multiprocessing.Process(target=func,args=(2,))
t2.start()
t3=multiprocessing.Process(target=func,args=(3,))
t3.start()
print("end")
进程也支持p.daemon=和p.join()。与线程类似,不再解释。
进程间不使用数据共享:进程各自持有一份数据,默认无法共享数据(同一个进程中的所有线程共享一份数据。)
from multiprocessing import Process
import time
li = []
def foo(i):
time.sleep(1)
li.append(i)
print('say hi', li)
if __name__=="__main__":
for i in range(10):
p = Process(target=foo, args=(i,))
p.start()
p.join()
print('ending', li) #主进程内存 :子进程的资源继承(复制)自主进程
'''
输出:
say hi [0]
say hi [1]
say hi [2]
say hi [3]
say hi [4]
say hi [5]
say hi [6]
say hi [7]
say hi [8]
say hi [9]
ending []
'''
from multiprocessing import Process, Array
import time
temp = Array('i', [11, 22, 33, 44]) #‘i’是指定数组元素的类型为整形。
def Foo(i):
temp[i] = 100 + i
for item in temp:
print(i, '----->', item)
if __name__=="__main__":
for i in range(2):
p = Process(target=Foo, args=(i,))
p.start()
time.sleep(2)
for i in temp:
print(i)
'''
输出:
0 -----> 100
0 -----> 22
0 -----> 33
0 -----> 44
1 -----> 100
1 -----> 101
1 -----> 33
1 -----> 44
100
101
33
44
'''
说明:使用一个特殊的数据结构:数组
数组特性1:元素个数需事先指定,目的是分配连续的内存地址。
数组特性2:所有元素的数据类型必须统一。
from multiprocessing import Process, Manager
def Foo(i,dic):
dic[i] = 100 + i #此处的dic是一种特殊的数据结构
print(dic.values())
if __name__=='__main__':
manage = Manager()
dic = manage.dict() #两句可以写成:dic=Manager().dict() --> dic=multiprocessing.Manager().dict()
# dic=dict() #普通的字典对象,注意区分
for i in range(2):
p = Process(target=Foo, args=(i,dic,)) #注意args中使用dic对象
p.start()
p.join()
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
apply(func[, args[, kwds]])
apply_async(func[, args[, kwds[, callback[, error_callback]]]])
from multiprocessing import Process, Pool
import time
def Foo(i):
time.sleep(1)
# print(i+100)
return i + 100
def Bar(arg):
print(arg)
if __name__=='__main__':
pool = Pool(5) #创建进程池对象,其中最多有5个子进程。进程池有两个常用方法,为apply和apply_async。这两个方法默认自动执行start()
for i in range(10):
#向池中加入进程
pool.apply_async(func=Foo, args=(i,), callback=Bar) #1、池中多个进程并发执行;并且可以设置回调函数(可选),func函数的返回值会作为回调函数的参数传递给回调函数;2、无p.join(),默认执行p.daemon=True,所以一般需要配合使用pool.join()
# pool.apply(func=Foo, args=(i,)) #1、池中多个进程顺序执行;2、默认执行p.join();
print('正在执行进程{}...'.format(i))
print('end')
pool.close() #调用join之前,先调用close函数或pool.terminate(),否则会出错。关闭pool,使其不在接受新的任务。阻止将更多任务提交给池。一旦完成所有任务,工作进程将退出。
pool.join() #阻塞主进程。进程池中子进程执行完毕后再关闭主进程,如果注释,那么apply_async时程序直接关闭。join函数等待所有子进程结束
multiprocessing.Queue() multiprocessing.Event()
协程:普通的线程实现n个任务并发需要n个线程,而协程可以通过一个线程就能够并发实现多个任务(通常是那些执行过程中需要阻塞的任务,如io操作)。协程能够从一个任务中跳出(如任务A)到另一个任务中去执行另一个任务(如任务B),而且当从另一个任务B中跳出重新进入任务A时,代码可以从上一次跳出的位置继续往后执行。
线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
协程的适用场景:当程序中存在大量不需要CPU的操作时(例如:IO操作),适用于协程;对于I/O密集型任务,gevent能对性能做很大提升的,协程的创建、调度开销都比线程小的多。
协程的实现可以通过greenlet模块或gevent模块(推荐,对greenlet实现封装)。
gevent.sleep(0)在代码中手动实现切换(让当前的协程sleep至少几秒钟。)
import gevent
def foo():
print('Running in foo')
gevent.sleep(0) #交出CPU控制权,时间至少为0秒,表现为切换到其他协程
print('Explicit context switch to foo again')
def bar():
print('Explicit context to bar')
gevent.sleep(0)
print('Implicit context switch back to bar')
gevent.joinall([
gevent.spawn(foo), #通过gevent.spawn()运行某个协程:把函数创建协程实例
gevent.spawn(bar),
]) #gevent.joinall 阻塞当前流程,并执行所有给定的greenlet(列表形式给定),执行完继续往下走:把创建的协程实例添加到异步列表;等待列表中的所有实例执行完毕
gevent.spawn()方法会创建一个新的greenlet协程对象,并运行它。gevent.joinall()方法会等待所有传入的greenlet协程运行结束后再退出,这个方法可以接受一个timeout参数来设置超时时间,单位是秒。
遇到IO(阻塞式系统调用)操作自动切换:
使用猴子补丁的方式,gevent能够修改标准库里面大部分的阻塞式系统调用,包括socket、ssl、threading和 select等模块,而变为协作式运行。也就是通过猴子补丁的monkey.patch_xxx()来将python标准库中模块或函数改成gevent中的响应的具有协程的协作式对象,例如gevent.monkey.patch_socket()或gevent.monkey.patch_all()。这样在不改变原有代码的情况下,将应用的阻塞式方法,变成协程式的。
from gevent import monkey; monkey.patch_all() #使用猴子补丁
import gevent
import urllib.request
def f(url):
print('GET: %s' % url)
resp = urllib.request.urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url))
gevent.joinall([
#参数是一个gevent.spawn组成的列表
gevent.spawn(f, 'https://www.python.org/'), #gevent.spawn的参数:func,args…
gevent.spawn(f, 'https://www.baidu.com/'),
gevent.spawn(f, 'https://github.com/'),
]) #启动一个协程
from gevent import monkey;monkey.patch_all() # 使用猴子补丁
import gevent
import urllib.request
def fetch(i,url):
print('第{}个请求,GET: {}'.format(i+1,url))
resp = urllib.request.urlopen(url)
data = resp.read()
print('完成{}个请求, {} bytes received from {}.'.format(i+1,len(data), url))
url_list = [
'https://www.python.org/', 'https://www.baidu.com/', 'https://github.com/',
'https://www.baidu.com/', 'https://www.python.org/', 'https://github.com/',
'https://www.python.org/', 'https://github.com/','https://www.baidu.com/',
]
steps = 3
g_list = []
for i, url in enumerate(url_list):
g_list.append(gevent.spawn(fetch, i,url))
if i+1 % steps == 0: #i从0开始,先+1,让其从1开始
gevent.joinall(g_list)
g_list = []
else:
if g_list:
gevent.joinall(g_list)
import gevent
import gevent.pool
import time
def func(i):
print(i)
gevent.sleep(0)
time.sleep(0.1)
pool = gevent.pool.Pool(8)
for i in range(2000):
pool.spawn(func, i) #通过pool.spawn()运行协程池,从池中取一个协程来运行func。最多有8个协程同时运行
pool.join() #注意,协程池没有pool.close()方法
或者
import gevent
import gevent.pool
import time
def func(i):
print(i)
gevent.sleep(0)
time.sleep(0.1)
pool = gevent.pool.Pool(8)
threads = [pool.spawn(func, i) for i in range(2000)]
# threads = [pool.spawn(func, groups[i]) for i in xrange(len(groups))]
gevent.joinall(threads)
示例:
from gevent import monkey;monkey.patch_all() # 使用猴子补丁
import gevent.pool
import urllib.request
def fetch(i,url):
print('第{}个请求,GET: {}'.format(i+1,url))
resp = urllib.request.urlopen(url)
data = resp.read()
print('完成{}个请求, {} bytes received from {}.'.format(i+1,len(data), url))
url_list = [
'https://www.python.org/', 'https://www.baidu.com/', 'https://github.com/',
'https://www.baidu.com/', 'https://www.python.org/', 'https://github.com/',
'https://www.python.org/', 'https://github.com/','https://www.baidu.com/',
'https://www.python.org/', 'https://www.baidu.com/', 'https://github.com/',
'https://www.baidu.com/', 'https://www.python.org/', 'https://github.com/',
'https://www.python.org/', 'https://github.com/', 'https://www.baidu.com/',
'https://www.python.org/', 'https://www.baidu.com/', 'https://github.com/',
'https://www.baidu.com/', 'https://www.python.org/', 'https://github.com/',
'https://www.python.org/', 'https://github.com/', 'https://www.baidu.com/',
'https://www.python.org/', 'https://www.baidu.com/', 'https://github.com/',
'https://www.baidu.com/', 'https://www.python.org/', 'https://github.com/',
'https://www.baidu.com/', 'https://www.python.org/', 'https://github.com/',
'https://www.baidu.com/', 'https://www.python.org/', 'https://github.com/',
'https://www.python.org/', 'https://github.com/','https://www.baidu.com/',
]
pool = gevent.pool.Pool(8)
# for i,url in enumerate(url_list):
# pool.spawn(fetch, i,url) #通过pool.spawn()运行协程池,从池中取一个协程来运行func。最多有8个协程同时运行
# pool.join() #注意,协程池没有pool.close()方法
threads=[pool.spawn(fetch,i,url) for i,url in enumerate(url_list)]
gevent.joinall(threads)
生产者-消费者模型:
商品或服务的生产者生产商品,然后将其放到类似队列的数据结构中。生产商品的时间是不确定的,同样消费者消费生产者生产的商品的时间也是不确定的。
我们使用queue 模块来提供线程间通信的机制,从而让线程之间可以互相分享数据。具体而言,就是创建一个队列,让生产者(线程)在其中放入新的商品,而消费者(线程)消费这些商品。
import queue
q = queue.Queue() #创建一个队列,可以指定队列长度
q.put('a') #放一个元素进队列
q.put('b')
print(q.get()) #从队列中取出一个元素,先进先出原则
当 Queue 为 Queue.Full 状态时,再 put() 会堵塞,当状态为 Queue.Empty 时,再 get() 也是。当 put() 或 get() 设置了超时参数,而超时的时候,会抛出异常。
q= queue.Queue(maxsize=0)
Queue.qsize()
Queue.empty()
Queue.full()
Queue.put(item, block=True, timeout=None)
Queue.put_nowait(item)
Queue.get(block=True, timeout=None)
Queue.get_nowait()
Queue.task_done()
Queue.join()
Queue.queue.clear()
- q = queue.Queue(maxsize=0)
创建一个FIFO队列对象。maxsize为一个整数,表示队列的最大条目数。一旦队列满,插入将被阻塞直到队列中存在空闲空间。如果maxsize小于等于0,队列大小为无限。maxsize默认为0。
- Queue.qsize()
返回队列的近似大小。注意,qsize() > 0并不能保证接下来的get()方法不被阻塞;同样,qsize() < maxsize也不能保证put()将不被阻塞。
- Queue.empty()
如果队列是空的,则返回True,否则False。如果empty()返回True,并不能保证接下来的put()调用将不被阻塞。类似的,empty()返回False也不能保证接下来的get()调用将不被阻塞。
- Queue.full()
如果队列满则返回True,否则返回False。如果full()返回True,并不能保证接下来的get()调用将不被阻塞。类似的,full()返回False也不能保证接下来的put()调用将不被阻塞。
- Queue.put(item, block=True, timeout=None)
放item到队列中。
如果block是True,且timeout是None,该方法将一直等待直到有队列有空余空间。如果timeout是一个正整数,该方法则最多阻塞timeout秒并抛出Full异常。
如果block是False并且队列满,则直接抛出Full异常(这时timeout将被忽略)。
- Queue.put_nowait(item)
等价于put(item, False)。
- Queue.get(block=True, timeout=None)
从队列中移除并返回一个条目。如果block是True并且timeout是None,当队列为空时,该方法将阻塞直到队列中有条目可用。如果timeout是正整数,该方法将最多阻塞timeout秒并抛出Empty异常。如果block是False并且队列为空,则直接抛出Empty异常(这时timeout将被忽略)。
- Queue.get_nowait()
等价于get(False)。
如果需要跟踪进入队列中的任务是否已经被消费者线程处理完成,可以使用下面提供的两个方法:
- Queue.join()
阻塞直到队列中所有条目都被获取并处理。
当一个条目被增加到队列时,未完成任务的计数将增加。当一个消费者线程调用task_done()时,未完成任务的计数将减少。当未完成任务的计数减少到0时,join()解锁。
- Queue.task_done()
一般与join()操作连用,用于get()之后,否则永远阻塞在join()
- Queue.queue.clear() 清空队列中的所有元素
设计模型:
1、创建一个队列 ,队列中存放要执行的任务,放完任务后放object()对象,来标识任务结束。创建多个线程来执行队列中的任务。(队列中除了放任务外,还会放StopEvent,用于标识任务队列结束,线程一旦获取到StopEvent则认为所有的任务都处理完了)
2、列表generate_list记录了线程池中线程的实时数量,创建一个线程就会记录到generate_list中,销毁一个线程就会从generate_list中删除,(销毁线程的唯一途径就是q.get()到的event为StopEvent)。处理完当前任务的线程会变为空闲线程,记录到spare_list中。
3、创建线程时,如果有空闲线程,则使用空闲线程而不再创建新的线程。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import threading
import contextlib
#创建一个特殊对象(元类),用于区分与队列中的任务,如果从队列中取到的是StopEvent(即object对象),则认为队列任务处理完了
StopEvent = object()
class ThreadPool(object):
def __init__(self, max_num, max_task_num = None):
if max_task_num: #如果存在max_task_num参数(存在=不为空),则创建一个指定数量的队列
self.q = queue.Queue(max_task_num)
else: #创建一个队列长度无限的队列
self.q = queue.Queue()
self.max_num = max_num #线程池中的最大线程数
self.cancel = False
self.terminal = False
self.generate_list = [] #线程池中真正产生的线程的列表,因为在一次调用线程池的过程中,产生的线程数量可能小于max_num
self.spare_list = [] #空闲的线程列表。作用:如果任务很快就能完成,当要创建新的线程时,之前的线程就已经完成了任务,此时就无需创建新的线程而继续使用空闲线程。例如max_num=5时,可能2个线程就能完成所有的任务
#创建了一个线程,该线程就会不断地去获取任务来执行(通过while去调用实现操作的函数),直到获取的任务为StopEvent时不再让该线程执行新的任务从而变为”垃圾线程“,python解释器会将对其进行垃圾回收。
def apply_async(self, func, args, callback=None): #应用线程池执行任务
"""
线程池执行一个任务
:param func: 任务函数
:param args: 任务函数所需参数,为一个元组
:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数:1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
:return: 如果线程池已经终止,则返回True否则None
"""
if self.cancel:
return
if len(self.spare_list) == 0 and len(self.generate_list) < self.max_num: #动态创建线程:如果没有空闲线程并且线程池中真正产生的线程数量小于最大线程数时,就创建一个线程
self.generate_thread()
w = (func, args, callback,) #任务队列中的一个元素
self.q.put(w)
def generate_thread(self):
"""
创建一个线程
"""
t = threading.Thread(target=self.call) #生成一个线程,去执行操作(真正地去执行任务)
t.start()
def call(self):
"""
循环去获取任务函数并执行任务函数
"""
#获取当前线程,并追加到列表generate_list
current_thread = threading.currentThread() #返回当前的线程(可以理解为一个变量名)。
self.generate_list.append(current_thread)
event = self.q.get()
while event != StopEvent: #StopEvent是一个与任务不同数据类型的对象,用于标识队列中的任务处理完毕。
#开始执行操作(函数)
func, arguments, callback = event
try:
result = func(*arguments)
success = True
except Exception as e:
success = False
result = None
#函数执行完毕
#执行回调函数
if callback is not None:
try:
callback(success, result)
except Exception as e:
pass
#任务执行完毕后加入spare_list,然后再次从队列中获取新的任务,获取到任务后,则从spare_list中移除,如果获取不到任务,则线程阻塞。
with self.worker_state(self.spare_list, current_thread):
if self.terminal:
event = StopEvent
else:
event = self.q.get() #此处可能存在阻塞(队列中没有任务时)
else:
self.generate_list.remove(current_thread) #当该线程在所有任务执行完毕没有新任务需要执行时,或当调用terminal方法时,此时获取的StopEvent。则会将当前线程从generate_list移除。线程从generate_list移除 的唯一途径就是q.get()到的event为StopEvent。
def close(self): #处理完所有的任务后退出线程池。:通过在队列最后添加StopEvent
"""
执行完所有的任务后,所有线程停止
"""
self.cancel = True
full_size = len(self.generate_list)
while full_size:
self.q.put(StopEvent)
full_size -= 1
def terminate(self): #线程处理完当前任务后退出,不再处理后续队列中的任务:通过直接返回StopEvent
"""
无论是否还有任务,终止线程
"""
self.terminal = True #对于任务队列中还有未处理的任务时,通过这种方式终止.
while self.generate_list: #对于任务队列中的任务都处理完了时,所有的线程都阻塞在q.get(),通过这种方式终止,相当于close()
self.q.put(StopEvent)
self.q.queue.clear() #清空队列中的所有任务
@contextlib.contextmanager #装饰器,装饰后的函数可以使用with语句创建一个上下文管理器
def worker_state(self, spare_list, current_thread):
"""
用于记录线程中正在等待的线程数
"""
spare_list.append(current_thread)
try:
yield #返回值为None
finally:
spare_list.remove(current_thread)
# How to use
import time
def callback(status, result):
# status, execute action() status
# result, execute action() return value
pass
def action(i):
time.sleep(0.5)
print(i)
pool = ThreadPool(5)
for i in range(30):
ret = pool.apply_async(action, (i,), callback) #调用30次线程池(执行30个任务),每调用一次,就会在任务队列中加入一个任务,并动态创建一个线程。
# time.sleep(5)
# print(len(pool.generate_list), len(pool.spare_list))
# print(len(pool.generate_list), len(pool.spare_list))
pool.close() #让线程不再执行任务,所以也不会再阻塞,并且从generate_list移除。如果不进程pool.close(),则线程阻塞在q.get() 。
# pool.terminate()