Spark Streaming 集成 Kafka 总结

目录 [−]

  1. 一个简单例子
  2. 核心概念
  3. 从Kafka并行读取
  4. Spark并行处理
  5. 注意事项
  6. 容错
    1. worker节点失败
    2. driver节点失败
  7. 参考


最近在做利用Spark streaming和Kafka进行数据分析的研究, 整理一些相应的开发文档, 做了一些代码实践。 本文特意将这些资料记录下来。

本文最后列出了一些参考的文档,实际调研中参考了很多的资料,并没有完全将它们记录下来, 只列出了主要的一些参考资料。
当前的版本:

  • Spark: 1.2.0
  • Kafka: 0.8.1.1

Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。 有以下特点:

  • 易于使用
    提供了和批处理一致的高级操作API,可以进行map, reduce, join, window。

  • 容错
    Spark Streaming可以恢复你计算的状态, 包括lost work和operator state (比如 sliding windows)。 支持worker节点和driver 节点恢复。

  • Spark集成
    可以结合批处理流和交互式查询。 可以重用批处理的代码。还可以直接使用内置的机器学习算法、图算法包来处理数据。
    它可以接受来自文件系统, Akka actors, rsKafka, Flume, Twitter, ZeroMQ和TCP Socket的数据源或者你自己定义的输入源。

它的工作流程像下面的图所示一样,接受到实时数据后,给数据分批次,然后传给Spark Engine处理最后生成该批次的结果流。

Spark Streaming提供了一个高级的抽象模型,叫做discretized stream或者叫做DStream,它代表了一个持续的数据流。DStream既可以从Kafka, Flume, 和 Kinesis中产生, 或者在其它DStream上应用高级操作得到。 内部实现上一个DStream代表一个RDD序列。

一个简单例子

在我们开始进入编写我们自己的Spark Streaming程序细节之前, 让我们先快速的看一个简单的Sparking Streaming程序是什么样子的。 这个程序接收网络发过来的文本数据,让我们统计一下文本中单词的数量。 全部代码如下:

