From af06978cc970aa36191e7c6f6fde7dd477604658 Mon Sep 17 00:00:00 2001
From: Aleksey Lobanov
Date: Mon, 4 Jan 2021 18:22:35 +0300
Subject: [PATCH] feat: multiprocessing for RabbitMQ worker

Helps with very long tasks
---
 optimizer_worker.py | 34 +++++++++++++++++++++++++++++++---
 1 file changed, 31 insertions(+), 3 deletions(-)

diff --git a/optimizer_worker.py b/optimizer_worker.py
index d8970fe..c96d4ef 100644
--- a/optimizer_worker.py
+++ b/optimizer_worker.py
@@ -2,6 +2,8 @@ from tempfile import TemporaryDirectory
 import json
 import time
 import logging
+import multiprocessing
+import queue
 
 from src.git import clone_repo
 from src.optipng import apply_optipng_recursive
@@ -9,6 +11,17 @@ from src.messages import AnalyzeTask, AnalyzeResponse, create_analyze_response
 from src.rabbit import get_channel, send_reply
 from src.settings import RABBIT_TASK_QUEUE, init_logging, WORKER_NAME
 
+TASK_DELAY = 0.25
+
+
+def delay_generator():
+    for _ in range(10):
+        yield 0.05
+    for _ in range(10):
+        yield 0.1
+    while True:
+        yield TASK_DELAY
+
 
 def process_task(task: AnalyzeTask) -> AnalyzeResponse:
     with TemporaryDirectory() as repo_path:
@@ -27,9 +40,7 @@
         )
 
 
-def rabbit_callback(ch, method, properties, body):
-    task = AnalyzeTask.from_data(json.loads(body))
-    logging.info(f"New task url: {task.url}, level: {task.level}")
+def _process_main(results_queue: queue.Queue, task: AnalyzeTask):
     try:
         res = process_task(task).data
         res["status"] = "success"
@@ -40,6 +51,23 @@
             "message": str(err),
             "task_id": task.task_id
         }
+    results_queue.put(res)
+
+
+def rabbit_callback(ch, method, properties, body):
+    task = AnalyzeTask.from_data(json.loads(body))
+    logging.info(f"New task url: {task.url}, level: {task.level}")
+    res_queue = multiprocessing.Queue()
+    method_process = multiprocessing.Process(
+        target=_process_main,
+        args=(res_queue, task)
+    )
+    method_process.start()
+    for delay in delay_generator():
+        ch.connection.sleep(delay)
+        if not method_process.is_alive():
+            break
+    res = res_queue.get()
     send_reply(ch, json.dumps(res).encode("utf-8"))
     ch.basic_ack(delivery_tag=method.delivery_tag)
 
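
Note (not part of the patch): the new rabbit_callback avoids method_process.join() and instead sleeps in small steps through ch.connection.sleep(), presumably so the pika connection keeps servicing I/O and heartbeats while the child process works. Below is a minimal standalone sketch of the same run-in-a-child-process-and-poll pattern, assuming nothing beyond the standard library; slow_task, the 1.5-second duration, and the plain time.sleep (standing in for process_task and the connection-aware sleep) are illustrative only.

# polling_sketch.py -- illustrative sketch, not part of optimizer_worker.py
import multiprocessing
import time


def slow_task(results_queue, seconds):
    # Stand-in for process_task: pretend to work, then report a result.
    time.sleep(seconds)
    results_queue.put({"status": "success", "slept": seconds})


def delay_generator():
    # Same pacing idea as the patch: poll often at first, then back off.
    for _ in range(10):
        yield 0.05
    for _ in range(10):
        yield 0.1
    while True:
        yield 0.25


if __name__ == "__main__":
    res_queue = multiprocessing.Queue()
    worker = multiprocessing.Process(target=slow_task, args=(res_queue, 1.5))
    worker.start()

    # The patch sleeps via ch.connection.sleep(delay) so the broker connection
    # stays serviced; plain time.sleep stands in for that here.
    for delay in delay_generator():
        time.sleep(delay)
        if not worker.is_alive():
            break

    worker.join()
    print(res_queue.get())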