rabbit: small qos + acks for reliable messages

This commit is contained in:
2021-01-03 16:32:19 +03:00
parent ca37d31e18
commit d06167070d
2 changed files with 12 additions and 4 deletions

View File

@@ -41,13 +41,14 @@ def rabbit_callback(ch, method, properties, body):
"task_id": task.task_id "task_id": task.task_id
} }
send_reply(ch, json.dumps(res).encode("utf-8")) send_reply(ch, json.dumps(res).encode("utf-8"))
ch.basic_ack(delivery_tag=method.delivery_tag)
def main(): def main():
channel = get_channel() channel = get_channel()
channel.basic_consume( channel.basic_consume(
queue=RABBIT_TASK_QUEUE, on_message_callback=rabbit_callback, auto_ack=True queue=RABBIT_TASK_QUEUE, on_message_callback=rabbit_callback, auto_ack=False
) )
channel.start_consuming() channel.start_consuming()

View File

@@ -55,7 +55,7 @@ def get_channel(connection: Optional[BlockingConnection] = None) -> Channel:
channel.queue_declare(queue=RABBIT_REPLY_QUEUE, **base_queue_params) channel.queue_declare(queue=RABBIT_REPLY_QUEUE, **base_queue_params)
channel.queue_declare(queue=RABBIT_TASK_QUEUE, **base_queue_params) channel.queue_declare(queue=RABBIT_TASK_QUEUE, **base_queue_params)
channel.basic_qos(prefetch_count=1) channel.basic_qos(prefetch_count=4)
return channel return channel
@@ -64,7 +64,11 @@ def send_task(channel: Channel, data: bytes):
exchange="", exchange="",
routing_key=RABBIT_TASK_QUEUE, routing_key=RABBIT_TASK_QUEUE,
body=data, body=data,
properties=BasicProperties(expiration=RABBIT_MESSAGE_TTL, reply_to=RABBIT_REPLY_QUEUE), properties=BasicProperties(
expiration=RABBIT_MESSAGE_TTL,
reply_to=RABBIT_REPLY_QUEUE,
delivery_mode=2, # make message persistent
),
) )
@@ -73,7 +77,10 @@ def send_reply(channel, data: bytes):
exchange="", exchange="",
routing_key=RABBIT_REPLY_QUEUE, routing_key=RABBIT_REPLY_QUEUE,
body=data, body=data,
properties=BasicProperties(expiration=RABBIT_MESSAGE_TTL), properties=BasicProperties(
expiration=RABBIT_MESSAGE_TTL,
# delivery_mode=2, # make message persistent
),
) )