摘抄自极客时间课程

go并发编程实战课:https://b.geekbang.org/member/course/intro/100061801


检查程序的工具:




一、开篇词

学习 Go 并发编程,有哪些困难? 那学习 Go 并发会有哪些困难呢?

主要总结为 5 大问题。

  1. 在面对并发难题时,感觉无从下手,不知道该用什么并发原语来解决问题。
  2. 如果多个并发原语都可以解决问题,那么,究竟哪个是最优解呢?比如说是用互斥锁,还是用 Channel。
  3. 不知道如何编排并发任务。并发编程不像是传统的串行编程,程序的运行存在着很大的不确定性。这个时候,就会面临一个问题,怎么才能让相应的任务按照你设想的流程运行呢?
  4. 有时候,按照正常理解的并发方式去实现的程序,结果莫名其妙就 panic 或者死锁了,排查起来非常困难。
  5. 已知的并发原语都不能解决并发问题,程序写起来异常复杂,而且代码混乱,容易出错。

怎么提升 Go 并发编程能力?

学习这件事儿,最怕的就是不成体系,即使知识点之间是彼此独立的,也必定存在着联系。我们要做的,就是找出逻辑关系,拎出知识线。关于 Go 并发编程,可以分为两条主线,分别是知识主线和学习主线。具体是啥意思呢?可以看下面的这张知识地图。

从图中可以看到,在知识主线层面,这门课程的核心内容设计了 5 个模块:

  1. 基本并发原语:在这部分,将会介绍 Mutex、RWMutex、Waitgroup、Cond、Pool、Context 等标准库中的并发原语,这些都是传统的并发原语,在其它语言中也很常见,是我们在并发编程中常用的类型。
  2. 原子操作:在这部分,会介绍 Go 标准库中提供的原子操作。原子操作是其它并发原语的基础,学会了你就可以自己创造新的并发原语。
  3. Channel:Channel 类型是 Go 语言独特的类型,因为比较新,所以难以掌握。但是别怕,本文会带你全方位地学习 Channel 类型,你不仅能掌握它的基本用法,而且还能掌握它的处理场景和应用模式,避免踩坑。
  4. 扩展并发原语:目前来看,Go 开发组不准备在标准库中扩充并发原语了,但是还有一些并发原语应用广泛,比如信号量、SingleFlight、循环栅栏、ErrGroup 等。掌握了它们,就可以在处理一些并发问题时,取得事半功倍的效果。
  5. 分布式并发原语:分布式并发原语是应对大规模的应用程序中并发问题的并发类型。本文主要会介绍使用 etcd 实现的一些分布式并发原语,比如 Leader 选举、分布式互斥锁、分布式读写锁、分布式队列等,在处理分布式场景的并发问题时,特别有用。




二、基本并发原语

2.1 Mutex: 如何解决资源并发访问问题

说起并发访问问题,真是太常见了,比如多个 goroutine 并发更新同一个资源,像计数器;同时更新用户的账户信息;秒杀系统;往同一个 buffer 中并发写入数据等等。如果没有互斥控制,就会出现一些异常情况,比如计数器的计数不准确、用户的账户可能出现透支、秒杀系统出现超卖、buffer 中的数据混乱,等等,后果都很严重。

这些问题怎么解决呢?对,用互斥锁,那在 Go 语言里,就是 Mutex。 这节课,将会详细了解互斥锁的实现机制,以及 Go 标准库的互斥锁 Mutex 的基本使用方法。在后面的 3 小节里,还会讲解 Mutex 的具体实现原理、易错场景和一些拓展用法。

2.1.1 互斥锁的实现机制

互斥锁是并发控制的一个基本手段,是为了避免竞争而建立的一种并发控制机制。在学习它的具体实现原理前,我们要先搞懂一个概念,就是临界区

在并发编程中,如果程序中的一部分会被并发访问或修改,那么,为了避免并发访问导致的意想不到的结果,这部分程序需要被保护起来,这部分被保护起来的程序,就叫做临界区。

可以说,临界区就是一个被共享的资源,或者说是一个整体的一组共享资源,比如对数据库的访问、对某一个共享数据结构的操作、对一个 I/O 设备的使用、对一个连接池中的连接的调用,等等。

如果很多线程同步访问临界区,就会造成访问或操作错误,这当然不是我们希望看到的结果。所以,我们可以使用互斥锁,限定临界区只能同时由一个线程持有

当临界区由一个线程持有的时候,其它线程如果想进入这个临界区,就会返回失败,或者是等待。直到持有的线程退出临界区,这些等待线程中的某一个才有机会接着持有这个临界区。

互斥锁就很好地解决了资源竞争问题,有人也把互斥锁叫做排它锁。那在 Go 标准库中,它提供了 Mutex 来实现互斥锁这个功能。

根据 2019 年第一篇全面分析 Go 并发 Bug 的论文Understanding Real-World Concurrency Bugs in GoMutex 是使用最广泛的同步原语(Synchronization primitives,有人也叫做并发原语。我们在这个课程中根据英文直译优先用同步原语,但是并发原语的指代范围更大,还可以包括任务编排的类型,所以后面我们讲 Channel 或者扩展类型时也会用并发原语)。关于同步原语,并没有一个严格的定义,你可以把它看作解决并发问题的一个基础的数据结构。


在这门课的前两个模块,会讲互斥锁 Mutex、读写锁 RWMutex、并发编排 WaitGroup、条件变量 Cond、Channel 等同步原语。

所以,在这里,先说一下同步原语的适用场景。

  • 共享资源。并发地读写共享资源,会出现数据竞争(data race)的问题,所以需要 Mutex、RWMutex 这样的并发原语来保护。
  • 任务编排。需要 goroutine 按照一定的规律执行,而 goroutine 之间有相互等待或者依赖的顺序关系,我们常常使用 WaitGroup 或者 Channel 来实现。
  • 消息传递。信息交流以及不同的 goroutine 之间的线程安全的数据交流,常常使用 Channel 来实现。




2.1.2 Mutex 的基本使用方法

在正式看 Mutex 用法之前呢,先看一下:Locker 接口。

在 Go 的标准库中,package sync 提供了锁相关的一系列同步原语,这个 package 还定义了一个 Locker 的接口,Mutex 就实现了这个接口。

Locker 的接口定义了锁同步原语的方法集:

1
2
3
4
type Locker interface {
Lock()
Unlock()
}

可以看到,Go 定义的锁接口的方法集很简单,就是请求锁(Lock)和释放锁(Unlock)这两个方法,秉承了 Go 语言一贯的简洁风格。

但是,这个接口在实际项目应用得不多,因为我们一般会直接使用具体的同步原语,而不是通过接口。

我们这一讲介绍的 Mutex 以及后面会介绍的读写锁 RWMutex 都实现了 Locker 接口,所以首先把这个接口介绍了,让大家做到心中有数。


下面我们直接看 Mutex。

简单来说,互斥锁 Mutex 就提供两个方法 Lock 和 Unlock:进入临界区之前调用 Lock 方法,退出临界区的时候调用 Unlock 方法

1
2
func(m *Mutex)Lock()
func(m *Mutex)Unlock()

当一个 goroutine 通过调用 Lock 方法获得了这个锁的拥有权后, 其它请求锁的 goroutine 就会阻塞在 Lock 方法的调用上,直到锁被释放并且自己获取到了这个锁的拥有权


不加锁的例子:

我们创建了 10 个 goroutine,同时不断地对一个变量(count)进行加 1 操作,每个 goroutine 负责执行 10 万次的加 1 操作,我们期望的最后计数的结果是 10 * 100000 = 1000000 (一百万)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"sync"
)

func main() {
var count = 0
// 使用WaitGroup等待10个goroutine完成
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
// 对变量count执行10次加1
for j := 0; j < 100000; j++ {
count++
}
}()
}
// 等待10个goroutine完成
wg.Wait()
fmt.Println(count)
}

在这段代码中,我们使用 sync.WaitGroup 来等待所有的 goroutine 执行完毕后,再输出最终的结果。sync.WaitGroup 这个同步原语会在后面的课程中具体介绍,现在只需要知道,我们使用它来控制等待一组 goroutine 全部做完任务。

但是,每次运行,你都可能得到不同的结果,基本上不会得到理想中的一百万的结果。


这是为什么呢?

其实,这是因为,count++ 不是一个原子操作,它至少包含几个步骤,比如读取变量 count 的当前值,对这个值加 1,把结果再保存到 count 中。因为不是原子操作,就可能有并发的问题。

比如,10 个 goroutine 同时读取到 count 的值为 9527,接着各自按照自己的逻辑加 1,值变成了 9528,然后把这个结果再写回到 count 变量。但是,实际上,此时我们增加的总数应该是 10 才对,这里却只增加了 1,好多计数都被“吞”掉了。这是并发访问共享数据的常见错误。

1
2
3
4
// count++操作的汇编代码
MOVQ "".count(SB), AX
LEAQ 1(AX), CX
MOVQ CX, "".count(SB)

这个问题,有经验的开发人员还是比较容易发现的,但是,很多时候,并发问题隐藏得非常深,即使是有经验的人,也不太容易发现或者 Debug 出来。


针对这个问题,Go 提供了一个检测并发访问共享资源是否有问题的工具: race detector,它可以帮助我们自动发现程序有没有 data race 的问题。

Go race detector 是基于 Google 的 C/C++ sanitizers 技术实现的,编译器通过探测所有的内存访问,加入代码能监视对这些内存地址的访问(读还是写)。在代码运行的时候,race detector 就能监控到对共享变量的非同步访问,出现 race 的时候,就会打印出警告信息。

我们来看看这个工具怎么用。 在编译(compile)、测试(test)或者运行(run)Go 代码的时候,加上 race 参数,就有可能发现并发问题。比如在上面的例子中,我们可以加上 race 参数运行,检测一下是不是有并发问题。如果你 go run -race counter.go,就会输出警告信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
go run -race counter.go
==================
WARNING: DATA RACE
Read at 0x00c00010a038 by goroutine 14:
main.main.func1()
/Users/guowei.gong/Documents/workspace/projects/learning/demo/counter.go:18 +0x94

Previous write at 0x00c00010a038 by goroutine 6:
main.main.func1()
/Users/guowei.gong/Documents/workspace/projects/learning/demo/counter.go:18 +0xa4

Goroutine 14 (running) created at:
main.main()
/Users/guowei.gong/Documents/workspace/projects/learning/demo/counter.go:14 +0x74

Goroutine 6 (running) created at:
main.main()
/Users/guowei.gong/Documents/workspace/projects/learning/demo/counter.go:14 +0x74
==================
==================
WARNING: DATA RACE
Write at 0x00c00010a038 by goroutine 14:
main.main.func1()
/Users/guowei.gong/Documents/workspace/projects/learning/demo/counter.go:18 +0xa4

Previous write at 0x00c00010a038 by goroutine 6:
main.main.func1()
/Users/guowei.gong/Documents/workspace/projects/learning/demo/counter.go:18 +0xa4

Goroutine 14 (running) created at:
main.main()
/Users/guowei.gong/Documents/workspace/projects/learning/demo/counter.go:14 +0x74

Goroutine 6 (running) created at:
main.main()
/Users/guowei.gong/Documents/workspace/projects/learning/demo/counter.go:14 +0x74
==================
919480
Found 2 data race(s)
exit status 66
make: *** [run_race] Error 1

Process finished with exit code 2

这个警告不但会告诉你有并发问题,而且还会告诉你哪个 goroutine 在哪一行对哪个变量有写操作,同时,哪个 goroutine 在哪一行对哪个变量有读操作,就是这些并发的读写访问,引起了 data race。

例子中的 goroutine 10 对内存地址 0x00c000126010 有读的操作(counter.go 文件第 16 行),同时,goroutine 7 对内存地址 0x00c000126010 有写的操作(counter.go 文件第 16 行)。而且还可能有多个 goroutine 在同时进行读写,所以,警告信息可能会很长。

虽然这个工具使用起来很方便,但是,因为它的实现方式,只能通过真正对实际地址进行读写访问的时候才能探测,所以它并不能在编译的时候发现 data race 的问题。而且,在运行的时候,只有在触发了 data race 之后,才能检测到,如果碰巧没有触发(比如一个 data race 问题只能在 2 月 14 号零点或者 11 月 11 号零点才出现),是检测不出来的。

而且,把开启了 race 的程序部署在线上,还是比较影响性能的。运行 go tool compile -race -S counter.go,可以查看计数器例子的代码,重点关注一下 count++ 前后的编译后的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
0x002a 00042 (counter.go:13)    CALL    runtime.racefuncenter(SB)
......
0x0061 00097 (counter.go:14) JMP 173
0x0063 00099 (counter.go:15) MOVQ AX, "".j+8(SP)
0x0068 00104 (counter.go:16) PCDATA $0, $1
0x0068 00104 (counter.go:16) MOVQ "".&count+128(SP), AX
0x0070 00112 (counter.go:16) PCDATA $0, $0
0x0070 00112 (counter.go:16) MOVQ AX, (SP)
0x0074 00116 (counter.go:16) CALL runtime.raceread(SB)
0x0079 00121 (counter.go:16) PCDATA $0, $1
0x0079 00121 (counter.go:16) MOVQ "".&count+128(SP), AX
0x0081 00129 (counter.go:16) MOVQ (AX), CX
0x0084 00132 (counter.go:16) MOVQ CX, ""..autotmp_8+16(SP)
0x0089 00137 (counter.go:16) PCDATA $0, $0
0x0089 00137 (counter.go:16) MOVQ AX, (SP)
0x008d 00141 (counter.go:16) CALL runtime.racewrite(SB)
0x0092 00146 (counter.go:16) MOVQ ""..autotmp_8+16(SP), AX
......
0x00b6 00182 (counter.go:18) CALL runtime.deferreturn(SB)
0x00bb 00187 (counter.go:18) CALL runtime.racefuncexit(SB)
0x00c0 00192 (counter.go:18) MOVQ 104(SP), BP
0x00c5 00197 (counter.go:18) ADDQ $112, SP

在编译的代码中,增加了 runtime.racefuncenter、runtime.raceread、runtime.racewrite、runtime.racefuncexit 等检测 data race 的方法。通过这些插入的指令,Go race detector 工具就能够成功地检测出 data race 问题了。

总结一下,通过在编译的时候插入一些指令,在运行时通过这些插入的指令检测并发读写从而发现 data race 问题,就是这个工具的实现机制。


既然这个例子存在 data race 问题,我们就要想办法来解决它。这个时候,我们这节课的主角 Mutex 就要登场了,它可以轻松地消除掉 data race。 具体怎么做呢?下面,就结合这个例子,来具体讲一讲 Mutex 的基本用法。

我们知道,这里的共享资源是 count 变量,临界区是 count++,只要在临界区前面获取锁,在离开临界区的时候释放锁,就能完美地解决 data race 的问题了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"sync"
)

func main() {
// 互斥锁保护计数器
var mu sync.Mutex
// 计数器的值
var count = 0

// 辅助变量,用来确认所有的goroutine都完成
var wg sync.WaitGroup
wg.Add(10)

// 启动10个gourontine
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
// 累加10万次
for j := 0; j < 100000; j++ {
mu.Lock()
count++
mu.Unlock()
}
}()
}
wg.Wait()
fmt.Println(count)
}

如果你再运行一下程序,就会发现,data race 警告没有了,系统干脆地输出了 1000000

这里有一点需要注意:Mutex 的零值是还没有 goroutine 等待的未加锁的状态,所以你不需要额外的初始化,直接声明变量(如 var mu sync.Mutex)即可。


那 Mutex 还有哪些用法呢?

很多情况下,Mutex 会嵌入到其它 struct 中使用,比如下面的方式:

1
2
3
4
type Counter struct {
mu sync.Mutex
Count uint64
}

在初始化嵌入的 struct 时,也不必初始化这个 Mutex 字段,不会因为没有初始化出现空指针或者是无法获取到锁的情况。

有时候,我们还可以采用嵌入字段的方式。通过嵌入字段,你可以在这个 struct 上直接调用 Lock/Unlock 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func main() {
var counter Counter
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < 100000; j++ {
counter.Lock()
counter.Count++
counter.Unlock()
}
}()
}
wg.Wait()
fmt.Println(counter.Count)
}


type Counter struct {
sync.Mutex
Count uint64
}

如果嵌入的 struct 有多个字段,我们一般会把 Mutex 放在要控制的字段上面,然后使用空格把字段分隔开来

即使你不这样做,代码也可以正常编译,只不过,用这种风格去写的话,逻辑会更清晰,也更易于维护。

甚至,你还可以把获取锁、释放锁、计数加一的逻辑封装成一个方法,对外不需要暴露锁等逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func main() {
// 封装好的计数器
var counter Counter

var wg sync.WaitGroup
wg.Add(10)

// 启动10个goroutine
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
// 执行10万次累加
for j := 0; j < 100000; j++ {
counter.Incr() // 受到锁保护的方法
}
}()
}
wg.Wait()
fmt.Println(counter.Count())
}

// 线程安全的计数器类型
type Counter struct {
CounterType int
Name string

mu sync.Mutex
count uint64
}

// 加1的方法,内部使用互斥锁保护
func (c *Counter) Incr() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}

// 得到计数器的值,也需要锁保护
func (c *Counter) Count() uint64 {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}

在项目开发的初始阶段,我们可能并没有仔细地考虑资源的并发问题,因为在初始阶段,我们还不确定这个资源是否被共享。经过更加深入的设计,或者新功能的增加、代码的完善,这个时候,我们就需要考虑共享资源的并发问题了。

当然,如果你能在初始阶段预见到资源会被共享并发访问就更好了。 意识到共享资源的并发访问的早晚不重要,重要的是,一旦你意识到这个问题,你就要及时通过互斥锁等手段去解决。 比如 Docker issue 37583355173282630696等、kubernetes issue 7236171617等,都是后来发现的 data race 而采用互斥锁 Mutex 进行修复的。





2.2 Mutex:庖丁解牛看实现

本节把 Mutex 的架构演进分成了四个阶段,下面给你画了一张图来说明。

“初版”的 Mutex 使用一个 flag 来表示锁是否被持有,实现比较简单;后来照顾到新来的 goroutine,所以会让新的 goroutine 也尽可能地先获取到锁,这是第二个阶段,我们把它叫作“给新人机会”;那么,接下来就是第三阶段“多给些机会”,照顾新来的和被唤醒的 goroutine;但是这样会带来饥饿问题,所以目前又加入了饥饿的解决方案,也就是第四阶段“解决饥饿”。





2.2.1 初版的互斥锁

我们先来看怎么实现一个最简单的互斥锁。在开始之前,你可以先想一想,如果是你,你会怎么设计呢?

你可能会想到,可以通过一个 flag 变量,标记当前的锁是否被某个 goroutine 持有。如果这个 flag 的值是 1,就代表锁已经被持有,那么,其它竞争的 goroutine 只能等待;如果这个 flag 的值是 0,就可以通过 CAS(compare-and-swap,或者 compare-and-set)将这个 flag 设置为 1,标识锁被当前的这个 goroutine 持有了。

实际上,Russ Cox 在 2008 年提交的第一版 Mutex 就是这样实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// CAS操作,当时还没有抽象出atomic包
func cas(val *int32, old, new int32) bool
func semacquire(*int32)
func semrelease(*int32)
// 互斥锁的结构,包含两个字段
type Mutex struct {
key int32 // 锁是否被持有的标识
sema int32 // 信号量专用,用以阻塞/唤醒goroutine
}

// 保证成功在val上增加delta的值
func xadd(val *int32, delta int32) (new int32) {
for {
v := *val
if cas(val, v, v+delta) {
return v + delta
}
}
panic("unreached")
}

// 请求锁
func (m *Mutex) Lock() {
if xadd(&m.key, 1) == 1 { //标识加1,如果等于1,成功获取到锁
return
}
semacquire(&m.sema) // 否则阻塞等待
}

func (m *Mutex) Unlock() {
if xadd(&m.key, -1) == 0 { // 将标识减去1,如果等于0,则没有其它等待者
return
}
semrelease(&m.sema) // 唤醒其它阻塞的goroutine
}

这里呢,先简单补充介绍下刚刚提到的 CAS。

CAS 指令将给定的值和一个内存地址中的值进行比较,如果它们是同一个值,就使用新值替换内存地址中的值,这个操作是原子性的。那啥是原子性呢?如果你还不太理解这个概念,那么在这里只需要明确一点就行了,那就是原子性保证这个指令总是基于最新的值进行计算,如果同时有其它线程已经修改了这个值,那么,CAS 会返回失败

CAS 是实现互斥锁和同步原语的基础,我们很有必要掌握它。

好了,我们继续来分析下刚才的这段代码。

虽然当时的 Go 语法和现在的稍微有些不同,而且标准库的布局、实现和现在的也有很大的差异,但是,这些差异不会影响我们对代码的理解,因为最核心的结构体(struct)和函数、方法的定义几乎是一样的。

Mutex 结构体包含两个字段:

  • 字段 key:是一个 flag,用来标识这个排外锁是否被某个 goroutine 所持有,如果 key 大于等于 1,说明这个排外锁已经被持有;
  • 字段 sema:是个信号量变量,用来控制等待 goroutine 的阻塞休眠和唤醒。

调用 Lock 请求锁的时候,通过 xadd 方法进行 CAS 操作(第 24 行),xadd 方法通过循环执行 CAS 操作直到成功,保证对 key 加 1 的操作成功完成。如果比较幸运,锁没有被别的 goroutine 持有,那么,Lock 方法成功地将 key 设置为 1,这个 goroutine 就持有了这个锁;如果锁已经被别的 goroutine 持有了,那么,当前的 goroutine 会把 key 加 1,而且还会调用 semacquire 方法(第 27 行),使用信号量将自己休眠,等锁释放的时候,信号量会将它唤醒。

持有锁的 goroutine 调用 Unlock 释放锁时,它会将 key 减 1(第 31 行)。如果当前没有其它等待这个锁的 goroutine,这个方法就返回了。但是,如果还有等待此锁的其它 goroutine,那么,它会调用 semrelease 方法(第 34 行),利用信号量唤醒等待锁的其它 goroutine 中的一个。

所以,到这里,我们就知道了,初版的 Mutex 利用 CAS 原子操作,对 key 这个标志量进行设置。key 不仅仅标识了锁是否被 goroutine 所持有,还记录了当前持有和等待获取锁的 goroutine 的数量。

Mutex 的整体设计非常简洁,学习起来一点也没有障碍。但是,注意,要划重点了。

Unlock 方法可以被任意的 goroutine 调用释放锁,即使是没持有这个互斥锁的 goroutine,也可以进行这个操作。这是因为,Mutex 本身并没有包含持有这把锁的 goroutine 的信息,所以,Unlock 也不会对此进行检查。Mutex 的这个设计一直保持至今。

这就带来了一个有趣而危险的功能。为什么这么说呢?

你看,其它 goroutine 可以强制释放锁,这是一个非常危险的操作,因为在临界区的 goroutine 可能不知道锁已经被释放了,还会继续执行临界区的业务操作,这可能会带来意想不到的结果,因为这个 goroutine 还以为自己持有锁呢,有可能导致 data race 问题。

所以,我们在使用 Mutex 的时候,必须要保证 goroutine 尽可能不去释放自己未持有的锁,一定要遵循“谁申请,谁释放”的原则。在真实的实践中,我们使用互斥锁的时候,很少在一个方法中单独申请锁,而在另外一个方法中单独释放锁,一般都会在同一个方法中获取锁和释放锁。

如果你接触过其它语言(比如 Java 语言)的互斥锁的实现,就会发现这一点和其它语言的互斥锁不同,所以,如果是从其它语言转到 Go 语言开发的同学,一定要注意。

以前,我们经常会基于性能的考虑,及时释放掉锁,所以在一些 if-else 分支中加上释放锁的代码,代码看起来很臃肿。而且,在重构的时候,也很容易因为误删或者是漏掉而出现死锁的现象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Foo struct {
mu sync.Mutex
count int
}

func (f *Foo) Bar() {
f.mu.Lock()

if f.count < 1000 {
f.count += 3
f.mu.Unlock() // 此处释放锁
return
}

f.count++
f.mu.Unlock() // 此处释放锁
return
}

从 1.14 版本起,Go 对 defer 做了优化,采用更有效的内联方式,取代之前的生成 defer 对象到 defer chain 中,defer 对耗时的影响微乎其微了,所以基本上修改成下面简洁的写法也没问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (f *Foo) Bar() {
f.mu.Lock()
defer f.mu.Unlock()


if f.count < 1000 {
f.count += 3
return
}


f.count++
return
}

这样做的好处就是 Lock/Unlock 总是成对紧凑出现,不会遗漏或者多调用,代码更少。

但是,如果临界区只是方法中的一部分,为了尽快释放锁,还是应该第一时间调用 Unlock,而不是一直等到方法返回时才释放。

初版的 Mutex 实现之后,Go 开发组又对 Mutex 做了一些微调,比如把字段类型变成了 uint32 类型;调用 Unlock 方法会做检查;使用 atomic 包的同步原语执行原子操作等等,这些小的改动,都不是核心功能,你简单知道就行了,就不详细介绍了。

但是,初版的 Mutex 实现有一个问题:请求锁的 goroutine 会排队等待获取互斥锁。虽然这貌似很公平,但是从性能上来看,却不是最优的。因为如果我们能够把锁交给正在占用 CPU 时间片的 goroutine 的话,那就不需要做上下文的切换,在高并发的情况下,可能会有更好的性能。

接下来,我们就继续探索 Go 开发者是怎么解决这个问题的。





2.2.2 给新人机会

Go 开发者在 2011 年 6 月 30 日的 commit 中对 Mutex 做了一次大的调整,调整后的 Mutex 实现如下:

1
2
3
4
5
6
7
8
9
10
11
type Mutex struct {
state int32
sema uint32
}


const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexWaiterShift = iota
)

虽然 Mutex 结构体还是包含两个字段,但是第一个字段已经改成了 state,它的含义也不一样了。

state 是一个复合型的字段,一个字段包含多个意义,这样可以通过尽可能少的内存来实现互斥锁。这个字段的第一位(最小的一位)来表示这个锁是否被持有,第二位代表是否有唤醒的 goroutine,剩余的位数代表的是等待此锁的 goroutine 数。所以,state 这一个字段被分成了三部分,代表三个数据。

请求锁的方法 Lock 也变得复杂了。复杂之处不仅仅在于对字段 state 的操作难以理解,而且代码逻辑也变得相当复杂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (m *Mutex) Lock() {
// Fast path: 幸运case,能够直接获取到锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}

awoke := false
for {
old := m.state
new := old | mutexLocked // 新状态加锁
if old&mutexLocked != 0 {
new = old + 1<<mutexWaiterShift //等待者数量加一
}
if awoke {
// goroutine是被唤醒的,
// 新状态清除唤醒标志
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {//设置新状态
if old&mutexLocked == 0 { // 锁原状态未加锁
break
}
runtime.Semacquire(&m.sema) // 请求信号量
awoke = true
}
}
}

首先是通过 CAS 检测 state 字段中的标志(第 3 行),如果没有 goroutine 持有锁,也没有等待持有锁的 gorutine,那么,当前的 goroutine 就很幸运,可以直接获得锁,这也是注释中的 Fast path 的意思。

如果不够幸运,state 不是零值,那么就通过一个循环进行检查。接下来的第 7 行到第 26 行这段代码虽然只有几行,但是理解起来却要费一番功夫,因为涉及到对 state 不同标志位的操作。这里的位操作以及操作后的结果和数值比较,并没有明确的解释,有时候你需要根据后续的处理进行推断。所以说,如果你充分理解了这段代码,那么对最新版的 Mutex 也会比较容易掌握了,因为你已经清楚了这些位操作的含义。

我们先前知道,如果想要获取锁的 goroutine 没有机会获取到锁,就会进行休眠,但是在锁释放唤醒之后,它并不能像先前一样直接获取到锁,还是要和正在请求锁的 goroutine 进行竞争。这会给后来请求锁的 goroutine 一个机会,也让 CPU 中正在执行的 goroutine 有更多的机会获取到锁,在一定程度上提高了程序的性能。

for 循环是不断尝试获取锁,如果获取不到,就通过 runtime.Semacquire(&m.sema) 休眠,休眠醒来之后 awoke 置为 true,尝试争抢锁。

代码中的第 10 行将当前的 flag 设置为加锁状态,如果能成功地通过 CAS 把这个新值赋予 state(第 19 行和第 20 行),就代表抢夺锁的操作成功了。

不过,需要注意的是,如果成功地设置了 state 的值,但是之前的 state 是有锁的状态,那么,state 只是清除 mutexWoken 标志或者增加一个 waiter 而已。

请求锁的 goroutine 有两类,一类是新来请求锁的 goroutine,另一类是被唤醒的等待请求锁的 goroutine。锁的状态也有两种:加锁和未加锁。用一张表格,来说明一下 goroutine 不同来源不同状态下的处理逻辑。


刚刚说的都是获取锁,接下来,我们再来看看释放锁。释放锁的 Unlock 方法也有些复杂,我们来看一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked) //去掉锁标志
if (new+mutexLocked)&mutexLocked == 0 { //本来就没有加锁
panic("sync: unlock of unlocked mutex")
}

