今天一位同学给我出了一道并发题,作为在极客时间开了《GO并发编程实战课》的作者,居然一时间没有回答上来,惭愧啊,所以晚上专门研究了一下题目,给出几个实现方案供探讨。
这道题貌似在哪里见过,凭借回忆我找到了,原来是Leetcode的并发题。这道题是这么说的:
编写一个可以从 1 到 n 输出代表这个数字的字符串的程序,但是:
如果这个数字可以被 3 整除,输出 "fizz"。
如果这个数字可以被 5 整除,输出 "buzz"。
如果这个数字可以同时被 3 和 5 整除,输出 "fizzbuzz"。
例如,当 n = 15,输出: 1, 2, fizz, 4, buzz, fizz, 7, 8, fizz, buzz, 11, fizz, 13, 14, fizzbuzz。
假设有这么一个类:
1 2 3 4 5 6 7
| class FizzBuzz { public FizzBuzz(int n) { ... } public void fizz(printFizz) { ... } public void buzz(printBuzz) { ... } public void fizzbuzz(printFizzBuzz) { ... } public void number(printNumber) { ... } }
|
请你实现一个有四个线程的多线程版 FizzBuzz, 同一个 FizzBuzz 实例会被如下四个线程使用:
线程A将调用 fizz() 来判断是否能被 3 整除,如果可以,则输出 fizz。
线程B将调用 buzz() 来判断是否能被 5 整除,如果可以,则输出 buzz。
线程C将调用 fizzbuzz() 来判断是否同时能被 3 和 5 整除,如果可以,则输出 fizzbuzz。
线程D将调用 number() 来实现输出既不能被 3 整除也不能被 5 整除的数字。
基本上,这是一个任务编排的并发题,其实使用最简单的并发原语就可以实现。
将并发转为串行
我开始的思路想错了,我想实现一个manager,去调度四个goroutine,但是其实有一个最简单的糙快猛的方法,就像我在专栏中出的"击鼓传花”那道题一样,可以将这四个goroutine串行化。
每一个数字,交给一个goroutine去处理,如果不是它负责处理,那么它就交给下一个goroutine。
这道题的妙处就在于,这四种case是没有交叉的,一个数字只能由一个goroutine处理,并且肯定会有一个goroutine去处理,这就好办了。在四个goroutine交递的过程中,肯定有一个goroutine会输出内容,如果它输出了内容,它就将数字加一,交给下一个groutine去检查和处理,这就开启了新的一轮数字的检查和处理。
当处理的数字大于指定的数字,它将数字交递给下一个goroutine,然后返回。这样下一个goroutine同样的处理也会返回。最后四个goroutine都返回了。
我们使用一个WaitGroup等待四个goroutine都返回,然后程序退出。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
| package main import ( "fmt" "sync" ) type FizzBuzz struct { n int chs []chan int wg sync.WaitGroup } func New(n int) *FizzBuzz { chs := make([]chan int, 4) for i := 0; i < 4; i++ { chs[i] = make(chan int, 1) } return &FizzBuzz{ n: n, chs: chs, } } func (fb *FizzBuzz) start() { fb.wg.Add(4) go fb.fizz() go fb.buzz() go fb.fizzbuzz() go fb.number() fb.chs[0] <- 1 fb.wg.Wait() } func (fb *FizzBuzz) fizz() { defer fb.wg.Done() next := fb.chs[1] for v := range fb.chs[0] { if v > fb.n { next <- v return } if v%3 == 0 { if v%5 == 0 { next <- v continue } if v == fb.n { fmt.Print(" fizz。") } else { fmt.Print(" fizz,") } next <- v + 1 continue } next <- v } } func (fb *FizzBuzz) buzz() { defer fb.wg.Done() next := fb.chs[2] for v := range fb.chs[1] { if v > fb.n { next <- v return } if v%5 == 0 { if v%3 == 0 { next <- v continue } if v == fb.n { fmt.Print(" buzz。") } else { fmt.Print(" buzz,") } next <- v + 1 continue } next <- v } } func (fb *FizzBuzz) fizzbuzz() { defer fb.wg.Done() next := fb.chs[3] for v := range fb.chs[2] { if v > fb.n { next <- v return } if v%5 == 0 && v%3 == 0 { if v == fb.n { fmt.Print(" fizzbuzz。") } else { fmt.Print(" fizzbuzz,") } next <- v + 1 continue } next <- v } } func (fb *FizzBuzz) number() { defer fb.wg.Done() next := fb.chs[0] for v := range fb.chs[3] { if v > fb.n { next <- v return } if v%5 != 0 && v%3 != 0 { if v == fb.n { fmt.Printf(" %d。", v) } else { fmt.Printf(" %d,", v) } next <- v + 1 continue } next <- v } } func main() { fb := New(15) fb.start() }
|
使用一个channel
上面讲并发转为串行的操作,总是让人感觉差强人意,我们更想并发的去执行。
所以可以转换一下思路,四个goroutine使用同一个channel。 如果某个goroutine非常幸运,从这个channel中取出一个值,它会做检查,无非两种情况:
- 正好是自己要处理的数: 它输出相应的文本,并且把值加一,再放入到channel中
- 不是自己要处理的数:把此数再放回到channel中
对于每一个数,总会有goroutine取出来并处理。
当取出来的数大于指定的数时,把此数放回到channel,并返回。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
| package main import ( "fmt" "sync" ) type FizzBuzz struct { n int ch chan int wg sync.WaitGroup } func New(n int) *FizzBuzz { return &FizzBuzz{ n: n, ch: make(chan int, 1), } } func (fb *FizzBuzz) start() { fb.wg.Add(4) go fb.fizz() go fb.buzz() go fb.fizzbuzz() go fb.number() fb.ch <- 1 fb.wg.Wait() } func (fb *FizzBuzz) fizz() { defer fb.wg.Done() for v := range fb.ch { if v > fb.n { fb.ch <- v return } if v%3 == 0 { if v%5 == 0 { fb.ch <- v continue } if v == fb.n { fmt.Print(" fizz。") } else { fmt.Print(" fizz,") } fb.ch <- v + 1 continue } fb.ch <- v } } func (fb *FizzBuzz) buzz() { defer fb.wg.Done() for v := range fb.ch { if v > fb.n { fb.ch <- v return } if v%5 == 0 { if v%3 == 0 { fb.ch <- v continue } if v == fb.n { fmt.Print(" buzz。") } else { fmt.Print(" buzz,") } fb.ch <- v + 1 continue } fb.ch <- v } } func (fb *FizzBuzz) fizzbuzz() { defer fb.wg.Done() for v := range fb.ch { if v > fb.n { fb.ch <- v return } if v%5 == 0 && v%3 == 0 { if v == fb.n { fmt.Print(" fizzbuzz。") } else { fmt.Print(" fizzbuzz,") } fb.ch <- v + 1 continue } fb.ch <- v } } func (fb *FizzBuzz) number() { defer fb.wg.Done() for v := range fb.ch { if v > fb.n { fb.ch <- v return } if v%5 != 0 && v%3 != 0 { if v == fb.n { fmt.Printf(" %d。", v) } else { fmt.Printf(" %d,", v) } fb.ch <- v + 1 continue } fb.ch <- v } } func main() { fb := New(15) fb.start() }
|
这里有一个知识点: 会不会总是只有一个goroutine把值取出来放回去,取出来放回去,别的goroutine没有机会读取到这个值呢?
不会的,根据channel的实现,waiter还是有先来后到之说的,某个goroutine总是能有机会取到自己要处理的数据。
使用栅栏
其实我一直想实现的是使用cyclicbarrier,可能有些同学不熟悉这个并发原语,但是针对这个场景,使用Barrier代码就非常的简洁,逻辑非常的明了。
而且循环栅栏更适合这个场景,因为我们要重复使用栅栏。
针对每一个数字,我们都设置了栅栏,让每一个goroutine都去检查和处理,四个goroutine中总会有一个会处理这个数字。处理完这个数字之后,把数字加一,等待下一次栅栏放行。

