当前位置: 首页 > news >正文

mediasoup-cluster横向扩容机制

基于mediasoup实现类似RTMP-CDN的回源机制,从而实现cluster-node的横向扩容。
mediasoup worker实现了PipeTransport机制。
pipeTransport代表了一个RTP通信对,可以实现同一台机器或者多台机器之间的通信。
A pipe transport represents a network path through which RTP, RTCP (optionally secured with SRTP) and SCTP (DataChannel) is transmitted. 
Pipe transports are intented to intercommunicate two Router instances collocated on the same host or on separate hosts.
基于这个机制,可以实现从源节点pipe video到多个不同edge节点的功能。

源节点作为socket server工作,等候edgeNode连接请求。
function OriginSvrListener (ioSvr) {
  ioSvr.on('connection', (socket) => {
    socket.routerIds = new Map()
    viewerEndpoints.set(socket.id, socket)

    roomList.forEach(async (room) => {
      const roomId = room.id
      //向edgenode请求用作pipe的routerId。
      const { remoteRouterId } = await socket.emit('getRemoteRouterInfo', { roomId })
      //建立本地room与多个edgenode的映射表
      socket.routerIds.set(roomId, remoteRouterId)
      
      //获得本地producers,然后pipe to remote router.
      const producers = room.getProducers()
      producers.forEach(({ producer, router }) => {
        room.pipeToRemoteRouter({ router, remoteRouterId, roomId, producer, socket })
      })
    })
  })
}

边缘节点作为socket client工作。
async function edgeSocketClient (params = { channelName: undefined, arrBroadcasterAddr: [] }) {

    let broadcasterAddr = arrBroadcasterAddr[0] + config.NAMESPACE
    const edgeSocket = ioClient.connect(broadcasterAddr, { transports: ['websocket'] })

    edgeSocket.on('createRemotePipeTransport', async ({ roomId, routerId }, callback) => {
        const params = await room.createRemotePipeTransport({ routerId })
    })
    
    edgeSocket.on('connectRemotePipeTransport', async ({ roomId, routerId, ip, port, srtpParameters }) => {
        const remotePipeTransport = room.pipe.transports.get(routerId)
        await remotePipeTransport.connect({ ip, port, srtpParameters })
    })
    
    //响应源节点要求创建pipeProducer
    edgeSocket.on('createPipeProducer', async ({ roomId, routerId, id, kind, rtpParameters, paused, appData }) => {
        await room.createPipeProducer({ routerId, id, kind, rtpParameters, paused, appData, edgeSocket })
    })

    //响应源节点要求创建room
    edgeSocket.on('getRemoteRouterInfo', async ({ roomId }, callback = doNothing) => {
      let room = roomList.get(roomId)
      try {
        if (!room) {
          room = await createRoom({ roomId })
        }
        response.remoteRouterId = room.getPrimaryRouterId()
        callback(response)
      } catch (error) {
        console.error(error.stack || error.toString())
        response.error = error
        callback(response)
      }
    })
}

mediasoup提供关键函数。//注释里的本地指originNode, 对端指edgeNode。
  async pipeToRemoteRouter ({ socket, router, remoteRouterId, roomId, producer }) {
    
    try {
      pipeTransportPairPromise = new Promise((resolve, reject) => {
        Promise.all([
        //本地创建pipeTransport
        router.createPipeTransport({...config.mediasoup.pipeTransport, listenIp:config.GetWebrtcListenIP()}),
        //通知对端创建pipeTransport
        socket.promisedEmit('createRemotePipeTransport', {roomId, routerId: router.id})  ])
          .then((pipeTransports) => {
            localPipeTransport = pipeTransports[0]
            remotePipeTransport = pipeTransports[1]
          })
          .then(() => {
            return Promise.all([
              //本地pipeTransport进行连接
              localPipeTransport.connect({
                ip: remotePipeTransport.tuple.localIp,
                port: remotePipeTransport.tuple.localPort,
                srtpParameters: remotePipeTransport.srtpParameters
              }),
              //通知对端进行pipeTransport连接。
              socket.emit('connectRemotePipeTransport', {
                roomId,
                routerId: router.id,
                ip: localPipeTransport.tuple.localIp,
                port: localPipeTransport.tuple.localPort,
                srtpParameters: localPipeTransport.srtpParameters
              })
            ]).catch((error) => { throw error })
          })
        })
      }

      let pipeConsumer
      try {
        //pipeConsumer的消费对象是本地的producer。
        pipeConsumer = await localPipeTransport.consume({
          producerId: producer.id
        })
        //向对端发出创建pipeProducer的event。edgeNode作为producer工作。pipe传输时,consumer与producer角色是反的。
        socket.emit('createPipeProducer', {
          roomId,
          routerId: router.id,
          id: producer.id,
          kind: pipeConsumer.kind,
          rtpParameters: pipeConsumer.rtpParameters,
          paused: pipeConsumer.producerPaused,
          appData: producer.appData
        })
        return { pipeConsumer }
      } catch (error) {
        throw error
      }
    } catch (error) {
      console.error(error.stack || error.toString())
    }
  }

