模型三部分组成:
发送等待队列接收等待队列管道 缓存区
定义
在 runtime
的chan.go
中
type hchan struct {qcount uint dataqsiz uint buf unsafe.Pointer elemsize uint16elemtype *_type // element type // 上面的这几个组成了一个环形缓存区closed uint32 // 关闭状态 // 下面4个组成了2个队列 g的sendx uint // send indexrecvx uint // receive indexrecvq waitq // list of recv waiterssendq waitq // list of send waiters// lock protects all fields in hchan, as well as severallock mutex // 锁,用于在操作 channel的 元素时候,保证并发安全 }
特殊缓存区
qcount 已经存储的个数dataqsiz 环形队列缓存的容量,即允许缓存的消息最大个数buf 指向这个缓存区的地址elemsize 元素的大小elemtype 元素的类型
设计成这样,主要目的不需要gc来清理,因为环形机构,会自动把删除数据内存给占了。
环形缓存可以大幅降低GC的开销
两个队列
这里还要看下 waitq
的定义
type waitq struct { first *sudog last *sudog }// 在sema中也有用到,就g结构体的封装type sudog struct { g *g next *sudogprev *sudogelem unsafe.Pointer // 接受参数的地址 // 当是接收g时候,如果有缓存中有数据,直接把数据拷贝到这个地址
}
注意:这里的sendx 并不是指向发送队列中的g,而且发送队列应该写入环形缓存区的index,
同理,recvx也是,指向接受数据的g,应该从缓冲区的那个index取数据
互斥锁
lock mutex
互斥锁并不是排队发送/接收数据
不是让发送和接收队列来排队的,这些发送和接收数据的队列,休眠也不是在锁的sema里
互斥锁保护的hchan结构体本身
所以, Channel并不是无锁的
状态值
closed uint32 // 关闭状态
0为开启、1为关闭
当一个关闭的channel,再往里写或者重复关闭、就会panic。但是可以读。
发送数据
c<- 关键字是一个语法糖编译阶段,会把C<- 转化为 ruintime.chansend1chansend1 会调用 chansend0 方法
直接发送 (已经有接收队列在等)
发送数据前,己经有G在休眠等待接收此时缓存肯定是空的,不用考虑缓存将数据直接拷贝给G的接收变量,唤醒G
实现
// entry point for c <- x from compiled code.func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc())}// 部分源码func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { //有加锁,如果这时候,再来一个g也发送,就会休眠去 sema队列了lock(&c.lock)if c.closed != 0 { // 如果已经关闭,就会报错,上面讲过给一个关闭的chan发送会panicunlock(&c.lock)panic(plainError("send on closed channel"))} // 从接收队列里面拿一个 gif sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any). // 上面的注释讲的很清楚,直接把值给接收者,绕过 channel的buffersend(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}}func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 和之前讲的 sudog的elem对上了 if sg.elem != nil { // 直接把数据拷贝到 接收者的 elem中sendDirect(c.elemtype, sg, ep)sg.elem = nil}gp := sg.gunlockf()gp.param = unsafe.Pointer(sg)sg.success = trueif sg.releasetime != 0 {sg.releasetime = cputicks()} // 唤醒这个协程 接收者的ggoready(gp, skip+1)}
步骤:
从队列里取出一个等待接收的G将数据直接拷贝到接收变量中唤醒G,接收者的g
放入缓存
没有G在休眠等待,但是有缓存空间将数据放入缓存
实现
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 上面是直接发送的源码,被截除 // 当缓存队列存入的数量 小于 缓存的容量,就是还有缓存空间if c.qcount < c.dataqsiz { // 缓存区接受数据的地址qp := chanbuf(c, c.sendx)// 将数据拷贝过去typedmemmove(c.elemtype, qp, ep) // 指示下一个发送数据,存在那个缓冲区,这里有个逻辑下面讲c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0} // 元素个数加1,释放锁并返回c.qcount++unlock(&c.lock)return true} } qp := chanbuf(c, c.sendx) // chanbuf(c, i) is pointer to the i'th slot in the buffer // 返回缓冲区的 第几个槽 // 写入缓存区 c.sendx++ 意味着这里使用 sendx 来指引下一个发送的数据,写到几个槽, 所以才有下面,如果满了,就从0开始,形成环形 if c.sendx == c.dataqsiz { c.sendx = 0 }
整个逻辑比较清晰:
获取可存入的缓存地址 存入数据 维护索引
休眠等待
没有G在休眠等待,而旦没有缓存或满了自己进入发送队列,休眠等待
实现
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 接着 直接放入缓存的代码 // Block on the channel. Some receiver will complete our operation for us.gp := getg() mysg := acquireSudog() // 把自己组装成一个 sudog结构体mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}mysg.elem = ep // 要发送的数据 也是放到这个 elem中的mysg.waitlink = nilmysg.g = gp // sudug 的g等于自己的g结构体mysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nilc.sendq.enqueue(mysg)gp.parkingOnChan.Store(true)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2) // 进入waitReasonChanSend 休眠 // 然后去 gopark 休眠了,这个方法在将 协程切换的时候,讲过 // 协程到这里就 休眠了,不继续执行了,直到被唤醒。}
步骤:
把自己包装成sudog sudog放入sendq队列 休眠并解锁 被唤醒后,数据已经被取走,维护其他数据 (下面讲 接收时候讲)
发送小结
编译阶段,会把<-转化为 runtime.chansend10
直接发送时,将数据直接拷贝到目标变量
放入缓存时,将数据放入环形缓存,成功返回
休眠等待时,将自己包装后放入sendp, 休眠
接收<-c 关键字
<-c 关键字是一个语法糖编译阶段,i<-C转化为 runtime.chanrecv()编译阶段,i, ok<-c转化为 runtime.chanrecv()最后会调用 chanrecv() 方法
无缓存区、有发送协程在等待
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {if c == nil { // 如果读一个 block 为true的 channel ,协程会直接休眠, // 正常读channel这个 block都是 trueif !block {return}gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)throw("unreachable")} // 中间删了一段 block 为 false的情况lock(&c.lock)if c.closed != 0 { // 如果channel已经关闭if c.qcount == 0 {if raceenabled {raceacquire(c.raceaddr())}unlock(&c.lock)if ep != nil {typedmemclr(c.elemtype, ep) // 有数据 放回数据}return true, false // 没数据,返回一个 false}} else {if sg := c.sendq.dequeue(); sg != nil { // 如果发送队列中有grecv(c, sg, ep, func() { unlock(&c.lock) }, 3) // 调用这个 recv方法return true, true}}}func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {if c.dataqsiz == 0 { // 无缓存区if ep != nil { // copy data from senderrecvDirect(c.elemtype, sg, ep) // 直接把 传过来的g 的数据取走}} // 给g跟新下参数 sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) // 唤醒g,这时候,发送的数据已经被取走了}func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {src := sg.elemtypeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)memmove(dst, src, t.Size_)}
步骤:
判断有G 在发送队列等待,进入recv() 判断此 Channel 无缓存 直接从等待的 G 中取走数据,唤醒G
这里有两个地方要注意,代码中也标记了:
如果channel为nil,再去读会直接休眠阻塞。这里只的是 block为 true的读,block为false的情况后面讲,正常channe都是true
如果channel close了, 去读有值返回值,没值返回 false
有等待的g,缓存区满了
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 上面是缓存区为0的情况 qp := chanbuf(c, c.recvx) // 取缓存区的数据,用的recvx 标记,和开头的总结对应上了 // ep 就是 a <-c ,这个a的地址,如果a是nil,说明传递的值,没有用,只是要个时机// ep may be nil, in which case received data is ignored.if ep != nil {typedmemmove(c.elemtype, ep, qp)} // 将发送队列中休眠的这个g的数据 拷贝到了 缓存去// copy data from sender to queuetypedmemmove(c.elemtype, qp, sg.elem)c.recvx++ // 取数据的地址增加if c.recvx == c.dataqsiz {c.recvx = 0}c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz // 唤醒了这个 发送数据的g,因为这个g的数据已经放到了缓存区,不用休眠等待了 sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
步骤:
接收数据前,已经有G 在休眠等待发送 而且这个 Channel 有缓存 从缓存取走一个数据 将休眠G 的数据放进缓存,唤醒G
接收缓存,没有发送g在等待
直接从 缓存区拿数据走就行
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { if c.qcount > 0 {// Receive directly from queueqp := chanbuf(c, c.recvx) // 取出数据if raceenabled {racenotify(c, c.recvx, nil)} // 上面讲过 也许接收的变量为空if ep != nil {typedmemmove(c.elemtype, ep, qp) // 数据拷贝过去}typedmemclr(c.elemtype, qp)//clr 是 clear的意思 清理缓存区已经取走的这个数据的空间c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount--unlock(&c.lock)return true, true}}
步骤:
判断没有 G 在发送队列等待 判断此 Channel 有缓存 从缓存中取走一个数据
接收阻塞
比较多用在不让协程退出,除非收到 context的cancel消息等。
没有 G 在休眠等待,而旦没有缓存或缓存空
自己进入接收队列,休眠等待
代码实现
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // no sender available: block on this channel. gp := getg() mysg := acquireSudog() //把自己包装成 sudog mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep // 接收数据的地址,拷贝到了elem mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) // 加入了等待接收队列 gp.parkingOnChan.Store(true) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2) // 休眠 }
这里和上面的发送时候,已经有等待接收的g,对上了。
步骤:
判断没有 G 在发送队列等待判断此 Channel 无缓存将自己包装成 sudogsudog 放入接收等待队列,休眠唤醒时,发送的 G 已经把数据拷贝到位
接收总结:
编译阶段,<-C 会转化为 chanrecv()有等待的G,旦无缓存时,从G 接收有等待的 G,且有缓存时,从缓存接收无等待的 G,且缓存有数据,从缓存接收无等待的 G,且缓存无数据,等待喂数据
看上面代码时候,讲过一般使用时候,那个block
是true
的,什么情况下block
为false
,下篇聊。