old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { // 没有等待者,或者有唤醒的waiter,或者锁原来已加锁
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken // 新状态,准备唤醒goroutine,并设置唤醒标志
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime.Semrelease(&m.sema)
return
}
old = m.state
}
}

下面来解释一下这个方法。

第 3 行是尝试将持有锁的标识设置为未加锁的状态,这是通过减 1 而不是将标志位置零的方式实现。第 4 到 6 行还会检测原来锁的状态是否已经未加锁的状态,如果是 Unlock 一个未加锁的 Mutex 会直接 panic。

不过,即使将加锁置为未加锁的状态,这个方法也不能直接返回,还需要一些额外的操作,因为还可能有一些等待这个锁的 goroutine(有时候本文也把它们称之为 waiter)需要通过信号量的方式唤醒它们中的一个。所以接下来的逻辑有两种情况。

第一种情况,如果没有其它的 waiter,说明对这个锁的竞争的 goroutine 只有一个,那就可以直接返回了;如果这个时候有唤醒的 goroutine,或者是又被别人加了锁,那么,无需我们操劳,其它 goroutine 自己干得都很好,当前的这个 goroutine 就可以放心返回了。

第二种情况,如果有等待者,并且没有唤醒的 waiter,那就需要唤醒一个等待的 waiter。在唤醒之前,需要将 waiter 数量减 1,并且将 mutexWoken 标志设置上,这样,Unlock 就可以返回了。

通过这样复杂的检查、判断和设置,我们就可以安全地将一把互斥锁释放了。

相对于初版的设计,这次的改动主要就是,新来的 goroutine 也有机会先获取到锁,甚至一个 goroutine 可能连续获取到锁,打破了先来先得的逻辑。但是,代码复杂度也显而易见。

虽然这一版的 Mutex 已经给新来请求锁的 goroutine 一些机会,让它参与竞争,没有空闲的锁或者竞争失败才加入到等待队列中。但是其实还可以进一步优化。我们接着往下看。





2.2.3 多给些机会

在 2015 年 2 月的改动中,如果新来的 goroutine 或者是被唤醒的 goroutine 首次获取不到锁,它们就会通过自旋(spin,通过循环不断尝试,spin 的逻辑是在runtime 实现的)的方式,尝试检查锁是否被释放。在尝试一定的自旋次数后,再执行原来的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (m *Mutex) Lock() {
// Fast path: 幸运之路,正好获取到锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}

awoke := false
iter := 0
for { // 不管是新来的请求锁的goroutine, 还是被唤醒的goroutine,都不断尝试请求锁
old := m.state // 先保存当前锁的状态
new := old | mutexLocked // 新状态设置加锁标志
if old&mutexLocked != 0 { // 锁还没被释放
if runtime_canSpin(iter) { // 还可以自旋
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
continue // 自旋,再次尝试请求锁
}
new = old + 1<<mutexWaiterShift
}
if awoke { // 唤醒状态
if new&mutexWoken == 0 {
panic("sync: inconsistent mutex state")
}
new &^= mutexWoken // 新状态清除唤醒标记
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 { // 旧状态锁已释放,新状态成功持有了锁,直接返回
break
}
runtime_Semacquire(&m.sema) // 阻塞等待
awoke = true // 被唤醒
iter = 0
}
}
}

这次的优化,增加了第 13 行到 21 行、第 25 行到第 27 行以及第 36 行。现在来解释一下主要的逻辑,也就是第 13 行到 21 行。

如果可以 spin 的话,第 9 行的 for 循环会重新检查锁是否释放。对于临界区代码执行非常短的场景来说,这是一个非常好的优化。因为临界区的代码耗时很短,锁很快就能释放,而抢夺锁的 goroutine 不用通过休眠唤醒方式等待调度,直接 spin 几次,可能就获得了锁。





2.2.4 解决饥饿

经过几次优化,Mutex 的代码越来越复杂,应对高并发争抢锁的场景也更加公平。但是你有没有想过,因为新来的 goroutine 也参与竞争,有可能每次都会被新来的 goroutine 抢到获取锁的机会,在极端情况下,等待中的 goroutine 可能会一直获取不到锁,这就是饥饿问题。

先前版本的 Mutex 遇到的也是同样的困境,“悲惨”的 goroutine 总是得不到锁。

Mutex 不能容忍这种事情发生。所以,2016 年 Go 1.9 中 Mutex 增加了饥饿模式,让锁变得更公平,不公平的等待时间限制在 1 毫秒,并且修复了一个大 Bug:总是把唤醒的 goroutine 放在等待队列的尾部,会导致更加不公平的等待时间。

之后,2018 年,Go 开发者将 fast path 和 slow path 拆成独立的方法,以便内联,提高性能。2019 年也有一个 Mutex 的优化,虽然没有对 Mutex 做修改,但是,对于 Mutex 唤醒后持有锁的那个 waiter,调度器可以有更高的优先级去执行,这已经是很细致的性能优化了。

为了避免代码过多,这里只列出当前的 Mutex 实现。想要理解当前的 Mutex,我们需要好好泡一杯茶,仔细地品一品了。

当然,现在的 Mutex 代码已经复杂得接近不可读的状态了,而且代码也非常长,删减后占了几乎三页纸。但是,作为第一个要详细介绍的同步原语,本文还是希望能更清楚地剖析 Mutex 的实现,向你展示它的演化和为了一个貌似很小的 feature 不得不将代码变得非常复杂的原因。

当然,你也可以暂时略过这一段,以后慢慢品,只需要记住,Mutex 绝不容忍一个 goroutine 被落下,永远没有机会获取锁。不抛弃不放弃是它的宗旨,而且它也尽可能地让等待较长的 goroutine 更有机会获取到锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
type Mutex struct {
state int32
sema uint32
}

const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving // 从state字段中分出一个饥饿标记
mutexWaiterShift = iota

starvationThresholdNs = 1e6
)

func (m *Mutex) Lock() {
// Fast path: 幸运之路,一下就获取到了锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path:缓慢之路,尝试自旋竞争或饥饿状态下饥饿goroutine竞争
m.lockSlow()
}

func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false // 此goroutine的饥饿标记
awoke := false // 唤醒标记
iter := 0 // 自旋次数
old := m.state // 当前的锁的状态
for {
// 锁是非饥饿状态,锁还没被释放,尝试自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state // 再次获取锁的状态,之后会检查是否锁被释放了
continue
}
new := old
if old&mutexStarving == 0 {
new |= mutexLocked // 非饥饿状态,加锁
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift // waiter数量加1
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving // 设置饥饿状态
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken // 新状态清除唤醒标记
}
// 成功设置新状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 处理饥饿状态

// 如果以前就在队列里面,加入到队列头
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 阻塞等待
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 唤醒之后检查锁是否应该处于饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果锁已经处于饥饿状态,直接抢到锁,返回
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 有点绕,加锁并且将waiter数减1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving // 最后一个waiter或者已经不饥饿了,清除饥饿标记
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}

func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}

func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
runtime_Semrelease(&m.sema, true, 1)
}
}

跟之前的实现相比,当前的 Mutex 最重要的变化,就是增加饥饿模式。第 12 行将饥饿模式的最大等待时间阈值设置成了 1 毫秒,这就意味着,一旦等待者等待的时间超过了这个阈值,Mutex 的处理就有可能进入饥饿模式,优先让等待者先获取到锁,新来的同学主动谦让一下,给老同志一些机会。

通过加入饥饿模式,可以避免把机会全都留给新来的 goroutine,保证了请求锁的 goroutine 获取锁的公平性,对于我们使用锁的业务代码来说,不会有业务一直等待锁不被处理。


那么,接下来的部分就是选学内容了。如果你还有精力,并且对饥饿模式很感兴趣,那就一起继续来挑战吧。如果你现在理解起来觉得有困难,也没关系,后面可以随时回来复习。

饥饿模式和正常模式
Mutex 可能处于两种操作模式下:正常模式和饥饿模式。

接下来我们分析一下 Mutex 对饥饿模式和正常模式的处理。

请求锁时调用的 Lock 方法中一开始是 fast path,这是一个幸运的场景,当前的 goroutine 幸运地获得了锁,没有竞争,直接返回,否则就进入了 lockSlow 方法。这样的设计,方便编译器对 Lock 方法进行内联,你也可以在程序开发中应用这个技巧。

正常模式下,waiter 都是进入先入先出队列,被唤醒的 waiter 并不会直接持有锁,而是要和新来的 goroutine 进行竞争。新来的 goroutine 有先天的优势,它们正在 CPU 中运行,可能它们的数量还不少,所以,在高并发情况下,被唤醒的 waiter 可能比较悲剧地获取不到锁,这时,它会被插入到队列的前面。如果 waiter 获取不到锁的时间超过阈值 1 毫秒,那么,这个 Mutex 就进入到了饥饿模式。

在饥饿模式下,Mutex 的拥有者将直接把锁交给队列最前面的 waiter。新来的 goroutine 不会尝试获取锁,即使看起来锁没有被持有,它也不会去抢,也不会 spin,它会乖乖地加入到等待队列的尾部。

如果拥有 Mutex 的 waiter 发现下面两种情况的其中之一,它就会把这个 Mutex 转换成正常模式:

  • 此 waiter 已经是队列中的最后一个 waiter 了,没有其它的等待锁的 goroutine 了;
  • 此 waiter 的等待时间小于 1 毫秒。

正常模式拥有更好的性能,因为即使有等待抢锁的 waiter,goroutine 也可以连续多次获取到锁。

饥饿模式是对公平性和性能的一种平衡,它避免了某些 goroutine 长时间的等待锁。在饥饿模式下,优先对待的是那些一直在等待的 waiter。

接下来,我们逐步分析下 Mutex 代码的关键行,彻底搞清楚饥饿模式的细节。 我们从请求锁(lockSlow)的逻辑看起。

第 9 行对 state 字段又分出了一位,用来标记锁是否处于饥饿状态。现在一个 state 的字段被划分成了阻塞等待的 waiter 数量、饥饿标记、唤醒标记和持有锁的标记四个部分。

第 25 行记录此 goroutine 请求锁的初始时间,第 26 行标记是否处于饥饿状态,第 27 行标记是否是唤醒的,第 28 行记录 spin 的次数。

第 31 行到第 40 行和以前的逻辑类似,只不过加了一个不能是饥饿状态的逻辑。它会对正常状态抢夺锁的 goroutine 尝试 spin,和以前的目的一样,就是在临界区耗时很短的情况下提高性能。

第 42 行到第 44 行,非饥饿状态下抢锁。怎么抢?就是要把 state 的锁的那一位,置为加锁状态,后续 CAS 如果成功就可能获取到了锁。

第 46 行到第 48 行,如果锁已经被持有或者锁处于饥饿状态,我们最好的归宿就是等待,所以 waiter 的数量加 1。

第 49 行到第 51 行,如果此 goroutine 已经处在饥饿状态,并且锁还被持有,那么,我们需要把此 Mutex 设置为饥饿状态。

第 52 行到第 57 行,是清除 mutexWoken 标记,因为不管是获得了锁还是进入休眠,我们都需要清除 mutexWoken 标记。

第 59 行就是尝试使用 CAS 设置 state。如果成功,第 61 行到第 63 行是检查原来的锁的状态是未加锁状态,并且也不是饥饿状态的话就成功获取了锁,返回。

第 67 行判断是否第一次加入到 waiter 队列。到这里,你应该就能明白第 25 行为什么不对 waitStartTime 进行初始化了,我们需要利用它在这里进行条件判断。

第 72 行将此 waiter 加入到队列,如果是首次,加入到队尾,先进先出。如果不是首次,那么加入到队首,这样等待最久的 goroutine 优先能够获取到锁。此 goroutine 会进行休眠。

第 74 行判断此 goroutine 是否处于饥饿状态。注意,执行这一句的时候,它已经被唤醒了。

第 77 行到第 88 行是对锁处于饥饿状态下的一些处理。

第 82 行设置一个标志,这个标志稍后会用来加锁,而且还会将 waiter 数减 1。

第 84 行,设置标志,在没有其它的 waiter 或者此 goroutine 等待还没超过 1 毫秒,则会将Mutex 转为正常状态。

第 86 行则是将这个标识应用到 state 字段上。

释放锁(Unlock)时调用的 Unlock 的 fast path 不用多少,所以我们主要看 unlockSlow 方法就行。

如果 Mutex 处于饥饿状态,第 123 行直接唤醒等待队列中的 waiter。

如果 Mutex 处于正常状态,如果没有 waiter,或者已经有在处理的情况了,那么释放就好,不做额外的处理(第 112 行到第 114 行)。

否则,waiter 数减 1,mutexWoken 标志设置上,通过 CAS 更新 state 的值(第 115 行到第 119 行)。


总结

“罗马不是一天建成的”,Mutex 的设计也是从简单设计到复杂处理逐渐演变的。初版的 Mutex 设计非常简洁,充分展示了 Go 创始者的简单、简洁的设计哲学。但是,随着大家的使用,逐渐暴露出一些缺陷,为了弥补这些缺陷,Mutex 不得不越来越复杂。

有一点值得我们学习的,同时也体现了 Go 创始者的哲学,就是他们强调 Go 语言和标准库的稳定性,新版本要向下兼容,用新的版本总能编译老的代码。Go 语言从出生到现在已经 10 多年了,这个 Mutex 对外的接口却没有变化,依然向下兼容,即使现在 Go 出了两个版本,每个版本也会向下兼容,保持 Go 语言的稳定性,你也能领悟他们软件开发和设计的思想。

还有一点,你也可以观察到,为了一个程序 20% 的特性,你可能需要添加 80% 的代码,这也是程序越来越复杂的原因。所以,最开始的时候,如果能够有一个清晰而且易于扩展的设计,未来增加新特性时,也会更加方便。





2.3 Mutex:4种易错场景大盘点

上一讲,我们一起领略了 Mutex 的架构演进之美,现在我们已经清楚 Mutex 的实现细节了。当前 Mutex 的实现貌似非常复杂,其实主要还是针对饥饿模式和公平性问题,做了一些额外处理。但是,我们在第一讲中已经体验过了,Mutex 使用起来还是非常简单的,毕竟,它只有 Lock 和 Unlock 两个方法,使用起来还能复杂到哪里去?

正常使用 Mutex 时,确实是这样的,很简单,基本不会有什么错误,即使出现错误,也是在一些复杂的场景中,比如跨函数调用 Mutex 或者是在重构或者修补 Bug 时误操作。但是,我们使用 Mutex 时,确实会出现一些 Bug,比如说忘记释放锁、重入锁、复制已使用了的 Mutex 等情况。那在这一讲中,我们就一起来看看使用 Mutex 常犯的几个错误,做到“Bug 提前知,后面早防范”。

使用 Mutex 常见的错误场景有 4 类,分别是

  • Lock/Unlock 不是成对出现、
  • Copy 已使用的 Mutex
  • 重入
  • 死锁

2.3.1 Lock/Unlock 不是成对出现

Lock/Unlock 没有成对出现,就意味着会出现死锁的情况,或者是因为 Unlock 一个未加锁的 Mutex 而导致 panic。

我们先来看看缺少 Unlock 的场景,常见的有三种情况:

  1. 代码中有太多的 if-else 分支,可能在某个分支中漏写了 Unlock;
  2. 在重构的时候把 Unlock 给删除了;
  3. Unlock 误写成了 Lock。

在这种情况下,锁被获取之后,就不会被释放了,这也就意味着,其它的 goroutine 永远都没机会获取到锁。

我们再来看缺少 Lock 的场景,这就很简单了,一般来说就是误操作删除了 Lock。 比如先前使用 Mutex 都是正常的,结果后来其他人重构代码的时候,由于对代码不熟悉,或者由于开发者的马虎,把 Lock 调用给删除了,或者注释掉了。比如下面的代码,mu.Lock() 一行代码被删除了,直接 Unlock 一个未加锁的 Mutex 会 panic:

1
2
3
4
5
func foo() {
var mu sync.Mutex
defer mu.Unlock()
fmt.Println("hello world!")
}




2.3.2 Copy 已使用的 Mutex

第二种误用是 Copy 已使用的 Mutex。在正式分析这个错误之前,先交代一个小知识点,那就是 Package sync 的同步原语在使用后是不能复制的。我们知道 Mutex 是最常用的一个同步原语,那它也是不能复制的。为什么呢?

原因在于,Mutex 是一个有状态的对象,它的 state 字段记录这个锁的状态。如果你要复制一个已经加锁的 Mutex 给一个新的变量,那么新的刚初始化的变量居然被加锁了,这显然不符合你的期望,因为你期望的是一个零值的 Mutex。关键是在并发环境下,你根本不知道要复制的 Mutex 状态是什么,因为要复制的 Mutex 是由其它 goroutine 并发访问的,状态可能总是在变化。

当然,你可能说,你说的我都懂,你的警告我都记下了,但是实际在使用的时候,一不小心就踩了这个坑,我们来看一个例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type Counter struct {
sync.Mutex
Count int
}


func main() {
var c Counter
c.Lock()
defer c.Unlock()
c.Count++
foo(c) // 复制锁
}

// 这里Counter的参数是通过复制的方式传入的
func foo(c Counter) {
c.Lock()
defer c.Unlock()
fmt.Println("in foo")
}

第 12 行在调用 foo 函数的时候,调用者会复制 Mutex 变量 c 作为 foo 函数的参数,不幸的是,复制之前已经使用了这个锁,这就导致,复制的 Counter 是一个带状态 Counter。

怎么办呢?Go 在运行时,有死锁的检查机制(checkdead() 方法),它能够发现死锁的 goroutine。这个例子中因为复制了一个使用了的 Mutex,导致锁无法使用,程序处于死锁的状态。程序运行的时候,死锁检查机制能够发现这种死锁情况并输出错误信息,如下图中错误信息以及错误堆栈:


你肯定不想运行的时候才发现这个因为复制 Mutex 导致的死锁问题,那么你怎么能够及时发现问题呢?可以使用 vet 工具,把检查写在 Makefile 文件中,在持续集成的时候跑一跑,这样可以及时发现问题,及时修复。我们可以使用 go vet 检查这个 Go 文件:

你看,使用这个工具就可以发现 Mutex 复制的问题,错误信息显示得很清楚,是在调用 foo 函数的时候发生了 lock value 复制的情况,还告诉我们出问题的代码行数以及 copy lock 导致的错误。

那么,vet 工具是怎么发现 Mutex 复制使用问题的呢?我们简单分析一下。

检查是通过copylock分析器静态分析实现的。这个分析器会分析函数调用、range 遍历、复制、声明、函数返回值等位置,有没有锁的值 copy 的情景,以此来判断有没有问题。可以说,只要是实现了 Locker 接口,就会被分析。我们看到,下面的代码就是确定什么类型会被分析,其实就是实现了 Lock/Unlock 两个方法的 Locker 接口:

1
2
3
4
5
6
7
8
9
10
11
var lockerType *types.Interface

// Construct a sync.Locker interface type.
func init() {
nullary := types.NewSignature(nil, nil, nil, false) // func()
methods := []*types.Func{
types.NewFunc(token.NoPos, nil, "Lock", nullary),
types.NewFunc(token.NoPos, nil, "Unlock", nullary),
}
lockerType = types.NewInterface(methods, nil).Complete()
}

其实,有些没有实现 Locker 接口的同步原语(比如 WaitGroup),也能被分析。后面我们会介绍这种情况是怎么实现的。





2.3.3 重入

接下来,我们来讨论“重入”这个问题。在说这个问题前,我先解释一下个概念,叫“可重入锁”。

如果你学过 Java,可能会很熟悉 ReentrantLock,就是可重入锁,这是 Java 并发包中非常常用的一个同步原语。它的基本行为和互斥锁相同,但是加了一些扩展功能。

如果你没接触过 Java,也没关系,这里只是提一下,帮助会 Java 的同学对比来学。那下面来具体讲解可重入锁是咋回事儿。

当一个线程获取锁时,如果没有其它线程拥有这个锁,那么,这个线程就成功获取到这个锁。之后,如果其它线程再请求这个锁,就会处于阻塞等待的状态。但是,如果拥有这把锁的线程再请求这把锁的话,不会阻塞,而是成功返回,所以叫可重入锁(有时候也叫做递归锁)。只要你拥有这把锁,你可以可着劲儿地调用,比如通过递归实现一些算法,调用者不会阻塞或者死锁。

了解了可重入锁的概念,那我们来看 Mutex 使用的错误场景。划重点了:Mutex 不是可重入的锁

想想也不奇怪,因为 Mutex 的实现中没有记录哪个 goroutine 拥有这把锁。理论上,任何 goroutine 都可以随意地 Unlock 这把锁,所以没办法计算重入条件。

所以,一旦误用 Mutex 的重入,就会导致报错。下面是一个误用 Mutex 的重入例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func foo(l sync.Locker) {
fmt.Println("in foo")
l.Lock()
bar(l)
l.Unlock()
}


func bar(l sync.Locker) {
l.Lock()
fmt.Println("in bar")
l.Unlock()
}


func main() {
l := &sync.Mutex{}
foo(l)
}

写完这个 Mutex 重入的例子后,运行一下,你会发现类似下面的错误。程序一直在请求锁,但是一直没有办法获取到锁,结果就是 Go 运行时发现死锁了,没有其它地方能够释放锁让程序运行下去,你通过下面的错误堆栈信息就能定位到哪一行阻塞请求锁。

学到这里,你可能要问了,虽然标准库 Mutex 不是可重入锁,但是如果我就是想要实现一个可重入锁,可以吗?

可以,那我们就自己实现一个。这里的关键就是,实现的锁要能记住当前是哪个 goroutine 持有这个锁。以下提供两个方案。

  • 方案一:通过 hacker 的方式获取到 goroutine id,记录下获取锁的 goroutine id,它可以实现 Locker 接口。
  • 方案二:调用 Lock/Unlock 方法时,由 goroutine 提供一个 token,用来标识它自己,而不是我们通过 hacker 的方式获取到 goroutine id,但是,这样一来,就不满足 Locker 接口了。

可重入锁(递归锁)解决了代码重入或者递归调用带来的死锁问题,同时它也带来了另一个好处,就是我们可以要求,只有持有锁的 goroutine 才能 unlock 这个锁。这也很容易实现,因为在上面这两个方案中,都已经记录了是哪一个 goroutine 持有这个锁。


下面我们具体来看这两个方案怎么实现。

  • 方案一:goroutine id

    这个方案的关键第一步是获取 goroutine id,方式有两种,分别是简单方式和 hacker 方式。

    简单方式,就是通过 runtime.Stack 方法获取栈帧信息,栈帧信息里包含 goroutine id。你可以看看上面 panic 时候的贴图,goroutine id 明明白白地显示在那里。runtime.Stack 方法可以获取当前的 goroutine 信息,第二个参数为 true 会输出所有的 goroutine 信息,信息的格式如下:

    1
    2
    3
    goroutine 1 [running]:
    main.main()
    ....../main.go:19 +0xb1

    第一行格式为 goroutine xxx,其中 xxx 就是 goroutine id,你只要解析出这个 id 即可。解析的方法可以采用下面的代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    func GoID() int {
    var buf [64]byte
    n := runtime.Stack(buf[:], false)
    // 得到id字符串
    idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]
    id, err := strconv.Atoi(idField)
    if err != nil {
    panic(fmt.Sprintf("cannot get goroutine id: %v", err))
    }
    return id
    }

    了解了简单方式,接下来我们来看 hacker 的方式,这也是我们方案一采取的方式。

    首先,我们获取运行时的 g 指针,反解出对应的 g 的结构。每个运行的 goroutine 结构的 g 指针保存在当前 goroutine 的一个叫做 TLS 对象中。

    第一步:我们先获取到 TLS 对象;

    第二步:再从 TLS 中获取 goroutine 结构的 g 指针;

    第三步:再从 g 指针中取出 goroutine id。

    需要注意的是,不同 Go 版本的 goroutine 的结构可能不同,所以需要根据 Go 的不同版本进行调整。当然了,如果想要搞清楚各个版本的 goroutine 结构差异,所涉及的内容又过于底层而且复杂,学习成本太高。怎么办呢?

    我们可以重点关注一些库。我们没有必要重复发明轮子,直接使用第三方的库来获取 goroutine id 就可以了。 好消息是现在已经有很多成熟的方法了,可以支持多个 Go 版本的 goroutine id,给你推荐一个常用的库:petermattis/goid

    知道了如何获取 goroutine id,接下来就是最后的关键一步了,我们实现一个可以使用的可重入锁:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    // RecursiveMutex 包装一个Mutex,实现可重入
    type RecursiveMutex struct {
    sync.Mutex
    owner int64 // 当前持有锁的goroutine id
    recursion int32 // 这个goroutine 重入的次数
    }

    func (m *RecursiveMutex) Lock() {
    gid := goid.Get()
    // 如果当前持有锁的goroutine就是这次调用的goroutine,说明是重入
    if atomic.LoadInt64(&m.owner) == gid {
    m.recursion++
    return
    }
    m.Mutex.Lock()
    // 获得锁的goroutine第一次调用,记录下它的goroutine id,调用次数加1
    atomic.StoreInt64(&m.owner, gid)
    m.recursion = 1
    }

    func (m *RecursiveMutex) Unlock() {
    gid := goid.Get()
    // 非持有锁的goroutine尝试释放锁,错误的使用
    if atomic.LoadInt64(&m.owner) != gid {
    panic(fmt.Sprintf("wrong the owner(%d): %d!", m.owner, gid))
    }
    // 调用次数减1
    m.recursion--
    if m.recursion != 0 { // 如果这个goroutine还没有完全释放,则直接返回
    return
    }
    // 此goroutine最后一次调用,需要释放锁
    atomic.StoreInt64(&m.owner, -1)
    m.Mutex.Unlock()
    }

    上面这段代码你可以拿来即用。我们一起来看下这个实现,真是非常巧妙,它相当于给 Mutex 打一个补丁,解决了记录锁的持有者的问题。可以看到,我们用 owner 字段,记录当前锁的拥有者 goroutine 的 id;recursion 是辅助字段,用于记录重入的次数。

    提醒一点,尽管拥有者可以多次调用 Lock,但是也必须调用相同次数的 Unlock,这样才能把锁释放掉。这是一个合理的设计,可以保证 Lock 和 Unlock 一一对应。

  • 方案二:token

    方案一是用 goroutine id 做 goroutine 的标识,我们也可以让 goroutine 自己来提供标识。不管怎么说,Go 开发者不期望你利用 goroutine id 做一些不确定的东西,所以,他们没有暴露获取 goroutine id 的方法。

    下面的代码是第二种方案。调用者自己提供一个 token,获取锁的时候把这个 token 传入,释放锁的时候也需要把这个 token 传入。通过用户传入的 token 替换方案一中 goroutine id,其它逻辑和方案一一致。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    // Token方式的递归锁
    type TokenRecursiveMutex struct {
    sync.Mutex
    token int64
    recursion int32
    }

    // 请求锁,需要传入token
    func (m *TokenRecursiveMutex) Lock(token int64) {
    if atomic.LoadInt64(&m.token) == token { //如果传入的token和持有锁的token一致,说明是递归调用
    m.recursion++
    return
    }
    m.Mutex.Lock() // 传入的token不一致,说明不是递归调用
    // 抢到锁之后记录这个token
    atomic.StoreInt64(&m.token, token)
    m.recursion = 1
    }

    // 释放锁
    func (m *TokenRecursiveMutex) Unlock(token int64) {
    if atomic.LoadInt64(&m.token) != token { // 释放其它token持有的锁
    panic(fmt.Sprintf("wrong the owner(%d): %d!", m.token, token))
    }
    m.recursion-- // 当前持有这个锁的token释放锁
    if m.recursion != 0 { // 还没有回退到最初的递归调用
    return
    }
    atomic.StoreInt64(&m.token, 0) // 没有递归调用了,释放锁
    m.Mutex.Unlock()
    }




