写在前面

Go语言是一门在语言层面支持用户级线程的高级语言,因此并发同步在Go程序编写中尤其重要,其中channel虽然作为并发控制的高级抽象,但它的底层就是依赖于sync标准库中的mutex来实现的,因此了解sync标准库是每一个Gopher的必备技能之一。

笔者使用的Go版本是1.18.1

sync.WaitGroup

sync.WaitGroup使得多个并发执行的代码块在达到WaitGroup显式指定的同步条件后才得以继续执行Wait()调用后的代码,即达到并发goroutine执行屏障的效果。

在以下代码中,我们希望达到多个goroutine异步执行完输出任务后,main goroutine才退出的效果,此时程序执行完毕。转换为实例,就是使得程序输出110,此处我们并不关心main()中创建的两个goroutine之间的执行顺序。

1
2
3
4
5
6
7
8
9
10
11
12
13
package main

import "fmt"

func main() {
go func() {
fmt.Print(1)
}()
go func() {
fmt.Print(1)
}()
fmt.Print(0)
}

此时输出

1
0

与期望结果不符,程序运行完毕时只输出了0,这是因为goroutine的创建和调度需要时间,在两个goroutine创建期间,main goroutine已经输出了0,导致main函数结束,程序执行完毕。我们可以使用WaitGroup来完成上述需求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"fmt"
"sync"
)

func main() {
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
fmt.Print(1)
}()
go func() {
defer wg.Done()
fmt.Print(1)
}()
wg.Wait()
fmt.Print(0)
}

此时输出110

1
110

sync.WaitGroup中记录了仍在并发执行的代码块的数量,Add()相当于对这个数量执行+1操作,不难想到Done()则为执行了Add(-1),事实确实如此。

1
2
3
4
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}

WaitGroup

在此之前,我们先明确设定状态运行计数与等待计数信号信号计数

WaitGroup的结构体记录了运行计数,等待计数和信号计数三个uint32来对并发的goroutine进行不同目的的计数,在笔者使用版本中,其实是一个uint64与一个uint32。这里兼容了32位系统,因为在32位机器上无法实现对64位字段的原子操作(64位字段相当于两个指令,无法同时完成)。在32位平台上,如果wg.state1元素依然按照64位平台的顺序返回(waiter, counter, sema),那么wg.state1[0]的内存地址是32位对齐的,不能保证一定是64位对齐的,就无法进行64位原子操作(后续需要对状态进行整体的原子更改atomic.AddUnit64)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type WaitGroup struct {
// 表示 `WaitGroup` 是不可复制的,只能用指针传递,保证全局唯一
// noCopy是个空接口,占用0字节,因此内存对齐可以忽略此字段
noCopy noCopy

state1 uint64
state2 uint32
}

// state 返回指向存储在 wg.state 中的 state(运行计数和等待计数) 和 sema(信号计数) 字段的指针。
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
// 判断是否对8对齐,32位也可能满足这种情况,否则要手动padding
if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// 这里对8字节对齐
// 前8bytes做uint64指针state,后4bytes做sema
return &wg.state1, &wg.state2
} else {
// 前4bytes做sema,后8bytes做uint64指针state
state := (*[3]uint32)(unsafe.Pointer(&wg.state1))
return (*uint64)(unsafe.Pointer(&state[1])), &state[0]
}
}

为了保证在32位系统上也能原子访问64位对齐的64位字,通过state()来消除高层上实现的差异,具体可以参考issue-6404。由于在64位机器上,8字节是单个机器字的长度,内存地址对8取模可以判断该数据对象的内存地址是否为64位对齐,state()wg.state1的内存地址对8进行取模来判断程序是允许在64位平台还是32位平台上,根据结果来返回信息

  • 等于0返回: wg.state1(wg.state1[0])和wg.state1[2]的内存地址
    • 此时返回状态、信号
  • 不等于0(32位环境)返回:wg.state1[1]wg.state1[0]的内存地址
    • 此时返回信号、状态

state调整的前提:如果不能保证对8字节对齐,需要手动移位对齐,这里用了内存对齐的padding

在4字节对齐的环境中,8字节可能跨越了两个cache line,不保证64位的原子操作。

img

在32位架构中,WaitGroup在初始化的时候,分配内存地址的时候是随机的,所以WaitGroup结构体state1起始的位置不一定是64位对齐,可能会是:uintptr(unsafe.Pointer(&wg.state1))%8 = 4,如果出现这样的情况,那么就需要用state1的第一个元素做padding + 4,这样操作后两组就能对8字节进行对齐了,用state1的后两个元素合并成uint64来表示statep,以下是一个小实验:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import (
"unsafe"
)

type a struct {
b byte
}

type w struct {
state1 uint64
state2 uint32
}

func main() {
b := a{}
println(unsafe.Sizeof(b), uintptr(unsafe.Pointer(&b)), uintptr(unsafe.Pointer(&b))%8 == 0)
wg := w{}
// 64环境下
// 1 824634031959 false
// 16 824634031968 true
// 16 824634031972 false
// 32环境下 如果是64位win,需要在cmd情况下 set GOARCH=386
// 1 285454255 false
// 12 285454260 false
// 12 285454264 true 经过第一个uint32的padding,后续的两位uint32对8字节对齐
println(unsafe.Sizeof(wg), uintptr(unsafe.Pointer(&wg.state1)), uintptr(unsafe.Pointer(&wg.state1))%8 == 0)
state := (*[3]uint32)(unsafe.Pointer(&wg.state1))
println(unsafe.Sizeof(wg), uintptr(unsafe.Pointer(&state[1])), uintptr(unsafe.Pointer(&state[1]))%8 == 0)
}

Wait()

Wait()主要用于阻塞g,直到WaitGroup的计数为0。先获取访问计数值的指针(state,sema),在自旋循环体中,通过检查计数来检查目前还没有达成同步条件的并行代码块的数量,并且在每次完成检查后增加一次等待计数。此处没有使用密集循环来构造自旋锁等待,是处于性能考虑:为了保证其他goroutine能够得到充分调度。如果每一次检查计数时没有达成同步条件,下次循环如果当前goroutine不主动让出CPU,会导致CPU空转,降低性能。这里用了runtime_Semacquire(semap),如果等待计数被成功记录,则直接增加信号量,挂起当前g,否则再进行一次循环获取最新的同步状态。

Wait()可以在不同的g上执行,且调用Wait()的g数量也可能不唯一,因此需要等待计数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Wait blocks until the WaitGroup counter is zero.
// race检测相关代码已略去
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
// 原子操作
state := atomic.LoadUint64(statep)
// 运行counter(state的高32位)
v := int32(state >> 32)
// counter为0时直接返回
if v == 0 {
return
}
// 增加等待计数
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 增加信号量并挂起当前g,使当前g让出cpu
// 信号量为0时唤醒
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}

Add()

Add()不止是简单的将信号量增delta,还需要考虑很多因素

  • 内部运行计数不能为负
  • Add必须与Wait属于happens before关系
    • 毕竟Wait是同步屏障,没有Add,Wait就没有了意义
  • 通过信号量通知所有正在等待的goroutine

