RabbitMQ第二种模型--workqueue

Work queues,也被称为(Task queues),任务模型。奉行的是平均主义 让多个消费者绑定到一个队列,共同消费队列中的消息。 队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。两个消费者平分队列中的消息

生产者

package com.zuoan.wrokqueue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zuoan.utils.RabbitMQUtils;

import java.io.IOException;

/**
 * @Description: TODO
 * @Author: 黄石军
 * @CreateTime: 2022/4/9 17:57
 * @Company:
 */
public class Provider {
          
   
    public static void main(String[] args) throws IOException {
          
   
        //创建连接
        Connection connection = RabbitMQUtils.getConnection("192.168.128.100");
        //创建通道
        Channel channel = connection.createChannel();

        //通过channel声明队列
        //参数一:队列名称,参数二:是否持久化,参数三:是否独占队列,参数四:是否在消费完成后自动删除队列
        channel.queueDeclare("work",false,false,false,null);

        for (int i = 0; i < 20; i++) {
          
   
            //发布消息
            //参数一:交换机名称,参数二:队列名称,参数三:消息额外属性,参数四:消息内容
            channel.basicPublish("","work",null,("hello workQueue" + i).getBytes());
        }

        //关闭资源
        RabbitMQUtils.closeChannelAndConnection(channel,connection);
    }
}

消费者1

package com.zuoan.wrokqueue;

import com.rabbitmq.client.*;
import com.zuoan.utils.RabbitMQUtils;

import java.io.IOException;

/**
 * @Description: TODO
 * @Author: 黄石军
 * @CreateTime: 2022/4/9 18:09
 * @Company:
 */
public class Consumer1 {
          
   
    public static void main(String[] args) throws IOException {
          
   
        Connection connection = RabbitMQUtils.getConnection("192.168.128.100");
        //创建连接通道
        Channel channel1 = connection.createChannel();
        //通道绑定对象
        channel1.queueDeclare("work",false,false,false,null);

        //消费消息
        //参数一:队列名称 参数二:开始消费时自动确认机制 参数三:消费时的回调接口
        channel1.basicConsume("work",true,new DefaultConsumer(channel1){
          
   
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
          
   
                try {
          
   
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
          
   
                    e.printStackTrace();
                }
                System.out.println("消费者-1 :"+ new String(body));
            }
        });
    }
}

消费者2

package com.zuoan.wrokqueue;

import com.rabbitmq.client.*;
import com.zuoan.utils.RabbitMQUtils;

import java.io.IOException;

/**
 * @Description: TODO
 * @Author: 黄石军
 * @CreateTime: 2022/4/9 18:09
 * @Company:
 */
public class Consumer2 {
          
   
    public static void main(String[] args) throws IOException {
          
   
        Connection connection = RabbitMQUtils.getConnection("192.168.128.100");
        //创建连接通道
        Channel channel2 = connection.createChannel();
        //通道绑定对象
        channel2.queueDeclare("work",false,false,false,null);

        //消费消息
        //参数一:队列名称 参数二:开始消费时自动确认机制 参数三:消费时的回调接口
        channel2.basicConsume("work",true,new DefaultConsumer(channel2){
          
   
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
          
   
                System.out.println("消费者-1 :"+ new String(body));
            }
        });
    }
}

工具类在我博客分类MQ的第一章里需要的小伙伴请自取! https://editor..net/md/?articleId=124064801

经验分享 程序员 微信小程序 职场和发展