# Build image for the optimizer worker.
FROM python:3.8-slim

# git is required at runtime to clone the repositories under analysis.
# FIX: use apt-get (the `apt` CLI warns it has no stable script interface),
# skip recommended packages, and remove the package lists to keep the image small.
RUN apt-get update \
    && apt-get install -y --no-install-recommends git \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements first so dependency installation is cached across
# source-code-only changes.
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD [ "python", "./optimizer_worker.py" ]
# ---- db_repo/versions/001_Analyze_tables.py ----
# Initial schema: users, repositories and analyze_tasks tables.
from sqlalchemy import Table, Column, Integer, Unicode, Boolean, MetaData, ForeignKey, Float

meta = MetaData()

user = Table(
    "users", meta,
    Column("id", Integer, primary_key=True),
    Column("opt_out", Boolean, default=False, nullable=False),
    Column("name", Unicode(120), nullable=False, index=True, unique=True)
)

repository = Table(
    "repositories", meta,
    Column("id", Integer, primary_key=True),
    Column("user_id", ForeignKey("users.id"), nullable=False),
    Column("name", Unicode(120), nullable=False, index=True),
    Column("stargazers_count", Integer, nullable=False),
    Column("fork", Boolean, nullable=False),
    Column("default_branch", Unicode(80), nullable=False),
    Column("archived", Boolean, nullable=False),
    Column("updated_at", Integer, comment="Value from GitHub API", nullable=False, index=True),
    Column("created_at", Integer, comment="Value from GitHub API", nullable=False),
    Column("size", Integer, nullable=False, comment="Value from GitHub API"),
    Column("private", Boolean, nullable=False),
)

analyze_task = Table(
    "analyze_tasks", meta,
    Column("id", Integer, primary_key=True),
    Column("repository_id", ForeignKey("repositories.id"), nullable=False, index=True),
    Column("duration", Float, nullable=False, index=True),
    Column("created_at", Integer, nullable=False, index=True),
    Column("worker", Unicode(80), nullable=False, index=True),
    Column("level", Integer, nullable=False),
    Column("url", Unicode(200), nullable=False),
    Column("improvement_absolute", Integer, nullable=False, index=True),
    Column("improvement_relative", Float, nullable=False, index=True)
)


def upgrade(migrate_engine):
    meta.bind = migrate_engine
    meta.reflect()
    user.create()
    repository.create()
    analyze_task.create()


def downgrade(migrate_engine):
    meta.bind = migrate_engine
    # BUG FIX: drop child tables before their parents.  The original dropped
    # "users" first, which fails on FK-enforcing backends while
    # "repositories" still references users.id.
    analyze_task.drop()
    repository.drop()
    user.drop()


# ---- db_repo/versions/002_unique_repositories_index.py ----
# Enforce "one repository name per owner" with a unique composite index.
from sqlalchemy import Table, MetaData, Index

meta = MetaData()


def _get_indices(meta):
    meta.reflect()
    # BUG FIX: the original loaded Table("users", ...) twice (copy/paste) and
    # built the index against users.id + users.name.  The intended index is on
    # the repositories table over (user_id, name).
    repositories = Table("repositories", meta, autoload=True, autoload_with=meta.bind)
    return [
        Index(
            "repositories_name_unique_id",
            repositories.c.user_id,
            repositories.c.name,
            unique=True,
        )
    ]


def upgrade(migrate_engine):
    meta.bind = migrate_engine
    for idx in _get_indices(meta):
        idx.create()


def downgrade(migrate_engine):
    meta.bind = migrate_engine
    for idx in _get_indices(meta):
        idx.drop()
# ---- db_repo/versions/004_task_status_column.py ----
# Add analyze_tasks.status plus an index over it.
from sqlalchemy import Table, MetaData, Column, Integer, Index

meta = MetaData()


def _get_table(meta):
    """Reflect and return the analyze_tasks table."""
    meta.reflect()
    return Table("analyze_tasks", meta, autoload=True, autoload_with=meta.bind)


def _get_column():
    # server_default="2" backfills existing rows with 2 ("Success": every task
    # already in the table has been processed).
    # BUG FIX: the Python-side default was 0 ("Error"), contradicting the
    # server default; keep both at 2 — new rows created through the ORM get
    # their status from the model's own default (InQueue).
    return Column("status", Integer, nullable=False, default=2, server_default="2")


