写在前面

Go 语言是一门在语言层面支持用户级线程的高级语言,因此并发同步在 Go 程序编写中尤其重要,其中 channel 虽然作为并发控制的高级抽象,但它的底层就是依赖于 sync 标准库中的 mutex 来实现的,因此了解 sync 标准库是每一个 Gopher 的必备技能之一。

笔者使用的 Go 版本是 1.18.1

sync.WaitGroup

sync.WaitGroup 使得多个并发执行的代码块在达到 WaitGroup 显式指定的同步条件后才得以继续执行Wait()调用后的代码,即达到并发 goroutine 执行屏障的效果。

在以下代码中,我们希望达到多个 goroutine 异步执行完输出任务后,main goroutine 才退出的效果,此时程序执行完毕。转换为实例,就是使得程序输出 110,此处我们并不关心main()中创建的两个 goroutine 之间的执行顺序。

1
2
3
4
5
6
7
8
9
10
11
12
13
package main

import "fmt"

func main() {
go func() {
fmt.Print(1)
}()
go func() {
fmt.Print(1)
}()
fmt.Print(0)
}

此时输出

1
0

与期望结果不符,程序运行完毕时只输出了 0,这是因为 goroutine 的创建和调度需要时间,在两个 goroutine 创建期间,main goroutine 已经输出了 0,导致 main 函数结束,程序执行完毕。我们可以使用 WaitGroup 来完成上述需求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"fmt"
"sync"
)

func main() {
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
fmt.Print(1)
}()
go func() {
defer wg.Done()
fmt.Print(1)
}()
wg.Wait()
fmt.Print(0)
}

此时输出 110

1
110

sync.WaitGroup中记录了仍在并发执行的代码块的数量,Add()相当于对这个数量执行 +1 操作,不难想到Done()则为执行了Add(-1),事实确实如此。

1
2
3
4
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}

WaitGroup

在此之前,我们先明确设定状态运行计数与等待计数信号信号计数

WaitGroup 的结构体记录了运行计数,等待计数和信号计数三个 uint32 来对并发的 goroutine 进行不同目的的计数,在笔者使用版本中,其实是一个 uint64 与一个 uint32。这里兼容了 32 位系统,因为在32 位机器上无法实现对 64 位字段的原子操作(64 位字段相当于两个指令,无法同时完成)。在 32 位平台上,如果wg.state1元素依然按照 64 位平台的顺序返回(waiter, counter, sema),那么wg.state1[0]的内存地址是 32 位对齐的,不能保证一定是 64 位对齐的,就无法进行 64 位原子操作(后续需要对状态进行整体的原子更改atomic.AddUnit64)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type WaitGroup struct {
// 表示 `WaitGroup` 是不可复制的,只能用指针传递,保证全局唯一
// noCopy是个空接口,占用0字节,因此内存对齐可以忽略此字段
noCopy noCopy

state1 uint64
state2 uint32
}

// state 返回指向存储在 wg.state 中的 state(运行计数和等待计数) 和 sema(信号计数) 字段的指针。
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
// 判断是否对8对齐,32位也可能满足这种情况,否则要手动padding
if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// 这里对8字节对齐
// 前8bytes做uint64指针state,后4bytes做sema
return &wg.state1, &wg.state2
} else {
// 前4bytes做sema,后8bytes做uint64指针state
state := (*[3]uint32)(unsafe.Pointer(&wg.state1))
return (*uint64)(unsafe.Pointer(&state[1])), &state[0]
}
}

为了保证在 32 位系统上也能原子访问 64 位对齐的 64 位字,通过 state() 来消除高层上实现的差异,具体可以参考issue-6404。由于在 64 位机器上,8 字节是单个机器字的长度,内存地址对 8 取模可以判断该数据对象的内存地址是否为 64 位对齐,state()wg.state1的内存地址对 8 进行取模来判断程序是允许在 64 位平台还是 32 位平台上,根据结果来返回信息

  • 等于 0 返回: wg.state1(wg.state1[0]) 和wg.state1[2]的内存地址
    • 此时返回状态、信号
  • 不等于 0(32 位环境)返回:wg.state1[1]wg.state1[0]的内存地址
    • 此时返回信号、状态

state 调整的前提:如果不能保证对 8 字节对齐,需要手动移位对齐,这里用了内存对齐的 padding

在 4 字节对齐的环境中,8 字节可能跨越了两个 cache line,不保证 64 位的原子操作。

img

在 32 位架构中,WaitGroup 在初始化的时候,分配内存地址的时候是随机的,所以 WaitGroup 结构体 state1 起始的位置不一定是 64 位对齐,可能会是:uintptr(unsafe.Pointer(&wg.state1))%8 = 4,如果出现这样的情况,那么就需要用 state1 的第一个元素做 padding + 4,这样操作后两组就能对 8 字节进行对齐了,用 state1 的后两个元素合并成 uint64 来表示 statep,以下是一个小实验:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import (
"unsafe"
)

type a struct {
b byte
}

type w struct {
state1 uint64
state2 uint32
}

func main() {
b := a{}
println(unsafe.Sizeof(b), uintptr(unsafe.Pointer(&b)), uintptr(unsafe.Pointer(&b))%8 == 0)
wg := w{}
// 64环境下
// 1 824634031959 false
// 16 824634031968 true
// 16 824634031972 false
// 32环境下 如果是64位win,需要在cmd情况下 set GOARCH=386
// 1 285454255 false
// 12 285454260 false
// 12 285454264 true 经过第一个uint32的padding,后续的两位uint32对8字节对齐
println(unsafe.Sizeof(wg), uintptr(unsafe.Pointer(&wg.state1)), uintptr(unsafe.Pointer(&wg.state1))%8 == 0)
state := (*[3]uint32)(unsafe.Pointer(&wg.state1))
println(unsafe.Sizeof(wg), uintptr(unsafe.Pointer(&state[1])), uintptr(unsafe.Pointer(&state[1]))%8 == 0)
}

Wait()

Wait()主要用于阻塞 g,直到 WaitGroup 的计数为 0。先获取访问计数值的指针 (state,sema),在自旋循环体中,通过检查计数来检查目前还没有达成同步条件的并行代码块的数量,并且在每次完成检查后增加一次等待计数。此处没有使用密集循环来构造自旋锁等待,是处于性能考虑:为了保证其他 goroutine 能够得到充分调度。如果每一次检查计数时没有达成同步条件,下次循环如果当前 goroutine 不主动让出 CPU,会导致 CPU 空转,降低性能。这里用了runtime_Semacquire(semap),如果等待计数被成功记录,则直接增加信号量,挂起当前 g,否则再进行一次循环获取最新的同步状态。

Wait() 可以在不同的 g 上执行,且调用 Wait() 的 g 数量也可能不唯一,因此需要等待计数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Wait blocks until the WaitGroup counter is zero.
// race检测相关代码已略去
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
// 原子操作
state := atomic.LoadUint64(statep)
// 运行counter(state的高32位)
v := int32(state >> 32)
// counter为0时直接返回
if v == 0 {
return
}
// 增加等待计数
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 增加信号量并挂起当前g,使当前g让出cpu
// 信号量为0时唤醒
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}

Add()

Add()不止是简单的将信号量增 delta,还需要考虑很多因素

  • 内部运行计数不能为负
  • Add 必须与 Wait 属于 happens before 关系
    • 毕竟 Wait 是同步屏障,没有 Add,Wait 就没有了意义
  • 通过信号量通知所有正在等待的 goroutine

先假设 statep 的高 32 位=1,代表有一个运行计数。当Add(-1)时,statep 的高 32 位 + 负数的补码 32 个 1,会溢出 1 导致 statep 的高 32 位=0,即运行计数清零,Wait 操作达成同步条件