2.3.4 死锁

我们来分析一下死锁产生的必要条件。如果你想避免死锁,只要破坏这四个条件中的一个或者几个,就可以了。

  1. 互斥: 至少一个资源是被排他性独享的,其他线程必须处于等待状态,直到资源被释放。
  2. 持有和等待:goroutine 持有一个资源,并且还在请求其它 goroutine 持有的资源,也就是咱们常说的“吃着碗里,看着锅里”的意思。
  3. 不可剥夺:资源只能由持有它的 goroutine 来释放。
  4. 环路等待:一般来说,存在一组等待进程,P={P1,P2,…,PN},P1 等待 P2 持有的资源,P2 等待 P3 持有的资源,依此类推,最后是 PN 等待 P1 持有的资源,这就形成了一个环路等待的死结。

Go 运行时,有死锁探测的功能,能够检查出是否出现了死锁的情况,如果出现了,这个时候你就需要调整策略来处理了。

你可以引入一个第三方的锁,大家都依赖这个锁进行业务处理,比如现在政府推行的一站式政务服务中心。或者是解决持有等待问题,物业不需要看到派出所的证明才给开物业证明,等等。 好了,到这里,我们讲了使用 Mutex 常见的 4 类问题。你是不是觉得,哎呀,这几类问题也太不应该了吧,真的会有人犯这么基础的错误吗? 还真是有。虽然 Mutex 使用起来很简单,但是,仍然可能出现使用错误的问题。而且,就连一些经验丰富的开发人员,也会出现一些 Mutex 使用的问题。接下来,我们围观几个非常流行的 Go 开发项目,看看这些错误是怎么产生和修复的。





2.3.5 流行的 Go 开发项目踩坑记

Docker

Docker 容器是一个开源的应用容器引擎,开发者可以以统一的方式,把他们的应用和依赖包打包到一个可移植的容器中,然后发布到任何安装了 docker 引擎的服务器上。 Docker 是使用 Go 开发的,也算是 Go 的一个杀手级产品了,它的 Mutex 相关的 Bug 也不少,我们来看几个典型的 Bug。

issue 36114

Docker 的issue 36114 是一个死锁问题。 原因在于,hotAddVHDsAtStart 方法执行的时候,执行了加锁 svm 操作。但是,在其中调用 hotRemoveVHDsAtStart 方法时,这个 hotRemoveVHDsAtStart 方法也是要加锁 svm 的。很不幸,Go 标准库中的 Mutex 是不可重入的,所以,代码执行到这里,就出现了死锁的现象。

针对这个问题,解决办法就是,再提供一个不需要锁的 hotRemoveVHDsNoLock 方法,避免 Mutex 的重入。

issue 34881

issue 34881本来是修复 Docker 的一个简单问题,如果节点在初始化的时候,发现自己不是一个 swarm mananger,就快速返回,这个修复就几行代码,你看出问题来了吗?

在第 34 行,节点发现不满足条件就返回了,但是,c.mu 这个锁没有释放!为什么会出现这个问题呢?其实,这是在重构或者添加新功能的时候经常犯的一个错误,因为不太了解上下文,或者是没有仔细看函数的逻辑,从而导致锁没有被释放。现在的 Docker 当然已经没有这个问题了。

这样的 issue 还有很多,就不一一列举了。推荐几个关于 Mutex 的 issue 或者 pull request,你可以关注一下,分别是 36840、37583、35517、35482、33305、32826、30696、29554、29191、28912、26507 等。


Kubernetes

issue 45192

issue 45192也是一个返回时忘记 Unlock 的典型例子,和 docker issue 34881 犯的错误都是一样的。

两大知名项目的开发者都犯了这个错误,所以,你就可以知道,引入这个 Bug 是多么容易,记住晁老师这句话:保证 Lock/Unlock 成对出现,尽可能采用 defer mutex.Unlock 的方式,把它们成对、紧凑地写在一起


gRPC

gRPC 是 Google 发起的一个开源远程过程调用 (Remote procedure call)系统。该系统基于 HTTP/2 协议传输,使用 Protocol Buffers 作为接口描述语言。它提供 Go 语言的实现。

即使是 Google 官方出品的系统,也有一些 Mutex 的 issue。

issue 795

issue 795是一个你可能想不到的 bug,那就是将 Unlock 误写成了 Lock。


etcd

etcd 是一个非常知名的分布式一致性的 key-value 存储技术, 被用来做配置共享和服务发现。

issue 10419

issue 10419是一个锁重入导致的问题。 Store 方法内对请求了锁,而调用的 Compact 的方法内又请求了锁,这个时候,会导致死锁,一直等待,解决办法就是提供不需要加锁的 Compact 方法。





2.4 Mutex:骇客编程,如何拓展额外功能?

“Hacker”一词指的是以一种非传统或未被官方直接支持的方式来修改或增强现有软件或系统。在这个上下文中,它特别指的是对Go语言中sync.Mutex互斥锁进行定制化扩展,以提供超出其标准功能集的特性。

前面三讲,我们学习了互斥锁 Mutex 的基本用法、实现原理以及易错场景,可以说是涵盖了互斥锁的方方面面。如果能熟练掌握这些内容,那么,在大多数的开发场景中,都可以得心应手。

但是,在一些特定的场景中,这些基础功能是不足以应对的。这个时候,我们就需要开发一些扩展功能了。

举几个例子。 比如说,我们知道,如果互斥锁被某个 goroutine 获取了,而且还没有释放,那么,其他请求这把锁的 goroutine,就会阻塞等待,直到有机会获得这把锁。有时候阻塞并不是一个很好的主意,比如你请求锁更新一个计数器,如果获取不到锁的话没必要等待,大不了这次不更新,我下次更新就好了,如果阻塞的话会导致业务处理能力的下降。

再比如,如果我们要监控锁的竞争情况,一个监控指标就是,等待这把锁的 goroutine 数量。我们可以把这个指标推送到时间序列数据库中,再通过一些监控系统(比如 Grafana)展示出来。要知道,锁是性能下降的“罪魁祸首”之一,所以,有效地降低锁的竞争,就能够很好地提高性能。因此,监控关键互斥锁上等待的 goroutine 的数量,是我们分析锁竞争的激烈程度的一个重要指标

实际上,不论是不希望锁的 goroutine 继续等待,还是想监控锁,我们都可以基于标准库中 Mutex 的实现,通过 Hacker 的方式,为 Mutex 增加一些额外的功能。这节课,就来教你实现几个扩展功能,包括实现 TryLock,获取等待者的数量等指标,以及实现一个线程安全的队列。


TryLock

我们可以为 Mutex 添加一个 TryLock 的方法,也就是尝试获取排外锁。PS:在 Go 1.18 官方标准库中,已经为 Mutex/RWMutex 增加了 TryLock 方法。

这个方法具体是什么意思呢?现在解释一下这里的逻辑。当一个 goroutine 调用这个 TryLock 方法请求锁的时候,如果这把锁没有被其他 goroutine 所持有,那么,这个 goroutine 就持有了这把锁,并返回 true;如果这把锁已经被其他 goroutine 所持有,或者是正在准备交给某个被唤醒的 goroutine,那么,这个请求锁的 goroutine 就直接返回 false,不会阻塞在方法调用上。

在实际开发中,如果要更新配置数据,我们通常需要加锁,这样可以避免同时有多个 goroutine 并发修改数据。有的时候,我们也会使用 TryLock。这样一来,当某个 goroutine 想要更改配置数据时,如果发现已经有 goroutine 在更改了,其他的 goroutine 调用 TryLock,返回了 false,这个 goroutine 就会放弃更改。

很多语言(比如 Java)都为锁提供了 TryLock 的方法,但是,Go 官方issue 6123有一个讨论(后来一些 issue 中也提到过),标准库的 Mutex 不会添加 TryLock 方法。虽然通过 Go 的 Channel 我们也可以实现 TryLock 的功能,但是基于 Channel 的实现我们会放在 Channel 那一讲中去介绍,这一次我们还是基于 Mutex 去实现,毕竟大部分的程序员还是熟悉传统的同步原语,而且传统的同步原语也不容易出错。所以这节课,还是希望带你掌握基于 Mutex 实现的方法。

那怎么实现一个扩展 TryLock 方法的 Mutex 呢?我们直接来看代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 复制Mutex定义的常量
const (
mutexLocked = 1 << iota // 加锁标识位置
mutexWoken // 唤醒标识位置
mutexStarving // 锁饥饿标识位置
mutexWaiterShift = iota // 标识waiter的起始bit位置
)

// 扩展一个Mutex结构
type Mutex struct {
sync.Mutex
}

// 尝试获取锁
func (m *Mutex) TryLock() bool {
// 如果能成功抢到锁
if atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), 0, mutexLocked) {
return true
}

// 如果处于唤醒、加锁或者饥饿状态,这次请求就不参与竞争了,返回false
old := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
if old&(mutexLocked|mutexStarving|mutexWoken) != 0 {
return false
}

// 尝试在竞争的状态下请求锁
new := old | mutexLocked
return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), old, new)
}

第 17 行是一个 fast path,如果幸运,没有其他 goroutine 争这把锁,那么,这把锁就会被这个请求的 goroutine 获取,直接返回。

如果锁已经被其他 goroutine 所持有,或者被其他唤醒的 goroutine 准备持有,那么,就直接返回 false,不再请求,代码逻辑在第 23 行。

如果没有被持有,也没有其它唤醒的 goroutine 来竞争锁,锁也不处于饥饿状态,就尝试获取这把锁(第 29 行),不论是否成功都将结果返回。因为,这个时候,可能还有其他的 goroutine 也在竞争这把锁,所以,不能保证成功获取这把锁。

我们可以写一个简单的测试程序,来测试我们的 TryLock 的机制是否工作。

这个测试程序的工作机制是这样子的:程序运行时会启动一个 goroutine 持有这把我们自己实现的锁,经过随机的时间才释放。主 goroutine 会尝试获取这把锁。如果前一个 goroutine 一秒内释放了这把锁,那么,主 goroutine 就有可能获取到这把锁了,输出“got the lock”,否则没有获取到也不会被阻塞,会直接输出“can’t get the lock”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func try() {
var mu Mutex
go func() { // 启动一个goroutine持有一段时间的锁
mu.Lock()
time.Sleep(time.Duration(rand.Intn(2)) * time.Second)
mu.Unlock()
}()

time.Sleep(time.Second)

ok := mu.TryLock() // 尝试获取到锁
if ok { // 获取成功
fmt.Println("got the lock")
// do something
mu.Unlock()
return
}

// 没有获取到
fmt.Println("can't get the lock")
}

获取等待者的数量等指标

接下来,我们聊聊怎么获取等待者数量等指标。 第二讲中,我们已经学习了 Mutex 的结构。先来回顾一下 Mutex 的数据结构,如下面的代码所示。它包含两个字段,state 和 sema。前四个字节(int32)就是 state 字段。

1
2
3
4
type Mutex struct {
state int32
sema uint32
}

Mutex 结构中的 state 字段有很多个含义,通过 state 字段,你可以知道锁是否已经被某个 goroutine 持有、当前是否处于饥饿状态、是否有等待的 goroutine 被唤醒、等待者的数量等信息。但是,state 这个字段并没有暴露出来,所以,我们需要想办法获取到这个字段,并进行解析。
怎么获取未暴露的字段呢?

很简单,我们可以通过 unsafe 的方式实现。举一个例子,你一看就明白了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
)

type Mutex struct {
sync.Mutex
}

func (m *Mutex) Count() int {
// 获取state字段的值
v := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
v = v >> mutexWaiterShift + (v & mutexLocked)
return int(v)
}

这个例子的第 14 行通过 unsafe 操作,我们可以得到 state 字段的值。第 15 行我们右移三位(这里的常量 mutexWaiterShift 的值为 3),就得到了当前等待者的数量。如果当前的锁已经被其他 goroutine 持有,那么,我们就稍微调整一下这个值,加上一个 1(第 16 行),你基本上可以把它看作是当前持有和等待这把锁的 goroutine 的总数。

state 这个字段的第一位是用来标记锁是否被持有,第二位用来标记是否已经唤醒了一个等待者,第三位标记锁是否处于饥饿状态,通过分析这个 state 字段我们就可以得到这些状态信息。我们可以为这些状态提供查询的方法,这样就可以实时地知道锁的状态了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 锁是否被持有
func (m *Mutex) IsLocked() bool {
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
return state&mutexLocked == mutexLocked
}

// 是否有等待者被唤醒
func (m *Mutex) IsWoken() bool {
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
return state&mutexWoken == mutexWoken
}

// 锁是否处于饥饿状态
func (m *Mutex) IsStarving() bool {
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
return state&mutexStarving == mutexStarving
}

我们可以写一个程序测试一下,比如,在 1000 个 goroutine 并发访问的情况下,我们可以把锁的状态信息输出出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func count() {
var mu Mutex
for i := 0; i < 1000; i++ { // 启动1000个goroutine
go func() {
mu.Lock()
time.Sleep(time.Second)
mu.Unlock()
}()
}

time.Sleep(time.Second)
// 输出锁的信息
fmt.Printf("waitings: %d, isLocked: %t, woken: %t, starving: %t\n", mu.Count(), mu.IsLocked(), mu.IsWoken(), mu.IsStarving())
}

有一点你需要注意一下,在获取 state 字段的时候,并没有通过 Lock 获取这把锁,所以获取的这个 state 的值是一个瞬态的值,可能在你解析出这个字段之后,锁的状态已经发生了变化。不过没关系,因为你查看的就是调用的那一时刻的锁的状态。


使用 Mutex 实现一个线程安全的队列

最后,我们来讨论一下,如何使用 Mutex 实现一个线程安全的队列。

为什么要讨论这个话题呢?因为 Mutex 经常会和其他非线程安全(对于 Go 来说,我们其实指的是 goroutine 安全)的数据结构一起,组合成一个线程安全的数据结构。新数据结构的业务逻辑由原来的数据结构提供,而 Mutex 提供了锁的机制,来保证线程安全。

比如队列,我们可以通过 Slice 来实现,但是通过 Slice 实现的队列不是线程安全的,出队(Dequeue)和入队(Enqueue)会有 data race 的问题。这个时候,Mutex 就要隆重出场了,通过它,我们可以在出队和入队的时候加上锁的保护。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
type SliceQueue struct {
data []interface{}
mu sync.Mutex
}

func NewSliceQueue(n int) (q *SliceQueue) {
return &SliceQueue{data: make([]interface{}, 0, n)}
}

// Enqueue 把值放在队尾
func (q *SliceQueue) Enqueue(v interface{}) {
q.mu.Lock()
q.data = append(q.data, v)
q.mu.Unlock()
}

// Dequeue 移去队头并返回
func (q *SliceQueue) Dequeue() interface{} {
q.mu.Lock()
if len(q.data) == 0 {
q.mu.Unlock()
return nil
}
v := q.data[0]
q.data = q.data[1:]
q.mu.Unlock()
return v
}

因为标准库中没有线程安全的队列数据结构的实现,所以,你可以通过 Mutex 实现一个简单的队列。通过 Mutex 我们就可以为一个非线程安全的 data interface{}实现线程安全的访问。





2.5 RWMutex:读写锁的实现原理及避坑指南

在前面的四节课中,我们学习了第一个同步原语,即 Mutex,我们使用它来保证读写共享资源的安全性。不管是读还是写,我们都通过 Mutex 来保证只有一个 goroutine 访问共享资源,这在某些情况下有点“浪费”。比如说,在写少读多的情况下,即使一段时间内没有写操作,大量并发的读访问也不得不在 Mutex 的保护下变成了串行访问,这个时候,使用 Mutex,对性能的影响就比较大。

怎么办呢?你是不是已经有思路了,对,就是区分读写操作。

具体解释一下。如果某个读操作的 goroutine 持有了锁,在这种情况下,其它读操作的 goroutine 就不必一直傻傻地等待了,而是可以并发地访问共享变量,这样我们就可以将串行的读变成并行读,提高读操作的性能。当写操作的 goroutine 持有锁的时候,它就是一个排外锁,其它的写操作和读操作的 goroutine,需要阻塞等待持有这个锁的 goroutine 释放锁。

这一类并发读写问题叫作readers-writers 问题,意思就是,同时可能有多个读或者多个写,但是只要有一个线程在执行写操作,其它的线程都不能执行读写操作。

Go 标准库中的 RWMutex(读写锁)就是用来解决这类 readers-writers 问题的。所以,这节课,我们就一起来学习 RWMutex。本文会给你介绍读写锁的使用场景、实现原理以及容易掉入的坑,你一定要记住这些陷阱,避免在实际的开发中犯相同的错误。


什么是 RWMutex?

先简单解释一下读写锁 RWMutex。标准库中的 RWMutex 是一个 reader/writer 互斥锁。RWMutex 在某一时刻只能由任意数量的 reader 持有,或者是只被单个的 writer 持有。

RWMutex 的方法也很少,总共有 5 个。

  • Lock/Unlock:写操作时调用的方法。如果锁已经被 reader 或者 writer 持有,那么,Lock 方法会一直阻塞,直到能获取到锁;Unlock 则是配对的释放锁的方法。
  • RLock/RUnlock:读操作时调用的方法。如果锁已经被 writer 持有的话,RLock 方法会一直阻塞,直到能获取到锁,否则就直接返回;而 RUnlock 是 reader 释放锁的方法。
  • RLocker:这个方法的作用是为读操作返回一个 Locker 接口的对象。它的 Lock 方法会调用 RWMutex 的 RLock 方法,它的 Unlock 方法会调用 RWMutex 的 RUnlock 方法。

RWMutex 的零值是未加锁的状态,所以,当你使用 RWMutex 的时候,无论是声明变量,还是嵌入到其它 struct 中,都不必显式地初始化。

以计数器为例,来说明一下,如何使用 RWMutex 保护共享资源。计数器的 count++操作是写操作,而获取 count 的值是读操作,这个场景非常适合读写锁,因为读操作可以并行执行,写操作时只允许一个线程执行,这正是 readers-writers 问题。

在这个例子中,使用 10 个 goroutine 进行读操作,每读取一次,sleep 1 毫秒,同时,还有一个 gorotine 进行写操作,每一秒写一次,这是一个 1 writer-n reader 的读写场景,而且写操作还不是很频繁(一秒一次):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func main() {
var counter Counter
for i := 0; i < 10; i++ { // 10个reader
go func() {
for {
counter.Count() // 计数器读操作
time.Sleep(time.Millisecond)
}
}()
}

for { // 一个writer
counter.Incr() // 计数器写操作
time.Sleep(time.Second)
}
}
// 一个线程安全的计数器
type Counter struct {
mu sync.RWMutex
count uint64
}

// 使用写锁保护
func (c *Counter) Incr() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}

// 使用读锁保护
func (c *Counter) Count() uint64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.count
}

可以看到,Incr 方法会修改计数器的值,是一个写操作,我们使用 Lock/Unlock 进行保护。Count 方法会读取当前计数器的值,是一个读操作,我们使用 RLock/RUnlock 方法进行保护。

Incr 方法每秒才调用一次,所以,writer 竞争锁的频次是比较低的,而 10 个 goroutine 每毫秒都要执行一次查询,通过读写锁,可以极大提升计数器的性能,因为在读取的时候,可以并发进行。如果使用 Mutex,性能就不会像读写锁这么好。因为多个 reader 并发读的时候,使用互斥锁导致了 reader 要排队读的情况,没有 RWMutex 并发读的性能好。

如果你遇到可以明确区分 reader 和 writer goroutine 的场景,且有大量的并发读、少量的并发写,并且有强烈的性能需求,你就可以考虑使用读写锁 RWMutex 替换 Mutex

在实际使用 RWMutex 的时候,如果我们在 struct 中使用 RWMutex 保护某个字段,一般会把它和这个字段放在一起,用来指示两个字段是一组字段。除此之外,我们还可以采用匿名字段的方式嵌入 struct,这样,在使用这个 struct 时,我们就可以直接调用 Lock/Unlock、RLock/RUnlock 方法了,这和我们前面在 2.1小节 中介绍 Mutex 的使用方法很类似,可以回去复习一下。


RWMutex 的实现原理

RWMutex 是很常见的并发原语,很多编程语言的库都提供了类似的并发类型。RWMutex 一般都是基于互斥锁、条件变量(condition variables)或者信号量(semaphores)等并发原语来实现。Go 标准库中的 RWMutex 是基于 Mutex 实现的

readers-writers 问题一般有三类,基于对读和写操作的优先级,读写锁的设计和实现也分成三类。

  • Read-preferring:读优先的设计可以提供很高的并发性,但是,在竞争激烈的情况下可能会导致写饥饿。这是因为,如果有大量的读,这种设计会导致只有所有的读都释放了锁之后,写才可能获取到锁。
  • Write-preferring:写优先的设计意味着,如果已经有一个 writer 在等待请求锁的话,它会阻止新来的请求锁的 reader 获取到锁,所以优先保障 writer。当然,如果有一些 reader 已经请求了锁的话,新请求的 writer 也会等待已经存在的 reader 都释放锁之后才能获取。所以,写优先级设计中的优先权是针对新来的请求而言的。这种设计主要避免了 writer 的饥饿问题。
  • 不指定优先级:这种设计比较简单,不区分 reader 和 writer 优先级,某些场景下这种不指定优先级的设计反而更有效,因为第一类优先级会导致写饥饿,第二类优先级可能会导致读饥饿,这种不指定优先级的访问不再区分读写,大家都是同一个优先级,解决了饥饿的问题。

Go 标准库中的 RWMutex 设计是 Write-preferring 方案。一个正在阻塞的 Lock 调用会排除新的 reader 请求到锁

RWMutex 包含一个 Mutex,以及四个辅助字段 writerSem、readerSem、readerCount 和 readerWait:

1
2
3
4
5
6
7
8
9
type RWMutex struct {
w Mutex // 互斥锁解决多个writer的竞争
writerSem uint32 // writer信号量
readerSem uint32 // reader信号量
readerCount int32 // reader的数量
readerWait int32 // writer等待完成的reader的数量
}

const rwmutexMaxReaders = 1 << 30

简单解释一下这几个字段。

  • 字段 w:为 writer 的竞争锁而设计;
  • 字段 readerCount:记录当前 reader 的数量(以及是否有 writer 竞争锁);
  • readerWait:记录 writer 请求锁时需要等待 read 完成的 reader 的数量;
  • writerSem 和 readerSem:都是为了阻塞设计的信号量。

这里的常量 rwmutexMaxReaders,定义了最大的 reader 数量。

好了,知道了 RWMutex 的设计方案和具体字段,下面解释一下具体的方法实现。


RLock/RUnlock 的实现

首先,我们看一下移除了 race 等无关紧要的代码后的 RLock 和 RUnlock 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// rw.readerCount是负值的时候,意味着此时有writer等待请求锁,因为writer优先级高,所以把后来的reader阻塞休眠
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
rw.rUnlockSlow(r) // 有等待的writer
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// 最后一个reader了,writer终于有机会获得锁了
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

第 2 行是对 reader 计数加 1。你可能比较困惑的是,readerCount 怎么还可能为负数呢?其实,这是因为,readerCount 这个字段有双重含义:

  • 没有 writer 竞争或持有锁时,readerCount 和我们正常理解的 reader 的计数是一样的;
  • 但是,如果有 writer 竞争锁或者持有锁时,那么,readerCount 不仅仅承担着 reader 的计数功能,还能够标识当前是否有 writer 竞争或持有锁,在这种情况下,请求锁的 reader 的处理进入第 4 行,阻塞等待锁的释放。

调用 RUnlock 的时候,我们需要将 Reader 的计数减去 1(第 8 行),因为 reader 的数量减少了一个。但是,第 8 行的 AddInt32 的返回值还有另外一个含义。如果它是负值,就表示当前有 writer 竞争锁,在这种情况下,还会调用 rUnlockSlow 方法,检查是不是 reader 都释放读锁了,如果读锁都释放了,那么可以唤醒请求写锁的 writer 了。

当一个或者多个 reader 持有锁的时候,竞争锁的 writer 会等待这些 reader 释放完,才可能持有这把锁。打个比方,在房地产行业中有条规矩叫做“买卖不破租赁”,意思是说,就算房东把房子卖了,新业主也不能把当前的租户赶走,而是要等到租约结束后,才能接管房子。这和 RWMutex 的设计是一样的。当 writer 请求锁的时候,是无法改变既有的 reader 持有锁的现实的,也不会强制这些 reader 释放锁,它的优先权只是限定后来的 reader 不要和它抢。

所以,rUnlockSlow 将持有锁的 reader 计数减少 1 的时候,会检查既有的 reader 是不是都已经释放了锁,如果都释放了锁,就会唤醒 writer,让 writer 持有锁。


Lock

RWMutex 是一个多 writer 多 reader 的读写锁,所以同时可能有多个 writer 和 reader。那么,为了避免 writer 之间的竞争,RWMutex 就会使用一个 Mutex 来保证 writer 的互斥。

一旦一个 writer 获得了内部的互斥锁,就会反转 readerCount 字段,把它从原来的正整数 readerCount(>=0) 修改为负数(readerCount-rwmutexMaxReaders),让这个字段保持两个含义(既保存了 reader 的数量,又表示当前有 writer)。

我们来看下下面的代码。第 5 行,还会记录当前活跃的 reader 数量,所谓活跃的 reader,就是指持有读锁还没有释放的那些 reader。

1
2
3
4
5
6
7
8
9
10
func (rw *RWMutex) Lock() {
// 首先解决其他writer竞争问题
rw.w.Lock()
// 反转readerCount,告诉reader有writer竞争锁
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 如果当前有reader持有锁,那么需要等待
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}

如果 readerCount 不是 0,就说明当前有持有读锁的 reader,RWMutex 需要把这个当前 readerCount 赋值给 readerWait 字段保存下来(第 7 行), 同时,这个 writer 进入阻塞等待状态(第 8 行)。

每当一个 reader 释放读锁的时候(调用 RUnlock 方法时),readerWait 字段就减 1,直到所有的活跃的 reader 都释放了读锁,才会唤醒这个 writer。


Unlock

当一个 writer 释放锁的时候,它会再次反转 readerCount 字段。可以肯定的是,因为当前锁由 writer 持有,所以,readerCount 字段是反转过的,并且减去了 rwmutexMaxReaders 这个常数,变成了负数。所以,这里的反转方法就是给它增加 rwmutexMaxReaders 这个常数值。

