【rocketmq系列】【四】【消息发送】
消息队列RocketMQ版提供三种方式来发送普通消息:同步(Sync)发送、异步(Async)发送和单向(Oneway)发送。
了解点
- 发送方式的原理
- 应用场景
- 消息队列如何进行负载
- 消息发送如何实现高可用
- 批量消息发送如何实现一致性
3种发送方式
同步发送
原理 同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。
异步发送
原理 异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息队列RocketMQ版的异步发送,需要您实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
应用场景 异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
单向发送(oneway)
原理 发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
应用场景 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
对比
消息描述
org.apache.rocketmq.common.message.Message
public class Message implements Serializable { private static final long serialVersionUID = 8445773977080406428L; // 主题 private String topic; // org.apache.rocketmq.common.sysflag.MessageSysFlag private int flag; // 扩展属性 private Map<String, String> properties; private byte[] body; private String transactionId; public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) { this.topic = topic; this.flag = flag; this.body = body; if (tags != null && tags.length() > 0) this.setTags(tags); if (keys != null && keys.length() > 0) this.setKeys(keys); this.setWaitStoreMsgOK(waitStoreMsgOK); } public void setTags(String tags) { this.putProperty(MessageConst.PROPERTY_TAGS, tags); } public void setKeys(Collection<String> keys) { StringBuffer sb = new StringBuffer(); for (String k : keys) { sb.append(k); sb.append(MessageConst.KEY_SEPARATOR); } this.setKeys(sb.toString().trim()); } public void setKeys(String keys) { this.putProperty(MessageConst.PROPERTY_KEYS, keys); } public void setWaitStoreMsgOK(boolean waitStoreMsgOK) { this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK)); } }
Message的扩展属性
- tag:消息Tag,用于消息过滤
- keys:Message索引键,多个用空格分开,rocketmq可以根据这些key快速检索到消息
- waitStoreMsgOK:消息发送时是否等消息存储完成后再返回
- delayTimeLevel:消息延迟级别,用于定时消息或消息重试
生产者启动流程
代码都在client模块,相对于rocketmq来说,他就是客户端。
org.apache.rocketmq.client.MQAdmin org.apache.rocketmq.client.producer.DefaultMQProducer
参考
https://help.aliyun.com/document_detail/29547.html
https://kunzhao.org/docs/rocketmq/rocketmq-send-message-flow/#%E4%B8%80%E6%9C%8D%E5%8A%A1%E5%99%A8%E5%90%AF%E5%8A%A8