38 lines
1.4 KiB
Python
38 lines
1.4 KiB
Python
import json
|
|
import logging
|
|
|
|
from src.settings import RABBIT_REPLY_QUEUE, init_logging
|
|
from src.rabbit import get_channel, send_task, consume_task
|
|
from src.db import AnalyzeTask, DBSession, TaskStatus
|
|
from src.messages import AnalyzeResponse
|
|
|
|
|
|
def main():
    """Consume analyze replies from RabbitMQ and record their outcome in the DB.

    Messages are read from ``RABBIT_REPLY_QUEUE`` via ``consume_task`` (called
    with ``max_count=10``). Each message is expected to be a UTF-8 encoded
    JSON object carrying at least ``task_id`` and ``status``; error replies
    may also carry ``message``.

    For every reply the matching ``AnalyzeTask`` row is looked up by id:

    * no such row -> a warning is logged and the reply is skipped;
    * ``status == "error"`` -> the row is marked ``TaskStatus.Error``;
    * otherwise -> the row is marked ``TaskStatus.Success`` and the timing /
      improvement / worker fields are copied from the parsed
      ``AnalyzeResponse``.

    The session is committed after each handled reply so that an error status
    is persisted just like a success, and closed when consumption ends.

    Raises:
        json.JSONDecodeError: if a message body is not valid JSON.
        KeyError: if a reply lacks ``task_id`` or ``status``.
    """
    session = DBSession()
    try:
        channel = get_channel()
        for payload in consume_task(channel, RABBIT_REPLY_QUEUE, max_count=10):
            raw_reply = json.loads(payload.decode("utf-8"))
            task_id = raw_reply["task_id"]
            logging.info("New response: %s", raw_reply)

            db_task = (
                session.query(AnalyzeTask)
                .filter(AnalyzeTask.id == task_id)
                .first()
            )
            if db_task is None:
                logging.warning("task_id %s not found in DB", task_id)
                continue

            if raw_reply["status"] == "error":
                db_task.status = TaskStatus.Error
                # .get() so a reply without "message" cannot abort the
                # status update while we are merely logging.
                logging.error(
                    "Error %s with task_id %s",
                    raw_reply.get("message", "<no message>"),
                    task_id,
                )
            else:
                # The structured response is only needed on the success path;
                # its fields are not used for error or unknown-task replies.
                response = AnalyzeResponse.from_data(raw_reply)
                db_task.status = TaskStatus.Success
                db_task.clone_duration = response.clone_duration
                db_task.duration = response.duration
                db_task.improvement_absolute = response.improvement_absolute
                db_task.improvement_relative = response.improvement_relative
                db_task.worker = response.worker

            # Persist both outcomes; previously the error path skipped
            # straight to the next message.
            session.commit()
    finally:
        # NOTE(review): assumes DBSession is a SQLAlchemy session (it is used
        # via .query/.commit above) and therefore exposes .close() - confirm.
        session.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: set up logging first so that everything main()
    # emits goes through the configured handlers.
    init_logging()
    main()