引用
Python中已经有了threading模块,为什么还需要线程池呢,线程池又是什么东西呢?举个爬虫的例子 需要控制同时爬取的线程数,例子中创建了20个线程,而同时只允许3个线程在运行,但是20个线程都需要创建和销毁,线程的创建是需要消耗系统资源的,有没有更好的方案呢?其实只需要三个线程就行了,每个线程各分配一个任务,剩下的任务排队等待,当某个线程完成了任务的时候,排队任务就可以安排给这个线程继续执行。
这就是最简单的线程池思想,但是自己编写线程池很难写的比较完美,还需要考虑复杂情况下的线程同步,很容易发生死锁。从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象(这里主要关注线程池),不仅可以帮我们自动调度线程,还可以做到:
-
主线程可以获取某一个线程(或者任务的)的状态,以及返回值。
-
当一个线程完成的时候,主线程能够立即知道。
-
让多线程和多进程的编码接口一致。
python 线程池的使用
在python3以上版本中, python线程|进程池的使用进行了改进, 其中封装度较高的方法就是concurrent.futures模块提供的接口, 下面主要使用的就是concurrent模块的使用.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
import time
# 参数times用来模拟网络请求的时间
def time_block(times):
'''执行阻塞'''
print("time sleep {0} sec".format(times))
time.sleep(times)
return times
executor = ThreadPoolExecutor(max_workers=2)
# 通过submit函数提交执行的函数到线程池中,submit函数立即返回,不阻塞
task1 = executor.submit(time_block, (3))
task2 = executor.submit(time_block, (2))
# done方法用于判定某个任务是否完成
print(task1.done()) # True | False
# cancel方法用于取消某个任务,该任务没有放入线程池中才能取消成功
print(task2.cancel())
time.sleep(2)
print(task1.done())
# result方法可以获取task的执行结果
print(task1.result())
ThreadPoolExecutor构造实例的时候,传入max_workers参数来设置线程池中最多能同时运行的线程数目。 使用submit函数来提交线程需要执行的任务(函数名和参数)到线程池中,并返回该任务的句柄(类似于文件、画图)submit()不是阻塞的,而是立即返回.
通过submit函数返回的任务句柄,能够使用done()方法判断该任务是否结束。上面的例子可以看出,由于任务有2s的延时,在task1提交后立刻判断,task1还未完成,而在延时4s之后判断,task1就完成了。 使用cancel()方法可以取消提交的任务,如果任务已经在线程池中运行了,就取消不了。这个例子中,线程池的大小设置为2,任务已经在运行了,所以取消失败。如果改变线程池的大小为1,那么先提交的是task1,task2还在排队等候,这是时候就可以成功取消。
使用result()方法可以获取任务的返回值。查看内部代码,发现这个方法是阻塞的。
线程池实现阻塞的三种方法 (join方法)
推荐使用 as_completed 方法, 可对返回状态结果灵活操作
as_completed
上面虽然提供了判断任务是否结束的方法,但是不能在主线程中一直判断啊。有时候我们是得知某个任务结束了,就去获取结果,而不是一直判断每个任务有没有结束。这是就可以使用as_completed方法一次取出所有任务的结果。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time
# 参数times用来模拟网络请求的时间
def time_block(times):
'''执行阻塞'''
print("time sleep {0} sec".format(times))
time.sleep(times)
return times
executor = ThreadPoolExecutor(max_workers=2)
task_list = [executor.submit(time_block, i) for i in range(5)]
for task in as_completed(task_list):
data = task.result()
print('thread data: {0}'.format(data))
print('任务完成')
wait
wait 方法可以让主线程阻塞,直到满足设定的要求。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time
# 参数times用来模拟网络请求的时间
def time_block(times):
'''执行阻塞'''
print("time sleep {0} sec".format(times))
time.sleep(times)
return times
executor = ThreadPoolExecutor(max_workers=2)
task_list = [executor.submit(time_block, i) for i in range(5)]
wait(task_list)
print('任务完成')
map
executor.map 与 内置函数 map 操作方法一致
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time
# 参数times用来模拟网络请求的时间
def time_block(times):
'''执行阻塞'''
print("time sleep {0} sec".format(times))
time.sleep(times)
return times
executor = ThreadPoolExecutor(max_workers=2)
time_list = [1,5,5,6,2,2]
for result in executor.map(time_block, time_list):
print(result)
print('任务完成')
线程池的异常处理
在使用python3的线程池过程中, 发现抛出的异常并不能在主线程中捕获输出, 在发现bug之前一直以为线程模块正常 原因: 查看源码可以输出的异常在返回主线程之前已经被捕获处理(异常的子线程会被内部结束运行),并不会抛出(raise), 所以需要我们手动捕获抛出异常
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time
import traceback
# 参数times用来模拟网络请求的时间
def time_block(times):
'''执行阻塞'''
print("time sleep {0} sec".format(times))
time.sleep(times)
if times // 2:
return times / 0
return times
executor = ThreadPoolExecutor(max_workers=2)
time_list = [1,2,5,6,2,2,3]
task_list = [executor.submit(time_block, i) for i in time_list]
for task in as_completed(task_list):
try:
print(task.result())
except Exception as e:
print(traceback.print_exc(e))
# print('捕获异常: {0}'.format(e))
print('任务完成')
进程池用法
ThreadPoolExecutor 进程池接口封装与线程池用法一致 把ThreadPoolExecutor换成ProcessPoolExecutor其余用法全部相同