当前位置: 首页 > news >正文

python使用multiprocessing

multiprocessing

multiprocessing是Python标准库中的一个模块,用于实现多进程编程。它提供了一种简单而高效的方式来利用多核处理器的能力,通过在多个进程中同时执行任务,加快程序的执行速度和提高系统的吞吐量。

下面是使用multiprocessing模块的一些常见操作:

  • 创建进程:
    • 使用Process类创建进程对象,指定要执行的函数或方法。
    • 使用Process类的start()方法启动进程。
  • 进程间通信:
    • 使用Queue类实现进程间的队列通信。
    • 使用Pipe类实现进程间的管道通信。
    • 使用共享内存(Value和Array)实现进程间的数据共享。
  • 进程管理:
    • 使用Process类的join()方法等待进程结束。
    • 使用Process类的terminate()方法终止进程。

process

​
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

参数介绍:

  • group默认为None(目前未使用)
  • target代表调用对象,即子进程执行的任务
  • name为进程名称
  • args调用对象的位置参数元组,args=(value1, value2, …)
  • kwargs调用对象的字典,kwargs={key1:value1, key2:value2, …}
  • daemon表示进程是否为守护进程,布尔值

方法介绍:

  • Process.start() 启动进程,并调用子进程中的run()方法
  • Process.run() 进程启动时运行的方法,在自定义时必须要实现该方法
  • Process.terminate() 强制终止进程,不进行清理操作,如果Process创建了子进程,会导致该进程变成僵尸进程
  • Process.join() 阻塞进程使主进程等待该进程终止
  • Process.kill() 与terminate()相同
  • Process.is_alive() 判断进程是否还存活,如果存活,返回True
  • Process.close() 关闭进程对象,并清理资源,如果进程仍在运行则返回错误

multiprocessing.Queue()

multiprocessing.Queue()是multiprocessing模块中的一个类,用于实现进程间通信的队列(Queue)。它提供了一种安全的方式,让多个进程之间可以共享数据。multiprocessing.Queue()类的主要特点包括:

  • 安全性:multiprocessing.Queue()是线程安全的,可以在多个进程中同时使用,而无需担心数据竞争或不一致性问题。

  • 先进先出(FIFO):它遵循先进先出的原则,保证了添加到队列中的元素按照添加的顺序被取出。

  • 阻塞操作:当队列为空时,使用get()方法从队列中获取元素会阻塞进程,直到队列中有可用的元素。当队列满时,使用put()方法向队列中添加元素会阻塞进程,直到队列有空闲空间。

import multiprocessingdef worker(queue):data = queue.get()  # 从队列中获取数据# 处理数据if __name__ == '__main__':queue = multiprocessing.Queue()process = multiprocessing.Process(target=worker, args=(queue,))process.start()queue.put(data)  # 向队列中添加数据process.join()

在上面的示例中,首先创建了一个multiprocessing.Queue()对象,然后将该队列对象作为参数传递给子进程的worker()函数。在子进程中,使用get()方法从队列中获取数据进行处理。在主进程中,使用put()方法向队列中添加数据。通过使用multiprocessing.Queue(),可以让多个进程之间安全地传递数据,实现进程间的通信和协作。这对于并行计算、任务分发和处理等场景非常有用。

拿之前的点点带宽举例

七个节点不重复取两个,C72也就是21组,即21次循环,每次循环sleep5秒,串行就是21x5=105秒,21个线程并行5秒。

import multiprocessing
import time
import randomdef get_oobw_parallel(node_names):results = []for i in range(0, len(node_names) - 1):for j in range(i + 1, len(node_names)):result = get_oobw(node_names[i], node_names[j])results.append(result)return resultsdef get_oobw(node_name1, node_name2):# 执行 get_oobw 的逻辑# ...time.sleep(5)latency, bandwidth = round(random.uniform(100.0, 200.0), 4), round(random.uniform(100.0, 200.0), 4)result = (node_name1, node_name2, latency, bandwidth)return resultstart_time = time.time()node_names = ["cn1", "cn2", "cn3", "cn4", "cn5", "cn6", "cn7"]  # 填入你的节点名称列表results = get_oobw_parallel(node_names)for result in results:node_name1, node_name2, latency, bandwidth = resultprint(node_name1, node_name2, latency, bandwidth)end_time = time.time()
# 计算执行时间
execution_time = end_time - start_time
print("程序执行时间:", execution_time, "秒")

在这里插入图片描述

