当前位置: 首页 > news >正文

Python升级之路( Lv15 ) 并发编程三剑客: 进程, 线程与协程

Python系列文章目录

第一章 Python 入门
第二章 Python基本概念
第三章 序列
第四章 控制语句
第五章 函数
第六章 面向对象基础
第七章 面向对象深入
第八章 异常机制
第九章 文件操作
第十章 模块
第十一章 GUI图形界面编程
第十二章 pygame游戏开发基础
第十三章 pyinstaller 使用详解
第十四章 并发编程初识
第十五章 并发编程三剑客-进程, 线程与协程

进程, 线程与协程

  • Python系列文章目录
  • 进程
    • 创建方式
      • 类包装
      • 方法包装
    • 进程间通信方式
      • Queue队列
      • Pipe管道
    • Manager管理器
    • 进程池(Pool)
      • 使用with管理进程池
  • 线程
    • 创建方式
      • 方法包装
      • 类包装
    • Join
    • 守护线程
    • Event 事件
    • 线程锁
      • 全局锁GIL问题
      • 线程同步和互斥锁
        • 线程同步
        • 互斥锁
      • Semaphore信号量
      • 死锁
    • 生产者和消费者模式
  • 协程
    • 协程与线程的比较
    • asyncio实现协程(重点)

进程

书写上文, 我们可以知道: 进程是资源(CPU、内存等)分配的基本单位,它是程序执行时的一个实例.
程序运行时系统就会创建一个进程,并为它分配资源,然后把该进程放入进程就绪队列,进程调度器选中它的时候就会为它分配CPU时间,程序开始真正运行. 进程切换需要的资源很最大,效率低。

创建方式

进程创建有两种方式: 方法包装和类包装. 熟悉Java的人可能会发现, 类包装更符合我们原来的书写习惯
创建进程后, 我们使用start() 来启动进程

类包装

主要步骤:

  • 定义一个进程类, 并修改初始化构造, 改为有参构造
  • 创建进程时, 传入初始化方法中添加的参数即可

实操代码

import time
from multiprocessing import Process


class MyProcess(Process):
    """进程的创建方式: 2.类包装"""
    def __init__(self, name):
        Process.__init__(self)
        self.name = name

    def run(self):
        print(f"进程{self.name} 启动")
        time.sleep(3)
        print(f"进程{self.name} 结束")


if __name__ == "__main__":
    print("创建进程")
    p1 = MyProcess("p1")
    p2 = MyProcess("p2")
    p1.start()
    p2.start()


执行结果
在这里插入图片描述

方法包装

主要步骤:

  • 在创建进程时: 已默认值参数的方式声明目标函数, 以及传入目标函数的参数(元组的方式)

实操代码

import os
import time
from multiprocessing import Process


def function(name):
    """进程的创建方式: 1.方法包装"""
    print("当前进程ID:", os.getpid())
    print("父进程ID", os.getppid())
    print(f"Process:{name} start")
    time.sleep(3)
    print(f"Process:{name} end")


if __name__ == "__main__":
    print("当前main进程ID: ", os.getppid())
    # 创建进程
    p1 = Process(target=function, args=("p1",))
    p2 = Process(target=function, args=("p2",))
    p1.start()
    p2.start()

执行结果

多说一句

元组中如果只有一个元素, 是需要加逗号的!!! 这是因为括号( )既可以表示tuple,又可以表示数学公式中的小括号,
所以如果没有加逗号,那你里面放什么类型的数据那么类型就会是什么.

进程间通信方式

进程间通信有两种方式: Queue队列和Pipe管道方式

Queue队列

  • 要实现进程间通信,需要使用 multiprocessing 模块中的 Queue 类
  • 进程间通信的方式,就是使用了操作系统给开辟的一个队列空间,各个进程可以把数据放到该队列中,当然也可以从队列中把自己需要的信息取走

实现核心:

  • 这里利用类包装的方式, 并且添加了一个参数mq
  • 主函数声明一个Queue队列, 放入需要通信的消息
  • 在需要调用时, 利用mq,get 获取当前进程所传入的值

实操代码

from multiprocessing import Process, Queue


class MyProcess(Process):
    def __init__(self, name, mq):
        Process.__init__(self)
        self.name = name
        self.mq = mq

    def run(self):
        print("Process {} started".format(self.name))
        print("===Queue", self.mq.get(), "===")
        self.mq.put(self.name)
        print("Process {} end".format(self.name))


