当前位置: 首页 > news >正文

go中的并发处理

. Goroutines

概念:

Goroutines 是 Go 的核心并发机制。它们是由 Go 运行时管理的轻量级线程,具有比操作系统线程更少的开销。每个 goroutine 只需少量的内存(大约 2KB),并且由 Go 运行时负责调度和管理,哪怕是java发展到21的虚拟线程和go比也还是不够轻量

创建:

go func() {fmt.Println("Hello from goroutine")
}()

使用 go 关键字前缀一个函数调用即可创建一个新的 goroutine。它将异步执行指定的函数。
调度模型:

Go 使用 M调度模型,其中 M 个用户级线程(goroutines)通过 N 个操作系统线程(OS threads)进行调度。Go 运行时会动态地将 goroutines 分配到操作系统线程上,减少了上下文切换的开销。
底层原理:
Go 的调度器使用协作式调度,依靠 goroutine 的堆栈跟踪(stack traces)和调度策略(如抢占式调度)来管理并发执行。调度器负责在 goroutines 和 OS threads 之间进行合理分配,以实现高效的并发执行。
###Channel

概念:
Channel 是用于 goroutine 之间通信的管道,可以安全地传递数据。它们实现了数据传递的同步机制,避免了传统的锁竞争问题。
创建:

ch := make(chan int)

发送和接收:

// 发送数据
ch <- 42// 接收数据
value := <-ch
//数据通过 <- 操作符在 channel 中发送和接收。发送操作会阻塞直到有接收方,接收操作会阻塞直到有发送方。

这样就可以简单模拟java的join 操作 ,只有上一个任务执行完才允许接下来的线程执行

func testTrueCache() {//如果是无缓冲的 需要立即读取 所以异步写入ch := make(chan string)go func() {fmt.Println("hello")ch <- "hello"}()fmt.Println(<-ch)//不采用读取 那么可能子携程还没有执行主协诚就结束了fmt.Print("这个输出第一是在hello之后")
}

缓冲区:

无缓冲 channel:发送和接收操作必须匹配,否则会阻塞。
注意 串行的代码块不能写如通道会立即死锁 ,因为 无缓冲通道在写入数据时必须立刻有其他协程来读取数据,否则会导致阻塞。串行中,写入数据和读取数据是在同一个协程中进行的,阻塞后根本不会执行下一个代码 这会导致死锁。

func testFialChannelCache() {// 创建无缓冲管道ch := make(chan int) // 正因为无缓冲管道无法存放数据,在向管道写入数据时必须立刻有其他协程来读取数据defer close(ch)// 启动一个新的协程来读取数据go func() {n := <-chfmt.Println(n)}()// 写入数据ch <- 123 // 不在串行 写入后必须读取 没有容量可以保存 会报错
}

在这里插入图片描述

缓冲 channel:创建时指定缓冲区大小,发送操作只有在缓冲区满时才会阻塞,接收操作只有在缓冲区空时才会阻塞。
go

ch := make(chan int, 2) // 创建一个缓冲区大小为 2 的 channel

关闭:

可以使用 close 函数关闭 channel,以表明没有更多数据将被发送。

//建议创建号channel后就使用defer close(ch)
关闭后的 channel 仍然可以读取数据,但不能再发送数据。读取操作会返回 channel 的零值。

底层原理:

Channel 是基于锁和条件变量实现的。每个 channel 有一个缓冲区和一个互斥锁,用于协调数据的发送和接收操作。

只读通道和只写通道的申明创建

var ch <-chan int // 只读通道,只能接收 int 类型的数据
var ch chan<- int // 只写通道,只能发送 int 类型的数据

WaitGroup

刚才使用无缓冲的通道模拟 java中的join api 效果
概念:

sync.WaitGroup 用于等待一组 goroutine 完成任务,确保所有并发操作完成后才继续执行。

