From 0647efe18f478b22b770964b08ed870194d69975 Mon Sep 17 00:00:00 2001 From: Aleksey Lobanov Date: Sat, 16 Jan 2021 21:53:04 +0300 Subject: [PATCH] rabbit: Improve task sending --- src/rabbit.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/src/rabbit.py b/src/rabbit.py index c90862f..bedbf17 100644 --- a/src/rabbit.py +++ b/src/rabbit.py @@ -1,5 +1,4 @@ import logging -import datetime from typing import Optional import pika @@ -14,9 +13,9 @@ from src.settings import ( RABBIT_CREDENTIALS, RABBIT_TASK_QUEUE, RABBIT_REPLY_QUEUE, + RABBIT_REPOSITORY_QUEUE, MESSAGE_TTL, ) -from src.messages import AnalyzeTask, AnalyzeResponse _logger = logging.getLogger(__name__) _logger.addHandler(logging.NullHandler()) @@ -54,24 +53,36 @@ def get_channel(connection: Optional[BlockingConnection] = None) -> Channel: base_queue_params["arguments"] = {"x-max-priority": QUEUE_MAX_PRIORITY} channel.queue_declare(queue=RABBIT_REPLY_QUEUE, **base_queue_params) channel.queue_declare(queue=RABBIT_TASK_QUEUE, **base_queue_params) + channel.queue_declare(queue=RABBIT_REPOSITORY_QUEUE, **base_queue_params) channel.basic_qos(prefetch_count=4) return channel -def send_task(channel: Channel, data: bytes): +def _basic_send(channel: Channel, queue: str, data: bytes, priority: int = 0, delivery_mode=None): channel.basic_publish( exchange="", - routing_key=RABBIT_TASK_QUEUE, + routing_key=queue, body=data, properties=BasicProperties( expiration=RABBIT_MESSAGE_TTL, reply_to=RABBIT_REPLY_QUEUE, - delivery_mode=2, # make message persistent + priority=priority, + delivery_mode=delivery_mode, ), ) +def send_repository(channel: Channel, data: bytes): + _basic_send( + channel=channel, data=data, priority=0, delivery_mode=2, queue=RABBIT_REPOSITORY_QUEUE + ) + + +def send_task(channel: Channel, data: bytes, priority: int = 0): + _basic_send(channel=channel, data=data, priority=priority, queue=RABBIT_TASK_QUEUE) + + def send_reply(channel, data: bytes): channel.basic_publish( exchange="", @@ -79,7 +90,6 @@ def send_reply(channel, data: bytes): body=data, properties=BasicProperties( expiration=RABBIT_MESSAGE_TTL, - # delivery_mode=2, # make message persistent ), )