当前位置: 首页 > news >正文

TeamTalk数据库代理服务器

文章目录

  • main函数主流程
    • 关键步骤
    • 线程池
    • redis缓存
      • 未读消息计数
        • 未读消息计数-单聊
        • 未读消息计数-群聊
      • 群成员管理

main函数主流程

关键步骤

  1. 初始化epoll + 线程池
  2. 数据入口 reactor CProxyConn::HandlePduBuf
  3. 异步task任务封装,把任务放入线程池;线程池里的线程异步执行任务,然后放入回复列表 CProxyConn::AddResponsePdu()
  4. epoll 主线程读取回复列表的数据发送给请求端 CProxyConn::SendResponsePduList()

db_proxy_server 处理逻辑与其他server不同的是 epoll(主线程)解析 pdu,然后封装成task任务,再给到线程池。

void CProxyConn::HandlePduBuf(uchar_t* pdu_buf, uint32_t pdu_len)
{CImPdu* pPdu = NULL;pPdu = CImPdu::ReadPdu(pdu_buf, pdu_len);if (pPdu->GetCommandId() == IM::BaseDefine::CID_OTHER_HEARTBEAT) {return;}pdu_handler_t handler = s_handler_map->GetHandler(pPdu->GetCommandId());if (handler) {CTask* pTask = new CProxyTask(m_uuid, handler, pPdu);g_thread_pool.AddTask(pTask);} else {log("no handler for packet type: %d", pPdu->GetCommandId());}
}// 工作线程调用
void CProxyConn::AddResponsePdu(uint32_t conn_uuid, CImPdu* pPdu)
{ResponsePdu_t* pResp = new ResponsePdu_t;pResp->conn_uuid = conn_uuid;pResp->pPdu = pPdu;s_list_lock.lock();s_response_pdu_list.push_back(pResp);s_list_lock.unlock();
}void proxy_loop_callback(void* callback_data, uint8_t msg, uint32_t handle, void* pParam)
{CProxyConn::SendResponsePduList(); // epoll 主线程调用
}// 主线程调用
void CProxyConn::SendResponsePduList()
{s_list_lock.lock();while (!s_response_pdu_list.empty()) {ResponsePdu_t* pResp = s_response_pdu_list.front();s_response_pdu_list.pop_front();s_list_lock.unlock();CProxyConn* pConn = get_proxy_conn_by_uuid(pResp->conn_uuid);if (pConn) {if (pResp->pPdu) {pConn->SendPdu(pResp->pPdu);} else {log("close connection uuid=%d by parse pdu error\b", pResp->conn_uuid);pConn->Close();}}if (pResp->pPdu)delete pResp->pPdu;delete pResp;s_list_lock.lock();}s_list_lock.unlock();
}

m_uuid 作用和 socketfd 一致,但是不用 socketfd 是避免在线重连导致 socketfd 复用。
或者说由于处理请求和发送回复在两个线程,socket的handle可能重用(怎么理解???),所以需要用一个一直增加的uuid来表示一个连接

线程池


int init_proxy_conn(uint32_t thread_num)
{s_handler_map = CHandlerMap::getInstance(); // 根据CmdID找到处理函数g_thread_pool.Init(thread_num);netlib_add_loop(proxy_loop_callback, NULL); // 回发数据signal(SIGTERM, sig_handler);return netlib_register_timer(proxy_timer_callback, NULL, 1000);
}void proxy_loop_callback(void* callback_data, uint8_t msg, uint32_t handle, void* pParam)
{CProxyConn::SendResponsePduList(); // epoll 主线程调用
}int CThreadPool::Init(uint32_t worker_size)
{m_worker_size = worker_size;m_worker_list = new CWorkerThread [m_worker_size];if (!m_worker_list) {return 1;}for (uint32_t i = 0; i < m_worker_size; i++) {m_worker_list[i].SetThreadIdx(i);m_worker_list[i].Start();}return 0;
}

redis缓存

  1. 连接数量如何设置?同步连接池,应该测试为准,经验redis cpu核数2;mysql核数4(不理解???)
  2. 为什么分开不同的db?便于水平扩展。
  3. pool_name的意义?做抽象,不必关注redis是否分机。
