RocketMQ 一个生产者对应多个消费者的问题
消费者同组时,有两种消息模式。
1. 集群模式:多个消费者通过负载均衡共同消费消息,每条消息只会被组内的一个消费者消费。
2. 广播模式:一条消息会复制成多份,分发给组内的每一个消费者。
消费者不同组时:
一条消息会复制多份,分别发给各个消费者组。
/**
 * Minimal RocketMQ producer example: sends ten messages to topic
 * {@code TopicTest} with tag {@code TagA} and prints each send result.
 */
public class SyncProducer {
    /**
     * Starts a producer, sends the messages one at a time, then shuts the
     * producer down.
     *
     * @param args unused
     * @throws Exception if starting the producer, encoding a message body,
     *                   or sending a message fails
     */
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("producer_group_01");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        // Launch the instance.
        producer.start();
        try {
            for (int i = 0; i < 10; i++) {
                // Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("TopicTest",
                        "TagA",
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Call send message to deliver message to one of brokers.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
        } finally {
            // Shut down once the producer instance is no longer in use. Doing it
            // in finally ensures a failed send does not leave the started
            // producer running.
            producer.shutdown();
        }
    }
}
/**
 * Minimal RocketMQ push-consumer example: subscribes to every tag of topic
 * {@code TopicTest} in broadcasting mode and prints each received message.
 */
public class Consumer {
    /**
     * Configures the consumer, registers the message listener and starts it.
     *
     * @param args unused
     * @throws Exception if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws Exception {
        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_01");
        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");
        // Subscribe to one or more topics to consume; "*" matches every tag.
        consumer.subscribe("TopicTest", "*");
        // Broadcasting: every consumer in the group receives its own copy of each
        // message, instead of the messages being load-balanced across the group.
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), list);
                for (MessageExt messageExt : list) {
                    // Decode explicitly as UTF-8 rather than with the platform
                    // default charset, so non-ASCII bodies are not garbled.
                    // NOTE(review): assumes producers encode bodies as UTF-8 — confirm.
                    String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("收到消息" + body);
                }
                // Report the whole batch as successfully consumed.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch the consumer instance.
        consumer.start();
        System.out.printf("Consumer Started. %n");
    }
}