img

具体过程如下:

  1. 通过 state 获取状态指针 statep 和信号指针 semap,statep 的高 32 位为 counter,低 32 位为 waiter
  2. 调用atomic.AddUint64()将传入的 delta 左移四位加上 statep,即 counter+delta
  3. counter 可能为负,所以用 int32 来存值,waiter 不可能为负,所以用 uint32 存值
  4. 经过一系列校验,counter 为负则 panic,w 不等于 0 且 delta>0 且 v 值为 delta,说明 add 在 wait 后调用,会 panic,因为 waitGroup 不允许 Wait 方法调用后还调用 add 方法
  5. v > 0 或 w != 0 时直接 return,此时不需要释放 waiter
  6. 到了*statep != state,状态只能是 waiter>0 且 counter==0,当 waiter>0 时,肯定不能 add,且 counter==0 时,wait 不会再自增 waiter,结果一定是一致的,否则触发 panic
  7. 将 statep 置为 0,释放所有 waiter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 已略去race检测相关代码
func (wg *WaitGroup) Add(delta int) {
// 获取状态指针和信号指针
statep, semap := wg.state()
// 在运行计数上记录delta
// 高32bit是计数值v,所以把delta左移32,增加到计数上
state := atomic.AddUint64(statep, uint64(delta)<<32)
// 运行计数
v := int32(state >> 32)
// 等待计数(低32位)
w := uint32(state)
// 任务计数器不能为负数
if v < 0 {
panic("sync: negative WaitGroup counter")
}
// 添加与等待同时调用(应该是happens before的关系)
// 已经执行了Wait,不容许再执行Add
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 运行计数>0或没有writer在等待,直接返回
if v > 0 || w == 0 {
return
}
// happens before,add和wait并发调用
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 唤醒所有等待的goroutine,并将等待计数清零
// 此时counter一定为0,waiter一定>0
*statep = 0
for ; w != 0; w-- {
// 执行一次释放一个,唤醒一个waiter
runtime_Semrelease(semap, false, 0)
}
}

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}

WaitGroup 的实现原理总结

WaitGroup 内部维护了 3 个 uint32(实际上是一个 uint64,一个 uint32),分别是状态和信号,状态包括了运行计数 counter 和等待计数 waiter,信号指信号计数 sema。运行计数代表了调用Add()添加的 delta 值,等待计数代表了调用Wait()陷入等待的 goroutine 的数量,信号量 sema 是 runtime 内部信号量的实现,用于挂起和唤醒 goroutine。在Add()的时候增加运行计数,Wait()的时候增加等待计数,如果运行计数不为 0,则将 goroutine 挂起,等到调用Done()->Add(-1) 使等待计数为 0 时会唤醒所有挂起的 goroutine。

我觉得比较核心的地方在于3 个 uint32 中兼容 32 位机器和 64 位机器的实现。由于状态是 64 位的,需要进行 64 位原子操作更新,但是由于 32 位的环境只对 4 字节对齐,首字段可能不是对 8 字节对齐的,因此要用 state 进行兼容,如果不对 8 字节对齐,则将状态放在后面两位 uint32 中,前面一个 4 字节的作为 padding,存放信号计数 sema。如果对 8 字节对齐,状态直接放在前两位即可,这样就兼容了 32 位和 64 位对 64 位原子操作的支持。

sync.Pool

sync.Pool 也许应该叫 sync.Cache,简单来说就是为了避免频繁分配、回收内存给 GC 带来负担的 cache,pool 与连接池类似。sync.Pool 可以将暂时不用的对象缓存起来,等到下次需要的时候直接使用,也不用再次经过内存分配,复用对象的内存,减轻了 GC 的压力,提升系统性能。

如果没有 sync.Pool

多个 goroutine 都需同时创建一个对象时,如果 goroutine 数过多,会导致对象的创建数目递增,导致 GC 压力过大。形成’并发大->占用内存大->GC 缓慢->处理并发能力降低->并发更大’的恶性循环。解决此问题的关键思想就在于对象的复用,避免重复创建、销毁。

以下是一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"sync"
)

var pool *sync.Pool

type Person struct {
Name string
}

func init() {
pool = &sync.Pool{
New: func() interface{} {
fmt.Println("creating a new person")
return new(Person)
},
}
}

func main() {
person := pool.Get().(*Person)
fmt.Println("Get Pool Object:", person)
person.Name = "first"
pool.Put(person)

fmt.Println("Get Pool Object:", pool.Get().(*Person))
fmt.Println("Get Pool Object:", pool.Get().(*Person))
}

输出结果如下

1
2
3
4
5
creating a new person
Get Pool Object: &{}
Get Pool Object: &{first}
creating a new person
Get Pool Object: &{}

在以上代码中,init()创建了一个 sync.Pool,实现的New()方法为创建一个 person 对象,并打印一句话,main()中调用了三次 Get() 和一次 Put。根据输出结果看来,如果在调用Get()时,pool 中没有对象,那么就会调用New()创建新的对象,否则会从 pool 中的对象获取。我们还可以看到,put 到 pool 中的对象属性依然是之前设定的,并没有被重置。

sync.Pool 广泛运用于各种场景,典型例子是 fmt 包中的 print:

img

sync.Pool 的底层实现

1
2
3
4
5
6
7
8
9
10
11
12
type Pool struct {
// go1.7引入的一个静态检查机制,代表该对象不希望被复制,可以使用go vet工具检测到是否被复制
// 在使用时需要实现noCopy保证一个对象第一次使用后不会发生复制
noCopy noCopy
local unsafe.Pointer // 每个P的本地队列,实际类型为[P]poolLocal, 一个切片
localSize uintptr // 大小
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
// 自定义创建对象回调函数,当pool中没有可用对象时会调用此函数
// 当没有可用对象时,如果不设置该函数,get()会返回nil
New func() any
}

noCopy 代表这个结构体是禁止拷贝的,在使用 go vet 工具时生效。

local 是一个 poolLocal 数组(其实是切片local := make([]poolLocal, size))的指针,localSize 代表这个数组的大小,victim 也是一个 poolLocal 数组的指针。

New 函数在创建 pool 时设置,当 pool 中没有缓存对象时,会调用 New 方法生成一个新的对象。

在索引 poolLocal 时,P 的 id 对应[P]poolLocal 下标索引,这样在多个 goroutine 使用同一个 pool 时能减少竞争,提升了性能。在一轮 GC 到来时,victim 和 victimSize 会分别接管 local 和 localSize,victim 机制用于减少 GC 后冷启动导致的性能抖动,使得分配对象更加平滑

Victim Cache 是计算机架构里的一个概念,是 CPU 硬件处理缓存的一种技术,sync.Pool 引入它的目的在于降低 GC 压力的同时提高命中率。

poolLocal

img

这里得提到伪共享问题。伪共享问题,就是在多核 CPU 架构下,为了满足数据一致性维护一致性协议 MESI,频繁刷新同一 cache line 导致高速缓存并未起到应有的作用的问题。试想一下,两个独立线程要更新两个独立变量,但俩独立变量都在同一个 cache line 上,当前 cache line 是 share 状态。如果 core0 的 thread0 去更新 cache line,会导致 core1 中的 cache line 状态变为 Invalid,随后 thread1 去更新时必须通知 core0 将 cache line 刷回主存,然后它再从主从中 load 该 cache line 进高速缓存之后再进行修改,但是该修改又会使得 core0 的 cache line 失效,重复上述过程,导致高速缓存相当于没有一样,反而还因为频繁更新 cache 影响了性能。