if __name__ == "__main__":
    # 创建进程列表
    t_list = []
    mq = Queue()
    mq.put("1")
    mq.put("2")
    mq.put("3")
    # 利用range序列重复创建进程
    for i in range(3):
        t = MyProcess("p{}".format(i), mq)
        t.start()
        t_list.append(t)
    # 等待进程结束
    for t in t_list:
        t.join()
    print(mq.get())
    print(mq.get())
    print(mq.get())
    

执行结果
在这里插入图片描述

Pipe管道

Pipe 直译过来的意思是“管”或“管道”,和实际生活中的管(管道)是非常类似的.
Pipe方法返回(conn1, conn2)代表一个管道的两个端.

在这里插入图片描述

  • Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个参数是全双工模式,也就是说conn1和conn2均可收发
  • 若duplex为False,conn1只负责接收消息,conn2只负责发送消息. send和recv方法分别是发送和接受消息的方法.
    例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息.
  • 如果没有消息可接收,recv方法会一直阻塞. 如果管道已经被关闭,那么recv方法会抛出EOFError

实现核心

  • 主函数声明管道的两端 conn1, conn2 = multiprocessing.Pipe()
  • 以方法包装方式创建进程后, 在对应方法中调用管道的两端调用消息收发的方法 conn1.send/conn1.recv

实操代码

import multiprocessing
import time


def fun1(conn1):
    """
    管道结构
    进程<==>conn1(管道头)==pipe==conn2(管道尾)<==>进程2
    """
    sub_info = "进程向conn1发送消息, 管道另一头conn2 可以接收到消息"
    print(f"进程1--{multiprocessing.current_process().pid}发送数据:{sub_info}")
    time.sleep(1)
    conn1.send(sub_info)        # 调用conn1.send发送消息, 发送的消息会被管道的另一头接收
    print(f"conn1接收消息:{conn1.recv()}")    # conn1.recv接收消息, 如果没有消息可接收, recv方法会一直阻塞. 如果管道已经被关闭,那么recv方法会抛出EOFError
    time.sleep(1)


def fun2(conn2):
    sub_info = "进程向conn2发送消息, 管道另一头conn1 可以接收到消息"
    print(f"进程2--{multiprocessing.current_process().pid}发送数据:{sub_info}")
    time.sleep(1)
    conn2.send(sub_info)
    print(f"conn2接收消息:{conn2.recv()}")
    time.sleep(1)


if __name__ == "__main__":
    # 创建管道
    # Pipe方法返回(conn1, conn2)代表一个管道的两个端.如果conn1带表头, conn2代表尾, conn1发送的消息只会被conn2接收, 同理conn2发送的消息也只会被conn1接收
    conn1, conn2 = multiprocessing.Pipe()
    # 创建子进程
    # Python中,圆括号意味着调用函数. 在没有圆括号的情况下,Python会把函数当做普通对象
    process1 = multiprocessing.Process(target=fun1, args=(conn1,))
    process2 = multiprocessing.Process(target=fun2, args=(conn2,))
    # 启动子进程
    process1.start()
    process2.start()


Manager管理器

管理器提供了一种创建共享数据的方法,从而可以在不同进程中共享

实现核心

  • 创建进程
  • 利用Manager创建字典, 列表等对象, 传入进程
  • 在各进程所对应的方法中修改上面创建的对象

实操代码

from multiprocessing import Manager, Process


def func1(name,m_list,m_dict):
     m_dict['area'] = '罗布泊'
     m_list.append('钱三强')


def func2(name, m_list, m_dict):
    m_dict['work'] = '造核弹'
    m_list.append('邓稼先')


if __name__ == "__main__":
    with Manager() as mgr:
        m_list = mgr.list()
        m_dict = mgr.dict()
        m_list.append("钱学森")
        # 两个进程不能直接互相使用对象,需要互相传递
        p1 = Process(target=func1, args=('p1', m_list, m_dict))
        p1.start()
        p1.join()   # 等p1进程结束,主进程继续执行
        print(m_list)
        print(m_dict)
        p2 = Process(target=func2, args=('p1', m_list, m_dict))
        p2.start()
        p2.join()   # 等p2进程结束,主进程继续执行
        print(m_list)
        print(m_dict)


执行结果
太过机密, 不予展示

进程池(Pool)

进程池可以提供指定数量的进程给用户使用,即当有新的请求提交到进程池中时,如果池未满,则会创建一个新的进程用来执行该请求;
反之,如果池中的进程数已经达到规定最大值,那么该请求就会等待,只要池中有进程空闲下来,该请求就能得到执行
使用进程池的好处就是可以节约内存空间, 提高资源利用率

进程池相关方法

