当前位置: 首页 > news >正文

python(二十八)

一 multiprocessing模块

1.1 process类

构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,目前还没有实现,库引用中提示必须是None; 
  target: 要执行的方法; 
  name: 进程名; 
  args/kwargs: 要传入方法的参数。

实例方法:

  is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():进程准备就绪,等待CPU调度

  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

  terminate():不管任务是否完成,立即停止工作进程

属性:

  daemon:和线程的setDeamon功能一样

  name:进程名字。

  pid:进程号。

复制代码
from multiprocessing import Process
import os
import time
def info(name):


    print("name:",name)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("------------------")
    time.sleep(1)

def foo(name):

    info(name)

if __name__ == '__main__':

    info('main process line')


    p1 = Process(target=info, args=('alvin',))
    p2 = Process(target=foo, args=('egon',))
    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print("ending")
复制代码

通过tasklist(Win)或者ps -elf |grep(linux)命令检测每一个进程号(PID)对应的进程名

1.2 进程间通讯 

1.2.1 进程对列Queue

复制代码
from multiprocessing import Process, Queue
import queue

def f(q,n):
    #q.put([123, 456, 'hello'])
    q.put(n*n+1)
    print("son process",id(q))

if __name__ == '__main__':
    q = Queue()  #try: q=queue.Queue()
    print("main process",id(q))

    for i in range(3):
        p = Process(target=f, args=(q,i))
        p.start()

    print(q.get())
    print(q.get())
    print(q.get())
复制代码

1.2.2 管道(pipe)

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

复制代码
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([12, {"name":"yuan"}, 'hello'])
    response=conn.recv()
    print("response",response)
    conn.close()
    if __name__ == '__main__':

    parent_conn, child_conn = Pipe()
   
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    parent_conn.send("儿子你好!")
    p.join()
复制代码

Pipe()返回的两个连接对象代表管道的两端。 每个连接对象都有send()和recv()方法(等等)。 请注意,如果两个进程(或线程)尝试同时读取或写入管道的同一端,管道中的数据可能会损坏。

1.2.3 manager

Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

复制代码
from multiprocessing import Process, Manager

def f(d, l,n):

    d[n] = n
    d["name"] ="alvin"
    l.append(n)

    #print("l",l)

if __name__ == '__main__':

    with Manager() as manager:

        d = manager.dict()

        l = manager.list(range(5))
        p_list = []

        for i in range(10):
            p = Process(target=f, args=(d,l,i))
            p.start()
            p_list.append(p)

        for res in p_list:
            res.join()

        print(d)
        print(l)
复制代码

1.3 进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

复制代码
from multiprocessing import Pool
import time

def foo(args):
 time.sleep(1)
 print(args)

if __name__ == '__main__':
 p = Pool(5)
 for i in range(30):
     p.apply_async(func=foo, args= (i,))

 p.close()   # 等子进程执行完毕后关闭线程池
 # time.sleep(2)
 # p.terminate()  # 立刻关闭线程池
 p.join()
复制代码

进程池内部维护一个进程序列,当使用时,去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有以下几个主要方法:

  1. apply:从进程池里取一个进程并执行
  2. apply_async:apply的异步版本
  3. terminate:立刻关闭线程池
  4. join:主进程等待所有子进程执行完毕,必须在close或terminate之后
  5. close:等待所有进程结束后,才关闭线程池

二 协程

协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

2.1 yield与协程

复制代码
import time

"""
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。
"""
# 注意到consumer函数是一个generator(生成器):
# 任何包含yield关键字的函数都会自动成为生成器(generator)对象

def consumer():
    r = ''
    while True:
        # 3、consumer通过yield拿到消息,处理,又通过yield把结果传回;
        #    yield指令具有return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。
        #    当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时,
        #    就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。通过这种方式,迭代器可以实现无限序列和惰性求值。
        n = yield r
        if not n:
            return
        print('[CONSUMER] ←← Consuming %s...' % n)
        time.sleep(1)
        r = '200 OK'
def produce(c):
    # 1、首先调用c.next()启动生成器
    next(c)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] →→ Producing %s...' % n)
        # 2、然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
        cr = c.send(n)
        # 4、produce拿到consumer处理的结果,继续生产下一条消息;
        print('[PRODUCER] Consumer return: %s' % cr)
    # 5、produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
    c.close()
if __name__=='__main__':
    # 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
    c = consumer()
    produce(c)
    
    
'''
result:

[PRODUCER] →→ Producing 1...
[CONSUMER] ←← Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 2...
[CONSUMER] ←← Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 3...
[CONSUMER] ←← Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 4...
[CONSUMER] ←← Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 5...
[CONSUMER] ←← Consuming 5...
[PRODUCER] Consumer return: 200 OK
'''
复制代码

2.2 greenlet

greenlet机制的主要思想是:生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next()或send()操作进行恢复为止。可以使用一个调度器循环在一组生成器函数之间协作多个任务。greentlet是python中实现我们所谓的"Coroutine(协程)"的一个基础库.

复制代码
from greenlet import greenlet
 