class CacheManager {
public:virtual ~CacheManager();static CacheManager* getInstance();int Init();CacheConn* GetCacheConn(const char* pool_name);void RelCacheConn(CacheConn* pCacheConn);
private:CacheManager();private:static CacheManager* 	s_cache_manager;map<string, CachePool*>	m_cache_pool_map; // 缓存连接池
};

未读消息计数

存储在unread连接池所在的redis数据库

  1. 单聊的消息ID设计:
    key设计为"msg_id_" + nRelateId
    函数:nMsgId = pMsgModel->getMsgId(nRelateId);
    nRelateId 来自于mysql的自增id,取决于两个人之间的映射
  2. 群聊的消息ID设计:
    key设计为 “group_msg_id_” + nGroupId
    函数:nMsgId = pGroupMsgModel->getMsgId(nToId);
未读消息计数-单聊

当发送消息时调用
pMsgModel->sendMessage(nRelateId, nFromId, nToId, nMsgType, nCreateTime, nMsgId, (string&)msg.msg_data());

else if(nMsgType== IM::BaseDefine::MSG_TYPE_SINGLE_TEXT) {if (nFromId != nToId) {nSessionId = CSessionModel::getInstance()->getSessionId(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);if (INVALID_VALUE == nSessionId) {nSessionId = CSessionModel::getInstance()->addSession(nFromId, nToId, IM::BaseDefine::SESSION_TYPE_SINGLE);}nPeerSessionId = CSessionModel::getInstance()->getSessionId(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE, false);if(INVALID_VALUE ==  nPeerSessionId){nSessionId = CSessionModel::getInstance()->addSession(nToId, nFromId, IM::BaseDefine::SESSION_TYPE_SINGLE);}uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nFromId, nToId, true);if(nSessionId != INVALID_VALUE && nRelateId != INVALID_VALUE){nMsgId = pMsgModel->getMsgId(nRelateId);if(nMsgId != INVALID_VALUE){pMsgModel->sendMessage(nRelateId, nFromId, nToId, nMsgType, nCreateTime, nMsgId, (string&)msg.msg_data());CSessionModel::getInstance()->updateSession(nSessionId, nNow);CSessionModel::getInstance()->updateSession(nPeerSessionId, nNow);}else{log("msgId is invalid. fromId=%u, toId=%u, nRelateId=%u, nSessionId=%u, nMsgType=%u", nFromId, nToId, nRelateId, nSessionId, nMsgType);}}else{log("sessionId or relateId is invalid. fromId=%u, toId=%u, nRelateId=%u, nSessionId=%u, nMsgType=%u", nFromId, nToId, nRelateId, nSessionId, nMsgType);}}bool CMessageModel::sendMessage(uint32_t nRelateId, uint32_t nFromId, uint32_t nToId, IM::BaseDefine::MsgType nMsgType, uint32_t nCreateTime, uint32_t nMsgId, string& strMsgContent)
{bool bRet =false;if (nFromId == 0 || nToId == 0) {log("invalied userId.%u->%u", nFromId, nToId);return bRet;}CDBManager* pDBManager = CDBManager::getInstance();CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_master");if (pDBConn){string strTableName = "IMMessage_" + int2string(nRelateId % 8);string strSql = "insert into " + strTableName + " (`relateId`, `fromId`, `toId`, `msgId`, `content`, `status`, `type`, `created`, `updated`) values(?, ?, ?, ?, ?, ?, ?, ?, ?)";// 必须在释放连接前delete CPrepareStatement对象,否则有可能多个线程操作mysql对象,会crashCPrepareStatement* pStmt = new CPrepareStatement();if (pStmt->Init(pDBConn->GetMysql(), strSql)){uint32_t nStatus = 0;uint32_t nType = nMsgType;uint32_t index = 0;pStmt->SetParam(index++, nRelateId);pStmt->SetParam(index++, nFromId);pStmt->SetParam(index++, nToId);pStmt->SetParam(index++, nMsgId);pStmt->SetParam(index++, strMsgContent);pStmt->SetParam(index++, nStatus);pStmt->SetParam(index++, nType);pStmt->SetParam(index++, nCreateTime);pStmt->SetParam(index++, nCreateTime);bRet = pStmt->ExecuteUpdate();}delete pStmt;pDBManager->RelDBConn(pDBConn);if (bRet){uint32_t nNow = (uint32_t) time(NULL);incMsgCount(nFromId, nToId);}else{log("insert message failed: %s", strSql.c_str());}}else{log("no db connection for teamtalk_master");}return bRet;
}void CMessageModel::incMsgCount(uint32_t nFromId, uint32_t nToId)
{CacheManager* pCacheManager = CacheManager::getInstance();// increase message countCacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn) {pCacheConn->hincrBy("unread_" + int2string(nToId), int2string(nFromId), 1);pCacheManager->RelCacheConn(pCacheConn);} else {log("no cache connection to increase unread count: %d->%d", nFromId, nToId);}
}long CacheConn::hincrBy(string key, string field, long value)
{if (Init()) {return -1;}redisReply* reply = (redisReply *)redisCommand(m_pContext, "HINCRBY %s %s %ld", key.c_str(), field.c_str(), value);if (!reply) {log("redisCommand failed:%s", m_pContext->errstr);redisFree(m_pContext);m_pContext = NULL;return -1;}long ret_value = reply->integer;freeReplyObject(reply);return ret_value;
}

