feat: Improve processing speed by increase Rabbit fetch count
This commit is contained in:
@@ -13,6 +13,7 @@ from src.messages import AnalyzeResponse
|
|||||||
|
|
||||||
BATCH_SIZE = 100
|
BATCH_SIZE = 100
|
||||||
RABBIT_CONSUME_TIMEOUT = 4
|
RABBIT_CONSUME_TIMEOUT = 4
|
||||||
|
PREFETCH_COUNT = 20 # only one consumer and quick processing -> can be big
|
||||||
|
|
||||||
|
|
||||||
def get_user_id(session, name: str):
|
def get_user_id(session, name: str):
|
||||||
@@ -114,11 +115,11 @@ def process_task_replies(session, channel, consume_count: int):
|
|||||||
def main():
|
def main():
|
||||||
consume_count = 0 if len(sys.argv) == 1 else int(sys.argv[1])
|
consume_count = 0 if len(sys.argv) == 1 else int(sys.argv[1])
|
||||||
session = DBSession()
|
session = DBSession()
|
||||||
channel = get_channel()
|
channel = get_channel(prefetch_count=PREFETCH_COUNT)
|
||||||
try:
|
try:
|
||||||
process_repositories(session=session, channel=channel, consume_count=consume_count)
|
process_repositories(session=session, channel=channel, consume_count=consume_count)
|
||||||
channel.close()
|
channel.close()
|
||||||
channel = get_channel()
|
channel = get_channel(prefetch_count=PREFETCH_COUNT)
|
||||||
process_task_replies(session=session, channel=channel, consume_count=consume_count)
|
process_task_replies(session=session, channel=channel, consume_count=consume_count)
|
||||||
finally:
|
finally:
|
||||||
session.commit()
|
session.commit()
|
||||||
|
|||||||
@@ -39,7 +39,9 @@ def get_connection() -> BlockingConnection:
|
|||||||
return connection
|
return connection
|
||||||
|
|
||||||
|
|
||||||
def get_channel(connection: Optional[BlockingConnection] = None) -> Channel:
|
def get_channel(
|
||||||
|
connection: Optional[BlockingConnection] = None, prefetch_count: int = 4
|
||||||
|
) -> Channel:
|
||||||
if connection:
|
if connection:
|
||||||
_connection = connection
|
_connection = connection
|
||||||
else:
|
else:
|
||||||
@@ -56,7 +58,7 @@ def get_channel(connection: Optional[BlockingConnection] = None) -> Channel:
|
|||||||
channel.queue_declare(queue=RABBIT_TASK_QUEUE, **base_queue_params)
|
channel.queue_declare(queue=RABBIT_TASK_QUEUE, **base_queue_params)
|
||||||
channel.queue_declare(queue=RABBIT_REPOSITORY_QUEUE, **base_queue_params)
|
channel.queue_declare(queue=RABBIT_REPOSITORY_QUEUE, **base_queue_params)
|
||||||
|
|
||||||
channel.basic_qos(prefetch_count=4)
|
channel.basic_qos(prefetch_count=prefetch_count)
|
||||||
return channel
|
return channel
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user