当前位置: 首页 > news >正文

Python的并行任务(进程池、线程池)

python的并行任务(进程池、线程池)

在Python中,进程(Process)和线程(Thread)是并发编程的两种主要方式,它们各自适用于不同的场景。了解何时使用进程或线程,可以帮助你更有效地设计并发程序。
使用内置基本库concurrent.futures来实现并发,简单使用这个模块,包括并行线程和并行进程执行器 。

进程(Process)适用场景:

  • CPU密集型任务:当任务主要是计算密集型时,使用进程通常比线程更有效。因为Python的全局解释器锁(GIL)限制了同一时刻只有一个线程可以执行Python字节码。对于CPU密集型任务,使用多进程可以绕过GIL的限制,充分利用多核CPU的计算能力。
  • 需要隔离的应用:如果程序的不同部分需要高度的隔离性(例如,不同的内存空间、文件描述符等),使用进程是更好的选择。进程之间的通信(IPC)虽然比线程间通信(如共享内存)复杂,但提供了更高的安全性。
  • 跨平台兼容性:虽然Python的线程在大多数平台上都能很好地工作,但在某些平台上(特别是Windows),线程的行为可能与预期不同。在这些情况下,使用进程可能是一个更可靠的解决方案。

线程(Thread)适用场景:

  • I/O密集型任务:当任务主要是等待I/O操作(如文件读写、网络请求等)完成时,使用线程是更合适的选择。因为线程之间的切换成本较低,可以更有效地利用等待时间。
  • 需要快速响应的应用:对于需要快速响应用户输入或网络请求的应用,使用线程可以更快地处理这些请求,因为线程之间的切换比进程快得多。
  • 共享数据:如果多个任务需要频繁地访问和修改共享数据,使用线程可能更方便,因为线程可以共享进程的内存空间。然而,这也需要小心处理数据同步和竞争条件的问题。

注意:

  • 全局解释器锁(GIL):Python的GIL限制了同一时刻只有一个线程可以执行Python字节码。这意味着,对于CPU密集型任务,使用多线程可能不会带来性能上的提升。然而,对于I/O密集型任务,多线程仍然可以显著提高程序的效率。
  • 线程和进程的选择:在选择使用线程还是进程时,需要综合考虑任务的性质、系统的资源以及程序的复杂度。对于简单的I/O密集型任务,线程可能是更好的选择;而对于复杂的、需要高度隔离的或CPU密集型任务,进程可能更合适。
  • 并发库:Python提供了多种并发编程的库,如threading(用于线程)、multiprocessing(用于进程)以及concurrent.futures(提供了更高级的并发执行框架)。你可以根据具体需求选择合适的库来实现你的并发程序。

1. 并行线程ThreadPoolExecutor

import concurrent.futures
import numpy as np  
import time  # 定义矩阵计算  
def complex_matrix_operation(n, iterations):  """  执行复杂矩阵运算来测试CPU性能。  参数:  n (int): 矩阵的大小,即n x n的矩阵。  iterations (int): 重复矩阵运算的次数。  返回:  float: 最终矩阵的迹(对角线元素之和)。  """  # 初始化随机矩阵  A = np.random.rand(n, n)  B = np.random.rand(n, n)  # 开始计时  start_time = time.time()  # 执行多次矩阵乘法  result = np.eye(n)  # 初始化为对角线矩阵  for _ in range(iterations): # 矩阵乘积result = np.dot(result, np.dot(A, B))  # 计算并返回迹  trace = np.trace(result)  # 结束计时  end_time = time.time()  print(f"计算完成 {n},耗时: {end_time - start_time:.6f} 秒")  return trace  if __name__ == '__main__':# 调用函数  n = 16  # 最大矩阵大小  iterations = 16  # 迭代次数  # 开始时间start_time = time.time()  # 创建线程池执行器 最多16线程with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:# 提交任务到线程池futures = [executor.submit(complex_matrix_operation, i,iterations) for i in range(n)]# 使用wait函数等待所有任务完成# done, not_done = concurrent.futures.wait(futures, timeout=None)# for future in done:#     result = future.result()#     print('wait:',result)# # 使用as_completed函数按照完成顺序获取结果# for future in concurrent.futures.as_completed(futures):#     result = future.result()#     print('completed:',result)# 结束计时  end_time = time.time()  print(f"计算完成,总共耗时: {end_time - start_time:.6f} 秒")  

执行结果:

