使用Scala开发Kafka应用

Kafka官方提供了Producer和Consumer的例子。尽管Kafka本身是用Scala开发的，官方却没有提供用Scala编写Producer和Consumer的例子。

本文介绍了使用Scala开发producer和consumer的例子。

项目代码可以在GitHub上浏览和下载：kafka-example-in-scala

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.colobu.kafka
import kafka.producer.ProducerConfig
import java.util.Properties
import kafka.producer.Producer
import scala.util.Random
import kafka.producer.KeyedMessage
import java.util.Date
/**
 * Command-line producer that sends `events` log-like messages to a Kafka topic using the
 * legacy `kafka.producer.Producer` API.
 *
 * Arguments: `<events> <topic> <brokers>`; `brokers` is used as `metadata.broker.list`.
 *
 * Each message is `"<timestampMillis>,<index>,www.example.com,<ip>"` and is keyed by a
 * random `192.168.2.x` address (x in 0..254, since `nextInt(255)` excludes 255).
 */
object ScalaProducerExample extends App {
  val events = args(0).toInt
  val topic = args(1)
  val brokers = args(2)
  val rnd = new Random()
  val props = new Properties()
  props.put("metadata.broker.list", brokers)
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  //props.put("partitioner.class", "com.colobu.kafka.SimplePartitioner")
  // Asynchronous sending: send() does not wait for the broker on each call.
  props.put("producer.type", "async")
  //props.put("request.required.acks", "1")
  val config = new ProducerConfig(props)
  val producer = new Producer[String, String](config)
  val t = System.currentTimeMillis()
  try {
    for (nEvents <- Range(0, events)) {
      val runtime = new Date().getTime()
      val ip = "192.168.2." + rnd.nextInt(255)
      val msg = runtime + "," + nEvents + ",www.example.com," + ip
      val data = new KeyedMessage[String, String](topic, ip, msg)
      producer.send(data)
    }
    // Long arithmetic: `events * 1000` in Int overflows for events > ~2.1M.
    // max(1, ...) guards against "/ by zero" when the loop finishes within 1 ms.
    val elapsedMs = math.max(1L, System.currentTimeMillis() - t)
    System.out.println("sent per second: " + events * 1000L / elapsedMs)
  } finally {
    // Release the producer even if send() throws part-way through the loop.
    producer.close()
  }
}

代码很简单,首先设置一些配置:

1
2
3
props.put("metadata.broker.list", brokers)
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("producer.type", "async")

这里将producer.type设置为async，即以异步方式发送Kafka消息：send()不会等待broker的响应，消息由后台线程批量发送。
然后创建producer，发送指定数量的KeyedMessage。

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.colobu.kafka
import java.util.Properties
import java.util.concurrent._
import scala.collection.JavaConversions._
import kafka.consumer.Consumer
import kafka.consumer.ConsumerConfig
import kafka.utils._
import kafka.utils.Logging
import kafka.consumer.KafkaStream
/**
 * Consumer-group example built on the legacy ZooKeeper-based high-level consumer
 * (`kafka.consumer.Consumer`).
 *
 * @param zookeeper ZooKeeper connect string, used as `zookeeper.connect`
 * @param groupId   consumer group id, used as `group.id`
 * @param topic     topic to consume
 * @param delay     passed through unchanged to each [[ScalaConsumerTest]] worker
 */
class ScalaConsumerExample(val zookeeper: String,
                           val groupId: String,
                           val topic: String,
                           val delay: Long) extends Logging {
  val config = createConsumerConfig(zookeeper, groupId)
  val consumer = Consumer.create(config)
  // Assigned by run(); stays null until then, which shutdown() checks for.
  var executor: ExecutorService = null

  /**
   * Stops the consumer connector, then shuts down the worker pool and waits up to
   * 5 seconds for the worker threads to finish.
   */
  def shutdown(): Unit = {
    if (consumer != null)
      consumer.shutdown()
    if (executor != null) {
      executor.shutdown()
      try {
        if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
          warn("Timed out waiting for consumer threads to shut down")
      } catch {
        case _: InterruptedException =>
          // Preserve the caller's interrupt status instead of swallowing it.
          Thread.currentThread().interrupt()
      }
    }
  }

  /**
   * Builds the consumer configuration for the given ZooKeeper connect string and group.
   *
   * NOTE(review): the 400 ms ZooKeeper session timeout is very short — presumably copied
   * from a demo; confirm before using outside an example.
   */
  def createConsumerConfig(zookeeper: String, groupId: String): ConsumerConfig = {
    val props = new Properties()
    props.put("zookeeper.connect", zookeeper)
    props.put("group.id", groupId)
    props.put("auto.offset.reset", "largest")
    props.put("zookeeper.session.timeout.ms", "400")
    props.put("zookeeper.sync.time.ms", "200")
    props.put("auto.commit.interval.ms", "1000")
    new ConsumerConfig(props)
  }

  /**
   * Requests `numThreads` streams for [[topic]] and starts one worker per stream on a
   * fixed-size thread pool.
   *
   * @throws NoSuchElementException if the connector returns no streams for the topic
   */
  def run(numThreads: Int): Unit = {
    val topicCountMap = Map(topic -> numThreads)
    val consumerMap = consumer.createMessageStreams(topicCountMap)
    val streams = consumerMap.getOrElse(topic,
      throw new NoSuchElementException("No message streams returned for topic " + topic))
    executor = Executors.newFixedThreadPool(numThreads)
    for ((stream, threadNumber) <- streams.zipWithIndex)
      executor.submit(new ScalaConsumerTest(stream, threadNumber, delay))
  }
}
/**
 * Entry point. Arguments: `<zookeeper> <groupId> <topic> <numThreads> <delay>`.
 */
object ScalaConsumerExample extends App {
  require(args.length >= 5,
    "usage: ScalaConsumerExample <zookeeper> <groupId> <topic> <numThreads> <delay>")
  val example = new ScalaConsumerExample(args(0), args(1), args(2), args(4).toLong)
  // Ensure the connector and worker pool are shut down when the JVM exits (e.g. Ctrl-C).
  sys.addShutdownHook(example.shutdown())
  example.run(args(3).toInt)
}
/**
 * Worker that drains one [[KafkaStream]] and prints each message, prefixed with the
 * current time in milliseconds and this worker's thread number.
 *
 * NOTE(review): `delay` is accepted but never used by this worker.
 */
class ScalaConsumerTest(val stream: KafkaStream[Array[Byte], Array[Byte]], val threadNumber: Int, val delay: Long) extends Logging with Runnable {
  override def run(): Unit = {
    val it = stream.iterator()
    // The loop ends once the iterator reports no more messages (e.g. after the connector is shut down).
    while (it.hasNext()) {
      // Decode explicitly as UTF-8 rather than with the JVM's platform-default charset,
      // so output does not depend on the host locale.
      val msg = new String(it.next().message(), "UTF-8")
      System.out.println(System.currentTimeMillis() + ",Thread " + threadNumber + ": " + msg)
    }
    System.out.println("Shutting down Thread: " + threadNumber)
  }
}

首先设置配置，然后创建ZookeeperConsumerConnector，并为每个KafkaStream分配一个线程进行处理。