写在前面 Go语言是一门在语言层面支持用户级线程的高级语言,因此并发同步在Go程序编写中尤其重要,其中channel虽然作为并发控制的高级抽象,但它的底层就是依赖于sync标准库中的mutex来实现的,因此了解sync标准库是每一个Gopher的必备技能之一。
笔者使用的Go版本是1.18.1
sync.WaitGroup sync.WaitGroup使得多个并发执行的代码块在达到WaitGroup显式指定的同步条件后才得以继续执行Wait()
调用后的代码,即达到并发goroutine执行屏障的效果。
在以下代码中,我们希望达到多个goroutine异步执行完输出任务后,main goroutine才退出的效果,此时程序执行完毕。转换为实例,就是使得程序输出110,此处我们并不关心main()
中创建的两个goroutine之间的执行顺序。
1 2 3 4 5 6 7 8 9 10 11 12 13 package mainimport "fmt" func main () { go func () { fmt.Print(1 ) }() go func () { fmt.Print(1 ) }() fmt.Print(0 ) }
此时输出
与期望结果不符,程序运行完毕时只输出了0,这是因为goroutine的创建和调度需要时间,在两个goroutine创建期间,main goroutine已经输出了0,导致main函数结束,程序执行完毕。我们可以使用WaitGroup来完成上述需求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package mainimport ( "fmt" "sync" ) func main () { wg := sync.WaitGroup{} wg.Add(2 ) go func () { defer wg.Done() fmt.Print(1 ) }() go func () { defer wg.Done() fmt.Print(1 ) }() wg.Wait() fmt.Print(0 ) }
此时输出110
sync.WaitGroup
中记录了仍在并发执行的代码块的数量,Add()
相当于对这个数量执行+1操作,不难想到Done()
则为执行了Add(-1)
,事实确实如此。
1 2 3 4 func (wg *WaitGroup) Done () { wg.Add(-1 ) }
WaitGroup 在此之前,我们先明确设定状态 为运行计数与等待计数 ,信号 为信号计数 。
WaitGroup的结构体记录了运行计数,等待计数和信号计数三个uint32来对并发的goroutine进行不同目的的计数,在笔者使用版本中,其实是一个uint64与一个uint32。这里兼容了32位系统,因为在32位机器上无法实现对64位字段的原子操作 (64位字段相当于两个指令,无法同时完成)。在32位平台上,如果wg.state1
元素依然按照64位平台的顺序返回(waiter, counter, sema),那么wg.state1[0]
的内存地址是32位对齐的,不能保证一定是64位对齐的,就无法进行64位原子操作(后续需要对状态进行整体的原子更改atomic.AddUnit64
)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 type WaitGroup struct { noCopy noCopy state1 uint64 state2 uint32 } func (wg *WaitGroup) state () (statep *uint64 , semap *uint32 ) { if unsafe.Alignof(wg.state1) == 8 || uintptr (unsafe.Pointer(&wg.state1))%8 == 0 { return &wg.state1, &wg.state2 } else { state := (*[3 ]uint32 )(unsafe.Pointer(&wg.state1)) return (*uint64 )(unsafe.Pointer(&state[1 ])), &state[0 ] } }
为了保证在32位系统上也能原子访问64位对齐的64位字,通过state()来消除高层上实现的差异 ,具体可以参考issue-6404 。由于在64位机器上,8字节是单个机器字的长度,内存地址对8取模可以判断该数据对象的内存地址是否为64位对齐,state()
对wg.state1
的内存地址对8进行取模来判断程序是允许在64位平台还是32位平台上,根据结果来返回信息
等于0返回: wg.state1
(wg.state1[0]
)和wg.state1[2]
的内存地址
不等于0(32位环境)返回:wg.state1[1]
和wg.state1[0]
的内存地址
state调整的前提:如果不能保证对8字节对齐,需要手动移位对齐,这里用了内存对齐的padding
在4字节对齐的环境中,8字节可能跨越了两个cache line,不保证64位的原子操作。
在32位架构中,WaitGroup在初始化的时候,分配内存地址的时候是随机的,所以WaitGroup结构体state1起始的位置不一定是64位对齐 ,可能会是:uintptr(unsafe.Pointer(&wg.state1))%8 = 4
,如果出现这样的情况,那么就需要用state1的第一个元素做padding + 4,这样操作后两组就能对8字节进行对齐了,用state1的后两个元素合并成uint64来表示statep,以下是一个小实验:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 import ( "unsafe" ) type a struct { b byte } type w struct { state1 uint64 state2 uint32 } func main () { b := a{} println (unsafe.Sizeof(b), uintptr (unsafe.Pointer(&b)), uintptr (unsafe.Pointer(&b))%8 == 0 ) wg := w{} println (unsafe.Sizeof(wg), uintptr (unsafe.Pointer(&wg.state1)), uintptr (unsafe.Pointer(&wg.state1))%8 == 0 ) state := (*[3 ]uint32 )(unsafe.Pointer(&wg.state1)) println (unsafe.Sizeof(wg), uintptr (unsafe.Pointer(&state[1 ])), uintptr (unsafe.Pointer(&state[1 ]))%8 == 0 ) }
Wait() Wait()
主要用于阻塞g,直到WaitGroup的计数为0 。先获取访问计数值的指针(state,sema),在自旋循环体中,通过检查计数来检查目前还没有达成同步条件的并行代码块的数量,并且在每次完成检查后增加一次等待计数。此处没有使用密集循环来构造自旋锁等待,是处于性能考虑:为了保证其他goroutine能够得到充分调度 。如果每一次检查计数时没有达成同步条件,下次循环如果当前goroutine不主动让出CPU,会导致CPU空转,降低性能。这里用了runtime_Semacquire(semap)
,如果等待计数被成功记录,则直接增加信号量,挂起当前g,否则再进行一次循环获取最新的同步状态。
Wait()可以在不同的g上执行,且调用Wait()的g数量也可能不唯一,因此需要等待计数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func (wg *WaitGroup) Wait () { statep, semap := wg.state() for { state := atomic.LoadUint64(statep) v := int32 (state >> 32 ) if v == 0 { return } if atomic.CompareAndSwapUint64(statep, state, state+1 ) { runtime_Semacquire(semap) if *statep != 0 { panic ("sync: WaitGroup is reused before previous Wait has returned" ) } return } } }
Add() Add()
不止是简单的将信号量增delta,还需要考虑很多因素
内部运行计数不能为负
Add必须与Wait属于happens before关系
毕竟Wait是同步屏障,没有Add,Wait就没有了意义
通过信号量通知所有正在等待的goroutine
先假设statep的高32位=1,代表有一个运行计数。当Add(-1)
时,statep的高32位+负数的补码32个1,会溢出1导致statep的高32位=0,即运行计数清零,Wait操作达成同步条件
具体过程如下:
通过state获取状态指针statep和信号指针semap,statep的高32位为counter,低32位为waiter
调用atomic.AddUint64()
将传入的delta左移四位加上statep,即counter+delta
counter可能为负,所以用int32来存值,waiter不可能为负,所以用uint32存值
经过一系列校验,counter为负则panic,w不等于0且delta>0且v值为delta,说明add在wait后调用,会panic,因为waitGroup不允许Wait方法调用后还调用add方法
v > 0或w != 0时直接return,此时不需要释放waiter
到了*statep != state,状态只能是waiter>0且counter==0,当waiter>0时,肯定不能add,且counter==0时,wait不会再自增waiter,结果一定是一致的,否则触发panic
将statep置为0,释放所有waiter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 func (wg *WaitGroup) Add (delta int ) { statep, semap := wg.state() state := atomic.AddUint64(statep, uint64 (delta)<<32 ) v := int32 (state >> 32 ) w := uint32 (state) if v < 0 { panic ("sync: negative WaitGroup counter" ) } if w != 0 && delta > 0 && v == int32 (delta) { panic ("sync: WaitGroup misuse: Add called concurrently with Wait" ) } if v > 0 || w == 0 { return } if *statep != state { panic ("sync: WaitGroup misuse: Add called concurrently with Wait" ) } *statep = 0 for ; w != 0 ; w-- { runtime_Semrelease(semap, false , 0 ) } } func (wg *WaitGroup) Done () { wg.Add(-1 ) }
WaitGroup的实现原理总结 WaitGroup内部维护了3个uint32(实际上是一个uint64,一个uint32),分别是状态和信号,状态包括了运行计数counter和等待计数waiter,信号指信号计数sema。运行计数代表了调用Add()
添加的delta值,等待计数代表了调用Wait()
陷入等待的goroutine的数量,信号量sema是runtime内部信号量的实现,用于挂起和唤醒goroutine。在Add()
的时候增加运行计数,Wait()
的时候增加等待计数,如果运行计数不为0,则将goroutine挂起,等到调用Done()
->Add(-1)使等待计数为0时会唤醒所有挂起的goroutine。
我觉得比较核心的地方在于3个uint32中兼容32位机器和64位机器的实现 。由于状态是64位的,需要进行64位原子操作更新,但是由于32位的环境只对4字节对齐,首字段可能不是对8字节对齐的,因此要用state进行兼容,如果不对8字节对齐,则将状态放在后面两位uint32中,前面一个4字节的作为padding,存放信号计数sema。如果对8字节对齐,状态直接放在前两位即可,这样就兼容了32位和64位对64位原子操作的支持。
sync.Pool sync.Pool也许应该叫sync.Cache,简单来说就是为了避免频繁分配、回收内存给GC带来负担 的cache,pool与连接池类似。sync.Pool可以将暂时不用的对象缓存起来,等到下次需要的时候直接使用 ,也不用再次经过内存分配,复用对象的内存,减轻了GC的压力,提升系统性能。
如果没有sync.Pool 多个goroutine都需同时创建一个对象时,如果goroutine数过多,会导致对象的创建数目递增,导致GC压力过大。形成’并发大->占用内存大->GC缓慢->处理并发能力降低->并发更大’的恶性循环。解决此问题的关键思想就在于对象的复用 ,避免重复创建、销毁。
以下是一个简单的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package mainimport ( "fmt" "sync" ) var pool *sync.Pooltype Person struct { Name string } func init () { pool = &sync.Pool{ New: func () interface {} { fmt.Println("creating a new person" ) return new (Person) }, } } func main () { person := pool.Get().(*Person) fmt.Println("Get Pool Object:" , person) person.Name = "first" pool.Put(person) fmt.Println("Get Pool Object:" , pool.Get().(*Person)) fmt.Println("Get Pool Object:" , pool.Get().(*Person)) }
输出结果如下
1 2 3 4 5 creating a new person Get Pool Object: &{} Get Pool Object: &{first} creating a new person Get Pool Object: &{}
在以上代码中,init()
创建了一个sync.Pool,实现的New()
方法为创建一个person对象,并打印一句话,main()
中调用了三次Get()和一次Put。根据输出结果看来,如果在调用Get()
时,pool中没有对象,那么就会调用New()
创建新的对象,否则会从pool中的对象获取。我们还可以看到,put到pool中的对象属性依然是之前设定的,并没有被重置。
sync.Pool广泛运用于各种场景,典型例子是fmt包中的print:
sync.Pool的底层实现 1 2 3 4 5 6 7 8 9 10 11 12 type Pool struct { noCopy noCopy local unsafe.Pointer localSize uintptr victim unsafe.Pointer victimSize uintptr New func () any }
noCopy代表这个结构体是禁止拷贝的,在使用go vet工具时生效。
local是一个poolLocal数组(其实是切片local := make([]poolLocal, size)
)的指针,localSize代表这个数组的大小,victim也是一个poolLocal数组的指针。
New函数在创建pool时设置,当pool中没有缓存对象时,会调用New方法生成一个新的对象。
在索引poolLocal时,P的id对应[P]poolLocal下标索引,这样在多个goroutine使用同一个pool时能减少竞争,提升了性能。在一轮GC到来时,victim和victimSize会分别接管local和localSize,victim机制用于减少GC后冷启动导致的性能抖动,使得分配对象更加平滑
Victim Cache 是计算机架构里的一个概念,是CPU硬件处理缓存的一种技术,sync.Pool引入它的目的在于降低GC压力的同时提高命中率。
poolLocal
这里得提到伪共享 问题。伪共享问题,就是在多核CPU架构下,为了满足数据一致性维护一致性协议MESI,频繁刷新同一cache line导致高速缓存并未起到应有的作用的问题。试想一下,两个独立线程要更新两个独立变量,但俩独立变量都在同一个cache line上 ,当前cache line是share状态。如果core0的thread0去更新cache line,会导致core1中的cache line状态变为Invalid,随后thread1去更新时必须通知core0将cache line刷回主存,然后它再从主从中load该cache line进高速缓存之后再进行修改,但是该修改又会使得core0的cache line失效,重复上述过程,导致高速缓存相当于没有一样,反而还因为频繁更新cache影响了性能。
这里poolLocal的字段pad就是用于防止伪共享问题,cache line在x86_64体系下一般是64字节
1 2 3 4 5 6 7 type poolLocal struct { poolLocalInternal pad [128 - unsafe.Sizeof(poolLocalInternal{})%128 ]byte }
poolLocal数组的大小是程序中P的数量,Pool的最大个数是runtime.GOMAXPROCS(0)
。
1 2 3 4 5 6 7 type poolLocalInternal struct { private any shared poolChain }
poolLocalInternal中private代表缓存了一个元素,只能由相应的一个P存取。因为一个P同时只能执行一个goroutine,因此不会有并发问题,使用时不需要加锁。
shared则可以由任意的P访问,但是只有本地的P才能pushHead或popHead,其他P可以popTail。
poolChain 看看poolChain的实现,这是一个双端队列的实现
其中poolDequeue是PoolQueue的一个实现,实现为单生产者多消费者的固定大小的无锁Ring式队列,通过atomic实现,底层存储用数组,head,tail标记。
生产者可以从head插入,tail删除,而消费者只能从tail删除。headTail变量通过位运算存储了head和tail的指针,分别指向队头与队尾。
poolChain没有使用完整的poolDequeue,而是封装了一层,这是因为它的大小是固定长度的,而pool则是不限制大小的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 type poolChain struct { head *poolChainElt tail *poolChainElt } type poolChainElt struct { poolDequeue next, prev *poolChainElt } type poolDequeue struct { headTail uint64 vals []eface } type eface struct { typ, val unsafe.Pointer }
由此图可以看到pool的整体结构
获取一个对象-Get() Get()
的过程清晰明了:
首先通过调用p.pin()
将当前goroutine与P绑定,禁止被抢占,返回当前P对应的poolLocal以及pid。
获取local的private赋给x,并置local的private为nil
判断x是否为空,若为空,则尝试从local的shared头部获取一个对象,赋值给x。如果x仍然为空,会调用getSlow()
从其他P的shared尾部偷取一个对象
调用runtime_procUnpin()
解除非抢占。
如果到此时还没有获取到对象,调用设置的New()
创建一个新对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func (p *Pool) Get () any { ... l, pid := p.pin() x := l.private l.private = nil if x == nil { x, _ = l.shared.popHead() if x == nil { x = p.getSlow(pid) } } runtime_procUnpin() ... if x == nil && p.New != nil { x = p.New() } return x }
pin() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func (p *Pool) pin () (*poolLocal, int ) { pid := runtime_procPin() s := runtime_LoadAcquintptr(&p.localSize) l := p.local if uintptr (pid) < s { return indexLocal(l, pid), pid } return p.pinSlow() } func indexLocal (l unsafe.Pointer, i int ) *poolLocal { lp := unsafe.Pointer(uintptr (l) + uintptr (i)*unsafe.Sizeof(poolLocal{})) return (*poolLocal)(lp) }
作用是将当前goroutine与P绑定在一起,禁止抢占 ,且返回对应poolLocal和P的id。
如果goroutine被抢占,那么g的状态会从running变为runnable,会被放回P的localq或globalq,等待下一次调度。但当goroutine下次再次执行时,就不一定和现在的P结合了,因为之后会用到pid,如果被抢占,可能接下来使用的pid与绑定的pid不是同一个。
绑定的逻辑主要在procPin()
中。它将当前gorotuine绑定的m上的locks字段+1,即完成了绑定。调度器执行调度时,又是会抢占当前执行goroutine所绑定的P,防止一个goroutine占用CPU过长时间。而判断一个goroutine能被被抢占的条件就是看m.locks是否为0,若为0,则可以被抢占。而在procPin()
中,m.locks+1,表示不能被抢占。
1 2 3 4 5 6 7 8 func procPin () int { _g_ := getg() mp := _g_.m mp.locks++ return int (mp.p.ptr().id) }
在p.pin()
中,获取到p.localSize和p.local后,如果当前pid小于p.localSize,则直接获取poolLocal数组中pid索引处的位置,否则说明Pool还没有创建poolLocal,调用p.pinSlow()
完成创建。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func (p *Pool) pinSlow () (*poolLocal, int ) { runtime_procUnpin() allPoolsMu.Lock() defer allPoolsMu.Unlock() pid := runtime_procPin() s := p.localSize l := p.local if uintptr (pid) < s { return indexLocal(l, pid), pid } if p.local == nil { allPools = append (allPools, p) } size := runtime.GOMAXPROCS(0 ) local := make ([]poolLocal, size) atomic.StorePointer(&p.local, unsafe.Pointer(&local[0 ])) runtime_StoreReluintptr(&p.localSize, uintptr (size)) return &local[pid], pid }
pinSlow()
在加锁的情况下进行重试,加全局锁创建一个poolLocal。整体过程如下:
popHead() 1 2 3 4 5 6 7 8 9 10 11 12 func (c *poolChain) popHead () (any, bool ) { d := c.head for d != nil { if val, ok := d.popHead(); ok { return val, ok } d = loadPoolChainElt(&d.prev) } return nil , false }
popHead()
只会被生产者调用。函数执行时先拿到头节点,如果不为空,则调用头节点的popHead()
。这俩popHead()
的实现不一致。poolDequeue的popHead()
移除并返回queue的头节点,如果queue为空,会返回false。此处queue中存储的对象就是Pool里缓存的对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func (d *poolDequeue) popHead () (any, bool ) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) if tail == head { return nil , false } head-- ptrs2 := d.pack(head, tail) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { slot = &d.vals[head&uint32 (len (d.vals)-1 )] break } } val := *(*any)(unsafe.Pointer(slot)) if val == dequeueNil(nil ) { val = nil } *slot = eface{} return val, true }
回到poolChain.popHead()
,如果获取成功则直接返回,否则继续尝试。
getSlow() getSlow()
在shared没有获取到缓存对象的情况下,会尝试从其他P的poolLocal偷取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func (p *Pool) getSlow (pid int ) any { size := runtime_LoadAcquintptr(&p.localSize) locals := p.local for i := 0 ; i < int (size); i++ { l := indexLocal(locals, (pid+i+1 )%int (size)) if x, _ := l.shared.popTail(); x != nil { return x } } size = atomic.LoadUintptr(&p.victimSize) if uintptr (pid) >= size { return nil } locals = p.victim l := indexLocal(locals, pid) if x := l.private; x != nil { l.private = nil return x } for i := 0 ; i < int (size); i++ { l := indexLocal(locals, (pid+i)%int (size)) if x, _ := l.shared.popTail(); x != nil { return x } } atomic.StoreUintptr(&p.victimSize, 0 ) return nil }
回到Get()
,如果实在偷不到,后面会通过New()
创建一个新的对象。
popTail() popTail()
将queue尾部元素弹出,类似popHead()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func (c *poolChain) popTail () (any, bool ) { d := loadPoolChainElt(&c.tail) if d == nil { return nil , false } for { d2 := loadPoolChainElt(&d.next) if val, ok := d.popTail(); ok { return val, ok } if d2 == nil { return nil , false } if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { storePoolChainElt(&d2.prev, nil ) } d = d2 } }
底层用的还是poolDequeue的popTail()
,与寻常实现大体类似,也是CAS。因为要移除尾部元素,所以tail自增1。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func (d *poolDequeue) popTail () (any, bool ) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) if tail == head { return nil , false } ptrs2 := d.pack(head, tail+1 ) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { slot = &d.vals[tail&uint32 (len (d.vals)-1 )] break } } val := *(*any)(unsafe.Pointer(slot)) if val == dequeueNil(nil ) { val = nil } slot.val = nil atomic.StorePointer(&slot.typ, nil ) return val, true }
存放一个对象-Put() Put()
将对象添加到Pool中,主要过程如下:
绑定当前goroutine与P,然后尝试将x赋值给private
如果失败,则将其放入local shared的头部
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func (p *Pool) Put (x any) { if x == nil { return } ... l, _ := p.pin() if l.private == nil { l.private = x x = nil } if x != nil { l.shared.pushHead(x) } runtime_procUnpin() }
pushHead() 如果头节点为空,则初始化一下,默认大小为8。然后调用poolDequeue.pushHead()
将其push到队列中,如果失败,则说明队列已满,会创建一个两倍大小的dequeue,然后再次调用poolDequeue.pushHead()
。由于前面g与p已经绑定了,所以不会有竞态条件,这里只需要一次重试就行了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 const dequeueBits = 32 const dequeueLimit = (1 << dequeueBits) / 4 func (c *poolChain) pushHead (val any) { d := c.head if d == nil { const initSize = 8 d = new (poolChainElt) d.vals = make ([]eface, initSize) c.head = d storePoolChainElt(&c.tail, d) } if d.pushHead(val) { return } newSize := len (d.vals) * 2 if newSize >= dequeueLimit { newSize = dequeueLimit } d2 := &poolChainElt{prev: d} d2.vals = make ([]eface, newSize) c.head = d2 storePoolChainElt(&d.next, d2) d2.pushHead(val) }
底层用的还是poolDequeue的pushHead()
,他将val添加到队列头部,如果队列满了会返回false,走刚才创建新一个两倍大小队列的路,这个函数只能被一个生产者调用,因此不会有竞态条件。首先通过位运算判断队列是否已满,将tail加上当前dequeue内节点的数量,即d.vals的长度,再取低31位,看看它与head是否相等,相等的话就说明队列满了,如果满了直接返回false。否则通过head找到即将填充的slot位置,去head指针的低31位,判断是否有另一个goroutine在popTail这个slot,如果有则返回false。这里是判断typ是否为空,因为popTail是先设置val,再将typ设置为nil的。最后将val赋值给slot,自增head。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func (d *poolDequeue) pushHead (val any) bool { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) if (tail+uint32 (len (d.vals)))&(1 <<dequeueBits-1 ) == head { return false } slot := &d.vals[head&uint32 (len (d.vals)-1 )] typ := atomic.LoadPointer(&slot.typ) if typ != nil { return false } if val == nil { val = dequeueNil(nil ) } *(*any)(unsafe.Pointer(slot)) = val atomic.AddUint64(&d.headTail, 1 <<dequeueBits) return true }
在*(*any)(unsafe.Pointer(slot)) = val
中,由于slot是eface类型,先将slot转换为interface{}类型,这样val就能以interface{}类型赋值给slot,使得slot.typ和slot.val指向其内存块,slot的typ, val均不为空。
pack() 再来看看pack()
和unpack()
,它们的作用是打包和解包head和tail俩指针。实际上很简单,pack()
就是将head左移32位,或上tail与低31位全1,返回整合成的uint64。
1 2 3 4 5 func (d *poolDequeue) pack (head, tail uint32 ) uint64 { const mask = 1 <<dequeueBits - 1 return (uint64 (head) << dequeueBits) | uint64 (tail&mask) }
unpack()
则将整合的uint64右移32位与上低31位全1,得到head,而tail则是低32位与上低31位全1。
1 2 3 4 5 6 func (d *poolDequeue) unpack (ptrs uint64 ) (head, tail uint32 ) { const mask = 1 <<dequeueBits - 1 head = uint32 ((ptrs >> dequeueBits) & mask) tail = uint32 (ptrs & mask) return }
GC Pool实际上也不能无限扩展,否则会因为对象占用内存过多导致OOM。在pool.go的init()
中,注册了GC发生时如何清理Pool的函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 func init () { runtime_registerPoolCleanup(poolCleanup) } var poolcleanup func () func sync_runtime_registerPoolCleanup (f func () ) { poolcleanup = f }
实际上调用的是pool.poolCleanup()
,主要是将local与victim进行交换,不至于让GC将所有的Pool都清空,有victim兜底,需要两个GC周期才会被释放。如果sync.Pool的获取、释放速度稳定,就不会有新的Pool对象进行分配,如果获取的速度下降,那么对象可能会在两个GC周期内被释放,而不是以前的一个GC周期。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func poolCleanup () { for _, p := range oldPools { p.victim = nil p.victimSize = 0 } for _, p := range allPools { p.victim = p.local p.victimSize = p.localSize p.local = nil p.localSize = 0 } oldPools, allPools = allPools, nil }
下面模拟一下调用poolCleanup()
前后 ,oldPools,allPools与p.victim的变化:
初始时oldPools与allPools都为nil
第一次调用Get()
,因为p.local为nil,会通过pinSlow()
创建p.local,将p放入allPools,此时allPools长度为1,oldPools为nil
对象使用完毕,调用Put()
放回对象
第一次GC STW,allPools中所有p.local赋值给victim,并置为nil。allPools赋值给oldPools,置为nil。此时oldPools长度为1,allPools为nil
第二次调用Get()
,由于p.local为nil,会尝试从p.victim中获取对象。
对象使用完毕,调用Put()
放回对象。由于p.local为nil,会重新创建p.local,并放回对象,此时allPools长度为1,oldPools长度也为1
第二次GC STW,oldPools中所有p.victim置为nil,前一次cache在本次GC时被回收,allPools中所有p.local将值赋值给victim并置为nil。最后allPools为nil,oldPools长度为1。
从以上可以看出,p.victim的定位是次级缓存 ,在GC时将对象放到其中,下次GC来临前,如果有Get()
调用则从其中获取,直到再一次GC到来时回收。从victim中取出的对象并不放回victim中,一定程度上也减小了下一次GC的开销,使得原先一次GC的开销被拉长到两次,有一定程度的开销减小。
sync.Mutex Mutex是公平互斥锁
每个g去获取锁的时候都会尝试自旋几次 ,如果没有获取到则进入等待队列尾部(先入先出 FIFO)。当持有锁的g释放锁时,位于等待队列头部的g会被唤醒,但是需要与后来g竞争,当然竞争不过,因为后来g运行在cpu上处于自旋状态,且后来g会有很多,而刚唤醒的g只有一个,只能被迫重新插回头部。当等待的g本次加锁等待时间超过1ms都没有获得锁时,它会将当前Mutex从正常模式切换为饥饿模式 ,Mutex所有权会直接从释放锁的g上直接传给队头的g,后来者不自旋也不会尝试获取锁,会直接进入等待队列尾部。
当发生以下两种情况时Mutex会从饥饿模式切换为正常模式
获取到锁的g刚来,等待时间小于1ms
该g是等待队列中最后一个g
饥饿模式下不再尝试自旋,所有g都要排队,严格先来后到,可以防止尾端延迟
互斥锁state的最低三位分别标识mutexLocked 、mutexWoken 和mutexStarving ,剩余位置用于标识当前有多少个g在等待互斥锁释放
mutexLocked
— 表示互斥锁的锁定状态;
mutexWoken
— 表示从正常模式被唤醒;
mutexStarving
— 当前的互斥锁进入饥饿状态;
waitersCount
— 当前互斥锁上等待的 Goroutine 个数;
1 2 3 4 5 6 7 8 9 10 type Mutex struct { state int32 sema uint32 }
看看lock与unlock。lock会锁住Mutex,如果这个锁在被使用,那么调用的g会被阻塞直到这个互斥锁被释放。当锁state为0时,会将mutexLocked位置置为1,当state不是0时,会调用sync.Mutex.lockSlow()
尝试通过自旋等方式来等待锁的释放。
自旋是一种多线程同步机制,当前进程在进入自旋的过程中会一直保持对CPU的占用,持续检查某个条件是否为真,在多核CPU上,自旋可以避免goroutine的切换,某些情况下能对显著提升性能。g在进入自旋的需要满足的条件如下
互斥锁只有在普通模式才能进入自旋
runtime.sync_runtime_canSpin()
返回true
在多CPU机器上
当前g为了获取该锁进入自旋次数少于4次
GOMAXPROCS > 1,至少一个其他P在running,且当前p本地runq为空
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 func (m *Mutex) Lock () { if atomic.CompareAndSwapInt32(&m.state, 0 , mutexLocked) { ... race检测相关 return } m.lockSlow() } func (m *Mutex) lockSlow () { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { if new &mutexWoken == 0 { throw("sync: inconsistent mutex state" ) } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new ) { if old&(mutexLocked|mutexStarving) == 0 { break } queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo, 1 ) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state" ) } delta := int32 (mutexLocked - 1 <<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } } const ( active_spin = 4 ) func sync_runtime_canSpin (i int ) bool { if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32 (sched.npidle+sched.nmspinning)+1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true }
unlock的过程比lock的过程简单,fast path通过去除mutexLocked标志位来快速解锁,如果失败,则进入slow path。slow path先判断是否已经被解锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 func (m *Mutex) Unlock () { ...race检测相关 new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { m.unlockSlow(new ) } } func (m *Mutex) unlockSlow (new int32 ) { if (new +mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex" ) } if new &mutexStarving == 0 { old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1 <<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new ) { runtime_Semrelease(&m.sema, false , 1 ) return } old = m.state } } else { runtime_Semrelease(&m.sema, true , 1 ) } }
Mutex的实现原理总结 Mutex底层是CAS实现的,内部维护了一个int32的状态state,用于标识锁状态,以及一个uint32的信号量sema用于挂起和阻塞goroutine。Mutex分为正常模式和饥饿模式,不同模式下Lock()
和UnLock()
的对加锁、解锁处理方式不同。Mutex中的state第4位及其高位用于存放等待计数,低三位的第一位表示锁状态,第二位表示从正常模式被唤醒,第三位表示进入饥饿模式。
Lock()先通过CAS置state为1,如果失败,则进入slow path处理:
先判断是否可以自旋,饥饿模式下无法自旋
如果是正常模式,且可以自旋(运行在多CPU机器上、当前g为了争取该锁进入自旋的次数少于4、当前机器上至少有个正在运行的P且runq为空),尝试进行自旋准备:通知运行的goroutine不要唤醒其他挂起的gorotuine,解锁时直接让当前g获取锁即可。然后调用runtime_doSpin()
进入自旋,执行30次PAUSE指令占用CPU,递增自旋次数,重新计算状态
计算锁状态
使用CAS更新状态
成功获取锁:返回
判断等待时间是否为0,如果是0则放在队尾,如果非0则放在头部,进入阻塞
唤醒
锁是否要进入饥饿状态:等待时间超过1ms
重新获取锁状态
判断是否处于饥饿状态
是则可以直接获取锁:自减等待计数,设置状态获取锁,如果starving不为饥饿,或等待时间没有超过1ms,或者只有一个g在等待队列中,满足任一条件则切换为正常状态
否:再次循环抢占
UnLock()
先CAS置锁状态最低位为0,如果返回结果不为0,进入slow path:
也是分别对正常模式和饥饿模式两种进行分别处理,饥饿模式下将锁的所有权直接移交给等待队列头的g,并让出时间片,以便于它可以立即开始运行。
正常模式下,通过CAS更新状态值,唤醒等待队列中的waiter。当然,如果没有waiter,或低三位标志位中有一个不为0说明有其他g在处理了,直接返回。
sync.RWMutex 读写互斥锁不限制并行读,但是读写、写读、写写操作无法并行执行
1 2 3 4 5 6 7 type RWMutex struct { w Mutex writerSem uint32 readerSem uint32 readerCount int32 readerWait int32 }
写锁使用sync.RWMutex.Lock
与UnLock
,读锁使用RLock
和RUnlock
Lock与UnLock Lock中,先获取内置的互斥锁,获取成功后,其余竞争者g会陷入自旋或阻塞。atomic.AddInt32
用于阻塞后续的读操作,如果仍有活跃的读操作g,那么当前写操作g会调用runtime.SemacquireMutex进入休眠状态等待全部读锁持有者结束后释放writerSem信号量,将当前g唤醒
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (rw *RWMutex) Lock () { rw.w.Lock() r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false , 0 ) } }
写锁的释放过程与加锁过程相反
atomic.AddInt32将readerCount变为正数,释放读锁
通过for循环唤醒所有阻塞的读操作g
释放写锁
获取写锁时先阻塞写锁获取,后阻塞读锁获取,释放写锁时,先释放读锁唤醒读操作,后释放写锁。这种策略能够保证读操作不会被连续的写操作饿死
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (rw *RWMutex) Unlock () { r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { race.Enable() throw("sync: Unlock of unlocked RWMutex" ) } for i := 0 ; i < int (r); i++ { runtime_Semrelease(&rw.readerSem, false , 0 ) } rw.w.Unlock() }
RLock与RUnlock 读锁的加锁方法不能用于递归读锁定,为不可重入锁,同时,Lock调用会阻止新的读者获取锁。其中只是将readerCount+1,如果返回了负数,说明其他g获得了写锁,当前g就会调用runtime_SemacquireMutex()
陷入休眠等待写锁释放。如果返回了正数,则代表g没有获取写锁,当前方法返回成功
1 2 3 4 5 6 7 8 9 func (rw *RWMutex) RLock () { if atomic.AddInt32(&rw.readerCount, 1 ) < 0 { runtime_SemacquireMutex(&rw.readerSem, false , 0 ) } }
释放读锁的方法也很简单,atomic.AddInt32减少readerCount正在读资源的数量,如果返回大于等于0,则说明解锁成功,如果小于0,说明有一个正在执行的写操作,会调用rUnlockSlow进入slow path处理。rUnlockSlow会减少写操作等待的读操作数readerWait并在所有读操作释放后触发写操作的信号量writeSem,当该信号量触发时,调度器会唤醒尝试获取写锁的g
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func (rw *RWMutex) RUnlock () { if r := atomic.AddInt32(&rw.readerCount, -1 ); r < 0 { rw.rUnlockSlow(r) } } func (rw *RWMutex) rUnlockSlow (r int32 ) { if r+1 == 0 || r+1 == -rwmutexMaxReaders { race.Enable() throw("sync: RUnlock of unlocked RWMutex" ) } if atomic.AddInt32(&rw.readerWait, -1 ) == 0 { runtime_Semrelease(&rw.writerSem, false , 1 ) } }
与Mutex 对于读操作而言主要是使用信号量限制,写操作则是使用互斥锁与信号量限制
获取写锁时
每次解锁读锁都会将readerCount-1,归零时说明没有读锁获取
将readerCount减少rwmutexMaxReaders阻塞后续的读操作(将readerCount变为负数)
释放写锁时
先通知所有读操作
将readerCount置为正数,释放写锁互斥锁
RWMutex在Mutex上提供了额外的细粒度控制,能在读操作远远多于写操作时提升性能。
sync.noCopy sync.noCopy 是一个特殊的私有结构体,tools/go/analysis/passes/copylock 包中的分析器会在编译期间检查被拷贝的变量中是否包含 sync.noCopy 或者实现了 Lock 和 Unlock 方法 ,如果包含该结构体或者实现了对应的方法就会报错
1 2 3 $ go vet proc.go./prog.go:10:10: assignment copies lock value to yawg: sync.WaitGroup ./prog.go:11:14: call of fmt.Println copies lock value: sync.WaitGroup ./prog.go:11:18: call of fmt.Println copies lock value: sync.WaitGroupv
semaTable semaTable存储了可供g使用的信号量,是大小为251的数组。每一个元素存储了一个平衡树的根,节点是sudog类型,在使用时需要一个记录信号量数值的变量sema,根据它的地址映射到数组中的某个位置,找到对应的节点就找到对应信号的等待队列。
channel没有使用信号量,而是自己实现了一套排队逻辑
1 2 3 4 5 type semaRoot struct { lock mutex treap *sudog nwait uint32 }
sync.Once sync.once文件内容很少,只有一个结构体与两个方法,其中sync.Once用于保证go程序运行期间的某段代码只执行一次,暴露出的Do方法用于执行给定的方法
在以下代码中,只会输出一次only once
1 2 3 4 5 6 7 8 func main () { o := &sync.Once{} for i := 0 ; i < 10 ; i++ { o.Do(func () { fmt.Println("only once" ) }) } }
输出结果如下:
1 2 $ go run main.go only once
Once的结构体也很简单:
1 2 3 4 type Once struct { done uint32 m Mutex }
看看Do方法的实现。在源代码的注释中说明了为什么不直接CAS设定值,if方法中调用函数,而要使用这种方式,是因为Do保证了当它返回时,f函数已经完成,而直接CAS后成功执行不成功返回是无法这样保证的。对于panic而言,defer保证了panic也会将done置为1,因此即使f调用中panic,依然算已经执行过。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (o *Once) Do (f func () ) { if atomic.LoadUint32(&o.done) == 0 { o.doSlow(f) } } func (o *Once) doSlow (f func () ) { o.m.Lock() defer o.m.Unlock() if o.done == 0 { defer atomic.StoreUint32(&o.done, 1 ) f() } }
sync.Cond Cond可以让一组goroutine在满足特定条件时被唤醒。在以下代码中同时运行了11个goroutine,其中10个goroutine通过sync.Cond.Wait()
等待特定条件瞒住,1个goroutine通过sync.Cond.Broadcast()
唤醒所有陷入等待的goroutine。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package mainimport ( "fmt" "os" "os/signal" "sync" "sync/atomic" "time" ) var status int64 func main () { c := sync.NewCond(&sync.Mutex{}) for i := 0 ; i < 10 ; i++ { go listen(c) } time.Sleep(1 * time.Second) go broadcast(c) ch := make (chan os.Signal, 1 ) signal.Notify(ch, os.Interrupt) <-ch } func broadcast (c *sync.Cond) { c.L.Lock() atomic.StoreInt64(&status, 1 ) c.Broadcast() c.L.Unlock() } func listen (c *sync.Cond) { c.L.Lock() for atomic.LoadInt64(&status) != 1 { c.Wait() } fmt.Println("listen" ) c.L.Unlock() }
运行结果如下:
1 2 3 4 5 6 7 8 9 10 listen listen listen listen listen listen listen listen listen listen
结构体sync.Cond中包含了四个字段,最主要的还是notify。
1 2 3 4 5 6 type Cond struct { noCopy noCopy L Locker notify notifyList checker copyChecker }
notifyList维护了一个goroutine链表,以及不同状态的goroutine索引
1 2 3 4 5 6 7 type notifyList struct { wait uint32 notify uint32 lock uintptr head unsafe.Pointer tail unsafe.Pointer }
在创建一个Cond时,必须传入一个mutex以关联这个Cond,保证这个Cond的同步属性。
1 2 3 func NewCond (l Locker) *Cond { return &Cond{L: l} }
sync.Cond.Wait() Wait()
会使得当前goroutine陷入休眠,执行过程分为两个步骤:
调用runtime_notifyListAdd()
将等待计数器+1并解锁
调用runtime_notifyListWait()
等待其他goroutine的唤醒并加锁
1 2 3 4 5 6 7 8 9 10 11 12 func (c *Cond) Wait () { c.checker.check() t := runtime_notifyListAdd(&c.notify) c.L.Unlock() runtime_notifyListWait(&c.notify, t) c.L.Lock() } func notifyListAdd (l *notifyList) uint32 { return atomic.Xadd(&l.wait, 1 ) - 1 }
notifyListWait()
则将当前goroutine封装为sudog,追加到goroutine通知链表的末尾,然后挂起当前goroutine。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func notifyListWait (l *notifyList, t uint32 ) { lockWithRank(&l.lock, lockRankNotifyList) if less(t, l.notify) { unlock(&l.lock) return } s := acquireSudog() s.g = getg() s.ticket = t s.releasetime = 0 t0 := int64 (0 ) if blockprofilerate > 0 { t0 = cputicks() s.releasetime = -1 } if l.tail == nil { l.head = s } else { l.tail.next = s } l.tail = s goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3 ) if t0 != 0 { blockevent(s.releasetime-t0, 2 ) } releaseSudog(s) }
Signal() Signal()
会唤醒队列最前面的goroutine,核心代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 func (c *Cond) Signal () { c.checker.check() runtime_notifyListNotifyOne(&c.notify) } func notifyListNotifyOne (l *notifyList) { if atomic.Load(&l.wait) == atomic.Load(&l.notify) { return } lockWithRank(&l.lock, lockRankNotifyList) t := l.notify if t == atomic.Load(&l.wait) { unlock(&l.lock) return } atomic.Store(&l.notify, t+1 ) for p, s := (*sudog)(nil ), l.head; s != nil ; p, s = s, s.next { if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } unlock(&l.lock) s.next = nil readyWithTime(s, 4 ) return } } unlock(&l.lock) }
Broadcast() Broadcast()
会唤醒所有满足条件的goroutine,这个唤醒顺序也是按照加入队列的先后顺序,先加入的会先被唤醒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func (c *Cond) Broadcast () { c.checker.check() runtime_notifyListNotifyAll(&c.notify) } func notifyListNotifyAll (l *notifyList) { if atomic.Load(&l.wait) == atomic.Load(&l.notify) { return } lockWithRank(&l.lock, lockRankNotifyList) s := l.head l.head = nil l.tail = nil atomic.Store(&l.notify, atomic.Load(&l.wait)) unlock(&l.lock) for s != nil { next := s.next s.next = nil readyWithTime(s, 4 ) s = next } } func readyWithTime (s *sudog, traceskip int ) { if s.releasetime != 0 { s.releasetime = cputicks() } goready(s.g, traceskip) }
在条件长时间无法满足时,与使用for {}
的忙等相比,sync.Cond能够让出处理器的使用权,提高 CPU 的利用率。
sync.Map Go原生map不是线程安全的,在查找、赋值、遍历、删除的过程中都会检测写标志,一旦发现写标志置位(等于1),则直接 panic。而sync.Map则是并发安全的,读取、插入、删除都保持着常数级的时间复杂度,且sync.Map的零值是有效的,是一个空map。sync.Map更适用于读多写少的场景,写多的场景中会导致read map缓存失效,需要加锁,导致冲突增多,而且因为未命中read map次数变多,导致dirty map提升为read map,是一个O(N)的操作,会降低性能。
一般解决并发读写map的思路是加一把大锁,在读写的时候先进行加锁,或把一个map分成若干个小map,对key进行哈希操作,只操作对应的小map,前者锁粒度大,影响并发性能,而后者实现较为复杂,容易出错。
与原生map相比,sync.Map仅遍历的方式有些不同。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package mainimport ( "fmt" "sync" ) func main () { var m sync.Map m.Store("test1" , 1 ) m.Store("test2" , 2 ) age, _ := m.Load("test1" ) fmt.Println(age) m.Range(func (key, value any) bool { name := key.(string ) age := value.(int ) fmt.Println(name, age) return true }) m.Delete("test1" ) age, ok := m.Load("test1" ) fmt.Println(age, ok) m.LoadOrStore("test2" , 3 ) age, _ = m.Load("test2" ) fmt.Println(age) }
输出结果如下:
1 2 3 4 5 1 test2 2 test1 1 <nil> false 2
sync.Map的底层实现 由四个字段组成,其中mu互斥锁用于保护read和dirty字段。
1 2 3 4 5 6 type Map struct { mu Mutex read atomic.Value dirty map [any]*entry misses int }
真正存储key/value的是read与dirty,但是它们存储的方式是不一样的,前者用atomic.Value,后者单纯使用原生map,原因是read用的是无锁操作,需要保证load/store的原子性,而dirty map的load+store操作是由mu互斥锁来保护的。
readOnly是一个支持原子性的存储的只读数据结构,底层也是一个原生map。其中entry包含了一个指针,指向value。dirty的value也是entry类型的,这里可以看出read和dirty各自维护了一套key,key指向的是同一个value,只要修改了entry,对read和dirty都是可见的。
1 2 3 4 5 6 7 8 type readOnly struct { m map [any]*entry amended bool } type entry struct { p unsafe.Pointer }
entry的指针p共有三种状态:
p == nil:说明该key/value已被删除,且m.dirty == nil 或 m.dirty[k]指向该key
p == expunged,说该key/value已被删除,且m.dirty不为nil,且m.dirty中没有这个key
p 指向一个正常值:表示实际interface{}的地址,且被记录在m.read.m[key]中,如果此时m.dirty也不为nil,那么它也被记录在m.dirty[key]中,二者指向同一个地址。
当删除key时,sync.Map并不会真正地删除key,而是通过CAS将entry的p设置为nil,标记为被删除。如果之后创建m.dirty,p又会CAS设置为expunged,且不会复制到m.dirty中。
如果p不为expunged,和entry关联的value则可以被原子地更新,如果p为expunged,那么只有在它初次被设置到m.dirty后才能被更新。
Store() expunged实际上是一个指向任意类型的指针,用于标记从dirty map中删除的entry。
1 var expunged = unsafe.Pointer(new (any))
Store直接看代码即可,如果key在read中,会先调用tryStore,使用for循环+CAS尝试更新entry,如果更新成功则直接返回。接下来要么read中没有这个key,要么key被标记为已删除了,需要先加锁再操作。
先去read中double check下,如果存在key,但p为expunged,说明m.dirty不为nil,且m.dirty不存在该key。此时将p状态设置为nil,将key插入dirty map中,更新对应value
如果read中没有此key,而dirty中有,直接更新对应value
如果read和dirty都没有该key,先看看dirty是否为空,为空就要创建一个dirty,并从read中复制没被删除的元素。然后更新amended标记为true,标识dirty中存在read没有的key ,将key/value写入dirty map中。更新对应的value。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 func (m *Map) Store (key, value any) { read, _ := m.read.Load().(readOnly) if e, ok := read.m[key]; ok && e.tryStore(&value) { return } m.mu.Lock() read, _ = m.read.Load().(readOnly) if e, ok := read.m[key]; ok { if e.unexpungeLocked() { m.dirty[key] = e } e.storeLocked(&value) } else if e, ok := m.dirty[key]; ok { e.storeLocked(&value) } else { if !read.amended { m.dirtyLocked() m.read.Store(readOnly{m: read.m, amended: true }) } m.dirty[key] = newEntry(value) } m.mu.Unlock() } func (m *Map) dirtyLocked () { if m.dirty != nil { return } read, _ := m.read.Load().(readOnly) m.dirty = make (map [any]*entry, len (read.m)) for k, e := range read.m { if !e.tryExpungeLocked() { m.dirty[k] = e } } } func (e *entry) tryStore (i *any) bool { for { p := atomic.LoadPointer(&e.p) if p == expunged { return false } if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { return true } } } func (e *entry) unexpungeLocked () (wasExpunged bool ) { return atomic.CompareAndSwapPointer(&e.p, expunged, nil ) }
Load() 流程比Store更简单。先从read中找,找到了直接调用entry.load()
,否则看看amended,如果是false,说明dirty为空,直接返回nil和false,如果emended为true,说明dirty中可能存在要找的key。先上锁,然后double check从read找,还没找到就去dirty中找,不管dirty中找没找到,都得在missed记一下,在dirty被提升为read前都会走这条路。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func (m *Map) Load (key any) (value any, ok bool ) { read, _ := m.read.Load().(readOnly) e, ok := read.m[key] if !ok && read.amended { m.mu.Lock() read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended { e, ok = m.dirty[key] m.missLocked() } m.mu.Unlock() } if !ok { return nil , false } return e.load() }
missLocked直接将misses+1,标识有一次未命中,如果未命中的次数小于m.dirty长度,直接返回,否则将m.dirty提升为read,并清空dirty和misses计数。这样之前一段时间加的key就会进到read中,提高read的命中率。
1 2 3 4 5 6 7 8 9 func (m *Map) missLocked () { m.misses++ if m.misses < len (m.dirty) { return } m.read.Store(readOnly{m: m.dirty}) m.dirty = nil m.misses = 0 }
entry.load()
对p为nil和p为expunged的entry直接返回nil和false,否则将其转为interface{}返回。
1 2 3 4 5 6 7 func (e *entry) load () (value any, ok bool ) { p := atomic.LoadPointer(&e.p) if p == nil || p == expunged { return nil , false } return *(*any)(p), true }
Delete() 套路与Load()
和Store()
相似,从read中查是否有这个key,如果有的话调用entry.delete()
,将p置为nil。read中没找到的话,如果dirty不为nil,就去dirty中找,先上锁,再double check,还没在read中找到就去dirty中找,然后执行删除操作,再调用missLocked看看是否要将dirty上升到read,这里不管是否在dirty中找到都得标记一下misses。如果找到并删除了,调用entry.delete()
,否则返回nil和false。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func (m *Map) Delete (key any) { m.LoadAndDelete(key) } func (m *Map) LoadAndDelete (key any) (value any, loaded bool ) { read, _ := m.read.Load().(readOnly) e, ok := read.m[key] if !ok && read.amended { m.mu.Lock() read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended { e, ok = m.dirty[key] delete (m.dirty, key) m.missLocked() } m.mu.Unlock() } if ok { return e.delete () } return nil , false }
entry.delete()
也是CAS操作,将p置为nil,当判断p为nil或expunged时会直接返回nil和false。
1 2 3 4 5 6 7 8 9 10 11 func (e *entry) delete () (value any, ok bool ) { for { p := atomic.LoadPointer(&e.p) if p == nil || p == expunged { return nil , false } if atomic.CompareAndSwapPointer(&e.p, p, nil ) { return *(*any)(p), true } } }
如果read中找到了key,仅仅是将p置为nil做一个标记,这样在仅有dirty中有这个key的时候才会直接删除这个key。这样的目的在于在下次查找这个key时会命中read,提升效率,如果只在dirty存在,read就无法起到缓存的作用,会直接删除。key本身是需要在missLocked前将key从dirty中删除,才能使其被垃圾回收。
LoadOrStore() 结合了Load和Store的功能,如果map存在该key,就返回对应value,否则将value设置给该key。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 func (m *Map) LoadOrStore (key, value any) (actual any, loaded bool ) { read, _ := m.read.Load().(readOnly) if e, ok := read.m[key]; ok { actual, loaded, ok := e.tryLoadOrStore(value) if ok { return actual, loaded } } m.mu.Lock() read, _ = m.read.Load().(readOnly) if e, ok := read.m[key]; ok { if e.unexpungeLocked() { m.dirty[key] = e } actual, loaded, _ = e.tryLoadOrStore(value) } else if e, ok := m.dirty[key]; ok { actual, loaded, _ = e.tryLoadOrStore(value) m.missLocked() } else { if !read.amended { m.dirtyLocked() m.read.Store(readOnly{m: read.m, amended: true }) } m.dirty[key] = newEntry(value) actual, loaded = value, false } m.mu.Unlock() return actual, loaded } func (e *entry) tryLoadOrStore (i any) (actual any, loaded, ok bool ) { p := atomic.LoadPointer(&e.p) if p == expunged { return nil , false , false } if p != nil { return *(*any)(p), true , true } ic := i for { if atomic.CompareAndSwapPointer(&e.p, nil , unsafe.Pointer(&ic)) { return i, false , true } p = atomic.LoadPointer(&e.p) if p == expunged { return nil , false , false } if p != nil { return *(*any)(p), true , true } } }
Range() 参数需要传入一个函数,Range()
在遍历时会将调用时刻所有key/value传给该函数,如果返回了false会停止遍历。由于会遍历所有key,是一个O(N)的操作,所以将dirty提升为read,将开销分摊开,提升了效率。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func (m *Map) Range (f func (key, value any) bool ) { read, _ := m.read.Load().(readOnly) if read.amended { m.mu.Lock() read, _ = m.read.Load().(readOnly) if read.amended { read = readOnly{m: m.dirty} m.read.Store(read) m.dirty = nil m.misses = 0 } m.mu.Unlock() } for k, e := range read.m { v, ok := e.load() if !ok { continue } if !f(k, v) { break } } }
参考