Go语言的channel有两种类型,一种是无缓存的channel,一种是有缓存的buffer,这两种类型的channel大家都比较熟悉了,但是对于有缓存的channel,它的缓存长度在创建channel的时候就已经确定了,中间不能扩缩容,这导致在一些场景下使用有问题,或者说不太适合特定的场景。
我为什么突然谈起这个无限缓存的channel呢?主要是我最近在review公司一位同事的代码时,我的某种设计思路出现了一个问题,如果使用无限缓存的channel的话,我的问题就会迎刃而解了。
这位同事的设计大概是这样子的:
- 一个dispatcher包含一个channel, 里面存放待处理的url
- 一堆worker从channel中读取任务,下载解析网页,并提取其中链接,再把链接放入到dispatcher.channel中
这位同事为了解决并发的问题,不得不使用了比较复杂的sync.Mutex和sync.Cond,并且定义了一堆并发的方法处理逻辑,这里我想谈谈我的错误想法。
我review这段代码的时候想,如果每个 worker启动一个goroutine,处理url,然后把链接再放入到channel中即可,不用复杂的 Mutex+Cond等,但是我犯了一个错误,那就是如果当前channel已经满了,那么这些worker都不能把解析的结果放入到channel中,都被"阻塞"住了,并且也没有可用的worker从channel中消费url。
当然,你可以说可以创建一个buffer非常大的channel,避免被塞满,但是,第一,buffer非常大的channel占用的内存也非常大,第二,多大合适?关键你不能保证channel不会满。
如果有一个无限缓存长度的buffer就好了。
2017年,有同学向Go官方提出这么一个需求(#20352),希望能够提供一个无限容量的buffer,经过冗长的讨论,Go不会为这个"稀有"的场景提供一种实现,并且建议大家实现这样的一个库,通过第三库的方式处理这种场景,而且Griesemer提供了一个思路,通过ringbuffer实现缓存来实现这样的channel。
网上有两种实现Why Go channels limit the buffer size和Building an Unbounded Channel in Go,这两种实现也比较类似,我在第一种实现的基础上,封装了一个库: chanx,来提供通用的无限缓存的channel。
chanx, 你可以star这个库,放入到你的代码库中,说不定哪一天它就可能帮你解燃眉之急。并且我已经准备好了泛型的设计,一旦Go泛型可用,我就会把它改成泛型的实现。
缓存无限的channel拥有下面的特性:
- 不会阻塞write。 它总是能处理write的数据,或者放入到待读取的channel中,或者放入到缓存中
- 无数据时read会被阻塞。当没有可读的数据时,从channel中读取的goroutine会被阻塞
- 读写都是通过channel操作。 内部的缓存不会暴露出来
- 能够查询当前待读取的数据数量。因为缓存中可能也有待处理的数据,所以需要返回len(buffer)+len(chan)
- 关闭channel后,还未读取的channel还是能够被读取,读取完之后才能发现channel已经完毕。这和正常的channel的逻辑是一样的,这种情况叫"drain"未读的数据
因为我们不能修改内部的channel结构,也不能重载 chan <- 和 <- chan 操作符,所以我们只能通过两个channel的方式封装一个数据结构,来提供读写。
这个数据结构为:
|
|
其中In
这个channel用来写入数据,而Out
这个channel用来读取数据。你可以close In这个channel,等所有的数据都读取完后,Out channel也会被自动关闭。 用户是不能自己关闭Out
这个channel的,你也关闭不了,因为它是<-chan
类型的。
你可以通过Len
方法得到所有待读取的数据的长度,也可以通过BufLen
只获取缓存中的数据的长度,不包含外发Out
channel中数据的长度。
|
|
那么重点来了,主要的逻辑的实现如下,我在代码中加了注释,通过注释和代码你就可以很好的理解整个的实现逻辑:
|
|
这一段的逻辑还是很清晰的,就是细节需要注意,它也是学习channel使用的一个很好的素材。
5月13日更新
今天正好出差,在高铁上漫长的5个多小时没有事情做,所以我拿起笔记本干了两件事情,其中之一就是对这个无限缓存的channel做了优化。
本身这个无限缓存的channel的设计非常简洁,唯一有一点我不太满意的是它的buffer不能重用, 这和ch.buffer = ch.buffer[1:]
的处理有关系,有时候明明底层的数组很大,但是还不得不重新生成新的数据,导致堆分配频次比较多。
依照Go三巨头之一的设计,底层buffer最好采用ringbuffer的实现方式,如果buffer满了应该能自动扩容:
Such a library should do well in cases of very fast, "bursty" messages. A large enough buffered channel should be able to absorb bursts while a fast dedicated goroutine drains the channel into a ring buffer from which the messages are delivered at a slower pace to the final consumer of the messages. That ring buffer will need to be efficiently implemented, and will need to be able to grow efficiently (irrespective of size) and that will require some careful engineering. Better to leave that code to a library that can be tuned as needed than baking it into the runtime (and then possibly being at the mercy of release cycles).
所以我又实现了一个ringbuffer,这个ringbuffer比较简单,原因在这里我们不需要考虑并发的问题,这个ringbuffer只会在一个goroutine使用,所以它的实现就非常的简单了,需要注意"读追上写",以及"写满"这两个边界问题就好了。通过使用ringbuffer,上面的实现就可以更改为下面的代码,可以进一步减少写爆发(burst)的时候分配过多的问题:
|
|