当前位置: 首页 > news >正文

python 并发、并行处理、分布式处理

learn from 《Python高性能(第2版)》

文章目录

    • 1. 异步编程
      • 阻塞、回调
      • future
      • 事件循环
    • 2. asyncio 框架
      • 协程
      • yield 接收值
      • asyncio 定义协程
      • 阻塞代码 -> 非阻塞 ThreadPoolExecutor
    • 3. 响应式编程
      • 被观察者
      • 运算符
    • 4. 并行编程
      • 线程
      • 进程
      • 使用多个进程
      • 接口 Executor ,ProcessPoolExecutor
    • 5. 锁
    • 6. 分布式处理
      • dask
      • pyspark
      • mpi4py 科学计算
    • 7. 开发部署
      • travis-ci
      • docker

  • 减少CPU指令:
    加速python可以利用 CPython 获取 C 语言的性能
    Numba 加速 Numpy
    PyPy解释器

  • 减少 IO 等待:
    异步

1. 异步编程

阻塞、回调

import time


def wait_and_print(msg):
    time.sleep(1)  # 阻塞程序执行流
    print(msg)


import threading


def wait_and_print_async(msg):
    def callback():
        print(msg)

    timer = threading.Timer(1, callback)  # 不会阻塞程序执行流程,1秒以后执行 callback 函数
    timer.start() # 启动定时器, 实质:启动了新线程
if __name__ == '__main__':
    t0 = time.time()
    wait_and_print('第一次')
    wait_and_print('第二次')
    print(f'After call, takes: {time.time() - t0} seconds')

输出

第一次
第二次
After call, takes: 2.017909049987793 seconds
	t0 = time.time()
    wait_and_print_async('第一次')
    wait_and_print_async('第二次')
    print(f'After call, takes: {time.time() - t0} seconds')

输出

After call, takes: 0.0020036697387695312 seconds
第二次第一次

把返回结果当参数传递给回调函数

def network_request_async(num, on_done):
    def timer_done():
        on_done({'success': True, 'result': num**2})
    timer = threading.Timer(1, timer_done)
    timer.start()

def on_done(result):
    print(result)

network_request_async(2, on_done)

异步代码需要层层编写回调函数,很麻烦

future

future 更便利,可用来跟踪异步调用的结果

from concurrent.futures import Future
fut = Future()
print(fut)  # <Future at 0x19c799d6280 state=pending>

pending 表示还未确定

可以使用 fut.set_result() 使结果可用

fut.set_result("hello michael")
print(fut, fut.result())
# <Future at 0x2c87b645280 state=finished returned str> hello michael

还可以通过 add_done_callback 指定回调函数,当结果可用时,调用它(第一参数为 future obj)

fut1 = Future()
fut1.add_done_callback(lambda future_obj: print(future_obj.result(), flush=True))
fut1.set_result("hello michael")
#  hello michael
import threading
from concurrent.futures import Future
def network_request_async(number):
    future = Future()
    result = {
        'success': True,
        'result': number**2
    }
    timer = threading.Timer(1, lambda: future.set_result(result))
    timer.start()
    return future

if __name__ == '__main__':
    fut = network_request_async(2)
    print(fut)
    # <Future at 0x24169416cd0 state=pending> 

上面的函数什么也没有返回,还处于 pending

添加回调函数

def fetch_square(number):
    fut = network_request_async(number)
    def on_done_future(future):
        response = future.result()
        if response['success']:
            print(f'result is {response["result"]}')
    fut.add_done_callback(on_done_future)

事件循环

不断监视各种资源的状态,并在事件发生时执行相应的回调函数

事件循环:每个执行单元都不会与其他执行单元同时运行。(能规避同时写一个数据的风险?)

import time


class Timer:
    def __init__(self, timeout):
        self.timeout = timeout
        self.start_time = time.time()

    def done(self):
        return time.time() - self.start_time > self.timeout


if __name__ == '__main__':
    timer = Timer(3)
    while True:
        if timer.done():
            print('Timer finished')
            break

流程不会被阻塞,可以在 while 循环中执行其他操作,通过循环不断轮询等待事件发生称为 busy-waiting

