scrape: Use RabbitMQ for scraping results
This commit is contained in:
@@ -1,30 +1,31 @@
|
|||||||
"""
|
"""
|
||||||
GitHub API repository scraper
|
GitHub API repository scraper
|
||||||
Simple as possible, but not production ready.
|
RabbitMQ as results backend, saving state just in text file
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import tqdm
|
import logging
|
||||||
|
|
||||||
from requests.exceptions import Timeout
|
from requests.exceptions import Timeout
|
||||||
from github import Github, Repository, RateLimitExceededException, GithubException
|
from github import Github, Repository, RateLimitExceededException, GithubException
|
||||||
|
|
||||||
BASE_ID = 3544490 # id of python-github package, because we don't need first repositories
|
from src.rabbit import get_connection, get_channel, send_repository
|
||||||
OUT_PATH = "repos_scraped.jsl"
|
from src.settings import init_logging, SCRAPING_LAST_ID_PATH
|
||||||
|
|
||||||
|
SAVE_ID_EVERY = 1000
|
||||||
|
|
||||||
TIMEOUT_SLEEP = 5 * 60
|
TIMEOUT_SLEEP = 5 * 60
|
||||||
HEADER_SIZE_TO_READ = 3 * 1024
|
HEADER_SIZE_TO_READ = 3 * 1024
|
||||||
|
|
||||||
g = Github(os.getenv("GITHUB_KEY"), per_page=100)
|
g = Github(os.getenv("GITHUB_KEY"), per_page=100)
|
||||||
|
|
||||||
|
|
||||||
def write_repo(f, r: Repository):
|
def get_repository_data(r: Repository):
|
||||||
try:
|
try:
|
||||||
data_to_write = (
|
return {
|
||||||
json.dumps(
|
|
||||||
{
|
|
||||||
"id": r.id,
|
"id": r.id,
|
||||||
"name": r.full_name,
|
"name": r.full_name,
|
||||||
"fork": r.fork,
|
"fork": r.fork,
|
||||||
@@ -35,50 +36,55 @@ def write_repo(f, r: Repository):
|
|||||||
"created_at": int(r.created_at.timestamp()),
|
"created_at": int(r.created_at.timestamp()),
|
||||||
"private": r.private,
|
"private": r.private,
|
||||||
"archived": r.archived,
|
"archived": r.archived,
|
||||||
},
|
}
|
||||||
ensure_ascii=False,
|
|
||||||
)
|
|
||||||
+ "\n"
|
|
||||||
)
|
|
||||||
except GithubException:
|
except GithubException:
|
||||||
print("error with", r)
|
logging.info(f"error with {r}")
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
return
|
return
|
||||||
f.write(data_to_write.encode("utf-8"))
|
|
||||||
|
|
||||||
|
|
||||||
def get_last_id(path: str) -> int:
|
def get_last_id() -> int:
|
||||||
if not os.path.exists(path):
|
if not os.path.exists(SCRAPING_LAST_ID_PATH):
|
||||||
print("No base file, return base value", BASE_ID)
|
raise Exception(f"No last_id file at: {SCRAPING_LAST_ID_PATH}")
|
||||||
return BASE_ID
|
last_id = int(open(SCRAPING_LAST_ID_PATH).read().strip())
|
||||||
total_size = os.path.getsize(path)
|
return last_id
|
||||||
with open(path, "rb") as f:
|
|
||||||
f.seek(max(0, total_size - HEADER_SIZE_TO_READ))
|
|
||||||
data = f.read()
|
def save_last_id(val: int):
|
||||||
if not data:
|
with open(SCRAPING_LAST_ID_PATH, "w") as f:
|
||||||
return BASE_ID
|
f.write(str(val))
|
||||||
last_item = json.loads(data.decode("utf-8").splitlines()[-1])
|
|
||||||
return last_item["id"]
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
path = "repos_new.jsl"
|
rabbit_connection = get_connection()
|
||||||
f = open(path, "ab")
|
rabbit_channel = get_channel(rabbit_connection)
|
||||||
|
|
||||||
|
processed_count = 0
|
||||||
|
last_item_id = get_last_id()
|
||||||
while True:
|
while True:
|
||||||
last_item_id = get_last_id(path)
|
|
||||||
try:
|
try:
|
||||||
for r in tqdm.tqdm(g.get_repos(since=last_item_id)):
|
for r in g.get_repos(since=last_item_id):
|
||||||
write_repo(f, r)
|
repository_data = get_repository_data(r)
|
||||||
|
processed_count += 1
|
||||||
|
if not repository_data:
|
||||||
|
continue
|
||||||
|
send_repository(rabbit_channel, json.dumps(repository_data).encode("utf-8"))
|
||||||
|
|
||||||
|
last_item_id = repository_data["id"]
|
||||||
|
if processed_count % SAVE_ID_EVERY:
|
||||||
|
save_last_id(last_item_id)
|
||||||
except RateLimitExceededException:
|
except RateLimitExceededException:
|
||||||
print("waiting after", get_last_id(path))
|
save_last_id(last_item_id)
|
||||||
|
logging.info(f"waiting after {last_item_id}, processed: {processed_count}")
|
||||||
time.sleep(30 * 60)
|
time.sleep(30 * 60)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
print(get_last_id("repos_new.jsl"))
|
init_logging()
|
||||||
|
logging.info(f"last_id = {get_last_id()}")
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
main()
|
main()
|
||||||
except Timeout:
|
except Timeout:
|
||||||
print("timeout")
|
logging.warning("Timeout")
|
||||||
time.sleep(TIMEOUT_SLEEP)
|
time.sleep(TIMEOUT_SLEEP)
|
||||||
|
|||||||
Reference in New Issue
Block a user