Go Channel 拾遗

date
May 23, 2022
slug
go-channel
status
Published
tags
Go
type
Post
https://twitter.com/tangdahe
summary
go channel
作为 Go 核心的数据结构和 Goroutine 之间的通信方式,Channel 是支撑 Go 语言高性能并发编程模型的重要结构本篇会详细探索管道 Channel 的设计原理、数据结构和常见操作,例如 Channel 的创建、发送、接收和关闭。

设计原理

CSP 思想

Do not communicate by sharing memory; instead, share memory by communicating.
不要通过共享内存来通信,而要通过通信来实现内存共享。CSP 经常被认为是 Go 在并发编程上成功的关键因素。CSP 全称是 “Communicating Sequential Processes”,这也是 Tony Hoare 在 1978 年发表在 ACM 的一篇论文。论文里指出一门编程语言应该重视 input 和 output 的原语,尤其是并发编程的代码。
在文章中,CSP 也是一门自定义的编程语言,作者定义了输入输出语句,用于 processes 间的通信(communication)。processes 被认为是需要输入驱动,并且产生输出,供其他 processes 消费,processes 可以是进程、线程、甚至是代码块。输入命令是:!,用来向 processes 写入;输出是:?,用来从 processes 读出。
  • 不要通过共享内存来通信:不要直接让多个 Goroutines 访问和修改共享的内存,因为这样可能导致数据竞争和难以调试的问题。Go 鼓励在不同的 Goroutines 之间传递消息而不是直接访问共享的内存。
  • 而要通过通信来实现内存共享:这表示在多个 Goroutines 之间共享数据时,通过通道进行通信。通道充当了数据共享的媒介,通过发送和接收操作,Goroutines 可以安全地交换信息。这种方式可以有效避免常见的并发问题。
在很多主流的编程语言中,多个线程传递数据的方式一般都是共享内存,为了解决线程竞争,我们需要限制同一时间能够读写这些变量的线程数量,然而这与 Go 语言鼓励的设计并不相同。
多线程使用共享内存传递数据
多线程使用共享内存传递数据
Go 语言中借用 CSP 思想,使用通信的方式来共享内存,其具体实现中会通过 Channel 来传递数据。
下图中会创建两个 Goroutine,一个会向 Channel 发送数据,一个会从 Channel 中接收数据,两者之间独立运行并不存在直接联系,但其通信可以通过 Channel 间接完成。
Goroutine 使用 Channel 传递数据
Goroutine 使用 Channel 传递数据
 

无锁管道 

锁是一种常见的并发控制技术,我们一般会将锁分成乐观锁和悲观锁,即乐观并发控制和悲观并发控制,无锁(lock-free)队列更准确的描述是使用乐观并发控制的队列。乐观并发控制也叫乐观锁,很多人都会误以为乐观锁是与悲观锁差不多,然而它并不是真正的锁,只是一种并发控制的思想。
乐观并发控制本质上是基于验证的协议,我们使用原子指令 CAS(compare-and-swap 或者 compare-and-set)在多线程中同步数据,无锁队列的实现也依赖这一原子指令。
Channel 在运行时的内部表示是 runtime.hchan,该结构体中包含了用于保护成员变量的互斥锁,从某种程度上说,Channel 是一个用于同步和通信的有锁队列,使用互斥锁解决程序中可能存在的线程竞争问题是很常见的,我们能很容易地实现有锁队列。
然而锁导致的休眠和唤醒会带来额外的上下文切换,如果临界区过大,加锁解锁导致的额外开销就会成为性能瓶颈。1994 年的论文 Implementing lock-free queues 就研究了如何使用无锁的数据结构实现先进先出队列,而 Go 语言社区也在 2014 年提出了无锁 Channel 的实现方案,该方案将 Channel 分成了以下三种类型:
  • 同步 Channel — 不需要缓冲区,发送方会直接将数据交给(Handoff)接收方;
  • 异步 Channel — 基于环形缓存的传统生产者消费者模型;
  • chan struct{} 类型的异步 Channel — struct{} 类型不占用内存空间,不需要实现缓冲区和直接发送(Handoff)的语义;
这个提案的目的也不是实现完全无锁的队列,只是在一些关键路径上通过无锁提升 Channel 的性能。社区中已经有无锁 Channel 的实现,但是在实际的基准测试中,无锁队列在多核测试中的表现还需要进一步的改进。
因为目前通过 CAS 实现的无锁 Channel 没有提供先进先出的特性,所以该提案暂时也被搁浅了。

