"""Consume repository and analyze-task reply messages from RabbitMQ and persist them to the DB."""
import json
import logging
import sys
import time

from sqlalchemy.exc import IntegrityError

from src.settings import RABBIT_REPLY_QUEUE, RABBIT_REPOSITORY_QUEUE, init_logging
from src.rabbit import get_channel, consume_task
from src.db import AnalyzeTask, DBSession, TaskStatus, User, Repository
from src.messages import AnalyzeResponse

BATCH_SIZE = 100
RABBIT_CONSUME_TIMEOUT = 4
PREFETCH_COUNT = 20  # only one consumer and quick processing -> can be big


def get_user_id(session, name: str):
    """Return the id of the user with the given name, creating the user if needed."""
    user = session.query(User).filter(User.name == name).first()
    if user:
        return user.id
    new_user = User()
    new_user.name = name
    new_user.opt_out = False
    session.add(new_user)
    # Autoflush on the following query pushes the pending INSERT, so the new id is available.
    return session.query(User).filter(User.name == name).first().id


def process_repositories(session, channel, consume_count: int):
    """Consume repository messages and insert previously unseen repositories into the DB."""
    added_count = duplicate_count = 0
    begin_at = time.monotonic()
    for ind, data in enumerate(
        consume_task(
            channel,
            RABBIT_REPOSITORY_QUEUE,
            timeout=RABBIT_CONSUME_TIMEOUT,
            auto_ack=False,
            max_count=consume_count,
        )
    ):
        if data is None:
            break
        data = json.loads(data.decode("utf-8"))
        if session.query(Repository).filter(Repository.id == data["id"]).first():
            duplicate_count += 1
            continue
        user_name, repository_name = data["name"].split("/")
        user_id = get_user_id(session, user_name)
        new_repository = Repository()
        new_repository.name = repository_name
        new_repository.id = data["id"]
        new_repository.user_id = user_id
        new_repository.stargazers_count = data["stargazers_count"]
        new_repository.fork = data["fork"]
        new_repository.private = data["private"]
        new_repository.default_branch = data["default_branch"]
        new_repository.updated_at = data["updated_at"]
        new_repository.created_at = data["created_at"]
        new_repository.archived = data["archived"]
        new_repository.size = data["size"]
        session.add(new_repository)
        added_count += 1
        # Commit in batches; a concurrent insert can still trigger an IntegrityError,
        # in which case the pending batch is rolled back and counted as a duplicate.
        if ind % BATCH_SIZE == 0:
            try:
                session.commit()
            except IntegrityError:
                duplicate_count += 1
                session.rollback()
    logging.info(
        f"total: {added_count + duplicate_count}, "
        f"new: {added_count}, "
        f"duplicates: {duplicate_count} "
        f"{(added_count + duplicate_count) / (time.monotonic() - begin_at):.2f} items/s"
    )


def process_task_replies(session, channel, consume_count: int):
    """Consume analyze-task replies and update the corresponding task rows in the DB."""
    for ind, data in enumerate(
        consume_task(
            channel,
            RABBIT_REPLY_QUEUE,
            timeout=RABBIT_CONSUME_TIMEOUT,
            auto_ack=False,
            max_count=consume_count,
        )
    ):
        if data is None:
            return
        raw_reply = json.loads(data.decode("utf-8"))
        task_id = raw_reply["task_id"]
        db_task = session.query(AnalyzeTask).filter(AnalyzeTask.id == task_id).first()
        if db_task is None:
            logging.warning(f"task_id {task_id} not found in DB")
            continue
        if raw_reply["status"] == "error":
            db_task.status = TaskStatus.Error
            logging.error(f"Error `{raw_reply['message']}` with task_id {task_id}.")
            continue
        response = AnalyzeResponse.from_data(raw_reply)
        db_task.status = TaskStatus.Success
        db_task.clone_duration = response.clone_duration
        db_task.duration = response.duration
        db_task.improvement_absolute = response.improvement_absolute
        db_task.improvement_relative = response.improvement_relative
        db_task.worker = response.worker
        if ind % BATCH_SIZE == 0:
            session.commit()


def main():
    # Optional CLI argument caps the number of messages consumed per queue.
    consume_count = 0 if len(sys.argv) == 1 else int(sys.argv[1])
    session = DBSession()
    channel = get_channel(prefetch_count=PREFETCH_COUNT)
    try:
        process_repositories(session=session, channel=channel, consume_count=consume_count)
        # Use a fresh channel for the reply queue.
        channel.close()
        channel = get_channel(prefetch_count=PREFETCH_COUNT)
        process_task_replies(session=session, channel=channel, consume_count=consume_count)
    finally:
        # Persist whatever remains of the last (partial) batch, then release resources.
        session.commit()
        channel.close()
        session.close()


if __name__ == "__main__":
    init_logging()
    main()