RocketMQ Tutorial (1) - Basics
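1. Download
Choose the Binary package.
Here it was extracted to drive D, and for convenience the folder was renamed from rocketmq-all-4.5.0-bin-release to RocketMQ.
The default settings reserve a lot of memory, so unless your machine has plenty of RAM you will need to lower them.
NameServer settings: D:\RocketMQ\bin\runserver.cmd
Changing the memory values to 512m 512m 256m is enough (in the stock script these are the -Xms, -Xmx, and -Xmn JVM options, in that order).
Broker settings: D:\RocketMQ\bin\runbroker.cmd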
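2. Set the environment variable
Right-click Computer on the desktop -> Properties, then open the environment variable settings. The RocketMQ scripts expect ROCKETMQ_HOME to point to the install directory (D:\RocketMQ in this setup).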
3. Start the NameServer
Open a command prompt in D:\RocketMQ\bin and run runserver.cmd
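If you are not sure whether the NameServer actually came up, a quick TCP check against port 9876 can help. The following is only a minimal sketch: the class name NameServerProbe and the method isListening are made up for this example, and a successful connection only shows that something is listening on the port, not that it is a healthy NameServer.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper for this tutorial; not part of RocketMQ. */
final class NameServerProbe {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host all count as "not listening"
            return false;
        }
    }

    public static void main(String[] args) {
        // 9876 is the NameServer port used throughout this tutorial
        System.out.println("NameServer listening: " + isListening("localhost", 9876, 1000));
    }
}
```

The same check should work for the Broker by pointing it at the address the Broker registers with.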
4. Start the Broker
Run mqbroker.cmd -n localhost:9876
Note the address the Broker registers with in its startup output, 192.168.12.22:10911 in this case; it is needed in the next step.
5. Create a Topic
Run mqadmin.cmd updateTopic -n localhost:9876 -b 192.168.12.22:10911 -t demo
The -b option specifies the Broker, i.e. the address shown in step 4, and -t is the name of the topic to create.
6. Develop the Producer
Create a Spring Boot project and add the dependency
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>
Create ProducerService
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;

@Service
public class ProducerService {

    private DefaultMQProducer producer = null;

    // Create and start the producer once the Spring bean has been constructed
    @PostConstruct
    public void initMQProducer() {
        producer = new DefaultMQProducer("defaultGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.setRetryTimesWhenSendFailed(3);
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    // Send one message synchronously; returns false if the send throws
    public boolean send(String topic, String tags, String content) {
        // Keys are left empty; the body is encoded explicitly as UTF-8
        // instead of relying on the platform default charset
        Message msg = new Message(topic, tags, "", content.getBytes(StandardCharsets.UTF_8));
        try {
            producer.send(msg);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    // Release the producer's resources when the application shuts down
    @PreDestroy
    public void shutDownProducer() {
        if (producer != null) {
            producer.shutdown();
        }
    }
}
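To get a feel for what setRetryTimesWhenSendFailed(3) means, here is a simplified picture of retry-on-failure in plain Java. This is a sketch under assumptions, not RocketMQ's actual code: RetrySketch and sendWithRetry are hypothetical names, and the model is simply "one initial attempt plus up to retryTimes retries", which is how I understand the setting to behave for synchronous sends. The real client also does things this sketch ignores, such as picking another queue for the retry.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper; a simplified model of retry-on-failure, not RocketMQ's code. */
final class RetrySketch {

    /**
     * Runs the attempt once, then up to retryTimes more times while it keeps failing.
     * Returns the number of attempts used on success, or -1 if every attempt failed.
     */
    static int sendWithRetry(Callable<Boolean> attempt, int retryTimes) {
        int maxAttempts = 1 + retryTimes;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                if (attempt.call()) {
                    return i;
                }
            } catch (Exception e) {
                // Treat an exception like a failed attempt and try again
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated send that fails twice and succeeds on the third call
        int used = sendWithRetry(() -> ++calls[0] >= 3, 3);
        System.out.println("attempts used: " + used);
    }
}
```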
Test
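import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import static org.junit.Assert.assertTrue;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RocketmqDemoApplicationTests {

    @Autowired
    private ProducerService producerService;

    @Test
    public void contextLoads() {
        // Send one message to the "demo" topic created in step 5
        boolean result = producerService.send("demo", "TAG-A", "Hello RocketMQ");
        assertTrue(result);
    }
}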
7. Develop the Consumer
Create ConsumerService
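import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.List;

@Service
public class ConsumerService {

    private DefaultMQPushConsumer consumer = null;

    // Create the consumer, subscribe to the topic, and start it with the application
    @PostConstruct
    public void initMQConsumer() {
        consumer = new DefaultMQPushConsumer("defaultGroup");
        consumer.setNamesrvAddr("localhost:9876");
        try {
            // "*" subscribes to every tag of the "demo" topic
            consumer.subscribe("demo", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        // Decode the body as UTF-8 rather than the platform default charset
                        System.out.println("Message Received: "
                                + new String(msg.getBody(), StandardCharsets.UTF_8));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    // Release the consumer's resources when the application shuts down
    @PreDestroy
    public void shutDownConsumer() {
        if (consumer != null) {
            consumer.shutdown();
        }
    }
}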
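One detail worth keeping in mind: the message body travels as raw bytes, so the producer and the consumer have to agree on a charset, otherwise non-ASCII text may come out garbled on machines with different default encodings. A minimal sketch of that idea, assuming UTF-8 on both sides (BodyCodec is a hypothetical helper name, not a RocketMQ class):

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that keeps encoding and decoding in one place. */
final class BodyCodec {

    /** Producer side: turn the text into the bytes used as the message body. */
    static byte[] encode(String content) {
        return content.getBytes(StandardCharsets.UTF_8);
    }

    /** Consumer side: turn the received body back into text. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("Hello RocketMQ");
        System.out.println(decode(body));
    }
}
```

With a helper like this, the producer would pass encode(content) as the message body and the consumer would call decode(msg.getBody()).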
Start the project
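The consumer above subscribes with "*", which means every tag. A subscription can also name specific tags separated by ||, for example "TAG-A || TAG-B". The sketch below only illustrates those matching semantics in plain Java; TagExpression is a hypothetical class written for this tutorial and is not how RocketMQ implements filtering internally.

```java
import java.util.Arrays;

/** Hypothetical illustration of tag subscription semantics; not RocketMQ's implementation. */
final class TagExpression {

    /** Returns true if a message with the given tag would match the subscription expression. */
    static boolean matches(String expression, String tag) {
        // A null, empty, or "*" expression subscribes to all tags
        if (expression == null || expression.trim().isEmpty() || "*".equals(expression.trim())) {
            return true;
        }
        if (tag == null) {
            return false;
        }
        // Otherwise the expression is a list of tags separated by "||"
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(tag::equals);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "TAG-A"));
        System.out.println(matches("TAG-A || TAG-B", "TAG-B"));
        System.out.println(matches("TAG-A || TAG-B", "TAG-C"));
    }
}
```

In the consumer, the equivalent change would be consumer.subscribe("demo", "TAG-A || TAG-B"), so that messages carrying any other tag are not delivered to this listener.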