import time


class Timer:
    def __init__(self, timeout):
        self.timeout = timeout
        self.start_time = time.time()

    def done(self):
        return time.time() - self.start_time > self.timeout

    def on_timer_done(self, callback):
        self.callback = callback


if __name__ == '__main__':
    timer = Timer(1)
    timer.on_timer_done(lambda: print('timer done from callback'))
    while True:
        if timer.done():
            timer.callback()
            break
  • 扩展为多个定时器
if __name__ == '__main__':
    timer = Timer(1)
    timer.on_timer_done(lambda: print('timer done from callback'))
    timer1 = Timer(2)
    timer1.on_timer_done(lambda: print('timer1 done from callback'))
    timers = [timer, timer1]

    while True:
        for timer in timers:
            if timer.done():
                timer.callback()
                timers.remove(timer)
        if len(timers) == 0:
            break

2. asyncio 框架

import asyncio
loop = asyncio.get_event_loop() # 获取asyncio循环
def callback():
    print("hello michael")
    loop.stop()

loop.call_later(1, callback) # 1秒后调用回调函数
loop.run_forever() # 启动循环

协程

回调函数很繁琐,协程 像编写同步代码一样,来编写异步代码,更自然优雅(可将协程看做可停止和恢复执行的函数)

使用 yield 定义一个生成器

def range_gen(n):
    i = 0
    while i < n:
        print(f'generating value {i}')
        yield i
        i += 1

range_gen(5)

代码没有执行,只返回一个生成器对象
使用 next(gen) 取结果

gen = range_gen(5)
next(gen) # generating value 0

程序会停在 yield 处,并保持内部状态

yield 接收值

def parrot():
    while True:
        message = yield
        print(f'parrot says: {message}')

generator = parrot()
generator.send(None) # 必须写这句初始化, 否则 
# TypeError: can't send non-None value to a just-started generator
generator.send('hello')
generator.send({'hello': 'world'})
# parrot says: hello
# parrot says: {'hello': 'world'}

生成器可仅在相关资源就绪时才往前推进,不需要使用回调函数

asyncio 定义协程

async def hello():
    await asyncio.sleep(1)  # 等待1 s
    print("hello michael")

coro = hello()
print(coro)  # <coroutine object hello at 0x000001EAE90D08C0>

loop = asyncio.get_event_loop()
loop.run_until_complete(coro) # hello michael
  • await 给事件循环提供了一个断点,等待资源期间,事件循环可继续管理其他协程
async def network_request(number):
    await asyncio.sleep(1)
    return {'success': True, 'result': number**2}

async def fetch_square(number):
    response = await network_request(number)
    if response['success']:
        print(response['result'])

loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_square(5))
  • asyncio.ensure_future() 调度协程和 future
# 以下函数并发执行
asyncio.ensure_future(fetch_square(2)) # 返回一个 Task 实例 (Future的子类),可以await
asyncio.ensure_future(fetch_square(3))
asyncio.ensure_future(fetch_square(4))
loop.run_forever()

阻塞代码 -> 非阻塞 ThreadPoolExecutor

  • 将阻塞代码放在一个独立的线程(OS层级实现的,允许代码并行执行)中运行
import time
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=3)

def wait_and_return(msg):
    time.sleep(1) # 阻塞代码
    return msg

print(executor.submit(wait_and_return, "i am parameters: msg"))
# executor.submit 调度函数,返回 future
# <Future at 0x15788570460 state=running>

或者

import asyncio
loop = asyncio.get_event_loop()
fut = loop.run_in_executor(executor, wait_and_return, "i am parameters: msg")
print(fut)
# <Future pending cb=[_chain_future.<locals>._call_check_cancel() at D:\ProgramData\Anaconda3\envs\cv\lib\asyncio\futures.py:360]>
  • 例子,requests 请求库是 阻塞的
import requests


async def fetch_urls(urls):
    responses = []
    for url in urls:
        responses.append(await loop.run_in_executor(executor, requests.get, url))
    return responses


