feat: draft version
This commit is contained in:
38
process_responses.py
Normal file
38
process_responses.py
Normal file
@@ -0,0 +1,38 @@
|
||||
import json
|
||||
import logging
|
||||
|
||||
from src.settings import RABBIT_REPLY_QUEUE, init_logging
|
||||
from src.rabbit import get_channel, send_task, consume_task
|
||||
from src.db import AnalyzeTask, DBSession, TaskStatus
|
||||
from src.messages import AnalyzeResponse
|
||||
|
||||
|
||||
def main():
|
||||
session = DBSession()
|
||||
channel = get_channel()
|
||||
for data in consume_task(channel, RABBIT_REPLY_QUEUE, max_count=10):
|
||||
data = data.decode("utf-8")
|
||||
raw_reply = json.loads(data)
|
||||
task_id = raw_reply["task_id"]
|
||||
logging.info(f"New response: {raw_reply}")
|
||||
response = AnalyzeResponse.from_data(raw_reply)
|
||||
db_task = session.query(AnalyzeTask).filter(AnalyzeTask.id == task_id).first()
|
||||
if db_task is None:
|
||||
logging.warning(f"task_id {task_id} not found in DB")
|
||||
continue
|
||||
if raw_reply["status"] == "error":
|
||||
db_task.status = TaskStatus.Error
|
||||
logging.error(f"Error {raw_reply['message']} with task_id {task_id} not found in DB")
|
||||
continue
|
||||
db_task.status = TaskStatus.Success
|
||||
db_task.clone_duration = response.clone_duration
|
||||
db_task.duration = response.duration
|
||||
db_task.improvement_absolute = response.improvement_absolute
|
||||
db_task.improvement_relative = response.improvement_relative
|
||||
db_task.worker = response.worker
|
||||
session.commit()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
init_logging()
|
||||
main()
|
||||
Reference in New Issue
Block a user