当前位置: 首页 > news >正文

Linux-10-线程

线程

  • 前言
  • 线程:
    • 含义:
      • 定义:
      • 与进程区别:
      • 线程共享内容:
      • 线程特有内容:
      • 优缺点:
        • 优点:
        • 用途:
        • 缺点:
        • 异常:
    • 组成:
      • LWP:
      • 虚拟地址空间:
      • 页表:
        • 页表分级:
        • 页目录:
        • 页表项:
        • 偏移量:
    • 使用:
      • pthread_create()创建:
      • pthread_self()获取线程ID:
      • return 终止:
      • pthread_exit()终止:
      • pthread_cancel()取消:
      • pthread_join()等待:
      • pthread_detach()分离:
  • 互斥量:
    • 含义:
      • 临界区:
      • mutex:
    • 使用:
      • 初始化:
        • 静态分配:
        • 动态分配:
      • 加锁:
      • 解锁:
      • 销毁:
        • 销毁静态分配所得:
        • 销毁动态分配所得:
      • 互斥量维护的抢票程序:
    • 原理:
      • mutex的创建:
      • 加锁:
      • 阻塞:
      • 解锁:
      • 总结;
  • 条件变量:
    • 含义:
    • 使用:
      • pthread_cond_init()初始化
      • pthread_cond_destroy()销毁:
      • pthread_cond_wait()阻塞:
      • pthread_cond_signal()单个唤醒:
      • pthread_cond_broadcast()全部唤醒:
      • 实例:
    • 生产者消费者模型:
      • 定义:
      • 阻塞队列:
  • 信号量:
    • 定义:
      • 本质:
      • PV操作:
      • 信号量与互斥锁:
      • 模拟信号量:
    • 使用:
      • sem_init()初始化:
      • sem_wait()等待:
      • sem_post()发布:
      • sem_destroy()销毁:
      • 信号量维护的抢票程序:
    • 环形队列:
      • 定义:
      • 实例:
  • 线程池:
    • 含义:
      • 背景:
      • 定义:
      • 使用场景:
      • 使用原理:
    • 线程池维护的计算器:
      • Task类:
      • ThreadPool类:
      • main()函数:
      • 结果与反思:
  • 互斥锁/条件变量/信号量对比:

前言

Vue框架:Vue驾校-从项目学Vue-1
算法系列博客友链:神机百炼

线程:

含义:

定义:

  • 线程的定义:

    一个程序中的一个执行路线

与进程区别:

  • 线程和进程的区别:

    1. 线程是进程内部的控制序列,一个进程内部至少含有一个线程
    2. 线程是作业处理CPU调度的基本单位
    3. 进程是资源申请分配访问控制的基本单位

线程共享内容:

  • 线程共享进程的内容:

    1. 文件描述符表
    2. 信号处理方式
    3. 当前工作目录
    4. 用户id与组id

线程特有内容:

  • 每个线程特有的内容:
    1. 线程ID
    2. 栈(每个线程创建自己的临时变量)
    3. 上下文寄存器(进程切换时保存当前内容)
    4. 调度优先级(PRI和NICE)
    5. 信号屏蔽字(毕竟可以通过sigprocmask()设定)
    6. errno

优缺点:

优点:

  1. 线程占用的资源比进程少很多
  2. 创建线程的代价比进程创建小很多
  3. 线程切换的消耗比进程切换小很多
  4. 多线程可以充分利用多处理器的可并行数量
  5. 对于计算密集型任务,可分解到多个线程中计算
  6. 对于IO密集型任务,将IO操作重叠,线程可以等待不同的IO操作
  7. 等待慢速IO操作结束同时,程序可执行其他计算任务

用途:

  1. 合理使用多线程,可以提升CPU计算密集型程序执行效率
  2. 合理使用多线程,可以提升IO密集型程序的用户体验(边播放边下载)

缺点:

  1. 性能损失:计算密集型线程很少和其他线程共享同一处理器,增加了其他线程调度和同步的开销
  2. 健壮性降低:多线程程序中,某一线程的微小失误造成的不良影响可能巨大
  3. 缺乏访问控制:进程是访问控制的基本粒度,线程中调用OS函数会对整个进程造成影响
  4. 编程难度提升:编写与调试一个多线程程序比单线程程序困难得多

异常:

  1. 线程是进程的执行分支,线程异常则进程异常,触发信号后进程及其内所有线程都终止
  2. 如单个线程出现除零/野指针问题,单个线程崩溃,则进程随之崩溃

组成:

LWP:

  • PCB和LWP的区别:
    1. PCB:进程的管理工具
    2. LWP:线程的管理工具
    3. 当一个进程下只有一个线程时,可以说LWP就是PCB
  • task_struct:
    1. 每个线程创建时,随之创建专属task_struct
      (原讲述进程时,将task_struct描述为随进程创建是由于当时一个进程下只有一个线程)
    2. 对于CPU而言,只能看到和调度每个线程的task_struct,符合线程是任务处理基本单位的设定

虚拟地址空间:

  • 线程虚拟地址空间:
    1. mm_struct是进程申请的虚拟地址空间
    2. task_struct是每个线程都特有的描述组织工具
    3. 进程地址空间中除栈内容为每个线程特有外,其他内容为所有线程共享
      线程地址空间

