如果你因为某种原因从接收端(receiver side)关闭channel或者在多个发送者中的一个关闭channel,那么你应该使用列在Golang panic/recover Use Cases的函数来安全地发送值到channel中(假设channel的元素类型是T)
1 2 3 4 5 6 7 8 9 10 11 12
funcSafeSend(ch chan T, value T)(closed bool) { deferfunc() { ifrecover() != nil { // the return result can be altered // in a defer function call closed = true } }() ch <- value // panic if ch is closed returnfalse// <=> closed = false; return }
上面的SaveSend函数有一个缺点是,在select语句的case关键字后不能作为发送操作被调用(译者注:类似于 case SafeSend(ch, t):)。另外一个缺点是,很多人,包括我自己都觉得上面通过使用panic/recover和sync包的方案不够优雅。针对各种场景,下面介绍不用使用panic/recover和sync包,纯粹是利用channel的解决方案。 (在下面的例子总,sync.WaitGroup只是用来让例子完整的。它的使用在实践中不一定一直都有用)
funcmain() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) // ... const MaxRandomNumber = 100000 const NumReceivers = 100 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) // ... dataCh := make(chanint, 100) // the sender gofunc() { for { if value := rand.Intn(MaxRandomNumber); value == 0 { // the only sender can close the channel safely. close(dataCh) return } else { dataCh <- value } } }() // receivers for i := 0; i < NumReceivers; i++ { gofunc() { defer wgReceivers.Done() // receive values until dataCh is closed and // the value buffer queue of dataCh is empty. for value := range dataCh { log.Println(value) } }() } wgReceivers.Wait() }
funcmain() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) // ... const MaxRandomNumber = 100000 const NumSenders = 1000 wgReceivers := sync.WaitGroup{} wgReceivers.Add(1) // ... dataCh := make(chanint, 100) stopCh := make(chanstruct{}) // stopCh is an additional signal channel. // Its sender is the receiver of channel dataCh. // Its reveivers are the senders of channel dataCh. // senders for i := 0; i < NumSenders; i++ { gofunc() { for { value := rand.Intn(MaxRandomNumber) select { case <- stopCh: return case dataCh <- value: } } }() } // the receiver gofunc() { defer wgReceivers.Done() for value := range dataCh { if value == MaxRandomNumber-1 { // the receiver of the dataCh channel is // also the sender of the stopCh cahnnel. // It is safe to close the stop channel here. close(stopCh) return } log.Println(value) } }() // ... wgReceivers.Wait() }
funcmain() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) // ... const MaxRandomNumber = 100000 const NumReceivers = 10 const NumSenders = 1000 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) // ... dataCh := make(chanint, 100) stopCh := make(chanstruct{}) // stopCh is an additional signal channel. // Its sender is the moderator goroutine shown below. // Its reveivers are all senders and receivers of dataCh. // 转注: 这里为toStop增加1个缓冲空间, 是为了避免当moderator协程还没准备好时,sender或者receiver就已经要向toStop发送数据, 从而导致数据丢失的问题.(因为向toStop发送数据的代码是写在select里,堵塞时会直接跳过并执行default分支) toStop := make(chanstring, 1) // the channel toStop is used to notify the moderator // to close the additional signal channel (stopCh). // Its senders are any senders and receivers of dataCh. // Its reveiver is the moderator goroutine shown below. var stoppedBy string // moderator gofunc() { stoppedBy = <- toStop // part of the trick used to notify the moderator // to close the additional signal channel. close(stopCh) }() // senders for i := 0; i < NumSenders; i++ { gofunc(id string) { for { value := rand.Intn(MaxRandomNumber) if value == 0 { // here, a trick is used to notify the moderator // to close the additional signal channel. select { case toStop <- "sender#" + id: default: } return } // the first select here is to try to exit the // goroutine as early as possible. // 转注: 这里我觉得是没什么用的,因为此处的代码是并发执行的,第一个select执行时进入default分支,但是在第二个select里,出现两个分支都满足的情况 select { case <- stopCh: return default: } select { case <- stopCh: return case dataCh <- value: } } }(strconv.Itoa(i)) } // receivers for i := 0; i < NumReceivers; i++ { gofunc(id string) { defer wgReceivers.Done() for { // same as senders, the first select here is to // try to exit the goroutine as early as possible. select { case <- stopCh: return default: } select { case <- stopCh: return case value := <-dataCh: if value == MaxRandomNumber-1 { // the same trick is used to notify the moderator // to close the additional signal channel. select { case toStop <- "receiver#" + id: default: } return } log.Println(value) } } }(strconv.Itoa(i)) } // ... wgReceivers.Wait() log.Println("stopped by", stoppedBy) }