使用阿里云RocketMQ发送json字符串到线上

MQClient mqClient = new MQClient(
                // 设置HTTP接入域名(此处以公共云生产环境为例)
                nameServer,
                // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
                accessKey,
                // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
                secretKey
        );



        // 获取Topic的生产者
        MQProducer producer;
        if (StringUtil.isNotBlank(instanceId)) {
            producer = mqClient.getProducer(instanceId, topic);
        } else {
            producer = mqClient.getProducer(topic);
        }

        try {
            // 发送消息
            TopicMessage pubMsg;
            // 普通消息
            pubMsg = new TopicMessage();
            logger.debug("jsonStr:::"+jsonStr);
            pubMsg.setMessageBody(JSON.toJSONBytes(jsonStr));
            pubMsg.setMessageTag(MessageTag);
            // 设置属性
            pubMsg.getProperties().put("Content-Type", "application/json");
            // 设置KEY
            String MessageKey = DateUtil.getNowDate(DateUtil.DFT_NO_BLANK);

            pubMsg.setMessageKey(MessageKey);
            // 同步发送消息,只要不抛异常就是成功
            TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

            // 同步发送消息,只要不抛异常就是成功
            logger.debug(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
                    + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
        } catch (Throwable e) {
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
            logger.debug(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
            logger.error(e.getMessage(),e);
        }

        mqClient.close();

记录下第一次用阿里云的MQ对接信息

经验分享 程序员 微信小程序 职场和发展