当前位置: 首页 > news >正文

Go语言设计与实现 学习笔记 第六章 并发编程(2)

SingleFlight

singleflight是Go语言扩展包中提供的另一种同步原语,它能够在一个服务中抑制对下游的多次重复请求,一个比较常见的使用场景是——我们使用Redis对数据库中的一些热门数据进行缓存并设置了超时时间,缓存超时的一瞬间可能有非常多的并行请求发现了Redis中已经不包含任何缓存,所以大量的流量会打到数据库上影响服务的延时和质量。
在这里插入图片描述
但是singleflight就能有效解决这个问题,它的主要作用是对同一个Key最终只进行一次函数调用,在这个上下文中就是只会进行一次数据库查询,查询的结果会写回Redis并同步给所有请求对应Key的用户:
在这里插入图片描述
这其实就减少了对下游的瞬时流量,获取下游资源非常耗时。访问缓存+数据库等场景下就非常适合使用singleflight对服务进行优化,上例中我们就可以使用singleflight减少下游的压力。它的使用也很简单,我们可以直接使用singleflight.Group{}创建一个新的Group结构体,然后通过调用Do方法对相同的请求进行抑制:

type service struct {requestGroup singleflight.Group
}func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {rows, err := // select * from tablesif err != nil {return nil, err}return rows, nil})if err != nil {return nil, err}return Response{rows: rows,}, nil
}

上述代码使用请求的哈希作为抑制相同请求的键,我们也可以选择一些比较关键或重要的字段作为Do方法的第一个参数避免对下游的瞬时大量请求。

结构体

Group结构体本身由一个互斥锁Mutex和一个从Keycall结构体指针的映射表组成,每一个call结构体都保存了当前这次调用对应的信息:

type Group struct {mu sync.Mutexm  map[string]*call
}type call struct {wg sync.WaitGroupval interface{}err errordups  intchans []chan<- Result
}

call结构体中的valerr字段都是在执行传入的函数时只被赋值一次,它们也只会在WaitGroup等待结束后被读取,而dupschans字段分别用于存储当前singleflight抑制的请求数量以及在结果返回时传递给调用方。

操作

singleflight包提供了两个用于抑制相同请求的方法,其中一个是同步等待的方法Do,另一个是返回Channel的DoChan,这两个方法在功能上没有太多区别,只是在接口的表现上稍有不同。

每次调用Do方法时都会获取互斥锁并尝试对Group持有的映射表进行懒加载,随后判断是否已经存在key对应的函数调用:
1.当不存在对应的call结构体时:
(1)初始化一个新的call结构体指针;

(2)增加WaitGroup持有的计数器;

(3)将call结构体指针添加到映射表;

(4)释放持有的互斥锁Mutex

(5)阻塞地调用doCall方法等待结果的返回;

2.当已经存在对应的call结构体时:
(1)增加dups计数器,它表示当前重复的调用次数;

(2)释放持有的互斥锁Mutex

(3)通过WaitGroup.Wait等待请求的返回;

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {g.mu.Lock()if g.m == nil {g.m = make(map[string]*call)}if c, ok := g.m[key]; ok {c.dups++g.mu.Unlock()c.wg.Wait()return c.val, c.err, true}c := new(call)c.wg.Add(1)g.m[key] = cg.mu.Unlock()g.doCall(c, key, fn)return c.val, c.err, c.dups > 0
}

因为valerr两个字段都只会在doCall方法中被赋值,所以当doCall方法和WaitGroup.Wait方法返回时,这两个值就会返回给Do函数的调用者。

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {c.val, c.err = fn()c.wg.Done()g.mu.Lock()delete(g.m, key)for _, ch := range c.chans {ch <- Result{c.val, c.err, c.dups > 0}}g.mu.Unlock()
}

doCall中会运行传入的函数fn,该函数的返回值会赋值给c.valc.err,函数执行结束后就会调用WaitGroup.Done方法通知当前函数已经执行完成给所有被抑制的请求,可以从call结构体中取出返回值并返回了;在这之后,doCall方法会获取持有的互斥锁并通过管道将信息同步给使用DoChan方法的调用方。

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {ch := make(chan Result, 1)g.mu.Lock()if g.m == nil {g.m = make(map[string]*call)}if c, ok := g.m[key]; ok {c.dups++c.chans = append(c.chans, ch)g.mu.Unlock()return ch}c := &call{chans: []chan<- Result{ch}}c.wg.Add(1)g.m[key] = cg.mu.Unlock()go g.doCall(c, key, fn)return ch
}

DoChan方法和Do的区别是,它使用Goroutine异步执行doCall并向call持有的chans切片中追加chan Result变量,这也是它能够提供异步传值的原因。

小结

singleflight包提供的Group接口非常好用,当我们需要这种抑制对下游的相同请求时就可以通过这个方法来增加吞吐量和服务质量,在使用过程中我们需要注意以下问题:
1.DoDoChan一个用于同步阻塞调用传入的函数,一个用于异步调用传入的参数并通过Channel接受函数的返回值;

2.Forget方法可以通知singleflight在持有的映射表中删除某个键,接下来对该键的调用就会直接执行方法而不是等待前面的函数返回;

3.一旦调用的函数返回了错误,所有在等待的Goroutine也都会接收到同样的错误;

总结

我们在这一节中介绍了Go语言标准库中提供的基本原语以及扩展包中的扩展原语,这些并发编程的原语能够帮助我们更好地利用Go语言的特性构建高吞吐量、低延时的服务,并解决由于并发带来的错误,到这里我们再重新回顾一下这一节介绍的内容:
1.Mutex互斥锁:
(1)如果互斥锁处于初始化状态,就会直接通过置位mutexLocked加锁;

(2)如果互斥锁处于mutexLocked且在普通模式下工作,就会进入自旋,执行30次PAUSE指令消耗CPU时间等待锁的释放;

(3)如果当前Goroutine等待锁的时间超过了1ms,互斥锁就会被切换到饥饿模式;

(4)互斥锁在正常情况下会通过runtime_SemacquireMutex方法将调用Lock的Goroutine切换至休眠状态,等待持有信号量的Goroutine唤醒当前协程;

(5)如果当前Goroutine是互斥锁上的最后一个等待的协程或者等待的时间小于1ms,当前Goroutine会将互斥锁切换回正常模式;

(6)如果互斥锁已经被解锁,那么调用Unlock会直接抛出异常;

(7)如果互斥锁处于饥饿模式,会直接将锁的所有权交给队列中的下一个等待者,等待者会负责设置mutexLocked标志位;

(8)如果互斥锁处于普通模式,并且没有Goroutine等待锁的释放或者已经有被唤醒的Goroutine获得了锁就会直接返回,否则通过runtime_Semrelease唤醒对应的Goroutine;

2.RWMutex读写互斥锁:
(1)readerSem——读写锁释放时通知由于获取读锁等待的Goroutine;

(2)writerSem——读锁释放时通知由于获取读写锁等待的Goroutine;

(3)w互斥锁——保证写操作之间的互斥;

(4)readerCount——统计当前进行读操作的协程数,触发写锁时会将其减少rwmutexMaxReaders以阻塞后续的读操作;

(5)readerWait——当前读写锁等待的进行读操作的协程数,在触发Lock后的每次RUnlock都会将其减一,当它归零时该Goroutine就会获得写锁;

(6)当读写锁被释放,即Unlock时,首先会通知所有读操作,然后才会释放持有的互斥锁,这样能够保证读操作不会被连续的写操作“饿死”;

3.WaitGroup等待一组Goroutine结束:
(1)Add不能和Wait方法在Goroutine中并发调用,一旦出现就会造成程序崩溃;

(2)WaitGroup必须在Wait方法返回后才能被重新使用;

(3)Done只是对Add方法的简单封装,我们可以向Add方法传入任意负数(需保证计数器非负)快速将计数器归零以唤醒其他等待的Goroutine;

(4)可以同时有多个Goroutine等待当前WaitGroup计数器的归零,这些Goroutine也会被“同时”唤醒;

4.Once程序运行期间仅执行一次:
(1)Do方法中传入的函数只会被执行一次,哪怕发生了panic

(2)两次调用Do方法传入不同的函数时只会执行第一次调用的函数;

5.Cond发生指定事件时唤醒:
(1)Wait方法在调用前一定要使用L.Lock持有该资源,否则会发生panic导致程序崩溃;

(2)Signal方法唤醒的Goroutine都是队列最前面、等待最久的Goroutine;

(3)Broadcast虽然是广播通知全部等待的Goroutine,但真正被唤醒时也是按照一定顺序的;

6.ErrGroup为一组Goroutine提供同步、错误传播以及上下文取消的功能:
(1)出现错误或等待结束后都会调用Contextcancel方法取消上下文;

(2)只有第一个出现的错误才会被返回,剩余的错误都会被直接抛弃;

7.Semaphore带权重的信号量:
(1)AcquireTryAcquire方法都可以用于获取资源,前者用于同步获取,会等待锁的释放,后者会在无法获取锁时直接返回;

(2)Release方法会按照FIFO的顺序唤醒可以被唤醒的Goroutine;

(3)如果一个Goroutine获取了较多的资源,由于Release的释放策略可能会等待比较长的时间;

8.SingleFlight用于抑制对下游的重复请求:
(1)DoDoChan一个用于同步阻塞调用传入的函数,一个用于异步调用传入的参数并通过Channel接受函数的返回值;

(2)Forget方法可以通知singleflight在持有的映射表中删除某个键,接下来对该键的调用就会直接执行方法而不是等待前面的函数返回;

(3)一旦调用的函数返回了错误,所有在等待的Goroutine也都会接收到同样的错误;

这些同步原语的实现不仅要考虑API接口的易用、解决并发编程中可能遇到的线程竞争问题,还需要对尾延时进行优化避免某些Goroutine无法获取锁或者资源而被饿死,对同步原语的学习也能增强我们对并发编程的理解和认识,也是了解并发编程无法跨越的一个步骤。

6.3 定时器

对于任何一个正在运行的应用,如何获取准确的绝对时间都非常重要,但是在一个分布式系统中我们很难保证各个节点上绝对时间的一致性,哪怕通过NTP这种标准的对时协议也只能把时间的误差控制在毫秒级,所以相对时间在一个分布式系统中显得更为重要,我们在这一节就会介绍Go语言中的定时器以及它在并发编程中起什么样的作用。