Channel 底层数据结构

Go 语言的 Channel 在运行时使用 runtime.hchan 结构体表示。我们在 Go 语言中创建新的 Channel 时,实际上创建的都是如下所示的结构:
其中 sendq 和 recvq 存储了当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,这些等待队列使用双向链表 runtime.waitq 表示,链表中所有的元素都是 runtime.sudog 结构:
runtime.sudog 表示一个在等待列表中的 Goroutine,该结构中存储了两个分别指向前后 runtime.sudog 的指针以构成链表。
lock 用来保证每个读 channel 或写 channel 的操作都是原子的。

例如创建一个容量为 6 的,元素为 int 类型的 channel 数据结构如下:
notion image

创建管道

一般而言,使用 make 创建一个能收能发的通道:
Go 语言中所有 Channel 的创建都会使用 make 关键字。编译器会将 make(chan int, 10) 表达式转换成 OMAKE 类型的节点,并在类型检查阶段将 OMAKE 类型的节点转换成 OMAKECHAN 类型:
这一阶段会对传入 make 关键字的缓冲区大小进行检查,如果我们不向 make 传递表示缓冲区大小的参数,那么就会设置一个默认值 0,也就是当前的 Channel 不存在缓冲区。
OMAKECHAN 类型的节点最终都会在 SSA 中间代码生成阶段之前被转换成调用 runtime.makechan 或者 runtime.makechan64 的函数:
runtime.makechan 和 runtime.makechan64 会根据传入的参数类型和缓冲区大小创建一个新的 Channel 结构,其中后者用于处理缓冲区大小大于 2 的 32 次方的情况,因为这在 Channel 中并不常见,所以我们重点关注 runtime.makechan
在函数的最后会统一更新 runtime.hchan 的 elemsizeelemtype 和 dataqsiz 几个字段。
关于指针类型和值类型,当使用 channel 时,通道中是否包含指针是取决于通道的元素类型的。如果通道的元素类型本身是指针类型,通道中就包含指针;如果通道的元素类型是非指针类型,通道中就不包含指针。前者通道中包含指向实际数据的指针,节省内存和性能开销;后者通道中包含的是元素的副本,需要进行数据的拷贝。

发送数据

当我们想要向 Channel 发送数据时,就需要使用 ch <- i 语句,编译器会将它解析成 OSEND 节点并在 cmd/compile/internal/gc.walkexpr 中转换成 runtime.chansend1
runtime.chansend1 只是调用了 runtime.chansend 并传入 Channel 和需要发送的数据。runtime.chansend 是向 Channel 中发送数据时一定会调用的函数,该函数包含了发送数据的全部逻辑,如果我们在调用时将 block 参数设置成 true,那么表示当前发送操作是阻塞的:
因为 runtime.chansend 函数的实现比较复杂,所以我们这里将该函数的执行过程分成以下的三个部分:
  • 当存在等待的接收者时,通过 runtime.send 直接将数据发送给阻塞的接收者;
  • 当缓冲区存在空余空间时,将发送的数据写入 Channel 的缓冲区;
  • 当不存在缓冲区或者缓冲区已满时,等待其他 Goroutine 从 Channel 接收数据;

直接发送

如果目标 Channel 没有被关闭并且已经有处于读等待的 Goroutine,那么 runtime.chansend 会从接收队列 recvq 中取出最先陷入等待的 Goroutine 并直接向它发送数据:
 
下图展示了 Channel 中存在等待数据的 Goroutine 时,向 Channel 发送数据的过程:
直接发送数据的过程
直接发送数据的过程
发送数据时会调用 runtime.send,该函数的执行可以分成两个部分:
  1. 调用 runtime.sendDirect 将发送的数据直接拷贝到 x = <-c 表达式中变量 x 所在的内存地址上;
  1. 调用 runtime.goready 将等待接收数据的 Goroutine 标记成可运行状态 Grunnable 并把该 Goroutine 放到发送方所在的处理器的 runnext 上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方;
