"""
GitHub API repository scraper
Simple as possible, but not production ready.
"""
import json
import os
import time

import tqdm
from requests.exceptions import Timeout
from github import Github, Repository, RateLimitExceededException, GithubException

BASE_ID = 3544490  # id of python-github package, because we don't need first repositories
# NOTE(review): OUT_PATH is not referenced by the code visible in this file,
# which reads and writes "repos_new.jsl" instead -- confirm before removing.
OUT_PATH = "repos_scraped.jsl"
TIMEOUT_SLEEP = 5 * 60  # seconds to sleep after a requests Timeout
HEADER_SIZE_TO_READ = 3 * 1024  # bytes read from the END of the output file

g = Github(os.getenv("GITHUB_KEY"), per_page=100)


def write_repo(f, r: Repository):
    """Serialize one repository as a JSON line and write it to *f*.

    Args:
        f: Binary file object (opened in a ``b`` mode) to write to.
        r: Repository object whose attributes are read.

    Raises:
        RateLimitExceededException: Propagated to the caller so that it can
            back off instead of dropping the repository.

    Any other ``GithubException`` raised while reading the attributes is
    reported, followed by a 2 second pause, and the repository is skipped.
    """
    try:
        data_to_write = (
            json.dumps(
                {
                    "id": r.id,
                    "name": r.full_name,
                    "fork": r.fork,
                    "size": r.size,
                    "default_branch": r.default_branch,
                    "stargazers_count": r.stargazers_count,
                    "updated_at": int(r.updated_at.timestamp()),
                    "created_at": int(r.created_at.timestamp()),
                    "private": r.private,
                    "archived": r.archived,
                },
                ensure_ascii=False,
            )
            + "\n"
        )
    except RateLimitExceededException:
        # RateLimitExceededException is a GithubException subclass; without
        # this clause the generic handler below would swallow it and every
        # repository seen while rate limited would be silently skipped.
        raise
    except GithubException:
        print("error with", r)
        time.sleep(2)
        return
    f.write(data_to_write.encode("utf-8"))


def get_last_id(path: str) -> int:
    """Return the ``id`` of the last complete record stored in *path*.

    Only the trailing ``HEADER_SIZE_TO_READ`` bytes of the file are read.

    Args:
        path: Path of the JSON-lines output file.

    Returns:
        The ``id`` field of the last parsable line, or ``BASE_ID`` when the
        file does not exist or is empty.

    Raises:
        ValueError: If the file is not empty but no parsable record with an
            ``id`` field is found in the inspected tail.
    """
    if not os.path.exists(path):
        print("No base file, return base value", BASE_ID)
        return BASE_ID

    total_size = os.path.getsize(path)
    offset = max(0, total_size - HEADER_SIZE_TO_READ)
    with open(path, "rb") as f:
        f.seek(offset)
        data = f.read()
    if not data:
        return BASE_ID

    # Split on raw bytes BEFORE decoding: the seek offset is arbitrary and may
    # fall inside a multi-byte UTF-8 sequence (records are written with
    # ensure_ascii=False), which would make decoding the whole tail fail.
    lines = data.split(b"\n")
    if offset > 0:
        # The first piece most likely starts in the middle of a record.
        lines = lines[1:]

    # Walk backwards so that a truncated final record (e.g. an interrupted
    # write) does not prevent resuming from the last complete one.
    for raw_line in reversed(lines):
        if not raw_line.strip():
            continue
        try:
            return json.loads(raw_line.decode("utf-8"))["id"]
        except (ValueError, KeyError, TypeError):
            # ValueError covers both UnicodeDecodeError and JSONDecodeError.
            continue
    raise ValueError(f"no parsable record found at the end of {path!r}")


def main(path: str = "repos_new.jsl"):
    """Scrape repositories forever, appending them to *path*.

    Resumes after the last id already stored in the file. When the rate limit
    is exceeded, sleeps for 30 minutes and resumes from the last stored id.

    Args:
        path: JSON-lines output file, opened in binary append mode.
    """
    # The context manager guarantees the file is flushed and closed when an
    # exception (e.g. Timeout) leaves this function.
    with open(path, "ab") as f:
        while True:
            # get_last_id reads the file from disk, so buffered records must
            # be flushed first; otherwise the id is stale and repositories
            # are fetched and written a second time.
            f.flush()
            last_item_id = get_last_id(path)
            try:
                for r in tqdm.tqdm(g.get_repos(since=last_item_id)):
                    write_repo(f, r)
            except RateLimitExceededException:
                f.flush()
                print("waiting after", get_last_id(path))
                time.sleep(30 * 60)


if __name__ == "__main__":
    print(get_last_id("repos_new.jsl"))
    while True:
        try:
            main()
        except Timeout:
            print("timeout")
            time.sleep(TIMEOUT_SLEEP)