res = loop.run_until_complete(fetch_urls(["https://www.baidu.com",
                                          "https://www.csdn.net"]))
                                          # 不会并行获取 url
print(res)


def fetch_urls_1(urls):
    return asyncio.gather(*[loop.run_in_executor(executor, requests.get, url) for url in urls])
    # gather 一次性提交所有协程并收集结果

res = loop.run_until_complete(fetch_urls_1(["https://www.baidu.com",
                                          "https://www.csdn.net"]))
                                          # 会并行但受制于 executor worker 数量
print(res)

为避免 executor worker 数量限制,应当使用 非阻塞aiohttp

3. 响应式编程

旨在打造出色的并发系统

  • 响应速度快
  • 伸缩性高,处理各种负载
  • 富有弹性,应对故障
  • 消息驱动,不阻塞

ReactiveX 是一个项目,实现了用于众多语言的响应式编程工具,RxPy 是其中一个库

https://reactivex.io/languages.html

pip install reactivex  # 4.0.4 version

被观察者

import reactivex as rx

obs = rx.from_iterable(range(4))
# Converts an iterable to an observable sequence  (被观察者)
print(obs)
# <reactivex.observable.observable.Observable object at 0x00000216C8F56CD0>
obs.subscribe(print)  # 将数据源 emit 发射的每个值传入 print 函数

被观察者很像一个有序的迭代器

c = [1,2,3,4,5]
iterator = iter(c)
print(next(iterator))
print(next(iterator))
for i in iterator:
    print(i)
  • Observable.subscribe 注册回调函数
c = [1,2,3,0,4,5]
obs = rx.from_iterable(c)
obs.subscribe(on_next=lambda x: print(f'next elem 1/{x}: {1/x}'),
              on_error=lambda x: print(f'error: 1/{x} illegal'),
              on_completed=lambda: print(f'completed calculation'))

输出

next elem 1/1: 1.0
next elem 1/2: 0.5
next elem 1/3: 0.3333333333333333
error: 1/division by zero illegal

Process finished with exit code 0
c = [1,2,3,4,5]
obs = rx.from_iterable(c)
obs.subscribe(on_next=lambda x: print(f'next elem 1/{x}: {1/x}'),
              on_completed=lambda: print(f'completed calculation'))

输出

next elem 1/1: 1.0
next elem 1/2: 0.5
next elem 1/3: 0.3333333333333333
next elem 1/4: 0.25
next elem 1/5: 0.2
completed calculation

RxPy 提供了可用来创建、变换、过滤 被观察者,以及对其进行编组的运算符,这些操作返回 被观察者(可以继续串接、组合,威力所在)

obs = rx.from_iterable(range(5))
obs2 = obs[:3]
obs2.subscribe(print)  # 0 1 2
obs.subscribe(print)  # 0 1 2 3 4

运算符

  • map
from reactivex.operators import map as rx_map
op = rx_map(lambda x: x**2)
(rx.from_iterable(range(5))).pipe(op).subscribe(print)
# 0
# 1
# 4
# 9
# 16
  • group by
from reactivex.operators import group_by as rx_group_by
op = rx_group_by(lambda x: x%3)
obs = (rx.from_iterable(range(10))).pipe(op)
obs.subscribe(lambda x: print(f"group key: {x.key}"))
# group key: 0
# group key: 1
# group key: 2

每个组都是一个 被观察者

obs[0].subscribe(lambda x: x.subscribe(print))
print('-'*10)
obs[1].subscribe(lambda x: x.subscribe(print))
print('-'*10)
obs[2].subscribe(lambda x: x.subscribe(print))
print('-'*10)
0
3
6
9
----------
1
4
7
----------
2
5
8
----------
  • merge_all
from reactivex.operators import merge_all
obs.pipe(merge_all()).subscribe(print)

输出 0 - 9 ,合并了所有 group 且按原顺序输出

4. 并行编程

问题是独立的,或者高度独立的,可以使用多核进行计算

如果子问题之间需要共享数据,实现起来不那么容器,有进程间通信开销的问题

线程

共享内存方式实现并行的一种常见方式是 线程