def _get_indices(meta):
    # NOTE(review): reflect into a fresh MetaData — re-reflecting an
    # already-populated MetaData does not refresh cached Table objects, so the
    # freshly added "status" column would not be visible in upgrade().
    fresh = MetaData(bind=meta.bind)
    analyze_tasks = Table("analyze_tasks", fresh, autoload=True, autoload_with=fresh.bind)
    return [
        Index("analyze_tasks_status_idx", analyze_tasks.c.status, unique=False)
    ]


def upgrade(migrate_engine):
    meta.bind = migrate_engine
    table = _get_table(meta)
    col = _get_column()
    col.create(table, populate_default=True)
    for idx in _get_indices(meta):
        idx.create()


def downgrade(migrate_engine):
    meta.bind = migrate_engine
    table = _get_table(meta)
    # BUG FIX: drop the index *before* the column it covers; the original
    # dropped the column first, leaving the index drop to fail (or dangle)
    # on strict backends.
    for idx in _get_indices(meta):
        idx.drop()
    table.c.status.drop()
def main():
    """Consume analyze results from the reply queue and persist them to the DB.

    Each message is a JSON-encoded AnalyzeResponse (or an error envelope with
    at least ``task_id``, ``status`` and ``message``).
    """
    session = DBSession()
    channel = get_channel()
    for data in consume_task(channel, RABBIT_REPLY_QUEUE, max_count=10):
        raw_reply = json.loads(data.decode("utf-8"))
        task_id = raw_reply["task_id"]
        logging.info(f"New response: {raw_reply}")
        db_task = session.query(AnalyzeTask).filter(AnalyzeTask.id == task_id).first()
        if db_task is None:
            logging.warning(f"task_id {task_id} not found in DB")
            continue
        if raw_reply["status"] == "error":
            db_task.status = TaskStatus.Error
            # BUG FIX: the original message wrongly said "... not found in DB"
            # (copy/paste), and the early `continue` skipped session.commit(),
            # so the Error status was never reliably persisted.
            logging.error(f"Task {task_id} failed: {raw_reply['message']}")
            session.commit()
            continue
        # BUG FIX: parse the full response only after the error check — error
        # envelopes lack the result fields and would raise KeyError here.
        response = AnalyzeResponse.from_data(raw_reply)
        db_task.status = TaskStatus.Success
        db_task.clone_duration = response.clone_duration
        db_task.duration = response.duration
        db_task.improvement_absolute = response.improvement_absolute
        db_task.improvement_relative = response.improvement_relative
        db_task.worker = response.worker
        session.commit()


if __name__ == "__main__":
    init_logging()
    main()
# ---- src/db.py ----
"""SQLAlchemy ORM models mirroring the tables created by db_repo migrations."""
import datetime

from sqlalchemy import create_engine, Column, Integer, Unicode, Boolean, Float, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base

from src.settings import DB_PATH

Base = declarative_base()


class TaskStatus:
    """Integer status codes stored in AnalyzeTask.status."""
    Error = 0
    InQueue = 1
    Success = 2


class User(Base):
    """A GitHub account whose repositories may be processed."""
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    # opt_out: the user asked the bot to skip their repositories.
    opt_out = Column(Boolean, default=False, nullable=False)
    name = Column(Unicode(120), nullable=False, index=True, unique=True)


class Repository(Base):
    """A GitHub repository; *_at fields mirror raw GitHub API timestamps."""
    __tablename__ = "repositories"
    id = Column(Integer, primary_key=True)
    user_id = Column(ForeignKey("users.id"), nullable=False)
    user = relationship(User)
    tasks = relationship("AnalyzeTask")
    name = Column(Unicode(120), nullable=False, index=True)
    stargazers_count = Column(Integer, nullable=False)
    fork = Column(Boolean, nullable=False)
    default_branch = Column(Unicode(80), nullable=False)
    archived = Column(Boolean, nullable=False)
    updated_at = Column(Integer, comment="Value from GitHub API", nullable=False, index=True)
    created_at = Column(Integer, comment="Value from GitHub API", nullable=False)
    size = Column(Integer, nullable=False, comment="Value from GitHub API")
    private = Column(Boolean, nullable=False)

    def get_clone_url(self):
        """Return the SSH clone URL for this repository."""
        return f"git@github.com:{self.user.name}/{self.name}"