需要注意的是,发送数据的过程只是将接收方的 Goroutine 放到了处理器的 runnext 中,程序没有立刻执行该 Goroutine。
这里涉及到一个 goroutine 直接写另一个 goroutine 栈的操作,一般而言,不同 goroutine 的栈是各自独有的。而这也违反了 GC 的一些假设。为了不出问题,写的过程中增加了写屏障,保证正确地完成写操作。这样做的好处是减少了一次内存 copy:不用先拷贝到 channel 的 buf,直接由发送者到接收者,效率得以提高。
然后,解锁、唤醒接收者,等待调度器的光临,接收者也得以重见天日,可以继续执行接收操作之后的代码了。
  • 如果 c.qcount < c.dataqsiz,说明缓冲区可用(肯定是缓冲型的 channel)。先通过函数取出待发送元素应该去到的位置:

缓冲区

如果创建的 Channel 包含缓冲区并且 Channel 中的数据没有装满,会执行下面这段代码:
在这里我们首先会使用 runtime.chanbuf 计算出下一个可以存储数据的位置,然后通过 runtime.typedmemmove 将发送的数据拷贝到缓冲区中并增加 sendx 索引和 qcount 计数器。
向缓冲区写入数据
向缓冲区写入数据
如果当前 Channel 的缓冲区未满,向 Channel 发送的数据会存储在 Channel 的 sendx 索引所在的位置并将 sendx 索引加一。因为这里的 buf 是一个循环数组,所以当 sendx 等于 dataqsiz 时会重新回到数组开始的位置。

阻塞发送

当 Channel 没有接收者能够处理数据时,向 Channel 发送数据会被下游阻塞,当然使用 select 关键字可以向 Channel 非阻塞地发送消息。向 Channel 阻塞地发送数据会执行下面的代码,我们可以简单梳理一下这段代码的逻辑:
  1. 调用 runtime.getg 获取发送数据使用的 Goroutine;
  1. 执行 runtime.acquireSudog 获取 runtime.sudog 结构并设置这一次阻塞发送的相关信息,例如发送的 Channel、是否在 select 中和待发送数据的内存地址等;
  1. 将刚刚创建并初始化的 runtime.sudog 加入发送等待队列,并设置到当前 Goroutine 的 waiting 上,表示 Goroutine 正在等待该 sudog 准备就绪;
  1. 调用 runtime.goparkunlock 将当前的 Goroutine 陷入沉睡等待唤醒;
  1. 被调度器唤醒后会执行一些收尾工作,将一些属性置零并且释放 runtime.sudog 结构体;
函数在最后会返回 true 表示这次我们已经成功向 Channel 发送了数据。

小结

我们在这里可以简单梳理和总结一下使用 ch <- i 表达式向 Channel 发送数据时遇到的几种情况:
  1. 如果当前 Channel 的 recvq 上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前 Goroutine 并将其设置成下一个运行的 Goroutine;
  1. 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们会直接将数据存储到缓冲区 sendx 所在的位置上;
  1. 如果不满足上面的两种情况,会创建一个 runtime.sudog 结构并将其加入 Channel 的 sendq 队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据;
发送数据的过程中包含几个会触发 Goroutine 调度的时机:
  1. 发送数据时发现 Channel 上存在等待接收数据的 Goroutine,立刻设置处理器的 runnext 属性,但是并不会立刻触发调度;
  1. 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入 Channel 的 sendq 队列并调用 runtime.goparkunlock 触发 Goroutine 的调度让出处理器的使用权;
 

案例分析

在发送小节里我们说到 G1 和 G2 现在被挂起来了,等待 sender 的解救。在第 17 行ch <- 3,主协程向 ch 发送了一个元素 3,来看下接下来会发生什么。
根据前面源码分析的结果,我们知道,sender 发现 ch 的 recvq 里有 receiver 在等待着接收,就会出队一个 sudog,把 recvq 里 first 指针的 sudo “推举”出来了,并将其加入到 P 的可运行 goroutine 队列中。
然后,sender 把发送元素拷贝到 sudog 的 elem 地址处,最后会调用 goready 将 G1 唤醒,状态变为 runnable。
notion image
当调度器光顾 G1 时,将 G1 变成 running 状态,执行 goroutineA 接下来的代码。G 表示其他可能有的 goroutine。
这里其实涉及到一个协程写另一个协程栈的操作。有两个 receiver 在 channel 的一边虎视眈眈地等着,这时 channel 另一边来了一个 sender 准备向 channel 发送数据,为了高效,用不着通过 channel 的 buf “中转”一次,直接从源地址把数据 copy 到目的地址就可以了,效率高啊!
notion image
上图是一个示意图,3 会被拷贝到 G1 栈上的某个位置,也就是 val 的地址处,保存在 elem 字段。
 
 