先假设statep的高32位=1,代表有一个运行计数。当Add(-1)时,statep的高32位+负数的补码32个1,会溢出1导致statep的高32位=0,即运行计数清零,Wait操作达成同步条件

img

具体过程如下:

  1. 通过state获取状态指针statep和信号指针semap,statep的高32位为counter,低32位为waiter
  2. 调用atomic.AddUint64()将传入的delta左移四位加上statep,即counter+delta
  3. counter可能为负,所以用int32来存值,waiter不可能为负,所以用uint32存值
  4. 经过一系列校验,counter为负则panic,w不等于0且delta>0且v值为delta,说明add在wait后调用,会panic,因为waitGroup不允许Wait方法调用后还调用add方法
  5. v > 0或w != 0时直接return,此时不需要释放waiter
  6. 到了*statep != state,状态只能是waiter>0且counter==0,当waiter>0时,肯定不能add,且counter==0时,wait不会再自增waiter,结果一定是一致的,否则触发panic
  7. 将statep置为0,释放所有waiter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 已略去race检测相关代码
func (wg *WaitGroup) Add(delta int) {
// 获取状态指针和信号指针
statep, semap := wg.state()
// 在运行计数上记录delta
// 高32bit是计数值v,所以把delta左移32,增加到计数上
state := atomic.AddUint64(statep, uint64(delta)<<32)
// 运行计数
v := int32(state >> 32)
// 等待计数(低32位)
w := uint32(state)
// 任务计数器不能为负数
if v < 0 {
panic("sync: negative WaitGroup counter")
}
// 添加与等待同时调用(应该是happens before的关系)
// 已经执行了Wait,不容许再执行Add
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 运行计数>0或没有writer在等待,直接返回
if v > 0 || w == 0 {
return
}
// happens before,add和wait并发调用
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 唤醒所有等待的goroutine,并将等待计数清零
// 此时counter一定为0,waiter一定>0
*statep = 0
for ; w != 0; w-- {
// 执行一次释放一个,唤醒一个waiter
runtime_Semrelease(semap, false, 0)
}
}

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}

WaitGroup的实现原理总结

WaitGroup内部维护了3个uint32(实际上是一个uint64,一个uint32),分别是状态和信号,状态包括了运行计数counter和等待计数waiter,信号指信号计数sema。运行计数代表了调用Add()添加的delta值,等待计数代表了调用Wait()陷入等待的goroutine的数量,信号量sema是runtime内部信号量的实现,用于挂起和唤醒goroutine。在Add()的时候增加运行计数,Wait()的时候增加等待计数,如果运行计数不为0,则将goroutine挂起,等到调用Done()->Add(-1)使等待计数为0时会唤醒所有挂起的goroutine。

我觉得比较核心的地方在于3个uint32中兼容32位机器和64位机器的实现。由于状态是64位的,需要进行64位原子操作更新,但是由于32位的环境只对4字节对齐,首字段可能不是对8字节对齐的,因此要用state进行兼容,如果不对8字节对齐,则将状态放在后面两位uint32中,前面一个4字节的作为padding,存放信号计数sema。如果对8字节对齐,状态直接放在前两位即可,这样就兼容了32位和64位对64位原子操作的支持。

sync.Pool

sync.Pool也许应该叫sync.Cache,简单来说就是为了避免频繁分配、回收内存给GC带来负担的cache,pool与连接池类似。sync.Pool可以将暂时不用的对象缓存起来,等到下次需要的时候直接使用,也不用再次经过内存分配,复用对象的内存,减轻了GC的压力,提升系统性能。

如果没有sync.Pool

多个goroutine都需同时创建一个对象时,如果goroutine数过多,会导致对象的创建数目递增,导致GC压力过大。形成’并发大->占用内存大->GC缓慢->处理并发能力降低->并发更大’的恶性循环。解决此问题的关键思想就在于对象的复用,避免重复创建、销毁。

以下是一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"sync"
)

var pool *sync.Pool

type Person struct {
Name string
}

func init() {
pool = &sync.Pool{
New: func() interface{} {
fmt.Println("creating a new person")
return new(Person)
},
}
}

func main() {
person := pool.Get().(*Person)
fmt.Println("Get Pool Object:", person)
person.Name = "first"
pool.Put(person)

fmt.Println("Get Pool Object:", pool.Get().(*Person))
fmt.Println("Get Pool Object:", pool.Get().(*Person))
}

输出结果如下

1
2
3
4
5
creating a new person
Get Pool Object: &{}
Get Pool Object: &{first}
creating a new person
Get Pool Object: &{}

在以上代码中,init()创建了一个sync.Pool,实现的New()方法为创建一个person对象,并打印一句话,main()中调用了三次Get()和一次Put。根据输出结果看来,如果在调用Get()时,pool中没有对象,那么就会调用New()创建新的对象,否则会从pool中的对象获取。我们还可以看到,put到pool中的对象属性依然是之前设定的,并没有被重置。

sync.Pool广泛运用于各种场景,典型例子是fmt包中的print:

img

sync.Pool的底层实现

1
2
3
4
5
6
7
8
9
10
11
12
type Pool struct {
// go1.7引入的一个静态检查机制,代表该对象不希望被复制,可以使用go vet工具检测到是否被复制
// 在使用时需要实现noCopy保证一个对象第一次使用后不会发生复制
noCopy noCopy
local unsafe.Pointer // 每个P的本地队列,实际类型为[P]poolLocal, 一个切片
localSize uintptr // 大小
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
// 自定义创建对象回调函数,当pool中没有可用对象时会调用此函数
// 当没有可用对象时,如果不设置该函数,get()会返回nil
New func() any
}

noCopy代表这个结构体是禁止拷贝的,在使用go vet工具时生效。

local是一个poolLocal数组(其实是切片local := make([]poolLocal, size))的指针,localSize代表这个数组的大小,victim也是一个poolLocal数组的指针。

New函数在创建pool时设置,当pool中没有缓存对象时,会调用New方法生成一个新的对象。

在索引poolLocal时,P的id对应[P]poolLocal下标索引,这样在多个goroutine使用同一个pool时能减少竞争,提升了性能。在一轮GC到来时,victim和victimSize会分别接管local和localSize,victim机制用于减少GC后冷启动导致的性能抖动,使得分配对象更加平滑

Victim Cache 是计算机架构里的一个概念,是CPU硬件处理缓存的一种技术,sync.Pool引入它的目的在于降低GC压力的同时提高命中率。

poolLocal

img

这里得提到伪共享问题。伪共享问题,就是在多核CPU架构下,为了满足数据一致性维护一致性协议MESI,频繁刷新同一cache line导致高速缓存并未起到应有的作用的问题。试想一下,两个独立线程要更新两个独立变量,但俩独立变量都在同一个cache line上,当前cache line是share状态。如果core0的thread0去更新cache line,会导致core1中的cache line状态变为Invalid,随后thread1去更新时必须通知core0将cache line刷回主存,然后它再从主从中load该cache line进高速缓存之后再进行修改,但是该修改又会使得core0的cache line失效,重复上述过程,导致高速缓存相当于没有一样,反而还因为频繁更新cache影响了性能。

这里poolLocal的字段pad就是用于防止伪共享问题,cache line在x86_64体系下一般是64字节

