Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。
Apache Kafka与传统消息系统相比,有以下不同:
它被设计为一个分布式系统,易于向外扩展; 它同时为发布和订阅提供高吞吐量; 它支持多订阅者,当失败时能自动平衡消费者; 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。 我们将通过几个case学习Kafka的功能。
这几个case是我在工作中解决一个问题而对Kafka的功能进行的调研。另外还做了性能的测试,在本文中没有提到。
分区case1: 同一个group的consumer并发处理消息 case2: 关闭一个consumer,消息会继续由另一个consumer处理 case3: 恢复consumer,会balance回来  offsetcase4: auto.commit = true case5: offset out of range case6: auto.commit = false  delaycase7: delay功能 case8: 容错,是否会丢失数据  启动Kafka boker,创建Topic 启动kafka:
1
bin/kafka-server-start.sh config/server.properties
查看topic list:
1
bin/kafka-topics.sh --list --zookeeper localhost:2181 
查看特定的topic
1
bin/kafka-topics.sh --describe --zookeeper localhost:2181  --topic colobu
创建topic:
1
bin/kafka-topics.sh --create --zookeeper localhost:2181  --replication-factor 3  --partitions 1  --topic my-replicated-topic
High-level consumer实现 Producer实现 case 1 启动consumer 1:
1
java -jar kafka-0.1 .0 -SNAPSHOT.jar localhost:2181  grouo1 colobu 2  > 1 .txt
启动consumer 2:
1
java -jar kafka-0.1 .0 -SNAPSHOT.jar localhost:2181  group1 colobu 8  > 2 .txt
发送 10000 个消息:
1
java -cp kafka-0.1 .0 -SNAPSHOT.jar com.colobu.kafka.ProducerExample 10000  colobu
查看每个consumer处理的messsage数量:
1
2
3
4
5
cat 1 .txt|wc -l 
2024 
cat 2 .txt|wc -l 
7976 
总数为 2024+7976=10000, 没有message丢失, 处理比大致为2024:7976 = 1:4,和我们的预想是一样的。
case 2 & case 3: 启动consumer 1:
1
java -jar kafka-0.1 .0 -SNAPSHOT.jar localhost:2181  grouo1 colobu 5  > 1 .txt
启动consumer 2:
1
java -jar kafka-0.1 .0 -SNAPSHOT.jar localhost:2181  group1 colobu 5  > 2 .txt
发送 1000000 个消息:
1
java -cp kafka-0.1 .0 -SNAPSHOT.jar com.colobu.kafka.ProducerExample 500000  colobu
CTRL + C停掉 consumer1, 可以看到 consumer2 继续工作, consumer1停止了工作:
1
2
3
tail -f  1 .txt
tail -f  2 .txt
重新启动 consumer1,将处理:
1
java -jar kafka-0.1 .0 -SNAPSHOT.jar localhost:2181  group1 colobu 5  >> 1 .txt
可以看到 consumer1 和 consumer2 继续工作:
等处理完我们检查一下consumer1和consumer2处理的消息总数,看是否有消息丢失:
看起来我们处理的消息似乎多了一些,相信这是在consumer1重启的时候consumer2的offset还没来得及commit。
case 4 offset的值保存在zookeeper的如下节点中:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
可以看到不同的topic不同的分区有着自己的offset, 并且不同组分别拥有自己独立的offset。
这里有一个简单的工具可以监控offset: KafkaOffsetMonitor ,
1
2
3
4
5
6
java -cp KafkaOffsetMonitor-assembly-0.2 .1 .jar \
     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
     --zk zk-server1,zk-server2 \
     --port 8080  \
     --refresh 10 .seconds \
     --retain 2 .days
上面我们已经做了实验, 所以在zookeeper应该有offset的值,你可以用zkCli.sh或者上面的命令查询一下:
1
2
get /consumers/group1/offsets/colobu/3 
39919 
因为auto.commit.enable默认为true, consumer会定时地将offset写入到zookeeper上(时间间隔由auto.commit.interval.ms决定,默认1分钟).
case 5 根据Kafka的文档, 参数auto.offset.reset的功能如下所示:
what to do if an offset is out of range.
如果offset is out of range, 这个参数决定了comsumer的行为:
smallest, 将offset设为当前所能用的最小的offset。 注意不一定是0。largest, 将offset设为当前可用的最大的offset。也就是consumer将只处理最新写入的消息。 默认值。
我们可以将zookeeper的 /consumers节点删除::
1
[zk: localhost:2181 (CONNECTED) 15 ] rmr /consumers
然后将的代码中加入一行
1
props.put("auto.offset.reset" , "smallest" );
或者
.. 1
2
props.put ("auto.offset.reset" , "largest" );
如果设置为largest,可以看到consumer不会处理以前发送的消息,只会处理新进的消息。smallest,consumer从头开始所有的消息。
case 6 默认情况下auto.commit.enable等于true,这也就意味着consumer会定期的commit offset。 前面也介绍了, zookeeper中节点中记录这这些offset。
首先我们先将zookeeper中把/consumers节点删除掉。ConsumerExample.java中增加下面一行:
1
props.put("auto.commit.enable" , "false" );
然后启动consumer:
1
java -jar kafka-0.1 .0 -SNAPSHOT.jar localhost:2181  group1 colobu 10 
过几分钟检查zookeeper的/consumers/group1节点,发现并没有offsets节点。
修改ConsumerTest.java的run方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public  void  run () {
		ConsumerIterator<byte [], byte []> it = stream.iterator();
		int  count = 0 ;
		while  (it.hasNext()) {
			
			System.out.println(System.currentTimeMillis() + ",Thread "  + threadNumber + ": "  + new  String(it.next().message()));
			count++;
			
			if  (count == 100 ) {
				consumer.commitOffsets();
				count = 0 ;
			}
		}
		
		System.out.println("Shutting down Thread: "  + threadNumber);
		
	}
