当前位置: 首页 > news >正文

网络编程(6)——发送的时序性,全双工通信

六、day6

在上午学习完如何通过c++11特性模拟伪闭包实现连接的安全回收之后,下午学习如何封装一个发送接口,该接口能保证发送的时序性(异步发送时TCP底层缓冲区可能无法将所有数据一次发出去,如果这时候再次调用异步发送,就可能造成数据错乱)。实现的关键在于:多次发送时,异步的发送要保证回调触发后再次发送才能确保数据是有序的(队列)。

文章开始前将前面文章中提到的Server和Session类分成CServer.h和CSession.h两个文件,注意两个文件的依赖关系:

CSession.h

#pragma once
#include <iostream>
#include <boost/asio.hpp>
#include <map>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>using boost::asio::ip::tcp;
using std::cout;
using std::cin;
using std::endl;class CServer;class MsgNode {
public:int _total_len; // 数据的总长度int _cur_len; // 已经处理的长度(已读的长度或者已写的长度)char* _msg; // 数据域首地址MsgNode(const char* msg, int total_len) :_total_len(total_len), _cur_len(0) { // 构造写节点_msg = new char[total_len];memcpy(_msg, msg, total_len);}MsgNode(int total_len) : _total_len(total_len), _cur_len(0) { // 构造读节点_msg = new char[total_len];}~MsgNode() {delete[] _msg;}
};class CSession:public std::enable_shared_from_this<CSession>
{
private:tcp::socket _socket; // 处理客户端读写的套接字enum { max_length = 1024 };char _data[max_length]; // headle回调函数void headle_read(const boost::system::error_code& error, size_t bytes_transferred,std::shared_ptr<CSession> _self_shared);void haddle_write(const boost::system::error_code& error, std::shared_ptr<CSession> _self_shared);std::string _uuid;CServer* _server;
public:CSession(boost::asio::io_context& ioc, CServer* server) : _socket(ioc), _server(server){// random_generator是函数对象,加()就是函数,再加一个()就是调用该函数boost::uuids::uuid a_uuid = boost::uuids::random_generator()();_uuid = boost::uuids::to_string(a_uuid);}tcp::socket& Socket() { return _socket; }const std::string& GetUuid() const { return _uuid; }void Start();
};

CSession.cpp

#include "CSession.h"
#include "CServer.h"void CSession::Start() {memset(_data, 0, max_length); // 缓冲区清零// 从套接字中读取数据,并绑定回调函数headle_read_socket.async_read_some(boost::asio::buffer(_data, max_length),// 这里可以将shared_ptr<Session>(this)给bind绑定吗?// 不可以,会造成多个智能指针绑定同一块内存的问题std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2,shared_from_this()));
}// 
void CSession::headle_read(const boost::system::error_code& error, size_t bytes_transferred,std::shared_ptr<CSession> _self_shared) {if (!error) {cout << "server receive data is " << _data << endl;boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transferred),std::bind(&CSession::haddle_write, this, std::placeholders::_1, _self_shared));}else {cout << "read error" << endl;_server->ClearSession(_uuid);}
}void CSession::haddle_write(const boost::system::error_code& error, std::shared_ptr<CSession> _self_shared) {if (!error) {memset(_data, 0, max_length);_socket.async_read_some(boost::asio::buffer(_data, max_length),std::bind(&CSession::headle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));}else {cout << "write error" << error.value() << endl;_server->ClearSession(_uuid);}
}

CServer.h

#pragma once
#include "CSession.h"class CServer
{
private:void start_accept();  // 启动一个acceptor// 当acceptor接收到连接后启动该函数void handle_accept(std::shared_ptr<CSession> new_session, const boost::system::error_code& error);boost::asio::io_context& _ioc;tcp::acceptor _acceptor;std::map<std::string, std::shared_ptr<CSession>> _sessions;
public:CServer(boost::asio::io_context& ioc, short port);void ClearSession(std::string uuid);
};

CServer.cpp