类/方法功能参数
Pool(processes)创建进程池对象processes表示进程池中有多少进程
pool.apply_async(func,args,kwds)异步执行;将事件放入到进程池队列func 事件函数 args 以元组形式给func传参kwds 以字典形式给func传参返回值:返回一个代表进程池事件的对象,通过返回值的get方法可以得到事件函数的返回值
pool.apply(func,args,kwds)同步执行;将事件放入到进程池队列func 事件函数 args 以元组形式给func传参kwds 以字典形式给func传参
pool.close()关闭进程池
pool.join()关闭进程池
pool.map(func,iter)类似于python的map函数,将要做的事件放入进程池func 要执行的函数 iter 迭代对象

实现核心

  • 创建和初始化进程池
  • 以方法包装的方式传入相关参数, 并调用相关api

实操代码

from multiprocessing import Pool
import os
from time import sleep


def func1(name):
    print(f"方法1输出: 当前进程的ID:{os.getpid()},{name}")
    sleep(2)
    return name


def func2(args):
    print("方法2输出: ", args)


if __name__ == "__main__":
    pool = Pool(5)
    pool.apply_async(func=func1, args=('进程1',), callback=func2)
    pool.apply_async(func=func1, args=('进程2',), callback=func2)
    pool.apply_async(func=func1, args=('进程3',), callback=func2)
    pool.apply_async(func=func1, args=('进程4',))
    pool.apply_async(func=func1, args=('进程5',))
    pool.apply_async(func=func1, args=('进程6',))
    pool.apply_async(func=func1, args=('进程7',))
    pool.close()
    pool.join()

执行结果

在这里插入图片描述

使用with管理进程池

使用with 方法, 可以进行优雅的进行资源管理. 在这里是可以帮助我们优雅的关闭线程池

关于with方法

with所求值的对象必须有一个enter()方法,一个exit()方法.
紧跟with后面的语句被求值后,返回对象的__enter__()方法被调用,
这个方法的返回值将被赋值给as后面的变量。当with后面的代码块全部被执行完之后,将调用前面返回对象的exit()方法

实操代码

from multiprocessing import Pool
import os
from time import sleep


def func1(name):
    print(f"方法1输出: 当前进程的ID:{os.getpid()},{name}")
    sleep(2)
    return name


if __name__ == "__main__":
    with Pool(5) as pool:
        args = pool.map(func1, ('进程1,', '进程2,', '进程3,', '进程4,', '进程5,', '进程6,', '进程7,', '进程8,'))
    for a in args:
        print(a)

执行结果

在这里插入图片描述

线程

线程是程序执行时的最小单位,也是CPU调度和分派的基本单位.
一个进程可以由很多个线程组成,每个线程拥有自己独立的栈和共享的堆,共享堆,不共享栈,标准线程由操作系统调度
线程由CPU独立调度执行,在多CPU环境下就允许多个线程同时运行. 同样多线程也可以实现并发操作,每个请求分配一个线程来处理.

创建方式

Python的标准库提供了两个模块: _threadthreading_thread 是低级模块, threading 是高级模块,
是对 _thread 进行了封装。多数情况下,我们只需要使用 threading 这个高级模块.
而线程的创建的方式有两种: 一种是方法包装, 一种是类包装

方法包装

主要步骤:

  • 在创建进程时: 已默认值参数的方式声明目标函数, 以及传入目标函数的参数(元组的方式)

实操代码

from threading import Thread
from time import sleep


def func1(name):
    """线程创建方式(方法包装)"""
    for i in range(3):
        print(f"thread:{name} : {i}")
        sleep(1)


if __name__ == "__main__":
    """新建两个线程, 每个线程每隔1秒打印一次消息, 总共打印3次"""
    print("主线程, start")
    # 创建线程
    t1 = Thread(target=func1, args=("t1",))
    t2 = Thread(target=func1, args=("t2",))
    # 启动线程
    t1.start()
    t2.start()
    print("主线程, end")
"""
注意: 运行结果可能会出现换行问题,是因为多个线程抢夺控制台输出的IO流
"""

结果展示
在这里插入图片描述

类包装

主要步骤

  • 定义一个线程类, 并修改初始化构造, 改为有参构造
  • 创建线程时, 传入初始化方法中添加的参数即可

实操代码

from threading import Thread
from time import sleep


class MyThread(Thread):
    """
    类的方式创建线程(类包装)
    """

    def __init__(self, name):
        Thread.__init__(self)
        self.name = name

    def run(self):
        for i in range(3):
            print(f"thread : {self.name} : {i}")
            sleep(1)


