spring cloud stream 3.0 -- 生产者消费者例子
spring cloud stream 3.0 的开发方式有了比较大的变化,不需要注解,根据方法名自动生成需要的队列和exchange。
创建项目
使用 Spring Initializr创建一个新project,并选择上依赖“Cloud Stream"。 选择rabbitmq作为broker。
依赖:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
rabbit地址
application.yml:
spring: rabbitmq: addresses: localhost:5672
编写消费者代码
@SpringBootApplication public class LoggingConsumerApplication { public static void main(String[] args) { SpringApplication.run(LoggingConsumerApplication.class, args); } @Bean public Consumer<Person> log() { return person -> { System.out.println("Received: " + person); }; } public static class Person { private String name; public String getName() { return name; } public void setName(String name) { this.name = name; } public String toString() { return this.name; } } }
上面的代码做了以下的事情:
- 通过Consumer接口实现了一个简单的消费者。
- spring框架自动帮我们把消息者绑定到了destination名为log-in-0,随机的group名
- spring框架自动帮我们把消息转换为Persion对象。
运行
输出:
注意: 指定group名称后,创建的队列不再会自动删除。
{ "name":"Sam Spade"}
程序控制台会有输出:
Received: Sam Spade
生产者
同步发送消息
@SpringBootApplication @Controller public class DemoApplication { @Autowired private StreamBridge streamBridge; public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } @RequestMapping @ResponseStatus(HttpStatus.ACCEPTED) public void delegateToSupplier(@RequestBody String body) { System.out.println("Sending " + body); streamBridge.send("log-in-0", body); } }
上面的例子通过StreamBridge发送消息到log-in-0,与之前的消息者的destination对应。
发送post请求到http://localhost:8080
curl -H "Content-Type: text/plain" -X POST -d {"name":"Sam Spade"} http://localhost:8080/
前面定义的消费者会收到消息。
也可以通过配置属性spring.cloud.stream.bindings.log-in-0.destination=des来自定义destination名称。
Reactive异步发送消息
@SpringBootApplication @Controller public class DemoApplication { EmitterProcessor<String> processor = EmitterProcessor.create(); public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } @RequestMapping @ResponseStatus(HttpStatus.ACCEPTED) public void delegateToSupplier(@RequestBody String body) { processor.onNext(body); } @Bean public Supplier<Flux<String>> supplier() { return () -> this.processor; }
通过配置spring.cloud.stream.bindings.supplier-out-0.destination=des指定要发送到的destination。
注意: 如果项目中有多个Supplier或Consumer,需要通过配置spring.cloud.function.definition=log;supplier指定哪些方法需要绑定。
binding命名规则
上面代码中的binding名称都是自动生成的,命名规则是:
-
input - <functionName> + -in- + <index> output - <functionName> + -out- + <index>
Supplier为output,Consumer为input。除非方法有多个input或output,否则index为0。