C++11重写muduo网络库5——Thread,EventLoopThread,EventLoopThreadPool
5.1 Thread
一个Thread就是记录的一个线程的全部信息。
由于线程的id是在创建子线程的传的,因此为了避免子线程还没有开始执行父线程就去用tid,因此用个信号量去避免发生这种情况
5.2 EventLoopThread
上面的Thread只关注线程,现在是针对EventLoop的线程,这个EventLoopThread里面包含了一个EventLoop的指针和一个Thread类对象。
Thread类只关于一个线程,EventLoopThread类用于绑定一个EventLoop和一个Thread,在一个Thread里创建一个EventLoop,让这个Thread执行一个EventLoop,即:one loop per thread
startLoop会返回给上层调用者本loop的一个指针
EventLoopThread.h
#include "Thread.h"
#include "CurrentThread.h"
#include <semaphore.h>
std::atomic_int Thread::numCreated_(0);
Thread::Thread(ThreadFunc func, const std::string &name)
: started_(false)
, joined_(false)
, tid_(0)
, func_(std::move(func))
, name_(name)
{
setDefaultName();
}
Thread::~Thread()
{
if (started_ && !joined_)
{
thread_->detach(); // thread类提供的设置分离线程的方法
}
}
void Thread::start() // 一个Thread对象,记录的就是一个新线程的详细信息
{
started_ = true;
sem_t sem;
sem_init(&sem, false, 0);
// 开启线程
thread_ = std::shared_ptr<std::thread>(new std::thread([&](){
// 获取线程的tid值
tid_ = CurrentThread::tid();
sem_post(&sem);
// 开启一个新线程,专门执行该线程函数
func_();
}));
// 这里必须等待获取上面新创建的线程的tid值
sem_wait(&sem);
}
void Thread::join()
{
joined_ = true;
thread_->join();
}
void Thread::setDefaultName()
{
int num = ++numCreated_;
if (name_.empty())//如果线程还没有名字
{
char buf[32] = {0};
snprintf(buf, sizeof buf, "Thread%d", num); //用线程序号换名字 比如Thread1
name_ = buf;
}
}
EventLoopThread.cc
#include "EventLoopThread.h"
#include "EventLoop.h"
EventLoopThread::EventLoopThread(const ThreadInitCallback &cb,
const std::string &name)
: loop_(nullptr)
, exiting_(false)
, thread_(std::bind(&EventLoopThread::threadFunc, this), name)
, mutex_()
, cond_()
, callback_(cb)
{
}
EventLoopThread::~EventLoopThread()
{
exiting_ = true;
if (loop_ != nullptr)
{
loop_->quit(); //退出事件循环
thread_.join();//等待子线程结束
}
}
EventLoop* EventLoopThread::startLoop()
{
thread_.start(); // 底层创建一个新线程并启动,并执行成员变量thread构造时传入的 threadFunc,threadFunc就是下面的函数
EventLoop *loop = nullptr; //先初始化一下
{ //加锁
std::unique_lock<std::mutex> lock(mutex_);
while ( loop_ == nullptr ) //
{
cond_.wait(lock);//成员变量loop_没有被新线程执行threadFunc初始化的时候,一直wait在lock上等待其完成初始化
}
loop = loop_;
}
return loop; //当成功加锁后将这个EventLoop对象返回给调用者
}
// 下面这个方法,是在单独的新线程里面运行的
//one loop per thread 是从这里加上的
//bind线程函数时有this指针,所以对象的成员能够在线程中访问
void EventLoopThread::threadFunc()
{
EventLoop loop; // 创建一个独立的eventloop,和上面的线程是一一对应的,one loop per thread
//如果实现了传递callback_,ThreadInitCallback就是在底层启动一个新线程绑定EventLoop时调用的,进行一些init相关操作
if (callback_)//callback_是一个传递过来的函数
{
callback_(&loop);//下面执行传递过来的函数
}
{
std::unique_lock<std::mutex> lock(mutex_);
loop_ = &loop; //将构造出来的loop保存给loop_供TcpServer上层获取使用
cond_.notify_one();
}
loop.loop(); // EventLoop loop => Poller.poll 正常会一直运行在这里,如果到了下一行说明程序已经退出了
std::unique_lock<std::mutex> lock(mutex_);
loop_ = nullptr;
}
5.3EventLoopThreadPool
IO线程池,封装多个IO线程EventLoopThread,并保存每个IO的线程的loop。(基于EventLoopThread继续向上封装)
EventLoopThreadPool.h
#pragma once
#include "noncopyable.h"
#include <functional>
#include <string>
#include <vector>
#include <memory>
class EventLoop;
class EventLoopThread;
class EventLoopThreadPool : noncopyable
{
public:
using ThreadInitCallback = std::function<void(EventLoop*)>;
EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg);
~EventLoopThreadPool();
void setThreadNum(int numThreads) { numThreads_ = numThreads; }//设置IO线程数量 为上层调用
// 启动线程池,创建若干线程并加入到threads_当中, 并保存线程启动后所有的loop*,为TcpServer的新Tcp连接分配loop要用。
void start(const ThreadInitCallback &cb = ThreadInitCallback());
// 如果工作在多线程中,baseLoop_默认以轮询的方式分配channel给subloop
EventLoop* getNextLoop();
// 返回事件循环池所有的EventLoop
std::vector<EventLoop*> getAllLoops();
bool started() const { return started_; }
const std::string name() const { return name_; }
private:
EventLoop *baseLoop_; // 最开始的eventloop,最起码需要有一个
std::string name_;
bool started_; //是否开始,在start()函数中赋值为true
int numThreads_; //在start之前设置,创建指定数量的IO线程
int next_; 新连接到来时,所选择的EventLoop对象下标
std::vector<std::unique_ptr<EventLoopThread>> threads_; //start之后创建的所有线程对象保存起来。
std::vector<EventLoop*> loops_;//start之后每个thread返回的栈变量指针EventLoop*.即不同IO线程的事件循环。
};
EventLoopThreadPool.cc
#include "EventLoopThreadPool.h"
#include "EventLoopThread.h"
#include <memory>
EventLoopThreadPool::EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg)
: baseLoop_(baseLoop)
, name_(nameArg)
, started_(false)
, numThreads_(0)
, next_(0)
{}
EventLoopThreadPool::~EventLoopThreadPool()
{}
// 启动线程池,创建若干线程并加入到threads_当中, 并保存线程启动后所有的loop*,为TcpServer的新Tcp连接分配loop要用。开启事件循环
void EventLoopThreadPool::start(const ThreadInitCallback &cb)
{
started_ = true;//设置为已经开始
for (int i = 0; i < numThreads_; ++i)
{
char buf[name_.size() + 32]; // 用“线程池的名字 + 下标”作为底层线程的名字
snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
EventLoopThread *t = new EventLoopThread(cb, buf); //创建指定数目的EventLoopThread
threads_.push_back(std::unique_ptr<EventLoopThread>(t));//将新创建的EventLoopThread指针加到容器里面
loops_.push_back(t->startLoop()); // 底层创建线程,绑定一个新的EventLoop,并返回该loop的地址
}
// 整个服务端只有一个线程,运行着baseloop
if (numThreads_ == 0 && cb) // 如果用户没有调用EventLoopThreadPool::setThreadNum(),numThreads_默认为0
{
cb(baseLoop_);
}
}
// 如果工作在多线程中,baseLoop_默认以轮询的方式分配channel给subloop
EventLoop* EventLoopThreadPool::getNextLoop()
{
EventLoop *loop = baseLoop_;//不使用setThreadNum指定Reactor模型线程数量,那么muduo默认只有一个baseLoop_
if (!loops_.empty()) // 通过轮询获取下一个处理事件的loop
{
loop = loops_[next_]; //next_是下一个EventLoop对象的下标
++next_;//自增
if (next_ >= loops_.size())
{
next_ = 0;
}
}
return loop;
}
// 返回事件循环池所有的EventLoop
std::vector<EventLoop*> EventLoopThreadPool::getAllLoops()
{
if (loops_.empty())
{
return std::vector<EventLoop*>(1, baseLoop_);
}
else
{
loops_;
}
}
5.4 问题
1、为什么muduo库采用固定数量的线程池,而不采用动态增长的线程池?
因为它只是一个网络库,只负责IO线程接收新连接和工作线程监听已连接客户端发生的读写事件,而不管是什么事件,耗不耗费服务端的资源。比较耗费资源的大任务应该由业务层处理,而不在网络库的职责范围内。网络库的EventLoopThreadPool只负责比较简单的监听事件,而不去处理具体的事件,所以我们开启的线程和CPU核数相同即可,可以让线程并行。
在muduo网络库中,一个线程负责一个EventLoop,会监听很多fd。如果这个工作线程还要处理A用户到B用户文件传输请求的话,那么当前这个工作线程其他用户再有什么事件发生的话,这个工作线程就无法及时处理,所以像这种有耗时的IO操作还是需要有线程专门去做的