var wg sync.WaitGroup
wg.Add(1) // 增加等待计数
go func() {defer wg.Done() // 任务完成时减少计数// 任务代码
}()
wg.Wait() // 等待所有任务完成

底层就是维护了一个原子性的任务数 wg.Done() 进行任务数自减1wg.Wait() // 等待所有任务完成 只有为0才会进行执行

这样就更直观的可以发出来 可以保证多个无序携程的有序性

func quicktest() {var wait sync.WaitGroup// 指定子协程的数量  本质是原子性的自增自减wait.Add(1) //自增go func() {fmt.Println(1)// 执行完毕wait.Done() //自减}()// 等待子协程wait.Wait() //判断是否为0 阻塞等待 只有为0才会继续执行 这样就保证了执行的有序性wait.Add(1)go func() {fmt.Println(2)// 执行完毕wait.Done() //自减}()wait.Wait()wait.Add(1)go func() {fmt.Println(3)// 执行完毕wait.Done() //自减}()wait.Wait()wait.Add(1)go func() {fmt.Println(4)// 执行完毕wait.Done() //自减}()// 等待子协程wait.Wait() //判断是否为0 阻塞等待fmt.Println("完成")
}

Select

概念:

select 语句允许一个 goroutine 等待多个 channel 操作,并在其中一个操作准备好时执行相应的代码块。它类似于 Java 中的 Selector,用于处理多个异步事件,和netty一样 用到了seletor 多路复用思想
使用:

go
基本用法 类似 swtich case go中的stich 不会像Java一样需要显示申明return返回

select {
case msg := <-ch1:fmt.Println("Received from ch1:", msg)
case msg := <-ch2:fmt.Println("Received from ch2:", msg)
case <-time.After(time.Second):fmt.Println("Timeout")
}

select 语句会阻塞,直到其中一个 case 准备好。time.After 是一个用于超时的示例,它创建一个定时器 channel。
默认分支:

可以使用 default 分支来避免 select 阻塞。

func main() {test()
}
func test() {chA := make(chan int)chB := make(chan int)chC := make(chan int)defer func() {close(chA)close(chB)close(chC)}()// 开启一个新的协程go func() {// 向A管道写入数据chA <- 1}()go func() {// 向A管道写入数据chB <- 1}()go func() {// 向A管道写入数据chC <- 1}()// 选择器 这样就可以监控 多个通道 有数据的时候就触发 如果都没有数据 就阻塞// 这里的select 语句会阻塞,直到有某个通道有数据可读//如果都同时 检查到有数据可读,则随机选择一个进行读取
Loop: //break Loop 后不会执行:后的代码了for {select {case n, ok := <-chA:fmt.Println("写入通道A")fmt.Println(n, ok)case n, ok := <-chB:fmt.Println("写入通道B")fmt.Println(n, ok)case n, ok := <-chC:fmt.Println("写入通道C")fmt.Println(n, ok)//超时时间跳出进行下一次循环case <-time.After(time.Second): // 设置1秒的超时时间 //返回只读通通道break Loop // 退出循环 避免死循环锁毒素 还可以一直监听}}}

底层原理:

select 通过轮询和系统调用来检查多个 channel 的状态,使用系统级的 poll 或 epoll(在 Linux 上)机制来实现高效的 I/O 多路复用。

go中的锁也相对轻量

Mutex(互斥锁):

概念:

sync.Mutex 用于保护临界区,确保在同一时间只有一个 goroutine 可以访问共享资源。

使用:

var mu sync.Mutex
mu.Lock()
// 临界区代码
mu.Unlock()

比如这种代码进行应用

/*
*
这种情况肯定是脏数据的 10个携程并发执行肯定有脏数据 并且还无法确 还没有等待确定有序性解决方案:
. 加锁 互斥锁l
*/
func main() {wait.Add(10) // 等待10个协程完成for i := 0; i < 10; i++ {go func(data *int) {// 模拟访问耗时// 加锁 同一时间只有有所的次啊可以执行lock.Lock()time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))// 访问数据temp := *data// 模拟计算耗时time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))ans := 1// 修改数据*data = temp + anslock.Unlock() //数据修改完毕了才可以解锁fmt.Println(*data)wait.Done()}(&count)}wait.Wait()fmt.Println("最终结果", count)
}

只有数据处理时候加锁才能保证有序性,多个携程并发处理数据 会造成脏数据

底层原理:基于自旋锁和操作系统的原子操作实现,使用互斥锁来保护数据的一致性。

RWMutex(读写锁):

概念:

sync.RWMutex 允许多个 goroutine 同时读取数据,但在写操作时独占访问。本质就是为了优化读写分离的情况,写锁还是写携程之间互斥,读锁是为了保证有读锁的携程允许时候写锁的携程就堵塞

func main() {wait.Add(12) //12个任务携程// 读多写少go func() {for i := 0; i < 3; i++ {go Write(&count)}wait.Done()}()go func() {for i := 0; i < 7; i++ {go Read(&count)}wait.Done()}()// 等待子协程结束wait.Wait()fmt.Println("最终结果", count)
}func Read(i *int) {time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))rw.RLock()fmt.Println("拿到读锁")time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))fmt.Println("释放读锁", *i)rw.RUnlock()wait.Done()
}func Write(i *int) {time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))rw.Lock()fmt.Println("拿到写锁")temp := *itime.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))*i = temp + 1fmt.Println("释放写锁", *i)rw.Unlock()wait.Done()
}

