import json
|
|
import logging
|
|
import sys
|
|
import time
|
|
|
|
from sqlalchemy.exc import IntegrityError
|
|
|
|
from src.settings import RABBIT_REPLY_QUEUE, RABBIT_REPOSITORY_QUEUE, init_logging
|
|
from src.rabbit import get_channel, consume_task
|
|
from src.db import AnalyzeTask, DBSession, TaskStatus, User, Repository
|
|
from src.messages import AnalyzeResponse
|
|
|
|
|
|
# Number of consumed messages processed between DB commits.
BATCH_SIZE = 100
# Passed as `timeout=` to consume_task; presumably seconds to wait for a
# message before it yields None — TODO confirm against consume_task's contract.
RABBIT_CONSUME_TIMEOUT = 4
PREFETCH_COUNT = 20  # only one consumer and quick processing -> can be big
|
|
|
|
|
|
def get_user_id(session, name: str) -> int:
    """Return the id of the user named *name*, creating the row if absent.

    Uses the caller's session; a newly created user is flushed (not
    committed) so its autogenerated primary key is available immediately.

    :param session: open SQLAlchemy session.
    :param name: unique user name to look up or create.
    :return: primary key of the existing or newly created ``User`` row.
    """
    user = session.query(User).filter(User.name == name).first()
    if user:
        return user.id
    new_user = User()
    new_user.name = name
    new_user.opt_out = False
    session.add(new_user)
    # Flush to obtain the autogenerated id directly instead of re-querying
    # (the original relied on autoflush plus a second SELECT).
    session.flush()
    return new_user.id
|
|
|
|
|
|
def process_repositories(session, channel, consume_count: int):
    """Consume repository messages and persist new repositories to the DB.

    Messages already present in the DB (matched by ``id``) are counted as
    duplicates and skipped.  The repository owner is looked up or created
    via :func:`get_user_id`.  Rows are committed every ``BATCH_SIZE``
    messages; the caller is responsible for the final commit of any
    trailing partial batch.

    :param session: open SQLAlchemy session.
    :param channel: RabbitMQ channel to consume from.
    :param consume_count: passed to consume_task as ``max_count``.
    """
    added_count = duplicate_count = 0
    begin_at = time.monotonic()
    for ind, data in enumerate(
        consume_task(
            channel,
            RABBIT_REPOSITORY_QUEUE,
            timeout=RABBIT_CONSUME_TIMEOUT,
            auto_ack=False,
            max_count=consume_count,
        )
    ):
        if data is None:
            # consume_task signals an empty queue / timeout with None
            break
        data = json.loads(data.decode("utf-8"))
        if session.query(Repository).filter(Repository.id == data["id"]).first():
            duplicate_count += 1
            continue
        # GitHub-style full name "owner/repo"
        user_name, repository_name = data["name"].split("/")
        user_id = get_user_id(session, user_name)
        new_repository = Repository()
        new_repository.name = repository_name
        new_repository.id = data["id"]
        new_repository.user_id = user_id
        new_repository.stargazers_count = data["stargazers_count"]
        new_repository.fork = data["fork"]
        new_repository.private = data["private"]
        new_repository.default_branch = data["default_branch"]
        new_repository.updated_at = data["updated_at"]
        new_repository.created_at = data["created_at"]
        new_repository.archived = data["archived"]
        new_repository.size = data["size"]
        session.add(new_repository)
        # Count each repository exactly once here.  The original also
        # incremented added_count in the commit's `else` branch, which
        # double-counted one repository per successful batch commit.
        added_count += 1
        # Commit after every full batch; `(ind + 1)` avoids the original's
        # single-row "batch" commit on the very first message (ind == 0).
        if (ind + 1) % BATCH_SIZE == 0:
            try:
                session.commit()
            except IntegrityError:
                # A row in the batch violated a constraint (e.g. inserted
                # concurrently); drop the whole pending batch.
                duplicate_count += 1
                session.rollback()
    logging.info(
        f"total: {added_count + duplicate_count}, "
        f"new: {added_count}, "
        f"duplicates: {duplicate_count} "
        f"{(added_count + duplicate_count) / (time.monotonic() - begin_at):.2f} items/s"
    )
|
|
|
|
|
|
def process_task_replies(session, channel, consume_count: int):
    """Drain analyze-task replies from the reply queue into the DB.

    Each reply is matched to an ``AnalyzeTask`` row by ``task_id`` and the
    row is marked Error or Success (with timing/metric fields copied from
    the reply).  Commits every ``BATCH_SIZE`` messages; the caller performs
    the final commit for any trailing partial batch.

    :param session: open SQLAlchemy session.
    :param channel: RabbitMQ channel to consume from.
    :param consume_count: passed to consume_task as ``max_count``.
    """
    messages = consume_task(
        channel,
        RABBIT_REPLY_QUEUE,
        timeout=RABBIT_CONSUME_TIMEOUT,
        auto_ack=False,
        max_count=consume_count,
    )
    for index, payload in enumerate(messages):
        if payload is None:
            # queue drained / timeout — stop consuming
            return
        raw_reply = json.loads(payload.decode("utf-8"))
        task_id = raw_reply["task_id"]
        task_row = (
            session.query(AnalyzeTask).filter(AnalyzeTask.id == task_id).first()
        )
        if task_row is None:
            logging.warning(f"task_id {task_id} not found in DB")
            continue
        if raw_reply["status"] == "error":
            task_row.status = TaskStatus.Error
            logging.error(f"Error `{raw_reply['message']}` with task_id {task_id}.")
            continue
        # Success path: copy the parsed metrics onto the task row.
        parsed = AnalyzeResponse.from_data(raw_reply)
        task_row.status = TaskStatus.Success
        task_row.clone_duration = parsed.clone_duration
        task_row.duration = parsed.duration
        task_row.improvement_absolute = parsed.improvement_absolute
        task_row.improvement_relative = parsed.improvement_relative
        task_row.worker = parsed.worker
        if index % BATCH_SIZE == 0:
            session.commit()
|
|
|
|
|
|
def main():
    """Entry point: drain the repository queue, then the reply queue.

    An optional first CLI argument caps how many messages are consumed per
    queue (passed through as ``max_count``; presumably 0 means unlimited —
    confirm against consume_task).  Pending DB work is committed and both
    the channel and session are closed even if processing fails.
    """
    consume_count = int(sys.argv[1]) if len(sys.argv) > 1 else 0
    session = DBSession()
    channel = get_channel(prefetch_count=PREFETCH_COUNT)
    try:
        process_repositories(
            session=session, channel=channel, consume_count=consume_count
        )
        # Use a fresh channel for the second queue.
        channel.close()
        channel = get_channel(prefetch_count=PREFETCH_COUNT)
        process_task_replies(
            session=session, channel=channel, consume_count=consume_count
        )
    finally:
        # Persist any trailing partial batch, then release resources.
        session.commit()
        channel.close()
        session.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Configure logging before any consuming/DB work begins.
    init_logging()
    main()
|