当前位置: 首页 > news >正文

TeamTalk消息服务器学习

msg_server发送消息

信令

//service id  0x0003
message IMMsgData{//cmd id:		0x0301required uint32 from_user_id = 1;				//消息发送方required uint32 to_session_id = 2;				//消息接受方required uint32 msg_id = 3; // 非常重要:由谁产生?答:redis具体见下文required uint32 create_time = 4; required IM.BaseDefine.MsgType msg_type = 5; // 单聊或者群聊required bytes msg_data = 6;optional bytes attach_data = 20;
}message IMMsgDataAck{//cmd id:		0x0302required uint32 user_id = 1;			//发送此信令的用户idrequired uint32 session_id = 2;				required uint32 msg_id = 3;required IM.BaseDefine.SessionType session_type = 4;
}

流程图:
请添加图片描述

客户端A(葡萄)发送消息给客户端B(香蕉),信令为CID_MSG_DATA,msg_server收到后调用 CMsgConn::_HandleClientMsgData 函数

void CMsgConn::HandlePdu(CImPdu* pPdu)
{// request authorization checkif (pPdu->GetCommandId() != CID_LOGIN_REQ_USERLOGIN && !IsOpen() && IsKickOff()) {log("HandlePdu, wrong msg. ");throw CPduException(pPdu->GetServiceId(), pPdu->GetCommandId(), ERROR_CODE_WRONG_SERVICE_ID, "HandlePdu error, user not login. ");return;}switch (pPdu->GetCommandId()) {// ...... 省略无关逻辑 case CID_MSG_DATA:_HandleClientMsgData(pPdu);break;case CID_MSG_DATA_ACK:_HandleClientMsgDataAck(pPdu);break;// ...... 省略无关逻辑 default:log("wrong msg, cmd id=%d, user id=%u. ", pPdu->GetCommandId(), GetUserId());break;}
}
void CMsgConn::_HandleClientMsgData(CImPdu* pPdu)
{IM::Message::IMMsgData msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));if (msg.msg_data().length() == 0) {log("discard an empty message, uid=%u ", GetUserId());return;}if (m_msg_cnt_per_sec >= MAX_MSG_CNT_PER_SECOND) {log("!!!too much msg cnt in one second, uid=%u ", GetUserId());return;}if (msg.from_user_id() == msg.to_session_id() && CHECK_MSG_TYPE_SINGLE(msg.msg_type())){log("!!!from_user_id == to_user_id. ");return;}m_msg_cnt_per_sec++;uint32_t to_session_id = msg.to_session_id();uint32_t msg_id = msg.msg_id();uint8_t msg_type = msg.msg_type();string msg_data = msg.msg_data();if (g_log_msg_toggle) {log("HandleClientMsgData, %d->%d, msg_type=%u, msg_id=%u. ", GetUserId(), to_session_id, msg_type, msg_id);}uint32_t cur_time = time(NULL);CDbAttachData attach_data(ATTACH_TYPE_HANDLE, m_handle, 0);msg.set_from_user_id(GetUserId());msg.set_create_time(cur_time);msg.set_attach_data(attach_data.GetBuffer(), attach_data.GetLength());pPdu->SetPBMsg(&msg);// send to DB storage serverCDBServConn* pDbConn = get_db_serv_conn();if (pDbConn) {pDbConn->SendPdu(pPdu);}
}

该函数直接将数据包转发给db_proxy_server,db_proxy_server有一个map来映射信令所对应的处理函数。db_proxy_server收到信令为CID_MSG_DATA,后调用 DB_PROXY::sendMessage。

该函数主要做:

  1. 创建会话ID,并且两个会话ID独立
nSessionId = CSessionModel::getInstance()->addSession(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE);nPeerSessionId = CSessionModel::getInstance()->getSessionId(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);
  1. 创建关系ID
uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nFromId, nToId, true);
  1. 生成消息ID (msg_id),写入消息到数据库