if __name__ == "__main__":
    """新建两个线程, 每个线程每隔1秒打印一次消息, 总共打印3次"""
    print("主线程, start")
    # 创建线程(类创建的方式)
    t1 = MyThread("t1")
    t2 = MyThread("t2")
    # 启动线程
    t1.start()
    t2.start()
    print("主线程, end")

结果展示
从结果可以看出, 启动线程对应的方法是异步的
在这里插入图片描述

Join

由上图可知,主线程不会等待子线程结束(异步).
如果需要等待子线程结束后,再结束主线程,可使用join()方法

实操代码

if __name__ == "__main__":
    """新建两个线程, 每个线程每隔1秒打印一次消息, 总共打印3次"""
    print("主线程, start")
    # 创建线程(类创建的方式)
    t1 = MyThread("t1")
    t2 = MyThread("t2")
    # 启动线程
    t1.start()
    t2.start()
    # 主线程等t1, t2结束之后, 再往下执行
    t1.join()
    t2.join()
    print("主线程, end")
    

守护线程

在行为上还有一种叫守护线程,主要的特征是在它的生命周期中主线程死亡,它也就随之死亡.
在python中,线程通过 setDaemon(True|False)来设置是否为守护线程.

实操代码

from threading import Thread
from time import sleep


class MyThread(Thread):
    """守护线程"""
    def __init__(self, name):
        Thread.__init__(self)
        self.name = name

    def run(self):
        for i in range(3):
            print(f"thread : {self.name} : {i}")
            sleep(1)


if __name__ == "__main__":
    print("主线程, start")
    # 创建线程(类创建的方式)
    t1 = MyThread("t1")
    # 设置为守护线程
    # t1.setDaemon(True)  # 该方法3.10后被废弃, 可以直接按照下面的方法设置
    t1.daemon = True
    # 启动线程
    t1.start()
    print("主线程, end")

执行结果
从结果可以看出, 主线程不会等待子线程结束(也就是执行循环体的后续循环),
因为子线程被设置成守护线程, 因此主线程执行完毕后子线程就会停止执行
在这里插入图片描述

Event 事件

事件Event主要用于唤醒正在阻塞等待状态的线程

注意:

  • Event 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生.
    在初始情况下,event 对象中的信号标志被设置假.
  • 如果有线程等待一个 event 对象,而这个 event 对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真.
    一个线程如果将一个 event 对象的信号标志设置为真,它将唤醒所有等待个 event 对象的线程
  • 如果一个线程等待一个已经被设置为真的 event 对象,那么它将忽略这个事件,继续执行

Event() 可以创建一个事件管理标志,该标志(event)默认为False,event对象主要有四种方法可以调用:

方法名说明
event.wait(timeout=None)调用该方法的线程会被阻塞,如果设置了timeout参数,超时后,线程会停止阻塞继续执行;
event.set()将event的标志设置为True,调用wait方法的所有线程将被唤醒
event.clear()将event的标志设置为False,调用wait方法的所有线程将被阻塞
event.is_set()判断event的标志是否为True

实操代码

import time
import threading


def chihuoguo(name):
    """利用事件实现就餐, 吃饭"""
    # 等待时间, 进入阻塞等待状态
    print(f'{name} 已经到餐厅')
    print(f'小伙伴{name} 已经进入就餐状态!')
    event.wait()
    print(f'{name} 收到通知了.')
    print(f'小伙伴{name} 开始吃咯!')


if __name__ == "__main__":
    event = threading.Event()
    # 创建新线程
    threa1 = threading.Thread(target=chihuoguo, args=("玛奇玛",))
    threa2 = threading.Thread(target=chihuoguo, args=("电次",))
    # 开启线程
    threa1.start()
    threa2.start()
    time.sleep(10)
    # 发送时间通知
    print('---->>>主线程(波吉塔)通知小伙伴开吃咯!')
    event.set()

执行结果

  1. 可以看到, 启动两个线程后, 在调用 event.wait() 之前的代码段都能够正常执行,
    但只有事件标识被设置为 true 时event.set(), event.wait()之后的代码才会被执行

在这里插入图片描述

  1. 可以看到, 我们将 event.wait() 这段代码注释后, 由于主线程没有变更事件的标识, 即事件标识一直为false
    因此程序会一直阻塞直至该标识为true
    在这里插入图片描述

线程锁

在python中,无论你有多少核,在Cpython解释器中永远都是假象.
无论你是4核,8核,还是16核,同一时间执行的线程只有一个线程.
这个是python开发设计的一个缺陷,所以说python中的线程是“含有水分的线程”.

全局锁GIL问题

全局锁, 又称GIL, 全称是: Global Interpreter Lock. 他是用来保证同一时刻只有一个线程在运行.

在这里插入图片描述

