ZooKeeper, Kafka和Spark是当下流行的大数据平台工具之一。这两年得到飞速的发展,国内厂商也越来越多的使用它们。
本站有多篇文章介绍了它们的开发指南, 如:
官方网站提供了很多的代码例子,互联网上也有很多的开发例子,你可以很容易的学习如果编写基于这些平台框架的技术。
但是如何为这些应用编写单元测试呢? 本文提供了几种编写单元测试的技术。
ZooKeeper, Kafka和Spark是当下流行的大数据平台工具之一。这两年得到飞速的发展,国内厂商也越来越多的使用它们。
本站有多篇文章介绍了它们的开发指南, 如:
官方网站提供了很多的代码例子,互联网上也有很多的开发例子,你可以很容易的学习如果编写基于这些平台框架的技术。
但是如何为这些应用编写单元测试呢? 本文提供了几种编写单元测试的技术。
跟着实例学习ZooKeeper的用法提供了全面的例子, 演示了Curator在项目中的应用,全面的介绍了ZooKeeper的Recipe的实现, 以及CuratorFramework的基本用法。
都是独立的, 你可以挑选感兴趣的章节阅读,方便在需要的时候查找。
欢迎在评论中提供意见和建议以及内容的错误,我会及时的更新。
以下是各章节的汇总。
所有的代码都可以在github下载。
还记得Curator提供哪几个组件吗? 我们不妨回顾一下:
前面的例子其实前五个组件都涉及到了, 比如Utilities例子的TestServer, Client里的CuratorZookeeperClient, Errors里的ConnectionStateListener等。 还有最后一个组件我们还没有介绍,那就是Curator扩展组件。
Recipes组件包含了丰富的Curator应用的组件。 但是这些并不是ZooKeeper Recipe的全部。 大量的分布式应用已经抽象出了许许多多的的Recipe,其中有些还是可以通过Curator来实现。
如果不断都将这些Recipe都增加到Recipes中, Recipes会变得越来越大。 为了避免这种状况, Curator把一些其它的Recipe放在单独的包中, 命名方式就是curator-x-
本文就是介绍curator-x-discovery。
前面的几篇文章介绍了一些ZooKeeper的应用方法, 本文将介绍Curator访问ZooKeeper的一些基本方法, 而不仅仅限于指定的Recipes, 你可以使用Curator API任意的访问ZooKeeper。
Curator也提供ZK Recipe的分布式队列实现。 利用ZK的 PERSISTENTSEQUENTIAL节点, 可以保证放入到队列中的项目是按照顺序排队的。 如果单一的消费者从队列中取数据, 那么它是先入先出的,这也是队列的特点。 如果你严格要求顺序,你就的使用单一的消费者,可以使用leader选举只让leader作为唯一的消费者。
但是, 根据Netflix的Curator作者所说, ZooKeeper真心不适合做Queue,或者说ZK没有实现一个好的Queue,详细内容可以看 Tech Note 4, 原因有五:
尽管如此, Curator还是创建了各种Queue的实现。 如果Queue的数据量不太多,数据量不太大的情况下,酌情考虑,还是可以使用的。
使用Curator也可以简化Ephemeral Node (临时节点)的操作。
临时节点驻存在ZooKeeper中,当连接和session断掉时被删除。
比如通过ZooKeeper发布服务,服务启动时将自己的信息注册为临时节点,当服务断掉时ZooKeeper将此临时节点删除,这样client就不会得到服务的信息了。
可以利用ZooKeeper在集群的各个节点之间缓存数据。 每个节点都可以得到最新的缓存的数据。 Curator提供了三种类型的缓存方式:Path Cache,Node Cache 和Tree Cache。
这一篇文章我们将学习使用Curator来实现计数器。 顾名思义,计数器是用来计数的, 利用ZooKeeper可以实现一个集群共享的计数器。 只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器, 一个是用int来计数,一个用long来计数。
分布式Barrier是这样一个类: 它会阻塞所有节点上的等待进程,知道某一个被满足, 然后所有的节点继续进行。
比如赛马比赛中, 等赛马陆续来到起跑线前。 一声令下,所有的赛马都飞奔而出。
DistributedBarrier
类实现了栅栏的功能。 它的构造函数如下:
|
|
首先你需要设置栅栏,它将阻塞在它上面等待的线程:
|
|
然后需要阻塞的线程调用方法等待放行条件:
|
|
当条件满足时,移除栅栏,所有等待的线程将继续执行:
|
|
异常处理
DistributedBarrier 会监控连接状态,当连接断掉时waitOnBarrier()
方法会抛出异常。
看一个例子:
|
|
这个例子创建了controlBarrier
来设置栅栏和移除栅栏。
我们创建了5个线程,在此Barrier上等待。
最后移除栅栏后所有的线程才继续执行。
如果你开始不设置栅栏,所有的线程就不会阻塞住。
双栅栏允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算, 当计算完成时,离开栅栏。
双栅栏类是DistributedDoubleBarrier
。
构造函数为:
|
|
memberQty
是成员数量,当enter
方法被调用时,成员被阻塞,直到所有的成员都调用了enter
。 当leave
方法被调用时,它也阻塞调用线程, 知道所有的成员都调用了leave
。
就像百米赛跑比赛, 发令枪响, 所有的运动员开始跑,等所有的运动员跑过终点线,比赛才结束。
DistributedBarrier 会监控连接状态,当连接断掉时enter()
和leave
方法会抛出异常。
例子代码:
|
|
分布式Barrier是这样一个类: 它会阻塞所有节点上的等待进程,知道某一个被满足, 然后所有的节点继续进行。
上一篇 跟着实例学习ZooKeeper的用法: Leader选举介绍了ZooKeeper Leader选举的两种用法,这一篇文章我们将介绍分布式锁(Lock)的实现。