页表:

  • 虚拟地址到物理地址的映射硬件:MMU(Memory Manger Unite)
  • 虚拟地址到物理地址的映射软件:页表

页表分级:

  • 页表分级存储的必要性:

    1. 32位平台下的地址数:2^32
    2. 32位平台下一个地址大小:4Byte
    3. 一张页表映射全地址,则页表粗略大小:2^35字节 == 32GB
    4. 32位平台下内存大小:4GB << 32GB
  • 页表分级存储的示意图:
    页表分级存储

页目录:

  • 页目录:针对地址前10位做区分
  • 映射关系:通过虚拟地址前十位,初步确定到哪个页表项中查物理地址
  • 页目录大小:
    1. 前十位地址数目:2^10
    2. 32位平台下一个地址大小:4Byte
    3. 页目录粗略大小:2^13 Byte == 8KB

页表项:

  • 页框:内存/硬盘中数据的存储以4KB为一个单位,称为一个页框。页框地址就是页框中第一个存储空间中的首地址。
  • 页表项:仅针对地址11~20位做区分
  • 映射关系:通过虚拟地址中间十位,再确定到哪个页框中查物理地址
  • 页表项大小:
    1. 中间十位地址数目:2^10
    2. 32位平台下一个地址大小:4Byte
    3. 每个页表项粗略大小:2^13 Byte == 8KB

偏移量:

  • 偏移量:虚拟地址的最后12位
  • 映射关系:通过虚拟地址的最后12位,加页框首地址,形成最终物理地址

使用:

pthread_create()创建:

  • pthread_create():

    #include <pthread.h>
    pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void*), void *arg);
    
  • 参数:

    1. *thread:输出型参数,返回系统层线程地址ID
    2. attr:线程属性,为NULL则为默认属性
    3. start_routine:线程将执行的函数地址
    4. arg:线程将执行的函数的参数
  • 返回值:

    1. 创建线程成功:0
    2. 创建线程失败:具体错误码,而非-1
  • 实例代码:

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <error.h>
#include <pthread.h>
void *func(void *arg){
	while(1){
        printf("child thread running\n");
        sleep(1);
    }
}
int main(){
    pthread_t tid;
    int ret;
    if((ret = pthread_create(&tid, NULL, func, NULL)) != 0){
		perror("pthread_create error\n");
        exit(ret);
    }
    while(1){
        printf("main thread running\n");
        sleep(1);
    }
	return 0;
}
  • 运行结果:
#编译指令
gcc -o pthread_create pthread_create.c -lpthread

pthread_create()

pthread_self()获取线程ID:

  • 前文提及了两次线程ID:

    1. 线程创建后含有的专属ID:进程调度层面的编号
    2. pthread_create()的第一个参数:开辟内存层面的地址
  • pthread_t类型的线程ID存储位置:
    线程id存储位置

return 终止:

  • 线程终止:只终止线程而不终止进程

  • 线程终止三大方法:

    1. 线程函数return终止自己。

      对主线程不适用,主线程return相当于exit()

    2. 线程调用pthread_exit()终止自己

    3. 调用pthread_cancel()终止另一进程

  • return:

void *func(void *arg){
    int *p = (int*)malloc(sizeof(int));
    *p = 1;
    return (void*)p;
}
  • 参数:

    1. 由于线程函数类型为void*,所以需要malloc()出来空间后强制类型转化
  • 实例代码:

#include <stdio.h>
#include <error.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
void *func(void *arg){
    printf("child thread ready to return\n");
    int *p = (int*)malloc(sizeof(int));
    *p = 1;
    return (void*)p;
}
int main(){
    int ret = 0;
    pthread_t tid;
    if((ret = pthread_create()){
		perror("pthread_create error");
        return -1;
    }
    printf("main thread ready to return\n");
    while(1);
    return 0;
}
  • 运行结果:
    pthread return

pthread_exit()终止:

  • pthread_exit():

    #include <pthread.h>
    void pthread_exit(void *value_ptr);
    
  • 作用:终止调用该函数的线程

  • 参数:

    1. value_ptr不可指向局部变量,只能指向堆上malloc所得内容
    2. malloc所得空间指针最终得强转为(void *)
  • 返回值:由于调用成功则线程结束,所以该函数无返回值

  • 实例代码:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <error.h>
#include <pthread.h>
void *func(void *arg){
    printf("child thread ready to exit\n");
    int *p = (int *)malloc(sizeof(int))
    *p = 1;    
    pthread_exit((void*)p);
}
int main(){
    int ret = 0;
    pthread_t tid;
    if((ret = pthread_create(&tid, NULL, func, NULL)) != 0){
        perror("pthread_create error");
        return -1;
    }
    printf("main thread ready to return\n");
    while(1);
    return 0;
}
  • 运行结果:
    pthread_exit()

pthread_cancel()取消:

  • pthread_cancel():

    #include <pthread.h>
    int pthread_cancel(pthread_t thread);
    
  • 作用:终止以参数为线程id的线程

  • 参数:线程id

  • 返回值:

    1. 终止目标线程成功:返回0
    2. 终止目标线程失败:错误码
  • 实例代码:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <error.h>
#include <>
void *func(){
    while(1){
        printf("child thread is running\n");
        sleep(1);
    }
    return NULL;
}
int main(){
    int ret = 0;
    pthread_t tid;
    if((ret = pthread_create(&tid, NULL, func, NULL)) != 0){
		perror("pthread_create error");
        return -1;
    }
    sleep(5);
    pthread_cancel(tid);
    return 0;
}
  • 运行结果:
    pthread_cancel

pthread_join()等待:

  • 为什么需要线程等待?

    1. 已退出的线程空间未被释放,还保留在进程地址空间内
    2. 新创建的线程不会复用未释放的线程空间
  • pthread_join():

    #include <pthread>
    int pthread_join(pthread_t thread, void **value_ptr);
    
  • 作用:等待thread号线程结束

  • 参数:

    1. thread:线程ID
    2. value_ptr:输出型参数,存储已终止线程的退出码
  • 返回值:

    1. 等待成功:0
    2. 等待失败:错误码
  • 线程等待的终止状态:

    根据线程终止的条件不同,线程等待得到的终止状态不同

    1. 线程通过return返回,value_ptr所指单元存放返回值
    2. 线程通过pthread_exit()终止,value_ptr所指单元存放pthread_exit()内参数值
    3. 线程被pthread_cancle(),value_ptr所指单元存放常数PTHREAD_CANCELED
  • 进程等待的时间线:
    pthrea_join()时间线

  • 实例代码:

#include <stdio.h>
#include <pthread.h>
#include <string.h>
#include <stdlib.h>
#include <error.h>
#include <unistd.h>
void *thread1(void *arg){
    printf("thread1 return\n");
    int *p = (int*)malloc(sizeof(int));
    *p = 1;
   return (void*)p; 
  }
void *thread2(void *arg){
    printf("thread2 pthread_exit\n");
    int *p = (int*) malloc(sizeof(int));
    *p = 2;
pthread_exit((void*)p);
  }
void *thread3(void *arg){
    while(1){
      printf("thread3 pthread_cancled\n");
      sleep(1);
    }
    return NULL;
  }
int main(){
    pthread_t tid;
    void* ret;
  
    //thread1:return 
    pthread_create(&tid, NULL, thread1, NULL);
      pthread_join(tid, &ret);
    printf("thread1 return %d\n", *(int*)ret);
    free(ret);
  
    //thread2:pthread_exit 
    pthread_create(&tid, NULL, thread2, NULL);
    pthread_join(tid, &ret);
    printf("thread2 pthread_exit(%d)\n", *(int*)ret); 
    free(ret);
  
    //thread3:pthread_cancel
    pthread_create(&tid, NULL, thread3, NULL);
    sleep(3);
    pthread_cancel(tid);
    pthread_join(tid, &ret);
if(ret == PTHREAD_CANCELED){
      printf("thread pthread_canceled PTHREAD_CANCELED\n");           
    }else{
      printf("thread isn't pthread_canceled\n");
    }
    return 0;
}
  • 运行结果:
    pthread_join()

pthread_detach()分离:

  • 进程分离:

    1. 默认情况下,新创建的线程是joinable的,线程退出之后,需要对其等待,否则无法释放资源,造成系统泄漏
    2. 不关心线程的返回值时,pthread_join()是一种负担,这时可以通知系统该线程退出时,自动释放线程资源即可
  • 矛盾:线程一旦分离,则不可再等待该进程退出了

  • pthread_detach():

    #include <pthread.h>
    int pthread_detach(pthread_t thread);
    
  • 参数:

    1. thread为线程的内存层面的id
  • 实例代码:

#include <stdio.h>
#include <pthread.h>
#include <error.h>
#include <unistd.h>
void *func(void *arg){    pthread_detach(pthread_self());
    printf("%s\n", (char*)arg);
    return NULL;    
}    
int main(){    
    int ret = 0;        
    pthread_t tid;
  	if((ret = pthread_create(&tid, NULL, func, "child thread detach...")) != 0){ 	perror("pthread_create error...");    
      return -1;
    }
     sleep(3);   //确保线程分离
  if(pthread_join(tid, NULL) == 0){ 
      printf("pthread_join wait success\n");
    }else{
      printf("pthread_join wait failed\n");
    }
    return 0;
  }
  • 运行结果:
    pthread_detach()

互斥量:

含义:

临界区:

  • 临界资源:多线程执行流共享的资源
  • 临界区:每个线程内部访问临界资源的代码
  • 互斥:任何时刻,保证有且只有一个执行流进入临界区访问临界资源
  • 原子性:不会被任何调度机制打断的操作,该操作只有两态:完成&未开始

mutex:

  • 线程中的变量:

    1. 局部变量:

      变量的地址空间在线程栈空间内,变量归属单个线程,其他线程无法获得该变量

    2. 共享变量:

      多个进程都可以访问到的变量

  • 互斥量mutex:

    1. 本质:为线程加的一把锁

    2. 作用:

      1. 线程互斥:当一个线程进入临界区执行时,不允许其他线程进入该临界区
      2. 单一线程:当临界区没有线程在执行时,只能允许一个线程进入临界区执行
      3. 阻止线程:如果线程不在临界区运行,则该线程不能阻止其他线程进入临界区
  1. 加锁后的临界区示意图:
    pthread_mutex_lock() unlock()

使用:

初始化:

静态分配:

  • PTHREAD_MUTEX_INITIALIZER:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER

动态分配:

  • pthread_mutex_init():
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
  • 参数:
    1. mutex:要初始化的互斥量
    2. attr:NULL即可

加锁:

  • 加锁可能遇到的情况:

    1. 互斥量处于未锁时,加锁函数将锁定该互斥量,同时返回成功
    2. 互斥量已经被锁时,加锁函数将陷入阻塞等待,当互斥量解锁后立马为其加锁
  • pthread_lock():

#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
  • 参数:要加锁的互斥量

  • 返回值:

    1. 加锁成功:返回0
    2. 加锁失败:返回错误信号

解锁:

  • pthread_mutex_unlock():

    #include <pthread.h>
    int pthread_mutex_unlock(pthread_mutex_t *mutex);
    
  • 参数:要解锁的互斥量

  • 返回值:

    1. 加锁成功:返回0
    2. 加锁失败:返回错误信号

销毁:

销毁静态分配所得:

  • PTHREAD_MUTEX_INITIALIZER不需要手动销毁

销毁动态分配所得:

  • 销毁动态分配所得的互斥量前确定:

    1. 该互斥量未加锁
    2. 该互斥量后续不会有线程尝试加锁
  • pthread_mutex_destroy():

#include <pthread.h>
int pthread_mutex_destroy(pthread_mutex_t *mutex);
  • 参数:要销毁的互斥量

互斥量维护的抢票程序:

  • 实例代码:判断ticket==0前就得加锁
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
int ticket = 100;
pthread_mutex_t mutex;
void *func(void *arg){
    char* id = (char*) arg;
    while(1){
        pthread_mutex_lock(&mutex);
        if(ticket > 0){
            usleep(1000);
            printf("%s sells ticket:%d\n", id, ticket);
        	ticket--;
            pthread_mutex_unlock(&mutex);
        }else{
			pthread_mutex_unlock(&mutex);
            break;
        }
    }
}
int main(){
    pthread_t t1, t2, t3, t4;
    pthread_mutex_init(&mutex, NULL);
    pthread_create(&t1, NULL, func, "thread 1");
    pthread_create(&t2, NULL, func, "thread 2");
    pthread_create(&t3, NULL, func, "thread 3");
    pthread_create(&t4, NULL, func, "thread 4");
    
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);
    
    pthread_mutex_destory(&mutex);
}
  • 运行结果:
    mutex下抢票