这里 poolLocal 的字段 pad 就是用于防止伪共享问题,cache line 在 x86_64 体系下一般是 64 字节

1
2
3
4
5
6
7
type poolLocal struct {
poolLocalInternal
// 将poolLocal补齐至缓存行的大小,防止false sharing(伪共享)
// 在多数平台上128 mod (cache line size) = 0可以防止伪共享
// 伪共享,仅占位用,防止在cache line上分配多个poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

poolLocal 数组的大小是程序中 P 的数量,Pool 的最大个数是runtime.GOMAXPROCS(0)

1
2
3
4
5
6
7
// Local per-P Pool appendix.
type poolLocalInternal struct {
// P的私有缓存区,使用时不需要加锁
private any // Can be used only by the respective P.
// 公共缓存区,本地P可用pushHead/popHead。其他的P只能popTail
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}

poolLocalInternal 中 private 代表缓存了一个元素,只能由相应的一个 P 存取。因为一个 P 同时只能执行一个 goroutine,因此不会有并发问题,使用时不需要加锁。

shared 则可以由任意的 P 访问,但是只有本地的 P 才能 pushHead 或 popHead,其他 P 可以 popTail。

poolChain

看看 poolChain 的实现,这是一个双端队列的实现

其中 poolDequeue 是 PoolQueue 的一个实现,实现为单生产者多消费者的固定大小的无锁 Ring 式队列,通过 atomic 实现,底层存储用数组,head,tail 标记。

生产者可以从 head 插入,tail 删除,而消费者只能从 tail 删除。headTail 变量通过位运算存储了 head 和 tail 的指针,分别指向队头与队尾。

poolChain 没有使用完整的 poolDequeue,而是封装了一层,这是因为它的大小是固定长度的,而 pool 则是不限制大小的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// file: /sync/poolqueue.go
type poolChain struct {
// 只有生产者会push,因此不需要加锁同步
head *poolChainElt

// 能被消费者使用,所以操作必须要有原子性
tail *poolChainElt
}

type poolChainElt struct {
poolDequeue
// next被producer写,consumer读,所以只会从nil变成non-nil
// prev被consumer写,producerr读,所以只会从non-nil变成nil
next, prev *poolChainElt
}

type poolDequeue struct {
// 包含了一个32位head指针和一个32位tail指针,都与len(vals) - 1取模过
// tail是队列中最老是数据,head指向下一个要填充的slot
// slots范围是[tail, head),由consumers持有
// 高32位为head,低32位为tail
headTail uint64
// vals是一个存储interface{}的环形队列,size必须是2的幂
// 如果slot为空,则vals[i].typ为空
// 一个slot在此宣告无效,那么tail就不指向它了,vals[i].typ为nil
// 由consumers设置为nil,由producer读
vals []eface
}

type eface struct {
typ, val unsafe.Pointer
}

由此图可以看到 pool 的整体结构

img

获取一个对象-Get()

Get()的过程清晰明了:

  1. 首先通过调用p.pin()将当前 goroutine 与 P 绑定,禁止被抢占,返回当前 P 对应的 poolLocal 以及 pid。
  2. 获取 local 的 private 赋给 x,并置 local 的 private 为 nil
  3. 判断 x 是否为空,若为空,则尝试从 local 的 shared 头部获取一个对象,赋值给 x。如果 x 仍然为空,会调用getSlow()从其他 P 的 shared 尾部偷取一个对象
  4. 调用runtime_procUnpin()解除非抢占。
  5. 如果到此时还没有获取到对象,调用设置的New()创建一个新对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (p *Pool) Get() any {
...
// 将当前goroutine绑定到当前p上
l, pid := p.pin()
// 优先从local的private中获取
x := l.private
l.private = nil
if x == nil {
// 如果local的private没有,尝试获取local shared的head
x, _ = l.shared.popHead()
// 如果还没有,则进入slow path
// 调用
if x == nil {
x = p.getSlow(pid)
}
}
// 解除抢占
runtime_procUnpin()
...
// 如果没有获取到,尝试使用New()创建一个新对象
if x == nil && p.New != nil {
x = p.New()
}
return x
}

pin()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (p *Pool) pin() (*poolLocal, int) {
pid := runtime_procPin()
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
return p.pinSlow()
}

func indexLocal(l unsafe.Pointer, i int) *poolLocal {
// 传入的i是数组的index
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}

作用是将当前 goroutine 与 P 绑定在一起,禁止抢占,且返回对应 poolLocal 和 P 的 id。

如果 goroutine 被抢占,那么 g 的状态会从 running 变为 runnable,会被放回 P 的 localq 或 globalq,等待下一次调度。但当 goroutine 下次再次执行时,就不一定和现在的 P 结合了,因为之后会用到 pid,如果被抢占,可能接下来使用的 pid 与绑定的 pid 不是同一个。

绑定的逻辑主要在procPin()中。它将当前 gorotuine 绑定的 m 上的 locks 字段 +1,即完成了绑定。调度器执行调度时,又是会抢占当前执行 goroutine 所绑定的 P,防止一个 goroutine 占用 CPU 过长时间。而判断一个 goroutine 能被被抢占的条件就是看 m.locks 是否为 0,若为 0,则可以被抢占。而在procPin()中,m.locks+1,表示不能被抢占。

1
2
3
4
5
6
7
8
//go:nosplit
func procPin() int {
_g_ := getg()
mp := _g_.m

mp.locks++
return int(mp.p.ptr().id)
}

p.pin()中,获取到 p.localSize 和 p.local 后,如果当前 pid 小于 p.localSize,则直接获取 poolLocal 数组中 pid 索引处的位置,否则说明 Pool 还没有创建 poolLocal,调用p.pinSlow()完成创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (p *Pool) pinSlow() (*poolLocal, int) {
// 解除绑定
// 避免上大锁造成阻塞,浪费资源
runtime_procUnpin()
// 加全局锁
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
// 重新绑定
pid := runtime_procPin()
// 已经加了全局锁,此时不需要再用原子操作
s := p.localSize
l := p.local
// 对pid重新检查,因为pinSlow途中可能已经被其他线程调用了
// 如果已经创建过了,那么直接返回即可
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// 初始化时会将pool放到allPools中
if p.local == nil {
allPools = append(allPools, p)
}
// 当前P数量
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
// 回收旧的local
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
}

pinSlow()在加锁的情况下进行重试,加全局锁创建一个 poolLocal。整体过程如下:

img

popHead()

1
2
3
4
5
6
7
8
9
10
11
12
func (c *poolChain) popHead() (any, bool) {
d := c.head
for d != nil {
if val, ok := d.popHead(); ok {
// 如果成功获得值,则返回
return val, ok
}
// 继续尝试获取缓存的对象
d = loadPoolChainElt(&d.prev)
}
return nil, false
}

popHead()只会被生产者调用。函数执行时先拿到头节点,如果不为空,则调用头节点的popHead()。这俩popHead()的实现不一致。poolDequeue 的popHead()移除并返回 queue 的头节点,如果 queue 为空,会返回 false。此处 queue 中存储的对象就是 Pool 里缓存的对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (d *poolDequeue) popHead() (any, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// queue为空
if tail == head {
return nil, false
}
// 验证尾节点,并自减头节点指针,这个操作在读出slot的value之前执行。
// 此处是为了锁住head指针的位置,下一步CAS保证去除的必然是头节点
head--
ptrs2 := d.pack(head, tail)
// 典型CAS
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// 成功取出value
// 实际就是取head低n位的值
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
val := *(*any)(unsafe.Pointer(slot))
// 获取到nil的话就是nil了
if val == dequeueNil(nil) {
val = nil
}
// 重置slot。和popTail不同,这里不会与pushHead产生竞态条件。
*slot = eface{}
return val, true
}

回到poolChain.popHead(),如果获取成功则直接返回,否则继续尝试。

getSlow()

getSlow()在 shared 没有获取到缓存对象的情况下,会尝试从其他 P 的 poolLocal 偷取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (p *Pool) getSlow(pid int) any {
// See the comment in pin regarding ordering of the loads.
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// 尝试从其他P偷取对象
for i := 0; i < int(size); i++ {
// 从索引pid+1处开始投
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 在尝试从其他P偷取对象失败后,会尝试从victim cache中取对象
// 这样可以使得victim中的对象更容易被回收
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 清空victim,防止后来人再来这里找
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}

回到Get(),如果实在偷不到,后面会通过New()创建一个新的对象。

popTail()

popTail()将 queue 尾部元素弹出,类似popHead()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (c *poolChain) popTail() (any, bool) {
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}
for {
// TODO: pop前先加载next,此处与一般的双向链表是相反的
d2 := loadPoolChainElt(&d.next)
if val, ok := d.popTail(); ok {
return val, ok
}
if d2 == nil {
// 队列为空,只有一个尾结点
return nil, false
}
// 双向链表尾节点的queue已经为空,看下一个节点
// 因为它为空,需要pop掉
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
storePoolChainElt(&d2.prev, nil)
}
// 防止下次popTail的时候会看到一个空的dequeue
d = d2
}
}

