memcached Java客户端spymemcached的一致性Hash算法

最近看到两篇文章,一个是江南白衣的陌生但默默一统江湖的MurmurHash,另外一篇是张洋的一致性哈希算法及其在分布式系统中的应用。虽然我在项目中使用memcached的java客户端spymemcached好几年了,但是对它的一致性哈希算法的细节从来没有仔细研究过。趁此机会,特别的看了一下它的源代码。

阅读全文

Spark: 大数据领域的新贵

现在谈起大数据几乎等价于谈论Hadoop及其它的生态圈产品。但是现在一个下一代的计算框架已经长大,而且声名显赫。那就是Spark。你或许已经听说过它以及它的诸多好处。
自从发布之日起,Hadoop就被认为是Google大数据工具的等价实现。它帮助很多公司处理先前不可想象的大数据。围绕着它的两个主要部件(HDFS,分布式一致性的文件系统,和MapReduce, 分布式的计算框架), 一堆相关的工具涌现, 补充并提高它的功能。

阅读全文

Kafka实践:通过8个case学习Kafka的功能

Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

Apache Kafka与传统消息系统相比,有以下不同:

  • 它被设计为一个分布式系统,易于向外扩展;
  • 它同时为发布和订阅提供高吞吐量;
  • 它支持多订阅者,当失败时能自动平衡消费者;
  • 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。

我们将通过几个case学习Kafka的功能。

阅读全文

Kafka 配置参数

Kafka为broker,producer和consumer提供了很多的配置参数。 了解并理解这些配置参数对于我们使用kafka是非常重要的。
本文列出了一些重要的配置参数。

官方的文档 Configuration比较老了,很多参数有所变动, 有些名字也有所改变。我在整理的过程中根据0.8.2的代码也做了修正。

阅读全文

key为null时Kafka会将消息发送给哪个分区?

当你编写kafka Producer时, 会生成KeyedMessage对象。

1
KeyedMessage<K, V> keyedMessage = new KeyedMessage<>(topicName, key, message)

这里的key值可以为空,在这种情况下, kafka会将这个消息发送到哪个分区上呢?依据Kafka官方的文档, 默认的分区类会随机挑选一个分区:

The third property "partitioner.class" defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.

但是这句话相当的误导人。

阅读全文

如何为ZooKeeper, Kafka 和 Spark 应用编写单元测试

ZooKeeper, Kafka和Spark是当下流行的大数据平台工具之一。这两年得到飞速的发展,国内厂商也越来越多的使用它们。
本站有多篇文章介绍了它们的开发指南, 如:

官方网站提供了很多的代码例子,互联网上也有很多的开发例子,你可以很容易的学习如果编写基于这些平台框架的技术。
但是如何为这些应用编写单元测试呢? 本文提供了几种编写单元测试的技术。

所有的例子都可以在 这里 找到。

为ZooKeeper应用编写单元测试

一般我们不直接使用ZooKeeper的API编写Client代码,而是使用Curator或者i0itec-zkclient等包装的API开发。
原因是ZooKeeper本身提供的API太过底层,我们需要处理各种异常,并且API使用不方便。

有几种方式可以实现一个ZooKeeper simulator。

  • Curator TestingServer
    Curator提供了一个TestingServer类,可以模拟ZooKeeper,用来测试Curator应用或者其它使用ZooKeeper的应用。
    你需要在pom.xml中引用curator-test:
1
2
3
4
5
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>${curator.version}</version>
</dependency>

测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class CuratorAppTest {
private TestingServer server;

@BeforeClass
public void setUp() throws Exception {
server = new TestingServer();
server.start();
}
@AfterClass
public void tearDown() throws IOException {
server.stop();
}

@Test
public void testSetAndGetData() {
CuratorApp app = new CuratorApp();
String payload = System.currentTimeMillis() + "";
String result = app.setAndGetData(server.getConnectString(), payload);
assertEquals(result, payload);
}

@Test
public void testWatch() throws Exception {
CuratorApp app = new CuratorApp();
app.watch(server.getConnectString());
}
}
  • EmbeddedZookeeper
    Kafka为了它的单元测试实现一个嵌入式的ZooKeeper,代码相当简单:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package kafka.zk

import org.apache.zookeeper.server.ZooKeeperServer
import org.apache.zookeeper.server.NIOServerCnxnFactory
import kafka.utils.TestUtils
import java.net.InetSocketAddress
import kafka.utils.Utils
import org.apache.kafka.common.utils.Utils.getPort

class EmbeddedZookeeper(val connectString: String) {
val snapshotDir = TestUtils.tempDir()
val logDir = TestUtils.tempDir()
val tickTime = 500
val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
val factory = new NIOServerCnxnFactory()
factory.configure(new InetSocketAddress("127.0.0.1", getPort(connectString)), 0)
factory.startup(zookeeper)

def shutdown() {
Utils.swallow(zookeeper.shutdown())
Utils.swallow(factory.shutdown())
Utils.rm(logDir)
Utils.rm(snapshotDir)
}

}

测试工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package kafka.zk

import org.scalatest.junit.JUnit3Suite
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils}

