Go Channel 应用模式

目录 [−]

  1. Lock/TryLock 模式
    1. Hacked Lock/TryLock 模式
    2. TryLock By Channel
    3. TryLock with Timeout
  2. Or Channel 模式
    1. Goroutine方式
    2. Reflect方式
    3. 递归方式
  3. Or-Done-Channel模式
  4. 扇入模式
    1. Goroutine方式
    2. Reflect
    3. 递归方式
  5. Tee模式
    1. Goroutine方式
    2. Reflect方式
  6. 分布模式
    1. Goroutine方式
    2. Reflect方式
  7. eapache
    1. Distribute
    2. Tee
    3. Multiplex
    4. Pipe
  8. 集合操作
    1. skip
      1. skipN
      2. skipFn
      3. skipWhile
    2. take
      1. takeN
      2. takeFn
      3. takeWhile
    3. flat
    4. map
    5. reduce
  9. 总结
  10. 参考资料

Channel是Go中的一种类型,和goroutine一起为Go提供了并发技术, 它在开发中得到了广泛的应用。Go鼓励人们通过Channel在goroutine之间传递数据的引用(就像把数据的owner从一个goroutine传递给另外一个goroutine), Effective Go总结了这么一句话:

Do not communicate by sharing memory; instead, share memory by communicating.

Go内存模型指出了channel作为并发控制的一个特性:

A send on a channel happens before the corresponding receive from that channel completes. (Golang Spec)

除了正常的在goroutine之间安全地传递共享数据, Channel还可以玩出很多的花样(模式), 本文列举了一些channel的应用模式。

促成本文诞生的因素主要包括:

  1. eapache的channels库
  2. concurrency in go 这本书
  3. Francesc Campoy的 justforfun系列中关于merge channel的实现
  4. 我在出版Scala集合手册这本书中对Scala集合的启发

下面就让我们以实例的方式看看这么模式吧。

Lock/TryLock 模式

我们知道, Go的标准库syncMutex,可以用来作为锁,但是Mutex却没有实现TryLock方法。

我们对于TryLock的定义是当前goroutine尝试获得锁, 如果成功,则获得了锁,返回true, 否则返回false。我们可以使用这个方法避免在获取锁的时候当前goroutine被阻塞住。

本来,这是一个常用的功能,在一些其它编程语言中都有实现,为什么Go中没有实现的?issue#6123有详细的讨论,在我看来,Go核心组成员本身对这个特性没有积极性,并且认为通过channel可以实现相同的方式。

Hacked Lock/TryLock 模式

其实,对于标准库的sync.Mutex要增加这个功能很简单,下面的方式就是通过hack的方式为Mutex实现了TryLock的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const mutexLocked = 1 << iota
type Mutex struct {
mu sync.Mutex
}
func (m *Mutex) Lock() {
m.mu.Lock()
}
func (m *Mutex) Unlock() {
m.mu.Unlock()
}
func (m *Mutex) TryLock() bool {
return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.mu)), 0, mutexLocked)
}
func (m *Mutex) IsLocked() bool {
return atomic.LoadInt32((*int32)(unsafe.Pointer(&m.mu))) == mutexLocked
}

如果你看一下Mutex实现的源代码,就很容易理解上面的这段代码了,因为mutex实现锁主要利用CAS对它的一个int32字段做操作。

上面的代码还额外增加了一个IsLocked方法,不过这个方法一般不常用,因为查询和加锁这两个方法执行的时候不是一个原子的操作,素以这个方法一般在调试和打日志的时候可能有用。

TryLock By Channel

既然标准库中不准备在Mutex上增加这个方法,而是推荐使用channel来实现,那么就让我们看看如何使用 channel来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type Mutex struct {
ch chan struct{}
}
func NewMutex() *Mutex {
mu := &Mutex{make(chan struct{}, 1)}
mu.ch <- struct{}{}
return mu
}
func (m *Mutex) Lock() {
<-m.ch
}
func (m *Mutex) Unlock() {
select {
case m.ch <- struct{}{}:
default:
panic("unlock of unlocked mutex")
}
}
func (m *Mutex) TryLock() bool {
select {
case <-m.ch:
return true
default:
}
return false
}
func (m *Mutex) IsLocked() bool {
return len(m.ch) == 0
}