既然 writer 要释放锁了,那么就需要唤醒之后新来的 reader,不必再阻塞它们了,让它们开开心心地继续执行就好了。

在 RWMutex 的 Unlock 返回之前,需要把内部的互斥锁释放。释放完毕后,其他的 writer 才可以继续竞争这把锁。

1
2
3
4
5
6
7
8
9
10
11
func (rw *RWMutex) Unlock() {
// 告诉reader没有活跃的writer了
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)

// 唤醒阻塞的reader们
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 释放内部的互斥锁
rw.w.Unlock()
}

在这段代码中,删除了 race 的处理和异常情况的检查,总体看来还是比较简单的。这里有几个重点,需要再提醒你一下。首先,你要理解 readerCount 这个字段的含义以及反转方式。其次,你还要注意字段的更改和内部互斥锁的顺序关系。在 Lock 方法中,是先获取内部互斥锁,才会修改的其他字段;而在 Unlock 方法中,是先修改的其他字段,才会释放内部互斥锁,这样才能保证字段的修改也受到互斥锁的保护。

好了,到这里我们就完整学习了 RWMutex 的概念和实现原理。RWMutex 的应用场景非常明确,就是解决 readers-writers 问题。学完了今天的内容,之后当你遇到这类问题时,要优先想到 RWMutex。另外,Go 并发原语代码实现的质量都很高,非常精炼和高效,所以,你可以通过看它们的实现原理,学习一些编程的技巧。当然,还有非常重要的一点就是要知道 reader 或者 writer 请求锁的时候,既有的 reader/writer 和后续请求锁的 reader/writer 之间的(释放锁 / 请求锁)顺序关系。


虽然 RWMutex 暴露的 API 也很简单,使用起来也没有复杂的逻辑,但是和 Mutex 一样,在实际使用的时候,也会很容易踩到一些坑。接下来,重点介绍 3 个常见的踩坑点。

RWMutex 的 3 个踩坑点

坑点 1:不可复制

前面刚刚说过,RWMutex 是由一个互斥锁和四个辅助字段组成的。我们很容易想到,互斥锁是不可复制的,再加上四个有状态的字段,RWMutex 就更加不能复制使用了。

不能复制的原因和互斥锁一样。一旦读写锁被使用,它的字段就会记录它当前的一些状态。这个时候你去复制这把锁,就会把它的状态也给复制过来。但是,原来的锁在释放的时候,并不会修改你复制出来的这个读写锁,这就会导致复制出来的读写锁的状态不对,可能永远无法释放锁。

那该怎么办呢?其实,解决方案也和互斥锁一样。你可以借助 vet 工具,在变量赋值、函数传参、函数返回值、遍历数据、struct 初始化等时,检查是否有读写锁隐式复制的情景。

坑点 2:重入导致死锁

读写锁因为重入(或递归调用)导致死锁的情况更多。 先介绍第一种情况。因为读写锁内部基于互斥锁实现对 writer 的并发访问,而互斥锁本身是有重入问题的,所以,writer 重入调用 Lock 的时候,就会出现死锁的现象,这个问题,我们在学习互斥锁的时候已经了解过了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func foo(l *sync.RWMutex) {
fmt.Println("in foo")
l.Lock()
bar(l)
l.Unlock()
}

func bar(l *sync.RWMutex) {
l.Lock()
fmt.Println("in bar")
l.Unlock()
}

func main() {
l := &sync.RWMutex{}
foo(l)
}

运行这个程序,你就会得到死锁的错误输出,在 Go 运行的时候,很容易就能检测出来。

第二种死锁的场景有点隐蔽。我们知道,有活跃 reader 的时候,writer 会等待,如果我们在 reader 的读操作时调用 writer 的写操作(它会调用 Lock 方法),那么,这个 reader 和 writer 就会形成互相依赖的死锁状态。Reader 想等待 writer 完成后再释放锁,而 writer 需要这个 reader 释放锁之后,才能不阻塞地继续执行。这是一个读写锁常见的死锁场景。

第三种死锁的场景更加隐蔽。 当一个 writer 请求锁的时候,如果已经有一些活跃的 reader,它会等待这些活跃的 reader 完成,才有可能获取到锁,但是,如果之后活跃的 reader 再依赖新的 reader 的话,这些新的 reader 就会等待 writer 释放锁之后才能继续执行,这就形成了一个环形依赖: writer 依赖活跃的 reader -> 活跃的 reader 依赖新来的 reader -> 新来的 reader 依赖 writer。

这个死锁相当隐蔽,原因在于它和 RWMutex 的设计和实现有关。啥意思呢?我们来看一个计算阶乘 (n!) 的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func main() {
var mu sync.RWMutex

// writer,稍微等待,然后制造一个调用Lock的场景
go func() {
time.Sleep(200 * time.Millisecond)
mu.Lock()
fmt.Println("Lock")
time.Sleep(100 * time.Millisecond)
mu.Unlock()
fmt.Println("Unlock")
}()

go func() {
factorial(&mu, 10) // 计算10的阶乘, 10!
}()

select {}
}

// 递归调用计算阶乘
func factorial(m *sync.RWMutex, n int) int {
if n < 1 { // 阶乘退出条件
return 0
}
fmt.Println("RLock")
m.RLock()
defer func() {
fmt.Println("RUnlock")
m.RUnlock()
}()
time.Sleep(100 * time.Millisecond)
return factorial(m, n-1) * n // 递归调用
}

factoria 方法是一个递归计算阶乘的方法,我们用它来模拟 reader。为了更容易地制造出死锁场景,在这里加上了 sleep 的调用,延缓逻辑的执行。这个方法会调用读锁(第 27 行),在第 33 行递归地调用此方法,每次调用都会产生一次读锁的调用,所以可以不断地产生读锁的调用,而且必须等到新请求的读锁释放,这个读锁才能释放。

同时,我们使用另一个 goroutine 去调用 Lock 方法,来实现 writer,这个 writer 会等待 200 毫秒后才会调用 Lock,这样在调用 Lock 的时候,factoria 方法还在执行中不断调用 RLock。

这两个 goroutine 互相持有锁并等待,谁也不会退让一步,满足了“writer 依赖活跃的 reader -> 活跃的 reader 依赖新来的 reader -> 新来的 reader 依赖 writer”的死锁条件,所以就导致了死锁的产生。

所以,使用读写锁最需要注意的一点就是尽量避免重入,重入带来的死锁非常隐蔽,而且难以诊断。

坑点 3:释放未加锁的 RWMutex

和互斥锁一样,Lock 和 Unlock 的调用总是成对出现的,RLock 和 RUnlock 的调用也必须成对出现。Lock 和 RLock 多余的调用会导致锁没有被释放,可能会出现死锁,而 Unlock 和 RUnlock 多余的调用会导致 panic。在生产环境中出现 panic 是大忌,你总不希望半夜爬起来处理生产环境程序崩溃的问题吧?所以,在使用读写锁的时候,一定要注意,不遗漏不多余。


流行的 Go 开发项目中的坑

Docker issue 36840

issue 36840修复的是错误地把 writer 当成 reader 的 Bug。 这个地方本来需要修改数据,需要调用的是写锁,结果用的却是读锁。或许是被它紧挨着的 findNode 方法调用迷惑了,认为这只是一个读操作。可实际上,代码后面还会有 changeNodeState 方法的调用,这是一个写操作。修复办法也很简单,只需要改成 Lock/Unlock 即可。

Kubernetes issue 62464

issue 62464就是读写锁第二种死锁的场景,这是一个典型的 reader 导致的死锁的例子。知道墨菲定律吧?“凡是可能出错的事,必定会出错”。你可能觉得前面讲的 RWMutex 的坑绝对不会被人踩的,因为道理大家都懂,但是你看,Kubernetes 就踩了这个重入的坑。

这个 issue 在移除 pod 的时候可能会发生,原因就在于,GetCPUSetOrDefault 方法会请求读锁,同时,它还会调用 GetCPUSet 或 GetDefaultCPUSet 方法,这时又会请求读锁。如果期间有其它 goroutine 请求写锁的话,GetCPUSetOrDefault 方法调用 GetCPUSet 或 GetDefaultCPUSet 方法时就不会返回了,请求写锁的 goroutine 也不会返回,这就会形成死锁。





2.6 WaitGroup:协同等待,任务编排利器

其实,WaitGroup 很简单,就是 package sync 用来做任务编排的一个并发原语。它要解决的就是并发 - 等待的问题:现在有一个 goroutine A 在检查点(checkpoint)等待一组 goroutine 全部完成,如果在执行任务的这些 goroutine 还没全部完成,那么 goroutine A 就会阻塞在检查点,直到所有 goroutine 都完成后才能继续执行。

我们来看一个使用 WaitGroup 的场景。

比如,我们要完成一个大的任务,需要使用并行的 goroutine 执行三个小任务,只有这三个小任务都完成,我们才能去执行后面的任务。如果通过轮询的方式定时询问三个小任务是否完成,会存在两个问题:一是,性能比较低,因为三个小任务可能早就完成了,却要等很长时间才被轮询到;二是,会有很多无谓的轮询,空耗 CPU 资源。

那么,这个时候使用 WaitGroup 并发原语就比较有效了,它可以阻塞等待的 goroutine。等到三个小任务都完成了,再即时唤醒它们。

其实,很多操作系统和编程语言都提供了类似的并发原语。比如,Linux 中的 barrier、Pthread(POSIX 线程)中的 barrier、C++ 中的 std::barrier、Java 中的 CyclicBarrier 和 CountDownLatch 等。由此可见,这个并发原语还是一个非常基础的并发类型。所以,我们要认真掌握今天的内容,这样就可以举一反三,轻松应对其他场景下的需求了。
我们还是从 WaitGroup 的基本用法学起吧。


WaitGroup 的基本用法

Go 标准库中的 WaitGroup 提供了三个方法,保持了 Go 简洁的风格。

1
2
3
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()

我们分别看下这三个方法:

  • Add,用来设置 WaitGroup 的计数值;
  • Done,用来将 WaitGroup 的计数值减 1,其实就是调用了 Add(-1);
  • Wait,调用这个方法的 goroutine 会一直阻塞,直到 WaitGroup 的计数值变为 0。

接下来,我们通过一个使用 WaitGroup 的例子,来看下 Add、Done、Wait 方法的基本用法。

在这个例子中,我们使用了以前实现的计数器 struct。我们启动了 10 个 worker,分别对计数值加一,10 个 worker 都完成后,我们期望输出计数器的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 线程安全的计数器
type Counter struct {
mu sync.Mutex
count uint64
}
// 对计数值加一
func (c *Counter) Incr() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}
// 获取当前的计数值
func (c *Counter) Count() uint64 {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
// sleep 1秒,然后计数值加1
func worker(c *Counter, wg *sync.WaitGroup) {
defer wg.Done()
time.Sleep(time.Second)
c.Incr()
}

func main() {
var counter Counter

var wg sync.WaitGroup
wg.Add(10) // WaitGroup的值设置为10

for i := 0; i < 10; i++ { // 启动10个goroutine执行加1任务
go worker(&counter, &wg)
}
// 检查点,等待goroutine都完成任务
wg.Wait()
// 输出当前计数器的值
fmt.Println(counter.Count())
}

我们一起来分析下这段代码。

第 28 行,声明了一个 WaitGroup 变量,初始值为零。
第 29 行,把 WaitGroup 变量的计数值设置为 10。因为我们需要编排 10 个 goroutine(worker) 去执行任务,并且等待 goroutine 完成。
第 35 行,调用 Wait 方法阻塞等待。
第 32 行,启动了 goroutine,并把我们定义的 WaitGroup 指针当作参数传递进去。goroutine 完成后,需要调用 Done 方法,把 WaitGroup 的计数值减 1。等 10 个 goroutine 都调用了 Done 方法后,WaitGroup 的计数值降为 0,这时,第 35 行的主 goroutine 就不再阻塞,会继续执行,在第 37 行输出计数值。

这就是我们使用 WaitGroup 编排这类任务的常用方式。而“这类任务”指的就是,需要启动多个 goroutine 执行任务,主 goroutine 需要等待子 goroutine 都完成后才继续执行。

这就是我们使用 WaitGroup 编排这类任务的常用方式。而“这类任务”指的就是,需要启动多个 goroutine 执行任务,主 goroutine 需要等待子 goroutine 都完成后才继续执行。

熟悉了 WaitGroup 的基本用法后,我们再看看它具体是如何实现的吧。


WaitGroup 的实现

首先,我们看看 WaitGroup 的数据结构。它包括了一个 noCopy 的辅助字段,一个 state1 记录 WaitGroup 状态的数组。

  • noCopy 的辅助字段,主要就是辅助 vet 工具检查是否通过 copy 赋值这个 WaitGroup 实例。本文会在后面和你详细分析这个字段;
  • state1,一个具有复合意义的字段,包含 WaitGroup 的计数、阻塞在检查点的 waiter 数和信号量。

WaitGroup 的数据结构定义以及 state 信息的获取方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type WaitGroup struct {
// 避免复制使用的一个技巧,可以告诉vet工具违反了复制使用的规则
noCopy noCopy
// 64bit(8bytes)的值分成两段,高32bit是计数值,低32bit是waiter的计数
// 另外32bit是用作信号量的
// 因为64bit值的原子操作需要64bit对齐,但是32bit编译器不支持,所以数组中的元素在不同的架构中不一样,具体处理看下面的方法
// 总之,会找到对齐的那64bit作为state,其余的32bit做信号量
state1 [3]uint32
}


// 得到state的地址和信号量的地址
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// 如果地址是64bit对齐的,数组前两个元素做state,后一个元素做信号量
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
// 如果地址是32bit对齐的,数组后两个元素用来做state,它可以用来做64bit的原子操作,第一个元素32bit用来做信号量
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}

因为对 64 位整数的原子操作要求整数的地址是 64 位对齐的,所以针对 64 位和 32 位环境的 state 字段的组成是不一样的。

在 64 位环境下,state1 的第一个元素是 waiter 数,第二个元素是 WaitGroup 的计数值,第三个元素是信号量。

在 32 位环境下,如果 state1 不是 64 位对齐的地址,那么 state1 的第一个元素是信号量,后两个元素分别是 waiter 数和计数值。

然后,我们继续深入源码,看一下 Add、Done 和 Wait 这三个方法的实现。

在查看这部分源码实现时,我们会发现,除了这些方法本身的实现外,还会有一些额外的代码,主要是 race 检查和异常检查的代码。其中,有几个检查非常关键,如果检查不通过,会出现 panic,这部分内容会在下一小节分析 WaitGroup 的错误使用场景时介绍。现在,我们先专注在 Add、Wait 和 Done 本身的实现代码上。

先梳理下 Add 方法的逻辑。Add 方法主要操作的是 state 的计数部分。你可以为计数值增加一个 delta 值,内部通过原子操作把这个值加到计数值上。需要注意的是,这个 delta 也可以是个负数,相当于为计数值减去一个值,Done 方法内部其实就是通过 Add(-1) 实现的。

它的实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
// 高32bit是计数值v,所以把delta左移32,增加到计数上
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32) // 当前计数值
w := uint32(state) // waiter count

if v > 0 || w == 0 {
return
}

// 如果计数值v为0并且waiter的数量w不为0,那么state的值就是waiter的数量
// 将waiter的数量设置为0,因为计数值v也是0,所以它们俩的组合*statep直接设置为0即可。此时需要并唤醒所有的waiter
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}


// Done方法实际就是计数器减1
func (wg *WaitGroup) Done() {
wg.Add(-1)
}

Wait 方法的实现逻辑是:不断检查 state 的值。如果其中的计数值变为了 0,那么说明所有的任务已完成,调用者不必再等待,直接返回。如果计数值大于 0,说明此时还有任务没完成,那么调用者就变成了等待者,需要加入 waiter 队列,并且阻塞住自己。

其主干实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()

for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32) // 当前计数值
w := uint32(state) // waiter的数量
if v == 0 {
// 如果计数值为0, 调用这个方法的goroutine不必再等待,继续执行它后面的逻辑即可
return
}
// 否则把waiter数量加1。期间可能有并发调用Wait的情况,所以最外层使用了一个for循环
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 阻塞休眠等待
runtime_Semacquire(semap)
// 被唤醒,不再阻塞,返回
return
}
}
}

使用 WaitGroup 时的常见错误

在分析 WaitGroup 的 Add、Done 和 Wait 方法的实现的时候,为避免干扰,删除了异常检查的代码。但是,这些异常检查非常有用。

我们在开发的时候,经常会遇见或看到误用 WaitGroup 的场景,究其原因就是没有弄明白这些检查的逻辑。所以接下来,我们就通过几个小例子,一起学习下在开发时绝对要避免的 3 个问题。

常见问题一:计数器设置为负值

WaitGroup 的计数器的值必须大于等于 0。我们在更改这个计数值的时候,WaitGroup 会先做检查,如果计数值被设置为负数,就会导致 panic。 一般情况下,有两种方法会导致计数器设置为负数。

第一种方法是:调用 Add 的时候传递一个负数。如果你能保证当前的计数器加上这个负数后还是大于等于 0 的话,也没有问题,否则就会导致 panic。

比如下面这段代码,计数器的初始值为 10,当第一次传入 -10 的时候,计数值被设置为 0,不会有啥问题。但是,再紧接着传入 -1 以后,计数值就被设置为负数了,程序就会出现 panic。

1
2
3
4
5
6
7
8
func main() {
var wg sync.WaitGroup
wg.Add(10)

wg.Add(-10)//将-10作为参数调用Add,计数值被设置为0

wg.Add(-1)//将-1作为参数调用Add,如果加上-1计数值就会变为负数。这是不对的,所以会触发panic
}

第二个方法是:调用 Done 方法的次数过多,超过了 WaitGroup 的计数值。

使用 WaitGroup 的正确姿势是,预先确定好 WaitGroup 的计数值,然后调用相同次数的 Done 完成相应的任务。 比如,在 WaitGroup 变量声明之后,就立即设置它的计数值,或者在 goroutine 启动之前增加 1,然后在 goroutine 中调用 Done。

如果你没有遵循这些规则,就很可能会导致 Done 方法调用的次数和计数值不一致,进而造成死锁(Done 调用次数比计数值少)或者 panic(Done 调用次数比计数值多)。

比如下面这个例子中,多调用了一次 Done 方法后,会导致计数值为负,所以程序运行到这一行会出现 panic。

1
2
3
4
5
6
7
8
func main() {
var wg sync.WaitGroup
wg.Add(1)

wg.Done()

wg.Done()
}

常见问题二:不期望的 Add 时机

在使用 WaitGroup 的时候,你一定要遵循的原则就是,等所有的 Add 方法调用之后再调用 Wait,否则就可能导致 panic 或者不期望的结果。

我们构造这样一个场景:只有部分的 Add/Done 执行完后,Wait 就返回。我们看一个例子:启动四个 goroutine,每个 goroutine 内部调用 Add(1) 然后调用 Done(),主 goroutine 调用 Wait 等待任务完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func main() {
var wg sync.WaitGroup
go dosomething(100, &wg) // 启动第一个goroutine
go dosomething(110, &wg) // 启动第二个goroutine
go dosomething(120, &wg) // 启动第三个goroutine
go dosomething(130, &wg) // 启动第四个goroutine

wg.Wait() // 主goroutine等待完成
fmt.Println("Done")
}

func dosomething(millisecs time.Duration, wg *sync.WaitGroup) {
duration := millisecs * time.Millisecond
time.Sleep(duration) // 故意sleep一段时间

wg.Add(1)
fmt.Println("后台执行, duration:", duration)
wg.Done()
}

在这个例子中,我们原本设想的是,等四个 goroutine 都执行完毕后输出 Done 的信息,但是它的错误之处在于,将 WaitGroup.Add 方法的调用放在了子 gorotuine 中。等主 goorutine 调用 Wait 的时候,因为四个任务 goroutine 一开始都休眠,所以可能 WaitGroup 的 Add 方法还没有被调用,WaitGroup 的计数还是 0,所以它并没有等待四个子 goroutine 执行完毕才继续执行,而是立刻执行了下一步。

导致这个错误的原因是,没有遵循先完成所有的 Add 之后才 Wait。要解决这个问题,一个方法是,预先设置计数值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func main() {
var wg sync.WaitGroup
wg.Add(4) // 预先设定WaitGroup的计数值

go dosomething(100, &wg) // 启动第一个goroutine
go dosomething(110, &wg) // 启动第二个goroutine
go dosomething(120, &wg) // 启动第三个goroutine
go dosomething(130, &wg) // 启动第四个goroutine

wg.Wait() // 主goroutine等待
fmt.Println("Done")
}

func dosomething(millisecs time.Duration, wg *sync.WaitGroup) {
duration := millisecs * time.Millisecond
time.Sleep(duration)

fmt.Println("后台执行, duration:", duration)
wg.Done()
}

另一种方法是在启动子 goroutine 之前才调用 Add:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
var wg sync.WaitGroup

dosomething(100, &wg) // 调用方法,把计数值加1,并启动任务goroutine
dosomething(110, &wg) // 调用方法,把计数值加1,并启动任务goroutine
dosomething(120, &wg) // 调用方法,把计数值加1,并启动任务goroutine
dosomething(130, &wg) // 调用方法,把计数值加1,并启动任务goroutine

wg.Wait() // 主goroutine等待,代码逻辑保证了四次Add(1)都已经执行完了
fmt.Println("Done")
}

func dosomething(millisecs time.Duration, wg *sync.WaitGroup) {
wg.Add(1) // 计数值加1,再启动goroutine

go func() {
duration := millisecs * time.Millisecond
time.Sleep(duration)
fmt.Println("后台执行, duration:", duration)
wg.Done()
}()
}

可见,无论是怎么修复,都要保证所有的 Add 方法是在 Wait 方法之前被调用的。

常见问题三:前一个 Wait 还没结束就重用 WaitGroup

“前一个 Wait 还没结束就重用 WaitGroup”这一点似乎不太好理解,现在借用田径比赛的例子和你解释下吧。在田径比赛的百米小组赛中,需要把选手分成几组,一组选手比赛完之后,就可以进行下一组了。为了确保两组比赛时间上没有冲突,我们在模型化这个场景的时候,可以使用 WaitGroup。

WaitGroup 等一组比赛的所有选手都跑完后 5 分钟,才开始下一组比赛。下一组比赛还可以使用这个 WaitGroup 来控制,因为 WaitGroup 是可以重用的。只要 WaitGroup 的计数值恢复到零值的状态,那么它就可以被看作是新创建的 WaitGroup,被重复使用。

但是,如果我们在 WaitGroup 的计数值还没有恢复到零值的时候就重用,就会导致程序 panic。我们看一个例子,初始设置 WaitGroup 的计数值为 1,启动一个 goroutine 先调用 Done 方法,接着就调用 Add 方法,Add 方法有可能和主 goroutine 并发执行。

1
2
3
4
5
6
7
8
9
10
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
time.Sleep(time.Millisecond)
wg.Done() // 计数器减1
wg.Add(1) // 计数值加1
}()
wg.Wait() // 主goroutine等待,有可能和第7行并发执行
}

在这个例子中,第 6 行虽然让 WaitGroup 的计数恢复到 0,但是因为第 9 行有个 waiter 在等待,如果等待 Wait 的 goroutine,刚被唤醒就和 Add 调用(第 7 行)有并发执行的冲突,所以就会出现 panic。

总结一下:WaitGroup 虽然可以重用,但是是有一个前提的,那就是必须等到上一轮的 Wait 完成之后,才能重用 WaitGroup 执行下一轮的 Add/Wait,如果你在 Wait 还没执行完的时候就调用下一轮 Add 方法,就有可能出现 panic。


noCopy:辅助 vet 检查

我们刚刚在学习 WaitGroup 的数据结构时,提到了里面有一个 noCopy 字段。你还记得它的作用吗?其实,它就是指示 vet 工具在做检查的时候,这个数据结构不能做值复制使用。更严谨地说,是不能在第一次使用之后复制使用 ( must not be copied after first use)。

你可能会说了,为什么要把 noCopy 字段单独拿出来讲呢?一方面,把 noCopy 字段穿插到 waitgroup 代码中讲解,容易干扰我们对 WaitGroup 整体的理解。另一方面,也是非常重要的原因,noCopy 是一个通用的计数技术,其他并发原语中也会用到,所以单独介绍有助于你以后在实践中使用这个技术。

我们在 2.3小节 学习 Mutex 的时候用到了 vet 工具。vet 会对实现 Locker 接口的数据类型做静态检查,一旦代码中有复制使用这种数据类型的情况,就会发出警告。但是,WaitGroup 同步原语不就是 Add、Done 和 Wait 方法吗?vet 能检查出来吗?

其实是可以的。通过给 WaitGroup 添加一个 noCopy 字段,我们就可以为 WaitGroup 实现 Locker 接口,这样 vet 工具就可以做复制检查了。而且因为 noCopy 字段是未输出类型,所以 WaitGroup 不会暴露 Lock/Unlock 方法。

noCopy 字段的类型是 noCopy,它只是一个辅助的、用来帮助 vet 检查用的类型:

1
2
3
4
5
type noCopy struct{}

// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}

如果你想要自己定义的数据结构不被复制使用,或者说,不能通过 vet 工具检查出复制使用的报警,就可以通过嵌入 noCopy 这个数据类型来实现。


流行的 Go 开发项目中的坑

有网友在 Go 的issue 28123中提了以下的例子,你能发现这段代码有什么问题吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type TestStruct struct {
Wait sync.WaitGroup
}

func main() {
w := sync.WaitGroup{}
w.Add(1)
t := &TestStruct{
Wait: w,
}

t.Wait.Done()
fmt.Println("Finished")
}

这段代码最大的一个问题,就是第 9 行 copy 了 WaitGroup 的实例 w。虽然这段代码能执行成功,但确实是违反了 WaitGroup 使用之后不要复制的规则。在项目中,我们可以通过 vet 工具检查出这样的错误。

Docker issue 28161issue 27011 ,都是因为在重用 WaitGroup 的时候,没等前一次的 Wait 结束就 Add 导致的错误。Etcd issue 6534 也是重用 WaitGroup 的 Bug,没有等前一个 Wait 结束就 Add。

Kubernetes issue 59574 的 Bug 是忘记 Wait 之前增加计数了,这就属于我们通常认为几乎不可能出现的 Bug。

即使是开发 Go 语言的开发者自己,在使用 WaitGroup 的时候,也可能会犯错。比如 issue 12813,因为 defer 的使用,Add 方法可能在 Done 之后才执行,导致计数负值的 panic。


总结

学完这一讲,我们知道了使用 WaitGroup 容易犯的错,是不是有些手脚被束缚的感觉呢?其实大可不必,只要我们不是特别复杂地使用 WaitGroup,就不用有啥心理负担。

而关于如何避免错误使用 WaitGroup 的情况,我们只需要尽量保证下面 5 点就可以了:

  • 不重用 WaitGroup。新建一个 WaitGroup 不会带来多大的资源开销,重用反而更容易出错。
  • 保证所有的 Add 方法调用都在 Wait 之前。
  • 不传递负数给 Add 方法,只通过 Done 来给计数值减 1。
  • 不做多余的 Done 方法调用,保证 Add 的计数值和 Done 方法调用的数量是一样的。
  • 不遗漏 Done 方法的调用,否则会导致 Wait hang 住无法返回。




2.7 Cond:条件变量的实现机制及避坑指南

在 Java 面试中,经常被问到的一个知识点就是等待 / 通知(wait/notify)机制。面试官经常会这样考察候选人:请实现一个限定容量的队列(queue),当队列满或者空的时候,利用等待 / 通知机制实现阻塞或者唤醒。

在 Go 中,也可以实现一个类似的限定容量的队列,而且实现起来也比较简单,只要用条件变量(Cond)并发原语就可以。Cond 并发原语相对来说不是那么常用,但是在特定的场景使用会事半功倍,比如你需要在唤醒一个或者所有的等待者做一些检查操作的时候。
那么今天这一讲,我们就学习下 Cond 这个并发原语。