底层用的还是 poolDequeue 的popTail(),与寻常实现大体类似,也是 CAS。因为要移除尾部元素,所以 tail 自增 1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (d *poolDequeue) popTail() (any, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
return nil, false
}
ptrs2 := d.pack(head, tail+1)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}
val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
slot.val = nil
atomic.StorePointer(&slot.typ, nil)
return val, true
}

img

存放一个对象-Put()

Put()将对象添加到 Pool 中,主要过程如下:

  1. 绑定当前 goroutine 与 P,然后尝试将 x 赋值给 private
  2. 如果失败,则将其放入 local shared 的头部
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (p *Pool) Put(x any) {
if x == nil {
return
}
...
l, _ := p.pin()
if l.private == nil {
l.private = x
x = nil
}
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
}

pushHead()

如果头节点为空,则初始化一下,默认大小为 8。然后调用poolDequeue.pushHead()将其 push 到队列中,如果失败,则说明队列已满,会创建一个两倍大小的 dequeue,然后再次调用poolDequeue.pushHead()。由于前面 g 与 p 已经绑定了,所以不会有竞态条件,这里只需要一次重试就行了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
const dequeueBits = 32
const dequeueLimit = (1 << dequeueBits) / 4

func (c *poolChain) pushHead(val any) {
d := c.head
if d == nil {
// 初始化头节点,初始大小为8
const initSize = 8
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}
// 将其push到队列中,如果成功则直接返回
if d.pushHead(val) {
return
}
// 当前dequeue满了,分配一个新的两倍大小的dequeue
newSize := len(d.vals) * 2
if newSize >= dequeueLimit {
// dequeue最大限制为2^30
newSize = dequeueLimit
}

d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}

底层用的还是 poolDequeue 的pushHead(),他将 val 添加到队列头部,如果队列满了会返回 false,走刚才创建新一个两倍大小队列的路,这个函数只能被一个生产者调用,因此不会有竞态条件。首先通过位运算判断队列是否已满,将 tail 加上当前 dequeue 内节点的数量,即 d.vals 的长度,再取低 31 位,看看它与 head 是否相等,相等的话就说明队列满了,如果满了直接返回 false。否则通过 head 找到即将填充的 slot 位置,去 head 指针的低 31 位,判断是否有另一个 goroutine 在 popTail 这个 slot,如果有则返回 false。这里是判断 typ 是否为空,因为 popTail 是先设置 val,再将 typ 设置为 nil 的。最后将 val 赋值给 slot,自增 head。

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (d *poolDequeue) pushHead(val any) bool {
ptrs := atomic.LoadUint64(&d.headTail)
// 解包,高32位为head,低32位为tail
head, tail := d.unpack(ptrs)
// 判断队列是否已满
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
// 队列满了
return false
}
// 找到即将填充的slot位置
slot := &d.vals[head&uint32(len(d.vals)-1)]
// 检查slot是否与popTail有冲突
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
// 另一个g在popTail这个slot,说明这个队列仍然是满的
return false
}
if val == nil {
val = dequeueNil(nil)
}
// 将val赋值给slot
*(*any)(unsafe.Pointer(slot)) = val
// 自增head
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}

在*(*any)(unsafe.Pointer(slot)) = val中,由于 slot 是 eface 类型,先将 slot 转换为 interface{}类型,这样 val 就能以 interface{}类型赋值给 slot,使得 slot.typ 和 slot.val 指向其内存块,slot 的 typ, val 均不为空。

pack()

再来看看pack()unpack(),它们的作用是打包和解包 head 和 tail 俩指针。实际上很简单,pack()就是将 head 左移 32 位,或上 tail 与低 31 位全 1,返回整合成的 uint64。

1
2
3
4
5
func (d *poolDequeue) pack(head, tail uint32) uint64 {
const mask = 1<<dequeueBits - 1
return (uint64(head) << dequeueBits) |
uint64(tail&mask)
}

unpack()则将整合的 uint64 右移 32 位与上低 31 位全 1,得到 head,而 tail 则是低 32 位与上低 31 位全 1。

1
2
3
4
5
6
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
const mask = 1<<dequeueBits - 1
head = uint32((ptrs >> dequeueBits) & mask)
tail = uint32(ptrs & mask)
return
}

GC

Pool 实际上也不能无限扩展,否则会因为对象占用内存过多导致 OOM。在 pool.go 的init()中,注册了 GC 发生时如何清理 Pool 的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
// sync/pool.go
func init() {
runtime_registerPoolCleanup(poolCleanup)
}

// runtime/mgc.go
var poolcleanup func()

//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanup
// 利用编译器标志将sync包的清理注册到runtime
func sync_runtime_registerPoolCleanup(f func()) {
poolcleanup = f
}

实际上调用的是pool.poolCleanup(),主要是将 local 与 victim 进行交换,不至于让 GC 将所有的 Pool 都清空,有 victim 兜底,需要两个 GC 周期才会被释放。如果 sync.Pool 的获取、释放速度稳定,就不会有新的 Pool 对象进行分配,如果获取的速度下降,那么对象可能会在两个 GC 周期内被释放,而不是以前的一个 GC 周期。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func poolCleanup() {
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}

// Move primary cache to victim cache.
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}

// The pools with non-empty primary caches now have non-empty
// victim caches and no pools have primary caches.
oldPools, allPools = allPools, nil
}

下面模拟一下调用poolCleanup()前后,oldPools,allPools 与 p.victim 的变化:

  1. 初始时 oldPools 与 allPools 都为 nil
  2. 第一次调用Get(),因为 p.local 为 nil,会通过pinSlow()创建 p.local,将 p 放入 allPools,此时 allPools 长度为 1,oldPools 为 nil
  3. 对象使用完毕,调用Put()放回对象
  4. 第一次 GC STW,allPools 中所有 p.local 赋值给 victim,并置为 nil。allPools 赋值给 oldPools,置为 nil。此时 oldPools 长度为 1,allPools 为 nil
  5. 第二次调用Get(),由于 p.local 为 nil,会尝试从 p.victim 中获取对象。
  6. 对象使用完毕,调用Put()放回对象。由于 p.local 为 nil,会重新创建 p.local,并放回对象,此时 allPools 长度为 1,oldPools 长度也为 1
  7. 第二次 GC STW,oldPools 中所有 p.victim 置为 nil,前一次 cache 在本次 GC 时被回收,allPools 中所有 p.local 将值赋值给 victim 并置为 nil。最后 allPools 为 nil,oldPools 长度为 1。

