From f323dc2f415b51157f54f32b84627497311bfd21 Mon Sep 17 00:00:00 2001
From: Aleksey Lobanov
Date: Sat, 16 Jan 2021 22:09:06 +0300
Subject: [PATCH] processing: rewritten processing pipeline

---
 process_responses.py | 111 +++++++++++++++++++++++++++++++++++++++----
 1 file changed, 102 insertions(+), 9 deletions(-)

diff --git a/process_responses.py b/process_responses.py
index ad3d6ae..3badca4 100644
--- a/process_responses.py
+++ b/process_responses.py
@@ -1,21 +1,97 @@
 import json
 import logging
+import sys
+import time
 
-from src.settings import RABBIT_REPLY_QUEUE, init_logging
-from src.rabbit import get_channel, send_task, consume_task
-from src.db import AnalyzeTask, DBSession, TaskStatus
+from sqlalchemy.exc import IntegrityError
+
+from src.settings import RABBIT_REPLY_QUEUE, RABBIT_REPOSITORY_QUEUE, init_logging
+from src.rabbit import get_channel, consume_task
+from src.db import AnalyzeTask, DBSession, TaskStatus, User, Repository
 from src.messages import AnalyzeResponse
 
 
-def main():
-    session = DBSession()
-    channel = get_channel()
-    for data in consume_task(channel, RABBIT_REPLY_QUEUE, max_count=10):
+BATCH_SIZE = 100
+RABBIT_CONSUME_TIMEOUT = 4
+
+
+def get_user_id(session, name: str):
+    user = session.query(User).filter(User.name == name).first()
+    if user:
+        return user.id
+    new_user = User()
+    new_user.name = name
+    new_user.opt_out = False
+    session.add(new_user)
+    return session.query(User).filter(User.name == name).first().id
+
+
+def process_repositories(session, channel, consume_count: int):
+    added_count = duplicate_count = 0
+    begin_at = time.monotonic()
+    for ind, data in enumerate(
+        consume_task(
+            channel,
+            RABBIT_REPOSITORY_QUEUE,
+            timeout=RABBIT_CONSUME_TIMEOUT,
+            auto_ack=False,
+            max_count=consume_count,
+        )
+    ):
+        if data is None:
+            break
+        data = json.loads(data.decode("utf-8"))
+        if session.query(Repository).filter(Repository.id == data["id"]).first():
+            duplicate_count += 1
+            continue
+        user_name, repository_name = data["name"].split("/")
+        user_id = get_user_id(session, user_name)
+        new_repository = Repository()
+        new_repository.name = repository_name
+        new_repository.id = data["id"]
+        new_repository.user_id = user_id
+        new_repository.stargazers_count = data["stargazers_count"]
+        new_repository.fork = data["fork"]
+        new_repository.private = data["private"]
+        new_repository.default_branch = data["default_branch"]
+        new_repository.updated_at = data["updated_at"]
+        new_repository.created_at = data["created_at"]
+        new_repository.archived = data["archived"]
+        new_repository.size = data["size"]
+        session.add(new_repository)
+        added_count += 1
+        if ind % BATCH_SIZE == 0:
+            try:
+                session.commit()
+            except IntegrityError:
+                duplicate_count += 1
+                session.rollback()
+            else:
+                added_count += 1
+    logging.info(
+        f"total: {added_count + duplicate_count}, "
+        f"new: {added_count}, "
+        f"duplicates: {duplicate_count} "
+        f"{(added_count + duplicate_count) / (time.monotonic() - begin_at):.2f} items/s"
+    )
+
+
+def process_task_replies(session, channel, consume_count: int):
+    for ind, data in enumerate(
+        consume_task(
+            channel,
+            RABBIT_REPLY_QUEUE,
+            timeout=RABBIT_CONSUME_TIMEOUT,
+            auto_ack=False,
+            max_count=consume_count,
+        )
+    ):
+        if data is None:
+            return
         data = data.decode("utf-8")
         raw_reply = json.loads(data)
         task_id = raw_reply["task_id"]
         logging.info(f"New response: {raw_reply}")
-        response = AnalyzeResponse.from_data(raw_reply)
         db_task = session.query(AnalyzeTask).filter(AnalyzeTask.id == task_id).first()
         if db_task is None:
             logging.warning(f"task_id {task_id} not found in DB")
@@ -24,15 +100,32 @@ def main():
             db_task.status = TaskStatus.Error
             logging.error(f"Error {raw_reply['message']} with task_id {task_id} not found in DB")
             continue
+        response = AnalyzeResponse.from_data(raw_reply)
         db_task.status = TaskStatus.Success
         db_task.clone_duration = response.clone_duration
         db_task.duration = response.duration
         db_task.improvement_absolute = response.improvement_absolute
         db_task.improvement_relative = response.improvement_relative
         db_task.worker = response.worker
+        if ind % BATCH_SIZE == 0:
+            session.commit()
+
+
+def main():
+    consume_count = 0 if len(sys.argv) == 1 else int(sys.argv[1])
+    session = DBSession()
+    channel = get_channel()
+    try:
+        process_repositories(session=session, channel=channel, consume_count=consume_count)
+        channel.close()
+        channel = get_channel()
+        process_task_replies(session=session, channel=channel, consume_count=consume_count)
+    finally:
         session.commit()
+        channel.close()
+        session.close()
 
 
 if __name__ == "__main__":
     init_logging()
-    main()
\ No newline at end of file
+    main()