测试过程:
[17:20:08.077] [DEBUG] [pid:2385785}] [server/app.js:1479] - edge-EP received event: getRemoteRouterInfo
[17:20:08.079] [DEBUG] [pid:2385785}] [server/app.js:1404] - edge-EP received event: createRemotePipeTransport
[17:20:08.080] [DEBUG] [pid:2385785}] [server/app.js:1419] - edge-EP received event: connectRemotePipeTransport
[17:20:08.081] [DEBUG] [pid:2385785}] [server/app.js:1424] - remotePipeTransport connected {"room":"ch-11","broadcaster":{"ip":"122.248.233.61","port":10304}}
[17:20:08.082] [DEBUG] [pid:2385785}] [server/app.js:1448] - edge-EP received event: createPipeProducer
[17:20:08.083] [INFO] [pid:2385785}] [server/Room.js:629] - {"event":"createPipeProducer","name":"xiaoming","room":"ch-11","kind":"audio","codec":{"mimeType":"audio/opus","clockRate":48000,"channels":2,"payloadType":100,"parameters":{"minptime":10,"sprop-stereo":1,"usedtx":1,"useinbandfec":1}}}
[17:20:08.083] [DEBUG] [pid:2385785}] [server/Room.js:648] - pipeProducer created. broadcasting newProducers to ms-worker thread.
[17:20:08.130] [DEBUG] [pid:2385785}] [server/app.js:1479] - edge-EP received event: getRemoteRouterInfo
[17:20:08.132] [DEBUG] [pid:2385785}] [server/app.js:1448] - edge-EP received event: createPipeProducer
[17:20:08.132] [INFO] [pid:2385785}] [server/Room.js:629] - {"event":"createPipeProducer","name":"Greg","room":"ch-11","kind":"video","codec":{"mimeType":"video/h264","clockRate":90000,"payloadType":105,"parameters":{"packetizationMode":1,"profileLevelId":"42e01f"}}}

参考文档:https://mediasoup.org/documentation/v3/mediasoup/api/#PipeTransportOptions
 

相关文章:

  • mac flutter pb解析报错:protoc-gen-dart: program not found or is not executable
  • 蓝桥杯官网练习题(正则问题)
  • openGauss学习笔记-114 openGauss 数据库管理-设置安全策略-设置帐号有效期
  • gcc: __linux__
  • [SSD综述 1.4] SSD固态硬盘的架构和功能导论
  • Julia文件读写函数:write和read
  • 无mac电脑获取app的公钥的方法
  • IOC容器中的Bean是线程安全的吗?
  • 【jvm】虚拟机栈
  • 好物周刊#29:项目管理软件
  • vector类模拟实现(c++)(学习笔记)
  • 【C语言】【数据结构】【顺序表】
  • 二维码智慧门牌管理系统升级:一键报错解决三大问题
  • 电源管理(PMIC)MAX20428ATIA/VY、MAX20428ATIC/VY、MAX20428ATIE/VY适合汽车ADAS应用的开关稳压器
  • ORM-1 字段默认值
  • [原]深入对比数据科学工具箱:Python和R 非结构化数据的结构化
  • java正则表式的使用
  • opencv python Meanshift 和 Camshift
  • React-生命周期杂记
  • UMLCHINA 首席专家潘加宇鼎力推荐
  • vue.js框架原理浅析
  • 听说你叫Java(二)–Servlet请求
  • 学习笔记DL002:AI、机器学习、表示学习、深度学习,第一次大衰退
  • 阿里云ACE认证之理解CDN技术
  • ​软考-高级-信息系统项目管理师教程 第四版【第14章-项目沟通管理-思维导图】​
  • $Django python中使用redis, django中使用(封装了),redis开启事务(管道)
  • (2022 CVPR) Unbiased Teacher v2
  • (done) ROC曲线 和 AUC值 分别是什么?
  • (M)unity2D敌人的创建、人物属性设置,遇敌掉血
  • (分类)KNN算法- 参数调优
  • (附源码)ssm高校实验室 毕业设计 800008
  • (译) 函数式 JS #1:简介
  • (原創) 如何優化ThinkPad X61開機速度? (NB) (ThinkPad) (X61) (OS) (Windows)
  • (转)mysql使用Navicat 导出和导入数据库
  • (转)ObjectiveC 深浅拷贝学习
  • (转)大型网站架构演变和知识体系
  • .NET “底层”异步编程模式——异步编程模型(Asynchronous Programming Model,APM)...
  • .NET 6 在已知拓扑路径的情况下使用 Dijkstra,A*算法搜索最短路径
  • .NET 8.0 发布到 IIS
  • .Net CF下精确的计时器
  • .NET WebClient 类下载部分文件会错误?可能是解压缩的锅
  • .NET/C# 使用 SpanT 为字符串处理提升性能
  • @AutoConfigurationPackage的使用
  • @EventListener注解使用说明
  • @RequestMapping-占位符映射
  • []串口通信 零星笔记
  • [AIGC] 开源流程引擎哪个好,如何选型?
  • [APIO2012] 派遣 dispatching
  • [BZOJ2208][Jsoi2010]连通数
  • [LeetCode] Binary Tree Preorder Traversal 二叉树的先序遍历
  • [LeetCode]: 145: Binary Tree Postorder Traversal
  • [leetcode]56. Merge Intervals归并区间
  • [Linux]使用CentOS镜像与rpm来安装虚拟机软件
  • [MySQL]日期和时间函数
  • [p4] Uncheckout other user‘s file?