绝对时间一定不会是完全准确的,它对于一个运行中的分布式系统其实没有太多指导意义,但是由于相对时间的计算不依赖于外部的系统,所以它的计算可以做的比较准确,我们在这一节中就会介绍Go语言中用于计算相对时间的定时器的实现原理。

结构

timer就是Golang定时器的内部表示,每一个timer其实都存储在堆中,tb就是用于存储当前定时器的桶,而i是当前定时器在堆中的索引,我们可以通过这两个变量找到当前定时器在堆中的位置:

type timer struct {tb *timersBucketi  intwhen   int64period int64f      func(interface{}, uintptr)arg    interface{}seq    uintptr
}

when表示当前定时器(Timer)被唤醒的时间,而period表示两次被唤醒的间隔,每当定时器被唤醒时都会调用f(args, now)函数并传入args和当前时间作为参数。然而这里的timer作为一个私有结构体其实只是定时器的运行时表示,time包对外暴露的定时器使用了如下所示的结构体:

type Timer struct {C <-chan Timer runtimeTimer
}

Timer定时器必须通过NewTimerAfterFunc函数进行创建,其中的runtimeTimer就是上面介绍的timer结构体,当定时器失效时,失效的时间就会被发送给当前定时器持有的Channel C,订阅管道中消息的Goroutine就会收到当前定时器失效的时间。

time包中,除了timerTimer两个分别用于表示运行时定时器和对外暴露的API之外,timersBucket这个用于存储定时器的结构体也非常重要,它会存储一个处理器上的全部定时器,不过如果当前机器的核数超过了64核,也就是机器上的处理器P的个数超过了64个,多个处理器上的定时器就可能存储在同一个桶中:

type timersBucket struct {lock         mutexgp           *gcreated      boolsleeping     boolrescheduling boolsleepUntil   int64waitnote     notet            []*timer
}

每一个timersBucket中的t就是用于存储定时器指针的切片,每一个运行的Go语言程序都会在内存中存储着64个桶,这些桶中都存储定时器的信息:
在这里插入图片描述
每一个桶持有的timer切片其实都是一个最小堆,这个最小堆会按照timer应该触发的时间对它们进行排序,最小堆最上面的定时器就是最近需要被唤醒的timer,我们会在下面介绍定时器的创建和触发过程。

工作原理

既然我们已经介绍了定时器的数据结构,接下来就可以开始分析它的常见操作以及工作原理了,在这一节中我们将介绍定时器的创建、触发、time.Sleep与定时器的关系、计时器Ticker的实现原理。

创建

time包对外提供了两种创建定时器的方法,第一种方法是NewTimer接口,这个接口会创建一个用于通知触发时间的Channel、调用startTimer方法、返回一个指向创建的Timer结构体的指针:

func NewTimer(d Duration) *Timer {c := make(chan Time, 1)t := &Timer{C: c,r: runtimeTimer{when: when(d),f:    sendTime,arg:  c,},}startTimer(&t.r)return t
}

另一个用于创建Timer的方法AfterFunc其实也提供了类似的结构,与NewTimer方法不同的是该方法没有创建一个用于通知触发时间的Channel,它只会在定时器到期时调用传入的方法:

func AfterFunc(d Duration, f func()) *Timer {t := &Timer{r: runtimeTimer{when: when(d),f:    goFunc,arg:  f,},}startTimer(&t.r)return t
}

startTimer基本上就是创建定时器的入口了,所有定时器的创建和重启基本上都需要调用该函数:

func startTimer(t *timer) {addtimer(t)
}func addtimer(t *timer) {tb := t.assignBucket()tb.addtimerLocked(t)
}

它会调用addTimer函数,这个函数总共做了两件事,首先通过assignBucket方法为当前定时器选择一个timerBucket桶,我们会根据当前Goroutine所在处理器P的id选择一个合适的桶,随后调用addTimerLocked方法将当前定时器加入桶中:

func (tb *timersBucket) addtimerLocked(t *timer) bool {// 将t的索引设为当前桶的大小,即将新定时器t放到切片的末尾t.i = len(tb.t)// 将t加到切片末尾tb.t = append(tb.t, t)// 调整新加入的定时器,使其维持堆的特性,如果调整失败,siftupTimer函数返回falseif !siftupTimer(tb.t, t.i) {return false}// 如果新加入的定时器是桶中第一个定时器if t.i == 0 {// 如果定时器桶处于休眠状态 && 休眠持续时间大于定时器的触发时间if tb.sleeping && tb.sleepUntil > t.when {// 将定时器桶的休眠状态设为false,意味着桶需要被唤醒tb.sleeping = false// 唤醒正在等待这个定时器桶的Goroutinenotewakeup(&tb.waitnote)}// 如果定时器桶正在进行重新调度if tb.rescheduling {// 停止重新调度tb.rescheduling = false// 将与定时器桶相关联的Goroutine设为准备运行状态goready(tb.gp, 0)}// 如果定时器桶尚未创建if !tb.created {// 标记定时器桶为已创建tb.created = true// 启动一个新的Goroutine处理这个定时器桶中的定时器go timerproc(tb)}}return true
}

addtimerLocked会先将最新加入的定时器加到队列末尾,随后调用siftupTimer将当前定时器与四叉树(或四叉堆)中的父节点进行比较,保证父节点的到期时间一定小于子节点:
在这里插入图片描述
这个四叉树只能保证父节点的到期时间小于子节点,这对于我们来说就足够了,因为我们只关心即将被触发的计数器,如果当前定时器是第一个被加入四叉树的定时器,我们还会通过go timerproc(tb)启动一个Goroutine用于处理当前树中的定时器,这也是处理定时器的核心方法。

触发

定时器的触发都是由timerproc中的一个双层for循环控制的,外层的for循环主要负责对当前Goroutine进行控制,它不仅会负责锁的获取和释放,还会在合适的时机触发当前Goroutine的休眠:

func timerproc(tb *timerBucket) {// 获取当前Goroutine的指针,用于跟踪和管理与该定时器桶管理的Goroutinetb.gp = getg()for {// 将当前定时器桶标记为不处于休眠状态tb.sleeping = false// 获取当前的纳秒时间now := nanotime()// 初始化时间差,该变量用于计算Goroutine的下次休眠时间delta := int64(-1)// inner loop// 如果时间差小于0,意味着没有到期的定时器if delta < 0 {// 将定时器桶标记为正在进行调度处理tb.rescheduling = true// 暂停当前Goroutine的运行goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1)continue}// 将定时器桶的休眠状态设为truetb.sleeping = true// 设置定时器桶的应醒来时间tb.sleepUntil = now + delta// 清除与定时器相关的等待通知noteclear(&tb.waitnote)// 将Goroutine投入睡眠,直到waitnote被唤醒或指定的delta时间过去notesleepg(&tb.waitnote, delta)}
}

如果距离下一个定时器被唤醒的时间小于0,当前timerproc就会将rescheduling标记设置成true并立刻陷入睡眠,这其实也意味着当前timerproc中不包含任何待处理的定时器,当我们再向该timerBucket加入定时器时就会重新唤醒timerproc Goroutine。

在其他情况下,也就是下一次计数器的响应时间是now + delta时,timerproc中的外层循环会通过notesleepg将当前Goroutine投入休眠。

func notesleepg(n *note, ns int64) bool {gp := getg()// 如果当前Goroutine是系统级Goroutine// g0是每个M(操作系统级线程)都有的特殊Goroutine,用于执行调度和系统级任务if gp == gp.m.g0 {// 直接跑错,g0不应该执行与具体任务相关的阻塞和睡眠,因为它负责调度和系统任务throw("notesleepg on g0")}// 创建与当前M相关联的信号量semacreate(gp.m)// 标记当前Goroutine进入一个系统调用,且会被阻塞// 这对调度器是一个信号,说明该Goroutine正在等待外部事件,因此不会占用CPUentersyscallblock()// 实际执行睡眠逻辑的函数ok := notesleep_interrnal(n, ns, nil, 0)// 退出系统调用状态,使调度器可以再次调度此Goroutineexitsyscall()return ok
}

该函数会先获取当前Goroutine并在当前“CPU”上创建一个信号量,随后在entersyscallblockexitsyscall之间执行系统调用让当前Goroutine陷入休眠并在ns纳秒后返回。

内部循环的主要作用就是触发已经到期的定时器,在这个内部循环中,我们会按以下流程对当前桶中的定时器进行处理:
1.如果桶中不包含任何定时器就会直接返回并陷入休眠等待定时器加入当前桶;

2.如果四叉树最上面的定时器还没有到期会通过notesleepg方法陷入休眠等待最近定时器的到期;

3.如果四叉树最上面的定时器已经到期:
(1)当定时器的period > 0(重复每隔period时间就触发一次)就会设置下一次触发定时器的时间并将当前定时器向下移动到对应位置;

(2)当定时器的period <= 0就会将当前定时器从四叉树中移除;

