"""Load repository records from JSON-lines files into the database.

Each command-line argument is a path to a file holding one JSON object per
line. Every object is stored as a ``Repository`` row; the owning ``User`` row
is created on demand from the part of ``name`` before the ``/``.
"""
import json
import logging
import subprocess
import sys
import time

import tqdm
from sqlalchemy.exc import IntegrityError

from src.db import DBSession, User, Repository
from src.settings import init_logging

# Maximum number of new repositories accumulated before a commit is issued.
BATCH_SIZE = 1000

logger = logging.getLogger(__name__)


def get_lines_count(path: str) -> int:
    """Return the number of lines in *path* as reported by ``wc -l``.

    Raises:
        subprocess.CalledProcessError: if ``wc`` exits with a non-zero status.
    """
    wc_output = subprocess.check_output(["wc", "-l", path]).decode("utf-8")
    # Split on arbitrary whitespace: some ``wc`` implementations left-pad the
    # count with spaces, which would make ``split(" ")[0]`` an empty string.
    return int(wc_output.split()[0])


def get_user_id(session, name: str):
    """Return the id of the user called *name*, creating the user if needed.

    A newly created user is added to *session* with ``opt_out = False`` and
    flushed so that its primary key is available; the caller is responsible
    for committing.
    """
    user = session.query(User).filter(User.name == name).first()
    if user:
        return user.id

    new_user = User()
    new_user.name = name
    new_user.opt_out = False
    session.add(new_user)
    # Flush explicitly instead of relying on autoflush in a follow-up query,
    # so the id is populated regardless of how the session is configured.
    session.flush()
    return new_user.id


def _commit_batch(session, pending: int):
    """Commit *pending* newly added rows and return ``(added, rejected)``.

    On ``IntegrityError`` the transaction is rolled back, so none of the
    pending rows are stored and all of them are reported as rejected.
    """
    if not pending:
        return 0, 0
    try:
        session.commit()
    except IntegrityError:
        logger.warning(
            "Integrity error on commit; rolled back %d pending rows", pending
        )
        session.rollback()
        return 0, pending
    return pending, 0


def main():
    """Import every file named on the command line and print a summary."""
    session = DBSession()
    added_count = duplicate_count = 0
    begin_at = time.monotonic()

    for input_path in sys.argv[1:]:
        pending = 0
        # The context manager closes the file even if a line fails to parse.
        with open(input_path, encoding="utf-8") as input_file:
            progress = tqdm.tqdm(
                input_file, desc=input_path, total=get_lines_count(input_path)
            )
            for line in progress:
                data = json.loads(line)
                if session.query(Repository).filter(Repository.id == data["id"]).first():
                    duplicate_count += 1
                    continue

                user_name, repository_name = data["name"].split("/")
                user_id = get_user_id(session, user_name)

                new_repository = Repository()
                new_repository.name = repository_name
                new_repository.id = data["id"]
                new_repository.user_id = user_id
                new_repository.stargazers_count = data["stargazers_count"]
                new_repository.fork = data["fork"]
                new_repository.private = data["private"]
                new_repository.default_branch = data["default_branch"]
                new_repository.updated_at = data["updated_at"]
                new_repository.created_at = data["created_at"]
                new_repository.archived = data["archived"]
                new_repository.size = data["size"]
                session.add(new_repository)
                pending += 1

                # Commit on the number of rows actually pending rather than on
                # the line index, so skipped duplicates cannot delay a commit.
                if pending >= BATCH_SIZE:
                    added, rejected = _commit_batch(session, pending)
                    added_count += added
                    duplicate_count += rejected
                    pending = 0

        # Persist the trailing partial batch; without this the last rows of
        # every file would never be committed.
        added, rejected = _commit_batch(session, pending)
        added_count += added
        duplicate_count += rejected

    print(
        f"total: {added_count + duplicate_count}, "
        f"new: {added_count}, "
        f"duplicates: {duplicate_count} "
        f"{(added_count + duplicate_count) / (time.monotonic() - begin_at):.2f} items/s"
    )


if __name__ == "__main__":
    init_logging()
    main()