接收数据

我们接下来继续介绍 Channel 操作的另一方:接收数据。Go 语言中可以使用两种不同的方式去接收 Channel 中的数据:
一种带 “ok”,反应 channel 是否关闭;一种不带 “ok”,这种写法,当接收到相应类型的零值时无法知道是真实的发送者发送过来的值,还是 channel 被关闭后,返回给接收者的默认类型的零值。两种写法,都有各自的应用场景。这两种不同的方法经过编译器的处理都会变成 ORECV 类型的节点,后者会在类型检查阶段被转换成 OAS2RECV 类型。数据的接收操作遵循以下的路线图:
notion image
虽然不同的接收方式会被转换成 runtime.chanrecv1 和 runtime.chanrecv2 两种不同函数的调用,但是这两个函数最终还是会调用 runtime.chanrecv
当我们从一个空 Channel 接收数据时会直接调用 runtime.gopark 让出处理器的使用权。
当一个 Go channel 已经关闭,但它的缓冲区中还有数据时,仍然可以从中读取数据。关闭 channel 只是通知接收方不会再有新的数据发送到 channel 中,但接收方仍然可以从缓冲区中读取已经存在的数据。直至数据为空,返回一个零值和 false。
除了上述两种特殊情况,使用 runtime.chanrecv 从 Channel 接收数据时还包含以下三种不同情况:
  • 当存在等待的发送者时,通过 runtime.recv 从阻塞的发送者或者缓冲区中获取数据;
  • 当缓冲区存在数据时,从 Channel 的缓冲区中接收数据;
  • 当缓冲区中不存在数据时,等待其他 Goroutine 向 Channel 发送数据;

接收逻辑

当 Channel 的 sendq 队列中包含处于等待状态的 Goroutine 时,该函数会取出队列头等待的 Goroutine,处理的逻辑和发送时相差无几,只是发送数据时调用的是 runtime.send 函数,而接收数据时使用 runtime.recv
当我们观察到 channel 没准备好接收:
  1. 非缓冲型,等待发送列队里没有 goroutine 在等待
  1. 缓冲型,但 buf 里没有元素
之后,又观察到 closed == 0,即 channel 未关闭。
因为 channel 不可能被重复打开,所以前一个观测的时候, channel 也是未关闭的,因此在这种情况下可以直接宣布接收失败,快速返回。因为没被选中,也没接收到数据,所以返回值为 (false, false)。
  • 接下来的操作,首先会上一把锁,粒度比较大。如果 channel 已关闭,并且循环数组 buf 里没有元素。对应非缓冲型关闭和缓冲型关闭但 buf 无元素的情况,返回对应类型的零值,但 received 标识是 false,告诉调用者此 channel 已关闭,你取出来的值并不是正常由发送者发送过来的数据。但是如果处于 select 语境下,这种情况是被选中了的。很多将 channel 用作通知信号的场景就是命中了这里。
  • 接下来,如果有等待发送的队列,说明 channel 已经满了,要么是非缓冲型的 channel,要么是缓冲型的 channel,但 buf 满了。这两种情况下都可以正常接收数据。
runtime.recv 的实现比较复杂:
该函数会根据缓冲区的大小分别处理不同的情况:
  • 如果 Channel 不存在缓冲区;
      1. 调用 runtime.recvDirect 将 Channel 发送队列中 Goroutine 存储的 elem 数据拷贝到目标内存地址中;
  • 如果 Channel 存在缓冲区;
      1. 将队列中的数据拷贝到接收方的内存地址;
      1. 将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方;
无论发生哪种情况,运行时都会调用 runtime.goready 将当前处理器的 runnext 设置成发送数据的 Goroutine,在调度器下一次调度时将阻塞的发送方唤醒。
从发送队列中获取数据
从发送队列中获取数据
上图展示了 Channel 在缓冲区已经没有空间并且发送队列中存在等待的 Goroutine 时,运行 <-ch 的执行过程。发送队列头的 runtime.sudog 中的元素会替换接收索引 recvx 所在位置的元素,原有的元素会被拷贝到接收数据的变量对应的内存空间上。
 