从以上可以看出,p.victim 的定位是次级缓存,在 GC 时将对象放到其中,下次 GC 来临前,如果有Get()调用则从其中获取,直到再一次 GC 到来时回收。从 victim 中取出的对象并不放回 victim 中,一定程度上也减小了下一次 GC 的开销,使得原先一次 GC 的开销被拉长到两次,有一定程度的开销减小。

sync.Mutex

Mutex 是公平互斥锁

每个 g 去获取锁的时候都会尝试自旋几次,如果没有获取到则进入等待队列尾部(先入先出FIFO)。当持有锁的 g 释放锁时,位于等待队列头部的 g 会被唤醒,但是需要与后来 g 竞争,当然竞争不过,因为后来 g 运行在 cpu 上处于自旋状态,且后来 g 会有很多,而刚唤醒的 g 只有一个,只能被迫重新插回头部。当等待的 g 本次加锁等待时间超过 1ms 都没有获得锁时,它会将当前 Mutex 从正常模式切换为饥饿模式,Mutex 所有权会直接从释放锁的 g 上直接传给队头的 g,后来者不自旋也不会尝试获取锁,会直接进入等待队列尾部。

当发生以下两种情况时 Mutex 会从饥饿模式切换为正常模式

  • 获取到锁的 g 刚来,等待时间小于 1ms
  • 该 g 是等待队列中最后一个 g

饥饿模式下不再尝试自旋,所有 g 都要排队,严格先来后到,可以防止尾端延迟

img

互斥锁 state 的最低三位分别标识mutexLockedmutexWokenmutexStarving,剩余位置用于标识当前有多少个 g 在等待互斥锁释放

  • mutexLocked — 表示互斥锁的锁定状态;
  • mutexWoken — 表示从正常模式被唤醒;
  • mutexStarving — 当前的互斥锁进入饥饿状态;
  • waitersCount — 当前互斥锁上等待的 Goroutine 个数;
1
2
3
4
5
6
7
8
9
10
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
// 状态
state int32
// 信号量
sema uint32
}

看看 lock 与 unlock。lock 会锁住 Mutex,如果这个锁在被使用,那么调用的 g 会被阻塞直到这个互斥锁被释放。当锁 state 为 0 时,会将 mutexLocked 位置置为 1,当 state 不是 0 时,会调用sync.Mutex.lockSlow()尝试通过自旋等方式来等待锁的释放。

自旋是一种多线程同步机制,当前进程在进入自旋的过程中会一直保持对 CPU 的占用,持续检查某个条件是否为真,在多核 CPU 上,自旋可以避免 goroutine 的切换,某些情况下能对显著提升性能。g 在进入自旋的需要满足的条件如下

  1. 互斥锁只有在普通模式才能进入自旋
  2. runtime.sync_runtime_canSpin()返回 true
    1. 在多 CPU 机器上
    2. 当前 g 为了获取该锁进入自旋次数少于 4 次
    3. GOMAXPROCS > 1,至少一个其他 P 在 running,且当前 p 本地 runq 为空
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// 争锁,实现fast path
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
... race检测相关
return
}
// 便于编译器对fast path进行内联优化
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}

// 如果CAS没有获得锁则进入slow path
// 主体是一个很大的for循环,主要由以下过程组成
// 1. 判断当前g能否进入自旋
// 2. 通过自旋等待互斥锁释放
// 3. 计算互斥锁的最新状态
// 4. 更新互斥锁的状态并获取锁
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// 饥饿模式下无法自旋
// 告知持有锁的g,在唤醒锁的时候不用再唤醒其他g了
// old&(mutexLocked|mutexStarving) == mutexLocked 必须是上锁、不能处于饥饿状态
// runtime_canSpin(iter)看看自旋次数iter是否超过4,是否在多CPU机器上运行,是否有运行中的P且runq为空
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// !awoke是否是唤醒状态
// old&mutexWoken == 0没有其他正在唤醒的节点
// old>>mutexWaiterShift != 0 表示当前有正在等待的goroutine
// atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) CAS将mutexWoken状态位设置为1
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
// 设置唤醒状态位真
awoke = true
}
// 执行30次PAUSE指令占用CPU并消耗CPU时间
runtime_doSpin()
// 自旋次数加一
iter++
// 获取当前锁状态
old = m.state
continue
}
// ---------------------------------------------------
// 处理完自旋逻辑后,会根据上下文计算当前互斥锁的最新状态
// 当前情况有两种1.自旋超过了次数 2.目前锁没有被持有
new := old
if old&mutexStarving == 0 {
// 如果当前不是饥饿模式,那么将mutexLocked状态位设置1,表示加锁
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
// 如果old被锁定或者处于饥饿模式,则waiter加一,表示等待一个等待计数
new += 1 << mutexWaiterShift
}
// 如果是饥饿状态,并且已经上锁了,那么mutexStarving状态位设置为1,设置为饥饿状态
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// awoke为true则表明当前线程在上面自旋的时候,修改mutexWoken状态成功
if awoke {
// g被唤醒了,无论要抢锁还是排队,操作完后都不是被唤醒的g了
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 清除唤醒标志位,后续流程可能g被挂起,需要其他释放锁的g来唤醒
new &^= mutexWoken
}
// ---------------------------------------------------
// 使用CAS函数更新状态
// 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环;
// 在饥饿模式下,当前 Goroutine 会获得互斥锁
// 如果等待队列中只有当前 Goroutine,互斥锁还会从饥饿模式中退出;
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 1.如果原来状态没有上锁,也没有饥饿,那么直接返回,表示获取到锁
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 2.到这里是没有获取到锁,判断一下等待时长是否不为0
// 如果之前已经等过,则放到队列头部
queueLifo := waitStartTime != 0
// 3.如果等待时间为0,那么初始化等待时间
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 4.阻塞等待
// 如果没有通过CAS获取到锁,则会调用此函数通过信号量来保证资源不会被两个g同时获取
// 会在方法中不断尝试获取锁并陷入休眠等待信号量释放, 一旦当前g获取到信号量,会立即返回继续执行下文逻辑
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 5.唤醒之后检查锁是否应该处于饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 6.判断是否已经处于饥饿状态
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 7.加锁并且将waiter数减1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// 8.如果当前goroutine不是饥饿状态,就从饥饿模式切换会正常模式
delta -= mutexStarving
}
// 9.设置状态
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}

const(
active_spin = 4
)

// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
// sync.Mutex is cooperative, so we are conservative with spinning.
// Spin only few times and only if running on a multicore machine and
// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
// As opposed to runtime mutex we don't do passive spinning here,
// because there can be work on global runq or on other Ps.
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}

unlock 的过程比 lock 的过程简单,fast path 通过去除 mutexLocked 标志位来快速解锁,如果失败,则进入 slow path。slow path 先判断是否已经被解锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (m *Mutex) Unlock() {
...race检测相关
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 等待队列有g在排队
m.unlockSlow(new)
}
}