1
2
3
4
5
6
7
type poolLocal struct {
poolLocalInternal
// 将poolLocal补齐至缓存行的大小,防止false sharing(伪共享)
// 在多数平台上128 mod (cache line size) = 0可以防止伪共享
// 伪共享,仅占位用,防止在cache line上分配多个poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

poolLocal数组的大小是程序中P的数量,Pool的最大个数是runtime.GOMAXPROCS(0)

1
2
3
4
5
6
7
// Local per-P Pool appendix.
type poolLocalInternal struct {
// P的私有缓存区,使用时不需要加锁
private any // Can be used only by the respective P.
// 公共缓存区,本地P可用pushHead/popHead。其他的P只能popTail
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}

poolLocalInternal中private代表缓存了一个元素,只能由相应的一个P存取。因为一个P同时只能执行一个goroutine,因此不会有并发问题,使用时不需要加锁。

shared则可以由任意的P访问,但是只有本地的P才能pushHead或popHead,其他P可以popTail。

poolChain

看看poolChain的实现,这是一个双端队列的实现

其中poolDequeue是PoolQueue的一个实现,实现为单生产者多消费者的固定大小的无锁Ring式队列,通过atomic实现,底层存储用数组,head,tail标记。

生产者可以从head插入,tail删除,而消费者只能从tail删除。headTail变量通过位运算存储了head和tail的指针,分别指向队头与队尾。

poolChain没有使用完整的poolDequeue,而是封装了一层,这是因为它的大小是固定长度的,而pool则是不限制大小的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// file: /sync/poolqueue.go
type poolChain struct {
// 只有生产者会push,因此不需要加锁同步
head *poolChainElt

// 能被消费者使用,所以操作必须要有原子性
tail *poolChainElt
}

type poolChainElt struct {
poolDequeue
// next被producer写,consumer读,所以只会从nil变成non-nil
// prev被consumer写,producerr读,所以只会从non-nil变成nil
next, prev *poolChainElt
}

type poolDequeue struct {
// 包含了一个32位head指针和一个32位tail指针,都与len(vals) - 1取模过
// tail是队列中最老是数据,head指向下一个要填充的slot
// slots范围是[tail, head),由consumers持有
// 高32位为head,低32位为tail
headTail uint64
// vals是一个存储interface{}的环形队列,size必须是2的幂
// 如果slot为空,则vals[i].typ为空
// 一个slot在此宣告无效,那么tail就不指向它了,vals[i].typ为nil
// 由consumers设置为nil,由producer读
vals []eface
}

type eface struct {
typ, val unsafe.Pointer
}

由此图可以看到pool的整体结构

img

获取一个对象-Get()

Get()的过程清晰明了:

  1. 首先通过调用p.pin()将当前goroutine与P绑定,禁止被抢占,返回当前P对应的poolLocal以及pid。
  2. 获取local的private赋给x,并置local的private为nil
  3. 判断x是否为空,若为空,则尝试从local的shared头部获取一个对象,赋值给x。如果x仍然为空,会调用getSlow()从其他P的shared尾部偷取一个对象
  4. 调用runtime_procUnpin()解除非抢占。
  5. 如果到此时还没有获取到对象,调用设置的New()创建一个新对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (p *Pool) Get() any {
...
// 将当前goroutine绑定到当前p上
l, pid := p.pin()
// 优先从local的private中获取
x := l.private
l.private = nil
if x == nil {
// 如果local的private没有,尝试获取local shared的head
x, _ = l.shared.popHead()
// 如果还没有,则进入slow path
// 调用
if x == nil {
x = p.getSlow(pid)
}
}
// 解除抢占
runtime_procUnpin()
...
// 如果没有获取到,尝试使用New()创建一个新对象
if x == nil && p.New != nil {
x = p.New()
}
return x
}

pin()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (p *Pool) pin() (*poolLocal, int) {
pid := runtime_procPin()
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
return p.pinSlow()
}

func indexLocal(l unsafe.Pointer, i int) *poolLocal {
// 传入的i是数组的index
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}

作用是将当前goroutine与P绑定在一起,禁止抢占,且返回对应poolLocal和P的id。

如果goroutine被抢占,那么g的状态会从running变为runnable,会被放回P的localq或globalq,等待下一次调度。但当goroutine下次再次执行时,就不一定和现在的P结合了,因为之后会用到pid,如果被抢占,可能接下来使用的pid与绑定的pid不是同一个。

绑定的逻辑主要在procPin()中。它将当前gorotuine绑定的m上的locks字段+1,即完成了绑定。调度器执行调度时,又是会抢占当前执行goroutine所绑定的P,防止一个goroutine占用CPU过长时间。而判断一个goroutine能被被抢占的条件就是看m.locks是否为0,若为0,则可以被抢占。而在procPin()中,m.locks+1,表示不能被抢占。

1
2
3
4
5
6
7
8
//go:nosplit
func procPin() int {
_g_ := getg()
mp := _g_.m

mp.locks++
return int(mp.p.ptr().id)
}

p.pin()中,获取到p.localSize和p.local后,如果当前pid小于p.localSize,则直接获取poolLocal数组中pid索引处的位置,否则说明Pool还没有创建poolLocal,调用p.pinSlow()完成创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (p *Pool) pinSlow() (*poolLocal, int) {
// 解除绑定
// 避免上大锁造成阻塞,浪费资源
runtime_procUnpin()
// 加全局锁
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
// 重新绑定
pid := runtime_procPin()
// 已经加了全局锁,此时不需要再用原子操作
s := p.localSize
l := p.local
// 对pid重新检查,因为pinSlow途中可能已经被其他线程调用了
// 如果已经创建过了,那么直接返回即可
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// 初始化时会将pool放到allPools中
if p.local == nil {
allPools = append(allPools, p)
}
// 当前P数量
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
// 回收旧的local
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
}

pinSlow()在加锁的情况下进行重试,加全局锁创建一个poolLocal。整体过程如下:

img

popHead()

1
2
3
4
5
6
7
8
9
10
11
12
func (c *poolChain) popHead() (any, bool) {
d := c.head
for d != nil {
if val, ok := d.popHead(); ok {
// 如果成功获得值,则返回
return val, ok
}
// 继续尝试获取缓存的对象
d = loadPoolChainElt(&d.prev)
}
return nil, false
}

popHead()只会被生产者调用。函数执行时先拿到头节点,如果不为空,则调用头节点的popHead()。这俩popHead()的实现不一致。poolDequeue的popHead()移除并返回queue的头节点,如果queue为空,会返回false。此处queue中存储的对象就是Pool里缓存的对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (d *poolDequeue) popHead() (any, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// queue为空
if tail == head {
return nil, false
}
// 验证尾节点,并自减头节点指针,这个操作在读出slot的value之前执行。
// 此处是为了锁住head指针的位置,下一步CAS保证去除的必然是头节点
head--
ptrs2 := d.pack(head, tail)
// 典型CAS
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// 成功取出value
// 实际就是取head低n位的值
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
val := *(*any)(unsafe.Pointer(slot))
// 获取到nil的话就是nil了
if val == dequeueNil(nil) {
val = nil
}
// 重置slot。和popTail不同,这里不会与pushHead产生竞态条件。
*slot = eface{}
return val, true
}

回到poolChain.popHead(),如果获取成功则直接返回,否则继续尝试。

getSlow()

getSlow()在shared没有获取到缓存对象的情况下,会尝试从其他P的poolLocal偷取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (p *Pool) getSlow(pid int) any {
// See the comment in pin regarding ordering of the loads.
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// 尝试从其他P偷取对象
for i := 0; i < int(size); i++ {
// 从索引pid+1处开始投
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 在尝试从其他P偷取对象失败后,会尝试从victim cache中取对象
// 这样可以使得victim中的对象更容易被回收
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 清空victim,防止后来人再来这里找
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}

回到Get(),如果实在偷不到,后面会通过New()创建一个新的对象。

popTail()

popTail()将queue尾部元素弹出,类似popHead()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (c *poolChain) popTail() (any, bool) {
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}
for {
// TODO: pop前先加载next,此处与一般的双向链表是相反的
d2 := loadPoolChainElt(&d.next)
if val, ok := d.popTail(); ok {
return val, ok
}
if d2 == nil {
// 队列为空,只有一个尾结点
return nil, false
}
// 双向链表尾节点的queue已经为空,看下一个节点
// 因为它为空,需要pop掉
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
storePoolChainElt(&d2.prev, nil)
}
// 防止下次popTail的时候会看到一个空的dequeue
d = d2
}
}

