当前位置: 首页 > news >正文

C++11重写muduo网络库5——Thread,EventLoopThread,EventLoopThreadPool

5.1 Thread

一个Thread就是记录的一个线程的全部信息。

由于线程的id是在创建子线程的传的,因此为了避免子线程还没有开始执行父线程就去用tid,因此用个信号量去避免发生这种情况
在这里插入图片描述

5.2 EventLoopThread

上面的Thread只关注线程,现在是针对EventLoop的线程,这个EventLoopThread里面包含了一个EventLoop的指针和一个Thread类对象。

Thread类只关于一个线程,EventLoopThread类用于绑定一个EventLoop和一个Thread,在一个Thread里创建一个EventLoop,让这个Thread执行一个EventLoop,即:one loop per thread
在这里插入图片描述
startLoop会返回给上层调用者本loop的一个指针
在这里插入图片描述
EventLoopThread.h

#include "Thread.h"
#include "CurrentThread.h"

#include <semaphore.h>

std::atomic_int Thread::numCreated_(0);

Thread::Thread(ThreadFunc func, const std::string &name)
    : started_(false)
    , joined_(false)
    , tid_(0)
    , func_(std::move(func))
    , name_(name)
{
    setDefaultName();
}

Thread::~Thread()
{
    if (started_ && !joined_)
    {
        thread_->detach(); // thread类提供的设置分离线程的方法
    }
}

void Thread::start()  // 一个Thread对象,记录的就是一个新线程的详细信息
{
    started_ = true;
    sem_t sem;
    sem_init(&sem, false, 0);

    // 开启线程
    thread_ = std::shared_ptr<std::thread>(new std::thread([&](){
        // 获取线程的tid值
        tid_ = CurrentThread::tid();
        sem_post(&sem);
        // 开启一个新线程,专门执行该线程函数
        func_(); 
    }));

    // 这里必须等待获取上面新创建的线程的tid值
    sem_wait(&sem);
}

void Thread::join()
{
    joined_ = true;
    thread_->join();
}

void Thread::setDefaultName()
{
    int num = ++numCreated_;
    if (name_.empty())//如果线程还没有名字
    {
        char buf[32] = {0};
        snprintf(buf, sizeof buf, "Thread%d", num); //用线程序号换名字 比如Thread1
        name_ = buf;
    }
}

EventLoopThread.cc

#include "EventLoopThread.h"
#include "EventLoop.h"


EventLoopThread::EventLoopThread(const ThreadInitCallback &cb, 
        const std::string &name)
        : loop_(nullptr)
        , exiting_(false)
        , thread_(std::bind(&EventLoopThread::threadFunc, this), name)
        , mutex_()
        , cond_()
        , callback_(cb)
{

}

EventLoopThread::~EventLoopThread()
{
    exiting_ = true;
    if (loop_ != nullptr)
    {
        loop_->quit();  //退出事件循环
        thread_.join();//等待子线程结束
    }
}


EventLoop* EventLoopThread::startLoop()
{
    thread_.start(); // 底层创建一个新线程并启动,并执行成员变量thread构造时传入的 threadFunc,threadFunc就是下面的函数

    EventLoop *loop = nullptr; //先初始化一下
    {   //加锁
        std::unique_lock<std::mutex> lock(mutex_);
        while ( loop_ == nullptr ) //
        {
            cond_.wait(lock);//成员变量loop_没有被新线程执行threadFunc初始化的时候,一直wait在lock上等待其完成初始化
        }
        loop = loop_;
    }
    return loop;  //当成功加锁后将这个EventLoop对象返回给调用者
}

// 下面这个方法,是在单独的新线程里面运行的
//one loop per thread 是从这里加上的
//bind线程函数时有this指针,所以对象的成员能够在线程中访问
void EventLoopThread::threadFunc()
{
    EventLoop loop; // 创建一个独立的eventloop,和上面的线程是一一对应的,one loop per thread

//如果实现了传递callback_,ThreadInitCallback就是在底层启动一个新线程绑定EventLoop时调用的,进行一些init相关操作
    if (callback_)//callback_是一个传递过来的函数
    {
        callback_(&loop);//下面执行传递过来的函数
    }

    {
        std::unique_lock<std::mutex> lock(mutex_);
        loop_ = &loop;  //将构造出来的loop保存给loop_供TcpServer上层获取使用
        cond_.notify_one();
    }

    loop.loop(); // EventLoop loop  => Poller.poll  正常会一直运行在这里,如果到了下一行说明程序已经退出了
    std::unique_lock<std::mutex> lock(mutex_);
    loop_ = nullptr;
}

