четверг, 5 сентября 2013 г.

Настройка таймаутов для Python-пакета amqp

Только что столкнулся с малоприятным багом в одном клиенте для протокола AMQP - пакет amqp (версии 1.0.13, но и в 1.2.1 этот баг имеется). Этот клиент по умолчанию используется во фреймворке kombu, на базе которого основана реализация очереди задач - Celery. В простых случаях вы вероятно даже не столкнётесь с этим багом, но знать про него будет полезно.

У меня в проекте этот баг проявил себя только в условиях идентичных тем, которые будут на production-серверах, а именно при использовании кластера RabbitMQ за AWS ELB (Elastic Load Balancer). Настройки амазоновского балансера таковы, что он автоматически рвёт неактивные в течении одной минуты соединения. Сама по себе эта проблема решается организацией heartbeat, но в моём случае видимо балансер имеет ещё какие то, неизвестные мне, особености. Из-за них в пакете amqp иногда происходит зависание при отправке данных в ранее открытый сокет. В моём случае это зависание длилось примерно 15 минут, после чего сокет бросал ошибку socket.timeout и kombu благополучно выполнял переподключение к RabbitMQ. Реальную причину почему такое происходило я выяснять не стал, т.к. решил что проще будет лечить последствия, т.е. слишком большой таймаут на сокете.

В документации к пакету amqp есть утверждение о том, что он поддерживает таймауты (в отличии от аналогичного, но старого пакета amqplib). На самом деле это утверждение истинно только частично. Указать таймаут можно исключительно для момента подключения к RabbitMQ. После того как сокет будет подключен автор пакета, по непонятным причинам, отключает таймаут на сокете, в результате чего я и получил описаный выше баг.

Подглядев как работает с таймаутами для сокета клиент Pika, рекомендуемый документацией по RabbitMQ, я быстро сделал патч для пакета amqp. Возможно он поможет кому то справится с аналогичными проблемами.

--- amqp/transport.py
+++ amqp/transport.py
@@ -22,6 +22,7 @@
 from __future__ import absolute_import
 
 import errno
+import os
 import re
 import socket
 
@@ -98,7 +99,11 @@
             # Didn't connect, return the most recent error message
             raise socket.error(last_err)
 
-        self.sock.settimeout(None)
+        try:
+            amqp_socket_timeout = float(os.environ.get('AMQP_SOCKET_TIMEOUT', '0.25'))
+        except ValueError:
+            amqp_socket_timeout = None
+        self.sock.settimeout(amqp_socket_timeout)
         self.sock.setsockopt(SOL_TCP, socket.TCP_NODELAY, 1)
         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

Комментариев нет:

Отправить комментарий