当前位置: 首页 > news >正文

07 | Swoole 源码分析之 Channel 通道模块

原文首发链接:Swoole 源码分析之 Channel 通道模块
大家好,我是码农先森。

引言

通道,用于协程间通讯,支持多生产者协程和多消费者协程。底层自动实现了协程的切换和调度。

通道与 PHP 的 Array 类似,仅占用内存,没有其他额外的资源申请,所有操作均为内存操作,无 IO 消耗。

底层使用 PHP 引用计数实现,无内存拷贝。即使是传递巨大字符串或数组也不会产生额外性能消耗 channel 基于引用计数实现,是零拷贝的。

源码拆解

Channel 通道需要在协程环境中使用,我们先看下面这段代码,使用 new Channel(1) 创建一个 channel 对象,然后在第一个协程中向通道中推送数据,在第二个协程获取到通道内的数据进行消费。

use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use function Swoole\Coroutine\run;run(function(){// 创建 channel 通道对象$channel = new Channel(1);Coroutine::create(function () use ($channel) {for($i = 0; $i < 10; $i++) {Coroutine::sleep(1.0);// 向通道内推送数据$channel->push(['rand' => rand(1000, 9999), 'index' => $i]);echo "{$i}\n";}});Coroutine::create(function () use ($channel) {while(1) {// 从通道中获取数据$data = $channel->pop(2.0);if ($data) {var_dump($data);} else {assert($channel->errCode === SWOOLE_CHANNEL_TIMEOUT);break;}}});
});

在分析源代码之前,我们可以提前看一下源码整体的调用逻辑图,以便我们有个大致的印象。

这段代码主要是在 Swoole 的协程环境中创建 Channel 对象并初始化其容量的逻辑。

// swoole-src/ext-src/swoole-channel.cc:132
static PHP_METHOD(swoole_channel_coro, __construct) {zend_long capacity = 1;// 解析传入的参数ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 1)Z_PARAM_OPTIONALZ_PARAM_LONG(capacity)ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);if (capacity <= 0) {capacity = 1;}// 当前对象对应的 ChannelObject 结构体指针ChannelObject *chan_t = php_swoole_channel_coro_fetch_object(Z_OBJ_P(ZEND_THIS));// 为该通道对象分配新的 Channel 实例,并设置其容量为传入的值。chan_t->chan = new Channel(capacity);zend_update_property_long(swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("capacity"), capacity);
}

这段代码主要是在 Swoole 的协程环境中向通道中推送数据并对返回结果进行处理的逻辑。

// swoole-src/ext-src/swoole-channel.cc:149
static PHP_METHOD(swoole_channel_coro, push) {// 获取当前对象的 Channel 实例Channel *chan = php_swoole_get_channel(ZEND_THIS);zval *zdata;double timeout = -1;// 解析传入的参数ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 2)Z_PARAM_ZVAL(zdata)Z_PARAM_OPTIONALZ_PARAM_DOUBLE(timeout)ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);Z_TRY_ADDREF_P(zdata);zdata = sw_zval_dup(zdata);// 向通道中推入数据if (chan->push(zdata, timeout)) {zend_update_property_long(swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), Channel::ERROR_OK);RETURN_TRUE;} else {zend_update_property_long(swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), chan->get_error());Z_TRY_DELREF_P(zdata);efree(zdata);RETURN_FALSE;}
}// swoole-src/coroutine/channel.cc:105
bool Channel::push(void *data, double timeout) {// 获取当前协程对象 current_coCoroutine *current_co = Coroutine::get_current_safe();// 如果通道已关闭if (closed) {// 设置错误并返回空指针error_ = ERROR_CLOSED;return false;}// 如果通道已满或生产者队列不为空,则设置超时消息,并根据传入的超时值添加定时器,等待生产者。if (is_full() || !producer_queue.empty()) {TimeoutMessage msg;msg.error = false;msg.timer = nullptr;if (timeout > 0) {msg.chan = this;msg.type = PRODUCER;msg.co = current_co;// 根据传入的超时值添加定时器msg.timer = swoole_timer_add(timeout, false, timer_callback, &msg);}// 挂起生产者协程yield(PRODUCER);// 如果设置了定时器,则在超时消息中删除定时器if (msg.timer) {swoole_timer_del(msg.timer);}// 如果当前协程被取消if (current_co->is_canceled()) {// 设置错误并返回空指针error_ = ERROR_CANCELED;return nullptr;}// 如果发生超时if (msg.error) {// 设置错误并返回空指针error_ = ERROR_TIMEOUT;return nullptr;}// 如果通道关闭且为空的情况if (closed && is_empty()) {// 设置相应的错误并返回空指针。error_ = ERROR_CLOSED;return nullptr;}}// 将数据压入数据队列。data_queue.push(data);swoole_trace_log(SW_TRACE_CHANNEL, "push data to channel, count=%ld", length());// 如果消费者队列不为空,则唤醒消费者协程。if (!consumer_queue.empty()) {Coroutine *co = pop_coroutine(CONSUMER);// 恢复消费者协程co->resume();}return true;
}