| package main import ( "context" "fmt" "sync" "github.com/marusama/cyclicbarrier" ) type FizzBuzz struct { n int barrier cyclicbarrier.CyclicBarrier wg sync.WaitGroup } func New(n int) *FizzBuzz { return &FizzBuzz{ n: n, barrier: cyclicbarrier.New(4), } } func (fb *FizzBuzz) start() { fb.wg.Add(4) go fb.fizz() go fb.buzz() go fb.fizzbuzz() go fb.number() fb.wg.Wait() } func (fb *FizzBuzz) fizz() { defer fb.wg.Done() ctx := context.Background() v := 0 for { fb.barrier.Await(ctx) v++ if v > fb.n { return } if v%3 == 0 { if v%5 == 0 { continue } if v == fb.n { fmt.Print(" fizz。") } else { fmt.Print(" fizz,") } } } } func (fb *FizzBuzz) buzz() { defer fb.wg.Done() ctx := context.Background() v := 0 for { fb.barrier.Await(ctx) v++ if v > fb.n { return } if v%5 == 0 { if v%3 == 0 { continue } if v == fb.n { fmt.Print(" buzz。") } else { fmt.Print(" buzz,") } } } } func (fb *FizzBuzz) fizzbuzz() { defer fb.wg.Done() ctx := context.Background() v := 0 for { fb.barrier.Await(ctx) v++ if v > fb.n { return } if v%5 == 0 && v%3 == 0 { if v == fb.n { fmt.Print(" fizzbuzz。") } else { fmt.Print(" fizzbuzz,") } } } } func (fb *FizzBuzz) number() { defer fb.wg.Done() ctx := context.Background() v := 0 for { fb.barrier.Await(ctx) v++ if v > fb.n { return } if v%5 != 0 && v%3 != 0 { if v == fb.n { fmt.Printf(" %d。", v) } else { fmt.Printf(" %d,", v) } } } } func main() { fb := New(15) fb.start() }
|
谢谢 仁亮 指出第一版的代码对v的读写是有并发问题的。现在做了修改。
使用其它并发原语
我还想到了sync.Cond
、Semaphore
、WaitGroup
等并发原语,简单评估之后,我感觉使用这几个并发原语做任务编排不太合适。
sync.Cond
: 貌似可以,不过实际使用的时候,让这个四个goroutine"自洽"实现Wait/Broadcast并不太容易。使用额外的一个goroutine做Manager貌似还可以
Semaphore
: 每个goroutine获取一个信号量,相应的goroutine处理完后把数加一,然后再释放信号量貌似可以。但是有可能导致饥饿的情况,总是由一个goroutine请求信号量释放信号量
WaitGroup
: WaitGroup在这个场景下最大的困难是要重复使用,而重复使用WaitGroup很容易导致panic
所以这三个并发原语我尝试之后放弃了。我个人还是比较喜欢Barrier的实现方式,虽然它有一个“惊群”的缺点,但是对于这道题而言问题不大。
你有什么想法和实现,欢迎在原文链接的评论区留言。