首先, 我们导入Spark Streaming类名以及StreamingContext的一些隐式转换到我们的环境中, 这样可以为我们需要的类(比如DStream)增加一些有用的方法。. StreamingContext是所有功能的主入口。 我们创建了一个本地StreamingContext, 它使用两个线程, 批处理间隔为1秒.
1
2
3
4
5
6
7
8
9
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
使用这个context, 我们可以创建一个DStream, 代表来自TCP源的流数据。需要指定主机名和端口(如 localhost 和 9999).
1
2
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
这一行代表从数据服务器接受到的数据流. DStream中每条记录是一行文本. 接下来, 我们想使用空格分隔每一行,这样就可以得到文本中的单词。
1
2
// Split each line into words
val words = lines.flatMap(_.split(" "))
flatMap 是一个一对多的DStream操作, 它从源DStream中的每一个Record产生多个Record, 这些新产生的Record组成了一个新的DStream。 在我们的例子中, 每一行文本被分成了多个单词, 结果得到单词流DStream. 下一步, 我们想统计以下单词的数量.
1
2
3
4
5
6
7
import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
单词DStream 被mapped (one-to-one transformation) 成*(word, 1)对*的DStream ,然后reduced 得到每一批单词的频度. 最后, wordCounts.print()会打印出每一秒产生的一些单词的统计值。 注意当这些行执行时,Spark Streaming仅仅设置这些计算, 它并没有马上被执行。 当所有的计算设置完后,我们可以调用下面的代码启动处理
1
2
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
完整的代码可以在例子 [NetworkWordCount](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala) 中找到. 如果你已经下载并编译了Spark, 你可以按照下面的命令运行例子. 你要先运行Netcat工具作为数据服务器
1
$ nc -lk 9999
然后, 在另一个终端中, 你可以启动例子
1
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
然后, 在netcat服务器中输入的每一行都会被统计,然后统计结果被输出到屏幕上。 类似下面的输出
1
2
3
4
5
6
7
8
9
10
# TERMINAL 2: RUNNING NetworkWordCount
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
首先我们创建一个JavaStreamingContext对象, 它是处理流的功能的主入口. 我们创建了一个本地的StreamingContext, 使用两个线程, 批处理间隔为1秒.
1
2
3
4
5
6
7
8
9
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1))
使用这个context, 我们可以创建一个DStream, 代表来自TCP源的流数据。需要指定主机名和端口(如 localhost 和 9999).
1
2
// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
这一行代表从数据服务器接受到的数据流. DStream中每条记录是一行文本. 接下来, 我们想使用空格分隔每一行,这样就可以得到文本中的单词。
1
2
3
4
5
6
7
// Split each line into words
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<String, String>() {
@Override public Iterable<String> call(String x) {
return Arrays.asList(x.split(" "));
}
});
flatMap 是一个一对多的DStream操作, 它从源DStream中的每一个Record产生多个Record, 这些新产生的Record组成了一个新的DStream。 在我们的例子中, 每一行文本被分成了多个单词, 结果得到单词流DStream. 下一步, 我们想统计以下单词的数量.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.map(
new PairFunction<String, String, Integer>() {
@Override public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();
单词DStream 被mapped (one-to-one transformation) 成*(word, 1)对*的DStream ,然后reduced 得到每一批单词的频度. 最后, wordCounts.print()会打印出每一秒产生的一些单词的统计值。 注意当这些行执行时,Spark Streaming仅仅设置这些计算, 它并没有马上被执行。 当所有的计算设置完后,我们可以调用下面的代码启动处理
1
2
jssc.start(); // Start the computation
jssc.awaitTermination(); // Wait for the computation to terminate
完整的的代码看例子 [JavaNetworkWordCount](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java). 如果你已经下载并编译了Spark, 你可以按照下面的命令运行例子. 你要先运行Netcat工具作为数据服务器
1
$ nc -lk 9999
然后, 在另一个终端中, 你可以启动例子
1
$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
然后, 在netcat服务器中输入的每一行都会被统计,然后统计结果被输出到屏幕上。 类似下面的输出
1
2
3
4
5
6
7
8
9
10
# TERMINAL 2: RUNNING JavaNetworkWordCount
$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...

核心概念

本节介绍一些Spark和Kafka的概念
Spark cluster:
一个Spark集群至少包含一个worker节点。

worker node:
一个工作节点可以执行一个或者多个executor.

executor:
executor就是一个进程, 负责启在一个worker节点上启动应用,运行task执行计算,存储数据到内存或者磁盘上。 每个Spark应用都有自己的executor。一个executor拥有一定数量的cores, 也被叫做“slots”, 可以执行指派给它的task。

job:
一个并行的计算单元,包含多个task。 在执行Spark action (比如 save, collect)产生; 在log中可以看到这个词。

task:
一个task就是一个工作单元, 可以发送给一个executor执行。 它执行你的应用的实际计算的部分工作。 每个task占用父executor的一个slot (core)。

stage:
每个job都被分隔成多个彼此依赖称之为stage的task(类似MapReduce中的map 和 reduce stage);

共享变量: 普通可序列化的变量复制到远程各个节点。在远程节点上的更新并不会返回到原始节点。因为我们需要共享变量。 Spark提供了两种类型的共享变量。

* Broadcast 变量。  SparkContext.broadcast(v)通过创建, **只读*** Accumulator: 累加器,通过SparkContext.accumulator(v)创建,在任务中只能调用add或者+操作,不能读取值。只有驱动程序才可以读取值。

receiver:
receiver长时间(可能7*24小时)运行在executor。 每个receiver负责一个 input DStream (例如 一个 读取Kafka消息的input stream)。 每个receiver, 加上input DStream会占用一个core/slot.

input DStream:
一个input DStream是一个特殊的DStream, 将Spark Streaming连接到一个外部数据源来读取数据。

kafka topic:
topic是发布消息发布的category 或者 feed名. 对于每个topic, Kafka管理一个分区的log,如下图所示:

分区内的消息都是有序不可变的。

kafka partition:
partitions的设计目的有多个.最根本原因是kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存;可以将一个topic切分多任意多个partitions(备注:基于sharding),来消息保存/消费的效率.此外越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力.

kafka consumer group:
在kafka中,每个消费者要标记自己在那个组中。
如果所有的消费者都在同一个组中,则类似传统的queue消息模式,消息只发给一个消费者。
如果消费者都在不同的组中, 则类似发布-订阅消息模式。 每个消费者都会得到所有的消息。
最通用的模式是混用这两种模式,如下图:

关于kafka和消费者线程, 遵循下面的约束:
如果你的消费者读取包含10个分区的 test的topic,

  • 如果你配置你的消费者只使用1个线程, 则它负责读取十个分区
  • 如果你配置你的消费者只使用5个线程, 则每个线程负责读取2个分区
  • 如果你配置你的消费者只使用10个线程, 则每个线程负责读取1个分区
  • 如果你配置你的消费者只使用14个线程, 则10个线程各负责读取1个分区,4个空闲
  • 如果你配置你的消费者只使用8个线程, 则6个线程个负责一个分区,2个线程各负责2个分区

从Kafka并行读取

有几种方法可以并行的读取Kafka的消息。

Spark的KafkaInputDStream (也叫做Kafka “connector”)使用 Kafka high-level consumer API读取数据,所以有两种方式可以并行的读取数据。

  • 多个input DStream: Spark为每个input dstream运行一个receiver. 这意味着多个input dstream可以运行在多个core上并行读取。 如果它们使用相同的topic,则相当于一个load balancer, 一个时间点上只有一个receiver读取。 如果不同的topic,可以同时读取。
1
2
3
4
5
val ssc: StreamingContext = ??? // ignore for now
val kafkaParams: Map[String, String] = Map("group.id" -> "test", /* ignore rest */)
val numInputDStreams = 5
val kafkaDStreams = (1 to numInputDStreams).map { _ => KafkaUtils.createStream(...) }
  • 每个input dstream的消费者线程数。 同一个receiver可以运行多个线程。 可以配置和分区相同的线程。
1
2
3
4
5
6
val ssc: StreamingContext = ??? // ignore for now
val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
val consumerThreadsPerInputDstream = 3
val topics = Map("test" -> consumerThreadsPerInputDstream)
val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, ...)

或者你还可以混合这两种情况:

1
2
3
4
5
6
7
8
val ssc: StreamingContext = ???
val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
val numDStreams = 5
val topics = Map("zerg.hydra" -> 1)
val kafkaDStreams = (1 to numDStreams).map { _ =>
KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
}

Spark并行处理

上面介绍了Kafka的并行化读取的控制,在Spark中我们可以进行并行化处理。类似Kafka,Spark将parallelism设置的与(RDD)分区数量有关, 通过在每个RDD分区上运行task进行。在有些文档中,分区仍然被称为“slices”。
同样两个控制手段:

  • input DStreams的数量
  • DStream transformation的重分配(repartition): 这里将获得一个全新的DStream,其parallelism等级可能增加、减少,或者保持原样。在DStream中每个返回的RDD都有指定的N个分区。DStream由一系列的RDD组成,DStream.repartition则是通过RDD.repartition实现。
    因此,repartition是从processing parallelism分隔read parallelism的主要途径。在这里,我们可以设置processing tasks的数量,也就是说设置处理过程中所有core的数量。间接上,我们同样设置了投入machines/NICs的数量。

一个DStream转换相关是 union。这个方法同样在StreamingContext中,它将从多个DStream中返回一个统一的DStream,它将拥有相同的类型和滑动时间。union会将多个 DStreams压缩到一个 DStreams或者RDD中,但是需要注意的是,这里的parallelism并不会发生改变。

你的用例将决定你如何分区。如果你的用例是CPU密集型的,你希望对test topic进行5 read parallelism读取。也就是说,每个消费者进程使用5个receiver,但是却可以将processing parallelism提升到20。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val ssc: StreamingContext = ???
val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
val readParallelism = 5
val topics = Map("test" -> 1)
val kafkaDStreams = (1 to readParallelism).map { _ =>
KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
}
//> collection of five *input* DStreams = handled by five receivers/tasks
val unionDStream = ssc.union(kafkaDStreams) // often unnecessary, just showcasing how to do it
//> single DStream
val processingParallelism = 20
val processingDStream = unionDStream(processingParallelism)
//> single DStream but now with 20 partitions

注意事项

在对Kafka进行读写上仍然存在一些含糊不清的问题,你可以在类似 Multiple Kafka Receivers and UnionHow to scale more consumer to Kafka stream mailing list的讨论中发现。

  • Spark 1.1并不会恢复那些已经接收却没有进行处理的原始数据(查看)。因此,在某些情况下,你的Spark可能会丢失数据。Tathagata Das指出驱动恢复问题会在Spark的1.2版本中解决,现在已经提供Reliable Receiver 和Unreliable Receiver两种Receiver。

  • 1.1版本中的Kafka连接器是基于Kafka的高级消费者API。这样就会造成一个问题,Spark Streaming不可以依赖其自身的KafkaInputDStream将数据从Kafka中重新发送,从而无法解决下游数据丢失问题(比如Spark服务器发生故障)。
    Dibyendu Bhattacharya 实现了使用简单消费者API: kafka-spark-consumer.

  • 使用最新的Spark和Kafka,一些bugs已经在最新的Spark和Kafka中修复。

  • 在使用window操作时,window duration和sliding duration必须是DStream批处理的duration的整数倍。

  • 如果分配给应用的core的数量小于或者等于input DStream/receiver数量,则系统只接收数据, 没有额外的core处理数据

  • 接上一条, 你在本地进行测试的时候,如果将master URL设置为“local”的话,则只有一个core运行任务,这明显违反上一条, 只能接收数据,无法处理。

  • Kafak Topic 的分区和 Spark RDD的分区没有任何关系。 它俩是分别设置的。

容错

有两种情况的机器失败。

worker节点失败

receiver接收到的消息在集群间有备份。如果只是一个节点失败, Spark可以恢复。
但是如果是receiver所在的那个节点失败,可能会有一点点数据丢失。 但是Receiver可以在其它节点上恢复启动,继续接收数据。

driver节点失败

如果7*24工作的应用, 如果driver节点失败,Spark Streaming也可以恢复。 Spark streaming定期的把元数据写到HDFS中。 你需要设置checkpoint 文件夹。
为了支持恢复,必须遵循下面的处理:

  1. 当应用首次启动时, 它会创建一个新的StreamingContext, 设置所有的流,然后启动start().
  2. 当应用因失败而恢复时, 它会从checkpoint文件中的checkpoint重建StreamingContext.

就像这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Create a factory object that can create a and setup a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
@Override public JavaStreamingContext create() {
JavaStreamingContext jssc = new JavaStreamingContext(...); // new context
JavaDStream<String> lines = jssc.socketTextStream(...); // create DStreams
...
jssc.checkpoint(checkpointDirectory); // set checkpoint directory
return jssc;
}
};
// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start();
context.awaitTermination();

参考

  1. http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/
  2. https://spark.apache.org/docs/1.2.0/streaming-kafka-integration.html
  3. https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html