RocketMQTemplate发送带tags的消息
RocketMQTemplate是RocketMQ集成到Spring cloud之后提供的个方便发送消息的模板类,它是基本Spring 的消息机制实现的,对外只提供了Spring抽象出来的消息发送接口。在单独使用RocketMQ的时候,发送消息使用的Message是‘org.apache.rocketmq.common.message’包下面的Message,而使用RocketMQTemplate发送消息时,使用的Message是org.springframework.messaging的Message,猛一看,没办法发送带tags的消息了,其实在RocketMQ集成的时候已经解决了这个问题。
在RocketMQTemplate发送消息时,调用的方法是:
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) { if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { log.error("syncSendOrderly failed. destination:{}, message is null ", destination); throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); } try { long now = System.currentTimeMillis(); //在这里对消息进行了转化,将Spring的message转化为rocketmq自己的message org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, message); SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout); long costTime = System.currentTimeMillis() - now; log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId()); return sendResult; } catch (Exception e) { log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); } }
在上面的代码中,对消息进行了转化,将Spring的message转化为rocketmq自己的message,在RocketMQUtil.convertToRocketMessage方法中有个地方就是获取tags的:
String[] tempArr = destination.split(":", 2); String topic = tempArr[0]; String tags = ""; if (tempArr.length > 1) { tags = tempArr[1]; }
所以,在发送消息的时候,我们只要把tags使用":"添加到topic后面就可以了。例如:xxxx:tag1 || tag2 || tag3