spring cloud stream 3.0 -- 生产者消费者例子
spring cloud stream 3.0 的开发方式有了比较大的变化,不需要注解,根据方法名自动生成需要的队列和exchange。
创建项目
使用 Spring Initializr创建一个新project,并选择上依赖“Cloud Stream"。 选择rabbitmq作为broker。
依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
rabbit地址
application.yml:
spring:
rabbitmq:
addresses: localhost:5672
编写消费者代码
@SpringBootApplication
public class LoggingConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(LoggingConsumerApplication.class, args);
}
@Bean
public Consumer<Person> log() {
return person -> {
System.out.println("Received: " + person);
};
}
public static class Person {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String toString() {
return this.name;
}
}
}
上面的代码做了以下的事情:
- 通过Consumer接口实现了一个简单的消费者。
- spring框架自动帮我们把消息者绑定到了destination名为log-in-0,随机的group名
- spring框架自动帮我们把消息转换为Persion对象。
运行
输出:
注意: 指定group名称后,创建的队列不再会自动删除。
{
"name":"Sam Spade"}
程序控制台会有输出:
Received: Sam Spade
生产者
同步发送消息
@SpringBootApplication
@Controller
public class DemoApplication {
@Autowired
private StreamBridge streamBridge;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
System.out.println("Sending " + body);
streamBridge.send("log-in-0", body);
}
}
上面的例子通过StreamBridge发送消息到log-in-0,与之前的消息者的destination对应。
发送post请求到http://localhost:8080
curl -H "Content-Type: text/plain" -X POST -d {"name":"Sam Spade"} http://localhost:8080/
前面定义的消费者会收到消息。
也可以通过配置属性spring.cloud.stream.bindings.log-in-0.destination=des来自定义destination名称。
Reactive异步发送消息
@SpringBootApplication
@Controller
public class DemoApplication {
EmitterProcessor<String> processor = EmitterProcessor.create();
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
processor.onNext(body);
}
@Bean
public Supplier<Flux<String>> supplier() {
return () -> this.processor;
}
通过配置spring.cloud.stream.bindings.supplier-out-0.destination=des指定要发送到的destination。
注意: 如果项目中有多个Supplier或Consumer,需要通过配置spring.cloud.function.definition=log;supplier指定哪些方法需要绑定。
binding命名规则
上面代码中的binding名称都是自动生成的,命名规则是:
-
input - <functionName> + -in- + <index> output - <functionName> + -out- + <index>
Supplier为output,Consumer为input。除非方法有多个input或output,否则index为0。