主要是利用channel边界情况下的阻塞特性实现的。

你还可以将缓存的大小从1改为n,用来处理n个锁(资源)。

TryLock with Timeout

有时候,我们在获取一把锁的时候,由于有竞争的关系,在锁被别的goroutine拥有的时候,当前goroutine没有办法立即获得锁,只能阻塞等待。标准库并没有提供等待超时的功能,我们尝试实现它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
type Mutex struct {
ch chan struct{}
}
func NewMutex() *Mutex {
mu := &Mutex{make(chan struct{}, 1)}
mu.ch <- struct{}{}
return mu
}
func (m *Mutex) Lock() {
<-m.ch
}
func (m *Mutex) Unlock() {
select {
case m.ch <- struct{}{}:
default:
panic("unlock of unlocked mutex")
}
}
func (m *Mutex) TryLock(timeout time.Duration) bool {
timer := time.NewTimer(timeout)
select {
case <-m.ch:
timer.Stop()
return true
case <-time.After(timeout):
}
return false
}
func (m *Mutex) IsLocked() bool {
return len(m.ch) == 0
}

你也可以把它用Context来改造,不是利用超时,而是利用Context来取消/超时获得锁的操作,这个作业留给读者来实现。

Or Channel 模式

当你等待多个信号的时候,如果收到任意一个信号, 就执行业务逻辑,忽略其它的还未收到的信号。

举个例子, 我们往提供相同服务的n个节点发送请求,只要任意一个服务节点返回结果,我们就可以执行下面的业务逻辑,其它n-1的节点的请求可以被取消或者忽略。当n=2的时候,这就是back request模式。 这样可以用资源来换取latency的提升。

需要注意的是,当收到任意一个信号的时候,其它信号都被忽略。如果用channel来实现,只要从任意一个channel中接收到一个数据,那么所有的channel都可以被关闭了(依照你的实现,但是输出的channel肯定会被关闭)。

有三种实现的方式: goroutine、reflect和递归。

Goroutine方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func or(chans ...<-chan interface{}) <-chan interface{} {
out := make(chan interface{})
go func() {
var once sync.Once
for _, c := range chans {
go func(c <-chan interface{}) {
select {
case <-c:
once.Do(func() { close(out) })
case <-out:
}
}(c)
}
}()
return out
}

or函数可以处理n个channel,它为每个channel启动一个goroutine,只要任意一个goroutine从channel读取到数据,输出的channel就被关闭掉了。

为了避免并发关闭输出channel的问题,关闭操作只执行一次。

Reflect方式

Go的反射库针对select语句有专门的数据(reflect.SelectCase)和函数(reflect.Select)处理。
所以我们可以利用反射“随机”地从一组可选的channel中接收数据,并关闭输出channel。

这种方式看起来更简洁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func or(channels ...<-chan interface{}) <-chan interface{} {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone)
var cases []reflect.SelectCase
for _, c := range channels {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(c),
})
}
reflect.Select(cases)
}()
return orDone
}

递归方式

递归方式一向是比较开脑洞的实现,下面的方式就是分而治之的方式,逐步合并channel,最终返回一个channel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func or(channels ...<-chan interface{}) <-chan interface{} {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone)
switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
m := len(channels) / 2
select {
case <-or(channels[:m]...):
case <-or(channels[m:]...):
}
}
}()
return orDone
}

在后面的扇入(合并)模式中,我们还是会使用相同样的递归模式来合并多个输入channel,根据 justforfun 的测试结果,这种递归的方式要比goroutine、Reflect更有效。

Or-Done-Channel模式

这种模式是我们经常使用的一种模式,通过一个信号channel(done)来控制(取消)输入channel的处理。

一旦从done channel中读取到一个信号,或者done channel被关闭, 输入channel的处理则被取消。

这个模式提供一个简便的方法,把done channel 和 输入 channel 融合成一个输出channel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
select {
case <-done:
return
case v, ok := <-c:
if ok == false {
return
}
select {
case valStream <- v:
case <-done:
}
}
}
}()
return valStream
}

