spring cloud stream 3.0 -- 生产者消费者例子

spring cloud stream 3.0 的开发方式有了比较大的变化,不需要注解,根据方法名自动生成需要的队列和exchange。

创建项目

使用 Spring Initializr创建一个新project,并选择上依赖“Cloud Stream"。 选择rabbitmq作为broker。

依赖:

<dependency>
	<groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

rabbit地址

application.yml:

spring:
  rabbitmq:
    addresses: localhost:5672

编写消费者代码

@SpringBootApplication
public class LoggingConsumerApplication {
          
   

	public static void main(String[] args) {
          
   
		SpringApplication.run(LoggingConsumerApplication.class, args);
	}

	@Bean
	public Consumer<Person> log() {
          
   
	    return person -> {
          
   
	        System.out.println("Received: " + person);
	    };
	}

	public static class Person {
          
   
		private String name;
		public String getName() {
          
   
			return name;
		}
		public void setName(String name) {
          
   
			this.name = name;
		}
		public String toString() {
          
   
			return this.name;
		}
	}
}

上面的代码做了以下的事情:

  1. 通过Consumer接口实现了一个简单的消费者。
  2. spring框架自动帮我们把消息者绑定到了destination名为log-in-0,随机的group名
  3. spring框架自动帮我们把消息转换为Persion对象。

运行

输出:

注意: 指定group名称后,创建的队列不再会自动删除。
{
          
   "name":"Sam Spade"}

程序控制台会有输出:

Received: Sam Spade

生产者

同步发送消息

@SpringBootApplication
@Controller
public class DemoApplication {
          
   

    @Autowired
    private StreamBridge streamBridge;

    public static void main(String[] args) {
          
   
        SpringApplication.run(DemoApplication.class, args);
    }
    @RequestMapping
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void delegateToSupplier(@RequestBody String body) {
          
   
        System.out.println("Sending " + body);
        streamBridge.send("log-in-0", body);
    }
}

上面的例子通过StreamBridge发送消息到log-in-0,与之前的消息者的destination对应。

发送post请求到http://localhost:8080

curl -H "Content-Type: text/plain" -X POST -d {"name":"Sam Spade"} http://localhost:8080/

前面定义的消费者会收到消息。

也可以通过配置属性spring.cloud.stream.bindings.log-in-0.destination=des来自定义destination名称。

Reactive异步发送消息

@SpringBootApplication
@Controller
public class DemoApplication {
          
   

    EmitterProcessor<String> processor = EmitterProcessor.create();


    public static void main(String[] args) {
          
   
        SpringApplication.run(DemoApplication.class, args);
    }
    @RequestMapping
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void delegateToSupplier(@RequestBody String body) {
          
   
        processor.onNext(body);
    }

    @Bean
    public Supplier<Flux<String>> supplier() {
          
   
        return () -> this.processor;
    }

通过配置spring.cloud.stream.bindings.supplier-out-0.destination=des指定要发送到的destination。

注意: 如果项目中有多个Supplier或Consumer,需要通过配置spring.cloud.function.definition=log;supplier指定哪些方法需要绑定。

binding命名规则

上面代码中的binding名称都是自动生成的,命名规则是:

    input - <functionName> + -in- + <index> output - <functionName> + -out- + <index>

Supplier为output,Consumer为input。除非方法有多个input或output,否则index为0。

经验分享 程序员 微信小程序 职场和发展