key为null时Kafka会将消息发送给哪个分区?

当你编写kafka Producer时, 会生成KeyedMessage对象。

1
KeyedMessage<K, V> keyedMessage = new KeyedMessage<>(topicName, key, message)

这里的key值可以为空,在这种情况下, kafka会将这个消息发送到哪个分区上呢?依据Kafka官方的文档, 默认的分区类会随机挑选一个分区:

The third property "partitioner.class" defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.

但是这句话相当的误导人。

阅读全文

如何为ZooKeeper, Kafka 和 Spark 应用编写单元测试

ZooKeeper, Kafka和Spark是当下流行的大数据平台工具之一。这两年得到飞速的发展,国内厂商也越来越多的使用它们。
本站有多篇文章介绍了它们的开发指南, 如:

官方网站提供了很多的代码例子,互联网上也有很多的开发例子,你可以很容易的学习如果编写基于这些平台框架的技术。
但是如何为这些应用编写单元测试呢? 本文提供了几种编写单元测试的技术。

阅读全文

基于ALS算法的简易在线推荐系统

根据作者的后一篇文章的介绍,这应该是他在Intel做实习生的时候两个半月做个一个实习项目: 《Spark上流式机器学习算法实现”终期检查报告 》
原文地址: 基于ALS算法的简易在线推荐系统

继前期完成广义线性模型的在线流式机器学习的代码后,我们对spark的mllib中的推荐系统这一部分比较感兴趣,因为推荐系统这一部分在现实生活中也非常实用,尤其是基于地理位置的在线推荐系统目前非常火热,很多商业软件如大众点评,淘点点等都希望能根据用户以往的一些行为和当前所处的地理位置给用户做出最佳的推荐,给用户带来意想不到的惊喜。

在推荐系统领域,目前市面上中文的参考书并不多,我们主要学习了目前就职于hulu公司的项亮编著的《推荐系统实战》这本书,这本书详细的介绍了推荐系统方面的一些典型算法和评估方法,并且结合作者的实际经验给出了很多推荐系统的相关实例,是学习推荐系统不可多得的一本好书,我们也受益匪浅。

在spark的例程中作者是根据movielens数据库(采用spark自带的小型movielens数据库在spark的data/mllib/sample_movielens_data.txt中)通过ALS(alternating leastsquares)算法来做的推荐系统。参考链接 http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html

最初我们也是用上面所说的1500个的数据集进行在线ALS算法的有效性,效果还不错,后面我们采用中等规模的movielens数据集进行测试,取得比较好的效果,具体过程记录如下。数据集的链接如下http://grouplens.org/datasets/movielens/.

这种ALS算法不像基于用户或者基于物品的协同过滤算法一样,通过计算相似度来进行评分预测和推荐,而是通过矩阵分解的方法来进行预测用户对电影的评分。即如下图所示。

阅读全文

大数据计算新贵Spark在国内知名厂商中的应用汇总

MapReduce由于其设计上的约束只适合处理离线计算,在实时查询和迭代计算上仍有较大的不足,而随着业务的发展,业界对实时查询和迭代分析有更多的需求,单纯依靠MapReduce框架已经不能满足业务的需求了。Spark由于其可伸缩、基于内存计算等特点,且可以直接读写Hadoop上任何格式的数据,成为满足业务需求的最佳候选者。

Spark作为Apache顶级的开源项目,项目主页见http://spark.apache.org。在迭代计算,交互式查询计算以及批量流计算方面都有相关的子项目,如Shark、Spark Streaming、MLbase、GraphX、SparkR等。从13年起Spark开始举行了自已的Spark Summit会议,会议网址见http://spark-summit.org。Amplab实验室单独成立了独立公司Databricks来支持Spark的研发。

为了满足挖掘分析与交互式实时查询的计算需求,腾讯大数据使用了Spark平台来支持挖掘分析类计算、交互式实时查询计算以及允许误差范围的快速查询计算,目前腾讯大数据拥有超过200台的Spark集群,并独立维护Spark和Shark分支。Spark集群已稳定运行2年,他们积累了大量的案例和运营经验能力,另外多个业务的大数据查询与分析应用,已在陆续上线并稳定运行。在SQL查询性能方面普遍比MapReduce高出2倍以上,利用内存计算和内存表的特性,性能至少在10倍以上。在迭代计算与挖掘分析方面,精准推荐将小时和天级别的模型训练转变为Spark的分钟级别的训练,同时简洁的编程接口使得算法实现比MR在时间成本和代码量上高出许多。

阅读全文

Spark Streaming 集成 Kafka 总结


