当前位置: 首页 > news >正文

go 实现websocket以及详细设计流程过程,确保通俗易懂

websocket简介:

WebSocket 是一种网络传输协议,可在单个 TCP 连接上进行全双工通信,位于 OSI 模型的应用层。WebSocket 协议在 2011 年由 IETF 标准化为 RFC 6455,后由 RFC 7936 补充规范。

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就可以创建持久性的连接,并进行双向数据传输。

理解各种协议和通信层、套接字的含义

IP:网络层协议;(高速公路)

TCP和UDP:传输层协议;(卡车)

HTTP:应用层协议;(货物)。HTTP(超文本传输协议)是建立在TCP协议之上的一种应用。HTTP连接最显著的特点是客户端发送的每次请求都需要服务器回送响应,在请求结束后,会主动释放连接。从建立连接到关闭连接的过程称为“一次连接”。

SOCKET:套接字,TCP/IP网络的API。(港口码头/车站)Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。socket是在应用层和传输层之间的一个抽象层,它把TCP/IP层复杂的操作抽象为几个简单的接口供应用层调用已实现进程在网络中通信。

Websocket:同HTTP一样也是应用层的协议,但是它是一种双向通信协议,是建立在TCP之上的,解决了服务器与客户端全双工通信的问题,包含两部分:一部分是“握手”,一部分是“数据传输”。握手成功后,数据就直接从 TCP 通道传输,与 HTTP 无关了。

*注:什么是单工、半双工、全工通信?

数据只能单向传送为单工;
数据能双向传送但不能同时双向传送称为半双工;
数据能够同时双向传送则称为全双工。

上面是简单的接受了websocket情况以及和其他协议的区别及联系,在做之前,还是要了解下这块,对后期实战有帮助。

websocket开源选择:

在go语言中,websocket组件比较多的,可以到go仓库搜索下:

今天以仓库使用最多一个开源框架(gorilla)进行实战落地,以及讲解整个过程细节

websocket (github.com/gorilla/websocket)

上面介绍完整体开源情况。

业务背景:

      1、数据实时推送到前端进行图形化显示

      2、报警数据需要实时推送各端,例如web、cliet、安卓、ios等等其他客户端

      我前几年一直从事的是java,可以看看我的播客,基本上与java有关,go也是最近两三周学习的,我其他播客有具体说明,因为公司业务需要,所有就简单的学了一下go语言,作为刚接触go不久的技术人员,如何面对新技术进行探索和落地的,大家可以跟着我的思路进行学习模仿,这样后期学习稍微比较快一些。

 实战:    

        1、在实战之前,我们先看看官网使用说明:

             

这是最简单的,方式,没有其他业务,我们如何进行最佳实现呢?还是要看官网:

传送门:websocket/examples/chat at main · gorilla/websocket · GitHub

具体源码不说了,但是里面有几个重要点已经出现了,也是我们要学习的思想,这也是为什么要看开源代码的原因,要学习他们的思想和编码技巧。

稍微解释下Hub结构体里的字段:

            

   // Registered clients. // 这是保存客户端连接的信息,map,从这里可以看出来,所有的客户端都要保存到这里,这时可以想到,后期保存到redis里,从这里进行扩展即可。clients map[*Client]bool// Inbound messages from the clients. // 这个就是广播数据了,但是demo了用了字节链,目的是了并行执行,提高效率,字节目的是接收所有情况的数据broadcast chan []byte// Register requests from the clients.
// 这个比较好理解了,是新客户端进行连接时触发的链条,为什么走链,也是为了并行执行,也就是异步执行register chan *Client// Unregister requests from clients. 
//这个就是取消注册,也就是关闭客户端连接unregister chan *Client

其实这几个字段已经把我们的框架整体搭建起来了,clients负责存储客户端,broadcast负责服务端发送给客户端数据的,register负责用来监听新建连接,unregister负责关闭客户端连接的

其实websocket也就是干这个事情的,例如websocket服务端收集所有客户端,然后根据需要进行发送消息给客户,然后就是关闭,大致流程就是这样的。

在看客户端怎么进行封装的:

这个就是针对上面的Hub进行组装结构体实例的,这里就不介绍了,本次实战也是根据这个来的,大家看懂这个基本上后续其他开源的websocket都没啥大问题。

接下来真正进行项目实战:

1、第一步创建websocket的客户端管理结构体:

// clientManager
// @Description: 客户端管理者
type clientManager struct {//客户端存储的地方,我这边是用map进行存储,这块可以放到redis上,也是可以的,根据情况扩展即可//这里可以使用map[string]*client,也是可以的,如果这样设计,方便后期进行匹配比较简单,直接匹配key即可,这种方式也是可以的,匹配客户端对象里的keyclients map[*client]bool//广播数据链,进行业务限制,如果没有业务限制,直接使用[]byte 比较通用;用chan进行异步处理broadcast chan model.BusinessDataWrapper//客户端注册链;用chan进行异步处理register chan *client//客户端关闭链;用chan进行异步处理unregister chan *client
}

结构体首字母小写,目的不用暴露出去,用于内部使用即可;每个字段不解释了,上面注释已经写好了,其实和官网是一样的。

2、客户端结构体:

// client
// @Description: 客户端信息
type client struct {//每个客户端连接后都要进行生成唯一key,因为业务场景需求,不同的用户或设备接受的数据要一一对应,后期这块要会做权限控制key string//客户端连接对象socket *websocket.Conn//数据发送链send chan []byte
}

3、编写启动方法,其实官网写的很像,稍微进行重构,更加符合当前项目

func (m *clientManager) start(callBackFunc func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool) {for {select {case client := <-m.register: //进行连接m.clients[client] = truemsg := "有一个新连接出现,已连接成功,客户端key:" + client.keylog.Info(ctx, msg)//发送数据,这块可以不用发送,等后续进行注释即可//m.send([]byte(msg), client)case client := <-m.unregister: //关闭连接,进行释放资源if _, ok := m.clients[client]; ok {close(client.send)delete(m.clients, client)msg := "客户端key:" + client.key + ",已关闭"log.Info(ctx, msg)//m.send([]byte(msg), client)}case businessDataWrapper := <-m.broadcast: //广播数据for client := range m.clients {//这里其实就是发送数据了,//进行转换成json字符串//businessDataJsonStr, _ := json.Marshal(businessDataWrapper)//broadCastSend(m, client, businessDataJsonStr)//进行回调,我这里面目的为了后期这块不在调整代码了,在启动时进行业务处理,方便后期扩展用的,如果毕竟简单,直接使用broadCastSend(m, client, businessDataJsonStr)进行推送信息callBackFunc(client, m, businessDataWrapper)}}}
}// 广播进行发送数据
func broadCastSend(manager *clientManager, client *client, businessDataJsonByte []byte) {select {case client.send <- businessDataJsonByte:default:fmt.Println("关闭连接了,,,,,")close(client.send)delete(manager.clients, client)}
}

4、某一个客户端群发给其他客户端,排除自己客户端

//发送数据,这快类似群发消息
func (m *clientManager) send(message []byte, ignore *client) {for client := range m.clients {//ignore,这是忽略本身客户端,因为这是群发消息,自己可以不用接收了if client != ignore {//将数据写入到通道链client.send <- message}}
}

5、推送数据,第四步是写入到发送通道里,还没真正发送,这块就是真正推送到客户端机制:

func (c *client) write(manager clientManager) {defer func() {manager.unregister <- cc.socket.Close()log.Info(ctx, c.key, "客户端进行关闭")}()for {select {//如果客户端有数据要进行写出去case message, ok := <-c.send:if !ok {c.socket.WriteMessage(websocket.CloseMessage, []byte{})log.Info(ctx, c.key, "发送关闭提示")return}//这里才是真正的把数据推送到客户端err := c.socket.WriteMessage(websocket.TextMessage, message)if err != nil {manager.unregister <- cc.socket.Close()log.Info(ctx, c.key, "数据写入失败,进行关闭!")break}}}
}

6、接收客户端发送的数据

func (c *client) read(manager clientManager) {defer func() {manager.unregister <- cc.socket.Close()log.Info(ctx, c.key, "客户端进行关闭")}()for {_, message, err := c.socket.ReadMessage()if err != nil {manager.unregister <- cc.socket.Close()log.Info(ctx, c.key, "读数据出现异常,直接关闭。")break}//后期可以注释掉log.Info(ctx, c.key, "接收到客户端发送的数据", string(message))//读到数据,进行业务操作,目前我这边项目只需要推送到客户端即可,所以暂时不做业务了,其他需要做业务,这里做个监听即可}
}

7、提供创建客户端管理函数

// 创建客户端管理器
func newClientManager() *clientManager {return &clientManager{//广播数据,model.BusinessDataWrapper是我具体业务数据,可以换成[]byte接收broadcast:  make(chan model.BusinessDataWrapper),register:   make(chan *client),unregister: make(chan *client),clients:    make(map[*client]bool),}
}

以上就是整体的封装处理,可以看到和官方的demo很像,只是结合了一些业务场景而已,其他的都一样的。

接下来进行和业务进行集成:

1、创建管理器,上面也说了有两个业务场景, 一个是原始数据推送 ,另一个是报警数据推送,所以创建两个管理器出来:

// 原始ws客户端管理器
var rowDataManagerNew = newClientManager()// 报警数据ws客户端管理器
var alarmDataManagerNew = newClientManager()

2、我们注册路由上,通过上面也能推断,需要指定两个路由路径

// 初始化websocket协议配置
var upgrader = websocket.Upgrader{ReadBufferSize:  1024,WriteBufferSize: 1024,CheckOrigin:     func(r *http.Request) bool { return true }, //允许跨域 、 允许同源
}// registerRawDataClientConn
//
//	@Author  zhaosy
//	@Description: 注册原始数据客户端连接
//	@date  2024-07-16 18:12:19
func registerRawDataClientConn(w http.ResponseWriter, r *http.Request, businessType string, businessId string, userName string) {if lang.IsEmpty(businessType) {io.WriteString(w, "businessType 不能为空")}//生成客户端conn, err := upgrader.Upgrade(w, r, nil)if err != nil {log.Error(ctx, err.Error(), err)io.WriteString(w, "这是一个websocket连接,不是API.")return}clientId := guid.S()
//这个key是我随机生成的一个key,包含了一些业务,大家根据需要进行设置,也可以随机生成就行,就是一行字符串,如果对key没有要求,其实key不用处理也行哈key := websocketRowDataCachePrefix(businessType, businessId, userName, clientId)//初始化客户端对象client := &client{key:    key,socket: conn,send:   make(chan []byte),}rowDataManagerNew.register <- client//开启读go client.read(*rowDataManagerNew)//开起写go client.write(*rowDataManagerNew)}// registerAlarmClient
//
//	@Author  zhaosy
//	@Description: 注册报警客户端连接
//	@date  2024-07-16 19:40:52
func registerAlarmClientConn(w http.ResponseWriter, r *http.Request) {//生成客户端conn, err := upgrader.Upgrade(w, r, nil)if err != nil {log.Error(ctx, err.Error(), err)io.WriteString(w, "这是一个websocket,不是API.")return}clientId := guid.S()
//这个key是我随机生成的一个key,包含了一些业务,大家根据需要进行设置,也可以随机生成就行,就是一行字符串,如果对key没有要求,其实key不用处理也行哈key := websocketAlarmCachePrefix(clientId)//初始化客户端对象client := &client{key:    key,socket: conn,send:   make(chan []byte),}alarmDataManagerNew.register <- client//开启读go client.read(*alarmDataManagerNew)//开启写go client.write(*alarmDataManagerNew)}

上面看到 

w http.ResponseWriter, r *http.Request 这两个参数应该就知道怎么做了吧,直接绑定到路由路由,也是官网那种方式

这是注册到go的http路由上了,后续通过path路径进行访问即可。

3、如何与我们的业务数据进行绑定?

还需要提供包函数出去

// SendRowDataBusinessData
//
//	@Author  zhaosy
//	@Description: 接收业务数据进行推送到websocket
//	@date  2024-07-16 18:19:46
func SendRowDataBusinessData(data model.BusinessData) {rowDataManagerNew.broadcast <- model.SetRowDataWsWrapper(data)
}// SendAlarmBusinessData
//
//	@Author  zhaosy
//	@Description: 接收报警数据,推送到websocket客户端,这是我项目的业务,大家换成string即可
//	@date  2024-07-16 19:42:13
func SendAlarmBusinessData(data model.AlarmBusinessData) {alarmDataManagerNew.broadcast <- model.SetAlarmWsWrapper(data)
}

仅供参考。

这里是业务数据推送websocket的入口:

4、最后一步是管理器要启动了,启动前,大家知道我写了 回调函数,要进行实现下,具体业务了,所以大家参考即可:

func init() {//启动原始数据websocketgo rowDataManagerNew.start(func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool {if consts.ZERO == businessDataWrapper.DataType { //原始数据,推送businessData := businessDataWrapper.BusinessDataif businessData.BusinessId == "" {//进行转换成json字符串businessDataJsonStr, _ := json.Marshal(businessData)//广播所有客户端broadCastSend(manager, c, businessDataJsonStr)return true}//找到对应客户端 --可以通过拼接缓存key进行匹配也是可以的if strings.Contains(c.key, websocketRowDataCachePrefix(businessData.BusinessType, businessData.BusinessId, "", "")) {//进行转换成json字符串businessDataJsonStr, _ := json.Marshal(businessData)//广播指定客户端broadCastSend(manager, c, businessDataJsonStr)return true}return false}return false})//启动报警数据推送go alarmDataManagerNew.start(func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool {// 报警数据,推送if consts.ONE == businessDataWrapper.DataType {alarmBusinessData := businessDataWrapper.AlarmBusinessDataif alarmBusinessData.BusinessId != "" {//找到对应客户端 --可以通过拼接缓存key进行匹配也是可以的if strings.Contains(c.key, websocketAlarmCachePrefix("")) {//进行转换成json字符串alarmBusinessDataJsonStr, _ := json.Marshal(alarmBusinessData)//广播指定客户端broadCastSend(manager, c, alarmBusinessDataJsonStr)return true}}}return false})
}

这样就可以了,启动整体就没问题了,我这边用的是goframe框架,所以我单独提供了这两个:

func NewWs() *webSocket {return &webSocket{}
}type webSocket struct {
}// RawDataWSHandle
//
//	@Author  zhaosy
//	@Description: 原始数据websocket
//	@date  2024-07-16 15:00:04
func (w *webSocket) RawDataWSHandle(r *ghttp.Request) {//获取参数businessType := r.Get("businessType")businessId := r.Get("businessId")userName := r.Get("userName")registerRawDataClientConn(r.Response.BufferWriter, r.Request, businessType.String(), businessId.String(), userName.String())
}// AlarmWSHandle
//
//	@Author  zhaosy
//	@Description: 报警websocket处理器
//	@date  2024-07-16 19:43:57
func (w *webSocket) AlarmWSHandle(r *ghttp.Request) {registerAlarmClientConn(r.Response.BufferWriter, r.Request)
}

在goframe里cmd里进行绑定:

	//websocket--原始数据websocket推送s.BindHandler("/ws/rowdata/{businessType}/{businessId}/{userName}", websocket.NewWs().RawDataWSHandle)//websocket--报警数据websocket推送s.BindHandler("/ws/alarm", websocket.NewWs().AlarmWSHandle)

5、进行测试:

启动正常,日志也输出来了,进行测试

上面是连接正常,

通过业务数据进行测试:

以上就是本次研究的结果,整体上go的websocket比较简单,后面有机会,会重新进行重构,重构单独封装可以随意使用。

发一个整的代码:

// Package websocket
// @Author zhaosy
// @Date 2024/7/16 下午2:52:00
// @Desc websocket相关
package websocketimport ("context""encoding/json""fmt""github.com/gogf/gf/v2/frame/g""github.com/gogf/gf/v2/net/ghttp""github.com/gogf/gf/v2/util/guid""github.com/gorilla/websocket""io""net/http""skynet/internal/consts""skynet/internal/model""skynet/utility/lang""strings"
)var (ctx = context.TODO()log = g.Log()
)func init() {//启动原始数据websocketgo rowDataManagerNew.start(func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool {if consts.ZERO == businessDataWrapper.DataType { //原始数据,推送businessData := businessDataWrapper.BusinessDataif businessData.BusinessId == "" {//进行转换成json字符串businessDataJsonStr, _ := json.Marshal(businessData)//广播所有客户端broadCastSend(manager, c, businessDataJsonStr)return true}//找到对应客户端 --可以通过拼接缓存key进行匹配也是可以的if strings.Contains(c.key, websocketRowDataCachePrefix(businessData.BusinessType, businessData.BusinessId, "", "")) {//进行转换成json字符串businessDataJsonStr, _ := json.Marshal(businessData)//广播指定客户端broadCastSend(manager, c, businessDataJsonStr)return true}return false}return false})//启动报警数据推送go alarmDataManagerNew.start(func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool {// 报警数据,推送if consts.ONE == businessDataWrapper.DataType {alarmBusinessData := businessDataWrapper.AlarmBusinessDataif alarmBusinessData.BusinessId != "" {//找到对应客户端 --可以通过拼接缓存key进行匹配也是可以的if strings.Contains(c.key, websocketAlarmCachePrefix("")) {//进行转换成json字符串alarmBusinessDataJsonStr, _ := json.Marshal(alarmBusinessData)//广播指定客户端broadCastSend(manager, c, alarmBusinessDataJsonStr)return true}}}return false})
}func NewWs() *webSocket {return &webSocket{}
}// websocketRowDataCachePrefix
//
//	@Author  zhaosy
//	@Description: 原始数据缓存前缀
//	@date  2024-07-16 18:15:55
func websocketRowDataCachePrefix(businessType, businessId, userName, clientId string) string {key := "websocket:rowdata"if lang.IsNotEmpty(businessType) {key = key + ":" + businessTypeif lang.IsNotEmpty(businessId) {key = key + ":" + businessIdif lang.IsNotEmpty(userName) {key = key + ":" + userName}}}if lang.IsNotEmpty(clientId) {key = key + ":" + clientId}return key
}// websocketAlarmCachePrefix
//
//	@Author  zhaosy
//	@Description: 报警数据前缀
//	@date  2024-07-16 19:36:24
func websocketAlarmCachePrefix(clientId string) string {//后期要加组织机构,有权限控制这块,需要进行处理,暂时先不去处理key := "websocket:alarm"if lang.IsNotEmpty(clientId) {key = key + ":" + clientId}return key
}type webSocket struct {
}// RawDataWSHandle
//
//	@Author  zhaosy
//	@Description: 原始数据websocket
//	@date  2024-07-16 15:00:04
func (w *webSocket) RawDataWSHandle(r *ghttp.Request) {//获取参数businessType := r.Get("businessType")businessId := r.Get("businessId")userName := r.Get("userName")registerRawDataClientConn(r.Response.BufferWriter, r.Request, businessType.String(), businessId.String(), userName.String())
}// AlarmWSHandle
//
//	@Author  zhaosy
//	@Description: 报警websocket处理器
//	@date  2024-07-16 19:43:57
func (w *webSocket) AlarmWSHandle(r *ghttp.Request) {registerAlarmClientConn(r.Response.BufferWriter, r.Request)
}// SendRowDataBusinessData
//
//	@Author  zhaosy
//	@Description: 接收业务数据进行推送到websocket
//	@date  2024-07-16 18:19:46
func SendRowDataBusinessData(data model.BusinessData) {rowDataManagerNew.broadcast <- model.SetRowDataWsWrapper(data)
}// SendAlarmBusinessData
//
//	@Author  zhaosy
//	@Description: 接收报警数据,推送到websocket客户端
//	@date  2024-07-16 19:42:13
func SendAlarmBusinessData(data model.AlarmBusinessData) {alarmDataManagerNew.broadcast <- model.SetAlarmWsWrapper(data)
}// 初始化websocket协议配置
var upgrader = websocket.Upgrader{ReadBufferSize:  1024,WriteBufferSize: 1024,CheckOrigin:     func(r *http.Request) bool { return true }, //允许跨域 、 允许同源
}// registerRawDataClientConn
//
//	@Author  zhaosy
//	@Description: 注册原始数据客户端连接
//	@date  2024-07-16 18:12:19
func registerRawDataClientConn(w http.ResponseWriter, r *http.Request, businessType string, businessId string, userName string) {if lang.IsEmpty(businessType) {io.WriteString(w, "businessType 不能为空")}//生成客户端conn, err := upgrader.Upgrade(w, r, nil)if err != nil {log.Error(ctx, err.Error(), err)io.WriteString(w, "这是一个websocket连接,不是API.")return}clientId := guid.S()key := websocketRowDataCachePrefix(businessType, businessId, userName, clientId)//初始化客户端对象client := &client{key:    key,socket: conn,send:   make(chan []byte),}rowDataManagerNew.register <- client//开启读go client.read(*rowDataManagerNew)//开起写go client.write(*rowDataManagerNew)}// registerAlarmClient
//
//	@Author  zhaosy
//	@Description: 注册报警客户端连接
//	@date  2024-07-16 19:40:52
func registerAlarmClientConn(w http.ResponseWriter, r *http.Request) {//生成客户端conn, err := upgrader.Upgrade(w, r, nil)if err != nil {log.Error(ctx, err.Error(), err)io.WriteString(w, "这是一个websocket,不是网站.")return}clientId := guid.S()key := websocketAlarmCachePrefix(clientId)//初始化客户端对象client := &client{key:    key,socket: conn,send:   make(chan []byte),}alarmDataManagerNew.register <- client//开启读go client.read(*alarmDataManagerNew)//开启写go client.write(*alarmDataManagerNew)}// 原始ws客户端管理器
var rowDataManagerNew = newClientManager()// 报警数据ws客户端管理器
var alarmDataManagerNew = newClientManager()// **********************************以下是websocket进行封装,可以直接使用******************************************// 创建客户端管理器
func newClientManager() *clientManager {return &clientManager{broadcast:  make(chan model.BusinessDataWrapper),register:   make(chan *client),unregister: make(chan *client),clients:    make(map[*client]bool),}
}// clientManager
// @Description: 客户端管理者
type clientManager struct {//客户端存储的地方,我这边是用map进行存储,这块可以放到redis上,也是可以的,根据情况扩展即可//这里可以使用map[string]*client,也是可以的,如果这样设计,方便后期进行匹配比较简单,直接匹配key即可,这种方式也是可以的,匹配客户端对象里的keyclients map[*client]bool//广播数据链,进行业务限制,如果没有业务限制,直接使用[]byte 比较通用;用chan进行异步处理broadcast chan model.BusinessDataWrapper//客户端注册链;用chan进行异步处理register chan *client//客户端关闭链;用chan进行异步处理unregister chan *client
}// client
// @Description: 客户端信息
type client struct {//每个客户端连接后都要进行生成唯一key,因为业务场景需求,不同的用户或设备接受的数据要一一对应,后期这块要会做权限控制key string//客户端连接对象socket *websocket.Conn//数据发送链send chan []byte
}// start
//
//	@Author  zhaosy
//	@Description: websocket启动
//	@date  2024-07-17 10:55:28
func (m *clientManager) start(callBackFunc func(c *client, manager *clientManager, businessDataWrapper model.BusinessDataWrapper) bool) {for {select {case client := <-m.register: //进行连接m.clients[client] = truemsg := "有一个新连接出现,已连接成功,客户端key:" + client.keylog.Info(ctx, msg)//发送数据,这块可以不用发送,等后续进行注释即可//m.send([]byte(msg), client)case client := <-m.unregister: //关闭连接,进行释放资源if _, ok := m.clients[client]; ok {close(client.send)delete(m.clients, client)msg := "客户端key:" + client.key + ",已关闭"log.Info(ctx, msg)//m.send([]byte(msg), client)}case businessDataWrapper := <-m.broadcast: //广播数据for client := range m.clients {//这里其实就是发送数据了,//进行转换成json字符串//businessDataJsonStr, _ := json.Marshal(businessDataWrapper)//broadCastSend(m, client, businessDataJsonStr)//进行回调,我这里面目的为了后期这块不在调整代码了,在启动时进行业务处理,方便后期扩展用的,如果毕竟简单,直接使用broadCastSend(m, client, businessDataJsonStr)进行推送信息callBackFunc(client, m, businessDataWrapper)}}}
}// 广播进行发送数据
func broadCastSend(manager *clientManager, client *client, businessDataJsonByte []byte) {select {case client.send <- businessDataJsonByte:default:fmt.Println("关闭连接了,,,,,")close(client.send)delete(manager.clients, client)}
}// send
//
//	@Author  zhaosy
//	@Description: 这快类似群发消息
//	@date  2024-07-16 16:40:37
func (m *clientManager) send(message []byte, ignore *client) {for client := range m.clients {//ignore,这是忽略本身客户端,因为这是群发消息,自己可以不用接收了if client != ignore {//将数据写入到通道链client.send <- message}}
}func (c *client) read(manager clientManager) {defer func() {manager.unregister <- cc.socket.Close()log.Info(ctx, c.key, "客户端进行关闭")}()for {_, message, err := c.socket.ReadMessage()if err != nil {manager.unregister <- cc.socket.Close()log.Info(ctx, c.key, "读数据出现异常,直接关闭。")break}//后期可以注释掉log.Info(ctx, c.key, "接收到客户端发送的数据", string(message))//读到数据,进行业务操作,目前我这边项目只需要推送到客户端即可,所以暂时不做业务了,其他需要做业务,这里做个监听即可}
}// write
//
//	@Author  zhaosy
//	@Description: 写入数据
//	@date  2024-07-16 16:52:47
func (c *client) write(manager clientManager) {defer func() {manager.unregister <- cc.socket.Close()log.Info(ctx, c.key, "客户端进行关闭")}()for {select {//如果客户端有数据要进行写出去case message, ok := <-c.send:if !ok {c.socket.WriteMessage(websocket.CloseMessage, []byte{})log.Info(ctx, c.key, "发送关闭提示")return}//这里才是真正的把数据推送到客户端err := c.socket.WriteMessage(websocket.TextMessage, message)if err != nil {manager.unregister <- cc.socket.Close()log.Info(ctx, c.key, "数据写入失败,进行关闭!")break}}}
}

ok。结束

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 谷粒商城实战笔记-37-前端基础-Vue-基本语法插件安装
  • 【阿里OSS文件上传】SpringBoot实现阿里OSS对象上传
  • 【Vite】快速入门及其配置
  • WPF中UI元素继承关系
  • python os库使用教程
  • 【源码阅读】Sony的go breaker熔断器源码探究
  • C2W1.Assignment.Parts-of-Speech Tagging (POS).Part2
  • 算法日记day 15(二叉树的遍历)
  • 【Qt】QWidget核心属性相关API
  • 新版网页无插件H.265播放器EasyPlayer.js如何测试demo视频?
  • 深入浅出mediasoup—通信框架
  • 【BUG】已解决:ModuleNotFoundError: No module named ‘paddle‘
  • 【云原生】Kubernetes微服务Istio:介绍、原理、应用及实战案例
  • uniapp vue3 上传视频组件封装
  • 【ffmpeg命令】RTMP推流
  • 【技术性】Search知识
  • Bytom交易说明(账户管理模式)
  • express.js的介绍及使用
  • iOS高仿微信项目、阴影圆角渐变色效果、卡片动画、波浪动画、路由框架等源码...
  • Spring技术内幕笔记(2):Spring MVC 与 Web
  • vue+element后台管理系统,从后端获取路由表,并正常渲染
  • 阿里云前端周刊 - 第 26 期
  • 阿里云容器服务区块链解决方案全新升级 支持Hyperledger Fabric v1.1
  • 程序员该如何有效的找工作?
  • 好的网址,关于.net 4.0 ,vs 2010
  • 基于axios的vue插件,让http请求更简单
  • 前端性能优化--懒加载和预加载
  • 前端学习笔记之观察者模式
  • 什么是Javascript函数节流?
  • 使用 @font-face
  • 用jQuery怎么做到前后端分离
  • const的用法,特别是用在函数前面与后面的区别
  • TPG领衔财团投资轻奢珠宝品牌APM Monaco
  • # linux 中使用 visudo 命令,怎么保存退出?
  • #我与Java虚拟机的故事#连载04:一本让自己没面子的书
  • (03)光刻——半导体电路的绘制
  • (2)空速传感器
  • (C语言)输入一个序列,判断是否为奇偶交叉数
  • (delphi11最新学习资料) Object Pascal 学习笔记---第8章第5节(封闭类和Final方法)
  • (附源码)springboot宠物管理系统 毕业设计 121654
  • (三十五)大数据实战——Superset可视化平台搭建
  • (十二)python网络爬虫(理论+实战)——实战:使用BeautfulSoup解析baidu热搜新闻数据
  • (算法)Game
  • (学习日记)2024.03.25:UCOSIII第二十二节:系统启动流程详解
  • (一)基于IDEA的JAVA基础12
  • (一)项目实践-利用Appdesigner制作目标跟踪仿真软件
  • *1 计算机基础和操作系统基础及几大协议
  • .java 指数平滑_转载:二次指数平滑法求预测值的Java代码
  • .NET Core 成都线下面基会拉开序幕
  • .net core 实现redis分片_基于 Redis 的分布式任务调度框架 earth-frost
  • .NET HttpWebRequest、WebClient、HttpClient
  • .net MySql
  • .Net程序帮助文档制作
  • .net程序集学习心得
  • @SpringBootApplication 包含的三个注解及其含义