def test1():
    print (12)
    gr2.switch()
    print (34)
    gr2.switch()
 
def test2():
    print (56)
    gr1.switch()
    print (78)
 
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
复制代码

2.3 基于greenlet的框架

2.3.1 gevent模块实现协程

Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent为Python提供了比较完善的协程支持。

gevent是第三方库,通过greenlet实现协程,其基本思想是:

当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成:

复制代码
import gevent
import time

def foo():
    print("running in foo")
    gevent.sleep(2)
    print("switch to foo again")

def bar():
    print("switch to bar")
    gevent.sleep(5)
    print("switch to bar again")

start=time.time()

gevent.joinall(
    [gevent.spawn(foo),
    gevent.spawn(bar)]
)

print(time.time()-start)
复制代码

当然,实际代码里,我们不会用gevent.sleep()去切换协程,而是在执行到IO操作时,gevent自动切换,代码如下:

复制代码
from gevent import monkey
monkey.patch_all() import gevent from urllib import request import time def f(url): print('GET: %s' % url) resp = request.urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url)) start=time.time() gevent.joinall([ gevent.spawn(f, 'https://itk.org/'), gevent.spawn(f, 'https://www.github.com/'), gevent.spawn(f, 'https://zhihu.com/'), ]) # f('https://itk.org/') # f('https://www.github.com/') # f('https://zhihu.com/') print(time.time()-start)
复制代码

扩展

===========================一


gevent是一个基于协程(coroutine)的Python网络函数库,通过使用greenlet提供了一个在libev事件循环顶部的高级别并发API。

主要特性有以下几点:

<1> 基于libev的快速事件循环,Linux上面的是epoll机制

<2> 基于greenlet的轻量级执行单元

<3> API复用了Python标准库里的内容

<4> 支持SSL的协作式sockets

<5> 可通过线程池或c-ares实现DNS查询

<6> 通过monkey patching功能来使得第三方模块变成协作式

gevent.spawn()方法spawn一些jobs,然后通过gevent.joinall将jobs加入到微线程执行队列中等待其完成,设置超时为2秒。执行后的结果通过检查gevent.Greenlet.value值来收集。


===========================二
1、关于Linux的epoll机制:

epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的
增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。epoll的优点:

(1)支持一个进程打开大数目的socket描述符。select的一个进程所打开的FD由FD_SETSIZE的设置来限定,而epoll没有这个限制,它所支持的FD上限是
最大可打开文件的数目,远大于2048。

(2)IO效率不随FD数目增加而线性下降:由于epoll只会对“活跃”的socket进行操作,于是,只有”活跃”的socket才会主动去调用 callback函数,其他
idle状态的socket则不会。

(3)使用mmap加速内核与用户空间的消息传递。epoll是通过内核于用户空间mmap同一块内存实现的。

(4)内核微调。

2、libev机制

提供了指定文件描述符事件发生时调用回调函数的机制。libev是一个事件循环器:向libev注册感兴趣的事件,比如socket可读事件,libev会对所注册的事件
的源进行管理,并在事件发生时触发相应的程序。

===========================三


‘’‘

import gevent

            from gevent import socket

            urls = [‘www.google.com.hk’,’www.example.com’, ‘www.python.org’ ]

            jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]

            gevent.joinall(jobs, timeout=2)

            [job.value for job in jobs]


[‘74.125.128.199’, ‘208.77.188.166’, ‘82.94.164.162’]

            ’‘’

gevent.spawn()方法spawn一些jobs,然后通过gevent.joinall将jobs加入到微线程执行队列中等待其完成,设置超时为2秒。执行后的结果通过检查gevent.Greenlet.value值来收集。gevent.socket.gethostbyname()函数与标准的socket.gethotbyname()有相同的接口,但它不会阻塞整个解释器,因此会使得其他的greenlets跟随着无阻的请求而执行。

Monket patching

Python的运行环境允许我们在运行时修改大部分的对象,包括模块、类甚至函数。虽然这样做会产生“隐式的副作用”,而且出现问题很难调试,但在需要修改Python本身的基础行为时,Monkey patching就派上用场了。Monkey patching能够使得gevent修改标准库里面大部分的阻塞式系统调用,包括socket,ssl,threading和select等模块,而变成协作式运行。



from gevent import monkey ;

monkey . patch_socket ()

import urllib2



通过monkey.patch_socket()方法,urllib2模块可以使用在多微线程环境,达到与gevent共同工作的目的。

事件循环

不像其他网络库,gevent和eventlet类似, 在一个greenlet中隐式开始事件循环。没有必须调用run()或dispatch()的反应器(reactor),在twisted中是有 reactor的。当gevent的API函数想阻塞时,它获得Hub实例(执行时间循环的greenlet),并切换过去。如果没有集线器实例则会动态 创建。

libev提供的事件循环默认使用系统最快轮询机制,设置LIBEV_FLAGS环境变量可指定轮询机制。LIBEV_FLAGS=1为select, LIBEV_FLAGS = 2为poll, LIBEV_FLAGS = 4为epoll,LIBEV_FLAGS = 8为kqueue。

