Appearance
Go Channel 原理
1. 设计原理
1.1. 先入先出
目前的 Channel 收发操作均遵循了先进先出的设计,具体规则如下:
- 先从 Channel 读取数据的 Goroutine 会先接收到数据;
- 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;
这种 FIFO 的设计是相对好理解的,但是稍早的 Go 语言实现却没有严格遵循这一语义,我们能在 runtime: make sure blocked channels run operations in FIFO order 中找到关于带缓冲区的 Channel 在执行收发操作时没有遵循先进先出的讨论。
- 发送方会向缓冲区中写入数据,然后唤醒接收方,多个接收方会尝试从缓冲区中读取数据,如果没有读取到会重新陷入休眠;
- 接收方会从缓冲区中读取数据,然后唤醒发送方,发送方会尝试向缓冲区写入数据,如果缓冲区已满会重新陷入休眠;
这种基于重试的机制会导致 Channel 的处理不会遵循先进先出的原则。经过 runtime: simplify buffered channels 和 runtime: simplify chan ops, take 2 两个提交的修改,带缓冲区和不带缓冲区的 Channel 都会遵循先入先出发送和接收数据。
1.2. 无锁管道
Go 语言社区在 2014 年提出了无锁 Channel 的实现方案,该方案将 Channel 分成了以下三种类型:
- 同步 Channel — 不需要缓冲区,发送方会直接将数据交给(Handoff)接收方;
- 异步 Channel — 基于环形缓存的传统生产者消费者模型;
chan struct{}类型的异步 Channel —struct{}类型不占用内存空间,不需要实现缓冲区和直接发送(Handoff)的语义;
这个提案的目的也不是实现完全无锁的队列,只是在一些关键路径上通过无锁提升 Channel 的性能。社区中已经有无锁 Channel 的实现,但是在实际的基准测试中,无锁队列在多核测试中的表现还需要进一步的改进。
因为目前通过 CAS 实现的无锁 Channel 没有提供先进先出的特性,所以该提案暂时也被搁浅了。
原子指令 CAS(compare-and-swap 或者 compare-and-set)在多线程中同步数据,无锁队列的实现也依赖这一原子指令。
2. 数据结构
Go 语言的 Channel 在运行时使用 runtime.hchan 结构体表示。我们在 Go 语言中创建新的 Channel 时,实际上创建的都是如下所示的结构:
Go
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
closed uint32
elemtype *_type
sendx uint
recvx uint
recvq waitq
sendq waitq
lock mutex
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
runtime.hchan 结构体中的五个字段 qcount、dataqsiz、buf、sendx、recv 构建底层的循环队列:
qcount— Channel 中的元素个数;dataqsiz— Channel 中的循环队列的长度;buf— Channel 的缓冲区数据指针;sendx— Channel 的发送操作处理到的位置;recvx— Channel 的接收操作处理到的位置;
除此之外,elemsize 和 elemtype 分别表示当前 Channel 能够收发的元素类型和大小;sendq 和 recvq 存储了当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,这些等待队列使用双向链表 runtime.waitq 表示,链表中所有的元素都是 runtime.sudog 结构:
Go
type waitq struct {
first *sudog
last *sudog
}1
2
3
4
2
3
4
runtime.sudog 表示一个在等待列表中的 Goroutine,该结构中存储了两个分别指向前后 runtime.sudog 的指针以构成链表。
3. 发送数据
在发送数据的逻辑执行之前会先为当前 Channel 加锁,防止多个线程并发修改数据。
使用 ch <- i 表达式向 Channel 发送数据时遇到的几种情况:
如果当前 Channel 的
recvq上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前 Goroutine 并将其设置成下一个运行的 Goroutine;需要注意的是,发送数据的过程只是将接收方的 Goroutine 放到了处理器的
runnext中,程序没有立刻执行该 Goroutine。如果 Channel 存在缓冲区并且其中还有空闲的容量,我们会直接将数据存储到缓冲区
sendx所在的位置上,并增加sendx索引和qcount计数器;buf是一个循环数组,所以当sendx等于dataqsiz时会重新回到数组开始的位置。如果不满足上面的两种情况,会创建一个
runtime.sudog结构并将其加入 Channel 的sendq队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据(使用select关键字可以向 Channel 非阻塞地发送消息);
发送数据的过程中包含几个会触发 Goroutine 调度的时机:
- 发送数据时发现 Channel 上存在等待接收数据的 Goroutine,立刻设置处理器的
runnext属性,但是并不会立刻触发调度; - 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入 Channel 的
sendq队列并调用runtime.goparkunlock触发 Goroutine 的调度让出处理器的使用权;
4. 接收数据
从 Channel 中接收数据时可能会发生的五种情况:
- 如果 Channel 为空,那么会直接调用
runtime.gopark挂起当前 Goroutine; - 如果 Channel 已经关闭并且缓冲区没有任何数据,
runtime.chanrecv会直接返回; - 如果 Channel 的
sendq队列中存在挂起的 Goroutine,会将recvx索引所在的数据拷贝到接收变量所在的内存空间上并将sendq队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方(调用runtime.goready将当前处理器的runnext设置成发送数据的 Goroutine,在调度器下一次调度时将阻塞的发送方唤醒); - 如果 Channel 的缓冲区中包含数据,那么直接读取
recvx索引对应的数据; - 当 Channel 的发送队列中不存在等待的 Goroutine 并且缓冲区中也不存在任何数据时,从管道中接收数据的操作会变成阻塞的(与
select语句结合使用时就可能会使用到非阻塞的接收操作),将runtime.sudog结构加入recvq队列并陷入休眠等待调度器的唤醒;
从 Channel 接收数据时,会触发 Goroutine 调度的两个时机:
- 当 Channel 为空时;
- 当缓冲区中不存在数据并且也不存在数据的发送者时;