由于 python 的 全局解释器锁 GIL ,线程执行 python 语句时,获取一个锁,执行完毕后,释放锁
每次只有一个线程能够获得这个锁,其他线程就不能执行 python 语句了

虽然有 GIL 的问题,但是遇到耗时操作(I/O) 时,依然可以使用线程来实现并发

进程

通过使用 进程 可以完全避开 GIL,进程 不共享内存,彼此独立,每个进程都有自己的解释器
进程的缺点:

  • 启动新进程比新线程慢
  • 消耗更多内存
  • 进程间通信速度慢

优点:分布在多台计算机中,可伸缩性更佳

使用多个进程

  • multiprocessing.Process 派生子类
  • 实现 Process.run 编写子进程中要执行的代码,processor_obj.start() 调用
import multiprocessing
import time

class MyProcess(multiprocessing.Process):
    def __init__(self, id):
        super(MyProcess, self).__init__()
        self.id = id
    def run(self):
        time.sleep(1)
        print(f'i am a process with id {self.id}')

if __name__ == '__main__':
    p = MyProcess(1)
    p.start()  # 不能直接调用 run
    p.join() # Wait until child process terminates
    print('end') # 没有 join 的话,会先打印 end
    t0 = time.time()
    processes = [MyProcess(1.1) for _ in range(4)]
    [p.start() for p in processes]
    [p.join() for p in processes]
    print(f'time: {time.time() - t0: .2f} s')

创建4个进程,执行并不需要 4倍的时间

  • 进程执行顺序是无法预测的,取决于操作系统

  • multiprocessing.Pool 类生成一组进程,可使用类方法 apply/apply_async map/map_async 提交任务

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    inputs = list(range(4))
    out = pool.map(square, inputs) # 对每个元素执行 square 函数
    print(out)
    print('end')
    # [0, 1, 4, 9]
	# end

调用 Pool.map 主程序将 停止执行,直到所有工作进程处理完毕
使用 map_async 立即返回一个 AsyncResult 对象,在后台进行计算,不阻塞主程序,AsyncResult.get() 获取结果
Pool.apply_async 将单个函数任务分配给一个进程,apply_async 使用 函数,函数的参数,作为参数,返回 AsyncResult 对象

import multiprocessing
import time


def square(x):
    time.sleep(5)
    return x * x

if __name__ == '__main__':
    t0 = time.time()
    pool = multiprocessing.Pool(processes=4)
    inputs = list(range(4))
    out = pool.map_async(square, inputs)
    print(out)
    print('end')
    print(f'{time.time() - t0} s')
    get_out = out.get()
    print(get_out)
    print(f'{time.time() - t0} s')
# <multiprocessing.pool.MapResult object at 0x000001F33DE86C70>
# end
# 0.07700085639953613 s
# [0, 1, 4, 9]
# 5.8083672523498535 s
    out = [pool.apply_async(square, (i,)) for i in range(4)] 
    # 传入 int 会报错,argument after * must be an iterable, not int
    # (i, ) 变成元组,可迭代
    get_out = [r.get() for r in out]
    print(get_out)

接口 Executor ,ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x*x
if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=4)
    fut = executor.submit(square, 2)
    print(fut)  # <Future at 0x25ae0fffe50 state=running>
    print(fut.result()) # 4

    res = executor.map(square, range(4))  # 返回 迭代器
    print(list(res)) # [0, 1, 4, 9]

要从一个或多个 Future 中提取结果,可使用 concurrent.futures.wait concurrent.futures.as_completed

from concurrent.futures import wait, as_completed, ProcessPoolExecutor


def square(x):
    return x * x


if __name__ == '__main__':
    executor = ProcessPoolExecutor()
    fut1 = executor.submit(square, 2)
    fut2 = executor.submit(square, 3)
    wait([fut1, fut2])  # 阻塞程序执行,直到所有 future 执行完
    res = as_completed([fut1, fut2])
    print(res)
    print(list(res))
    out = [f.result() for f in [fut1, fut2]]
    print(out)

