First version
This commit is contained in:
0
src/__init__.py
Normal file
0
src/__init__.py
Normal file
43
src/config.py
Normal file
43
src/config.py
Normal file
@@ -0,0 +1,43 @@
|
||||
"""
|
||||
Token should be treated as password,
|
||||
files are more secure in general than command-line arguments
|
||||
|
||||
.ini config example
|
||||
[main]
|
||||
endpoint=https://example.com/gitea
|
||||
token=something
|
||||
format={owner}/{name}
|
||||
out_dir=/home/user/repositories
|
||||
ssh_key=/home/user/.ssh/id_rsa.pub
|
||||
|
||||
"""
|
||||
|
||||
import configparser
|
||||
import os
|
||||
from .models import Config
|
||||
|
||||
MAIN_SECTION = "main"
|
||||
|
||||
|
||||
def read_ini_config(path: str) -> Config:
|
||||
if not os.path.exists(path):
|
||||
raise RuntimeError("INI config path is not exists")
|
||||
|
||||
parser = configparser.ConfigParser()
|
||||
parser.read(path)
|
||||
try:
|
||||
endpoint = parser[MAIN_SECTION]["endpoint"]
|
||||
token = parser[MAIN_SECTION]["token"]
|
||||
repository_format = parser[MAIN_SECTION]["format"]
|
||||
out_dir = parser[MAIN_SECTION]["out_dir"]
|
||||
ssh_key_path = parser[MAIN_SECTION]["ssh_key"]
|
||||
except KeyError as err:
|
||||
raise RuntimeError(f"No value for section: {err}")
|
||||
|
||||
return Config(
|
||||
repository_format=repository_format,
|
||||
endpoint=endpoint,
|
||||
token=token,
|
||||
out_dir=out_dir,
|
||||
ssh_key_path=ssh_key_path,
|
||||
)
|
||||
23
src/git.py
Normal file
23
src/git.py
Normal file
@@ -0,0 +1,23 @@
|
||||
import subprocess
|
||||
from os import makedirs
|
||||
|
||||
|
||||
def git_clone(ssh_url: str, repository: str, ssh_key: str) -> bool:
|
||||
makedirs(repository, exist_ok=True)
|
||||
try:
|
||||
subprocess.check_call(
|
||||
["git", "clone", ssh_url, "."], cwd=repository
|
||||
)
|
||||
except subprocess.CalledProcessError:
|
||||
print(f"Unable to clone repository {repository} with key {ssh_key} from {ssh_url}")
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def git_pull(repository: str, ssh_key: str) -> bool:
|
||||
try:
|
||||
subprocess.check_call(["git", "pull"], cwd=repository)
|
||||
except subprocess.CalledProcessError:
|
||||
print(f"Unable to pull repository {repository} with key {ssh_key}")
|
||||
return False
|
||||
return True
|
||||
42
src/gitea_api.py
Normal file
42
src/gitea_api.py
Normal file
@@ -0,0 +1,42 @@
|
||||
from typing import List
|
||||
from .models import GiteaRepository
|
||||
from urllib.parse import urljoin
|
||||
from pydantic import parse_obj_as
|
||||
|
||||
|
||||
import requests
|
||||
|
||||
|
||||
class GiteaApi:
|
||||
def __init__(self, endpoint: str, token: str):
|
||||
self._endpoint = endpoint
|
||||
self._token = token
|
||||
|
||||
def get_repositories(self, page_size=10) -> List[GiteaRepository]:
|
||||
"""
|
||||
For mirroring input user is not important.
|
||||
"""
|
||||
session = requests.session()
|
||||
session.headers.update({"Authorization": "token " + self._token})
|
||||
all_repos = {} # hack for unique repositories in result
|
||||
page_id = 1
|
||||
while True:
|
||||
r = session.get(
|
||||
urljoin(
|
||||
self._endpoint,
|
||||
f"/api/v1/user/repos",
|
||||
),
|
||||
params={"limit": page_size, "page": page_id},
|
||||
)
|
||||
if r.status_code != 200:
|
||||
print(f"Failed request, code {r.status_code}")
|
||||
return []
|
||||
repos_data = r.json()
|
||||
if not repos_data:
|
||||
break
|
||||
else:
|
||||
page_id += 1
|
||||
cur_repos = parse_obj_as(List[GiteaRepository], repos_data)
|
||||
for repo in cur_repos:
|
||||
all_repos[repo.repo_id] = repo
|
||||
return list(all_repos.values())
|
||||
24
src/models.py
Normal file
24
src/models.py
Normal file
@@ -0,0 +1,24 @@
|
||||
from pydantic import BaseModel, Field, HttpUrl
|
||||
import datetime
|
||||
|
||||
|
||||
class Config(BaseModel):
|
||||
repository_format: str
|
||||
ssh_key_path: str
|
||||
endpoint: HttpUrl
|
||||
token: str
|
||||
out_dir: str
|
||||
|
||||
|
||||
class GiteaUser(BaseModel):
|
||||
user_id: int = Field(alias="id")
|
||||
login: str
|
||||
email: str
|
||||
|
||||
|
||||
class GiteaRepository(BaseModel):
|
||||
ssh_url: str
|
||||
name: str
|
||||
repo_id: int = Field(alias="id")
|
||||
updated_at: datetime.datetime
|
||||
owner: GiteaUser
|
||||
40
src/repository_name.py
Normal file
40
src/repository_name.py
Normal file
@@ -0,0 +1,40 @@
|
||||
from .models import GiteaRepository, GiteaUser
|
||||
import datetime
|
||||
|
||||
from typing import List
|
||||
|
||||
|
||||
def _get_test_repository() -> GiteaRepository:
|
||||
return GiteaRepository(
|
||||
ssh_url="ssh://git@example.com/project/name",
|
||||
name="test name",
|
||||
id=42,
|
||||
updated_at=datetime.datetime.now(),
|
||||
owner=GiteaUser(
|
||||
id=23,
|
||||
login="test_user",
|
||||
email="test_user@example.com",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def is_valid_format(name_format: str) -> bool:
|
||||
try:
|
||||
get_repository_name(name_format, _get_test_repository())
|
||||
except KeyError:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def get_repository_name(name_format: str, r: GiteaRepository) -> str:
|
||||
return name_format.format(
|
||||
name=r.name,
|
||||
repository_id=r.repo_id,
|
||||
owner=r.owner.login,
|
||||
owner_id=r.owner.user_id,
|
||||
)
|
||||
|
||||
|
||||
def is_valid_repository_names(name_format: str, repos: List[GiteaRepository]):
|
||||
names = set(get_repository_name(name_format, r) for r in repos)
|
||||
return len(names) == len(repos) # all names must be unique
|
||||
11
src/sync.py
Normal file
11
src/sync.py
Normal file
@@ -0,0 +1,11 @@
|
||||
from .models import GiteaRepository
|
||||
|
||||
from typing import List
|
||||
|
||||
|
||||
class SyncProcessor:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def sync(self, path, repos: List[GiteaRepository]):
|
||||
pass
|
||||
Reference in New Issue
Block a user