zookeeper开源框架curator的ConnectionStateListener机制
zookeeper的开源框架curator提供了重连的机制近期我做的一个开源项目分布式调度平台xj-job中执行器与调度中心解耦,用到了zookeeper做为执行器注册中心,执行器启动向zookeeper注册执行器信息临时节点,当执行器停止会自动剔除,curator提供了客户端重连机制,进程阻塞导致的zookeeper客户端会话超时,导致的zookeeper启用重连机制。Exponenti...
·
zookeeper的开源框架curator提供了重连的机制
近期我做的一个开源项目分布式调度平台xj-job中执行器与调度中心解耦,用到了zookeeper做为执行器注册中心,执行器启动向zookeeper注册执行器信息临时节点,当执行器停止会自动剔除,curator提供了客户端重连机制,进程阻塞导致的zookeeper客户端会话超时,导致的zookeeper启用重连机制。
- ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加
- RetryNTimes:指定最大重试次数的重试策略
- RetryOneTime:仅重试一次
- RetryUntilElapsed:一直重试直到达到规定的时间
application.properties:
################################## zookeeper ##################################
xjjob.zookeeper.host=192.168.220.153:2181
xjjob.zookeeper.maxRetry=3
xjjob.zookeeper.sessionTimeout=6000
xjjob.zookeeper.connectTimeout=6000
xjjob.zookeeper.namespace=xjjob
@Configuration
@ConfigurationProperties(prefix = "xjjob.zookeeper")
public class ZookeeperConfig {
private final Logger LOGGER = LoggerFactory.getLogger(ZookeeperConfig.class);
private String host;
private int maxRetry;
private int sessionTimeout;
private int connectTimeout;
private String namespace;
@Bean
public CuratorFramework curatorFramework(){
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(host)
.sessionTimeoutMs(sessionTimeout)
.connectionTimeoutMs(connectTimeout)
.retryPolicy(retryPolicy)
.namespace(namespace)
.build();
client.start();
return client;
}
@PreDestroy
private void destroyClient(){
curatorFramework().close();
LOGGER.info("==================关闭成功==================");
}
重试机制解决了但是会引发另一个问题,执行器zookeeper进程阻塞导致session超时会话断开,注册中心的执行器信息临时节点会丢失,重试机制启用重连策略,连接成功但是执行器信息已经不在zookeeper中了,为了解决这个问题引入客户端连接状态监听机制(即ConnectionStateListener使用)。有了解决方案在curator客户端中添加connection状态监听,当充实机制重连成功后,需要把执行器信息重新注册到注册中心zookeeper上。代码如下:
/**
* @author shengtao
* @Description: 监听ZK客户端会话
* @date 2019/03/06 17:23
*/
@Component
public class ZookeeperConnectionListener implements ConnectionStateListener {
private final Logger log = LoggerFactory.getLogger(ZookeeperConnectionListener.class);
@Autowired
private Environment environment;
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
if(connectionState == ConnectionState.RECONNECTED){ //监控重新连接的状态,补偿执行器注册
String port = environment.getProperty("server.port"); //获取执行器端口
int hostPort = 0;
if (StringUtils.isNotBlank(port)) {
hostPort = Integer.valueOf(port);
}
ExcutorEntity excutorEntity = ExcutorHelper.getExcutorEntity(hostPort); //构造执行器注册信息
try {
Stat stat = curatorFramework.checkExists().forPath(ExcutorHelper.getPath(excutorEntity)); //检查执行器信息状态
if(stat == null){
ExcutorHelper.registerExcutor(curatorFramework,hostPort); //重新注册执行器信息
}
} catch (Exception e) {
log.error("注册执行器节点失败:",e);
//重试
}
}
}
}
更多推荐
已为社区贡献2条内容
所有评论(0)