原理:

  • 前提:
    1. 所有线程共享mutex变量
    2. 每个线程在cpu内独有自己的al寄存器

mutex的创建:

  • 共享变量mutex:

    一经创建和初始化则内存中的mutex值为1
    mutex初始化

加锁:

  • pthread_mutex_lock():

    加锁含有两步操作和一步判断:

    1. 将该线程拥有的CPU内与锁有关的al寄存器内值置为0
      %al = 0

    2. CPU原子交换值操作:swap()/exchange()
      swap()/exchange()

    3. 判断共享变量mutex值是否为0,若为0则线程阻塞,若非0则线程进入执行临界区代码

阻塞:

  • 锁阻塞:

    当一个线程已经将互斥锁持有,此时另一线程尝试取地锁时,该线程也执行了pthread_mutex_lock()的三大步骤,且是否持有锁查看的是最后的判断步骤

    1. 该线程在cpu内拥有的al寄存器内值置为0
    2. al寄存器值和mutex值发送swap()/exchange()
    3. 判断mutex值是否为1,若为1则成功取得锁,若为0则继续阻塞

解锁:

  • pthread_mutex_unlock():

    1. 解锁本质只有一步操作:将mutex值置为1
      pthread_unlock()操作

总结;

  • 可以将共享变量mutex值置为1的操作:
    1. PTHREAD_MUTEX_INITIALIZER
    2. pthread_init()
  • pthread_create()三大操作:
    1. 线程独有的al寄存器值化0
    2. al寄存器和mutex值交换
    3. 判断mutex值是否为1
  • mutex值本质像是一个消耗品:
    1. 初始化/解锁时为mutex提供一个1
    2. 一个线程通过加锁彻底消耗mutex的1

条件变量:

含义:

  • 适用情况:

    一个线程等待"条件变量的条件成立"而挂起;

    另一个线程使"条件成立"。 为了防止竞争,

  • 同步:保证数据安全前提下,让线程能够按照某种特定顺序访问临界资源,从而避免饥饿问题

  • 竞争条件:由于时序问题导致程序异常

  • 定义:在多线程程序中用来实现"等待 -> 唤醒"逻辑常用的方法。

  • 搭配使用:pthread_mutex_t,即条件变量的使用总是和一个互斥锁结合在一起。

使用:

pthread_cond_init()初始化

  • pthread_cond_init():

    int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
    
  • 作用:初始化条件变量cond

  • 参数:

    1. cond:要初始化的条件变量
    2. attr:属性为NULL即默认

pthread_cond_destroy()销毁:

  • pthread_cond_destroy():

    int pthread_cond_destroy(pthread_cond_t *cond);
    
  • 作用:销毁条件变量cond

  • 参数:

    1. 要销毁的条件变量

pthread_cond_wait()阻塞:

  • pthread_cond_wait():

    int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
    
  • 作用:暂时释放当前线程持有的锁后,将调用该函数的线程阻塞等待

  • 参数:

    1. 条件变量cond:用于监听条件变量是否满足
    2. 互斥量mutex:
      1. 进入等待时,该锁会自动释放,让其他线程获取锁而继续执行
      2. 如果当前等待线程被唤醒,又会自动获得对应的mutex互斥锁