#include "CServer.h"// 初始化服务器对象,绑定 I/O 上下文和监听的端口,并启动服务器
CServer::CServer(boost::asio::io_context& ioc, short port) : _ioc(ioc),
_acceptor(ioc, tcp::endpoint(tcp::v4(), port)) {cout << "Server start success, on port: " << port << endl;// 开始异步地接受客户端连接请求。服务器启动后就进入等待客户端连接的状态start_accept();
}void CServer::ClearSession(std::string uuid) {_sessions.erase(uuid);
}void CServer::start_accept() {// make_shared分配并构造一个 std::shared_ptr,_ioc, this是传给Session的参数std::shared_ptr<CSession> new_session = std::make_shared<CSession>(_ioc, this);// 开始一个异步接受操作,当new_session的socket与客户端连接成功时,调用回调函数handle_accept// 为什么new_session在右括号结束后仍不结束,而是bind后计数加一?// new_session通过bind绑定时,new_session的计数就会加一,所以在bind后,new_session的生命周期和// 新构造函数的生命周期相同,因为新生成的函数对象引用了new_session(new_session通过值传递的方式被复制构造函数使用)。// 所以只要新构造的bind回调函数没有被调用、移除,new_session的声明周期就始终存在,所以new_session不会随着'}'的结束而释放。_acceptor.async_accept(new_session->Socket(), std::bind(&CServer::handle_accept, this, new_session,std::placeholders::_1));
}// 当handle_accept触发时,也就是start_accept的回调函数被触发,当该回调函数结束后从队列中移除后,new_session的引用计数减一
void CServer::handle_accept(std::shared_ptr<CSession> new_session, const boost::system::error_code& error) {// 如果没有错误(error 为 false),调用 new_session->Start() 来启动与旧客户端的会话if (!error) {new_session->Start();_sessions.insert(std::make_pair(new_session->GetUuid(), new_session));}else cout << "session accept failed, error is " << error.what() << endl;// 无论当前连接是否成功,都重新调用 start_accept(),以便服务器能够继续接受下一个新客户端的连接请求。// 服务器始终保持在监听状态,随时准备接受新连接start_accept();
}

1)数据节点设计

首先,使用网络编程(3)中的数据节点,作为异步服务器数据的存储节点,放在CSession.h文件中

爱吃土豆:网络编程自学(3)1 赞同 · 0 评论文章

class MsgNode {
public:int _total_len; // 数据的总长度int _cur_len; // 已经处理的长度(已读的长度或者已写的长度)char* _msg; // 数据域首地址MsgNode(const char* msg, int total_len) :_total_len(total_len), _cur_len(0) { // 构造写节点_msg = new char[total_len];memcpy(_msg, msg, total_len);}MsgNode(int total_len) : _total_len(total_len), _cur_len(0) { // 构造读节点_msg = new char[total_len];}~MsgNode() {delete[] _msg;}
};

2)封装发送接口

服务器的发送接口一般是在逻辑线程调用,所以调用发送线程的接口和asio回调的网络线程不在一个线程,这个发送队列就存在两个线程的共同访问,所以需增加一个保证发送队列的安全性,同时新增一个发送接口Send

void Send(char* msg,  int max_length);
std::queue<shared_ptr<MsgNode> > _send_que;
std::mutex _send_lock;

以及send的实现:

发送接口里判断发送队列是否为空,如果不为空说明有数据未发送完,需要将数据放入队列,然后返回。如果发送队列为空,则说明当前没有未发送完的数据,将要发送的数据放入队列并调用async_write函数发送数据。

这个函数确保了在多线程环境下,发送操作的有序性与安全性。通过锁来保护发送队列,通过队列来缓存多个待发送的消息,并使用异步写操作 async_write 进行非阻塞的发送。函数还确保了只有一个异步写操作会在某一时刻进行,避免同时多次发送操作对同一套接字的竞争访问。

void CSession::Send(char* msg, int max_length) {bool pending = false; // 发送标志,true时有未完成的发送操作,false为空// 使用lock_guard锁住_send_lock,确保_send_lock(发送队列)的访问的线程安全的// 锁的存在确保了多个线程不会同时修改发送队列std::lock_guard<std::mutex> lock(_send_lock);// 判断队列是否有未完成的发送操作if (_send_que.size() > 0) {pending = true;}_send_que.push(std::make_shared<MsgNode>(msg, max_length)); // 将发送消息存储至队列if (pending) { // 如果有未完成的发送,直接返回return;}// 异步发送boost::asio::async_write(_socket, boost::asio::buffer(msg, max_length),std::bind(&CSession::haddle_write, this, std::placeholders::_1, shared_from_this()));
} // 当'}'结束后,_send_lock解锁,发送队列解锁

3)修改读和写回调

