From cb27080ea3cd5cddd7f91d866f6a9d1214c9e885 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 16 Jan 2015 15:37:44 -0800 Subject: [PATCH] Increase robustness of WBE producer/consumers Use the kombu provided ensure() decorator/wrapper along with sensible default settings to ensure that retries are attempted when kombu detects recoverable connection or recoverable channel errors have occurred. Change-Id: If47f72d02561d0b5d556ac386869a6122c8b871d --- taskflow/engines/worker_based/engine.py | 5 +- taskflow/engines/worker_based/proxy.py | 103 ++++++++++++++---- taskflow/engines/worker_based/worker.py | 2 + .../tests/unit/worker_based/test_creation.py | 9 +- .../tests/unit/worker_based/test_proxy.py | 66 +++++++---- 5 files changed, 140 insertions(+), 45 deletions(-) diff --git a/taskflow/engines/worker_based/engine.py b/taskflow/engines/worker_based/engine.py index 8011222c..a161ee58 100644 --- a/taskflow/engines/worker_based/engine.py +++ b/taskflow/engines/worker_based/engine.py @@ -23,7 +23,7 @@ from taskflow import storage as t_storage class WorkerBasedActionEngine(engine.ActionEngine): """Worker based action engine. - Specific backend options: + Specific backend options (extracted from provided engine options): :param exchange: broker exchange exchange name in which executor / worker communication is performed @@ -40,6 +40,8 @@ class WorkerBasedActionEngine(engine.ActionEngine): for will have its result become a `RequestTimeout` exception instead of its normally returned value (or raised exception). + :param retry_options: retry specific options (used to configure how kombu + handles retrying under tolerable/transient failures). 
""" _storage_factory = t_storage.SingleThreadedStorage @@ -66,6 +68,7 @@ class WorkerBasedActionEngine(engine.ActionEngine): uuid=flow_detail.uuid, url=options.get('url'), exchange=options.get('exchange', 'default'), + retry_options=options.get('retry_options'), topics=options.get('topics', []), transport=options.get('transport'), transport_options=options.get('transport_options'), diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py index 1c870595..9a3b8e09 100644 --- a/taskflow/engines/worker_based/proxy.py +++ b/taskflow/engines/worker_based/proxy.py @@ -44,6 +44,26 @@ _TransportDetails = collections.namedtuple('_TransportDetails', class Proxy(object): """A proxy processes messages from/to the named exchange.""" + # Settings that are by default used for consumers/producers to reconnect + # under tolerable/transient failures... + # + # See: http://kombu.readthedocs.org/en/latest/reference/kombu.html for + # what these values imply... + _DEFAULT_RETRY_OPTIONS = { + # The number of seconds we start sleeping for. + 'interval_start': 1, + # How many seconds added to the interval for each retry. + 'interval_step': 1, + # Maximum number of seconds to sleep between each retry. + 'interval_max': 1, + # Maximum number of times to retry. + 'max_retries': 3, + } + # This is the only provided option that should be an int, the others + # are allowed to be floats; used when we check that the user-provided + # value is valid... + _RETRY_INT_OPTS = frozenset(['max_retries']) + def __init__(self, topic, exchange_name, type_handlers, on_wait=None, **kwargs): self._topic = topic @@ -56,9 +76,28 @@ class Proxy(object): # running, otherwise requeue them. lambda data, message: not self.is_running) + # TODO(harlowja): make these keyword arguments explicit...
url = kwargs.get('url') transport = kwargs.get('transport') transport_opts = kwargs.get('transport_options') + ensure_options = self._DEFAULT_RETRY_OPTIONS.copy() + if 'retry_options' in kwargs and kwargs['retry_options'] is not None: + # Override the defaults with any user provided values... + usr_retry_options = kwargs['retry_options'] + for k in set(six.iterkeys(ensure_options)): + if k in usr_retry_options: + # Ensure that the right type is passed in... + val = usr_retry_options[k] + if k in self._RETRY_INT_OPTS: + tmp_val = int(val) + else: + tmp_val = float(val) + if tmp_val < 0: + raise ValueError("Expected value greater or equal to" + " zero for 'retry_options' %s; got" + " %s instead" % (k, val)) + ensure_options[k] = tmp_val + self._ensure_options = ensure_options self._drain_events_timeout = DRAIN_EVENTS_PERIOD if transport == 'memory' and transport_opts: @@ -113,34 +152,60 @@ class Proxy(object): routing_keys = [routing_key] else: routing_keys = routing_key + + def _publish(producer, routing_key): + queue = self._make_queue(routing_key, self._exchange) + producer.publish(body=msg.to_dict(), + routing_key=routing_key, + exchange=self._exchange, + declare=[queue], + type=msg.TYPE, + reply_to=reply_to, + correlation_id=correlation_id) + + def _publish_errback(exc, interval): + LOG.exception('Publishing error: %s', exc) + LOG.info('Retry triggering in %s seconds', interval) + LOG.debug("Sending '%s' using routing keys %s", msg, routing_keys) - with kombu.producers[self._conn].acquire(block=True) as producer: - for routing_key in routing_keys: - queue = self._make_queue(routing_key, self._exchange) - producer.publish(body=msg.to_dict(), - routing_key=routing_key, - exchange=self._exchange, - declare=[queue], - type=msg.TYPE, - reply_to=reply_to, - correlation_id=correlation_id) + with kombu.connections[self._conn].acquire(block=True) as conn: + with conn.Producer() as producer: + ensure_kwargs = self._ensure_options.copy() + ensure_kwargs['errback'] = 
_publish_errback + safe_publish = conn.ensure(producer, _publish, **ensure_kwargs) + for routing_key in routing_keys: + safe_publish(producer, routing_key) def start(self): """Start proxy.""" + + def _drain(conn, timeout): + try: + conn.drain_events(timeout=timeout) + except socket.timeout: + pass + + def _drain_errback(exc, interval): + LOG.exception('Draining error: %s', exc) + LOG.info('Retry triggering in %s seconds', interval) + LOG.info("Starting to consume from the '%s' exchange.", self._exchange_name) with kombu.connections[self._conn].acquire(block=True) as conn: queue = self._make_queue(self._topic, self._exchange, channel=conn) - with conn.Consumer(queues=queue, - callbacks=[self._dispatcher.on_message]): + callbacks = [self._dispatcher.on_message] + with conn.Consumer(queues=queue, callbacks=callbacks) as consumer: + ensure_kwargs = self._ensure_options.copy() + ensure_kwargs['errback'] = _drain_errback + safe_drain = conn.ensure(consumer, _drain, **ensure_kwargs) self._running.set() - while self.is_running: - try: - conn.drain_events(timeout=self._drain_events_timeout) - except socket.timeout: - pass - if self._on_wait is not None: - self._on_wait() + try: + while self._running.is_set(): + safe_drain(conn, self._drain_events_timeout) + if self._on_wait is not None: + self._on_wait() + finally: + self._running.clear() def wait(self): """Wait until proxy is started.""" diff --git a/taskflow/engines/worker_based/worker.py b/taskflow/engines/worker_based/worker.py index 98e690ef..5ac0cf4f 100644 --- a/taskflow/engines/worker_based/worker.py +++ b/taskflow/engines/worker_based/worker.py @@ -80,6 +80,8 @@ class Worker(object): :param threads_count: threads count to be passed to the default executor :param transport: transport to be used (e.g. amqp, memory, etc.) :param transport_options: transport specific options + :param retry_options: retry specific options (used to configure how kombu + handles retrying under tolerable/transient failures). 
""" def __init__(self, exchange, topic, tasks, executor=None, **kwargs): diff --git a/taskflow/tests/unit/worker_based/test_creation.py b/taskflow/tests/unit/worker_based/test_creation.py index 6764926a..887498ce 100644 --- a/taskflow/tests/unit/worker_based/test_creation.py +++ b/taskflow/tests/unit/worker_based/test_creation.py @@ -49,7 +49,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase): topics=[], transport=None, transport_options=None, - transition_timeout=mock.ANY) + transition_timeout=mock.ANY, + retry_options=None) ] self.assertEqual(self.master_mock.mock_calls, expected_calls) @@ -64,7 +65,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase): transport='memory', transport_options={}, transition_timeout=200, - topics=topics) + topics=topics, + retry_options={}) expected_calls = [ mock.call.executor_class(uuid=eng.storage.flow_uuid, url=broker_url, @@ -72,7 +74,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase): topics=topics, transport='memory', transport_options={}, - transition_timeout=200) + transition_timeout=200, + retry_options={}) ] self.assertEqual(self.master_mock.mock_calls, expected_calls) diff --git a/taskflow/tests/unit/worker_based/test_proxy.py b/taskflow/tests/unit/worker_based/test_proxy.py index a3c7d13f..daf9b60e 100644 --- a/taskflow/tests/unit/worker_based/test_proxy.py +++ b/taskflow/tests/unit/worker_based/test_proxy.py @@ -43,8 +43,11 @@ class TestProxy(test.MockTestCase): proxy.kombu, 'Producer') # connection mocking + def _ensure(obj, func, *args, **kwargs): + return func self.conn_inst_mock.drain_events.side_effect = [ socket.timeout, socket.timeout, KeyboardInterrupt] + self.conn_inst_mock.ensure = mock.MagicMock(side_effect=_ensure) # connections mocking self.connections_mock = self.patch( @@ -54,11 +57,8 @@ class TestProxy(test.MockTestCase): self.conn_inst_mock # producers mocking - self.producers_mock = self.patch( - "taskflow.engines.worker_based.proxy.kombu.producers", - attach_as='producers') - 
self.producers_mock.__getitem__().acquire().__enter__.return_value =\ - self.producer_inst_mock + self.conn_inst_mock.Producer.return_value.__enter__ = mock.MagicMock() + self.conn_inst_mock.Producer.return_value.__exit__ = mock.MagicMock() # consumer mocking self.conn_inst_mock.Consumer.return_value.__enter__ = mock.MagicMock() @@ -85,11 +85,38 @@ class TestProxy(test.MockTestCase): mock.call.connection.Consumer(queues=self.queue_inst_mock, callbacks=[mock.ANY]), mock.call.connection.Consumer().__enter__(), + mock.call.connection.ensure(mock.ANY, mock.ANY, + interval_start=mock.ANY, + interval_max=mock.ANY, + max_retries=mock.ANY, + interval_step=mock.ANY, + errback=mock.ANY), ] + calls + [ mock.call.connection.Consumer().__exit__(exc_type, mock.ANY, mock.ANY) ] + def proxy_publish_calls(self, calls, routing_key, exc_type=mock.ANY): + return [ + mock.call.connection.Producer(), + mock.call.connection.Producer().__enter__(), + mock.call.connection.ensure(mock.ANY, mock.ANY, + interval_start=mock.ANY, + interval_max=mock.ANY, + max_retries=mock.ANY, + interval_step=mock.ANY, + errback=mock.ANY), + mock.call.Queue(name=self._queue_name(routing_key), + routing_key=routing_key, + exchange=self.exchange_inst_mock, + durable=False, + auto_delete=True, + channel=None), + ] + calls + [ + mock.call.connection.Producer().__exit__(exc_type, mock.ANY, + mock.ANY) + ] + def proxy(self, reset_master_mock=False, **kwargs): proxy_kwargs = dict(topic=self.topic, exchange_name=self.exchange_name, @@ -133,24 +160,19 @@ class TestProxy(test.MockTestCase): routing_key = 'routing-key' task_uuid = 'task-uuid' - self.proxy(reset_master_mock=True).publish( - msg_mock, routing_key, correlation_id=task_uuid) + p = self.proxy(reset_master_mock=True) + p.publish(msg_mock, routing_key, correlation_id=task_uuid) - master_mock_calls = [ - mock.call.Queue(name=self._queue_name(routing_key), - exchange=self.exchange_inst_mock, - routing_key=routing_key, - durable=False, - auto_delete=True, - 
channel=None), - mock.call.producer.publish(body=msg_data, - routing_key=routing_key, - exchange=self.exchange_inst_mock, - correlation_id=task_uuid, - declare=[self.queue_inst_mock], - type=msg_mock.TYPE, - reply_to=None) - ] + mock_producer = mock.call.connection.Producer() + master_mock_calls = self.proxy_publish_calls([ + mock_producer.__enter__().publish(body=msg_data, + routing_key=routing_key, + exchange=self.exchange_inst_mock, + correlation_id=task_uuid, + declare=[self.queue_inst_mock], + type=msg_mock.TYPE, + reply_to=None) + ], routing_key) self.master_mock.assert_has_calls(master_mock_calls) def test_start(self):