A Spymemcached Pitfall

When one or more nodes in a Memcached cluster go down

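Spymemcached is a popular Java client library for Memcached. (The other well-known one is XMemcached, developed by 伯岩/庄晓丹, formerly of Taobao, who also developed Metamorphosis, another Taobao open-source project.) It performs well and is widely used in Java + Memcached projects.
Spymemcached was originally developed by Dustin Sallings. Dustin later co-founded Couchbase (formerly NorthScale), where he was chief architect, and joined Google in 2014.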

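Memcached itself has no clustering support; it is the client that sends set/get requests to different Memcached nodes depending on the key. A consistent hashing algorithm spreads the data evenly across the nodes, and when a node joins or leaves, it redistributes the failed node's data evenly among the remaining nodes. Spymemcached uses the Ketama algorithm.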
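
To make the idea concrete, here is a minimal sketch of a consistent-hash ring with virtual points. It is a toy, not spymemcached's KetamaNodeLocator: the class ToyHashRing, its methods, and the choice of the first four MD5 bytes as the ring position are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy consistent-hash ring; hypothetical names, not the spymemcached API. */
final class ToyHashRing {
    // Ring position -> node name. Assumes at least one node is supplied.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    ToyHashRing(List<String> nodes, int pointsPerNode) {
        for (String node : nodes) {
            // Each node contributes several virtual points to even out the load.
            for (int i = 0; i < pointsPerNode; i++) {
                ring.put(hash(node + "-" + i), node);
            }
        }
    }

    /** Returns the node owning the first ring point at or after hash(key). */
    String nodeFor(String key) {
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(key));
        // Wrap around to the first point when the key hashes past the last one.
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    /** First four MD5 bytes (little-endian) as an unsigned 32-bit value. */
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            return ((long) (d[3] & 0xFF) << 24) | ((long) (d[2] & 0xFF) << 16)
                    | ((long) (d[1] & 0xFF) << 8) | (d[0] & 0xFF);
        } catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException("MD5 unavailable", ex);
        }
    }
}
```

In this sketch, building one ring from three nodes and another from only two of them leaves every key that was not on the removed node on the same node as before; only the removed node's keys move. That is the property that makes consistent hashing attractive for a cache cluster.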

However, when a node of the memcached cluster goes down for some reason, spymemcached does not correctly pick another live node; it simply fails:

2015-11-23 05:56:20.942 WARN net.spy.memcached.MemcachedConnection: Could not redistribute to another node, retrying primary node for ff-108182.
2015-11-23 05:56:20.944 WARN net.spy.memcached.MemcachedConnection: Could not redistribute to another node, retrying primary node for ff-108254.
2015-11-23 05:56:20.946 WARN net.spy.memcached.MemcachedConnection: Could not redistribute to another node, retrying primary node for ff-108341.
2015-11-23 05:56:20.947 WARN net.spy.memcached.MemcachedConnection: Could not redistribute to another node, retrying primary node for ff-108352.
2015-11-23 05:56:20.947 WARN net.spy.memcached.MemcachedConnection: Could not redistribute to another node, retrying primary node for ff-108381.
2015-11-23 05:56:20.948 WARN net.spy.memcached.MemcachedConnection: Could not redistribute to another node, retrying primary node for ff-108407.
2015-11-23 05:56:20.950 WARN net.spy.memcached.MemcachedConnection: Could not redistribute to another node, retrying primary node for ff-108480.
2015-11-23 05:56:20.952 WARN net.spy.memcached.MemcachedConnection: Could not redistribute to another node, retrying primary node for ff-108552.
2015-11-23 05:56:20.954 WARN net.spy.memcached.MemcachedConnection: Could not redistribute to another node, retrying primary node for ff-108608.

With XMemcached this does not happen.
Note that spymemcached had already been configured for consistent hashing:

......
ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder();
builder.setHashAlg(DefaultHashAlgorithm.KETAMA_HASH);
builder.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT);
builder.setFailureMode(FailureMode.Redistribute);
......

The cause is that when the node a key maps to (the primary node) is down, spymemcached only tries a limited number of alternative nodes:

if (primary.isActive() || failureMode == FailureMode.Retry) {
    placeIn = primary;
} else if (failureMode == FailureMode.Cancel) {
    o.cancel();
} else {
    Iterator<MemcachedNode> i = locator.getSequence(key);
    while (placeIn == null && i.hasNext()) {
        MemcachedNode n = i.next();
        if (n.isActive()) {
            placeIn = n;
        }
    }
    if (placeIn == null) {
        placeIn = primary;
        this.getLogger().warn("Could not redistribute to another node, "
            + "retrying primary node for %s.", key);
    }
}
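
The fallback branch can be isolated in a small stand-alone sketch. ToyNode and RedistributeSketch are hypothetical stand-ins (not spymemcached classes) that keep only the decision logic of the snippet quoted above: use the primary if it is active, otherwise the first active candidate from a bounded sequence, otherwise the primary again.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a cache node; not the spymemcached MemcachedNode. */
final class ToyNode {
    final String name;
    final boolean active;

    ToyNode(String name, boolean active) {
        this.name = name;
        this.active = active;
    }
}

final class RedistributeSketch {
    /** Picks the primary if active, else the first active candidate, else the primary. */
    static ToyNode choose(ToyNode primary, Iterator<ToyNode> candidates) {
        if (primary.active) {
            return primary;
        }
        while (candidates.hasNext()) {
            ToyNode n = candidates.next();
            if (n.active) {
                return n;
            }
        }
        // No live candidate in the bounded sequence: this is the case that
        // logs "Could not redistribute to another node" in the real client.
        return primary;
    }

    public static void main(String[] args) {
        ToyNode dead = new ToyNode("127.0.0.1:11311", false);
        ToyNode live = new ToyNode("127.0.0.1:11211", true);
        // All seven probes land on the dead node, so the live one is never tried.
        List<ToyNode> unlucky = Arrays.asList(dead, dead, dead, dead, dead, dead, dead);
        System.out.println(choose(dead, unlucky.iterator()).name); // 127.0.0.1:11311
        // A single probe that reaches the live node is enough.
        List<ToyNode> lucky = Arrays.asList(dead, dead, live);
        System.out.println(choose(dead, lucky.iterator()).name);   // 127.0.0.1:11211
    }
}
```

The sketch shows why the length of the candidate sequence matters: the loop never looks beyond what the iterator yields, so a live node that is not in the sequence is effectively invisible.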

Here locator.getSequence(key) supplies at most 7 candidate virtual nodes.

public Iterator<MemcachedNode> getSequence(String k) {
    // Seven searches gives us a 1 in 2^7 chance of hitting the
    // same dead node all of the time.
    return new KetamaIterator(k, 7, getKetamaNodes(), hashAlg);
}

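However, going by the comment in the code, when a node is down there is roughly a 1/128 chance that all seven virtual nodes point to the dead primary node. Because the sequence is derived from the key, a key that falls into that 1/128 fails every time, not just occasionally.
In fact, the following code never finds the live node ("127.0.0.1:11211"), 100% of the time: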

@Test
public void testMissingNode2() {
    List<MemcachedNode> nodes = new ArrayList<MemcachedNode>();
    nodes.add(createMockNode(new InetSocketAddress("127.0.0.1", 11211)));
    nodes.add(createMockNode(new InetSocketAddress("127.0.0.1", 11311)));
    KetamaNodeLocator locator = new KetamaNodeLocator(nodes, DefaultHashAlgorithm.KETAMA_HASH);
    Iterator<MemcachedNode> i = locator.getSequence("ff-108552");
    Set<MemcachedNode> foundNodes = new HashSet<MemcachedNode>();
    while (i.hasNext()) {
        foundNodes.add(i.next());
    }
    // This fails. 127.0.0.1:11211 is never found.
    for (MemcachedNode node : nodes) {
        Assert.assertTrue(foundNodes.contains(node));
    }
}

private MemcachedNode createMockNode(InetSocketAddress sock) {
    MemcachedNode mockNode = EasyMock.createMock(MemcachedNode.class);
    EasyMock.expect(mockNode.getSocketAddress()).andReturn(sock).anyTimes();
    EasyMock.replay(mockNode);
    return mockNode;
}
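
The "1 in 2^7" estimate from the code comment can be sanity-checked with a rough simulation. The sketch below is not spymemcached's KetamaIterator: the ring layout, the MD5-based hash, and the rehash rule (prepend the attempt number to the key) are simplifying assumptions, and ProbeSimulation is a hypothetical class. It counts how many keys hit the same node on every one of their first N probes.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

/** Toy estimate of how many keys never reach the live node within a probe budget. */
final class ProbeSimulation {
    /** First four MD5 bytes (little-endian) as an unsigned 32-bit value. */
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            return ((long) (d[3] & 0xFF) << 24) | ((long) (d[2] & 0xFF) << 16)
                    | ((long) (d[1] & 0xFF) << 8) | (d[0] & 0xFF);
        } catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException("MD5 unavailable", ex);
        }
    }

    /** Builds a ring with pointsPerNode virtual points for each node. */
    static TreeMap<Long, String> ring(int pointsPerNode, String... nodes) {
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String node : nodes) {
            for (int i = 0; i < pointsPerNode; i++) {
                ring.put(hash(node + "-" + i), node);
            }
        }
        return ring;
    }

    /** Node owning the first point at or after h, wrapping around the ring. */
    static String lookup(TreeMap<Long, String> ring, long h) {
        Map.Entry<Long, String> e = ring.ceilingEntry(h);
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    /** Fraction of keys whose first `tries` probes all land on deadNode. */
    static double strandedFraction(TreeMap<Long, String> ring, String deadNode,
                                   int tries, int keys) {
        int stranded = 0;
        for (int k = 0; k < keys; k++) {
            String key = "ff-" + k;
            boolean reachedLive = false;
            for (int t = 0; t < tries && !reachedLive; t++) {
                // Assumed rehash rule: hash the attempt number followed by the key.
                reachedLive = !lookup(ring, hash(t + key)).equals(deadNode);
            }
            if (!reachedLive) {
                stranded++;
            }
        }
        return (double) stranded / keys;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> r = ring(160, "127.0.0.1:11211", "127.0.0.1:11311");
        System.out.println("7 tries:  " + strandedFraction(r, "127.0.0.1:11311", 7, 100_000));
        System.out.println("20 tries: " + strandedFraction(r, "127.0.0.1:11311", 20, 100_000));
    }
}
```

With two nodes that each own about half of the ring, the 7-try figure should come out somewhere near 1/128, although the exact value depends on how evenly the virtual points happen to fall. The important point is that the affected keys are fixed: for those keys the probe sequence is the same on every request, so they fail for as long as the node stays down.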

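In fact, this has been discussed on Google Groups, and the original spymemcached bug tracker has related bug reports, but the problem was never fixed.
The consequence is that while a memcached node is down, some cache entries cannot use the cache at all and have to be fetched from another persistent store such as the database.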

With the problem identified, the fix follows: modify the getSequence method so that it offers more nodes to choose from:

public Iterator<MemcachedNode> getSequence(String k) {
    // return new KetamaIterator(k, 7, getKetamaNodes(), hashAlg);
    int maxTry = config.getNodeRepetitions() + 1;
    if (maxTry < 20) {
        maxTry = 20;
    }
    return new KetamaIterator(k, maxTry, getKetamaNodes(), hashAlg);
}
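
As a rough estimate of what the larger probe budget buys, assume (idealised, and not guaranteed by the real iterator) that each probe independently lands on the dead node with probability $p$, the dead node's share of the ring. The chance that all $k$ probes miss every live node is then:

```latex
\[
  P_{\text{stranded}}(k) = p^{k}
\]
\[
  p = \tfrac{1}{2}:\qquad
  P_{\text{stranded}}(7) = 2^{-7} = \tfrac{1}{128} \approx 7.8 \times 10^{-3},
  \qquad
  P_{\text{stranded}}(20) = 2^{-20} \approx 9.5 \times 10^{-7}
\]
```

Under this assumption, raising the minimum from 7 to 20 probes shrinks the share of stranded keys from roughly one in a hundred to roughly one in a million for a two-node cluster with one node down.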

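Separately, how do you add new memcached nodes dynamically at runtime? This article gives one solution.
You have to subclass MemcachedClient, MemcachedConnection and DefaultConnectionFactory. The author has not tested it and does not guarantee that it works. Either way, it is an idea worth considering.
ExtMemcachedConnection.java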

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.ConnectionObserver;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.OperationFactory;
import net.spy.memcached.ops.KeyedOperation;
import net.spy.memcached.ops.Operation;

public class ExtMemcachedConnection extends MemcachedConnection {
    protected final OperationFactory opFact;

    /**
     * Construct a memcached connection.
     *
     * @param bufSize the size of the buffer used for reading from the server
     * @param f the factory that will provide an operation queue
     * @param a the addresses of the servers to connect to
     * @param obs the initial connection observers
     * @param fm the failure mode
     * @param opfactory the operation factory, also used to clone operations on removal
     * @throws java.io.IOException if a connection attempt fails early
     */
    public ExtMemcachedConnection(int bufSize, ConnectionFactory f,
            List<InetSocketAddress> a,
            Collection<ConnectionObserver> obs,
            FailureMode fm, OperationFactory opfactory)
            throws IOException {
        super(bufSize, f, a, obs, fm, opfactory);
        this.opFact = opfactory;
    }

    // Connect to the new node and rebuild the locator with the new node plus the existing ones.
    public void add(InetSocketAddress nodeAddress) throws IOException {
        final List<InetSocketAddress> nodeToAdd = new ArrayList<InetSocketAddress>(1);
        nodeToAdd.add(nodeAddress);
        List<MemcachedNode> newNodesList = createConnections(nodeToAdd);
        newNodesList.addAll(getLocator().getAll());
        getLocator().updateLocator(newNodesList);
    }

    // The node should be obtained from the locator so that currentNode.equals(node) returns true.
    public void remove(MemcachedNode node) throws IOException {
        for (MemcachedNode currentNode : getLocator().getAll()) {
            if (currentNode.equals(node)) {
                Collection<Operation> notCompletedOperations = currentNode.destroyInputQueue();
                if (currentNode.getChannel() != null) {
                    currentNode.getChannel().close();
                    currentNode.setSk(null);
                    if (currentNode.getBytesRemainingToWrite() > 0) {
                        getLogger().warn("Shut down with %d bytes remaining to write",
                            currentNode.getBytesRemainingToWrite());
                    }
                    getLogger().debug("Shut down channel %s", currentNode.getChannel());
                }
                // Unfortunately, redistributeOperations is private in the parent class, so it
                // cannot be called or overridden; its implementation is copied below.
                redistributeOperations(notCompletedOperations);
            }
        }
    }

    // Re-queue the pending keyed operations of a removed node; cancel everything else.
    protected void redistributeOperations(Collection<Operation> ops) {
        for (Operation op : ops) {
            if (op.isCancelled() || op.isTimedOut()) {
                continue;
            }
            if (op instanceof KeyedOperation) {
                KeyedOperation ko = (KeyedOperation) op;
                int added = 0;
                for (String k : ko.getKeys()) {
                    for (Operation newop : opFact.clone(ko)) {
                        addOperation(k, newop);
                        added++;
                    }
                }
                assert added > 0 : "Didn't add any new operations when redistributing";
            } else {
                // Cancel things that don't have definite targets.
                op.cancel();
            }
        }
    }
}

ExtMemcachedClient.java

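// Methods added to ExtMemcachedClient, a subclass of MemcachedClient.
public void add(InetSocketAddress nodeAddress) throws IOException {
    if (mconn instanceof ExtMemcachedConnection) {
        ((ExtMemcachedConnection) mconn).add(nodeAddress);
    }
}

// Returns true if the removal was delegated to an ExtMemcachedConnection.
public boolean remove(MemcachedNode node) throws IOException {
    if (mconn instanceof ExtMemcachedConnection) {
        ((ExtMemcachedConnection) mconn).remove(node);
        return true;
    }
    return false;
}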

ExtMemcachedConnectionfactory.java

@Override
public MemcachedConnection createConnection(List<InetSocketAddress> addrs) throws IOException {
    // Return the extended connection so that nodes can be added and removed at runtime.
    return new ExtMemcachedConnection(getReadBufSize(), this, addrs,
        getInitialObservers(), getFailureMode(), getOperationFactory());
}