RabbitMQ第二种模型--workqueue
Work queues,也被称为(Task queues),任务模型。奉行的是平均主义 让多个消费者绑定到一个队列,共同消费队列中的消息。 队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。两个消费者平分队列中的消息
生产者
package com.zuoan.wrokqueue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zuoan.utils.RabbitMQUtils;
import java.io.IOException;
/**
* @Description: TODO
* @Author: 黄石军
* @CreateTime: 2022/4/9 17:57
* @Company:
*/
public class Provider {
public static void main(String[] args) throws IOException {
//创建连接
Connection connection = RabbitMQUtils.getConnection("192.168.128.100");
//创建通道
Channel channel = connection.createChannel();
//通过channel声明队列
//参数一:队列名称,参数二:是否持久化,参数三:是否独占队列,参数四:是否在消费完成后自动删除队列
channel.queueDeclare("work",false,false,false,null);
for (int i = 0; i < 20; i++) {
//发布消息
//参数一:交换机名称,参数二:队列名称,参数三:消息额外属性,参数四:消息内容
channel.basicPublish("","work",null,("hello workQueue" + i).getBytes());
}
//关闭资源
RabbitMQUtils.closeChannelAndConnection(channel,connection);
}
}
消费者1
package com.zuoan.wrokqueue;
import com.rabbitmq.client.*;
import com.zuoan.utils.RabbitMQUtils;
import java.io.IOException;
/**
* @Description: TODO
* @Author: 黄石军
* @CreateTime: 2022/4/9 18:09
* @Company:
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection("192.168.128.100");
//创建连接通道
Channel channel1 = connection.createChannel();
//通道绑定对象
channel1.queueDeclare("work",false,false,false,null);
//消费消息
//参数一:队列名称 参数二:开始消费时自动确认机制 参数三:消费时的回调接口
channel1.basicConsume("work",true,new DefaultConsumer(channel1){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者-1 :"+ new String(body));
}
});
}
}
消费者2
package com.zuoan.wrokqueue;
import com.rabbitmq.client.*;
import com.zuoan.utils.RabbitMQUtils;
import java.io.IOException;
/**
* @Description: TODO
* @Author: 黄石军
* @CreateTime: 2022/4/9 18:09
* @Company:
*/
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection("192.168.128.100");
//创建连接通道
Channel channel2 = connection.createChannel();
//通道绑定对象
channel2.queueDeclare("work",false,false,false,null);
//消费消息
//参数一:队列名称 参数二:开始消费时自动确认机制 参数三:消费时的回调接口
channel2.basicConsume("work",true,new DefaultConsumer(channel2){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1 :"+ new String(body));
}
});
}
}
工具类在我博客分类MQ的第一章里需要的小伙伴请自取! https://editor..net/md/?articleId=124064801
