From 3b15ef41f4c35ac605a773eb2714ca41e0d3b111 Mon Sep 17 00:00:00 2001
From: Aleksey Lobanov
Date: Sat, 16 Jan 2021 21:58:08 +0300
Subject: [PATCH] scrape: Use RabbitMQ for scraping results

---
 scrape_repos.py | 96 ++++++++++++++++++++++++++-----------------------
 1 file changed, 51 insertions(+), 45 deletions(-)

diff --git a/scrape_repos.py b/scrape_repos.py
index 0c17ab5..c5459c1 100644
--- a/scrape_repos.py
+++ b/scrape_repos.py
@@ -1,84 +1,90 @@
 """
 GitHub API repository scraper
-Simple as possible, but not production ready.
+RabbitMQ as the results backend; state is saved in a plain text file
 """
 import json
 import os
 import time
-import tqdm
+import logging
 
 from requests.exceptions import Timeout
 from github import Github, Repository, RateLimitExceededException, GithubException
 
-BASE_ID = 3544490  # id of python-github package, because we don't need first repositories
-OUT_PATH = "repos_scraped.jsl"
+from src.rabbit import get_connection, get_channel, send_repository
+from src.settings import init_logging, SCRAPING_LAST_ID_PATH
+
+SAVE_ID_EVERY = 1000
+
 TIMEOUT_SLEEP = 5 * 60
 HEADER_SIZE_TO_READ = 3 * 1024
 
 g = Github(os.getenv("GITHUB_KEY"), per_page=100)
 
 
-def write_repo(f, r: Repository):
+def get_repository_data(r: Repository):
     try:
-        data_to_write = (
-            json.dumps(
-                {
-                    "id": r.id,
-                    "name": r.full_name,
-                    "fork": r.fork,
-                    "size": r.size,
-                    "default_branch": r.default_branch,
-                    "stargazers_count": r.stargazers_count,
-                    "updated_at": int(r.updated_at.timestamp()),
-                    "created_at": int(r.created_at.timestamp()),
-                    "private": r.private,
-                    "archived": r.archived,
-                },
-                ensure_ascii=False,
-            )
-            + "\n"
-        )
+        return {
+            "id": r.id,
+            "name": r.full_name,
+            "fork": r.fork,
+            "size": r.size,
+            "default_branch": r.default_branch,
+            "stargazers_count": r.stargazers_count,
+            "updated_at": int(r.updated_at.timestamp()),
+            "created_at": int(r.created_at.timestamp()),
+            "private": r.private,
+            "archived": r.archived,
+        }
     except GithubException:
-        print("error with", r)
+        logging.info(f"error with {r}")
         time.sleep(2)
         return
-    f.write(data_to_write.encode("utf-8"))
 
 
-def get_last_id(path: str) -> int:
-    if not os.path.exists(path):
-        print("No base file, return base value", BASE_ID)
-        return BASE_ID
-    total_size = os.path.getsize(path)
-    with open(path, "rb") as f:
-        f.seek(max(0, total_size - HEADER_SIZE_TO_READ))
-        data = f.read()
-    if not data:
-        return BASE_ID
-    last_item = json.loads(data.decode("utf-8").splitlines()[-1])
-    return last_item["id"]
+def get_last_id() -> int:
+    if not os.path.exists(SCRAPING_LAST_ID_PATH):
+        raise Exception(f"No last_id file at: {SCRAPING_LAST_ID_PATH}")
+    last_id = int(open(SCRAPING_LAST_ID_PATH).read().strip())
+    return last_id
+
+
+def save_last_id(val: int):
+    with open(SCRAPING_LAST_ID_PATH, "w") as f:
+        f.write(str(val))
 
 
 def main():
-    path = "repos_new.jsl"
-    f = open(path, "ab")
+    rabbit_connection = get_connection()
+    rabbit_channel = get_channel(rabbit_connection)
+
+    processed_count = 0
+    last_item_id = get_last_id()
     while True:
-        last_item_id = get_last_id(path)
         try:
-            for r in tqdm.tqdm(g.get_repos(since=last_item_id)):
-                write_repo(f, r)
+            for r in g.get_repos(since=last_item_id):
+                repository_data = get_repository_data(r)
+                processed_count += 1
+                if not repository_data:
+                    continue
+                send_repository(rabbit_channel, json.dumps(repository_data).encode("utf-8"))
+
+                last_item_id = repository_data["id"]
+                if processed_count % SAVE_ID_EVERY == 0:
+                    save_last_id(last_item_id)
         except RateLimitExceededException:
-            print("waiting after", get_last_id(path))
+            save_last_id(last_item_id)
+            logging.info(f"waiting after {last_item_id}, processed: {processed_count}")
             time.sleep(30 * 60)
 
 
 if __name__ == "__main__":
-    print(get_last_id("repos_new.jsl"))
+    init_logging()
+    logging.info(f"last_id = {get_last_id()}")
     while True:
         try:
             main()
         except Timeout:
-            print("timeout")
+            logging.warning("Timeout")
            time.sleep(TIMEOUT_SLEEP)
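
Note: this patch imports helpers from src/rabbit.py and src/settings.py,
which are not included in the diff. For reference, a minimal sketch of what
src/rabbit.py could look like, assuming pika as the RabbitMQ client library;
the queue name and the RABBITMQ_HOST variable are assumptions, not shown in
this change:

    # src/rabbit.py -- hypothetical sketch, not part of this patch
    import os

    import pika

    QUEUE_NAME = "repositories"  # assumed queue name


    def get_connection() -> pika.BlockingConnection:
        # Host comes from the environment so the scraper and the broker
        # can run on different machines; "localhost" is a fallback.
        params = pika.ConnectionParameters(host=os.getenv("RABBITMQ_HOST", "localhost"))
        return pika.BlockingConnection(params)


    def get_channel(connection: pika.BlockingConnection):
        channel = connection.channel()
        # Durable queue so buffered results survive a broker restart.
        channel.queue_declare(queue=QUEUE_NAME, durable=True)
        return channel


    def send_repository(channel, body: bytes):
        # Publish through the default exchange, which routes by queue name.
        channel.basic_publish(
            exchange="",
            routing_key=QUEUE_NAME,
            body=body,
            properties=pika.BasicProperties(delivery_mode=2),  # persistent message
        )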
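
Likewise, a minimal sketch of src/settings.py matching the names imported
above; the state-file path and log format are assumptions:

    # src/settings.py -- hypothetical sketch, not part of this patch
    import logging
    import os

    # Where the scraper persists the last processed repository id.
    SCRAPING_LAST_ID_PATH = os.getenv("SCRAPING_LAST_ID_PATH", "last_id.txt")


    def init_logging():
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s %(levelname)s %(message)s",
        )

Since get_last_id() now raises when the state file is missing, the file has
to be seeded once before the first run, e.g. with the old BASE_ID value:
echo 3544490 > last_id.txt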