feat: multiprocessing for RabbitMQ worker
Helps with very long tasks
This commit is contained in:
@@ -2,6 +2,8 @@ from tempfile import TemporaryDirectory
|
|||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
import logging
|
import logging
|
||||||
|
import multiprocessing
|
||||||
|
import queue
|
||||||
|
|
||||||
from src.git import clone_repo
|
from src.git import clone_repo
|
||||||
from src.optipng import apply_optipng_recursive
|
from src.optipng import apply_optipng_recursive
|
||||||
@@ -9,6 +11,17 @@ from src.messages import AnalyzeTask, AnalyzeResponse, create_analyze_response
|
|||||||
from src.rabbit import get_channel, send_reply
|
from src.rabbit import get_channel, send_reply
|
||||||
from src.settings import RABBIT_TASK_QUEUE, init_logging, WORKER_NAME
|
from src.settings import RABBIT_TASK_QUEUE, init_logging, WORKER_NAME
|
||||||
|
|
||||||
|
TASK_DELAY = 0.25
|
||||||
|
|
||||||
|
|
||||||
|
def delay_generator():
|
||||||
|
for _ in range(10):
|
||||||
|
yield 0.05
|
||||||
|
for _ in range(10):
|
||||||
|
yield 0.1
|
||||||
|
while True:
|
||||||
|
yield TASK_DELAY
|
||||||
|
|
||||||
|
|
||||||
def process_task(task: AnalyzeTask) -> AnalyzeResponse:
|
def process_task(task: AnalyzeTask) -> AnalyzeResponse:
|
||||||
with TemporaryDirectory() as repo_path:
|
with TemporaryDirectory() as repo_path:
|
||||||
@@ -27,9 +40,7 @@ def process_task(task: AnalyzeTask) -> AnalyzeResponse:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def rabbit_callback(ch, method, properties, body):
|
def _process_main(results_queue: queue.Queue, task: AnalyzeTask):
|
||||||
task = AnalyzeTask.from_data(json.loads(body))
|
|
||||||
logging.info(f"New task url: {task.url}, level: {task.level}")
|
|
||||||
try:
|
try:
|
||||||
res = process_task(task).data
|
res = process_task(task).data
|
||||||
res["status"] = "success"
|
res["status"] = "success"
|
||||||
@@ -40,6 +51,23 @@ def rabbit_callback(ch, method, properties, body):
|
|||||||
"message": str(err),
|
"message": str(err),
|
||||||
"task_id": task.task_id
|
"task_id": task.task_id
|
||||||
}
|
}
|
||||||
|
results_queue.put(res)
|
||||||
|
|
||||||
|
|
||||||
|
def rabbit_callback(ch, method, properties, body):
|
||||||
|
task = AnalyzeTask.from_data(json.loads(body))
|
||||||
|
logging.info(f"New task url: {task.url}, level: {task.level}")
|
||||||
|
res_queue = multiprocessing.Queue()
|
||||||
|
method_process = multiprocessing.Process(
|
||||||
|
target=_process_main,
|
||||||
|
args=(res_queue, task)
|
||||||
|
)
|
||||||
|
method_process.start()
|
||||||
|
for delay in delay_generator():
|
||||||
|
ch.connection.sleep(delay)
|
||||||
|
if not method_process.is_alive():
|
||||||
|
break
|
||||||
|
res = res_queue.get()
|
||||||
send_reply(ch, json.dumps(res).encode("utf-8"))
|
send_reply(ch, json.dumps(res).encode("utf-8"))
|
||||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user