注意:

  • GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念,同样一段代码可以通过PyPy,Psyco等除CPython外, 不同的Python执行环境来执行,就没有GIL的问题。
  • 因为CPython是大部分环境下默认的Python执行环境. 所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷

线程同步和互斥锁

线程同步

处理多线程问题时,多个线程访问同一个对象,并且某些线程还想修改这个对象. 这时候就需要用到“线程同步”.
线程同步其实就是一种等待机制,多个需要同时访问此对象的线程进入这个对象的等待池形成队列,等待前面的线程使用完毕后,下一个线程再使用

实操代码

  1. 多线程操作同一个对象(未使用线程同步)
from threading import Thread
from time import sleep


class Account:
    def __init__(self, money, name):
        self.money = money
        self.name = name


# 模拟提款操作
class Drawing(Thread):
    def __init__(self, drawingNum, account):
        Thread.__init__(self)
        self.drawingNum = drawingNum
        self.account = account
        self.expenseTotal = 0

    def run(self):
        if self.account.money - self.drawingNum < 0:
            return
        sleep(1)  # 判断完后阻塞。其他线程开始运行
        self.account.money -= self.drawingNum
        self.expenseTotal += self.drawingNum
        print(f"账户:{self.account.name},余额是:{self.account.money}")
        print(f"账户:{self.account.name},总共取了:{self.expenseTotal}")


if __name__ == "__main__":
    a1 = Account(100, "timepause")
    # 定义取钱线程对象
    draw1 = Drawing(80, a1)
    # 定义取钱线程对象
    draw2 = Drawing(80, a1)
    draw1.start()
    draw2.start()
    

执行结果
可以看出, 没有线程同步机制,两个线程同时操作同一个账户对象,可以从只有100元的账户,轻松取出80*2=160元,账户余额变为-60. 但银行一般不会同意用户账户为负的.
在这里插入图片描述

  1. 我们可以通过“锁机制”来实现线程同步问题,锁机制有如下几个要点:
  • 必须使用同一个锁对象
  • 互斥锁的作用就是保证同一时刻只能有一个线程去操作共享数据,保证共享数据不会出现错误问题
  • 使用互斥锁的好处确保某段关键代码只能由一个线程从头到尾完整地去执行
  • 使用互斥锁会影响代码的执行效率
  • 同时持有多把锁,容易出现死锁的情况

互斥锁

互斥锁: 对共享数据进行锁定,保证同一时刻只能有一个线程去操作

注意:

  • 互斥锁是多个线程一起去抢,抢到锁的线程先执行,没有抢到锁的线程需要等待,等互斥锁使用完释放后,其它等待的线程再去抢这个锁
  • threading 模块中定义了 Lock 变量,这个变量本质上是一个函数,通过调用这个函数可以获取一把互斥锁

实现步骤

  • 创建互斥锁
  • 在需要加锁的代码段上面使用 lock1.acquire()获得锁之后进行加锁, 然后调用 lock1.release() 释放锁

实操代码

from threading import Thread, Lock
from time import sleep


class Account:
    def __init__(self, money, name):
        self.money = money
        self.name = name


# 模拟提款操作
class Drawing(Thread):
    def __init__(self, drawingNum, account):
        Thread.__init__(self)
        self.drawingNum = drawingNum
        self.account = account
        self.expenseTotal = 0

    def run(self):
        # 获得锁
        lock1.acquire()
        if self.account.money - self.drawingNum < 0:
            return
        sleep(1)  # 判断完后阻塞。其他线程开始运行
        self.account.money -= self.drawingNum
        self.expenseTotal += self.drawingNum
        # 释放锁
        lock1.release()
        print(f"账户:{self.account.name},余额是:{self.account.money}")
        print(f"账户:{self.account.name},总共取了:{self.expenseTotal}")


if __name__ == "__main__":
    a1 = Account(100, "timepause")
    # 创建互斥锁
    lock1 = Lock()
    # 定义取钱线程对象
    draw1 = Drawing(80, a1)
    # 定义取钱线程对象
    draw2 = Drawing(80, a1)
    draw1.start()
    draw2.start()

执行结果
可以看到, 加锁之后, 解决了因线程未同步引起的结果紊乱的问题

在这里插入图片描述

Semaphore信号量

互斥锁使用后,一个资源同时只有一个线程访问。如果某个资源,我们同时想让N个(指定数值)线程访问?
这时候可以使用信号量. 信号量控制同时访问资源的数量.
信号量和锁相似,锁同一时间只允许一个对象(进程)通过,信号量同一时间允许多个对象(进程)通过