Go 标准库的 Cond

Go 标准库提供 Cond 原语的目的是,为等待 / 通知场景下的并发问题提供支持。Cond 通常应用于等待某个条件的一组 goroutine,等条件变为 true 的时候,其中一个 goroutine 或者所有的 goroutine 都会被唤醒执行。

顾名思义,Cond 是和某个条件相关,这个条件需要一组 goroutine 协作共同完成,在条件还没有满足的时候,所有等待这个条件的 goroutine 都会被阻塞住,只有这一组 goroutine 通过协作达到了这个条件,等待的 goroutine 才可能继续进行下去。

那这里等待的条件是什么呢?等待的条件,可以是某个变量达到了某个阈值或者某个时间点,也可以是一组变量分别都达到了某个阈值,还可以是某个对象的状态满足了特定的条件。总结来讲,等待的条件是一种可以用来计算结果是 true 还是 false 的条件。

从开发实践上,我们真正使用 Cond 的场景比较少,因为一旦遇到需要使用 Cond 的场景,我们更多地会使用 Channel 的方式(我们会在第 12 和第 13 讲展开 Channel 的用法)去实现,因为那才是更地道的 Go 语言的写法,甚至 Go 的开发者有个“把 Cond 从标准库移除”的提议(issue 21165)。而有的开发者认为,Cond 是唯一难以掌握的 Go 并发原语。至于其中原因,先卖个关子,到这一讲的后半部分再解释。

今天,这一讲我们就带你仔细地学一学 Cond 这个并发原语吧。


Cond 的基本用法

标准库中的 Cond 并发原语初始化的时候,需要关联一个 Locker 接口的实例,一般我们使用 Mutex 或者 RWMutex。

我们看一下 Cond 的实现:

1
2
3
4
5
type Cond
func NeWCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()

首先,Cond 关联的 Locker 实例可以通过 c.L 访问,它内部维护着一个先入先出的等待队列。

然后,我们分别看下它的三个方法 Broadcast、Signal 和 Wait 方法。

Signal 方法,允许调用者 Caller 唤醒一个等待此 Cond 的 goroutine。如果此时没有等待的 goroutine,显然无需通知 waiter;如果 Cond 等待队列中有一个或者多个等待的 goroutine,则需要从等待队列中移除第一个 goroutine 并把它唤醒。在其他编程语言中,比如 Java 语言中,Signal 方法也被叫做 notify 方法。

调用 Signal 方法时,不强求你一定要持有 c.L 的锁。

Broadcast 方法,允许调用者 Caller 唤醒所有等待此 Cond 的 goroutine。如果此时没有等待的 goroutine,显然无需通知 waiter;如果 Cond 等待队列中有一个或者多个等待的 goroutine,则清空所有等待的 goroutine,并全部唤醒。在其他编程语言中,比如 Java 语言中,Broadcast 方法也被叫做 notifyAll 方法。

同样地,调用 Broadcast 方法时,也不强求你一定持有 c.L 的锁。

Wait 方法,会把调用者 Caller 放入 Cond 的等待队列中并阻塞,直到被 Signal 或者 Broadcast 的方法从等待队列中移除并唤醒。

调用 Wait 方法时必须要持有 c.L 的锁。

Go 实现的 sync.Cond 的方法名是 Wait、Signal 和 Broadcast,这是计算机科学中条件变量的通用方法名。比如,C 语言中对应的方法名是 pthread_cond_wait、pthread_cond_signal 和 pthread_cond_broadcast。

知道了 Cond 提供的三个方法后,我们再通过一个百米赛跑开始时的例子,来学习下 Cond 的使用方法。10 个运动员进入赛场之后需要先做拉伸活动活动筋骨,向观众和粉丝招手致敬,在自己的赛道上做好准备;等所有的运动员都准备好之后,裁判员才会打响发令枪。

每个运动员做好准备之后,将 ready 加一,表明自己做好准备了,同时调用 Broadcast 方法通知裁判员。因为裁判员只有一个,所以这里可以直接替换成 Signal 方法调用。调用 Broadcast 方法的时候,我们并没有请求 c.L 锁,只是在更改等待变量的时候才使用到了锁。

裁判员会等待运动员都准备好(第 22 行)。虽然每个运动员准备好之后都唤醒了裁判员,但是裁判员被唤醒之后需要检查等待条件是否满足(运动员都准备好了)。可以看到,裁判员被唤醒之后一定要检查等待条件,如果条件不满足还是要继续等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func main() {
c := sync.NewCond(&sync.Mutex{})
var ready int

for i := 0; i < 10; i++ {
go func(i int) {
time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)

// 加锁更改等待条件
c.L.Lock()
ready++
c.L.Unlock()

log.Printf("运动员#%d 已准备就绪\n", i)
// 广播唤醒所有的等待者
c.Broadcast()
}(i)
}

c.L.Lock()
for ready != 10 {
c.Wait()
log.Println("裁判员被唤醒一次")
}
c.L.Unlock()

//所有的运动员是否就绪
log.Println("所有运动员都准备就绪。比赛开始,3,2,1, ......")
}

你看,Cond 的使用其实没那么简单。它的复杂在于:一,这段代码有时候需要加锁,有时候可以不加;二,Wait 唤醒后需要检查条件;三,条件变量的更改,其实是需要原子操作或者互斥锁保护的。所以,有的开发者会认为,Cond 是唯一难以掌握的 Go 并发原语。


Cond 的实现原理
其实,Cond 的实现非常简单,或者说复杂的逻辑已经被 Locker 或者 runtime 的等待队列实现了。我们直接看看 Cond 的源码吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type Cond struct {
noCopy noCopy

// 当观察或者修改等待条件的时候需要加锁
L Locker

// 等待队列
notify notifyList
checker copyChecker
}

func NewCond(l Locker) *Cond {
return &Cond{L: l}
}

func (c *Cond) Wait() {
c.checker.check()
// 增加到等待队列中
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
// 阻塞休眠直到被唤醒
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}

func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}

这部分源码确实很简单,以下是其中比较关键的逻辑。

runtime_notifyListXXX 是运行时实现的方法,实现了一个等待 / 通知的队列。如果你想深入学习这部分,可以再去看看 runtime/sema.go 代码中。

copyChecker 是一个辅助结构,可以在运行时检查 Cond 是否被复制使用。

Signal 和 Broadcast 只涉及到 notifyList 数据结构,不涉及到锁。

Wait 把调用者加入到等待队列时会释放锁,在被唤醒之后还会请求锁。在阻塞休眠期间,调用者是不持有锁的,这样能让其他 goroutine 有机会检查或者更新等待变量。

我们继续看看使用 Cond 常见的两个错误,一个是调用 Wait 的时候没有加锁,另一个是没有检查条件是否满足程序就继续执行了。


使用 Cond 的 2 个常见错误

我们先看 Cond 最常见的使用错误,也就是调用 Wait 的时候没有加锁

以前面百米赛跑的程序为例,在调用 cond.Wait 时,把前后的 Lock/Unlock 注释掉,如下面的代码中的第 20 行和第 25 行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func main() {
c := sync.NewCond(&sync.Mutex{})
var ready int

for i := 0; i < 10; i++ {
go func(i int) {
time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)

// 加锁更改等待条件
c.L.Lock()
ready++
c.L.Unlock()

log.Printf("运动员#%d 已准备就绪\n", i)
// 广播唤醒所有的等待者
c.Broadcast()
}(i)
}

// c.L.Lock()
for ready != 10 {
c.Wait()
log.Println("裁判员被唤醒一次")
}
// c.L.Unlock()

//所有的运动员是否就绪
log.Println("所有运动员都准备就绪。比赛开始,3,2,1, ......")
}

再运行程序,就会报释放未加锁的 panic:

出现这个问题的原因在于,cond.Wait 方法的实现是,把当前调用者加入到 notify 队列之中后会释放锁(如果不释放锁,其他 Wait 的调用者就没有机会加入到 notify 队列中了),然后一直等待;等调用者被唤醒之后,又会去争抢这把锁。如果调用 Wait 之前不加锁的话,就有可能 Unlock 一个未加锁的 Locker。所以切记,调用 cond.Wait 方法之前一定要加锁

使用 Cond 的另一个常见错误是,只调用了一次 Wait,没有检查等待条件是否满足,结果条件没满足,程序就继续执行了。出现这个问题的原因在于,误以为 Cond 的使用,就像 WaitGroup 那样调用一下 Wait 方法等待那么简单。比如下面的代码中,把第 21 行和第 24 行注释掉:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func main() {
c := sync.NewCond(&sync.Mutex{})
var ready int

for i := 0; i < 10; i++ {
go func(i int) {
time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)

// 加锁更改等待条件
c.L.Lock()
ready++
c.L.Unlock()

log.Printf("运动员#%d 已准备就绪\n", i)
// 广播唤醒所有的等待者
c.Broadcast()
}(i)
}

c.L.Lock()
// for ready != 10 {
c.Wait()
log.Println("裁判员被唤醒一次")
// }
c.L.Unlock()

//所有的运动员是否就绪
log.Println("所有运动员都准备就绪。比赛开始,3,2,1, ......")
}

运行这个程序,你会发现,可能只有几个运动员准备好之后程序就运行完了,而不是我们期望的所有运动员都准备好才进行下一步。原因在于,每一个运动员准备好之后都会唤醒所有的等待者,也就是这里的裁判员,比如第一个运动员准备好后就唤醒了裁判员,结果这个裁判员傻傻地没做任何检查,以为所有的运动员都准备好了,就继续执行了。

所以,我们一定要记住,waiter goroutine 被唤醒不等于等待条件被满足,只是有 goroutine 把它唤醒了而已,等待条件有可能已经满足了,也有可能不满足,我们需要进一步检查。你也可以理解为,等待者被唤醒,只是得到了一次检查的机会而已。

到这里,我们小结下。如果你想在使用 Cond 的时候避免犯错,只要时刻记住调用 cond.Wait 方法之前一定要加锁,以及 waiter goroutine 被唤醒不等于等待条件被满足这两个知识点。


知名项目中 Cond 的使用

Cond 在实际项目中被使用的机会比较少,原因总结起来有两个。

第一,同样的场景我们会使用其他的并发原语来替代。Go 特有的 Channel 类型,有一个应用很广泛的模式就是通知机制,这个模式使用起来也特别简单。所以很多情况下,我们会使用 Channel 而不是 Cond 实现 wait/notify 机制。

第二,对于简单的 wait/notify 场景,比如等待一组 goroutine 完成之后继续执行余下的代码,我们会使用 WaitGroup 来实现。因为 WaitGroup 的使用方法更简单,而且不容易出错。比如,上面百米赛跑的问题,就可以很方便地使用 WaitGroup 来实现。

所以,我在这一讲开头提到,Cond 的使用场景很少。先前的标准库内部有几个地方使用了 Cond,比如 io/pipe.go 等,后来都被其他的并发原语(比如 Channel)替换了,sync.Cond 的路越走越窄。但是,还是有一批忠实的“粉丝”坚持在使用 Cond,原因在于 Cond 有三点特性是 Channel 无法替代的:

  • Cond 和一个 Locker 关联,可以利用这个 Locker 对相关的依赖条件更改提供保护。

  • Cond 可以同时支持 Signal 和 Broadcast 方法,而 Channel 只能同时支持其中一种。

  • Cond 的 Broadcast 方法可以被重复调用。等待条件再次变成不满足的状态后,我们又可以调用 Broadcast 再次唤醒等待的 goroutine。这也是

    Channel 不能支持的,Channel 被 close 掉了之后不支持再 open。

开源项目中使用 sync.Cond 的代码少之又少,包括标准库原先一些使用 Cond 的代码也改成使用 Channel 实现了,所以别说找 Cond 相关的使用 Bug 了,想找到的一个使用的例子都不容易,我找了 Kubernetes 中的一个例子,我们一起看看它是如何使用 Cond 的。

Kubernetes 项目中定义了优先级队列 PriorityQueue 这样一个数据结构,用来实现 Pod 的调用。它内部有三个 Pod 的队列,即 activeQ、podBackoffQ 和 unschedulableQ,其中 activeQ 就是用来调度的活跃队列(heap)。

Pop 方法调用的时候,如果这个队列为空,并且这个队列没有 Close 的话,会调用 Cond 的 Wait 方法等待。

你可以看到,调用 Wait 方法的时候,调用者是持有锁的,并且被唤醒的时候检查等待条件(队列是否为空)。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 从队列中取出一个元素
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
p.lock.Lock()
defer p.lock.Unlock()
for p.activeQ.Len() == 0 { // 如果队列为空
if p.closed {
return nil, fmt.Errorf(queueClosed)
}
p.cond.Wait() // 等待,直到被唤醒
}
......
return pInfo, err
}

当 activeQ 增加新的元素时,会调用条件变量的 Boradcast 方法,通知被 Pop 阻塞的调用者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 增加元素到队列中
func (p *PriorityQueue) Add(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
pInfo := p.newQueuedPodInfo(pod)
if err := p.activeQ.Add(pInfo); err != nil {//增加元素到队列中
klog.Errorf("Error adding pod %v to the scheduling queue: %v", nsNameForPod(pod), err)
return err
}
......
p.cond.Broadcast() //通知其它等待的goroutine,队列中有元素了

return nil
}

这个优先级队列被关闭的时候,也会调用 Broadcast 方法,避免被 Pop 阻塞的调用者永远 hang 住。

1
2
3
4
5
6
7
func (p *PriorityQueue) Close() {
p.lock.Lock()
defer p.lock.Unlock()
close(p.stop)
p.closed = true
p.cond.Broadcast() //关闭时通知等待的goroutine,避免它们永远等待
}

你可以思考一下,这里为什么使用 Cond 这个并发原语,能不能换成 Channel 实现呢?


总结

好了,我们来做个总结。

Cond 是为等待 / 通知场景下的并发问题提供支持的。它提供了条件变量的三个基本方法 Signal、Broadcast 和 Wait,为并发的 goroutine 提供等待 / 通知机制。

在实践中,处理等待 / 通知的场景时,我们常常会使用 Channel 替换 Cond,因为 Channel 类型使用起来更简洁,而且不容易出错。但是对于需要重复调用 Broadcast 的场景,比如上面 Kubernetes 的例子,每次往队列中成功增加了元素后就需要调用 Broadcast 通知所有的等待者,使用 Cond 就再合适不过了。

使用 Cond 之所以容易出错,就是 Wait 调用需要加锁,以及被唤醒后一定要检查条件是否真的已经满足。你需要牢记这两点。

虽然我们讲到的百米赛跑的例子,也可以通过 WaitGroup 来实现,但是本质上 WaitGroup 和 Cond 是有区别的:WaitGroup 是主 goroutine 等待确定数量的子 goroutine 完成任务;而 Cond 是等待某个条件满足,这个条件的修改可以被任意多的 goroutine 更新,而且 Cond 的 Wait 不关心也不知道其他 goroutine 的数量,只关心等待条件。而且 Cond 还有单个通知的机制,也就是 Signal 方法。





2.8 Once:一个简约而不简单的并发原语

本节来讲一个简单的并发原语:Once。为什么要学习 Once 呢?Once 可以用来执行且仅仅执行一次动作,常常用于单例对象的初始化场景

那这节课,我们就从对单例对象进行初始化这件事儿说起。

初始化单例资源有很多方法,比如定义 package 级别的变量,这样程序在启动的时候就可以初始化:

1
2
3
4
5
package abc

import time

var startTime = time.Now()

或者在 init 函数中进行初始化:

1
2
3
4
5
6
7
package abc

var startTime time.Time

func init() {
startTime = time.Now()
}

又或者在 main 函数开始执行的时候,执行一个初始化的函数:

1
2
3
4
5
6
7
8
9
10
package abc

var startTime time.Tim

func initApp() {
startTime = time.Now()
}
func main() {
initApp()
}

这三种方法都是线程安全的,并且后两种方法还可以根据传入的参数实现定制化的初始化操作。

但是很多时候我们是要延迟进行初始化的,所以有时候单例资源的初始化,我们会使用下面的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"net"
"sync"
"time"
)

// 使用互斥锁保证线程(goroutine)安全
var connMu sync.Mutex
var conn net.Conn

func getConn() net.Conn {
connMu.Lock()
defer connMu.Unlock()

// 返回已创建好的连接
if conn != nil {
return conn
}

// 创建连接
conn, _ = net.DialTimeout("tcp", "baidu.com:80", 10*time.Second)
return conn
}

// 使用连接
func main() {
conn := getConn()
if conn == nil {
panic("conn is nil")
}
}

这种方式虽然实现起来简单,但是有性能问题。一旦连接创建好,每次请求的时候还是得竞争锁才能读取到这个连接,这是比较浪费资源的,因为连接如果创建好之后,其实就不需要锁的保护了。怎么办呢?

这个时候就可以使用这一讲要介绍的 Once 并发原语了。接下来会详细介绍 Once 的使用、实现和易错场景。


Once 的使用场景

sync.Once 只暴露了一个方法 Do,你可以多次调用 Do 方法,但是只有第一次调用 Do 方法时 f 参数才会执行,这里的 f 是一个无参数无返回值的函数

1
func (o *Once) Do(f func())

因为当且仅当第一次调用 Do 方法的时候参数 f 才会执行,即使第二次、第三次、第 n 次调用时 f 参数的值不一样,也不会被执行,比如下面的例子,虽然 f1 和 f2 是不同的函数,但是第二个函数 f2 就不会执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main


import (
"fmt"
"sync"
)

func main() {
var once sync.Once

// 第一个初始化函数
f1 := func() {
fmt.Println("in f1")
}
once.Do(f1) // 打印出 in f1

// 第二个初始化函数
f2 := func() {
fmt.Println("in f2")
}
once.Do(f2) // 无输出
}

因为这里的 f 参数是一个无参数无返回的函数,所以你可能会通过闭包的方式引用外面的参数,比如:

1
2
3
4
5
6
7
8
var addr = "baidu.com"

var conn net.Conn
var err error

once.Do(func() {
conn, err = net.Dial("tcp", addr)
})

而且在实际的使用中,绝大多数情况下,你会使用闭包的方式去初始化外部的一个资源。

你看,Once 的使用场景很明确,所以,在标准库内部实现中也常常能看到 Once 的身影。

比如标准库内部cache的实现上,就使用了 Once 初始化 Cache 资源,包括 defaultDir 值的获取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func Default() *Cache { // 获取默认的Cache
defaultOnce.Do(initDefaultCache) // 初始化cache
return defaultCache
}

// 定义一个全局的cache变量,使用Once初始化,所以也定义了一个Once变量
var (
defaultOnce sync.Once
defaultCache *Cache
)

func initDefaultCache() { //初始化cache,也就是Once.Do使用的f函数
......
defaultCache = c
}

// 其它一些Once初始化的变量,比如defaultDir
var (
defaultDirOnce sync.Once
defaultDir string
defaultDirErr error
)

还有一些测试的时候初始化测试的资源(export_windows_test):

1
2
3
4
5
6
// 测试window系统调用时区相关函数
func ForceAusFromTZIForTesting() {
ResetLocalOnceForTest()
// 使用Once执行一次初始化
localOnce.Do(func() { initLocalFromTZI(&aus) })
}

除此之外,还有保证只调用一次 copyenv 的 envOnce,strings 包下的 Replacer,time 包中的测试,Go 拉取库时的proxy,net.pipe,crc64,Regexp,……,数不胜数。本小节重点介绍一下很值得我们学习的 math/big/sqrt.go 中实现的一个数据结构,它通过 Once 封装了一个只初始化一次的值:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 值是3.0或者0.0的一个数据结构
var threeOnce struct {
sync.Once
v *Float
}

// 返回此数据结构的值,如果还没有初始化为3.0,则初始化
func three() *Float {
threeOnce.Do(func() { // 使用Once初始化
threeOnce.v = NewFloat(3.0)
})
return threeOnce.v
}

它将 sync.Once 和 *Float 封装成一个对象,提供了只初始化一次的值 v。 你看它的 three 方法的实现,虽然每次都调用 threeOnce.Do 方法,但是参数只会被调用一次。

当你使用 Once 的时候,你也可以尝试采用这种结构,将值和 Once 封装成一个新的数据结构,提供只初始化一次的值。

总结一下 Once 并发原语解决的问题和使用场景:Once 常常用来初始化单例资源,或者并发访问只需初始化一次的共享资源,或者在测试的时候初始化一次测试资源


如何实现一个 Once?

很多人认为实现一个 Once 一样的并发原语很简单,只需使用一个 flag 标记是否初始化过即可,最多是用 atomic 原子操作这个 flag,比如下面的实现:

1
2
3
4
5
6
7
8
9
10
type Once struct {
done uint32
}

func (o *Once) Do(f func()) {
if !atomic.CompareAndSwapUint32(&o.done, 0, 1) {
return
}
f()
}

这确实是一种实现方式,但是,这个实现有一个很大的问题,就是如果参数 f 执行很慢的话,后续调用 Do 方法的 goroutine 虽然看到 done 已经设置为执行过了,但是获取某些初始化资源的时候可能会得到空的资源,因为 f 还没有执行完。

所以,一个正确的 Once 实现要使用一个互斥锁,这样初始化的时候如果有并发的 goroutine,就会进入 doSlow 方法。互斥锁的机制保证只有一个 goroutine 进行初始化,同时利用双检查的机制(double-checking),再次判断 o.done 是否为 0,如果为 0,则是第一次执行,执行完毕后,就将 o.done 设置为 1,然后释放锁。

即使此时有多个 goroutine 同时进入了 doSlow 方法,因为双检查的机制,后续的 goroutine 会看到 o.done 的值为 1,也不会再次执行 f。

这样既保证了并发的 goroutine 会等待 f 完成,而且还不会多次执行 f。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type Once struct {
done uint32
m Mutex
}

func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}

func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
// 双检查
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}

好了,到这里我们就了解了 Once 的使用场景,很明确,同时呢,也感受到 Once 的实现也是相对简单的。在实践中,其实很少会出现错误使用 Once 的情况,但是就像墨菲定律说的,凡是可能出错的事就一定会出错。使用 Once 也有可能出现两种错误场景,尽管非常罕见。这里提前讲给你,咱打个预防针。


使用 Once 可能出现的 2 种错误

第一种错误:死锁

你已经知道了 Do 方法会执行一次 f,但是如果 f 中再次调用这个 Once 的 Do 方法的话,就会导致死锁的情况出现。这还不是无限递归的情况,而是的的确确的 Lock 的递归调用导致的死锁。

1
2
3
4
5
6
7
8
func main() {
var once sync.Once
once.Do(func() {
once.Do(func() {
fmt.Println("初始化")
})
})
}

当然,想要避免这种情况的出现,就不要在 f 参数中调用当前的这个 Once,不管是直接的还是间接的。

第二种错误:未初始化

如果 f 方法执行的时候 panic,或者 f 执行初始化资源的时候失败了,这个时候,Once 还是会认为初次执行已经成功了,即使再次调用 Do 方法,也不会再次执行 f。

比如下面的例子,由于一些防火墙的原因,googleConn 并没有被正确的初始化,后面如果想当然认为既然执行了 Do 方法 googleConn 就已经初始化的话,会抛出空指针的错误:

1
2
3
4
5
6
7
8
9
10
11
12
func main() {
var once sync.Once
var googleConn net.Conn // 到Google网站的一个连接

once.Do(func() {
// 建立到google.com的连接,有可能因为网络的原因,googleConn并没有建立成功,此时它的值为nil
googleConn, _ = net.Dial("tcp", "google.com:80")
})
// 发送http请求
googleConn.Write([]byte("GET / HTTP/1.1\r\nHost: google.com\r\n Accept: */*\r\n\r\n"))
io.Copy(os.Stdout, googleConn)
}

既然执行过 Once.Do 方法也可能因为函数执行失败的原因未初始化资源,并且以后也没机会再次初始化资源,那么这种初始化未完成的问题该怎么解决呢?

这里来介绍一招独家秘笈,我们可以自己实现一个类似 Once 的并发原语,既可以返回当前调用 Do 方法是否正确完成,还可以在初始化失败后调用 Do 方法再次尝试初始化,直到初始化成功才不再初始化了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 一个功能更加强大的Once
type Once struct {
m sync.Mutex
done uint32
}
// 传入的函数f有返回值error,如果初始化失败,需要返回失败的error
// Do方法会把这个error返回给调用者
func (o *Once) Do(f func() error) error {
if atomic.LoadUint32(&o.done) == 1 { //fast path
return nil
}
return o.slowDo(f)
}
// 如果还没有初始化
func (o *Once) slowDo(f func() error) error {
o.m.Lock()
defer o.m.Unlock()
var err error
if o.done == 0 { // 双检查,还没有初始化
err = f()
if err == nil { // 初始化成功才将标记置为已初始化
atomic.StoreUint32(&o.done, 1)
}
}
return err
}

我们所做的改变就是 Do 方法和参数 f 函数都会返回 error,如果 f 执行失败,会把这个错误信息返回。

对 slowDo 方法也做了调整,如果 f 调用失败,我们不会更改 done 字段的值,这样后续的 goroutine 还会继续调用 f。如果 f 执行成功,才会修改 done 的值为 1。

可以说,真是一顿操作猛如虎,我们使用 Once 有点得心应手的感觉了。等等,还有个问题,我们怎么查询是否初始化过呢?

目前的 Once 实现可以保证你调用任意次数的 once.Do 方法,它只会执行这个方法一次。但是,有时候我们需要打一个标记。如果初始化后我们就去执行其它的操作,标准库的 Once 并不会告诉你是否初始化完成了,只是让你放心大胆地去执行 Do 方法,所以,你还需要一个辅助变量,自己去检查是否初始化过了,比如通过下面的代码中的 inited 字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
type AnimalStore struct {once   sync.Once;inited uint32}
func (a *AnimalStore) Init() // 可以被并发调用
a.once.Do(func() {
longOperationSetupDbOpenFilesQueuesEtc()
atomic.StoreUint32(&a.inited, 1)
})
}
func (a *AnimalStore) CountOfCats() (int, error) { // 另外一个goroutine
if atomic.LoadUint32(&a.inited) == 0 { // 初始化后才会执行真正的业务逻辑
return 0, NotYetInitedError
}
//Real operation
}