import multiprocessing
import time
import randomdef get_oobw_parallel(node_names):results = []processes = []result_queue = multiprocessing.Queue()for i in range(0, len(node_names) - 1):for j in range(i + 1, len(node_names)):process = multiprocessing.Process(target=get_oobw, args=(node_names[i], node_names[j], result_queue))process.start()processes.append(process)for process in processes:process.join()while not result_queue.empty():result = result_queue.get()results.append(result)return resultsdef get_oobw(node_name1, node_name2, result_queue):# 执行 get_oobw 的逻辑# ...time.sleep(5)latency, bandwidth = round(random.uniform(100.0, 200.0), 4), round(random.uniform(100.0, 200.0), 4)result = (node_name1, node_name2, latency, bandwidth)result_queue.put(result)# return latency, bandwidthstart_time = time.time()node_names = ["cn1", "cn2", "cn3", "cn4", "cn5", "cn6", "cn7"]  # 填入你的节点名称列表results = get_oobw_parallel(node_names)for result in results:node_name1, node_name2, latency, bandwidth = resultprint(node_name1, node_name2, latency, bandwidth)end_time = time.time()
# 计算执行时间
execution_time = end_time - start_time
print("程序执行时间:", execution_time, "秒")

在这里插入图片描述

相关文章:

  • Spring学习 基础(三)MVC
  • 2024/3/10周报
  • 如何清除keep-alive缓存
  • mongodb的备份与恢复
  • C#与欧姆龙PLC实现CIP通讯
  • Draco点云压缩测试
  • scikit-learn保姆级入门教程
  • Qt 定时器事件
  • Python中,括号内部的for循环(列表推导式)
  • Kubernetes 安全秘籍:5 个你必须知道的知识点
  • 【操作系统学习笔记】文件管理1.9
  • ROS2动作通信的实现
  • 电子数字灯LED管变化数量
  • 计算机找不到api-ms-win-core-path-l1-1-0的5种解决方法
  • 解决ts报错:类型“entry”上不存在属性“$AppTools”
  • Bootstrap JS插件Alert源码分析
  • co.js - 让异步代码同步化
  • conda常用的命令
  • CSS居中完全指南——构建CSS居中决策树
  • Docker入门(二) - Dockerfile
  • IDEA 插件开发入门教程
  • Java 9 被无情抛弃,Java 8 直接升级到 Java 10!!
  • Java的Interrupt与线程中断
  • leetcode讲解--894. All Possible Full Binary Trees
  • markdown编辑器简评
  • node-sass 安装卡在 node scripts/install.js 解决办法
  • SegmentFault 社区上线小程序开发频道,助力小程序开发者生态
  • Vue2 SSR 的优化之旅
  • 表单中readonly的input等标签,禁止光标进入(focus)的几种方式
  • 工作手记之html2canvas使用概述
  • 构建二叉树进行数值数组的去重及优化
  • 基于HAProxy的高性能缓存服务器nuster
  • 前端工程化(Gulp、Webpack)-webpack
  • 体验javascript之美-第五课 匿名函数自执行和闭包是一回事儿吗?
  • 用Node EJS写一个爬虫脚本每天定时给心爱的她发一封暖心邮件
  • ​Java并发新构件之Exchanger
  • # 日期待t_最值得等的SUV奥迪Q9:空间比MPV还大,或搭4.0T,香
  • (003)SlickEdit Unity的补全
  • (1) caustics\
  • (2009.11版)《网络管理员考试 考前冲刺预测卷及考点解析》复习重点
  • (9)YOLO-Pose:使用对象关键点相似性损失增强多人姿态估计的增强版YOLO
  • (八十八)VFL语言初步 - 实现布局
  • (二)linux使用docker容器运行mysql
  • (附源码)php新闻发布平台 毕业设计 141646
  • (学习日记)2024.03.25:UCOSIII第二十二节:系统启动流程详解
  • (转)http协议
  • (转)从零实现3D图像引擎:(8)参数化直线与3D平面函数库
  • ***监测系统的构建(chkrootkit )
  • .bat批处理(六):替换字符串中匹配的子串
  • .NET Core 网络数据采集 -- 使用AngleSharp做html解析
  • .NET NPOI导出Excel详解
  • .NET/C# 编译期间能确定的相同字符串,在运行期间是相同的实例
  • .NET的数据绑定
  • .NET简谈互操作(五:基础知识之Dynamic平台调用)
  • .NET连接数据库方式