在spring使用activeMQ订阅多个topic并实现监听器监听
最近有个需求,在spring上集成activeMQ且订阅多个topic,并且需要实现监听器监听多个topic。
一、maven依赖配置pom.xml
<!-- activemq --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.6.RELEASE</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.14.3</version> </dependency>
二、spring.xml
1、配置ConnectionFactory
activeMQ的连接工厂类,brokerURL属性需注入activeMQ服务器地址url。
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.1.162:61616"/> </bean>
2、配置singleConnectionFactory
singleConnectionFactory封装了activeMQ的连接工厂类ConnectionFactory。
<bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean>
3、配置jmsTemplate
Spring提供的JMS工具类,它可以进行消息发送、接收等,若我们的项目不需用到发送功能,此处可以不配置,该模板类需注入依赖connectionFactory。
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <property name="connectionFactory" ref="singleConnectionFactory"/> </bean>
4、配置topicDestination
该类是topic目标类,使用该Destination可订阅某topic接收或发送某topic,由于我们需要的是订阅多个topic,则此处为关键点:构造注入多个topic,以英文逗号’,’隔开实现订阅多个topic。
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic1,topic2"/> </bean>
5、配置自定义实现的监听器
该自定义监听器作用为实现自定义的逻辑处理接收到的message
<bean id="consumerSessionAwareMessageListener" class="com.xyh.listener.ConsumerSessionAwareMessageListener"/>
6、配置MessageListenerContainer
<bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="singleConnectionFactory" /> <property name="destination" ref="topicDestination" /> <property name="messageListener" ref="consumerSessionAwareMessageListener" /> </bean>
三、自定义监听器ConsumerSessionAwareMessageListener
public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener<BytesMessage> { public void onMessage(BytesMessage message, Session session) throws JMSException { try { Destination destination = message.getJMSDestination(); String topic = destination.toString(); long length = message.getBodyLength(); byte[] b = new byte[(int) length]; message.readBytes(b); if (topic.equals("topic://topic1")) { //... }else if (topic.equals("topic://topic2")){ //... } }catch (Exception e){ } } }
上述自定义了一个监听器类实现SessionAwareMessageListener接口,且传入泛型BytesMessage,实现onMessage方法,该方法接收BytesMessage、Session两个类型的参数,从message中取得Destination,该Destination执行toString方法后返回的值为字符串(例:topic://订阅的topic),而后几行代码作用为读取message中序列化的数据。