#### golang channel的结构 ####

📅 2026/6/24 6:41:21
#### golang channel的结构 ####
严正声明部分摘自深入理解Golang之channel - 掘金仅做个人备份浏览请看原文目录1、前言2、channel概述2.1、## 用法2.2、## 几个注意点2.3、unbuffered channels 与 buffered channels2.4、channel的遍历2.4.1、## for range2.4.2、## for select3、channel原理3.1、## 数据结构3.2、编译分析3.3、创建3.4、发送3.4.1、## 存在等待接收的Goroutine3.4.2、## 当缓冲区未满时​编辑3.4.3、## 阻塞发送3.5、接收3.5.1、## 存在等待发送的Goroutine3.5.2、## 缓冲区buf中还有数据3.5.3、## 阻塞接收3.6、## 关闭1、前言Golang在并发编程上有两大利器分别是channel和goroutineGo语言在sync包中提供了传统的锁机制但更推荐使用channel来解决并发问题。Channel常常结合go select使用select思想来源于网络IO模型中的select本质上也是IO多路复用只不过这里的IO是基于channel而不是基于网络同时go select也有一些自己不同的特性每个case都必须是一个通信所有channel表达式都会被求值所有被发送的表达式都会被求值如果任意某个通信可以进行它就执行其他被忽略。如果有多个case都可以运行select会伪随机选出一个执行。其他不会执行。否则执行default子句(如果有)如果没有default字句select将阻塞直到某个通信可以运行Go不会重新对channel或值进行求值。2、channel概述2.1、## 用法func main() { ch : make(chan int, 1) // 创建一个类型为int缓冲区大小为1的channel ch - 2 // 将2发送到ch n, ok : - ch // n接收从ch发出的值 if ok { fmt.Println(n) // 2 } close(ch) // 关闭channel }2.2、##几个注意点channel关闭后不可以继续向channel发送消息向一个已经关闭的channel发送消息会引发运行时恐慌panic但可以继续从channel接收消息当channel关闭并且缓冲区为空时继续从从channel接收消息会得到一个对应类型的零值。向一个nil channel发送消息会一直阻塞。2.3、unbuffered channels 与 buffered channels【unbuffered channels】是指缓冲区大小为0的channel这种channel的接收者会阻塞直至接收到消息发送者会阻塞直至接收者接收到消息这种机制可以用于两个goroutine进行状态同步。【buffered channels】拥有缓冲区当缓冲区已满时发送者会阻塞当缓冲区为空时接收者会阻塞。2.4、channel的遍历2.4.1、## for rangechannel支持 for range 的方式进行遍历。在遍历时如果channel 没有关闭那么会一直等待下去。如果在遍历时channel已经关闭那么在遍历完数据后自动退出遍历。func main() { ci : make(chan int, 5) for i : 1; i 5; i { ci - i } close(ci) for i : range ci { fmt.Println(i) } }2.4.2、## for selectselect中有case代码块用于channel发送或接收消息。任意一个case代码块准备好时执行其对应内容多个case代码块准备好时随机选择一个case代码块并执行所有case代码块都没有准备好则等待还可以有一个default代码块所有case代码块都没有准备好时执行default代码块。func main() { ci : make(chan int, 2) for i : 1; i 2; i { ci - i } close(ci) cs : make(chan string, 2) cs - hi cs - golang close(cs) ciClosed, csClosed : false, false for { if ciClosed csClosed { return } select { case i, ok : -ci: if ok { fmt.Println(i) } else { ciClosed true fmt.Println(ci closed) } case s, ok : -cs: if ok { fmt.Println(s) } else { csClosed true fmt.Println(cs closed) } default: fmt.Println(waiting...) } } }3、channel原理源码地址go/chan.go at master · golang/go · GitHub包路径/runtime/chan.go3.1、## 数据结构channel中有把互斥锁往channel里写数据时会拿锁首先概述channel的缓冲区其实是一个环形队列qcount 表示队列中元素的数量。dataqsiz 表示环形队列的总大小。buf 表示一个指向循环数组的指针sendx、recvx 分别用来标识当前发送和接收的元素在循环数组中的位置。recvq、sendq 是2个列表分别用于存储当前处于等待接收和等待发送的Goroutine。即一个是存往这个chan里写数据而卡死等待的协程一个是存从这个chan里取数据而卡死等待的协程。后续在对此chan进行发送和接受时会有机会唤醒卡在recvq、sendq里的协程。看channel的结构体type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 // channel元素大小 closed uint32 // 是否已经关闭 elemtype *_type // element type channel中元素的类型 sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters lock mutex }再看一下waitq的数据结构type waitq struct { first *sudog last *sudog } // 以下是sudog结构体包路径为/runtime/runtime2.go type sudog struct { // 当前goroutine g *g // isSelect indicates g is participating in a select, so // g.selectDone must be CASd to win the wake-up race. isSelect bool next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack) // The following fields are never accessed concurrently. // For channels, waitlink is only accessed by g. // For semaphores, all fields (including the ones above) // are only accessed when holding a semaRoot lock. acquiretime int64 releasetime int64 ticket uint32 parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot c *hchan // channel }其中sudog表示处于等待列表中的Goroutine封装包含了一些上下文信息first和last分别指向等待列表的首位的Goroutine。3.2、编译分析在分析channel的原理之前我们先使用go tool分析以下代码看看channel的各种操作在底层调用了什么运行时方法ch : make(chan int, 2) ch - 2 ch - 1 -ch n, ok : -ch if ok { fmt.Println(n) } close(ch)编译go build test.go go tool objdump -s main\.main test | grep CALL把CALL过滤出来3.3、创建从上面的编译分析可以看出在创建channel时调用了运行时方法makechan:func makechan(t *chantype, size int) *hchan { elem : t.elem // compiler checks this but be safe. if elem.size 116 { throw(makechan: invalid channel element type) } if hchanSize%maxAlign ! 0 || elem.align maxAlign { throw(makechan: bad alignment) } // 计算缓冲区需要的总大小缓冲区大小*元素大小并判断是否超出最大可分配范围 mem, overflow : math.MulUintptr(elem.size, uintptr(size)) if overflow || mem maxAlloc-hchanSize || size 0 { panic(plainError(makechan: size out of range)) } // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoGs are referenced from their owning thread so they cant be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. var c *hchan switch { case mem 0: // 缓冲区大小为0或者channel中元素大小为0struct{}{}时只需分配channel必需的空间即可 // Queue or element size is zero. c (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf c.raceaddr() case elem.ptrdata 0: // 通过位运算知道channel中元素类型不是指针分配一片连续内存空间所需空间等于 缓冲区数组空间 hchan必需的空间。 // Elements do not contain pointers. // Allocate hchan and buf in one call. c (*hchan)(mallocgc(hchanSizemem, nil, true)) c.buf add(unsafe.Pointer(c), hchanSize) default: // 元素中包含指针为hchan和缓冲区分别分配空间 // Elements contain pointers. c new(hchan) c.buf mallocgc(mem, elem, true) } c.elemsize uint16(elem.size) c.elemtype elem c.dataqsiz uint(size) if debugChan { print(makechan: chan, c, ; elemsize, elem.size, ; dataqsiz, size, \n) } return c }makechan的代码逻辑还是比较简单的首先校验元素类型和缓冲区空间大小然后创建hchan分配所需空间。这里有三种情况当缓冲区大小为0或者channel中元素大小为0时只需分配channel必需的空间即可当channel元素类型不是指针时则只需要为hchan和缓冲区分配一片连续内存空间空间大小为缓冲区数组空间加上hchan必需的空间默认情况缓冲区包含指针则需要为hchan和缓冲区分别分配内存。最后更新hchan的其他字段包括elemsizeelemtypedataqsiz。3.4、发送channel的发送操作调用了运行时方法chansend1, 在chansend1内部又调用了chansend直接来看chansend的实现func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // channel为nil if c nil { // 如果是非阻塞直接返回发送不成功 if !block { return false } // 否则当前Goroutine阻塞挂起 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw(unreachable) } if debugChan { print(chansend: chan, c, \n) } if raceenabled { racereadpc(c.raceaddr(), callerpc, funcPC(chansend)) } // Fast path: check for failed non-blocking operation without acquiring the lock. // // After observing that the channel is not closed, we observe that the channel is // not ready for sending. Each of these observations is a single word-sized read // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel). // Because a closed channel cannot transition from ready for sending to // not ready for sending, even if the channel is closed between the two observations, // they imply a moment between the two when the channel was both not yet closed // and not ready for sending. We behave as if we observed the channel at that moment, // and report that the send cannot proceed. // // It is okay if the reads are reordered here: if we observe that the channel is not // ready for sending and then observe that it is not closed, that implies that the // channel wasnt closed during the first observation. // 对于非阻塞且channel未关闭如果无缓冲区且没有等待接收的Goroutine或者有缓冲区且缓冲区已满那么都直接返回发送不成功 if !block c.closed 0 ((c.dataqsiz 0 c.recvq.first nil) || (c.dataqsiz 0 c.qcount c.dataqsiz)) { return false } var t0 int64 if blockprofilerate 0 { t0 cputicks() } // 加锁 lock(c.lock) // 如果channel已关闭 if c.closed ! 0 { // 解锁直接panic unlock(c.lock) panic(plainError(send on closed channel)) } // 除了以上情况当channel未关闭时就有以下几种情况 // 1、当存在等待接收的Goroutine if sg : c.recvq.dequeue(); sg ! nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). // 那么直接把正在发送的值发送给等待接收的Goroutine send(c, sg, ep, func() { unlock(c.lock) }, 3) return true } // 2、当缓冲区未满时 if c.qcount c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. // 获取指向缓冲区数组中位于sendx位置的元素的指针 qp : chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } // 将当前发送的值拷贝到缓冲区 typedmemmove(c.elemtype, qp, ep) // sendx索引长度 c.sendx // 因为是循环队列sendx等于队列长度时置为0 if c.sendx c.dataqsiz { c.sendx 0 } // 队列中元素总数加一并解锁返回发送成功 c.qcount unlock(c.lock) return true } // 3、当既没有等待接收的Goroutine缓冲区也没有剩余空间如果是非阻塞的发送那么直接解锁返回发送失败 if !block { unlock(c.lock) return false } // Block on the channel. Some receiver will complete our operation for us. // 4、如果是阻塞发送那么就将当前的Goroutine打包成一个sudog结构体并加入到channel的发送队列sendq里 gp : getg() mysg : acquireSudog() mysg.releasetime 0 if t0 ! 0 { mysg.releasetime -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem ep mysg.waitlink nil mysg.g gp mysg.isSelect false mysg.c c gp.waiting mysg gp.param nil c.sendq.enqueue(mysg) // 调用goparkunlock将当前Goroutine设置为等待状态并解锁进入休眠等待被唤醒 gopark(chanparkcommit, unsafe.Pointer(c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // Ensure the value being sent is kept alive until the // receiver copies it out. The sudog has a pointer to the // stack object, but sudogs arent considered as roots of the // stack tracer. KeepAlive(ep) // someone woke us up. // 被唤醒之后执行清理工作并释放sudog结构体 if mysg ! gp.waiting { throw(G waiting list is corrupted) } gp.waiting nil gp.activeStackChans false if gp.param nil { if c.closed 0 { throw(chansend: spurious wakeup) } panic(plainError(send on closed channel)) } gp.param nil if mysg.releasetime 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c nil releaseSudog(mysg) return true }chansend的执行逻辑上面的注释已经写得很清楚了我们再来梳理一下。对于非阻塞发送或者channel已经关闭条件下的几种发送失败的情况处理逻辑比较简单读者可以对照注释来看这里我们重点关注channel未关闭时几种常规情况3.4.1、## 存在等待接收的Goroutine如果等待接收的队列recvq中存在Goroutine那么直接把正在发送的值发送给等待接收的Goroutine对应recvq和sendq。示意图如下具体看一下send方法func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { ... if sg.elem ! nil { // 将发送的值直接拷贝到接收值比如v -ch 中的v的内存地址 sendDirect(c.elemtype, sg, ep) sg.elem nil } // 获取等待接收数据的Goroutine gp : sg.g unlockf() gp.param unsafe.Pointer(sg) if sg.releasetime ! 0 { sg.releasetime cputicks() } // 唤醒之前等待接收数据的Goroutine goready(gp, skip1) }这里有必要说明一下Goroutine在调度过程中的几种状态_Gidle iota // goroutine刚刚分配还没有初始化_Grunnable // goroutine处于运行队列中, 还没有运行没有自己的栈_Grunning // goroutine在运行中拥有自己的栈被分配了M(线程)和P(调度上下文)_Gsyscall // goroutine在执行系统调用_Gwaiting // goroutine被阻塞_Gdead // goroutine没有被使用可能是刚刚退出或者正在初始化中_Gcopystack // 表示g当前的栈正在被移除并分配新栈当调用goready时将Goroutine的状态从 _Gwaiting置为_Grunnable等待下一次调度再次执行。3.4.2、## 当缓冲区未满时当缓冲区未满时找到sendx所指向的缓冲区数组的位置将正在发送的值拷贝到该位置并增加sendx索引以及释放锁示意图如下3.4.3、## 阻塞发送如果是阻塞发送那么就将当前的Goroutine打包成一个sudog结构体并加入到channel的发送队列sendq里。示意图如下之后则调用goparkunlock将当前Goroutine设置为_Gwaiting状态并解锁进入阻塞状态等待被唤醒调用goready如果被调度器唤醒执行清理工作并最终释放对应的sudog结构体。3.5、接收channel的接收有两种形式-ch n, ok : -ch这两种方式分别调用运行时方法chanrecv1和chanrecv2:// entry points for - c from compiled code //go:nosplit func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) } //go:nosplit func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received chanrecv(c, elem, true) return }这两个方法最终都会调用chanrecv方法func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // raceenabled: dont need to check ep, as it is always on the stack // or is new memory allocated by reflect. if debugChan { print(chanrecv: chan, c, \n) } // channel为nil if c nil { // 非阻塞直接返回false, false if !block { return } // 阻塞接收则当前Goroutine阻塞挂起 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw(unreachable) } // Fast path: check for failed non-blocking operation without acquiring the lock. // 非阻塞模式对于以下两种情况 // 1、无缓冲区且等待发送队列也为空 // 2、有缓冲区但缓冲区数组为空且channel未关闭 // 这两种情况都是接收失败, 直接返回false, false if !block (c.dataqsiz 0 c.sendq.first nil || c.dataqsiz 0 atomic.Loaduint(c.qcount) 0) atomic.Load(c.closed) 0 { return } var t0 int64 if blockprofilerate 0 { t0 cputicks() } // 加锁 lock(c.lock) // 如果channel已关闭并且缓冲区无元素 if c.closed ! 0 c.qcount 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(c.lock) // 有等待接收的变量即 v -ch中的v if ep ! nil { //根据channel元素的类型清理ep对应地址的内存即ep接收了channel元素类型的零值 typedmemclr(c.elemtype, ep) } // 返回true, false即接收到值但不是从channel中接收的有效值 return true, false } // 除了以上非常规情况还有有以下几种常见情况 // 1、等待发送的队列sendq里存在Goroutine那么有两种情况当前channel无缓冲区或者当前channel已满 if sg : c.sendq.dequeue(); sg ! nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add senders value to the tail of the queue (both map to // the same buffer slot because the queue is full). // 如果无缓冲区那么直接从sender接收数据否则从buf队列的头部接收数据并把sender的数据加到buf队列的尾部 recv(c, sg, ep, func() { unlock(c.lock) }, 3) // 接收成功 return true, true } // 2、缓冲区buf中有元素 if c.qcount 0 { // Receive directly from queue // 从recvx指向的位置获取元素 qp : chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) } if ep ! nil { // 将从buf中取出的元素拷贝到当前协程 typedmemmove(c.elemtype, ep, qp) } // 同时将取出的数据所在的内存清空 typedmemclr(c.elemtype, qp) // 接收索引 c.recvx if c.recvx c.dataqsiz { c.recvx 0 } // buf元素总数 -1 c.qcount-- // 解锁返回接收成功 unlock(c.lock) return true, true } // 3、非阻塞模式且没有数据可以接受 if !block { // 解锁直接返回接收失败 unlock(c.lock) return false, false } // no sender available: block on this channel. // 4、阻塞模式获取当前Goroutine打包一个sudog gp : getg() mysg : acquireSudog() mysg.releasetime 0 if t0 ! 0 { mysg.releasetime -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem ep mysg.waitlink nil gp.waiting mysg mysg.g gp mysg.isSelect false mysg.c c gp.param nil // 加入到channel的等待接收队列recvq中 c.recvq.enqueue(mysg) // 挂起当前Goroutine设置为_Gwaiting状态并解锁进入休眠等待被唤醒 gopark(chanparkcommit, unsafe.Pointer(c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // someone woke us up // 被唤醒之后执行清理工作并释放sudog结构体 if mysg ! gp.waiting { throw(G waiting list is corrupted) } gp.waiting nil gp.activeStackChans false if mysg.releasetime 0 { blockevent(mysg.releasetime-t0, 2) } closed : gp.param nil gp.param nil mysg.c nil releaseSudog(mysg) return true, !closed }chanrecv方法的处理逻辑与chansend非常类似我们这里仍然只分析几种常见情况其他情况上述注释也解释得比较清楚了读者可对照相应代码和注释查看。3.5.1、## 存在等待发送的Goroutine如果等待发送的队列sendq里存在挂起的Goroutine那么有两种情况当前channel无缓冲区或者当前channel已满。从sendq中取出最先阻塞的Goroutine然后调用recv方法func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz 0 { // 无缓冲区 if raceenabled { racesync(c, sg) } if ep ! nil { // copy data from sender recvDirect(c.elemtype, sg, ep) } } else { // 缓冲区已满 // Queue is full. Take the item at the // head of the queue. Make the sender enqueue // its item at the tail of the queue. Since the // queue is full, those are both the same slot. qp : chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) raceacquireg(sg.g, qp) racereleaseg(sg.g, qp) } // copy data from queue to receiver if ep ! nil { typedmemmove(c.elemtype, ep, qp) } // copy data from sender to queue typedmemmove(c.elemtype, qp, sg.elem) c.recvx if c.recvx c.dataqsiz { c.recvx 0 } c.sendx c.recvx // c.sendx (c.sendx1) % c.dataqsiz } sg.elem nil gp : sg.g unlockf() gp.param unsafe.Pointer(sg) if sg.releasetime ! 0 { sg.releasetime cputicks() } // 将等待发送数据的Goroutine的状态从_Gwaiting置为 _Grunnable等待下一次调度 goready(gp, skip1) }1、如果无缓冲区那么直接从sender接收数据2、如果缓冲区已满从buf队列的头部接收数据并把sender的数据加到buf队列的尾部3、最后调用goready函数将等待发送数据的Goroutine的状态从_Gwaiting置为_Grunnable等待下一次调度。下图示意了当缓冲区已满时的处理过程3.5.2、## 缓冲区buf中还有数据如果缓冲区buf中还有元素那么就走正常的接收将从buf中取出的元素拷贝到当前协程的接收数据目标内存地址中。值得注意的是即使此时channel已经关闭仍然可以正常地从缓冲区buf中接收数据。这种情况比较简单示意图就不画了。3.5.3、## 阻塞接收如果是阻塞模式且当前没有数据可以接收那么就需要将当前Goroutine打包成一个sudog加入到channel的等待接收队列recvq中将当前Goroutine的状态置为_Gwaiting等待唤醒。示意图如下如果之后当前Goroutine被调度器唤醒则执行清理工作并最终释放对应的sudog结构体。3.6、## 关闭1、关闭channel时会遍历recvq和sendq实际只有recvq或者sendq取出sudog中挂起的Goroutine加入到glist列表中并清除sudog上的一些信息和状态。2、然后遍历glist列表为每个Goroutine调用goready函数将所有Goroutine置为_Grunnable状态等待调度。3、当Goroutine被唤醒之后会继续执行chansend和chanrecv函数中当前Goroutine被唤醒后的剩余逻辑。func closechan(c *hchan) { // nil channel检查 if c nil { panic(plainError(close of nil channel)) } lock(c.lock) // 已关闭的channel不能再次关闭 if c.closed ! 0 { unlock(c.lock) panic(plainError(close of closed channel)) } if raceenabled { callerpc : getcallerpc() racewritepc(c.raceaddr(), callerpc, funcPC(closechan)) racerelease(c.raceaddr()) } // 设置关闭状态为1 c.closed 1 var glist gList // release all readers // 遍历recvq清除sudog的数据取出其中处于_Gwaiting状态的Goroutine加入到glist中 for { sg : c.recvq.dequeue() if sg nil { break } if sg.elem ! nil { typedmemclr(c.elemtype, sg.elem) sg.elem nil } if sg.releasetime ! 0 { sg.releasetime cputicks() } gp : sg.g gp.param nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } // release all writers (they will panic) // 遍历sendq清除sudog的数据取出其中处于_Gwaiting状态的Goroutine加入到glist中 for { sg : c.sendq.dequeue() if sg nil { break } sg.elem nil if sg.releasetime ! 0 { sg.releasetime cputicks() } gp : sg.g gp.param nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(c.lock) // Ready all Gs now that weve dropped the channel lock. // 将glist中所有Goroutine的状态置为_Grunnable等待调度器进行调度 for !glist.empty() { gp : glist.pop() gp.schedlink 0 goready(gp, 3) } }