目录 [−]
这一篇文章我们将学习使用Curator来实现计数器。 顾名思义,计数器是用来计数的, 利用ZooKeeper可以实现一个集群共享的计数器。 只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器, 一个是用int来计数,一个用long来计数。
SharedCount
这个类使用int类型来计数。 主要涉及三个类。
- SharedCount
- SharedCountReader
- SharedCountListener
SharedCount
代表计数器, 可以为它增加一个SharedCountListener,当计数器改变时此Listener可以监听到改变的事件,而SharedCountReader可以读取到最新的值, 包括字面值和带版本信息的值VersionedValue。
例子代码:
|
|
在这个例子中,我们使用baseCount
来监听计数值(addListener
方法)。 任意的SharedCount, 只要使用相同的path,都可以得到这个计数值。
然后我们使用5个线程为计数值增加一个10以内的随机数。
|
|
这里我们使用trySetCount
去设置计数器。 第一个参数提供当前的VersionedValue,如果期间其它client更新了此计数值, 你的更新可能不成功,
但是这时你的client更新了最新的值,所以失败了你可以尝试再更新一次。
而setCount
是强制更新计数器的值。
注意计数器必须start
,使用完之后必须调用close
关闭它。
在这里再重复一遍前面讲到的, 强烈推荐你监控ConnectionStateListener
, 尽管我们的有些例子没有监控它。 在本例中SharedCountListener
扩展了ConnectionStateListener
。 这一条针对所有的Curator recipes都适用,后面的文章中就不专门提示了。
DistributedAtomicLong
再看一个Long类型的计数器。 除了计数的范围比SharedCount
大了之外, 它首先尝试使用乐观锁的方式设置计数器, 如果不成功(比如期间计数器已经被其它client更新了), 它使用InterProcessMutex
方式来更新计数值。 还记得InterProcessMutex
是什么吗? 它是我们前面跟着实例学习ZooKeeper的用法: 分布式锁讲的分布式可重入锁。 这和上面的计数器的实现有显著的不同。
可以从它的内部实现DistributedAtomicValue.trySet
中看出端倪。
|
|
此计数器有一系列的操作:
- get(): 获取当前值
- increment(): 加一
- decrement(): 减一
- add(): 增加特定的值
- subtract(): 减去特定的值
- trySet(): 尝试设置计数值
- forceSet(): 强制设置计数值
你必须检查返回结果的succeeded()
, 它代表此操作是否成功。 如果操作成功, preValue()
代表操作前的值, postValue()
代表操作后的值。
我们下面的例子中使用5个线程对计数器进行加一操作,如果成功,将操作前后的值打印出来。
|
|
下一篇文章我们将学习缓存Cache的使用。