并发编程趣题: 制造一氧化二氢

最近leetcode的算法题中新增加了一个并发问题的分类, 目前提供了四道题,一道简单题,两道中级题和一道Hard题。这是leetcode尝试在并发算法分类的一个尝试,虽然目前还在摸索阶段,有的题目可以通过作弊的方式提供高分答案,有的即使是通过了测试,但是其实还是有并发的问题的,只不过测试用例没有覆盖到而已。

这也说明了并发编程并不是一件很容易的事情,但是还是很高兴leetcode能在这个方面往前走一步。其中的最难的那道题,看它第一眼的时候觉得还是比较简单的,但是仔细思考后却发现实现它并不是一件很容易的事情,即使目前的接受率才51.9%,但其实已接收的提交有很多还是有并发的问题的,只不过由于验证的问题并没有验证出来。

这道题是这样子的,利用两个氢原子和一个氧原子生成一氧化二氢, 一氧化二氢这种化合物在常温常压下为无色无味的透明液体,也就是我们常说的。这道题目我大致翻译一下:

oxygen (氧)和 hydrogen(氢) 两个线程,你的目标是把它们分组生成水分子。当然这里有道闸门(barrier)在水分子生成之前不得不等待。oxygenhydrogen 线程每三个会被分成一组,包括两个hydrogen 线程和一个oxygen,它们三个每个都会产生一个对应的原子组合成水分子。你必须保证当前分组内的各个线程提供的原子组合成一个水分子之后这些线程才能参与产生下一个水分子

换句话说:

  • 假如一个oxygen到达了闸门,而其它hydrogen还没来,那么这个oxygen会一直等待另两个hydrogen线程
  • 假如 hydrogen线程到达了闸门,而其它的线程还没来,那么它会等待oxygen线程和另外一个hydrogen线程

所以,一个水分子总是由两个hydrogen线程和oxygen提供。

你不必担心线程是如何分组的,外部调度器可以正确地把两个hydrogen线程和一个oxygen线程分成一组。问题的关键是线程组要完整地通过闸门。

你需要编写同步代码,保证线程能够按照前面的要求有序地生成水分子。

举几个例子:

1
2
3
Input: "HOH"
Output: "HHO"
Explanation: "HOH""OHH" 也是合法的答案
1
2
3
Input: "OOHHHH"
Output: "HHOHHO"
Explanation: "HOHHHO", "OHHHHO", "HHOHOH", "HOHHOH", "OHHHOH", "HHOOHH", "HOHOHH""OHHOHH" 也是合法的答案

约束条件:

  • 总的输入是3n, 这里 1 ≤ n ≤ 30
  • 总的H的数量是2n
  • 总的O的数量是O

目前leetcode这类题目只支持C、C++、Python和Java,还不支持其它编程语言。一开始我觉得这道题目还是比较容易解决的,使用三个Java的Semaphore对线程进行编排,提交解决方案也顺利通过的,耗时也在最前面。过了两天再品味这道题的时候,发现实现还是有问题的,因为同一个hydrogen线程提供两个氢原子也可以顺利通过测试,随后看了讨论区的答案,很多也是有问题的,输出结果可能没问题,但是实际线程的运行并不符合题目的要求。

下面尝试使用Go来解决这个问题。

首先定义这个题的实现框架,保持和网站上其它语言一致:

1
2
3
4
5
6
7
8
9
10
11
12
13
type H2O struct {
}
func (h2o *H2O) hydrogen(releaseHydrogen func()) {
// releaseHydrogen() outputs "H". Do not change or remove this line.
releaseHydrogen()
}
func (h2o *H2O) oxygen(releaseOxygen func()) {
// releaseOxygen() outputs "O". Do not change or remove this line.
releaseOxygen()
}

hydrogen线程会调用hydrogen方法,而oxygen线程会调用oxygen方法。一组线程(三个线程,包括两个hydrogen和一个oxygen线程)调用完各自的方法后,输出完HHO (或者HOHOHH)后才能进行下一次的调用。

一个hydrogen线程在本次HHO完成后才能再次调用hydrogen方法。

一个oxygen线程在本次HHO完成后才能再次调用oxygen方法。

你现在的工作就是在这个代码中添加同步逻辑,保证上面的约束要满足,也就是程序能够按照要求顺利进行。

思来想去,还是回归原来的题目上,题目中已经使用了barrier的概念,我们还是使用它来实现更简洁。

实现代码如下:

Go标准库没有Barrier同步原语,不像Java中的并发类那么丰富,所以这里使用了github.com/marusama/cyclicbarrier第三方的库。

方法开始之前利用Semphore控制要编排的goroutine的数量和种类,然后使用CyclicBarrier来保证三个goroutine同时准备好了各自的原子。一旦三个goroutine都准备好了,释放Semphore准备下一次的水分子合成。

