From eb20e30d44fb5e5ecf9e144c3b039c033a9c7e1a Mon Sep 17 00:00:00 2001 From: Aleksey Lobanov Date: Fri, 26 Feb 2021 18:17:33 +0300 Subject: [PATCH] feat: Improve processing speed by increase Rabbit fetch count --- process_responses.py | 5 +++-- src/rabbit.py | 6 ++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/process_responses.py b/process_responses.py index 3badca4..5f24da1 100644 --- a/process_responses.py +++ b/process_responses.py @@ -13,6 +13,7 @@ from src.messages import AnalyzeResponse BATCH_SIZE = 100 RABBIT_CONSUME_TIMEOUT = 4 +PREFETCH_COUNT = 20 # only one consumer and quick processing -> can be big def get_user_id(session, name: str): @@ -114,11 +115,11 @@ def process_task_replies(session, channel, consume_count: int): def main(): consume_count = 0 if len(sys.argv) == 1 else int(sys.argv[1]) session = DBSession() - channel = get_channel() + channel = get_channel(prefetch_count=PREFETCH_COUNT) try: process_repositories(session=session, channel=channel, consume_count=consume_count) channel.close() - channel = get_channel() + channel = get_channel(prefetch_count=PREFETCH_COUNT) process_task_replies(session=session, channel=channel, consume_count=consume_count) finally: session.commit() diff --git a/src/rabbit.py b/src/rabbit.py index 19cf7f2..637ff35 100644 --- a/src/rabbit.py +++ b/src/rabbit.py @@ -39,7 +39,9 @@ def get_connection() -> BlockingConnection: return connection -def get_channel(connection: Optional[BlockingConnection] = None) -> Channel: +def get_channel( + connection: Optional[BlockingConnection] = None, prefetch_count: int = 4 +) -> Channel: if connection: _connection = connection else: @@ -56,7 +58,7 @@ def get_channel(connection: Optional[BlockingConnection] = None) -> Channel: channel.queue_declare(queue=RABBIT_TASK_QUEUE, **base_queue_params) channel.queue_declare(queue=RABBIT_REPOSITORY_QUEUE, **base_queue_params) - channel.basic_qos(prefetch_count=4) + channel.basic_qos(prefetch_count=prefetch_count) return channel