pthread_cond_signal()单个唤醒:

  • pthread_cond_signal():

    int pthread_cond_signal(pthread_cond_t *cond)
    
  • 作用:通过修改条件变量cond值,使得一个线程摆脱阻塞

  • 参数:

    1. 条件变量cond

pthread_cond_broadcast()全部唤醒:

  • pthread_cond_broadcast():

    int pthread_cond_broadcast(pthread_cond_t *cond);
    
  • 作用:通过修改条件变量cond值,使得所有线程摆脱阻塞

  • 参数:

    1. 条件变量cond

实例:

  • 实例代码:
#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;
pthread_mutex_t mutex;
pthread_cond_t cond;
void *run(void *arg){
    pthread_detach(pthread_self());
    while(1){
        pthread_wait();
        cout<<"thread "<<pthread_self()<<" 摆脱条件变量阻塞"<<endl;
        sleep(1);
    }
    pthread_exit((void*)0);
}
int main(){
    pthread_mutex_init(&mutex, nullptr);
    pthread_cond_init(&cond, nullptr);
    pthread_t t1, t2, t3;
    pthread_create(&t1, nullptr, run, nullptr);
    pthread_create(&t2 nullptr, run, nullptr);
    pthread_create(&t3, nullptr, run, nullptr);
    while(1){
        getchar();
        pthread_cond_signal(&cond);
    }
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
    return 0;
}
  • 运行结果:
    getchar()自带阻塞

生产者消费者模型:

定义:

  • 定义:生产者和消费者不直接通讯,而是通过阻塞队列来进行通信。生产者生产完数据之后不等待消费者获取,而是直接存储到阻塞队列中,消费者需要数据时不向生产者索取,而是直接从阻塞队列中获取。
  • 优点:
    1. 阻塞队列相当于一个缓冲区,平衡了生产者和消费者的处理能力,将两者解耦
    2. 支持生产者消费者忙闲不均
    3. 支持多生产者和多消费者并发
  • 同步和异步:
    1. 异步:当两件事没有先后或资源上的依赖关系时,先处理哪一件对于另一件都是阻碍
    2. 同步:当两件事有先后或资源上的依赖关系时,先处理第一件对于第二件来说属于铺垫或帮助
  • 图示:
    生产者消费者模型原理图

阻塞队列:

  • 条件变量实现阻塞队列代码:
#include <iostream>
#include <pthread.h>
#include <cstdlib>
#include <ctime>
#include <queue>
#include <unistd.h>
#define NUM 32
using namespace std;
template<typename T>
class BlockQueue{
  private:
    bool IsFUll(){
      return q.size() == cap;
    }
    bool IsEmpty(){
      return q.size() == 0;
    }
  public:
    BlockQueue(int _cap = NUM){
      cap = _cap;
      pthread_mutex_init(&mutex, nullptr);
      pthread_cond_init(&full, nullptr);
      pthread_cond_init(&empty, nullptr);

    }
    void Push(const T& in){
      pthread_mutex_lock(&mutex);
      if(IsFUll()){
        //队列满则阻塞等待
        pthread_cond_wait(&full, &mutex);

      } 
      q.push(in);
      pthread_mutex_unlock(&mutex);
      //生产后队列不为空,供消费者消费:
      pthread_cond_signal(&empty);

    }
    void Pop(T& out){
      pthread_mutex_lock(&mutex);
      if(IsEmpty()){
        //队列为空则阻塞等待
        pthread_cond_wait(&empty, &mutex);

      }
      out = q.front();
      q.pop();
      pthread_mutex_unlock(&mutex);
      //消费后队列不满,供生产者生产
      pthread_cond_signal(&full);
    }
    ~BlockQueue(){
      pthread_mutex_destroy(&mutex);
      pthread_cond_destroy(&full);
      pthread_cond_destroy(&empty);
    }
    
  private:
    std::queue<T> q;  //临界资源
    pthread_mutex_t mutex;  //锁
    pthread_cond_t full, empty;
    int cap;
};
void *Producter(void *arg){
  auto bq = (BlockQueue<int> *)arg;
  while(1){
    int data = rand()%100 + 1;
    bq->Push(data);
    sleep(1);
  }
}
void *Consumer(void *arg){
  auto bq = (BlockQueue<int> *)arg;
  while(1){
    int data = 0;
    bq->Pop(data);
    cout<<"consumer: "<<data<<endl;
    sleep(1);
  }
}
int main(){
  BlockQueue<int> *bq = new BlockQueue<int>();
  srand((unsigned long)time(NULL));
  pthread_t p, c;
  pthread_create(&p, nullptr, Consumer, bq);
  pthread_create(&c, nullptr, Producter, bq);
  pthread_join(p, nullptr);
  pthread_join(c, nullptr);
  return 0;
}
  • 运行结果:
    阻塞队列
  • 生产者消费者交替出现的原因分析:
    1. 第一个执行的必然是product线程:
      1. 不论开始是product线程还是consumer线程先被调度,队列都为空
      2. product生产一个元素结束后,释放锁
    2. 第二个执行的必然是consumer线程:
      1. 此时consumer若在等待,则被pthread_cond_signal()唤醒
      2. 此时consumer若还未被调度,则由于product的sleep(1)的时间间隔而被调度
    3. 第三个执行的必然是product线程:
      1. consumer消耗元素后,队列为空
      2. 理论上此时调度consumer线程,consumer陷入等待
      3. 事实上由于consumer的sleep(1),product线程必然被调度
  • 防止两者较替出现的代码措施:
    1. 修改各自的sleep()时间
    2. 修改唤醒full / empty的条件