底层用的还是poolDequeue的popTail(),与寻常实现大体类似,也是CAS。因为要移除尾部元素,所以tail自增1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (d *poolDequeue) popTail() (any, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
return nil, false
}
ptrs2 := d.pack(head, tail+1)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}
val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
slot.val = nil
atomic.StorePointer(&slot.typ, nil)
return val, true
}

img

存放一个对象-Put()

Put()将对象添加到Pool中,主要过程如下:

  1. 绑定当前goroutine与P,然后尝试将x赋值给private
  2. 如果失败,则将其放入local shared的头部
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (p *Pool) Put(x any) {
if x == nil {
return
}
...
l, _ := p.pin()
if l.private == nil {
l.private = x
x = nil
}
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
}

pushHead()

如果头节点为空,则初始化一下,默认大小为8。然后调用poolDequeue.pushHead()将其push到队列中,如果失败,则说明队列已满,会创建一个两倍大小的dequeue,然后再次调用poolDequeue.pushHead()。由于前面g与p已经绑定了,所以不会有竞态条件,这里只需要一次重试就行了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
const dequeueBits = 32
const dequeueLimit = (1 << dequeueBits) / 4

func (c *poolChain) pushHead(val any) {
d := c.head
if d == nil {
// 初始化头节点,初始大小为8
const initSize = 8
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}
// 将其push到队列中,如果成功则直接返回
if d.pushHead(val) {
return
}
// 当前dequeue满了,分配一个新的两倍大小的dequeue
newSize := len(d.vals) * 2
if newSize >= dequeueLimit {
// dequeue最大限制为2^30
newSize = dequeueLimit
}

d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}

底层用的还是poolDequeue的pushHead(),他将val添加到队列头部,如果队列满了会返回false,走刚才创建新一个两倍大小队列的路,这个函数只能被一个生产者调用,因此不会有竞态条件。首先通过位运算判断队列是否已满,将tail加上当前dequeue内节点的数量,即d.vals的长度,再取低31位,看看它与head是否相等,相等的话就说明队列满了,如果满了直接返回false。否则通过head找到即将填充的slot位置,去head指针的低31位,判断是否有另一个goroutine在popTail这个slot,如果有则返回false。这里是判断typ是否为空,因为popTail是先设置val,再将typ设置为nil的。最后将val赋值给slot,自增head。

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (d *poolDequeue) pushHead(val any) bool {
ptrs := atomic.LoadUint64(&d.headTail)
// 解包,高32位为head,低32位为tail
head, tail := d.unpack(ptrs)
// 判断队列是否已满
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
// 队列满了
return false
}
// 找到即将填充的slot位置
slot := &d.vals[head&uint32(len(d.vals)-1)]
// 检查slot是否与popTail有冲突
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
// 另一个g在popTail这个slot,说明这个队列仍然是满的
return false
}
if val == nil {
val = dequeueNil(nil)
}
// 将val赋值给slot
*(*any)(unsafe.Pointer(slot)) = val
// 自增head
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}

在*(*any)(unsafe.Pointer(slot)) = val中,由于slot是eface类型,先将slot转换为interface{}类型,这样val就能以interface{}类型赋值给slot,使得slot.typ和slot.val指向其内存块,slot的typ, val均不为空。

pack()

再来看看pack()unpack(),它们的作用是打包和解包head和tail俩指针。实际上很简单,pack()就是将head左移32位,或上tail与低31位全1,返回整合成的uint64。

1
2
3
4
5
func (d *poolDequeue) pack(head, tail uint32) uint64 {
const mask = 1<<dequeueBits - 1
return (uint64(head) << dequeueBits) |
uint64(tail&mask)
}

unpack()则将整合的uint64右移32位与上低31位全1,得到head,而tail则是低32位与上低31位全1。

1
2
3
4
5
6
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
const mask = 1<<dequeueBits - 1
head = uint32((ptrs >> dequeueBits) & mask)
tail = uint32(ptrs & mask)
return
}

GC

Pool实际上也不能无限扩展,否则会因为对象占用内存过多导致OOM。在pool.go的init()中,注册了GC发生时如何清理Pool的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
// sync/pool.go
func init() {
runtime_registerPoolCleanup(poolCleanup)
}

// runtime/mgc.go
var poolcleanup func()

//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanup
// 利用编译器标志将sync包的清理注册到runtime
func sync_runtime_registerPoolCleanup(f func()) {
poolcleanup = f
}

实际上调用的是pool.poolCleanup(),主要是将local与victim进行交换,不至于让GC将所有的Pool都清空,有victim兜底,需要两个GC周期才会被释放。如果sync.Pool的获取、释放速度稳定,就不会有新的Pool对象进行分配,如果获取的速度下降,那么对象可能会在两个GC周期内被释放,而不是以前的一个GC周期。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func poolCleanup() {
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}

// Move primary cache to victim cache.
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}

// The pools with non-empty primary caches now have non-empty
// victim caches and no pools have primary caches.
oldPools, allPools = allPools, nil
}

下面模拟一下调用poolCleanup()前后 ,oldPools,allPools与p.victim的变化:

  1. 初始时oldPools与allPools都为nil
  2. 第一次调用Get(),因为p.local为nil,会通过pinSlow()创建p.local,将p放入allPools,此时allPools长度为1,oldPools为nil
  3. 对象使用完毕,调用Put()放回对象
  4. 第一次GC STW,allPools中所有p.local赋值给victim,并置为nil。allPools赋值给oldPools,置为nil。此时oldPools长度为1,allPools为nil
  5. 第二次调用Get(),由于p.local为nil,会尝试从p.victim中获取对象。
  6. 对象使用完毕,调用Put()放回对象。由于p.local为nil,会重新创建p.local,并放回对象,此时allPools长度为1,oldPools长度也为1
  7. 第二次GC STW,oldPools中所有p.victim置为nil,前一次cache在本次GC时被回收,allPools中所有p.local将值赋值给victim并置为nil。最后allPools为nil,oldPools长度为1。

