"""Queue worker: consume analyze tasks, run them in a child process, reply."""

from tempfile import TemporaryDirectory
import json
import time
import logging
import multiprocessing
import queue

from src.git import clone_repo
from src.optipng import apply_optipng_recursive
from src.messages import AnalyzeTask, AnalyzeResponse, create_analyze_response
from src.rabbit import get_channel, send_reply
from src.settings import RABBIT_TASK_QUEUE, init_logging, WORKER_NAME

# Steady-state polling interval (seconds) used while waiting for a task.
TASK_DELAY = 0.25


def delay_generator():
    """Yield polling delays in seconds, ramping up to ``TASK_DELAY``.

    Ten delays of 0.05 s, then ten of 0.1 s, then ``TASK_DELAY`` forever, so
    that short tasks are noticed quickly while long ones are polled less often.
    """
    for _ in range(10):
        yield 0.05
    for _ in range(10):
        yield 0.1
    while True:
        yield TASK_DELAY


def process_task(task: AnalyzeTask) -> AnalyzeResponse:
    """Clone ``task.url`` into a temporary directory and run optipng over it.

    Returns the response built by ``create_analyze_response`` from the two
    values returned by ``apply_optipng_recursive`` and the measured clone and
    optipng durations (seconds, monotonic clock). The temporary directory is
    removed when the function returns or raises.
    """
    with TemporaryDirectory() as repo_path:
        clone_begin_at = time.monotonic()
        clone_repo(task.url, repo_path)
        clone_end_at = time.monotonic()
        clone_duration = clone_end_at - clone_begin_at

        base_size, after_size = apply_optipng_recursive(repo_path, level=task.level)
        optipng_duration = time.monotonic() - clone_end_at

        return create_analyze_response(
            task,
            base_size,
            after_size,
            clone_duration=clone_duration,
            duration=optipng_duration,
            worker=WORKER_NAME,
        )


def _process_main(results_queue: queue.Queue, task: AnalyzeTask):
    """Child-process entry point: run the task and put one result dict on the queue.

    On success the response's ``data`` gets ``"status": "success"`` added; on
    any exception an error dict carrying the message and task id is put
    instead. ``results_queue`` is the ``multiprocessing.Queue`` created by
    ``rabbit_callback``; only ``put`` is used here.
    """
    try:
        res = process_task(task).data
        res["status"] = "success"
    except Exception as err:
        # Boundary of the child process: report every failure to the parent.
        logging.exception("Error %s with %s", err, task.url)
        res = {"status": "error", "message": str(err), "task_id": task.task_id}
    results_queue.put(res)


def _wait_for_result(ch, worker, res_queue, task: AnalyzeTask) -> dict:
    """Wait for the child's result while letting the connection process I/O.

    The queue is polled *while* the child is still running: a process that has
    put data on a ``multiprocessing.Queue`` does not exit until that data has
    been flushed to the pipe, so waiting for the process to finish before
    reading can hang on large payloads. If the child dies without producing a
    result, an error dict is returned instead of blocking forever.
    """
    for delay in delay_generator():
        # Sleep via the connection rather than time.sleep().
        # NOTE(review): presumably pika's BlockingConnection.sleep, which keeps
        # servicing heartbeats - confirm against src.rabbit.get_channel.
        ch.connection.sleep(delay)
        try:
            return res_queue.get_nowait()
        except queue.Empty:
            pass
        if not worker.is_alive():
            break

    # The child has exited; anything it managed to put has been flushed by now,
    # so a short bounded wait is enough to pick it up.
    try:
        return res_queue.get(timeout=TASK_DELAY)
    except queue.Empty:
        logging.error(
            "Worker process for %s exited with code %s without a result",
            task.url,
            worker.exitcode,
        )
        return {
            "status": "error",
            "message": (
                f"worker process exited with code {worker.exitcode} "
                "without producing a result"
            ),
            "task_id": task.task_id,
        }


def rabbit_callback(ch, method, properties, body):
    """Handle one task message: run it in a child process, reply, then ack.

    ``body`` is decoded as JSON and turned into an ``AnalyzeTask``. The result
    dict (success or error) is JSON-encoded as UTF-8 and passed to
    ``send_reply``; the message is acknowledged only after the reply is sent.
    """
    task = AnalyzeTask.from_data(json.loads(body))
    logging.info("New task url: %s, level: %s", task.url, task.level)

    res_queue = multiprocessing.Queue()
    worker = multiprocessing.Process(target=_process_main, args=(res_queue, task))
    worker.start()
    try:
        res = _wait_for_result(ch, worker, res_queue, task)
    finally:
        # The result has been consumed (or the child is already dead), so the
        # join returns promptly and avoids leaving a zombie process behind.
        worker.join()
        res_queue.close()

    send_reply(ch, json.dumps(res).encode("utf-8"))
    ch.basic_ack(delivery_tag=method.delivery_tag)


def main():
    """Consume tasks from ``RABBIT_TASK_QUEUE`` with manual acknowledgement."""
    channel = get_channel()
    channel.basic_consume(
        queue=RABBIT_TASK_QUEUE, on_message_callback=rabbit_callback, auto_ack=False
    )
    channel.start_consuming()


if __name__ == "__main__":
    init_logging()
    main()