func (m *Mutex) unlockSlow(new int32) {
// 先判断是否已经被解锁
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 如果是正常模式
if new&mutexStarving == 0 {
old := new
for {
// 如果等待队列为空,或者一个g已经被唤醒或抢到了锁,则不需要唤醒任何g
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 获取唤醒某个g的机会
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 饥饿模式:将互斥锁的所有权直接移交给等待队列头的g,并让出时间片,以便于它可以立即开始运行
// mutexLocked没有设置1,在等待队列头的g被唤醒后才设置
// 如果设置了饥饿模式,mutex仍然是被认定为锁定的, 这样才能让新的g不会获取它
runtime_Semrelease(&m.sema, true, 1)
}
}

Mutex 的实现原理总结

Mutex 底层是 CAS 实现的,内部维护了一个 int32 的状态 state,用于标识锁状态,以及一个 uint32 的信号量 sema 用于挂起和阻塞 goroutine。Mutex 分为正常模式和饥饿模式,不同模式下Lock()UnLock()的对加锁、解锁处理方式不同。Mutex 中的 state 第 4 位及其高位用于存放等待计数,低三位的第一位表示锁状态,第二位表示从正常模式被唤醒,第三位表示进入饥饿模式。

Lock() 先通过 CAS 置 state 为 1,如果失败,则进入 slow path 处理:

  1. 先判断是否可以自旋,饥饿模式下无法自旋
    1. 如果是正常模式,且可以自旋(运行在多 CPU 机器上、当前 g 为了争取该锁进入自旋的次数少于 4、当前机器上至少有个正在运行的 P 且 runq 为空),尝试进行自旋准备:通知运行的 goroutine 不要唤醒其他挂起的 gorotuine,解锁时直接让当前 g 获取锁即可。然后调用runtime_doSpin()进入自旋,执行 30 次 PAUSE 指令占用 CPU,递增自旋次数,重新计算状态
  2. 计算锁状态
  3. 使用 CAS 更新状态
    1. 成功获取锁:返回
    2. 判断等待时间是否为 0,如果是 0 则放在队尾,如果非 0 则放在头部,进入阻塞
    3. 唤醒
      1. 锁是否要进入饥饿状态:等待时间超过 1ms
      2. 重新获取锁状态
      3. 判断是否处于饥饿状态
        1. 是则可以直接获取锁:自减等待计数,设置状态获取锁,如果 starving 不为饥饿,或等待时间没有超过 1ms,或者只有一个 g 在等待队列中,满足任一条件则切换为正常状态
        2. 否:再次循环抢占

UnLock()先 CAS 置锁状态最低位为 0,如果返回结果不为 0,进入 slow path:

也是分别对正常模式和饥饿模式两种进行分别处理,饥饿模式下将锁的所有权直接移交给等待队列头的 g,并让出时间片,以便于它可以立即开始运行。

正常模式下,通过 CAS 更新状态值,唤醒等待队列中的 waiter。当然,如果没有 waiter,或低三位标志位中有一个不为 0 说明有其他 g 在处理了,直接返回。

sync.RWMutex

读写互斥锁不限制并行读,但是读写、写读、写写操作无法并行执行

1
2
3
4
5
6
7
type RWMutex struct {
w Mutex // 被正在写的g持有
writerSem uint32 // 写等待读信号量
readerSem uint32 // 读等待写信号量
readerCount int32 // 正在读的数量
readerWait int32 // 写操作被阻塞时,等待读的数量
}

写锁使用sync.RWMutex.LockUnLock,读锁使用RLockRUnlock

Lock 与 UnLock

Lock 中,先获取内置的互斥锁,获取成功后,其余竞争者 g 会陷入自旋或阻塞。atomic.AddInt32用于阻塞后续的读操作,如果仍有活跃的读操作 g,那么当前写操作 g 会调用 runtime.SemacquireMutex 进入休眠状态等待全部读锁持有者结束后释放 writerSem 信号量,将当前 g 唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
// ...省略race检测
// 首先解决与其他写操作的竞争
rw.w.Lock()
// 通知读操作者,这是一个写操作
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 等待活跃的读操作执行完成
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
// func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
// SemacquireMutex 类似于 Semacquire,但用于分析争用的互斥体。如果 lifo 为真,则在等待队列的头部排队等待服务员。 skipframes 是跟踪期间要忽略的帧数,从 runtime_SemacquireMutex 的调用者开始计算。
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
// ...省略race检测代码
}

写锁的释放过程与加锁过程相反

  1. atomic.AddInt32 将 readerCount 变为正数,释放读锁
  2. 通过 for 循环唤醒所有阻塞的读操作 g
  3. 释放写锁

获取写锁时先阻塞写锁获取,后阻塞读锁获取,释放写锁时,先释放读锁唤醒读操作,后释放写锁。这种策略能够保证读操作不会被连续的写操作饿死

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
// ...省略race检测相关
// 通知所有读者,没有活跃的写操作了
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
// 解锁不存在的读锁会throw
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// 唤醒所有阻塞的读操作
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 解锁允许其他写操作者抢占
rw.w.Unlock()
// ...省略race检测相关
}

RLock 与 RUnlock

读锁的加锁方法不能用于递归读锁定,为不可重入锁,同时,Lock 调用会阻止新的读者获取锁。其中只是将 readerCount+1,如果返回了负数,说明其他 g 获得了写锁,当前 g 就会调用runtime_SemacquireMutex()陷入休眠等待写锁释放。如果返回了正数,则代表 g 没有获取写锁,当前方法返回成功

1
2
3
4
5
6
7
8
9
// RLock locks rw for reading.
func (rw *RWMutex) RLock() {
// ...省略检测race相关
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wamkit for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
// ...省略检测race相关
}

释放读锁的方法也很简单,atomic.AddInt32 减少 readerCount 正在读资源的数量,如果返回大于等于 0,则说明解锁成功,如果小于 0,说明有一个正在执行的写操作,会调用 rUnlockSlow 进入 slow path 处理。rUnlockSlow 会减少写操作等待的读操作数 readerWait 并在所有读操作释放后触发写操作的信号量 writeSem,当该信号量触发时,调度器会唤醒尝试获取写锁的 g

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
// ...省略检测race相关
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
// ...省略检测race相关
}

func (rw *RWMutex) rUnlockSlow(r int32) {
// 解锁不存在的读锁会throw
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

与 Mutex

对于读操作而言主要是使用信号量限制,写操作则是使用互斥锁与信号量限制

  • 获取写锁时
    • 每次解锁读锁都会将 readerCount-1,归零时说明没有读锁获取
    • 将 readerCount 减少 rwmutexMaxReaders 阻塞后续的读操作(将 readerCount 变为负数)
  • 释放写锁时
    • 先通知所有读操作
    • 将 readerCount 置为正数,释放写锁互斥锁

RWMutex 在 Mutex 上提供了额外的细粒度控制,能在读操作远远多于写操作时提升性能。

sync.noCopy

sync.noCopy 是一个特殊的私有结构体,tools/go/analysis/passes/copylock 包中的分析器会在编译期间检查被拷贝的变量中是否包含 sync.noCopy 或者实现了 Lock 和 Unlock 方法,如果包含该结构体或者实现了对应的方法就会报错

1
2
3
$ go vet proc.go./prog.go:10:10: assignment copies lock value to yawg: sync.WaitGroup
./prog.go:11:14: call of fmt.Println copies lock value: sync.WaitGroup
./prog.go:11:18: call of fmt.Println copies lock value: sync.WaitGroupv

semaTable

semaTable 存储了可供 g 使用的信号量,是大小为 251 的数组。每一个元素存储了一个平衡树的根,节点是 sudog 类型,在使用时需要一个记录信号量数值的变量 sema,根据它的地址映射到数组中的某个位置,找到对应的节点就找到对应信号的等待队列。

channel 没有使用信号量,而是自己实现了一套排队逻辑

1
2
3
4
5
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait uint32 // Number of waiters. Read w/o the lock.
}

sync.Once