从以上可以看出,p.victim的定位是次级缓存,在GC时将对象放到其中,下次GC来临前,如果有Get()调用则从其中获取,直到再一次GC到来时回收。从victim中取出的对象并不放回victim中,一定程度上也减小了下一次GC的开销,使得原先一次GC的开销被拉长到两次,有一定程度的开销减小。

sync.Mutex

Mutex是公平互斥锁

每个g去获取锁的时候都会尝试自旋几次,如果没有获取到则进入等待队列尾部(先入先出FIFO)。当持有锁的g释放锁时,位于等待队列头部的g会被唤醒,但是需要与后来g竞争,当然竞争不过,因为后来g运行在cpu上处于自旋状态,且后来g会有很多,而刚唤醒的g只有一个,只能被迫重新插回头部。当等待的g本次加锁等待时间超过1ms都没有获得锁时,它会将当前Mutex从正常模式切换为饥饿模式,Mutex所有权会直接从释放锁的g上直接传给队头的g,后来者不自旋也不会尝试获取锁,会直接进入等待队列尾部。

当发生以下两种情况时Mutex会从饥饿模式切换为正常模式

  • 获取到锁的g刚来,等待时间小于1ms
  • 该g是等待队列中最后一个g

饥饿模式下不再尝试自旋,所有g都要排队,严格先来后到,可以防止尾端延迟

img

互斥锁state的最低三位分别标识mutexLockedmutexWokenmutexStarving,剩余位置用于标识当前有多少个g在等待互斥锁释放

  • mutexLocked — 表示互斥锁的锁定状态;
  • mutexWoken — 表示从正常模式被唤醒;
  • mutexStarving — 当前的互斥锁进入饥饿状态;
  • waitersCount — 当前互斥锁上等待的 Goroutine 个数;
1
2
3
4
5
6
7
8
9
10
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
// 状态
state int32
// 信号量
sema uint32
}

看看lock与unlock。lock会锁住Mutex,如果这个锁在被使用,那么调用的g会被阻塞直到这个互斥锁被释放。当锁state为0时,会将mutexLocked位置置为1,当state不是0时,会调用sync.Mutex.lockSlow()尝试通过自旋等方式来等待锁的释放。

自旋是一种多线程同步机制,当前进程在进入自旋的过程中会一直保持对CPU的占用,持续检查某个条件是否为真,在多核CPU上,自旋可以避免goroutine的切换,某些情况下能对显著提升性能。g在进入自旋的需要满足的条件如下

  1. 互斥锁只有在普通模式才能进入自旋
  2. runtime.sync_runtime_canSpin()返回true
    1. 在多CPU机器上
    2. 当前g为了获取该锁进入自旋次数少于4次
    3. GOMAXPROCS > 1,至少一个其他P在running,且当前p本地runq为空
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// 争锁,实现fast path
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
... race检测相关
return
}
// 便于编译器对fast path进行内联优化
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}

// 如果CAS没有获得锁则进入slow path
// 主体是一个很大的for循环,主要由以下过程组成
// 1. 判断当前g能否进入自旋
// 2. 通过自旋等待互斥锁释放
// 3. 计算互斥锁的最新状态
// 4. 更新互斥锁的状态并获取锁
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// 饥饿模式下无法自旋
// 告知持有锁的g,在唤醒锁的时候不用再唤醒其他g了
// old&(mutexLocked|mutexStarving) == mutexLocked 必须是上锁、不能处于饥饿状态
// runtime_canSpin(iter)看看自旋次数iter是否超过4,是否在多CPU机器上运行,是否有运行中的P且runq为空
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// !awoke是否是唤醒状态
// old&mutexWoken == 0没有其他正在唤醒的节点
// old>>mutexWaiterShift != 0 表示当前有正在等待的goroutine
// atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) CAS将mutexWoken状态位设置为1
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
// 设置唤醒状态位真
awoke = true
}
// 执行30次PAUSE指令占用CPU并消耗CPU时间
runtime_doSpin()
// 自旋次数加一
iter++
// 获取当前锁状态
old = m.state
continue
}
// ---------------------------------------------------
// 处理完自旋逻辑后,会根据上下文计算当前互斥锁的最新状态
// 当前情况有两种1.自旋超过了次数 2.目前锁没有被持有
new := old
if old&mutexStarving == 0 {
// 如果当前不是饥饿模式,那么将mutexLocked状态位设置1,表示加锁
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
// 如果old被锁定或者处于饥饿模式,则waiter加一,表示等待一个等待计数
new += 1 << mutexWaiterShift
}
// 如果是饥饿状态,并且已经上锁了,那么mutexStarving状态位设置为1,设置为饥饿状态
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// awoke为true则表明当前线程在上面自旋的时候,修改mutexWoken状态成功
if awoke {
// g被唤醒了,无论要抢锁还是排队,操作完后都不是被唤醒的g了
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 清除唤醒标志位,后续流程可能g被挂起,需要其他释放锁的g来唤醒
new &^= mutexWoken
}
// ---------------------------------------------------
// 使用CAS函数更新状态
// 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环;
// 在饥饿模式下,当前 Goroutine 会获得互斥锁
// 如果等待队列中只有当前 Goroutine,互斥锁还会从饥饿模式中退出;
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 1.如果原来状态没有上锁,也没有饥饿,那么直接返回,表示获取到锁
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 2.到这里是没有获取到锁,判断一下等待时长是否不为0
// 如果之前已经等过,则放到队列头部
queueLifo := waitStartTime != 0
// 3.如果等待时间为0,那么初始化等待时间
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 4.阻塞等待
// 如果没有通过CAS获取到锁,则会调用此函数通过信号量来保证资源不会被两个g同时获取
// 会在方法中不断尝试获取锁并陷入休眠等待信号量释放, 一旦当前g获取到信号量,会立即返回继续执行下文逻辑
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 5.唤醒之后检查锁是否应该处于饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 6.判断是否已经处于饥饿状态
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 7.加锁并且将waiter数减1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// 8.如果当前goroutine不是饥饿状态,就从饥饿模式切换会正常模式
delta -= mutexStarving
}
// 9.设置状态
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}

const(
active_spin = 4
)

// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
// sync.Mutex is cooperative, so we are conservative with spinning.
// Spin only few times and only if running on a multicore machine and
// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
// As opposed to runtime mutex we don't do passive spinning here,
// because there can be work on global runq or on other Ps.
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}

unlock的过程比lock的过程简单,fast path通过去除mutexLocked标志位来快速解锁,如果失败,则进入slow path。slow path先判断是否已经被解锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (m *Mutex) Unlock() {
...race检测相关
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 等待队列有g在排队
m.unlockSlow(new)
}
}

