使用阿里云RocketMQ发送json字符串到线上
MQClient mqClient = new MQClient( // 设置HTTP接入域名(此处以公共云生产环境为例) nameServer, // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建 accessKey, // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建 secretKey ); // 获取Topic的生产者 MQProducer producer; if (StringUtil.isNotBlank(instanceId)) { producer = mqClient.getProducer(instanceId, topic); } else { producer = mqClient.getProducer(topic); } try { // 发送消息 TopicMessage pubMsg; // 普通消息 pubMsg = new TopicMessage(); logger.debug("jsonStr:::"+jsonStr); pubMsg.setMessageBody(JSON.toJSONBytes(jsonStr)); pubMsg.setMessageTag(MessageTag); // 设置属性 pubMsg.getProperties().put("Content-Type", "application/json"); // 设置KEY String MessageKey = DateUtil.getNowDate(DateUtil.DFT_NO_BLANK); pubMsg.setMessageKey(MessageKey); // 同步发送消息,只要不抛异常就是成功 TopicMessage pubResultMsg = producer.publishMessage(pubMsg); // 同步发送消息,只要不抛异常就是成功 logger.debug(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId() + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5()); } catch (Throwable e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理 logger.debug(new Date() + " Send mq message failed. Topic is:" + topic); e.printStackTrace(); logger.error(e.getMessage(),e); } mqClient.close();
记录下第一次用阿里云的MQ对接信息