扇入模式

扇入模式(FanIn)是将多个同样类型的输入channel合并成一个同样类型的输出channel,也就是channel的合并。

Goroutine方式

每个channel起一个goroutine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func fanIn(chans ...<-chan interface{}) <-chan interface{} {
out := make(chan interface{})
go func() {
var wg sync.WaitGroup
wg.Add(len(chans))
for _, c := range chans {
go func(c <-chan interface{}) {
for v := range c {
out <- v
}
wg.Done()
}(c)
}
wg.Wait()
close(out)
}()
return out
}

Reflect

利用反射库针对select语句的处理合并输入channel。

下面这种实现方式其实还是有些问题的, 在输入channel读取比较均匀的时候比较有效,否则性能比较低下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
out := make(chan interface{})
go func() {
defer close(out)
var cases []reflect.SelectCase
for _, c := range chans {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(c),
})
}
for len(cases) > 0 {
i, v, ok := reflect.Select(cases)
if !ok { //remove this case
cases = append(cases[:i], cases[i+1:]...)
continue
}
out <- v.Interface()
}
}()
return out
}

递归方式

这种方式虽然理解起来不直观,但是性能还是不错的(输入channel不是很多的情况下递归层级不会很高,不会成为瓶颈)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func fanInRec(chans ...<-chan interface{}) <-chan interface{} {
switch len(chans) {
case 0:
c := make(chan interface{})
close(c)
return c
case 1:
return chans[0]
case 2:
return mergeTwo(chans[0], chans[1])
default:
m := len(chans) / 2
return mergeTwo(
fanInRec(chans[:m]...),
fanInRec(chans[m:]...))
}
}
func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
for a != nil || b != nil {
select {
case v, ok := <-a:
if !ok {
a = nil
continue
}
c <- v
case v, ok := <-b:
if !ok {
b = nil
continue
}
c <- v
}
}
}()
return c
}

Tee模式

扇出模式(FanOut)是将一个输入channel扇出为多个channel。

扇出行为至少可以分为两种:

  1. 从输入channel中读取一个数据,发送给每个输入channel,这种模式称之为Tee模式
  2. 从输入channel中读取一个数据,在输出channel中选择一个channel发送

本节只介绍第一种情况,下一节介绍第二种情况

Goroutine方式

将读取的值发送给每个输出channel, 异步模式可能会产生很多的goroutine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
go func() {
defer func() {
for i := 0; i < len(out); i++ {
close(out[i])
}
}()
for v := range ch {
v := v
for i := 0; i < len(out); i++ {
i := i
if async {
go func() {
out[i] <- v
}()
} else {
out[i] <- v
}
}
}
}()
}

Reflect方式

这种模式一旦一个输出channel被阻塞,可能会导致后续的处理延迟。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
go func() {
defer func() {
for i := 0; i < len(out); i++ {
close(out[i])
}
}()
cases := make([]reflect.SelectCase, len(out))
for i := range cases {
cases[i].Dir = reflect.SelectSend
}
for v := range ch {
v := v
for i := range cases {
cases[i].Chan = reflect.ValueOf(out[i])
cases[i].Send = reflect.ValueOf(v)
}
for _ = range cases { // for each channel
chosen, _, _ := reflect.Select(cases)
cases[chosen].Chan = reflect.ValueOf(nil)
}
}
}()
}

分布模式

分布模式将从输入channel中读取的值往输出channel中的其中一个发送。

Goroutine方式

roundrobin的方式选择输出channel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func fanOut(ch <-chan interface{}, out []chan interface{}) {
go func() {
defer func() {
for i := 0; i < len(out); i++ {
close(out[i])
}
}()
// roundrobin
var i = 0
var n = len(out)
for v := range ch {
v := v
out[i] <- v
i = (i + 1) % n
}
}()
}

Reflect方式

