探究Golang channel的底层实现

数据结构

channel 的底层是一个叫做 hchan 的结构体。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type hchan struct {
  //channel分为无缓冲和有缓冲两种。
  //对于有缓冲的channel存储数据,借助的是如下循环数组的结构
	qcount   uint           // 循环数组中的元素数量
	dataqsiz uint           // 循环数组的长度
	buf      unsafe.Pointer // 指向底层循环数组的指针
	elemsize uint16 //能够收发元素的大小
  

	closed   uint32   //channel是否关闭的标志
	elemtype *_type //channel中的元素类型
  
  //有缓冲channel内的缓冲数组会被作为一个“环型”来使用。
  //当下标超过数组容量后会回到第一个位置,所以需要有两个字段记录当前读和写的下标位置
	sendx    uint   // 下一次发送数据的下标位置
	recvx    uint   // 下一次读取数据的下标位置
  
  //当循环数组中没有数据时,收到了接收请求,那么接收数据的变量地址将会写入读等待队列
  //当循环数组中数据已满时,收到了发送请求,那么发送数据的变量地址将写入写等待队列
	recvq    waitq  // 读等待队列
	sendq    waitq  // 写等待队列


	lock mutex //互斥锁,保证读写channel时不存在并发竞争问题
}

创建 channel

golang runtime 调用mallocgc()来创建 channel,在堆上开辟内存空间,channel 本身会被 GC 自动回收。

channel 发送数据&接收数据

前置检查

步骤一:

先上锁,保证线程安全。

步骤二:

再次检查 channel 是否关闭,如果关闭则抛出 panic。如果加锁成功并且 channel 未关闭,则开始发送。

同步

g1 同步发送数据到 g2 时,检查recvq队列是否有 g2 在等待读取数据,如果有,直接 copy 数据到目标地址。

1

如果没有,阻塞自己(把自己打包为sudog,放在sendq队列,执行gopark()将g1阻塞,让出cpu使用权),然后激活g2(调用goready()将等待接收的阻塞 goroutine 的状态从 Gwaiting 或者 Gscanwaiting 改变成 Grunnable),g2激活从sendq队列读取g1发送的数据。

需要强调的是,通道并不提供跨 goroutine 的数据访问保护机制。如果通过通道传输数据的一份副本,那么每个 goroutine 都持有一份副本,各自对自己的副本做修改是安全的。当传输的是指向数据的指针时,如果读和写是由不同的 goroutine 完成的,那么每个 goroutine 依旧需要额外的同步操作。

异步

异步发送时,会利用hchan的buf环形数组,无论是发送数据还是读取数据都不会等待对方。然而如果发送时buf满了,或者读取时buf空了,也会作出上文的阻塞操作。

2

当G1向ch里发送数据时,首先会对buf加锁,然后将数据copy到buf中,然后sendx++,然后释放对buf的锁。 当G2消费ch的时候,会首先对buf加锁,然后将buf中的数据copy到task变量对应的内存里,然后recvx++,并释放锁。

可以发现整个过程,G1和G2没有共享的内存,底层是通过hchan结构体的buf,并使用copy内存的方式进行通信,最后达到了共享内存的目的,这里也体现了Go中的CSP并发模型。

优雅关闭 channel

  1. 关闭已经关闭的 Channel 会导致 panic。(如果channel为nil也会panic)
  2. 写入已经关闭的 Channel 会导致 panic。(同上)

优雅关闭 Channel 的方式就 2 种:

  1. context
  2. done channel

本节聊聊 done channel 的做法:假设有多个生产者,有多个消费者。在生产者和消费者之间增加一个额外的辅助控制channel,用来传递关闭信号

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
type session struct {
	done     chan struct{}
	doneOnce sync.Once
	data     chan int
}

func (sess *session) Serve() {
	go sess.loopRead()
	sess.loopWrite()
}

func (sess *session) loopRead() {
	defer func() {
		if err := recover(); err != nil {
			sess.doneOnce.Do(func() { close(sess.done) })
		}
	}()

	var err error
	for {
		select {
		case <-sess.done:
			return
		default:
		}

		if err == io.ErrUnexpectedEOF || err == io.EOF {
			goto failed
		}
	}
failed:
	sess.doneOnce.Do(func() { close(sess.done) })
}

func (sess *session) loopWrite() {
	defer func() {
		if err := recover(); err != nil {
			sess.doneOnce.Do(func() { close(sess.done) })
		}
	}()

	var err error
	for {
		select {
		case <-sess.done:
			return
		case sess.data <- rand.Intn(100):
		}
		
		if err != nil {
			goto done
		}
	}
done:
	if err != nil {
		log("sess: loop write failed: %v, %s", err, sess)
	}
}

func (sess *session) ForceClose() {
	sess.doneOnce.Do(func() { close(sess.done) })
}

总结一下 done channel 的做法:消费者利用辅助的 done channel 发送信号,并先开始退出协程。生产者接收到 done channel 的信号,也开始退出协程。最终 data channel 无人持有,被 gc 回收关闭。

消费者侧发送关闭 done channel,由于消费者有多个,如果每一个都关闭 done channel,会导致 panic。所以这里用 doneOnce.Do() 保证只会关闭 done channel 一次。

3


原文地址:探究Golang channel的底层实现

Buy me a coffee~
室长 支付宝支付宝
室长 微信微信
0%