写回调(实现了异步写操作完成后的处理逻辑,在写入操作成功时从发送队列中移除已发送的数据,并继续处理队列中的下一个数据包;如果写入操作失败,则处理错误并清除会话):

// 异步写操作完成后的回调处理函数
void CSession::haddle_write(const boost::system::error_code& error, std::shared_ptr<CSession> _self_shared) {if (!error) { // 检查异步写是否成功std::lock_guard<std::mutex> lock(_send_lock); // 加锁保护发送队列_send_que.pop(); // 移除上一个已发送的消息(send函数中的异步发)if (!_send_que.empty()) { // 若队列不为空,处理下一个消息auto& msgnode = _send_que.front();boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_msg, msgnode->_total_len),std::bind(&CSession::haddle_write, this, std::placeholders::_1, _self_shared));}}else {std::cout << "handle write failed, error is " << error.what() << endl;_server->ClearSession(_uuid);}
}

读回调:

因为服务器一般是全双工通信,所以要一直监听对端发送的数据,在每次收到数据后继续绑定监听事件

void CSession::headle_read(const boost::system::error_code& error, size_t bytes_transferred,std::shared_ptr<CSession> _self_shared) {if (!error) {cout << "server receive data is " << _data << endl;Send(_data, bytes_transferred); // 将收到的消息回传memset(_data, 0, max_length); // 缓冲区清零_socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&CSession::headle_read, this,std::placeholders::_1, std::placeholders::_2, _self_shared));}else {std::cout << "handle read failed, error is " << error.what() << endl;_server->ClearSession(_uuid);}
}

1.服务器的发送接口一般是在逻辑线程调用,调用发送线程的接口和asio回调的网络线程不在一个线程,发送队列存在两个线程的共同访问,如何解释这句话?

1)逻辑线程

  • 服务器程序可能有多个线程执行不同的任务,其中有一个或多个线程专门负责业务逻辑(通常称为逻辑线程)。这些线程负责处理如游戏逻辑、业务处理等高层次的操作。
  • 发送数据的请求通常由这些逻辑线程发起,也就是说,逻辑线程会调用服务器的发送接口来准备或触发向客户端发送数据。

2)网络线程

  • 使用Boost.Asio这样的异步I/O库时,实际处理网络通信的部分是由网络线程(通常是由Boost.Asio提供的线程)来负责。这些线程处理所有的网络事件和I/O操作,比如读写操作完成时的回调函数。
  • 当逻辑线程调用发送接口时,实际的数据发送操作是交由网络线程处理的,因此存在逻辑线程网络线程之间的协作。这两个线程不是同一个线程,存在并发访问的问题。

3)发送队列

  • 为了实现异步发送,服务器通常会有一个发送队列,用于暂存即将发送的数据包。逻辑线程将数据放入这个队列中,而网络线程则从队列中取出数据并通过网络发送出去。
  • 因为这个发送队列是由两个不同的线程(逻辑线程和网络线程)共同访问的,因此会有并发问题。如果没有进行适当的同步控制(如加锁或使用无锁队列),可能会导致数据竞争(data race)、不一致或崩溃等问题。

2."std::lock_guard<std::mutex> lock(_send_lock)"是如何保护发送队列的,什么时候解除保护?

std::lock_guard<std::mutex> 是一个类模板,它会在创建时锁住传递的互斥量(mutex),并在离开作用域时自动解锁。锁的保护通过以下机制实现:

std::lock_guard<std::mutex> lock(_send_lock);
  • 这行代码创建了一个 std::lock_guard 对象 lock,并将 _send_lock 传递给它,表示要锁定 _send_lock 互斥量。
  • 一旦 lock 对象被创建,构造函数会立即锁定 _send_lock,从而确保在该作用域内,其他线程无法同时访问受该锁保护的资源。
  • 锁定后,直到当前代码块结束前,其他线程无法获取 _send_lock,从而保证了临界区(即锁定代码之后的代码块)的线程安全。

std::lock_guard<std::mutex> 的锁定持续到该对象的生命周期结束。当 lock 对象超出其作用域时(即代码块结束时),它会自动调用其析构函数,从而释放互斥锁 _send_lock。

3.锁的机制

锁是用于协调对共享资源(比如发送队列)的访问,确保在多线程环境中只有一个线程能够在某一时刻访问该资源,当一个线程在访问发送队列时:

std::lock_guard<std::mutex> lock(_send_lock);
_send_que.push(...);  // 或者 _send_que.pop()
  • 通过 lock_guard 锁定 _send_lock,只有当前线程能进入这段代码,并操作 _send_que
  • 如果其他线程也想访问队列,它们会在获取 _send_lock 时被阻塞,直到当前线程释放锁。这就防止了多个线程同时修改队列的可能性。

举例:

void CSession::Send(char* msg, int max_length) {std::lock_guard<std::mutex> lock(_send_lock); // 锁定互斥锁_send_que.push(std::make_shared<MsgNode>(msg, max_length)); // 对共享队列的操作// 其他代码
}
  • 当线程A调用 Send() 函数并进入这段代码时,它加锁 _send_lock,防止其他线程B同时修改 _send_que。
  • 线程B调用 Send() 函数时,会发现 _send_lock 被线程A持有,线程B必须等待线程A释放锁后,才能获得锁并访问队列。

相关文章:

  • 一个 Java 语言简化处理 PDF 的框架,提供了一套简单易用的 API 接口,满足多样化需求又能简化开发流程的处理方案(附教程)
  • 【AD那些事 10 】焊盘如何修改为自己想要的形状!!!!! 焊盘设计规则如何更改??????
  • 【架构设计】同步与异步:应用场景与选择指南
  • cpu路、核、线程、主频、缓存
  • 相似度度量方法有哪些?
  • 数据结构--单链表
  • 创建Express后端项目
  • python之装饰器、迭代器、生成器
  • linux ip命令使用
  • npm run build报Cannot find module错误的解决方法
  • 容器技术介绍
  • 卷积神经网络(CNN)图像处理与识别原理
  • CE认证大电流计量装置
  • 如何把PDF样本册转换为网址链接
  • 护眼台灯哪个品牌更好?五款由专业眼科医生推荐的护眼台灯
  • Android 控件背景颜色处理
  • Java IO学习笔记一
  • JS题目及答案整理
  • JS字符串转数字方法总结
  • LeetCode18.四数之和 JavaScript
  • React as a UI Runtime(五、列表)
  • REST架构的思考
  • 好的网址,关于.net 4.0 ,vs 2010
  • 回流、重绘及其优化
  • 小而合理的前端理论:rscss和rsjs
  • ​第20课 在Android Native开发中加入新的C++类
  • #NOIP 2014# day.1 T3 飞扬的小鸟 bird
  • (03)光刻——半导体电路的绘制
  • (16)Reactor的测试——响应式Spring的道法术器
  • (aiohttp-asyncio-FFmpeg-Docker-SRS)实现异步摄像头转码服务器
  • (Charles)如何抓取手机http的报文
  • (Matalb时序预测)WOA-BP鲸鱼算法优化BP神经网络的多维时序回归预测
  • (创新)基于VMD-CNN-BiLSTM的电力负荷预测—代码+数据
  • (第一天)包装对象、作用域、创建对象
  • (免费领源码)python#django#mysql公交线路查询系统85021- 计算机毕业设计项目选题推荐
  • (三)centos7案例实战—vmware虚拟机硬盘挂载与卸载
  • (三)elasticsearch 源码之启动流程分析
  • (三)Kafka 监控之 Streams 监控(Streams Monitoring)和其他
  • (十三)Java springcloud B2B2C o2o多用户商城 springcloud架构 - SSO单点登录之OAuth2.0 根据token获取用户信息(4)...
  • (数据大屏)(Hadoop)基于SSM框架的学院校友管理系统的设计与实现+文档
  • (一)utf8mb4_general_ci 和 utf8mb4_unicode_ci 适用排序和比较规则场景
  • (已解决)vscode如何选择python解释器
  • ./configure、make、make install 命令
  • .MSSQLSERVER 导入导出 命令集--堪称经典,值得借鉴!
  • .NET Core WebAPI中使用Log4net 日志级别分类并记录到数据库
  • .NET 应用启用与禁用自动生成绑定重定向 (bindingRedirect),解决不同版本 dll 的依赖问题
  • .NET连接MongoDB数据库实例教程
  • @AutoConfigurationPackage的使用
  • [ solr入门 ] - 利用solrJ进行检索
  • [@Controller]4 详解@ModelAttribute
  • [5] CUDA线程调用与存储器架构
  • [7] CUDA之常量内存与纹理内存
  • [AX]AX2012 SSRS报表Drill through action
  • [C#小技巧]如何捕捉上升沿和下降沿
  • [C++]类和对象【上篇】