未读消息计数的key设计:“unread_” + int2string(nToId)
使用一个hash存储同一个user_id对应不同聊天的未读消息数量
请添加图片描述

未读消息计数-群聊

思考:如果群聊和单聊设计类似(群收到消息后,对每个群成员聊天数量+1),会有什么问题?群人多的话,效率低。

bool bRet = pStmt->ExecuteUpdate();
if (bRet)
{CGroupModel::getInstance()->updateGroupChat(nGroupId);incMessageCount(nFromId, nGroupId);clearMessageCount(nFromId, nGroupId);
} else {log("insert message failed: %s", strSql.c_str());
}#define     GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX    "_im_group_msg"
#define     GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX     "_im_user_group"
#define     GROUP_COUNTER_SUBKEY_COUNTER_FIELD          "count"bool CGroupMessageModel::incMessageCount(uint32_t nUserId, uint32_t nGroupId)
{bool bRet = false;CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn){string strGroupKey = int2string(nGroupId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;pCacheConn->hincrBy(strGroupKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD, 1);map<string, string> mapGroupCount;bool bRet = pCacheConn->hgetAll(strGroupKey, mapGroupCount);if(bRet){string strUserKey = int2string(nUserId) + "_" + int2string(nGroupId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;string strReply = pCacheConn->hmset(strUserKey, mapGroupCount);if(!strReply.empty()){bRet = true;}else{log("hmset %s failed !", strUserKey.c_str());}}else{log("hgetAll %s failed!", strGroupKey.c_str());}pCacheManager->RelCacheConn(pCacheConn);}else{log("no cache connection for unread");}return bRet;
}
void CGroupMessageModel::getUnreadMsgCount(uint32_t nUserId, uint32_t &nTotalCnt, list<IM::BaseDefine::UnreadInfo>& lsUnreadCount)
{list<uint32_t> lsGroupId;CGroupModel::getInstance()->getUserGroupIds(nUserId, lsGroupId, 0);uint32_t nCount = 0;CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn){for(auto it=lsGroupId.begin(); it!=lsGroupId.end(); ++it){uint32_t nGroupId = *it;string strGroupKey = int2string(nGroupId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;string strGroupCnt = pCacheConn->hget(strGroupKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD);if(strGroupCnt.empty()){
//                log("hget %s : count failed !", strGroupKey.c_str());continue;}uint32_t nGroupCnt = (uint32_t)(atoi(strGroupCnt.c_str()));string strUserKey = int2string(nUserId) + "_" + int2string(nGroupId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;string strUserCnt = pCacheConn->hget(strUserKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD);uint32_t nUserCnt = ( strUserCnt.empty() ? 0 : ((uint32_t)atoi(strUserCnt.c_str())) );if(nGroupCnt >= nUserCnt) {nCount = nGroupCnt - nUserCnt;}if(nCount > 0){IM::BaseDefine::UnreadInfo cUnreadInfo;cUnreadInfo.set_session_id(nGroupId);cUnreadInfo.set_session_type(IM::BaseDefine::SESSION_TYPE_GROUP);cUnreadInfo.set_unread_cnt(nCount);nTotalCnt += nCount;string strMsgData;uint32_t nMsgId;IM::BaseDefine::MsgType nType;uint32_t nFromId;getLastMsg(nGroupId, nMsgId, strMsgData, nType, nFromId);if(IM::BaseDefine::MsgType_IsValid(nType)){cUnreadInfo.set_latest_msg_id(nMsgId);cUnreadInfo.set_latest_msg_data(strMsgData);cUnreadInfo.set_latest_msg_type(nType);cUnreadInfo.set_latest_msg_from_user_id(nFromId);lsUnreadCount.push_back(cUnreadInfo);}else{log("invalid msgType. userId=%u, groupId=%u, msgType=%u, msgId=%u", nUserId, nGroupId, nType, nMsgId);}}}pCacheManager->RelCacheConn(pCacheConn);}else{log("no cache connection for unread");}
}

群未读消息计数:
1)一个群 Group_id 对应多个user_id
2) 同一个群 Group_id 对应多个user_id,不同的user_id对应的未读消息数量是不一样的
3)每次发消息时,群消息数量+1,发消息的个人计数也+1
4)未读消息数量 = 群消息数量 - 个人已读消息数量

