s3-mirror/main.py

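"""Mirror one S3 prefix to another across endpoints configured as `mc` aliases.

Listings are compared by (size, etag); new and changed objects are copied,
objects present only in the target are optionally removed, and both listings
are dumped to source.json / target.json for auditing.

Example invocation (alias/bucket/prefix values are illustrative):

    python main.py --source src/my-bucket/data --target dst/my-bucket/data --remove
"""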
import argparse
import io
import json
import logging
import os.path
import re
import sys
from os.path import expanduser
from typing import Mapping, Set, Tuple

import minio
import tqdm
from tenacity import RetryError, retry, stop_after_delay, wait_exponential

default_retry_args = dict(
    wait=wait_exponential(multiplier=1, min=0.5, max=10), stop=stop_after_delay(20)
)

# Reuse the endpoints and credentials already configured for the `mc` CLI.
with open(expanduser("~/.mc/config.json")) as config_file:
    s3_config_data = json.load(config_file)["aliases"]

def get_files(s3, bucket, prefix) -> Mapping[str, Tuple[int, str]]:
    res = {}
    prefix_len = len(prefix)
    for obj in s3.list_objects(bucket, prefix=prefix, recursive=True):
        if obj.is_dir:
            continue
        res[obj.object_name[prefix_len:].lstrip("/")] = (obj.size, obj.etag)
    return res

def save_files(files: Mapping[str, Tuple[int, str]], path):
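    # Store the (size, etag) pairs as lists so the snapshot is plain JSON arrays.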
    processed_files = {key: list(val) for key, val in files.items()}
    with open(path, "w") as f:
        f.write(
            json.dumps(processed_files, indent=4, ensure_ascii=False, sort_keys=True)
        )

def get_file_metrics(files: Mapping[str, Tuple[int, str]]):
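    """Summarize a listing for log/print output.

    Illustrative example (made-up etags):

    >>> get_file_metrics({"a": (10, "x"), "b": (20, "y")})
    'total: 2 files, 30 bytes'
    """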
    total_size = sum(x[0] for x in files.values())
    return f"total: {len(files)} files, {total_size} bytes"

def build_s3(config_data, name):
    try:
        s3_credentials = config_data[name]
    except KeyError:
        print(f"Unknown s3: {name}")
        sys.exit(1)
    return minio.Minio(
        endpoint=re.sub(r"^https?://", "", s3_credentials["url"]),
        access_key=s3_credentials["accessKey"],
        secret_key=s3_credentials["secretKey"],
        secure=s3_credentials["url"].startswith("https"),
    )

def get_redundant_files(
    source: Mapping[str, Tuple[int, str]], target: Mapping[str, Tuple[int, str]]
) -> Set[str]:
    """
    :return: files that exist in target but not in source. Equal to `target - source`
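
    Illustrative example (made-up sizes and etags):

    >>> get_redundant_files({"a": (1, "x")}, {"a": (1, "x"), "b": (2, "y")})
    {'b'}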
"""
res = set()
for key in target:
if key in source:
continue
res.add(key)
return res
def get_different_files(
    source: Mapping[str, Tuple[int, str]], target: Mapping[str, Tuple[int, str]]
) -> Set[str]:
    """
    :return: keys present in both source and target whose (size, etag) values differ
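
    Illustrative example (made-up sizes and etags):

    >>> get_different_files({"a": (1, "x"), "b": (2, "y")}, {"a": (1, "x"), "b": (2, "z")})
    {'b'}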
"""
res = set()
for key in source:
if key in target and source[key] != target[key]:
res.add(key)
return res
@retry(**default_retry_args)
def get_object_data(s3: minio.Minio, bucket: str, object_name: str) -> bytes:
    return s3.get_object(bucket_name=bucket, object_name=object_name).read()

@retry(**default_retry_args)
def put_object_data(s3: minio.Minio, bucket: str, object_name: str, data: bytes):
    s3.put_object(
        bucket,
        object_name,
        io.BytesIO(data),
        len(data),
    )

def parse_args():
    parser = argparse.ArgumentParser(description="Reliable s3-mirror with logs")
    parser.add_argument(
        "--source", type=str, required=True, help="source path (alias/bucket/prefix)"
    )
    parser.add_argument(
        "--target", type=str, required=True, help="target path (alias/bucket/prefix)"
    )
    parser.add_argument(
        "-r",
        "--remove",
        default=False,
        action="store_true",
        help="remove redundant files",
    )
    return parser.parse_args()

def extract_s3_parts(val) -> Tuple[str, str, str]:
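    """Split an "alias/bucket/path" string into its three components.

    Illustrative example (hypothetical alias and bucket):

    >>> extract_s3_parts("backup/my-bucket/daily/2023")
    ('backup', 'my-bucket', 'daily/2023')
    """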
    s3_alias, bucket, path = re.findall(r"([^/]+)/([^/]+)/(.*)", val)[0]
    return s3_alias, bucket, path

def main():
    logging.info(f"Known s3: {list(s3_config_data.keys())}")
    args = parse_args()
    source_s3_alias, source_bucket, source_prefix = extract_s3_parts(args.source)
    logging.info(f"{source_s3_alias=}, {source_bucket=}, {source_prefix=}")
    source_s3 = build_s3(s3_config_data, source_s3_alias)
    target_s3_alias, target_bucket, target_prefix = extract_s3_parts(args.target)
    logging.info(f"{target_s3_alias=}, {target_bucket=}, {target_prefix=}")
    target_s3 = build_s3(s3_config_data, target_s3_alias)
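
    # Snapshot both listings up front so the sync plan can be audited offline.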
    source_files = get_files(source_s3, bucket=source_bucket, prefix=source_prefix)
    save_files(source_files, "source.json")
    print(f"Source {get_file_metrics(source_files)}")
    target_files = get_files(target_s3, bucket=target_bucket, prefix=target_prefix)
    save_files(target_files, "target.json")
    print(f"Target initial {get_file_metrics(target_files)}")
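
    # Optionally delete objects that exist only in the target (target - source).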
    if args.remove:
        redundant = get_redundant_files(source_files, target_files)
        if redundant:
            for file_to_remove in redundant:
                object_name = os.path.join(target_prefix, file_to_remove)
                logging.info(f"Removing redundant {target_bucket}:{object_name}")
                target_s3.remove_object(
                    bucket_name=target_bucket, object_name=object_name
                )
                del target_files[file_to_remove]
            print(f"Removed {len(redundant)} files")
        print(f"Target after removing redundant {get_file_metrics(target_files)}")
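
    # Plan the copy set: objects missing from the target plus changed ones.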
    new_files = get_redundant_files(target_files, source_files)
    print(f"New {len(new_files)} files")
    different_files = get_different_files(source_files, target_files)
    print(f"Different {len(different_files)} files")
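
    # Copy sequentially; each get/put is retried independently via tenacity.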
    for key in tqdm.tqdm(new_files.union(different_files)):
        try:
            source_object = os.path.join(source_prefix, key)
            target_object = os.path.join(target_prefix, key)
            logging.info(
                f"Copying {source_bucket}:{source_object} to "
                f"{target_bucket}:{target_object}"
            )
            source_data = get_object_data(source_s3, source_bucket, source_object)
            put_object_data(
                target_s3,
                target_bucket,
                target_object,
                data=source_data,
            )
        except RetryError:
            logging.warning(
                f"Giving up on copying {source_bucket}:{source_object} to "
                f"{target_bucket}:{target_object} after retries"
            )

if __name__ == "__main__":
    logging.basicConfig(
        filename="s3-mirror.log",
        level=logging.INFO,
        format="[%(asctime)s] %(levelname)s - %(message)s",
    )
    main()