5.3EventLoopThreadPool

IO线程池,封装多个IO线程EventLoopThread,并保存每个IO的线程的loop。(基于EventLoopThread继续向上封装)

EventLoopThreadPool.h

#pragma once
#include "noncopyable.h"

#include <functional>
#include <string>
#include <vector>
#include <memory>

class EventLoop;
class EventLoopThread;

class EventLoopThreadPool : noncopyable
{
public:
    using ThreadInitCallback = std::function<void(EventLoop*)>; 

    EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg);
    ~EventLoopThreadPool();

    void setThreadNum(int numThreads) { numThreads_ = numThreads; }//设置IO线程数量 为上层调用

    // 启动线程池,创建若干线程并加入到threads_当中, 并保存线程启动后所有的loop*,为TcpServer的新Tcp连接分配loop要用。
    void start(const ThreadInitCallback &cb = ThreadInitCallback());

    // 如果工作在多线程中,baseLoop_默认以轮询的方式分配channel给subloop
    EventLoop* getNextLoop();

    // 返回事件循环池所有的EventLoop
    std::vector<EventLoop*> getAllLoops();

    bool started() const { return started_; }
    const std::string name() const { return name_; }
private:

    EventLoop *baseLoop_; // 最开始的eventloop,最起码需要有一个
    std::string name_;
    bool started_;  //是否开始,在start()函数中赋值为true
    int numThreads_; //在start之前设置,创建指定数量的IO线程
    int next_; 新连接到来时,所选择的EventLoop对象下标
    std::vector<std::unique_ptr<EventLoopThread>> threads_; //start之后创建的所有线程对象保存起来。
    std::vector<EventLoop*> loops_;//start之后每个thread返回的栈变量指针EventLoop*.即不同IO线程的事件循环。
};

EventLoopThreadPool.cc

#include "EventLoopThreadPool.h"
#include "EventLoopThread.h"

#include <memory>

EventLoopThreadPool::EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg)
    : baseLoop_(baseLoop)
    , name_(nameArg)
    , started_(false)
    , numThreads_(0)
    , next_(0)
{}

EventLoopThreadPool::~EventLoopThreadPool()
{}

// 启动线程池,创建若干线程并加入到threads_当中, 并保存线程启动后所有的loop*,为TcpServer的新Tcp连接分配loop要用。开启事件循环
void EventLoopThreadPool::start(const ThreadInitCallback &cb)
{
    started_ = true;//设置为已经开始

    for (int i = 0; i < numThreads_; ++i)
    {
        char buf[name_.size() + 32];  // 用“线程池的名字 + 下标”作为底层线程的名字
        snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
        EventLoopThread *t = new EventLoopThread(cb, buf);  //创建指定数目的EventLoopThread
        threads_.push_back(std::unique_ptr<EventLoopThread>(t));//将新创建的EventLoopThread指针加到容器里面
        loops_.push_back(t->startLoop()); // 底层创建线程,绑定一个新的EventLoop,并返回该loop的地址
    }

    // 整个服务端只有一个线程,运行着baseloop
    if (numThreads_ == 0 && cb)  // 如果用户没有调用EventLoopThreadPool::setThreadNum(),numThreads_默认为0
    {
        cb(baseLoop_);
    }
}

// 如果工作在多线程中,baseLoop_默认以轮询的方式分配channel给subloop
EventLoop* EventLoopThreadPool::getNextLoop()
{
    EventLoop *loop = baseLoop_;//不使用setThreadNum指定Reactor模型线程数量,那么muduo默认只有一个baseLoop_

    if (!loops_.empty()) // 通过轮询获取下一个处理事件的loop
    {
        loop = loops_[next_]; //next_是下一个EventLoop对象的下标 
        ++next_;//自增
        if (next_ >= loops_.size())
        {
            next_ = 0;
        }
    }

    return loop;
}

