首先,我们先看下这样的业务场景,在消息发出后,Consumer
接收到了生产者所发出的消息,但在Consumer
突然出错崩溃,或者异常退出了,但是生产者消息已经发出来了,那么这个消息可能就会丢失,为了解决这样的问题,RabbitMQ
引入了ack
机制。
消费者在订阅队列时,可以指定autoAck
参数,当autoAck
等于false
时,RabbitMQ
会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当autoAck
等于true
时,RabbitMQ
会⾃自动把发送出去的消息置为确认, 然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。
当采用ack消息确认机制后,只要将autoAck
设置为false
。消费者就可以有足够的时间来处理消息,而不用担心消费过程中突然异常退出导致消息丢失的情况,因为RabbitMQ
会一直持有消息,直到消费者调用basic.ack
为止。在这种情况下,对于RabbitMQ
来说,队列中的消息就可以分为两部分,一部分是等待发送给消费者的消息,另外一部分就是等待接收消费者确认的消息。那么如果在这个时候,消费者突然发生中断,在消费中的消息会怎么处理呢?
如果RabbitMQ
一直没有接收到消费者的确认消息,并且消费者的连接已经关闭,那么RabbitMQ
就会重新将让消息进入队列中,等待下一个消费者消费。
RabbitMQ
不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。
参考如下代码,我们设置了auto_ack=True
:
import pika
from time import sleep
user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
sleep(1)
channel.basic_consume(
queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
可以看到,如果我们设置auto_ack=True
之后,虽然我们在消费的时候有休眠1s,而且这个时候消息还没有全部消费完,但是在后台看到队列中的消息已经被消费完了,这个原因是当消费者连接上队列了,因为没有指定消费者一次获取消息的条数,所以队列把队列中的所有消息一下子推送到消费者端,当消费者订阅的该队列,消息就会从队列推到客户端,当消息从队列被推出的时的那一刻就表示已经对消息进行自动确认了,消息就会从队列中删除。
下面我们再看看设置auto_ack=False
的情况:
import pika
from time import sleep
user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
sleep(1)
channel.basic_consume(
queue='hello', on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
运行结果如下:
可以看到,队列中的消息都变成了
unacked
状态,这是为什么呢?
我们上面有说过,rabbitmq
需要等到消费者显示的调用basic.ack
,要不然的话rabbitmq
会一直持有这些消息,如果我们在这个时候再启动一个消费者的话,可以看到这些消息还会再次被消费。为了解决这个问题,我们只要修改下callback
方法如下:
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
sleep(1)
# delivery_tag 是在 channel 中的一个消息计数, 每次消息提取行为都对应一个数字.
ch.basic_ack(delivery_tag=method.delivery_tag)
就可以,这时,我们重启消费者,可以看到我们的消息会被正常的消费,并且队列中消息的不会被瞬间清空,而是按照我们的消费速度一个一个的删除。
消费者在接收到消息之后,还可以拒绝消息,我们只需要调用basic_reject
就可以,如下:
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
sleep(1)
# delivery_tag 是在 channel 中的一个消息计数, 每次消息提取行为都对应一个数字.
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
requeue
参数的意思是被拒绝的这个消息是否需要重新进入队列,默认是True
。
11
不能发自己的blog吗
test comment
<h1>Hello</h1>
test comment