water.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package water
import (
"context"
"github.com/marusama/cyclicbarrier"
"golang.org/x/sync/semaphore"
)
type H2O struct {
semaH *semaphore.Weighted
semaO *semaphore.Weighted
b cyclicbarrier.CyclicBarrier
}
func New() *H2O {
return &H2O{
semaH: semaphore.NewWeighted(2),
semaO: semaphore.NewWeighted(1),
b: cyclicbarrier.New(3),
}
}
func (h2o *H2O) hydrogen(releaseHydrogen func()) {
h2o.semaH.Acquire(context.Background(), 1)
// releaseHydrogen() outputs "H". Do not change or remove this line.
releaseHydrogen()
h2o.b.Await(context.Background())
h2o.semaH.Release(1)
}
func (h2o *H2O) oxygen(releaseOxygen func()) {
h2o.semaO.Acquire(context.Background(), 1)
// releaseOxygen() outputs "O". Do not change or remove this line.
releaseOxygen()
h2o.b.Await(context.Background())
h2o.semaO.Release(1)
}
water_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package water
import (
"math/rand"
"sort"
"sync"
"testing"
"time"
)
func TestWaterFactory(t *testing.T) {
var ch chan string
releaseHydrogen := func() {
ch <- "H"
}
releaseOxygen := func() {
ch <- "O"
}
var N = 100
ch = make(chan string, N*3)
h2o := New()
var wg sync.WaitGroup
wg.Add(N * 3)
// h1
go func() {
for i := 0; i < N; i++ {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
h2o.hydrogen(releaseHydrogen)
wg.Done()
}
}()
// h2
go func() {
for i := 0; i < N; i++ {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
h2o.hydrogen(releaseHydrogen)
wg.Done()
}
}()
// o
go func() {
for i := 0; i < N; i++ {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
h2o.oxygen(releaseOxygen)
wg.Done()
}
}()
wg.Wait()
if len(ch) != N*3 {
t.Fatalf("expect %d atom but got %d", N*3, len(ch))
}
var s = make([]string, 3)
for i := 0; i < N; i++ {
s[0] = <-ch
s[1] = <-ch
s[2] = <-ch
sort.Strings(s)
water := s[0] + s[1] + s[2]
if water != "HHO" {
t.Fatalf("expect a water molecule but got %s", water)
}
}
}
H2O.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import java.util.concurrent.Phaser;
import java.util.concurrent.Semaphore;
class H2O {
private Semaphore semO;
private Semaphore semH;
private Phaser phaser;
public H2O() {
semO = new Semaphore(1);
semH = new Semaphore(2);
phaser = new Phaser(3);
}
public void hydrogen(Runnable releaseHydrogen) throws InterruptedException {
semH.acquire();
releaseHydrogen.run();
phaser.arriveAndAwaitAdvance();
semH.release();
}
public void oxygen(Runnable releaseOxygen) throws InterruptedException {
semO.acquire();
releaseOxygen.run();
phaser.arriveAndAwaitAdvance();
semO.release();
}
}
Main.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
public class Main {
public static void main(String[] args) throws InterruptedException {
Map validResult = new HashMap();
validResult.put("HhO", true);
validResult.put("HOh", true);
validResult.put("OHh", true);
validResult.put("OhH", true);
validResult.put("hOH", true);
validResult.put("hHO", true);
int count = 100;
H2O h2o = new H2O();
StringBuffer sb = new StringBuffer();
Runnable releaseHydrogen1 = () -> sb.append("H");
Runnable releaseHydrogen2 = () -> sb.append("h");
Runnable releaseOxygen = () -> sb.append("O");
HydrogenGenerator h1 = new HydrogenGenerator(count, h2o, releaseHydrogen1);
HydrogenGenerator h2 = new HydrogenGenerator(count, h2o, releaseHydrogen2);
OxygenGenerator o = new OxygenGenerator(count, h2o, releaseOxygen);
h1.start();
o.start();
Thread.sleep(1000);
h2.start();
h1.join();
h2.join();
o.join();
System.out.println(sb.toString());
for (int i = 0; i < (count - 1) * 3; i += 3) {
String s = sb.substring(i, i + 3);
if (validResult.get(s) == null) {
System.out.println("expect (H && h && O) but got " + s);
}
}
}
}
class HydrogenGenerator extends Thread {
int n;
H2O h2o;
Runnable releaseHydrogen;
Random rand = new Random(System.currentTimeMillis());
public HydrogenGenerator(int n, H2O h2o, Runnable releaseHydrogen) {
this.n = n;
this.h2o = h2o;
this.releaseHydrogen = releaseHydrogen;
}
@Override
public void run() {
for (; n >= 0; n--) {
try {
Thread.sleep(rand.nextInt(100));
h2o.hydrogen(releaseHydrogen);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class OxygenGenerator extends Thread {
int n;
H2O h2o;
Runnable releaseOxygen;
Random rand = new Random(System.currentTimeMillis());
public OxygenGenerator(int n, H2O h2o, Runnable releaseOxygen) {
this.n = n;
this.h2o = h2o;
this.releaseOxygen = releaseOxygen;
}
@Override
public void run() {
for (; n >= 0; n--) {
try {
Thread.sleep(rand.nextInt(100));
h2o.oxygen(releaseOxygen);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

本文提供了Go语言和Java语言的实现,还提供了测试用例来测试(我们就使用三个goroutine来测试):

如果不是生成水分子,而是使用四个线程一组生成双氧水,也就是过氧化氢 H₂O₂ ,又该如何编写同步代码呢?