// 返回事件循环池所有的EventLoop
std::vector<EventLoop*> EventLoopThreadPool::getAllLoops()
{
    if (loops_.empty())
    {
        return std::vector<EventLoop*>(1, baseLoop_);
    }
    else
    {
        loops_;
    }
}

5.4 问题

1、为什么muduo库采用固定数量的线程池,而不采用动态增长的线程池?

因为它只是一个网络库,只负责IO线程接收新连接和工作线程监听已连接客户端发生的读写事件,而不管是什么事件,耗不耗费服务端的资源。比较耗费资源的大任务应该由业务层处理,而不在网络库的职责范围内。网络库的EventLoopThreadPool只负责比较简单的监听事件,而不去处理具体的事件,所以我们开启的线程和CPU核数相同即可,可以让线程并行。

在muduo网络库中,一个线程负责一个EventLoop,会监听很多fd。如果这个工作线程还要处理A用户到B用户文件传输请求的话,那么当前这个工作线程其他用户再有什么事件发生的话,这个工作线程就无法及时处理,所以像这种有耗时的IO操作还是需要有线程专门去做的

相关文章:

  • NISP的渗透测试怎么操作的
  • 安装Nginx教程
  • Java_接口基本介绍
  • 16、Java——QuickHit游戏
  • SpringBoot--Controller获取HttpServletRequest
  • 牛客刷题笔记
  • 我把华为云的Ubuntu 18.04升级到了Ubuntu 22.04
  • Google Earth Engine-02(主界面介绍)
  • 5.java数据结构与算法 ---- 第七章 八大排序(冒泡;选择;插入/希尔;快排;归并;基数)
  • 新学期你立一个什么样的FLAG?
  • 【数据结构与算法】第九篇:哈希表原理
  • 自动驾驶江湖重新排位:毫末智行“重感知、轻地图”优势初现
  • [JS]变量
  • 用户登录更新中:网关,token,全局异常处理
  • Unity Pico Neo3 基础开发流程
  • @jsonView过滤属性
  • conda常用的命令
  • C语言笔记(第一章:C语言编程)
  • express如何解决request entity too large问题
  • JAVA 学习IO流
  • javascript数组去重/查找/插入/删除
  • Laravel5.4 Queues队列学习
  • Python 基础起步 (十) 什么叫函数?
  • Python实现BT种子转化为磁力链接【实战】
  • React的组件模式
  • windows-nginx-https-本地配置
  • 区块链共识机制优缺点对比都是什么
  • 如何选择开源的机器学习框架?
  • 微信小程序填坑清单
  • Java性能优化之JVM GC(垃圾回收机制)
  • ​你们这样子,耽误我的工作进度怎么办?
  • #WEB前端(HTML属性)
  • $refs 、$nextTic、动态组件、name的使用
  • (1)Nginx简介和安装教程
  • (32位汇编 五)mov/add/sub/and/or/xor/not
  • (4) PIVOT 和 UPIVOT 的使用
  • (Arcgis)Python编程批量将HDF5文件转换为TIFF格式并应用地理转换和投影信息
  • (C++20) consteval立即函数
  • (附源码)ssm高校升本考试管理系统 毕业设计 201631
  • (附源码)计算机毕业设计SSM在线影视购票系统
  • (转载)CentOS查看系统信息|CentOS查看命令
  • .cn根服务器被攻击之后
  • .NET/C# 使用反射调用含 ref 或 out 参数的方法
  • .net操作Excel出错解决
  • @zabbix数据库历史与趋势数据占用优化(mysql存储查询)
  • [ MSF使用实例 ] 利用永恒之蓝(MS17-010)漏洞导致windows靶机蓝屏并获取靶机权限
  • [ vulhub漏洞复现篇 ] Apache Flink目录遍历(CVE-2020-17519)
  • [ vulhub漏洞复现篇 ] ThinkPHP 5.0.23-Rce
  • [AAuto]给百宝箱增加娱乐功能
  • [Asp.net MVC]Asp.net MVC5系列——Razor语法
  • [c#基础]DataTable的Select方法
  • [codevs 1296] 营业额统计
  • [EULAR文摘] 利用蛋白组学技术开发一项蛋白评分用于预测TNFi疗效
  • [Everyday Mathematics]20150130
  • [jQuery]div滚动条回到最底部