信号量:

定义:

本质:

  • 信号量的本质:

    1. 核心是一个计数器,描述临界资源中的资源数目

    2. 功能实现依赖mutex互斥锁

    3. 维护着一个阻塞队列,实现阻塞线程之间相对有序

    4. 本身即是一个临界资源:多个线程都可以看到和操作该资源

    5. 伪代码:

      struct {
      	ptread_mutex_t mutex;	//以锁维护原子性
          int count;	//核心计数器
          task_struct *queue;	//阻塞队列
      };
      

PV操作:

  • P操作:

    1. 申请信号量

    2. 本质是计数器- -,是依赖锁实现的原子操作

    3. 当P操作申请不到资源时,一定是资源暂时为空

      此时线程陷入阻塞,进入信号量自带的阻塞队列

      当P操作申请到资源时,获得的是资源的使用权,而不是已经在使用资源

  • V操作:

    1. 释放信号量
    2. 本质是计数器++,是依赖锁实现的原子操作

信号量与互斥锁:

  • 信号量与互斥锁的关联:
    1. 当信号量的sem_init()中参数value==1时,信号量基本等价于线程略有序版的互斥锁
    2. 信号量本身的PV操作原子性是由锁来维护的
  • 信号量与互斥锁的区别:
    1. 对于互斥锁:所有线程同时争夺,每次争夺没有优先级
    2. 对于信号量:由于阻塞队列的存在,未争夺成功的线程已经有序排队
  • 信号量与条件变量的使用差别:
    1. 条件变量的pthread_cond_wait()方法中还需要pthread_mutex_t参数
    2. 信号量的sem_wait()和sem_post()方法中不需要pthread_mutex_t参数,毕竟sem_t内部自带mutex

模拟信号量:

  • 问:自定义临界全局变量充当信号量是否可行?

    答:

    1. 全局变量的++ / --并非原子操作
    2. 但是可以通过加锁来模拟系统提供的信号量
    3. 也通过维护线程队列模拟信号量阻塞

使用:

sem_init()初始化:

  • sem_init():

    #include <semaphore.h>
    int sem_init(sem_t *sem, int pshared, unsigned int value);
    
  • 参数:

    1. sem:信号量本身
    2. pshared:为0表示线程间共享,非0表示进程间共享
    3. value:信号量计数器初始值,不论初始值多少,当count==0时才阻塞

sem_wait()等待:

  • sem_wait():

    #include <semaphore.h>
    int sem_wait(sem_t *sem);
    
  • 作用:P操作,信号量值-1

sem_post()发布:

  • sem_post():

    #include <semaphore.h>
    int sem_post(sem_t *sem);
    
  • 作用:V操作,信号量值+1

sem_destroy()销毁:

  • sem_destory():

    #include <semaphore.h>
    int sem_destroy(sem_t *sem);
    

信号量维护的抢票程序:

  • 利用c++核心的类来实现信号量抢票,其实也可以设立全局sem
  • 实例代码:
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
using namespace std;
class Sem{
  private:
    sem_t sem;
  public:
    Sem(int num = 1){
      sem_init(&sem, 0, num);
    }
    void P(){
      sem_wait(&sem);
    }
    void V(){
      sem_post(&sem);
    }
    ~Sem(){
      sem_destroy(&sem);
    }
};
Sem sem(1);
int ticket = 2000;
void *GetTickets(void *arg){
  string id = (char *)arg;
  sleep(1);
  while(1){
    sem.P();
    if(ticket > 0){
      usleep(10000);
      cout<<id <<" take ticket:"<<ticket--<<endl;
      sem.V();
    }else{
      sem.V();
      break;
    }
  }
  cout<<id<<" quit"<<endl;
  pthread_exit((void*)0);
}
int main(){
	pthread_t tid1, tid2, tid3;
    pthread_create(&tid1, nullptr,GetTickets, "thread 1");
    pthread_create(&tid2, nullptr, GetTickets, "thread 2");
    pthread_create(&tid3, nullptr, GetTickets, "thread 3");
    
    pthread_join(tid1, nullptr):
    pthread_join(tid2, nullptr);
    pthread_join(tid3, nullptr);
    return 0;
}
  • 运行效果:
    sem_wait() sem_post()

环形队列:

定义:

  • 在基于队列的生产者消费者模型中,我们发现:

    1. 生产者:关注队列内空间剩余量
    2. 消费者:关注队列内含有的数据量
  • 环形队列生产消费原则:

    1. 生产和消费不能指向同一位置
    2. 无论生产还是消费,都不应该将对方套一圈以上

    以上两条原则都由信号量来维护

