快捷搜索: 王者荣耀 脱发

zookeeper开源框架curator的ConnectionStateListener机制

zookeeper的开源框架curator提供了重连的机制

近期我做的一个开源项目分布式调度平台xj-job中执行器与调度中心解耦,用到了zookeeper做为执行器注册中心,执行器启动向zookeeper注册执行器信息临时节点,当执行器停止会自动剔除,curator提供了客户端重连机制,进程阻塞导致的zookeeper客户端会话超时,导致的zookeeper启用重连机制。

    ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加 RetryNTimes:指定最大重试次数的重试策略 RetryOneTime:仅重试一次 RetryUntilElapsed:一直重试直到达到规定的时间

application.properties:

################################## zookeeper ##################################
xjjob.zookeeper.host=192.168.220.153:2181
xjjob.zookeeper.maxRetry=3
xjjob.zookeeper.sessionTimeout=6000
xjjob.zookeeper.connectTimeout=6000
xjjob.zookeeper.namespace=xjjob
@Configuration
@ConfigurationProperties(prefix = "xjjob.zookeeper")
public class ZookeeperConfig {
    private final Logger LOGGER = LoggerFactory.getLogger(ZookeeperConfig.class);

    private String host;

    private int maxRetry;

    private int sessionTimeout;

    private int connectTimeout;

    private String namespace;

    @Bean
    public CuratorFramework curatorFramework(){
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(host)
                        .sessionTimeoutMs(sessionTimeout)
                        .connectionTimeoutMs(connectTimeout)
                        .retryPolicy(retryPolicy)
                        .namespace(namespace)
                        .build();
        client.start();
        return client;
    }

    @PreDestroy
    private void destroyClient(){
        curatorFramework().close();
        LOGGER.info("==================关闭成功==================");
    }

重试机制解决了但是会引发另一个问题,执行器zookeeper进程阻塞导致session超时会话断开,注册中心的执行器信息临时节点会丢失,重试机制启用重连策略,连接成功但是执行器信息已经不在zookeeper中了,为了解决这个问题引入客户端连接状态监听机制(即ConnectionStateListener使用)。有了解决方案在curator客户端中添加connection状态监听,当充实机制重连成功后,需要把执行器信息重新注册到注册中心zookeeper上。代码如下:

/**
 * @author shengtao
 * @Description: 监听ZK客户端会话
 * @date 2019/03/06 17:23
 */
@Component
public class ZookeeperConnectionListener implements ConnectionStateListener {

    private final Logger log = LoggerFactory.getLogger(ZookeeperConnectionListener.class);

    @Autowired
    private Environment environment;

    @Override
    public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
        if(connectionState == ConnectionState.RECONNECTED){ //监控重新连接的状态,补偿执行器注册
            String port = environment.getProperty("server.port"); //获取执行器端口
            int hostPort = 0;
            if (StringUtils.isNotBlank(port)) {
                hostPort = Integer.valueOf(port);
            }
            ExcutorEntity excutorEntity = ExcutorHelper.getExcutorEntity(hostPort); //构造执行器注册信息
            try {
                Stat stat = curatorFramework.checkExists().forPath(ExcutorHelper.getPath(excutorEntity)); //检查执行器信息状态
                if(stat == null){
                    ExcutorHelper.registerExcutor(curatorFramework,hostPort); //重新注册执行器信息
                }
            } catch (Exception e) {
                log.error("注册执行器节点失败:",e);
                //重试
            }
        }
    }
}
经验分享 程序员 微信小程序 职场和发展