import json
import logging

from src.settings import RABBIT_REPLY_QUEUE, init_logging
from src.rabbit import get_channel, send_task, consume_task
from src.db import AnalyzeTask, DBSession, TaskStatus
from src.messages import AnalyzeResponse


def main():
    """Consume analyze replies and record their outcome on AnalyzeTask rows.

    Each message read from ``RABBIT_REPLY_QUEUE`` is decoded as UTF-8 JSON.
    The ``AnalyzeTask`` whose id equals the reply's ``task_id`` is looked up:

    * if no such row exists, a warning is logged and the reply is skipped;
    * if the reply's ``status`` is ``"error"``, the task is marked
      ``TaskStatus.Error`` and the change is committed;
    * otherwise the task is marked ``TaskStatus.Success``, the timing,
      improvement and worker fields are copied from the parsed
      ``AnalyzeResponse``, and the change is committed.

    Raises:
        KeyError: if a reply lacks the ``task_id`` or ``status`` key.
        json.JSONDecodeError: if a message body is not valid JSON.
    """
    session = DBSession()
    channel = get_channel()

    for data in consume_task(channel, RABBIT_REPLY_QUEUE, max_count=10):
        raw_reply = json.loads(data.decode("utf-8"))
        task_id = raw_reply["task_id"]
        logging.info("New response: %s", raw_reply)

        # NOTE(review): the reply is parsed before the status check, as in the
        # original flow; confirm that from_data() accepts error replies.
        response = AnalyzeResponse.from_data(raw_reply)

        db_task = (
            session.query(AnalyzeTask)
            .filter(AnalyzeTask.id == task_id)
            .first()
        )
        if db_task is None:
            logging.warning("task_id %s not found in DB", task_id)
            continue

        if raw_reply["status"] == "error":
            db_task.status = TaskStatus.Error
            # Commit right away: without this the Error status was only
            # persisted if a later successful reply happened to commit, and
            # was lost when the error reply was the last one consumed.
            session.commit()
            logging.error(
                "Task %s failed with error: %s",
                task_id,
                raw_reply.get("message"),
            )
            continue

        db_task.status = TaskStatus.Success
        db_task.clone_duration = response.clone_duration
        db_task.duration = response.duration
        db_task.improvement_absolute = response.improvement_absolute
        db_task.improvement_relative = response.improvement_relative
        db_task.worker = response.worker
        session.commit()


if __name__ == "__main__":
    init_logging()
    main()