计算完成 0,耗时: 0.000437 秒
计算完成 1,耗时: 0.001399 秒
计算完成 2,耗时: 0.001689 秒
计算完成 4,耗时: 0.005548 秒
计算完成 3,耗时: 0.006699 秒
计算完成 6,耗时: 0.000553 秒
计算完成 5,耗时: 0.010176 秒
计算完成 10,耗时: 0.000951 秒
计算完成 9,耗时: 0.000789 秒
计算完成 11,耗时: 0.001857 秒
计算完成 7,耗时: 0.006189 秒
计算完成 15,耗时: 0.004257 秒
计算完成 13,耗时: 0.004155 秒
计算完成 14,耗时: 0.007222 秒
计算完成 12,耗时: 0.006338 秒
计算完成 8,耗时: 0.007463 秒
计算完成,总共耗时: 0.030619

从结果可以看到,计算的顺序是混乱的,非按n的顺序执行。

2. 并行进程ProcessPoolExecutor

import concurrent.futures
import numpy as np  
import time  # 定义矩阵计算  
def complex_matrix_operation(n, iterations):  """  执行复杂矩阵运算来测试CPU性能。  参数:  n (int): 矩阵的大小,即n x n的矩阵。  iterations (int): 重复矩阵运算的次数。  返回:  float: 最终矩阵的迹(对角线元素之和)。  """  # 初始化随机矩阵  A = np.random.rand(n, n)  B = np.random.rand(n, n)  # 开始计时  start_time = time.time()  # 执行多次矩阵乘法  result = np.eye(n)  # 初始化为对角线矩阵  for _ in range(iterations): # 矩阵乘积result = np.dot(result, np.dot(A, B))  # 计算并返回迹  trace = np.trace(result)  # 结束计时  end_time = time.time()  print(f"计算完成 {n},耗时: {end_time - start_time:.6f} 秒")  return trace  if __name__ == '__main__':# 调用函数  n = 16  # 最大矩阵大小  iterations = 16  # 迭代次数  # 开始时间start_time = time.time()  # 创建线程池执行器 最多16进程with concurrent.futures.ProcessPoolExecutor(max_workers=16) as executor:# 提交任务到线程池futures = [executor.submit(complex_matrix_operation, i,iterations) for i in range(n)]# 使用wait函数等待所有任务完成# done, not_done = concurrent.futures.wait(futures, timeout=None)# for future in done:#     result = future.result()#     print('wait:',result)# # 使用as_completed函数按照完成顺序获取结果# for future in concurrent.futures.as_completed(futures):#     result = future.result()#     print('completed:',result)# 结束计时  end_time = time.time()  print(f"计算完成,总共耗时: {end_time - start_time:.6f} 秒")  

执行结果:

计算完成 1,耗时: 0.000953 秒计算完成 2,耗时: 0.000691 秒
计算完成 0,耗时: 0.000837 秒计算完成 3,耗时: 0.000993 秒
计算完成 6,耗时: 0.001096 秒计算完成 8,耗时: 0.000336 秒计算完成 7,耗时: 0.000955 秒计算完成 4,耗时: 0.000751 秒计算完成 9,耗时: 0.000916 秒计算完成 13,耗时: 0.000758 秒计算完成 12,耗时: 0.001723 秒计算完成 11,耗时: 0.000999 秒计算完成 10,耗时: 0.001201 秒计算完成 14,耗时: 0.001045 秒计算完成 5,耗时: 0.001212 秒计算完成 15,耗时: 0.000172 秒计算完成,总共耗时: 0.432435

从结果可以看到,计算的顺序是混乱的,非按n的顺序执行。
进程用时比线程更多 ,如果把n调到256 ,差距更大。
ThreadPoolExecutor

计算完成,总共耗时: 4.962590

ProcessPoolExecutor

计算完成,总共耗时: 8.072993

3.concurrent.futures 模块

concurrent.futures模块除了提供ThreadPoolExecutor和ProcessPoolExecutor之外,还提供了一些其他的函数来执行异步任务和处理结果。以下是一些常用的concurrent.futures模块函数:

(1)concurrent.futures.as_completed

concurrent.futures.as_completed(fs, timeout=None):
  • 接收一个可迭代的Future对象集合fs,返回一个生成器,在每个Future对象完成时产生结果。
  • 可选地指定timeout参数,用于限制等待结果的最长时间。

(2)concurrent.futures.wait

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED):
  • 接收一个可迭代的Future对象集合fs,等待所有的Future对象完成。
  • 可选地指定timeout参数,用于限制等待结果的最长时间。
  • 可选地指定return_when参数,用于指定何时返回结果,可选值包括FIRST_COMPLETED、FIRST_EXCEPTION和ALL_COMPLETED。

4.Executor 基类

concurrent.futures模块中的Executor是一个抽象基类,用于表示执行器对象。它定义了一些共同的方法和行为,用于管理并发执行的任务。但要通过它的子类调用,而不是直接调用。

Executor类并不直接实例化,而是通过具体的子类如ThreadPoolExecutor和ProcessPoolExecutor来创建实例。

常用的Executor类方法:

(1)submit(fn, *args, **kwargs): 提交一个可调用对象和它的参数给执行器,返回一个Future对象,表示该任务的未来结果。

