diff --git a/optimizer_worker.py b/optimizer_worker.py index 050326a..d8970fe 100644 --- a/optimizer_worker.py +++ b/optimizer_worker.py @@ -41,13 +41,14 @@ def rabbit_callback(ch, method, properties, body): "task_id": task.task_id } send_reply(ch, json.dumps(res).encode("utf-8")) + ch.basic_ack(delivery_tag=method.delivery_tag) def main(): channel = get_channel() channel.basic_consume( - queue=RABBIT_TASK_QUEUE, on_message_callback=rabbit_callback, auto_ack=True + queue=RABBIT_TASK_QUEUE, on_message_callback=rabbit_callback, auto_ack=False ) channel.start_consuming() diff --git a/src/rabbit.py b/src/rabbit.py index b3d8e68..84fac4c 100644 --- a/src/rabbit.py +++ b/src/rabbit.py @@ -55,7 +55,7 @@ def get_channel(connection: Optional[BlockingConnection] = None) -> Channel: channel.queue_declare(queue=RABBIT_REPLY_QUEUE, **base_queue_params) channel.queue_declare(queue=RABBIT_TASK_QUEUE, **base_queue_params) - channel.basic_qos(prefetch_count=1) + channel.basic_qos(prefetch_count=4) return channel @@ -64,7 +64,11 @@ def send_task(channel: Channel, data: bytes): exchange="", routing_key=RABBIT_TASK_QUEUE, body=data, - properties=BasicProperties(expiration=RABBIT_MESSAGE_TTL, reply_to=RABBIT_REPLY_QUEUE), + properties=BasicProperties( + expiration=RABBIT_MESSAGE_TTL, + reply_to=RABBIT_REPLY_QUEUE, + delivery_mode=2, # make message persistent + ), ) @@ -73,7 +77,10 @@ def send_reply(channel, data: bytes): exchange="", routing_key=RABBIT_REPLY_QUEUE, body=data, - properties=BasicProperties(expiration=RABBIT_MESSAGE_TTL), + properties=BasicProperties( + expiration=RABBIT_MESSAGE_TTL, + # delivery_mode=2, # make message persistent + ), )