diff --git a/main.py b/main.py
new file mode 100644
index 0000000..d9bb1d6
--- /dev/null
+++ b/main.py
@@ -0,0 +1,233 @@
+import argparse
+import io
+import json
+import logging
+import os.path
+import posixpath
+import re
+import sys
+from os.path import expanduser
+from typing import Mapping, Set, Tuple
+
+import minio
+import tqdm
+from tenacity import RetryError, retry, stop_after_delay, wait_exponential
+
+# Retry policy shared by the per-object transfer helpers: exponential
+# back-off between 0.5 and 10 seconds, giving up after 20 seconds overall.
+default_retry_args = dict(
+    wait=wait_exponential(multiplier=1, min=0.5, max=10), stop=stop_after_delay(20)
+)
+
+# Alias table read from the "aliases" key of ~/.mc/config.json; the context
+# manager closes the file handle as soon as the JSON is parsed.
+with open(expanduser("~/.mc/config.json"), encoding="utf-8") as _config_file:
+    s3_config_data = json.load(_config_file)["aliases"]
+
+
+def get_files(s3, bucket, prefix) -> Mapping[str, Tuple[int, str]]:
+    """List every non-directory object below ``prefix`` in ``bucket``.
+
+    :return: mapping from the object name with the first ``len(prefix)``
+        characters and any leading slashes removed to its ``(size, etag)``.
+    """
+    prefix_len = len(prefix)
+    return {
+        obj.object_name[prefix_len:].lstrip("/"): (obj.size, obj.etag)
+        for obj in s3.list_objects(bucket, prefix=prefix, recursive=True)
+        if not obj.is_dir
+    }
+
+
+def save_files(files: Mapping[str, Tuple[int, str]], path):
+    """Dump a file listing to ``path`` as sorted, indented JSON."""
+    processed_files = {key: list(val) for key, val in files.items()}
+    # ensure_ascii=False emits raw non-ASCII object names, so pin the file
+    # encoding instead of relying on the locale default.
+    with open(path, "w", encoding="utf-8") as f:
+        json.dump(processed_files, f, indent=4, ensure_ascii=False, sort_keys=True)
+
+
+def get_file_metrics(files: Mapping[str, Tuple[int, str]]):
+    """Return a one-line summary: number of files and their total size."""
+    total_size = sum(entry[0] for entry in files.values())
+    return f"total {len(files)} files with size: {total_size}"
+
+
+def build_s3(config_data, name):
+    """Create a MinIO client for alias ``name`` from the alias table.
+
+    Prints a message and exits with status 1 when the alias is unknown.
+    """
+    try:
+        s3_credentials = config_data[name]
+    except KeyError:
+        print(f"Unknown s3: {name}")
+        sys.exit(1)
+
+    url = s3_credentials["url"]
+    return minio.Minio(
+        # The scheme is stripped from the endpoint and only selects TLS.
+        endpoint=re.sub(r"^https?://", "", url),
+        access_key=s3_credentials["accessKey"],
+        secret_key=s3_credentials["secretKey"],
+        secure=url.startswith("https"),
+    )
+
+
+def get_redundant_files(
+    source: Mapping[str, Tuple[int, str]], target: Mapping[str, Tuple[int, str]]
+) -> Set[str]:
+    """
+    :return: files that exist in target but not in source.
+        Equal to `target - source`
+    """
+    return {key for key in target if key not in source}
+
+
+def get_different_files(
+    source: Mapping[str, Tuple[int, str]], target: Mapping[str, Tuple[int, str]]
+) -> Set[str]:
+    """
+    :return: keys present in both source and target whose values differ
+    """
+    return {key for key in source if key in target and source[key] != target[key]}
+
+
+@retry(**default_retry_args)
+def get_object_data(s3: minio.Minio, bucket: str, object_name: str) -> bytes:
+    """Download one object fully into memory, retrying on failure."""
+    response = s3.get_object(bucket_name=bucket, object_name=object_name)
+    try:
+        return response.read()
+    finally:
+        # Hand the connection back to the pool even when read() raises.
+        response.close()
+        response.release_conn()
+
+
+@retry(**default_retry_args)
+def put_object_data(s3: minio.Minio, bucket: str, object_name: str, data: bytes):
+    """Upload ``data`` as one object, retrying on failure."""
+    s3.put_object(
+        bucket,
+        object_name,
+        io.BytesIO(data),
+        len(data),
+    )
+
+
+def parse_args():
+    """Parse the command line: --source, --target and -r/--remove."""
+    parser = argparse.ArgumentParser(description="Reliable s3-mirror with logs")
+    parser.add_argument(
+        "--source", type=str, required=True, help="source path (with s3)"
+    )
+    parser.add_argument(
+        "--target", type=str, required=True, help="target path (with s3)"
+    )
+    parser.add_argument(
+        "-r",
+        "--remove",
+        default=False,
+        action="store_true",
+        help="remove redundant files",
+    )
+    return parser.parse_args()
+
+
+def extract_s3_parts(val) -> Tuple[str, str, str]:
+    """Split ``alias/bucket/prefix`` into its three components.
+
+    :raises ValueError: when ``val`` does not contain that pattern.
+    """
+    match = re.search(r"([^/]+)/([^/]+)/(.*)", val)
+    if match is None:
+        # Fail with a readable message rather than a bare IndexError.
+        raise ValueError(f"Expected alias/bucket/prefix, got: {val!r}")
+    s3_alias, bucket, path = match.groups()
+    return s3_alias, bucket, path
+
+
+def main():
+    """Mirror the --source prefix onto the --target prefix.
+
+    Listings of both sides are written to source.json and target.json. With
+    --remove, target objects missing from the source are deleted first.
+    Objects that are new or whose (size, etag) differs are then copied.
+    """
+    logging.info(f"Known s3: {list(s3_config_data.keys())}")
+
+    args = parse_args()
+
+    source_s3_alias, source_bucket, source_prefix = extract_s3_parts(args.source)
+    logging.info(f"{source_s3_alias=}, {source_bucket=}, {source_prefix=}")
+    source_s3 = build_s3(s3_config_data, source_s3_alias)
+
+    target_s3_alias, target_bucket, target_prefix = extract_s3_parts(args.target)
+    logging.info(f"{target_s3_alias=}, {target_bucket=}, {target_prefix=}")
+    target_s3 = build_s3(s3_config_data, target_s3_alias)
+
+    source_files = get_files(source_s3, bucket=source_bucket, prefix=source_prefix)
+    save_files(source_files, "source.json")
+    print(f"Source {get_file_metrics(source_files)}")
+
+    target_files = get_files(target_s3, bucket=target_bucket, prefix=target_prefix)
+    save_files(target_files, "target.json")
+    print(f"Target initial {get_file_metrics(target_files)}")
+
+    if args.remove:
+        redundant = get_redundant_files(source_files, target_files)
+        if redundant:
+            for file_to_remove in redundant:
+                # Object keys always use "/", whatever the host OS separator.
+                object_name = posixpath.join(target_prefix, file_to_remove)
+                logging.info(f"Removing redundant {target_bucket}:{object_name}")
+                # Delete from the bucket given in --target, not a fixed name.
+                target_s3.remove_object(
+                    bucket_name=target_bucket, object_name=object_name
+                )
+                del target_files[file_to_remove]
+            print(f"Removed {len(redundant)} files")
+            print(f"Target after removing redundant {get_file_metrics(target_files)}")
+
+    # "New" means present in source but absent from target, hence the swapped
+    # argument order.
+    new_files = get_redundant_files(target_files, source_files)
+    print(f"New {len(new_files)} files")
+    different_files = get_different_files(source_files, target_files)
+    print(f"Different {len(different_files)} files")
+
+    for key in tqdm.tqdm(new_files.union(different_files)):
+        # Computed outside the try block so the except handler can use them.
+        source_object = posixpath.join(source_prefix, key)
+        target_object = posixpath.join(target_prefix, key)
+        try:
+            logging.info(
+                f"Moving {source_bucket}:{source_object} to "
+                f"{target_bucket}:{target_object}"
+            )
+            source_data = get_object_data(source_s3, source_bucket, source_object)
+            put_object_data(
+                target_s3,
+                target_bucket,
+                target_object,
+                data=source_data,
+            )
+        except RetryError:
+            # Retries are exhausted: record the object and carry on.
+            logging.warning(
+                "Retry on moving "
+                f"{source_bucket}:{source_object} to "
+                f"{target_bucket}:{target_object}"
+            )
+
+
+if __name__ == "__main__":
+    logging.basicConfig(
+        filename="s3-mirror.log",
+        level=logging.INFO,
+        format="[%(asctime)s] %(levelname)s - %(message)s",
+    )
+
+    main()
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..034a343
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,10 @@
+[tool.black]
+line-length = 80
+target-version = ['py38']
+include = '\.pyi?$'
+
+[tool.isort]
+profile = "black"
+py_version = "auto"
+sections = "FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER"
+known_local_folder = "src"
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..22e273c
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+minio==7.1.13
+tenacity==8.2.1
+tqdm==4.64.1