-
Notifications
You must be signed in to change notification settings - Fork 78
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
leo yang
committed
Oct 11, 2019
1 parent
6ed0c8b
commit cd89c82
Showing
77 changed files
with
16,715 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,397 @@ | ||
# Go Channel | ||
|
||
[toc] | ||
|
||
## hchan 数据结构 | ||
|
||
``` | ||
// channel 在 runtime 中的结构体 | ||
type hchan struct { | ||
// 队列中目前的元素计数 | ||
qcount uint // total data in the queue | ||
// 环形队列的总大小,ch := make(chan int, 10) => 就是这里这个 10 | ||
dataqsiz uint // size of the circular queue | ||
// void * 的内存 buffer 区域 | ||
buf unsafe.Pointer // points to an array of dataqsiz elements | ||
// sizeof chan 中的数据 | ||
elemsize uint16 | ||
// runtime._type,代表 channel 中的元素类型的 runtime 结构体 | ||
elemtype *_type // element type | ||
// 是否已被关闭 | ||
closed uint32 | ||
// 发送索引 | ||
sendx uint // send index | ||
// 接收索引 | ||
recvx uint // receive index | ||
// 接收 goroutine 对应的 sudog 队列 | ||
recvq waitq // list of recv waiters | ||
// 发送 goroutine 对应的 sudog 队列 | ||
sendq waitq // list of send waiters | ||
lock mutex | ||
} | ||
``` | ||
|
||
## 初始化 | ||
|
||
- 如果当前 Channel 中不存在缓冲区,那么就只会为 hchan 分配一段内存空间; | ||
- 如果当前 Channel 中存储的类型不是指针类型,就会直接为当前的 Channel 和底层的数组分配一块连续的内存空间; | ||
- 在默认情况下会单独为 hchan 和缓冲区分配内存; | ||
|
||
``` | ||
func makechan(t *chantype, size int) *hchan { | ||
elem := t.elem | ||
// 如果 hchan 中的元素不包含有指针,那么就没什么和 GC 相关的信息了 | ||
var c *hchan | ||
switch { | ||
case size == 0 || elem.size == 0: | ||
// 如果 channel 的缓冲区大小是 0: var a = make(chan int) | ||
// 或者 channel 中的元素大小是 0: struct{}{} | ||
// Queue or element size is zero. | ||
c = (*hchan)(mallocgc(hchanSize, nil, true)) | ||
// Race detector uses this location for synchronization. | ||
c.buf = unsafe.Pointer(c) | ||
case elem.kind&kindNoPointers != 0: | ||
// Elements do not contain pointers. | ||
// Allocate hchan and buf in one call. | ||
// 通过位运算知道 channel 中的元素不包含指针 | ||
// 占用的空间比较容易计算 | ||
// 直接用 元素数*元素大小 + channel 必须的空间就行了 | ||
// 这种情况下 gc 不会对 channel 中的元素进行 scan | ||
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true)) | ||
c.buf = add(unsafe.Pointer(c), hchanSize) | ||
default: | ||
// Elements contain pointers. | ||
// 和上面那个 case 的写法的区别:调用了两次分配空间的函数 new/mallocgc | ||
c = new(hchan) | ||
c.buf = mallocgc(uintptr(size)*elem.size, elem, true) | ||
} | ||
c.elemsize = uint16(elem.size) | ||
c.elemtype = elem | ||
c.dataqsiz = uint(size) | ||
return c | ||
} | ||
``` | ||
|
||
## send 发送 | ||
|
||
- 直接发送:如果目标 Channel 没有被关闭并且已经有处于读等待的 Goroutine,那么chansend 函数会通过 dequeue 从 recvq 中取出最先陷入等待的 Goroutine 并直接向它发送数据; | ||
- 缓冲区:向 Channel 中发送数据时遇到的第二种情况就是创建的 Channel 包含缓冲区并且 Channel 中的数据没有装满. 在这里我们首先会使用 chanbuf 计算出下一个可以放置待处理变量的位置,然后通过 typedmemmove 将发送的消息拷贝到缓冲区中并增加 sendx 索引和 qcount 计数器,在函数的最后会释放持有的锁。 | ||
- 阻塞发送: | ||
- 调用 getg 获取发送操作时使用的 Goroutine 协程; | ||
- 执行 acquireSudog 函数获取一个 sudog 结构体并设置这一次阻塞发送的相关信息,例如发送的 Channel、是否在 Select 控制结构中、发送数据所在的地址等; | ||
- 将刚刚创建并初始化的 sudog 结构体加入 sendq 等待队列,并设置到当前 Goroutine 的 waiting 上,表示 Goroutine 正在等待该 sudog 准备就绪; | ||
- 调用 goparkunlock 函数将当前的 Goroutine 更新成 Gwaiting 状态并解锁,该 Goroutine 可以被调用 goready 再次唤醒; | ||
|
||
``` | ||
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { | ||
lock(&c.lock) | ||
if sg := c.recvq.dequeue(); sg != nil { | ||
// 寻找一个等待中的 receiver | ||
// 越过 channel 的 buffer | ||
// 直接把要发的数据拷贝给这个 receiver | ||
// 然后就返 | ||
send(c, sg, ep, func() { unlock(&c.lock) }, 3) | ||
return true | ||
} | ||
// qcount 是 buffer 中已塞进的元素数量 | ||
// dataqsize 是 buffer 的总大小 | ||
// 说明还有余量 | ||
if c.qcount < c.dataqsiz { | ||
// Space is available in the channel buffer. Enqueue the element to send. | ||
qp := chanbuf(c, c.sendx) | ||
// 将 goroutine 的数据拷贝到 buffer 中 | ||
typedmemmove(c.elemtype, qp, ep) | ||
c.sendx++ | ||
// 环形队列,所以如果已经加到最大了,就回 0 | ||
if c.sendx == c.dataqsiz { | ||
c.sendx = 0 | ||
} | ||
// 将 buffer 的元素计数 +1 | ||
c.qcount++ | ||
unlock(&c.lock) | ||
return true | ||
} | ||
// 在 channel 上阻塞,receiver 会帮我们完成后续的工作 | ||
gp := getg() | ||
mysg := acquireSudog() | ||
mysg.releasetime = 0 | ||
// 打包 sudog | ||
mysg.elem = ep | ||
mysg.waitlink = nil | ||
mysg.g = gp | ||
mysg.isSelect = false | ||
mysg.c = c | ||
gp.waiting = mysg | ||
gp.param = nil | ||
// 将当前这个发送 goroutine 打包后的 sudog 入队到 channel 的 sendq 队列中 | ||
c.sendq.enqueue(mysg) | ||
// 将这个发送 g 从 Grunning -> Gwaiting | ||
// 进入休眠 | ||
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3) | ||
// 这里是被唤醒后要执行的代码 | ||
KeepAlive(ep) | ||
gp.waiting = nil | ||
gp.param = nil | ||
mysg.c = nil | ||
releaseSudog(mysg) | ||
return true | ||
} | ||
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { | ||
// receiver 的 sudog 已经在对应区域分配过空间 | ||
// 我们只要把数据拷贝过去 | ||
if sg.elem != nil { | ||
sendDirect(c.elemtype, sg, ep) | ||
sg.elem = nil | ||
} | ||
gp := sg.g | ||
unlockf() | ||
gp.param = unsafe.Pointer(sg) | ||
// Gwaiting -> Grunnable | ||
goready(gp, skip+1) | ||
} | ||
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { | ||
// src is on our stack, dst is a slot on another stack. | ||
// Once we read sg.elem out of sg, it will no longer | ||
// be updated if the destination's stack gets copied (shrunk). | ||
// So make sure that no preemption points can happen between read & use. | ||
dst := sg.elem | ||
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) | ||
// No need for cgo write barrier checks because dst is always | ||
// Go memory. | ||
memmove(dst, src, t.size) | ||
} | ||
``` | ||
|
||
## receive 接收 | ||
|
||
- 直接接收:当 Channel 的 sendq 队列中包含处于等待状态的 Goroutine 时,我们其实就会直接取出队列头的 Goroutine,这里处理的逻辑和发送时所差无几,只是发送数据时调用的是 send 函数,而这里是 recv 函数 | ||
- 缓冲区:另一种接收数据时遇到的情况就是,Channel 的缓冲区中已经包含了一些元素,在这时如果使用 <-ch 从 Channel 中接收元素,我们就会直接从缓冲区中 recvx 的索引位置中取出数据进行处理 | ||
- 阻塞接收:当 Channel 的 sendq 队列中不存在等待的 Goroutine 并且缓冲区中也不存在任何数据时,从管道中接收数据的操作在大多数时候就会变成一个阻塞的操作. | ||
|
||
``` | ||
func chanrecv1(c *hchan, elem unsafe.Pointer) { | ||
chanrecv(c, elem, true) | ||
} | ||
//go:nosplit | ||
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { | ||
_, received = chanrecv(c, elem, true) | ||
return | ||
} | ||
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { | ||
lock(&c.lock) | ||
// sender 队列中有 sudog 在等待 | ||
// 直接从该 sudog 中获取数据拷贝到当前 g 即可 | ||
if sg := c.sendq.dequeue(); sg != nil { | ||
recv(c, sg, ep, func() { unlock(&c.lock) }, 3) | ||
return true, true | ||
} | ||
if c.qcount > 0 { | ||
// Receive directly from queue | ||
qp := chanbuf(c, c.recvx) | ||
// 直接从 buffer 里拷贝数据 | ||
if ep != nil { | ||
typedmemmove(c.elemtype, ep, qp) | ||
} | ||
typedmemclr(c.elemtype, qp) | ||
// 接收索引 +1 | ||
c.recvx++ | ||
if c.recvx == c.dataqsiz { | ||
c.recvx = 0 | ||
} | ||
// buffer 元素计数 -1 | ||
c.qcount-- | ||
unlock(&c.lock) | ||
return true, true | ||
} | ||
// no sender available: block on this channel. | ||
gp := getg() | ||
mysg := acquireSudog() | ||
mysg.releasetime = 0 | ||
if t0 != 0 { | ||
mysg.releasetime = -1 | ||
} | ||
// No stack splits between assigning elem and enqueuing mysg | ||
// on gp.waiting where copystack can find it. | ||
// 打包成 sudog | ||
mysg.elem = ep | ||
mysg.waitlink = nil | ||
gp.waiting = mysg | ||
mysg.g = gp | ||
mysg.isSelect = false | ||
mysg.c = c | ||
gp.param = nil | ||
// 进入 recvq 队列 | ||
c.recvq.enqueue(mysg) | ||
// Grunning -> Gwaiting | ||
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3) | ||
// someone woke us up | ||
// 被唤醒 | ||
if mysg != gp.waiting { | ||
throw("G waiting list is corrupted") | ||
} | ||
gp.waiting = nil | ||
if mysg.releasetime > 0 { | ||
blockevent(mysg.releasetime-t0, 2) | ||
} | ||
closed := gp.param == nil | ||
gp.param = nil | ||
mysg.c = nil | ||
releaseSudog(mysg) | ||
// 如果 channel 未被关闭,那就是真的 recv 到数据了 | ||
return true, !closed | ||
} | ||
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { | ||
if c.dataqsiz == 0 { | ||
if ep != nil { | ||
// copy data from sender | ||
recvDirect(c.elemtype, sg, ep) | ||
} | ||
} else { | ||
// Queue is full. Take the item at the | ||
// head of the queue. Make the sender enqueue | ||
// its item at the tail of the queue. Since the | ||
// queue is full, those are both the same slot. | ||
qp := chanbuf(c, c.recvx) | ||
// copy data from queue to receiver | ||
if ep != nil { | ||
typedmemmove(c.elemtype, ep, qp) | ||
} | ||
// 虽然数据已经给了接受者,但是还是要在 chan 中记录一下 | ||
typedmemmove(c.elemtype, qp, sg.elem) | ||
c.recvx++ | ||
if c.recvx == c.dataqsiz { | ||
c.recvx = 0 | ||
} | ||
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz | ||
} | ||
sg.elem = nil | ||
gp := sg.g | ||
unlockf() | ||
gp.param = unsafe.Pointer(sg) | ||
if sg.releasetime != 0 { | ||
sg.releasetime = cputicks() | ||
} | ||
// Gwaiting -> Grunnable | ||
goready(gp, skip+1) | ||
} | ||
``` | ||
|
||
## close 关闭 | ||
|
||
在函数执行的最后会为所有被阻塞的 Goroutine 调用 goready 函数重新对这些协程进行调度. | ||
|
||
``` | ||
func closechan(c *hchan) { | ||
// 上锁,这个锁的粒度比较大,一直到释放完所有的 sudog 才解锁 | ||
lock(&c.lock) | ||
c.closed = 1 | ||
var glist *g | ||
// release all readers | ||
for { | ||
sg := c.recvq.dequeue() | ||
// 弹出的 sudog 是 nil | ||
// 说明读队列已经空了 | ||
if sg == nil { | ||
break | ||
} | ||
// sg.elem unsafe.Pointer,指向 sudog 的数据元素 | ||
// 该元素可能在堆上分配,也可能在栈上 | ||
if sg.elem != nil { | ||
// 释放对应的内存 | ||
typedmemclr(c.elemtype, sg.elem) | ||
sg.elem = nil | ||
} | ||
if sg.releasetime != 0 { | ||
sg.releasetime = cputicks() | ||
} | ||
// 将 goroutine 入 glist | ||
// 为最后将全部 goroutine 都 ready 做准备 | ||
gp := sg.g | ||
gp.param = nil | ||
gp.schedlink.set(glist) | ||
glist = gp | ||
} | ||
// release all writers (they will panic) | ||
// 将所有挂在 channel 上的 writer 从 sendq 中弹出 | ||
// 该操作会使所有 writer panic | ||
for { | ||
sg := c.sendq.dequeue() | ||
if sg == nil { | ||
break | ||
} | ||
sg.elem = nil | ||
if sg.releasetime != 0 { | ||
sg.releasetime = cputicks() | ||
} | ||
// 将 goroutine 入 glist | ||
// 为最后将全部 goroutine 都 ready 做准备 | ||
gp := sg.g | ||
gp.param = nil | ||
gp.schedlink.set(glist) | ||
glist = gp | ||
} | ||
// 在释放所有挂在 channel 上的读或写 sudog 时 | ||
// 是一直在临界区的 | ||
unlock(&c.lock) | ||
// Ready all Gs now that we've dropped the channel lock. | ||
for glist != nil { | ||
gp := glist | ||
glist = glist.schedlink.ptr() | ||
gp.schedlink = 0 | ||
// 使 g 的状态切换到 Grunnable | ||
goready(gp, 3) | ||
} | ||
} | ||
``` |
Oops, something went wrong.