How to Write Unit Tests for ZooKeeper, Kafka, and Spark Applications

ZooKeeper, Kafka, and Spark are among the most popular big data platform tools today. They have grown rapidly over the past two years, and more and more companies are adopting them.
This site has published several articles on developing with them.

The official websites provide plenty of code examples, and many more can be found online, so it is easy to learn how to build applications on top of these platforms.
But how do you write unit tests for such applications? This article presents several techniques for doing so.

All of the examples can be found here.

Writing Unit Tests for ZooKeeper Applications

Generally we do not write client code directly against the ZooKeeper API; instead we develop with a wrapper API such as Curator or i0itec-zkclient. The reason is that ZooKeeper's native API is too low-level: every call forces you to handle a variety of exceptions, and the API is awkward to use.
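To make the contrast concrete, here is a small sketch, not taken from the original examples (the connect string, znode path, and retry policy are arbitrary assumptions), comparing a read with the raw API to the same read with Curator:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

public class RawVsCurator {

    // Raw API: watcher plumbing plus checked exceptions on every call.
    static byte[] readRaw() throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 5000, event -> { });
        try {
            return zk.getData("/config", false, null);
        } catch (KeeperException | InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            zk.close();
        }
    }

    // Curator: fluent calls, with retries handled by the framework.
    static byte[] readCurator() throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryOneTime(1000));
        client.start();
        try {
            return client.getData().forPath("/config");
        } finally {
            client.close();
        }
    }
}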

There are several ways to implement a ZooKeeper simulator.

  • Curator TestingServer
    Curator provides a TestingServer class that simulates ZooKeeper, for testing Curator applications or any other application that uses ZooKeeper.
    You need to add the curator-test dependency to your pom.xml:
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>${curator.version}</version>
</dependency>

The test code looks like this:

public class CuratorAppTest {
    private TestingServer server;

    @BeforeClass
    public void setUp() throws Exception {
        server = new TestingServer();
        server.start();
    }

    @AfterClass
    public void tearDown() throws IOException {
        server.stop();
    }

    @Test
    public void testSetAndGetData() {
        CuratorApp app = new CuratorApp();
        String payload = System.currentTimeMillis() + "";
        String result = app.setAndGetData(server.getConnectString(), payload);
        assertEquals(result, payload);
    }

    @Test
    public void testWatch() throws Exception {
        CuratorApp app = new CuratorApp();
        app.watch(server.getConnectString());
    }
}
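The CuratorApp class under test lives in the linked example repository; a minimal sketch of what its setAndGetData method might look like (the znode path and retry policy here are assumptions) is:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorApp {

    // Write a payload to a znode and read it back; the test asserts the round trip.
    public String setAndGetData(String connectString, String payload) {
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, new ExponentialBackoffRetry(1000, 3));
        client.start();
        try {
            String path = "/test/app";
            if (client.checkExists().forPath(path) == null) {
                client.create().creatingParentsIfNeeded().forPath(path, payload.getBytes());
            } else {
                client.setData().forPath(path, payload.getBytes());
            }
            return new String(client.getData().forPath(path));
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            client.close();
        }
    }
}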
  • EmbeddedZookeeper
    Kafka implements an embedded ZooKeeper for its own unit tests, and the code is quite simple:
package kafka.zk

import org.apache.zookeeper.server.ZooKeeperServer
import org.apache.zookeeper.server.NIOServerCnxnFactory
import kafka.utils.TestUtils
import java.net.InetSocketAddress
import kafka.utils.Utils
import org.apache.kafka.common.utils.Utils.getPort

class EmbeddedZookeeper(val connectString: String) {
  val snapshotDir = TestUtils.tempDir()
  val logDir = TestUtils.tempDir()
  val tickTime = 500
  val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
  val factory = new NIOServerCnxnFactory()
  factory.configure(new InetSocketAddress("127.0.0.1", getPort(connectString)), 0)
  factory.startup(zookeeper)

  def shutdown() {
    Utils.swallow(zookeeper.shutdown())
    Utils.swallow(factory.shutdown())
    Utils.rm(logDir)
    Utils.rm(snapshotDir)
  }
}

And the accompanying test harness trait:

package kafka.zk

import org.scalatest.junit.JUnit3Suite
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils}

trait ZooKeeperTestHarness extends JUnit3Suite {
  val zkConnect: String = TestZKUtils.zookeeperConnect
  var zookeeper: EmbeddedZookeeper = null
  var zkClient: ZkClient = null
  val zkConnectionTimeout = 6000
  val zkSessionTimeout = 6000

  override def setUp() {
    super.setUp
    zookeeper = new EmbeddedZookeeper(zkConnect)
    zkClient = new ZkClient(zookeeper.connectString, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
  }

  override def tearDown() {
    Utils.swallow(zkClient.close())
    Utils.swallow(zookeeper.shutdown())
    super.tearDown
  }
}
  • Implement your own
    Kafka's classes cannot be used directly from Java; you can resort to tricks such as converting the traits into abstract classes.
    Alternatively, you can take Kafka's Scala code as a reference and implement your own in Java, mainly using org.apache.zookeeper.server.ZooKeeperServer and org.apache.zookeeper.server.NIOServerCnxnFactory.
    See this code for an example: gist (a Java sketch in the same spirit follows the snippet below).
    Or, as discussed on Stack Overflow:
Properties startupProperties = ...

QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
try {
    quorumConfiguration.parseProperties(startupProperties);
} catch (Exception e) {
    throw new RuntimeException(e);
}

zooKeeperServer = new ZooKeeperServerMain();
final ServerConfig configuration = new ServerConfig();
configuration.readFrom(quorumConfiguration);

new Thread() {
    public void run() {
        try {
            zooKeeperServer.runFromConfig(configuration);
        } catch (IOException e) {
            log.error("ZooKeeper Failed", e);
        }
    }
}.start();
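Following the gist approach, a rough Java port of Kafka's EmbeddedZookeeper might look like the sketch below (the class name and temp-directory handling are assumptions):

import java.io.File;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;

public class EmbeddedZookeeperJava {
    private final ZooKeeperServer zookeeper;
    private final NIOServerCnxnFactory factory;

    public EmbeddedZookeeperJava(int port) throws Exception {
        // temp dirs for snapshots and transaction logs, like Kafka's TestUtils.tempDir()
        File snapshotDir = Files.createTempDirectory("zk-snapshot").toFile();
        File logDir = Files.createTempDirectory("zk-log").toFile();
        zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500);
        factory = new NIOServerCnxnFactory();
        factory.configure(new InetSocketAddress("127.0.0.1", port), 0);
        factory.startup(zookeeper);
    }

    public void shutdown() {
        // mirror Kafka's shutdown order: server first, then the connection factory
        zookeeper.shutdown();
        factory.shutdown();
    }
}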

Writing Unit Tests for Kafka Applications

The Kafka codebase ships with its own unit tests, so we can use them as a reference when writing Kafka producer and consumer applications.
You can browse these unit tests, and the reusable files they rely on, online: Kafka unit test classes
KafkaServerTestHarness provides a basic test trait:

trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
  val configs: List[KafkaConfig]
  var servers: List[KafkaServer] = null

  override def setUp() {
    super.setUp
    if(configs.size <= 0)
      throw new KafkaException("Must suply at least one server config.")
    servers = configs.map(TestUtils.createServer(_))
  }

  override def tearDown() {
    servers.map(server => server.shutdown())
    servers.map(server => server.config.logDirs.map(Utils.rm(_)))
    super.tearDown
  }
}

It also provides a trait with a pre-initialized producer and consumer:

trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
  val port: Int
  val host = "localhost"
  var producer: Producer[String, String] = null
  var consumer: SimpleConsumer = null

  override def setUp() {
    super.setUp
    val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), "kafka.utils.StaticPartitioner")
    producer = new Producer(new ProducerConfig(props))
    consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
  }

  override def tearDown() {
    producer.close()
    consumer.close()
    super.tearDown
  }
}

If you want to use these classes in your own code, you need to pull in the test artifact:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <classifier>test</classifier>
    <scope>test</scope>
    <version>${kafka.version}</version>
</dependency>

You cannot implement these traits directly in Java: they contain concrete method implementations, so Java cannot use them as-is. You may need to convert the traits into abstract classes, or instead build the test fixtures yourself with Kafka's TestUtils:

@BeforeClass
public void setup() {
    ......
}

@AfterClass
public void teardown() {
    ......
}

@Test
public void testReceive() {
    Properties consumerProps = TestUtils.createConsumerProperties(zkServer.connectString(), "group_1", "consumer_id", 1000);
    consumerProps.putAll(kafkaProps);
    ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // launch a consumer thread per stream
    ExecutorService executor = Executors.newFixedThreadPool(2);
    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new ConsumerThread(stream, threadNumber));
        threadNumber++;
    }

    // set up the producer
    Properties properties = TestUtils.getProducerConfig("localhost:" + port);
    properties.put("serializer.class", StringEncoder.class.getCanonicalName());
    ProducerConfig pConfig = new ProducerConfig(properties);
    Producer<Integer, String> producer = new Producer<>(pConfig);

    // send messages
    for (int i = 0; i < 10; i++) {
        KeyedMessage<Integer, String> data = new KeyedMessage<>(topic, "test-message-" + i);
        List<KeyedMessage<Integer, String>> messages = new ArrayList<KeyedMessage<Integer, String>>();
        messages.add(data);
        producer.send(scala.collection.JavaConversions.asScalaBuffer(messages));
    }

    // give the consumer threads time to drain the topic
    try {
        Thread.sleep(20000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    executor.shutdownNow();
    producer.close();
}
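The ConsumerThread class used above is not part of Kafka's test utilities; a minimal sketch of it (the class is an assumption, and it simply drains the stream) could be:

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerThread implements Runnable {
    private final KafkaStream<byte[], byte[]> stream;
    private final int threadNumber;

    public ConsumerThread(KafkaStream<byte[], byte[]> stream, int threadNumber) {
        this.stream = stream;
        this.threadNumber = threadNumber;
    }

    @Override
    public void run() {
        // block on the stream and print each message until the executor is shut down
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("Thread " + threadNumber + ": " + new String(it.next().message()));
        }
        System.out.println("Shutting down thread: " + threadNumber);
    }
}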

Writing Unit Tests for Spark Applications

Writing unit tests for an ordinary Spark application is fairly simple: when creating the SparkContext, just set the master to local. Make sure you call SparkContext.stop() in a finally block or a teardown method, because Spark does not allow two SparkContexts in the same program.
Take SparkPi as an example.

public final class JavaSparkPi {

    public static void main(String[] args) throws Exception {
        double pi = calculatePi(args);
        System.out.println("Pi is roughly " + pi);
    }

    public static double calculatePi(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
        int n = 100000 * slices;
        List<Integer> l = new ArrayList<Integer>(n);
        for (int i = 0; i < n; i++) {
            l.add(i);
        }

        JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);

        int count = dataSet.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) {
                double x = Math.random() * 2 - 1;
                double y = Math.random() * 2 - 1;
                return (x * x + y * y < 1) ? 1 : 0;
            }
        }).reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) {
                return integer + integer2;
            }
        });

        double pi = 4.0 * count / n;
        jsc.stop();
        return pi;
    }
}

The unit test class:

public class JavaSparkPiTest {
    @Test
    public void testPi() {
        Properties props = System.getProperties();
        props.setProperty("spark.master", "local[4]");
        props.setProperty("spark.rdd.compress", "true");
        props.setProperty("spark.executor.memory", "1g");
        try {
            double pi = JavaSparkPi.calculatePi(new String[]{"1"});
            // use an absolute tolerance; (pi - 3.14) < 1 would pass even for pi = 0
            assertTrue(Math.abs(pi - 3.14) < 0.1);
        } catch (Exception e) {
            fail(e.getMessage(), e);
        }
    }
}
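If the code under test accepts a JavaSparkContext instead of creating its own, you can manage the context in setup and teardown so that stop() is always called. A minimal sketch (the class name and test body are assumptions, in the TestNG style used above):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;

public class SparkContextHarnessTest {
    private transient JavaSparkContext jsc;

    @BeforeClass
    public void setUp() {
        jsc = new JavaSparkContext(new SparkConf().setMaster("local[4]").setAppName("test"));
    }

    @AfterClass
    public void tearDown() {
        if (jsc != null) {
            jsc.stop(); // Spark allows only one active context per JVM
            jsc = null;
        }
    }

    @Test
    public void testSum() {
        int sum = jsc.parallelize(Arrays.asList(1, 2, 3)).reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) {
                return a + b;
            }
        });
        assertEquals(sum, 6);
    }
}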

For Spark Streaming applications, unit testing is more involved because you have to integrate other systems:
if your application consumes from Kafka, you can use the Kafka test classes above;
if it integrates with Mongo, you can use a real Mongo instance or a fake one such as Fongo;
if it consumes a TCP stream, you need to implement a TCP server simulator, as sketched below.
In general, you should look for, or build, a simulator for every system you integrate with.
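For example, a minimal TCP server simulator for testing a socketTextStream-based application might look like this (the port and messages are arbitrary assumptions):

import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class TcpServerSimulator implements Runnable {
    private final int port;
    private final String[] messages;

    public TcpServerSimulator(int port, String... messages) {
        this.port = port;
        this.messages = messages;
    }

    @Override
    public void run() {
        // accept one client (the streaming receiver) and feed it line-delimited records
        try (ServerSocket server = new ServerSocket(port);
             Socket client = server.accept();
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            for (String message : messages) {
                out.println(message); // one line per streaming record
                Thread.sleep(100);    // pace the stream a little
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

Start it on a background thread before creating the streaming context, then point ssc.socketTextStream("localhost", port) at it.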