jfinal中ActiveMQ的简单使用
参考官网分享文档:http://www.jfinal.com/share/77
本次MQ主要是用于接口调用的同时,异步发送邮件,因此直接使用点对点方式,具体实现如下: 依赖:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.13.4</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.7.0</version> </dependency>
工具类:
package com.solomo.controller.activemq; import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.pool.PooledConnection; public class ActiveMQ { public static final ConcurrentHashMap<String, PooledConnection> pooledConnectionMap = new ConcurrentHashMap<>(); public static void addConnection(String connectionName, PooledConnection connection) { pooledConnectionMap.put(connectionName, connection); } public static PooledConnection getConnection(String connectionName) { return pooledConnectionMap.get(connectionName); } }
ActiveMQ插件:
package com.solomo.controller.activemq; import javax.jms.JMSException; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.pool.PooledConnection; import org.apache.activemq.pool.PooledConnectionFactory; import com.jfinal.plugin.IPlugin; public class ActiveMQPlugin implements IPlugin { public static final String BROKER_URL = "tcp://***.**.***.***:61616"; public static final String QUEUE_NAME = "sendMail"; @Override public boolean start() { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(); activeMQConnectionFactory.setUserName(ActiveMQConnection.DEFAULT_USER); activeMQConnectionFactory.setPassword(ActiveMQConnection.DEFAULT_PASSWORD); activeMQConnectionFactory.setBrokerURL(BROKER_URL); activeMQConnectionFactory.setDispatchAsync(true);// 异步发送消息 PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); pooledConnectionFactory.setMaximumActiveSessionPerConnection(200); pooledConnectionFactory.setIdleTimeout(120); pooledConnectionFactory.setMaxConnections(5); pooledConnectionFactory.setBlockIfSessionPoolIsFull(true); try { PooledConnection connection = (PooledConnection) pooledConnectionFactory.createConnection(); connection.start(); ActiveMQ.pooledConnectionMap.put(QUEUE_NAME, connection); new JmsReceiver(connection, QUEUE_NAME); } catch (JMSException e) { e.printStackTrace(); } return true; } @Override public boolean stop() { return true; } }
生产者:
package com.solomo.controller.activemq; import javax.jms.*; import org.apache.activemq.pool.PooledConnection; public class JmsSender { private Session session; private MessageProducer producer; public JmsSender(PooledConnection connection, String subject) throws JMSException { // 事务性会话,自动确认消息 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消息的目的地 Queue queue = session.createQueue(subject); producer = session.createProducer(queue); } public Session getSession() { return session; } public void sendMessage(Message message) throws JMSException { producer.send(message); } }
消费者:
配置文件中配置插件:
// 配置消息中间件 ActiveMQPlugin activeMQPlugin = new ActiveMQPlugin(); me.add(activeMQPlugin);
使用:
JmsSender sender = new JmsSender(ActiveMQ.getConnection("sendMail"), ActiveMQPlugin.QUEUE_NAME); MapMessage mapMessage = sender.getSession().createMapMessage(); mapMessage.setString("nickName", user.getStr("nickName")); mapMessage.setString("opinions", opinions); sender.sendMessage(mapMessage);