当前位置: 首页 > news >正文

【Muduo】三大核心之EventLoop

Muduo网络库的EventLoop模块是网络编程框架中的核心组件,负责事件循环的驱动和管理。以下是对EventLoop模块的详细介绍:

作用与功能

  • EventLoop是网络服务器中负责循环的重要模块,它持续地监听、获取和处理各种事件,如IO事件、定时器事件等。
  • 它通过轮询访问Poller(如EPollPoller),获取激活的Channel列表,然后使Channel根据自身情况调用相应的回调函数来处理事件。
  • EventLoop确保了每个Loop都是相互独立的,拥有自己的事件循环、Poller监听者和Channel监听通道列表。

与Poller的关系

  • Poller负责从事件监听器上获取监听结果,即哪些文件描述符(fd)上发生了哪些事件。
  • EventLoop会轮询访问Poller,以获取这些发生事件的fd及其相关事件。

与Channel的关系

  • Channel类是对文件描述符(fd)以及其相关事件的封装。它保存了fd的感兴趣事件、实际发生的事件以及每种事件对应的处理函数。
  • 当Poller检测到某个fd上有事件发生时,EventLoop会找到对应的Channel,并调用其上的回调函数来处理该事件。

线程模型

  • EventLoop遵循“one loop one thread”的原则,即每个EventLoop都在一个独立的线程上运行。
  • 这种设计使得事件处理更加高效和清晰,避免了多线程环境下的竞态条件和同步问题。

mainLoop和subLoop

在Muduo网络库中,mainLoop和subLoop都是EventLoop的实例,它们分别代表主事件循环和子事件循环。

mainLoop(主事件循环)

  • mainLoop是整个Muduo网络库的核心事件循环。它负责监听服务器套接字(通常是listenfd),并接受来自客户端的连接请求。
  • mainLoop运行一个Accrptor,包含一个Poller,用于监听一个特定的非阻塞的服务器sockfd上的读事件。当Poller检测到有读事件发生时(一般是新用户连接),mainLoop会在线程池中通过轮询算法选择一个subLoop来处理这个连接的读写和关闭事件。Acceptor将在后续阐述。
  • mainLoop遵循 “one loop one thread” 的原则,即每个mainLoop都在一个独立的线程上运行。这确保了事件处理的高效性和清晰性,避免了多线程环境下的竞态条件和同步问题。

subLoop(子事件循环)

  • subLoop是mainLoop的子事件循环,用于处理已建立的连接的读写和关闭事件。每个subLoop都在一个独立的线程上运行,有一个用于唤醒自身的fd和Channel,运行一个Poller,并保存自己管理的多个Channel,以实现并发处理多个连接的目的。
  • 当mainLoop接受到一个新的连接请求时,它会根据EventLoopThreadPool中的线程来选择一个subLoop,将新创建的TcpConnection的Channel放入这个subLoop中。这个subLoop会接管该连接的fd,并监听其上的读写和关闭事件。
  • subLoop中的事件处理逻辑与mainLoop类似,也是通过Poller来监听fd上的事件,并调用相应的回调函数来处理这些事件。
  • 由于subLoop是独立的线程,因此它们可以并行处理多个连接,从而提高了服务器的并发处理能力。

总的来说,mainLoop和subLoop共同构成了Muduo网络库的事件驱动编程框架。mainLoop负责监听服务器套接字并接受连接请求,而subLoop则负责处理已建立的连接的读写和关闭事件。通过合理的线程调度和事件处理机制,Muduo网络库能够高效、稳定地处理大量的并发连接请求。

EventLoop.h

#pragma once
#include "noncopyable.h"
#include "Timestamp.h"
#include "CurrentThread.h"
#include "LogStream.h"#include <functional>
#include <vector>
#include <atomic>
#include <memory>
#include <mutex>
#include <sys/types.h>class Channel;
class Poller;/*** 事件循环类  两大模型:Channel  Poller* mainLoop只负责处理IO,并返回client的fd* subLoop负责监听poll,并处理相应的回调* 两者之间通过weakupfd进行通信
*/
class EventLoop : noncopyable
{
public:using Functor = std::function<void()>;EventLoop();~EventLoop();// 开启loopvoid loop();// 退出loopvoid quit();Timestamp pollReturnTime() const { return pollReturnTime_; }// 在当前loop执行cbvoid runInLoop(Functor cb);// 把cb放入队列,唤醒subloop所在的线程,执行cbvoid queueInLoop(Functor cb);size_t queueSize() const;// 唤醒loop所在的线程,EventLoop::queueInLoop中调用void wakeup();// EventLoop方法 =》 Poller的方法void updateChannel(Channel *channel);void removeChannel(Channel *channel);bool hasChannel(Channel *channel);// 判断EventLoop对象是否在自己的线程中bool isInLoopThread() const {return threadId_ == CurrentThread::tid();}private:// waked up后的一个操作 void handleRead();       // 执行回调void doPendingFunctors(); using ChannelList = std::vector<Channel *>;std::atomic_bool looping_; // 原子操作,通过CAS实现std::atomic_bool quit_;    // 标识退出loop循环const pid_t threadId_; // 记录当前loop所属的线程idTimestamp pollReturnTime_; // poller返回发生事件的channels的时间点std::unique_ptr<Poller> poller_;int wakeupFd_; // 当mainLoop获取一个新用户的channel,通过轮询算法选择一个subloop,通过该成员唤醒subloop处理channel。使用eventfd// unlike in TimerQueue, which is an internal class,// we don't expose Channel to client.std::unique_ptr<Channel> wakeupChannel_;// scratch variablesChannelList activeChannels_;std::atomic_bool callingPendingFunctors_; // 标识当前loop是否有需要执行的回调操作,正在执行则为truestd::vector<Functor> pendingFunctors_;    // 存储loop需要执行的所有回调操作std::mutex mutex_;                        // 保护pendingFunctors_线程安全
};

