Python基础语法(17多线程线程锁单例模式)
Python基础语法文章导航:
- Python基础(01初识数据类型&变量)
- Python基础(02条件&循环语句)
- Python基础(03字符串格式化&运算符&进制&编码)
- Python基础(04 基础练习题)
- Python数据类型(day05整型&布尔类型&字符串类型)
- Python数据类型(06列表&元组)
- Python数据类型(07集合&字典&浮点型&None)
- Python文件操作01(自动化测试文件相关操作)
-
Python函数入门(08函数定义&参数&返回值)
-
Python文件操作02(自动化测试文件相关操作)
-
Python函数(10生成器&内置函数&推导式)
-
Python函数(11自定义模块&第三方模块&内置模块)
-
Python函数(12时间处理&正则表达式)
-
Python函数(13面向对象)
-
Python面向对象(15成员&成员修饰符)
-
Python函数(16进程和线程)
目录
一.多线程开发
1. t.start(),当前线程准备就绪(等待CPU调度,具体时间是由CPU来决定)。
2. t.join(),等待当前线程的任务执行完毕后再向下继续执行。
3. t.setDaemon(布尔值) ,守护线程(必须放在start之前)
二.线程安全
三.线程锁
1.Lock,同步锁
2.RLock,递归锁
四.死锁
五.线程池
六.单例模式(扩展)
一.多线程开发
import threadingdef task(arg):pass# 创建一个Thread对象(线程),并封装线程被CPU调度时应该执行的任务和相关参数。
t = threading.Thread(target=task,args=('xxx',))
# 线程准备就绪(等待CPU调度),代码继续向下执行。
t.start()print("继续执行...") # 主线程执行完所有代码,不结束(等待子线程)
线程的常见方法:
1. t.start()
,当前线程准备就绪(等待CPU调度,具体时间是由CPU来决定)。
import threadingloop = 10000000
number = 0def _add(count):global numberfor i in range(count):number += 1t = threading.Thread(target=_add,args=(loop,))
t.start()print(number)
2. t.join()
,等待当前线程的任务执行完毕后再向下继续执行。
import threadingnumber = 0def _add():global numberfor i in range(10000000):number += 1t = threading.Thread(target=_add)
t.start()t.join() # 主线程等待中...print(number)
import threading
number = 0
def _add():global numberfor i in range(10000000):number += 1
def _sub():global numberfor i in range(10000000):number -= 1
t1 = threading.Thread(target=_add)
t2 = threading.Thread(target=_sub)
t1.start()
t1.join() # t1线程执行完毕,才继续往后走
t2.start()
t2.join() # t2线程执行完毕,才继续往后走
print(number)
import threading
loop = 10000000
number = 0
def _add(count):global numberfor i in range(count):number += 1
def _sub(count):global numberfor i in range(count):number -= 1
t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()
t1.join() # t1线程执行完毕,才继续往后走
t2.join() # t2线程执行完毕,才继续往后走
print(number)
3. t.setDaemon(布尔值)
,守护线程(必须放在start之前)
-
t.setDaemon(True)
,设置为守护线程,主线程执行完毕后,子线程也自动关闭。 -
t.setDaemon(False)
,设置为非守护线程,主线程等待子线程,子线程执行完毕后,主线程才结束。(默认)
import threading
import timedef task(arg):time.sleep(5)print('任务')t = threading.Thread(target=task, args=(11,))
t.setDaemon(True) # True/False
t.start()print('END')
线程名称的设置和获取
import threading
def task(arg):# 获取当前执行此代码的线程name = threading.current_thread().getName()print(name)
for i in range(10):t = threading.Thread(target=task, args=(11,))t.setName('日魔-{}'.format(i))t.start()
自定义线程类,直接将线程需要做的事写到run方法中。
import threading
class MyThread(threading.Thread):def run(self):print('执行此线程', self._args)
t = MyThread(args=(100,))
t.start()
import requests
import threadingclass DouYinThread(threading.Thread):def run(self):file_name, video_url = self._argsres = requests.get(video_url)with open(file_name, mode='wb') as f:f.write(res.content)url_list = [("东北F4模仿秀.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0300f570000bvbmace0gvch7lo53oog"),("卡特扣篮.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f3e0000bv52fpn5t6p007e34q1g"),("罗斯mvp.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f240000buuer5aa4tij4gv6ajqg")
]
for item in url_list:t = DouYinThread(args=(item[0], item[1]))t.start()
二.线程安全
一个进程中可以有多个线程,且线程共享所有进程中的资源。
多个线程同时去操作一个"东西",可能会存在数据混乱的情况,例如:
示例1:
import threadingloop = 10000000
number = 0def _add(count):global numberfor i in range(count):number += 1def _sub(count):global numberfor i in range(count):number -= 1t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()t1.join() # t1线程执行完毕,才继续往后走
t2.join() # t2线程执行完毕,才继续往后走print(number) #-4807368
import threadinglock_object = threading.RLock()loop = 10000000
number = 0def _add(count):lock_object.acquire() # 加锁global numberfor i in range(count):number += 1lock_object.release() # 释放锁def _sub(count):lock_object.acquire() # 申请锁(等待)global numberfor i in range(count):number -= 1lock_object.release() # 释放锁t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()t1.join() # t1线程执行完毕,才继续往后走
t2.join() # t2线程执行完毕,才继续往后走print(number) 3import threadinglock_object = threading.RLock()loop = 10000000
number = 0def _add(count):lock_object.acquire() # 加锁global numberfor i in range(count):number += 1lock_object.release() # 释放锁def _sub(count):lock_object.acquire() # 申请锁(等待)global numberfor i in range(count):number -= 1lock_object.release() # 释放锁t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()t1.join() # t1线程执行完毕,才继续往后走
t2.join() # t2线程执行完毕,才继续往后走print(number) #0
示例2:
import threading
num = 0def task():global numfor i in range(1000000):num += 1print(num)for i in range(2):t = threading.Thread(target=task)t.start()
# 805594
# 1072361
import threadingnum = 0
lock_object = threading.RLock()def task():print("开始")lock_object.acquire() # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。global numfor i in range(1000000):num += 1lock_object.release() # 线程出去,并解开锁,其他线程就可以进入并执行了print(num)for i in range(2):t = threading.Thread(target=task)t.start()
# 开始
# 开始
# 1000000
# 2000000
import threadingnum = 0
lock_object = threading.RLock()def task():print("开始")with lock_object: # 基于上下文管理,内部自动执行 acquire 和 releaseglobal numfor i in range(1000000):num += 1print(num)for i in range(2):t = threading.Thread(target=task)t.start()# 开始
# 开始
# 1000000
# 2000000
三.线程锁
在程序中如果想要自己手动加锁,一般有两种:Lock 和 RLock。
1.Lock,同步锁
import threadingnum = 0
lock_object = threading.Lock()def task():print("开始")lock_object.acquire() # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。global numfor i in range(1000000):num += 1lock_object.release() # 线程出去,并解开锁,其他线程就可以进入并执行了print(num)for i in range(2):t = threading.Thread(target=task)t.start()# 开始
# 开始
# 1000000
# 2000000
2.RLock,递归锁
import threadingnum = 0
lock_object = threading.RLock()def task():print("开始")lock_object.acquire() # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。global numfor i in range(1000000):num += 1lock_object.release() # 线程出去,并解开锁,其他线程就可以进入并执行了print(num)for i in range(2):t = threading.Thread(target=task)t.start()
# 开始
# 开始
# 1000000
# 2000000
RLock支持多次申请锁和多次释放;Lock不支持。例如:
import threading
import timelock_object = threading.RLock()def task():print("开始")lock_object.acquire()lock_object.acquire()print(123)lock_object.release()lock_object.release()for i in range(3):t = threading.Thread(target=task)t.start()
import threading
lock = threading.RLock()# 程序员A开发了一个函数,函数可以被其他开发者调用,内部需要基于锁保证数据安全。
def func():with lock:pass# 程序员B开发了一个函数,可以直接调用这个函数。
def run():print("其他功能")func() # 调用程序员A写的func函数,内部用到了锁。print("其他功能")# 程序员C开发了一个函数,自己需要加锁,同时也需要调用func函数。
def process():with lock:print("其他功能")func() # ----------------> 此时就会出现多次锁的情况,只有RLock支持(Lock不支持)。print("其他功能")
四.死锁
死锁,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象。
import threadingnum = 0
lock_object = threading.Lock()def task():print("开始")lock_object.acquire() # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。lock_object.acquire() # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。global numfor i in range(1000000):num += 1lock_object.release() # 线程出去,并解开锁,其他线程就可以进入并执行了lock_object.release() # 线程出去,并解开锁,其他线程就可以进入并执行了print(num)for i in range(2):t = threading.Thread(target=task)t.start()
import threading
import time lock_1 = threading.Lock()
lock_2 = threading.Lock()def task1():lock_1.acquire()time.sleep(1)lock_2.acquire()print(11)lock_2.release()print(111)lock_1.release()print(1111)def task2():lock_2.acquire()time.sleep(1)lock_1.acquire()print(22)lock_1.release()print(222)lock_2.release()print(2222)t1 = threading.Thread(target=task1)
t1.start()t2 = threading.Thread(target=task2)
t2.start()
五.线程池
Python3中官方才正式提供线程池。
线程不是开的越多越好,开的多了可能会导致系统的性能更低了,例如:如下的代码是不推荐在项目开发中编写。
不建议:无限制的创建线程。
import threadingdef task(video_url):passurl_list = ["www.xxxx-{}.com".format(i) for i in range(30000)]for url in url_list:t = threading.Thread(target=task, args=(url,))t.start()# 这种每次都创建一个线程去操作,创建任务的太多,线程就会特别多,可能效率反倒降低了。
建议:使用线程池
示例1:
import time
from concurrent.futures import ThreadPoolExecutor# pool = ThreadPoolExecutor(100)
# pool.submit(函数名,参数1,参数2,参数...)def task(video_url,num):print("开始执行任务", video_url)time.sleep(5)# 创建线程池,最多维护10个线程。
pool = ThreadPoolExecutor(10)url_list = ["www.xxxx-{}.com".format(i) for i in range(300)]for url in url_list:# 在线程池中提交一个任务,线程池中如果有空闲线程,则分配一个线程去执行,执行完毕后再将线程交还给线程池;如果没有空闲线程,则等待。pool.submit(task, url,2)print("END")
示例2:等待线程池的任务执行完毕。
import time
from concurrent.futures import ThreadPoolExecutordef task(video_url):print("开始执行任务", video_url)time.sleep(5)# 创建线程池,最多维护10个线程。
pool = ThreadPoolExecutor(10)url_list = ["www.xxxx-{}.com".format(i) for i in range(300)]
for url in url_list:# 在线程池中提交一个任务,线程池中如果有空闲线程,则分配一个线程去执行,执行完毕后再将线程交还给线程池;如果没有空闲线程,则等待。pool.submit(task, url)print("执行中...")
pool.shutdown(True) # 等待线程池中的任务执行完毕后,在继续执行
print('继续往下走')
示例3:任务执行完任务,再干点其他事。
import time
import random
from concurrent.futures import ThreadPoolExecutor, Futuredef task(video_url):print("开始执行任务", video_url)time.sleep(2)return random.randint(0, 10)def done(response):print("任务执行后的返回值", response.result())# 创建线程池,最多维护10个线程。
pool = ThreadPoolExecutor(10)url_list = ["www.xxxx-{}.com".format(i) for i in range(15)]for url in url_list:# 在线程池中提交一个任务,线程池中如果有空闲线程,则分配一个线程去执行,执行完毕后再将线程交还给线程池;如果没有空闲线程,则等待。future = pool.submit(task, url)future.add_done_callback(done) # 是子主线程执行# 可以做分工,例如:task专门下载,done专门将下载的数据写入本地文件。
示例4:最终统一获取结果。
import time
import random
from concurrent.futures import ThreadPoolExecutor,Futuredef task(video_url):print("开始执行任务", video_url)time.sleep(2)return random.randint(0, 10)# 创建线程池,最多维护10个线程。
pool = ThreadPoolExecutor(10)future_list = []url_list = ["www.xxxx-{}.com".format(i) for i in range(15)]
for url in url_list:# 在线程池中提交一个任务,线程池中如果有空闲线程,则分配一个线程去执行,执行完毕后再将线程交还给线程池;如果没有空闲线程,则等待。future = pool.submit(task, url)future_list.append(future)pool.shutdown(True)
for fu in future_list:print(fu.result())
六.单例模式(扩展)
面向对象 + 多线程相关的一个面试题(以后项目和源码中会用到)。
之前写一个类,每次执行 类()
都会实例化一个类的对象。
class Foo:passobj1 = Foo()obj2 = Foo()
print(obj1,obj2)
简单的实现单例模式
class Singleton:instance = Nonedef __init__(self, name):self.name = namedef __new__(cls, *args, **kwargs):# 返回空对象if cls.instance:return cls.instancecls.instance = object.__new__(cls)return cls.instanceobj1 = Singleton('alex')
obj2 = Singleton('SB')print(obj1,obj2)
多线程执行单例模式,有BUG
import threading
import timeclass Singleton:instance = Nonedef __init__(self, name):self.name = namedef __new__(cls, *args, **kwargs):if cls.instance:return cls.instancetime.sleep(0.1)cls.instance = object.__new__(cls)return cls.instancedef task():obj = Singleton('x')print(obj)for i in range(10):t = threading.Thread(target=task)t.start()
加锁解决BUG
import threading
import time
class Singleton:instance = Nonelock = threading.RLock()def __init__(self, name):self.name = namedef __new__(cls, *args, **kwargs):with cls.lock:if cls.instance:return cls.instancetime.sleep(0.1)cls.instance = object.__new__(cls)return cls.instancedef task():obj = Singleton('x')print(obj)for i in range(10):t = threading.Thread(target=task)t.start()
加判断,提升性能
import threading
import time
class Singleton:instance = Nonelock = threading.RLock()def __init__(self, name):self.name = namedef __new__(cls, *args, **kwargs):if cls.instance:return cls.instancewith cls.lock:if cls.instance:return cls.instancetime.sleep(0.1)cls.instance = object.__new__(cls)return cls.instancedef task():obj = Singleton('x')print(obj)for i in range(10):t = threading.Thread(target=task)t.start()# 执行1000行代码data = Singleton('asdfasdf')
print(data)