目录 [−]
本文以单机的环境演示如何将Kafka和Spring集成。
单机的环境最容易搭建, 并且只需在自己的PC上运行即可, 不需要很多的硬件环境,便于学习。 况且,本文的目的不是搭建ZooKeeper的集群环境, 而是重点介绍Kafka和Spring的应用。
具体的软件环境如下:
- OS: CentOS 6.4
- Zookepper: zookeeper-3.4.6
- Kafka: kafka_2.9.1-0.8.2-beta
- Java: JDK 1.7.0_45-b18
- Spring:4.0.6
本例子在我的这个环境中运行正常, 全部代码可以到 github 下载。
本文所有的操作系统用户都是root
。 实际产品中可能安全标准需要特定的用户如zookeeper
, kafka
等。
安装Zookeeper
首先下载解压zookeeper,选择合适的镜像站点以加快下载速度。
我们可以将zookeeper加到系统服务中, 增加一个/etc/init.d/zookeeper
文件。
|
|
将https://raw.githubusercontent.com/apache/zookeeper/trunk/src/packages/rpm/init.d/zookeeper文件的内容拷贝到这个文件,修改其中的运行zookeeper的用户以及zookeeper的文件夹位置。
|
|
|
|
如果你不想加到服务,也可以直接运行zookeeper。
|
|
安装Kafka
从合适的镜像站点下载最新的kafka并解压。
|
|
启动Kafka:
|
|
创建一个test
的topic:
|
|
可以利用kafka的命令启动一个生产者和消费者试验一下:
|
|
|
|
更多的介绍可以查看我翻译整理的 Kafka快速入门
创建一个Spring项目
以上的准备环境完成,让我们开始创建一个项目。
以前我写过一篇简单介绍: Spring 集成 Kafka.
spring-integration-kafka这个官方框架我就不介绍了。 我们主要使用它做集成。
首先我们先看一下使用Kafka自己的Producer/Consumer API发送/接收消息的例子。
使用Producer API发送消息到Kafka
OK,现在我们先看一个使用Kafka 自己的producer API发送消息的例子:
|
|
这个例子中首先初始化Producer对象,指定相应的broker和serializer, 然后发送100个字符串消息给Kafka。
运行mvn package
编译代码,执行查看结果:
|
|
上面的消费者控制台窗口会打印出收到的消息:
|
|
使用Kafka High Level API接收消息
用High level Consumer API接收消息,
|
|
在生产者控制台输入几条消息,可以看到运行这个例子的控制台可以将这些消息打印出来。
教程的代码中还包括一个使用Simple Consumer API接收消息的例子。 因为spring-integration-kafka不支持这种API,这里也不列出对比代码了。
使用spring-integration-kafka发送消息
Outbound Channel Adapter用来发送消息到Kafka。 消息从Spring Integration Channel中读取。 你可以在Spring application context指定这个channel。
一旦配置好这个Channel,就可以利用这个Channel往Kafka发消息。 明显地,Spring Integration特定的消息发送给这个Adaptor,然后发送前在内部被转为Kafka消息。当前的版本要求你必须指定消息key和topic作为头部数据 (header),消息作为有载荷(payload)。
例如
|
|
实际代码如下:
|
|
Spring 配置文件:
|
|
int:channel
是配置Spring Integration Channel, 此channel基于queue。int-kafka:outbound-channel-adapter
是outbound-channel-adapter对象, 内部使用一个线程池处理消息。关键是kafka-producer-context-ref
。int-kafka:producer-context
配置producer列表,要处理的topic,这些Producer最终要转换成Kafka的Producer。
producer的配置参数如下:
|
|
value-encoder 和key-encoder可以是其它实现了Kafka Encoder接口的Bean。同样partitioner也是实现了Kafka的Partitioner接口的Bean。
一个Encoder的例子:
|
|
Spring Integration Kafka 也提供了个基于Avro的Encoder。 Avro也是Apache的一个项目, 在大数据处理时也是一个常用的序列化框架。
不指定Encoder将使用Kafka缺省的Encoder (kafka.serializer.DefaultEncoder, byte[] --> same byte[])。
producerProperties
可以用来设置配置属性进行调优。配置属性列表请参考 http://kafka.apache.org/documentation.html#producerconfigs
使用spring-integration-kafka接收消息
同样的原理实现一个消费者:
|
|
Spring的配置文件如下:
|
|
这个配置和Producer类似, 同样声明一个channel, 定义inbound-channel-adapter
, 它引用Bean kafka-consumer-context
,kafka-consumer-context
定义了消费者的列表。 consumer-configuration
还提供了topic-filter
,使用正则表达式建立白名单或者黑名单(exclude属性)。
消费者上下文还需要zookeeper-connect
。
由于spring-integration-kafka只实现了high level Consumer API,这也就意味着你不可能回滚重新查看以前的消息, 因为high level API不提供offset管理。
注意Channel中得到的有效负载的类型是:Map<String, Map<Integer, List<Object>>>
,
这个Map的key是topic, 值还是另外的Map对象。
这个值Map的key值是分区号,value值是消息列表。 在本例中由于消息是字符串, 转换成了byte[]数组。
这种复杂的结构是由于Kafka的设计造成的。 Kafka保证对于同一个topic的同一个分区的消息是严格有序的。所有这种数据结构可以提供有序的消息。