群成员管理

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • AI问答-数据库:理解头表和行表
  • ModuleNotFoundError: No module named ‘keras.layers.core‘怎么解决
  • 【CSS】mask-image属性的详细介绍
  • Java中校验导入字段长度与数据库字段长度一致性
  • 图为科技基于昇腾AI,打造智慧工厂检测解决方案
  • 金蝶云星空查询SQL
  • 数据仓库理论知识
  • 2024/9/9 408“回头看”:b树
  • Spark-ShuffleWriter
  • 风中摇曳的小萝卜(机器学习)笔记 EM算法
  • AutoIT:强大的RPA自动化脚本神器,安装到使用的保姆级教程!
  • Matlab程序练习
  • 4千6历年高考英语试题大全ACCESS\EXCEL数据库
  • strncpy陷阱
  • 运维问题0002:SAP多模块问题-SAP系统程序在执行时,跳出“加急快件”窗口,提示:快件文档“更新已终止”从作者***收到
  • 【编码】-360实习笔试编程题(二)-2016.03.29
  • CSS3 变换
  • Docker容器管理
  • Java知识点总结(JDBC-连接步骤及CRUD)
  • Js实现点击查看全文(类似今日头条、知乎日报效果)
  • Promise初体验
  • Spring Security中异常上抛机制及对于转型处理的一些感悟
  • Vue源码解析(二)Vue的双向绑定讲解及实现
  • 百度贴吧爬虫node+vue baidu_tieba_crawler
  • 二维平面内的碰撞检测【一】
  • 规范化安全开发 KOA 手脚架
  • 湖南卫视:中国白领因网络偷菜成当代最寂寞的人?
  • 基于Mobx的多页面小程序的全局共享状态管理实践
  • 基于游标的分页接口实现
  • 经典排序算法及其 Java 实现
  • 那些年我们用过的显示性能指标
  • 前端面试题总结
  • 深入浅出Node.js
  • 什么是Javascript函数节流?
  • ​520就是要宠粉,你的心头书我买单
  • ​低代码平台的核心价值与优势
  • ​学习一下,什么是预包装食品?​
  • ​字​节​一​面​
  • # Swust 12th acm 邀请赛# [ E ] 01 String [题解]
  • (6)STL算法之转换
  • (C语言)fgets与fputs函数详解
  • (c语言版)滑动窗口 给定一个字符串,只包含字母和数字,按要求找出字符串中的最长(连续)子串的长度
  • (done) 两个矩阵 “相似” 是什么意思?
  • (env: Windows,mp,1.06.2308310; lib: 3.2.4) uniapp微信小程序
  • (python)数据结构---字典
  • (PyTorch)TCN和RNN/LSTM/GRU结合实现时间序列预测
  • (第27天)Oracle 数据泵转换分区表
  • (附程序)AD采集中的10种经典软件滤波程序优缺点分析
  • (七)Activiti-modeler中文支持
  • (十六)视图变换 正交投影 透视投影
  • (四)docker:为mysql和java jar运行环境创建同一网络,容器互联
  • (一)插入排序
  • (转)Google的Objective-C编码规范
  • .equals()到底是什么意思?
  • .gitignore文件忽略的内容不生效问题解决