Appearance
The Go Programming Language - Goroutines
1. Goroutines
当一个程序启动时,其主函数即在一个单独的 goroutine 中运行,我们叫它 main goroutine。新的 goroutine 会用 go 语句来创建。在语法上,go 语句是一个普通的函数或方法调用前加上关键字 go。go 语句会使其语句中的函数在一个新创建的 goroutine 中运行。而 go 语句本身会迅速地完成。
Go
f() // call f(); wait for it to return
go f() // create a new goroutine that calls f(); don't wait1
2
2
下面的例子,main goroutine 将计算菲波那契数列的第 45 个元素值。由于计算函数使用低效的递归,所以会运行相当长时间,在此期间我们想让用户看到一个可见的标识来表明程序依然在正常运行,所以来做一个动画的小图标:
Go
func main() {
go spinner(100 * time.Millisecond)
const n = 45
fibN := fib(n) // slow
fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN)
}
func spinner(delay time.Duration) {
for {
for _, r := range `-\|/` {
fmt.Printf("\r%c", r)
time.Sleep(delay)
}
}
}
func fib(x int) int {
if x < 2 {
return x
}
return fib(x-1) + fib(x-2)
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
动画显示了几秒之后,fib(45) 的调用成功地返回,并且打印结果:
Text
Fibonacci(45) = 1134903170然后主函数返回。主函数返回时,所有的 goroutine 都会被直接打断,程序退出。除了从主函数退出或者直接终止程序之外,没有其它的编程方法能够让一个 goroutine 来打断另一个的执行,但是之后可以看到一种方式来实现这个目的,通过 goroutine 之间的通信来让一个 goroutine 请求其它的 goroutine,并让被请求的 goroutine 自行结束执行。
2. Channels
一个 channel 是一个通信机制,它可以让一个 goroutine 通过它给另一个 goroutine 发送值信息。每个 channel 都有一个特殊的类型,也就是 channels 可发送数据的类型。一个可以发送 int 类型数据的 channel 一般写为 chan int。
使用内置的 make 函数,我们可以创建一个 channel:
Go
ch := make(chan int) // ch has type 'chan int'和 map 类似,channel 也对应一个 make 创建的底层数据结构的引用。当我们复制一个 channel 或用于函数参数传递时,我们只是拷贝了一个 channel 引用,因此调用者和被调用者将引用同一个 channel 对象。和其它的引用类型一样,channel 的零值也是 nil。两个相同类型的 channel 可以使用 == 运算符比较。如果两个 channel 引用的是相同的对象,那么比较的结果为真。一个 channel 也可以和 nil 进行比较。
一个 channel 有发送和接受两个主要操作,都是通信行为。一个发送语句将一个值从一个 goroutine 通过 channel 发送到另一个执行接收操作的 goroutine:
Go
ch <- x // a send statement
x = <-ch // a receive expression in an assignment statement
<-ch // a receive statement; result is discarded1
2
3
2
3
Channel 还支持 close 操作,用于关闭 channel,随后对基于该 channel 的任何发送操作都将导致 panic 异常。对一个已经被 close 过的 channel 进行接收操作依然可以接受到之前已经成功发送的数据;如果 channel 中已经没有数据的话将产生一个零值的数据。
使用内置的 close 函数就可以关闭一个 channel:
Go
close(ch)以最简单方式调用 make 函数创建的是一个无缓存的 channel。通过第二个参数可以指定值的类型,第三个参数指定 channel 的容量(如果 channel 的容量大于零,那么该 channel 就是带缓存的 channel):
Go
ch = make(chan int) // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 31
2
3
2
3
2.1. 不带缓存的 Channels
一个基于无缓存 Channels 的发送操作将导致发送者 goroutine 阻塞,直到另一个 goroutine 在相同的 Channels 上执行接收操作,当发送的值通过 Channels 成功传输之后,两个 goroutine 可以继续执行后面的语句。反之,如果接收操作先发生,那么接收者 goroutine 也将阻塞,直到有另一个 goroutine 在相同的 Channels 上执行发送操作。
基于无缓存 Channels 的发送和接收操作将导致两个 goroutine 做一次同步操作。因为这个原因,无缓存 Channels 有时候也被称为同步 Channels。
在下面这个示例中,我们将让主 goroutine 等待后台 goroutine 完成工作后再退出:
Go
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
done := make(chan struct{})
go func() {
io.Copy(os.Stdout, conn) // NOTE: ignoring errors
log.Println("done")
done <- struct{}{} // signal the main goroutine
}()
mustCopy(conn, os.Stdin)
conn.Close()
<-done // wait for background goroutine to finish
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
基于 channels 发送消息有两个重要方面。首先每个消息都有一个值,但是有时候通讯的事实和发生的时刻也同样重要。当我们更希望强调通讯发生的时刻时,我们将它称为消息事件。有些消息事件并不携带额外的信息,它仅仅是用作两个 goroutine 之间的同步,这时候我们可以用 struct{} 空结构体作为 channels 元素的类型,虽然也可以使用 bool 或 int 类型实现同样的功能,done <- 1 语句也比 done <- struct{}{} 更短。
2.2. 串联的 Channels(Pipeline)
Channels 也可以用于将多个 goroutine 连接在一起,一个 Channel 的输出作为下一个 Channel 的输入。这种串联的 Channels 就是所谓的管道(pipeline)。下面的程序用两个 channels 将三个 goroutine 串联起来:

下面是一个简单的示例:
Go
func main() {
naturals := make(chan int)
squares := make(chan int)
// Counter
go func() {
for x := 0; ; x++ {
naturals <- x
}
}()
// Squarer
go func() {
for {
x := <-naturals
squares <- x * x
}
}()
// Printer (in main goroutine)
for {
fmt.Println(<-squares)
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
如果发送者知道,没有更多的值需要发送到 channel 的话,那么让接收者也能及时知道没有多余的值可接收将是有用的,因为接收者可以停止不必要的接收等待。这可以通过内置的 close 函数来关闭 channel 实现:
Go
close(naturals)实际上关闭一个 channels 还会触发一个广播机制,我们将在后续讨论。
当一个 channel 被关闭后,再向该 channel 发送数据将导致 panic 异常。当一个被关闭的 channel 中已经发送的数据都被成功接收后,后续的接收操作将不再阻塞,它们会立即返回一个零值。关闭上面例子中的 naturals 变量对应的 channel 并不能终止循环,它依然会收到一个永无休止的零值序列,然后将它们发送给打印者 goroutine。
实际上接收操作还有一个变体形式:它多接收一个结果,多接收的第二个结果是一个布尔值,ture 表示成功从 channels 接收到值,false 表示 channels 已经被关闭并且里面没有值可接收。使用这个特性,我们可以修改 squarer 函数中的循环代码,当 naturals 对应的 channel 被关闭并没有值可接收时跳出循环,并且也关闭 squares 对应的 channel:
Go
// Squarer
go func() {
for {
x, ok := <-naturals
if !ok {
break // channel was closed and drained
}
squares <- x * x
}
close(squares)
}()1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
实际上 Go 语言的 range 循环也可直接在 channels 上面迭代,所以我们还可以这样:
Go
func main() {
naturals := make(chan int)
squares := make(chan int)
// Counter
go func() {
for x := 0; x < 100; x++ {
naturals <- x
}
close(naturals)
}()
// Squarer
go func() {
for x := range naturals {
squares <- x * x
}
close(squares)
}()
// Printer (in main goroutine)
for x := range squares {
fmt.Println(x)
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2.3. 单方向的 Channel
当一个 channel 作为一个函数参数时,它一般总是被专门用于只发送或者只接收。为了表明这种意图并防止被滥用,Go 语言的类型系统提供了单方向的 channel 类型,分别用于只发送或只接收的 channel。类型 chan<- int 表示一个只发送 int 的 channel,只能发送不能接收。相反,类型 <-chan int 表示一个只接收 int 的 channel,只能接收不能发送(箭头 <- 和关键字 chan 的相对位置表明了 channel 的方向)。这种限制将在编译期检测。
因为关闭操作只用于断言不再向 channel 发送新的数据,所以只有在发送者所在的 goroutine 才会调用 close 函数,因此对一个只接收的 channel 调用 close 将会导致编译错误。
以下是上个示例的改进版本:
Go
func counter(out chan<- int) {
for x := 0; x < 100; x++ {
out <- x
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v * v
}
close(out)
}
func printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
naturals := make(chan int)
squares := make(chan int)
go counter(naturals)
go squarer(squares, naturals)
printer(squares)
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
在这个示例中,调用 counter(naturals) 时,naturals 的类型将隐式地从 chan int 转换成 chan<- int。调用 printer(squares) 也会导致相似的隐式转换,这一次是转换为 <-chan int 类型只接收型的 channel。任何双向 channel 向单向 channel 变量的赋值操作都将导致该隐式转换。这里并没有反向转换的语法:也就是不能将一个类似 chan<- int 类型的单向型的 channel 转换为 chan int 类型的双向型的 channel。
2.4. 带缓存的 Channels
带缓存的 Channel 内部持有一个元素队列。队列的最大容量是在调用 make 函数创建 channel 时通过第二个参数指定的。下面的语句创建了一个可以持有三个字符串元素的带缓存 Channel:
Go
ch = make(chan string, 3)
向缓存 Channel 的发送操作就是向内部缓存队列的尾部插入元素,接收操作则是从队列的头部删除元素。如果内部缓存队列是满的,那么发送操作将阻塞直到因另一个 goroutine 执行接收操作而释放了新的队列空间。相反,如果 channel 是空的,接收操作将阻塞直到有另一个 goroutine 执行发送操作而向队列插入元素。
我们可以在无阻塞的情况下连续向新创建的 channel 发送三个值,此时 channel 的内部缓存队列将是满的,如果有第四个发送操作将发生阻塞:
Go
ch <- "A"
ch <- "B"
ch <- "C"1
2
3
2
3

如果我们接收一个值:
Go
fmt.Println(<-ch) // "A"那么 channel 的缓存队列将不是满的也不是空的,因此对该 channel 执行的发送或接收操作都不会发生阻塞。
在某些特殊情况下,程序可能需要知道 channel 内部缓存的容量,可以用内置的 cap 函数获取:
Go
fmt.Println(cap(ch)) // "3"通过内置的 len 函数,可以得到 channel 内部缓存队列中有效元素的个数(因为在并发程序中该信息会随着接收操作而失效,但是它对某些故障诊断和性能优化会有帮助)。
Go
fmt.Println(len(ch)) // "2"在下面这个例子中,它并发地向三个镜像站点发出请求。它们分别将收到的响应发送到带缓存 channel,最后接收者只接收第一个收到的响应,也就是最快的那个响应。因此 mirroredQuery 函数可能在另外两个响应慢的镜像站点响应之前就返回了结果:
Go
func mirroredQuery() string {
responses := make(chan string, 3)
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // return the quickest response
}
func request(hostname string) (response string) { /* ... */ }1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
如果我们使用了无缓存的 channel,那么两个慢的 goroutines 将会因为没有人接收而被永远卡住。这种情况,称为 goroutines 泄漏,这将是一个 BUG。和垃圾变量不同,泄漏的 goroutines 并不会被自动回收,因此确保每个不再需要的 goroutine 能正常退出是重要的。
3. 示例:并发的循环
下面的程序会循环迭代一些图片文件名,并为每一张图片生成一个缩略图:
Go
// makeThumbnails makes thumbnails of the specified files.
func makeThumbnails(filenames []string) {
for _, f := range filenames {
if _, err := thumbnail.ImageFile(f); err != nil {
log.Println(err)
}
}
}1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
通过简单的修改,将其改成并发版本:
Go
// NOTE: incorrect!
func makeThumbnails2(filenames []string) {
for _, f := range filenames {
go thumbnail.ImageFile(f) // NOTE: ignoring errors
}
}1
2
3
4
5
6
2
3
4
5
6
这里需要注意的是 makeThumbnails2 在它还没有完成工作之前就已经返回了:它启动了所有的 goroutine,但没有等待它们一直到执行完毕。
于是我们再稍作修改,使其能够等待所有的 goroutine 执行完毕:
Go
// makeThumbnails3 makes thumbnails of the specified files in parallel.
func makeThumbnails3(filenames []string) {
ch := make(chan struct{})
for _, f := range filenames {
go func(f string) {
thumbnail.ImageFile(f) // NOTE: ignoring errors
ch <- struct{}{}
}(f)
}
// Wait for goroutines to complete.
for range filenames {
<-ch
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
注意以上第 11 行,因为 filenames 是一个 slice 对象,所以我们可以确切的知道一共有 len(filenames) 个内部 goroutine,所以我们直接迭代执行 len(filenames) 次 <-ch 就可以了。
另外在以上第 5 ~ 8 行中,我们将 f 的值作为一个显式的变量传给了函数,而不是直接在闭包中引用(以下是错误的写法):
Go
for _, f := range filenames {
go func() {
thumbnail.ImageFile(f) // NOTE: incorrect!
// ...
}()
}1
2
3
4
5
6
2
3
4
5
6
在 makeThumbnails3 中,我们忽略了对错误的处理,所以我们再稍加修改,使其能够返回 goroutine 遇到的第一个错误:
Go
// makeThumbnails4 makes thumbnails for the specified files in parallel.
// It returns an error if any step failed.
func makeThumbnails4(filenames []string) error {
errors := make(chan error)
for _, f := range filenames {
go func(f string) {
_, err := thumbnail.ImageFile(f)
errors <- err
}(f)
}
for range filenames {
if err := <-errors; err != nil {
return err // NOTE: incorrect: goroutine leak!
}
}
return nil
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
注意现在在 makeThumbnails4 中有一个严重的错误:当它遇到第一个非 nil 的 error 时会直接将 error 返回到调用方,并没有处理排空 errors channel,这样当剩下的 worker goroutine 再向 errors channel 中发送值时,会永远地阻塞下去,并且永远都不会退出。这种情况叫做 goroutine 泄露,可能会导致整个程序卡住或者跑出 out of memory 的错误。
下个版本,makeThumbnails5 使用了一个 buffered channel(第 10 行)来返回生成的图片文件的名字以及生成时的错误:
Go
// makeThumbnails5 makes thumbnails for the specified files in parallel.
// It returns the generated file names in an arbitrary order,
// or an error if any step failed.
func makeThumbnails5(filenames []string) (thumbfiles []string, err error) {
type item struct {
thumbfile string
err error
}
ch := make(chan item, len(filenames))
for _, f := range filenames {
go func(f string) {
var it item
it.thumbfile, it.err = thumbnail.ImageFile(f)
ch <- it
}(f)
}
for range filenames {
it := <-ch
if it.err != nil {
return nil, it.err
}
thumbfiles = append(thumbfiles, it.thumbfile)
}
return thumbfiles, nil
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
在最后一个版本中,makeThumbnails6 返回了所有缩略图文件的大小总计数(bytes)。和前面的版本都不一样的一点是我们在这个版本里没有把文件名放在 slice 里,而是通过一个 string 的 channel 传过来,所以我们无法对循环的次数进行预测。
为了知道最后一个 goroutine 什么时候结束,我们需要一个递增的计数器,在每一个 goroutine 启动时加一,在 goroutine 退出时减一。这需要一种特殊的计数器,这个计数器需要在多个 goroutine 操作时做到安全并且提供在其减为零之前一直等待的一种方法。这种计数类型被称为 sync.WaitGroup,下面的代码就用到了这种方法:
Go
// makeThumbnails6 makes thumbnails for each file received from the channel.
// It returns the number of bytes occupied by the files it creates.
func makeThumbnails6(filenames <-chan string) int64 {
sizes := make(chan int64)
var wg sync.WaitGroup // number of working goroutines
for f := range filenames {
wg.Add(1)
// worker
go func(f string) {
defer wg.Done() // wg.Add(-1)
thumb, err := thumbnail.ImageFile(f)
if err != nil {
log.Println(err)
return
}
info, _ := os.Stat(thumb) // OK to ignore error
sizes <- info.Size()
}(f)
}
// closer
go func() {
wg.Wait()
close(sizes)
}()
var total int64
for size := range sizes {
total += size
}
return total
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
注意以上第 22 ~ 25 行,我们另起了一个单独的 goroutine 来执行 wait 和 close 操作,为何要这样处理呢?设想一下,如果我们不这样另起一个 closer goroutine,而是直接将该操作放在 main goroutine 中运行,我们会遇到什么问题?这里我们再分为两种情况:
- 若将该操作放在 for range
sizes前运行;则在第一个 worker goroutine 执行完毕后,sizes已被写入一条消息,并且该消息无法被消费,导致后续的 worker goroutine 写sizes阻塞,而 wait 操作将也无法完成,程序陷入死循环; - 若将该操作放在 for range
sizes后运行;则将因为sizes无法关闭,而导致程序一致阻塞在 for rangesizes;
makeThumbnails6 中的事件顺序示意图:

4. 示例:并发的 Web 爬虫
改造前的版本(单线程 + 广度优先):
Go
// breadthFirst calls f for each item in the worklist.
// Any items returned by f are added to the worklist.
// f is called at most once for each item.
func breadthFirst(f func(item string) []string, worklist []string) {
seen := make(map[string]bool)
for len(worklist) > 0 {
items := worklist
worklist = nil
for _, item := range items {
if !seen[item] {
seen[item] = true
worklist = append(worklist, f(item)...)
}
}
}
}
func crawl(url string) []string {
fmt.Println(url)
list, err := links.Extract(url)
if err != nil {
log.Print(err)
}
return list
}
func main() {
// Crawl the web breadth-first,
// starting from the command-line arguments.
breadthFirst(crawl, os.Args[1:])
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
初步改造后的版本(worklist 由原来的 slice 替换成了 channel):
Go
func main() {
worklist := make(chan []string)
// Start with the command-line arguments.
go func() { worklist <- os.Args[1:] }()
// Crawl the web concurrently.
seen := make(map[string]bool)
for list := range worklist {
for _, link := range list {
if !seen[link] {
seen[link] = true
go func(link string) {
worklist <- crawl(link)
}(link)
}
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
现在这个改造后的版本中有个两个比较重要的问题:
这个程序永远都不会终止。这里我们可以使用一个
n变量对worklist的数量进行计数:Gofunc main() { worklist := make(chan []string) var n int // number of pending sends to worklist // Start with the command-line arguments. n++ go func() { worklist <- os.Args[1:] }() // Crawl the web concurrently. seen := make(map[string]bool) for ; n > 0; n-- { list := <-worklist for _, link := range list { if !seen[link] { seen[link] = true n++ go func(link string) { worklist <- crawl(link) }(link) } } } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24就是 “太过于并行” 了,我们需要对 crawler goroutine 的数量做一个限制。我们可以用一个有容量限制的 buffered channel 来控制并发,这类似于操作系统里的计数信号量概念。从概念上讲,channel 里的 n 个空槽代表 n 个可以处理内容的 token(通行证),从 channel 里接收一个值会释放其中的一个 token,并且生成一个新的空槽位。这样保证了在没有接收介入时最多有 n 个发送操作。重写后的
crawl函数:Go// tokens is a counting semaphore used to // enforce a limit of 20 concurrent requests. var tokens = make(chan struct{}, 20) func crawl(url string) []string { fmt.Println(url) tokens <- struct{}{} // acquire a token list, err := links.Extract(url) <-tokens // release the token if err != nil { log.Print(err) } return list }1
2
3
4
5
6
7
8
9
10
11
12
13
14或者我们也可以不改造
crawl函数,换一种避免过度并发的思路:使用 20 个常驻的 crawler goroutine,以此来保证最多只有 20 个 HTTP 并发请求:Gofunc main() { worklist := make(chan []string) // lists of URLs, may have duplicates unseenLinks := make(chan string) // de-duplicated URLs // Add command-line arguments to worklist. go func() { worklist <- os.Args[1:] }() // Create 20 crawler goroutines to fetch each unseen link. for i := 0; i < 20; i++ { go func() { for link := range unseenLinks { foundLinks := crawl(link) go func() { worklist <- foundLinks }() } }() } // The main goroutine de-duplicates worklist items // and sends the unseen ones to the crawlers. seen := make(map[string]bool) for list := range worklist { for _, link := range list { if !seen[link] { seen[link] = true unseenLinks <- link } } } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29这里需要注意第 13 行,我们使用了一个额外的 goroutine 来将
foundLinks发送到worklist,其目的是为了避免死锁:设想一下,当 main goroutine 执行到第 25 行时,worklist中有数据未取出,unseenLinks中也有数据未取出,那么 main goroutine 将阻塞在第 25 行,而所有的 crawler goroutine 也因为worklist中已有数据而未能立即写入导致阻塞,便产生了死锁。
5. 基于 select 的多路复用
下面的程序模拟火箭发射的倒计时(time.Tick 函数返回一个 channel,程序会周期性地像一个节拍器一样向这个 channel 发送事件,每一个事件的值是一个时间戳):
Go
func main() {
fmt.Println("Commencing countdown.")
tick := time.Tick(1 * time.Second)
for countdown := 10; countdown > 0; countdown-- {
fmt.Println(countdown)
<-tick
}
launch()
}1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
现在让我们稍加修改一下这个程序,使其能够支持当用户按下 return 键时直接中断发射流程:
Go
func main() {
// create abort channel
abort := make(chan struct{})
go func() {
os.Stdin.Read(make([]byte, 1)) // read a single byte
abort <- struct{}{}
}()
fmt.Println("Commencing countdown. Press return to abort.")
tick := time.Tick(1 * time.Second)
for countdown := 10; countdown > 0; countdown-- {
fmt.Println(countdown)
select {
case <-tick:
// Do nothing.
case <-abort:
fmt.Println("Launch aborted!")
return
}
}
launch()
}在上面这个例子中我们使用到了 select 语句,该例子中的 select 语句会一直等待,直到两个事件中的其中一个发生,无论是 abort 事件还是 tick 事件。
Note
当以上程序执行完
for循环后,它会停止从tick中接收事件,但是 ticker goroutine 还依然存活着,它会继续徒劳地尝试向tickchannel 中发送值。不过这时候已经没有其它的 goroutine 会从该 channel 中接收值了,这种情况被称为 goroutine 泄露。
time.Tick函数虽然挺方便的,但是只有当程序整个生命周期都需要这个时间时我们使用它才比较合适。否则的话,我们推荐以下这种写法:Goticker := time.NewTicker(1 * time.Second) <-ticker.C // receive from the ticker's channel ticker.Stop() // cause the ticker's goroutine to terminate1
2
3
下面是 select 语句的一般形式:
Go
select {
case <-ch1:
// ...
case x := <-ch2:
// ...use x...
case ch3 <- y:
// ...
default:
// ...
}1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
和 switch 语句稍微有点相似,也会有几个 case 和最后的 default 选择支。每一个 case 代表一个通信操作(在某个 channel 上进行发送或者接收)并且会包含一些语句组成的一个语句块。
select 会等待 case 中有能够执行的 case 时去执行(当 select 语句中不含 default 语句块时)。当条件满足时,select 才会去通信并执行 case 之后的语句(其它 case 中的通信是不会执行的)。如果有多个 case 同时就绪,select 会随机地选择一个执行,这样来保证每一个 channel 都有平等的被选择的机会。
一个没有任何
case的select语句写作select{},会永远地等待下去。
另外 select 还可以实现 channel 的非阻塞操作,实现方式也非常简单,那就是在 select 语句中申明 default 语句块。比如下面这个例子实现了 channel 的轮询机制:
Go
select {
case <-abort:
fmt.Printf("Launch aborted!\n")
return
default:
// do nothing
}1
2
3
4
5
6
7
2
3
4
5
6
7
Note
channel 的零值是
nil,对一个nil的 channel 发送和接收操作会永远阻塞,在select语句中nilchannel 的操作永远都不会被 select 到。这使得我们可以通过nil来激活或者禁用case,来达成处理其它输入或输出事件时超时和取消的逻辑,稍后我们将会看到这样的例子。
6. 示例: 并发的目录遍历
下面这个程序可以输出指定目录的硬盘占用空间情况:
Go
package main
import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
)
func main() {
// Determine the initial directories.
flag.Parse()
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}
// Traverse the file tree.
fileSizes := make(chan int64)
go func() {
for _, root := range roots {
walkDir(root, fileSizes)
}
close(fileSizes)
}()
// Print the results.
var nfiles, nbytes int64
for size := range fileSizes {
nfiles++
nbytes += size
}
printDiskUsage(nfiles, nbytes)
}
// walkDir recursively walks the file tree rooted at dir
// and sends the size of each found file on fileSizes.
func walkDir(dir string, fileSizes chan<- int64) {
for _, entry := range dirents(dir) {
if entry.IsDir() {
subdir := filepath.Join(dir, entry.Name())
walkDir(subdir, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du1: %v\n", err)
return nil
}
return entries
}
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
下面是一个改进后的版本,它可以间歇性的输出磁盘空间占用情况(当我们提供了 -v 的 flag),如果 -v 的 flag 在运行时没有传入的话,tick 这个 channel 会保持为 nil,这样在 select 里的 case 也就相当于被禁用了。
Go
var verbose = flag.Bool("v", false, "show verbose progress messages")
func main() {
// ...start background goroutine...
// Print the results periodically.
var tick <-chan time.Time
if *verbose {
tick = time.Tick(500 * time.Millisecond)
}
var nfiles, nbytes int64
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop // fileSizes was closed
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
printDiskUsage(nfiles, nbytes) // final totals
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Note
由于我们的程序不再使用 range 循环,所以我们在第一个
select的case中必须显式地判断fileSizes的 channel 是不是已经被关闭了(第 15 ~ 18 行)。
在以下第三个版本中,我们对每一个 walkDir 的调用创建一个新的 goroutine:
Go
func main() {
// ...determine roots...
// Traverse each root of the file tree in parallel.
fileSizes := make(chan int64)
var n sync.WaitGroup
for _, root := range roots {
n.Add(1)
go walkDir(root, &n, fileSizes)
}
go func() {
n.Wait()
close(fileSizes)
}()
// ...select loop...
}
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
for _, entry := range dirents(dir) {
if entry.IsDir() {
n.Add(1)
subdir := filepath.Join(dir, entry.Name())
go walkDir(subdir, n, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
// sema is a counting semaphore for limiting concurrency in dirents.
var sema = make(chan struct{}, 20)
// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
sema <- struct{}{} // acquire token
defer func() { <-sema }() // release token
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du1: %v\n", err)
return nil
}
return entries
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
Note
- 我们使用了一个
sync.WaitGroup来对仍旧活跃的walkDir调用进行计数,并另起一个 goroutine,在计数器减为零的时候将fileSizes这个 channel 进行关闭;- 另外我们还修改了
dirents函数,用一个计数信号量来阻止他同时打开太多的文件;
7. 并发的退出
当我们关闭了一个 channel 并且消费掉了所有已发送的值之后,后续再从 channel 取值的代码可以立即被执行,并且返回一个零值。利用这个特性,我们可以将其作为一个广播机制:不向 channel 发送值,而是关闭 channel。
8. 示例: 聊天服务
我们用一个聊天服务器来终结本章节的内容,这个程序可以让一些用户通过服务器向其它所有用户广播文本消息。这个程序中有四种 goroutine。main 和 broadcaster 各自是一个 goroutine 实例,每一个客户端的连接都会有一个 handleConn 和 clientWriter 的 goroutine。broadcaster 是 select 用法的不错的样例,因为它需要处理三种不同类型的消息。
下面演示的 main goroutine 的工作,是 listen 和 accept(译注:网络编程里的概念)从客户端过来的连接。对每一个连接,程序都会建立一个新的 handleConn 的 goroutine,就像我们在本章开头的并发的 echo 服务器里所做的那样。
Go
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
go broadcaster()
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err)
continue
}
go handleConn(conn)
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
然后是 broadcaster 的 goroutine。他的内部变量 clients 会记录当前建立连接的客户端集合。其记录的内容是每一个客户端的消息发出 channel 的 “资格” 信息。
Go
type client chan<- string // an outgoing message channel
var (
entering = make(chan client)
leaving = make(chan client)
messages = make(chan string) // all incoming client messages
)
func broadcaster() {
clients := make(map[client]bool) // all connected clients
for {
select {
case msg := <-messages:
// Broadcast incoming message to all
// clients' outgoing message channels.
for cli := range clients {
cli <- msg
}
case cli := <-entering:
clients[cli] = true
case cli := <-leaving:
delete(clients, cli)
close(cli)
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
broadcaster 监听来自全局的 entering 和 leaving 的 channel 来获知客户端的到来和离开事件。当其接收到其中的一个事件时,会更新 clients 集合,当该事件是离开行为时,它会关闭客户端的消息发出 channel。broadcaster 也会监听全局的消息 channel,所有的客户端都会向这个 channel 中发送消息。当 broadcaster 接收到什么消息时,就会将其广播至所有连接到服务端的客户端。
现在让我们看看每一个客户端的 goroutine。handleConn 函数会为它的客户端创建一个消息发出 channel 并通过 entering channel 来通知客户端的到来。然后它会读取客户端发来的每一行文本,并通过全局的消息 channel 来将这些文本发送出去,并为每条消息带上发送者的前缀来标明消息身份。当客户端发送完毕后,handleConn 会通过 leaving 这个 channel 来通知客户端的离开并关闭连接。
Go
func handleConn(conn net.Conn) {
ch := make(chan string) // outgoing client messages
go clientWriter(conn, ch)
who := conn.RemoteAddr().String()
ch <- "You are " + who
messages <- who + " has arrived"
entering <- ch
input := bufio.NewScanner(conn)
for input.Scan() {
messages <- who + ": " + input.Text()
}
// NOTE: ignoring potential errors from input.Err()
leaving <- ch
messages <- who + " has left"
conn.Close()
}
func clientWriter(conn net.Conn, ch <-chan string) {
for msg := range ch {
fmt.Fprintln(conn, msg) // NOTE: ignoring network errors
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
另外,handleConn 为每一个客户端创建了一个 clientWriter 的 goroutine 来接收向客户端发出消息 channel 中发送的广播消息,并将它们写入到客户端的网络连接。客户端的读取方循环会在 broadcaster 接收到 leaving 通知并关闭了 channel 后终止。
下面演示的是当服务器有两个活动的客户端连接,并且在两个窗口中运行的情况,使用 netcat 来聊天:
Bash
$ go build gopl.io/ch8/chat
$ go build gopl.io/ch8/netcat3
$ ./chat &
$ ./netcat3
You are 127.0.0.1:64208 $ ./netcat3
127.0.0.1:64211 has arrived You are 127.0.0.1:64211
Hi!
127.0.0.1:64208: Hi!
127.0.0.1:64208: Hi!
Hi yourself.
127.0.0.1:64211: Hi yourself. 127.0.0.1:64211: Hi yourself.
^C
127.0.0.1:64208 has left
$ ./netcat3
You are 127.0.0.1:64216 127.0.0.1:64216 has arrived
Welcome.
127.0.0.1:64211: Welcome. 127.0.0.1:64211: Welcome.
^C
127.0.0.1:64211 has left”1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
当与 n 个客户端保持聊天 session 时,这个程序会有 2n + 2 个并发的 goroutine,然而这个程序却并不需要显式的锁。clients 这个 map 被限制在了一个独立的 goroutine 中,broadcaster,所以它不能被并发地访问。多个 goroutine 共享的变量只有这些 channel 和 net.Conn 的实例,两个东西都是并发安全的。我们会在下一章中更多地解决约束,并发安全以及 goroutine 中共享变量的含义。