class AnalyzeTask(Base):
    """One optimization job for a repository and its measured results."""
    __tablename__ = "analyze_tasks"
    id = Column(Integer, primary_key=True)
    repository_id = Column(ForeignKey("repositories.id"), nullable=False)
    status = Column(Integer, default=TaskStatus.InQueue, nullable=False, index=True)
    repository = relationship(Repository)
    clone_duration = Column(Float, default=0., nullable=False, index=True)
    duration = Column(Float, default=0., nullable=False, index=True)
    # created_at defaults to "now" as a unix timestamp at row-insert time.
    created_at = Column(
        Integer,
        default=lambda: int(datetime.datetime.now().timestamp()),
        nullable=False,
        index=True,
    )
    worker = Column(Unicode(80), default="", nullable=False, index=True)
    level = Column(Integer, default=0, nullable=False)
    url = Column(Unicode(200), default="", nullable=False)
    improvement_absolute = Column(Integer, default=0, nullable=False, index=True)
    improvement_relative = Column(Float, default=0., nullable=False, index=True)


_engine = create_engine(DB_PATH)
DBSession = sessionmaker(bind=_engine)
# ---- src/git.py ----
"""Thin subprocess wrappers around the git CLI using the bot's deploy key."""
import logging
import subprocess
import os
import shlex

from src.settings import GIT_PRIVATE_KEY_PATH

_logger = logging.getLogger(__name__)
_logger.addHandler(logging.NullHandler())

# Force every git SSH operation to use the bot's private key only.
os.environ['GIT_SSH_COMMAND'] = f"ssh -i {shlex.quote(GIT_PRIVATE_KEY_PATH)} -o IdentitiesOnly=yes"


def _check_preconditions():
    """Fail fast (at import) if the git binary is not available."""
    # BUG FIX: `assert subprocess.check_call(...) == 0` was a dead check —
    # check_call already raises CalledProcessError on a non-zero exit and
    # returns 0 otherwise, and `assert` is stripped under `python -O`.
    subprocess.check_call(["git", "--version"], stdout=subprocess.DEVNULL)


def clone_repo(ssh_url: str, path: str):
    """Clone *ssh_url* into *path*."""
    subprocess.check_call(["git", "clone", ssh_url, path], stdout=subprocess.DEVNULL)


def create_and_open_branch(repo_path: str, branch_name: str):
    """Create *branch_name* in *repo_path* and check it out."""
    subprocess.check_call(
        ["git", "checkout", "-b", branch_name], cwd=repo_path, stdout=subprocess.DEVNULL
    )


def commit_and_push(repo_path: str, commit_message: str):
    """Stage everything, commit with *commit_message*, and push the current branch."""
    subprocess.check_call(["git", "add", "."], cwd=repo_path, stdout=subprocess.DEVNULL)
    subprocess.check_call(
        ["git", "commit", "-m", commit_message], cwd=repo_path, stdout=subprocess.DEVNULL
    )
    branches = subprocess.check_output(
        ["git", "branch"], cwd=repo_path
    ).decode("utf-8")
    # The current branch is the line `git branch` prefixes with "*".
    current_branch = next(filter(lambda x: x.startswith("*"), branches.splitlines()))[1:].strip()
    _logger.info(f"pushing branch {current_branch} in repo {repo_path}")
    subprocess.check_call(
        ["git", "push", "--set-upstream", "origin", current_branch],
        cwd=repo_path,
        stdout=subprocess.DEVNULL,
    )


# Import-time side effect: verify git is installed before the worker starts.
_check_preconditions()


# ---- src/github.py (module-level setup) ----
"""GitHub API helpers built on PyGithub."""
import logging
import os

from github import Github, Repository, PullRequest

_logger = logging.getLogger(__name__)
_logger.addHandler(logging.NullHandler())

# SECURITY FIX: a live personal-access token was hard-coded here
# ("993d405e652f72a58f64750a7cd6ac9052e01bd2").  Revoke that token and read
# the credential from the environment instead of committing it.
_g = Github(os.environ["GITHUB_TOKEN"])
user_login = _g.get_user().login
# ---- src/github.py (API helpers) ----
# Annotations quoted so the module also loads where PyGithub types are
# imported lazily; behavior is unchanged.

def create_fork(r: "Repository") -> "Repository":
    """Fork *r* into the bot's account and return the fork."""
    return _g.get_user().create_fork(r)


def to_repository(full_name: str) -> "Repository":
    """Resolve an "owner/name" string to a Repository object."""
    return _g.get_repo(full_name)