应用场景

  • 在读写文件的时候,一般只能只有一个线程在写,而读可以有多个线程同时进行,如果需要限制同时读文件的线程个数,这时候就可以用到信号量了(如果用互斥锁,就是限制同一时刻只能有一个线程读取文件).
  • 在做爬虫抓取数据时
  • 生活中, 在上厕所时, 需要限制同时使用的人数, 因为厕所的坑位是固定的, 超过坑位数的人去使用厕所需要排队.

实操代码

from threading import Lock, Semaphore, Thread
from time import sleep


def home(name, se):
    """
    信号量控制同时访问资源的数量。信号量和锁相似,锁同一时间只允许一个对象(进程)通过,信号量同一时间允许多个对象(进程)通过
    """
    # 拿到信号量
    se.acquire()
    print(f"{name}进入了厕所")
    sleep(3)
    print(f'******************{name}走出了厕所')
    # 归还信号量
    se.release()


if __name__ == "__main__":
    """
    一个房间一次只允许两个人使用: 若不使用信号量,会造成所有人都进入这个房子; 若只允许一人通过可以用锁Lock()
    """
    # 创建信号量的对象,有两个坑位
    se = Semaphore(2)
    print("欢迎光临本厕所, 目前只有两个坑位(联系商务即可扩增~~~)")
    for i in range(7):
        p = Thread(target=home, args=(f'tom{i}', se))  # 前面加f的作用是令{}里面的变量引用可以生效
        p.start()

执行结果
可以看到, 在使用信号量之后, 每次只能有两个人使用厕所. 在释放信号量之后, 后面的人可以继续使用厕所
在这里插入图片描述

死锁

在多线程程序中,死锁问题很大一部分是由于一个线程同时获取多个锁造成的

实操代码

from threading import Lock, Thread
from time import sleep


def run1():
    # 获取锁
    lock1.acquire()
    print("幸平创真拿到菜刀")
    # lock1.release()
    # print("幸平创真释放菜刀")
    lock2.acquire()
    print("幸平创真拿到锅")
    # lock2.release()
    # print("幸平创真释放锅")



def run2():
    lock2.acquire()
    print("卫宫士郎拿到锅")
    # lock2.release()
    # print("卫宫士郎释放锅")
    lock1.acquire()
    print("卫宫士郎拿到菜刀")
    # lock1.release()
    # print("卫宫士郎释放菜刀")



if __name__ == "__main__":
    """
    死锁是由于“同步块需要同时持有多个锁造成”的,要解决这个问题思路很简单,就是:同一个代码块,不要同时持有两个对象锁
    """
    lock1 = Lock()
    lock2 = Lock()

    t1 = Thread(target=run1)
    t2 = Thread(target=run2)

    t1.start()
    t2.start()

执行结果
可以看到, 在幸平创真拿到菜刀, 卫宫士郎拿到锅之后, 两人都不愿意释放手中的家伙, 因此无法继续进行做饭, 线程阻塞
在这里插入图片描述
放开注释的代码段之后, 由于每个代码块支持有一个对象锁, 因此不会发送死锁
在这里插入图片描述

生产者和消费者模式

多线程环境下,我们经常需要多个线程的并发和协作. 这个时候,就需要了解一个重要的多线程并发协作模型“生产者/消费者模式

  • 生产者
    生产者指的是负责生产数据的模块(这里模块可能是:方法、对象、线程、进程)
  • 消费者
    消费者指的是负责处理数据的模块(这里模块可能是:方法、对象、线程、进程)
  • 缓冲区
    存放数据的模块, 生产者将生产好的数据放入“缓冲区”,消费者从“缓冲区”拿要处理的数据
    在这里插入图片描述
    优点:
  • 实现线程的并发协作
  • 异步, 解耦了生产者和消费者
  • 解决忙闲不均,提高效率

缓冲区和queue对象

从一个线程向另一个线程发送数据最安全的方式可能就是使用queue 库中的队列了。
创建一个被多个线程共享的 Queue对象,这些线程通过使用 put() 和 get() 操作来向队列中添加或者删除元素。
Queue对象已经包含了必要的锁,所以你可以通过它在多个线程间多安全地共享数据
Queue对象类似进程中的Manager管理器, 本质都是创建了共享数据, 然后在不同进程/线程之间共享

实现步骤:

  • 创建queue队列, 创建生产者消费者线程
  • 生产者通过 queue.put()将数据放入queue队列, 消费者通过调用queue.get() 获取queue中的数据

实操代码

import queue
from threading import Thread
from time import sleep