Context

概念:

context 包用于在 goroutine 中传递取消信号、截止时间和请求范围的信息,类似于 Java 中的 Future 或 ExecutorService 的取消机制。

在 Go 中,context 包提供了上下文管理的功能,主要用于控制 goroutine 的生命周期、传递请求范围的值以及处理超时和取消信号。context 是在并发编程中不可或缺的工具,尤其是在处理网络请求和后台任务时。

Context 的主要用途

  • 控制 goroutine 的生命周期:通过 context,你可以在父任务结束或取消时,通知所有子 goroutine 停止执行,从而避免 goroutine 泄漏。

  • 传递请求范围的值:context 可以在函数调用链之间传递值,这些值通常是与请求有关的信息,比如用户身份、授权令牌、请求截止时间等。

  • 处理超时和取消信号:context 可以设定超时时间或在外部取消信号时终止操作,这对于网络请求和长时间运行的任务非常有用。

Context 的类型

  • context.Background():通常作为主函数的起点使用,表示一个空的上下文。
  • context.TODO():当你还不确定要用什么样的上下文时,可以使用它作为占位符。

Context 的使用模式
在实际编程中,context 通常以以下几种方式使用:

取消任务:
当需要在某个操作完成后取消所有相关的 goroutine 时,使用 context.WithCancel。
超时控制:
当一个操作需要在指定时间内完成时,使用 context.WithTimeout。
截止时间控制:
与超时类似,但使用的是具体的时间点而不是时间间隔,使用 context.WithDeadline。
Context 实例详细讲解
go

package mainimport ("context""fmt""time"
)// 模拟一个处理请求的函数
func processRequest(ctx context.Context, duration time.Duration) {select {case <-time.After(duration):fmt.Println("Request processed")case <-ctx.Done():fmt.Println("Request canceled:", ctx.Err())}
}func main() {// 创建一个 context,设置超时时间为 3 秒ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)defer cancel() // 确保取消函数被调用,以释放资源// 启动一个 goroutine 来处理请求,预计处理时间为 5 秒go processRequest(ctx, 5*time.Second)// 模拟一些其他操作time.Sleep(2 * time.Second)// 主 goroutine 等待 6 秒,以便观察子 goroutine 的处理情况time.Sleep(6 * time.Second)fmt.Println("Main function ends")
}