nMsgId = pMsgModel->getMsgId(nRelateId);
if(nMsgId != INVALID_VALUE)
{pMsgModel->sendMessage(nRelateId, nFromId, nToId, nMsgType, nCreateTime, nMsgId, (string&)msg.msg_data());CSessionModel::getInstance()->updateSession(nSessionId, nNow);CSessionModel::getInstance()->updateSession(nPeerSessionId, nNow);
}

msg_id:两个人之间的映射关系。如果按照时间排列,两个客户端之间的时间可能不一样,所以按照序号生成消息id。

  1. 每条消息id唯一
  2. 使用redis 生成消息 id
uint32_t CMessageModel::getMsgId(uint32_t nRelateId)
{uint32_t nMsgId = 0;CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if(pCacheConn){string strKey = "msg_id_" + int2string(nRelateId);nMsgId = pCacheConn->incrBy(strKey, 1);pCacheManager->RelCacheConn(pCacheConn);}return nMsgId;
}
  1. db_proxy_server 回复 msg_server

// message content
m_handler_map.insert(make_pair(uint32_t(CID_MSG_DATA), DB_PROXY::sendMessage));void sendMessage(CImPdu* pPdu, uint32_t conn_uuid){IM::Message::IMMsgData msg;if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength())){uint32_t nFromId = msg.from_user_id();uint32_t nToId = msg.to_session_id();uint32_t nCreateTime = msg.create_time();IM::BaseDefine::MsgType nMsgType = msg.msg_type();uint32_t nMsgLen = msg.msg_data().length();uint32_t nNow = (uint32_t)time(NULL);if (IM::BaseDefine::MsgType_IsValid(nMsgType)){if(nMsgLen != 0){CImPdu* pPduResp = new CImPdu;uint32_t nMsgId = INVALID_VALUE;uint32_t nSessionId = INVALID_VALUE;uint32_t nPeerSessionId = INVALID_VALUE;CMessageModel* pMsgModel = CMessageModel::getInstance();CGroupMessageModel* pGroupMsgModel = CGroupMessageModel::getInstance();if(nMsgType == IM::BaseDefine::MSG_TYPE_GROUP_TEXT) {// ...... 省略无关逻辑 } else if (nMsgType == IM::BaseDefine::MSG_TYPE_GROUP_AUDIO) {// ...... 省略无关逻辑 } else if(nMsgType== IM::BaseDefine::MSG_TYPE_SINGLE_TEXT) {if (nFromId != nToId) {nSessionId = CSessionModel::getInstance()->getSessionId(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);if (INVALID_VALUE == nSessionId) {// 创建会话nSessionId = CSessionModel::getInstance()->addSession(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE);}nPeerSessionId = CSessionModel::getInstance()->getSessionId(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);if(INVALID_VALUE ==  nPeerSessionId){// 创建会话关系IDnSessionId = CSessionModel::getInstance()->addSession(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE);}uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nFromId, nToId, true);if(nSessionId != INVALID_VALUE && nRelateId != INVALID_VALUE){nMsgId = pMsgModel->getMsgId(nRelateId);if(nMsgId != INVALID_VALUE){// 写入消息到数据库pMsgModel->sendMessage(nRelateId, nFromId, nToId, nMsgType, nCreateTime, nMsgId, (string&)msg.msg_data());CSessionModel::getInstance()->updateSession(nSessionId, nNow);CSessionModel::getInstance()->updateSession(nPeerSessionId, nNow);}else{log("msgId is invalid. fromId=%u, toId=%u, nRelateId=%u, nSessionId=%u, nMsgType=%u", nFromId, nToId, nRelateId, nSessionId, nMsgType);}}else{log("sessionId or relateId is invalid. fromId=%u, toId=%u, nRelateId=%u, nSessionId=%u, nMsgType=%u", nFromId, nToId, nRelateId, nSessionId, nMsgType);}}else{log("send msg to self. fromId=%u, toId=%u, msgType=%u", nFromId, nToId, nMsgType);}} else if(nMsgType == IM::BaseDefine::MSG_TYPE_SINGLE_AUDIO) {// ...... 省略无关逻辑 }log("fromId=%u, toId=%u, type=%u, msgId=%u, sessionId=%u", nFromId, nToId, nMsgType, nMsgId, nSessionId);msg.set_msg_id(nMsgId);pPduResp->SetPBMsg(&msg);pPduResp->SetSeqNum(pPdu->GetSeqNum());pPduResp->SetServiceId(IM::BaseDefine::SID_MSG);pPduResp->SetCommandId(IM::BaseDefine::CID_MSG_DATA);CProxyConn::AddResponsePdu(conn_uuid, pPduResp);}else{log("msgLen error. fromId=%u, toId=%u, msgType=%u", nFromId, nToId, nMsgType);}}else{log("invalid msgType.fromId=%u, toId=%u, msgType=%u", nFromId, nToId, nMsgType);}}else{log("parse pb failed");}}

msg_server收到CID_MSG_DATA信令后调用 CDBServConn::_HandleMsgData 函数

void CDBServConn::HandlePdu(CImPdu* pPdu)
{switch (pPdu->GetCommandId()) {// ...... 省略无关逻辑 case CID_MSG_DATA:_HandleMsgData(pPdu);break;// ...... 省略无关逻辑 default:log("db server, wrong cmd id=%d ", pPdu->GetCommandId());}
}

CDBServConn::_HandleMsgData 处理消息有三点

  1. 首先 ack 客户端
  2. 然后发送到route_server广播
  3. 如果有多端登录,也要广播给其他客户端
void CDBServConn::_HandleMsgData(CImPdu *pPdu)
{IM::Message::IMMsgData msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));if (CHECK_MSG_TYPE_GROUP(msg.msg_type())) {s_group_chat->HandleGroupMessage(pPdu);return;}uint32_t from_user_id = msg.from_user_id();uint32_t to_user_id = msg.to_session_id();uint32_t msg_id = msg.msg_id();if (msg_id == 0) {log("HandleMsgData, write db failed, %u->%u.", from_user_id, to_user_id);return;}uint8_t msg_type = msg.msg_type();CDbAttachData attach_data((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());uint32_t handle = attach_data.GetHandle();log("HandleMsgData, from_user_id=%u, to_user_id=%u, msg_id=%u.", from_user_id, to_user_id, msg_id);CMsgConn* pMsgConn = CImUserManager::GetInstance()->GetMsgConnByHandle(from_user_id, attach_data.GetHandle());if (pMsgConn){IM::Message::IMMsgDataAck msg2;msg2.set_user_id(from_user_id);msg2.set_msg_id(msg_id);msg2.set_session_id(to_user_id);msg2.set_session_type(::IM::BaseDefine::SESSION_TYPE_SINGLE);CImPdu pdu;pdu.SetPBMsg(&msg2);pdu.SetServiceId(SID_MSG);pdu.SetCommandId(CID_MSG_DATA_ACK);pdu.SetSeqNum(pPdu->GetSeqNum());pMsgConn->SendPdu(&pdu);}CRouteServConn* pRouteConn = get_route_serv_conn();if (pRouteConn) {pRouteConn->SendPdu(pPdu);}msg.clear_attach_data();pPdu->SetPBMsg(&msg);CImUser* pFromImUser = CImUserManager::GetInstance()->GetImUserById(from_user_id);CImUser* pToImUser = CImUserManager::GetInstance()->GetImUserById(to_user_id);pPdu->SetSeqNum(0);if (pFromImUser) {pFromImUser->BroadcastClientMsgData(pPdu, msg_id, pMsgConn, from_user_id);}if (pToImUser) {pToImUser->BroadcastClientMsgData(pPdu, msg_id, NULL, from_user_id);}IM::Server::IMGetDeviceTokenReq msg3;msg3.add_user_id(to_user_id);msg3.set_attach_data(pPdu->GetBodyData(), pPdu->GetBodyLength());CImPdu pdu2;pdu2.SetPBMsg(&msg3);pdu2.SetServiceId(SID_OTHER);pdu2.SetCommandId(CID_OTHER_GET_DEVICE_TOKEN_REQ);SendPdu(&pdu2);
}

route_server 收到 msg_server的消息转发,信令为CID_MSG_DATA,调用CRouteConn::_BroadcastMsg 广播函数。

void CRouteConn::HandlePdu(CImPdu* pPdu)
{switch (pPdu->GetCommandId()) {// ...... 省略无关逻辑 case CID_MSG_DATA:case CID_SWITCH_P2P_CMD:case CID_MSG_READ_NOTIFY:case CID_OTHER_SERVER_KICK_USER:case CID_GROUP_CHANGE_MEMBER_NOTIFY:case CID_FILE_NOTIFY:case CID_BUDDY_LIST_REMOVE_SESSION_NOTIFY:_BroadcastMsg(pPdu, this);break;// ...... 省略无关逻辑 default:log("CRouteConn::HandlePdu, wrong cmd id: %d ", pPdu->GetCommandId());break;}
}

其实就是广播给所有的msg_server

void CRouteConn::_BroadcastMsg(CImPdu* pPdu, CRouteConn* pFromConn)
{ConnMap_t::iterator it;for (it = g_route_conn_map.begin(); it != g_route_conn_map.end(); it++) {CRouteConn* pRouteConn = (CRouteConn*)it->second;if (pRouteConn != pFromConn) {pRouteConn->SendPdu(pPdu);}}
}

所有的msg_server收到广播后再广播给所有的客户端

void CRouteServConn::_HandleMsgData(CImPdu* pPdu)
{IM::Message::IMMsgData msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));if (CHECK_MSG_TYPE_GROUP(msg.msg_type())) {s_group_chat->HandleGroupMessageBroadcast(pPdu);return;}uint32_t from_user_id = msg.from_user_id();uint32_t to_user_id = msg.to_session_id();uint32_t msg_id = msg.msg_id();log("HandleMsgData, %u->%u, msg_id=%u. ", from_user_id, to_user_id, msg_id);CImUser* pFromImUser = CImUserManager::GetInstance()->GetImUserById(from_user_id);if (pFromImUser){pFromImUser->BroadcastClientMsgData(pPdu, msg_id, NULL, from_user_id);}CImUser* pToImUser = CImUserManager::GetInstance()->GetImUserById(to_user_id);if (pToImUser){pToImUser->BroadcastClientMsgData(pPdu, msg_id, NULL, from_user_id);}
}

msg_server广播给所有的客户端有讲究,会将消息id等封装成 msg_ack_t 结构体塞入m_send_msg_list 发送列表,等到收到对端的CID_MSG_DATA_ACK,再将此msg_ack_t 结构体从发送列表中移除。其实就是业务层的ACK机制,避免丢消息。

typedef struct {uint32_t msg_id;uint32_t from_id;uint64_t timestamp;
} msg_ack_t; // 业务层的ackvoid CImUser::BroadcastClientMsgData(CImPdu* pPdu, uint32_t msg_id, CMsgConn* pFromConn, uint32_t from_id)
{for (map<uint32_t, CMsgConn*>::iterator it = m_conn_map.begin(); it != m_conn_map.end(); it++){CMsgConn* pConn = it->second;if (pConn != pFromConn) {pConn->SendPdu(pPdu);pConn->AddToSendList(msg_id, from_id);}}
}void CMsgConn::AddToSendList(uint32_t msg_id, uint32_t from_id)
{//log("AddSendMsg, seq_no=%u, from_id=%u ", seq_no, from_id);msg_ack_t msg;msg.msg_id = msg_id;msg.from_id = from_id;msg.timestamp = get_tick_count();m_send_msg_list.push_back(msg);g_down_msg_total_cnt++;
}

收到确认 CID_MSG_DATA_ACK信令后,移除该确认结构体

void CMsgConn::_HandleClientMsgDataAck(CImPdu* pPdu)
{IM::Message::IMMsgDataAck msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));IM::BaseDefine::SessionType session_type = msg.session_type();if (session_type == IM::BaseDefine::SESSION_TYPE_SINGLE){uint32_t msg_id = msg.msg_id();uint32_t session_id = msg.session_id();DelFromSendList(msg_id, session_id);}
}void CMsgConn::DelFromSendList(uint32_t msg_id, uint32_t from_id)
{//log("DelSendMsg, seq_no=%u, from_id=%u ", seq_no, from_id);for (list<msg_ack_t>::iterator it = m_send_msg_list.begin(); it != m_send_msg_list.end(); it++) {msg_ack_t msg = *it;if ( (msg.msg_id == msg_id) && (msg.from_id == from_id) ) {m_send_msg_list.erase(it);break;}}
}

至此,消息就算发送成功了。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • spring入门(一)spring简介
  • debian系统安装mysql
  • taro ui 小程序at-calendar日历组件自定义样式+选择范围日历崩溃处理
  • 阿里龙晰系统上将yum安装的myql_8.0.36升级到mysql_8.4.0的过程
  • 279.完全平方数
  • 【Qt 事件】—— 详解Qt事件处理
  • 代码随想录Day 31|leetcode题目:56.合并区间、738.单调递增的数字、968.监控二叉树
  • 【网络原理】从0开始学习计算机网络常识,中学生看了都能学会
  • 倒计时1天!每日一题,零基础入门FPGA
  • 【时间盒子】-【2.准备】HarmonyOS 开发前需要准备什么?
  • Mysql8 主从复制主从切换(超详细)
  • 如何在 CentOS 6 上安装 Nagios
  • 面试(九)
  • 今日算法:蓝桥杯基础题之“星期一”
  • 【Nest 学习笔记】AOP切片编程
  • 【跃迁之路】【669天】程序员高效学习方法论探索系列(实验阶段426-2018.12.13)...
  • CentOS 7 修改主机名
  • CSS3 变换
  • iOS动画编程-View动画[ 1 ] 基础View动画
  • Java 网络编程(2):UDP 的使用
  • java概述
  • Python打包系统简单入门
  • TypeScript迭代器
  • 创建一种深思熟虑的文化
  • 官方新出的 Kotlin 扩展库 KTX,到底帮你干了什么?
  • 基于Javascript, Springboot的管理系统报表查询页面代码设计
  • 基于游标的分页接口实现
  • 世界编程语言排行榜2008年06月(ActionScript 挺进20强)
  • 微信支付JSAPI,实测!终极方案
  • 小程序开发中的那些坑
  • 用jquery写贪吃蛇
  • ​Distil-Whisper:比Whisper快6倍,体积小50%的语音识别模型
  • ​卜东波研究员:高观点下的少儿计算思维
  • ## 1.3.Git命令
  • ###C语言程序设计-----C语言学习(6)#
  • ###STL(标准模板库)
  • #免费 苹果M系芯片Macbook电脑MacOS使用Bash脚本写入(读写)NTFS硬盘教程
  • $nextTick的使用场景介绍
  • %@ page import=%的用法
  • (04)odoo视图操作
  • (1)(1.9) MSP (version 4.2)
  • (1)Map集合 (2)异常机制 (3)File类 (4)I/O流
  • (2024)docker-compose实战 (9)部署多项目环境(LAMP+react+vue+redis+mysql+nginx)
  • (2024,Vision-LSTM,ViL,xLSTM,ViT,ViM,双向扫描)xLSTM 作为通用视觉骨干
  • (5)STL算法之复制
  • (C语言)输入一个序列,判断是否为奇偶交叉数
  • (k8s中)docker netty OOM问题记录
  • (Python) SOAP Web Service (HTTP POST)
  • (附源码)php新闻发布平台 毕业设计 141646
  • (附源码)springboot高校宿舍交电费系统 毕业设计031552
  • (三)elasticsearch 源码之启动流程分析
  • (推荐)叮当——中文语音对话机器人
  • (一)模式识别——基于SVM的道路分割实验(附资源)
  • (一)认识微服务
  • (原創) 如何解决make kernel时『clock skew detected』的warning? (OS) (Linux)