EventLoop.cc

#include "EventLoop.h"
#include "LogStream.h"
#include "Poller.h"
#include "Channel.h"#include <sys/eventfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <iostream>// 防止一个线程创建多个EventLoop    threadLocal
__thread EventLoop *t_loopInThisThread = nullptr;// 定义Poller超时时间
const int kPollTimeMs = 10000;// 创建weakupfd,用来notify唤醒subReactor处理新来的channel
int createEventfd()
{int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if (evtfd < 0){LOG_FATAL << "Failed in eventfd" << errno;}return evtfd;
}EventLoop::EventLoop(): looping_(false),quit_(false),callingPendingFunctors_(false),threadId_(CurrentThread::tid()),poller_(Poller::newDefaultPoller(this)),wakeupFd_(createEventfd()),wakeupChannel_(new Channel(this, wakeupFd_))
{LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;if (t_loopInThisThread){LOG_FATAL << "Another EventLoop " << t_loopInThisThread<< " exists in this thread " << threadId_;}else{t_loopInThisThread = this;}// 设置weakupfd的事件类型以及发生事件后的回调操作wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));// we are always reading the wakeupfd// 每一个EventLoop都将监听weakupChannel的EPOLLIN读事件了// 作用是subloop在阻塞时能够被mainLoop通过weakupfd唤醒wakeupChannel_->enableReading();
}EventLoop::~EventLoop()
{LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_<< " destructs in thread " << CurrentThread::tid();wakeupChannel_->disableAll();wakeupChannel_->remove();::close(wakeupFd_);t_loopInThisThread = NULL;
}void EventLoop::handleRead()
{uint64_t one = 1;ssize_t n = read(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";}
}void EventLoop::loop()
{looping_ = true;quit_ = false;LOG_INFO << "EventLoop " << this << " start looping";while (!quit_){activeChannels_.clear();// 当前EventLoop的Poll,监听两类fd,client的fd(正常通信的,在baseloop中)和 weakupfd(mainLoop 和 subLoop 通信用来唤醒sub的)pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);for (Channel *channel : activeChannels_){// Poller监听哪些channel发生事件了,然后上报给EventLoop,通知channel处理相应的事件channel->handleEvent(pollReturnTime_);}// 执行当前EventLoop事件循环需要处理的回调操作/*** IO线程 mainLoop 只 accept 然后返回client通信用的fd <= 用channel打包 并分发给 subloop* mainLoop事先注册一个回调cb(需要subLoop来执行),weakup subloop后,* 执行下面的方法,执行之前mainLoop注册的cb操作(一个或多个)*/doPendingFunctors();}LOG_INFO << "EventLoop " << this << " stop looping";looping_ = false;
}/*** 退出事件循环* 1、loop在自己的线程中 调用quit,此时肯定没有阻塞在poll中* 2、在其他线程中调用quit,如在subloop(woker)中调用mainLoop(IO)的qiut**                  mainLoop* *      Muduo库没有 生产者-消费者线程安全的队列 存储Channel*      直接使用wakeupfd进行线程间的唤醒       ** subLoop1         subLoop2        subLoop3*/
void EventLoop::quit()
{quit_ = true;// 2中,此时,若当前woker线程不等于mainLoop线程,将本线程在poll中唤醒if (!isInLoopThread()){wakeup();}
}void EventLoop::runInLoop(Functor cb)
{// LOG_DEBUG<<"EventLoop::runInLoop  cb:" << (cb != 0);if (isInLoopThread()) // 产生段错误{ // 在当前loop线程中 执行cbLOG_DEBUG << "在当前loop线程中 执行cb";cb();}else{ // 在其他loop线程执行cb,需要唤醒其loop所在线程,执行cbLOG_DEBUG << "在其他loop线程执行cb,需要唤醒其loop所在线程,执行cb";queueInLoop(cb);}
}void EventLoop::queueInLoop(Functor cb)
{{std::unique_lock<std::mutex> ulock(mutex_);pendingFunctors_.emplace_back(cb);}// 唤醒相应的,需要执行上面回调操作的loop线程// 若当前线程正在执行回调doPendingFunctors,但是又有了新的回调cb// 防止执行完回调后又阻塞在poll上无法执行新cb,所以预先wakeup写入一个数据if (!isInLoopThread() || callingPendingFunctors_) {wakeup(); // 唤醒loop所在线程}
}// 用来唤醒loop所在的线程,向wakeupfd写一个数据,wakeupChannel就发生读事件,当前loop线程就会被唤醒
void EventLoop::wakeup()
{uint64_t one = 1;ssize_t n = ::write(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";}
}void EventLoop::updateChannel(Channel *channel)
{// channel是发起方,通过loop调用pollpoller_->updateChannel(channel);
}void EventLoop::removeChannel(Channel *channel)
{// channel是发起方,通过loop调用pollpoller_->removeChannel(channel);
}bool EventLoop::hasChannel(Channel *channel)
{return poller_->hasChannel(channel);
}// 执行回调,由TcpServer提供的回调函数
void EventLoop::doPendingFunctors()
{std::vector<Functor> functors;callingPendingFunctors_ = true; // 正在执行回调操作{ // 使用swap,将原pendingFunctors_置空并且释放,其他线程不会因为pendingFunctors_阻塞std::unique_lock<std::mutex> lock(mutex_);functors.swap(pendingFunctors_);}for (const Functor &functor : functors){functor(); // 执行当前loop需要的回调操作}callingPendingFunctors_ = false;
}

相关文章:

  • Flink 数据源
  • 2024年短视频评论区批量爬取采集软件
  • 2024新数据库入门教程
  • JDK的Dockerfile
  • Shell脚本学习笔记(更新中...)
  • 安捷伦Agilent 8114A脉冲发生器的特点资料
  • C++: 二叉搜索树及实现
  • 【讲解下Web前端三大主流的框架】
  • 数据结构--树
  • io.net 是什么,DePIN(去中心化物理基础设施网络)
  • 【quarkus系列】构建可执行文件native image
  • Android:使用Kotlin搭建MVI架构模式
  • 内网横向移动小补充 --->PTK
  • sqlserver——查询(四)——连接查询
  • 高光谱成像技术简介,怎么选择成像方案?
  • HashMap剖析之内部结构
  • IIS 10 PHP CGI 设置 PHP_INI_SCAN_DIR
  • php中curl和soap方式请求服务超时问题
  • swift基础之_对象 实例方法 对象方法。
  • 从零开始的webpack生活-0x009:FilesLoader装载文件
  • 盘点那些不知名却常用的 Git 操作
  • 设计模式走一遍---观察者模式
  • 通过获取异步加载JS文件进度实现一个canvas环形loading图
  • 小程序01:wepy框架整合iview webapp UI
  • 原生js练习题---第五课
  • #{}和${}的区别是什么 -- java面试
  • #07【面试问题整理】嵌入式软件工程师
  • #Linux杂记--将Python3的源码编译为.so文件方法与Linux环境下的交叉编译方法
  • $.type 怎么精确判断对象类型的 --(源码学习2)
  • (4)(4.6) Triducer
  • (PHP)设置修改 Apache 文件根目录 (Document Root)(转帖)
  • (第一天)包装对象、作用域、创建对象
  • (附源码)springboot车辆管理系统 毕业设计 031034
  • (附源码)ssm经济信息门户网站 毕业设计 141634
  • (南京观海微电子)——COF介绍
  • (区间dp) (经典例题) 石子合并
  • (十三)Flask之特殊装饰器详解
  • *1 计算机基础和操作系统基础及几大协议
  • .net core 6 使用注解自动注入实例,无需构造注入 autowrite4net
  • @configuration注解_2w字长文给你讲透了配置类为什么要添加 @Configuration注解
  • [ IOS ] iOS-控制器View的创建和生命周期
  • [20171101]rman to destination.txt
  • [c#基础]值类型和引用类型的Equals,==的区别
  • [c]扫雷
  • [C++] 统计程序耗时
  • [C++提高编程](三):STL初识
  • [Geek Challenge 2023] web题解
  • [javaSE] 数据结构(二叉查找树-插入节点)
  • [LeetCode] 626. 换座位
  • [leetcode] 四数之和 M
  • [MZ test.16]P2 math 乘方e
  • [Oh My C++ Diary]用cout输出时后endl的使用
  • [OPEN SQL] 修改数据
  • [PyQt] Pycharm 配置 PyQt 开发环境
  • [Qt桌面开发]一个Qt简单界面的开发