当前位置: 首页 > news >正文

Golang TCP/IP服务器/客户端应用程序,设计一个简单可靠帧传送通信协议。(并且正确处理基于流式控制协议,带来的应用层沾帧[沾包]问题)

在 Golang 语言标准库之中提供了,对于TCP/IP链接、侦听器的高级封装支持,这易于上层开发人员轻松基于这些BCL(基础类库)实现期望的功能。

TCP/IP链接(客户端)

net.Conn 接口

TCP/IP侦听器(服务器)

net.Listener

Golang 提供了易用的写入数据到远程(对端)实现,而不比像 C/C++ 这类传统的编程语言,人们需要自行处理发送的字节数。

例如:

原生:send、WSASend、WSPSend 等函数

ASIO:stream::async_write_some    等函数

它与 Microsoft .NET 提供的 System.Net.Socket 类的发送函数功能是类似的,调用该函数发送数据,它会确保数据全部发送到对端(远端),否则视为失败。

在实际生产环境之中,绝大多数的场景上面,人们的确不需要调用一次发送函数,但不保证本次期望传送数据全部发送成功,而是潜在的可能只发送一部分,还需要开发人员自行处理,这样繁琐的TCP网络程序发送实现的。

但这在一些特定场景的网络程序上面是有意义的,例如我们需要知道已用掉了多少的流量,因为这一次缓冲区发送并没有全部传送到远端,但已经传送了一部分也生产了网络带宽资源的浪费,所以,像这种问题,Golang 不提供类似接口,它这块的不自由,是会有一些问题的。

较为庆幸的是:

net.Conn 接口提供的 Read 函数并非是保证一定读入期望BUF大小的,否则这个在很多类型的网络程序上面就很坑人了。

它就相当于传统阻塞的 recv,不会出现非阻塞的EAGIN要求开发人员重试的操作的问题,所有它只有返回已收到的字节数,或发生错误。

当然人们仍需处理一个特殊的情况,recv 可能返回FIN 0字节,但并非错误,这是因为对端正确的关闭了TCP链接时产生的。

但遇到类似这类型的场景还是用 C/C++、或者CGO调用原生C API来实现把,功能上面都可以解决,只是用GO语言整会很麻烦就是了。

本文提供一个简单的网络传输协议,适用四个字节来表示长度,一个字节来表示关键帧字,不考虑对于流的效验合(checksum)的计算及验证,人们若有需求可以自行修改,在大多数的TCP应用协议服务器上面,它都可以经过少量修改集成到解决方案之中。(Go 语言之中或许该称为集成到 Package 程序包之中)

四个字节长度,可以描述到一帧最大 INT32_MAX(2147483647)字节封装传送,其实绝大多数情况传递大包是没有太大意义的,人们可以自行评估调整。

值得一提,在绝大多数的场景之中,如若产生大包,三个字节来表示长度,人们自行位运算即可,这是因为过大的帧长,可能会导致网络程序在接受这些大数据帧时,产生严重的内存恐慌问题。

个人一个好的建议是,对于追求网络吞吐性能的TCP应用协议,人们在适用 Golang 应该直接废弃掉,没有任何意义的各种接口及封装实现,如返回  io.Reader,并且应当适用固定缓冲区的最大帧对齐,如:4K,即用户不要发送超过最大对齐(4K)的单帧报文。

随机内存分配会导致碎片化的产生,影响网络程序的吞吐能力,同时频繁的内存复制也会导致内存、及CPU计算资源负载升高。

但在大多数场景的网络程序来说,并不需要在意这块的优化,因为没有太大意义,但对于纯网络IO密集型应用来说,这是有很大必要的。

本文提供的实现不适用上述场景,但可以适用于略微带一些大包处理(即用户不愿意在业务层分片、组片的场景),但本人更希望大家趋近于共同学习目的。

运行测试:

go run -race test.go

服务器及客户端实现及封装:(含测试用例)

main.go

