RabbitMq实现任务分发work queues
适用场景
-
一个生产者和多个消费者 生产者产生任务,按照一定规则分发给消费者:轮询和公平分发
场景如下:
这里使用模拟消费的方式,消费者接收到消息后会休眠一定的时间,以表示正在处理当前的任务。
生产者
先上代码:
# -*- coding: utf-8 -*- import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost)) channel = connection.channel() # queue消息持久化 channel.queue_declare(queue=task_queue, durable=True) message = .join(sys.argv[1:]) or "Hello World!" # 标识消息属性:持久化 channel.basic_publish(exchange=, routing_key=task_queue, body=message, properties=pika.BasicProperties(delivery_mode=2)) print("[x] Sent to %r" %message) connection.close()
详细分析:
-
连接,建立通道,同前,不再赘述,请参考 消息持久化。用于处理当RabbitMq服务退出或崩溃时,消息丢失的问题。步骤有2个: 声明queue为持久化的 durable=True,注意,必须在定义该队列时声明,声明一个已经存在的队列会导致错误。生产者和消费者都要如此定义 标记消息是持久化的 delivery_mode=2 完成以上两步,即使RabbitMq服务重启,未被消费的消息也不会丢失。但这不是绝对的,比如RabbitMq在持久化消息的那一刻断电了,那么消息依然会丢失。
消费者
代码:
# -*- coding: utf-8 -*- import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost)) channel = connection.channel() #消息持久化 channel.queue_declare(queue=task_queue, durable=True) # 处理完毕返回合适的ack def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b.)) print("[x] Done") ch.basic_ack(delivery_tag=method.delivery_tag) # 每次处理一个任务 channel.basic_qos(prefetch_count=1) # 处理完毕发确认包,默认配置 channel.basic_consume(queue=task_queue, on_message_callback=callback) print( [*] Waiting for messages. To exit press CTRL+C) channel.start_consuming()
详细分析:
-
连接、通道、列队消息持久化同前 消息确认。消费者在消费消息时,可以选择是否返回确认标识。默认为发送ack。之前的hello world程序显式地通过设置auto_ack=True不发送ack。 发送ack可以防止消息在消费者处丢失。如果任务的处理时间较长,当一个消费者崩溃时,当前消息和已经分配给该消费者的消息都将丢失。 没有ack时,RabbitMq服务一旦把消息分配给消费者,就立标识删除消息。需要ack时,只有接收到ack,RabbitMq服务才会标识该消息为可删除。 ack机制保证在某个消费者退出后(通道关闭、连接断开或者TCP连接丢失等),未被消费的消息会重新分发给其他消费者 不会有消息超时,因为只要消费者未退出,它处理的消息可以花费很长的时间 默认需要消息确认,可手动关闭。若未关闭,就一定要发送ack,否则消费者重启后会收到重复的消息,而且,RabbitMq服务不会删除已经发送的消息,最终导致内存用尽而崩溃。 ack必须与接收的消息在同一通道发送,否则会引发异常 均等分发任务。RabbitMq默认轮换分发任务,而不管消费者的处理能力如何。这可能会导致总是把耗时的任务分发给同一个消费者,导致消息堆积,而其他消费者空闲却得不到使用。 通过设置 prefetch_count=1,使得RabbitMq不会分发任务到正在处理任务的消费者 也就是说,如果一个消费者正在处理任务而没有返回ack,它会把该任务分发到另外的消费者 注意,如果所有的消费者都在忙碌,消息将在queue中堆积。这会引起问题,这种情况可以使用增加消费者的方法缓解
结果
- 由于设置了ack,一个消费者在处理消息过程中退出,消息会被分发给另外的消费者
- 由于设置了均等分发,一个正在处理任务的消费者不会再次接收消息,该消息被分发到另一个空闲的消费者
- 由于设置了消息持久化,RabbitMq重启后消息不会丢失