A Brief Look at the Curator Connection-State Source Code
Curator zookeeper
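Curator is an excellent tool for working with ZooKeeper, and there are already plenty of articles and examples about it online, so I won't introduce it again here.
This post summarizes what I learned from reading the Curator source recently: how it manages connection state when ZooKeeper cannot be reached.
Curator's rough class structure (mainly the classes related to retries):
(My home computer has no UML tool and I don't feel like installing one, so the diagram is owed.)
CuratorFrameworkImpl -> CuratorZookeeperClient -> ConnectionState -> HandleHolder -> Helper -> ZooKeeper (the native zk client)
First, let's see how the connection state is updated after the connection drops.
When the ZooKeeper connection is created, ConnectionState is registered as the watcher that listens for zk state changes. When the connection drops, zk notifies the watcher (ConnectionState), which runs process and updates the state.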
```java
import java.io.Closeable;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

class ConnectionState implements Watcher, Closeable {
    @Override
    public void process(WatchedEvent event) {
        if ( LOG_EVENTS ) {
            log.debug("ConnectState watcher: " + event);
        }

        // Forward the event to every registered parent watcher first.
        for ( Watcher parentWatcher : parentWatchers ) {
            TimeTrace timeTrace = new TimeTrace("connection-state-parent-process", tracer.get());
            parentWatcher.process(event);
            timeTrace.commit();
        }

        boolean wasConnected = isConnected.get();
        boolean newIsConnected = wasConnected;
        // EventType.None marks a connection-state event rather than a node event.
        if ( event.getType() == Watcher.Event.EventType.None ) {
            newIsConnected = checkState(event.getState(), wasConnected);
        }

        // Only a real state change restarts the connection timer.
        if ( newIsConnected != wasConnected ) {
            isConnected.set(newIsConnected);
            connectionStartMs = System.currentTimeMillis();
        }
    }

    private boolean checkState(Event.KeeperState state, boolean wasConnected) {
        boolean isConnected = wasConnected;
        boolean checkNewConnectionString = true;
        switch ( state ) {
            default:
            case Disconnected: {
                isConnected = false;
                break;
            }
            case SyncConnected:
            case ConnectedReadOnly: {
                isConnected = true;
                break;
            }
            case AuthFailed: {
                isConnected = false;
                log.error("Authentication failed");
                break;
            }
            case Expired: {
                isConnected = false;
                checkNewConnectionString = false;
                handleExpiredSession();
                break;
            }
            case SaslAuthenticated: {
                // NOP
                break;
            }
        }

        if ( checkNewConnectionString && zooKeeper.hasNewConnectionString() ) {
            handleNewConnectionString();
        }
        return isConnected;
    }
    // ......
}
```
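To make the state transition easier to follow in isolation, here is a minimal sketch that keeps only the flag-update part of process and checkState. The class name ConnectionFlagSketch, its State enum and the onStateEvent method are hypothetical stand-ins invented for this illustration; they are not Curator or ZooKeeper types, and the sketch leaves out parent watchers, expired-session handling and connection-string changes.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Simplified stand-in for ConnectionState; NOT Curator's real class. */
final class ConnectionFlagSketch {
    /** Hypothetical mirror of the KeeperState values handled in checkState. */
    enum State { DISCONNECTED, SYNC_CONNECTED, CONNECTED_READ_ONLY, AUTH_FAILED, EXPIRED, SASL_AUTHENTICATED }

    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    private volatile long connectionStartMs = System.currentTimeMillis();

    /** Maps a state event to the new value of the connected flag. */
    static boolean nextConnected(State state, boolean wasConnected) {
        switch (state) {
            case SYNC_CONNECTED:
            case CONNECTED_READ_ONLY:
                return true;
            case SASL_AUTHENTICATED:
                return wasConnected; // no-op: keep the previous value
            default:
                return false;        // disconnected, auth failed, expired
        }
    }

    /** Updates the flag and restarts the timer only when the flag changes. */
    void onStateEvent(State state) {
        boolean wasConnected = isConnected.get();
        boolean nowConnected = nextConnected(state, wasConnected);
        if (nowConnected != wasConnected) {
            isConnected.set(nowConnected);
            connectionStartMs = System.currentTimeMillis();
        }
    }

    boolean isConnected() { return isConnected.get(); }

    long connectionStartMs() { return connectionStartMs; }
}
```

The point to notice is that connectionStartMs is only reset on an actual change of the flag, which is what later lets checkTimeouts measure how long the client has been disconnected.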
Next, let's look at a getData operation.
org.apache.curator.framework.imps.CuratorFrameworkImpl.getData()
```java
@Override
public GetDataBuilder getData() {
    Preconditions.checkState(isStarted(), "instance must be started before calling this method");
    return new GetDataBuilderImpl(this);
}
```
Both GetData and SetData have a corresponding Builder implementation class that carries out the operation.
Let's look at the code of GetDataBuilderImpl.
org.apache.curator.framework.imps.GetDataBuilderImpl
```java
@Override
public byte[] forPath(String path) throws Exception {
    path = client.fixForNamespace(path);

    byte[] responseData = null;
    if ( backgrounding.inBackground() ) {
        // Background mode: queue the operation; the result arrives via callback.
        client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
    } else {
        // Foreground mode: execute synchronously with retries.
        responseData = pathInForeground(path);
    }
    return responseData;
}
```
Here I only look at the foreground (non-background) path; readers can go through the background execution code on their own.
```java
private byte[] pathInForeground(final String path) throws Exception {
    TimeTrace trace = client.getZookeeperClient().startTracer("GetDataBuilderImpl-Foreground");
    byte[] responseData = RetryLoop.callWithRetry
    (
        client.getZookeeperClient(),
        new Callable<byte[]>() {
            @Override
            public byte[] call() throws Exception {
                byte[] responseData;
                if ( watching.isWatched() ) {
                    // Use the default watcher.
                    responseData = client.getZooKeeper().getData(path, true, responseStat);
                } else {
                    // Use the explicitly supplied watcher (may be null).
                    responseData = client.getZooKeeper().getData(path, watching.getWatcher(), responseStat);
                }
                return responseData;
            }
        }
    );
    trace.commit();

    return decompress ? client.getCompressionProvider().decompress(path, responseData) : responseData;
}
```
The code of RetryLoop.callWithRetry is as follows:
```java
public static<T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception {
    T result = null;
    RetryLoop retryLoop = client.newRetryLoop();
    while ( retryLoop.shouldContinue() ) {
        try {
            // Wait until connected, or until the connection timeout is used up.
            client.internalBlockUntilConnectedOrTimedOut();

            result = proc.call();
            retryLoop.markComplete();
        } catch ( Exception e ) {
            // Decides whether to retry or rethrow.
            retryLoop.takeException(e);
        }
    }
    return result;
}
```
In the function above, the call client.internalBlockUntilConnectedOrTimedOut() checks whether the client is currently connected. If it is not, the call keeps waiting until the timeout is used up.
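The "wait until connected or timed out" idea can be sketched on its own as below. This is only an illustration under my own assumptions: the class BlockUntilConnectedSketch, its method and the polling interval are hypothetical, and the simple polling loop is a swapped-in technique, not a copy of how Curator implements internalBlockUntilConnectedOrTimedOut.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper illustrating "block until connected or timed out". */
final class BlockUntilConnectedSketch {
    /**
     * Polls isConnected until it returns true or timeoutMs has elapsed.
     * Returns true if connected, false if the timeout ran out first.
     */
    static boolean blockUntilConnectedOrTimedOut(BooleanSupplier isConnected,
                                                 long timeoutMs,
                                                 long pollMs) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!isConnected.getAsBoolean()) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false; // timeout used up, still not connected
            }
            // Sleep for one poll interval, but never past the deadline.
            long sleepMs = Math.min(pollMs, Math.max(1L, remainingNanos / 1_000_000L));
            Thread.sleep(sleepMs);
        }
        return true;
    }
}
```

Note that the Curator method is declared void, so the caller proceeds to proc.call() whether or not the wait succeeded; the boolean return value here exists only to make the two outcomes visible.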
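If the timeout elapses and the state is still not connected, client.getZooKeeper().getData() is called anyway. My reading of this design is as follows: calling getZooKeeper() while disconnected triggers thread locking and the reconnect logic. Locking is a comparatively heavy operation, so letting each thread wait up front tends to work better than having it block on the lock right away.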
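To see the overall shape of callWithRetry without the Curator classes, here is a minimal sketch. It assumes a fixed maximum retry count and a fixed sleep between attempts; the class RetryLoopSketch and the parameters maxRetries and sleepMs are hypothetical. The real RetryLoop obtains its behavior from the client (client.newRetryLoop()) and decides in takeException whether to retry, which this sketch does not model: it simply retries on any exception.

```java
import java.util.concurrent.Callable;

/** Hypothetical, simplified retry loop; NOT Curator's RetryLoop. */
final class RetryLoopSketch {
    /**
     * Calls proc until it succeeds, retrying up to maxRetries times
     * and sleeping sleepMs between attempts. Rethrows the last exception
     * once the retries are used up.
     */
    static <T> T callWithRetry(Callable<T> proc, int maxRetries, long sleepMs) throws Exception {
        int retries = 0;
        while (true) {
            try {
                return proc.call();
            } catch (Exception e) {
                if (retries >= maxRetries) {
                    throw e; // give up: retries exhausted
                }
                retries++;
                Thread.sleep(sleepMs);
            }
        }
    }
}
```

A call such as RetryLoopSketch.callWithRetry(() -> fetch(), 3, 1000L), where fetch is whatever operation may fail while the connection is down, would make at most four attempts in total.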
The key part is the following method:
org.apache.curator.ConnectionState.getZooKeeper()
```java
ZooKeeper getZooKeeper() throws Exception {
    if ( SessionFailRetryLoop.sessionForThreadHasFailed() ) {
        throw new SessionFailRetryLoop.SessionFailedException();
    }

    // Surface any exception recorded by background processing.
    Exception exception = backgroundExceptions.poll();
    if ( exception != null ) {
        log.error("Background exception caught", exception);
        tracer.get().addCount("background-exceptions", 1);
        throw exception;
    }

    boolean localIsConnected = isConnected.get();
    if ( !localIsConnected ) {
        // Not connected: check the timeouts and reconnect if needed.
        checkTimeouts();
    }

    return zooKeeper.getZooKeeper();
}
```
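If the state is not connected, getZooKeeper calls checkTimeouts to check the connection and retry.
The checkTimeouts logic:
1. Check whether the smaller of the session timeout and the connection timeout has elapsed. If not, do nothing for now, since the earlier "disconnected" reading may not be accurate.
2. Check whether there is a new zk connection string. If so, reconfigure the environment and reconnect.
3. If the elapsed time exceeds the larger of the session timeout and the connection timeout (the connection attempt has taken so long that it is treated as expired), reset and reconnect. If it has not exceeded that value and the connection is still down, throw a connection-loss exception.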
```java
private synchronized void checkTimeouts() throws Exception {
    int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
    long elapsed = System.currentTimeMillis() - connectionStartMs;
    if ( elapsed >= minTimeout ) {
        if ( zooKeeper.hasNewConnectionString() ) {
            handleNewConnectionString();
        } else {
            int maxTimeout = Math.max(sessionTimeoutMs, connectionTimeoutMs);
            if ( elapsed > maxTimeout ) {
                // Disconnected for longer than both timeouts: start over with a new connection.
                if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) {
                    log.warn(String.format("Connection attempt unsuccessful after %d (greater than max timeout of %d). Resetting connection and trying again with a new connection.", elapsed, maxTimeout));
                }
                reset();
            } else {
                // Between the two timeouts: report connection loss to the caller.
                KeeperException.ConnectionLossException connectionLossException = new CuratorConnectionLossException();
                if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) {
                    log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException);
                }
                tracer.get().addCount("connections-timed-out", 1);
                throw connectionLossException;
            }
        }
    }
}
```
```java
private synchronized void reset() throws Exception {
    log.debug("reset");

    instanceIndex.incrementAndGet();

    isConnected.set(false);
    connectionStartMs = System.currentTimeMillis();
    zooKeeper.closeAndReset();
    zooKeeper.getZooKeeper(); // initiate connection
}
```
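The branching in checkTimeouts can also be written as a small pure function, which makes the boundaries easy to check by hand. This is only a sketch of the decision logic: the class CheckTimeoutsSketch, the Action enum and the decide method are hypothetical names introduced for this illustration, and the sketch performs none of the actual reconnect work.

```java
/** Hypothetical decision table for the branches of checkTimeouts. */
final class CheckTimeoutsSketch {
    enum Action { WAIT, USE_NEW_CONNECTION_STRING, RESET, THROW_CONNECTION_LOSS }

    /** Returns which branch checkTimeouts would take for the given inputs. */
    static Action decide(long elapsedMs,
                         int sessionTimeoutMs,
                         int connectionTimeoutMs,
                         boolean hasNewConnectionString) {
        int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
        if (elapsedMs < minTimeout) {
            return Action.WAIT; // too early to act
        }
        if (hasNewConnectionString) {
            return Action.USE_NEW_CONNECTION_STRING;
        }
        int maxTimeout = Math.max(sessionTimeoutMs, connectionTimeoutMs);
        // Strictly greater than the larger timeout means reset; otherwise fail the call.
        return elapsedMs > maxTimeout ? Action.RESET : Action.THROW_CONNECTION_LOSS;
    }
}
```

For example, with an assumed session timeout of 60000 ms and connection timeout of 15000 ms, a client that has been disconnected for 20000 ms gets a connection-loss exception, while one that has been disconnected for 61000 ms triggers a reset.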