当处理了100个消息后就commit offset一次。
1
java -jar kafka-0.1 .0 -SNAPSHOT.jar localhost:2181  group1 colobu 1 
先用producer发送99个消息,
1
java -cp kafka-0.1 .0 -SNAPSHOT.jar com.colobu.kafka.ProducerExample 99  colobu
查看zookeeper的节点/consumers/group1下依然没有offsets节点。
1
java -cp kafka-0.1 .0 -SNAPSHOT.jar com.colobu.kafka.ProducerExample 1  colobu
可以看到zookeeper的节点/consumers/group1下增加了offsets节点。
素以如果你有必要,你也可以手工控制offset的commit的时机。
case 7 这是我现在想利用Kafka实现的一个分布式DelayQueue的功能。跟着实例学习ZooKeeper的用法: 讲到,利用zookeeper可以实现Distributed Delay Queue ,但是Curator的作者也讲了 , zookeeper真心不适合做queue,尤其在数据量很大的情况。
可以利用redis的sorted set实现: Delay queue in redis ,
这里我利用Kafka实现这样的一个功能。Delayed Queue 
Each produced message should have a timestamp at which it was pushed to the queue. At the
我们将ConsumerTest的run方法修改如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public  void  run () {
	ConsumerIterator<byte [], byte []> it = stream.iterator();
	
	while  (it.hasNext()) {
		
		String msg = new  String(it.next().message());
		long  t = System.currentTimeMillis() - Long.parseLong(msg.substring(0 , msg.indexOf("," )));
		if  (t < 5  * 60  *1000 ) {
			try  {
				Thread.currentThread().sleep(5  * 60  *1000  -t);
			} catch  (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println(System.currentTimeMillis() + ",Thread "  + threadNumber + ": "  + msg);
	}
	
	System.out.println("Shutting down Thread: "  + threadNumber);
	
}
然后用producer发送100消息:
1
java -cp kafka-0.1 .0 -SNAPSHOT.jar com.colobu.kafka.ProducerExample 100  colobu
可以看到consumer5分钟后才处理这100个数据。
看起来方案可行。
case 8 注意offset是自动commit的。 在上面的例子中,如果sleep的过程中consumer重启,是否会有message丢失或者重复处理呢?
查看Kafka的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/** 
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package  kafka.consumer
import  kafka.utils.{IteratorTemplate, Logging, Utils}
import  java.util.concurrent.{TimeUnit, BlockingQueue}
import  kafka.serializer.Decoder
import  java.util.concurrent.atomic.AtomicReference
import  kafka.message.{MessageAndOffset, MessageAndMetadata}
import  kafka.common.{KafkaException, MessageSizeTooLargeException}
/** 
 * An iterator that blocks until a value can be read from the supplied queue.
 * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
 *
 */
class  ConsumerIterator [K , V ](private val channel: BlockingQueue[FetchedDataChunk], 
                             consumerTimeoutMs: Int,
                             private val keyDecoder: Decoder[K],
                             private val valueDecoder: Decoder[V],
                             val clientId: String)
  extends  IteratorTemplate[MessageAndMetadata[K, V]] with  Logging {
  private  var  current: AtomicReference[Iterator[MessageAndOffset]] = new  AtomicReference(null )
  private  var  currentTopicInfo: PartitionTopicInfo = null 
  private  var  consumedOffset: Long = -1 L
  private  val  consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
  override  def  next(): MessageAndMetadata[K, V] = {
    val  item = super .next()
    if (consumedOffset < 0 )
      throw  new  KafkaException("Offset returned by the message set is invalid %d" .format(consumedOffset))
    currentTopicInfo.resetConsumeOffset(consumedOffset)
    val  topic = currentTopicInfo.topic
    trace("Setting %s consumed offset to %d" .format(topic, consumedOffset))
    consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
    consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
    item
  }
  ......
}
只有在next获取下一个消息的情况下才会更新consumedOffset,这样的话最多只有一个消息被读取还没来得及处理就因为程序crash而丢掉了。 在我们可以接受的范围之内。
其它 InfoQ网站有几篇Kafka文章挺好的,如
另外本站也有几篇我的关于Kafka的文章可供参考。Tag: Kafka