当前位置: 首页 > news >正文

Go协程与通道的综合应用问题

1.简单了解什么是协程和通道

什么是协程

协程,是一种用户级的轻量级的线程,拥有独立的栈空间并共享程序的堆空间。

它是在单线程的基础上通过算法来实现的微线程,相比于多线程编程具有以下优点:

  • 协程的上下文切换由用户决定,无需系统内核的上下文切换,减少开销
  • 协程默认会做好全方位保护,以防止中断。无需原子操作锁
  • 单线程也可以实现高并发,甚至达到单核CPU就支持上万协程

什么是通道

通道,是一种用于协程之间进行通信的数据结构。类似于队列,一端为发送者,一端为接收者。使用通道可以很好地保证数据的同步性和顺序性。

通道分为有缓冲通道和无缓冲通道,其声明方式如下:

  • 有缓冲通道
intChan := make(chan int,<缓冲容量>)
  • 无缓冲通道
intChan := make(chan int)

有缓冲通道和无缓冲通道的区别:

  • 阻塞:无缓冲通道发送者会阻塞直到数据被接收;有缓冲通道发送者会阻塞直到缓冲区满,接收者会阻塞直到缓冲区不为空。
  • 数据同步和顺序:无缓冲通道保证数据的同步和顺序;有缓冲管道不保证数据的同步和顺序。
  • 应用场景:无缓冲通道要求严格的同步和顺序性;有缓冲通道可以异步通信并提高吞吐量。

在无缓冲通道的实现中需要注意的是,通道的两端必须存在发送者和接收者,否则会导致死锁。

2.协程-通道并发编程案例

(1)交替打印字母和数字

题意:使用协程-通道交替打印数字1-10和字母A-J。

代码:

