processing: rewritten processing pipeline

2021-01-16 22:09:06 +03:00
parent 65947896b8
commit f323dc2f41


@@ -1,21 +1,97 @@
 import json
 import logging
 import sys
 import time
-from src.settings import RABBIT_REPLY_QUEUE, init_logging
-from src.rabbit import get_channel, send_task, consume_task
-from src.db import AnalyzeTask, DBSession, TaskStatus
 from sqlalchemy.exc import IntegrityError
+from src.settings import RABBIT_REPLY_QUEUE, RABBIT_REPOSITORY_QUEUE, init_logging
+from src.rabbit import get_channel, consume_task
+from src.db import AnalyzeTask, DBSession, TaskStatus, User, Repository
 from src.messages import AnalyzeResponse
 
-def main():
-    session = DBSession()
-    channel = get_channel()
-    for data in consume_task(channel, RABBIT_REPLY_QUEUE, max_count=10):
+BATCH_SIZE = 100
+RABBIT_CONSUME_TIMEOUT = 4
+
+
+def get_user_id(session, name: str):
+    """Return the id for ``name``, creating the User row if it does not exist yet."""
+    user = session.query(User).filter(User.name == name).first()
+    if user:
+        return user.id
+    new_user = User()
+    new_user.name = name
+    new_user.opt_out = False
+    session.add(new_user)
+    # the follow-up query autoflushes the pending row, so its id is already assigned
+    return session.query(User).filter(User.name == name).first().id
+
+
+def process_repositories(session, channel, consume_count: int):
+    """Drain the repository queue into the DB, committing in batches."""
+    added_count = duplicate_count = 0
+    begin_at = time.monotonic()
+    for ind, data in enumerate(
+        consume_task(
+            channel,
+            RABBIT_REPOSITORY_QUEUE,
+            timeout=RABBIT_CONSUME_TIMEOUT,
+            auto_ack=False,
+            max_count=consume_count,
+        )
+    ):
+        if data is None:
+            # the queue stayed empty past the timeout
+            break
+        data = json.loads(data.decode("utf-8"))
+        if session.query(Repository).filter(Repository.id == data["id"]).first():
+            duplicate_count += 1
+            continue
+        user_name, repository_name = data["name"].split("/")
+        user_id = get_user_id(session, user_name)
+        new_repository = Repository()
+        new_repository.name = repository_name
+        new_repository.id = data["id"]
+        new_repository.user_id = user_id
+        new_repository.stargazers_count = data["stargazers_count"]
+        new_repository.fork = data["fork"]
+        new_repository.private = data["private"]
+        new_repository.default_branch = data["default_branch"]
+        new_repository.updated_at = data["updated_at"]
+        new_repository.created_at = data["created_at"]
+        new_repository.archived = data["archived"]
+        new_repository.size = data["size"]
+        session.add(new_repository)
+        added_count += 1
+        if ind % BATCH_SIZE == 0:
+            try:
+                session.commit()
+            except IntegrityError:
+                # the batch collided with a row inserted elsewhere; drop it
+                duplicate_count += 1
+                session.rollback()
+    logging.info(
+        f"total: {added_count + duplicate_count}, "
+        f"new: {added_count}, "
+        f"duplicates: {duplicate_count}, "
+        f"{(added_count + duplicate_count) / (time.monotonic() - begin_at):.2f} items/s"
+    )
+
+
+def process_task_replies(session, channel, consume_count: int):
+    for ind, data in enumerate(
+        consume_task(
+            channel,
+            RABBIT_REPLY_QUEUE,
+            timeout=RABBIT_CONSUME_TIMEOUT,
+            auto_ack=False,
+            max_count=consume_count,
+        )
+    ):
+        if data is None:
+            return
         data = data.decode("utf-8")
         raw_reply = json.loads(data)
         task_id = raw_reply["task_id"]
         logging.info(f"New response: {raw_reply}")
+        response = AnalyzeResponse.from_data(raw_reply)
         db_task = session.query(AnalyzeTask).filter(AnalyzeTask.id == task_id).first()
         if db_task is None:
             logging.warning(f"task_id {task_id} not found in DB")
@@ -24,15 +100,32 @@ def main():
             db_task.status = TaskStatus.Error
             logging.error(f"Error {raw_reply['message']} with task_id {task_id}")
             continue
-        response = AnalyzeResponse.from_data(raw_reply)
         db_task.status = TaskStatus.Success
         db_task.clone_duration = response.clone_duration
         db_task.duration = response.duration
         db_task.improvement_absolute = response.improvement_absolute
         db_task.improvement_relative = response.improvement_relative
         db_task.worker = response.worker
+        if ind % BATCH_SIZE == 0:
+            session.commit()
+
+
+def main():
+    # optional argv[1] caps how many messages each stage consumes
+    consume_count = 0 if len(sys.argv) == 1 else int(sys.argv[1])
+    session = DBSession()
+    channel = get_channel()
+    try:
+        process_repositories(session=session, channel=channel, consume_count=consume_count)
+        # reopen the channel so the second stage consumes on a fresh one
+        channel.close()
+        channel = get_channel()
+        process_task_replies(session=session, channel=channel, consume_count=consume_count)
+    finally:
+        session.commit()
+        channel.close()
+        session.close()
+
+
 if __name__ == "__main__":
     init_logging()
     main()
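
For reference, the attribute reads in process_task_replies imply a success reply shaped roughly like the JSON below. The keys come straight from the code above; the values are illustrative assumptions only:

    {
        "task_id": 42,
        "worker": "worker-1",
        "clone_duration": 1.7,
        "duration": 20.4,
        "improvement_absolute": 15,
        "improvement_relative": 0.12
    }

An error reply would instead carry a "message" field, which is what the TaskStatus.Error branch logs before skipping the task.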