如何优雅地关闭Go语言的通道channel

如何优雅地关闭Go语言的通道channel

Go channel的关闭

一、Go channel设计和规范的问题

  1. 在不能更改 channel 状态的情况下,没有简单普遍的方式来检查 channel 是否已经关闭了
  2. 关闭已经关闭的 channel 会导致 panic,所以在 closer(关闭者)不知道 channel 是否已经关闭的情况下去关闭 channel 是很危险的
  3. 发送值到已经关闭的 channel 会导致 panic,所以如果 sender(发送者)在不知道 channel 是否已经关闭的情况下去向 channel 发送值是很危险的

是的,没有一个内置函数可以检查一个 channel 是否已经关闭。如果你能确定不会向 channel 发送任何值,那么也确实需要一个简单的方法来检查 channel 是否已经关闭:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import "fmt"

type T int

func IsClosed(ch <-chan T) bool {
select {
case <-ch:
return true
default:
}
return false
}

func main() {
c := make(chan T)
fmt.Println(IsClosed(c)) // false
close(c)
fmt.Println(IsClosed(c)) // true
}

上面已经提到了,没有一种适用的方式来检查 channel 是否已经关闭了。但是,就算有一个简单的 closed(chan T) bool 函数来检查 channel 是否已经关闭,它的用处还是很有限的,就像内置的 len 函数用来检查缓冲 channel 中元素数量一样。原因就在于,已经检查过的 channel 的状态有可能在调用了类似的方法返回之后就修改了,因此返回来的值已经不能够反映刚才检查的 channel 的当前状态了

尽管在调用 closed(ch) 返回 true 的情况下停止向 channel 发送值是可以的,但是如果调用 closed(ch) 返回 false,那么关闭 channel 或者继续向 channel 发送值就不安全了(会 panic)。

二、The Channel Closing Principle

在使用 Go channel 的时候,一个适用的原则是不要从接收端关闭 channel,也不要关闭有多个并发发送者的 channel。换句话说,如果 sender(发送者)只是 channel 唯一的 sender 或者是 channel 最后一个活跃的 sender,那么你应该在 sender 的 goroutine 关闭 channel,从而通知 receiver(s)(接收者们)已经没有值可以读了。维持这条原则将保证永远不会发生向一个已经关闭的 channel 发送值或者关闭一个已经关闭的 channel。

三、打破 channel closing principle 的解决方案

如果你因为某种原因从接收端(receiver side)关闭 channel 或者在多个发送者中的一个关闭 channel,那么你应该使用列在 Golang panic/recover Use Cases 的函数来安全地发送值到 channel 中(假设 channel 的元素类型是 T)。

1
2
3
4
5
6
7
8
9
10
11
12
func SafeSend(ch chan T, value T) (closed bool) {
defer func() {
if recover() != nil {
// the return result can be altered
// in a defer function call
closed = true
}
}()

ch <- value // panic if ch is closed
return false // <=> closed = false; return
}

如果 channel ch 没有被关闭的话,那么这个函数的性能将和 ch <- value 接近。对于 channel 关闭的时候,SafeSend 函数只会在每个 sender goroutine 中调用一次,因此程序不会有太大的性能损失。

同样的想法也可以用在从多个 goroutine 关闭 channel 中:

1
2
3
4
5
6
7
8
9
10
11
func SafeClose(ch chan T) (justClosed bool) {
defer func() {
if recover() != nil {
justClosed = false
}
}()

// assume ch != nil here.
close(ch) // panic if ch is closed
return true
}

很多人喜欢用 sync.Once 来关闭 channel:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type MyChannel struct {
C chan T
once sync.Once
}

func NewMyChannel() *MyChannel {
return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
mc.once.Do(func(){
close(mc.C)
})
}

当然了,我们也可以用 sync.Mutex 来避免多次关闭 channel:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type MyChannel struct {
C chan T
closed bool
mutex sync.Mutex
}

func NewMyChannel() *MyChannel {
return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
mc.mutex.Lock()
if !mc.closed {
close(mc.C)
mc.closed = true
}
mc.mutex.Unlock()
}

func (mc *MyChannel) IsClosed() bool {
mc.mutex.Lock()
defer mc.mutex.Unlock()
return mc.closed
}

我们应该要理解为什么 Go 不支持内置 SafeSend 和 SafeClose 函数,原因就在于并不推荐从接收端或者多个并发发送端关闭 channel。Golang 甚至禁止关闭只接收(receive-only)的 channel。

四、保持 channel closing principle 的优雅方案

上面的 SafeSend 函数有一个缺点,在 select 语句的 case 关键字后不能作为发送操作被调用(类似于 case SafeSend(ch, t):)。

  • M 个 receivers,一个 sender,sender 通过关闭 data channel 说“不再发送”