如果接收数据的内存地址不为空,那么会使用 runtime.typedmemmove 将缓冲区中的数据拷贝到内存中、清除队列中的数据并完成收尾工作。
从缓冲区中接接收数据
从缓冲区中接接收数据
收尾工作包括递增 recvx,一旦发现索引超过了 Channel 的容量时,会将它归零重置循环队列的索引;除此之外,该函数还会减少 qcount 计数器并释放持有 Channel 的锁。
 

小结

我们梳理一下从 Channel 中接收数据时可能会发生的五种情况:
  1. 如果 Channel 为空,那么会直接调用 runtime.gopark 挂起当前 Goroutine;
  1. 如果 Channel 已经关闭并且缓冲区没有任何数据,runtime.chanrecv 会直接返回;
  1. 如果 Channel 的 sendq 队列中存在挂起的 Goroutine,会将 recvx 索引所在的数据拷贝到接收变量所在的内存空间上并将 sendq 队列中 Goroutine 的数据拷贝到缓冲区;
  1. 如果 Channel 的缓冲区中包含数据,那么直接读取 recvx 索引对应的数据;
  1. 在默认情况下会挂起当前的 Goroutine,将 runtime.sudog 结构加入 recvq 队列并陷入休眠等待调度器的唤醒;
我们总结一下从 Channel 接收数据时,会触发 Goroutine 调度的两个时机:
  1. 当 Channel 为空时;
  1. 当缓冲区中不存在数据并且也不存在数据的发送者时;
 

案例分析

从 channel 接收和向 channel 发送数据的过程我们均会使用下面这个例子来进行说明:
首先创建了一个无缓冲的 channel,接着启动两个 goroutine,并将前面创建的 channel 传递进去。然后,向这个 channel 中发送数据 3,最后 sleep 1 秒后程序退出。
程序第 14 行 ch := make(chan int) 创建了一个非缓冲型的 channel,我们只看 chan 结构体中的一些重要字段,来从整体层面看一下 chan 的状态,一开始什么都没有。
接着,第 15、16 行分别创建了一个 goroutine,各自执行了一个接收操作。通过前面的源码分析,我们知道,这两个 goroutine (后面称为 G1 和 G2 好了)都会被阻塞在接收操作。G1 和 G2 会挂在 channel 的 recq 队列中,形成一个双向循环链表。
在程序的 17 行之前,chan 的整体数据结构如下:
notion image
buf 指向一个长度为 0 的数组,qcount 为 0,表示 channel 中没有元素。重点关注 recvq 和 sendq,它们是 waitq 结构体,而 waitq 实际上就是一个双向链表,链表的元素是 sudog,里面包含 g 字段,g 表示一个 goroutine,所以 sudog 可以看成一个 goroutine。recvq 存储那些尝试读取 channel 但被阻塞的 goroutine,sendq 则存储那些尝试写入 channel,但被阻塞的 goroutine。
此时,我们可以看到,recvq 里挂了两个 goroutine,也就是前面启动的 G1 和 G2。因为没有 goroutine 接收,而 channel 又是无缓冲类型,所以 G1 和 G2 被阻塞。sendq 没有被阻塞的 goroutine。
recvq 的数据结构如下:
notion image
再从整体上来看一下 chan 此时的状态:
notion image
G1 和 G2 被挂起了,状态是 WAITING。关于 goroutine 调度器这块不是今天的重点,当然后面肯定会写相关的文章。这里先简单说下,goroutine 是用户态的协程,由 Go runtime 进行管理,作为对比,内核线程由 OS 进行管理。Goroutine 更轻量,因此我们可以轻松创建数万 goroutine。
一个内核线程可以管理多个 goroutine,当其中一个 goroutine 阻塞时,内核线程可以调度其他的 goroutine 来运行,内核线程本身不会阻塞。这就是通常我们说的 M:N 模型:
notion image
M:N 模型通常由三部分构成:M、P、G。M 是内核线程,负责运行 goroutine;P 是 context,保存 goroutine 运行所需要的上下文,它还维护了可运行(runnable)的 goroutine 列表;G 则是待运行的 goroutine。M 和 P 是 G 运行的基础。
notion image
继续回到例子。假设我们只有一个 M,当 G1(go goroutineA(ch)) 运行到 val := <- a 时,它由本来的 running 状态变成了 waiting 状态(调用了 gopark 之后的结果):
notion image
G1 脱离与 M 的关系,但调度器可不会让 M 闲着,所以会接着调度另一个 goroutine 来运行:
notion image
G2 也是同样的遭遇。现在 G1 和 G2 都被挂起了,等待着一个 sender 往 channel 里发送数据,才能得到解救。

