博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python:concurrent异步并行任务编程模块、 线程池、进程池、上下文管理
阅读量:4111 次
发布时间:2019-05-25

本文共 11185 字,大约阅读时间需要 37 分钟。

目录



concurrent包

3.2版本引入的模块。

异步并行任务编程模块,提供一个高级的异步可执行的便利接口。

提供2个池执行器:

ThreadPoolExcentor   异步调用线程池的Executor
ProcessPoolExecutor 异步调用的进程池的Executor

ThreadPoolExecutor对象

首先需要定义一个池的执行器对象,Exector子类对象

方法 含义
ThreadPoolExector(max_workers=1) 池中至多创建max_workers个线程的池来同时异步执行,返回Executor实例
submit(fn,*args,**kwargs) 提交执行的函数及其参数,返回Future实例
shutdown(wait=True) 清理池

Future类

方法 含义
done() 如果调用被成功的取消或者执行完成,返回True
cancelled() 如果调用被成功的取消,返回True
running() 如果正在运行且不能被取消,返回True
cancel() 尝试取消调用,如果已经执行且不能取消返回False,否则返回True
result(timeout=None) 取消返回的结果,timeout为None,一直等待返回;timeout设置到期,抛出concurrent.futures.TimeoutError异常
exception(timeout=None) 取返回的异常,timeout为None,一直等待返回;timeout设置到期,抛出concurrent.futures.TimeoutError异常
#ThreadPoolExecutor例子import threadingimport loggingimport timefrom concurrent import futures#输出定义FORMAT='%(asctime)-15s\t [%(processName)s:%(threadName)s,%(process)d:%(thread)8d] %(message)s'logging.basicConfig(level=logging.INFO,format=FORMAT)def worker(n):    logging.info("begin to work{}".format(n))    time.sleep(5)    logging.info("finished {}".format(n))#创建线程池,池的容量为3exect = futures.ThreadPoolExecutor(max_workers=3)fs = []for i in range(3):    future = exect.submit(worker,i)    fs.append(future)for i in range(3,6):    future = exect.submit(worker,i)while True:    time.sleep(2)    logging.info(threading.enumerate())    flag = True    for _ in fs:   #判断是否还有任务未完成        logging.info(_.done())        flag = flag and _.done()    print("="*30)    if flag:        exect.shutdown()  #清理池,池中线程全部杀掉        logging.info(threading.enumerate())        break#线程池一旦创建了,就不需要在频繁清楚结果:2021-06-27 16:06:37,177	 [MainProcess:ThreadPoolExecutor-0_0,20060:    9812] begin to work02021-06-27 16:06:37,177	 [MainProcess:ThreadPoolExecutor-0_1,20060:    2816] begin to work12021-06-27 16:06:37,177	 [MainProcess:ThreadPoolExecutor-0_2,20060:   13268] begin to work22021-06-27 16:06:39,178	 [MainProcess:MainThread,20060:   26076] [<_MainThread(MainThread, started 26076)>, 
,
,
]2021-06-27 16:06:39,178 [MainProcess:MainThread,20060: 26076] False2021-06-27 16:06:39,178 [MainProcess:MainThread,20060: 26076] False2021-06-27 16:06:39,178 [MainProcess:MainThread,20060: 26076] False============================================================2021-06-27 16:06:41,179 [MainProcess:MainThread,20060: 26076] [<_MainThread(MainThread, started 26076)>,
,
,
]2021-06-27 16:06:41,179 [MainProcess:MainThread,20060: 26076] False2021-06-27 16:06:41,179 [MainProcess:MainThread,20060: 26076] False2021-06-27 16:06:41,179 [MainProcess:MainThread,20060: 26076] False2021-06-27 16:06:42,179 [MainProcess:ThreadPoolExecutor-0_2,20060: 13268] finished 22021-06-27 16:06:42,179 [MainProcess:ThreadPoolExecutor-0_2,20060: 13268] begin to work32021-06-27 16:06:42,179 [MainProcess:ThreadPoolExecutor-0_1,20060: 2816] finished 12021-06-27 16:06:42,179 [MainProcess:ThreadPoolExecutor-0_1,20060: 2816] begin to work42021-06-27 16:06:42,179 [MainProcess:ThreadPoolExecutor-0_0,20060: 9812] finished 02021-06-27 16:06:42,179 [MainProcess:ThreadPoolExecutor-0_0,20060: 9812] begin to work52021-06-27 16:06:43,181 [MainProcess:MainThread,20060: 26076] [<_MainThread(MainThread, started 26076)>,
,
,
]2021-06-27 16:06:43,181 [MainProcess:MainThread,20060: 26076] True2021-06-27 16:06:43,181 [MainProcess:MainThread,20060: 26076] True2021-06-27 16:06:43,181 [MainProcess:MainThread,20060: 26076] True==============================2021-06-27 16:06:47,180 [MainProcess:ThreadPoolExecutor-0_0,20060: 9812] finished 52021-06-27 16:06:47,180 [MainProcess:ThreadPoolExecutor-0_2,20060: 13268] finished 32021-06-27 16:06:47,180 [MainProcess:ThreadPoolExecutor-0_1,20060: 2816] finished 42021-06-27 16:06:47,180 [MainProcess:MainThread,20060: 26076] [<_MainThread(MainThread, started 26076)>]