package mainimport ("encoding/binary""errors""fmt""io""math""math/rand""net""strconv""sync""time"
)type _ConnectionReader struct {owner       *Connectionlength      intoffset      intchecksum    uint32header_recv []bytelock_recv   sync.Mutex
}type Connection struct {disposed    boolconnection  net.Connheader_send []bytelock_sent   sync.Mutexreader      *_ConnectionReaderlistener    *Listener
}type Listener struct {sync.Mutexdisposed    boollistener    net.Listenerconnections map[*Connection]bool
}/*
#pragma pack(push, 1)
typedef struct {BYTE  bKf;       // 关键帧字DWORD dwLength;  // 载荷长度
} PACKET_HEADER;
#pragma pack(pop)static constexpr int PACKET_HEADER_SIZE = sizeof(PACKET_HEADER); // 4 + 1 = 5 BYTE
*/const (_CONNECTION_PACKET_HEADER_KF   = 0x2A // 置关键帧字_CONNECTION_PACKET_HEADER_SIZE = 5CONNECTION_MIN_PORT            = 0CONNECTION_MAX_PORT            = math.MaxUint16
)var ErrConnectionClosed = errors.New("connection has been closed")
var ErrConnectionArgP = errors.New("the parameter p cannot be incorrectly null or array length 0")
var ErrConnectionProtocolKf = errors.New("network protocol error, kf check error")
var ErrConnectionProtocolLength = errors.New("network protocol error, length check error")
var ErrConnectionArgAcceptor = errors.New("the acceptor parameter cannot be null")
var ErrConnectionDisconnect = errors.New("connection has been disconnect")// 功能名:发送数据
// 返回值:
// <  0 发送错误(ERR)
// == 0 链接断开(FIN)
// >  0 已发送字节数
func (my *Connection) Send(buffer []byte, offset int, length int) int {// 对于欲发送数据的参数检查if buffer == nil || offset < 0 || length < 1 {return -1}// 检查是否溢出BUFF缓存大小len := len(buffer)if offset+length > len {return -1}// 检查链接是否存在connection := my.connectionif connection == nil {return -1}// 预备环境及变量bytes_transferred := 0sync := &my.lock_sentheader := my.header_sendpayload := buffer[offset : offset+length]// 如果可以直接获取到信号,否则其它协同程序就等待发送结束,不要用管道这些莫名其妙的东西。sync.Lock()defer sync.Unlock()// 检查当前链接是否已经释放if my.disposed {return -1}// 先发送协议帧头header[0] = _CONNECTION_PACKET_HEADER_KFbinary.BigEndian.PutUint32(header[1:], uint32(length))written_size, err := connection.Write(header)if err != nil {return -1} else {bytes_transferred += written_size}// 在发送协议载荷written_size, err = connection.Write(payload)if err != nil {return -1}// 加上已传送的字节数bytes_transferred += written_sizereturn bytes_transferred
}// 功能名:收取数据
// 上个 Reader 未完成之前一直阻塞当前协程直到对方结束后返回
func (my *Connection) Receive() io.Reader {// 检查当前链接是否已经释放if my.disposed {return nil}// 检查链接是否存在connection := my.connectionif connection == nil {return nil}// 返回帧读入器reader := my.readerreader.lock_recv.Lock()return reader
}// 功能名:实例化一个链接对象
func NewConnection(conn net.Conn, listener *Listener) *Connection {var connection *Connectionif conn != nil {connection = &Connection{disposed:    false,connection:  conn,listener:    listener,header_send: make([]byte, _CONNECTION_PACKET_HEADER_SIZE),}connection.reader = &_ConnectionReader{owner:       connection,length:      0,offset:      0,checksum:    0,header_recv: make([]byte, _CONNECTION_PACKET_HEADER_SIZE),}}return connection
}// 功能名:链接主机
func Connect(host string, port int) *Connection {// 检查端口参数的有效性if port <= CONNECTION_MIN_PORT || port > CONNECTION_MAX_PORT {return nil}// 服务器主机地址不可为空if len(host) < 1 {return nil}// 服务器地址并且尝试链接address := host + ":" + strconv.Itoa(port)conn, err := net.Dial("tcp", address)if err != nil {return nil}// 返回TCP链接的封装对象return NewConnection(conn, nil)
}// 功能名:关闭链接(网络)
func (my *Connection) close(connection net.Conn) error {// 强制关闭链接,但可能会失败if my.disposed {return nil}my.disposed = truereturn connection.Close()
}// 功能名:关闭链接
func (my *Connection) Close(await bool) (err error) {// 检查链接是否存在connection := my.connectionif connection == nil {return}// 如果可以直接获取到信号,否则其它协同程序就等待发送结束,不要用管道这些莫名其妙的东西。sync := &my.lock_sentif await {sync.Lock()sync.Unlock()// 检查当前链接是否已经释放err = my.close(connection)} else {err = my.close(connection)}// 如果是服务器接受的链接对象,就从服务器列表之中删除这个链接实例。listener := my.listenerif listener != nil {listener.Lock()delete(listener.connections, my)listener.Unlock()}return
}func (my *Connection) connection_get_ip_end_point(remote bool) string {connection := my.connectionif connection == nil {return ""}var address net.Addrif remote {address = connection.RemoteAddr()} else {address = connection.LocalAddr()}if address == nil {return ""}return address.String()
}// 功能名:获取远程地址
func (my *Connection) GetRemoteEndPoint() string {return my.connection_get_ip_end_point(true)
}// 功能名:获取本地地址
func (my *Connection) GetLocalEndPoint() string {return my.connection_get_ip_end_point(false)
}// 功能名:读入帧数据
func (my *_ConnectionReader) Read(p []byte) (n int, err error) {// 检查当前链接是否已经释放owner := my.ownerif owner.disposed {return 0, ErrConnectionClosed}// 检查参数P不可以为NUL或数组长度为0length := len(p)if length < 1 {return 0, ErrConnectionArgP}// 帧已经被全部收取完成if my.length < 0 {my.length = 0my.lock_recv.Unlock()return 0, io.EOF}// 收取协议报文的头部if my.length == 0 {header := my.header_recvn, err := io.ReadFull(owner.connection, header)if err != nil {return n, err}// 判断协议关键帧字kf := header[0]if kf != _CONNECTION_PACKET_HEADER_KF {return 0, ErrConnectionProtocolKf}// 检查载荷的总长度my.length = int(binary.BigEndian.Uint32(header[1:]))my.offset = 0my.checksum = 0if my.length < 1 {return 0, ErrConnectionProtocolLength}}// 循环收取数据到缓存区P之中remain := my.length - my.offsetif length <= remain {n, err = owner.connection.Read(p)} else {n, err = owner.connection.Read(p[:remain])}// 从链接之中读入数据出现错误if err != nil {return n, err}// 是否收取到FIN字节(0)if n < 1 {return n, ErrConnectionDisconnect}// 计算当前帧是否已经收取完毕my.offset += nif my.offset < my.length {return n, nil} else {my.offset = 0my.length = -1my.checksum = 0return n, nil}
}// 功能名:实例化一个侦听器
func NewListener(host string, port int) *Listener {// 检查端口参数的有效性if port <= CONNECTION_MIN_PORT || port > CONNECTION_MAX_PORT {return nil}// 服务器主机地址不可为空if len(host) < 1 {return nil}// 服务器地址并且尝试绑定address := host + ":" + strconv.Itoa(port)listener, err := net.Listen("tcp", address)if err != nil {return nil}return &Listener{disposed:    false,listener:    listener,connections: make(map[*Connection]bool),}
}// 功能名:侦听服务器
func (my *Listener) ListenAndServe(acceptor func(*Connection)) error {// 接收器参数不可以为空if acceptor == nil {return ErrConnectionArgAcceptor}// 网络侦听器已经关闭if my.disposed {return ErrConnectionClosed}any := falselistener := my.listenerfor {// 网络如果已经被关闭了if my.disposed {return nil}// 尝试接收一个网络链接conn, err := listener.Accept()if err != nil {if any {return nil} else {return err}}// 如果没有获取到链接的引用则迭代到下个链接接受if conn == nil {continue}// 构建一个封装的网络链接对象connection := NewConnection(conn, my)my.Lock()my.connections[connection] = truemy.Unlock()// 启动对于链接处理的协同程序go acceptor(connection)}
}// 功能名:关闭全部链接
func (my *Listener) Close() {// 强制关闭服务器的侦听器listener := my.listenerif listener != nil {listener.Close()}// 释放全部持有的托管资源my.Lock()my.disposed = trueconnections := my.connectionsmy.connections = make(map[*Connection]bool)my.Unlock()// 强制关闭全部的网络链接for connection := range connections {connection.Close(false)}
}func test() {rand.Seed(time.Now().UnixNano())// 链接服务器packet := 0connection := Connect("127.0.0.1", 11111)for i, c := 0, rand.Intn(100)+1; i < c; i++ {length := rand.Intn(128) + 1buffer := make([]byte, length)for j := 0; j < length; j++ {buffer[j] = byte(rand.Intn(26)) + 97}// 发送数据transferred := connection.Send(buffer, 0, length)if transferred < 1 {break} else {// 接受数据r := connection.Receive()if r == nil {break}// 读取全部数据(一帧)buf, err := io.ReadAll(r)if err != nil {break} else if len(buf) < 1 {break}// 打印收到的帧数据packet++fmt.Printf("[%s]: client packet=%d length=%d string:%s\r\n", time.Now().Format("2006-01-02 15:04:05"), packet, len(buf), string(buf))}}// 关闭链接connection.Close(true)// 客户端关闭网络链接fmt.Printf("[%s]: %s\r\n", time.Now().Format("2006-01-02 15:04:05"), "client connection closed")
}func main() {// 运行客户端测试协程go test()// 打开服务器侦听器哟listener := NewListener("127.0.0.1", 11111)listener.ListenAndServe(func(c *Connection) {packet := 0remoteEP := c.GetRemoteEndPoint()for {// 获取网络接收器r := c.Receive()if r == nil {break}// 读取全部数据(一帧)buf, err := io.ReadAll(r)if err != nil {break} else if len(buf) < 1 {break}// 打印收到的帧数据packet++fmt.Printf("[%s]: server packet=%d length=%d remote=%s string:%s\r\n", time.Now().Format("2006-01-02 15:04:05"), packet, len(buf), remoteEP, string(buf))// 回显客户端的数据transferred := c.Send(buf, 0, len(buf))if transferred < 1 {break}}// 关闭客户端链接c.Close(true)// 服务器关闭网络链接fmt.Printf("[%s]: %s\r\n", time.Now().Format("2006-01-02 15:04:05"), "server connection closed")})
}