关闭管道的流程

编译器会将用于关闭管道的 close 关键字转换成 OCLOSE 节点以及 runtime.closechan 函数。
当 Channel 是一个空指针或者已经被关闭时,Go 语言运行时都会直接崩溃并抛出异常:

逻辑实现

处理完了这些异常的情况之后就可以开始执行关闭 Channel 的逻辑了,下面这段代码的主要工作就是将 recvq 和 sendq 两个队列中的数据加入到 Goroutine 列表 gList 中,与此同时该函数会清除所有 runtime.sudog 上未被处理的元素:
close 逻辑比较简单,对于一个 channel,recvq 和 sendq 中分别保存了阻塞的发送者和接收者。关闭 channel 后,对于等待接收者而言,会收到一个相应类型的零值。对于等待发送者,会直接 panic。所以,在不了解 channel 还有没有接收者的情况下,不能贸然关闭 channel。
close 函数先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒。
唤醒之后,该干嘛干嘛。sender 会继续执行 chansend 函数里 goparkunlock 函数之后的代码,很不幸,检测到 channel 已经关闭了,panic。receiver 则比较幸运,进行一些扫尾工作后,返回。这里,selected 返回 true,而返回值 received 则要根据 channel 是否关闭,返回不同的值。如果 channel 关闭,received 为 false,否则为 true。这我们分析的这种情况下,received 返回 false。
 

从已经关闭的 channel 仍能读出数据?

从一个有缓冲的 channel 里读数据,当 channel 被关闭,依然能读出有效值。只有当返回的 ok 为 false 时,读出的数据才是无效的。
 
先创建了一个有缓冲的 channel,向其发送一个元素,然后关闭此 channel。之后两次尝试从 channel 中读取数据,第一次仍然能正常读出值。第二次返回的 ok 为 false,说明 channel 已关闭,且通道里没有数据。

小结

操作情况总结

操作
nil channel
closed channel
not nil, not closed channel
close
panic
panic
正常关闭
读 <- ch
阻塞
读到对应类型的零值
阻塞或正常读取数据。缓冲型 channel 为空或非缓冲型 channel 没有等待发送者时会阻塞
写 ch <-
阻塞
panic
阻塞或正常写入数据。非缓冲型 channel 没有等待接收者或缓冲型 channel buf 满时会被阻塞
总结一下,发生 panic 的情况有三种:向一个关闭的 channel 进行写操作;关闭一个 nil 的 channel;重复关闭一个 channel。读、写一个 nil channel 都会被阻塞。
 

优雅的关闭 Channel

  1. 在不改变 channel 自身状态的情况下,无法获知一个 channel 是否关闭。
  1. 关闭一个 closed channel 会导致 panic。所以,如果关闭 channel 的一方在不知道 channel 是否处于关闭状态时就去贸然关闭 channel 是很危险的事情。
  1. 向一个 closed channel 发送数据会导致 panic。所以,如果向 channel 发送数据的一方不知道 channel 是否处于关闭状态时就去贸然向 channel 发送数据是很危险的事情。
一个比较粗糙的检查 channel 是否关闭的函数:
存在几个问题:
  1. IsClosed 函数是一个有副作用的函数。每调用一次,都会读出 channel 里的一个元素,改变了 channel 的状态。
  1. IsClosed 函数返回的结果仅代表调用那个瞬间,并不能保证调用之后会不会有其他 goroutine 对它进行了一些操作(如 close),改变了它的这种状态。
有一条广泛流传的关闭 channel 的原则:
don’t close a channel from the receiver side and don’t close a channel if the channel has multiple concurrent senders.
不要从一个 receiver 侧关闭 channel,也不要在有多个 sender 时,关闭 channel。
比较好理解,向 channel 发送元素的就是 sender,因此 sender 可以决定何时不发送数据,并且关闭 channel。但是如果有多个 sender,某个 sender 同样没法确定其他 sender 的情况,这时也不能贸然关闭 channel。
但是上面所说的并不是最本质的,最本质的原则就只有一条:
don’t close (or send values to) closed channels.
有两个不那么优雅地关闭 channel 的方法:
  1. 使用 defer-recover 机制,放心大胆地关闭 channel 或者向 channel 发送数据。即使发生了 panic,有 defer-recover 在兜底。
  1. 使用 sync.Once 来保证只关闭一次。
 