这段代码主要是在 Swoole 的协程环境中从通道中取出数据并对返回结果进行处理的逻辑。

// swoole-src/ext-src/swoole-channel.cc:175
static PHP_METHOD(swoole_channel_coro, pop) {// 获取当前对象的 Channel 实例Channel *chan = php_swoole_get_channel(ZEND_THIS);// 设置超时变量为-1double timeout = -1;// 解析一个超时参数ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 1)Z_PARAM_OPTIONALZ_PARAM_DOUBLE(timeout)ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);// 从通道中取出数据,并返回一个 zval 指针zval *zdata = (zval *) chan->pop(timeout);// 如果返回的 zval 指针不为空if (zdata) {// 将其返回给 PHP 脚本,并释放内存RETVAL_ZVAL(zdata, 0, 0);efree(zdata);zend_update_property_long(swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), Channel::ERROR_OK);} else {zend_update_property_long(swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), chan->get_error());RETURN_FALSE;}
}// swoole-src/coroutine/channel.cc:55
void *Channel::pop(double timeout) {// 获取当前协程对象 current_coCoroutine *current_co = Coroutine::get_current_safe();// 如果通道已关闭且为空if (closed && is_empty()) {// 设置错误并返回空指针error_ = ERROR_CLOSED;return nullptr;}// 如果通道为空或者消费者队列不为空if (is_empty() || !consumer_queue.empty()) {TimeoutMessage msg;msg.error = false;msg.timer = nullptr;if (timeout > 0) {msg.chan = this;msg.type = CONSUMER;msg.co = current_co;// 根据传入的超时值添加定时器msg.timer = swoole_timer_add(timeout, false, timer_callback, &msg);}// 挂起消费者协程yield(CONSUMER);// 如果设置了定时器,则在超时消息中删除定时器if (msg.timer) {swoole_timer_del(msg.timer);}// 如果当前协程被取消if (current_co->is_canceled()) {// 设置错误并返回空指针error_ = ERROR_CANCELED;return nullptr;}// 如果发生超时if (msg.error) {// 设置错误并返回空指针error_ = ERROR_TIMEOUT;return nullptr;}// 如果通道关闭且为空的情况if (closed && is_empty()) {// 设置相应的错误并返回空指针。error_ = ERROR_CLOSED;return nullptr;}}// 从数据队列中弹出数据,并返回该数据。void *data = data_queue.front();data_queue.pop();// 如果生产者队列不为空,则唤醒生产者协程if (!producer_queue.empty()) {Coroutine *co = pop_coroutine(PRODUCER);// 恢复到生产者协程co->resume();}return data;
}

这段代码一是针对超时回调处理的处理逻辑,并恢复相关的协程操作。二是实现了协程的挂起操作,并根据不同的类型将当前协程放入不同的队列中,以便后续根据需要恢复执行。

