import argparse
import io
import json
import logging
import os.path
import re
import sys
from os.path import expanduser
from typing import Mapping, Set, Tuple

import minio
import tqdm
from tenacity import RetryError, retry, stop_after_delay, wait_exponential

default_retry_args = dict(
    wait=wait_exponential(multiplier=1, min=0.5, max=10), stop=stop_after_delay(20)
)

# Reuse the aliases (endpoint + credentials) from the minio client (mc) config.
s3_config_data = json.loads(open(expanduser("~/.mc/config.json")).read())["aliases"]


def get_files(s3, bucket, prefix) -> Mapping[str, Tuple[int, str]]:
    """List all objects under the prefix, keyed by path relative to the prefix,
    mapped to (size, etag)."""
    res = {}
    prefix_len = len(prefix)
    for obj in tqdm.tqdm(
        s3.list_objects(bucket, prefix=prefix, recursive=True), desc="S3 list objects"
    ):
        if obj.is_dir:
            continue
        res[obj.object_name[prefix_len:].lstrip("/")] = (obj.size, obj.etag)
    return res


def save_files(files: Mapping[str, Tuple[int, str]], path):
    """Dump the object listing to a JSON file for later inspection."""
    processed_files = {key: list(val) for key, val in files.items()}
    with open(path, "w") as f:
        f.write(
            json.dumps(processed_files, indent=4, ensure_ascii=False, sort_keys=True)
        )


def get_file_metrics(files: Mapping[str, Tuple[int, str]]):
    total_size = sum(x[0] for x in files.values())
    return f"total {len(files)} files with size: {total_size}"


def build_s3(config_data, name):
    """Build a Minio client from an alias defined in the mc config."""
    try:
        s3_credentials = config_data[name]
    except KeyError:
        print(f"Unknown s3: {name}")
        sys.exit(1)
    return minio.Minio(
        endpoint=re.sub(r"^https?://", "", s3_credentials["url"]),
        access_key=s3_credentials["accessKey"],
        secret_key=s3_credentials["secretKey"],
        secure=s3_credentials["url"].startswith("https"),
    )


def get_redundant_files(
    source: Mapping[str, Tuple[int, str]], target: Mapping[str, Tuple[int, str]]
) -> Set[str]:
    """
    :return: files that exist in target but not in source; equal to `target - source`
    """
    res = set()
    for key in target:
        if key in source:
            continue
        res.add(key)
    return res


def get_different_files(
    source: Mapping[str, Tuple[int, str]], target: Mapping[str, Tuple[int, str]]
) -> Set[str]:
    """
    :return: all keys present in both source and target whose (size, etag) values differ
    """
    res = set()
    for key in source:
        if key in target and source[key] != target[key]:
            res.add(key)
    return res


@retry(**default_retry_args)
def get_object_data(s3: minio.Minio, bucket: str, object_name: str) -> bytes:
    return s3.get_object(bucket_name=bucket, object_name=object_name).read()


@retry(**default_retry_args)
def put_object_data(s3: minio.Minio, bucket: str, object_name: str, data: bytes):
    s3.put_object(
        bucket,
        object_name,
        data=io.BytesIO(data),
        length=len(data),
        num_parallel_uploads=1,
        part_size=150 * 1024 * 1024,
    )


def parse_args():
    parser = argparse.ArgumentParser(description="Reliable s3-mirror with logs")
    parser.add_argument(
        "--source", type=str, required=True, help="source path (with s3 alias)"
    )
    parser.add_argument(
        "--target", type=str, required=True, help="target path (with s3 alias)"
    )
    parser.add_argument(
        "-r",
        "--remove",
        default=False,
        action="store_true",
        help="remove redundant files",
    )
    return parser.parse_args()


def extract_s3_parts(val) -> Tuple[str, str, str]:
    """Split 'alias/bucket/path' into its three components."""
    s3_alias, bucket, path = re.findall(r"([^/]+)/([^/]+)/(.*)", val)[0]
    return s3_alias, bucket, path


def main():
    logging.info(f"Known s3: {list(s3_config_data.keys())}")
    args = parse_args()

    source_s3_alias, source_bucket, source_prefix = extract_s3_parts(args.source)
    logging.info(f"{source_s3_alias=}, {source_bucket=}, {source_prefix=}")
    source_s3 = build_s3(s3_config_data, source_s3_alias)

    target_s3_alias, target_bucket, target_prefix = extract_s3_parts(args.target)
    logging.info(f"{target_s3_alias=}, {target_bucket=}, {target_prefix=}")
    target_s3 = build_s3(s3_config_data, target_s3_alias)

    source_files = get_files(source_s3, bucket=source_bucket, prefix=source_prefix)
    save_files(source_files, "source.json")
    print(f"Source {get_file_metrics(source_files)}")

    target_files = get_files(target_s3, bucket=target_bucket, prefix=target_prefix)
    save_files(target_files, "target.json")
    print(f"Target initial {get_file_metrics(target_files)}")

    # With --remove, delete objects that exist in the target but not in the source.
    if args.remove:
        redundant = get_redundant_files(source_files, target_files)
        if redundant:
            for file_to_remove in redundant:
                object_name = os.path.join(target_prefix, file_to_remove)
                logging.info(f"Removing redundant {target_bucket}:{object_name}")
                try:
                    target_s3.remove_object(
                        bucket_name=target_bucket, object_name=object_name
                    )
                except Exception as err:
                    print(
                        f"Unable to remove {target_bucket}/{object_name}: error {err}"
                    )
                del target_files[file_to_remove]
            print(f"Removed {len(redundant)} files")
            print(f"Target after removing redundant {get_file_metrics(target_files)}")

    # Copy objects that are missing in the target or differ in size/etag.
    new_files = get_redundant_files(target_files, source_files)
    print(f"New {len(new_files)} files")
    different_files = get_different_files(source_files, target_files)
    print(f"Different {len(different_files)} files")

    for key in tqdm.tqdm(new_files.union(different_files)):
        source_object = os.path.join(source_prefix, key)
        target_object = os.path.join(target_prefix, key)
        try:
            logging.info(
                f"Moving {source_bucket}:{source_object} to "
                f"{target_bucket}:{target_object}"
            )
            source_data = get_object_data(source_s3, source_bucket, source_object)
            put_object_data(
                target_s3,
                target_bucket,
                target_object,
                data=source_data,
            )
        except RetryError:
            logging.warning(
                "Retries exhausted while moving "
                f"{source_bucket}:{source_object} to "
                f"{target_bucket}:{target_object}"
            )


if __name__ == "__main__":
    logging.basicConfig(
        filename="s3-mirror.log",
        level=logging.INFO,
        format="[%(asctime)s] %(levelname)s - %(message)s",
    )
    main()
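
# Example invocation, a sketch based on the argparse definitions above: the script
# name, the "prod" and "backup" aliases, and the bucket/prefix values are hypothetical;
# the aliases must match entries in ~/.mc/config.json.
#
#   python s3-mirror.py --source prod/source-bucket/data --target backup/mirror-bucket/data --remove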