那到底应该如何优雅地关闭 channel?
根据 sender 和 receiver 的个数,分下面几种情况:
  1. 一个 sender,一个 receiver
  1. 一个 sender, M 个 receiver
  1. N 个 sender,一个 reciver
  1. N 个 sender, M 个 receiver
对于 1,2,只有一个 sender 的情况就不用说了,直接从 sender 端关闭就好了,没有问题。重点关注第 3,4 种情况。
第 3 种情形下,优雅关闭 channel 的方法是:the only receiver says “please stop sending more” by closing an additional signal channel。
解决方案就是增加一个传递关闭信号的 channel,receiver 通过信号 channel 下达关闭数据 channel 指令。senders 监听到关闭信号后,停止发送数据。代码如下:
这里的 stopCh 就是信号 channel,它本身只有一个 sender,因此可以直接关闭它。senders 收到了关闭信号后,select 分支 “case <- stopCh” 被选中,退出函数,不再发送数据。
需要说明的是,上面的代码并没有明确关闭 dataCh。在 Go 语言中,对于一个 channel,如果最终没有任何 goroutine 引用它,不管 channel 有没有被关闭,最终都会被 gc 回收。所以,在这种情形下,所谓的优雅地关闭 channel 就是不关闭 channel,让 gc 代劳。
 
最后一种情况,优雅关闭 channel 的方法是:any one of them says “let’s end the game” by notifying a moderator to close an additional signal channel。
和第 3 种情况不同,这里有 M 个 receiver,如果直接还是采取第 3 种解决方案,由 receiver 直接关闭 stopCh 的话,就会重复关闭一个 channel,导致 panic。因此需要增加一个中间人,M 个 receiver 都向它发送关闭 dataCh 的“请求”,中间人收到第一个请求后,就会直接下达关闭 dataCh 的指令(通过关闭 stopCh,这时就不会发生重复关闭的情况,因为 stopCh 的发送方只有中间人一个)。另外,这里的 N 个 sender 也可以向中间人发送关闭 dataCh 的请求。
代码里 toStop 就是中间人的角色,使用它来接收 senders 和 receivers 发送过来的关闭 dataCh 请求。
这里将 toStop 声明成了一个 缓冲型的 channel。假设 toStop 声明的是一个非缓冲型的 channel,那么第一个发送的关闭 dataCh 请求可能会丢失。因为无论是 sender 还是 receiver 都是通过 select 语句来发送请求,如果中间人所在的 goroutine 没有准备好,那 select 语句就不会选中,直接走 default 选项,什么也不做。这样,第一个关闭 dataCh 的请求就会丢失。
如果,我们把 toStop 的容量声明成 Num(senders) + Num(receivers),那发送 dataCh 请求的部分可以改成更简洁的形式:
直接向 toStop 发送请求,因为 toStop 容量足够大,所以不用担心阻塞,自然也就不用 select 语句再加一个 default case 来避免阻塞。
可以看到,这里同样没有真正关闭 dataCh,原因同第 3 种情况。
 

Channel 发送和接受元素的本质是什么?

Channel 发送和接收元素的本质是什么?
All transfer of value on the go channels happens with the copy of value.
就是说 channel 的发送和接收操作本质上都是 “值的拷贝”,无论是从 sender goroutine 的栈到 chan buf,还是从 chan buf 到 receiver goroutine,或者是直接从 sender goroutine 到 receiver goroutine。
 

Channel 是否会引起资源泄露?

Channel 可能会引发 goroutine 泄漏。
泄漏的原因是 goroutine 操作 channel 后,处于发送或接收阻塞状态,而 channel 处于满或空的状态,一直得不到改变。同时,垃圾回收器也不会回收此类资源,进而导致 gouroutine 会一直处于等待队列中,不见天日。
另外,程序运行过程中,对于一个 channel,如果没有任何 goroutine 引用了,gc 会对其进行回收操作,不会引起内存泄漏。
 

Channel 中的 happend-before 关系

