feat: optipng worker

This commit is contained in:
2021-01-03 15:58:17 +03:00
parent 6ce3a307cc
commit 519374cc06

57
optimizer_worker.py Normal file
View File

@@ -0,0 +1,57 @@
from tempfile import TemporaryDirectory
import json
import time
import logging
from src.git import clone_repo
from src.optipng import apply_optipng_recursive
from src.messages import AnalyzeTask, AnalyzeResponse, create_analyze_response
from src.rabbit import get_channel, send_reply
from src.settings import RABBIT_TASK_QUEUE, init_logging, WORKER_NAME
def process_task(task: AnalyzeTask) -> AnalyzeResponse:
with TemporaryDirectory() as repo_path:
clone_begin_at = time.monotonic()
clone_repo(task.url, repo_path)
clone_duration = time.monotonic() - clone_begin_at
base_size, after_size = apply_optipng_recursive(repo_path, level=task.level)
optipng_duration = time.monotonic() - clone_begin_at - clone_duration
return create_analyze_response(
task,
base_size,
after_size,
clone_duration=clone_duration,
duration=optipng_duration,
worker=WORKER_NAME,
)
def rabbit_callback(ch, method, properties, body):
task = AnalyzeTask.from_data(json.loads(body))
logging.info(f"New task url: {task.url}, level: {task.level}")
try:
res = process_task(task).data
res["status"] = "success"
except Exception as err:
logging.exception(f"Error {err} with {task.url}")
res = {
"status": "error",
"message": str(err),
"task_id": task.task_id
}
send_reply(ch, json.dumps(res).encode("utf-8"))
def main():
channel = get_channel()
channel.basic_consume(
queue=RABBIT_TASK_QUEUE, on_message_callback=rabbit_callback, auto_ack=True
)
channel.start_consuming()
if __name__ == "__main__":
init_logging()
main()