func (m *Mutex) unlockSlow(new int32) {
// 先判断是否已经被解锁
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 如果是正常模式
if new&mutexStarving == 0 {
old := new
for {
// 如果等待队列为空,或者一个g已经被唤醒或抢到了锁,则不需要唤醒任何g
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 获取唤醒某个g的机会
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 饥饿模式:将互斥锁的所有权直接移交给等待队列头的g,并让出时间片,以便于它可以立即开始运行
// mutexLocked没有设置1,在等待队列头的g被唤醒后才设置
// 如果设置了饥饿模式,mutex仍然是被认定为锁定的, 这样才能让新的g不会获取它
runtime_Semrelease(&m.sema, true, 1)
}
}

Mutex的实现原理总结

Mutex底层是CAS实现的,内部维护了一个int32的状态state,用于标识锁状态,以及一个uint32的信号量sema用于挂起和阻塞goroutine。Mutex分为正常模式和饥饿模式,不同模式下Lock()UnLock()的对加锁、解锁处理方式不同。Mutex中的state第4位及其高位用于存放等待计数,低三位的第一位表示锁状态,第二位表示从正常模式被唤醒,第三位表示进入饥饿模式。

Lock()先通过CAS置state为1,如果失败,则进入slow path处理:

  1. 先判断是否可以自旋,饥饿模式下无法自旋
    1. 如果是正常模式,且可以自旋(运行在多CPU机器上、当前g为了争取该锁进入自旋的次数少于4、当前机器上至少有个正在运行的P且runq为空),尝试进行自旋准备:通知运行的goroutine不要唤醒其他挂起的gorotuine,解锁时直接让当前g获取锁即可。然后调用runtime_doSpin()进入自旋,执行30次PAUSE指令占用CPU,递增自旋次数,重新计算状态
  2. 计算锁状态
  3. 使用CAS更新状态
    1. 成功获取锁:返回
    2. 判断等待时间是否为0,如果是0则放在队尾,如果非0则放在头部,进入阻塞
    3. 唤醒
      1. 锁是否要进入饥饿状态:等待时间超过1ms
      2. 重新获取锁状态
      3. 判断是否处于饥饿状态
        1. 是则可以直接获取锁:自减等待计数,设置状态获取锁,如果starving不为饥饿,或等待时间没有超过1ms,或者只有一个g在等待队列中,满足任一条件则切换为正常状态
        2. 否:再次循环抢占

UnLock()先CAS置锁状态最低位为0,如果返回结果不为0,进入slow path:

也是分别对正常模式和饥饿模式两种进行分别处理,饥饿模式下将锁的所有权直接移交给等待队列头的g,并让出时间片,以便于它可以立即开始运行。

正常模式下,通过CAS更新状态值,唤醒等待队列中的waiter。当然,如果没有waiter,或低三位标志位中有一个不为0说明有其他g在处理了,直接返回。

sync.RWMutex

读写互斥锁不限制并行读,但是读写、写读、写写操作无法并行执行

1
2
3
4
5
6
7
type RWMutex struct {
w Mutex // 被正在写的g持有
writerSem uint32 // 写等待读信号量
readerSem uint32 // 读等待写信号量
readerCount int32 // 正在读的数量
readerWait int32 // 写操作被阻塞时,等待读的数量
}

写锁使用sync.RWMutex.LockUnLock,读锁使用RLockRUnlock

Lock与UnLock

Lock中,先获取内置的互斥锁,获取成功后,其余竞争者g会陷入自旋或阻塞。atomic.AddInt32用于阻塞后续的读操作,如果仍有活跃的读操作g,那么当前写操作g会调用runtime.SemacquireMutex进入休眠状态等待全部读锁持有者结束后释放writerSem信号量,将当前g唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
// ...省略race检测
// 首先解决与其他写操作的竞争
rw.w.Lock()
// 通知读操作者,这是一个写操作
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 等待活跃的读操作执行完成
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
// func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
// SemacquireMutex 类似于 Semacquire,但用于分析争用的互斥体。如果 lifo 为真,则在等待队列的头部排队等待服务员。 skipframes 是跟踪期间要忽略的帧数,从 runtime_SemacquireMutex 的调用者开始计算。
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
// ...省略race检测代码
}

写锁的释放过程与加锁过程相反

  1. atomic.AddInt32将readerCount变为正数,释放读锁
  2. 通过for循环唤醒所有阻塞的读操作g
  3. 释放写锁

获取写锁时先阻塞写锁获取,后阻塞读锁获取,释放写锁时,先释放读锁唤醒读操作,后释放写锁。这种策略能够保证读操作不会被连续的写操作饿死

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
// ...省略race检测相关
// 通知所有读者,没有活跃的写操作了
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
// 解锁不存在的读锁会throw
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// 唤醒所有阻塞的读操作
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 解锁允许其他写操作者抢占
rw.w.Unlock()
// ...省略race检测相关
}

RLock与RUnlock

读锁的加锁方法不能用于递归读锁定,为不可重入锁,同时,Lock调用会阻止新的读者获取锁。其中只是将readerCount+1,如果返回了负数,说明其他g获得了写锁,当前g就会调用runtime_SemacquireMutex()陷入休眠等待写锁释放。如果返回了正数,则代表g没有获取写锁,当前方法返回成功

1
2
3
4
5
6
7
8
9
// RLock locks rw for reading.
func (rw *RWMutex) RLock() {
// ...省略检测race相关
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wamkit for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
// ...省略检测race相关
}

释放读锁的方法也很简单,atomic.AddInt32减少readerCount正在读资源的数量,如果返回大于等于0,则说明解锁成功,如果小于0,说明有一个正在执行的写操作,会调用rUnlockSlow进入slow path处理。rUnlockSlow会减少写操作等待的读操作数readerWait并在所有读操作释放后触发写操作的信号量writeSem,当该信号量触发时,调度器会唤醒尝试获取写锁的g

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
// ...省略检测race相关
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
// ...省略检测race相关
}

func (rw *RWMutex) rUnlockSlow(r int32) {
// 解锁不存在的读锁会throw
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

与Mutex

对于读操作而言主要是使用信号量限制,写操作则是使用互斥锁与信号量限制

  • 获取写锁时
    • 每次解锁读锁都会将readerCount-1,归零时说明没有读锁获取
    • 将readerCount减少rwmutexMaxReaders阻塞后续的读操作(将readerCount变为负数)
  • 释放写锁时
    • 先通知所有读操作
    • 将readerCount置为正数,释放写锁互斥锁

RWMutex在Mutex上提供了额外的细粒度控制,能在读操作远远多于写操作时提升性能。

sync.noCopy

sync.noCopy 是一个特殊的私有结构体,tools/go/analysis/passes/copylock 包中的分析器会在编译期间检查被拷贝的变量中是否包含 sync.noCopy 或者实现了 Lock 和 Unlock 方法,如果包含该结构体或者实现了对应的方法就会报错

1
2
3
$ go vet proc.go./prog.go:10:10: assignment copies lock value to yawg: sync.WaitGroup
./prog.go:11:14: call of fmt.Println copies lock value: sync.WaitGroup
./prog.go:11:18: call of fmt.Println copies lock value: sync.WaitGroupv

semaTable

semaTable存储了可供g使用的信号量,是大小为251的数组。每一个元素存储了一个平衡树的根,节点是sudog类型,在使用时需要一个记录信号量数值的变量sema,根据它的地址映射到数组中的某个位置,找到对应的节点就找到对应信号的等待队列。

channel没有使用信号量,而是自己实现了一套排队逻辑

1
2
3
4
5
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait uint32 // Number of waiters. Read w/o the lock.
}

