Kafka实践:通过8个case学习Kafka的功能

Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

Apache Kafka与传统消息系统相比,有以下不同:

  • 它被设计为一个分布式系统,易于向外扩展;
  • 它同时为发布和订阅提供高吞吐量;
  • 它支持多订阅者,当失败时能自动平衡消费者;
  • 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。

我们将通过几个case学习Kafka的功能。

这几个case是我在工作中解决一个问题而对Kafka的功能进行的调研。另外还做了性能的测试,在本文中没有提到。

  • 分区
    • case1: 同一个group的consumer并发处理消息
    • case2: 关闭一个consumer,消息会继续由另一个consumer处理
    • case3: 恢复consumer,会balance回来
  • offset
    • case4: auto.commit = true
    • case5: offset out of range
    • case6: auto.commit = false
  • delay
    • case7: delay功能
    • case8: 容错,是否会丢失数据

启动Kafka boker,创建Topic

启动kafka:

1
bin/kafka-server-start.sh config/server.properties

查看topic list:

1
bin/kafka-topics.sh --list --zookeeper localhost:2181

查看特定的topic

1
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic colobu

创建topic:

1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

High-level consumer实现

Producer实现

case 1

启动consumer 1:

1
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 grouo1 colobu 2 > 1.txt

启动consumer 2:

1
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 group1 colobu 8 > 2.txt

发送 10000 个消息:

1
java -cp kafka-0.1.0-SNAPSHOT.jar com.colobu.kafka.ProducerExample 10000 colobu

查看每个consumer处理的messsage数量:

1
2
3
4
5
cat 1.txt|wc -l
2024
cat 2.txt|wc -l
7976

总数为 2024+7976=10000, 没有message丢失, 处理比大致为2024:7976 = 1:4,和我们的预想是一样的。

case 2 & case 3:

启动consumer 1:

1
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 grouo1 colobu 5 > 1.txt

启动consumer 2:

1
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 group1 colobu 5 > 2.txt

发送 1000000 个消息:

1
java -cp kafka-0.1.0-SNAPSHOT.jar com.colobu.kafka.ProducerExample 500000 colobu

CTRL + C停掉 consumer1, 可以看到 consumer2 继续工作, consumer1停止了工作:

1
2
3
tail -f 1.txt
tail -f 2.txt

重新启动 consumer1,将处理:

1
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 group1 colobu 5 >> 1.txt

可以看到 consumer1 和 consumer2 继续工作:

1
tail -f 1.txt

等处理完我们检查一下consumer1和consumer2处理的消息总数,看是否有消息丢失:

1
2
cat *.txt|wc -l
500504

看起来我们处理的消息似乎多了一些,相信这是在consumer1重启的时候consumer2的offset还没来得及commit。

case 4

offset的值保存在zookeeper的如下节点中:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]

可以看到不同的topic不同的分区有着自己的offset, 并且不同组分别拥有自己独立的offset。

这里有一个简单的工具可以监控offset: KafkaOffsetMonitor,
运行下面的命令就可以启动一个web界面来监控:

1
2
3
4
5
6
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk zk-server1,zk-server2 \
--port 8080 \
--refresh 10.seconds \
--retain 2.days

上面我们已经做了实验, 所以在zookeeper应该有offset的值,你可以用zkCli.sh或者上面的命令查询一下:

1
2
get /consumers/group1/offsets/colobu/3
39919

因为auto.commit.enable默认为true, consumer会定时地将offset写入到zookeeper上(时间间隔由auto.commit.interval.ms决定,默认1分钟).

case 5

根据Kafka的文档, 参数auto.offset.reset的功能如下所示:

what to do if an offset is out of range.
smallest : automatically reset the offset to the smallest offset
largest : automatically reset the offset to the largest offset
anything else: throw exception to the consumer

如果offset is out of range, 这个参数决定了comsumer的行为:

smallest, 将offset设为当前所能用的最小的offset。 注意不一定是0。
largest, 将offset设为当前可用的最大的offset。也就是consumer将只处理最新写入的消息。 默认值。
其它, 校验失败,抛出kafka.common.InvalidConfigException异常。 看起来只能设置这两个值

我们可以将zookeeper的 /consumers节点删除::

1
[zk: localhost:2181(CONNECTED) 15] rmr /consumers

然后将的代码中加入一行

1
props.put("auto.offset.reset", "smallest");

或者

..
1
2
props.put("auto.offset.reset", "largest");

如果设置为largest,可以看到consumer不会处理以前发送的消息,只会处理新进的消息。
如果设置为smallest,consumer从头开始所有的消息。

case 6

默认情况下auto.commit.enable等于true,这也就意味着consumer会定期的commit offset。 前面也介绍了, zookeeper中节点中记录这这些offset。
我们也可以手工进行commit。

