type hchan struct {
qcount uint // total data in the queue 队列中的所有数据数
dataqsiz uint // size of the circular queue 环形队列的大小
buf unsafe.Pointer // points to an array of dataqsiz elements 指向大小为 dataqsiz 的数组
elemsize uint16 //元素大小
closed uint32 //是否关闭
elemtype *_type // element type 元素类型
sendx uint // send index 发送索引
recvx uint // receive index 接收索引
recvq waitq // list of recv waiters recv 等待列表,即( <-ch )
sendq waitq // list of send waiters send 等待列表,即( ch<- )
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
- buf里存放channel数据,是一个环形队列
- elemtype是channel里存放的数据类型,这里和反射一样用type类型存储
- sendx,recvx是接收和发送的偏移
- recvq,sendq是和channel相关的协程队列
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
如果向一个未初始化的channel写数据,会panic。
写已关闭channel
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
写已关闭channel也会panic。
写有等待者的channel
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
写阻塞channel
获取协程变量,将自己挂到channel的发送协程队列中,然后挂起自己,等待被接收协程唤醒。
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
读channel
读空channel
读空channel和写空channel一样,会导致fatal:
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
读已关闭且无数据channel
读关闭channel不会报错,但是如果channel中无数据,则直接返回
读有发送方的channel
当channel上有阻塞的发送者时,会从对端直接拷贝数据,不走buf,同时唤醒对方协程。
无等待者,有buf
从buf拷贝数据,返回
读buf为空的channel
将自己挂在等待协程队列中,同时挂起自己等待被发送者唤醒
关闭channel
首先对 Channel 上锁,而后依次将阻塞在 Channel 的 g 添加到一个 gList 中,当所有的 g 均从 Channel 上移除时,可释放锁,并唤醒 gList 中的所有接收方和发送方。