4.在每次循环的最后都会从定时器中取出定时器中的函数、参数、序列号,并调用函数触发该计时器;

        for {if len(tb.t) == 0 {delta = -1break}t := tb.t[0]delta = t.when - nowif delta > 0 {break}ok := trueif t.period > 0 {t.when += t.period * (1 + -delta/t.period)if !siftdownTimer(tb.t, 0) {ok = false}} else {last := len(tb.t) - 1// 如果还有其他定时器,重新调整堆if last > 0 {tb.t[0] = tb.t[last]tb.t[0].i = 0}tb.t[last] = niltb.t = tb.t[:last]if last > 0 {if !siftdownTimer(tb.t, 0) {ok = false}}t.i = -1 // mark as removed}f := t.farg := t.argseq := t.seqf(arg, seq)}

使用NewTimer创建的定时器,传入的函数是sendTime,它会将当前时间发送到定时器持有的Channel中,而使用AfterFunc创建的定时器,在内层循环中调用的函数是调用方传入的函数。

休眠

如果你使用过一段时间Go语言,你一定在项目中使用过time包中的Sleep方法让当前Goroutine陷入休眠以等待某些条件的完成或者触发一些定时任务,time.Sleep就是通过如下所示的timeSleep方法完成的:

func timeSleep(ns int64) {if ns <= 0 {return}gp := getg()// 获取当前Goroutine相关联的timert := gp.timerif t == nil {t = new(timer)gp.timer = t}*t = timer{}t.when = nanotime() + nst.f = goroutineReadyt.arg = gptb := t.assignBucket()lock(&tb.lock)// 如果将定时器加入桶失败if !tb.addtimerLocked(t) {unlock(&tb.lock)// 处理错误情况,可能是记日志或抛异常?badTimer()}// 进入休眠goparkunlock(&tb.lock, waitReasonSleep, traceEvGoSleep, 2)
}

timeSleep会创建一个新的timer结构体,在初始化的过程中我们会传入当前Goroutine应该被唤醒的时间以及唤醒时需要调用的函数goroutineReady,随后会调用goparkunlock将当前Goroutine陷入休眠状态,当定时器到期时就会调用goroutineReady方法唤醒当前Goroutine:

func goroutineReady(arg interface{}, seq uintptr) {goready(arg.(*g), 0)
}

time.Sleep方法其实只是创建了一个会在到期时唤醒当前Goroutine的定时器并通过goparkunlock将当前协程陷入休眠状态等待定时器触发的唤醒。

Ticker

除了只用于一次的定时器(Timer)之外,Go语言的time包中还提供了用于多次通知的Ticker计时器,计时器中包含了一个用于接受通知的Channel和一个定时器,这两个字段共同组成了用于连续多次触发事件的计时器:

type Ticker struct {C <-chan Time // The channel on which the ticks are delivered.r runtimeTimer
}

想要在Go语言中创建一个计时器只有两种方法,一种是使用NewTicker方法显式创建Ticker计时器指针,另一种可以直接通过Tick方法获取一个会定期发送消息的Channel:

func NewTicker(d Duration) *Ticker {if d <= 0 {panic(errors.New("non-positive interval for NewTicker")}c := make(chan Time, 1)t := &Ticker{C: c,r: runtimeTimer{when:   when(d),period: int64(d),f:      sendTime,arg:    c,},}startTimer(&t.r)return t
}func Tick(d Duration) <-chan Time {if d <= 0 {return nil}return NewTicker(d).C
}

Tick其实也只是对NewTicker的简单封装,从实现上我们就能看出来它其实就是调用了NewTicker获取了计时器并返回了计时器中的Channel,两个创建计时器的方法的实现都不复杂且容易理解,所以在这里也就不详细展开介绍了。

需要注意的是每一个NewTicker方法开启的计时器都需要在不使用时调用Stop进行关闭,如果不显式调用Stop方法,创建的计时器就没办法被垃圾回收,而通过Tick创建的计时器由于只对外提供了Channel,所以是一定没有办法关闭的,我们一定要谨慎使用这一接口创建计时器。

性能分析

定时器在内部使用四叉树的方式进行实现和存储,当我们在生产环境中使用定时器进行毫秒级别的计时时,在高并发场景下会有比较明显的性能问题,我们可以通过实验测试一下定时器在高并发时的性能,假设我们有以下代码:

func runTimers(count int) {durationCh := make(chan time.Duration, count)wg := sync.WaitGroup{}// 等待count个任务结束wg.Add(count)// 启动count个Goroutinefor i := 0; i < count; i++ {go func() {startedAt := time.Now()time.AfterFunc(10*time.Millisecond, func() {defer wg.Done()durationCh <- time.Since(startedAt)})}}wg.Wait()close(durationCh)durations := []time.Duration{}totalDuration := 0 * time.Millisecondfor duration := range durationCh {durations = append(durations, duration)totalDuration += duration}averageDuration := totalDuration / time.Duration(count)sort.Slice(durations, func(i, j int) bool {return duration[i] < duration[j]})fmt.Printf("run %v timers with average=%v, pct50=%v, pct99=%v\n", count, averageDuration, durations[count/2], durations[int(float64(count)*0.99)])
}

完整的性能测试代码可以在benchmark_timers.go中找到,需要注意的是:由于机器和性能的不同,多次运行测试可能会有不一样的结果。

这段代码开了N个Goroutine并在每个Goroutine中运行一个定时器,我们会在定时器到期时计算从开始计时到定时器到期所用的时间,并将该时间加入Channel用于之后的统计,在函数的最后我们会计算出N个Goroutine中定时器到期时间的平均数、50分位数和99分位数:
在这里插入图片描述
我们将上述代码输出的结果绘制成如下图所示的折线图,其中横轴是并行定时器的个数,纵轴表示定时器从开始到触发时间的差值,三个不同的线分别表示时间的平均值、50分位数、99分位数:
在这里插入图片描述
虽然测试的数据可能有一些误差,但是从图中我们也能得出一些跟定时器性能和现象有关的结论:
1.定时器触发的时间一定会晚于创建时传入的时间,假设定时器需要等待10ms触发,那它触发的时间一定是晚于10ms的;

2.当并发的定时器数量达到5000时,定时器的平均误差达到了~18%,99分位数上的误差达到了~26%

3.并发定时器的数量超过5000之后,定时器的误差就变得非常明显,不能有效、准确地完成计时任务;

这其实也是因为定时器从开始到触发的时间间隔非常短,当我们将计时的时间改到100ms时就会发现性能问题有比较明显的改善:
在这里插入图片描述
哪怕并行运行了10w个定时器,99分位数的误差也只有~12%,我们其实能够发现Go语言标准库中的定时器在计时时间较短并且并发较高时有着非常明显的问题,所以在一些性能非常敏感的基础服务中使用定时器一定要非常注意——它可能达不到我们预期的效果。

不过哪怕我们不主动使用定时器,而是使用context.WithDeadline这种方法,由于它的底层也会使用定时器实现,所以仍然会受到影响。

总结

Go语言的定时器在并发编程中起到了非常重要的作用,它能够为我们提供比较准确的相对时间,基于它的功能,标准库中还提供了计时器、休眠等接口能够帮助我们在Go语言程序中更好地处理过期和超时等问题。

标准库中的定时器在大多数情况下是能够正常工作且高效完成任务的,但是在遇到极端情况或性能敏感场景时,它可能没有办法胜任,而在10ms这个粒度下,作者在社区中也没有找到能够使用的定时器实现,一些使用时间轮算法的开源库也不能很好地完成这个任务。

6.4 Channel

这一节中的内容总共包含两个部分,我们会先介绍Channel的设计原理以及它在Go语言中的数据结构,接下来我们会分析常见的Channel操作,例如创建、发送、接收、关闭的实现原理,由于在Range和Select两节中我们提到了Channel在不同的控制结构中组合使用时的现象,,所以这一节还是会将重点放到Channel的常见操作上。

概述

作为Go语言中核心的数据结构和Goroutine之间的通信方式,Channel是支撑Go语言高性能并发编程模型的结构之一,我们首先需要了解Channel背后的设计原理以及它的底层数据结构。

设计原理

在Go语言中,一个最常见的也是经常被人提及的设计模式就是不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存,在很多主流的编程语言中,当我们想要并发执行一些代码时,我们往往会在多个线程之间共享变量,同时为了解决线程冲突的问题,我们又需要在读写这些变量时加锁。
在这里插入图片描述
Go语言对于并发编程的设计与上述这种共享内存的方式完全不同,虽然我们在Golang中也能使用共享内存加互斥锁来实现并发编程,但与此同时,Go语言也提供了一种不同的并发模型,也就是CSP,即通信顺序进程(Communicating sequential processes),Goroutine其实就是CSP中的实体,Channel就是用于传递信息的通道,使用CSP并发模型的Goroutine就会通过Channel来传递消息。
在这里插入图片描述
上图中的两个Goroutine,一个会负责向Channel中发送消息,另一个会负责从Channel中接收消息,它们两者并没有任何直接的关联,能够独立地工作和运行,但是间接地通过Channel完成了通信。

我们在这一节中不会展开介绍Go语言选择实现CSP模型的原因,也不会比较不同并发模型的优劣,而是会将重点放到与Channel本身更相关的内容上,有关并发模型和程序设计的其他问题,作者将在其他文章中展开介绍。

数据结构

虽然我们在使用Go语言时接触到的Channel类型都是类似chan int的,但是它们在Go语言中都是以hchan结构体的形式存在的,每当在Go语言中创建新的Channel时,实际上创建的都是一个如下的结构体:

type hchan struct {qcount   uintdataqsiz uintbuf      unsafe.Pointerelemsize uint16closed   uint32elemtype *_typesendx    uintrecvx    uintrecvq    waitqsendq    waitqlock mutex
}

hchan结构体中的字段qcountdataqsizbufsendxrecvx的主要作用就是构建底层的循环队列,其中qcount保存了当前Channel中的元素个数,dataqsiz表示Channel中的循环队列的长度,buf指向了一个长度为dataqsiz的数组,sendxrecvx负责标识当前Channel的发送和接收已经处理到了数组中的哪个位置。

除此之外,elemsizeelemtype分别表示了当前Channel能够收发的元素类型和大小,sendqrecvq的主要作用就是存储当前Channel由于缓冲区空间不足而阻塞的Goroutine列表:

type waitq struct {first *sudoglast  *sudog
}

sudog是一个运行时的结构体,它的主要作用是表示一个在等待列表中的Goroutine,其中存储着关于这一次阻塞的信息以及两个分别指向前后的sudog指针。

基本操作

当我们使用Channel时,能够执行的操作其实也就只有创建、发送、接收、关闭几种:

ch := make(chan int, 1)
ch <- 1
<-ch
close(ch)

我们在这一节中要介绍的就是这四种不同的Channel操作,在Go语言中的实现原理,包括它们的编译过程以及底层实际调用的方法和执行过程。

创建

首先我们先来了解一下Channel在Go语言中是如何创建的,就像我们在上面的代码中演示的,所有Go语言Channel的创建都是由make关键字完成的,我们在前面介绍数组和哈希表的创建时都介绍了使用make关键字初始化数据结构的过程,我们在这里也需要说一下如何使用make创建管道。

Golang中所有形如make(chan int, 10)在编译期间会先被转换成OMAKE类型的节点,随后的类型检查阶段在发现make的第一个参数是Channel类型时会将OMAKE类型节点转换成OMAKECHAN

func typecheck1(n *Node, top int) (res *Node) {switch n.Op {case OMAKE:// ...switch t.Etype {// make chan操作case TCHAN:// 初始化l,l用于存储可能的参数值l = nil// 如果还有参数if i < len(args) {// 获取该参数l = args[i]// 递增参数索引i++l = typecheck(l, ctxExpr)l = defaultlit(l, types.Types[TINT])if l.Type == nil {n.Type = nilreturn n}if !checkmake(t, "buffer", 1) {n.Type = nilreturn n}// 将节点n的左子节点设为l(即通道的缓冲区大小)n.Left = l} else {n.Left = nodintconst(0)}n.Op = OMAKECHAN}}

在这一阶段其实就会对传入的缓冲区大小进行检查,当然如果我们不向make中传递表示缓冲区大小的参数,那么就会设置一个默认值0,也就是当前Channel不存在缓冲区。

OMAKECHAN类型的节点最终都会在SSA中间代码生成阶段之前被转换成makechan或者makechan64的函数调用:

func walkexpr(n *Node, init *Nodes) *Node {switch n.Op {case OMAKECHAN:// 获取要创建的Channel的大小size := n.Left// 默认使用makechan64函数创建通道fnname := "makechan64"// 默认使用int64作为Channel大小的类型argtype := types.Types[TINT64]// 如果通道大小是类型是TIDEAL(它表示其大小没有具体限制) || // 大小类型的最大值小于uint类型的最大值if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {// 使用更合适的函数和参数类型fnname = "makechan"argtype = types.Types[TINT]}// 生成具体的函数调用节点n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))}
}

无论是makechan还是makechan64,它们都是Go语言的运行时方法,这两个方法的主要作用是根据传入的参数类型和缓冲区大小创建一个新的Channel结构,两者的主要区别是后者用于处理缓冲区大于32位的情况,我们可以直接来看makechan的实现:

func makechan(t *chantype, size int) *hchan {elem := t.elem// 计算缓冲区所需字节数,即元素大小elem.size * 元素数量sizemem, _ := math.MulUintptr(elem.size, uintptr(size))var c *hchanswitch {// 缓冲区大小为0时case mem == 0:// 只分配hchan结构的内存c = (*hchan)(mallocgc(hchanSize, nil, true))// 将缓冲区指向一个特定的地址c.buf = c.raceaddr()// 如果元素类型不包含指针case elem.kind&kindNoPointers != 0:// 为hchan和缓冲区分配一块连续内存c = (*hchan)(mallocgc(hchanSize+mem, nil, true))// 将缓冲区指向hchan结构后的位置c.buf = add(unsafe.Pointer(c), hchanSize)default:// 其他情况先为hchan结构分配内存c = new(hchan)// 再单独为缓冲区分配内存c.buf = mallocgc(mem, elem, true)}c.elemsize = uint16(elem.size)c.elemtype = elemc.dataqsiz = uint(size)return c
}

这里会根据Channel中收发元素的类型和Channel缓冲区的大小为当前的hchan结构体和用于缓冲的底层循环数组分配空间:
1.如果当前Channel中不存在缓冲区,那么就只会为hchan分配一段内存;

2.如果当前Channel中存储的类型不是指针类型,就会直接为当前的Channel和底层的数组分配一块连续的内存空间;

3.在默认情况下会单独为hchan和缓冲区分配内存;

在函数的最后会更新hchanelemsizeelemtypedataqsiz几个字段,从代码中我们可以看出Channel的初始化其实非常简单,比较关键的地方在于它会根据传入的缓冲区大小初始化循环数组,我们会在之后看到这个循环数组的使用方法。

发送

当我们想要向一个Channel发送数据时,就需要使用类似ch <- i的表达式,这个表达式会被编译器解析成OSEND节点,同样地在SSA中间代码的生成期间,这些OSEND节点也会被转换成chansend1的函数调用:

func walkexpr(n *Node, init *Nodes) *Node {switch n.Op {case OSEND:// n.Right是待发送的值的表达式节点n1 := n.Right// 将要发送的值类型转换为管道中的元素类型,第三个参数是这次转换的描述,用于错误信息或调试n1 = assignconv(n1, n.Left.Type.Elem(), "chan send")// 递归调用walkexpr处理n1节点,这是表达式树的遍历过程n1 = walkexpr(n1, init)// 创建一个新节点OADDR,表示取地址,我们把n1表示的值的地址取出来,准备传递给发送操作// Channel发送实际上是通过地址来传递值n1 = nod(OADDR, n1, nil)n = mkcall1(chanfn("chansend1", 2, n.Left.Type), nil, init, n.Left, n1)}
}

chansend1只是调用了chansend并传入Channel和需要发送的数据,在调用chansend时其实也传入了block=true,这就标识了当前的发送操作是一个阻塞的操作,chansend就是向Channel中发送数据时最终会调用的函数,这个函数负责了发送数据的全部逻辑:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {lock(&c.lock)if c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel")}

在发送数据的逻辑执行前会先为当前Channel加锁,防止出现竞争条件的问题,如果当前Channel结构已经通过closed字段被标记成了关闭,那么在向该Channel发送数据时就会直接panic报出一个非常常见的错误"send on closed channel"并返回。

直接发送

如果目标Channel没有被关闭并且已经有处于读等待的Goroutine,那么chansend函数会通过dequeuerecvq中取出最先陷入等待的Goroutine并直接向它发送数据:

    if sg := c.recvq.dequeue(); sg != nil {send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}

我们可以从下图中简单了解一下如果Channel中存在等待消息的Goroutine时,发送消息的处理过程:
在这里插入图片描述
发送数据时调用的是send函数,这个函数的执行其实可以分成两个部分,第一部分是使用sendDirect函数将发送的消息直接拷贝到类似x = <-c的表达式中的变量x所在的内存地址上:

// ep指针参数指向要发送的内容
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {// sudog结构的elem字段在Channel接收操作中,表示将接收值存放到的目标地址,即上述的x变量的地址// 在Channel发送操作中,该字段指向要发送的数据// 如果接收方使用变量接收值if sg.elem != nil {sendDirect(c.elemtype, sg, ep)sg.elem = nil}gp := sg.gunlockf()gp.param = unsafe.Pointer(sg)goready(gp, skip+1)
}

随后的goready函数会将等待接收数据的Goroutine标记成Grunnable并把该协程放到发送方所在的处理器上等待执行,该处理器在下一次调度时就会立刻唤醒消息接收方所在的协程。
在这里插入图片描述
我们在这里简单总结一下执行过程,当我们向Channel发送消息且Channel中存在处于等待状态的Goroutine协程时,会执行以下过程:
1.调用sendDirect函数将发送的消息拷贝到接收方持有的目标内存地址上;

2.将接收方Goroutine的状态修改成Grunnable并更新发送方所在处理器P的runnext属性,当处理器P再次发生调度时就会优先执行runnext中的协程;

需要注意的是,每次遇到上述情况都会将recvq队列中的sudog结构体出队;除此之外,接收方Goroutine被调度的时机也十分有趣,通过阅读源代码我们其实可以看到在发送过程中其实只是将接收方的Goroutine放到了runnext中,实际上P并没有立刻执行该协程,作者使用以下代码来验证调度发生的时机:

package mainfunc main() { // breakpoint 1// 创建无缓冲管道ch := make(chan int)go func() {for i := range ch {println(i) // breakpoint 2}}()ch <- 1wait := make(chan int) // breakpoint 3<-wait
}

我们在上述代码的三个地方打了断点,当使用delve进行调试时其实就会发现执行的顺序是1->3->2,具体的调试过程作者就不详细介绍了,感兴趣的读者可以学习一下delve的使用并亲自试验一下;总而言之,在这种情况下,向管道发送数据的过程并不是阻塞的(因为有接收者,即不是接收者接收完发送者才返回)。

缓冲区

向Channel中发送数据时遇到的第二种情况就是创建的Channel包含缓冲区且Channel中的数据没有装满,这时就会执行下面这段代码:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callrpc uintptr) bool {// 省略以上直接发送的部分// 如果缓冲区中元素数量小于缓冲区大小if c.qcount < c.dataqsiz {// 获取缓冲区中下一个元素位置的指针,c.sendx是缓冲区索引,指示下一个发送的元素存放的位置qp := chanbuf(c, c.sendx)// 将要发送的数据复制到缓冲区typedmemmove(c.elemtype, qp, ep)// 递增缓冲区发送索引c.sendx++// 循环队列if c.sendx == c.dataqsiz {c.sendx = 0}// 增加缓冲区中元素数量c.qcount++unlock(&c.lock)return true}

在这里我们首先会使用chanbuf计算出下一个可以放置待处理变量的位置,然后通过typedmemmove将发送的消息拷贝到缓冲区中并增加sendx索引和qcount计数器,在函数的最后会释放持有的锁。
在这里插入图片描述
如果当前Channel的缓冲区未满时,向Channel发送数据时就会直接存储在Channel中的sendx索引所在的位置并将sendx索引加一,由于这里的buf其实是一个循环数组,所以在sendx的值等于dataqsiz的大小时就会重新回到数组开始的位置。

阻塞发送

最后要介绍的是向Channel发送但是遇到下游无法处理的“阻塞发送”了,当然如果传入的参数block=false,那么就会直接释放持有的锁并返回false表示这一次的发送不成功。

在常见的场景中,向Channel发送消息的操作基本上都是阻塞的,在这时就会执行下面的代码,我们简单梳理一下这段代码:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callrpc uintptr) bool {// ...if !block {unlock(&c.lock)return false}// 获取当前Goroutine(发送者)的指针gp := getg()// 获取一个sudog结构,用于在队列中表示当前Goroutinemysg := acquireSudog()mysg.releasetime = 0// 设置要发送的数据mysg.elem = ep// 设置下一个节点,此处为nilmysg.waitlink = nil// 关联sudog结构与当前Goroutinemysg.g = gpmysg.isSelect = false// 关联sudog结构与channelmysg.c = c// 关联当前Goroutine和sudog结构gp.waiting = mysg// 清空之前可能设置的参数gp.param = nil// 将sudog结构加入到发送队列中c.sendq.enqueue(mysg)// 阻塞当前Goroutine,并释放指定的锁goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)// 唤醒后的清理gp.waiting = nilgp.param = nilmysg.c = nilreleaseSudog(mysg)return true
}

1.调用getg获取发送操作时使用的Goroutine协程;

2.执行acquireSudog函数获取一个sudog结构体并设置这一次阻塞发送的相关信息,例如发送的Channel、是否在select控制结构中、发送数据所在的地址等;

3.将刚刚创建并初始化的sudog结构体加入sendq等待队列,并设置到当前Goroutine的waiting上,表示Goroutine正在等待该sudog准备就绪;

4.调用goparkunlock函数将当前的Goroutine更新成Gwaiting状态并解锁,该Goroutine可以被调用goready再次唤醒;

5.当前的Goroutine其实就会在这里陷入阻塞状态等待被调度器唤醒了;

6.如果被调度器唤醒就会执行一些收尾的工作,将一些属性置零并且释放sudog结构体;在最后,函数会返回true表示这一次发送的结束并继续运行当前Goroutine应该执行的逻辑。

小结

我们在这里简单梳理和总结一下使用ch <- i表达式向Channel发送数据时遇到的几种情况:
1.如果当前Channel的recvq上存在已经被阻塞的Goroutine,那么会直接将数据发送被阻塞的Goroutine并将其设置成下一个运行的协程;

2.如果Channel存在缓冲区且其中还有空闲的容量,我们就会直接将数据存储到当前缓冲区sendx所在的位置上;

3.如果都不满足上面的两种情况,就会创建一个sudog结构并加入Channel的sendq队列并更新到Goroutine的waiting字段上,同时当前的Goroutine就会陷入阻塞等待其他协程从Channel接收数据,一旦有其他的协程从Channel接收数据就会唤醒当前Goroutine;

发送数据的过程中包含几个会触发Goroutine调度的时机,首先是发送数据时发现Channel上存在等待接收数据的Goroutine,这会立刻设置处理器的runnext属性,但不会立刻触发调度,第二个时机是发送数据时没有等待的接收方且缓冲区已经满了,这时就会将自己加入Channel的sendq队列并立刻调用goparkunlock触发Goroutine的调度让出处理器的使用权。

接收

分析了Channel发送数据的过程之后,我们就可以继续介绍数据处理的另一端,也就是数据的接收了,我们在Go语言中其实有两种不同的方式去接收管道中的数据:

i <- ch
i, ok <- ch

这两种不同的方法经过编译器的处理都会变成ORECV类型的节点,但是后者会在类型检查阶段被转换成OAS2RECV节点,我们可以简单看一下这里的转换路线图:
在这里插入图片描述
虽然这两种不同的接收方式会被转换成chanrecv1chanrecv2两种不同函数的调用,但是这两个函数最终调用的还是chanrecv

chanrecv处理数据接收时总共可以分成五种不同的情况,当我们从一个空Channel中接收数据时会直接调用gopark让出当前Goroutine处理器的使用权。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// 如果通道为nilif c == nil {if !block {return}// 阻塞情况下,直接休眠gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)// 不可能被唤醒throw("unreachable")}lock(&c.lock)// 如果通道已被关闭 && 通道中元素数量为0if c.closed != 0 && c.qcount == 0 {unlock(&c.lock)if ep != nil {// 清空接收变量的值typedmemclr(c.elemtype, ep)}return true, false}

如果当前的Channel已被关闭且缓冲区中不存在任何数据,那么就会直接解锁当前Channel并清除ep指针的指向的数据。

直接接收

当Channel的sendq队列中包含处于等待状态的Goroutine时,我们就会直接取出队列头的Goroutine,这里处理的逻辑和发送时所差无几,只是发送数据时调用的是send函数,而这里是recv函数:

    if sg := c.sendq.dequeue(); sg != nil {recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}

recv函数的实现其实也与send非常相似,我们可以简单看一下这里执行的逻辑:

// 接收且发送队列非空时调用
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {// 如果通道是无缓冲的if c.dataqsiz == 0 {// 如果有接收变量if ep != nil {// 直接接收recvDirect(c.elemtype, sg, ep)}} else {// 获取c.recvx索引指示的缓冲区qp := chanbuf(c, c.recvx)// 如果有接收变量if ep != nil {// 将要接收的值存放到接收变量中typedmemmove(c.elemtype, ep, qp)}// 将sg.elem的值(sendq中等待的Goroutine要发送的值)存放到刚刚缓冲区中取出元素的位置// 注意,此时发送队列非空,说明缓冲区已满typedmemmove(c.elemtype, qp, sg.elem)// 增加接收索引c.recvx++// 环形队列if c.recvx == c.dataqsiz {c.recvx = 0}// 这里为什么要同步发送和接收的位置?// 因为本函数是直接从sendq中取出一个等待发送的Goroutine,这意味着缓冲区已满(sendx=recvx)// 我们刚从缓冲区中取出一个元素,然后需要把sendx和recvx都更新到下一个缓冲区的位置c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz}// 等待队列中的元素已被放到缓冲区,可以清空了sg.elem = nilgp := sg.gunlockf()gp.param = unsafe.Pointer(sg)// 将发送Goroutine标为就绪状态goready(gp, skip+1)
}

1.如果当前Channel缓冲区大小为0,就会调用recvDirect,这个函数会将sendq中Goroutine存储的elem数据拷贝到目标内存地址中;

2.如果当前Channel有缓冲区,就会通过typedmemmove将队列中的数据拷贝到接收方的内存地址中并将发送方的数据拷贝到队列中,这样我们可以释放一个阻塞的发送方Goroutine;

3.在最后会解锁Channel并调用goready函数将当前处理器的runnext设置成接收数据的Goroutine,随后<- ch会返回并执行后面的逻辑;
在这里插入图片描述
上图展示了Channel在缓冲区已经没有空间且sendq中存在等待的Goroutine时,使用<- ch发生的变化,sendq队列中的第一个sudog结构中的元素会替换sendx/recvx索引所在位置的元素,原有的元素会被拷贝到接收<- ch结果的内存空间上。

缓冲区

另一种接收数据时遇到的情况就是,Channel的缓冲区中已经包含了一些元素,在这时如果使用<- ch从Channel中接收元素,我们就会直接从缓冲区中recvx索引位置中取出数据进行处理:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// ...if c.qcount > 0 {qp := chanbuf(c, c.recvx)if ep != nil {typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount--unlock(&c.lock)return true, true}

如果接收数据的内存地址不为空,那么就会直接使用typedmemmove将缓冲区中的数据拷贝到内存中,在这之后会清除队列中的数据并完成收尾工作。
在这里插入图片描述
收尾工作包括递增recvx索引,当发现索引超过当前队列的容量时,由于这是一个循环队列,所以就会将它归零;除此之外,这个函数还会减少qcount计数器并释放持有Channel的锁。

阻塞接收

当Channel的sendq队列中不存在等待的Goroutine且缓冲区中也不存在任何数据时,从管道中接收数据的操作在大多数时候会变成一个阻塞操作:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// ...if !block {unlock(&c.lock)return false, false}// 下面这坨代码会获取一个sudog结构,然后加入接收队列// 获取执行此接收操作的Goroutinegp := getg()// 获取一个sudog结构mysg := acquireSudog()// 设置mysg的各个字段mysg.releasetime = 0// 接收到的值要存放到ep中mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.isSelect = false// 将sudog与当前Channel关联mysg.c = cgp.param = nil// 将sudog加入接收队列c.recvq.enqueue(mysg)// 阻塞当前Goroutinegoparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)// 处理唤醒后的情况gp.waiting = nil// gp.param为nil时说明Channel已被关闭,此时没有数据可接收closed := gp.param == nilgp.param = nilmysg.c = nil// 释放sudog结构releaseSudog(mysg)return true, !closed
}

这种阻塞的情况其实有一个例外,也就是与select语句结合使用时就可能会使用到非阻塞block=false的接收操作,这段代码在这时(作者在此处的“这时”指的应该是阻塞的情况下,如果非阻塞,根据以上代码,就会直接返回)就会获取一个sudog结构体设置到当前Goroutine的waiting上并将其入队到recvq中。

除此之外,当前代码片段还会调用goparkunlock函数立刻触发Goroutine的调度,将当前Goroutine的状态改成Gwaiting并让出处理器的使用权,在这时Goroutine就会处于休眠状态等待调度器的调度,重新执行时就会从gp.waiting = nil处继续运行下面的代码对数据进行清理。

小结

我们简单梳理一下从Channel中接收数据时的几种情况:
1.如果Channel是空的,那么就会直接调用gopark挂起当前的Goroutine;

2.如果Channel已经关闭且缓冲区没有任何数据,chanrecv函数就会直接返回;

3.如果Channel上的sendq队列中存在挂起的Goroutine(此时缓冲区已满),就会将recvx索引所在的数据拷贝到接收变量所在的内存空间上并将sendq队列中的Goroutine的数据拷贝到缓冲区中;

4.如果Channel的缓冲区中包含数据就会直接从recvx所在的索引上进行读取;

5.在其他情况下(没有数据可接收)会直接挂起当前Goroutine,将sudog结构加入recvq队列并更新Goroutine的waiting属性,最后陷入休眠等待调度器的唤醒;

在从管道中接收数据的过程中,其实会在两个时间点触发Goroutine的调度,首先空的Channel意味着永远接收不到消息,那么就会直接挂起当前Goroutine,第二个时间点是缓冲区中不存在数据,在这时也会直接挂起当前的Goroutine等待发送方发送数据。

关闭

用于关闭管道的close关键字也与上面提到的收发处理过程一样,它会在编译期间转换成OCLOSE节点,然后变成closechan的函数调用,如果Channel是一个空指针或者已经被关闭了,这两种情况都会直接panic,抛出异常:

func closechan(c *hchan) {if c == nil {panic(plainError("close of nil channel"))}lock(&c.lock)if c.closed != 0 {unlock(&c.lock)panic(plainError("close of closed channel"))}

处理完了这些异常情况之后就可以开始执行关闭Channel的逻辑了,下面这段代码的主要工作就是将recvqsendq两个队列中的Goroutine信息加入到列表gList中,与此同时会清除所有sudog上未被处理的元素:

    c.closed = 1var glist gListfor {sg := c.recvq.dequeue()if sg == nil {break}if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)sg.elem = nil}gp := sg.ggp.param = nilglist.push(gp)}for {sg := c.sendq.dequeue()if sg == nil {break}sg.elem = nilgp := sg.ggp.param = nilglist.push(gp)}unlock(&c.lock)for !glist.empty() {gp := glist.pop()gp.schedlink = 0goready(gp, 3)}
}

在函数执行的最后会为所有被阻塞的Goroutine调用goready函数重新对这些协程进行调度,在上面我们已经简单介绍过goready的实现,所以在这里就不展开详细介绍了。

总结

Channel作为Go语言中的基础数据结构,是Go语言能够提供强大并发能力的原因之一,我们在这一节其实只介绍了Channel的数据结构以及相关的基本操作,在后面的章节中会提到各种关键字是如何与Channel结合并发挥作用的,除此之外在谈到并发编程和协程调度等内容时,我们还是会重新提及Channel。

6.5 Goroutine

Go语言在并发编程方面有着非常强大的能力,这也离不开语言层面对并发编程的支持,我们会在Go语言中使用Goroutine并行执行任务并将Channel作为Goroutine之间的通信方式,虽然使用互斥锁和共享内存在Go语言中也可以完成Goroutine间的通信,但是使用Channel才是更推荐的做法——不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。
在这里插入图片描述
我们在这一节中不仅会介绍Go语言中的协程(Goroutine)的数据结构和实现原理,还会介绍调度器是如何对Goroutine进行调度的,其中包括调度的时机和过程。

概述

谈到Go语言的调度器,我们不得不提的就是操作系统、进程、线程这些概念,操作系统其实为我们提供的POSIX API中就包含对线程的相关操作,线程也是操作系统做调度时的最基本单元,线程和进程的实现在不同操作系统上也有所不同,但是在大多数的实现中线程都是进程的一个组件:
在这里插入图片描述
多个线程可以存在于同一个进程中并共享同一片内存空间,由于不需要创建新的虚拟内存空间,所以它们也不需要内存管理单元处理上下文的切换,线程之间的通信也正是基于共享的内存进行的,相比于重量级的进程,线程显得比较轻量,我们可以在一个进程中创建出多个线程。

虽然线程相对进程比较轻量,但是线程仍然会占用较多的资源且调度时也会造成比较大的额外开销,每个线程都会占用1M以上的内存空间,在对线程进行切换时不止会消耗较多的内存空间,对寄存器中的内容进行恢复还需要向操作系统申请或销毁对应的资源,每一次线程上下文的切换都需要消耗~1us左右的时间,但是Go调度器对Goroutine的上下文切换只需~0.2us,减少了80%的额外开销。除了减少上下文切换带来的开销,Golang的调度器还能更有效地利用CPU缓存。
在这里插入图片描述
Go语言的调度器其实就是通过使用数量合适的线程并在每一个线程上执行更多的工作来降低操作系统和硬件的负载。

数据结构

相信各位读者已经对Go语言调度相关的数据结构非常熟悉了,但是我们在这里还是要简单回顾一下其中的三个组成部分,线程M、协程G、处理器P:
在这里插入图片描述
1.M——表示操作系统的线程,它是被操作系统管理的线程,与POSIX中的标准线程非常类似;

2.G——表示Goroutine,每一个Goroutine都包含堆栈、指令指针、其他用于调度的重要信息;

3.P——表示调度的上下文,它可以被看做一个运行于线程M上的本地调度器;

我们会在这一节中分别介绍不同的组成部分,详细介绍它们的基本作用、数据结构、在运行期间可能处于的状态。

G

Go语言中并发的执行单元是Goroutine,它很像操作系统中的线程,但是占用了更小的内存空间并降低了Goroutine切换的开销。

Goroutine只存在于Go语言的运行时,它是Go语言在用户态为我们提供的“线程”,如果一个Goroutine由于IO操作而陷入阻塞,操作系统并不会对上下文进行切换,但是Go语言的调度器会将陷入阻塞的Goroutine“切换”下去等待系统调用结束并让出计算资源,作为一种粒度更细的资源调度单元,如果使用得当能够在高并发的场景下更高效地利用机器的CPU。

结构体

Goroutine在Go语言运行时使用一个名为g的私有结构体表示,这个私有结构体非常复杂,总共有40多个用于表示各种状态的成员变量,我们在这里也不能介绍全部的属性,而是会挑选其中的一部分重点进行介绍,我们可以在runtime2.go#L387-L450文件中查看g结构体的全部属性:

type g struct {m            *m // current m; offset known to arm liblinksched        gobufsyscallsp    uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gcsyscallpc    uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gcparam        unsafe.Pointer // passed parameter on wakeupatomicstatus uint32goid         int64schedlink    guintptrwaitsince    int64      // approx time when the g become blockedwaitreason   waitReason // if status==Gwaitingpreempt      bool       // preemption signal, duplicates stackguard0 = stackpreemptlockedm      muintptrwritebuf     []bytesigcode0     uintptrsigcode1     uintptrsigpc        uintptrgopc         uintptr // pc of go statement that created this goroutinestartpc      uintptr // pc of goroutine function// sudog structures this g is waiting on (that have a valid elem ptr); in lock orderwaiting      *sudog
}

为了减少无关的干扰项,我们在这里删除了跟堆栈以及追踪相关的字段,剩下的都是g结构体中比较重要的字段。

状态

g结构体的字段atomicstatus存储了当前Goroutine的状态,runtime2.go文件中定义了Goroutine全部可能存在的状态。除了几个已经不被使用的以及与GC相关的状态外,全部常见的状态都展示在这里:
在这里插入图片描述
在这里插入图片描述
上述状态中比较常见的是_Grunnable_Grunning_Gsyscall_Gwaiting四个状态,我们在这里也会重点介绍这几个状态,Goroutine中所有状态的迁移是一个非常复杂的过程,会触发Goroutine状态迁移的方法也非常多,在这里我们也没有办法介绍全部的迁移路线,我们会从中选择一些进行介绍:
在这里插入图片描述
虽然Goroutine定义在运行时中的状态非常多且复杂,但是我们可以将这些不同的状态聚合成最终的三种:等待中、可运行、运行中,在运行期间我们会在这三种不同的状态来回切换:
1.等待中:表示当前Goroutine等待某些条件满足后才会继续执行,例如当前Goroutine正在执行系统调用或者同步操作;

2.可运行:表示当前Goroutine在等待某个M来执行Goroutine的指令,如果当前程序中有非常多的Goroutine,每个Goroutine就可能会等待更多的时间;

3.运行中:表示当前Goroutine正在某个M上执行指令;

M

Go语言并发模型中的M表示操作系统线程,默认情况下调度器能够允许创建10000个线程,但是其中绝大多数线程都不会执行用户代码(可能陷入系统调用),最多只会有GOMAXPROCS个线程M能够正常运行。

所有Golang程序中的最大“可运行”线程数等于GOMAXPROCS变量的值;默认情况下,它会被设置成当前应用的核数,我们也可以使用runtime.GOMAXPROCS方法改变当前程序中最大的线程数。
在这里插入图片描述
在默认情况下,一个四核机器上会创建四个操作系统线程,每一个线程都是一个m结构体,我们也可通过runtime.GOMAXPROCS改变最大可运行线程的数量,我们可以使用runtime.GOMAXPROCS(3)将Go程序中的线程数改成3个。

大多数情况下,我们都会使用Go的默认设置,也就是#thread == #CPU,在这种情况下不会触发操作系统级别的线程调度和上下文切换(这句话有问题,如果只有Go进程运行的情况下才是如此,但肯定还会有其他进程抢占CPU,最起码还有操作系统进程),所有的调度都会发生在用户态,由Go语言调度器触发,能够减少非常多的额外开销。

结构体

操作系统线程在Go语言中就会使用私有结构体m来表示,这个结构体中也包含了几十个私有的字段,我们这里还是对其进行了简单的删减,感兴趣的读者可以查看runtime2.go#L452-L521了解更多的内容:

type m struct {g0   *g // goroutine with scheduling stackcurg *g // current running goroutine// ...
}

其中g0是持有调度堆栈的Goroutine,curg是在当前线程上运行的Goroutine,这也是作为操作系统线程唯一关心的两个Goroutine了。

P

Go语言调度器中的最后一个重要结构就是处理器P,其实就是线程需要的上下文环境,也是用于处理代码逻辑的处理器,通过处理器P的调度,每一个内核线程M都能够执行多个G,这样就能在G进行一些IO操作时及时对它们进行切换,提高CPU的利用率。

每一个Go语言程序中所有处理器的数量一定会等于GOMAXPROCS,这是因为调度器在启动时就会创建GOMAXPROCS个处理器P,这些处理器会绑定到不同的线程M上并为它们调度Goroutine。

结构体

处理器在Go语言运行时中同样会使用私有结构体p表示,作为调度器的内部实现,它包含的字段也非常多,我们在这里就简单展示一下结构体中的大致内容,感兴趣的读者可以查看runtime2.go#L523-L602。

type p struct {id          int32status      uint32 // one of pidle/prunning/...link        puintptrschedtick   uint32     // incremented on every scheduler callsyscalltick uint32     // incremented on every system callsysmontick  sysmontick // last tick observed by sysmonm           muintptr   // back-link to associated m (nil if idle)mcache      *mcacherunqhead uint32runqtail uint32runq     [256]guintptrrunnext  guintptrsudogcache []*sudogsudogbuf   [128]*sudog// ...
}

我们将结构体中GC以及用于追踪调试的字段全部删除以简化这里需要展示的属性,在上述字段中,status表示当前处理器的状态,runheadruntailrunqrunnext等字段表示处理器持有的运行队列,运行队列中就包含待执行的Goroutine列表。

状态

p结构体中的状态status会是以下五种状态之一,我们能在runtime2.go#L99-L147文件中找到处理器P的全部状态:
在这里插入图片描述
通过分析处理器P的这些状态,我们其实能够对处理器的工作过程有一些简单的理解,例如处理器在执行用户代码时会处于_Prunning状态,在当前线程执行IO操作时会陷入_Psyscall状态。

小结

我们在这一小节中简单介绍了Go语言调度器中常见的数据结构,包括线程M、处理器P、Goroutine G,它们在Go语言运行时中分别使用不同的私有结构体表示,我们在下面的小节中会深入介绍Go语言调度器的实现原理。

实现原理

这里会以Go语言中Goroutine的相关操作为入口,详细介绍Goroutine的不同状态、它是如何被创建和销毁的、调度器的启动过程。

创建Goroutine

想要启动一个新的Goroutine来执行任务时,我们需要使用Go语言中的go关键字,这个关键字会在编译期间通过以下方法stmtcall两个方法将该关键字转换成newproc函数调用,代码的路径和原理与defer关键字几乎完全相同,两者的区别也只是defer被转化为deferproc,而go被转换成newproc方法:

func (s *state) stmt(n *Node) {switch n.Op {// 如果是go语句case OGO:// n.Left应该是要并发执行的函数调用节点s.call(n.Left, callGo)}
}func (s *state) call(n *Node, k callKind) *ssa.Value {// ...// 如果是处理defer语句的情况if k == callDeferStack {// ...} else {switch {case k == callGo:// 创建一个函数调用的SSA值call = s.newValue1A(ssa.OpStaticCall, types.TypeMem, newproc, s.mem())default:}}// ...return ...
}

经过了Go语言的中间代码生成的过程,所有的go关键字都会被转换成newproc函数调用,我们向newproc中传入一个表示函数的指针funcval,在这个函数中我们还会获取当前调用newproc函数的Goroutine以及调用方的程序计数器PC,然后调用newproc1函数:

func newproc(siz int32, fn *funcval) {argp := add(unsafe.Pointer(&fn), sys.PtrSize)gp := getp()pc := getcallerpc()newproc1(fn, (*uint8)(argp), siz, gp, pc)
}

newproc1函数的主要作用就是创建一个运行传入参数fng结构体,在这个方法中我们也会拷贝当前方法的全部参数,argpnarg共同表示函数fn的入参,我们在该方法中其实也会直接将所有参数对应的内存空间整片地拷贝到新Goroutine的栈上。

// argp指向传递给fn的参数
// narg是参数的字节大小
// callergp是调用者的Goroutine指针
// callerpc是调用者的程序计数器位置
func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {// 获取当前执行的Goroutine指针_g_ := getg()siz := narg// 将参数大小调整为8的倍数,用于内存对齐siz = (siz + 7) &^ 7// 获取当前处理器P_p_ := _g_.m.p.ptr()// 尝试从当前P的空闲列表中获取一个空闲的Goroutinenewg := gfget(_p_)// 如果找不到空闲Goroutine,就创建一个新的if newg == nil {// 分配一个Goroutine结构,并提供最小的栈空间newg = malg(_StackMin)// 将新Goroutine的状态从闲置改为死亡(即初始化状态)casgstatus(newg, _Gidle, _Gdead)// 将新Goroutine添加到全局列表中allgadd(newg)}// 计算栈所需总大小,包括寄存器、参数、最小帧大小totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize// 确保栈顶对齐// 如果totalSize是5,sys.SpAlign是4// -5的补码是11111011,3的补码是00000011,按位与的结果是00000011// 即3,而5加上3正好是4的下一个倍数,这样就可以栈顶对齐totalSize += -totalSize & (sys.SpAlign - 1)// 设置新Goroutine的栈顶指针sp := newg.statck.hi - totalSizespArg := sp// 如果有参数if narg > 0 {// 将参数memmove到新栈中memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))}// 清除newg.sched中的数据(不涉及堆指针的管理),确保新Goroutine的调度器信息被清空memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))// 设置新Goroutine的栈顶指针到调度器newg.sched.sp = sp// 保存栈顶位置的值,通常用于跟踪栈的初始位置newg.stktopsp = sp// 设置程序计数器到goexit函数的地址加上pc最小步长newg.sched.pc = funcPC(goexit) + sys.PCQuantum// 设置新Goroutine的调度器关联的Goroutine指针newg.sched.g = guintptr(unsafe.Pointer(newg))// 将fn设为新Goroutine开始执行的函数gostartcallfn(&newg.sched, fn)// 记录创建该Goroutine的程序计数器位置newg.gopc = callerpc// 设置新Goroutine执行的函数入口地址newg.startpc = fn.fn// newg如果是系统Goroutineif isSystemGoroutine(newg, false) {// 增加系统Goroutine的数量atomic.Xadd(&sched.ngsys, +1)}// 将Goroutine的状态从死亡改为可运行casgstatus(newg, _Gdead, _Grunnable)// 为新Goroutine分配一个idnewg.goid = int64(_p_.goidcache)// 增加Goroutine id,为下一个Goroutine做准备_p_.goidcache++// 将新Goroutine放到处理器P的运行队列中,P是本地处理器的抽象runqput(_p_, newg, true)// 如果有空闲的处理器P && 没有P在自旋 && 主调度器已经启动if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {// 直接唤醒一个空闲的P,加快新Goroutine的调度速度wakep()}
}

newproc1函数的执行过程其实可以分成以下的几个步骤:
1.获取当前Goroutine对应的处理器P并从它的列表中取出一个空闲的Goroutine,如果当前不存在空闲的Goroutine,就会通过malg方法重新分配一个g结构体并将它的状态从_Gidle转换成_Gdead

2.获取新创建Goroutine的堆栈并直接通过memmove将函数fn需要的参数全部拷贝到栈中;

3.初始化新Goroutine的栈指针、程序计数器、调用方程序计数器等属性;

4.将新Goroutine的状态从_Gdead切换成_Grunnable并设置Goroutine的标识符(goid);

5.runqput函数会将新的Goroutine添加到处理器P的结构体中;

6.如果符合条件,当前函数会通过wakep来添加一个新的p结构体来执行Goroutine;

获取结构体

在这个过程中我们会有两种不同的方法获取一个新的g结构体,一种情况是直接从当前Goroutine所在处理器的p.gFree列表或者调度器的sched.gFree列表中获取g结构体,另一种方式是通过malg方法生成一个新的g结构体并将当前结构体追加到全局的Goroutine列表allgs中。
在这里插入图片描述
gfget函数中包含两部分逻辑,当处理器结构体中的空闲Goroutine列表已经为空时就会将调度器持有的空闲Goroutine转移到当前处理器结构体的空闲Goroutine列表中,当调度器的空闲Goroutine数量充足时它会将当前处理器的空闲Goroutine数量“装载”到32个,随后gfget函数就会从当前处理器的非空gFree列表中获取空闲的Goroutine:

// 在newfunc1中,传来的p参数是执行go关键字的Goroutine的处理器结构
func gfget(_p_ *p) *g {
retry:// 如果当前处理器的空闲Goroutine列表为空 && 调度器的带栈或不带栈的空闲Goroutine列表有一个非空if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {// 对调度器的空闲Goroutine池加锁lock(&sched.gFree.lock)// 如果当前处理器的空闲Goroutine数量小于32for _p_.gFree.n < 32 {// 尝试从调度器的带栈的Goroutine空闲列表中弹出一个gp := sched.gFree.stack.pop()// 如果带栈的Goroutine空闲列表为空if gp == nil {// 从调度器的不带栈的Goroutine空闲列表中弹出一个gp = sched.gFree.noStack.pop()// 如果不带栈的Goroutine空闲列表也为空if gp == nil {// 跳出循环break}}// 减少调度器的空闲Goroutine数量sched.gFree.n--// 把刚获取的Goroutine加入当前处理器的空闲列表_p_.gFree.push(gp)// 增加当前处理器的空闲Goroutine数量_p_.gFree.n++}// 解锁调度器的空闲Goroutine池锁unlock(&sched.gFree.lock)// 跳转到retry标签,重新检查条件且可能继续获取Goroutinegoto retry}// 从当前处理器的空闲Goroutine列表中弹出一个gp := _p_.gFree.pop()// 如果没有空闲Goroutine了if gp == nil {return nil}// 减少当前处理器的空闲Goroutine数量_p_.gFree.n--// 如果获取的Goroutine的栈底地址是否为0(为0表示没有栈)if gp.stack.lo == 0 {// 为Goroutine分配一个固定大小的栈gp.stack = stackalloc(_FixedStack)// 设置Goroutine的栈保护,以便在栈即将耗尽时触发更多的栈空间分配gp.stackguard0 = gp.stack.lo + _StackGuard}return gp
}

当然调度器的gFree和当前处理器的gFree列表都为空时,我们就会调用malg方法初始化一个新的g结构体,如果申请的堆栈大小大于0,在这里我们就会通过stackalloc初始化一片栈空间,栈空间的大小一般是1KB:

func malg(stacksize int32) *g {newg := new(g)if stacksize > 0 {// 计算实际栈大小,_StackSystem是为栈预留的空间,用于存放系统信息// round2函数向上舍入到2的幂stacksize = round2(_StackSystem + stacksize)newg.stack = stackalloc(uint32(stacksize))// 设置保护页的起始地址,超过此界限会触发栈溢出保护newg.stackguard0 = newg.stack.lo + _StackGuard// 计算最大的uintptr值,防止栈溢出newg.stackguard1 = ^uintptr(0)}return newg
}

这就是newproc1获取g结构体的两种不同路径,通过malg获取的结构体会被存储到全局变量allgs中。

运行队列

新创建的Goroutine在大多数情况下都可以通过调用runqput函数将当前Goroutine添加到处理器P的运行队列上,该运行队列是一个使用数组构成的环形链表,其中最多能够存储256个指向Goroutine的指针,除了runq中能够存储待执行的Goroutine之外,runnext指针中也可以存储Goroutine,runnext指向的Goroutine会成为下一个被运行的Goroutine:

// runqput函数将一个Goroutine放到指定的一个处理器的运行队列中
func runqput(_p_ *p, gp *g, next bool) {// 如果next为true,此时会尝试把gp设置为下一个即将执行的Goroutineif next {retryNext:// 获取原计划下一个要执行的Goroutine的指针oldnext := _p_.runnext// 使用cas操作更新runnext,如果失败(原因可能是其他_p_.runnext被其他线程修改)则重新尝试if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {goto retryNext}// 如果原计划下一个要执行的Goroutine为空,则直接返回if oldnext == 0 {return}// 此处说明有下一个要执行的Goroutine,将其设为gp,以便后面将其放入运行队列gp = oldnext.ptr()}retry:// 使用原子操作加载当前处理器运行队列头部的索引h := atomic.LoadAcq(&_p_.runqhead)// 获取处理器运行队列尾部的索引t := _p_.runqtail// 如果运行队列未满if t-h < uint32(len(_p_.runq)) {// 将gp放在循环队列尾_p_.runq[t%uint32(len(_p_.runq))].set(gp)// 更新队列尾部索引为t+1atomic.StoreRel(&_p_.runqtail, t+1)return}// 运行到此处说明运行队列已满,此时调用runqputslow将一些Goroutine和gp加入全局调度器上if runqputslow(_p_, gp, h, t) {return}goto retry
}

1.当next=true时将Goroutine设置到处理器的runnext上作为下一个被当前处理器执行的Goroutine;

2.当next=false并且运行队列还有剩余空间时,将Goroutine加入处理器持有的本地运行队列;

3.当处理器的本地运行队列已经没有剩余空间时就会把本地队列中的一部分Goroutine和待加入的Goroutine通过runqputslow添加到调度器持有的全局运行队列上;
在这里插入图片描述
简单总结一下,Go语言中有两个运行队列,其中一个是处理器本地的运行队列,另一个是调度器持有的全局运行队列,只有在本地运行队列没有剩余空间时才会使用全局队列存储Goroutine。

Goroutine调度

在Go语言程序的运行期间,所有触发Goroutine调度的方式最终都会调用gopark函数让出当前处理器P的控制权,gopark函数中会更新当前处理器的状态并在处理器上设置该Goroutine的等待原因:

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {// 获取操作系统线程Mmp := acquirem()// 获取当前M正在执行的G并设置等待相关的参数gp := mp.curgmp.waitlock = lockmp.waitunlockf = unlockfgp.waitreason = reasonmp.waittraceev = traceEvmp.waittraceskip = traceskip// 释放当前M,使其不再与任何G关联,这样M可以去执行其他任务releasem(mp)// 调用park_m函数,执行实际挂起操作// mcall函数确保park_m函数在系统级M上执行mcall(park_m)
}

上述函数中调用的park_m函数会将当前Goroutine的状态从_Grunning切换至_Gwaiting并调用waitunlockf函数进行解锁,如果解锁失败就会将该Goroutine的状态切换回_Grunning并重新执行:

func park_m(gp *g) {// 获取当前执行的Goroutine_g_ := getg()// 将gp的状态用cas操作从_Grunning改为_Gwaitingcasgstatus(gp, _Grunning, _Gwaiting)// 将当前Goroutine与M的关联断开dropg()// 如果存在解锁函数if fn := _g_.m.waitunlockf; fn != nil {// 执行解锁函数,在挂起前释放Goroutine持有的锁或资源ok := fn(gp, _g_.m.waitlock)_g_.m.waitunlockf = nil_g_.m.waitlock = nil// 如果解锁失败if !ok {// 用cas操作将Goroutine的状态从_Gwaiting改为_Grunnablecasgstatus(gp, _Gwaiting, _Grunnable)// 使用execute函数立即重新调度Goroutine,且execute函数永不返回execute(gp, true) // Schedule it back, never returns.}}// 重新调度,找到其他Goroutine执行schedule()
}

在大多数情况下都会调用schedule触发一次Goroutine调度,这个函数的主要作用是从不同的地方查找待执行的Goroutine:

func schedule() {// 获取当前正在执行的Goroutine指针_g_ := getg()top:// 将要执行的Goroutinevar gp *g// 是否继承时间片var inheritTime boolif gp == nil {// 如果当前处理器的schedtick是61的倍数 && 调度器的运行队列不为空// 处理器的schedtick在每次调度Goroutine时会自增,此处使其有几率从全局运行队列中查找Goroutineif _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {lock(&sched.lock)// 从全局运行队列中获取一个Goroutine,需要加锁保护队列gp = globrunqget(_g_.m.p.ptr(), 1)unlock(&sched.lock)}}// 如果全局运行队列中没有Goroutineif gp == nil {// 从当前处理器P的运行队列中获取一个Goroutinegp, inheritTime = runqget(_g_.m.p.ptr())// 如果获取到了Goroutine && 处理器P状态是忙等状态// 理论上忙等状态不会有可运行的Goroutine,如果发生了,直接抛异常,此处是安全检查if gp != nil && _g_.m.spinning {throw("schedule: spinning with local work")}}// 如果当前处理器上也没有可运行的Goroutineif gp == nil {// 调用findrunnable,一直阻塞,直到有可运行的Goroutinegp, inheritTime = findrunnable() // blocks until work is available}// 执行找到的Goroutine,inheritTime表示是否继承找到的Goroutine的时间片剩余execute(gp, inheritTime)
}

1.为了保证公平,当全局运行队列中有待执行的Goroutine时,有一定几率会从全局的运行队列中查找对应的Goroutine;

2.从当前处理器本地的运行队列中查找待执行的Goroutine;

3.如果前两种方法都没有找到Goroutine,就会通过findrunnable进行查找,这个函数的实现相对比较复杂,它会尝试从其他处理器上取出一部分Goroutine,如果没有可执行的任务就会阻塞直到条件满足;findrunnable函数会再次从本地运行队列、全局运行队列、网络轮询器、其他处理器中获取待执行的任务,该方法一定会返回待执行的Goroutine,否则就会一直阻塞。

获取可以执行的任务之后就会调用execute函数执行该Goroutine,执行的过程中会先将其状态修改成_Grunning、与线程M建立起双向的关系并调用gogo触发调度。

func execute(gp *g, inheritTime bool) {_g_ := getg()// 使用cas操作将gp的状态从_Grunnable改为_Grunningcasgstatus(gp, _Grunnable, _Grunning)// 重置gp的等待时间,这通常用于调度器的统计和监控gp.waitsince = 0// 设置抢占标志为false,表明当前Goroutine不应被抢占gp.preempt = false// 设置栈保护边界,防止栈溢出gp.stackguard0 = gp.stack.lo + _StackGuard// 如果不继承gp的时间片剩余时间if !inheritTime {// 增加当前处理器P的调度器的调度计数_g_.m.p.ptr().schedtick++}// 更新当前处理器M正在执行的Goroutine_g_.m.curg = gp// 设置gp所属Mgp.n = _g_.m// 启动Goroutine的执行gogo(&gp.sched)
}

gogo在不同处理器架构上的实现都不相同,但是不同的实现其实也大同小异,下面是该函数在386架构上的实现:

// 声明一个名为runtime.gogo的函数,执行时不进行栈分割,函数帧大小为8字节,参数大小为4字节
TEXT runtime.gogo(SB), NOSPLIT, $8-4// 将参数buf(位于帧指针FP偏移0字节的位置)的值加载到寄存器BXMOVL  buf+0(FP), BX // gobuf// 从gobuf中加载Goroutine的地址到寄存器DXMOVL  gobuf_g(BX), DX// 加载Goroutine地址指向位置的值到寄存器CXMOVL  0(DX), CX // make sure g != nil// 调用get_tls将线程局部存储加载到CXget_tls(CX)// 将寄存器DX中存放的Goroutine的指针加载到TLS的g字段,用于追踪当前正在执行的GoroutineMOVL  DX, g(CX)// 将gobuf的Goroutine的栈指针加载到SP,用于恢复Goroutine的栈状态MOVL  gobuf_sp(BX), SP // restore SP// 将gobuf的返回地址加载到寄存器AXMOVL  gobuf_ret(BX), AX// 将gobuf中保存的上下文加载到寄存器DXMOVL  gobuf_ctxt(BX), DX// 清空gobuf的栈指针、返回地址、保存的上下文,以帮助垃圾收集MOVL  $0, gobuf_sp(BX) // clear to help garbage collectorMOVL  $0, gobuf_ret(BX)MOVL  $0, gobuf_ctxt(BX)// 加载gobuf的程序计数器pc到寄存器BXMOVL  gobuf_pc(BX), BX// 跳转到寄存器BX中保存的Goroutine的程序计数器,以恢复Goroutine的执行JMP   BX

这个函数会从gobuf中取出Goroutine指针、栈指针、返回值、上下文、程序计数器,并通过JMP指令跳转至Goroutine应该继续执行代码的位置。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Ubuntu/Linux 配置 locale
  • 【AI学习笔记】AIGC,AI绘画 ComfyUI+ComfyUI Manager安装
  • [服务器_1]rpc框架收集
  • 入行「游戏策划」,该从何处下手?
  • django外键表查询
  • 黑马程序员Python数据挖掘|1Jupyter Notebook的使用
  • JAVA学习-练习试用Java实现“数据流的中位数”
  • 金融科技初创企业建设指南
  • 解决方案:在autodl环境下安装torch被killed掉
  • 随笔十、音频扩展模块测试
  • ZW3D二次开发_UI_ZsCc::OptionRadios控件回调
  • windows C++ 并行编程-在 UWP 应用中使用 C++ AMP
  • 【Python报错已解决】ValueError: cannot reindex from a duplicate axis
  • python之os处理文件和目录的函数
  • 每日定期分享诗歌
  • 9月CHINA-PUB-OPENDAY技术沙龙——IPHONE
  • SegmentFault for Android 3.0 发布
  • Asm.js的简单介绍
  • Elasticsearch 参考指南(升级前重新索引)
  • JSONP原理
  • Mithril.js 入门介绍
  • MobX
  • PermissionScope Swift4 兼容问题
  • php中curl和soap方式请求服务超时问题
  • Puppeteer:浏览器控制器
  • React的组件模式
  • Redis中的lru算法实现
  • Spring思维导图,让Spring不再难懂(mvc篇)
  • webpack入门学习手记(二)
  • Webpack入门之遇到的那些坑,系列示例Demo
  • Web设计流程优化:网页效果图设计新思路
  • 大主子表关联的性能优化方法
  • 对象引论
  • 基于 Babel 的 npm 包最小化设置
  • 实现简单的正则表达式引擎
  • 数组大概知多少
  • 学习ES6 变量的解构赋值
  • 阿里云服务器如何修改远程端口?
  • ​ 无限可能性的探索:Amazon Lightsail轻量应用服务器引领数字化时代创新发展
  • ​渐进式Web应用PWA的未来
  • ​软考-高级-信息系统项目管理师教程 第四版【第19章-配置与变更管理-思维导图】​
  • ​水经微图Web1.5.0版即将上线
  • # 消息中间件 RocketMQ 高级功能和源码分析(七)
  • (2024,Vision-LSTM,ViL,xLSTM,ViT,ViM,双向扫描)xLSTM 作为通用视觉骨干
  • (35)远程识别(又称无人机识别)(二)
  • (BAT向)Java岗常问高频面试汇总:MyBatis 微服务 Spring 分布式 MySQL等(1)
  • (c语言+数据结构链表)项目:贪吃蛇
  • (zt)最盛行的警世狂言(爆笑)
  • (动手学习深度学习)第13章 计算机视觉---微调
  • (附源码)spring boot火车票售卖系统 毕业设计 211004
  • (六)Flink 窗口计算
  • (每日持续更新)jdk api之FileFilter基础、应用、实战
  • (三)Kafka离线安装 - ZooKeeper开机自启
  • (四)汇编语言——简单程序
  • (译)计算距离、方位和更多经纬度之间的点