当前位置: 首页 > news >正文

Boost:asio网络编程从同步到异步

文章目录

  • 同步服务器
    • 客户端
    • 服务端
  • 异步服务器(有问题)
  • 异步服务器优化

在学TCP的时候,写的第一个服务器就是一个echo服务器,那在Boost网络编程中,自然也先写一个echo服务器练练手

同步服务器

客户端

#include <iostream>
#include <boost/asio.hpp>using namespace boost::asio::ip;
using namespace std;const int MAX_LENTH = 1024;int main()
{try{// 创建上下文服务boost::asio::io_context ioc;// 构造endpointtcp::endpoint remote_ep(address::from_string("127.0.0.1"), 10086);// 创建Sockettcp::socket sock(ioc);boost::system::error_code error = boost::asio::error::host_not_found;sock.connect(remote_ep, error);if (error){cout << "connect fail, code is " << error.value() << " Message: " << error.what() << endl;return 0;}cout << "connect success" << endl;cout << "Enter message: ";char request[MAX_LENTH];cin.getline(request, MAX_LENTH);size_t request_length = strlen(request);boost::asio::write(sock, boost::asio::buffer(request, request_length));char reply[MAX_LENTH];size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply, request_length));cout << "Reply is ";cout.write(reply, reply_length);cout << endl;}catch(exception& e){cerr << e.what() << endl;}return 0;
}

服务端

服务端这里采用的是一个线程处理一个连接的方式,同时使用一个智能指针集合来保证不被异常释放,主要是通过增加一个引用计数,使得不会在离开作用域后就被销毁

server的作用就是一个接收者,他负责进行连接的获取,并进行对应的分发,通过创建一个一个的线程,然后执行对应的session逻辑,进行对应的回报处理

#include <iostream>
#include <boost/asio.hpp>
#include <set>
#include <memory>using boost::asio::ip::tcp;
using namespace std;const int MAX_LENTH = 1024;
typedef shared_ptr<tcp::socket> socket_ptr;
set<shared_ptr<thread>> thread_set;// 服务器处理客户端的读和写
void session(socket_ptr sock)
{try{for (;;){char data[MAX_LENTH];memset(data, '\0', MAX_LENTH);boost::system::error_code error;// 直到读到最长为止,要读到1024字节//size_t length = boost::asio::read(sock, boost::asio::buffer(data, MAX_LENTH), error);// 部分读取,读多少算多少size_t length = sock->read_some(boost::asio::buffer(data, MAX_LENTH), error);if (error == boost::asio::error::eof){cout << "connection closed" << endl;break;}else if (error){throw boost::system::system_error(error);}cout << "receive from " << sock->remote_endpoint().address().to_string() << endl;cout << "receive message is " << data << endl;// 将数据传递回去boost::asio::write(*sock, boost::asio::buffer(data, MAX_LENTH));}}catch (exception& e){cerr << "Exception in thread" << e.what() << endl;}
}void server(boost::asio::io_context& ioc, unsigned short port)
{// 专门来接受连接,必须按照传递方式,Acceptor是归属于某个ioc,以及用ipv4的方式进行绑定到端口tcp::acceptor a(ioc, tcp::endpoint(tcp::v4(), port));for (;;){socket_ptr socket(new tcp::socket(ioc));a.accept(*socket);// 创建线程,做session工作,参数是Socket,等待客户端发数据,回传等工作auto t = make_shared<thread>(session, socket);thread_set.insert(t);}
}int main()
{try{boost::asio::io_context ioc;server(ioc, 10086);for (auto& t : thread_set){t->join();}}catch (exception& e){cerr << "Exception in thread" << e.what() << endl;}return 0;
}

但是这样同步的结构必然是不可取的,对于小体量的项目还可以接收,但是对于大级别的项目还是要使用异步的方式进行服务器的搭建,因此下面就开始搭建一个异步的服务器

异步服务器(有问题)

首先创建一个会话类

class Session
{
public:Session(boost::asio::io_context& ioc): _socket(ioc){}tcp::socket& Socket(){return _socket;}void Start();private:void handle_read(const boost::system::error_code& error, size_t byte_transferred);void handle_write(const boost::system::error_code& error);private:tcp::socket _socket;enum { max_length = 1024 };char _data[max_length];
};void Session::Start()
{memset(_data, 0, max_length);_socket.async_read_some(boost::asio::buffer(_data, max_length),bind(&Session::handle_read, this, placeholders::_1, placeholders::_2));
}// 分批次读取事件回调函数
void Session::handle_read(const boost::system::error_code& error, size_t byte_transferred)
{if (!error){cout << "server receive data is " << _data << endl;boost::asio::async_write(_socket, boost::asio::buffer(_data, byte_transferred),bind(&Session::handle_write, this, placeholders::_1));}else{cout << "read error" << endl;delete this;}
}void Session::handle_write(const boost::system::error_code& error)
{if (!error){memset(_data, 0, max_length);_socket.async_read_some(boost::asio::buffer(_data, max_length),bind(&Session::handle_read, this, placeholders::_1, placeholders::_2));}else{cout << "write error" << endl;delete this;}
}

这个会话主要负责的就是对应的echo的功能,将数据接收后再进行对应的发送的过程,收到数据后就进行回调,去将数据发送,同时进行数据发送的回调,进行数据的读取,直到当客户端关闭了连接,此时就会触发一个断开连接的操作,这个操作会被看成是一个读取操作失败,因此就会触发读回调函数的错误

但是这样的设计其实是有问题的,具体我们可以这样想,假设现在有一个内容,需要进行读取事件和写入事件,这是在一个作用域下要同时完成的,假如在进行读事件的时候触发了错误,比如客户端断开连接了,此时就会被捕获到异常,捕获到异常就会直接delete掉当前的这个session,而在作用域内还有一个写事件回调,这个写事件回调也可能会导致一个错误,也被捕获一次异常,进行一个delete函数,那这样就涉及到同一块内存析构两次的情况,这就会造成问题

那为了解决这样的问题,关键就是要对于session的生命周期进行一个合理的管理,因此就需要引入的是一个延长生命周期的操作,这里提供的是一个伪闭包来延长生命周期的操作,这里后续进行演示

为了保证模块的统一性,这里把后续的代码补充一下

class Server
{
public:Server(boost::asio::io_context& ioc, short port);private:// 启动一个监听连接的描述符void start_accept();// 有连接到来后的回调函数void handle_accept(Session* new_session, const boost::system::error_code& error);private:boost::asio::io_context& _ioc;tcp::acceptor _acceptor;
};

实现如下:

Server::Server(boost::asio::io_context& ioc, short port): _ioc(ioc), _acceptor(ioc, tcp::endpoint(tcp::v4(), port))
{// 需要被Acceptor进行捕获cout << "port: " << port << endl;start_accept();
}// 启动一个监听连接的描述符
void Server::start_accept()
{Session* new_session = new Session(_ioc);_acceptor.async_accept(new_session->Socket(), bind(&Server::handle_accept, this, new_session, placeholders::_1));
}// 有连接到来后的回调函数
void Server::handle_accept(Session* new_session, const boost::system::error_code& error)
{if (!error){new_session->Start();}else{delete new_session;}start_accept();
}

异步服务器优化

这样的异步服务器实际上是有问题的,问题主要在于前面说的析构的问题,那为了解决这样的问题,可以使用一个引用计数shared_ptr,并且使用一个Session会话进行管理,每一个Session创建一个uuid,作为一个字符串进行管理

那底层是如何进行管理的呢?本质上是维护了一个map,里面存储的是uid和对应的Session的智能指针的对应关系,前面我们说析构函数的问题在于,没有对于Session对象的生命周期进行合理的管理,因此我们就要想办法对于这个生命周期进行适度的延伸

下面理一下这个整体的调用逻辑

在这里插入图片描述
下面是代码实现:

#include "Session.h"
#include <iostream>
#include <boost/asio.hpp>using namespace std;
using boost::asio::ip::tcp;Session::Session(boost::asio::io_context& ioc, Server* server): _socket(ioc), _server(server)
{memset(_data, 0, max_length);boost::uuids::uuid a_uuid = boost::uuids::random_generator()();_uuid = boost::uuids::to_string(a_uuid);
}string& Session::GetUuid()
{return _uuid;
}tcp::socket& Session::Socket()
{return _socket;
}void Session::Start()
{memset(_data, 0, max_length);_socket.async_read_some(boost::asio::buffer(_data, max_length),bind(&Session::handle_read, this, placeholders::_1, placeholders::_2, shared_from_this()));
}// 分批次读取事件回调函数
void Session::handle_read(const boost::system::error_code& error, size_t byte_transferred, shared_ptr<Session> _self_shared)
{if (!error){cout << "server receive data is " << _data << endl;boost::asio::async_write(_socket, boost::asio::buffer(_data, byte_transferred),bind(&Session::handle_write, this, placeholders::_1, _self_shared));}else{cout << "read error" << endl;// 减少引用计数,直接从map中移除即可_server->ClearSession(_uuid);//delete this;}
}void Session::handle_write(const boost::system::error_code& error, shared_ptr<Session> _self_shared)
{if (!error){memset(_data, 0, max_length);_socket.async_read_some(boost::asio::buffer(_data, max_length),bind(&Session::handle_read, this, placeholders::_1, placeholders::_2, _self_shared));}else{cout << "write error" << endl;_server->ClearSession(_uuid);//delete this;}
}Server::Server(boost::asio::io_context& ioc, short port): _ioc(ioc), _acceptor(ioc, tcp::endpoint(tcp::v4(), port))
{// 需要被Acceptor进行捕获cout << "port: " << port << endl;start_accept();
}void Server::ClearSession(string uuid)
{_sessions.erase(uuid);
}// 启动一个监听连接的描述符
void Server::start_accept()
{// 使用智能指针来进行管理shared_ptr<Session> new_session = make_shared<Session>(_ioc, this);//Session* new_session = new Session(_ioc);_acceptor.async_accept(new_session->Socket(), bind(&Server::handle_accept, this, new_session, placeholders::_1));
}// 有连接到来后的回调函数
void Server::handle_accept(shared_ptr<Session> new_session, const boost::system::error_code& error)
{if (!error){new_session->Start();_sessions.insert(make_pair(new_session->GetUuid(), new_session));}else{// delete new_session;}start_accept();
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 【C++】函数重载
  • idea个人常用快捷键设置
  • 掌握PyCharm代码格式化秘籍:提升代码质量的终极指南
  • vue3: vuedraggable 的使用方法(正常数据的基本使用与树结构数据递归使用)
  • 【K8S】为什么需要Kubernetes?
  • 【Wireshark 抓 CAN 总线】Wireshark 抓取 CAN 总线数据的实现思路
  • STM32 | ADC+RS485编写代码,实现光敏电阻控制灯的亮度
  • C语言 | Leetcode C语言题解之第319题灯泡开关
  • CSS面试题
  • 学习笔记一
  • JVM—HotSpot虚拟机对象探秘
  • 2024年第五届华数杯全国大学生数学建模竞赛【ABC题】完整思路
  • Python从入门到精通(第十章——1 类和对象)
  • 基于JSP、java、Tomcat三者的项目实战--校园交易网(3)主页--添加商品功能
  • 【C++】数组案例 五只小猪称体重
  • 「译」Node.js Streams 基础
  • conda常用的命令
  • CSS实用技巧干货
  • Fabric架构演变之路
  • golang中接口赋值与方法集
  • open-falcon 开发笔记(一):从零开始搭建虚拟服务器和监测环境
  • SegmentFault 技术周刊 Vol.27 - Git 学习宝典:程序员走江湖必备
  • Terraform入门 - 1. 安装Terraform
  • vue-cli在webpack的配置文件探究
  • Webpack入门之遇到的那些坑,系列示例Demo
  • 不上全站https的网站你们就等着被恶心死吧
  • 关于Java中分层中遇到的一些问题
  • 如何使用Mybatis第三方插件--PageHelper实现分页操作
  • 正则表达式
  • mysql 慢查询分析工具:pt-query-digest 在mac 上的安装使用 ...
  • ​软考-高级-系统架构设计师教程(清华第2版)【第1章-绪论-思维导图】​
  • (1)Hilt的基本概念和使用
  • (Matlab)基于蝙蝠算法实现电力系统经济调度
  • (二) 初入MySQL 【数据库管理】
  • (附源码)ssm高校运动会管理系统 毕业设计 020419
  • (论文阅读40-45)图像描述1
  • (微服务实战)预付卡平台支付交易系统卡充值业务流程设计
  • (转)JVM内存分配 -Xms128m -Xmx512m -XX:PermSize=128m -XX:MaxPermSize=512m
  • (转)mysql使用Navicat 导出和导入数据库
  • (转)四层和七层负载均衡的区别
  • (轉貼) 2008 Altera 亞洲創新大賽 台灣學生成果傲視全球 [照片花絮] (SOC) (News)
  • .NET C# 使用 iText 生成PDF
  • .NET delegate 委托 、 Event 事件
  • .NET HttpWebRequest、WebClient、HttpClient
  • .NET MVC 验证码
  • .NET 漏洞分析 | 某ERP系统存在SQL注入
  • .net 无限分类
  • .NET 依赖注入和配置系统
  • .NET 直连SAP HANA数据库
  • .net 中viewstate的原理和使用
  • .NETCORE 开发登录接口MFA谷歌多因子身份验证
  • .NetCore发布到IIS
  • /proc/interrupts 和 /proc/stat 查看中断的情况
  • @autowired注解作用_Spring Boot进阶教程——注解大全(建议收藏!)
  • @GlobalLock注解作用与原理解析