sync.once 文件内容很少,只有一个结构体与两个方法,其中 sync.Once 用于保证 go 程序运行期间的某段代码只执行一次,暴露出的 Do 方法用于执行给定的方法

在以下代码中,只会输出一次 only once

1
2
3
4
5
6
7
8
func main() {
o := &sync.Once{}
for i := 0; i < 10; i++ {
o.Do(func() {
fmt.Println("only once")
})
}
}

输出结果如下:

1
2
$ go run main.go
only once

Once 的结构体也很简单:

1
2
3
4
type Once struct {
done uint32 // 标识代码块是否执行过done
m Mutex // 互斥锁,用于保证原子性操作
}

看看 Do 方法的实现。在源代码的注释中说明了为什么不直接 CAS 设定值,if 方法中调用函数,而要使用这种方式,是因为Do 保证了当它返回时,f 函数已经完成,而直接 CAS 后成功执行不成功返回是无法这样保证的。对于 panic 而言,defer 保证了 panic 也会将 done 置为 1,因此即使 f 调用中 panic,依然算已经执行过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Do 调用函数 f 当且仅当 Do 为这个 Once 的实例第一次被调用时
func (o *Once) Do(f func()) {
// fast path,快速判断是否已执行。如果未执行则进入slow path
if atomic.LoadUint32(&o.done) == 0 {
// Outlined slow-path to allow inlining of the fast-path.
o.doSlow(f)
}
}

func (o *Once) doSlow(f func()) {
// 加锁保证原子性操作
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
// defer 保证了如果panic也能设置已完成
defer atomic.StoreUint32(&o.done, 1)
f()
}
}

sync.Cond

Cond 可以让一组 goroutine 在满足特定条件时被唤醒。在以下代码中同时运行了 11 个 goroutine,其中 10 个 goroutine 通过sync.Cond.Wait()等待特定条件瞒住,1 个 goroutine 通过sync.Cond.Broadcast()唤醒所有陷入等待的 goroutine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"os"
"os/signal"
"sync"
"sync/atomic"
"time"
)

var status int64

func main() {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
time.Sleep(1 * time.Second)
go broadcast(c)

ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
<-ch
}

func broadcast(c *sync.Cond) {
c.L.Lock()
atomic.StoreInt64(&status, 1)
c.Broadcast()
c.L.Unlock()
}

func listen(c *sync.Cond) {
c.L.Lock()
for atomic.LoadInt64(&status) != 1 {
c.Wait()
}
fmt.Println("listen")
c.L.Unlock()
}

运行结果如下:

1
2
3
4
5
6
7
8
9
10
listen
listen
listen
listen
listen
listen
listen
listen
listen
listen

结构体 sync.Cond 中包含了四个字段,最主要的还是 notify。

1
2
3
4
5
6
type Cond struct {
noCopy noCopy // 保证结构体不会在编译期间内被拷贝
L Locker // 用于保护内部的notify字段
notify notifyList // 一个goroutine链表,是实现同步机制的核心结构
checker copyChecker // 禁止运行期间发生的拷贝
}

notifyList 维护了一个 goroutine 链表,以及不同状态的 goroutine 索引

1
2
3
4
5
6
7
type notifyList struct {
wait uint32 // 正在等待的goroutine索引
notify uint32 // 已通知到的goroutine索引
lock uintptr // key field of the mutex
head unsafe.Pointer // 指向链表头节点
tail unsafe.Pointer // 指向链表尾节点
}

在创建一个 Cond 时,必须传入一个 mutex 以关联这个 Cond,保证这个 Cond 的同步属性。

1
2
3
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}

sync.Cond.Wait()

Wait()会使得当前 goroutine 陷入休眠,执行过程分为两个步骤:

  1. 调用runtime_notifyListAdd()将等待计数器 +1 并解锁
  2. 调用runtime_notifyListWait()等待其他 goroutine 的唤醒并加锁
1
2
3
4
5
6
7
8
9
10
11
12
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}

func notifyListAdd(l *notifyList) uint32 {
// 即将wait以原子方式+1
return atomic.Xadd(&l.wait, 1) - 1
}

notifyListWait()则将当前 goroutine 封装为 sudog,追加到 goroutine 通知链表的末尾,然后挂起当前 goroutine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func notifyListWait(l *notifyList, t uint32) {
lockWithRank(&l.lock, lockRankNotifyList)
// 如果已经被唤醒,直接返回
if less(t, l.notify) {
unlock(&l.lock)
return
}
// 封装为sudog,并入队
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
t0 := int64(0)
if blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
// 挂起当前goroutine
// 让出当前cpu,并等待scheduler唤醒
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
if t0 != 0 {
blockevent(s.releasetime-t0, 2)
}
releaseSudog(s)
}

Signal()

Signal()会唤醒队列最前面的 goroutine,核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}

func notifyListNotifyOne(l *notifyList) {
// fast path:如果在上次signal后,没有新的waiter,不需要锁了,直接返回即可
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
t := l.notify
// 在加锁的情况下recheck
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
atomic.Store(&l.notify, t+1)
// 从头开始找到满足sudog.ticket == l.notify的goroutine,唤醒并返回
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}

Broadcast()

Broadcast()会唤醒所有满足条件的 goroutine,这个唤醒顺序也是按照加入队列的先后顺序,先加入的会先被唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}

func notifyListNotifyAll(l *notifyList) {
// fast path
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
s := l.head
l.head = nil
l.tail = nil
atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}

func readyWithTime(s *sudog, traceskip int) {
if s.releasetime != 0 {
s.releasetime = cputicks()
}
// Mark g ready to run.
goready(s.g, traceskip)
}

在条件长时间无法满足时,与使用for {}的忙等相比,sync.Cond 能够让出处理器的使用权,提高 CPU 的利用率。

sync.Map

Go 原生 map 不是线程安全的,在查找、赋值、遍历、删除的过程中都会检测写标志,一旦发现写标志置位(等于 1),则直接 panic。而 sync.Map 则是并发安全的,读取、插入、删除都保持着常数级的时间复杂度,且 sync.Map 的零值是有效的,是一个空 map。sync.Map 更适用于读多写少的场景,写多的场景中会导致 read map 缓存失效,需要加锁,导致冲突增多,而且因为未命中 read map 次数变多,导致 dirty map 提升为 read map,是一个 O(N) 的操作,会降低性能。

一般解决并发读写 map 的思路是加一把大锁,在读写的时候先进行加锁,或把一个 map 分成若干个小 map,对 key 进行哈希操作,只操作对应的小 map,前者锁粒度大,影响并发性能,而后者实现较为复杂,容易出错。

与原生 map 相比,sync.Map 仅遍历的方式有些不同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"sync"
)

func main() {
var m sync.Map
// 存放
m.Store("test1", 1)
m.Store("test2", 2)
// 取值
age, _ := m.Load("test1")
fmt.Println(age)
// 遍历
m.Range(func(key, value any) bool {
name := key.(string)
age := value.(int)
fmt.Println(name, age)
return true
})
// 删除
m.Delete("test1")
age, ok := m.Load("test1")
fmt.Println(age, ok)
// 读取或写入
m.LoadOrStore("test2", 3)
age, _ = m.Load("test2")
fmt.Println(age)
}

输出结果如下:

1
2
3
4
5
1
test2 2
test1 1
<nil> false
2

sync.Map 的底层实现

由四个字段组成,其中 mu 互斥锁用于保护 read 和 dirty 字段。

1
2
3
4
5
6
type Map struct {
mu Mutex
read atomic.Value // 实际上存储的是readOnly,可以并发读
dirty map[any]*entry // 原生map,包含新写入的key,且包含read中所有被删除的key
misses int // 每次从read中读取失败就会自增misses,达到一定阈值后会将dirt提升为read
}