实例:

  • 实例代码:P()操作申请不到资源,自动阻塞
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <cstdlib>
#include <ctime>
#define NUM 5
using namespace std;
template <typename T>
class RingQueue{
    private:
   		int cap;
    	vector<T> q;
    	sem_t blank, data;
    	int p_pos, c_pos;
    private:
    void P(sem_t &sem){
        sem_wait(&sem);
    }
    void V(sem_t &sem){
		sem_post(&sem);
    }
   	public:
    RingQueue(int _cap = NUM){
        cap = _cap;
        q.resize(cap);
        sem_init(&blank, 0, cap);
        sem_init(&data, 0, 0);
        p_pos = c_pos = 0;
    }
    void Push(const T& in){
        P(blank);
        q[p_pos++] = in;
        V(data);
        p_pos %= cap;
    }
    void Pop(T& out){
        P(data);
        out = q[c_pos++];
        V(blank);
        c_pos %= cap;
    }
    ~RingQueue(){
        sem_destroy(&blank);
        sem_destroy(&data);
    }
}
void *product(void *arg){
   RingQueue<int> *rq = (RingQueue<int>*) arg;
       pthread_detach(pthread_self());
    while(1){
        int x = rand()%100 + 1;
        rq->Push(x);
        cout<<"Productor <<< "<<x<<endl;
        sleep(1);
    }
}
void *consume(void *arg){
   RingQueue<int> *rq = 
(RingQueue<int>*) arg;      pthread_detach(pthread_self());
    while(1){
        int x = 0;
        rq->Pop(x);
        cout<<"Consumer <<< "<<x<<endl;
        sleep(1);
    }
}
int main(){
    srand((unsigned long) time(0));
    RingQueue<int> *rq = new RingQueue<int>();
    pthread_t p, c;
    pthread_create(&p, nullptr, Product, rq);
    pthread_create(&c, nullptr, Consume, rq);
    return 0;
}
  • 运行结果:
    sem_t

线程池:

含义:

背景:

  • 背景:过多线程的调度开销很大,会影响缓冲局部性和整体性能

定义:

  • 定义:线程池是一种线程使用模式,一个线程池维护着多个线程,等待着管理者分配可并发执行的任务

  • 线程池大小:可用线程数取决于:

    1. 并发处理器
    2. 处理器内核
    3. 内存
    4. 网络sockets数
  • 优点:避免了在处理短时间任务时创建与销毁线程的代价

使用场景:

  • 应用场景:

    1. 时间短,线程多:

      如Web服务器,单个任务体量小,但是任务数量多,任务完成时间紧

    2. 性能要求苛刻的应用:

      要求服务器迅速响应客户端请求

    3. 突发性大量客户请求:

      短时间内产生大量进程可能使内存到达极限

使用原理:

  • 图解:
    线程池使用原理示意图

线程池维护的计算器:

Task类:

  • 实际工程中的Task类内含有:
    1. 待处理数据
    2. 处理函数指针:让每个实例自有自己的处理方法
  • 简单的计算器程序暂不使用函数指针,全部类内同一函数处理
  • Task类代码:
#pragma once
#include <iostream>
#include <pthread.h>
using namespace std;
typedef int (*handler_t)(int, int, char);
//template <typename T>
class Task{
    private:
    	int x;
    	int y;
    	char op;
    	//handler_t handler;
    public:
    	Task(int _x, int _y, char _op){
			x = _x;
            y = _y;
            op = _op;
        }
    	void Run(){
            int z = 0;
            switch(op){
                case '+':
                    z = x+y;
                    break;
                case '-':
                    z = z-y;
                    break;
                case '*':
                    z = x*y;
                   	break;
                case '/':
                    if(y==0) cerr<<"div zero"<<endl;
                    else z = x/y;
                    break;
                case '%':
                    if(y==0) cerr<<"mod zero"<<endl;
                    else z = x%y;
                    break;
                default:
                    cerr<<"op error"<<endl;
                    break;
            }
            cout<<"thread [ "<<pthread_self()<<" ]:"<<x<<op<<y<<endl;
        }
    	
    	~Task(){
        }
}

ThreadPool类:

  • 冲突:
    1. 类内函数自带this指针
    2. pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void*), void *arg )中的线程处理函数参数只有(void*)
  • 解决方案:
    1. 线程处理函数采用static静态化,避免自带this指针
    2. 线程处理函数的参数为this指针,便于函数类调用类内函数或类内变量
  • ThreadPool类代码:
#pragma once
#include <queue>
#include "Task.hpp"
#define N 3
template <typename T>
class ThreadPool{
    private:
    	queue<Task> task_queue;
    	int thread_num;
    	pthread_cond_t cond;
    	pthread_mutex_t mutex;
    private:
    	void LockQueue(){
            pthread_mutex_lock(&mutex);
        }
    	void UnlockQueue(){
            pthread_mutex_unlock(&mutex);
        }
    	void Wait(){
            pthread_cond_wait(&cond, &mutex);
        }
    	void Wakeup(){
            pthread_cond_signal(&cond);
        }
    	void IsEmpty(){
			return task_queue.size()==0;
        }
    public:
		ThreadPool(int num=N){
            thread_num = num;
			 pthread_mutex_init(&mutex, nullptr);
            pthread_cond_init(&cond, nullptr);
        }
    	void InitThreadPool(){
            pthread_t tid;
            for(int i=0; i<thread_num; i++)
                pthread_create(&tid, nullptr, func, this);
            }
        }
		void Push(const T& in){
            LockQueue();
            task_queue.push(in);	
            //取和放都加锁
            UnlockQueue();
            Wakeup();
        }
		void Pop(T &out){
            out = task_queue.front();
            task_queue.pop();
        }
		static void* func(void *arg){
            pthread_detach(pthread_self());
            ThreadPool *self = (ThreadPool *)arg;
            //一个线程可以在被调度期间连续执行多个任务
            while(1){
                self->LockQueue();
                //if改while防止伪唤醒,唤醒后再确认一下条件,确实是该条件唤醒的则执行while()下一步
                while(self->IsQueueEmpty()){
                 	self->Wait();   
                }
                T t;
         		self->Pop(t);
                //一经取出就不在队列中了,其他线程看不到了
                self->UnlockQueue();
                t.Run();
            }
        }
    	~ThreadPool(){
        	   pthread_mutex_destroy(&mutex);
        	   pthread_cond_destroy(&cond);
        }
}

