【Linux操作系统】-- 多线程(三)-- 线程池+单例模式+读写者模型
目录
线程池
场景
代码实现
线程安全的单例模式
懒汉实现方式和懒汉实现方式
饿汉方式实现单例模式
懒汉方式实现单例模式
实战代码演练单例模式
读者写者模型
解释
基本操作
创建/销毁读写锁
读者锁和写者锁
解锁
伪代码理解读写锁
优先级
挂起等待锁vs自旋锁
自旋锁语法
线程池
在C++中用户使用new/malloc都是向操作系统OS申请的,在系统的角度,就相当于new/malloc在底层封装了系统调用,当调用系统调用。
- 状态发生变化,就是在malloc申请调用系统调用,需要从用户态转变为内核态,申请完状态再从内核态转变为用户态。
- 在向OS申请的时候,OS并不保证有足够的内存空间给你,或者在申请的时候,OS正在跑其他代码,并不一定有合适的内存块给你。有可能要执行OS内的内存处理算法,把内存碎片合并,或者把不要的内存释放掉,总之它要做更多的工作来腾出你要的空间。对于用户来说,OS做什么工作跟用户根本没有关系,但是OS做的工作所花的实践都嫁接到了用户头上,是耗时的。
所以频繁的使用new/malloc是有可能降低工作效率的,每次执行一次new/malloc都会执行一次耗时动作。与其这样,那么我们不如直接一次跟OS要一大块内存,那么OS系统所做的耗时的动作只需要执行一遍,这样用户想用多少空间就用多少空间,直接从这块池子中拿到空间,而不用一次一次的去向OS申请空间,执行内存管理算法。我们将这一大块内存称为内存池。
那么我们申请到大块内存空间,这个内存空间需要用户来进行管理!第二,内存池主要还是要提高效率。
当我们处理一批任务的时候,如果一批任务到来的时候,通常我们先创建线程,并让线程处理任务,这个是我们在网络服务经常使用的。当任务到来的时候再去创建线程,就相当于我需要内存的时候再从OS申请空间,创建线程也是有成本的。所以我们可以预先创建出一把线程,任务一到,线程已经提前准备好了,我们直接将任务指派给某个线程,让线程去运行就可以。提前准备好的线程,用来随时处理任务,就称为线程池。
内存池是为了提高效率,那么线程池的创建也是为了提高效率。
场景
假设线程池预先创建出一批线程,另有一个线程用来生产任务,我们想把这个生产出来的任务派发给线程池。那么怎么派发呢?我们可以在线程池中维护一个任务队列,这个跟我们写阻塞队列一样,但是这次我们不写固定大小。当线程产生任务,将任务放入任务队列,线程池中的线程竞争式的抢夺,从队列中拿任务,拿到自己的上下文处理任务就可以。那么这就是可以处理任务的线程池。
代码实现
根据上面的场景,我们的线程池,需要有一个类,需要包含一个任务队列,还需要包含若干个线程。
首先里面的成员变量需要一个int num来告诉线程池有几个线程。用queue类型表示队列,里面放的任务类型用模板表示。构造函数中,初始化线程数量,我们定义一个全局变量g_num来初始化,知道数量之后,就创建线程。
那么我们写一个初始化线程来创建一批线程,其中这里我们就不保存线程id了,因为线程创建好之后,主线程就不管他们了,所有新创建的线程直接分离。在初始化创建线程时,我们让每个线程都执行Routinue任务函数。因为我们不想让主线程等待线程池的线程,我们在Routinue函数中分离线程。一旦分离,主线程就会向后走,新线程(线程池线程)继续执行自己的内容。
在类中执行类内成员方法是不可行的,我们不能在类内直接写一个Routinue函数,因为类内函数包含一个隐藏的this指针。想要在类内让线程执行类内方法,必须让线程执行静态方法!静态成员方法是没有this指针的,并且它也无法直接使用非静态成员变量。
下面我们写向队列放任务的函数pushtask。我们在放任务的同时,线程池的线程会抢占式的竞争队列中的任务,这个过程在Rontinue中实现,所以我们需要一把锁来保护临界资源。当我安全的向任务队列放任务,有可能很长时间都没有任务,导致线程们都休眠了,那么我们要判断以下对垒是否为空。当队列是空的时候就跳出循环break,然后解锁,这样的话会导致一个问题,当线程没有抢占任务就会被挂起,而抢占到锁的线程可能会一直抢占,因为从挂起状态到抢占状态需要一段时间,而某一个线程抢占完任务,解锁后,因为没有被挂起,抢占能力更强,导致一支枪展资源,只有一个线程来执行任务。所以没有有任务的时候我们再解锁。
static void* Routinue(void* args)
{
pthread_detach(pthread_self());//将线程分离掉
ThreadPool<T>* tp = (ThreadPool<T>*)args;
while(true)
{
Lock();
if(tp->IsEmpty())
{
break;
}
Unlock();
}
}
所以在队列为空的时候,需要将所有线程挂起。此时需要一个条件变量。当有任务的时候,需要在成员变量队列中拿任务,但是静态函数不可以使用非静态成员变量,但是可以使用非静态函数,所以我们洗一个PopTask,从队列里拿出数据。PopTask函数在拿出数据后,因为pop已经将数据从队列里删除,数据已经不是临界资源,所以可以在锁外面处理数据。t.Run();这样就形成了多个线程同时在执行任务。
因为担心条件不就绪,就执行挂起等待,或者多核cpu处理程序,误判本来队列不为空,判断为空,那么判空就会出现错误,所以我们将if改为while,变成轮询检测。
thread_pool.hpp
#pragma once
#include <iostream>
#include <string>
#include <queue>
#include <unistd.h>
#include <pthread.h>
namespace ns_threadpool
{
const int g_num = 5;
template <class T>
class ThreadPool
{
private:
int num_;
std::queue<T> task_queue_;
pthread_mutex_t mtx_;
pthread_cond_t cond_;
public:
void Lock()
{
pthread_mutex_lock(&mtx_);
}
void Unlock()
{
pthread_mutex_unlock(&mtx_);
}
bool IsEmpty()
{
return task_queue_.empty();
}
void Wait()
{
pthread_cond_wait(&cond_,&mtx_);
}
void WakeUp()
{
pthread_cond_signal(&cond_);
}
public:
ThreadPool(int num = g_num)
: num_(num)
{
pthread_mutex_init(&mtx_,nullptr);
pthread_cond_init(&cond_,nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&mtx_);
pthread_cond_destroy(&cond_);
}
static void* Routinue(void* args)
{
pthread_detach(pthread_self());//将线程分离掉
ThreadPool<T>* tp = (ThreadPool<T>*)args;
while(true)
{
tp->Lock();
while(tp->IsEmpty())
{
tp->Wait();//挂起等待
}
//该队列一定有任务,需要从队列拿任务
// T t = task_queue_.front(); --不可以这样写,静态成员函数不可以使用非静态成员变量
T t;
tp->PopTask(&t);
tp->Unlock();
t();
}
}
void InitThreadPool()
{
pthread_t tid;
for (int i = 0; i < num_; i++)
{
pthread_create(&tid, nullptr, Routinue, (void*)this);//要执行对象,传this指针。
}
}
void PushTask(const T& in)
{
Lock();
task_queue_.push(in);
Unlock();
WakeUp();
}
void PopTask(T* out)
{
*out = task_queue_.front();
task_queue_.pop();
}
};
}
main.cc
#include "thread_pool.hpp"
#include "Task.hpp"
#include <ctime>
#include <cstdlib>
using namespace ns_threadpool;
using namespace ns_task;
int main()
{
ThreadPool<Task>* tp = new ThreadPool<Task>(10);
tp->InitThreadPool();//初始化线程池
srand((long long)time(nullptr));
const std::string ops = "+-*/%";
while(true)
{
//网络
Task t(rand()%20 + 1,rand()%10+1 ,"+-*/%"[rand()%ops.size()]);
tp->PushTask(t);
sleep(1);
}
return 0;
}
线程安全的单例模式
有时候,在做服务器开发的时候,会将很多数据加载到内存,这些数据往往只需要一个单例的类来管理这些数据,也就是这些数据在内存中只有一份。所以,如果我们想要一个数据在内存中只出现一次,我们就称之为单例模式。(一个类只能创建一个对象)
当我们定义对象,需要经历两个步骤:
- 开辟空间:开辟空间在编译器编译代码时,当程序加载到内存时,他会自动给你开辟空间。
- 给空间写入初始值:通常调用构造函数初始化。
开辟空间+填入数据,我们叫他初始化的过程,如果将这两个步骤分开,我们叫填入数据为赋值过程。定义对象的本质就是将对象加载到内存,那么单例模式就是让该对象在内存中存在一份,加载一次。
那么什么时候加载,什么时候创建呢?其中就有饿汉模式和懒汉模式,这两种模式。一般而言,我们的独享被设计成单例需要满足两个条件:
- 语义上,只需要一个
- 该对象内部存在大量的空间,保存了大量的数据,如果允许该对象存在多份,或者允许发生各种拷贝,内存中就会存在冗余数据。
懒汉实现方式和懒汉实现方式
通俗点讲,饿汉就是吃完饭立刻洗碗,懒汉就是吃完饭等下次吃饭的时候再洗碗。那么这两种方式有什么区别呢?懒汉最核心的方式就是延迟加载,我们遇见最典型的方式就是写时拷贝。那么饿汉方式有很多弊端,比如开辟空间立马就给你,但是有的空间你用不到,并且创建空间的时间更久,需要开辟更多空间,初始化更多数据。所以这就体现了懒汉模式的好处,需要的时候再做,不需要的时候不做。如果使用饿汉的时候,这会导致程序启动的时候非常慢,如果采用懒汉模式,刚开始启动的时候先不加载,先让程序跑起来,你用到数据的时候,再给你创建,此时通过延时加载的方式让代码启动时速度变快。
饿汉方式实现单例模式
这里有一个静态成员变量data,获取静态成员变量使用静态函数调用。那么我们知道,当构建出这个对象的时候,这个对象在类中已经被创建好了,static的成员函数/成员对象是属于类,而不属于对象的。也就是说下面这个代码编译形成可执行程序,加载到内存时,类只要被加载进来了,那么这个对象也早就存在了。所以当我们在创建这个对象的时候,当加载这个程序时,对象已经就有了。这也就是饿汉方式,创建对象,立马把对象加载出来。
template <typename T>
class Singleton
{
static T data;
public:
static T* GetInstance()
{
return &data;
}
};
懒汉方式实现单例模式
与饿汉不同,饿汉在编译时候就已经开辟好空间,而懒汉在用成员函数获取成员变量的时候,先要判断这个变量是否为空,如果为空就新建,如果已经有了对象,就返回这个存在的对象的地址。这样的话,我们在编译的时候还没有开辟空间,当我们想用这个对象,用懒汉的方式,用的时候再开辟即可。
template <typename T>
class Singleton
{
static T* inst;
public:
static T* GetInstance()
{
if(inst == NULL)
{
inst = new T();
}
return inst;
}
};
实战代码演练单例模式
在这篇文章的开始,我们讲到了线程池,而线程池数据多,在内存中也只需要一份,所以我们可以来写一个单例模式版的线程池。
因为是单例模式,所以我们要把构造函数变成私有的,并且不能有拷贝构造和赋值。并且成员变量需要有一个静态指针指向的这个单例,也就是说需要把那些能够创建对象的方法全部私有化,在类内定义一个私有的指针。因为是类内静态成员变量,需要在类外初始化。获取对象的时候不能直接用类创建对象,需要用一个方法获取到这个指向类的指针。
和以往的方式一样,创建对象后需要初始化,InitThreadPool,所以我们直接在thread_pool类中的GetInstance中创建对象后,直接初始化对象,这样更方便。
thread_pool.hpp
#pragma once
#include <iostream>
#include <string>
#include <queue>
#include <unistd.h>
#include <pthread.h>
namespace ns_threadpool
{
const int g_num = 5;
template <class T>
class ThreadPool
{
private:
//省略...
static ThreadPool<T> *ins_; //
private:
ThreadPool(int num = g_num)
: num_(num)
{
pthread_mutex_init(&mtx_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
ThreadPool(const ThreadPool<T> &tp) = delete; //拷贝构造
ThreadPool<T> &operator=(ThreadPool<T> &tp) = delete; //赋值重载
public:
//省略...
public:
static ThreadPool<T>* GetInstance()
{
if (ins_ == nullptr)
{
ins_ = new ThreadPool<T>();
ins_->InitThreadPool();
}
return ins_;
}
~ThreadPool()
{
pthread_mutex_destroy(&mtx_);
pthread_cond_destroy(&cond_);
}
static void *Routinue(void *args)//静态函数为了传一个参数
{
pthread_detach(pthread_self()); //将线程分离掉
ThreadPool<T> *tp = (ThreadPool<T> *)args;
while (true)
{
tp->Lock();
while (tp->IsEmpty())
{
tp->Wait(); //挂起等待
}
//该队列一定有任务,需要从队列拿任务
// T t = task_queue_.front(); --不可以这样写,静态成员函数不可以使用非静态成员变量
T t;
tp->PopTask(&t);
tp->Unlock();
t();
}
}
void InitThreadPool()
{
pthread_t tid;
for (int i = 0; i < num_; i++)
{
pthread_create(&tid, nullptr, Routinue, (void *)this); //要执行对象,传this指针。
}
}
void PushTask(const T &in)
{
Lock();
task_queue_.push(in);
Unlock();
WakeUp();
}
void PopTask(T *out)
{
*out = task_queue_.front();
task_queue_.pop();
}
};
template <class T>
ThreadPool<T> *ThreadPool<T>::ins_ = nullptr;
}
main.cc
#include "thread_pool.hpp"
#include "Task.hpp"
#include <ctime>
#include <cstdlib>
using namespace ns_threadpool;
using namespace ns_task;
int main()
{
sleep(5);
srand((long long)time(nullptr));
const std::string ops = "+-*/%";
while(true)
{
//网络
Task t(rand()%20 + 1,rand()%10+1 ,ops[rand()%ops.size()]);
ThreadPool<Task>::GetInstance()->PushTask(t);
}
return 0;
}
单例本身会在任何场景,任何环境下调用,GetInstance势必会被多线程重入,导致线程安全问题。比如说,当单例第一次被创建,就在创建到new语句,开辟内存空间到一半,就被切走了,剩下的线程一看,发现此时单例还是空的,还没有被创建出来,这时就会再次重新创建,等到第一个单例被切回来的时候,又创建了一个。
所以可以定义一个static的锁,静态锁初始化可以用宏PTHREAD_MUTEX_INITIALIZER;初始化后就开始上锁,当线程抢占队列任务时候,先竞争锁,创建单例,创建初始化线程之后,解锁然后返回单例。每次上锁解决了单例不安全的问题,但如果队列任务为0的话,每次线程还要抢占锁再判断队列中有没有任务,这样非常消耗资源,所以我们做一个双判断,在锁的外面再加一个判空。
static ThreadPool<T> *GetInstance()
{
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
//当前单例对象还没有被创建按,上锁
if (ins_ == nullptr) //双判定,减少锁的征用,提高获取单例的效率
{
pthread_mutex_lock(&lock);
if (ins_ == nullptr)
{
ins_ = new ThreadPool<T>();
ins_->InitThreadPool();
}
pthread_mutex_unlock(&lock);
}
return ins_;
}
读者写者模型
解释
读者写者模型的思考方式实际上和生产者消费者模型一样,遵守321原则:
3:三种关系--
读者和读者,没有关系
写者和写者,互斥关系
读者和写者,互斥关系,同步关系
2:两种角色--读者和写者
1:一个交易场所--提供一段缓冲区(自己申请的/stl容器)
这个模型中,我们解释一下
我们举一个例子,在小学的时候我们都见过黑板报。当一个人正在写,另一个人就不能插入进来写,不然的话就乱套了,所以写者和写者之间是互斥关系。
当一个人正在写黑板报的时候,还没有写完的情况下,我过来读黑板报,过了一会写的人还没写完,我又过来读了以下,导致我每次读的数据都不一样。所以读者和写者关系最典型的就是互斥关系。在放假的时候,出黑板报的同学出了一个黑板报,在放假期间来来回回擦掉又出了几次,期间没有一名同学来看黑板报,那么出黑板报是毫无意义的。但是这种情况并没有错,没有让任何人获取信息,也没有把信息覆盖掉导致数据出错,但是作为一个写者,别人还没读就把他擦掉,这种情况就不合理。所以最合理的情况,读的人读完了,就让写的人来写;写的人写完了,就让读的人来读。所以读者和写者之间还要带一个同步关系。
读者和读者之间,我们生活中并不存在,我读完了,其他人再读的情况,或者每个人把队排好一个一个来读。所以读者和读者二者本身是没有关系的。
我们发现,三种关系中后两者跟生产者消费者模型一样,唯一不同的是读者和读者之间(与消费者和消费者之间对比,消费者之间是互斥关系)。不一样的根本原因是,读者不会取走资源,而消费者会拿走数据。使用代码完成读者写者模型,本质是使用锁来维护上面的三种关系。那么读者和写者这两种角色就由线程承担,由线程承担,他们的交易场所就很好定义出来了。
通过以上解释,我们总结一下:
1.对数据,大部分操作是读取,少量的操作是写入,这样的场景就适合读者写者模型
2.判断一个体系是否符合读者写者模型,我们的判断依据是,进行数据读取(消费)的一端,是否会将数据取走。如果不取走,就可以考虑读者写者模型。
在生活中哪些特别适合读者写者模型呢?比如新闻发布,我们今天看到一篇文章,这篇文章发出来之后,在网上一挂就是大半年,并且发完之后很少去修改。一个人很少去修改,并且大部分人都去读它。或者我们写的博客,很少去修改,但是有很多人看。
基本操作
创建/销毁读写锁
#include <pthread.h>
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
const pthread_rwlockattr_t *restrict attr);
想要创建读写锁,直接定义一个数据类型,pthread_rwlock_t,初始化读写锁用到的函数就是pthread_rwlock_init,传入的参数是读写锁和读写锁属性,和互斥锁没有区别。
读者锁和写者锁
以读者身份加锁,每种加锁都配了一个trylock,trylock是一个非阻塞式的加锁,本来我们申请锁失败就挂起,而trylock不挂起,他会出错了就返回。
#include <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
以读者身份加锁
#include <pthread.h>
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
解锁
无论读者还是写者,解锁都是用这种方式,来进行对读写锁进行释放
#include <pthread.h>
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
伪代码理解读写锁
假设这里有一个写者,有一个读者,读写者中间有一个int readers = 0;整数代表当前有多少人在读,所以读者进来的时候要做的最基本的检测。先加锁mtx1.lock();然后判断一下,如果读者数量大于0,对于写者而言,读者与写者互斥,读者大于0,那么写者就要等一等。所以要把锁释放掉然后就可以去等了。如果没有人读,那么写者获取锁。此时进入临界区就可以对数据进行修改。
当读者获取到锁时,因为读者和读者之间没有关系,所以遇到读者就直接增加读者数量,readers++。
我们用同一把锁保证读写者之间的互斥,我们用计数器reader来保证这种机制。
这里有一个问题,当写者进来,readers=0,写者就可以申请到锁,并进行修改数据。那么读者进来后,只要对计数器readers++,那么写者就永远进不来了。如果没有读者,写者就进入到临界区了,此时有可能读者也进来了,但是数据并没有写完,所以我们要写完临界区之后再解锁。
所以左边写者的加锁解锁方式我们统称为pthread_rwlock_wrlock(),右边读者的加锁解锁方式统称为pthread_rwlock_rdlock();
优先级
读者优先:读者和写者同时到来的时候,我们让读者先进入访问
写者优先:当读者和写者同时到来的时候,比当前写者晚来的所有读者,都不要进入临界区访问了,等临界区中没有读者的时候,让写者先写入。
读者多,写者少,所以是存在写饥饿问题。所以我们可以设置延时写入,或者写优先。
挂起等待锁vs自旋锁
举个例子,李四来找张三玩,但是张三在写作业,第一次李四给张三打电话,李四说还有一个小时才能写完,所以李四去了网吧玩,一个小时之后,张三写完了去找李四。第二次张三没有作业,李四打电话张三立马答应,说要下楼去找他,但是过了一会张三还是没有下来,李四又打了电话,张三说我已经走到2楼了,因为等待张三的时间短,所以李四不需要去电竞馆。那么决定李四在楼下等还是去电竞馆的因素是张三需要多长时间才能下来。张三如果需要时间久,我们就去电竞馆,将自己挂起,但是这个挂起是有时间成本的。张三如果需要时间短,我们就应该在楼下等,不断打电话检测张三状态,这个就是自旋的过程。
所以在Lock()和Unlock()中间是我们的临界资源,因为线程挂起等待是有成本的,所以如果花费时间长比较适合挂起等待锁,当一个线程A占用临界资源时间太长,我们就让他归还锁并挂起;如果花费的时间非常短,当一个线程A占用临界资源时间非常短,那么让other其他线程做轮询检测,问锁你好了没有,有没有用完,有没有就绪。实际上这个锁并没有归还,但每个线程都在问锁好没好,这个就叫自旋锁,达到提高效率的目的。
那么线程如何得知,自己会在临界资源中待多长时间呢?实际上线程不知道,这个是程序员才知道的。
自旋锁语法
初始化自旋锁
#include <pthread.h>
int pthread_spin_destroy(pthread_spinlock_t *lock);
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
定义自旋锁加锁和解锁
#include <pthread.h>
int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_trylock(pthread_spinlock_t *lock);
#include <pthread.h>
int pthread_spin_unlock(pthread_spinlock_t *lock);
自旋锁和互斥锁的使用方法是一样的,并且自旋锁如果申请不成功,自旋锁底层就会帮你检测进行while循环,不用像以前互斥锁申请不成功,自己还要进行while轮询检测申请所。只要调用了spin,spin底层就会帮我们轮询检测。