当然,通过这段代码,我们可以解决这类问题,但是,如果官方的 Once 类型有 Done 这样一个方法的话,我们就可以直接使用了。这是有人在 Go 代码库中提出的一个 issue(#41690)。对于这类问题,一般都会被建议采用其它类型,或者自己去扩展。我们可以尝试扩展这个并发原语:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Once 是一个扩展的sync.Once类型,提供了一个Done方法
type Once struct {
sync.Once
}

// Done 返回此Once是否执行过
// 如果执行过则返回true
// 如果没有执行过或者正在执行,返回false
func (o *Once) Done() bool {
return atomic.LoadUint32((*uint32)(unsafe.Pointer(&o.Once))) == 1
}

func main() {
var flag Once
fmt.Println(flag.Done()) //false

flag.Do(func() {
time.Sleep(time.Second)
})

fmt.Println(flag.Done()) //true
}

好了,到这里关于并发原语 Once 的内容讲得就差不多了。最后呢,和你分享一个 Once 的踩坑案例。

其实啊,使用 Once 真的不容易犯错,想犯错都很困难,因为很少有人会傻傻地在初始化函数 f 中递归调用 f,这种死锁的现象几乎不会发生。另外如果函数初始化不成功,我们一般会 panic,或者在使用的时候做检查,会及早发现这个问题,在初始化函数中加强代码。

所以查看大部分的 Go 项目,几乎找不到 Once 的错误使用场景,不过原作者还是发现了一个。这个 issue 先从另外一个需求 (go#25955) 谈起。

Once 的踩坑案例

go#25955 有网友提出一个需求,希望 Once 提供一个 Reset 方法,能够将 Once 重置为初始化的状态。比如下面的例子,St 通过两个 Once 控制它的 Open/Close 状态。但是在 Close 之后再调用 Open 的话,不会再执行 init 函数,因为 Once 只会执行一次初始化函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type St struct {
openOnce *sync.Once
closeOnce *sync.Once
}

func(st *St) Open(){
st.openOnce.Do(func() { ... }) // init
...
}

func(st *St) Close(){
st.closeOnce.Do(func() { ... }) // deinit
...
}

所以提交这个 Issue 的开发者希望 Once 增加一个 Reset 方法,Reset 之后再调用 once.Do 就又可以初始化了。

Go 的核心开发者 Ian Lance Taylor 给他了一个简单的解决方案。在这个例子中,只使用一个 ponce *sync.Once 做初始化,Reset 的时候给 ponce 这个变量赋值一个新的 Once 实例即可 (ponce = new(sync.Once))。Once 的本意就是执行一次,所以 Reset 破坏了这个并发原语的本意。

这个解决方案一点都没问题,可以很好地解决这位开发者的需求。Docker 较早的版本(1.11.2)中使用了它们的一个网络库 libnetwork,这个网络库在使用 Once 的时候就使用 Ian Lance Taylor 介绍的方法,但是不幸的是,它的 Reset 方法中又改变了 Once 指针的值,导致程序 panic 了。原始逻辑比较复杂,一个简化版可重现的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
"fmt"
"sync"
"time"
)

// 一个组合的并发原语
type MuOnce struct {
sync.RWMutex
sync.Once
mtime time.Time
vals []string
}

// 相当于reset方法,会将m.Once重新复制一个Once
func (m *MuOnce) refresh() {
m.Lock()
defer m.Unlock()
m.Once = sync.Once{}
m.mtime = time.Now()
m.vals = []string{m.mtime.String()}
}

// 获取某个初始化的值,如果超过某个时间,会reset Once
func (m *MuOnce) strings() []string {
now := time.Now()
m.RLock()
if now.After(m.mtime) {
defer m.Do(m.refresh) // 使用refresh函数重新初始化
}
vals := m.vals
m.RUnlock()
return vals
}

func main() {
fmt.Println("Hello, playground")
m := new(MuOnce)
fmt.Println(m.strings())
fmt.Println(m.strings())
}

如果你执行这段代码就会 panic:

原因在于第 31 行执行 m.Once.Do 方法的时候,使用的是 m.Once 的指针,然后调用 m.refresh,在执行 m.refresh 的时候 Once 内部的 Mutex 首先会加锁(可以再翻看一下这一讲的 Once 的实现原理), 但是,在 refresh 中更改了 Once 指针的值之后,结果在执行完 refresh 释放锁的时候,释放的是一个刚初始化未加锁的 Mutex,所以就 panic 了。

如果你还不太明白,再给你简化成一个更简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main


import (
"sync"
)

type Once struct {
m sync.Mutex
}

func (o *Once) doSlow() {
o.m.Lock()
defer o.m.Unlock()

// 这里更新的o指针的值!!!!!!!, 会导致上一行Unlock出错
*o = Once{}
}

func main() {
var once Once
once.doSlow()
}

doSlow 方法就演示了这个错误。Ian Lance Taylor 介绍的 Reset 方法没有错误,但是你在使用的时候千万别再初始化函数中 Reset 这个 Once,否则势必会导致 Unlock 一个未加锁的 Mutex 的错误。

总的来说,这还是对 Once 的实现机制不熟悉,又进行复杂使用导致的错误。不过最新版的 libnetwork 相关的地方已经去掉了 Once 的使用了。所以,我们一起来看这个案例,主要目的还是想巩固一下对 Once 的理解。


总结

今天我们一起学习了 Once,我们常常使用它来实现单例模式。

单例是 23 种设计模式之一,也是常常引起争议的设计模式之一,甚至有人把它归为反模式。为什么说它是反模式呢,我拿标准库中的单例模式给你介绍下。

因为 Go 没有 immutable 类型,导致我们声明的全局变量都是可变的,别的地方或者第三方库可以随意更改这些变量。比如 package io 中定义了几个全局变量,比如 io.EOF:

1
var EOF = errors.New("EOF") 

因为它是一个 package 级别的变量,我们可以在程序中偷偷把它改了,这会导致一些依赖 io.EOF 这个变量做判断的代码出错。

1
io.EOF = errors.New("我们自己定义的EOF")

从我个人的角度来说,一些单例(全局变量)的确很方便,比如 Buffer 池或者连接池,所以有时候我们也不要谈虎色变。虽然有人把单例模式称之为反模式,但毕竟只能代表一部分开发者的观点,否则也不会把它列在 23 种设计模式中了。

如果你真的担心这个 package 级别的变量被人修改,你可以不把它们暴露出来,而是提供一个只读的 GetXXX 的方法,这样别人就不会进行修改了。

而且,Once 不只应用于单例模式,一些变量在也需要在使用的时候做延迟初始化,所以也是可以使用 Once 处理这些场景的。

总而言之,Once 的应用场景还是很广泛的。一旦你遇到只需要初始化一次的场景,首先想到的就应该是 Once 并发原语





2.9 map:如何实现线程安全的map类型?

哈希表(Hash Table)这个数据结构,我们已经非常熟悉了。它实现的就是 key-value 之间的映射关系,主要提供的方法包括 Add、Lookup、Delete 等。因为这种数据结构是一个基础的数据结构,每个 key 都会有一个唯一的索引值,通过索引可以很快地找到对应的值,所以使用哈希表进行数据的插入和读取都是很快的。Go 语言本身就内建了这样一个数据结构,也就是 map 数据类型

今天呢,我们就先来学习 Go 语言内建的这个 map 类型,了解它的基本使用方法和使用陷阱,然后再学习如何实现线程安全的 map 类型,最后还会给你介绍 Go 标准库中线程安全的 sync.Map 类型。学完了这节课,你可以学会几种可以并发访问的 map 类型。


map 的基本使用方法

Go 内建的 map 类型如下:

1
map[K]V

其中,key 类型的 K 必须是可比较的(comparable),也就是可以通过 == 和 != 操作符进行比较;value 的值和类型无所谓,可以是任意的类型,或者为 nil。

在 Go 语言中,bool、整数、浮点数、复数、字符串、指针、Channel、接口都是可比较的,包含可比较元素的 struct 和数组,这俩也是可比较的,而 slice、map、函数值都是不可比较的。

那么,上面这些可比较的数据类型都可以作为 map 的 key 吗?显然不是。通常情况下,我们会选择内建的基本类型,比如整数、字符串做 key 的类型,因为这样最方便。

这里有一点需要注意,如果使用 struct 类型做 key 其实是有坑的,因为如果 struct 的某个字段值修改了,查询 map 时无法获取它 add 进去的值,如下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type mapKey struct {
key int
}

func main() {
var m = make(map[mapKey]string)
var key = mapKey{10}


m[key] = "hello"
fmt.Printf("m[key]=%s\n", m[key])


// 修改key的字段的值后再次查询map,无法获取刚才add进去的值
key.key = 100
fmt.Printf("再次查询m[key]=%s\n", m[key])
}

那该怎么办呢?如果要使用 struct 作为 key,我们要保证 struct 对象在逻辑上是不可变的,这样才会保证 map 的逻辑没有问题。

以上就是选取 key 类型的注意点了。接下来,我们看一下使用 map[key]函数时需要注意的一个知识点。在 Go 中,map[key]函数返回结果可以是一个值,也可以是两个值,这是容易让人迷惑的地方。原因在于,如果获取一个不存在的 key 对应的值时,会返回零值。为了区分真正的零值和 key 不存在这两种情况,可以根据第二个返回值来区分,如下面的代码的第 6 行、第 7 行:

1
2
3
4
5
6
7
8
9
func main() {
var m = make(map[string]int)
m["a"] = 0
fmt.Printf("a=%d; b=%d\n", m["a"], m["b"])

av, aexisted := m["a"]
bv, bexisted := m["b"]
fmt.Printf("a=%d, existed: %t; b=%d, existed: %t\n", av, aexisted, bv, bexisted)
}

map 是无序的,所以当遍历一个 map 对象的时候,迭代的元素的顺序是不确定的,无法保证两次遍历的顺序是一样的,也不能保证和插入的顺序一致。那怎么办呢?如果我们想要按照 key 的顺序获取 map 的值,需要先取出所有的 key 进行排序,然后按照这个排序的 key 依次获取对应的值。而如果我们想要保证元素有序,比如按照元素插入的顺序进行遍历,可以使用辅助的数据结构,比如orderedmap,来记录插入顺序。

好了,总结下关于 map 我们需要掌握的内容:map 的类型是 map[key],key 类型的 K 必须是可比较的,通常情况下,我们会选择内建的基本类型,比如整数、字符串做 key 的类型。如果要使用 struct 作为 key,我们要保证 struct 对象在逻辑上是不可变的。在 Go 中,map[key]函数返回结果可以是一个值,也可以是两个值。map 是无序的,如果我们想要保证遍历 map 时元素有序,可以使用辅助的数据结构,比如orderedmap


使用 map 的 2 种常见错误

那接下来,我们来看使用 map 最常犯的两个错误,就是未初始化和并发读写。

常见错误一:未初始化

和 slice 或者 Mutex、RWmutex 等 struct 类型不同,map 对象必须在使用之前初始化。如果不初始化就直接赋值的话,会出现 panic 异常,比如下面的例子,m 实例还没有初始化就直接进行操作会导致 panic(第 3 行):

1
2
3
4
func main() {
var m map[int]int
m[100] = 100
}

解决办法就是在第 2 行初始化这个实例(m := make(map[int]int))。

从一个 nil 的 map 对象中获取值不会 panic,而是会得到零值,所以下面的代码不会报错:

1
2
3
4
func main() {
var m map[int]int
fmt.Println(m[100])
}

这个例子很简单,我们可以意识到 map 的初始化问题。但有时候 map 作为一个 struct 字段的时候,就很容易忘记初始化了。

1
2
3
4
5
6
7
8
9
10
11
12
13
type Counter struct {
Website string
Start time.Time
PageCounters map[string]int
}

func main() {
var c Counter
c.Website = "baidu.com"


c.PageCounters["/"]++
}

所以,关于初始化这一点,再强调一下,目前还没有工具可以检查,我们只能记住“别忘记初始化”这一条规则。

常见错误二:并发读写

对于 map 类型,另一个很容易犯的错误就是并发访问问题。这个易错点,相当令人讨厌,如果没有注意到并发问题,程序在运行的时候就有可能出现并发读写导致的 panic。

Go 内建的 map 对象不是线程(goroutine)安全的,并发读写的时候运行时会有检查,遇到并发问题就会导致 panic。

我们一起看一个并发访问 map 实例导致 panic 的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
var m = make(map[int]int,10) // 初始化一个map
go func() {
for {
m[1] = 1 //设置key
}
}()

go func() {
for {
_ = m[2] //访问这个map
}
}()
select {}
}

虽然这段代码看起来是读写 goroutine 各自操作不同的元素,貌似 map 也没有扩容的问题,但是运行时检测到同时对 map 对象有并发访问,就会直接 panic。panic 信息会告诉我们代码中哪一行有读写问题,根据这个错误信息你就能快速定位出来是哪一个 map 对象在哪里出的问题了。

这个错误非常常见,是几乎每个人都会踩到的坑。其实,不只是我们写代码时容易犯这个错,一些知名的项目中也是屡次出现这个问题,比如 Docker issue 40772,在删除 map 对象的元素时忘记了加锁.

Docker issue 40772,Docker issue 35588、34540、39643 等等,也都有并发读写 map 的问题。

除了 Docker 中,Kubernetes 的 issue 84431、72464、68647、64484、48045、45593、37560 等,以及 TiDB 的 issue 14960 和 17494 等,也出现了这个错误。

这么多人都会踩的坑,有啥解决方案吗?肯定有,那接下来,我们就继续来看如何解决内建 map 的并发读写问题。


加读写锁:扩展 map,支持并发读写

比较遗憾的是,目前 Go 还没有正式发布泛型特性,我们还不能实现一个通用的支持泛型的加锁 map。但是,将要发布的泛型方案已经可以验证测试了,离发布也不远了,也许发布之后 sync.Map 就支持泛型了。

当然了,如果没有泛型支持,我们也能解决这个问题。我们可以通过 interface{}来模拟泛型,但还是要涉及接口和具体类型的转换,比较复杂,还不如将要发布的泛型方案更直接、性能更好。

这里以一个具体的 map 类型为例,来演示利用读写锁实现线程安全的 map[int]int 类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
type RWMap struct { // 一个读写锁保护的线程安全的map
sync.RWMutex // 读写锁保护下面的map字段
m map[int]int
}
// 新建一个RWMap
func NewRWMap(n int) *RWMap {
return &RWMap{
m: make(map[int]int, n),
}
}
func (m *RWMap) Get(k int) (int, bool) { //从map中读取一个值
m.RLock()
defer m.RUnlock()
v, existed := m.m[k] // 在锁的保护下从map中读取
return v, existed
}

func (m *RWMap) Set(k int, v int) { // 设置一个键值对
m.Lock() // 锁保护
defer m.Unlock()
m.m[k] = v
}

func (m *RWMap) Delete(k int) { //删除一个键
m.Lock() // 锁保护
defer m.Unlock()
delete(m.m, k)
}

func (m *RWMap) Len() int { // map的长度
m.RLock() // 锁保护
defer m.RUnlock()
return len(m.m)
}

func (m *RWMap) Each(f func(k, v int) bool) { // 遍历map
m.RLock() //遍历期间一直持有读锁
defer m.RUnlock()

for k, v := range m.m {
if !f(k, v) {
return
}
}
}

正如这段代码所示,对 map 对象的操作,无非就是增删改查和遍历等几种常见操作。我们可以把这些操作分为读和写两类,其中,查询和遍历可以看做读操作,增加、修改和删除可以看做写操作。如例子所示,我们可以通过读写锁对相应的操作进行保护。


分片加锁:更高效的并发 map

虽然使用读写锁可以提供线程安全的 map,但是在大量并发读写的情况下,锁的竞争会非常激烈。在第 4 讲中提到过,锁是性能下降的万恶之源之一。

在并发编程中,我们的一条原则就是尽量减少锁的使用。一些单线程单进程的应用(比如 Redis 等),基本上不需要使用锁去解决并发线程访问的问题,所以可以取得很高的性能。但是对于 Go 开发的应用程序来说,并发是常用的一个特性,在这种情况下,我们能做的就是,尽量减少锁的粒度和锁的持有时间

你可以优化业务处理的代码,以此来减少锁的持有时间,比如将串行的操作变成并行的子任务执行。不过,这就是另外的故事了,今天我们还是主要讲对同步原语的优化,所以这里重点讲如何减少锁的粒度。

减少锁的粒度常用的方法就是分片(Shard),将一把锁分成几把锁,每个锁控制一个分片。Go 比较知名的分片并发 map 的实现是orcaman/concurrent-map

它默认采用 32 个分片,GetShard 是一个关键的方法,能够根据 key 计算出分片索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
var SHARD_COUNT = 32

// 分成SHARD_COUNT个分片的map
type ConcurrentMap []*ConcurrentMapShared

// 通过RWMutex保护的线程安全的分片,包含一个map
type ConcurrentMapShared struct {
items map[string]interface{}
sync.RWMutex // Read Write mutex, guards access to internal map.
}

// 创建并发map
func New() ConcurrentMap {
m := make(ConcurrentMap, SHARD_COUNT)
for i := 0; i < SHARD_COUNT; i++ {
m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
}
return m
}

// 根据key计算分片索引
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
return m[uint(fnv32(key))%uint(SHARD_COUNT)]
}

增加或者查询的时候,首先根据分片索引得到分片对象,然后对分片对象加锁进行操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (m ConcurrentMap) Set(key string, value interface{}) {
// 根据key计算出对应的分片
shard := m.GetShard(key)
shard.Lock() //对这个分片加锁,执行业务操作
shard.items[key] = value
shard.Unlock()
}

func (m ConcurrentMap) Get(key string) (interface{}, bool) {
// 根据key计算出对应的分片
shard := m.GetShard(key)
shard.RLock()
// 从这个分片读取key的值
val, ok := shard.items[key]
shard.RUnlock()
return val, ok
}

当然,除了 GetShard 方法,ConcurrentMap 还提供了很多其他的方法。这些方法都是通过计算相应的分片实现的,目的是保证把锁的粒度限制在分片上。

好了,到这里我们就学会了解决 map 并发 panic 的两个方法:加锁和分片。

在原作者个人使用并发 map 的过程中,加锁和分片加锁这两种方案都比较常用,如果是追求更高的性能,显然是分片加锁更好,因为它可以降低锁的粒度,进而提高访问此 map 对象的吞吐。如果并发性能要求不是那么高的场景,简单加锁方式更简单

接下来,继续介绍 sync.Map,这是 Go 官方线程安全 map 的标准实现。虽然是官方标准,反而是不常用的,为什么呢?一句话来说就是 map 要解决的场景很难描述,很多时候在做抉择时根本就不知道该不该用它。但是呢,确实有一些特定的场景,我们需要用到 sync.Map 来实现,所以还是很有必要学习这个知识点。具体什么场景呢,我慢慢给你道来。


应对特殊场景的 sync.Map

Go 内建的 map 类型不是线程安全的,所以 Go 1.9 中增加了一个线程安全的 map,也就是 sync.Map。但是,我们一定要记住,这个 sync.Map 并不是用来替换内建的 map 类型的,它只能被应用在一些特殊的场景里。

那这些特殊的场景是啥呢?官方的文档中指出,在以下两个场景中使用 sync.Map,会比使用 map+RWMutex 的方式,性能要好得多:

  • 只会增长的缓存系统中,一个 key 只写入一次而被读很多次;
  • 多个 goroutine 为不相交的键集读、写和重写键值对。

这两个场景说得都比较笼统,而且,这些场景中还包含了一些特殊的情况。所以,官方建议你针对自己的场景做性能评测,如果确实能够显著提高性能,再使用 sync.Map。

这么来看,我们能用到 sync.Map 的场景确实不多。即使是 sync.Map 的作者 Bryan C. Mills,也很少使用 sync.Map,即便是在使用 sync.Map 的时候,也是需要临时查询它的 API,才能清楚记住它的功能。所以,我们可以把 sync.Map 看成一个生产环境中很少使用的同步原语。


sync.Map 的实现

那 sync.Map 是怎么实现的呢?它是如何解决并发问题提升性能的呢?其实 sync.Map 的实现有几个优化点,这里先列出来,我们后面慢慢分析。

  • 空间换时间。通过冗余的两个数据结构(只读的 read 字段、可写的 dirty),来减少加锁对性能的影响。对只读字段(read)的操作不需要加锁。
  • 优先从 read 字段读取、更新、删除,因为对 read 字段的读取不需要锁。
  • 动态调整。miss 次数多了之后,将 dirty 数据提升为 read,避免总是从 dirty 中加锁读取。 double-checking。加锁之后先还要再检查 read 字段,确定真的不存在才操作 dirty 字段。
  • 延迟删除。删除一个键值只是打标记,只有在提升 dirty 字段为 read 字段的时候才清理删除的数据。

要理解 sync.Map 这些优化点,我们还是得深入到它的设计和实现上,去学习它的处理方式。

我们先看一下 map 的数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type Map struct {
mu Mutex
// 基本上你可以把它看成一个安全的只读的map
// 它包含的元素其实也是通过原子操作更新的,但是已删除的entry就需要加锁操作了
read atomic.Value // readOnly

// 包含需要加锁才能访问的元素
// 包括所有在read字段中但未被expunged(删除)的元素以及新加的元素
dirty map[interface{}]*entry

// 记录从read中读取miss的次数,一旦miss数和dirty长度一样了,就会把dirty提升为read,并把dirty置空
misses int
}

type readOnly struct {
m map[interface{}]*entry
amended bool // 当dirty中包含read没有的数据时为true,比如新增一条数据
}

// expunged是用来标识此项已经删掉的指针
// 当map中的一个项目被删除了,只是把它的值标记为expunged,以后才有机会真正删除此项
var expunged = unsafe.Pointer(new(interface{}))

// entry代表一个值
type entry struct {
p unsafe.Pointer // *interface{}
}

如果 dirty 字段非 nil 的话,map 的 read 字段和 dirty 字段会包含相同的非 expunged 的项,所以如果通过 read 字段更改了这个项的值,从 dirty 字段中也会读取到这个项的新值,因为本来它们指向的就是同一个地址。

dirty 包含重复项目的好处就是,一旦 miss 数达到阈值需要将 dirty 提升为 read 的话,只需简单地把 dirty 设置为 read 对象即可。不好的一点就是,当创建新的 dirty 对象的时候,需要逐条遍历 read,把非 expunged 的项复制到 dirty 对象中。

接下来,我们就深入到源码去看看 sync.map 的实现。在看这部分源码的过程中,我们只要重点关注 Store、Load 和 Delete 这 3 个核心的方法就可以了。

Store、Load 和 Delete 这三个核心函数的操作都是先从 read 字段中处理的,因为读取 read 字段的时候不用加锁。

Store 方法

我们先来看 Store 方法,它是用来设置一个键值对,或者更新一个键值对的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
// 如果read字段包含这个项,说明是更新,cas更新项目的值即可
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}

// read中不存在,或者cas更新失败,就需要加锁访问dirty了
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok { // 双检查,看看read是否已经存在了
if e.unexpungeLocked() {
// 此项目先前已经被删除了,通过将它的值设置为nil,标记为unexpunged
m.dirty[key] = e
}
e.storeLocked(&value) // 更新
} else if e, ok := m.dirty[key]; ok { // 如果dirty中有此项
e.storeLocked(&value) // 直接更新
} else { // 否则就是一个新的key
if !read.amended { //如果dirty为nil
// 需要创建dirty对象,并且标记read的amended为true,
// 说明有元素它不包含而dirty包含
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value) //将新值增加到dirty对象中
}
m.mu.Unlock()
}

可以看出,Store 既可以是新增元素,也可以是更新元素。如果运气好的话,更新的是已存在的未被删除的元素,直接更新即可,不会用到锁。如果运气不好,需要更新(重用)删除的对象、更新还未提升的 dirty 中的对象,或者新增加元素的时候就会使用到了锁,这个时候,性能就会下降。

所以从这一点来看,sync.Map 适合那些只会增长的缓存系统,可以进行更新,但是不要删除,并且不要频繁地增加新元素。

新加的元素需要放入到 dirty 中,如果 dirty 为 nil,那么需要从 read 字段中复制出来一个 dirty 对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
func (m *Map) dirtyLocked() {
if m.dirty != nil { // 如果dirty字段已经存在,不需要创建了
return
}

read, _ := m.read.Load().(readOnly) // 获取read字段
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m { // 遍历read字段
if !e.tryExpungeLocked() { // 把非punged的键值对复制到dirty中
m.dirty[k] = e
}
}
}

Load 方法

Load 方法用来读取一个 key 对应的值。它也是从 read 开始处理,一开始并不需要锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
// 首先从read处理
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended { // 如果不存在并且dirty不为nil(有新的元素)
m.mu.Lock()
// 双检查,看看read中现在是否存在此key
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {//依然不存在,并且dirty不为nil
e, ok = m.dirty[key]// 从dirty中读取
// 不管dirty中存不存在,miss数都加1
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load() //返回读取的对象,e既可能是从read中获得的,也可能是从dirty中获得的
}

如果幸运的话,我们从 read 中读取到了这个 key 对应的值,那么就不需要加锁了,性能会非常好。但是,如果请求的 key 不存在或者是新加的,就需要加锁从 dirty 中读取。所以,读取不存在的 key 会因为加锁而导致性能下降,读取还没有提升的新值的情况下也会因为加锁性能下降。

其中,missLocked 增加 miss 的时候,如果 miss 数等于 dirty 长度,会将 dirty 提升为 read,并将 dirty 置空。

1
2
3
4
5
6
7
8
9
func (m *Map) missLocked() {
m.misses++ // misses计数加一
if m.misses < len(m.dirty) { // 如果没达到阈值(dirty字段的长度),返回
return
}
m.read.Store(readOnly{m: m.dirty}) //把dirty字段的内存提升为read字段
m.dirty = nil // 清空dirty
m.misses = 0 // misses数重置为0
}

Delete 方法

sync.map 的第 3 个核心方法是 Delete 方法。在 Go 1.15 中欧长坤提供了一个 LoadAndDelete 的实现(go#issue 33762),所以 Delete 方法的核心改在了对 LoadAndDelete 中实现了。

同样地,Delete 方法是先从 read 操作开始,原因我们已经知道了,因为不需要锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
// 双检查
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// 这一行长坤在1.15中实现的时候忘记加上了,导致在特殊的场景下有些key总是没有被回收
delete(m.dirty, key)
// miss数加1
m.missLocked()
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false
}

func (m *Map) Delete(key interface{}) {
m.LoadAndDelete(key)
}
func (e *entry) delete() (value interface{}, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*interface{})(p), true
}
}
}

如果 read 中不存在,那么就需要从 dirty 中寻找这个项目。最终,如果项目存在就删除(将它的值标记为 nil)。如果项目不为 nil 或者没有被标记为 expunged,那么还可以把它的值返回。

最后,我补充一点,sync.map 还有一些 LoadAndDelete、LoadOrStore、Range 等辅助方法,但是没有 Len 这样查询 sync.Map 的包含项目数量的方法,并且官方也不准备提供。如果你想得到 sync.Map 的项目数量的话,你可能不得不通过 Range 逐个计数。


总结

Go 内置的 map 类型使用起来很方便,但是它有一个非常致命的缺陷,那就是它存在着并发问题,所以如果有多个 goroutine 同时并发访问这个 map,就会导致程序崩溃。所以 Go 官方 Blog 很早就提供了一种加锁的方法,还有后来提供了适用特定场景的线程安全的 sync.Map,还有第三方实现的分片式的 map,这些方法都可以应用于并发访问的场景。

这里的建议,也是 Go 开发者给的建议,就是通过性能测试,看看某种线程安全的 map 实现是否满足你的需求。

当然还有一些扩展其它功能的 map 实现,比如带有过期功能的timedmap、使用红黑树实现的 key 有序的treemap等,因为和并发问题没有关系,就不详细介绍了。这里我给你提供了链接,你可以自己探索。





2.10 Pool:性能提升大杀器 (pending)

Go 是一个自动垃圾回收的编程语言,采用三色并发标记算法标记对象并回收。和其它没有自动垃圾回收的编程语言不同,使用 Go 语言创建对象的时候,我们没有回收 / 释放的心理负担,想用就用,想创建就创建。

但是,如果你想使用 Go 开发一个高性能的应用程序的话,就必须考虑垃圾回收给性能带来的影响,毕竟,Go 的自动垃圾回收机制还是有一个 STW(stop-the-world,程序暂停)的时间,而且,大量地创建在堆上的对象,也会影响垃圾回收标记的时间。

所以,一般我们做性能优化的时候,会采用对象池的方式,把不用的对象回收起来,避免被垃圾回收掉,这样使用的时候就不必在堆上重新创建了。

不止如此,像数据库连接、TCP 的长连接,这些连接在创建的时候是一个非常耗时的操作。如果每次都创建一个新的连接对象,耗时较长,很可能整个业务的大部分耗时都花在了创建连接上。

所以,如果我们能把这些连接保存下来,避免每次使用的时候都重新创建,不仅可以大大减少业务的耗时,还能提高应用程序的整体性能。

Go 标准库中提供了一个通用的 Pool 数据结构,也就是 sync.Pool,我们使用它可以创建池化的对象。这节课我会详细给你介绍一下 sync.Pool 的使用方法、实现原理以及常见的坑,帮助你全方位地掌握标准库的 Pool。