ProcessPoolExecutor对象

方法一样,就是是哦那个多进程

 

#ThreadPoolExecutor例子import threadingimport loggingimport timefrom concurrent import futures#输出定义FORMAT='%(asctime)-15s\t [%(processName)s:%(threadName)s,%(process)d:%(thread)8d] %(message)s'logging.basicConfig(level=logging.INFO,format=FORMAT)def worker(n):    logging.info("begin to work{}".format(n))    time.sleep(5)    logging.info("finished {}".format(n))if __name__ == '__main__':    #创建线程池,池的容量为3    exect = futures.ProcessPoolExecutor(max_workers=3)    fs = []    for i in range(3):        future = exect.submit(worker,i)        fs.append(future)    for i in range(3,6):        future = exect.submit(worker,i)    while True:        time.sleep(2)        logging.info(threading.enumerate())     #多进程时看主线程已经没有必要了        flag = True        for _ in fs:   #判断是否还有任务未完成            logging.info(_.done())            flag = flag and _.done()        print("="*30)        if flag:            exect.shutdown()  #清理池,池中线程全部杀掉            logging.info(threading.enumerate())  #多进程时看主线程已经没有必要了            break    #线程池一旦创建了,就不需要在频繁清楚结果:2021-06-27 16:11:18,415	 [SpawnProcess-1:MainThread,25872:   25960] begin to work02021-06-27 16:11:18,421	 [SpawnProcess-2:MainThread,22072:   29224] begin to work12021-06-27 16:11:18,428	 [SpawnProcess-3:MainThread,18108:   20136] begin to work2==============================2021-06-27 16:11:20,345	 [MainProcess:MainThread,23556:   11200] [<_MainThread(MainThread, started 11200)>, 
,
]2021-06-27 16:11:20,345 [MainProcess:MainThread,23556: 11200] False2021-06-27 16:11:20,345 [MainProcess:MainThread,23556: 11200] False2021-06-27 16:11:20,345 [MainProcess:MainThread,23556: 11200] False2021-06-27 16:11:22,346 [MainProcess:MainThread,23556: 11200] [<_MainThread(MainThread, started 11200)>,
,
]2021-06-27 16:11:22,346 [MainProcess:MainThread,23556: 11200] False2021-06-27 16:11:22,346 [MainProcess:MainThread,23556: 11200] False2021-06-27 16:11:22,346 [MainProcess:MainThread,23556: 11200] False==============================2021-06-27 16:11:23,417 [SpawnProcess-1:MainThread,25872: 25960] finished 02021-06-27 16:11:23,417 [SpawnProcess-1:MainThread,25872: 25960] begin to work32021-06-27 16:11:23,422 [SpawnProcess-2:MainThread,22072: 29224] finished 12021-06-27 16:11:23,422 [SpawnProcess-2:MainThread,22072: 29224] begin to work42021-06-27 16:11:23,430 [SpawnProcess-3:MainThread,18108: 20136] finished 22021-06-27 16:11:23,430 [SpawnProcess-3:MainThread,18108: 20136] begin to work52021-06-27 16:11:24,347 [MainProcess:MainThread,23556: 11200] [<_MainThread(MainThread, started 11200)>,
,
]2021-06-27 16:11:24,347 [MainProcess:MainThread,23556: 11200] True2021-06-27 16:11:24,347 [MainProcess:MainThread,23556: 11200] True2021-06-27 16:11:24,347 [MainProcess:MainThread,23556: 11200] True==============================2021-06-27 16:11:28,419 [SpawnProcess-1:MainThread,25872: 25960] finished 32021-06-27 16:11:28,423 [SpawnProcess-2:MainThread,22072: 29224] finished 42021-06-27 16:11:28,431 [SpawnProcess-3:MainThread,18108: 20136] finished 52021-06-27 16:11:28,441 [MainProcess:MainThread,23556: 11200] [<_MainThread(MainThread, started 11200)>]

支持上线文管理

concurrent.futures.ProcessPoolExecutor继承自concurent.futures.base.Execurot,而父类有__enter__、__exit__方法,支持上下文管理。可以使用with语句

__exit__方法本质还是调用的shutdown(wait=True),就是一直阻塞到所有运行的任务完成

使用方法

import threadingfrom concurrent import futureswith futures.ProcessPoolExecutor(max_workers=3) as executor:    future = executor.submit(work,232,1235)    print(future.result())

使用上下文改造上面的例子,增加返回计算的结果

 

