RabbitMQ第二种模型--workqueue
Work queues,也被称为(Task queues),任务模型。奉行的是平均主义 让多个消费者绑定到一个队列,共同消费队列中的消息。 队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。两个消费者平分队列中的消息
生产者
package com.zuoan.wrokqueue; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.zuoan.utils.RabbitMQUtils; import java.io.IOException; /** * @Description: TODO * @Author: 黄石军 * @CreateTime: 2022/4/9 17:57 * @Company: */ public class Provider { public static void main(String[] args) throws IOException { //创建连接 Connection connection = RabbitMQUtils.getConnection("192.168.128.100"); //创建通道 Channel channel = connection.createChannel(); //通过channel声明队列 //参数一:队列名称,参数二:是否持久化,参数三:是否独占队列,参数四:是否在消费完成后自动删除队列 channel.queueDeclare("work",false,false,false,null); for (int i = 0; i < 20; i++) { //发布消息 //参数一:交换机名称,参数二:队列名称,参数三:消息额外属性,参数四:消息内容 channel.basicPublish("","work",null,("hello workQueue" + i).getBytes()); } //关闭资源 RabbitMQUtils.closeChannelAndConnection(channel,connection); } }
消费者1
package com.zuoan.wrokqueue; import com.rabbitmq.client.*; import com.zuoan.utils.RabbitMQUtils; import java.io.IOException; /** * @Description: TODO * @Author: 黄石军 * @CreateTime: 2022/4/9 18:09 * @Company: */ public class Consumer1 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection("192.168.128.100"); //创建连接通道 Channel channel1 = connection.createChannel(); //通道绑定对象 channel1.queueDeclare("work",false,false,false,null); //消费消息 //参数一:队列名称 参数二:开始消费时自动确认机制 参数三:消费时的回调接口 channel1.basicConsume("work",true,new DefaultConsumer(channel1){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费者-1 :"+ new String(body)); } }); } }
消费者2
package com.zuoan.wrokqueue; import com.rabbitmq.client.*; import com.zuoan.utils.RabbitMQUtils; import java.io.IOException; /** * @Description: TODO * @Author: 黄石军 * @CreateTime: 2022/4/9 18:09 * @Company: */ public class Consumer2 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection("192.168.128.100"); //创建连接通道 Channel channel2 = connection.createChannel(); //通道绑定对象 channel2.queueDeclare("work",false,false,false,null); //消费消息 //参数一:队列名称 参数二:开始消费时自动确认机制 参数三:消费时的回调接口 channel2.basicConsume("work",true,new DefaultConsumer(channel2){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者-1 :"+ new String(body)); } }); } }
工具类在我博客分类MQ的第一章里需要的小伙伴请自取! https://editor..net/md/?articleId=124064801