package mainimport ("fmt""sync"
)/*
无缓冲chanel:需要在写入chanel的时候要保证有另外一个协程在读取chanel。否则会导致写端阻塞,发生死锁
解决办法:
避免死锁的发生:
当i循环到10时,printAlp协程已然结束,所以此时不必再写入alp通道
*/func printNum(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {defer wg.Done()for i := 1; i <= 10; i++ {<-alpCh // 等待字母goroutine发信号fmt.Print(i, " ")//避免死锁发生if i < 10 {numCh <- struct{}{} // 发信号给字母goroutine}if i == 10 {close(numCh)}}}func printAlp(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {defer wg.Done()for i := 'A'; i <= 'J'; i++ {<-numCh // 等待数字goroutine发信号fmt.Printf("%c", i)alpCh <- struct{}{} // 发信号给数字goroutine}close(alpCh)
}func main() {numCh := make(chan struct{}) // 用于数字goroutine的信号通道alpCh := make(chan struct{}) // 用于字母goroutine的信号通道var wg sync.WaitGroupwg.Add(2)go printAlp(&wg, numCh, alpCh)go printNum(&wg, numCh, alpCh)// 启动时先给数字goroutine发送一个信号numCh <- struct{}{}wg.Wait()}

题目分析:

题目要求我们交替打印字母和数字,则需要保证两个协程的严格顺序性,符合无缓冲通道的应用场景。设置两个通道,分别存储数字和字母,两个打印数字和字母的协程分别担任两个通道的发送者和接收者的两重身份。循环打印一次,发一次信号,提醒另一个协程进行打印。

需要注意的是当最后一个字符'10'打印结束后,此时打印字母的协程已经结束,numCh通道已经没有接收者,此时已经不符合无缓冲通道的实现条件-必须存在发送者和接收者,再发送信号,会引起阻塞死锁。所以再第10次时不必再发送信号。

(2)设计一个任务调度器

题目:设计一个任务调度器,利用多协程+通道的编程模式,实现并发处理多任务的业务场景,且要求调度顺序按照任务添加顺序。

代码:

type scheduler struct {taskChan chan func()wt       sync.WaitGroup
}func (td *scheduler) AddTask(task func()) {td.taskChan <- task
}func (td *scheduler) Executer() {defer td.wt.Done()for {task, ok := <-td.taskChantask()if ok && len(td.taskChan) == 0 {break}}
}func (td *scheduler) Start() {td.wt.Add(4)//假设四个消费者for i := 0; i < 4; i++ {go td.Executer()}td.wt.Wait()
}func main() {sd := scheduler{taskChan: make(chan func(), 5),}go func() {sd.AddTask(func() {fmt.Println("任务1")})sd.AddTask(func() {fmt.Println("任务2")})sd.AddTask(func() {fmt.Println("任务3")})sd.AddTask(func() {fmt.Println("任务4")})sd.AddTask(func() {fmt.Println("任务5")})sd.AddTask(func() {fmt.Println("任务6")})close(sd.taskChan)}()sd.Start()}

问题分析:

由于添加的任务为多任务,不止一个,并且需要异步处理执行这些任务。符合有缓冲区的通道需要提高吞吐量和异步处理。

那么,我们需要将任务放进通道,多个接收者,按照顺序从通道中拿任务,并进行执行即可。

需要注意的问题是,如果在添加的任务数量大于通道的缓冲区,会在添加任务形成阻塞。为了不影响消费者的正常启动,需要将其单独开一个协程来添加任务。

这样当消费者进行消费时,形成阻塞的生产者会被唤醒,从而继续进行任务添加。

3.总结

经过对协程+通道的编程模式的学习,除了刚刚在题目中提到的,我们还应该注意以下问题:

1.为什么通道用完之后要关闭,不关闭有什么风险?

  • 为了避免死锁。关闭通道,也是告诉接收者,在发送者那里已经没有数据可以发送了,不需要再继续等待数据了。接收者收到通道关闭的信息后,停止接收数据;若不关闭通道,则会让接收者一直处于阻塞状态,有发生死锁的风险。
  • 释放资源和避免资源泄露。关闭通道后,系统会释放相应的资源,及时关闭通道则可以避免资源浪费和泄露。 

2. 怎么优雅地关闭通道?

首先,关闭通道的最基本原则是不要关闭已经关闭的通道。其次还有一个使用Go通道的原则:不要在数据接收方或者在有多个发送者的情况下关闭通道。换句话说我们只应该让一个通道唯一的发送者关闭此通道。

一种粗鲁的方式是通过异常恢复的方式来关闭通道,但很明显违反以上的原则且有可能发生数据竞争;另一种方式是sync.Once或sync.Mutex来关闭通道,当并不保证发生在一个通道上的并发关闭操作和发送操纵不会产生数据竞争。这两种方式都有一定的问题,就不过多介绍,下面介绍一种如何优雅地关闭通道的方法。

情形一:M个接收者和一个发送者

最容易处理的一种情形。当发送者需要结束发送时,让它关闭通道即可。上文的两个编程案例就是这种情形。

情形二:一个接收者和N个发送者

根据Go通道的基本原则,我们只能在通道的唯一发送者关闭通道。所以,在这种情况下,我们无法直接在某处关闭通道。但我们可以让接收者关闭一个额外的信号通道来告诉发送者不要再发送数据了

package mainimport ("log""sync"
)func main() {cosnt N := 5cosnt Max := 60000count := 0dataCh := make(chan int)stopCh := make(chan bool)var wt sync.WaitGroupwt.Add(1)//发送者for i := 0; i < N; i++ {go func() {for {select {case <-stopCh:returndefault:count += 1dataCh <- count}}}()}//接收者go func() {defer wt.Done()for value := range dataCh {if value == Max {// 此唯一的接收者同时也是stopCh通道的// 唯一发送者。尽管它不能安全地关闭dataCh数// 据通道,但它可以安全地关闭stopCh通道。close(stopCh)return}log.Println(value)}}()wt.Wait()
}

在这种方法中,我们额外增加了一个信号通道stopCh,在接收方通过它来告诉发送者不必再接收数据。并且,此方法并没有对dataCh进行关闭,当一个通道不再被任何协程使用时,它将会逐渐被垃圾回收掉,无论它是否已经被关闭。

此方法的优雅性就在于通过关闭一个通道来停止另一个通道的使用,从而间接关闭另一个通道。

情形三:M个接收者N个发送者

我们不能让接收者和发送者中的任何一个关闭用来传输数据的通道,我们也不能让多个接收者之一关闭一个额外的信号通道。这两种做法都违反了通道关闭原则。

不过,我们可以引入一个中间调解者角色并让其关闭额外的信号通道来通知所有接收者和发送者结束工作

代码示例:

package mainimport ("log""math/rand""strconv""sync"
)func main() {const Max = 100000const NumReceivers = 10const NumSenders = 1000var wt sync.WaitGroupwt.Add(NumReceivers)dataCh := make(chan int)stopCh := make(chan struct{})// stopCh是一个额外的信号通道。它的发送// 者为中间调解者。它的接收者为dataCh// 数据通道的所有的发送者和接收者。toStop := make(chan string, 1)// toStop是一个用来通知中间调解者让其// 关闭信号通道stopCh的第二个信号通道。// 此第二个信号通道的发送者为dataCh数据// 通道的所有的发送者和接收者,它的接收者// 为中间调解者。它必须为一个缓冲通道。var stoppedBy string// 中间调解者go func() {stoppedBy = <-toStopclose(stopCh)}()// 发送者for i := 0; i < NumSenders; i++ {go func(id string) {for {value := rand.Intn(Max)if value == 0 {// 为了防止阻塞,这里使用了一个尝试// 发送操作来向中间调解者发送信号。select {case toStop <- "发送者:" + id:default:}return}select {case <-stopCh:returncase dataCh <- value:}}}(strconv.Itoa(i))}// 接收者for i := 0; i < NumReceivers; i++ {go func(id string) {defer wt.Done()for {select {case <-stopCh:returncase value := <-dataCh:if value == Max {// 发送操作来向中间调解者发送信号。select {case toStop <- "接收者:" + id:default:}return}log.Println(value)}}}(strconv.Itoa(i))}wt.Wait()log.Println("被" + stoppedBy + "终止了")}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 240707-Sphinx配置Pydata-Sphinx-Theme
  • Linux tputs
  • vb.netcad二开自学笔记9:界面之ribbon
  • linux源码安装mysql8.0的小白教程
  • Nginx和Tomcat实现负载均衡群集部署应用
  • k8s record 20240705
  • 视频号矩阵系统源码,实现AI自动生成文案和自动回复私信评论,支持多个短视频平台
  • Android Camera Framework:从基础到高级
  • vue3+springboot+mybatis+mysql项目实践--简单登录注册功能实现
  • seaweedfs + TiKV 部署保姆级教程
  • C语言文件操作技术详解
  • React组件间通信的几种方式
  • Spring相关的面试题
  • vue3中使用 tilwindcss报错 Unknown at rule @tailwindcss
  • QT之嵌入外部第三方软件到本窗体中
  • IE9 : DOM Exception: INVALID_CHARACTER_ERR (5)
  • 【css3】浏览器内核及其兼容性
  • Android 初级面试者拾遗(前台界面篇)之 Activity 和 Fragment
  • HTTP传输编码增加了传输量,只为解决这一个问题 | 实用 HTTP
  • Java知识点总结(JavaIO-打印流)
  • JS笔记四:作用域、变量(函数)提升
  • markdown编辑器简评
  • mysql innodb 索引使用指南
  • SegmentFault 2015 Top Rank
  • 对超线程几个不同角度的解释
  • 基于axios的vue插件,让http请求更简单
  • 京东美团研发面经
  • 码农张的Bug人生 - 见面之礼
  • 面试遇到的一些题
  • 前端技术周刊 2019-02-11 Serverless
  • 强力优化Rancher k8s中国区的使用体验
  • 微信小程序--------语音识别(前端自己也能玩)
  • 小程序01:wepy框架整合iview webapp UI
  • 一个JAVA程序员成长之路分享
  • ​低代码平台的核心价值与优势
  • ​如何在iOS手机上查看应用日志
  • ​学习笔记——动态路由——IS-IS中间系统到中间系统(报文/TLV)​
  • # Redis 入门到精通(九)-- 主从复制(1)
  • # 数仓建模:如何构建主题宽表模型?
  • #我与Java虚拟机的故事#连载16:打开Java世界大门的钥匙
  • (10)Linux冯诺依曼结构操作系统的再次理解
  • (10)STL算法之搜索(二) 二分查找
  • (C语言)strcpy与strcpy详解,与模拟实现
  • (html转换)StringEscapeUtils类的转义与反转义方法
  • (MTK)java文件添加简单接口并配置相应的SELinux avc 权限笔记2
  • (NO.00004)iOS实现打砖块游戏(九):游戏中小球与反弹棒的碰撞
  • (poj1.2.1)1970(筛选法模拟)
  • (Qt) 默认QtWidget应用包含什么?
  • (vue)页面文件上传获取:action地址
  • (安全基本功)磁盘MBR,分区表,活动分区,引导扇区。。。详解与区别
  • (创新)基于VMD-CNN-BiLSTM的电力负荷预测—代码+数据
  • (二)基于wpr_simulation 的Ros机器人运动控制,gazebo仿真
  • (附源码)spring boot校园健康监测管理系统 毕业设计 151047
  • (接口封装)
  • (欧拉)openEuler系统添加网卡文件配置流程、(欧拉)openEuler系统手动配置ipv6地址流程、(欧拉)openEuler系统网络管理说明