#ThreadPoolExecutor例子import threadingimport loggingimport timefrom concurrent import futures#输出定义FORMAT='%(asctime)-15s\t [%(processName)s:%(threadName)s,%(process)d:%(thread)8d] %(message)s'logging.basicConfig(level=logging.INFO,format=FORMAT)def worker(n):    logging.info("begin to work{}".format(n))    time.sleep(5)    logging.info("finished {}".format(n))    return n + 100if __name__ == '__main__':    #创建线程池,池的容量为3    exect = futures.ProcessPoolExecutor(max_workers=3)    with exect:#上下文        fs = []        for i in range(3):            future = exect.submit(worker,i)            fs.append(future)        for i in range(3,6):            future = exect.submit(worker,i)        while True:            time.sleep(2)            logging.info(threading.enumerate())     #多进程时看主线程已经没有必要了            flag = True            for _ in fs:   #判断是否还有任务未完成                logging.info(_.done())                flag = flag and _.done()                if _.done():  #做完了,看下结果                    logging.info("result = {}".format(_.result()))            print("="*30)            if flag:break    # exect.shutdown()  #清理池,池中线程全部杀掉    logging.info(threading.enumerate())  #多进程时看主线程已经没有必要了                        #线程池一旦创建了,就不需要在频繁清楚结果:2021-06-27 16:28:04,265	 [SpawnProcess-1:MainThread,28816:   14260] begin to work02021-06-27 16:28:04,271	 [SpawnProcess-2:MainThread,28308:    9024] begin to work12021-06-27 16:28:04,277	 [SpawnProcess-3:MainThread,20960:    6168] begin to work22021-06-27 16:28:06,192	 [MainProcess:MainThread,1592:    8024] [<_MainThread(MainThread, started 8024)>, 
,
]2021-06-27 16:28:06,192 [MainProcess:MainThread,1592: 8024] False2021-06-27 16:28:06,192 [MainProcess:MainThread,1592: 8024] False2021-06-27 16:28:06,192 [MainProcess:MainThread,1592: 8024] False==============================2021-06-27 16:28:08,194 [MainProcess:MainThread,1592: 8024] [<_MainThread(MainThread, started 8024)>,
,
]2021-06-27 16:28:08,194 [MainProcess:MainThread,1592: 8024] False2021-06-27 16:28:08,194 [MainProcess:MainThread,1592: 8024] False2021-06-27 16:28:08,194 [MainProcess:MainThread,1592: 8024] False==============================2021-06-27 16:28:09,266 [SpawnProcess-1:MainThread,28816: 14260] finished 02021-06-27 16:28:09,266 [SpawnProcess-1:MainThread,28816: 14260] begin to work32021-06-27 16:28:09,272 [SpawnProcess-2:MainThread,28308: 9024] finished 12021-06-27 16:28:09,272 [SpawnProcess-2:MainThread,28308: 9024] begin to work42021-06-27 16:28:09,278 [SpawnProcess-3:MainThread,20960: 6168] finished 22021-06-27 16:28:09,278 [SpawnProcess-3:MainThread,20960: 6168] begin to work52021-06-27 16:28:10,196 [MainProcess:MainThread,1592: 8024] [<_MainThread(MainThread, started 8024)>,
,
]2021-06-27 16:28:10,196 [MainProcess:MainThread,1592: 8024] True2021-06-27 16:28:10,196 [MainProcess:MainThread,1592: 8024] result = 1002021-06-27 16:28:10,196 [MainProcess:MainThread,1592: 8024] True2021-06-27 16:28:10,196 [MainProcess:MainThread,1592: 8024] result = 1012021-06-27 16:28:10,196 [MainProcess:MainThread,1592: 8024] True2021-06-27 16:28:10,196 [MainProcess:MainThread,1592: 8024] result = 102==============================2021-06-27 16:28:14,267 [SpawnProcess-1:MainThread,28816: 14260] finished 32021-06-27 16:28:14,273 [SpawnProcess-2:MainThread,28308: 9024] finished 42021-06-27 16:28:14,279 [SpawnProcess-3:MainThread,20960: 6168] finished 52021-06-27 16:28:14,288 [MainProcess:MainThread,1592: 8024] [<_MainThread(MainThread, started 8024)>]

总结

改库统一线程池、进程池调用,简化了编程;是Python简单的思想哲学的体现;唯一的缺点:无法设置线程名称,但这都是不值一提的

转载地址:http://uoesi.baihongyu.com/

你可能感兴趣的文章
OpenFeign学习(六):OpenFign进行表单提交参数或传输文件
查看>>
OpenFeign学习(七):Spring Cloud OpenFeign的使用
查看>>
Ribbon 学习(二):Spring Cloud Ribbon 加载配置原理
查看>>
Ribbon 学习(三):RestTemplate 请求负载流程解析
查看>>
深入理解HashMap
查看>>
XML生成(一):DOM生成XML
查看>>
XML生成(三):JDOM生成
查看>>
Ubuntu Could not open lock file /var/lib/dpkg/lock - open (13:Permission denied)
查看>>
collect2: ld returned 1 exit status
查看>>
C#入门
查看>>
查找最大值最小值
查看>>
C#中ColorDialog需点两次确定才会退出的问题
查看>>
数据库
查看>>
nginx反代 499 502 bad gateway 和timeout
查看>>
linux虚拟机安装tar.gz版jdk步骤详解
查看>>
python猜拳游戏
查看>>
python实现100以内自然数之和,偶数之和
查看>>
python数字逆序输出及多个print输出在同一行
查看>>
ESP8266 WIFI数传 Pixhaw折腾笔记
查看>>
苏宁产品经理面经
查看>>