def producer():
    num = 1
    while True:
        if queue.qsize() < 5:
            print(f"生产:{num}号消息")
            queue.put(f"消费: {num}号消息")
            num += 1
        else:
            print("消息队列已满, 等待被消费")
        sleep(1)


def consumer():
    while True:
        print(f"获取消息:{queue.get()}")
        sleep(1)


if __name__ == "__main__":
    queue = queue.Queue()
    # 注意这里 target=函数名, 而不是 target=函数名()
    t = Thread(target=producer)
    t.start()
    c = Thread(target=consumer)
    c.start()
    c2 = Thread(target=consumer)
    c2.start()

执行结果

在这里插入图片描述

协程

协程也叫作纤程(Fiber),是一种在线程中,比线程更加轻量级的存在,由程序员自己写程序来管理.
我们可以将协程理解为运行在线程上的代码块, 协程挂起并不会引起线程阻塞, 他的作用是提高线程的利用率…
协程之间可以依靠邮箱来进行通信和数据共享, 了避免内存共享数据而带来的线程安全问题.
因为其轻量和高利用率的特点, 即使创建上千个线程也不会对系统造成很大负担, 而线程则恰恰相反.
协程是一种设计思想,不仅仅局限于某一门语言. 在Go, Java, Python 等语言中均有实现

协程的优点

  • 由于自身带有上下文和栈,无需线程上下文切换的开销,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级;
  • 无需原子操作的锁定及同步的开销;
  • 方便切换控制流,简化编程模型
  • 单线程内就可以实现并发的效果,最大限度地利用cpu,且可扩展性高,成本低(注:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理)

协程的缺点

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上

协程与线程的比较

  • 在单线程同步模型中,任务按照顺序执行
    如果某个任务因为I/O而阻塞,其他所有的任务都必须等待,直到它完成之后它们才能依次执行
  • 多线程模型中,多个任务分别在独立的线程中执行
    这些线程由操作系统来管理,在多处理器系统上可以并行处理,或者在单处理器系统上交错执行. 这使得当某个线程阻塞在某个资源的同时其他线程得以继续执行
  • 协程版本的程序中,多个任务交错执行,但仍然在一个单独的线程控制中
    当处理I/O或者其他昂贵的操作时,注册一个回调到事件循环中,然后当I/O操作完成时继续执行。回调描述了该如何处理某个事件. 事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。

asyncio实现协程(重点)

  • 正常的函数执行时是不会中断的,所以你要写一个能够中断的函数,就需要加 async
  • async 用来声明一个函数为异步函数,异步函数的特点是能在函数执行过程中挂起,去执行其他
    异步函数,等到挂起条件(假设挂起条件是 sleep(5) )消失后,也就是5秒到了再回来执行
  • await 用来用来声明程序挂起,比如异步程序执行到某一步时需要等待的时间很长,就将此挂
    起,去执行其他的异步程序。
  • asyncio 是python3.5之后的协程模块,是python实现并发重要的包,这个包使用事件循环驱动实
    现并发
  • asyncio协程是写爬虫比较好的方式. 比多线程和多进程都好.开辟新的线程和进程是非常耗时的

实操代码

  1. 不使用协程时

    # 不使用协程执行多个任务
    import time
    
    
    def fun1():
        for i in range(3):
            print(f'原子弹:第{i}次爆炸啦')
            time.sleep(1)
        return "fun1执行完毕"
    
    
    def fun2():
        for k in range(3):
            print(f'氢弹:第{k}次爆炸了')
            time.sleep(1)
        return "fun2执行完毕"
    
    
    def main():
        fun1()
        fun2()
    
    
    if __name__ == "__main__":
        start_time = time.time()
        main()
        end_time = time.time()
        print(f"耗时{end_time - start_time}")  # 不使用协程,耗时6秒
    
    
  2. 使用使用yield协程,实现任务切换

    # 不使用协程执行多个任务
    import time
    
    
    def fun1():
        for i in range(3):
            print(f'原子弹:第{i}次爆炸啦')
            yield   # 只要方法包含了yield,就变成一个生成器
            time.sleep(1)
        return "fun1执行完毕"
    
    
    def fun2():
        g = fun1()   # fun1是一个生成器,fun1()就不会直接调用,需要通过next()或for循环调用
        print(type(g))
        for k in range(3):
            print(f'氢弹:第{k}次爆炸了')
            next(g)   # 继续执行fun1的代码
            time.sleep(1)
        return "fun2执行完毕"
    
    
    def main():
        fun1()
        fun2()
    
    
    if __name__ == "__main__":
        start_time = time.time()
        main()
        end_time = time.time()
        print(f"耗时{end_time - start_time}")  # 耗时5.0秒,效率差别不大
    
    
  3. 使用asyncio异步IO的典型使用方式实现协程

    实现步骤:

    • 创建两个异步方法fun1, fun2
    • 创建一个main方法来管理上面两个异步方法 await asyncio.gather(fun1(), fun2())
    • 主函数中通过 asyncio.run(main()) 来运行main方法

    实操代码

    # 不使用协程执行多个任务
    import asyncio
    import time
    
    
    async def fun1():  # async表示方法是异步的
        for i in range(3):
            print(f'原子弹:第{i}次爆炸啦')
            # await异步执行func1方法
            await asyncio.sleep(1)
        return "fun1执行完毕"
    
    
    async def fun2():
        for k in range(3):
            print(f'氢弹:第{k}次爆炸了')
            # await异步执行func2方法
            await asyncio.sleep(1)
        return "fun2执行完毕"
    
    
    async def main():
        res = await asyncio.gather(fun1(), fun2())
        # 返回值为函数的返回值列表,本例为["func1执行完毕", "func2执行完毕"]
        print(res)
    
    if __name__ == "__main__":
        start_time = time.time()
        asyncio.run(main())
        end_time = time.time()
        print(f"耗时{end_time - start_time}")  # 耗时3秒,效率极大提高
    
    

    实操结果
    从这里可以看到, 这里进用了3.02s左右, 只比理论最短用时3s多了0.02s左右, 从这里可以看出使用协程的巨大优势
    在这里插入图片描述


