spring-boot整合RabbitMQ的详细步骤
前言
是目前使用最为广泛的开源消息中间件之一。它实现了AMQP协议,它支持集群化部署,可以动态伸缩。使用RabbitMQ可以将复杂的系统解耦,可以做瞬时高峰的削峰处理。那么在spring-boot中我们该如何集成RabbitMQ呢?
1. 在pom.xml中加入spring-boot-starter-amqp依赖
<!-- Rabbit MQ 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2. 修改application.yml配置
spring: rabbitmq: host: xxxx port: xxxx username: xxxx password: xxxx listener: type: simple simple: acknowledge-mode: manual #采用手动应答 prefetch: 1 #限制每次发送一条数据。 queues: testQueueName: testQueueName testQueueExchangeName: testQueueExchangeName testExchangeEnabled: true
3. 添加RabbitMQConfig配置文件
@Configuration public class RabbitMQConfig { @Value("${queues.testQueueName}") private String testQueueName; @Value("${queues.testQueueExchangeName}") private String testQueueExchangeName; @Bean(name = "TestInfo") public Queue TestMessage() { return new Queue(testQueueName); } /** * 配置广播路由器 * @return */ @Bean FanoutExchange testFanoutExchange() { return new FanoutExchange(testQueueExchangeName); } @Bean Binding bindingExchangeTestInfo(@Qualifier("TestInfo") Queue TestMessage, @Qualifier("testFanoutExchange") FanoutExchange fanoutExchange) { return BindingBuilder.bind(TestMessage).to(fanoutExchange); } }
4. 消息发送逻辑
@Slf4j @Service public class RabbitMQSendServiceImpl implements RabbitMQSendService { @Autowired RabbitTemplate rabbitmqTemplate; /** * 发送到MQ * * @param msg */ public void sendToMQ(String msg) { // Send MQ try { rabbitmqTemplate.convertAndSend("testQueueExchangeName", "", msg); } catch (Exception ex) { ex.printStackTrace(); System.out.println(ex); } } }
5. 消息监听逻辑
@Component public class RabbitMQMsgListener { /** * 批量监听配置 * @param connectionFactory * @return */ @Bean("batchTestListenerContainerFactory") public SimpleRabbitListenerContainerFactory batchBumImpListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); //设置批量 factory.setBatchListener(true); factory.setConsumerBatchEnabled(true);//设置BatchMessageListener生效 factory.setBatchSize(100);//设置监听器一次批量处理的消息数量 return factory; } // 单条消息接收处理 @RabbitListener(queues = "${queues.testQueueName}", autoStartup = "${queues.testExchangeEnabled}") public void process(Message message, Channel channel) throws IOException { String msg = new String(message.getBody(), "UTF-8"); // TODO 单条消息处理 // 处理完手动ACK channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } // 批量接收处理 @RabbitListener(queues = "${queues.testQueueName}", autoStartup = "${queues.testExchangeEnabled}", containerFactory = "batchTestListenerContainerFactory") public void onMessageBatch(List<Message> messages, Channel channel) throws IOException { try { for (Message message : messages) { String msg = new String(message.getBody(), "UTF-8"); // TODO 单条消息处理 } } catch (Exception e) { e.printStackTrace(); } } }