这是最简单的场景了,就只是当 sender 不想再发送的时候让 sender 关闭 data 来关闭 channel:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package main

import (
"log"
"math/rand"
"sync"
"time"
)

func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)

// ...
const MaxRandomNumber = 100000
const NumReceivers = 100

wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)

// ...
dataCh := make(chan int, 100)

// the sender
go func() {
for {
if value := rand.Intn(MaxRandomNumber); value == 0 {
// the only sender can close the channel safely.
close(dataCh)
return
} else {
dataCh <- value
}
}
}()

// receivers
for i := 0; i < NumReceivers; i++ {
go func() {
defer wgReceivers.Done()

// receive values until dataCh is closed and
// the value buffer queue of dataCh is empty.
for value := range dataCh {
log.Println(value)
}
}()
}

wgReceivers.Wait()
}
  • 一个 receiver,N 个 sender,receiver 通过关闭一个额外的 signal channel 说“请停止发送”

这种场景比上一个要复杂一点。我们不能让 receiver 关闭 data channel,因为这么做将会打破 channel closing principle。但是我们可以让 receiver 关闭一个额外的 signal channel 来通知 sender 停止发送值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package main

import (
"time"
"math/rand"
"sync"
"log"
)

func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)

// ...
const MaxRandomNumber = 100000
const NumSenders = 1000

wgReceivers := sync.WaitGroup{}
wgReceivers.Add(1)

// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the receiver of channel dataCh.
// Its reveivers are the senders of channel dataCh.

// senders
for i := 0; i < NumSenders; i++ {
go func() {
for {
value := rand.Intn(MaxRandomNumber)

select {
case <- stopCh:
return
case dataCh <- value:
}
}
}()
}

// the receiver
go func() {
defer wgReceivers.Done()

for value := range dataCh {
if value == MaxRandomNumber-1 {
// the receiver of the dataCh channel is
// also the sender of the stopCh cahnnel.
// It is safe to close the stop channel here.
close(stopCh)
return
}

log.Println(value)
}
}()

// ...
wgReceivers.Wait()
}
  • M 个 receiver,N 个 sender,它们当中任意一个通过通知一个 moderator(仲裁者)关闭额外的 signal channel 来说“让我们结束游戏吧”

这是最复杂的场景了。我们不能让任意的 receivers 和 senders 关闭 data channel,也不能让任何一个 receivers 通过关闭一个额外的 signal channel 来通知所有的 senders 和 receivers 退出游戏。这么做的话会打破 channel closing principle。但是,我们可以引入一个 moderator 来关闭一个额外的 signal channel。这个例子的一个技巧是怎么通知 moderator 去关闭额外的 signal channel:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package main

import (
"time"
"math/rand"
"sync"
"log"
"strconv"
)

func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)

// ...
const MaxRandomNumber = 100000
const NumReceivers = 10
const NumSenders = 1000

wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)

// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the moderator goroutine shown below.
// Its reveivers are all senders and receivers of dataCh.
toStop := make(chan string, 1)
// the channel toStop is used to notify the moderator
// to close the additional signal channel (stopCh).
// Its senders are any senders and receivers of dataCh.
// Its reveiver is the moderator goroutine shown below.

var stoppedBy string

// moderator
go func() {
stoppedBy = <- toStop // part of the trick used to notify the moderator
// to close the additional signal channel.
close(stopCh)
}()

// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(MaxRandomNumber)
if value == 0 {
// here, a trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "sender#" + id:
default:
}
return
}

// the first select here is to try to exit the
// goroutine as early as possible.
select {
case <- stopCh:
return
default:
}

select {
case <- stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}

// receivers
for i := 0; i < NumReceivers; i++ {
go func(id string) {
defer wgReceivers.Done()

for {
// same as senders, the first select here is to
// try to exit the goroutine as early as possible.
select {
case <- stopCh:
return
default:
}

select {
case <- stopCh:
return
case value := <-dataCh:
if value == MaxRandomNumber-1 {
// the same trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "receiver#" + id:
default:
}
return
}

log.Println(value)
}
}
}(strconv.Itoa(i))
}

// ...
wgReceivers.Wait()
log.Println("stopped by", stoppedBy)
}

在这个例子中,仍然遵守着 channel closing principle。请注意 channel toStop 的缓冲大小是1,这是为了避免当 mederator goroutine 准备好之前第一个通知就已经发送了,导致丢失

参考文章:
如何优雅地关闭Go channel

评论

:D 一言句子获取中...

加载中,最新评论有1分钟缓存...