相关文章:

  • 南大通用GBase 8a MPP Cluster开发接口简介
  • IntelliJ IDEA 插件推荐
  • Rt-Thread 启动流程与组件初始化
  • CentOS-7-x86_64 iso镜像的安装(Linux操作系统)
  • Parcel配置public静态文件目录
  • 设计模式——策略模式
  • “一万字”动静图生动结合详解:快速排序
  • Linux命令详解(14)useradd命令
  • 面试题之Java的异常
  • k8s helm Seata1.5.1
  • 物业公司如何解决降本增收?快鲸智慧社区系统来帮你
  • MobTech 短信验证Android端 API
  • kafka学习总结
  • 政策汇总 | 川渝发布若干政策支持双城经济圈健康发展、岷山行动计划第三批项目申报解答......近期16个政策汇总
  • Vue3.0中使用路由进行跳转和传参以及取值
  • ECMAScript6(0):ES6简明参考手册
  • E-HPC支持多队列管理和自动伸缩
  • hadoop入门学习教程--DKHadoop完整安装步骤
  • JavaScript工作原理(五):深入了解WebSockets,HTTP/2和SSE,以及如何选择
  • Java应用性能调优
  • Laravel 菜鸟晋级之路
  • Laravel核心解读--Facades
  • node入门
  • Selenium实战教程系列(二)---元素定位
  • Swoft 源码剖析 - 代码自动更新机制
  • windows-nginx-https-本地配置
  • 函数式编程与面向对象编程[4]:Scala的类型关联Type Alias
  • 看域名解析域名安全对SEO的影响
  • 离散点最小(凸)包围边界查找
  • 聊聊directory traversal attack
  • 我的业余项目总结
  • 线性表及其算法(java实现)
  • 想使用 MongoDB ,你应该了解这8个方面!
  • 怎么将电脑中的声音录制成WAV格式
  • 正则表达式
  • 湖北分布式智能数据采集方法有哪些?
  • 小白应该如何快速入门阿里云服务器,新手使用ECS的方法 ...
  • #前后端分离# 头条发布系统
  • (01)ORB-SLAM2源码无死角解析-(66) BA优化(g2o)→闭环线程:Optimizer::GlobalBundleAdjustemnt→全局优化
  • (06)金属布线——为半导体注入生命的连接
  • (14)目标检测_SSD训练代码基于pytorch搭建代码
  • (3)nginx 配置(nginx.conf)
  • (52)只出现一次的数字III
  • (机器学习-深度学习快速入门)第三章机器学习-第二节:机器学习模型之线性回归
  • (算法)前K大的和
  • (学习日记)2024.02.29:UCOSIII第二节
  • (一)Java算法:二分查找
  • (转)总结使用Unity 3D优化游戏运行性能的经验
  • (状压dp)uva 10817 Headmaster's Headache
  • ***监测系统的构建(chkrootkit )
  • ./configure,make,make install的作用(转)
  • .bashrc在哪里,alias妙用
  • .NET 4.0中的泛型协变和反变
  • .net core 源码_ASP.NET Core之Identity源码学习
  • .net websocket 获取http登录的用户_如何解密浏览器的登录密码?获取浏览器内用户信息?...