mirror of
https://github.com/AlekseyLobanov/AlekseyLobanov.github.io.git
synced 2026-01-12 05:02:02 +03:00
New version
This commit is contained in:
@@ -1,138 +1,138 @@
|
||||
<!DOCTYPE html><!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]--><!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]--><!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]--><!--[if gt IE 8]><!--><html class="no-js"> <head><meta charset="utf-8"><meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"><title>Долгие сообщения в RabbitMQ</title><meta name="description" content><meta name="viewport" content="width=device-width"><link rel="stylesheet" href="../../theme/css/normalize.css"><link href="https://fonts.googleapis.com/css?family=Forum|Oswald|PT+Sans|Philosopher|Ubuntu+Mono" rel="stylesheet"><link rel="stylesheet" href="../../theme/css/font-awesome.min.css"><link rel="stylesheet" href="../../theme/css/main.css"><link rel="stylesheet" href="../../theme/css/blog.css"><link rel="stylesheet" href="../../theme/css/github.css"><link href="https://likemath.ru/feeds/all.atom.xml" type="application/atom+xml" rel="alternate" title="Блог 529 Atom Feed"><link href="https://likemath.ru/feeds/all.rss.xml" type="application/rss+xml" rel="alternate" title="Блог 529 RSS Feed"><script src="../../theme/js/vendor/modernizr-2.6.2.min.js"></script></head><body><!--[if lt IE 7]>
|
||||
<!DOCTYPE html><!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]--><!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]--><!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]--><!--[if gt IE 8]><!--><html class=no-js> <!--<![endif]--> <head><meta charset=utf-8><meta http-equiv=X-UA-Compatible content="IE=edge,chrome=1"><title>Долгие сообщения в RabbitMQ</title><meta name=description content><meta name=viewport content="width=device-width"><link rel=stylesheet href=../../theme/css/normalize.css><link href="https://fonts.googleapis.com/css?family=Forum|Oswald|PT+Sans|Philosopher|Ubuntu+Mono" rel=stylesheet><link rel=stylesheet href=../../theme/css/font-awesome.min.css><link rel=stylesheet href=../../theme/css/main.css><link rel=stylesheet href=../../theme/css/blog.css><link rel=stylesheet href=../../theme/css/github.css><link href=https://likemath.ru/feeds/all.atom.xml type=application/atom+xml rel=alternate title="Блог 529 Atom Feed"><link href=https://likemath.ru/feeds/all.rss.xml type=application/rss+xml rel=alternate title="Блог 529 RSS Feed"><script src=../../theme/js/vendor/modernizr-2.6.2.min.js></script></head> <body> <!--[if lt IE 7]>
|
||||
<p class="chromeframe">You are using an <strong>outdated</strong> browser. Please <a href="http://browsehappy.com/">upgrade your browser</a> or <a href="http://www.google.com/chromeframe/?redirect=true">activate Google Chrome Frame</a> to improve your experience.</p>
|
||||
<![endif]--><div id="wrapper"><header id="sidebar" class="side-shadow"><hgroup id="site-header"><a id="site-title" href="../.."><h2><i class="icon-pencil"></i> Блог 529</h2></a><p id="site-desc"> Project Euler и остальное </p></hgroup><nav><ul id="nav-links"><li><a href="../../">Главная</a></li><li><a href="../../pages/projects.html">Мои проекты</a></li><li><a href="../../pages/about.html">Об авторе</a></li><li><a href="../../feeds/feed.atom.xml">Atom feed</a></li></ul></nav><footer id="site-info"><p> Powered by Pelican. </p></footer></header><div id="post-container"><ol id="post-list"><li><article class="post-entry"><header class="entry-header"><time class="post-time" datetime="2018-10-01T12:40:00+03:00" pubdate> Пн 01 Октябрь 2018 </time><a href="../../posts/dolgie-soobshcheniia-v-rabbitmq/" rel="bookmark"><h1>Долгие сообщения в RabbitMQ</h1></a></header><section class="post-content"><p>Предположим, что у вас появилось желание перекодировать фильмы на вашем медиасервере, и вы решили использовать production-ready решение для хранения заданий. Вы взяли RabbitMQ для управления очередями сообщений и Python для их обработки. Но почему-то сообщения обрабатываются нестабильно, клиент падает без всяких видимых причин. Попробуем понять почему такое может быть.</p><p>Возьмём готовый код из официального <a href="https://www.rabbitmq.com/tutorials/tutorial-one-python.html">туториала</a> RabbitMQ и немного его модифицируем, чтобы он обрабатывал сообщения за различное время.</p><h3>Producer</h3><div class="highlight"><pre><span class="code-line"><span></span><span class="ch">#!/usr/bin/python</span></span>
|
||||
<span class="code-line"><span class="c1"># -*- coding: utf-8 -*-</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"><span class="n">__author__</span> <span class="o">=</span> <span class="s2">"Aleksey Lobanov"</span></span>
|
||||
<span class="code-line"><span class="n">__license__</span> <span class="o">=</span> <span class="s2">"MIT"</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"><span class="kn">import</span> <span class="nn">sys</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"><span class="kn">import</span> <span class="nn">pika</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"><span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s2">"__main__"</span><span class="p">:</span></span>
|
||||
<span class="code-line"> <span class="n">connection</span> <span class="o">=</span> <span class="n">pika</span><span class="o">.</span><span class="n">BlockingConnection</span><span class="p">(</span></span>
|
||||
<span class="code-line"> <span class="n">pika</span><span class="o">.</span><span class="n">ConnectionParameters</span><span class="p">(</span><span class="n">host</span><span class="o">=</span><span class="s1">'localhost'</span><span class="p">)</span></span>
|
||||
<span class="code-line"> <span class="p">)</span></span>
|
||||
<span class="code-line"> <span class="n">channel</span> <span class="o">=</span> <span class="n">connection</span><span class="o">.</span><span class="n">channel</span><span class="p">()</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"> <span class="n">channel</span><span class="o">.</span><span class="n">queue_declare</span><span class="p">(</span><span class="n">queue</span><span class="o">=</span><span class="s1">'demo.hello'</span><span class="p">)</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"> <span class="c1"># чтение первого аргумента командной строки, если он есть</span></span>
|
||||
<span class="code-line"> <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">sys</span><span class="o">.</span><span class="n">argv</span><span class="p">)</span> <span class="o">></span> <span class="mi">1</span><span class="p">:</span></span>
|
||||
<span class="code-line"> <span class="n">delay_to_send</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">sys</span><span class="o">.</span><span class="n">argv</span><span class="p">[</span><span class="mi">1</span><span class="p">])</span></span>
|
||||
<span class="code-line"> <span class="k">else</span><span class="p">:</span></span>
|
||||
<span class="code-line"> <span class="n">delay_to_send</span> <span class="o">=</span> <span class="mi">3</span></span>
|
||||
<span class="code-line"> <span class="n">channel</span><span class="o">.</span><span class="n">basic_publish</span><span class="p">(</span></span>
|
||||
<span class="code-line"> <span class="n">exchange</span><span class="o">=</span><span class="s1">''</span><span class="p">,</span></span>
|
||||
<span class="code-line"> <span class="n">routing_key</span><span class="o">=</span><span class="s1">'demo.hello'</span><span class="p">,</span></span>
|
||||
<span class="code-line"> <span class="n">body</span><span class="o">=</span><span class="nb">str</span><span class="p">(</span><span class="n">delay_to_send</span><span class="p">)</span></span>
|
||||
<span class="code-line"> <span class="p">)</span></span>
|
||||
<span class="code-line"> <span class="k">print</span><span class="p">(</span><span class="s2">" [x] Sent "</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">delay_to_send</span><span class="p">))</span></span>
|
||||
<span class="code-line"> <span class="n">connection</span><span class="o">.</span><span class="n">close</span><span class="p">()</span></span>
|
||||
</pre></div><h3>Consumer</h3><div class="highlight"><pre><span class="code-line"><span></span><span class="ch">#!/usr/bin/python</span></span>
|
||||
<span class="code-line"><span class="c1"># -*- coding: utf-8 -*-</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"><span class="n">__author__</span> <span class="o">=</span> <span class="s2">"Aleksey Lobanov"</span></span>
|
||||
<span class="code-line"><span class="n">__license__</span> <span class="o">=</span> <span class="s2">"MIT"</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"><span class="kn">import</span> <span class="nn">sys</span></span>
|
||||
<span class="code-line"><span class="kn">import</span> <span class="nn">time</span></span>
|
||||
<span class="code-line"><span class="kn">import</span> <span class="nn">math</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"><span class="kn">import</span> <span class="nn">pika</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"><span class="k">def</span> <span class="nf">callback</span><span class="p">(</span><span class="n">ch</span><span class="p">,</span> <span class="n">method</span><span class="p">,</span> <span class="n">properties</span><span class="p">,</span> <span class="n">body</span><span class="p">):</span></span>
|
||||
<span class="code-line"> <span class="k">print</span><span class="p">(</span><span class="s2">" [x] Received </span><span class="si">%r</span><span class="s2">"</span> <span class="o">%</span> <span class="n">body</span><span class="p">)</span></span>
|
||||
<span class="code-line"> <span class="n">delay</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">body</span><span class="p">)</span></span>
|
||||
<span class="code-line"> <span class="n">begin_at</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span></span>
|
||||
<span class="code-line"> <span class="n">time</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="n">delay</span><span class="p">)</span></span>
|
||||
<span class="code-line"> <span class="k">print</span><span class="p">(</span><span class="s2">" [x] Finished {}"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">body</span><span class="p">))</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"><span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s2">"__main__"</span><span class="p">:</span></span>
|
||||
<span class="code-line"> <span class="n">connection</span> <span class="o">=</span> <span class="n">pika</span><span class="o">.</span><span class="n">BlockingConnection</span><span class="p">(</span><span class="n">pika</span><span class="o">.</span><span class="n">ConnectionParameters</span><span class="p">(</span><span class="n">host</span><span class="o">=</span><span class="s1">'localhost'</span><span class="p">))</span></span>
|
||||
<span class="code-line"> <span class="n">channel</span> <span class="o">=</span> <span class="n">connection</span><span class="o">.</span><span class="n">channel</span><span class="p">()</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"> <span class="n">channel</span><span class="o">.</span><span class="n">queue_declare</span><span class="p">(</span><span class="n">queue</span><span class="o">=</span><span class="s1">'demo.hello'</span><span class="p">)</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"> <span class="n">channel</span><span class="o">.</span><span class="n">basic_consume</span><span class="p">(</span></span>
|
||||
<span class="code-line"> <span class="n">callback</span><span class="p">,</span></span>
|
||||
<span class="code-line"> <span class="n">queue</span><span class="o">=</span><span class="s1">'demo.hello'</span><span class="p">,</span></span>
|
||||
<span class="code-line"> <span class="n">no_ack</span><span class="o">=</span><span class="bp">True</span></span>
|
||||
<span class="code-line"> <span class="p">)</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"> <span class="k">print</span><span class="p">(</span><span class="s1">' [*] Waiting for messages. To exit press CTRL+C'</span><span class="p">)</span></span>
|
||||
<span class="code-line"> <span class="n">channel</span><span class="o">.</span><span class="n">start_consuming</span><span class="p">()</span></span>
|
||||
</pre></div><p>Пока мы посылаем маленькие числа, всё будет хорошо работать. Но если послать какое-то большое (в моём случае достаточно 200), то consumer потеряет соединение с сервером, будет ошибка. Скорее всего это будет <code>pika.exceptions.ConnectionClosed: (-1, 'EOF')</code> или <code>Socket Error 104</code> (<a href="https://github.com/pika/pika/issues/753">тут</a> есть обсуждение на GitHub библиотеки, но установка <code>prefetch_count=1</code> тоже не поможет). Эта проблема актуальна для обоих веток Python.</p><p>Реальная причина в том, что при обработке сообщения не происходит необходимого взаимодействия с RabbitMQ, не отправляются hearbeats, а без них сервер считает, что этот клиент погиб окончательно. Нужно отметить, что переход на другой тип соединения не помогает. Например использование <a href="https://pika.readthedocs.io/en/stable/examples/twisted_example.html">примера</a> на Twisted из официальной документации ничего не изменит.</p><p>У этой проблемы есть много решений. Будем считать, что сообщение, которое мы обрабатываем не разделяется на подзадачи и рассмотрим некоторые из них:</p><ol><li>Отключить hearbeats/увеличить их интервал так, чтобы самprefetch_countая долгая обработка сообщения вела к потере не более, чем одного. Это самое простое решение, но в таком случае мы теряем в надёжности. Внешнем сервисам мониторинга будет сложнее понять, обрабатываются ли сейчас сообщения или уже нет. И чем больше интервал, тем серьёзнее проблема. <strong>Не подходит</strong></li><li>Разбить обработку сообщения на несколько этапов. Часто это хорошее решение, но в предположении, что данная обработка не разбивается на более мелкие тоже <strong>не подходит</strong>. </li><li>Использовать <code>connection.sleep</code> вместо <code>time.sleep</code>, а также регулярный вызов <code>BlockingConnection.process_data_events</code>. Оба эти решения помогают, но их использование плохо по многим причинам. Во-первых это явное протекании абстракции, когда код для обработки сообщения вынужден работать с очередью. Во-вторых не всегда можно гарантировать, что эти функции вызываются достаточно часто, а это главная проблема. <strong>Не подходит</strong></li><li>Выделить отдельный процесс под <code>pika</code>. Вероятно, единственно универсальное решение. Если исходный код был правильно написан, то адаптация будет простой. Но у этого решения есть минусы, обязательные при использовании нескольких потоков/процессов. Также сама библиотека <code>pika</code> не является потокобезопасной.</li></ol><p>Возможное решение будет заключаться в вынесение обработки данных в отдельный метод:</p><div class="highlight"><pre><span class="code-line"><span></span><span class="k">def</span> <span class="nf">real_work</span><span class="p">(</span><span class="n">body</span><span class="p">):</span></span>
|
||||
<span class="code-line"> <span class="n">delay</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">body</span><span class="p">)</span></span>
|
||||
<span class="code-line"> <span class="n">begin_at</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span></span>
|
||||
<span class="code-line"> <span class="n">time</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="n">delay</span><span class="p">)</span></span>
|
||||
</pre></div><p>и небольшой доработке callback-метода:</p><div class="highlight"><pre><span class="code-line"><span></span><span class="c1"># Конструктор, при запуске процесс запустит</span></span>
|
||||
<span class="code-line"><span class="c1"># функцию с указанными параметрами</span></span>
|
||||
<span class="code-line"><span class="n">work_process</span> <span class="o">=</span> <span class="n">multiprocessing</span><span class="o">.</span><span class="n">Process</span><span class="p">(</span></span>
|
||||
<span class="code-line"> <span class="n">target</span><span class="o">=</span><span class="n">real_work</span><span class="p">,</span></span>
|
||||
<span class="code-line"> <span class="n">args</span><span class="o">=</span><span class="p">(</span><span class="n">body</span><span class="p">,</span> <span class="p">)</span></span>
|
||||
<span class="code-line"><span class="p">)</span></span>
|
||||
<span class="code-line"><span class="n">work_process</span><span class="o">.</span><span class="n">start</span><span class="p">()</span> <span class="c1"># процесс нужно явно запустить</span></span>
|
||||
<span class="code-line"><span class="k">while</span> <span class="bp">True</span><span class="p">:</span></span>
|
||||
<span class="code-line"> <span class="n">ch</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span></span>
|
||||
<span class="code-line"> <span class="c1"># пока поток работает, используем функцию pika</span></span>
|
||||
<span class="code-line"> <span class="c1"># для обработки необходимых сообщений</span></span>
|
||||
<span class="code-line"> <span class="k">if</span> <span class="ow">not</span> <span class="n">work_process</span><span class="o">.</span><span class="n">is_alive</span><span class="p">():</span></span>
|
||||
<span class="code-line"> <span class="k">break</span></span>
|
||||
</pre></div><p>Тогда готовый код будет выглядеть так:</p><div class="highlight"><pre><span class="code-line"><span></span><span class="ch">#!/usr/bin/python</span></span>
|
||||
<span class="code-line"><span class="c1"># -*- coding: utf-8 -*-</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"><span class="n">__author__</span> <span class="o">=</span> <span class="s2">"Aleksey Lobanov"</span></span>
|
||||
<span class="code-line"><span class="n">__license__</span> <span class="o">=</span> <span class="s2">"MIT"</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"><span class="kn">import</span> <span class="nn">sys</span></span>
|
||||
<span class="code-line"><span class="kn">import</span> <span class="nn">time</span></span>
|
||||
<span class="code-line"><span class="kn">import</span> <span class="nn">math</span></span>
|
||||
<span class="code-line"><span class="kn">import</span> <span class="nn">multiprocessing</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"><span class="kn">import</span> <span class="nn">pika</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"><span class="k">def</span> <span class="nf">real_work</span><span class="p">(</span><span class="n">body</span><span class="p">):</span></span>
|
||||
<span class="code-line"> <span class="n">delay</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">body</span><span class="p">)</span></span>
|
||||
<span class="code-line"> <span class="n">begin_at</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span></span>
|
||||
<span class="code-line"> <span class="n">time</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="n">delay</span><span class="p">)</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"><span class="k">def</span> <span class="nf">callback</span><span class="p">(</span><span class="n">ch</span><span class="p">,</span> <span class="n">method</span><span class="p">,</span> <span class="n">properties</span><span class="p">,</span> <span class="n">body</span><span class="p">):</span></span>
|
||||
<span class="code-line"> <span class="k">print</span><span class="p">(</span><span class="s2">" [x] Received </span><span class="si">%r</span><span class="s2">"</span> <span class="o">%</span> <span class="n">body</span><span class="p">)</span></span>
|
||||
<span class="code-line"> <span class="n">work_process</span> <span class="o">=</span> <span class="n">multiprocessing</span><span class="o">.</span><span class="n">Process</span><span class="p">(</span></span>
|
||||
<span class="code-line"> <span class="n">target</span><span class="o">=</span><span class="n">real_work</span><span class="p">,</span></span>
|
||||
<span class="code-line"> <span class="n">args</span><span class="o">=</span><span class="p">(</span><span class="n">body</span><span class="p">,</span> <span class="p">)</span></span>
|
||||
<span class="code-line"> <span class="p">)</span></span>
|
||||
<span class="code-line"> <span class="n">work_process</span><span class="o">.</span><span class="n">start</span><span class="p">()</span></span>
|
||||
<span class="code-line"> <span class="k">while</span> <span class="bp">True</span><span class="p">:</span></span>
|
||||
<span class="code-line"> <span class="n">ch</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span></span>
|
||||
<span class="code-line"> <span class="k">if</span> <span class="ow">not</span> <span class="n">work_process</span><span class="o">.</span><span class="n">is_alive</span><span class="p">():</span></span>
|
||||
<span class="code-line"> <span class="k">break</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"> <span class="k">print</span><span class="p">(</span><span class="s2">" [x] Finished "</span> <span class="o">+</span> <span class="n">body</span><span class="p">)</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"><span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s2">"__main__"</span><span class="p">:</span></span>
|
||||
<span class="code-line"> <span class="n">connection</span> <span class="o">=</span> <span class="n">pika</span><span class="o">.</span><span class="n">BlockingConnection</span><span class="p">(</span><span class="n">pika</span><span class="o">.</span><span class="n">ConnectionParameters</span><span class="p">(</span><span class="n">host</span><span class="o">=</span><span class="s1">'localhost'</span><span class="p">))</span></span>
|
||||
<span class="code-line"> <span class="n">channel</span> <span class="o">=</span> <span class="n">connection</span><span class="o">.</span><span class="n">channel</span><span class="p">()</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"> <span class="n">channel</span><span class="o">.</span><span class="n">queue_declare</span><span class="p">(</span><span class="n">queue</span><span class="o">=</span><span class="s1">'demo.hello'</span><span class="p">)</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"> <span class="n">channel</span><span class="o">.</span><span class="n">basic_consume</span><span class="p">(</span></span>
|
||||
<span class="code-line"> <span class="n">callback</span><span class="p">,</span></span>
|
||||
<span class="code-line"> <span class="n">queue</span><span class="o">=</span><span class="s1">'demo.hello'</span><span class="p">,</span></span>
|
||||
<span class="code-line"> <span class="n">no_ack</span><span class="o">=</span><span class="bp">True</span></span>
|
||||
<span class="code-line"> <span class="p">)</span></span>
|
||||
<span class="code-line"></span>
|
||||
<span class="code-line"> <span class="k">print</span><span class="p">(</span><span class="s1">' [*] Waiting for messages. To exit press CTRL+C'</span><span class="p">)</span></span>
|
||||
<span class="code-line"> <span class="n">channel</span><span class="o">.</span><span class="n">start_consuming</span><span class="p">()</span></span>
|
||||
</pre></div></section><hr><aside class="post-meta"><p>Категория: <a href="../../category/python.html">Python</a></p><p>Теги: <a href="../../tag/python.html">Python</a>, <a href="../../tag/rabbitmq.html">RabbitMQ</a>, </p></aside><hr></article></li></ol></div></div><script type="text/javascript">
|
||||
<![endif]--> <div id=wrapper> <header id=sidebar class=side-shadow> <hgroup id=site-header> <a id=site-title href=../..><h2><i class=icon-pencil></i> Блог 529</h2></a> <p id=site-desc> Project Euler и остальное </p> </hgroup> <nav> <ul id=nav-links> <li><a href=../../ >Главная</a></li> <li><a href=../../pages/projects.html>Мои проекты</a></li> <li><a href=../../pages/about.html>Об авторе</a></li> <li><a href=../../feeds/feed.atom.xml>Atom feed</a></li> </ul> </nav> <footer id=site-info> <p> Powered by Pelican. </p> </footer></header> <div id=post-container> <ol id=post-list> <li> <article class=post-entry> <header class=entry-header> <time class=post-time datetime=2018-10-01T12:40:00+03:00 pubdate> Пн 01 октября 2018 </time> <a href=../../posts/dolgie-soobshcheniia-v-rabbitmq/ rel=bookmark><h1>Долгие сообщения в RabbitMQ</h1></a> </header> <section class=post-content> <p>Предположим, что у вас появилось желание перекодировать фильмы на вашем медиасервере, и вы решили использовать production-ready решение для хранения заданий. Вы взяли RabbitMQ для управления очередями сообщений и Python для их обработки. Но почему-то сообщения обрабатываются нестабильно, клиент падает без всяких видимых причин. Попробуем понять почему такое может быть.</p> <p>Возьмём готовый код из официального <a href=https://www.rabbitmq.com/tutorials/tutorial-one-python.html>туториала</a> RabbitMQ и немного его модифицируем, чтобы он обрабатывал сообщения за различное время.</p> <h3>Producer</h3> <div class=highlight><pre><span class=code-line><span></span><span class=ch>#!/usr/bin/python</span></span>
|
||||
<span class=code-line><span class=c1># -*- coding: utf-8 -*-</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line><span class=n>__author__</span> <span class=o>=</span> <span class=s2>"Aleksey Lobanov"</span></span>
|
||||
<span class=code-line><span class=n>__license__</span> <span class=o>=</span> <span class=s2>"MIT"</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line><span class=kn>import</span> <span class=nn>sys</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line><span class=kn>import</span> <span class=nn>pika</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line><span class=k>if</span> <span class=vm>__name__</span> <span class=o>==</span> <span class=s2>"__main__"</span><span class=p>:</span></span>
|
||||
<span class=code-line> <span class=n>connection</span> <span class=o>=</span> <span class=n>pika</span><span class=o>.</span><span class=n>BlockingConnection</span><span class=p>(</span></span>
|
||||
<span class=code-line> <span class=n>pika</span><span class=o>.</span><span class=n>ConnectionParameters</span><span class=p>(</span><span class=n>host</span><span class=o>=</span><span class=s1>'localhost'</span><span class=p>)</span></span>
|
||||
<span class=code-line> <span class=p>)</span></span>
|
||||
<span class=code-line> <span class=n>channel</span> <span class=o>=</span> <span class=n>connection</span><span class=o>.</span><span class=n>channel</span><span class=p>()</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line> <span class=n>channel</span><span class=o>.</span><span class=n>queue_declare</span><span class=p>(</span><span class=n>queue</span><span class=o>=</span><span class=s1>'demo.hello'</span><span class=p>)</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line> <span class=c1># чтение первого аргумента командной строки, если он есть</span></span>
|
||||
<span class=code-line> <span class=k>if</span> <span class=nb>len</span><span class=p>(</span><span class=n>sys</span><span class=o>.</span><span class=n>argv</span><span class=p>)</span> <span class=o>></span> <span class=mi>1</span><span class=p>:</span></span>
|
||||
<span class=code-line> <span class=n>delay_to_send</span> <span class=o>=</span> <span class=nb>int</span><span class=p>(</span><span class=n>sys</span><span class=o>.</span><span class=n>argv</span><span class=p>[</span><span class=mi>1</span><span class=p>])</span></span>
|
||||
<span class=code-line> <span class=k>else</span><span class=p>:</span></span>
|
||||
<span class=code-line> <span class=n>delay_to_send</span> <span class=o>=</span> <span class=mi>3</span></span>
|
||||
<span class=code-line> <span class=n>channel</span><span class=o>.</span><span class=n>basic_publish</span><span class=p>(</span></span>
|
||||
<span class=code-line> <span class=n>exchange</span><span class=o>=</span><span class=s1>''</span><span class=p>,</span></span>
|
||||
<span class=code-line> <span class=n>routing_key</span><span class=o>=</span><span class=s1>'demo.hello'</span><span class=p>,</span></span>
|
||||
<span class=code-line> <span class=n>body</span><span class=o>=</span><span class=nb>str</span><span class=p>(</span><span class=n>delay_to_send</span><span class=p>)</span></span>
|
||||
<span class=code-line> <span class=p>)</span></span>
|
||||
<span class=code-line> <span class=k>print</span><span class=p>(</span><span class=s2>" [x] Sent "</span> <span class=o>+</span> <span class=nb>str</span><span class=p>(</span><span class=n>delay_to_send</span><span class=p>))</span></span>
|
||||
<span class=code-line> <span class=n>connection</span><span class=o>.</span><span class=n>close</span><span class=p>()</span></span>
|
||||
</pre></div> <h3>Consumer</h3> <div class=highlight><pre><span class=code-line><span></span><span class=ch>#!/usr/bin/python</span></span>
|
||||
<span class=code-line><span class=c1># -*- coding: utf-8 -*-</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line><span class=n>__author__</span> <span class=o>=</span> <span class=s2>"Aleksey Lobanov"</span></span>
|
||||
<span class=code-line><span class=n>__license__</span> <span class=o>=</span> <span class=s2>"MIT"</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line><span class=kn>import</span> <span class=nn>sys</span></span>
|
||||
<span class=code-line><span class=kn>import</span> <span class=nn>time</span></span>
|
||||
<span class=code-line><span class=kn>import</span> <span class=nn>math</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line><span class=kn>import</span> <span class=nn>pika</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line><span class=k>def</span> <span class=nf>callback</span><span class=p>(</span><span class=n>ch</span><span class=p>,</span> <span class=n>method</span><span class=p>,</span> <span class=n>properties</span><span class=p>,</span> <span class=n>body</span><span class=p>):</span></span>
|
||||
<span class=code-line> <span class=k>print</span><span class=p>(</span><span class=s2>" [x] Received </span><span class=si>%r</span><span class=s2>"</span> <span class=o>%</span> <span class=n>body</span><span class=p>)</span></span>
|
||||
<span class=code-line> <span class=n>delay</span> <span class=o>=</span> <span class=nb>int</span><span class=p>(</span><span class=n>body</span><span class=p>)</span></span>
|
||||
<span class=code-line> <span class=n>begin_at</span> <span class=o>=</span> <span class=n>time</span><span class=o>.</span><span class=n>time</span><span class=p>()</span></span>
|
||||
<span class=code-line> <span class=n>time</span><span class=o>.</span><span class=n>sleep</span><span class=p>(</span><span class=n>delay</span><span class=p>)</span></span>
|
||||
<span class=code-line> <span class=k>print</span><span class=p>(</span><span class=s2>" [x] Finished {}"</span><span class=o>.</span><span class=n>format</span><span class=p>(</span><span class=n>body</span><span class=p>))</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line><span class=k>if</span> <span class=vm>__name__</span> <span class=o>==</span> <span class=s2>"__main__"</span><span class=p>:</span></span>
|
||||
<span class=code-line> <span class=n>connection</span> <span class=o>=</span> <span class=n>pika</span><span class=o>.</span><span class=n>BlockingConnection</span><span class=p>(</span><span class=n>pika</span><span class=o>.</span><span class=n>ConnectionParameters</span><span class=p>(</span><span class=n>host</span><span class=o>=</span><span class=s1>'localhost'</span><span class=p>))</span></span>
|
||||
<span class=code-line> <span class=n>channel</span> <span class=o>=</span> <span class=n>connection</span><span class=o>.</span><span class=n>channel</span><span class=p>()</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line> <span class=n>channel</span><span class=o>.</span><span class=n>queue_declare</span><span class=p>(</span><span class=n>queue</span><span class=o>=</span><span class=s1>'demo.hello'</span><span class=p>)</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line> <span class=n>channel</span><span class=o>.</span><span class=n>basic_consume</span><span class=p>(</span></span>
|
||||
<span class=code-line> <span class=n>callback</span><span class=p>,</span></span>
|
||||
<span class=code-line> <span class=n>queue</span><span class=o>=</span><span class=s1>'demo.hello'</span><span class=p>,</span></span>
|
||||
<span class=code-line> <span class=n>no_ack</span><span class=o>=</span><span class=bp>True</span></span>
|
||||
<span class=code-line> <span class=p>)</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line> <span class=k>print</span><span class=p>(</span><span class=s1>' [*] Waiting for messages. To exit press CTRL+C'</span><span class=p>)</span></span>
|
||||
<span class=code-line> <span class=n>channel</span><span class=o>.</span><span class=n>start_consuming</span><span class=p>()</span></span>
|
||||
</pre></div> <p>Пока мы посылаем маленькие числа, всё будет хорошо работать. Но если послать какое-то большое (в моём случае достаточно 200), то consumer потеряет соединение с сервером, будет ошибка. Скорее всего это будет <code>pika.exceptions.ConnectionClosed: (-1, 'EOF')</code> или <code>Socket Error 104</code> (<a href=https://github.com/pika/pika/issues/753>тут</a> есть обсуждение на GitHub библиотеки, но установка <code>prefetch_count=1</code> тоже не поможет). Эта проблема актуальна для обоих веток Python.</p> <p>Реальная причина в том, что при обработке сообщения не происходит необходимого взаимодействия с RabbitMQ, не отправляются hearbeats, а без них сервер считает, что этот клиент погиб окончательно. Нужно отметить, что переход на другой тип соединения не помогает. Например использование <a href=https://pika.readthedocs.io/en/stable/examples/twisted_example.html>примера</a> на Twisted из официальной документации ничего не изменит.</p> <p>У этой проблемы есть много решений. Будем считать, что сообщение, которое мы обрабатываем не разделяется на подзадачи и рассмотрим некоторые из них:</p> <ol> <li>Отключить hearbeats/увеличить их интервал так, чтобы самprefetch_countая долгая обработка сообщения вела к потере не более, чем одного. Это самое простое решение, но в таком случае мы теряем в надёжности. Внешнем сервисам мониторинга будет сложнее понять, обрабатываются ли сейчас сообщения или уже нет. И чем больше интервал, тем серьёзнее проблема. <strong>Не подходит</strong></li> <li>Разбить обработку сообщения на несколько этапов. Часто это хорошее решение, но в предположении, что данная обработка не разбивается на более мелкие тоже <strong>не подходит</strong>. </li> <li>Использовать <code>connection.sleep</code> вместо <code>time.sleep</code>, а также регулярный вызов <code>BlockingConnection.process_data_events</code>. Оба эти решения помогают, но их использование плохо по многим причинам. Во-первых это явное протекании абстракции, когда код для обработки сообщения вынужден работать с очередью. Во-вторых не всегда можно гарантировать, что эти функции вызываются достаточно часто, а это главная проблема. <strong>Не подходит</strong></li> <li>Выделить отдельный процесс под <code>pika</code>. Вероятно, единственно универсальное решение. Если исходный код был правильно написан, то адаптация будет простой. Но у этого решения есть минусы, обязательные при использовании нескольких потоков/процессов. Также сама библиотека <code>pika</code> не является потокобезопасной.</li> </ol> <p>Возможное решение будет заключаться в вынесение обработки данных в отдельный метод:</p> <div class=highlight><pre><span class=code-line><span></span><span class=k>def</span> <span class=nf>real_work</span><span class=p>(</span><span class=n>body</span><span class=p>):</span></span>
|
||||
<span class=code-line> <span class=n>delay</span> <span class=o>=</span> <span class=nb>int</span><span class=p>(</span><span class=n>body</span><span class=p>)</span></span>
|
||||
<span class=code-line> <span class=n>begin_at</span> <span class=o>=</span> <span class=n>time</span><span class=o>.</span><span class=n>time</span><span class=p>()</span></span>
|
||||
<span class=code-line> <span class=n>time</span><span class=o>.</span><span class=n>sleep</span><span class=p>(</span><span class=n>delay</span><span class=p>)</span></span>
|
||||
</pre></div> <p>и небольшой доработке callback-метода:</p> <div class=highlight><pre><span class=code-line><span></span><span class=c1># Конструктор, при запуске процесс запустит</span></span>
|
||||
<span class=code-line><span class=c1># функцию с указанными параметрами</span></span>
|
||||
<span class=code-line><span class=n>work_process</span> <span class=o>=</span> <span class=n>multiprocessing</span><span class=o>.</span><span class=n>Process</span><span class=p>(</span></span>
|
||||
<span class=code-line> <span class=n>target</span><span class=o>=</span><span class=n>real_work</span><span class=p>,</span></span>
|
||||
<span class=code-line> <span class=n>args</span><span class=o>=</span><span class=p>(</span><span class=n>body</span><span class=p>,</span> <span class=p>)</span></span>
|
||||
<span class=code-line><span class=p>)</span></span>
|
||||
<span class=code-line><span class=n>work_process</span><span class=o>.</span><span class=n>start</span><span class=p>()</span> <span class=c1># процесс нужно явно запустить</span></span>
|
||||
<span class=code-line><span class=k>while</span> <span class=bp>True</span><span class=p>:</span></span>
|
||||
<span class=code-line> <span class=n>ch</span><span class=o>.</span><span class=n>connection</span><span class=o>.</span><span class=n>sleep</span><span class=p>(</span><span class=mi>1</span><span class=p>)</span></span>
|
||||
<span class=code-line> <span class=c1># пока поток работает, используем функцию pika</span></span>
|
||||
<span class=code-line> <span class=c1># для обработки необходимых сообщений</span></span>
|
||||
<span class=code-line> <span class=k>if</span> <span class=ow>not</span> <span class=n>work_process</span><span class=o>.</span><span class=n>is_alive</span><span class=p>():</span></span>
|
||||
<span class=code-line> <span class=k>break</span></span>
|
||||
</pre></div> <p>Тогда готовый код будет выглядеть так:</p> <div class=highlight><pre><span class=code-line><span></span><span class=ch>#!/usr/bin/python</span></span>
|
||||
<span class=code-line><span class=c1># -*- coding: utf-8 -*-</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line><span class=n>__author__</span> <span class=o>=</span> <span class=s2>"Aleksey Lobanov"</span></span>
|
||||
<span class=code-line><span class=n>__license__</span> <span class=o>=</span> <span class=s2>"MIT"</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line><span class=kn>import</span> <span class=nn>sys</span></span>
|
||||
<span class=code-line><span class=kn>import</span> <span class=nn>time</span></span>
|
||||
<span class=code-line><span class=kn>import</span> <span class=nn>math</span></span>
|
||||
<span class=code-line><span class=kn>import</span> <span class=nn>multiprocessing</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line><span class=kn>import</span> <span class=nn>pika</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line><span class=k>def</span> <span class=nf>real_work</span><span class=p>(</span><span class=n>body</span><span class=p>):</span></span>
|
||||
<span class=code-line> <span class=n>delay</span> <span class=o>=</span> <span class=nb>int</span><span class=p>(</span><span class=n>body</span><span class=p>)</span></span>
|
||||
<span class=code-line> <span class=n>begin_at</span> <span class=o>=</span> <span class=n>time</span><span class=o>.</span><span class=n>time</span><span class=p>()</span></span>
|
||||
<span class=code-line> <span class=n>time</span><span class=o>.</span><span class=n>sleep</span><span class=p>(</span><span class=n>delay</span><span class=p>)</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line><span class=k>def</span> <span class=nf>callback</span><span class=p>(</span><span class=n>ch</span><span class=p>,</span> <span class=n>method</span><span class=p>,</span> <span class=n>properties</span><span class=p>,</span> <span class=n>body</span><span class=p>):</span></span>
|
||||
<span class=code-line> <span class=k>print</span><span class=p>(</span><span class=s2>" [x] Received </span><span class=si>%r</span><span class=s2>"</span> <span class=o>%</span> <span class=n>body</span><span class=p>)</span></span>
|
||||
<span class=code-line> <span class=n>work_process</span> <span class=o>=</span> <span class=n>multiprocessing</span><span class=o>.</span><span class=n>Process</span><span class=p>(</span></span>
|
||||
<span class=code-line> <span class=n>target</span><span class=o>=</span><span class=n>real_work</span><span class=p>,</span></span>
|
||||
<span class=code-line> <span class=n>args</span><span class=o>=</span><span class=p>(</span><span class=n>body</span><span class=p>,</span> <span class=p>)</span></span>
|
||||
<span class=code-line> <span class=p>)</span></span>
|
||||
<span class=code-line> <span class=n>work_process</span><span class=o>.</span><span class=n>start</span><span class=p>()</span></span>
|
||||
<span class=code-line> <span class=k>while</span> <span class=bp>True</span><span class=p>:</span></span>
|
||||
<span class=code-line> <span class=n>ch</span><span class=o>.</span><span class=n>connection</span><span class=o>.</span><span class=n>sleep</span><span class=p>(</span><span class=mi>1</span><span class=p>)</span></span>
|
||||
<span class=code-line> <span class=k>if</span> <span class=ow>not</span> <span class=n>work_process</span><span class=o>.</span><span class=n>is_alive</span><span class=p>():</span></span>
|
||||
<span class=code-line> <span class=k>break</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line> <span class=k>print</span><span class=p>(</span><span class=s2>" [x] Finished "</span> <span class=o>+</span> <span class=n>body</span><span class=p>)</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line><span class=k>if</span> <span class=vm>__name__</span> <span class=o>==</span> <span class=s2>"__main__"</span><span class=p>:</span></span>
|
||||
<span class=code-line> <span class=n>connection</span> <span class=o>=</span> <span class=n>pika</span><span class=o>.</span><span class=n>BlockingConnection</span><span class=p>(</span><span class=n>pika</span><span class=o>.</span><span class=n>ConnectionParameters</span><span class=p>(</span><span class=n>host</span><span class=o>=</span><span class=s1>'localhost'</span><span class=p>))</span></span>
|
||||
<span class=code-line> <span class=n>channel</span> <span class=o>=</span> <span class=n>connection</span><span class=o>.</span><span class=n>channel</span><span class=p>()</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line> <span class=n>channel</span><span class=o>.</span><span class=n>queue_declare</span><span class=p>(</span><span class=n>queue</span><span class=o>=</span><span class=s1>'demo.hello'</span><span class=p>)</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line> <span class=n>channel</span><span class=o>.</span><span class=n>basic_consume</span><span class=p>(</span></span>
|
||||
<span class=code-line> <span class=n>callback</span><span class=p>,</span></span>
|
||||
<span class=code-line> <span class=n>queue</span><span class=o>=</span><span class=s1>'demo.hello'</span><span class=p>,</span></span>
|
||||
<span class=code-line> <span class=n>no_ack</span><span class=o>=</span><span class=bp>True</span></span>
|
||||
<span class=code-line> <span class=p>)</span></span>
|
||||
<span class=code-line></span>
|
||||
<span class=code-line> <span class=k>print</span><span class=p>(</span><span class=s1>' [*] Waiting for messages. To exit press CTRL+C'</span><span class=p>)</span></span>
|
||||
<span class=code-line> <span class=n>channel</span><span class=o>.</span><span class=n>start_consuming</span><span class=p>()</span></span>
|
||||
</pre></div> </section> <hr> <aside class=post-meta> <p>Категория: <a href=../../category/python.html>Python</a></p> <p>Теги: <a href=../../tag/python.html>Python</a>, <a href=../../tag/rabbitmq.html>RabbitMQ</a>, </p> </aside> <hr> </article> </li> </ol> </div> </div> <script type=text/javascript>
|
||||
var _paq = _paq || [];
|
||||
_paq.push(['trackPageView']);
|
||||
_paq.push(['enableLinkTracking']);
|
||||
@@ -143,4 +143,4 @@
|
||||
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
|
||||
g.type='text/javascript'; g.async=true; g.defer=true; g.src=u+'piwik.js'; s.parentNode.insertBefore(g,s);
|
||||
})();
|
||||
</script> <script src="../../theme/js/main.js"></script></body></html>
|
||||
</script> <script src=../../theme/js/main.js></script> </body> </html>
|
||||
Reference in New Issue
Block a user