trait ZooKeeperTestHarness extends JUnit3Suite {
val zkConnect: String = TestZKUtils.zookeeperConnect
var zookeeper: EmbeddedZookeeper = null
var zkClient: ZkClient = null
val zkConnectionTimeout = 6000
val zkSessionTimeout = 6000

override def setUp() {
super.setUp
zookeeper = new EmbeddedZookeeper(zkConnect)
zkClient = new ZkClient(zookeeper.connectString, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
}

override def tearDown() {
Utils.swallow(zkClient.close())
Utils.swallow(zookeeper.shutdown())
super.tearDown
}

}
  • 自己实现
    Kafka的类无法直接在java中使用,你可以使用一些花招, 比如把trait改为abstract class改造使用。
    我们可以参考Kafka的scala代码使用java自己实现一个, 主要例如org.apache.zookeeper.server.ZooKeeperServerorg.apache.zookeeper.server.NIOServerCnxnFactory
    如这个代码: gist
    或者如stackoverflow上讨论的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Properties startupProperties = ...

QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
try {
quorumConfiguration.parseProperties(startupProperties);
} catch(Exception e) {
throw new RuntimeException(e);
}

zooKeeperServer = new ZooKeeperServerMain();
final ServerConfig configuration = new ServerConfig();
configuration.readFrom(quorumConfiguration);

new Thread() {
public void run() {
try {
zooKeeperServer.runFromConfig(configuration);
} catch (IOException e) {
log.error("ZooKeeper Failed", e);
}
}
}.start();

为Kafka应用编写单元测试

Kafka代码本身就提供了单元测试,所以我们编写Kafka producer应用和consumer应用时可以参考这些应用。
你可以在线查看这些单元测试以及它们可以重用的文件: kafka单元测试类
KafkaServerTestHarness 提供了一个基本测试trait:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {

val configs: List[KafkaConfig]
var servers: List[KafkaServer] = null

override def setUp() {
super.setUp
if(configs.size <= 0)
throw new KafkaException("Must suply at least one server config.")
servers = configs.map(TestUtils.createServer(_))
}

override def tearDown() {
servers.map(server => server.shutdown())
servers.map(server => server.config.logDirs.map(Utils.rm(_)))
super.tearDown
}
}

以及提供一个初始化好producer和consumer的Trait:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
val port: Int
val host = "localhost"
var producer: Producer[String, String] = null
var consumer: SimpleConsumer = null

override def setUp() {
super.setUp
val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), "kafka.utils.StaticPartitioner")
producer = new Producer(new ProducerConfig(props))
consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
}

override def tearDown() {
producer.close()
consumer.close()
super.tearDown
}
}

如果你想在代码中使用它们,你需要引入:

1
2
3
4
5
6
7
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<classifier>test</classifier>
<scope>test</scope>
<version>${kafka.version}</version>
</dependency>

如果你想在Java直接实现这些trait是不可以的,这些Trait都有实现方法,在Java中无法直接使用,可能你需要把trait改为abstract class。 或者你使用它的TestUtils创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@BeforeClass
public void setup() {
......
}
@AfterClass
public void teardown() {
......
}
@Test
public void testReceive() {
Properties consumerProps = TestUtils.createConsumerProperties(zkServer.connectString(), "group_1", "consumer_id", 1000);
consumerProps.putAll(kafkaProps);
ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

// now launch all the threads
ExecutorService executor = Executors.newFixedThreadPool(2);

// now create an object to consume the messages
int threadNumber = 0;
for (final KafkaStream<byte[], byte[]> stream : streams) {
executor.submit(new ConsumerThread(stream, threadNumber));
threadNumber++;
}
// setup producer
Properties properties = TestUtils.getProducerConfig("localhost:" + port);
properties.put("serializer.class", StringEncoder.class.getCanonicalName());

ProducerConfig pConfig = new ProducerConfig(properties);
Producer<Integer, String> producer = new Producer<>(pConfig);

// send message
for (int i = 0; i < 10; i++) {
KeyedMessage<Integer, String> data = new KeyedMessage<>(topic, "test-message-" + i);

List<KeyedMessage<Integer, String>> messages = new ArrayList<KeyedMessage<Integer, String>>();
messages.add(data);
// producer.send(scala.collection.JavaConversions.asScalaBuffer(messages));
producer.send(scala.collection.JavaConversions.asScalaBuffer(messages));
}

try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}

executor.shutdownNow();
producer.close();
}

为Spark应用编写单元测试

如果是为普通的Spark应用编写单元测试代码, 比较简单,创建SparkContext时只需将master设为local即可。确保在finally或者teardown方法中调用SparkContext.stop(),因为Spark不允许在同一个程序中拥有两个SparkContext。
以SparkPi为例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public final class JavaSparkPi {
public static void main(String[] args) throws Exception {
double pi = calculatePi(args);
System.out.println("Pi is roughly " + pi);

}

public static double calculatePi(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
int n = 100000 * slices;
List<Integer> l = new ArrayList<Integer>(n);
for (int i = 0; i < n; i++) {
l.add(i);
}
JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
int count = dataSet.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
double x = Math.random() * 2 - 1;
double y = Math.random() * 2 - 1;
return (x * x + y * y < 1) ? 1 : 0;
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
});
double pi = 4.0 * count / n;
jsc.stop();
return pi;
}
}

单元测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class JavaSparkPiTest {

@Test
public void testPi() {
Properties props = System.getProperties();
props.setProperty("spark.master", "local[4]");
props.setProperty("spark.rdd.compress", "true");
props.setProperty("spark.executor.memory", "1g");

try {
double pi = JavaSparkPi.calculatePi(new String[]{"1"});
assertTrue((pi -3.14) < 1);
} catch (Exception e) {
fail(e.getMessage(),e);
}
}
}

对于Spark Streaming应用, 单元测试相对复杂,因为你需要集成其它的框架。
如果你的应用集成Kafka,你可以使用上面的Kafka的测试类,
如果你的应用集成Mongo,你可以使用真实的Mongo或者fake Mongo如Fongo,
如果你的应用集成TCP 流, 你需要实现一个TCP server simulator,
基本上你应该寻找或者实现要集成的框架的simulator。