相关文章:

  • Linux调试器-gdb使用
  • 免费的WordPress插件大全
  • 生命在于折腾——WeChat机器人的研究和探索
  • c++ 中 什么是转交函数
  • PaddleNLP 如何打包成Windows环境可执行的exe?
  • git bash右键菜单失效解决方法
  • Linux 一键部署influxd2-telegraf
  • vue3模板中使用全局常量和全局方法
  • 小型内衣洗衣机什么牌子好?小型洗衣机全自动
  • 前端JavaScript篇之找出数组中重复的数字、js中数组是如何在内存中存储的?原生遍历数组的方式有哪些?请对以下数组,根据 `born` 的值降序排列
  • AI对比:ChatGPT和文心一言的区别和差异
  • 【python3零基础入门】No6.Python推导式学习
  • 有哪些简单好用、适合中小型企业的CRM系统?
  • openssl3.2/test/certs - 018 - trust variants: +anyEKU, -anyEKU
  • 多行SQL转成单行SQL
  • JS创建对象模式及其对象原型链探究(一):Object模式
  • Mybatis初体验
  • NSTimer学习笔记
  • Object.assign方法不能实现深复制
  • PV统计优化设计
  • scrapy学习之路4(itemloder的使用)
  • Spring框架之我见(三)——IOC、AOP
  • Traffic-Sign Detection and Classification in the Wild 论文笔记
  • Transformer-XL: Unleashing the Potential of Attention Models
  • ucore操作系统实验笔记 - 重新理解中断
  • 成为一名优秀的Developer的书单
  • 机器学习学习笔记一
  • 买一台 iPhone X,还是创建一家未来的独角兽?
  • 微信开放平台全网发布【失败】的几点排查方法
  • 协程
  • 最近的计划
  • 交换综合实验一
  • #QT项目实战(天气预报)
  • #预处理和函数的对比以及条件编译
  • (04)Hive的相关概念——order by 、sort by、distribute by 、cluster by
  • (52)只出现一次的数字III
  • (笔试题)分解质因式
  • (论文阅读31/100)Stacked hourglass networks for human pose estimation
  • (论文阅读32/100)Flowing convnets for human pose estimation in videos
  • (强烈推荐)移动端音视频从零到上手(下)
  • (完整代码)R语言中利用SVM-RFE机器学习算法筛选关键因子
  • (转)如何上传第三方jar包至Maven私服让maven项目可以使用第三方jar包
  • .Mobi域名介绍
  • .NET 8 编写 LiteDB vs SQLite 数据库 CRUD 接口性能测试(准备篇)
  • .Net FrameWork总结
  • .net 微服务 服务保护 自动重试 Polly
  • .NET 依赖注入和配置系统
  • .net利用SQLBulkCopy进行数据库之间的大批量数据传递
  • /bin/bash^M: bad interpreter: No such file ordirectory
  • /proc/stat文件详解(翻译)
  • @FeignClient注解,fallback和fallbackFactory
  • [ Linux 长征路第二篇] 基本指令head,tail,date,cal,find,grep,zip,tar,bc,unname
  • []常用AT命令解释()
  • [16/N]论得趣
  • [202209]mysql8.0 双主集群搭建 亲测可用