# <generator object as_completed at 0x000001B87B480970>
# [<Future at 0x1b87b48fdf0 state=finished returned int>, <Future at 0x1b87b564130 state=finished returned int>]
# [4, 9]

5. 锁

防止多个进程同时执行受保护的代码,例如同时写同一个文件

multiprocessing.Lock()

6. 分布式处理

dask

https://www.dask.org/

pyspark

用户提交任务,集群管理器自动将任务分派给空闲的执行器

mpi4py 科学计算

https://pypi.org/project/mpi4py/

7. 开发部署

travis-ci

https://www.travis-ci.org/

编写 yaml 配置文件,当有新代码push后,自动运行 配置文件中的 测试项

docker

提供隔离环境

相关文章:

  • 推荐一款国人开源的 Redis 可视化管理工具
  • 开发工程师必备————【Day22】前端开发之jQuery更多操作
  • 04【DQL查询】
  • Vscode常用插件
  • 利用MyBatisX插件自动生成代码
  • 【数据结构】——栈和链表的面试题详解
  • 如何从 apt-get 升级中排除特定软件包
  • C++/Python:罗德里格斯旋转矩阵
  • c++征途 --- STL初识
  • 学习编程的第二十三天
  • 上交所技术——2020春招应用开发工程师(Java)笔试
  • 猿创征文|时间序列分析算法之二次指数平滑法和三次指数平滑法详解+Python代码实现
  • 基于人工兔优化算法的函数寻优和工程优化
  • 网络安全无小事, 所有艾思运维人员, 在nginx中必须对thinkphp的目录做以下安全设置, 未尽目录请自行添加
  • Shiro 权限绕过漏洞(CVE-2020-1957)
  • [微信小程序] 使用ES6特性Class后出现编译异常
  • 【399天】跃迁之路——程序员高效学习方法论探索系列(实验阶段156-2018.03.11)...
  • Android Studio:GIT提交项目到远程仓库
  • ES学习笔记(12)--Symbol
  • HomeBrew常规使用教程
  • HTTP--网络协议分层,http历史(二)
  • JavaScript DOM 10 - 滚动
  • js
  • JS笔记四:作用域、变量(函数)提升
  • Mysql数据库的条件查询语句
  • python_bomb----数据类型总结
  • 创建一种深思熟虑的文化
  • 读懂package.json -- 依赖管理
  • 讲清楚之javascript作用域
  • 解析 Webpack中import、require、按需加载的执行过程
  • 老板让我十分钟上手nx-admin
  • 要让cordova项目适配iphoneX + ios11.4,总共要几步?三步
  • 正则表达式
  • 【运维趟坑回忆录】vpc迁移 - 吃螃蟹之路
  • 东超科技获得千万级Pre-A轮融资,投资方为中科创星 ...
  • 教程:使用iPhone相机和openCV来完成3D重建(第一部分) ...
  • 数据可视化之下发图实践
  • !!java web学习笔记(一到五)
  • #Spring-boot高级
  • #鸿蒙生态创新中心#揭幕仪式在深圳湾科技生态园举行
  • (2)(2.10) LTM telemetry
  • (2021|NIPS,扩散,无条件分数估计,条件分数估计)无分类器引导扩散
  • (4)logging(日志模块)
  • (C++)八皇后问题
  • (poj1.3.2)1791(构造法模拟)
  • (保姆级教程)Mysql中索引、触发器、存储过程、存储函数的概念、作用,以及如何使用索引、存储过程,代码操作演示
  • (层次遍历)104. 二叉树的最大深度
  • (六)vue-router+UI组件库
  • (全部习题答案)研究生英语读写教程基础级教师用书PDF|| 研究生英语读写教程提高级教师用书PDF
  • (原創) 如何解决make kernel时『clock skew detected』的warning? (OS) (Linux)
  • .bat批处理(八):各种形式的变量%0、%i、%%i、var、%var%、!var!的含义和区别
  • .NET 3.0 Framework已经被添加到WindowUpdate
  • .net CHARTING图表控件下载地址
  • .net/c# memcached 获取所有缓存键(keys)
  • .net遍历html中全部的中文,ASP.NET中遍历页面的所有button控件