2015-11-23 05:56:20.942 WARN net.spy.memcached.MemcachedConnection: Could not redistribute to another node, retrying primary node for ff-108182. 2015-11-23 05:56:20.944 WARN net.spy.memcached.MemcachedConnection: Could not redistribute to another node, retrying primary node for ff-108254. 2015-11-23 05:56:20.946 WARN net.spy.memcached.MemcachedConnection: Could not redistribute to another node, retrying primary node for ff-108341. 2015-11-23 05:56:20.947 WARN net.spy.memcached.MemcachedConnection: Could not redistribute to another node, retrying primary node for ff-108352. 2015-11-23 05:56:20.947 WARN net.spy.memcached.MemcachedConnection: Could not redistribute to another node, retrying primary node for ff-108381. 2015-11-23 05:56:20.948 WARN net.spy.memcached.MemcachedConnection: Could not redistribute to another node, retrying primary node for ff-108407. 2015-11-23 05:56:20.950 WARN net.spy.memcached.MemcachedConnection: Could not redistribute to another node, retrying primary node for ff-108480. 2015-11-23 05:56:20.952 WARN net.spy.memcached.MemcachedConnection: Could not redistribute to another node, retrying primary node for ff-108552. 2015-11-23 05:56:20.954 WARN net.spy.memcached.MemcachedConnection: Could not redistribute to another node, retrying primary node for ff-108608.
if (primary.isActive() || failureMode == FailureMode.Retry) { placeIn = primary; } elseif (failureMode == FailureMode.Cancel) { o.cancel(); } else { Iterator<MemcachedNode> i = locator.getSequence(key); while (placeIn == null && i.hasNext()) { MemcachedNoden= i.next(); if (n.isActive()) { placeIn = n; } } if (placeIn == null) { placeIn = primary; this.getLogger().warn("Could not redistribute to another node, " + "retrying primary node for %s.", key); } }
其中locator.getSequence(key)最多会提供7个备选虚拟节点。
1 2 3 4 5
public Iterator<MemcachedNode> getSequence(String k) { // Seven searches gives us a 1 in 2^7 chance of hitting the // same dead node all of the time. returnnewKetamaIterator(k, 7, getKetamaNodes(), hashAlg); }
publicclassExtMemCachedConnectionextendsMemcachedConnection { protectedfinal OperationFactory opFact; /** * Construct a memcached connection. * * @param bufSize the size of the buffer used for reading from the server * @param f the factory that will provide an operation queue * @param a the addresses of the servers to connect to * @throws java.io.IOException if a connection attempt fails early */ publicExtendableMemcachedConnection(int bufSize, ConnectionFactory f, List<InetSocketAddress> a, Collection<ConnectionObserver> obs, FailureMode fm, OperationFactory opfactory) throws IOException { super(bufSize, f, a, obs, fm, opfactory); this.opFact = opfactory; } publicvoidadd(InetSocketAddress nodeAddress)throws IOException { final List<InetSocketAddress> nodeToAdd = newArrayList<InetSocketAddress>(1); nodeToAdd.add(nodeAddress); List<MemcachedNode> newNodesList = createConnections(nodeToAdd); newNodesList.addAll(getLocator().getAll()); getLocator().updateLocator(newNodesList); } //The node should be obtain from locator to ensure currentNode.equals(node) will return true publicvoidremove(MemcachedNode node)throws IOException { for(MemcachedNode currentNode : getLocator().getAll()) { if(currentNode.equals(node)) { Collection<Operation> notCompletedOperations = currentNode.destroyInputQueue(); if (currentNode.getChannel() != null) { currentNode.getChannel().close(); currentNode.setSk(null); if (currentNode.getBytesRemainingToWrite() > 0) { getLogger().warn("Shut down with %d bytes remaining to write", currentNode.getBytesRemainingToWrite()); } getLogger().debug("Shut down channel %s", currentNode.getChannel()); } //Unfortunatelly, redistributeOperations is private so it cannot be used or override. I put copy/paste the implementation redistributeOperations(notCompletedOperations); } } } protectedvoidredistributeOperations(Collection<Operation> ops) { for (Operation op : ops) { if (op.isCancelled() || op.isTimedOut()) { continue; } if (op instanceof KeyedOperation) { KeyedOperationko= (KeyedOperation) op; intadded=0; for (String k : ko.getKeys()) { for (Operation newop : opFact.clone(ko)) { addOperation(k, newop); added++; } } assert added > 0 : "Didn't add any new operations when redistributing"; } else { // Cancel things that don't have definite targets. op.cancel(); } } } }