利用发射随机的选择。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
go func() {
defer func() {
for i := 0; i < len(out); i++ {
close(out[i])
}
}()
cases := make([]reflect.SelectCase, len(out))
for i := range cases {
cases[i].Dir = reflect.SelectSend
cases[i].Chan = reflect.ValueOf(out[i])
}
for v := range ch {
v := v
for i := range cases {
cases[i].Send = reflect.ValueOf(v)
}
_, _, _ = reflect.Select(cases)
}
}()
}

eapache

eapache/channels提供了一些channel应用模式的方法,比如上面的扇入扇出模式等。

因为go本身的channel无法再进行扩展, eapache/channels库定义了自己的channel接口,并提供了与channel方便的转换。

eapache/channels 提供了四个方法:

  • Distribute: 从输入channel读取值,发送到其中一个输出channel中。当输入channel关闭后,输出channel都被关闭
  • Tee: 从输入channel读取值,发送到所有的输出channel中。当输入channel关闭后,输出channel都被关闭
  • Multiplex: 合并输入channel为一个输出channel, 当所有的输入都关闭后,输出才关闭
  • Pipe: 将两个channel串起来

同时对上面的四个函数还提供了WeakXXX的函数,输入关闭后不会关闭输出。

下面看看对应的函数的例子。

Distribute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func testDist() {
fmt.Println("dist:")
a := channels.NewNativeChannel(channels.None)
outputs := []channels.Channel{
channels.NewNativeChannel(channels.None),
channels.NewNativeChannel(channels.None),
channels.NewNativeChannel(channels.None),
channels.NewNativeChannel(channels.None),
}
channels.Distribute(a, outputs[0], outputs[1], outputs[2], outputs[3])
//channels.WeakDistribute(a, outputs[0], outputs[1], outputs[2], outputs[3])
go func() {
for i := 0; i < 5; i++ {
a.In() <- i
}
a.Close()
}()
for i := 0; i < 6; i++ {
var v interface{}
var j int
select {
case v = <-outputs[0].Out():
j = 0
case v = <-outputs[1].Out():
j = 1
case v = <-outputs[2].Out():
j = 2
case v = <-outputs[3].Out():
j = 3
}
fmt.Printf("channel#%d: %d\n", j, v)
}
}

Tee

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func testTee() {
fmt.Println("tee:")
a := channels.NewNativeChannel(channels.None)
outputs := []channels.Channel{
channels.NewNativeChannel(channels.None),
channels.NewNativeChannel(channels.None),
channels.NewNativeChannel(channels.None),
channels.NewNativeChannel(channels.None),
}
channels.Tee(a, outputs[0], outputs[1], outputs[2], outputs[3])
//channels.WeakTee(a, outputs[0], outputs[1], outputs[2], outputs[3])
go func() {
for i := 0; i < 5; i++ {
a.In() <- i
}
a.Close()
}()
for i := 0; i < 20; i++ {
var v interface{}
var j int
select {
case v = <-outputs[0].Out():
j = 0
case v = <-outputs[1].Out():
j = 1
case v = <-outputs[2].Out():
j = 2
case v = <-outputs[3].Out():
j = 3
}
fmt.Printf("channel#%d: %d\n", j, v)
}
}

Multiplex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func testMulti() {
fmt.Println("multi:")
a := channels.NewNativeChannel(channels.None)
inputs := []channels.Channel{
channels.NewNativeChannel(channels.None),
channels.NewNativeChannel(channels.None),
channels.NewNativeChannel(channels.None),
channels.NewNativeChannel(channels.None),
}
channels.Multiplex(a, inputs[0], inputs[1], inputs[2], inputs[3])
//channels.WeakMultiplex(a, inputs[0], inputs[1], inputs[2], inputs[3])
go func() {
for i := 0; i < 5; i++ {
for j := range inputs {
inputs[j].In() <- i
}
}
for i := range inputs {
inputs[i].Close()
}
}()
for v := range a.Out() {
fmt.Printf("%d ", v)
}
}

Pipe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func testPipe() {
fmt.Println("pipe:")
a := channels.NewNativeChannel(channels.None)
b := channels.NewNativeChannel(channels.None)
channels.Pipe(a, b)
// channels.WeakPipe(a, b)
go func() {
for i := 0; i < 5; i++ {
a.In() <- i
}
a.Close()
}()
for v := range b.Out() {
fmt.Printf("%d ", v)
}
}