真正存储key/value的是read与dirty,但是它们存储的方式是不一样的,前者用atomic.Value,后者单纯使用原生map,原因是read用的是无锁操作,需要保证load/store的原子性,而dirty map 的 load+store 操作是由 mu 互斥锁来保护的。

readOnly 是一个支持原子性的存储的只读数据结构,底层也是一个原生 map。其中 entry 包含了一个指针,指向 value。dirty 的 value 也是 entry 类型的,这里可以看出 read 和 dirty 各自维护了一套 key,key 指向的是同一个 value,只要修改了 entry,对 read 和 dirty 都是可见的。

1
2
3
4
5
6
7
8
type readOnly struct {
m map[any]*entry
amended bool // true if the dirty map contains some key not in m.
}

type entry struct {
p unsafe.Pointer // *interface{}
}

img

entry 的指针 p 共有三种状态:

  1. p == nil:说明该 key/value 已被删除,且 m.dirty == nil 或 m.dirty[k]指向该 key
  2. p == expunged,说该 key/value 已被删除,且 m.dirty 不为 nil,且 m.dirty 中没有这个 key
  3. p 指向一个正常值:表示实际 interface{}的地址,且被记录在 m.read.m[key]中,如果此时 m.dirty 也不为 nil,那么它也被记录在 m.dirty[key]中,二者指向同一个地址。

当删除 key 时,sync.Map 并不会真正地删除 key,而是通过 CAS 将 entry 的 p 设置为 nil,标记为被删除。如果之后创建 m.dirty,p 又会 CAS 设置为 expunged,且不会复制到 m.dirty 中。

如果 p 不为 expunged,和 entry 关联的 value 则可以被原子地更新,如果 p 为 expunged,那么只有在它初次被设置到 m.dirty 后才能被更新。

Store()

expunged 实际上是一个指向任意类型的指针,用于标记从 dirty map 中删除的 entry。

1
var expunged = unsafe.Pointer(new(any))

Store 直接看代码即可,如果 key 在 read 中,会先调用 tryStore,使用 for 循环+CAS 尝试更新 entry,如果更新成功则直接返回。接下来要么 read 中没有这个 key,要么 key 被标记为已删除了,需要先加锁再操作。

  1. 先去 read 中 double check 下,如果存在 key,但 p 为 expunged,说明 m.dirty 不为 nil,且 m.dirty 不存在该 key。此时将 p 状态设置为 nil,将 key 插入 dirty map 中,更新对应 value
  2. 如果 read 中没有此 key,而 dirty 中有,直接更新对应 value
  3. 如果 read 和 dirty 都没有该 key,先看看 dirty 是否为空,为空就要创建一个 dirty,并从 read 中复制没被删除的元素。然后更新 amended 标记为 true,标识 dirty 中存在 read 没有的 key,将 key/value 写入 dirty map 中。更新对应的 value。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
func (m *Map) Store(key, value any) {
// 如果read map中存在该key,则尝试直接修改
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}

m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// 如果read map中存在该key,但p为expunged,说明m.dirty不为nil且m.dirty不存在该key
// 此时将p的状态修改为nil,并在dirty map中插入key
m.dirty[key] = e
}
// 更新p指向value
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// 如果read中不存在该key,但dirty map中存在该key,直接写入更新entry
// 此时read map中仍然没有该key
e.storeLocked(&value)
} else {
// read和dirty中都没有该key
// 如果dirty map为nil,需要创建dirty map,并从read map中复制未删除的元素到新创建的dirty map中
// 更新emended字段未true,表示dirty map中存在read map中没有的key
// 将key/value写入dirty map
if !read.amended {
// 添加第一个新key到dirty map中
// 此处先判断dirty map是否为空,若为空,则浅拷贝read map一次
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}

// 如果为空,则创建一个dirty map,并从read map中复制未删除的元素到新创建的dirty map中
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}

read, _ := m.read.Load().(readOnly)
m.dirty = make(map[any]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}

// CAS设置entry,当p为expunged时返回false
func (e *entry) tryStore(i *any) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}

// 确保entry没有被标记为已清除
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

Load()

流程比 Store 更简单。先从 read 中找,找到了直接调用entry.load(),否则看看 amended,如果是 false,说明 dirty 为空,直接返回 nil 和 false,如果 emended 为 true,说明 dirty 中可能存在要找的 key。先上锁,然后 double check 从 read 找,还没找到就去 dirty 中找,不管 dirty 中找没找到,都得在 missed 记一下,在 dirty 被提升为 read 前都会走这条路。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (m *Map) Load(key any) (value any, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}

missLocked 直接将 misses+1,标识有一次未命中,如果未命中的次数小于 m.dirty 长度,直接返回,否则将 m.dirty 提升为 read,并清空 dirty 和 misses 计数。这样之前一段时间加的 key 就会进到 read 中,提高 read 的命中率。

1
2
3
4
5
6
7
8
9
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}

entry.load()对 p 为 nil 和 p 为 expunged 的 entry 直接返回 nil 和 false,否则将其转为 interface{}返回。

1
2
3
4
5
6
7
func (e *entry) load() (value any, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*any)(p), true
}

Delete()

套路与Load()Store()相似,从 read 中查是否有这个 key,如果有的话调用entry.delete(),将 p 置为 nil。read 中没找到的话,如果 dirty 不为 nil,就去 dirty 中找,先上锁,再 double check,还没在 read 中找到就去 dirty 中找,然后执行删除操作,再调用 missLocked 看看是否要将 dirty 上升到 read,这里不管是否在 dirty 中找到都得标记一下 misses。如果找到并删除了,调用entry.delete(),否则返回 nil 和 false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Delete deletes the value for a key.
func (m *Map) Delete(key any) {
m.LoadAndDelete(key)
}

func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
// 不管是否存在都记一下misses
m.missLocked()
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false
}

entry.delete()也是 CAS 操作,将 p 置为 nil,当判断 p 为 nil 或 expunged 时会直接返回 nil 和 false。

1
2
3
4
5
6
7
8
9
10
11
func (e *entry) delete() (value any, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*any)(p), true
}
}
}

如果 read 中找到了 key,仅仅是将 p 置为 nil 做一个标记,这样在仅有 dirty 中有这个 key 的时候才会直接删除这个 key。这样的目的在于在下次查找这个 key 时会命中 read,提升效率,如果只在 dirty 存在,read 就无法起到缓存的作用,会直接删除。key 本身是需要在 missLocked 前将 key 从 dirty 中删除,才能使其被垃圾回收。

LoadOrStore()

结合了 Load 和 Store 的功能,如果 map 存在该 key,就返回对应 value,否则将 value 设置给该 key。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool) {
// fast path
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}

m.mu.Lock()
// 与store类似
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
actual, loaded, _ = e.tryLoadOrStore(value)
m.missLocked()
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()

return actual, loaded
}

func (e *entry) tryLoadOrStore(i any) (actual any, loaded, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*any)(p), true, true
}
ic := i
for {
if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
return i, false, true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*any)(p), true, true
}
}
}

Range()

参数需要传入一个函数,Range()在遍历时会将调用时刻所有key/value传给该函数,如果返回了false会停止遍历。由于会遍历所有key,是一个O(N)的操作,所以将dirty提升为read,将开销分摊开,提升了效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (m *Map) Range(f func(key, value any) bool) {
read, _ := m.read.Load().(readOnly)
// dirty存在read没有的key
if read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
// double check
if read.amended {
// 将dirty提升为read
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
// O(N)遍历,当f返回false时停止
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}

参考