SpringCloud源码解析之Bus


前言

Spring Cloud Bus 的主要功能:事件的订阅与发布 、事件监听的具体实现;Spring Cloud Bus 基于 Spring Cloud Stream, Spring Cloud Stream 屏蔽了底层消息中间件的差异性,在其之上封装成各种 Binder。


一、基础使用

1、流程

  1. 提交代码webHook触发post给Server端发送bus/refresh
  2. Server端接收到请求并发送给Spring Cloud Bus
  3. Spring Cloud bus接到消息并通知给其它客户端
  4. 其它客户端接收到通知,请求Server端获取最新配置
  5. 全部客户端均获取到最新的配置

2、服务端配置:

1.1、添加pom依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

1.2、配置文件(在前面config分析的基础上增加):

## 设置actuator在HTTP上暴露所有的控制端点
management.endpoints.web.exposure.include=*
.
## 开启消息跟踪
spring.cloud.bus.trace.enabled=true

## rabbitMQ信息配置
spring.rabbitmq.host= 
spring.rabbitmq.port= 
spring.rabbitmq.username=
spring.rabbitmq.password=

3、客户端配置:

1.1、添加pom依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

1.2、配置文件(在前面config分析的基础上增加):

## 设置actuator在HTTP上暴露所有的控制端点
management.endpoints.web.exposure.include=*

## 如果有多个引用 加上kafaka时,要加上这个配置
spring.cloud.stream.default-binder=rabbit


## rabbitMQ信息配置
spring.rabbitmq.host= 
spring.rabbitmq.port= 
spring.rabbitmq.username=
spring.rabbitmq.password=

4、webhook配置:

1、设置encrypt.key;

encrypt:
  key: yourKey

2、将上步的key配置到git中;

3、配置webHook url; 在Payload URL中写上bus-refresh的地址(比如:http://ip:port/actuator/bus-refresh),这里的IP为公网IP和对应的端口; 如果在本地尝试不生效的情况,建议参考下: 或者不使用webhook,针对refresh接口,我们自己写一下post接口进行刷新。

二、源码分析

1.bus-refresh底层机制分析

1.1、内置事件结构:RefreshRemoteApplicationEvent 1.2、刷新事件的发送端:RefreshBusEndpoint

RefreshBusEndPoint消息的发送者; RefreshListener消息的接收者; 确定destinationService对应的服务: 然后把消息publish出去: 该方法先校验event是否为空; 然后强制转换为applicationEvent; 再进入multicastEvent()方法,该方法中getApplicationListeners()会获得3个对象, 其中一个就是RefreshListener

进入refresh()方法

再进入refreshEnvironment()方法 再进入changes()方法,before就是之前的属性,after就是变更后的属性(更像个merge操作):

回到refresh()方法, 进入refreshAll()方法:销毁scope中所有beans中的instance并刷新

EurekaDiscoveryClientConfiguration类中最后打上断点,收到eurekaClient的请求,先关闭注册再打开。

经验分享 程序员 微信小程序 职场和发展