RabbitMQ(Java操作直连式)
1 - maven依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.2</version> </dependency>
2 - 生产者
public class Provider { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.181.133"); factory.setPort(5672); factory.setVirtualHost("/ems"); factory.setUsername("root"); factory.setPassword("root"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * 获取队列信息 * 参数1:队列名称 (没有自动创建) * 参数2:队列是否持久化 * 参数3:线程是否独占队列 * 参数4:是否在消费完成后删除数据 * 参数5:添加额外数据 */ channel.queueDeclare("onebyone", true, false, false, null); /** * 发布消息 * 参数1:交换机的名称(直连式没有涉及到交换机) * 参数2:发布消息的队列名称 * 参数3:额外参数 * 参数4:发布的消息(二进制方式) */ channel.basicPublish("","onebyone", null,"张三".getBytes()); //关闭资源 channel.close(); connection.close(); } }
3 - 消费者
public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.181.133"); factory.setPort(5672); factory.setVirtualHost("/ems"); factory.setUsername("root"); factory.setPassword("root"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * 获取队列: * 参数1:队列的名称 * 参数2:是否持久化 * 参数3:是否独占队列 * 参数4:是否完成消费后删除消息 * 参数5:额外参数 */ channel.queueDeclare("onebyone", true, false , false, null); /** * 消费消息 * 参数1:队列名称 * 参数2:消费者完成消费后是否通知rabbitmq删除已被消费消息 * 参数3:回调函数(获取消费的消息信息) */ channel.basicConsume("onebyone", true, new DefaultConsumer(channel){ @Override public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); System.out.println("这是异步"); } }
4 - 测试
先开启消费者进行阻塞获取消息 在开启生产者 查看消费者状态
下一篇:
架构1.分层架构,什么是分层架构