假设主机 A 和 B 通过 TCP 连接收发数据。如果拔掉 A 的网线，B 是无法感知到的：在没有数据流动（且未开启 TCP keepalive）的情况下，双方都不会收到任何断开通知。但如果 A 定时给 B 发送心跳、B 对心跳进行回复，就能据此判断连接状态：B 长时间收不到 A 的心跳，或 A 长时间收不到 B 的回复，即可认为连接已经断开。
以 ZooKeeper 为例：客户端会记录上一次发送数据的时间（lastSend）和上一次接收数据的时间（lastHeard）。连接建立后，如果客户端已经有 readTimeout/2 的时间没有发送任何数据，就会向服务器发送一次心跳（ping）；心跳和其他请求走同一条连接发送给服务器。如果客户端在 readTimeout（连接尚未建立时为 connectTimeout）内一直没有收到任何数据，就认为超时，断开与该服务器的连接，并重新连接服务器。
// zookeeper 3.3.3 // void org.apache.zookeeper.ClientCnxn.SendThread.run() public void run() { long now = System.currentTimeMillis(); long lastHeard = now; long lastSend = now; // 这里的 zooKeeper 是客户端 while (zooKeeper.state.isAlive()) { try { if (sockKey == null) { // don't re-establish connection if we are closing if (closing) { break; } // 连接 zk server startConnect(); lastSend = now; lastHeard = now; } int idleRecv = (int) (now - lastHeard); int idleSend = (int) (now - lastSend); int to = readTimeout - idleRecv; if (zooKeeper.state != States.CONNECTED) { to = connectTimeout - idleRecv; } // 接收数据超时,抛异常,异常会在后面的 catch 块中处理 if (to <= 0) { throw new SessionTimeoutException( "Client session timed out, have not heard from server in " + idleRecv + "ms" + " for sessionid 0x" + Long.toHexString(sessionId)); } if (zooKeeper.state == States.CONNECTED) { int timeToNextPing = readTimeout/2 - idleSend; // 发送 ping 命令(心跳),更新 lastSend if (timeToNextPing <= 0) { sendPing(); lastSend = now; enableWrite(); } else { if (timeToNextPing < to) { to = timeToNextPing; } } } selector.select(to); Set<SelectionKey> selected; synchronized (this) { selected = selector.selectedKeys(); } // Everything below and until we get back to the select is // non blocking, so time is effectively a constant. That is // Why we just have to do this once, here now = System.currentTimeMillis(); for (SelectionKey k : selected) { SocketChannel sc = ((SocketChannel) k.channel()); if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { if (sc.finishConnect()) { lastHeard = now; lastSend = now; primeConnection(k); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { if (outgoingQueue.size() > 0) { // We have something to send so it's the same // as if we do the send now. 
lastSend = now; } if (doIO()) { lastHeard = now; } } } if (zooKeeper.state == States.CONNECTED) { if (outgoingQueue.size() > 0) { enableWrite(); } else { disableWrite(); } } selected.clear(); } catch (Exception e) { if (closing) { if (LOG.isDebugEnabled()) { // closing so this is expected LOG.debug("An exception was thrown while closing send thread for session 0x" + Long.toHexString(getSessionId()) + " : " + e.getMessage()); } break; } else { // this is ugly, you have a better way speak up if (e instanceof SessionExpiredException) { LOG.info(e.getMessage() + ", closing socket connection"); } else if (e instanceof SessionTimeoutException) { LOG.info(e.getMessage() + RETRY_CONN_MSG); } else if (e instanceof EndOfStreamException) { LOG.info(e.getMessage() + RETRY_CONN_MSG); } else { LOG.warn("Session 0x" + Long.toHexString(getSessionId()) + " for server " + ((SocketChannel)sockKey.channel()) .socket().getRemoteSocketAddress() + ", unexpected error" + RETRY_CONN_MSG, e); } // 断开连接 cleanup(); if (zooKeeper.state.isAlive()) { eventThread.queueEvent(new WatchedEvent( Event.EventType.None, Event.KeeperState.Disconnected, null)); } now = System.currentTimeMillis(); lastHeard = now; lastSend = now; } } } cleanup(); try { selector.close(); } catch (IOException e) { LOG.warn("Ignoring exception during selector close", e); } if (zooKeeper.state.isAlive()) { eventThread.queueEvent(new WatchedEvent( Event.EventType.None, Event.KeeperState.Disconnected, null)); } ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), "SendThread exitedloop."); }
ZooKeeper 服务器端同样会跟踪每个 session，并关闭已超时的 session，具体逻辑见：
void org.apache.zookeeper.server.SessionTrackerImpl.run()
服务器每收到一个请求（包括客户端发来的 ping），就会调用 touchSession 刷新该 session 的过期时间。因此只要客户端持续发送心跳，session 就不会在服务器端被判定为超时。
所有评论(0)