首先我们先将zookeeper中把/consumers节点删除掉。
然后在ConsumerExample.java中增加下面一行:

1
props.put("auto.commit.enable", "false");

然后启动consumer:

1
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 group1 colobu 10

过几分钟检查zookeeper的/consumers/group1节点,发现并没有offsets节点。
这符合我们的期望,因为我们将自动提交设为false。

修改ConsumerTest.javarun方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
int count = 0;
while (it.hasNext()) {
//格式: 处理时间,线程编号:消息
System.out.println(System.currentTimeMillis() + ",Thread " + threadNumber + ": " + new String(it.next().message()));
count++;
if (count == 100) {
consumer.commitOffsets();
count = 0;
}
}
System.out.println("Shutting down Thread: " + threadNumber);
}

当处理了100个消息后就commit offset一次。
启动consumer,只运行一个线程,这样我们可以直观的感受到commit.如果多个线程, 我们不能精确确定应该发多少个消息才能使某个线程处理100个消息。

1
java -jar kafka-0.1.0-SNAPSHOT.jar localhost:2181 group1 colobu 1

先用producer发送99个消息,

1
java -cp kafka-0.1.0-SNAPSHOT.jar com.colobu.kafka.ProducerExample 99 colobu

查看zookeeper的节点/consumers/group1下依然没有offsets节点。
再发送一个消息,

1
java -cp kafka-0.1.0-SNAPSHOT.jar com.colobu.kafka.ProducerExample 1 colobu

可以看到zookeeper的节点/consumers/group1下增加了offsets节点。

素以如果你有必要,你也可以手工控制offset的commit的时机。

case 7

这是我现在想利用Kafka实现的一个分布式DelayQueue的功能。
在我的文章跟着实例学习ZooKeeper的用法:讲到,利用zookeeper可以实现Distributed Delay Queue,但是Curator的作者也讲了, zookeeper真心不适合做queue,尤其在数据量很大的情况。

可以利用redis的sorted set实现: Delay queue in redis,
或者使用其它的一些框架实现。

这里我利用Kafka实现这样的一个功能。
实现方式大概和这个帖子相同: Delayed Queue

Each produced message should have a timestamp at which it was pushed to the queue. At the
consumer side, fetch a message from a partition and compare the message timestamp with system's timestamp to see if enough time has passed for you
to process the message. If enough time has passed, process the message and commit the message's offset otherwise make sure you do not commit the
offset.

我们将ConsumerTest的run方法修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
//格式: 处理时间,线程编号:消息
String msg = new String(it.next().message());
long t = System.currentTimeMillis() - Long.parseLong(msg.substring(0, msg.indexOf(",")));
if (t < 5 * 60 *1000) {
try {
Thread.currentThread().sleep(5 * 60 *1000 -t);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(System.currentTimeMillis() + ",Thread " + threadNumber + ": " + msg);
}
System.out.println("Shutting down Thread: " + threadNumber);
}

然后用producer发送100消息:

1
java -cp kafka-0.1.0-SNAPSHOT.jar com.colobu.kafka.ProducerExample 100 colobu

可以看到consumer5分钟后才处理这100个数据。

看起来方案可行。

case 8

注意offset是自动commit的。 在上面的例子中,如果sleep的过程中consumer重启,是否会有message丢失或者重复处理呢?
Kafka默认往zookeeper上写offset的频率是10秒。

查看Kafka的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.consumer
import kafka.utils.{IteratorTemplate, Logging, Utils}
import java.util.concurrent.{TimeUnit, BlockingQueue}
import kafka.serializer.Decoder
import java.util.concurrent.atomic.AtomicReference
import kafka.message.{MessageAndOffset, MessageAndMetadata}
import kafka.common.{KafkaException, MessageSizeTooLargeException}
/**
* An iterator that blocks until a value can be read from the supplied queue.
* The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
*
*/
class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk],
consumerTimeoutMs: Int,
private val keyDecoder: Decoder[K],
private val valueDecoder: Decoder[V],
val clientId: String)
extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
private var currentTopicInfo: PartitionTopicInfo = null
private var consumedOffset: Long = -1L
private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
override def next(): MessageAndMetadata[K, V] = {
val item = super.next()
if(consumedOffset < 0)
throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
currentTopicInfo.resetConsumeOffset(consumedOffset)
val topic = currentTopicInfo.topic
trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
item
}
......
}

只有在next获取下一个消息的情况下才会更新consumedOffset,这样的话最多只有一个消息被读取还没来得及处理就因为程序crash而丢掉了。 在我们可以接受的范围之内。

其它

InfoQ网站有几篇Kafka文章挺好的,如

另外本站也有几篇我的关于Kafka的文章可供参考。
Tag: Kafka