// swoole-src/coroutine/channel.cc:22
void Channel::timer_callback(Timer *timer, TimerNode *tnode) {TimeoutMessage *msg = (TimeoutMessage *) tnode->data;msg->error = true;msg->timer = nullptr;if (msg->type == CONSUMER) {// 从消费者队列中移除该协程msg->chan->consumer_remove(msg->co);} else {// 从生产者队列中移除该协程msg->chan->producer_remove(msg->co);}// 恢复协程msg->co->resume();
}// swoole-src/coroutine/channel.cc:34
void Channel::yield(enum Opcode type) {// 获取当前协程Coroutine *co = Coroutine::get_current_safe();if (type == PRODUCER) {// 将当前协程放入到生产者队列producer_queue.push_back(co);swoole_trace_log(SW_TRACE_CHANNEL, "producer cid=%ld", co->get_cid());} else {// 将当前协程放入到消费者队列consumer_queue.push_back(co);swoole_trace_log(SW_TRACE_CHANNEL, "consumer cid=%ld", co->get_cid());}// 挂起被取消,则调用该函数Coroutine::CancelFunc cancel_fn = [this, type](Coroutine *co) {if (type == CONSUMER) {consumer_remove(co);} else {producer_remove(co);}co->resume();return true;};// 挂起当前协程co->yield(&cancel_fn);
}

总结

  1. Channel 通道需要在协程的环境中进行使用,通道是纯内存操作,没有 IO 消耗,非常高效。
  2. 底层使用 Channel::yield 函数实现了协程的自动切换和调度,如果通道处理超时则会自动调用 Channel::timer_callback 函数。
  3. Channel 通道是跨协程直接通信的一大利器,在实际的场景中使用起来十分的便利、高效。

相关文章:

  • 大语言模型RAG vs. 长文本
  • 【Linux】make是如何判断可执行文件是否需要重新编译呢?(ACM时间)
  • 基于springboot+vue+Mysql的职称评审管理系统
  • SQL语句的编写
  • 将博客搬至稀土掘金中
  • pipeline流水线学习
  • C数据结构:单链表
  • MySQL innoDB存储引擎多事务场景下的事务执行情况
  • java操作linux
  • Covalent Network(CQT)推出以太坊质押迁移计划,以增强长期结构化数据可用性、塑造万亿级 LLM 参数体系
  • 输入输出系统的发展历程
  • python + jdbc 连接 达梦数据库
  • 在Linux系统上实现TCP(socket)通信
  • c++20协程详解(三)
  • 19、差分矩阵
  • 【跃迁之路】【735天】程序员高效学习方法论探索系列(实验阶段492-2019.2.25)...
  • es6要点
  • GitUp, 你不可错过的秀外慧中的git工具
  • Hexo+码云+git快速搭建免费的静态Blog
  • javascript 哈希表
  • Less 日常用法
  • Logstash 参考指南(目录)
  • Python爬虫--- 1.3 BS4库的解析器
  • Sass Day-01
  • sessionStorage和localStorage
  • vue总结
  • 批量截取pdf文件
  • 前嗅ForeSpider中数据浏览界面介绍
  • 树莓派 - 使用须知
  • 微信小程序:实现悬浮返回和分享按钮
  • 小程序、APP Store 需要的 SSL 证书是个什么东西?
  • 一道面试题引发的“血案”
  • 与 ConTeXt MkIV 官方文档的接驳
  • # 深度解析 Socket 与 WebSocket:原理、区别与应用
  • #HarmonyOS:Web组件的使用
  • (SpringBoot)第二章:Spring创建和使用
  • (八)c52学习之旅-中断实验
  • (超简单)构建高可用网络应用:使用Nginx进行负载均衡与健康检查
  • (免费分享)基于springboot,vue疗养中心管理系统
  • (强烈推荐)移动端音视频从零到上手(上)
  • (源码版)2024美国大学生数学建模E题财产保险的可持续模型详解思路+具体代码季节性时序预测SARIMA天气预测建模
  • (自适应手机端)响应式新闻博客知识类pbootcms网站模板 自媒体运营博客网站源码下载
  • .NET/C# 判断某个类是否是泛型类型或泛型接口的子类型
  • .net6+aspose.words导出word并转pdf
  • .Net转Java自学之路—基础巩固篇十三(集合)
  • //解决validator验证插件多个name相同只验证第一的问题
  • [ NOI 2001 ] 食物链
  • [20171102]视图v$session中process字段含义
  • [20190401]关于semtimedop函数调用.txt
  • [Android]使用Retrofit进行网络请求
  • [C/C++] -- 二叉树
  • [C++][基础]1_变量、常量和基本类型
  • [Docker]四.Docker部署nodejs项目,部署Mysql,部署Redis,部署Mongodb
  • [Git].gitignore失效的原因
  • [HTML]Web前端开发技术7(HTML5、CSS3、JavaScript )CSS的定位机制——喵喵画网页