详细解释
创建 Context:context.WithTimeout(context.Background(), 3*time.Second) 创建了一个有超时限制的 context,3 秒后自动取消。
处理请求:processRequest 是一个模拟的请求处理函数,它要么在指定的 duration 后完成请求处理,要么在 context 被取消时停止操作。
启动 goroutine:我们在主函数中启动了一个 goroutine 来处理请求,这个 goroutine 模拟了一个 5 秒的处理时间。
观察取消效果:由于 context 设置的超时时间是 3 秒,而请求的处理时间是 5 秒,3 秒后 context 会被自动取消,导致 processRequest 提前退出并输出取消原因。
实际用途
在实际应用中,context 经常用于以下场景:

API 请求的超时控制:确保 API 请求在设定的时间内完成,避免服务阻塞。
批量任务的并发处理:在父任务取消时,自动终止所有子任务,避免资源浪费。
数据库操作:结合 context 来设置数据库查询的超时时间,确保数据库操作不会无限制地阻塞。

go中的池化技术–工作池

即使 Goroutine 很轻量,但在某些情况下,仍需要限制同时运行的 Goroutine 数量:

资源限制:

当每个任务都涉及到大量的资源(如 CPU、内存、网络)时,过多的 Goroutine 会导致资源竞争,影响系统性能。
后端服务限制:如果你的程序需要调用外部服务或数据库,这些服务可能有并发连接的限制,需要控制并发量。
稳定性和可控性:使用工作池可以更好地管理任务的执行,提供任务队列、超时、重试等机制,提升系统的稳定性和可靠性。
工作池的作用:

限制并发数:

通过固定数量的工作 Goroutine,限制同时执行的任务数量。
任务调度:将任务放入队列,等待空闲的 Worker 处理。
结果收集:汇总任务执行的结果,方便后续处理。
错误处理:统一管理任务执行中的错误和异常。

在 Go 中,实现工作池通常有以下几种方式:

使用 Channel 实现简单的工作池:通过创建一定数量的 Worker Goroutine,从任务 Channel 中获取任务并执行。
使用同步包(如 sync.WaitGroup)协调任务执行:确保所有任务执行完毕后再进行后续操作。
使用第三方库:社区中有一些成熟的工作池库,如 antlabs/workerpool、gammazero/workerpool 等,提供了丰富的功能和更好的性能。
接下来,下面的代码示例来演示如何在 Go 中实现工作池。


// 任务类型
type Task struct {id int
}// 模拟处理任务的方法
func (t Task) Process() {fmt.Printf("Processing task with id: %d\n", t.id)time.Sleep(time.Second) // 模拟任务处理耗时
}// 工作池函数
func worker(id int, tasks <-chan Task, wg *sync.WaitGroup) {defer wg.Done()for task := range tasks {fmt.Printf("Worker %d started task %d\n", id, task.id)task.Process()fmt.Printf("Worker %d finished task %d\n", id, task.id)}
}func main() {const numWorkers = 3 // Worker 的数量const numTasks = 10  // 任务的数量tasks := make(chan Task, numTasks) // 创建一个任务通道var wg sync.WaitGroup// 启动 workerfor i := 1; i <= numWorkers; i++ {wg.Add(1)go worker(i, tasks, &wg)}// 发送任务到任务通道for i := 1; i <= numTasks; i++ {tasks <- Task{id: i}}close(tasks) // 关闭任务通道,不再发送新任务// 等待所有 worker 完成wg.Wait()fmt.Println("All tasks processed")
}

详细解释

  • Task 类型:

这是一个简单的任务类型,其中包含了任务的 ID。Process 方法模拟任务的处理过程。
Worker 函数:

worker 函数是每个 worker 执行的工作。它从 tasks 通道中接收任务,并调用 Process 方法处理任务。sync.WaitGroup 用于等待所有 worker 完成任务。

  • 启动 Worker:

numWorkers 决定了 worker 的数量。在这里,启动了 3 个 worker。
发送任务:

  • 将 numTasks 数量的任务发送到 tasks 通道,任务会被 worker 逐一处理。
    关闭通道:

任务发送完毕后,关闭任务通道,告诉所有 worker 没有更多的任务了。
等待所有 Worker 完成:

使用 sync.WaitGroup 确保主程序在所有 worker 完成任务后再退出。

Java 线程池通过 Executors 工具类来创建不同类型的线程池,Go 则通过自定义的 worker pool 和 goroutine 来管理并发

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • LEAP模型在能源环境发展、碳排放建模预测及分析中实践应用
  • 伏图芯片应力仿真功能介绍
  • 如何正确使用 Parallels Desktop 的快照功能
  • 分意图 Prompt 调试、后置判别改写、RLHF 缓解大模型生成可控性
  • Transformer模型:Position Embedding实现
  • GlusterFS-分布式文件系统:概念、案例
  • linux网络问题排查必须要懂的命令
  • 普元EOS-微前端的base基座介绍
  • 【0316】Postgres内核之VACUUM (FULL)运行 portal multi query (11)
  • python——requests
  • 解决Element-plus中Carousel(走马灯)图片无法正常加载的bug
  • react 路由创建与使用
  • WiFi的IP和电脑IP一样吗?怎么更改wifi的ip地址
  • 线段树+二分,CF 431E - Chemistry Experiment
  • Verilog刷题笔记60
  • #Java异常处理
  • 《Java8实战》-第四章读书笔记(引入流Stream)
  • Git学习与使用心得(1)—— 初始化
  • Java比较器对数组,集合排序
  • JS题目及答案整理
  • log4j2输出到kafka
  • Making An Indicator With Pure CSS
  • Node.js 新计划:使用 V8 snapshot 将启动速度提升 8 倍
  • PV统计优化设计
  • QQ浏览器x5内核的兼容性问题
  • React 快速上手 - 07 前端路由 react-router
  • SQLServer之索引简介
  • Vue源码解析(二)Vue的双向绑定讲解及实现
  • 大型网站性能监测、分析与优化常见问题QA
  • 分享自己折腾多时的一套 vue 组件 --we-vue
  • 关于List、List?、ListObject的区别
  • Unity3D - 异步加载游戏场景与异步加载游戏资源进度条 ...
  • 扩展资源服务器解决oauth2 性能瓶颈
  • 完善智慧办公建设,小熊U租获京东数千万元A+轮融资 ...
  • $().each和$.each的区别
  • (1)安装hadoop之虚拟机准备(配置IP与主机名)
  • (14)目标检测_SSD训练代码基于pytorch搭建代码
  • (2024,Vision-LSTM,ViL,xLSTM,ViT,ViM,双向扫描)xLSTM 作为通用视觉骨干
  • (Arcgis)Python编程批量将HDF5文件转换为TIFF格式并应用地理转换和投影信息
  • (java版)排序算法----【冒泡,选择,插入,希尔,快速排序,归并排序,基数排序】超详细~~
  • (第30天)二叉树阶段总结
  • (二)测试工具
  • (二)丶RabbitMQ的六大核心
  • (附表设计)不是我吹!超级全面的权限系统设计方案面世了
  • (剑指Offer)面试题41:和为s的连续正数序列
  • (六)DockerCompose安装与配置
  • (免费领源码)Java#Springboot#mysql农产品销售管理系统47627-计算机毕业设计项目选题推荐
  • (转载)利用webkit抓取动态网页和链接
  • .Mobi域名介绍
  • .net core 依赖注入的基本用发
  • .NET Micro Framework初体验
  • .net 写了一个支持重试、熔断和超时策略的 HttpClient 实例池
  • .NET开源快速、强大、免费的电子表格组件
  • .one4-V-XXXXXXXX勒索病毒数据怎么处理|数据解密恢复
  • .pings勒索病毒的威胁:如何应对.pings勒索病毒的突袭?