main()函数:

  • main()函数产生任务Task,交给ThreadPool解决:
#include "ThreadPool"
#include <cstdlib>
#include <ctime>
int main(){
    ThreadPool<Task> *tp = new ThreadPool<Task>();
    tp->InitThreadPool();	//核心一步,才创建线程池,一经创建则等待push任务唤醒
    
    srand((usigned long)time(0));
    const char *op = "+-*/%";
    while(1){
        int x = rand()%100+1;
    	int y = rand()%100+1;
    	Task t(x, y, op[x%5]);
        tp->Push(t);
        sleep(1);
    }
    
    return 0;
}

结果与反思:

  • 运行结果:
    ThreadPool
  • 反思:
    1. 当前的待处理任务由我们自己生成,且没有搭配处理函数指针,导致任务处理方法单一
    2. 当我们学习了Linux下的网络编程后,就可以通过网络来为线程池提供多种任务,附加函数指针后让任务处理方法多样

互斥锁/条件变量/信号量对比:

  • 最简单的 - 信号量:
    1. 只有P() V()两种操作
    2. 内部自带锁
    3. 内部自带阻塞队列
  • 最实用的 - 条件变量:
    1. 鲜明的独有功能:wait()阻塞 + signal()/boardcast()唤醒
    2. 必须搭配互斥量mutex使用
  • 最核心的 - 互斥锁:
    1. 通用功能:lock() & unlock()
    2. 注意不要锁套锁,容易出现死锁问题

相关文章:

  • BP神经网络算法基本原理,bp神经网络算法的优点
  • 模块加载机制(require)--内置、第三方、自定义、文件夹
  • js分组匹配、遍历结果
  • shell脚本学习笔记2
  • STM32-串口通信波特率计算以及寄存器的配置详解
  • 物联网开发笔记(5)- 使用Wokwi仿真树莓派Pico实现LED灯交替闪烁(续)
  • 洛谷 P7302 [NOI1998] 免费的馅饼
  • Docker基础-2.常用命令与Docker镜像
  • Java的Lambda表达式学习笔记:认识lambda表达式
  • SAP Spartacus 项目开发时需要注意的一些常见错误
  • SpringBoot - @JsonIgnore和@JsonIgnoreProperties注解详解以及区别
  • 神经网络概念图片大全,神经网络概念图片解析
  • 产业园区构建公共服务平台应当包含哪些服务
  • 链动2+1模式是如何驱动品牌全面爆发的?
  • DMSA(Distributed multi-scenario analysis)
  • 0基础学习移动端适配
  • 2018一半小结一波
  • ECS应用管理最佳实践
  • iOS动画编程-View动画[ 1 ] 基础View动画
  • node-sass 安装卡在 node scripts/install.js 解决办法
  • React 快速上手 - 06 容器组件、展示组件、操作组件
  • vuex 学习笔记 01
  • 闭包--闭包作用之保存(一)
  • 测试开发系类之接口自动化测试
  • 程序员最讨厌的9句话,你可有补充?
  • 浮动相关
  • 技术:超级实用的电脑小技巧
  • 解析 Webpack中import、require、按需加载的执行过程
  • 前端路由实现-history
  • 数据库写操作弃用“SELECT ... FOR UPDATE”解决方案
  • 我有几个粽子,和一个故事
  • 用Visual Studio开发以太坊智能合约
  • 与 ConTeXt MkIV 官方文档的接驳
  • - 转 Ext2.0 form使用实例
  • ​二进制运算符:(与运算)、|(或运算)、~(取反运算)、^(异或运算)、位移运算符​
  • ​人工智能书单(数学基础篇)
  • # 数据结构
  • %3cscript放入php,跟bWAPP学WEB安全(PHP代码)--XSS跨站脚本攻击
  • (06)金属布线——为半导体注入生命的连接
  • (二十五)admin-boot项目之集成消息队列Rabbitmq
  • (接口封装)
  • (七)Java对象在Hibernate持久化层的状态
  • (图)IntelliTrace Tools 跟踪云端程序
  • (心得)获取一个数二进制序列中所有的偶数位和奇数位, 分别输出二进制序列。
  • (学习日记)2024.02.29:UCOSIII第二节
  • (一)UDP基本编程步骤
  • (原)记一次CentOS7 磁盘空间大小异常的解决过程
  • (转)Mysql的优化设置
  • (转)shell调试方法
  • (转)负载均衡,回话保持,cookie
  • (转)详解PHP处理密码的几种方式
  • (最简单,详细,直接上手)uniapp/vue中英文多语言切换
  • *ST京蓝入股力合节能 着力绿色智慧城市服务
  • .NET Core 通过 Ef Core 操作 Mysql
  • .net framework 4.0中如何 输出 form 的name属性。