RabbitMq实现任务分发work queues

适用场景


    一个生产者和多个消费者 生产者产生任务,按照一定规则分发给消费者:轮询和公平分发

场景如下:

这里使用模拟消费的方式,消费者接收到消息后会休眠一定的时间,以表示正在处理当前的任务。

生产者


先上代码:

# -*- coding: utf-8 -*-
import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
channel = connection.channel()

# queue消息持久化
channel.queue_declare(queue=task_queue, durable=True)

message =  .join(sys.argv[1:]) or "Hello World!"
# 标识消息属性:持久化
channel.basic_publish(exchange=, routing_key=task_queue, body=message, properties=pika.BasicProperties(delivery_mode=2))

print("[x] Sent to %r" %message)
connection.close()

详细分析:

    连接,建立通道,同前,不再赘述,请参考 消息持久化。用于处理当RabbitMq服务退出或崩溃时,消息丢失的问题。步骤有2个: 声明queue为持久化的 durable=True,注意,必须在定义该队列时声明,声明一个已经存在的队列会导致错误。生产者和消费者都要如此定义 标记消息是持久化的 delivery_mode=2 完成以上两步,即使RabbitMq服务重启,未被消费的消息也不会丢失。但这不是绝对的,比如RabbitMq在持久化消息的那一刻断电了,那么消息依然会丢失。

消费者


代码:

# -*- coding: utf-8 -*-
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost))
channel = connection.channel()

#消息持久化
channel.queue_declare(queue=task_queue, durable=True)

# 处理完毕返回合适的ack
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b.))
    print("[x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 每次处理一个任务
channel.basic_qos(prefetch_count=1)

# 处理完毕发确认包,默认配置
channel.basic_consume(queue=task_queue, on_message_callback=callback)
print( [*] Waiting for messages. To exit press CTRL+C)

channel.start_consuming()

详细分析:

    连接、通道、列队消息持久化同前 消息确认。消费者在消费消息时,可以选择是否返回确认标识。默认为发送ack。之前的hello world程序显式地通过设置auto_ack=True不发送ack。 发送ack可以防止消息在消费者处丢失。如果任务的处理时间较长,当一个消费者崩溃时,当前消息和已经分配给该消费者的消息都将丢失。 没有ack时,RabbitMq服务一旦把消息分配给消费者,就立标识删除消息。需要ack时,只有接收到ack,RabbitMq服务才会标识该消息为可删除。 ack机制保证在某个消费者退出后(通道关闭、连接断开或者TCP连接丢失等),未被消费的消息会重新分发给其他消费者 不会有消息超时,因为只要消费者未退出,它处理的消息可以花费很长的时间 默认需要消息确认,可手动关闭。若未关闭,就一定要发送ack,否则消费者重启后会收到重复的消息,而且,RabbitMq服务不会删除已经发送的消息,最终导致内存用尽而崩溃。 ack必须与接收的消息在同一通道发送,否则会引发异常 均等分发任务。RabbitMq默认轮换分发任务,而不管消费者的处理能力如何。这可能会导致总是把耗时的任务分发给同一个消费者,导致消息堆积,而其他消费者空闲却得不到使用。 通过设置 prefetch_count=1,使得RabbitMq不会分发任务到正在处理任务的消费者 也就是说,如果一个消费者正在处理任务而没有返回ack,它会把该任务分发到另外的消费者 注意,如果所有的消费者都在忙碌,消息将在queue中堆积。这会引起问题,这种情况可以使用增加消费者的方法缓解

结果


  1. 由于设置了ack,一个消费者在处理消息过程中退出,消息会被分发给另外的消费者
  2. 由于设置了均等分发,一个正在处理任务的消费者不会再次接收消息,该消息被分发到另一个空闲的消费者
  3. 由于设置了消息持久化,RabbitMq重启后消息不会丢失
参考资料

经验分享 程序员 微信小程序 职场和发展