维基百科上给的定义:
In computer science, the happened-before relation (denoted: ->) is a relation between the result of two events, such that if one event should happen before another event, the result must reflect that, even if those events are in reality executed out of order (usually to optimize program flow).
简单来说就是如果事件 a 和事件 b 存在 happened-before 关系,即 a -> b,那么 a,b 完成后的结果一定要体现这种关系。由于现代编译器、CPU 会做各种优化,包括编译器重排、内存重排等等,在并发代码里,happened-before 限制就非常重要了。
根据晃岳攀老师在 Gopher China 2019 上的并发编程分享,关于 channel 的发送(send)、发送完成(send finished)、接收(receive)、接收完成(receive finished)的 happened-before 关系如下:
  1. 第 n 个 send 一定 happened before 第 n 个 receive finished,无论是缓冲型还是非缓冲型的 channel。
  1. 对于容量为 m 的缓冲型 channel,第 n 个 receive 一定 happened before 第 n+m 个 send finished
  1. 对于非缓冲型的 channel,第 n 个 receive 一定 happened before 第 n 个 send finished
  1. channel close 一定 happened before receiver 得到通知。
我们来逐条解释一下。
第一条,我们从源码的角度看也是对的,send 不一定是 happened before receive,因为有时候是先 receive,然后 goroutine 被挂起,之后被 sender 唤醒,send happened after receive。但不管怎样,要想完成接收,一定是要先有发送。
第二条,缓冲型的 channel,当第 n+m 个 send 发生后,有下面两种情况:
若第 n 个 receive 没发生。这时,channel 被填满了,send 就会被阻塞。那当第 n 个 receive 发生时,sender goroutine 会被唤醒,之后再继续发送过程。这样,第 n 个 receive 一定 happened before 第 n+m 个 send finished
若第 n 个 receive 已经发生过了,这直接就符合了要求。
第三条,也是比较好理解的。第 n 个 send 如果被阻塞,sender goroutine 挂起,第 n 个 receive 这时到来,先于第 n 个 send finished。如果第 n 个 send 未被阻塞,说明第 n 个 receive 早就在那等着了,它不仅 happened before send finished,它还 happened before send。
第四条,回忆一下源码,先设置完 closed = 1,再唤醒等待的 receiver,并将零值拷贝给 receiver。
 
我们直接来看例子:
先定义了一个 done channel 和一个待打印的字符串。在 main 函数里,启动一个 goroutine,等待从 done 里接收到一个值后,执行打印 msg 的操作。如果 main 函数中没有 <-done 这行代码,打印出来的 msg 为空,因为 aGoroutine 来不及被调度,还来不及给 msg 赋值,主程序就会退出。而在 Go 语言里,主协程退出时不会等待其他协程。
加了 <-done 这行代码后,就会阻塞在此。等 aGoroutine 里向 done 发送了一个值之后,才会被唤醒,继续执行打印 msg 的操作。而这在之前,msg 已经被赋值过了,所以会打印出 hello, world
这里依赖的 happened before 就是前面讲的第一条。第一个 send 一定 happened before 第一个 receive finished,即 done <- true 先于 <-done 发生,这意味着 main 函数里执行完 <-done 后接着执行 println(msg) 这一行代码时,msg 已经被赋过值了,所以会打印出想要的结果。
进一步利用前面提到的第 3 条 happened before 规则,修改一下代码:
同样可以得到相同的结果,为什么?根据第三条规则,对于非缓冲型的 channel,第一个 receive 一定 happened before 第一个 send finished。也就是说, 在 done <- true 完成之前,<-done 就已经发生了,也就意味着 msg 已经被赋上值了,最终也会打印出 hello, world
 

Channel 应用场景

  1. 停止信号
  1. 任务定时:有时候,需要执行某项操作,但又不想它耗费太长的时间
  1. 解耦生产方和消费方
  1. 控制并发数:有时需要定时执行几百个任务,例如每天定时按城市来执行一些离线计算的任务。但是并发数又不能太高,因为任务执行过程依赖第三方的一些资源,对请求的速率有限制。这时就可以通过 channel 来控制并发数。
 
Channel 是 Go 语言能够提供强大并发能力的原因之一,在这里探索了 Channel 的设计原理、数据结构以及发送数据、接收数据和关闭 Channel 这些基本操作,相信能够更好地理解 Channel 的工作原理。
 
 
参考:
  1. https://golang.design/go-questions/channel/csp/
  1. https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/

© Craig Hart 2021 - 2025