一文了解 Go 语言 Sync 标准库
写在前面
Go 语言是一门在语言层面支持用户级线程的高级语言,因此并发同步在 Go 程序编写中尤其重要,其中 channel 虽然作为并发控制的高级抽象,但它的底层就是依赖于 sync 标准库中的 mutex 来实现的,因此了解 sync 标准库是每一个 Gopher 的必备技能之一。
笔者使用的 Go 版本是 1.18.1
sync.WaitGroup
sync.WaitGroup 使得多个并发执行的代码块在达到 WaitGroup 显式指定的同步条件后才得以继续执行Wait()
调用后的代码,即达到并发 goroutine 执行屏障的效果。
在以下代码中,我们希望达到多个 goroutine 异步执行完输出任务后,main goroutine 才退出的效果,此时程序执行完毕。转换为实例,就是使得程序输出 110,此处我们并不关心main()
中创建的两个 goroutine 之间的执行顺序。
1 | package main |
此时输出
1 | 0 |
与期望结果不符,程序运行完毕时只输出了 0,这是因为 goroutine 的创建和调度需要时间,在两个 goroutine 创建期间,main goroutine 已经输出了 0,导致 main 函数结束,程序执行完毕。我们可以使用 WaitGroup 来完成上述需求。
1 | package main |
此时输出 110
1 | 110 |
sync.WaitGroup
中记录了仍在并发执行的代码块的数量,Add()
相当于对这个数量执行 +1 操作,不难想到Done()
则为执行了Add(-1)
,事实确实如此。
1 | // Done decrements the WaitGroup counter by one. |
WaitGroup
在此之前,我们先明确设定状态为运行计数与等待计数,信号为信号计数。
WaitGroup 的结构体记录了运行计数,等待计数和信号计数三个 uint32 来对并发的 goroutine 进行不同目的的计数,在笔者使用版本中,其实是一个 uint64 与一个 uint32。这里兼容了 32 位系统,因为在32 位机器上无法实现对 64 位字段的原子操作(64 位字段相当于两个指令,无法同时完成)。在 32 位平台上,如果wg.state1
元素依然按照 64 位平台的顺序返回(waiter, counter, sema),那么wg.state1[0]
的内存地址是 32 位对齐的,不能保证一定是 64 位对齐的,就无法进行 64 位原子操作(后续需要对状态进行整体的原子更改atomic.AddUnit64
)。
1 | type WaitGroup struct { |
为了保证在 32 位系统上也能原子访问 64 位对齐的 64 位字,通过 state() 来消除高层上实现的差异,具体可以参考issue-6404。由于在 64 位机器上,8 字节是单个机器字的长度,内存地址对 8 取模可以判断该数据对象的内存地址是否为 64 位对齐,state()
对wg.state1
的内存地址对 8 进行取模来判断程序是允许在 64 位平台还是 32 位平台上,根据结果来返回信息
- 等于 0 返回:
wg.state1
(wg.state1[0]
) 和wg.state1[2]
的内存地址- 此时返回状态、信号
- 不等于 0(32 位环境)返回:
wg.state1[1]
和wg.state1[0]
的内存地址- 此时返回信号、状态
state 调整的前提:如果不能保证对 8 字节对齐,需要手动移位对齐,这里用了内存对齐的 padding
在 4 字节对齐的环境中,8 字节可能跨越了两个 cache line,不保证 64 位的原子操作。
在 32 位架构中,WaitGroup 在初始化的时候,分配内存地址的时候是随机的,所以 WaitGroup 结构体 state1 起始的位置不一定是 64 位对齐,可能会是:uintptr(unsafe.Pointer(&wg.state1))%8 = 4
,如果出现这样的情况,那么就需要用 state1 的第一个元素做 padding + 4,这样操作后两组就能对 8 字节进行对齐了,用 state1 的后两个元素合并成 uint64 来表示 statep,以下是一个小实验:
1 | import ( |
Wait()
Wait()
主要用于阻塞 g,直到 WaitGroup 的计数为 0。先获取访问计数值的指针 (state,sema),在自旋循环体中,通过检查计数来检查目前还没有达成同步条件的并行代码块的数量,并且在每次完成检查后增加一次等待计数。此处没有使用密集循环来构造自旋锁等待,是处于性能考虑:为了保证其他 goroutine 能够得到充分调度。如果每一次检查计数时没有达成同步条件,下次循环如果当前 goroutine 不主动让出 CPU,会导致 CPU 空转,降低性能。这里用了runtime_Semacquire(semap)
,如果等待计数被成功记录,则直接增加信号量,挂起当前 g,否则再进行一次循环获取最新的同步状态。
Wait() 可以在不同的 g 上执行,且调用 Wait() 的 g 数量也可能不唯一,因此需要等待计数。
1 | // Wait blocks until the WaitGroup counter is zero. |
Add()
Add()
不止是简单的将信号量增 delta,还需要考虑很多因素
- 内部运行计数不能为负
- Add 必须与 Wait 属于 happens before 关系
- 毕竟 Wait 是同步屏障,没有 Add,Wait 就没有了意义
- 通过信号量通知所有正在等待的 goroutine
先假设 statep 的高 32 位=1,代表有一个运行计数。当Add(-1)
时,statep 的高 32 位 + 负数的补码 32 个 1,会溢出 1 导致 statep 的高 32 位=0,即运行计数清零,Wait 操作达成同步条件
具体过程如下:
- 通过 state 获取状态指针 statep 和信号指针 semap,statep 的高 32 位为 counter,低 32 位为 waiter
- 调用
atomic.AddUint64()
将传入的 delta 左移四位加上 statep,即 counter+delta - counter 可能为负,所以用 int32 来存值,waiter 不可能为负,所以用 uint32 存值
- 经过一系列校验,counter 为负则 panic,w 不等于 0 且 delta>0 且 v 值为 delta,说明 add 在 wait 后调用,会 panic,因为 waitGroup 不允许 Wait 方法调用后还调用 add 方法
- v > 0 或 w != 0 时直接 return,此时不需要释放 waiter
- 到了*statep != state,状态只能是 waiter>0 且 counter==0,当 waiter>0 时,肯定不能 add,且 counter==0 时,wait 不会再自增 waiter,结果一定是一致的,否则触发 panic
- 将 statep 置为 0,释放所有 waiter
1 | // 已略去race检测相关代码 |
WaitGroup 的实现原理总结
WaitGroup 内部维护了 3 个 uint32(实际上是一个 uint64,一个 uint32),分别是状态和信号,状态包括了运行计数 counter 和等待计数 waiter,信号指信号计数 sema。运行计数代表了调用Add()
添加的 delta 值,等待计数代表了调用Wait()
陷入等待的 goroutine 的数量,信号量 sema 是 runtime 内部信号量的实现,用于挂起和唤醒 goroutine。在Add()
的时候增加运行计数,Wait()
的时候增加等待计数,如果运行计数不为 0,则将 goroutine 挂起,等到调用Done()
->Add(-1) 使等待计数为 0 时会唤醒所有挂起的 goroutine。
我觉得比较核心的地方在于3 个 uint32 中兼容 32 位机器和 64 位机器的实现。由于状态是 64 位的,需要进行 64 位原子操作更新,但是由于 32 位的环境只对 4 字节对齐,首字段可能不是对 8 字节对齐的,因此要用 state 进行兼容,如果不对 8 字节对齐,则将状态放在后面两位 uint32 中,前面一个 4 字节的作为 padding,存放信号计数 sema。如果对 8 字节对齐,状态直接放在前两位即可,这样就兼容了 32 位和 64 位对 64 位原子操作的支持。
sync.Pool
sync.Pool 也许应该叫 sync.Cache,简单来说就是为了避免频繁分配、回收内存给 GC 带来负担的 cache,pool 与连接池类似。sync.Pool 可以将暂时不用的对象缓存起来,等到下次需要的时候直接使用,也不用再次经过内存分配,复用对象的内存,减轻了 GC 的压力,提升系统性能。
如果没有 sync.Pool
多个 goroutine 都需同时创建一个对象时,如果 goroutine 数过多,会导致对象的创建数目递增,导致 GC 压力过大。形成’并发大->占用内存大->GC 缓慢->处理并发能力降低->并发更大’的恶性循环。解决此问题的关键思想就在于对象的复用,避免重复创建、销毁。
以下是一个简单的例子:
1 | package main |
输出结果如下
1 | creating a new person |
在以上代码中,init()
创建了一个 sync.Pool,实现的New()
方法为创建一个 person 对象,并打印一句话,main()
中调用了三次 Get() 和一次 Put。根据输出结果看来,如果在调用Get()
时,pool 中没有对象,那么就会调用New()
创建新的对象,否则会从 pool 中的对象获取。我们还可以看到,put 到 pool 中的对象属性依然是之前设定的,并没有被重置。
sync.Pool 广泛运用于各种场景,典型例子是 fmt 包中的 print:
sync.Pool 的底层实现
1 | type Pool struct { |
noCopy 代表这个结构体是禁止拷贝的,在使用 go vet 工具时生效。
local 是一个 poolLocal 数组(其实是切片local := make([]poolLocal, size)
)的指针,localSize 代表这个数组的大小,victim 也是一个 poolLocal 数组的指针。
New 函数在创建 pool 时设置,当 pool 中没有缓存对象时,会调用 New 方法生成一个新的对象。
在索引 poolLocal 时,P 的 id 对应[P]poolLocal 下标索引,这样在多个 goroutine 使用同一个 pool 时能减少竞争,提升了性能。在一轮 GC 到来时,victim 和 victimSize 会分别接管 local 和 localSize,victim 机制用于减少 GC 后冷启动导致的性能抖动,使得分配对象更加平滑
Victim Cache 是计算机架构里的一个概念,是 CPU 硬件处理缓存的一种技术,sync.Pool 引入它的目的在于降低 GC 压力的同时提高命中率。
poolLocal
这里得提到伪共享问题。伪共享问题,就是在多核 CPU 架构下,为了满足数据一致性维护一致性协议 MESI,频繁刷新同一 cache line 导致高速缓存并未起到应有的作用的问题。试想一下,两个独立线程要更新两个独立变量,但俩独立变量都在同一个 cache line 上,当前 cache line 是 share 状态。如果 core0 的 thread0 去更新 cache line,会导致 core1 中的 cache line 状态变为 Invalid,随后 thread1 去更新时必须通知 core0 将 cache line 刷回主存,然后它再从主从中 load 该 cache line 进高速缓存之后再进行修改,但是该修改又会使得 core0 的 cache line 失效,重复上述过程,导致高速缓存相当于没有一样,反而还因为频繁更新 cache 影响了性能。
这里 poolLocal 的字段 pad 就是用于防止伪共享问题,cache line 在 x86_64 体系下一般是 64 字节
1 | type poolLocal struct { |
poolLocal 数组的大小是程序中 P 的数量,Pool 的最大个数是runtime.GOMAXPROCS(0)
。
1 | // Local per-P Pool appendix. |
poolLocalInternal 中 private 代表缓存了一个元素,只能由相应的一个 P 存取。因为一个 P 同时只能执行一个 goroutine,因此不会有并发问题,使用时不需要加锁。
shared 则可以由任意的 P 访问,但是只有本地的 P 才能 pushHead 或 popHead,其他 P 可以 popTail。
poolChain
看看 poolChain 的实现,这是一个双端队列的实现
其中 poolDequeue 是 PoolQueue 的一个实现,实现为单生产者多消费者的固定大小的无锁 Ring 式队列,通过 atomic 实现,底层存储用数组,head,tail 标记。
生产者可以从 head 插入,tail 删除,而消费者只能从 tail 删除。headTail 变量通过位运算存储了 head 和 tail 的指针,分别指向队头与队尾。
poolChain 没有使用完整的 poolDequeue,而是封装了一层,这是因为它的大小是固定长度的,而 pool 则是不限制大小的。
1 | // file: /sync/poolqueue.go |
由此图可以看到 pool 的整体结构
获取一个对象-Get()
Get()
的过程清晰明了:
- 首先通过调用
p.pin()
将当前 goroutine 与 P 绑定,禁止被抢占,返回当前 P 对应的 poolLocal 以及 pid。 - 获取 local 的 private 赋给 x,并置 local 的 private 为 nil
- 判断 x 是否为空,若为空,则尝试从 local 的 shared 头部获取一个对象,赋值给 x。如果 x 仍然为空,会调用
getSlow()
从其他 P 的 shared 尾部偷取一个对象 - 调用
runtime_procUnpin()
解除非抢占。 - 如果到此时还没有获取到对象,调用设置的
New()
创建一个新对象。
1 | func (p *Pool) Get() any { |
pin()
1 | func (p *Pool) pin() (*poolLocal, int) { |
作用是将当前 goroutine 与 P 绑定在一起,禁止抢占,且返回对应 poolLocal 和 P 的 id。
如果 goroutine 被抢占,那么 g 的状态会从 running 变为 runnable,会被放回 P 的 localq 或 globalq,等待下一次调度。但当 goroutine 下次再次执行时,就不一定和现在的 P 结合了,因为之后会用到 pid,如果被抢占,可能接下来使用的 pid 与绑定的 pid 不是同一个。
绑定的逻辑主要在procPin()
中。它将当前 gorotuine 绑定的 m 上的 locks 字段 +1,即完成了绑定。调度器执行调度时,又是会抢占当前执行 goroutine 所绑定的 P,防止一个 goroutine 占用 CPU 过长时间。而判断一个 goroutine 能被被抢占的条件就是看 m.locks 是否为 0,若为 0,则可以被抢占。而在procPin()
中,m.locks+1,表示不能被抢占。
1 | //go:nosplit |
在p.pin()
中,获取到 p.localSize 和 p.local 后,如果当前 pid 小于 p.localSize,则直接获取 poolLocal 数组中 pid 索引处的位置,否则说明 Pool 还没有创建 poolLocal,调用p.pinSlow()
完成创建。
1 | func (p *Pool) pinSlow() (*poolLocal, int) { |
pinSlow()
在加锁的情况下进行重试,加全局锁创建一个 poolLocal。整体过程如下:
popHead()
1 | func (c *poolChain) popHead() (any, bool) { |
popHead()
只会被生产者调用。函数执行时先拿到头节点,如果不为空,则调用头节点的popHead()
。这俩popHead()
的实现不一致。poolDequeue 的popHead()
移除并返回 queue 的头节点,如果 queue 为空,会返回 false。此处 queue 中存储的对象就是 Pool 里缓存的对象。
1 | func (d *poolDequeue) popHead() (any, bool) { |
回到poolChain.popHead()
,如果获取成功则直接返回,否则继续尝试。
getSlow()
getSlow()
在 shared 没有获取到缓存对象的情况下,会尝试从其他 P 的 poolLocal 偷取
1 | func (p *Pool) getSlow(pid int) any { |
回到Get()
,如果实在偷不到,后面会通过New()
创建一个新的对象。
popTail()
popTail()
将 queue 尾部元素弹出,类似popHead()
1 | func (c *poolChain) popTail() (any, bool) { |
底层用的还是 poolDequeue 的popTail()
,与寻常实现大体类似,也是 CAS。因为要移除尾部元素,所以 tail 自增 1。
1 | func (d *poolDequeue) popTail() (any, bool) { |
存放一个对象-Put()
Put()
将对象添加到 Pool 中,主要过程如下:
- 绑定当前 goroutine 与 P,然后尝试将 x 赋值给 private
- 如果失败,则将其放入 local shared 的头部
1 | func (p *Pool) Put(x any) { |
pushHead()
如果头节点为空,则初始化一下,默认大小为 8。然后调用poolDequeue.pushHead()
将其 push 到队列中,如果失败,则说明队列已满,会创建一个两倍大小的 dequeue,然后再次调用poolDequeue.pushHead()
。由于前面 g 与 p 已经绑定了,所以不会有竞态条件,这里只需要一次重试就行了。
1 | const dequeueBits = 32 |
底层用的还是 poolDequeue 的pushHead()
,他将 val 添加到队列头部,如果队列满了会返回 false,走刚才创建新一个两倍大小队列的路,这个函数只能被一个生产者调用,因此不会有竞态条件。首先通过位运算判断队列是否已满,将 tail 加上当前 dequeue 内节点的数量,即 d.vals 的长度,再取低 31 位,看看它与 head 是否相等,相等的话就说明队列满了,如果满了直接返回 false。否则通过 head 找到即将填充的 slot 位置,去 head 指针的低 31 位,判断是否有另一个 goroutine 在 popTail 这个 slot,如果有则返回 false。这里是判断 typ 是否为空,因为 popTail 是先设置 val,再将 typ 设置为 nil 的。最后将 val 赋值给 slot,自增 head。
1 | func (d *poolDequeue) pushHead(val any) bool { |
在*(*any)(unsafe.Pointer(slot)) = val
中,由于 slot 是 eface 类型,先将 slot 转换为 interface{}类型,这样 val 就能以 interface{}类型赋值给 slot,使得 slot.typ 和 slot.val 指向其内存块,slot 的 typ, val 均不为空。
pack()
再来看看pack()
和unpack()
,它们的作用是打包和解包 head 和 tail 俩指针。实际上很简单,pack()
就是将 head 左移 32 位,或上 tail 与低 31 位全 1,返回整合成的 uint64。
1 | func (d *poolDequeue) pack(head, tail uint32) uint64 { |
unpack()
则将整合的 uint64 右移 32 位与上低 31 位全 1,得到 head,而 tail 则是低 32 位与上低 31 位全 1。
1 | func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) { |
GC
Pool 实际上也不能无限扩展,否则会因为对象占用内存过多导致 OOM。在 pool.go 的init()
中,注册了 GC 发生时如何清理 Pool 的函数。
1 | // sync/pool.go |
实际上调用的是pool.poolCleanup()
,主要是将 local 与 victim 进行交换,不至于让 GC 将所有的 Pool 都清空,有 victim 兜底,需要两个 GC 周期才会被释放。如果 sync.Pool 的获取、释放速度稳定,就不会有新的 Pool 对象进行分配,如果获取的速度下降,那么对象可能会在两个 GC 周期内被释放,而不是以前的一个 GC 周期。
1 | func poolCleanup() { |
下面模拟一下调用poolCleanup()
前后,oldPools,allPools 与 p.victim 的变化:
- 初始时 oldPools 与 allPools 都为 nil
- 第一次调用
Get()
,因为 p.local 为 nil,会通过pinSlow()
创建 p.local,将 p 放入 allPools,此时 allPools 长度为 1,oldPools 为 nil - 对象使用完毕,调用
Put()
放回对象 - 第一次 GC STW,allPools 中所有 p.local 赋值给 victim,并置为 nil。allPools 赋值给 oldPools,置为 nil。此时 oldPools 长度为 1,allPools 为 nil
- 第二次调用
Get()
,由于 p.local 为 nil,会尝试从 p.victim 中获取对象。 - 对象使用完毕,调用
Put()
放回对象。由于 p.local 为 nil,会重新创建 p.local,并放回对象,此时 allPools 长度为 1,oldPools 长度也为 1 - 第二次 GC STW,oldPools 中所有 p.victim 置为 nil,前一次 cache 在本次 GC 时被回收,allPools 中所有 p.local 将值赋值给 victim 并置为 nil。最后 allPools 为 nil,oldPools 长度为 1。
从以上可以看出,p.victim 的定位是次级缓存,在 GC 时将对象放到其中,下次 GC 来临前,如果有Get()
调用则从其中获取,直到再一次 GC 到来时回收。从 victim 中取出的对象并不放回 victim 中,一定程度上也减小了下一次 GC 的开销,使得原先一次 GC 的开销被拉长到两次,有一定程度的开销减小。
sync.Mutex
Mutex 是公平互斥锁
每个 g 去获取锁的时候都会尝试自旋几次,如果没有获取到则进入等待队列尾部(先入先出FIFO)。当持有锁的 g 释放锁时,位于等待队列头部的 g 会被唤醒,但是需要与后来 g 竞争,当然竞争不过,因为后来 g 运行在 cpu 上处于自旋状态,且后来 g 会有很多,而刚唤醒的 g 只有一个,只能被迫重新插回头部。当等待的 g 本次加锁等待时间超过 1ms 都没有获得锁时,它会将当前 Mutex 从正常模式切换为饥饿模式,Mutex 所有权会直接从释放锁的 g 上直接传给队头的 g,后来者不自旋也不会尝试获取锁,会直接进入等待队列尾部。
当发生以下两种情况时 Mutex 会从饥饿模式切换为正常模式
- 获取到锁的 g 刚来,等待时间小于 1ms
- 该 g 是等待队列中最后一个 g
饥饿模式下不再尝试自旋,所有 g 都要排队,严格先来后到,可以防止尾端延迟
互斥锁 state 的最低三位分别标识mutexLocked、mutexWoken和mutexStarving,剩余位置用于标识当前有多少个 g 在等待互斥锁释放
mutexLocked
— 表示互斥锁的锁定状态;mutexWoken
— 表示从正常模式被唤醒;mutexStarving
— 当前的互斥锁进入饥饿状态;waitersCount
— 当前互斥锁上等待的 Goroutine 个数;
1 | // A Mutex is a mutual exclusion lock. |
看看 lock 与 unlock。lock 会锁住 Mutex,如果这个锁在被使用,那么调用的 g 会被阻塞直到这个互斥锁被释放。当锁 state 为 0 时,会将 mutexLocked 位置置为 1,当 state 不是 0 时,会调用sync.Mutex.lockSlow()
尝试通过自旋等方式来等待锁的释放。
自旋是一种多线程同步机制,当前进程在进入自旋的过程中会一直保持对 CPU 的占用,持续检查某个条件是否为真,在多核 CPU 上,自旋可以避免 goroutine 的切换,某些情况下能对显著提升性能。g 在进入自旋的需要满足的条件如下
- 互斥锁只有在普通模式才能进入自旋
runtime.sync_runtime_canSpin()
返回 true- 在多 CPU 机器上
- 当前 g 为了获取该锁进入自旋次数少于 4 次
- GOMAXPROCS > 1,至少一个其他 P 在 running,且当前 p 本地 runq 为空
1 | func (m *Mutex) Lock() { |
unlock 的过程比 lock 的过程简单,fast path 通过去除 mutexLocked 标志位来快速解锁,如果失败,则进入 slow path。slow path 先判断是否已经被解锁。
1 | func (m *Mutex) Unlock() { |
Mutex 的实现原理总结
Mutex 底层是 CAS 实现的,内部维护了一个 int32 的状态 state,用于标识锁状态,以及一个 uint32 的信号量 sema 用于挂起和阻塞 goroutine。Mutex 分为正常模式和饥饿模式,不同模式下Lock()
和UnLock()
的对加锁、解锁处理方式不同。Mutex 中的 state 第 4 位及其高位用于存放等待计数,低三位的第一位表示锁状态,第二位表示从正常模式被唤醒,第三位表示进入饥饿模式。
Lock() 先通过 CAS 置 state 为 1,如果失败,则进入 slow path 处理:
- 先判断是否可以自旋,饥饿模式下无法自旋
- 如果是正常模式,且可以自旋(运行在多 CPU 机器上、当前 g 为了争取该锁进入自旋的次数少于 4、当前机器上至少有个正在运行的 P 且 runq 为空),尝试进行自旋准备:通知运行的 goroutine 不要唤醒其他挂起的 gorotuine,解锁时直接让当前 g 获取锁即可。然后调用
runtime_doSpin()
进入自旋,执行 30 次 PAUSE 指令占用 CPU,递增自旋次数,重新计算状态
- 如果是正常模式,且可以自旋(运行在多 CPU 机器上、当前 g 为了争取该锁进入自旋的次数少于 4、当前机器上至少有个正在运行的 P 且 runq 为空),尝试进行自旋准备:通知运行的 goroutine 不要唤醒其他挂起的 gorotuine,解锁时直接让当前 g 获取锁即可。然后调用
- 计算锁状态
- 使用 CAS 更新状态
- 成功获取锁:返回
- 判断等待时间是否为 0,如果是 0 则放在队尾,如果非 0 则放在头部,进入阻塞
- 唤醒
- 锁是否要进入饥饿状态:等待时间超过 1ms
- 重新获取锁状态
- 判断是否处于饥饿状态
- 是则可以直接获取锁:自减等待计数,设置状态获取锁,如果 starving 不为饥饿,或等待时间没有超过 1ms,或者只有一个 g 在等待队列中,满足任一条件则切换为正常状态
- 否:再次循环抢占
UnLock()
先 CAS 置锁状态最低位为 0,如果返回结果不为 0,进入 slow path:
也是分别对正常模式和饥饿模式两种进行分别处理,饥饿模式下将锁的所有权直接移交给等待队列头的 g,并让出时间片,以便于它可以立即开始运行。
正常模式下,通过 CAS 更新状态值,唤醒等待队列中的 waiter。当然,如果没有 waiter,或低三位标志位中有一个不为 0 说明有其他 g 在处理了,直接返回。
sync.RWMutex
读写互斥锁不限制并行读,但是读写、写读、写写操作无法并行执行
1 | type RWMutex struct { |
写锁使用sync.RWMutex.Lock
与UnLock
,读锁使用RLock
和RUnlock
Lock 与 UnLock
Lock 中,先获取内置的互斥锁,获取成功后,其余竞争者 g 会陷入自旋或阻塞。atomic.AddInt32
用于阻塞后续的读操作,如果仍有活跃的读操作 g,那么当前写操作 g 会调用 runtime.SemacquireMutex 进入休眠状态等待全部读锁持有者结束后释放 writerSem 信号量,将当前 g 唤醒
1 | // Lock locks rw for writing. |
写锁的释放过程与加锁过程相反
- atomic.AddInt32 将 readerCount 变为正数,释放读锁
- 通过 for 循环唤醒所有阻塞的读操作 g
- 释放写锁
获取写锁时先阻塞写锁获取,后阻塞读锁获取,释放写锁时,先释放读锁唤醒读操作,后释放写锁。这种策略能够保证读操作不会被连续的写操作饿死
1 | // Unlock unlocks rw for writing. It is a run-time error if rw is |
RLock 与 RUnlock
读锁的加锁方法不能用于递归读锁定,为不可重入锁,同时,Lock 调用会阻止新的读者获取锁。其中只是将 readerCount+1,如果返回了负数,说明其他 g 获得了写锁,当前 g 就会调用runtime_SemacquireMutex()
陷入休眠等待写锁释放。如果返回了正数,则代表 g 没有获取写锁,当前方法返回成功
1 | // RLock locks rw for reading. |
释放读锁的方法也很简单,atomic.AddInt32 减少 readerCount 正在读资源的数量,如果返回大于等于 0,则说明解锁成功,如果小于 0,说明有一个正在执行的写操作,会调用 rUnlockSlow 进入 slow path 处理。rUnlockSlow 会减少写操作等待的读操作数 readerWait 并在所有读操作释放后触发写操作的信号量 writeSem,当该信号量触发时,调度器会唤醒尝试获取写锁的 g
1 | // RUnlock undoes a single RLock call; |
与 Mutex
对于读操作而言主要是使用信号量限制,写操作则是使用互斥锁与信号量限制
- 获取写锁时
- 每次解锁读锁都会将 readerCount-1,归零时说明没有读锁获取
- 将 readerCount 减少 rwmutexMaxReaders 阻塞后续的读操作(将 readerCount 变为负数)
- 释放写锁时
- 先通知所有读操作
- 将 readerCount 置为正数,释放写锁互斥锁
RWMutex 在 Mutex 上提供了额外的细粒度控制,能在读操作远远多于写操作时提升性能。
sync.noCopy
sync.noCopy 是一个特殊的私有结构体,tools/go/analysis/passes/copylock 包中的分析器会在编译期间检查被拷贝的变量中是否包含 sync.noCopy 或者实现了 Lock 和 Unlock 方法,如果包含该结构体或者实现了对应的方法就会报错
1 | $ go vet proc.go./prog.go:10:10: assignment copies lock value to yawg: sync.WaitGroup |
semaTable
semaTable 存储了可供 g 使用的信号量,是大小为 251 的数组。每一个元素存储了一个平衡树的根,节点是 sudog 类型,在使用时需要一个记录信号量数值的变量 sema,根据它的地址映射到数组中的某个位置,找到对应的节点就找到对应信号的等待队列。
channel 没有使用信号量,而是自己实现了一套排队逻辑
1 | type semaRoot struct { |
sync.Once
sync.once 文件内容很少,只有一个结构体与两个方法,其中 sync.Once 用于保证 go 程序运行期间的某段代码只执行一次,暴露出的 Do 方法用于执行给定的方法
在以下代码中,只会输出一次 only once
1 | func main() { |
输出结果如下:
1 | $ go run main.go |
Once 的结构体也很简单:
1 | type Once struct { |
看看 Do 方法的实现。在源代码的注释中说明了为什么不直接 CAS 设定值,if 方法中调用函数,而要使用这种方式,是因为Do 保证了当它返回时,f 函数已经完成,而直接 CAS 后成功执行不成功返回是无法这样保证的。对于 panic 而言,defer 保证了 panic 也会将 done 置为 1,因此即使 f 调用中 panic,依然算已经执行过。
1 | // Do 调用函数 f 当且仅当 Do 为这个 Once 的实例第一次被调用时 |
sync.Cond
Cond 可以让一组 goroutine 在满足特定条件时被唤醒。在以下代码中同时运行了 11 个 goroutine,其中 10 个 goroutine 通过sync.Cond.Wait()
等待特定条件瞒住,1 个 goroutine 通过sync.Cond.Broadcast()
唤醒所有陷入等待的 goroutine。
1 | package main |
运行结果如下:
1 | listen |
结构体 sync.Cond 中包含了四个字段,最主要的还是 notify。
1 | type Cond struct { |
notifyList 维护了一个 goroutine 链表,以及不同状态的 goroutine 索引
1 | type notifyList struct { |
在创建一个 Cond 时,必须传入一个 mutex 以关联这个 Cond,保证这个 Cond 的同步属性。
1 | func NewCond(l Locker) *Cond { |
sync.Cond.Wait()
Wait()
会使得当前 goroutine 陷入休眠,执行过程分为两个步骤:
- 调用
runtime_notifyListAdd()
将等待计数器 +1 并解锁 - 调用
runtime_notifyListWait()
等待其他 goroutine 的唤醒并加锁
1 | func (c *Cond) Wait() { |
notifyListWait()
则将当前 goroutine 封装为 sudog,追加到 goroutine 通知链表的末尾,然后挂起当前 goroutine。
1 | func notifyListWait(l *notifyList, t uint32) { |
Signal()
Signal()
会唤醒队列最前面的 goroutine,核心代码如下:
1 | func (c *Cond) Signal() { |
Broadcast()
Broadcast()
会唤醒所有满足条件的 goroutine,这个唤醒顺序也是按照加入队列的先后顺序,先加入的会先被唤醒。
1 | func (c *Cond) Broadcast() { |
在条件长时间无法满足时,与使用for {}
的忙等相比,sync.Cond 能够让出处理器的使用权,提高 CPU 的利用率。
sync.Map
Go 原生 map 不是线程安全的,在查找、赋值、遍历、删除的过程中都会检测写标志,一旦发现写标志置位(等于 1),则直接 panic。而 sync.Map 则是并发安全的,读取、插入、删除都保持着常数级的时间复杂度,且 sync.Map 的零值是有效的,是一个空 map。sync.Map 更适用于读多写少的场景,写多的场景中会导致 read map 缓存失效,需要加锁,导致冲突增多,而且因为未命中 read map 次数变多,导致 dirty map 提升为 read map,是一个 O(N) 的操作,会降低性能。
一般解决并发读写 map 的思路是加一把大锁,在读写的时候先进行加锁,或把一个 map 分成若干个小 map,对 key 进行哈希操作,只操作对应的小 map,前者锁粒度大,影响并发性能,而后者实现较为复杂,容易出错。
与原生 map 相比,sync.Map 仅遍历的方式有些不同。
1 | package main |
输出结果如下:
1 | 1 |
sync.Map 的底层实现
由四个字段组成,其中 mu 互斥锁用于保护 read 和 dirty 字段。
1 | type Map struct { |
真正存储key/value的是read与dirty,但是它们存储的方式是不一样的,前者用atomic.Value,后者单纯使用原生map,原因是read用的是无锁操作,需要保证load/store的原子性,而dirty map 的 load+store 操作是由 mu 互斥锁来保护的。
readOnly 是一个支持原子性的存储的只读数据结构,底层也是一个原生 map。其中 entry 包含了一个指针,指向 value。dirty 的 value 也是 entry 类型的,这里可以看出 read 和 dirty 各自维护了一套 key,key 指向的是同一个 value,只要修改了 entry,对 read 和 dirty 都是可见的。
1 | type readOnly struct { |
entry 的指针 p 共有三种状态:
- p == nil:说明该 key/value 已被删除,且 m.dirty == nil 或 m.dirty[k]指向该 key
- p == expunged,说该 key/value 已被删除,且 m.dirty 不为 nil,且 m.dirty 中没有这个 key
- p 指向一个正常值:表示实际 interface{}的地址,且被记录在 m.read.m[key]中,如果此时 m.dirty 也不为 nil,那么它也被记录在 m.dirty[key]中,二者指向同一个地址。
当删除 key 时,sync.Map 并不会真正地删除 key,而是通过 CAS 将 entry 的 p 设置为 nil,标记为被删除。如果之后创建 m.dirty,p 又会 CAS 设置为 expunged,且不会复制到 m.dirty 中。
如果 p 不为 expunged,和 entry 关联的 value 则可以被原子地更新,如果 p 为 expunged,那么只有在它初次被设置到 m.dirty 后才能被更新。
Store()
expunged 实际上是一个指向任意类型的指针,用于标记从 dirty map 中删除的 entry。
1 | var expunged = unsafe.Pointer(new(any)) |
Store 直接看代码即可,如果 key 在 read 中,会先调用 tryStore,使用 for 循环+CAS 尝试更新 entry,如果更新成功则直接返回。接下来要么 read 中没有这个 key,要么 key 被标记为已删除了,需要先加锁再操作。
- 先去 read 中 double check 下,如果存在 key,但 p 为 expunged,说明 m.dirty 不为 nil,且 m.dirty 不存在该 key。此时将 p 状态设置为 nil,将 key 插入 dirty map 中,更新对应 value
- 如果 read 中没有此 key,而 dirty 中有,直接更新对应 value
- 如果 read 和 dirty 都没有该 key,先看看 dirty 是否为空,为空就要创建一个 dirty,并从 read 中复制没被删除的元素。然后更新 amended 标记为 true,标识 dirty 中存在 read 没有的 key,将 key/value 写入 dirty map 中。更新对应的 value。
1 | func (m *Map) Store(key, value any) { |
Load()
流程比 Store 更简单。先从 read 中找,找到了直接调用entry.load()
,否则看看 amended,如果是 false,说明 dirty 为空,直接返回 nil 和 false,如果 emended 为 true,说明 dirty 中可能存在要找的 key。先上锁,然后 double check 从 read 找,还没找到就去 dirty 中找,不管 dirty 中找没找到,都得在 missed 记一下,在 dirty 被提升为 read 前都会走这条路。
1 | func (m *Map) Load(key any) (value any, ok bool) { |
missLocked 直接将 misses+1,标识有一次未命中,如果未命中的次数小于 m.dirty 长度,直接返回,否则将 m.dirty 提升为 read,并清空 dirty 和 misses 计数。这样之前一段时间加的 key 就会进到 read 中,提高 read 的命中率。
1 | func (m *Map) missLocked() { |
entry.load()
对 p 为 nil 和 p 为 expunged 的 entry 直接返回 nil 和 false,否则将其转为 interface{}返回。
1 | func (e *entry) load() (value any, ok bool) { |
Delete()
套路与Load()
和Store()
相似,从 read 中查是否有这个 key,如果有的话调用entry.delete()
,将 p 置为 nil。read 中没找到的话,如果 dirty 不为 nil,就去 dirty 中找,先上锁,再 double check,还没在 read 中找到就去 dirty 中找,然后执行删除操作,再调用 missLocked 看看是否要将 dirty 上升到 read,这里不管是否在 dirty 中找到都得标记一下 misses。如果找到并删除了,调用entry.delete()
,否则返回 nil 和 false。
1 | // Delete deletes the value for a key. |
entry.delete()
也是 CAS 操作,将 p 置为 nil,当判断 p 为 nil 或 expunged 时会直接返回 nil 和 false。
1 | func (e *entry) delete() (value any, ok bool) { |
如果 read 中找到了 key,仅仅是将 p 置为 nil 做一个标记,这样在仅有 dirty 中有这个 key 的时候才会直接删除这个 key。这样的目的在于在下次查找这个 key 时会命中 read,提升效率,如果只在 dirty 存在,read 就无法起到缓存的作用,会直接删除。key 本身是需要在 missLocked 前将 key 从 dirty 中删除,才能使其被垃圾回收。
LoadOrStore()
结合了 Load 和 Store 的功能,如果 map 存在该 key,就返回对应 value,否则将 value 设置给该 key。
1 | func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool) { |
Range()
参数需要传入一个函数,Range()
在遍历时会将调用时刻所有key/value传给该函数,如果返回了false会停止遍历。由于会遍历所有key,是一个O(N)的操作,所以将dirty提升为read,将开销分摊开,提升了效率。
1 | func (m *Map) Range(f func(key, value any) bool) { |