Merge "Increase robustness of WBE producer/consumers"
@@ -23,7 +23,7 @@ from taskflow import storage as t_storage
 class WorkerBasedActionEngine(engine.ActionEngine):
     """Worker based action engine.
 
-    Specific backend options:
+    Specific backend options (extracted from provided engine options):
 
     :param exchange: broker exchange name in which executor / worker
                      communication is performed
@@ -40,6 +40,8 @@ class WorkerBasedActionEngine(engine.ActionEngine):
                                for will have its result become a
                                `RequestTimeout` exception instead of its
                                normally returned value (or raised exception).
+    :param retry_options: retry specific options (used to configure how kombu
+                          handles retrying under tolerable/transient failures).
     """
 
     _storage_factory = t_storage.SingleThreadedStorage
@@ -66,6 +68,7 @@ class WorkerBasedActionEngine(engine.ActionEngine):
                 uuid=flow_detail.uuid,
                 url=options.get('url'),
                 exchange=options.get('exchange', 'default'),
+                retry_options=options.get('retry_options'),
                 topics=options.get('topics', []),
                 transport=options.get('transport'),
                 transport_options=options.get('transport_options'),
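As context for the hunks above: the engine pulls each backend option out of a plain options dictionary, so the new retry_options entry simply rides along with the existing ones. A minimal sketch of such a dictionary follows; the broker URL and topic name are made-up placeholders, and the call that actually hands this dictionary to the engine is not part of this diff.

    engine_options = {
        'url': 'amqp://guest:guest@localhost:5672//',  # hypothetical broker URL
        'exchange': 'default',
        'topics': ['worker-1'],                        # hypothetical worker topic
        'transport': None,
        'transport_options': None,
        # New with this change; forwarded to kombu's retry machinery.
        'retry_options': {
            'interval_start': 1,
            'interval_step': 2,
            'interval_max': 30,
            'max_retries': 5,
        },
    }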
@@ -44,6 +44,26 @@ _TransportDetails = collections.namedtuple('_TransportDetails',
 class Proxy(object):
     """A proxy processes messages from/to the named exchange."""
 
+    # Settings that are by default used for consumers/producers to reconnect
+    # under tolerable/transient failures...
+    #
+    # See: http://kombu.readthedocs.org/en/latest/reference/kombu.html for
+    # what these values imply...
+    _DEFAULT_RETRY_OPTIONS = {
+        # The number of seconds we start sleeping for.
+        'interval_start': 1,
+        # How many seconds added to the interval for each retry.
+        'interval_step': 1,
+        # Maximum number of seconds to sleep between each retry.
+        'interval_max': 1,
+        # Maximum number of times to retry.
+        'max_retries': 3,
+    }
+    # This is the only provided option that should be an int, the others
+    # are allowed to be floats; used when we check that the user-provided
+    # value is valid...
+    _RETRY_INT_OPTS = frozenset(['max_retries'])
+
     def __init__(self, topic, exchange_name, type_handlers, on_wait=None,
                  **kwargs):
         self._topic = topic
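To make the defaults above concrete: assuming kombu retries with a sleep interval that starts at interval_start, grows by interval_step per attempt, and is capped at interval_max (as its documentation describes), the schedule these settings imply can be approximated by a small standalone sketch like this (not part of the commit):

    def retry_schedule(interval_start=1, interval_step=1,
                       interval_max=1, max_retries=3):
        """Approximate per-retry sleep times for the defaults above."""
        return [min(interval_start + n * interval_step, interval_max)
                for n in range(max_retries)]

    print(retry_schedule())  # -> [1, 1, 1]: at most three retries, ~1 second apart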
@@ -56,9 +76,28 @@ class Proxy(object):
             # running, otherwise requeue them.
             lambda data, message: not self.is_running)
 
         # TODO(harlowja): make these keyword arguments explicit...
         url = kwargs.get('url')
         transport = kwargs.get('transport')
         transport_opts = kwargs.get('transport_options')
+        ensure_options = self._DEFAULT_RETRY_OPTIONS.copy()
+        if 'retry_options' in kwargs and kwargs['retry_options'] is not None:
+            # Override the defaults with any user provided values...
+            usr_retry_options = kwargs['retry_options']
+            for k in set(six.iterkeys(ensure_options)):
+                if k in usr_retry_options:
+                    # Ensure that the right type is passed in...
+                    val = usr_retry_options[k]
+                    if k in self._RETRY_INT_OPTS:
+                        tmp_val = int(val)
+                    else:
+                        tmp_val = float(val)
+                    if tmp_val < 0:
+                        raise ValueError("Expected value greater or equal to"
+                                         " zero for 'retry_options' %s; got"
+                                         " %s instead" % (k, val))
+                    ensure_options[k] = tmp_val
+        self._ensure_options = ensure_options
+
         self._drain_events_timeout = DRAIN_EVENTS_PERIOD
         if transport == 'memory' and transport_opts:
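A consequence of the merge loop above worth spelling out: only keys already present in _DEFAULT_RETRY_OPTIONS are copied over, 'max_retries' is coerced to an int, the interval values to floats, and any negative value raises ValueError. For example (illustrative values only, not code from the commit):

    user_opts = {'max_retries': '5', 'interval_max': 0.25, 'bogus': 10}
    # Resulting ensure options: {'interval_start': 1, 'interval_step': 1,
    #                            'interval_max': 0.25, 'max_retries': 5}
    # 'bogus' is silently dropped; {'interval_max': -1} would raise ValueError.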
@@ -113,34 +152,60 @@ class Proxy(object):
             routing_keys = [routing_key]
         else:
             routing_keys = routing_key
+
+        def _publish(producer, routing_key):
+            queue = self._make_queue(routing_key, self._exchange)
+            producer.publish(body=msg.to_dict(),
+                             routing_key=routing_key,
+                             exchange=self._exchange,
+                             declare=[queue],
+                             type=msg.TYPE,
+                             reply_to=reply_to,
+                             correlation_id=correlation_id)
+
+        def _publish_errback(exc, interval):
+            LOG.exception('Publishing error: %s', exc)
+            LOG.info('Retry triggering in %s seconds', interval)
+
         LOG.debug("Sending '%s' using routing keys %s", msg, routing_keys)
-        with kombu.producers[self._conn].acquire(block=True) as producer:
-            for routing_key in routing_keys:
-                queue = self._make_queue(routing_key, self._exchange)
-                producer.publish(body=msg.to_dict(),
-                                 routing_key=routing_key,
-                                 exchange=self._exchange,
-                                 declare=[queue],
-                                 type=msg.TYPE,
-                                 reply_to=reply_to,
-                                 correlation_id=correlation_id)
+        with kombu.connections[self._conn].acquire(block=True) as conn:
+            with conn.Producer() as producer:
+                ensure_kwargs = self._ensure_options.copy()
+                ensure_kwargs['errback'] = _publish_errback
+                safe_publish = conn.ensure(producer, _publish, **ensure_kwargs)
+                for routing_key in routing_keys:
+                    safe_publish(producer, routing_key)
 
     def start(self):
         """Start proxy."""
+
+        def _drain(conn, timeout):
+            try:
+                conn.drain_events(timeout=timeout)
+            except socket.timeout:
+                pass
+
+        def _drain_errback(exc, interval):
+            LOG.exception('Draining error: %s', exc)
+            LOG.info('Retry triggering in %s seconds', interval)
+
         LOG.info("Starting to consume from the '%s' exchange.",
                  self._exchange_name)
        with kombu.connections[self._conn].acquire(block=True) as conn:
             queue = self._make_queue(self._topic, self._exchange, channel=conn)
-            with conn.Consumer(queues=queue,
-                               callbacks=[self._dispatcher.on_message]):
+            callbacks = [self._dispatcher.on_message]
+            with conn.Consumer(queues=queue, callbacks=callbacks) as consumer:
+                ensure_kwargs = self._ensure_options.copy()
+                ensure_kwargs['errback'] = _drain_errback
+                safe_drain = conn.ensure(consumer, _drain, **ensure_kwargs)
                 self._running.set()
-                while self.is_running:
-                    try:
-                        conn.drain_events(timeout=self._drain_events_timeout)
-                    except socket.timeout:
-                        pass
-                    if self._on_wait is not None:
-                        self._on_wait()
+                try:
+                    while self._running.is_set():
+                        safe_drain(conn, self._drain_events_timeout)
+                        if self._on_wait is not None:
+                            self._on_wait()
+                finally:
+                    self._running.clear()
 
     def wait(self):
         """Wait until proxy is started."""
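The hunk above is the heart of the change: instead of publishing and draining directly, both paths are wrapped with kombu's Connection.ensure(), which retries the wrapped callable on recoverable connection/channel errors using the configured retry options and reports each failure through an errback. A minimal, self-contained sketch of that pattern on the publish side follows; the broker URL, exchange, queue, routing key, and payload are all made up for illustration, and the consume side of the hunk applies the same wrapping to the drain loop via _drain/_drain_errback.

    import kombu

    conn = kombu.Connection('memory://')  # in-memory transport, no broker needed
    exchange = kombu.Exchange('demo', type='topic', durable=False)

    def _errback(exc, interval):
        print('Publish failed (%s); retrying in %s seconds' % (exc, interval))

    with conn:
        with conn.Producer() as producer:
            def _publish(producer, routing_key):
                queue = kombu.Queue('demo-queue',
                                    exchange=exchange,
                                    routing_key=routing_key,
                                    durable=False,
                                    auto_delete=True)
                producer.publish({'hello': 'world'},
                                 exchange=exchange,
                                 routing_key=routing_key,
                                 declare=[queue])

            # ensure() returns a wrapper that retries _publish on transient
            # connection failures, reviving the producer in between attempts.
            safe_publish = conn.ensure(producer, _publish,
                                       errback=_errback,
                                       max_retries=3,
                                       interval_start=1,
                                       interval_step=1,
                                       interval_max=1)
            safe_publish(producer, 'demo-routing-key')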
@@ -80,6 +80,8 @@ class Worker(object):
     :param threads_count: threads count to be passed to the default executor
     :param transport: transport to be used (e.g. amqp, memory, etc.)
     :param transport_options: transport specific options
+    :param retry_options: retry specific options (used to configure how kombu
+                          handles retrying under tolerable/transient failures).
     """
 
     def __init__(self, exchange, topic, tasks, executor=None, **kwargs):
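Tying the docstring addition to usage: a rough sketch of standing up a worker with the new option, based only on the signature and parameters shown above. The import path, the url keyword, the run() call, and the exchange/topic/task values are assumptions for illustration, not something this diff confirms.

    from taskflow.engines.worker_based import worker as wbe_worker  # assumed path

    w = wbe_worker.Worker(
        'default',                    # exchange (hypothetical)
        'worker-1',                   # topic (hypothetical)
        ['my_package.my_tasks'],      # tasks this worker can run (hypothetical)
        url='amqp://guest:guest@localhost:5672//',
        retry_options={'max_retries': 5, 'interval_max': 10})
    w.run()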
@@ -49,7 +49,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase):
                                      topics=[],
                                      transport=None,
                                      transport_options=None,
-                                     transition_timeout=mock.ANY)
+                                     transition_timeout=mock.ANY,
+                                     retry_options=None)
         ]
         self.assertEqual(self.master_mock.mock_calls, expected_calls)
 
@@ -64,7 +65,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase):
                                transport='memory',
                                transport_options={},
                                transition_timeout=200,
-                               topics=topics)
+                               topics=topics,
+                               retry_options={})
         expected_calls = [
             mock.call.executor_class(uuid=eng.storage.flow_uuid,
                                      url=broker_url,
@@ -72,7 +74,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase):
                                      topics=topics,
                                      transport='memory',
                                      transport_options={},
-                                     transition_timeout=200)
+                                     transition_timeout=200,
+                                     retry_options={})
         ]
         self.assertEqual(self.master_mock.mock_calls, expected_calls)
 
@@ -43,8 +43,11 @@ class TestProxy(test.MockTestCase):
                                           proxy.kombu, 'Producer')
 
         # connection mocking
+        def _ensure(obj, func, *args, **kwargs):
+            return func
         self.conn_inst_mock.drain_events.side_effect = [
             socket.timeout, socket.timeout, KeyboardInterrupt]
+        self.conn_inst_mock.ensure = mock.MagicMock(side_effect=_ensure)
 
         # connections mocking
         self.connections_mock = self.patch(
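The _ensure side effect added above is what keeps these tests synchronous: Connection.ensure(obj, func, **retry_options) normally returns a retrying wrapper around func, so having the mock hand func back unchanged lets the tests drive the mocked drain_events/publish calls directly, with no retry logic in the way. A standalone illustration of the same trick (not part of the diff):

    import mock  # or: from unittest import mock

    def _ensure(obj, func, *args, **kwargs):
        return func

    conn = mock.MagicMock()
    conn.ensure = mock.MagicMock(side_effect=_ensure)

    def _drain(c, timeout):
        c.drain_events(timeout=timeout)

    safe_drain = conn.ensure(mock.sentinel.consumer, _drain, max_retries=3)
    assert safe_drain is _drain  # the retry wrapper is bypassed in tests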
@@ -54,11 +57,8 @@ class TestProxy(test.MockTestCase):
             self.conn_inst_mock
 
         # producers mocking
-        self.producers_mock = self.patch(
-            "taskflow.engines.worker_based.proxy.kombu.producers",
-            attach_as='producers')
-        self.producers_mock.__getitem__().acquire().__enter__.return_value =\
-            self.producer_inst_mock
+        self.conn_inst_mock.Producer.return_value.__enter__ = mock.MagicMock()
+        self.conn_inst_mock.Producer.return_value.__exit__ = mock.MagicMock()
 
         # consumer mocking
         self.conn_inst_mock.Consumer.return_value.__enter__ = mock.MagicMock()
@@ -85,11 +85,38 @@ class TestProxy(test.MockTestCase):
             mock.call.connection.Consumer(queues=self.queue_inst_mock,
                                           callbacks=[mock.ANY]),
             mock.call.connection.Consumer().__enter__(),
+            mock.call.connection.ensure(mock.ANY, mock.ANY,
+                                        interval_start=mock.ANY,
+                                        interval_max=mock.ANY,
+                                        max_retries=mock.ANY,
+                                        interval_step=mock.ANY,
+                                        errback=mock.ANY),
         ] + calls + [
             mock.call.connection.Consumer().__exit__(exc_type, mock.ANY,
                                                      mock.ANY)
         ]
 
+    def proxy_publish_calls(self, calls, routing_key, exc_type=mock.ANY):
+        return [
+            mock.call.connection.Producer(),
+            mock.call.connection.Producer().__enter__(),
+            mock.call.connection.ensure(mock.ANY, mock.ANY,
+                                        interval_start=mock.ANY,
+                                        interval_max=mock.ANY,
+                                        max_retries=mock.ANY,
+                                        interval_step=mock.ANY,
+                                        errback=mock.ANY),
+            mock.call.Queue(name=self._queue_name(routing_key),
+                            routing_key=routing_key,
+                            exchange=self.exchange_inst_mock,
+                            durable=False,
+                            auto_delete=True,
+                            channel=None),
+        ] + calls + [
+            mock.call.connection.Producer().__exit__(exc_type, mock.ANY,
+                                                     mock.ANY)
+        ]
+
     def proxy(self, reset_master_mock=False, **kwargs):
         proxy_kwargs = dict(topic=self.topic,
                             exchange_name=self.exchange_name,
@@ -133,24 +160,19 @@ class TestProxy(test.MockTestCase):
         routing_key = 'routing-key'
         task_uuid = 'task-uuid'
 
-        self.proxy(reset_master_mock=True).publish(
-            msg_mock, routing_key, correlation_id=task_uuid)
+        p = self.proxy(reset_master_mock=True)
+        p.publish(msg_mock, routing_key, correlation_id=task_uuid)
 
-        master_mock_calls = [
-            mock.call.Queue(name=self._queue_name(routing_key),
-                            exchange=self.exchange_inst_mock,
-                            routing_key=routing_key,
-                            durable=False,
-                            auto_delete=True,
-                            channel=None),
-            mock.call.producer.publish(body=msg_data,
-                                       routing_key=routing_key,
-                                       exchange=self.exchange_inst_mock,
-                                       correlation_id=task_uuid,
-                                       declare=[self.queue_inst_mock],
-                                       type=msg_mock.TYPE,
-                                       reply_to=None)
-        ]
+        mock_producer = mock.call.connection.Producer()
+        master_mock_calls = self.proxy_publish_calls([
+            mock_producer.__enter__().publish(body=msg_data,
+                                              routing_key=routing_key,
+                                              exchange=self.exchange_inst_mock,
+                                              correlation_id=task_uuid,
+                                              declare=[self.queue_inst_mock],
+                                              type=msg_mock.TYPE,
+                                              reply_to=None)
+        ], routing_key)
         self.master_mock.assert_has_calls(master_mock_calls)
 
     def test_start(self):