180 lines
5.4 KiB
Python
180 lines
5.4 KiB
Python
#!/usr/bin/python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
import shutil
|
|
import time
|
|
import tempfile
|
|
import logging
|
|
|
|
|
|
import requests
|
|
import grequests as async # requires grequests
|
|
|
|
|
|
__author__ = "Aleksey Lobanov"
|
|
__license__ = "MIT"
|
|
__email__ = "i@likemath.ru"
|
|
|
|
SLEEP_STATUS_CODE = 429
|
|
SLEEP_TIME = 60
|
|
REQUESTS_NEED_VERIFY = False
|
|
|
|
|
|
def getTempfileName(ext=""):
|
|
"""Returns full path to temp file with with `ext` extension"""
|
|
|
|
return os.path.join(
|
|
tempfile.gettempdir(),
|
|
sys.argv[0] + next(tempfile._get_candidate_names()) + ext
|
|
)
|
|
|
|
|
|
def getGamesListUrl(user_name, page, block_size=100):
|
|
"""Returns lichess url for downloading list of games"""
|
|
|
|
URL_TEMPLATE = "https://en.lichess.org/api/user/{}/games?page={}&nb={}"
|
|
return URL_TEMPLATE.format(user_name, page, block_size)
|
|
|
|
|
|
def getGamePgnUrl(game_id):
|
|
"""Returns lichess url for game with `game_id` id"""
|
|
|
|
URL_TEMPLATE = "https://en.lichess.org/game/export/{}.pgn"
|
|
return URL_TEMPLATE.format(game_id)
|
|
|
|
|
|
def getGamesList(user_name):
|
|
"""
|
|
Returns generator that returns chunks with id:
|
|
["game1","game2",..], next chunk
|
|
"""
|
|
global SLEEP_STATUS_CODE
|
|
|
|
games_count = 0
|
|
cur_page_ind = 1
|
|
|
|
while True:
|
|
r = requests.get(getGamesListUrl(user_name, cur_page_ind))
|
|
if r.status_code == SLEEP_STATUS_CODE:
|
|
logging.debug("Sleeping by status code")
|
|
time.sleep(SLEEP_TIME)
|
|
continue
|
|
elif r.status_code == 200:
|
|
logging.info("New page {} with game identifiers".format(
|
|
cur_page_ind
|
|
))
|
|
r_json = r.json()
|
|
cur_ids = [game['id'] for game in r_json["currentPageResults"]]
|
|
games_count += len(cur_ids)
|
|
yield tuple(cur_ids)
|
|
|
|
if r_json['nextPage'] is None:
|
|
logging.debug("Last page")
|
|
break
|
|
else:
|
|
cur_page_ind += 1
|
|
logging.info("Downloaded {} game identifiers".format(
|
|
games_count
|
|
))
|
|
|
|
|
|
def downloadGamesToFile(game_ids, file_name, thread_count):
|
|
"""
|
|
Creates big pool of async pgn downloads.
|
|
It is assumed that loading png is always successful (no 429 errors).
|
|
All pgns saved to one file
|
|
game_ids is chunk generator
|
|
"""
|
|
|
|
with open(file_name, "wb") as file_pgn:
|
|
# imap returns generator
|
|
for chunk in game_ids:
|
|
reqs = [async.get(getGamePgnUrl(game_id)) for game_id in chunk]
|
|
for req in async.imap(reqs, size=thread_count):
|
|
try:
|
|
logging.info("Downloaded {}".format(req.url))
|
|
file_pgn.write((req.text + "\n\n").encode("utf-8"))
|
|
finally:
|
|
for req_prev in req.history:
|
|
req_prev.release_conn()
|
|
req.raw.release_conn()
|
|
|
|
|
|
def writePgn(user_name, file_name, thread_count):
|
|
"""Write pgn for selected usern to file"""
|
|
|
|
downloadGamesToFile(getGamesList(user_name), file_name, thread_count)
|
|
|
|
|
|
def createScidFromPgn(pgn_name, scid_name, is_save_pgn=False):
|
|
logging.debug("Saving to scid base: {}".format(scid_name))
|
|
if (os.path.isfile(scid_name + ".si4") or
|
|
os.path.isfile(scid_name + ".sg4") or
|
|
os.path.isfile(scid_name + ".sn4")):
|
|
logging.warning("Scid base already exists!")
|
|
sys.exit(1)
|
|
|
|
logging.debug("Executing: " + 'pgnscid -f "{}" "{}"'.format(
|
|
pgn_name, scid_name
|
|
))
|
|
os.system('pgnscid -f "{}" "{}"'.format(pgn_name, scid_name))
|
|
|
|
if not is_save_pgn:
|
|
logging.debug("Removing old file: {}".format(pgn_name))
|
|
os.remove(pgn_name)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(
|
|
description="Tool for export your games to Scid or PGN"
|
|
)
|
|
parser.add_argument("-n", "--name", help='Your lichess id', required=True)
|
|
parser.add_argument(
|
|
"-t", "--type", nargs='?', choices=['pgn', 'scid'],
|
|
help='Output type', default='scid', const='scid'
|
|
)
|
|
parser.add_argument(
|
|
"--threads", help='Number of threads for PGN downloading',
|
|
default=4, type=int
|
|
)
|
|
parser.add_argument("-o", "--output", help='Output filename', required=True)
|
|
parser.add_argument(
|
|
"--logging", nargs='?', choices=['off', 'info', 'debug'],
|
|
help='Verbosity of logging', default='info', const='info'
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
LOGGING_SELECTOR = {
|
|
"off": logging.ERROR,
|
|
"info": logging.INFO,
|
|
"debug": logging.DEBUG
|
|
}
|
|
logging.basicConfig(
|
|
format="%(asctime)s %(levelname)-8s %(message)s",
|
|
level=LOGGING_SELECTOR[args.logging],
|
|
datefmt="%H:%M:%S"
|
|
)
|
|
args.name = args.name.lower()
|
|
|
|
logging.debug(("Arguments: name - {}, type - {}," +
|
|
"threads - {}, out - {}, logging - {}").format(
|
|
args.name, args.type, args.threads, args.output, args.logging
|
|
))
|
|
|
|
temp_pgn_filename = getTempfileName('.pgn')
|
|
logging.debug("Temp pgn filename: {}".format(temp_pgn_filename))
|
|
writePgn(args.name, temp_pgn_filename, args.threads)
|
|
|
|
if args.type == 'pgn':
|
|
if args.output.lower().split('.')[-1] != 'pgn':
|
|
args.output += '.pgn'
|
|
shutil.move(temp_pgn_filename, args.output)
|
|
elif args.type == 'scid':
|
|
createScidFromPgn(temp_pgn_filename, args.output)
|
|
else:
|
|
logging.error("Wrong type of file after argparse checking")
|
|
sys.exit(1)
|