SpringCloud源码解析之Bus
前言
Spring Cloud Bus 的主要功能:事件的订阅与发布 、事件监听的具体实现;Spring Cloud Bus 基于 Spring Cloud Stream, Spring Cloud Stream 屏蔽了底层消息中间件的差异性,在其之上封装成各种 Binder。
一、基础使用
1、流程
- 提交代码webHook触发post给Server端发送bus/refresh
- Server端接收到请求并发送给Spring Cloud Bus
- Spring Cloud bus接到消息并通知给其它客户端
- 其它客户端接收到通知,请求Server端获取最新配置
- 全部客户端均获取到最新的配置
2、服务端配置:
1.1、添加pom依赖:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
1.2、配置文件(在前面config分析的基础上增加):
## 设置actuator在HTTP上暴露所有的控制端点 management.endpoints.web.exposure.include=* . ## 开启消息跟踪 spring.cloud.bus.trace.enabled=true ## rabbitMQ信息配置 spring.rabbitmq.host= spring.rabbitmq.port= spring.rabbitmq.username= spring.rabbitmq.password=
3、客户端配置:
1.1、添加pom依赖:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
1.2、配置文件(在前面config分析的基础上增加):
## 设置actuator在HTTP上暴露所有的控制端点 management.endpoints.web.exposure.include=* ## 如果有多个引用 加上kafaka时,要加上这个配置 spring.cloud.stream.default-binder=rabbit ## rabbitMQ信息配置 spring.rabbitmq.host= spring.rabbitmq.port= spring.rabbitmq.username= spring.rabbitmq.password=
4、webhook配置:
1、设置encrypt.key;
encrypt: key: yourKey
2、将上步的key配置到git中;
3、配置webHook url; 在Payload URL中写上bus-refresh的地址(比如:http://ip:port/actuator/bus-refresh),这里的IP为公网IP和对应的端口; 如果在本地尝试不生效的情况,建议参考下: 或者不使用webhook,针对refresh接口,我们自己写一下post接口进行刷新。
二、源码分析
1.bus-refresh底层机制分析
1.1、内置事件结构:RefreshRemoteApplicationEvent 1.2、刷新事件的发送端:RefreshBusEndpoint
RefreshBusEndPoint消息的发送者; RefreshListener消息的接收者; 确定destinationService对应的服务: 然后把消息publish出去: 该方法先校验event是否为空; 然后强制转换为applicationEvent; 再进入multicastEvent()方法,该方法中getApplicationListeners()会获得3个对象, 其中一个就是RefreshListener
进入refresh()方法
再进入refreshEnvironment()方法 再进入changes()方法,before就是之前的属性,after就是变更后的属性(更像个merge操作):
回到refresh()方法, 进入refreshAll()方法:销毁scope中所有beans中的instance并刷新
EurekaDiscoveryClientConfiguration类中最后打上断点,收到eurekaClient的请求,先关闭注册再打开。