sync.Once

sync.once文件内容很少,只有一个结构体与两个方法,其中sync.Once用于保证go程序运行期间的某段代码只执行一次,暴露出的Do方法用于执行给定的方法

在以下代码中,只会输出一次only once

1
2
3
4
5
6
7
8
func main() {
o := &sync.Once{}
for i := 0; i < 10; i++ {
o.Do(func() {
fmt.Println("only once")
})
}
}

输出结果如下:

1
2
$ go run main.go
only once

Once的结构体也很简单:

1
2
3
4
type Once struct {
done uint32 // 标识代码块是否执行过done
m Mutex // 互斥锁,用于保证原子性操作
}

看看Do方法的实现。在源代码的注释中说明了为什么不直接CAS设定值,if方法中调用函数,而要使用这种方式,是因为Do保证了当它返回时,f函数已经完成,而直接CAS后成功执行不成功返回是无法这样保证的。对于panic而言,defer保证了panic也会将done置为1,因此即使f调用中panic,依然算已经执行过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Do 调用函数 f 当且仅当 Do 为这个 Once 的实例第一次被调用时
func (o *Once) Do(f func()) {
// fast path,快速判断是否已执行。如果未执行则进入slow path
if atomic.LoadUint32(&o.done) == 0 {
// Outlined slow-path to allow inlining of the fast-path.
o.doSlow(f)
}
}

func (o *Once) doSlow(f func()) {
// 加锁保证原子性操作
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
// defer 保证了如果panic也能设置已完成
defer atomic.StoreUint32(&o.done, 1)
f()
}
}

sync.Cond

Cond可以让一组goroutine在满足特定条件时被唤醒。在以下代码中同时运行了11个goroutine,其中10个goroutine通过sync.Cond.Wait()等待特定条件瞒住,1个goroutine通过sync.Cond.Broadcast()唤醒所有陷入等待的goroutine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"os"
"os/signal"
"sync"
"sync/atomic"
"time"
)

var status int64

func main() {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
time.Sleep(1 * time.Second)
go broadcast(c)

ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
<-ch
}

func broadcast(c *sync.Cond) {
c.L.Lock()
atomic.StoreInt64(&status, 1)
c.Broadcast()
c.L.Unlock()
}

func listen(c *sync.Cond) {
c.L.Lock()
for atomic.LoadInt64(&status) != 1 {
c.Wait()
}
fmt.Println("listen")
c.L.Unlock()
}

运行结果如下:

1
2
3
4
5
6
7
8
9
10
listen
listen
listen
listen
listen
listen
listen
listen
listen
listen

结构体sync.Cond中包含了四个字段,最主要的还是notify。

1
2
3
4
5
6
type Cond struct {
noCopy noCopy // 保证结构体不会在编译期间内被拷贝
L Locker // 用于保护内部的notify字段
notify notifyList // 一个goroutine链表,是实现同步机制的核心结构
checker copyChecker // 禁止运行期间发生的拷贝
}

notifyList维护了一个goroutine链表,以及不同状态的goroutine索引

1
2
3
4
5
6
7
type notifyList struct {
wait uint32 // 正在等待的goroutine索引
notify uint32 // 已通知到的goroutine索引
lock uintptr // key field of the mutex
head unsafe.Pointer // 指向链表头节点
tail unsafe.Pointer // 指向链表尾节点
}

在创建一个Cond时,必须传入一个mutex以关联这个Cond,保证这个Cond的同步属性。

1
2
3
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}

sync.Cond.Wait()

Wait()会使得当前goroutine陷入休眠,执行过程分为两个步骤:

  1. 调用runtime_notifyListAdd()将等待计数器+1并解锁
  2. 调用runtime_notifyListWait()等待其他goroutine的唤醒并加锁
1
2
3
4
5
6
7
8
9
10
11
12
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}

func notifyListAdd(l *notifyList) uint32 {
// 即将wait以原子方式+1
return atomic.Xadd(&l.wait, 1) - 1
}

notifyListWait()则将当前goroutine封装为sudog,追加到goroutine通知链表的末尾,然后挂起当前goroutine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func notifyListWait(l *notifyList, t uint32) {
lockWithRank(&l.lock, lockRankNotifyList)
// 如果已经被唤醒,直接返回
if less(t, l.notify) {
unlock(&l.lock)
return
}
// 封装为sudog,并入队
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
t0 := int64(0)
if blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
// 挂起当前goroutine
// 让出当前cpu,并等待scheduler唤醒
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
if t0 != 0 {
blockevent(s.releasetime-t0, 2)
}
releaseSudog(s)
}

Signal()

Signal()会唤醒队列最前面的goroutine,核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}

func notifyListNotifyOne(l *notifyList) {
// fast path:如果在上次signal后,没有新的waiter,不需要锁了,直接返回即可
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
t := l.notify
// 在加锁的情况下recheck
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
atomic.Store(&l.notify, t+1)
// 从头开始找到满足sudog.ticket == l.notify的goroutine,唤醒并返回
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}

Broadcast()

Broadcast()会唤醒所有满足条件的goroutine,这个唤醒顺序也是按照加入队列的先后顺序,先加入的会先被唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}

func notifyListNotifyAll(l *notifyList) {
// fast path
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
s := l.head
l.head = nil
l.tail = nil
atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}

func readyWithTime(s *sudog, traceskip int) {
if s.releasetime != 0 {
s.releasetime = cputicks()
}
// Mark g ready to run.
goready(s.g, traceskip)
}

在条件长时间无法满足时,与使用for {}的忙等相比,sync.Cond能够让出处理器的使用权,提高 CPU 的利用率。

sync.Map

Go原生map不是线程安全的,在查找、赋值、遍历、删除的过程中都会检测写标志,一旦发现写标志置位(等于1),则直接 panic。而sync.Map则是并发安全的,读取、插入、删除都保持着常数级的时间复杂度,且sync.Map的零值是有效的,是一个空map。sync.Map更适用于读多写少的场景,写多的场景中会导致read map缓存失效,需要加锁,导致冲突增多,而且因为未命中read map次数变多,导致dirty map提升为read map,是一个O(N)的操作,会降低性能。

一般解决并发读写map的思路是加一把大锁,在读写的时候先进行加锁,或把一个map分成若干个小map,对key进行哈希操作,只操作对应的小map,前者锁粒度大,影响并发性能,而后者实现较为复杂,容易出错。

与原生map相比,sync.Map仅遍历的方式有些不同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"sync"
)

func main() {
var m sync.Map
// 存放
m.Store("test1", 1)
m.Store("test2", 2)
// 取值
age, _ := m.Load("test1")
fmt.Println(age)
// 遍历
m.Range(func(key, value any) bool {
name := key.(string)
age := value.(int)
fmt.Println(name, age)
return true
})
// 删除
m.Delete("test1")
age, ok := m.Load("test1")
fmt.Println(age, ok)
// 读取或写入
m.LoadOrStore("test2", 3)
age, _ = m.Load("test2")
fmt.Println(age)
}

输出结果如下:

1
2
3
4
5
1
test2 2
test1 1
<nil> false
2

sync.Map的底层实现