Libev的API位于gevent.core下。注意libev API的回调在Hub的greenlet运行,因此使用同步greenlet的API。可以使用spawn()和Event.set()等异步API。
View Code

eventlet实现协程

eventlet 是基于 greenlet 实现的面向网络应用的并发处理框架,提供“线程”池、队列等与其他 Python 线程、进程模型非常相似的 api,并且提供了对 Python 发行版自带库及其他模块的超轻量并发适应性调整方法,比直接使用 greenlet 要方便得多。

其基本原理是调整 Python 的 socket 调用,当发生阻塞时则切换到其他 greenlet 执行,这样来保证资源的有效利用。需要注意的是:
eventlet 提供的函数只能对 Python 代码中的 socket 调用进行处理,而不能对模块的 C 语言部分的 socket 调用进行修改。对后者这类模块,仍然需要把调用模块的代码封装在 Python 标准线程调用中,之后利用 eventlet 提供的适配器实现 eventlet 与标准线程之间的协作。
虽然 eventlet 把 api 封装成了非常类似标准线程库的形式,但两者的实际并发执行流程仍然有明显区别。在没有出现 I/O 阻塞时,除非显式声明,否则当前正在执行的 eventlet 永远不会把 cpu 交给其他的 eventlet,而标准线程则是无论是否出现阻塞,总是由所有线程一起争夺运行资源。所有 eventlet 对 I/O 阻塞无关的大运算量耗时操作基本没有什么帮助。

总结:

协程的好处:

无需线程上下文切换的开销
无需原子操作锁定及同步的开销
方便切换控制流,简化编程模型
高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
缺点:

无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

转载于:https://www.cnblogs.com/guozhenle/p/7210701.html

相关文章:

  • 页面查找技巧
  • SqlServer索引的原理与应用
  • Spring+SpringMVC+mybatis整合以及注解的使用(三)
  • vs2010 javascript代码折叠扩展插件
  • css定位
  • 最简单的手工写增,删,改,查的代码
  • Lucene 4.0 TieredMergePolicy
  • 关于表单修改
  • Ubuntu安装源配置文件/etc/apt/sources.list第X行有错误
  • 原型和原型链
  • Linux(ubuntu)下如何安装与卸载软件总结
  • 前端调试的一点新理解
  • Linux(ubuntu)下搭建Java开发环境 - 安装JDK
  • POJ 2585 Window Pains 拓扑排序
  • 【Android Dev Guide - 03】 - Content Providers
  • 【Leetcode】104. 二叉树的最大深度
  • Apache的基本使用
  • ES6简单总结(搭配简单的讲解和小案例)
  • iOS 颜色设置看我就够了
  • Js基础知识(一) - 变量
  • Laravel 中的一个后期静态绑定
  • oschina
  • php ci框架整合银盛支付
  • Python 反序列化安全问题(二)
  • Sequelize 中文文档 v4 - Getting started - 入门
  • TiDB 源码阅读系列文章(十)Chunk 和执行框架简介
  • vue中实现单选
  • 从零开始的无人驾驶 1
  • 海量大数据大屏分析展示一步到位:DataWorks数据服务+MaxCompute Lightning对接DataV最佳实践...
  • 前端每日实战:70# 视频演示如何用纯 CSS 创作一只徘徊的果冻怪兽
  • 深入浅出webpack学习(1)--核心概念
  • 突破自己的技术思维
  • 验证码识别技术——15分钟带你突破各种复杂不定长验证码
  • 一些关于Rust在2019年的思考
  • 3月27日云栖精选夜读 | 从 “城市大脑”实践,瞭望未来城市源起 ...
  • 曜石科技宣布获得千万级天使轮投资,全方面布局电竞产业链 ...
  • ​LeetCode解法汇总307. 区域和检索 - 数组可修改
  • ###51单片机学习(1)-----单片机烧录软件的使用,以及如何建立一个工程项目
  • #我与Java虚拟机的故事#连载17:我的Java技术水平有了一个本质的提升
  • $Django python中使用redis, django中使用(封装了),redis开启事务(管道)
  • $redis-setphp_redis Set命令,php操作Redis Set函数介绍
  • (04)odoo视图操作
  • (07)Hive——窗口函数详解
  • (10)工业界推荐系统-小红书推荐场景及内部实践【排序模型的特征】
  • (MATLAB)第五章-矩阵运算
  • (第8天)保姆级 PL/SQL Developer 安装与配置
  • (考研湖科大教书匠计算机网络)第一章概述-第五节1:计算机网络体系结构之分层思想和举例
  • (十七)devops持续集成开发——使用jenkins流水线pipeline方式发布一个微服务项目
  • (未解决)macOS matplotlib 中文是方框
  • (小白学Java)Java简介和基本配置
  • (转) SpringBoot:使用spring-boot-devtools进行热部署以及不生效的问题解决
  • (转)Mysql的优化设置
  • (轉)JSON.stringify 语法实例讲解
  • (轉貼) 資訊相關科系畢業的學生,未來會是什麼樣子?(Misc)
  • 、写入Shellcode到注册表上线