不过,这个类型也有一些使用起来不太方便的地方,就是它池化的对象可能会被垃圾回收掉,这对于数据库长连接等场景是不合适的。所以在这一讲中,我会专门介绍其它的一些 Pool,包括 TCP 连接池、数据库连接池等等。

除此之外,我还会专门介绍一个池的应用场景: Worker Pool,或者叫做 goroutine pool,这也是常用的一种并发模式,可以使用有限的 goroutine 资源去处理大量的业务数据。


sync.Pool

首先,我们来学习下标准库提供的 sync.Pool 数据类型。

sync.Pool 数据类型用来保存一组可独立访问的临时对象。请注意这里加粗的“临时”这两个字,它说明了 sync.Pool 这个数据类型的特点,也就是说,它池化的对象会在未来的某个时候被毫无预兆地移除掉。而且,如果没有别的对象引用这个被移除的对象的话,这个被移除的对象就会被垃圾回收掉。

因为 Pool 可以有效地减少新对象的申请,从而提高程序性能,所以 Go 内部库也用到了 sync.Pool,比如 fmt 包,它会使用一个动态大小的 buffer 池做输出缓存,当大量的 goroutine 并发输出的时候,就会创建比较多的 buffer,并且在不需要的时候回收掉。

有两个知识点你需要记住:

  1. sync.Pool 本身就是线程安全的,多个 goroutine 可以并发地调用它的方法存取对象;
  2. sync.Pool 不可在使用之后再复制使用。

sync.Pool 的使用方法

知道了 sync.Pool 这个数据类型的特点,接下来,我们来学习下它的使用方法。其实,这个数据类型不难,它只提供了三个对外的方法:New、Get 和 Put。

  1. New

    Pool struct 包含一个 New 字段,这个字段的类型是函数 func() interface{}。当调用 Pool 的 Get 方法从池中获取元素,没有更多的空闲元素可返回时,就会调用这个 New 方法来创建新的元素。如果你没有设置 New 字段,没有更多的空闲元素可返回时,Get 方法将返回 nil,表明当前没有可用的元素。

    有趣的是,New 是可变的字段。这就意味着,你可以在程序运行的时候改变创建元素的方法。当然,很少有人会这么做,因为一般我们创建元素的逻辑都是一致的,要创建的也是同一类的元素,所以你在使用 Pool 的时候也没必要玩一些“花活”,在程序运行时更改 New 的值。

  2. Get

    如果调用这个方法,就会从 Pool 取走一个元素,这也就意味着,这个元素会从 Pool 中移除,返回给调用者。不过,除了返回值是正常实例化的元素,Get 方法的返回值还可能会是一个 nil(Pool.New 字段没有设置,又没有空闲元素可以返回),所以你在使用的时候,可能需要判断。

  3. Put

    这个方法用于将一个元素返还给 Pool,Pool 会把这个元素保存到池中,并且可以复用。但如果 Put 一个 nil 值,Pool 就会忽略这个值。


好了,了解了这几个方法,下面我们看看 sync.Pool 最常用的一个场景:buffer 池(缓冲池)。

因为 byte slice 是经常被创建销毁的一类对象,使用 buffer 池可以缓存已经创建的 byte slice,比如,著名的静态网站生成工具 Hugo 中,就包含这样的实现bufpool,你可以看一下下面这段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var buffers = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}

func GetBuffer() *bytes.Buffer {
return buffers.Get().(*bytes.Buffer)
}

func PutBuffer(buf *bytes.Buffer) {
buf.Reset()
buffers.Put(buf)
}

除了 Hugo,这段 buffer 池的代码非常常用。很可能你在阅读其它项目的代码的时候就碰到过,或者是你自己实现 buffer 池的时候也会这么去实现,但是请你注意了,这段代码是有问题的,你一定不要将上面的代码应用到实际的产品中。它可能会有内存泄漏的问题,下面我会重点讲这个问题。

bytes.Buffer

bytes.Buffer 是 Go 标准库 bytes 包中的一个结构体,提供了一个可变大小的字节缓冲区,用于高效地操作字节切片。它在处理字符串构建、数据流操作以及实现 io.Readerio.Writer 接口时非常有用。

主要特性

  1. 动态扩展

    • bytes.Buffer 可以根据需要动态扩展其容量,避免频繁分配内存,提高性能。
  2. 实现接口

    • 实现了 io.Readerio.Writer 接口,使其可以方便地用于各种 I/O 操作中,如文件读写、网络通信等。
  3. 高效的字符串构建

    • 相比于使用字符串拼接(+ 操作符),使用 bytes.Buffer 构建字符串更高效,特别是在需要进行大量拼接时。
  4. 内存复用

    • 通过复用内部缓冲区,减少了内存分配和垃圾回收的开销。

常见用途

  1. 字符串和字节切片的构建

    • 适用于需要逐步构建复杂字符串或字节数据的场景。
  2. 数据流操作

    • 作为临时存储,聚合来自不同来源的数据,然后一次性处理或传输。
  3. 实现 io.Readerio.Writer

    • 在需要读取或写入数据的自定义函数或方法中,作为中间缓冲区使用。
  4. 网络编程

    • 构建需要发送的消息或解析接收到的消息,尤其是在处理协议数据时。

常用方法

  • Write(p []byte) (n int, err error)
    向缓冲区写入字节切片 p

  • WriteString(s string) (n int, err error)
    向缓冲区写入字符串 s

  • Read(p []byte) (n int, err error)
    从缓冲区读取数据到 p

  • Bytes() []byte
    返回缓冲区的内容作为字节切片。

  • String() string
    返回缓冲区的内容作为字符串。

  • Reset()
    重置缓冲区,清空所有内容。

  • Len() int
    返回缓冲区当前的长度。

  • Cap() int
    返回缓冲区的容量。


示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"bytes"
"fmt"
"io"
"os"
)

func main() {
// 创建一个新的 Buffer
var buffer bytes.Buffer

// 向 Buffer 写入字符串
buffer.Write([]byte("Hello "))
buffer.WriteString("World!")

// 将 Buffer 的内容作为字符串输出
fmt.Println(buffer.String()) // 输出: Hello World!

// 将 Buffer 的内容写入标准输出
buffer.WriteTo(os.Stdout) // 输出: Hello World!

// 从 Buffer 读取数据
data := make([]byte, 5)
buffer.Read(data)
fmt.Println(string(data)) // 输出: Hello

// 重置 Buffer
buffer.Reset()
fmt.Println("Buffer reset, length:", buffer.Len()) // 输出: Buffer reset, length: 0
}

注意事项

  1. 并发安全性

    • bytes.Buffer 不是并发安全的。如果需要在多个 goroutine 中同时访问同一个 Buffer 实例,需使用外部同步机制(如 sync.Mutex)。
  2. 性能优化

    • 在只需要构建字符串的场景下,可以考虑使用 strings.Builder,它在某些情况下比 bytes.Buffer 更高效。
  3. 内存管理

    • 虽然 bytes.Buffer 会自动管理缓冲区大小,但在处理非常大的数据时,仍需注意内存的使用情况,避免过度分配。

总结

bytes.Buffer 是 Go 语言中一个强大且灵活的工具,适用于多种需要高效字节和字符串操作的场景。通过其实现的接口和丰富的方法,开发者可以轻松地进行数据的读写、构建和转换操作。在实际开发中,合理选择和使用 bytes.Buffer 能显著提升代码的性能和可维护性。


实现原理

了解了 sync.Pool 的基本使用方法,下面我们就来重点学习下它的实现。

Go 1.13 之前的 sync.Pool 的实现有 2 大问题:

  1. 每次 GC 都会回收创建的对象。

    如果缓存元素数量太多,就会导致 STW 耗时变长;缓存元素都被回收后,会导致 Get 命中率下降,Get 方法不得不新创建很多对象

  2. 底层实现使用了 Mutex,对这个锁并发请求竞争激烈的时候,会导致性能的下降。

在 Go 1.13 中,sync.Pool 做了大量的优化。前几讲中我提到过,提高并发程序性能的优化点是尽量不要使用锁,如果不得已使用了锁,就把锁 Go 的粒度降到最低。Go 对 Pool 的优化就是避免使用锁,同时将加锁的 queue 改成 lock-free 的 queue 的实现,给即将移除的元素再多一次“复活”的机会

当前,sync.Pool 的数据结构如下图所示:

Pool 最重要的两个字段是 local 和 victim,因为它们两个主要用来存储空闲的元素。弄清楚这两个字段的处理逻辑,你就能完全掌握 sync.Pool 的实现了。下面我们来看看这两个字段的关系。

每次垃圾回收的时候,Pool 会把 victim 中的对象移除,然后把 local 的数据给 victim,这样的话,local 就会被清空,而 victim 就像一个垃圾分拣站,里面的东西可能会被当做垃圾丢弃了,但是里面有用的东西也可能被捡回来重新使用。

victim 中的元素如果被 Get 取走,那么这个元素就很幸运,因为它又“活”过来了。但是,如果这个时候 Get 的并发不是很大,元素没有被 Get 取走,那么就会被移除掉,因为没有别人引用它的话,就会被垃圾回收掉。

下面的代码是垃圾回收时 sync.Pool 的处理逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func poolCleanup() {
// 丢弃当前victim, STW所以不用加锁
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}

// 将local复制给victim, 并将原local置为nil
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}

oldPools, allPools = allPools, nil
}

在这段代码中,你需要关注一下 local 字段,因为所有当前主要的空闲可用的元素都存放在 local 字段中,请求元素时也是优先从 local 字段中查找可用的元素。local 字段包含一个 poolLocalInternal 字段,并提供 CPU 缓存对齐,从而避免 false sharing。

而 poolLocalInternal 也包含两个字段:private 和 shared。

  • private,代表一个缓存的元素,而且只能由相应的一个 P 存取。因为一个 P 同时只能执行一个 goroutine,所以不会有并发的问题。
  • shared,可以由任意的 P 访问,但是只有本地的 P 才能 pushHead/popHead,其它 P 可以 popTail,相当于只有一个本地的 P 作为生产者(Producer),多个 P 作为消费者(Consumer),它是使用一个 local-free 的 queue 列表实现的。




2.11 Context:信息穿透上下文

假设有一天你进入办公室,突然同事们都围住你,然后大喊“小王小王你最帅”,此时你可能一头雾水,只能尴尬地笑笑。为啥呢?因为你缺少上下文的信息,不知道之前发生了什么。

但是,如果同事告诉你,由于你业绩突出,一天之内就把云服务化的主要架构写好了,因此被评为 9 月份的工作之星,总经理还特意给你发 1 万元的奖金,那么,你心里就很清楚了,原来同事恭喜你,是因为你的工作被表扬了,还获得了奖金。同事告诉你的这些前因后果,就是上下文信息,他把上下文传递给你,你接收后,就可以获取之前不了解的信息。

你看,上下文(Context)就是这么重要。在我们的开发场景中,上下文也是不可或缺的,缺少了它,我们就不能获取完整的程序信息。那到底啥是上下文呢?其实,这就是指,在 API 之间或者方法调用之间,所传递的除了业务参数之外的额外信息。

比如,服务端接收到客户端的 HTTP 请求之后,可以把客户端的 IP 地址和端口、客户端的身份信息、请求接收的时间、Trace ID 等信息放入到上下文中,这个上下文可以在后端的方法调用中传递,后端的业务方法除了利用正常的参数做一些业务处理(如订单处理)之外,还可以从上下文读取到消息请求的时间、Trace ID 等信息,把服务处理的时间推送到 Trace 服务中。Trace 服务可以把同一 Trace ID 的不同方法的调用顺序和调用时间展示成流程图,方便跟踪。

不过,Go 标准库中的 Context 功能还不止于此,它还提供了超时(Timeout)和取消(Cancel)的机制,下面就让我一一道来。


Context 的来历

在学习 Context 的功能之前呢,我先带你了解下它的来历。毕竟,知道了它的来龙去脉,我们才能应用得更加得心应手一些。

Go 在 1.7 的版本中才正式把 Context 加入到标准库中。在这之前,很多 Web 框架在定义自己的 handler 时,都会传递一个自定义的 Context,把客户端的信息和客户端的请求信息放入到 Context 中。Go 最初提供了 golang.org/x/net/context 库用来提供上下文信息,最终还是在 Go1.7 中把此库提升到标准库 context 包中。

为啥呢?这是因为,在 Go1.7 之前,有很多库都依赖 golang.org/x/net/context 中的 Context 实现,这就导致 Go 1.7 发布之后,出现了标准库 Context 和 golang.org/x/net/context 并存的状况。新的代码使用标准库 Context 的时候,没有办法使用这个标准库的 Context 去调用旧有的使用 x/net/context 实现的方法。

所以,在 Go1.9 中,还专门实现了一个叫做 type alias 的新特性,然后把 x/net/context 中的 Context 定义成标准库 Context 的别名,以解决新旧 Context 类型冲突问题,你可以看一下下面这段代码:

1
2
3
4
5
6
7
// +build go1.9
package context

import "context"

type Context = context.Context
type CancelFunc = context.CancelFunc

Go 标准库的 Context 不仅提供了上下文传递的信息,还提供了 cancel、timeout 等其它信息,这些信息貌似和 context 这个包名没关系,但是还是得到了广泛的应用。所以,你看,context 包中的 Context 不仅仅传递上下文信息,还有 timeout 等其它功能,是不是“名不副实”呢?

其实啊,这也是这个 Context 的一个问题,比较容易误导人,Go 布道师 Dave Cheney 还专门写了一篇文章讲述这个问题:Context isn’t for cancellation

同时,也有一些批评者针对 Context 提出了批评:Context should go away for Go 2,这篇文章把 Context 比作病毒,病毒会传染,结果把所有的方法都传染上了病毒(加上 Context 参数),绝对是视觉污染。

Go 的开发者也注意到了“关于 Context,存在一些争议”这件事儿,所以,Go 核心开发者 Ian Lance Taylor 专门开了一个issue 28342,用来记录当前的 Context 的问题:

  • Context 包名导致使用的时候重复 ctx context.Context;
  • Context.WithValue 可以接受任何类型的值,非类型安全;
  • Context 包名容易误导人,实际上,Context 最主要的功能是取消 goroutine 的执行;
  • Context 漫天飞,函数污染。

尽管有很多的争议,但是,在很多场景下,使用 Context 其实会很方便,所以现在它已经在 Go 生态圈中传播开来了,包括很多的 Web 应用框架,都切换成了标准库的 Context。标准库中的 database/sql、os/exec、net、net/http 等包中都使用到了 Context。而且,如果我们遇到了下面的一些场景,也可以考虑使用 Context:

  • 上下文信息传递 (request-scoped),比如处理 http 请求、在请求处理链路上传递信息;
  • 控制子 goroutine 的运行;
  • 超时控制的方法调用;
  • 可以取消的方法调用。

所以,我们需要掌握 Context 的具体用法,这样才能在不影响主要业务流程实现的时候,实现一些通用的信息传递,或者是能够和其它 goroutine 协同工作,提供 timeout、cancel 等机制。


Context 基本使用方法

首先,我们来学习一下 Context 接口包含哪些方法,这些方法都是干什么用的。

包 context 定义了 Context 接口,Context 的具体实现包括 4 个方法,分别是 Deadline、Done、Err 和 Value,如下所示:

1
2
3
4
5
6
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}

下面我来具体解释下这 4 个方法。

  • Deadline 方法会返回这个 Context 被取消的截止日期。如果没有设置截止日期,ok 的值是 false。后续每次调用这个对象的 Deadline 方法时,都会返回和第一次调用相同的结果。

  • Done 方法返回一个 Channel 对象。在 Context 被取消时,此 Channel 会被 close,如果没被取消,可能会返回 nil。后续的 Done 调用总是返回相同的结果。当 Done 被 close 的时候,你可以通过 ctx.Err 获取错误信息。Done 这个方法名其实起得并不好,因为名字太过笼统,不能明确反映 Done 被 close 的原因,因为 cancel、timeout、deadline 都可能导致 Done 被 close,不过,目前还没有一个更合适的方法名称。

    关于 Done 方法,你必须要记住的知识点就是:如果 Done 没有被 close,Err 方法返回 nil;如果 Done 被 close,Err 方法会返回 Done 被 close 的原因。

  • Value 返回此 ctx 中和指定的 key 相关联的 value。

    Context 中实现了 2 个常用的生成顶层 Context 的方法。

    • context.Background():返回一个非 nil 的、空的 Context,没有任何值,不会被 cancel,不会超时,没有截止日期。一般用在主函数、初始化、测试以及创建根 Context 的时候。
    • context.TODO():返回一个非 nil 的、空的 Context,没有任何值,不会被 cancel,不会超时,没有截止日期。当你不清楚是否该用 Context,或者目前还不知道要传递一些什么上下文信息的时候,就可以使用这个方法。

官方文档是这么讲的,你可能会觉得像没说一样,因为界限并不是很明显。其实,你根本不用费脑子去考虑,可以直接使用 context.Background。事实上,它们两个底层的实现是一模一样的:

1
2
3
4
5
6
7
8
9
10
11
12
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)

func Background() Context {
return background
}

func TODO() Context {
return todo
}

在使用 Context 的时候,有一些约定俗成的规则。

  1. 一般函数使用 Context 的时候,会把这个参数放在第一个参数的位置。

  2. 从来不把 nil 当做 Context 类型的参数值,可以使用 context.Background() 创建一个空的上下文对象,也不要使用 nil。

  3. Context 只用来临时做函数之间的上下文透传,不能持久化 Context 或者把 Context 长久保存。把 Context 持久化到数据库、本地文件或者全局变量、缓存中都是错误的用法。

  4. key 的类型不应该是字符串类型或者其它内建类型,否则容易在包之间使用 Context 时候产生冲突。使用 WithValue 时,key 的类型应该是自己定义的类型。

    why?
    1. 避免键名冲突

    当使用像字符串或其他内建类型(如 intstring)作为键时,可能会在不同的包或应用组件之间产生冲突。例如,如果两个不同的包都使用字符串 "userID" 作为键来存储在 Context 中的值,它们可能会无意中覆盖对方的数据,导致难以调试的错误和不可预见的行为。

    1. 类型安全

    使用自定义类型(通常是不可导出的类型)作为键可以增加类型安全性。这意味着,只有创建该键的包才能访问相应的值,其他包即使尝试使用相同的名字也无法访问这些值。这样可以避免误用键值对,保护数据不被其他包意外修改或读取。

    1. 封装

    自定义类型作为键还可以增强封装性,有助于定义清晰的接口,并减少包之间的依赖。通过限制键的类型和范围,可以更好地控制谁可以访问或修改 Context 中的数据。

    1. 实现细节的隐藏

    使用不可导出的类型(通过定义一个私有的结构体或其他类型)作为 Context 的键,可以隐藏实现细节,使得键的创建和管理完全由原始包控制,外部代码无法构造相同的键,这样就保证了数据的独立性和安全性。

  5. 常常使用 struct{}作为底层类型定义 key 的类型。对于 exported key 的静态类型,常常是接口或者指针。这样可以尽量减少内存分配。

其实官方的文档也是比较搞笑的,文档中强调 key 的类型不要使用 string,结果接下来的例子中就是用 string 类型作为 key 的类型。你自己把握住这个要点就好,如果你能保证别人使用你的 Context 时不会和你定义的 key 冲突,那么 key 的类型就比较随意,因为你自己保证了不同包的 key 不会冲突,否则建议你尽量采用保守的 unexported 的类型。


创建特殊用途 Context 的方法

接下来,我会介绍标准库中几种创建特殊用途 Context 的方法:WithValue、WithCancel、WithTimeout 和 WithDeadline,包括它们的功能以及实现方式。

WithValue

WithValue 基于 parent Context 生成一个新的 Context,保存了一个 key-value 键值对。它常常用来传递上下文。

WithValue 方法其实是创建了一个类型为 valueCtx 的 Context,它的类型定义如下:

1
2
3
4
type valueCtx struct {
Context
key, val interface{}
}

它持有一个 key-value 键值对,还持有 parent 的 Context。它覆盖了 Value 方法,优先从自己的存储中检查这个 key,不存在的话会从 parent 中继续检查。

Go 标准库实现的 Context 还实现了链式查找。如果不存在,还会向 parent Context 去查找,如果 parent 还是 valueCtx 的话,还是遵循相同的原则:valueCtx 会嵌入 parent,所以还是会查找 parent 的 Value 方法的。

1
2
3
4
5
6
7
ctx = context.TODO()
ctx = context.WithValue(ctx, "key1", "0001")
ctx = context.WithValue(ctx, "key2", "0001")
ctx = context.WithValue(ctx, "key3", "0001")
ctx = context.WithValue(ctx, "key4", "0004")

fmt.Println(ctx.Value("key1"))

WithCancel

WithCancel 方法返回 parent 的副本,只是副本中的 Done Channel 是新建的对象,它的类型是 cancelCtx。

我们常常在一些需要主动取消长时间的任务时,创建这种类型的 Context,然后把这个 Context 传给长时间执行任务的 goroutine。当需要中止任务时,我们就可以 cancel 这个 Context,这样长时间执行任务的 goroutine,就可以通过检查这个 Context,知道 Context 已经被取消了。

WithCancel 返回值中的第二个值是一个 cancel 函数。其实,这个返回值的名称(cancel)和类型(Cancel)也非常迷惑人。

记住,不是只有你想中途放弃,才去调用 cancel,只要你的任务正常完成了,就需要调用 cancel,这样,这个 Context 才能释放它的资源(通知它的 children 处理 cancel,从它的 parent 中把自己移除,甚至释放相关的 goroutine)。很多同学在使用这个方法的时候,都会忘记调用 cancel,切记切记,而且一定尽早释放。

我们来看下 WithCancel 方法的实现代码:

1
2
3
4
5
6
7
8
9
10
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)// 把c朝上传播
return &c, func() { c.cancel(true, Canceled) }
}

// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}

代码中调用的 propagateCancel 方法会顺着 parent 路径往上找,直到找到一个 cancelCtx,或者为 nil。如果不为空,就把自己加入到这个 cancelCtx 的 child,以便这个 cancelCtx 被取消的时候通知自己。如果为空,会新起一个 goroutine,由它来监听 parent 的 Done 是否已关闭。

当这个 cancelCtx 的 cancel 函数被调用的时候,或者 parent 的 Done 被 close 的时候,这个 cancelCtx 的 Done 才会被 close。

cancel 是向下传递的,如果一个 WithCancel 生成的 Context 被 cancel 时,如果它的子 Context(也有可能是孙,或者更低,依赖子的类型)也是 cancelCtx 类型的,就会被 cancel,但是不会向上传递。parent Context 不会因为子 Context 被 cancel 而 cancel。

cancelCtx 被取消时,它的 Err 字段就是下面这个 Canceled 错误:

1
var Canceled = errors.New("context canceled")

WithTimeout

WithTimeout 其实是和 WithDeadline 一样,只不过一个参数是超时时间,一个参数是截止时间。超时时间加上当前时间,其实就是截止时间,因此,WithTimeout 的实现是:

1
2
3
4
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
// 当前时间+timeout就是deadline
return WithDeadline(parent, time.Now().Add(timeout))
}

WithDeadline

WithDeadline 会返回一个 parent 的副本,并且设置了一个不晚于参数 d 的截止时间,类型为 timerCtx(或者是 cancelCtx)。

如果它的截止时间晚于 parent 的截止时间,那么就以 parent 的截止时间为准,并返回一个类型为 cancelCtx 的 Context,因为 parent 的截止时间到了,就会取消这个 cancelCtx。

如果当前时间已经超过了截止时间,就直接返回一个已经被 cancel 的 timerCtx。否则就会启动一个定时器,到截止时间取消这个 timerCtx。

综合起来,timerCtx 的 Done 被 Close 掉,主要是由下面的某个事件触发的:

  • 截止时间到了;
  • cancel 函数被调用;
  • parent 的 Done 被 close。
  • 下面的代码是 WithDeadline 方法的实现:

下面的代码是 WithDeadline 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
// 如果parent的截止时间更早,直接返回一个cancelCtx即可
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c) // 同cancelCtx的处理逻辑
dur := time.Until(d)
if dur <= 0 { //当前时间已经超过了截止时间,直接cancel
c.cancel(true, DeadlineExceeded)
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
// 设置一个定时器,到截止时间后取消
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}

和 cancelCtx 一样,WithDeadline(WithTimeout)返回的 cancel 一定要调用,并且要尽可能早地被调用,这样才能尽早释放资源,不要单纯地依赖截止时间被动取消。正确的使用姿势是啥呢?我们来看一个例子。

1
2
3
4
5
func slowOperationWithTimeout(ctx context.Context) (Result, error) {
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel() // 一旦慢操作完成就立马调用cancel
return slowOperation(ctx)
}

总结

我们经常使用 Context 来取消一个 goroutine 的运行,这是 Context 最常用的场景之一,Context 也被称为 goroutine 生命周期范围(goroutine-scoped)的 Context,把 Context 传递给 goroutine。但是,goroutine 需要尝试检查 Context 的 Done 是否关闭了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
ctx, cancel := context.WithCancel(context.Background())

go func() {
defer func() {
fmt.Println("goroutine exit")
}()

for {
select {
case <-ctx.Done():
return
default:
time.Sleep(time.Second)
}
}
}()

time.Sleep(time.Second)
cancel()
time.Sleep(2 * time.Second)
}

如果你要为 Context 实现一个带超时功能的调用,比如访问远程的一个微服务,超时并不意味着你会通知远程微服务已经取消了这次调用,大概率的实现只是避免客户端的长时间等待,远程的服务器依然还执行着你的请求。

所以,有时候,Context 并不会减少对服务器的请求负担。如果在 Context 被 cancel 的时候,你能关闭和服务器的连接,中断和数据库服务器的通讯、停止对本地文件的读写,那么,这样的超时处理,同时能减少对服务调用的压力,但是这依赖于你对超时的底层处理机制。



三、原子操作

3.1 atomic:要保证原子操作,一定要使用这几种方法

前面我们在学习 Mutex、RWMutex 等并发原语的实现时,你可以看到,最底层是通过 atomic 包中的一些原子操作来实现的。当时,为了让你的注意力集中在这些原语的功能实现上,我并没有展开介绍这些原子操作是干什么用的。

你可能会说,这些并发原语已经可以应对大多数的并发场景了,为啥还要学习原子操作呢?其实,这是因为,在很多场景中,使用并发原语实现起来比较复杂,而原子操作可以帮助我们更轻松地实现底层的优化。

所以,现在,我会专门用一节课,带你仔细地了解一下什么是原子操作,atomic 包都提供了哪些实现原子操作的方法。另外,我还会带你实现一个基于原子操作的数据结构。好了,接下来我们先来学习下什么是原子操作。


原子操作的基础知识





四、channel

4.1 channel:另辟蹊径,解决并发问题

Channel 是 Go 语言内建的 first-class 类型,也是 Go 语言与众不同的特性之一。Go 语言的 Channel 设计精巧简单,以至于也有人用其它语言编写了类似 Go 风格的 Channel 库,比如docker/libchantylertreat/chan,但是并不像 Go 语言一样把 Channel 内置到了语言规范中。从这一点,你也可以看出来,Channel 的地位在编程语言中的地位之高,比较罕见。

所以,这节课,我们就来学习下 Channel。


Channel 的发展

要想了解 Channel 这种 Go 编程语言中的特有的数据结构,我们要追溯到 CSP 模型,学习一下它的历史,以及它对 Go 创始人设计 Channel 类型的影响。

CSP 是 Communicating Sequential Process 的简称,中文直译为通信顺序进程,或者叫做交换信息的循序进程,是用来描述并发系统中进行交互的一种模式。

