Linux-10-线程
线程
- 前言
- 线程:
- 含义:
- 定义:
- 与进程区别:
- 线程共享内容:
- 线程特有内容:
- 优缺点:
- 优点:
- 用途:
- 缺点:
- 异常:
- 组成:
- LWP:
- 虚拟地址空间:
- 页表:
- 页表分级:
- 页目录:
- 页表项:
- 偏移量:
- 使用:
- pthread_create()创建:
- pthread_self()获取线程ID:
- return 终止:
- pthread_exit()终止:
- pthread_cancel()取消:
- pthread_join()等待:
- pthread_detach()分离:
- 互斥量:
- 含义:
- 临界区:
- mutex:
- 使用:
- 初始化:
- 静态分配:
- 动态分配:
- 加锁:
- 解锁:
- 销毁:
- 销毁静态分配所得:
- 销毁动态分配所得:
- 互斥量维护的抢票程序:
- 原理:
- mutex的创建:
- 加锁:
- 阻塞:
- 解锁:
- 总结;
- 条件变量:
- 含义:
- 使用:
- pthread_cond_init()初始化
- pthread_cond_destroy()销毁:
- pthread_cond_wait()阻塞:
- pthread_cond_signal()单个唤醒:
- pthread_cond_broadcast()全部唤醒:
- 实例:
- 生产者消费者模型:
- 定义:
- 阻塞队列:
- 信号量:
- 定义:
- 本质:
- PV操作:
- 信号量与互斥锁:
- 模拟信号量:
- 使用:
- sem_init()初始化:
- sem_wait()等待:
- sem_post()发布:
- sem_destroy()销毁:
- 信号量维护的抢票程序:
- 环形队列:
- 定义:
- 实例:
- 线程池:
- 含义:
- 背景:
- 定义:
- 使用场景:
- 使用原理:
- 线程池维护的计算器:
- Task类:
- ThreadPool类:
- main()函数:
- 结果与反思:
- 互斥锁/条件变量/信号量对比:
前言
Vue框架:
Vue驾校-从项目学Vue-1
算法系列博客友链:
神机百炼
线程:
含义:
定义:
-
线程的定义:
一个程序中的一个执行路线
与进程区别:
-
线程和进程的区别:
- 线程是进程内部的控制序列,一个进程内部至少含有一个线程
- 线程是作业处理和CPU调度的基本单位
- 进程是资源申请分配和访问控制的基本单位
线程共享内容:
-
线程共享进程的内容:
- 文件描述符表
- 信号处理方式
- 当前工作目录
- 用户id与组id
线程特有内容:
- 每个线程特有的内容:
- 线程ID
- 栈(每个线程创建自己的临时变量)
- 上下文寄存器(进程切换时保存当前内容)
- 调度优先级(PRI和NICE)
- 信号屏蔽字(毕竟可以通过sigprocmask()设定)
- errno
优缺点:
优点:
- 线程占用的资源比进程少很多
- 创建线程的代价比进程创建小很多
- 线程切换的消耗比进程切换小很多
- 多线程可以充分利用多处理器的可并行数量
- 对于计算密集型任务,可分解到多个线程中计算
- 对于IO密集型任务,将IO操作重叠,线程可以等待不同的IO操作
- 等待慢速IO操作结束同时,程序可执行其他计算任务
用途:
- 合理使用多线程,可以提升CPU计算密集型程序执行效率
- 合理使用多线程,可以提升IO密集型程序的用户体验(边播放边下载)
缺点:
- 性能损失:计算密集型线程很少和其他线程共享同一处理器,增加了其他线程调度和同步的开销
- 健壮性降低:多线程程序中,某一线程的微小失误造成的不良影响可能巨大
- 缺乏访问控制:进程是访问控制的基本粒度,线程中调用OS函数会对整个进程造成影响
- 编程难度提升:编写与调试一个多线程程序比单线程程序困难得多
异常:
- 线程是进程的执行分支,线程异常则进程异常,触发信号后进程及其内所有线程都终止
- 如单个线程出现除零/野指针问题,单个线程崩溃,则进程随之崩溃
组成:
LWP:
- PCB和LWP的区别:
- PCB:进程的管理工具
- LWP:线程的管理工具
- 当一个进程下只有一个线程时,可以说LWP就是PCB
- task_struct:
- 每个线程创建时,随之创建专属task_struct
(原讲述进程时,将task_struct描述为随进程创建是由于当时一个进程下只有一个线程) - 对于CPU而言,只能看到和调度每个线程的task_struct,符合线程是任务处理基本单位的设定
- 每个线程创建时,随之创建专属task_struct
虚拟地址空间:
- 线程虚拟地址空间:
- mm_struct是进程申请的虚拟地址空间
- task_struct是每个线程都特有的描述组织工具
- 进程地址空间中除栈内容为每个线程特有外,其他内容为所有线程共享
页表:
- 虚拟地址到物理地址的映射硬件:MMU(Memory Manger Unite)
- 虚拟地址到物理地址的映射软件:页表
页表分级:
-
页表分级存储的必要性:
- 32位平台下的地址数:2^32
- 32位平台下一个地址大小:4Byte
- 一张页表映射全地址,则页表粗略大小:2^35字节 == 32GB
- 32位平台下内存大小:4GB << 32GB
-
页表分级存储的示意图:
页目录:
- 页目录:针对地址前10位做区分
- 映射关系:通过虚拟地址前十位,初步确定到哪个页表项中查物理地址
- 页目录大小:
- 前十位地址数目:2^10
- 32位平台下一个地址大小:4Byte
- 页目录粗略大小:2^13 Byte == 8KB
页表项:
- 页框:内存/硬盘中数据的存储以4KB为一个单位,称为一个页框。页框地址就是页框中第一个存储空间中的首地址。
- 页表项:仅针对地址11~20位做区分
- 映射关系:通过虚拟地址中间十位,再确定到哪个页框中查物理地址
- 页表项大小:
- 中间十位地址数目:2^10
- 32位平台下一个地址大小:4Byte
- 每个页表项粗略大小:2^13 Byte == 8KB
偏移量:
- 偏移量:虚拟地址的最后12位
- 映射关系:通过虚拟地址的最后12位,加页框首地址,形成最终物理地址
使用:
pthread_create()创建:
-
pthread_create():
#include <pthread.h> pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void*), void *arg);
-
参数:
- *thread:输出型参数,返回系统层线程地址ID
- attr:线程属性,为NULL则为默认属性
- start_routine:线程将执行的函数地址
- arg:线程将执行的函数的参数
-
返回值:
- 创建线程成功:0
- 创建线程失败:具体错误码,而非-1
-
实例代码:
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <error.h>
#include <pthread.h>
void *func(void *arg){
while(1){
printf("child thread running\n");
sleep(1);
}
}
int main(){
pthread_t tid;
int ret;
if((ret = pthread_create(&tid, NULL, func, NULL)) != 0){
perror("pthread_create error\n");
exit(ret);
}
while(1){
printf("main thread running\n");
sleep(1);
}
return 0;
}
- 运行结果:
#编译指令
gcc -o pthread_create pthread_create.c -lpthread
pthread_self()获取线程ID:
-
前文提及了两次线程ID:
- 线程创建后含有的专属ID:进程调度层面的编号
- pthread_create()的第一个参数:开辟内存层面的地址
-
pthread_t类型的线程ID存储位置:
return 终止:
-
线程终止:只终止线程而不终止进程
-
线程终止三大方法:
-
线程函数return终止自己。
对主线程不适用,主线程return相当于exit()
-
线程调用pthread_exit()终止自己
-
调用pthread_cancel()终止另一进程
-
-
return:
void *func(void *arg){
int *p = (int*)malloc(sizeof(int));
*p = 1;
return (void*)p;
}
-
参数:
- 由于线程函数类型为void*,所以需要malloc()出来空间后强制类型转化
-
实例代码:
#include <stdio.h>
#include <error.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
void *func(void *arg){
printf("child thread ready to return\n");
int *p = (int*)malloc(sizeof(int));
*p = 1;
return (void*)p;
}
int main(){
int ret = 0;
pthread_t tid;
if((ret = pthread_create()){
perror("pthread_create error");
return -1;
}
printf("main thread ready to return\n");
while(1);
return 0;
}
- 运行结果:
pthread_exit()终止:
-
pthread_exit():
#include <pthread.h> void pthread_exit(void *value_ptr);
-
作用:终止调用该函数的线程
-
参数:
- value_ptr不可指向局部变量,只能指向堆上malloc所得内容
- malloc所得空间指针最终得强转为(void *)
-
返回值:由于调用成功则线程结束,所以该函数无返回值
-
实例代码:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <error.h>
#include <pthread.h>
void *func(void *arg){
printf("child thread ready to exit\n");
int *p = (int *)malloc(sizeof(int))
*p = 1;
pthread_exit((void*)p);
}
int main(){
int ret = 0;
pthread_t tid;
if((ret = pthread_create(&tid, NULL, func, NULL)) != 0){
perror("pthread_create error");
return -1;
}
printf("main thread ready to return\n");
while(1);
return 0;
}
- 运行结果:
pthread_cancel()取消:
-
pthread_cancel():
#include <pthread.h> int pthread_cancel(pthread_t thread);
-
作用:终止以参数为线程id的线程
-
参数:线程id
-
返回值:
- 终止目标线程成功:返回0
- 终止目标线程失败:错误码
-
实例代码:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <error.h>
#include <>
void *func(){
while(1){
printf("child thread is running\n");
sleep(1);
}
return NULL;
}
int main(){
int ret = 0;
pthread_t tid;
if((ret = pthread_create(&tid, NULL, func, NULL)) != 0){
perror("pthread_create error");
return -1;
}
sleep(5);
pthread_cancel(tid);
return 0;
}
- 运行结果:
pthread_join()等待:
-
为什么需要线程等待?
- 已退出的线程空间未被释放,还保留在进程地址空间内
- 新创建的线程不会复用未释放的线程空间
-
pthread_join():
#include <pthread> int pthread_join(pthread_t thread, void **value_ptr);
-
作用:等待thread号线程结束
-
参数:
- thread:线程ID
- value_ptr:输出型参数,存储已终止线程的退出码
-
返回值:
- 等待成功:0
- 等待失败:错误码
-
线程等待的终止状态:
根据线程终止的条件不同,线程等待得到的终止状态不同
- 线程通过return返回,value_ptr所指单元存放返回值
- 线程通过pthread_exit()终止,value_ptr所指单元存放pthread_exit()内参数值
- 线程被pthread_cancle(),value_ptr所指单元存放常数PTHREAD_CANCELED
-
进程等待的时间线:
-
实例代码:
#include <stdio.h>
#include <pthread.h>
#include <string.h>
#include <stdlib.h>
#include <error.h>
#include <unistd.h>
void *thread1(void *arg){
printf("thread1 return\n");
int *p = (int*)malloc(sizeof(int));
*p = 1;
return (void*)p;
}
void *thread2(void *arg){
printf("thread2 pthread_exit\n");
int *p = (int*) malloc(sizeof(int));
*p = 2;
pthread_exit((void*)p);
}
void *thread3(void *arg){
while(1){
printf("thread3 pthread_cancled\n");
sleep(1);
}
return NULL;
}
int main(){
pthread_t tid;
void* ret;
//thread1:return
pthread_create(&tid, NULL, thread1, NULL);
pthread_join(tid, &ret);
printf("thread1 return %d\n", *(int*)ret);
free(ret);
//thread2:pthread_exit
pthread_create(&tid, NULL, thread2, NULL);
pthread_join(tid, &ret);
printf("thread2 pthread_exit(%d)\n", *(int*)ret);
free(ret);
//thread3:pthread_cancel
pthread_create(&tid, NULL, thread3, NULL);
sleep(3);
pthread_cancel(tid);
pthread_join(tid, &ret);
if(ret == PTHREAD_CANCELED){
printf("thread pthread_canceled PTHREAD_CANCELED\n");
}else{
printf("thread isn't pthread_canceled\n");
}
return 0;
}
- 运行结果:
pthread_detach()分离:
-
进程分离:
- 默认情况下,新创建的线程是joinable的,线程退出之后,需要对其等待,否则无法释放资源,造成系统泄漏
- 不关心线程的返回值时,pthread_join()是一种负担,这时可以通知系统该线程退出时,自动释放线程资源即可
-
矛盾:线程一旦分离,则不可再等待该进程退出了
-
pthread_detach():
#include <pthread.h> int pthread_detach(pthread_t thread);
-
参数:
- thread为线程的内存层面的id
-
实例代码:
#include <stdio.h>
#include <pthread.h>
#include <error.h>
#include <unistd.h>
void *func(void *arg){ pthread_detach(pthread_self());
printf("%s\n", (char*)arg);
return NULL;
}
int main(){
int ret = 0;
pthread_t tid;
if((ret = pthread_create(&tid, NULL, func, "child thread detach...")) != 0){ perror("pthread_create error...");
return -1;
}
sleep(3); //确保线程分离
if(pthread_join(tid, NULL) == 0){
printf("pthread_join wait success\n");
}else{
printf("pthread_join wait failed\n");
}
return 0;
}
- 运行结果:
互斥量:
含义:
临界区:
- 临界资源:多线程执行流共享的资源
- 临界区:每个线程内部访问临界资源的代码
- 互斥:任何时刻,保证有且只有一个执行流进入临界区访问临界资源
- 原子性:不会被任何调度机制打断的操作,该操作只有两态:完成&未开始
mutex:
-
线程中的变量:
-
局部变量:
变量的地址空间在线程栈空间内,变量归属单个线程,其他线程无法获得该变量
-
共享变量:
多个进程都可以访问到的变量
-
-
互斥量mutex:
-
本质:为线程加的一把锁
-
作用:
- 线程互斥:当一个线程进入临界区执行时,不允许其他线程进入该临界区
- 单一线程:当临界区没有线程在执行时,只能允许一个线程进入临界区执行
- 阻止线程:如果线程不在临界区运行,则该线程不能阻止其他线程进入临界区
-
- 加锁后的临界区示意图:
使用:
初始化:
静态分配:
- PTHREAD_MUTEX_INITIALIZER:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
动态分配:
- pthread_mutex_init():
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
- 参数:
- mutex:要初始化的互斥量
- attr:NULL即可
加锁:
-
加锁可能遇到的情况:
- 互斥量处于未锁时,加锁函数将锁定该互斥量,同时返回成功
- 互斥量已经被锁时,加锁函数将陷入阻塞等待,当互斥量解锁后立马为其加锁
-
pthread_lock():
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
-
参数:要加锁的互斥量
-
返回值:
- 加锁成功:返回0
- 加锁失败:返回错误信号
解锁:
-
pthread_mutex_unlock():
#include <pthread.h> int pthread_mutex_unlock(pthread_mutex_t *mutex);
-
参数:要解锁的互斥量
-
返回值:
- 加锁成功:返回0
- 加锁失败:返回错误信号
销毁:
销毁静态分配所得:
- PTHREAD_MUTEX_INITIALIZER不需要手动销毁
销毁动态分配所得:
-
销毁动态分配所得的互斥量前确定:
- 该互斥量未加锁
- 该互斥量后续不会有线程尝试加锁
-
pthread_mutex_destroy():
#include <pthread.h>
int pthread_mutex_destroy(pthread_mutex_t *mutex);
- 参数:要销毁的互斥量
互斥量维护的抢票程序:
- 实例代码:判断ticket==0前就得加锁
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
int ticket = 100;
pthread_mutex_t mutex;
void *func(void *arg){
char* id = (char*) arg;
while(1){
pthread_mutex_lock(&mutex);
if(ticket > 0){
usleep(1000);
printf("%s sells ticket:%d\n", id, ticket);
ticket--;
pthread_mutex_unlock(&mutex);
}else{
pthread_mutex_unlock(&mutex);
break;
}
}
}
int main(){
pthread_t t1, t2, t3, t4;
pthread_mutex_init(&mutex, NULL);
pthread_create(&t1, NULL, func, "thread 1");
pthread_create(&t2, NULL, func, "thread 2");
pthread_create(&t3, NULL, func, "thread 3");
pthread_create(&t4, NULL, func, "thread 4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
pthread_mutex_destory(&mutex);
}
- 运行结果:
原理:
- 前提:
- 所有线程共享mutex变量
- 每个线程在cpu内独有自己的al寄存器
mutex的创建:
-
共享变量mutex:
一经创建和初始化则内存中的mutex值为1
加锁:
-
pthread_mutex_lock():
加锁含有两步操作和一步判断:
-
将该线程拥有的CPU内与锁有关的al寄存器内值置为0
-
CPU原子交换值操作:swap()/exchange()
-
判断共享变量mutex值是否为0,若为0则线程阻塞,若非0则线程进入执行临界区代码
-
阻塞:
-
锁阻塞:
当一个线程已经将互斥锁持有,此时另一线程尝试取地锁时,该线程也执行了pthread_mutex_lock()的三大步骤,且是否持有锁查看的是最后的判断步骤
- 该线程在cpu内拥有的al寄存器内值置为0
- al寄存器值和mutex值发送swap()/exchange()
- 判断mutex值是否为1,若为1则成功取得锁,若为0则继续阻塞
解锁:
-
pthread_mutex_unlock():
- 解锁本质只有一步操作:将mutex值置为1
- 解锁本质只有一步操作:将mutex值置为1
总结;
- 可以将共享变量mutex值置为1的操作:
- PTHREAD_MUTEX_INITIALIZER
- pthread_init()
- pthread_create()三大操作:
- 线程独有的al寄存器值化0
- al寄存器和mutex值交换
- 判断mutex值是否为1
- mutex值本质像是一个消耗品:
- 初始化/解锁时为mutex提供一个1
- 一个线程通过加锁彻底消耗mutex的1
条件变量:
含义:
-
适用情况:
一个线程等待"条件变量的条件成立"而挂起;
另一个线程使"条件成立"。 为了防止竞争,
-
同步:保证数据安全前提下,让线程能够按照某种特定顺序访问临界资源,从而避免饥饿问题
-
竞争条件:由于时序问题导致程序异常
-
定义:在多线程程序中用来实现"等待 -> 唤醒"逻辑常用的方法。
-
搭配使用:pthread_mutex_t,即条件变量的使用总是和一个互斥锁结合在一起。
使用:
pthread_cond_init()初始化
-
pthread_cond_init():
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
-
作用:初始化条件变量cond
-
参数:
- cond:要初始化的条件变量
- attr:属性为NULL即默认
pthread_cond_destroy()销毁:
-
pthread_cond_destroy():
int pthread_cond_destroy(pthread_cond_t *cond);
-
作用:销毁条件变量cond
-
参数:
- 要销毁的条件变量
pthread_cond_wait()阻塞:
-
pthread_cond_wait():
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
-
作用:暂时释放当前线程持有的锁后,将调用该函数的线程阻塞等待
-
参数:
- 条件变量cond:用于监听条件变量是否满足
- 互斥量mutex:
- 进入等待时,该锁会自动释放,让其他线程获取锁而继续执行
- 如果当前等待线程被唤醒,又会自动获得对应的mutex互斥锁
pthread_cond_signal()单个唤醒:
-
pthread_cond_signal():
int pthread_cond_signal(pthread_cond_t *cond)
-
作用:通过修改条件变量cond值,使得一个线程摆脱阻塞
-
参数:
- 条件变量cond
pthread_cond_broadcast()全部唤醒:
-
pthread_cond_broadcast():
int pthread_cond_broadcast(pthread_cond_t *cond);
-
作用:通过修改条件变量cond值,使得所有线程摆脱阻塞
-
参数:
- 条件变量cond
实例:
- 实例代码:
#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;
pthread_mutex_t mutex;
pthread_cond_t cond;
void *run(void *arg){
pthread_detach(pthread_self());
while(1){
pthread_wait();
cout<<"thread "<<pthread_self()<<" 摆脱条件变量阻塞"<<endl;
sleep(1);
}
pthread_exit((void*)0);
}
int main(){
pthread_mutex_init(&mutex, nullptr);
pthread_cond_init(&cond, nullptr);
pthread_t t1, t2, t3;
pthread_create(&t1, nullptr, run, nullptr);
pthread_create(&t2 nullptr, run, nullptr);
pthread_create(&t3, nullptr, run, nullptr);
while(1){
getchar();
pthread_cond_signal(&cond);
}
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
return 0;
}
- 运行结果:
生产者消费者模型:
定义:
- 定义:生产者和消费者不直接通讯,而是通过阻塞队列来进行通信。生产者生产完数据之后不等待消费者获取,而是直接存储到阻塞队列中,消费者需要数据时不向生产者索取,而是直接从阻塞队列中获取。
- 优点:
- 阻塞队列相当于一个缓冲区,平衡了生产者和消费者的处理能力,将两者解耦
- 支持生产者消费者忙闲不均
- 支持多生产者和多消费者并发
- 同步和异步:
- 异步:当两件事没有先后或资源上的依赖关系时,先处理哪一件对于另一件都是阻碍
- 同步:当两件事有先后或资源上的依赖关系时,先处理第一件对于第二件来说属于铺垫或帮助
- 图示:
阻塞队列:
- 条件变量实现阻塞队列代码:
#include <iostream>
#include <pthread.h>
#include <cstdlib>
#include <ctime>
#include <queue>
#include <unistd.h>
#define NUM 32
using namespace std;
template<typename T>
class BlockQueue{
private:
bool IsFUll(){
return q.size() == cap;
}
bool IsEmpty(){
return q.size() == 0;
}
public:
BlockQueue(int _cap = NUM){
cap = _cap;
pthread_mutex_init(&mutex, nullptr);
pthread_cond_init(&full, nullptr);
pthread_cond_init(&empty, nullptr);
}
void Push(const T& in){
pthread_mutex_lock(&mutex);
if(IsFUll()){
//队列满则阻塞等待
pthread_cond_wait(&full, &mutex);
}
q.push(in);
pthread_mutex_unlock(&mutex);
//生产后队列不为空,供消费者消费:
pthread_cond_signal(&empty);
}
void Pop(T& out){
pthread_mutex_lock(&mutex);
if(IsEmpty()){
//队列为空则阻塞等待
pthread_cond_wait(&empty, &mutex);
}
out = q.front();
q.pop();
pthread_mutex_unlock(&mutex);
//消费后队列不满,供生产者生产
pthread_cond_signal(&full);
}
~BlockQueue(){
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&full);
pthread_cond_destroy(&empty);
}
private:
std::queue<T> q; //临界资源
pthread_mutex_t mutex; //锁
pthread_cond_t full, empty;
int cap;
};
void *Producter(void *arg){
auto bq = (BlockQueue<int> *)arg;
while(1){
int data = rand()%100 + 1;
bq->Push(data);
sleep(1);
}
}
void *Consumer(void *arg){
auto bq = (BlockQueue<int> *)arg;
while(1){
int data = 0;
bq->Pop(data);
cout<<"consumer: "<<data<<endl;
sleep(1);
}
}
int main(){
BlockQueue<int> *bq = new BlockQueue<int>();
srand((unsigned long)time(NULL));
pthread_t p, c;
pthread_create(&p, nullptr, Consumer, bq);
pthread_create(&c, nullptr, Producter, bq);
pthread_join(p, nullptr);
pthread_join(c, nullptr);
return 0;
}
- 运行结果:
- 生产者消费者交替出现的原因分析:
- 第一个执行的必然是product线程:
- 不论开始是product线程还是consumer线程先被调度,队列都为空
- product生产一个元素结束后,释放锁
- 第二个执行的必然是consumer线程:
- 此时consumer若在等待,则被pthread_cond_signal()唤醒
- 此时consumer若还未被调度,则由于product的sleep(1)的时间间隔而被调度
- 第三个执行的必然是product线程:
- consumer消耗元素后,队列为空
- 理论上此时调度consumer线程,consumer陷入等待
- 事实上由于consumer的sleep(1),product线程必然被调度
- 第一个执行的必然是product线程:
- 防止两者较替出现的代码措施:
- 修改各自的sleep()时间
- 修改唤醒full / empty的条件
信号量:
定义:
本质:
-
信号量的本质:
-
核心是一个计数器,描述临界资源中的资源数目
-
功能实现依赖mutex互斥锁
-
维护着一个阻塞队列,实现阻塞线程之间相对有序
-
本身即是一个临界资源:多个线程都可以看到和操作该资源
-
伪代码:
struct { ptread_mutex_t mutex; //以锁维护原子性 int count; //核心计数器 task_struct *queue; //阻塞队列 };
-
PV操作:
-
P操作:
-
申请信号量
-
本质是计数器- -,是依赖锁实现的原子操作
-
当P操作申请不到资源时,一定是资源暂时为空
此时线程陷入阻塞,进入信号量自带的阻塞队列
当P操作申请到资源时,获得的是资源的使用权,而不是已经在使用资源
-
-
V操作:
- 释放信号量
- 本质是计数器++,是依赖锁实现的原子操作
信号量与互斥锁:
- 信号量与互斥锁的关联:
- 当信号量的sem_init()中参数value==1时,信号量基本等价于线程略有序版的互斥锁
- 信号量本身的PV操作原子性是由锁来维护的
- 信号量与互斥锁的区别:
- 对于互斥锁:所有线程同时争夺,每次争夺没有优先级
- 对于信号量:由于阻塞队列的存在,未争夺成功的线程已经有序排队
- 信号量与条件变量的使用差别:
- 条件变量的pthread_cond_wait()方法中还需要pthread_mutex_t参数
- 信号量的sem_wait()和sem_post()方法中不需要pthread_mutex_t参数,毕竟sem_t内部自带mutex
模拟信号量:
-
问:自定义临界全局变量充当信号量是否可行?
答:
- 全局变量的++ / --并非原子操作
- 但是可以通过加锁来模拟系统提供的信号量
- 也通过维护线程队列模拟信号量阻塞
使用:
sem_init()初始化:
-
sem_init():
#include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value);
-
参数:
- sem:信号量本身
- pshared:为0表示线程间共享,非0表示进程间共享
- value:信号量计数器初始值,不论初始值多少,当count==0时才阻塞
sem_wait()等待:
-
sem_wait():
#include <semaphore.h> int sem_wait(sem_t *sem);
-
作用:P操作,信号量值-1
sem_post()发布:
-
sem_post():
#include <semaphore.h> int sem_post(sem_t *sem);
-
作用:V操作,信号量值+1
sem_destroy()销毁:
-
sem_destory():
#include <semaphore.h> int sem_destroy(sem_t *sem);
信号量维护的抢票程序:
- 利用c++核心的类来实现信号量抢票,其实也可以设立全局sem
- 实例代码:
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
using namespace std;
class Sem{
private:
sem_t sem;
public:
Sem(int num = 1){
sem_init(&sem, 0, num);
}
void P(){
sem_wait(&sem);
}
void V(){
sem_post(&sem);
}
~Sem(){
sem_destroy(&sem);
}
};
Sem sem(1);
int ticket = 2000;
void *GetTickets(void *arg){
string id = (char *)arg;
sleep(1);
while(1){
sem.P();
if(ticket > 0){
usleep(10000);
cout<<id <<" take ticket:"<<ticket--<<endl;
sem.V();
}else{
sem.V();
break;
}
}
cout<<id<<" quit"<<endl;
pthread_exit((void*)0);
}
int main(){
pthread_t tid1, tid2, tid3;
pthread_create(&tid1, nullptr,GetTickets, "thread 1");
pthread_create(&tid2, nullptr, GetTickets, "thread 2");
pthread_create(&tid3, nullptr, GetTickets, "thread 3");
pthread_join(tid1, nullptr):
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
return 0;
}
- 运行效果:
环形队列:
定义:
-
在基于队列的生产者消费者模型中,我们发现:
- 生产者:关注队列内空间剩余量
- 消费者:关注队列内含有的数据量
-
环形队列生产消费原则:
- 生产和消费不能指向同一位置
- 无论生产还是消费,都不应该将对方套一圈以上
以上两条原则都由信号量来维护
实例:
- 实例代码:P()操作申请不到资源,自动阻塞
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <cstdlib>
#include <ctime>
#define NUM 5
using namespace std;
template <typename T>
class RingQueue{
private:
int cap;
vector<T> q;
sem_t blank, data;
int p_pos, c_pos;
private:
void P(sem_t &sem){
sem_wait(&sem);
}
void V(sem_t &sem){
sem_post(&sem);
}
public:
RingQueue(int _cap = NUM){
cap = _cap;
q.resize(cap);
sem_init(&blank, 0, cap);
sem_init(&data, 0, 0);
p_pos = c_pos = 0;
}
void Push(const T& in){
P(blank);
q[p_pos++] = in;
V(data);
p_pos %= cap;
}
void Pop(T& out){
P(data);
out = q[c_pos++];
V(blank);
c_pos %= cap;
}
~RingQueue(){
sem_destroy(&blank);
sem_destroy(&data);
}
}
void *product(void *arg){
RingQueue<int> *rq = (RingQueue<int>*) arg;
pthread_detach(pthread_self());
while(1){
int x = rand()%100 + 1;
rq->Push(x);
cout<<"Productor <<< "<<x<<endl;
sleep(1);
}
}
void *consume(void *arg){
RingQueue<int> *rq =
(RingQueue<int>*) arg; pthread_detach(pthread_self());
while(1){
int x = 0;
rq->Pop(x);
cout<<"Consumer <<< "<<x<<endl;
sleep(1);
}
}
int main(){
srand((unsigned long) time(0));
RingQueue<int> *rq = new RingQueue<int>();
pthread_t p, c;
pthread_create(&p, nullptr, Product, rq);
pthread_create(&c, nullptr, Consume, rq);
return 0;
}
- 运行结果:
线程池:
含义:
背景:
- 背景:过多线程的调度开销很大,会影响缓冲局部性和整体性能
定义:
-
定义:线程池是一种线程使用模式,一个线程池维护着多个线程,等待着管理者分配可并发执行的任务。
-
线程池大小:可用线程数取决于:
- 并发处理器
- 处理器内核
- 内存
- 网络sockets数
-
优点:避免了在处理短时间任务时创建与销毁线程的代价。
使用场景:
-
应用场景:
-
时间短,线程多:
如Web服务器,单个任务体量小,但是任务数量多,任务完成时间紧
-
性能要求苛刻的应用:
要求服务器迅速响应客户端请求
-
突发性大量客户请求:
短时间内产生大量进程可能使内存到达极限
-
使用原理:
- 图解:
线程池维护的计算器:
Task类:
- 实际工程中的Task类内含有:
- 待处理数据
- 处理函数指针:让每个实例自有自己的处理方法
- 简单的计算器程序暂不使用函数指针,全部类内同一函数处理
- Task类代码:
#pragma once
#include <iostream>
#include <pthread.h>
using namespace std;
typedef int (*handler_t)(int, int, char);
//template <typename T>
class Task{
private:
int x;
int y;
char op;
//handler_t handler;
public:
Task(int _x, int _y, char _op){
x = _x;
y = _y;
op = _op;
}
void Run(){
int z = 0;
switch(op){
case '+':
z = x+y;
break;
case '-':
z = z-y;
break;
case '*':
z = x*y;
break;
case '/':
if(y==0) cerr<<"div zero"<<endl;
else z = x/y;
break;
case '%':
if(y==0) cerr<<"mod zero"<<endl;
else z = x%y;
break;
default:
cerr<<"op error"<<endl;
break;
}
cout<<"thread [ "<<pthread_self()<<" ]:"<<x<<op<<y<<endl;
}
~Task(){
}
}
ThreadPool类:
- 冲突:
- 类内函数自带this指针
- pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void*), void *arg )中的线程处理函数参数只有(void*)
- 解决方案:
- 线程处理函数采用static静态化,避免自带this指针
- 线程处理函数的参数为this指针,便于函数类调用类内函数或类内变量
- ThreadPool类代码:
#pragma once
#include <queue>
#include "Task.hpp"
#define N 3
template <typename T>
class ThreadPool{
private:
queue<Task> task_queue;
int thread_num;
pthread_cond_t cond;
pthread_mutex_t mutex;
private:
void LockQueue(){
pthread_mutex_lock(&mutex);
}
void UnlockQueue(){
pthread_mutex_unlock(&mutex);
}
void Wait(){
pthread_cond_wait(&cond, &mutex);
}
void Wakeup(){
pthread_cond_signal(&cond);
}
void IsEmpty(){
return task_queue.size()==0;
}
public:
ThreadPool(int num=N){
thread_num = num;
pthread_mutex_init(&mutex, nullptr);
pthread_cond_init(&cond, nullptr);
}
void InitThreadPool(){
pthread_t tid;
for(int i=0; i<thread_num; i++)
pthread_create(&tid, nullptr, func, this);
}
}
void Push(const T& in){
LockQueue();
task_queue.push(in);
//取和放都加锁
UnlockQueue();
Wakeup();
}
void Pop(T &out){
out = task_queue.front();
task_queue.pop();
}
static void* func(void *arg){
pthread_detach(pthread_self());
ThreadPool *self = (ThreadPool *)arg;
//一个线程可以在被调度期间连续执行多个任务
while(1){
self->LockQueue();
//if改while防止伪唤醒,唤醒后再确认一下条件,确实是该条件唤醒的则执行while()下一步
while(self->IsQueueEmpty()){
self->Wait();
}
T t;
self->Pop(t);
//一经取出就不在队列中了,其他线程看不到了
self->UnlockQueue();
t.Run();
}
}
~ThreadPool(){
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
}
}
main()函数:
- main()函数产生任务Task,交给ThreadPool解决:
#include "ThreadPool"
#include <cstdlib>
#include <ctime>
int main(){
ThreadPool<Task> *tp = new ThreadPool<Task>();
tp->InitThreadPool(); //核心一步,才创建线程池,一经创建则等待push任务唤醒
srand((usigned long)time(0));
const char *op = "+-*/%";
while(1){
int x = rand()%100+1;
int y = rand()%100+1;
Task t(x, y, op[x%5]);
tp->Push(t);
sleep(1);
}
return 0;
}
结果与反思:
- 运行结果:
- 反思:
- 当前的待处理任务由我们自己生成,且没有搭配处理函数指针,导致任务处理方法单一
- 当我们学习了Linux下的网络编程后,就可以通过网络来为线程池提供多种任务,附加函数指针后让任务处理方法多样
互斥锁/条件变量/信号量对比:
- 最简单的 - 信号量:
- 只有P() V()两种操作
- 内部自带锁
- 内部自带阻塞队列
- 最实用的 - 条件变量:
- 鲜明的独有功能:wait()阻塞 + signal()/boardcast()唤醒
- 必须搭配互斥量mutex使用
- 最核心的 - 互斥锁:
- 通用功能:lock() & unlock()
- 注意不要锁套锁,容易出现死锁问题