rabbit: Improve task sending

This commit is contained in:
2021-01-16 21:53:04 +03:00
parent ba5706e5e3
commit 0647efe18f

View File

@@ -1,5 +1,4 @@
import logging import logging
import datetime
from typing import Optional from typing import Optional
import pika import pika
@@ -14,9 +13,9 @@ from src.settings import (
RABBIT_CREDENTIALS, RABBIT_CREDENTIALS,
RABBIT_TASK_QUEUE, RABBIT_TASK_QUEUE,
RABBIT_REPLY_QUEUE, RABBIT_REPLY_QUEUE,
RABBIT_REPOSITORY_QUEUE,
MESSAGE_TTL, MESSAGE_TTL,
) )
from src.messages import AnalyzeTask, AnalyzeResponse
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
_logger.addHandler(logging.NullHandler()) _logger.addHandler(logging.NullHandler())
@@ -54,24 +53,36 @@ def get_channel(connection: Optional[BlockingConnection] = None) -> Channel:
base_queue_params["arguments"] = {"x-max-priority": QUEUE_MAX_PRIORITY} base_queue_params["arguments"] = {"x-max-priority": QUEUE_MAX_PRIORITY}
channel.queue_declare(queue=RABBIT_REPLY_QUEUE, **base_queue_params) channel.queue_declare(queue=RABBIT_REPLY_QUEUE, **base_queue_params)
channel.queue_declare(queue=RABBIT_TASK_QUEUE, **base_queue_params) channel.queue_declare(queue=RABBIT_TASK_QUEUE, **base_queue_params)
channel.queue_declare(queue=RABBIT_REPOSITORY_QUEUE, **base_queue_params)
channel.basic_qos(prefetch_count=4) channel.basic_qos(prefetch_count=4)
return channel return channel
def send_task(channel: Channel, data: bytes): def _basic_send(channel: Channel, queue: str, data: bytes, priority: int = 0, delivery_mode=None):
channel.basic_publish( channel.basic_publish(
exchange="", exchange="",
routing_key=RABBIT_TASK_QUEUE, routing_key=queue,
body=data, body=data,
properties=BasicProperties( properties=BasicProperties(
expiration=RABBIT_MESSAGE_TTL, expiration=RABBIT_MESSAGE_TTL,
reply_to=RABBIT_REPLY_QUEUE, reply_to=RABBIT_REPLY_QUEUE,
delivery_mode=2, # make message persistent priority=priority,
delivery_mode=delivery_mode,
), ),
) )
def send_repository(channel: Channel, data: bytes):
_basic_send(
channel=channel, data=data, priority=0, delivery_mode=2, queue=RABBIT_REPOSITORY_QUEUE
)
def send_task(channel: Channel, data: bytes, priority: int = 0):
_basic_send(channel=channel, data=data, priority=priority, queue=RABBIT_TASK_QUEUE)
def send_reply(channel, data: bytes): def send_reply(channel, data: bytes):
channel.basic_publish( channel.basic_publish(
exchange="", exchange="",
@@ -79,7 +90,6 @@ def send_reply(channel, data: bytes):
body=data, body=data,
properties=BasicProperties( properties=BasicProperties(
expiration=RABBIT_MESSAGE_TTL, expiration=RABBIT_MESSAGE_TTL,
# delivery_mode=2, # make message persistent
), ),
) )