The ConnectionStateListener mechanism in Curator, the open-source ZooKeeper framework
Curator, an open-source ZooKeeper client framework, provides a reconnection mechanism.
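In xj-job, a distributed scheduling platform I have been building as an open-source project, executors are decoupled from the scheduling center by using ZooKeeper as the executor registry. When an executor starts, it registers its information in ZooKeeper as an ephemeral node, and that node is removed automatically when the executor stops. Curator adds client-side reconnection on top of this: if the process blocks long enough for the ZooKeeper client session to time out, the reconnection mechanism kicks in.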
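Curator ships several retry policies:
- ExponentialBackoffRetry: retries a given number of times, with the pause between retries growing each time.
- RetryNTimes: retries up to a specified maximum number of times.
- RetryOneTime: retries only once.
- RetryUntilElapsed: keeps retrying until a specified amount of time has elapsed.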
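The following JDK-only sketch makes the growing pause concrete by implementing an exponential backoff policy. Based on our reading of the Curator source, ExponentialBackoffRetry chooses each sleep as `baseSleepTimeMs * max(1, random.nextInt(1 << (retryCount + 1)))` and caps the result at a maximum sleep. Treat that formula as an assumption. `BackoffSketch` and its methods are hypothetical names, not Curator's API. The `main` method uses a 1000 ms base sleep and 3 retries, matching `xjjob.zookeeper.maxRetry=3`.

```java
import java.util.Random;

/**
 * Hypothetical, JDK-only sketch of an exponential backoff retry policy in the
 * style of Curator's ExponentialBackoffRetry. This is not Curator's class.
 */
class BackoffSketch {
    private final int baseSleepTimeMs;
    private final int maxRetries;
    private final int maxSleepMs;
    private final Random random;

    BackoffSketch(int baseSleepTimeMs, int maxRetries, int maxSleepMs, Random random) {
        this.baseSleepTimeMs = baseSleepTimeMs;
        // Clamp so that 1 << (retryCount + 1) never overflows an int
        this.maxRetries = Math.min(maxRetries, 29);
        this.maxSleepMs = maxSleepMs;
        this.random = random;
    }

    /** A retry is allowed while fewer than maxRetries retries have been made (retryCount is 0-based). */
    boolean allowRetry(int retryCount) {
        return retryCount < maxRetries;
    }

    /** Sleep before retry retryCount: base * max(1, random in [0, 2^(retryCount+1))), capped at maxSleepMs. */
    long sleepTimeMs(int retryCount) {
        long sleepMs = (long) baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return Math.min(sleepMs, maxSleepMs);
    }

    public static void main(String[] args) {
        // 1000 ms base sleep, 3 retries, effectively no cap
        BackoffSketch policy = new BackoffSketch(1000, 3, Integer.MAX_VALUE, new Random());
        for (int retry = 0; policy.allowRetry(retry); retry++) {
            System.out.println("retry " + retry + " sleeps " + policy.sleepTimeMs(retry) + " ms");
        }
    }
}
```

The random factor is drawn from a range that doubles with each retry. Later retries therefore tend to wait longer, but any individual sleep can still be as short as the base time. The first retry always waits exactly the base time.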
The ZooKeeper settings in application.properties (timeouts in milliseconds):
```properties
################################## zookeeper ##################################
xjjob.zookeeper.host=192.168.220.153:2181
xjjob.zookeeper.maxRetry=3
xjjob.zookeeper.sessionTimeout=6000
xjjob.zookeeper.connectTimeout=6000
xjjob.zookeeper.namespace=xjjob
```
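```java
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "xjjob.zookeeper")
public class ZookeeperConfig {

    private String host;
    private int maxRetry;
    private int sessionTimeout;
    private int connectTimeout;
    private String namespace;

    // Spring Boot binds the xjjob.zookeeper.* properties through these setters
    public void setHost(String host) { this.host = host; }
    public void setMaxRetry(int maxRetry) { this.maxRetry = maxRetry; }
    public void setSessionTimeout(int sessionTimeout) { this.sessionTimeout = sessionTimeout; }
    public void setConnectTimeout(int connectTimeout) { this.connectTimeout = connectTimeout; }
    public void setNamespace(String namespace) { this.namespace = namespace; }

    // destroyMethod = "close" makes Spring close the client on shutdown; this replaces a
    // @PreDestroy method that called curatorFramework() again while the bean was being destroyed
    @Bean(destroyMethod = "close")
    public CuratorFramework curatorFramework() {
        // 1000 ms base sleep; use the configured maxRetry instead of a hard-coded 1
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, maxRetry);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(host)
                .sessionTimeoutMs(sessionTimeout)
                .connectionTimeoutMs(connectTimeout)
                .retryPolicy(retryPolicy)
                .namespace(namespace)
                .build();
        client.start();
        return client;
    }
}
```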
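The retry mechanism handles reconnection, but it introduces another problem. If the executor process blocks long enough for its ZooKeeper session to time out, the session is closed and ZooKeeper deletes the executor's ephemeral node from the registry. The retry policy then reconnects successfully, but the executor information is no longer in ZooKeeper. To solve this, we add a connection-state listener (a ConnectionStateListener) to the Curator client. Once the retry mechanism has reconnected, the listener re-registers the executor information in ZooKeeper. Note that a ConnectionStateListener only receives events after it has been added to the client with `client.getConnectionStateListenable().addListener(...)`. The code is as follows: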
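```java
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

/**
 * @author shengtao
 * @Description: Listens to the ZooKeeper client's connection state
 * @date 2019/03/06 17:23
 */
@Component
public class ZookeeperConnectionListener implements ConnectionStateListener {

    private final Logger log = LoggerFactory.getLogger(ZookeeperConnectionListener.class);

    @Autowired
    private Environment environment;

    @Override
    public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
        // Watch for RECONNECTED and compensate for an executor registration lost with the old session
        if (connectionState == ConnectionState.RECONNECTED) {
            String port = environment.getProperty("server.port"); // executor port
            int hostPort = 0;
            if (StringUtils.isNotBlank(port)) {
                hostPort = Integer.parseInt(port.trim());
            }
            // Build the executor registration info (ExcutorEntity/ExcutorHelper are the project's own classes)
            ExcutorEntity excutorEntity = ExcutorHelper.getExcutorEntity(hostPort);
            try {
                // If the session survived the disconnect, the ephemeral node still exists
                Stat stat = curatorFramework.checkExists().forPath(ExcutorHelper.getPath(excutorEntity));
                if (stat == null) {
                    ExcutorHelper.registerExcutor(curatorFramework, hostPort); // re-register the executor
                }
            } catch (Exception e) {
                // No retry is implemented here; the failure is only logged
                log.error("Failed to register executor node:", e);
            }
        }
    }
}
```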
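The following JDK-only simulation shows why checking for the node on RECONNECTED and re-creating it only when it is missing is sufficient. In the simulated sequence, a long pause expires the session and the registry deletes that session's ephemeral nodes. The client then returns with a new session and reports RECONNECTED. A brief disconnect reports RECONNECTED without losing the node. The classes `InMemoryRegistry`, `SimulatedClient`, and `ReRegisterDemo`, the `State` enum (which borrows the names of Curator's ConnectionState values), and the node path are all hypothetical stand-ins, not Curator or xj-job APIs. The exact state sequence (SUSPENDED, then LOST, then RECONNECTED after an expiry) is an assumption about what Curator reports.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for the Curator ConnectionState values used in this sketch. */
enum State { SUSPENDED, LOST, RECONNECTED }

/** Hypothetical in-memory registry: each ephemeral node is owned by the session that created it. */
class InMemoryRegistry {
    private final Map<String, Long> ephemeralOwners = new HashMap<>();

    void createEphemeral(String path, long sessionId) {
        // ZooKeeper would fail with NodeExistsException here; model that as an exception
        if (ephemeralOwners.putIfAbsent(path, sessionId) != null) {
            throw new IllegalStateException("node exists: " + path);
        }
    }

    boolean exists(String path) { return ephemeralOwners.containsKey(path); }

    Long ownerOf(String path) { return ephemeralOwners.get(path); }

    /** Session expiry deletes every ephemeral node the session owned. */
    void expireSession(long sessionId) {
        ephemeralOwners.values().removeIf(owner -> owner == sessionId);
    }
}

/** Hypothetical client that emits connection-state events to a single listener. */
class SimulatedClient {
    private final InMemoryRegistry registry;
    private long sessionId = 1;
    private BiConsumer<SimulatedClient, State> listener = (client, state) -> { };

    SimulatedClient(InMemoryRegistry registry) { this.registry = registry; }

    void addListener(BiConsumer<SimulatedClient, State> listener) { this.listener = listener; }

    InMemoryRegistry registry() { return registry; }

    long sessionId() { return sessionId; }

    /** A pause longer than the session timeout: the session expires and a new one is created. */
    void simulateSessionExpiryAndReconnect() {
        listener.accept(this, State.SUSPENDED);
        registry.expireSession(sessionId);
        listener.accept(this, State.LOST);
        sessionId++;
        listener.accept(this, State.RECONNECTED);
    }

    /** A short network blip: the same session survives, so its ephemeral nodes survive too. */
    void simulateBriefDisconnect() {
        listener.accept(this, State.SUSPENDED);
        listener.accept(this, State.RECONNECTED);
    }
}

class ReRegisterDemo {
    static final String PATH = "/executors/127.0.0.1:9999"; // hypothetical node path

    static void register(SimulatedClient client) {
        client.registry().createEphemeral(PATH, client.sessionId());
    }

    /** Same idea as ZookeeperConnectionListener: on RECONNECTED, re-register only if the node is gone. */
    static void onStateChanged(SimulatedClient client, State state) {
        if (state == State.RECONNECTED && !client.registry().exists(PATH)) {
            register(client);
        }
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        SimulatedClient client = new SimulatedClient(registry);
        client.addListener(ReRegisterDemo::onStateChanged);
        register(client);
        client.simulateSessionExpiryAndReconnect();
        client.simulateBriefDisconnect();
        System.out.println("registered: " + registry.exists(PATH));
    }
}
```

Without the listener, the node stays missing after the session expires. With it, the node is re-created under the new session. The existence check also prevents a brief disconnect, where the old node survives, from attempting a duplicate create.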