def get_possible_repos():
    """Return candidate repositories to analyze (hard-coded for now)."""
    return [_g.get_repo("AlekseyLobanov/CrossGen")]


def create_pr(
    base_repo: "Repository", branch: str, title: str, body: str, master: str = "master"
) -> "PullRequest":
    """Open a draft PR from the bot's *branch* against *master* of *base_repo*."""
    # BUG FIX: the function was annotated -> PullRequest but returned None.
    return base_repo.create_pull(
        title=title,
        body=body,
        head=f"{user_login}:{branch}",
        base=master,
        draft=True,
        maintainer_can_modify=True,
    )


def is_for_fork(r: "Repository"):
    """A repository qualifies for forking unless it is itself a fork."""
    return not r.fork


# ---- src/messages.py ----
"""Plain message objects exchanged over RabbitMQ between producer and workers."""
import datetime
import platform
from typing import Optional


class AnalyzeTask:
    """An outgoing request to analyze one repository URL at a given level."""

    def __init__(self):
        self.created_at = 0
        self.url = ""
        self.level = -1
        self.task_id = -1

    @classmethod
    def from_data(cls, data):
        """Build a task from its wire-format dict (inverse of .data)."""
        res = cls()
        res.created_at = data["created_at"]
        res.url = data["url"]
        res.level = data["level"]
        res.task_id = data["task_id"]
        return res

    @property
    def data(self):
        """Wire-format dict for JSON serialization."""
        return {
            "created_at": self.created_at,
            "url": self.url,
            "level": self.level,
            "task_id": self.task_id,
        }


class AnalyzeResponse:
    """A worker's result for one AnalyzeTask."""

    def __init__(self):
        self.created_at = 0
        self.url = ""
        self.level = -1
        self.task_id = -1
        self.clone_duration = 0.0
        self.duration = 0.0
        self.improvement_absolute = 0
        self.improvement_relative = 0
        self.worker = ""

    @classmethod
    def from_data(cls, data):
        """Build a response from its wire-format dict (inverse of .data)."""
        res = cls()
        res.created_at = data["created_at"]
        res.url = data["url"]
        res.level = data["level"]
        res.task_id = data["task_id"]
        res.duration = data["duration"]
        res.clone_duration = data["clone_duration"]
        res.worker = data["worker"]
        res.improvement_absolute = data["improvement_absolute"]
        res.improvement_relative = data["improvement_relative"]
        return res

    @property
    def data(self):
        """Wire-format dict; refuses to serialize without a valid task_id."""
        if self.task_id < 0:
            raise RuntimeError("No task_id")
        return {
            "created_at": self.created_at,
            "url": self.url,
            "level": self.level,
            "task_id": self.task_id,
            "duration": self.duration,
            "clone_duration": self.clone_duration,
            "worker": self.worker,
            "improvement_absolute": self.improvement_absolute,
            "improvement_relative": self.improvement_relative,
        }


def create_analyze_task(url: str, level: int, task_id: Optional[int] = None) -> AnalyzeTask:
    """Create a new AnalyzeTask stamped with the current unix time."""
    res = AnalyzeTask()
    res.created_at = int(datetime.datetime.now().timestamp())
    res.url = url
    res.level = level
    res.task_id = task_id
    return res


def create_analyze_response(
    task: AnalyzeTask,
    initial_size: int,
    final_size: int,
    duration: float,
    worker: Optional[str] = None,
    clone_duration: float = 0,
) -> AnalyzeResponse:
    """Create a response for *task* from measured sizes and durations.

    Repositories under 100 bytes report zero improvement to avoid noisy
    relative figures on trivially small inputs.
    """
    # BUG FIX: `worker: str = platform.node()` evaluated the hostname once at
    # import time (classic default-argument pitfall); resolve it per call.
    if worker is None:
        worker = platform.node()
    res = AnalyzeResponse()
    res.created_at = task.created_at
    res.url = task.url
    res.level = task.level
    res.task_id = task.task_id
    res.clone_duration = clone_duration
    res.duration = duration
    res.worker = worker
    if initial_size < 100:
        res.improvement_absolute = res.improvement_relative = 0
    else:
        res.improvement_absolute = initial_size - final_size
        res.improvement_relative = 1 - final_size / initial_size
    return res


# ---- tests/base_test.py ----
def test_nothing():
    assert True