如何为ZooKeeper, Kafka 和 Spark 应用编写单元测试

ZooKeeper, Kafka和Spark是当下流行的大数据平台工具之一。这两年得到飞速的发展,国内厂商也越来越多的使用它们。
本站有多篇文章介绍了它们的开发指南, 如:

官方网站提供了很多的代码例子,互联网上也有很多的开发例子,你可以很容易的学习如果编写基于这些平台框架的技术。
但是如何为这些应用编写单元测试呢? 本文提供了几种编写单元测试的技术。

阅读全文

跟着实例学习ZooKeeper的用法: 文章汇总


跟着实例学习ZooKeeper的用法提供了全面的例子, 演示了Curator在项目中的应用,全面的介绍了ZooKeeper的Recipe的实现, 以及CuratorFramework的基本用法。
都是独立的, 你可以挑选感兴趣的章节阅读,方便在需要的时候查找。
欢迎在评论中提供意见和建议以及内容的错误,我会及时的更新。
以下是各章节的汇总。

所有的代码都可以在github下载。

leader选举

Leader latch

Leader Election

分布式锁

可重入锁Shared Reentrant Lock

不可重入锁Shared Lock

可重入读写锁Shared Reentrant Read Write Lock

信号量Shared Semaphore

多锁对象 Multi Shared Lock

Barriers

栅栏Barrier

双栅栏Double Barrier

计数器Counters

Shared Counter

Distributed Atomic Long

缓存Caches

Path Cache

Node Cache

Tree Cache

临时节点

队列Queues

Distributed Queue

Distributed Id Queue

Distributed Priority Queue

Distributed Delay Queue

Simple Distributed Queue

Curator框架应用

框架介绍

操作方法

事务

Curator扩展库

跟着实例学习ZooKeeper的用法: Curator扩展库

还记得Curator提供哪几个组件吗? 我们不妨回顾一下:

  • Recipes
  • Framework
  • Utilities
  • Client
  • Errors
  • Extensions

前面的例子其实前五个组件都涉及到了, 比如Utilities例子的TestServer, Client里的CuratorZookeeperClient, Errors里的ConnectionStateListener等。 还有最后一个组件我们还没有介绍,那就是Curator扩展组件。

Recipes组件包含了丰富的Curator应用的组件。 但是这些并不是ZooKeeper Recipe的全部。 大量的分布式应用已经抽象出了许许多多的的Recipe,其中有些还是可以通过Curator来实现。
如果不断都将这些Recipe都增加到Recipes中, Recipes会变得越来越大。 为了避免这种状况, Curator把一些其它的Recipe放在单独的包中, 命名方式就是curator-x-,比如curator-x-discovery, curator-x-rpc。
本文就是介绍curator-x-discovery。

阅读全文

跟着实例学习ZooKeeper的用法: 队列

Curator也提供ZK Recipe的分布式队列实现。 利用ZK的 PERSISTENTSEQUENTIAL节点, 可以保证放入到队列中的项目是按照顺序排队的。 如果单一的消费者从队列中取数据, 那么它是先入先出的,这也是队列的特点。 如果你严格要求顺序,你就的使用单一的消费者,可以使用leader选举只让leader作为唯一的消费者。

但是, 根据Netflix的Curator作者所说, ZooKeeper真心不适合做Queue,或者说ZK没有实现一个好的Queue,详细内容可以看 Tech Note 4, 原因有五:

  1. ZK有1MB 的传输限制。 实践中ZNode必须相对较小,而队列包含成千上万的消息,非常的大。
  2. 如果有很多节点,ZK启动时相当的慢。 而使用queue会导致好多ZNode. 你需要显著增大 initLimit 和 syncLimit.
  3. ZNode很大的时候很难清理。Netflix不得不创建了一个专门的程序做这事。
  4. 当很大量的包含成千上万的子节点的ZNode时, ZK的性能变得不好
  5. ZK的数据库完全放在内存中。 大量的Queue意味着会占用很多的内存空间。

尽管如此, Curator还是创建了各种Queue的实现。 如果Queue的数据量不太多,数据量不太大的情况下,酌情考虑,还是可以使用的。

阅读全文

跟着实例学习ZooKeeper的用法: Barrier

分布式Barrier是这样一个类: 它会阻塞所有节点上的等待进程,知道某一个被满足, 然后所有的节点继续进行。

比如赛马比赛中, 等赛马陆续来到起跑线前。 一声令下,所有的赛马都飞奔而出。

栅栏Barrier

DistributedBarrier类实现了栅栏的功能。 它的构造函数如下:

1
2
3
4
public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

首先你需要设置栅栏,它将阻塞在它上面等待的线程:

1
setBarrier();

然后需要阻塞的线程调用``方法等待放行条件:

1
public void waitOnBarrier()

当条件满足时,移除栅栏,所有等待的线程将继续执行:

1
removeBarrier();

异常处理
DistributedBarrier 会监控连接状态,当连接断掉时waitOnBarrier()方法会抛出异常。

看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.colobu.zkrecipe.barrier;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
public class DistributedBarrierExample {
private static final int QTY = 5;
private static final String PATH = "/examples/barrier";
public static void main(String[] args) throws Exception {
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
ExecutorService service = Executors.newFixedThreadPool(QTY);
DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
controlBarrier.setBarrier();
for (int i = 0; i < QTY; ++i) {
final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
final int index = i;
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
Thread.sleep((long) (3 * Math.random()));
System.out.println("Client #" + index + " waits on Barrier");
barrier.waitOnBarrier();
System.out.println("Client #" + index + " begins");
return null;
}
};
service.submit(task);
}
Thread.sleep(10000);
System.out.println("all Barrier instances should wait the condition");
controlBarrier.removeBarrier();
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
}
}
}

这个例子创建了controlBarrier来设置栅栏和移除栅栏。
我们创建了5个线程,在此Barrier上等待。
最后移除栅栏后所有的线程才继续执行。

如果你开始不设置栅栏,所有的线程就不会阻塞住。

双栅栏Double Barrier

双栅栏允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算, 当计算完成时,离开栅栏。
双栅栏类是DistributedDoubleBarrier
构造函数为:

1
2
3
4
5
6
7
8
9
10
public DistributedDoubleBarrier(CuratorFramework client,
String barrierPath,
int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.
Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty是成员数量,当enter方法被调用时,成员被阻塞,直到所有的成员都调用了enter。 当leave方法被调用时,它也阻塞调用线程, 知道所有的成员都调用了leave
就像百米赛跑比赛, 发令枪响, 所有的运动员开始跑,等所有的运动员跑过终点线,比赛才结束。

DistributedBarrier 会监控连接状态,当连接断掉时enter()leave方法会抛出异常。

例子代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.colobu.zkrecipe.barrier;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
public class DistributedBarrierExample {
private static final int QTY = 5;
private static final String PATH = "/examples/barrier";
public static void main(String[] args) throws Exception {
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
ExecutorService service = Executors.newFixedThreadPool(QTY);
for (int i = 0; i < QTY; ++i) {
final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
final int index = i;
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
Thread.sleep((long) (3 * Math.random()));
System.out.println("Client #" + index + " enters");
barrier.enter();
System.out.println("Client #" + index + " begins");
Thread.sleep((long) (3000 * Math.random()));
barrier.leave();
System.out.println("Client #" + index + " left");
return null;
}
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
}
}
}

分布式Barrier是这样一个类: 它会阻塞所有节点上的等待进程,知道某一个被满足, 然后所有的节点继续进行。

阅读全文