集合操作

从channel的行为来看,它看起来很像一个数据流,所以我们可以实现一些类似Scala 集合的操作。

Scala的集合类提供了丰富的操作(方法), 当然其它的一些编程语言或者框架也提供了类似的方法, 比如Apache Spark、Java Stream、ReactiveX等。

下面列出了一些方法的实现,我相信经过一些人的挖掘,相关的方法可以变成一个很好的类库,但是目前我们先看一些例子。

skip

skip函数是从一个channel中跳过开一些数据,然后才开始读取。

skipN

skipN跳过开始的N个数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func skipN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case <-valueStream:
}
}
for {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}

skipFn

skipFn 提供Fn函数为true的数据,比如跳过偶数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func skipFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for {
select {
case <-done:
return
case v := <-valueStream:
if !fn(v) {
takeStream <- v
}
}
}
}()
return takeStream
}

skipWhile

跳过开头函数fn为true的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func skipWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
take := false
for {
select {
case <-done:
return
case v := <-valueStream:
if !take {
take = !fn(v)
if !take {
continue
}
}
takeStream <- v
}
}
}()
return takeStream
}

take

skip的反向操作,读取一部分数据。

takeN

takeN 读取开头N个数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}

takeFn

takeFn 只筛选满足fn的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func takeFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for {
select {
case <-done:
return
case v := <-valueStream:
if fn(v) {
takeStream <- v
}
}
}
}()
return takeStream
}

takeWhile

takeWhile只挑选开头满足fn的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func takeWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for {
select {
case <-done:
return
case v := <-valueStream:
if !fn(v) {
return
}
takeStream <- v
}
}
}()
return takeStream
}

flat

平展(flat)操作是一个有趣的操作。

如果输入是一个channel,channel中的数据还是相同类型的channel, 那么flat将返回一个输出channel,输出channel中的数据是输入的各个channel中的数据。

它与扇入不同,扇入的输入channel在调用的时候就是固定的,并且以数组的方式提供,而flat的输入是一个channel,可以运行时随时的加入channel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
select {
case <-done:
return
case v, ok := <-c:
if ok == false {
return
}
select {
case valStream <- v:
case <-done:
}
}
}
}()
return valStream
}
func flat(done <-chan struct{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
var stream <-chan interface{}
select {
case maybeStream, ok := <-chanStream:
if ok == false {
return
}
stream = maybeStream
case <-done:
return
}
for val := range orDone(done, stream) {
select {
case valStream <- val:
case <-done:
}
}
}
}()
return valStream
}

map

map和reduce是一组常用的操作。

map将一个channel映射成另外一个channel, channel的类型可以不同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
out := make(chan interface{})
if in == nil {
close(out)
return out
}
go func() {
defer close(out)
for v := range in {
out <- fn(v)
}
}()
return out
}

因为map是go的关键字,所以我们不能命名函数类型为map,这里用mapChan代替。

比如你可以处理一个公司员工工资的channel, 输出一个扣税之后的员工工资的channel。

reduce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} {
if in == nil {
return nil
}
out := <-in
for v := range in {
out = fn(out, v)
}
return out
}
你可以用`reduce`实现`sum``max``min`等聚合操作。

总结

本文列出了channel的一些深入应用的模式,相信通过阅读本文,你可以更加深入的了解Go的channel类型,并在开发中灵活的应用channel。也欢迎你在评论中提出更多的 channel的应用模式。

所有的代码可以在github上找到: smallnest/channels

参考资料

  1. https://github.com/kat-co/concurrency-in-go-src
  2. https://github.com/campoy/justforfunc/tree/master/27-merging-chans
  3. https://github.com/eapache/channels
  4. https://github.com/LK4D4/trylock
  5. https://stackoverflow.com/questions/36391421/explain-dont-communicate-by-sharing-memory-share-memory-by-communicating
  6. https://github.com/lrita/gosync
  7. https://www.ardanlabs.com/blog/2017/10/the-behavior-of-channels.html