最近在做利用Spark streaming和Kafka进行数据分析的研究, 整理一些相应的开发文档, 做了一些代码实践。 本文特意将这些资料记录下来。

本文最后列出了一些参考的文档,实际调研中参考了很多的资料,并没有完全将它们记录下来, 只列出了主要的一些参考资料。
当前的版本:

  • Spark: 1.2.0
  • Kafka: 0.8.1.1

Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。 有以下特点:

  • 易于使用
    提供了和批处理一致的高级操作API,可以进行map, reduce, join, window。

  • 容错
    Spark Streaming可以恢复你计算的状态, 包括lost work和operator state (比如 sliding windows)。 支持worker节点和driver 节点恢复。

  • Spark集成
    可以结合批处理流和交互式查询。 可以重用批处理的代码。还可以直接使用内置的机器学习算法、图算法包来处理数据。
    它可以接受来自文件系统, Akka actors, rsKafka, Flume, Twitter, ZeroMQ和TCP Socket的数据源或者你自己定义的输入源。

    阅读全文

跟着实例学习ZooKeeper的用法: 文章汇总


跟着实例学习ZooKeeper的用法提供了全面的例子, 演示了Curator在项目中的应用,全面的介绍了ZooKeeper的Recipe的实现, 以及CuratorFramework的基本用法。
都是独立的, 你可以挑选感兴趣的章节阅读,方便在需要的时候查找。
欢迎在评论中提供意见和建议以及内容的错误,我会及时的更新。
以下是各章节的汇总。

所有的代码都可以在github下载。

leader选举

Leader latch

Leader Election

分布式锁

可重入锁Shared Reentrant Lock

不可重入锁Shared Lock

可重入读写锁Shared Reentrant Read Write Lock

信号量Shared Semaphore

多锁对象 Multi Shared Lock

Barriers

栅栏Barrier

双栅栏Double Barrier

计数器Counters

Shared Counter

Distributed Atomic Long

缓存Caches

Path Cache

Node Cache

Tree Cache

临时节点

队列Queues

Distributed Queue

Distributed Id Queue

Distributed Priority Queue

Distributed Delay Queue

Simple Distributed Queue

Curator框架应用

框架介绍

操作方法

事务

Curator扩展库

跟着实例学习ZooKeeper的用法: Curator扩展库

还记得Curator提供哪几个组件吗? 我们不妨回顾一下:

  • Recipes
  • Framework
  • Utilities
  • Client
  • Errors
  • Extensions

前面的例子其实前五个组件都涉及到了, 比如Utilities例子的TestServer, Client里的CuratorZookeeperClient, Errors里的ConnectionStateListener等。 还有最后一个组件我们还没有介绍,那就是Curator扩展组件。

Recipes组件包含了丰富的Curator应用的组件。 但是这些并不是ZooKeeper Recipe的全部。 大量的分布式应用已经抽象出了许许多多的的Recipe,其中有些还是可以通过Curator来实现。
如果不断都将这些Recipe都增加到Recipes中, Recipes会变得越来越大。 为了避免这种状况, Curator把一些其它的Recipe放在单独的包中, 命名方式就是curator-x-,比如curator-x-discovery, curator-x-rpc。
本文就是介绍curator-x-discovery。

阅读全文

跟着实例学习ZooKeeper的用法: 队列

Curator也提供ZK Recipe的分布式队列实现。 利用ZK的 PERSISTENTSEQUENTIAL节点, 可以保证放入到队列中的项目是按照顺序排队的。 如果单一的消费者从队列中取数据, 那么它是先入先出的,这也是队列的特点。 如果你严格要求顺序,你就的使用单一的消费者,可以使用leader选举只让leader作为唯一的消费者。

但是, 根据Netflix的Curator作者所说, ZooKeeper真心不适合做Queue,或者说ZK没有实现一个好的Queue,详细内容可以看 Tech Note 4, 原因有五:

  1. ZK有1MB 的传输限制。 实践中ZNode必须相对较小,而队列包含成千上万的消息,非常的大。
  2. 如果有很多节点,ZK启动时相当的慢。 而使用queue会导致好多ZNode. 你需要显著增大 initLimit 和 syncLimit.
  3. ZNode很大的时候很难清理。Netflix不得不创建了一个专门的程序做这事。
  4. 当很大量的包含成千上万的子节点的ZNode时, ZK的性能变得不好
  5. ZK的数据库完全放在内存中。 大量的Queue意味着会占用很多的内存空间。

尽管如此, Curator还是创建了各种Queue的实现。 如果Queue的数据量不太多,数据量不太大的情况下,酌情考虑,还是可以使用的。

阅读全文