Appearance
The Go Programming Language - 并发安全
1. 竞争条件
通常我们可以采用下面两种方式避免数据竞争:
避免从多个 goroutine 访问变量;
由于其它的 goroutine 不能够直接访问变量,它们只能使用一个 channel 发送给指定的 goroutine 进行查询/更新变量。这也就是 Go 的口头禅 “不要使用共享数据来通信,使用通信来共享数据”。一个提供对一个指定的变量通过 channel 来请求的 goroutine 叫做这个变量的 monitor goroutine。
这是一个非并发安全的示例:
Go// Package bank implements a bank with only one account. package bank var balance int func Deposit(amount int) { balance = balance + amount } func Balance() int { return balance }1
2
3
4
5改造后的代码:
Go// Package bank provides a concurrency-safe bank with one account. package bank var deposits = make(chan int) // send amount to deposit var balances = make(chan int) // receive balance func Deposit(amount int) { deposits <- amount } func Balance() int { return <-balances } func teller() { var balance int // balance is confined to teller goroutine for { select { case amount := <-deposits: balance += amount case balances <- balance: } } } func init() { go teller() // start the monitor goroutine }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23允许多个 goroutine 去访问变量,但是在同一个时刻最多只有一个 goroutine 在访问,这种方式被称为 “互斥”,稍后我们将对其进行讨论;
2. sync.Mutex 互斥锁
之前我们使用了一个 buffered channel 作为一个计数信号量,来保证最多只有 20 个 goroutine 会同时执行 HTTP 请求。同理,我们可以用一个容量只有 1 的 channel 来保证最多只有一个 goroutine 在同一时刻访问一个共享变量。一个只能为 1 和 0 的信号量叫做二元信号量(binary semaphore)。
Go
var (
sema = make(chan struct{}, 1) // a binary semaphore guarding balance
balance int
)
func Deposit(amount int) {
sema <- struct{}{} // acquire token
balance = balance + amount
<-sema // release token
}
func Balance() int {
sema <- struct{}{} // acquire token
b := balance
<-sema // release token
return b
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
这种互斥很实用,而且被 sync 包里的 Mutex 类型直接支持。它的 Lock 方法能够获取到 token(这里叫锁),并且 Unlock 方法会释放这个 token:
Go
import "sync"
var (
mu sync.Mutex // guards balance
balance int
)
func Deposit(amount int) {
mu.Lock()
balance = balance + amount
mu.Unlock()
}
func Balance() int {
mu.Lock()
b := balance
mu.Unlock()
return b
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
通过 defer 可以确保 Unlock 函数在函数结束时一定会被调用:
Go
func Balance() int {
mu.Lock()
defer mu.Unlock()
return balance
}1
2
3
4
5
2
3
4
5
接下来我们再增加一个 Withdraw 函数(在此版本中,此函数在并发情况下有可能会导致 balance 被扣减至 0 以下):
Go
// NOTE: not atomic!
func Withdraw(amount int) bool {
Deposit(-amount)
if Balance() < 0 {
Deposit(amount)
return false // insufficient funds
}
return true
}1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
如果在 Withdraw 函数中再获取一次互斥锁:
Go
// NOTE: incorrect!
func Withdraw(amount int) bool {
mu.Lock()
defer mu.Unlock()
Deposit(-amount)
if Balance() < 0 {
Deposit(amount)
return false // insufficient funds
}
return true
}1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
在以上这个版本中 Deposit 会调用 mu.Lock() 第二次去获取互斥锁,但因为 mutex 已经锁上了,而无法被重入(go 里没有重入锁),也就是说没法对一个已经锁上的 mutex 来再次上锁,这会导致程序死锁,没法继续执行下去,Withdraw 会永远阻塞下去。
一个通用的解决方案是将一个函数分离为多个函数,比如我们把 Deposit 分离成两个:一个不导出的函数 deposit,这个函数假设锁总是会被保持并去做实际的操作,另一个是导出的函数 Deposit,这个函数会调用 deposit,但在调用前会先去获取锁。于是我们可以将 Withdraw 修改成下面这种形式:
Go
func Withdraw(amount int) bool {
mu.Lock()
defer mu.Unlock()
deposit(-amount)
if balance < 0 {
deposit(amount)
return false // insufficient funds
}
return true
}
func Deposit(amount int) {
mu.Lock()
defer mu.Unlock()
deposit(amount)
}
func Balance() int {
mu.Lock()
defer mu.Unlock()
return balance
}
// This function requires that the lock be held.
func deposit(amount int) { balance += amount }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
3. sync.RWMutex 读写锁
Go 的读写锁是通过 sync.RWMutex 实现的:
Go
var mu sync.RWMutex
var balance int
func Balance() int {
mu.RLock() // readers lock
defer mu.RUnlock()
return balance
}1
2
3
4
5
6
7
2
3
4
5
6
7
Balance 函数现在调用了 RLock 和 RUnlock 方法来获取和释放一个读取或者共享锁。Deposit 函数没有变化(调用 mu.Lock 和 mu.Unlock 方法来获取和释放一个写或互斥锁)。
RWMutex 只有当获得锁的大部分 goroutine 都是读操作的情况时,才最能带来好处的。因为 RWMutex 需要更复杂的内部记录,所以会让它比一般的无竞争锁的 mutex 慢一些。
4. sync.Once 初始化
下面是一个延迟初始化的例子(并发安全):
Go
var mu sync.RWMutex // guards icons
var icons map[string]image.Image
// Concurrency-safe.
func Icon(name string) image.Image {
mu.RLock()
if icons != nil {
icon := icons[name]
mu.RUnlock()
return icon
}
mu.RUnlock()
// acquire an exclusive lock
mu.Lock()
if icons == nil { // NOTE: must recheck for nil
loadIcons()
}
icon := icons[name]
mu.Unlock()
return icon
}
func loadIcons() {
icons = make(map[string]image.Image)
icons["spades.png"] = loadIcon("spades.png")
icons["hearts.png"] = loadIcon("hearts.png")
icons["diamonds.png"] = loadIcon("diamonds.png")
icons["clubs.png"] = loadIcon("clubs.png")
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
上面的写法过于繁琐,下面我们使用 sync.Once 对其进行简化:
Go
var loadIconsOnce sync.Once
var icons map[string]image.Image
// Concurrency-safe.
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
5. 竞争条件检测
即使我们小心到不能再小心,但在并发程序中犯错还是太容易了。幸运的是,Go 的 runtime 和工具链为我们装备了一个复杂但好用的动态分析工具,竞争检查器(the race detector)。
只要在 go build,go run 或者 go test 命令后面加上 -race 的 flag,就会使编译器创建一个你的应用的 “修改版” 或者一个附带了能够记录所有运行期对共享变量访问工具的 test,并且会记录下每一个读或者写共享变量的 goroutine 的身份信息。另外,修改版的程序会记录下所有的同步事件,比如 go 语句,channel 操作,以及对 (*sync.Mutex).Lock、(*sync.WaitGroup).Wait 等等的调用。(完整的同步事件集合是在 The Go Memory Model 文档中有说明,该文档是和语言文档放在一起的。译注:https://golang.org/ref/mem)
竞争检查器会检查这些事件,会寻找在哪一个 goroutine 中出现了这样的 case,例如其读或者写了一个共享变量,这个共享变量是被另一个 goroutine 在没有进行干预同步操作便直接写入的。这种情况也就表明了是对一个共享变量的并发访问,即数据竞争。这个工具会打印一份报告,内容包含变量身份,读取和写入的 goroutine 中活跃的函数的调用栈。这些信息在定位问题时通常很有用。在后续章节中会有一个竞争检查器的实战样例。
竞争检查器会报告所有的已经发生的数据竞争。然而,它只能检测到运行时的竞争条件;并不能证明之后不会发生数据竞争。所以为了使结果尽量正确,请保证你的测试并发地覆盖到了你到包。
由于需要额外的记录,因此构建时加了竞争检测的程序跑起来会慢一些,且需要更大的内存,即使是这样,这些代价对于很多生产环境的工作来说还是可以接受的。对于一些偶发的竞争条件来说,让竞争检查器来干活可以节省无数日夜的 debugging。
6. 示例: 并发的非阻塞缓存
本节中我们会做一个无阻塞的缓存。下面是我们要设计的 cache 的第一个 “草稿”:
Go
// Package memo provides a concurrency-unsafe
// memoization of a function of type Func.
package memo
// A Memo caches the results of calling a Func.
type Memo struct {
f Func
cache map[string]result
}
// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)
type result struct {
value interface{}
err error
}
func New(f Func) *Memo {
return &Memo{f: f, cache: make(map[string]result)}
}
// NOTE: not concurrency-safe!
func (memo *Memo) Get(key string) (interface{}, error) {
res, ok := memo.cache[key]
if !ok {
res.value, res.err = memo.f(key)
memo.cache[key] = res
}
return res.value, res.err
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
以并发的形式进行调用:
Go
m := memo.New(httpGetBody)
var n sync.WaitGroup
for url := range incomingURLs() {
n.Add(1)
go func(url string) {
start := time.Now()
value, err := m.Get(url)
if err != nil {
log.Print(err)
}
fmt.Printf("%s, %s, %d bytes\n",
url, time.Since(start), len(value.([]byte)))
n.Done()
}(url)
}
n.Wait()
func httpGetBody(url string) (interface{}, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
不幸的是这段代码并不是每次都能够正常工作。比如,cache miss(缓存未命中),或者命中了缓存但却返回了错误的值,或者甚至会直接崩溃。此时我们可以使用 -race 这个 flag 来重新运行程序,竞争检测器会打印像下面这样的报告:
Bash
$ go test -run=TestConcurrent -race -v gopl.io/ch9/memo1
=== RUN TestConcurrent
...
WARNING: DATA RACE
Write by goroutine 36:
runtime.mapassign1()
~/go/src/runtime/hashmap.go:411 +0x0
gopl.io/ch9/memo1.(*Memo).Get()
~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
...
Previous write by goroutine 35:
runtime.mapassign1()
~/go/src/runtime/hashmap.go:411 +0x0
gopl.io/ch9/memo1.(*Memo).Get()
~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
...
Found 1 data race(s)
FAIL gopl.io/ch9/memo1 2.393s1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
memo.go 的第 32 行出现了两次(9 和 15 行),说明有两个 goroutine 在没有同步干预的情况下更新了 cache map,这表明 Get 不是并发安全的,存在数据竞争。
我们可以先简单的使用 sync.Mutex 处理并发问题:
Go
type Memo struct {
f Func
mu sync.Mutex // guards cache
cache map[string]result
}
func (memo *Memo) Get(key string) (value interface{}, err error) {
memo.mu.Lock()
res, ok := memo.cache[key]
memo.mu.Unlock()
if !ok {
res.value, res.err = memo.f(key)
// Between the two critical sections, several goroutines
// may race to compute f(key) and update the map.
memo.mu.Lock()
memo.cache[key] = res
memo.mu.Unlock()
}
return res.value, res.err
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
在这个版本中,在同一时刻请求相同的 URL 会同时发起多个相同的 HTTP 请求,理想情况下我们应避免掉多余的工作,这种 “避免” 工作一般被称为 duplicate suppression(重复抑制/避免):
Go
type entry struct {
res result
ready chan struct{} // closed when res is ready
}
func New(f Func) *Memo {
return &Memo{f: f, cache: make(map[string]*entry)}
}
type Memo struct {
f Func
mu sync.Mutex // guards cache
cache map[string]*entry
}
func (memo *Memo) Get(key string) (value interface{}, err error) {
memo.mu.Lock()
e := memo.cache[key]
if e == nil {
// This is the first request for this key.
// This goroutine becomes responsible for computing
// the value and broadcasting the ready condition.
e = &entry{ready: make(chan struct{})}
memo.cache[key] = e
memo.mu.Unlock()
e.res.value, e.res.err = memo.f(key)
close(e.ready) // broadcast ready condition
} else {
// This is a repeat request for this key.
memo.mu.Unlock()
<-e.ready // wait for ready condition
}
return e.res.value, e.res.err
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
这样并发、不重复、无阻塞的 cache 就完成了。不顾我们还可以再尝试基于 channel 进行实现:
Go
// A request is a message requesting that the Func be applied to key.
type request struct {
key string
response chan<- result // the client wants a single result
}
type Memo struct{ requests chan request }
// New returns a memoization of f. Clients must subsequently call Close.
func New(f Func) *Memo {
memo := &Memo{requests: make(chan request)}
go memo.server(f)
return memo
}
func (memo *Memo) Get(key string) (interface{}, error) {
response := make(chan result)
memo.requests <- request{key, response}
res := <-response
return res.value, res.err
}
func (memo *Memo) Close() { close(memo.requests) }
func (memo *Memo) server(f Func) {
cache := make(map[string]*entry)
for req := range memo.requests {
e := cache[req.key]
if e == nil {
// This is the first request for this key.
e = &entry{ready: make(chan struct{})}
cache[req.key] = e
go e.call(f, req.key) // call f(key)
}
go e.deliver(req.response)
}
}
func (e *entry) call(f Func, key string) {
// Evaluate the function.
e.res.value, e.res.err = f(key)
// Broadcast the ready condition.
close(e.ready)
}
func (e *entry) deliver(response chan<- result) {
// Wait for the ready condition.
<-e.ready
// Send the result to the client.
response <- e.res
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
这个例子说明我们无论是用锁,还是 channel 来建立并发程序都是可行的。上面的两种方案并不好说特定情境下哪种更好,不过了解他们还是有价值的。有时候从一种方式切换到另一种可以使你的代码更为简洁(PS:不是说好的 golang 推崇通信并发么)。
7. sync.Mutex、channel 使用指导原则
一般使用指南:
| Channel | Mutex |
|---|---|
| 传递数据所有权 分配工作单元 传递异步结果 | 缓存 状态 |
8. Goroutines 和线程的区别
8.1. 动态栈
每一个 OS 线程都有一个固定大小的内存块(一般会是 2MB)来做栈,这个栈会用来存储当前正在被调用或挂起(指在调用其它函数时)的函数的内部变量。这个固定大小的栈同时很大又很小。因为 2MB 的栈对于一个小小的 goroutine 来说是很大的内存浪费,比如对于我们用到的,一个只是用来 WaitGroup 之后关闭 channel 的 goroutine 来说。而对于 go 程序来说,同时创建成百上千个 goroutine 是非常普遍的,如果每一个 goroutine 都需要这么大的栈的话,那这么多的 goroutine 就不太可能了。除去大小的问题之外,固定大小的栈对于更复杂或者更深层次的递归函数调用来说显然是不够的。修改固定的大小可以提升空间的利用率允许创建更多的线程,并且可以允许更深的递归调用,不过这两者是没法同时兼备的。
相反,一个 goroutine 会以一个很小的栈开始其生命周期,一般只需要 2KB。一个 goroutine 的栈,和操作系统线程一样,会保存其活跃或挂起的函数调用的本地变量,但是和 OS 线程不太一样的是一个 goroutine 的栈大小并不是固定的;栈的大小会根据需要动态地伸缩。而 goroutine 的栈的最大值有 1GB,比传统的固定大小的线程栈要大得多,尽管一般情况下,大多 goroutine 都不需要这么大的栈。
8.2. Goroutine 调度
OS 线程会被操作系统内核调度。每几毫秒,一个硬件计时器会中断处理器,这会调用一个叫作 scheduler 的内核函数。这个函数会挂起当前执行的线程并保存内存中它的寄存器内容,检查线程列表并决定下一次哪个线程可以被运行,并从内存中恢复该线程的寄存器信息,然后恢复执行该线程的现场并开始执行线程。因为操作系统线程是被内核所调度,所以从一个线程向另一个 “移动” 需要完整的上下文切换,也就是说,保存一个用户线程的状态到内存,恢复另一个线程的到寄存器,然后更新调度器的数据结构。这几步操作很慢,因为其局部性很差需要几次内存访问,并且会增加运行的 cpu 周期。
Go 的运行时包含了其自己的调度器,这个调度器使用了一些技术手段,比如 m:n 调度,因为其会在 n 个操作系统线程上多工(调度)m 个 goroutine。Go 调度器的工作和内核的调度是相似的,但是这个调度器只关注单独的 Go 程序中的 goroutine(按程序独立)。
和操作系统的线程调度不同的是,Go 调度器并不是用一个硬件定时器而是被 Go 语言 “建筑” 本身进行调度的。例如当一个 goroutine 调用了 time.Sleep 或者被 channel 调用或者 mutex 操作阻塞时,调度器会使其进入休眠并开始执行另一个 goroutine 直到时机到了再去唤醒第一个 goroutine。因为这种调度方式不需要进入内核的上下文,所以重新调度一个 goroutine 比调度一个线程代价要低得多。
8.3. GOMAXPROCS
Go 的调度器使用了一个叫做 GOMAXPROCS 的变量来决定会有多少个操作系统的线程同时执行 Go 的代码。其默认的值是运行机器上的 CPU 的核心数,所以在一个有 8 个核心的机器上时,调度器一次会在 8 个 OS 线程上去调度 Go 代码。(GOMAXPROCS 是前面说的 m:n 调度中的 n)。在休眠中的或者在通信中被阻塞的 goroutine 是不需要一个对应的线程来做调度的。在 I/O 中或系统调用中或调用非 Go 语言函数时,是需要一个对应的操作系统线程的,但是 GOMAXPROCS 并不需要将这几种情况计算在内。
你可以用 GOMAXPROCS 的环境变量来显式地控制这个参数,或者也可以在运行时用 runtime.GOMAXPROCS 函数来修改它。我们在下面的小程序中会看到 GOMAXPROCS 的效果,这个程序会无限打印 0 和 1。
Go
for {
go fmt.Print(0)
fmt.Print(1)
}1
2
3
4
2
3
4
Bash
$ GOMAXPROCS=1 go run hacker-cliché.go
111111111111111111110000000000000000000011111...1
2
2
Bash
$ GOMAXPROCS=2 go run hacker-cliché.go
010101010101010101011001100101011010010100110...1
2
2
在第一次执行时,最多同时只能有一个 goroutine 被执行。初始情况下只有 main goroutine 被执行,所以会打印很多 1。过了一段时间后,GO 调度器会将其置为休眠,并唤醒另一个 goroutine,这时候就开始打印很多 0 了,在打印的时候,goroutine 是被调度到操作系统线程上的。在第二次执行时,我们使用了两个操作系统线程,所以两个 goroutine 可以一起被执行,以同样的频率交替打印 0 和 1。我们必须强调的是 goroutine 的调度是受很多因子影响的,而 runtime 也是在不断地发展演进的,所以这里的你实际得到的结果可能会因为版本的不同而与我们运行的结果有所不同。
8.4. Goroutine 没有 ID 号
在大多数支持多线程的操作系统和程序语言中,当前的线程都有一个独特的身份(id),并且这个身份信息可以以一个普通值的形式被被很容易地获取到,典型的可以是一个 integer 或者指针值。这种情况下我们做一个抽象化的 thread-local storage(线程本地存储,多线程编程中不希望其它线程访问的内容)就很容易,只需要以线程的 id 作为 key 的一个 map 就可以解决问题,每一个线程以其 id 就能从中获取到值,且和其它线程互不冲突。
goroutine 没有可以被程序员获取到的身份(id)的概念。这一点是设计上故意而为之,由于 thread-local storage 总是会被滥用。比如说,一个 web server 是用一种支持 tls 的语言实现的,而非常普遍的是很多函数会去寻找 HTTP 请求的信息,这代表它们就是去其存储层(这个存储层有可能是 tls)查找的。这就像是那些过分依赖全局变量的程序一样,会导致一种非健康的 “距离外行为”,在这种行为下,一个函数的行为可能不是由其自己内部的变量所决定,而是由其所运行在的线程所决定。因此,如果线程本身的身份会改变——比如一些 worker 线程之类的——那么函数的行为就会变得神秘莫测。
Go 鼓励更为简单的模式,这种模式下参数对函数的影响都是显式的。这样不仅使程序变得更易读,而且会让我们自由地向一些给定的函数分配子任务时不用担心其身份信息影响行为。