(2)map(fn, *iterables, timeout=None): 批量提交任务,并按原始迭代器的顺序返回结果。它类似于内置函数map(),但是可以异步地并发执行任务。

(3)shutdown(wait=True): 关闭执行器,不再接受新的任务。如果wait参数为True(默认值),则在所有任务完成后再关闭执行器。

(4)submit_to_executor(fn, executor, *args, **kwargs): 将任务提交给指定的执行器对象,并返回一个Future对象。

(5)map_to_executor(fn, executor, *iterables, timeout=None): 将任务批量提交给指定的执行器对象,并返回结果。

这些方法使得在执行任务时更加方便和灵活。可以根据具体的需求选择合适的方法和执行器类型。

5.Future 对象

concurrent.futures.Future是concurrent.futures模块中的一个类,用于表示一个异步任务的未来结果。它将可调用对象封装为异步执行。Future 实例由 Executor.submit() 创建,除非测试,不应直接创建。

以下是concurrent.futures.Future的一些常用方法:

(1)result(): 等待并返回异步任务的结果。如果任务尚未完成,该方法会阻塞直到任务完成并返回结果。

(2)done(): 判断异步任务是否已经完成,返回布尔值。

(3)cancel(): 取消异步任务的执行。如果任务已经开始执行或已经完成,则无法取消。

(4)add_done_callback(fn): 添加一个回调函数,当异步任务完成时会调用该回调函数。

参考文章

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 关于vs2022项目占用空间太大的问题
  • MongoDB未授权访问漏洞
  • 【selenium】文件上传、下载、读取
  • TF卡(SD NAND)参考设计和使用提示
  • Codeforces Round 963 (Div. 2)
  • 【Git企业级开发实战指南①】Git安装、基本操作!
  • 文件加密软件精品推荐(10款不容错过的文件加密软件)
  • 【Unity】 HTFramework框架(五十四)【进阶篇】Deployment 轻量级资源部署管线
  • VUE框架面试整理-Vuex
  • 将一批 csv 的第一行抄进 Excel
  • C++——智能指针
  • springboot山东外事职业大学校园食堂点餐系统-计算机毕业设计源码10417
  • 前端常用的性能优化方案
  • Pandas行列变换指南:数据重塑的艺术
  • LeetCode刷题笔记 | 3 | 无重复字符的最长子串 | 双指针 | 滑动窗口 | 2025兴业银行秋招笔试题 | 哈希集合
  • 3.7、@ResponseBody 和 @RestController
  • Akka系列(七):Actor持久化之Akka persistence
  • css布局,左右固定中间自适应实现
  • docker容器内的网络抓包
  • ECS应用管理最佳实践
  • HashMap剖析之内部结构
  • HomeBrew常规使用教程
  • iOS 颜色设置看我就够了
  • JavaScript 事件——“事件类型”中“HTML5事件”的注意要点
  • js中forEach回调同异步问题
  • Netty 4.1 源代码学习:线程模型
  • Python中eval与exec的使用及区别
  • RxJS 实现摩斯密码(Morse) 【内附脑图】
  • Shell编程
  • Zsh 开发指南(第十四篇 文件读写)
  • 坑!为什么View.startAnimation不起作用?
  • 一起参Ember.js讨论、问答社区。
  • 阿里云服务器如何修改远程端口?
  • ()、[]、{}、(())、[[]]命令替换
  • (13):Silverlight 2 数据与通信之WebRequest
  • (9)STL算法之逆转旋转
  • (el-Date-Picker)操作(不使用 ts):Element-plus 中 DatePicker 组件的使用及输出想要日期格式需求的解决过程
  • (翻译)Quartz官方教程——第一课:Quartz入门
  • (精确度,召回率,真阳性,假阳性)ACC、敏感性、特异性等 ROC指标
  • (每日持续更新)jdk api之FileReader基础、应用、实战
  • (删)Java线程同步实现一:synchronzied和wait()/notify()
  • (五)activiti-modeler 编辑器初步优化
  • (五)关系数据库标准语言SQL
  • (学习日记)2024.02.29:UCOSIII第二节
  • (转)jQuery 基础
  • (转)winform之ListView
  • (转)甲方乙方——赵民谈找工作
  • (转贴)用VML开发工作流设计器 UCML.NET工作流管理系统
  • *_zh_CN.properties 国际化资源文件 struts 防乱码等
  • ./indexer: error while loading shared libraries: libmysqlclient.so.18: cannot open shared object fil
  • .NET Core中如何集成RabbitMQ
  • .net on S60 ---- Net60 1.1发布 支持VS2008以及新的特性
  • .Net Remoting(分离服务程序实现) - Part.3
  • .Net 基于MiniExcel的导入功能接口示例
  • .NET 指南:抽象化实现的基类