由四个字段组成,其中mu互斥锁用于保护read和dirty字段。

1
2
3
4
5
6
type Map struct {
mu Mutex
read atomic.Value // 实际上存储的是readOnly,可以并发读
dirty map[any]*entry // 原生map,包含新写入的key,且包含read中所有被删除的key
misses int // 每次从read中读取失败就会自增misses,达到一定阈值后会将dirt提升为read
}

真正存储key/value的是read与dirty,但是它们存储的方式是不一样的,前者用atomic.Value,后者单纯使用原生map,原因是read用的是无锁操作,需要保证load/store的原子性,而dirty map的load+store操作是由mu互斥锁来保护的。

readOnly是一个支持原子性的存储的只读数据结构,底层也是一个原生map。其中entry包含了一个指针,指向value。dirty的value也是entry类型的,这里可以看出read和dirty各自维护了一套key,key指向的是同一个value,只要修改了entry,对read和dirty都是可见的。

1
2
3
4
5
6
7
8
type readOnly struct {
m map[any]*entry
amended bool // true if the dirty map contains some key not in m.
}

type entry struct {
p unsafe.Pointer // *interface{}
}

img

entry的指针p共有三种状态:

  1. p == nil:说明该key/value已被删除,且m.dirty == nil 或 m.dirty[k]指向该key
  2. p == expunged,说该key/value已被删除,且m.dirty不为nil,且m.dirty中没有这个key
  3. p 指向一个正常值:表示实际interface{}的地址,且被记录在m.read.m[key]中,如果此时m.dirty也不为nil,那么它也被记录在m.dirty[key]中,二者指向同一个地址。

当删除key时,sync.Map并不会真正地删除key,而是通过CAS将entry的p设置为nil,标记为被删除。如果之后创建m.dirty,p又会CAS设置为expunged,且不会复制到m.dirty中。

如果p不为expunged,和entry关联的value则可以被原子地更新,如果p为expunged,那么只有在它初次被设置到m.dirty后才能被更新。

Store()

expunged实际上是一个指向任意类型的指针,用于标记从dirty map中删除的entry。

1
var expunged = unsafe.Pointer(new(any))

Store直接看代码即可,如果key在read中,会先调用tryStore,使用for循环+CAS尝试更新entry,如果更新成功则直接返回。接下来要么read中没有这个key,要么key被标记为已删除了,需要先加锁再操作。

  1. 先去read中double check下,如果存在key,但p为expunged,说明m.dirty不为nil,且m.dirty不存在该key。此时将p状态设置为nil,将key插入dirty map中,更新对应value
  2. 如果read中没有此key,而dirty中有,直接更新对应value
  3. 如果read和dirty都没有该key,先看看dirty是否为空,为空就要创建一个dirty,并从read中复制没被删除的元素。然后更新amended标记为true,标识dirty中存在read没有的key,将key/value写入dirty map中。更新对应的value。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
func (m *Map) Store(key, value any) {
// 如果read map中存在该key,则尝试直接修改
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}

m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// 如果read map中存在该key,但p为expunged,说明m.dirty不为nil且m.dirty不存在该key
// 此时将p的状态修改为nil,并在dirty map中插入key
m.dirty[key] = e
}
// 更新p指向value
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// 如果read中不存在该key,但dirty map中存在该key,直接写入更新entry
// 此时read map中仍然没有该key
e.storeLocked(&value)
} else {
// read和dirty中都没有该key
// 如果dirty map为nil,需要创建dirty map,并从read map中复制未删除的元素到新创建的dirty map中
// 更新emended字段未true,表示dirty map中存在read map中没有的key
// 将key/value写入dirty map
if !read.amended {
// 添加第一个新key到dirty map中
// 此处先判断dirty map是否为空,若为空,则浅拷贝read map一次
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}

// 如果为空,则创建一个dirty map,并从read map中复制未删除的元素到新创建的dirty map中
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}

read, _ := m.read.Load().(readOnly)
m.dirty = make(map[any]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}

// CAS设置entry,当p为expunged时返回false
func (e *entry) tryStore(i *any) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}

// 确保entry没有被标记为已清除
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

Load()

流程比Store更简单。先从read中找,找到了直接调用entry.load(),否则看看amended,如果是false,说明dirty为空,直接返回nil和false,如果emended为true,说明dirty中可能存在要找的key。先上锁,然后double check从read找,还没找到就去dirty中找,不管dirty中找没找到,都得在missed记一下,在dirty被提升为read前都会走这条路。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (m *Map) Load(key any) (value any, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}

missLocked直接将misses+1,标识有一次未命中,如果未命中的次数小于m.dirty长度,直接返回,否则将m.dirty提升为read,并清空dirty和misses计数。这样之前一段时间加的key就会进到read中,提高read的命中率。

1
2
3
4
5
6
7
8
9
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}

entry.load()对p为nil和p为expunged的entry直接返回nil和false,否则将其转为interface{}返回。

1
2
3
4
5
6
7
func (e *entry) load() (value any, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*any)(p), true
}

Delete()

套路与Load()Store()相似,从read中查是否有这个key,如果有的话调用entry.delete(),将p置为nil。read中没找到的话,如果dirty不为nil,就去dirty中找,先上锁,再double check,还没在read中找到就去dirty中找,然后执行删除操作,再调用missLocked看看是否要将dirty上升到read,这里不管是否在dirty中找到都得标记一下misses。如果找到并删除了,调用entry.delete(),否则返回nil和false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Delete deletes the value for a key.
func (m *Map) Delete(key any) {
m.LoadAndDelete(key)
}

func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
// 不管是否存在都记一下misses
m.missLocked()
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false
}

entry.delete()也是CAS操作,将p置为nil,当判断p为nil或expunged时会直接返回nil和false。

1
2
3
4
5
6
7
8
9
10
11
func (e *entry) delete() (value any, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*any)(p), true
}
}
}

如果read中找到了key,仅仅是将p置为nil做一个标记,这样在仅有dirty中有这个key的时候才会直接删除这个key。这样的目的在于在下次查找这个key时会命中read,提升效率,如果只在dirty存在,read就无法起到缓存的作用,会直接删除。key本身是需要在missLocked前将key从dirty中删除,才能使其被垃圾回收。

LoadOrStore()

结合了Load和Store的功能,如果map存在该key,就返回对应value,否则将value设置给该key。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool) {
// fast path
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}

m.mu.Lock()
// 与store类似
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
actual, loaded, _ = e.tryLoadOrStore(value)
m.missLocked()
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()

return actual, loaded
}

func (e *entry) tryLoadOrStore(i any) (actual any, loaded, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*any)(p), true, true
}
ic := i
for {
if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
return i, false, true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*any)(p), true, true
}
}
}

Range()

参数需要传入一个函数,Range()在遍历时会将调用时刻所有key/value传给该函数,如果返回了false会停止遍历。由于会遍历所有key,是一个O(N)的操作,所以将dirty提升为read,将开销分摊开,提升了效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (m *Map) Range(f func(key, value any) bool) {
read, _ := m.read.Load().(readOnly)
// dirty存在read没有的key
if read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
// double check
if read.amended {
// 将dirty提升为read
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
// O(N)遍历,当f返回false时停止
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}

参考