CSP 最早出现于计算机科学家 Tony Hoare 在 1978 年发表的论文中(你可能不熟悉 Tony Hoare 这个名字,但是你一定很熟悉排序算法中的 Quicksort 算法,他就是 Quicksort 算法的作者,图灵奖的获得者)。最初,论文中提出的 CSP 版本在本质上不是一种进程演算,而是一种并发编程语言,但之后又经过了一系列的改进,最终发展并精炼出 CSP 的理论。CSP 允许使用进程组件来描述系统,它们独立运行,并且只通过消息传递的方式通信

就像 Go 的创始人之一 Rob Pike 所说的:“每一个计算机程序员都应该读一读 Tony Hoare 1978 年的关于 CSP 的论文。”他和 Ken Thompson 在设计 Go 语言的时候也深受此论文的影响,并将 CSP 理论真正应用于语言本身(Russ Cox 专门写了一篇文章记录这个历史),通过引入 Channel 这个新的类型,来实现 CSP 的思想。

Channel 类型是 Go 语言内置的类型,你无需引入某个包,就能使用它。虽然 Go 也提供了传统的并发原语,但是它们都是通过库的方式提供的,你必须要引入 sync 包或者 atomic 包才能使用它们,而 Channel 就不一样了,它是内置类型,使用起来非常方便。

Channel 和 Go 的另一个独特的特性 goroutine 一起为并发编程提供了优雅的、便利的、与传统并发控制不同的方案,并演化出很多并发模式。接下来,我们就来看一看 Channel 的应用场景。


Channel 的应用场景

先看一条 Go 语言中流传很广的谚语:

Don’t communicate by sharing memory, share memory by communicating.

Go Proverbs by Rob Pike

这是 Rob Pike 在 2015 年的一次 Gopher 会议中提到的一句话,虽然有一点绕,但也指出了使用 Go 语言的哲学,我尝试着来翻译一下:“执行业务处理的 goroutine 不要通过共享内存的方式通信,而是要通过 Channel 通信的方式分享数据。”

“communicate by sharing memory”和“share memory by communicating”是两种不同的并发处理模式。“communicate by sharing memory”是传统的并发编程处理方式,就是指,共享的数据需要用锁进行保护,goroutine 需要获取到锁,才能并发访问数据。

“share memory by communicating”则是类似于 CSP 模型的方式,通过通信的方式,一个 goroutine 可以把数据的“所有权”交给另外一个 goroutine(虽然 Go 中没有“所有权”的概念,但是从逻辑上说,你可以把它理解为是所有权的转移)。

从 Channel 的历史和设计哲学上,我们就可以了解到,Channel 类型和基本并发原语是有竞争关系的,它应用于并发场景,涉及到 goroutine 之间的通讯,可以提供并发的保护,等等。

综合起来,我把 Channel 的应用场景分为五种类型。这里你先有个印象,这样你可以有目的地去学习 Channel 的基本原理。下节课我会借助具体的例子,来带你掌握这几种类型。

  • 数据交流:当作并发的 buffer 或者 queue,解决生产者 - 消费者问题。多个 goroutine 可以并发当作生产者(Producer)和消费者(Consumer)。
  • 数据传递:一个 goroutine 将数据交给另一个 goroutine,相当于把数据的拥有权 (引用) 托付出去。
  • 信号通知:一个 goroutine 可以将信号 (closing、closed、data ready 等) 传递给另一个或者另一组 goroutine 。
  • 任务编排:可以让一组 goroutine 按照一定的顺序并发或者串行的执行,这就是编排的功能。
  • :利用 Channel 也可以实现互斥锁的机制。

下面,我们来具体学习下 Channel 的基本用法。


Channel 基本用法

你可以往 Channel 中发送数据,也可以从 Channel 中接收数据,所以,Channel 类型(为了说起来方便,我们下面都把 Channel 叫做 chan)分为只能接收、只能发送、既可以接收又可以发送三种类型。下面是它的语法定义:

1
ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType .

相应地,Channel 的正确语法如下:

1
2
3
chan string          // 可以发送接收string
chan<- struct{} // 只能发送struct{}
<-chan int // 只能从chan接收int

我们把既能接收又能发送的 chan 叫做双向的 chan,把只能发送和只能接收的 chan 叫做单向的 chan。其中,“<-”表示单向的 chan,如果你记不住,我告诉你一个简便的方法:这个箭头总是射向左边的,元素类型总在最右边。如果箭头指向 chan,就表示可以往 chan 中塞数据;如果箭头远离 chan,就表示 chan 会往外吐数据。

chan 中的元素是任意的类型,所以也可能是 chan 类型,我来举个例子,比如下面的 chan 类型也是合法的:

1
2
3
4
chan<- chan int   
chan<- <-chan int
<-chan <-chan int
chan (<-chan int)

可是,怎么判定箭头符号属于哪个 chan 呢?其实,“<-”有个规则,总是尽量和左边的 chan 结合(The <- operator associates with the leftmost chan possible:),因此,上面的定义和下面的使用括号的划分是一样的:

1
2
3
4
chan<- (chan int// <- 和第一个chan结合
chan<- (<-chan int// 第一个<-和最左边的chan结合,第二个<-和左边第二个chan结合
<-chan (<-chan int// 第一个<-和最左边的chan结合,第二个<-和左边第二个chan结合
chan (<-chan int) // 因为括号的原因,<-和括号内第一个chan结合

通过 make,我们可以初始化一个 chan,未初始化的 chan 的零值是 nil。你可以设置它的容量,比如下面的 chan 的容量是 9527,我们把这样的 chan 叫做 buffered chan;如果没有设置,它的容量是 0,我们把这样的 chan 叫做 unbuffered chan

1
make(chan int, 9527)

如果 chan 中还有数据,那么,从这个 chan 接收数据的时候就不会阻塞,如果 chan 还未满(“满”指达到其容量),给它发送数据也不会阻塞,否则就会阻塞。unbuffered chan 只有读写都准备好之后才不会阻塞,这也是很多使用 unbuffered chan 时的常见 Bug。

还有一个知识点需要你记住:nil 是 chan 的零值,是一种特殊的 chan,对值是 nil 的 chan 的发送接收调用者总是会阻塞


下面,我来具体给你介绍几种基本操作,分别是发送数据、接收数据,以及一些其它操作。学会了这几种操作,你就能真正地掌握 Channel 的用法了。

  1. 发送数据

往 chan 中发送一个数据使用“ch<-”,发送数据是一条语句:

1
ch <- 2000

这里的 ch 是 chan int 类型或者是 chan <-int。

  1. 接收数据

从 chan 中接收一条数据使用“<-ch”,接收数据也是一条语句:

1
2
3
x := <-ch // 把接收的一条数据赋值给变量x
foo(<-ch) // 把接收的一个的数据作为参数传给函数
<-ch // 丢弃接收的一条数据

这里的 ch 类型是 chan T 或者 <-chan T。

接收数据时,还可以返回两个值。第一个值是返回的 chan 中的元素,很多人不太熟悉的是第二个值。第二个值是 bool 类型,代表是否成功地从 chan 中读取到一个值,如果第二个参数是 false,chan 已经被 close 而且 chan 中没有缓存的数据,这个时候,第一个值是零值。所以,如果从 chan 读取到一个零值,可能是 sender 真正发送的零值,也可能是 closed 的并且没有缓存元素产生的零值。

  1. 其它操作

Go 内建的函数 close、cap、len 都可以操作 chan 类型:close 会把 chan 关闭掉,cap 返回 chan 的容量,len 返回 chan 中缓存的还未被取走的元素数量。

send 和 recv 都可以作为 select 语句的 case clause,如下面的例子:

1
2
3
4
5
6
7
8
9
10
func main() {
var ch = make(chan int, 10)
for i := 0; i < 10; i++ {
select {
case ch <- i:
case v := <-ch:
fmt.Println(v)
}
}
}

chan 还可以应用于 for-range 语句中,比如:

1
2
3
for v := range ch {
fmt.Println(v)
}

或者是忽略读取的值,只是清空 chan:

1
2
for range ch {
}

好了,到这里,Channel 的基本用法,我们就学完了。下面从代码实现的角度分析 chan 类型的实现。毕竟,只有掌握了原理,你才能真正地用好它。


Channel 的实现原理

接下来,我会给你介绍 chan 的数据结构、初始化的方法以及三个重要的操作方法,分别是 send、recv 和 close。通过学习 Channel 的底层实现,你会对 Channel 的功能和异常情况有更深的理解。

chan 数据结构

chan 类型的数据结构如下图所示,它的数据类型是runtime.hchan

下面来具体解释各个字段的意义。

  • qcount:代表 chan 中已经接收但还没被取走的元素的个数。内建函数 len 可以返回这个字段的值。
  • dataqsiz:队列的大小。chan 使用一个循环队列来存放元素,循环队列很适合这种生产者 - 消费者的场景(我很好奇为什么这个字段省略 size 中的 e)。
  • buf:存放元素的循环队列的 buffer。
  • elemtype 和 elemsize:chan 中元素的类型和 size。因为 chan 一旦声明,它的元素类型是固定的,即普通类型或者指针类型,所以元素大小也是固定的。
  • sendx:处理发送数据的指针在 buf 中的位置。一旦接收了新的数据,指针就会加上 elemsize,移向下一个位置。buf 的总大小是 elemsize 的整数倍,而且 buf 是一个循环列表。
  • recvx:处理接收请求时的指针在 buf 中的位置。一旦取出数据,此指针会移动到下一个位置。
  • recvq:chan 是多生产者多消费者的模式,如果消费者因为没有数据可读而被阻塞了,就会被加入到 recvq 队列中。

初始化

Go 在编译的时候,会根据容量的大小选择调用 makechan64,还是 makechan。

下面的代码是处理 make chan 的逻辑,它会决定是使用 makechan 还是 makechan64 来实现 chan 的初始化:

我们只关注 makechan 就好了,因为 makechan64 只是做了 size 检查,底层还是调用 makechan 实现的。makechan 的目标就是生成 hchan 对象。
那么,接下来,就让我们来看一下 makechan 的主要逻辑。主要的逻辑我都加上了注释,它会根据 chan 的容量的大小和元素的类型不同,初始化不同的存储空间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func makechan(t *chantype, size int) *hchan {
elem := t.elem

// 略去检查代码
mem, overflow := math.MulUintptr(elem.size, uintptr(size))

//
var c *hchan
switch {
case mem == 0:
// chan的size或者元素的size是0,不必创建buf
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 元素不是指针,分配一块连续的内存给hchan数据结构和buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// hchan数据结构后面紧接着就是buf
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素包含指针,那么单独分配buf
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}

// 元素大小、类型、容量都记录下来
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)

return c
}

最终,针对不同的容量和元素类型,这段代码分配了不同的对象来初始化 hchan 对象的字段,返回 hchan 对象。

send

Go 在编译发送数据给 chan 的时候,会把 send 语句转换成 chansend1 函数,chansend1 函数会调用 chansend,我们分段学习它的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 第一部分
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
......
}

最开始,第一部分是进行判断:如果 chan 是 nil 的话,就把调用者 goroutine park(阻塞休眠), 调用者就永远被阻塞住了,所以,第 11 行是不可能执行到的代码。

1
2
3
4
5
// 第二部分,如果chan没有被close,并且chan满了,直接返回
if !block && c.closed == 0 && full(c) {
return false
}
......

第二部分的逻辑是当你往一个已经满了的 chan 实例发送数据时,并且想不阻塞当前调用,那么这里的逻辑是直接返回。chansend1 方法在调用 chansend 的时候设置了阻塞参数,所以不会执行到第二部分的分支里。

1
2
3
4
5
6
7
// 第三部分,chan已经被close的情景
lock(&c.lock) // 开始加锁
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
......

第三部分显示的是,如果 chan 已经被 close 了,再往里面发送数据的话会 panic。

1
2
3
4
5
6
7
// 第四部分,从接收队列中出队一个等待的receiver
if sg := c.recvq.dequeue(); sg != nil {
//
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
......

第四部分,如果等待队列中有等待的 receiver,那么这段代码就把它从队列中弹出,然后直接把数据交给它(通过 memmove(dst, src, t.size)),而不需要放入到 buf 中,速度可以更快一些。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 第五部分,buf还没满
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
......

第五部分说明当前没有 receiver,需要把数据放入到 buf 中,放入之后,就成功返回了。

1
2
3
4
5
6
7
// 第六部分,buf满。
// chansend1不会进入if块里,因为chansend1的block=true
if !block {
unlock(&c.lock)
return false
}
......

第六部分是处理 buf 满的情况。如果 buf 满了,发送者的 goroutine 就会加入到发送者的等待队列中,直到被唤醒。这个时候,数据或者被取走了,或者 chan 被 close 了。

recv

在处理从 chan 中接收数据时,Go 会把代码转换成 chanrecv1 函数,如果要返回两个返回值,会转换成 chanrecv2,chanrecv1 函数和 chanrecv2 会调用 chanrecv。我们分段学习它的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 第一部分,chan为nil
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
......

chanrecv1 和 chanrecv2 传入的 block 参数的值是 true,都是阻塞方式,所以我们分析 chanrecv 的实现的时候,不考虑 block=false 的情况。

第一部分是 chan 为 nil 的情况。和 send 一样,从 nil chan 中接收(读取、获取)数据时,调用者会被永远阻塞。

1
2
3
4
5
// 第二部分, block=false且c为空
if !block && empty(c) {
......
}
......

第二部分你可以直接忽略,因为不是我们这次要分析的场景。

1
2
3
4
5
6
7
8
9
10
11
// 加锁,返回时释放锁
lock(&c.lock)
// 第三部分,c已经被close,且chan为空empty
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
......

第三部分是 chan 已经被 close 的情况。如果 chan 已经被 close 了,并且队列中没有缓存的元素,那么返回 true、false。

1
2
3
4
5
6
// 第四部分,如果sendq队列中有等待发送的sender
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
......

第四部分是处理 buf 满的情况。这个时候,如果是 unbuffer 的 chan,就直接将 sender 的数据复制给 receiver,否则就从队列头部读取一个值,并把这个 sender 的值加入到队列尾部。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 第五部分, 没有等待的sender, buf中有数据
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

if !block {
unlock(&c.lock)
return false, false
}

// 第六部分, buf中没有元素,阻塞
......

第五部分是处理没有等待的 sender 的情况。这个是和 chansend 共用一把大锁,所以不会有并发的问题。如果 buf 有元素,就取出一个元素给 receiver。

第六部分是处理 buf 中没有元素的情况。如果没有元素,那么当前的 receiver 就会被阻塞,直到它从 sender 中接收了数据,或者是 chan 被 close,才返回。

close

通过 close 函数,可以把 chan 关闭,编译器会替换成 closechan 方法的调用。

下面的代码是 close chan 的主要逻辑。如果 chan 为 nil,close 会 panic;如果 chan 已经 closed,再次 close 也会 panic。否则的话,如果 chan 不为 nil,chan 也没有 closed,就把等待队列中的 sender(writer)和 receiver(reader)从队列中全部移除并唤醒。

下面的代码就是 close chan 的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func closechan(c *hchan) {
if c == nil { // chan为nil, panic
panic(plainError("close of nil channel"))
}

lock(&c.lock)
if c.closed != 0 {// chan已经closed, panic
unlock(&c.lock)
panic(plainError("close of closed channel"))
}

c.closed = 1

var glist gList

// 释放所有的reader
for {
sg := c.recvq.dequeue()
......
gp := sg.g
......
glist.push(gp)
}

// 释放所有的writer (它们会panic)
for {
sg := c.sendq.dequeue()
......
gp := sg.g
......
glist.push(gp)
}
unlock(&c.lock)

for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}

掌握了 Channel 的基本用法和实现原理,下面再来给你讲一讲容易犯的错误。你一定要认真看,毕竟,这些可都是帮助你避坑的。

使用 Channel 容易犯的错误

根据 2019 年第一篇全面分析 Go 并发 Bug 的论文,那些知名的 Go 项目中使用 Channel 所犯的 Bug 反而比传统的并发原语的 Bug 还要多。主要有两个原因:一个是,Channel 的概念还比较新,程序员还不能很好地掌握相应的使用方法和最佳实践;第二个是,Channel 有时候比传统的并发原语更复杂,使用起来很容易顾此失彼。

使用 Channel 最常见的错误是 panic 和 goroutine 泄漏。
首先,我们来总结下会 panic 的情况,总共有 3 种:

  1. close 为 nil 的 chan;
  2. send 已经 close 的 chan;
  3. close 已经 close 的 chan。

goroutine 泄漏的问题也很常见,下面的代码也是一个实际项目中的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func process(timeout time.Duration) bool {
ch := make(chan bool)

go func() {
// 模拟处理耗时的业务
time.Sleep((timeout + time.Second))
ch <- true // block
fmt.Println("exit goroutine")
}()
select {
case result := <-ch:
return result
case <-time.After(timeout):
return false
}
}

在这个例子中,process 函数会启动一个 goroutine,去处理需要长时间处理的业务,处理完之后,会发送 true 到 chan 中,目的是通知其它等待的 goroutine,可以继续处理了。

我们来看一下第 10 行到第 15 行,主 goroutine 接收到任务处理完成的通知,或者超时后就返回了。这段代码有问题吗?

如果发生超时,process 函数就返回了,这就会导致 unbuffered 的 chan 从来就没有被读取。我们知道,unbuffered chan 必须等 reader 和 writer 都准备好了才能交流,否则就会阻塞。超时导致未读,结果就是子 goroutine 就阻塞在第 7 行永远结束不了,进而导致 goroutine 泄漏。

解决这个 Bug 的办法很简单,就是将 unbuffered chan 改成容量为 1 的 chan,这样第 7 行就不会被阻塞了。

Go 的开发者极力推荐使用 Channel,不过,这两年,大家意识到,Channel 并不是处理并发问题的“银弹”,有时候使用并发原语更简单,而且不容易出错。所以,我给你提供一套选择的方法:

  1. 共享资源的并发访问使用传统并发原语;
  2. 复杂的任务编排和消息传递使用 Channel;
  3. 消息通知机制使用 Channel,除非只想 signal 一个 goroutine,才使用 Cond;
  4. 简单等待所有任务的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;
  5. 需要和 Select 语句结合,使用 Channel;
  6. 需要和超时配合时,使用 Channel 和 Context。

它们踩过的坑

接下来,我带你围观下知名 Go 项目的 Channel 相关的 Bug。
etcd issue 6857是一个程序 hang 住的问题:在异常情况下,没有往 chan 实例中填充所需的元素,导致等待者永远等待。具体来说,Status 方法的逻辑是生成一个 chan Status,然后把这个 chan 交给其它的 goroutine 去处理和写入数据,最后,Status 返回获取的状态信息。

不幸的是,如果正好节点停止了,没有 goroutine 去填充这个 chan,会导致方法 hang 在返回的那一行上(下面的截图中的第 466 行)。解决办法就是,在等待 status chan 返回元素的同时,也检查节点是不是已经停止了(done 这个 chan 是不是 close 了)。

其实,感觉这个修改还是有问题的。问题就在于,如果程序执行了 466 行,成功地把 c 写入到 Status 待处理队列后,执行到第 467 行时,如果停止了这个节点,那么,这个 Status 方法还是会阻塞在第 467 行。你可以自己研究研究,看看是不是这样。

etcd issue 5505 虽然没有任何的 Bug 描述,但是从修复内容上看,它是一个往已经 close 的 chan 写数据导致 panic 的问题。

etcd issue 11256 是因为 unbuffered chan goroutine 泄漏的问题。TestNodeProposeAddLearnerNode 方法中一开始定义了一个 unbuffered 的 chan,也就是 applyConfChan,然后启动一个子 goroutine,这个子 goroutine 会在循环中执行业务逻辑,并且不断地往这个 chan 中添加一个元素。TestNodeProposeAddLearnerNode 方法的末尾处会从这个 chan 中读取一个元素。

这段代码在 for 循环中就往此 chan 中写入了一个元素,结果导致 TestNodeProposeAddLearnerNode 从这个 chan 中读取到元素就返回了。悲剧的是,子 goroutine 的 for 循环还在执行,阻塞在下图中红色的第 851 行,并且一直 hang 在那里。

这个 Bug 的修复也很简单,只要改动一下 applyConfChan 的处理逻辑就可以了:只有子 goroutine 的 for 循环中的主要逻辑完成之后,才往 applyConfChan 发送一个元素,这样,TestNodeProposeAddLearnerNode 收到通知继续执行,子 goroutine 也不会被阻塞住了。

etcd issue 9956 是往一个已 close 的 chan 发送数据,其实它是 grpc 的一个 bug(grpc issue 2695),修复办法就是不 close 这个 chan 就好了


总结

chan 的值和状态有多种情况,而不同的操作(send、recv、close)又可能得到不同的结果,这是使用 chan 类型时经常让人困惑的地方。

为了帮助你快速地了解不同状态下各种操作的结果,以下总结了一个表格,你一定要特别关注下那些 panic 的情况,另外还要掌握那些会 block 的场景,它们是导致死锁或者 goroutine 泄露的罪魁祸首。

还有一个值得注意的点是,只要一个 chan 还有未读的数据,即使把它 close 掉,你还是可以继续把这些未读的数据消费完,之后才是读取零值数据






怎么解决配置中心这种,写很少,但是读特别多的问题?

1
go vet ./...

sync.Map 和普通的 map 在 Go 中的主要区别确实在于 线程安全,但除此之外,它们在实现方式、性能特点和使用场景上也有一些显著的差异。

  1. 线程安全性

    • sync.Mapsync.Map 是线程安全的,可以在并发环境下直接使用而不需要额外的锁。因此,在需要频繁并发读写的场景中,sync.Map 能够避免手动加锁操作,提供较好的安全性和便捷性。
    • 普通 map:普通的 map 在并发读写时并不是线程安全的,必须使用 sync.RWMutex 等机制进行同步保护,否则会引发并发读写错误(如 fatal error: concurrent map read and map write)。
  2. 性能和使用场景

    • sync.Map:适用于读多写少的场景,尤其是在数据量较大时,频繁读写可能导致性能下降。sync.Map 采用了内部分段锁和延迟删除机制(不立即删除而是标记删除),在高并发、读多写少的情况表现更好。
    • 普通 map:对于写多读少或非并发的场景,普通 map 加上 sync.RWMutex 可能会有更好的性能,因为 sync.Map 复杂的内部逻辑(如分段锁、延迟删除等)在写多的情况下反而可能会增加额外的开销。
  3. API 不同

    • sync.Map
      • 使用的是 LoadStoreLoadOrStoreDeleteRange 等方法,没有直接的索引操作(如 m[key])。
      • LoadOrStore 方法特别适合在高并发时使用,可以实现“检查并存储”的操作,相比普通 map 更加简洁高效。
    • 普通 map
      • 可以通过索引操作 m[key] 来直接读取或写入数据,使用方式更加简洁。
      • 普通 map 直接支持的操作方式更加灵活,比如通过直接的迭代、元素删除等操作。
  4. 内存管理和清理

    • sync.Map:使用了延迟删除机制,标记删除的元素不会立即被移除,而是在后续操作或 Range 时被逐步清理。这种机制在频繁删除和读写的情况下可能会占用更多内存。
    • 普通 map:删除元素时直接释放内存,更加简单直接,不会有延迟删除的问题。
  5. 遍历顺序

    • sync.Mapsync.Map 中元素的遍历顺序是不确定的,因为其内部实现可能会对元素存储进行重新分配、优化等操作。
    • 普通 map:虽然遍历顺序也不保证一致,但在一次遍历过程中元素的顺序是固定的,且通常不会因为 Range 而改变(除非手动插入或删除元素)。
  6. 实现原理的不同

    • sync.Mapsync.Map 并不是简单地加锁来实现线程安全的,它采用了一种更复杂的数据结构,基于读多写少场景设计。其内部结构大致由两部分组成:只读部分(read-only)和脏部分(dirty)。
      • 只读部分:存储稳定的数据,用于快速读取,没有锁,因此读性能很高。
      • 脏部分:用于存储最近的写入数据,并在适当时机合并到只读部分。
      • 当写入操作发生时,会优先尝试在脏部分写入,写入次数增多时,会将脏部分的数据合并到只读部分中。这种设计提高了并发访问的性能。
    • 普通 map:普通的 map 是基于哈希表实现的,不具备并发安全特性。

总结

  • 线程安全sync.Map 的最大优势是线程安全,适合并发读写场景,普通 map 在并发下需要手动加锁。
  • 使用场景sync.Map 适用于读多写少的高并发场景,而普通 map 更适合非并发场景或写多读少的场景。
  • 实现细节sync.Map 采用分段锁和延迟删除,牺牲了部分写性能来换取读性能和线程安全,而普通 map 是一个简单的哈希表。

因此,如果你的应用场景是并发读多写少,sync.Map 会更合适;如果是非并发或写多读少场景,则可以使用普通的 map,配合 sync.RWMutex 达到更好的性能。


并发编程中避免数据竞争的总结笔记

在实际生产环境中,数据竞争(Data Race)是并发编程中常见的问题。我们应尽量在设计阶段就避免数据竞争,而不是依赖加锁等手段在问题发生后再去解决,因为加锁会导致并发性能的下降。

一、避免共享可变状态

  1. 无共享设计(No Shared State Design)

    • 消除共享数据:通过设计,使得线程或协程之间不共享数据,从根本上避免数据竞争。
    • 函数式编程理念:使用不可变数据结构,避免状态变化。
  2. 消息传递(Message Passing)

    • 使用通信机制:如 Go 语言的 channel,线程或协程之间通过消息传递数据,而不是共享内存。

二、使用不可变数据和值传递

  1. 不可变数据(Immutable Data)

    • 创建后不修改:不可变数据在并发环境下是安全的,因为它们不会被修改。
  2. 值传递

    • 传递数据副本:将数据的副本传递给线程或协程,避免对原始数据的并发修改。

三、数据局部化

  1. 线程/协程本地存储(Thread/Coroutine Local Storage)

    • 数据局部化:将数据限制在单个线程或协程内部,防止跨线程的数据竞争。
  2. 写时复制(Copy-on-Write, COW)

    • 延迟复制:在需要修改数据时才复制,减少不必要的复制开销。

四、使用无锁数据结构

  • 无锁(Lock-free)和无等待(Wait-free)数据结构
    • 原子操作:利用原子操作(如 CAS)实现线程安全,而不需要锁。

五、使用原子操作和内存屏障

  1. 原子操作(Atomic Operations)

    • 线程安全的基本操作:如原子加减,可以保证对单个变量的线程安全访问。
  2. 内存屏障(Memory Barrier)

    • 防止指令重排:确保内存操作的执行顺序,避免因编译器优化导致的意外行为。

六、使用并发安全的数据结构

  • 并发安全容器
    • :Java 的 ConcurrentHashMap、Go 的 sync.Map
    • 特性:内部实现了必要的同步机制,适合在并发环境下使用。

七、合理使用同步机制

  1. 最小化锁的粒度

    • 减少锁的持有时间:仅在必要的代码块加锁,避免影响其他并发操作。
  2. 读写锁(Read-Write Lock)

    • 读多写少场景:允许多个读操作并发进行,写操作仍需独占。

八、Go 语言中的实践

  1. 优先使用值传递或不可变数据传递给 goroutine

    • 避免数据竞争:传递数据副本或不可变数据,防止 goroutine 修改共享数据。
  2. 使用 channel 传递数据

    • 通信共享内存:通过 channel 传递数据,而不是共享内存,符合 Go 的并发哲学。
  3. 避免共享可变数据

    • 慎用指针和引用类型:除非有明确的同步机制,否则应避免在 goroutine 间共享可变数据。

总结

在并发编程中,预防胜于治疗。通过设计无共享的数据结构、使用不可变数据、值传递和并发安全的通信机制,可以有效避免数据竞争问题,提升程序的并发性能和可靠性。依赖加锁虽然能解决数据竞争,但会带来性能开销和死锁风险,应尽量避免。