Merge "Use explicit WBE object arguments (instead of kwargs)"

This commit is contained in:
Jenkins 2015-01-23 06:56:54 +00:00 committed by Gerrit Code Review
commit 292adc5a62
10 changed files with 62 additions and 50 deletions

View File

@ -187,8 +187,16 @@ Worker-based
**Engine type**: ``'worker-based'``
For more information, please see :doc:`workers <workers>` for more details on
how the worker based engine operates (and the design decisions behind it).
.. note:: Since this engine is significantly more complicated (and
different) then the others we thought it appropriate to devote a
whole documentation section to it.
For further information, please refer to the the following:
.. toctree::
:maxdepth: 2
workers
How they run
============

View File

@ -29,11 +29,6 @@ Contents
jobs
conductors
.. toctree::
:hidden:
workers
Examples
--------

View File

@ -1,7 +1,3 @@
-------
Workers
-------
Overview
========
@ -98,9 +94,9 @@ Use-cases
Design
======
There are two communication sides, the *executor* and *worker* that communicate
using a proxy component. The proxy is designed to accept/publish messages
from/into a named exchange.
There are two communication sides, the *executor* (and associated engine
derivative) and *worker* that communicate using a proxy component. The proxy
is designed to accept/publish messages from/into a named exchange.
High level architecture
-----------------------
@ -390,7 +386,7 @@ Limitations
Interfaces
==========
.. automodule:: taskflow.engines.worker_based.worker
.. automodule:: taskflow.engines.worker_based.engine
.. automodule:: taskflow.engines.worker_based.proxy
.. automodule:: taskflow.engines.worker_based.executor
.. automodule:: taskflow.engines.worker_based.proxy
.. automodule:: taskflow.engines.worker_based.worker

View File

@ -67,14 +67,16 @@ class WorkerTaskExecutor(executor.TaskExecutor):
"""Executes tasks on remote workers."""
def __init__(self, uuid, exchange, topics,
transition_timeout=pr.REQUEST_TIMEOUT, **kwargs):
transition_timeout=pr.REQUEST_TIMEOUT,
url=None, transport=None, transport_options=None,
retry_options=None):
self._uuid = uuid
self._topics = topics
self._requests_cache = cache.RequestsCache()
self._transition_timeout = transition_timeout
self._workers_cache = cache.WorkersCache()
self._workers_arrival = threading.Condition()
handlers = {
type_handlers = {
pr.NOTIFY: [
self._process_notify,
functools.partial(pr.Notify.validate, response=True),
@ -84,8 +86,11 @@ class WorkerTaskExecutor(executor.TaskExecutor):
pr.Response.validate,
],
}
self._proxy = proxy.Proxy(uuid, exchange, handlers,
self._on_wait, **kwargs)
self._proxy = proxy.Proxy(uuid, exchange, type_handlers,
on_wait=self._on_wait, url=url,
transport=transport,
transport_options=transport_options,
retry_options=retry_options)
self._proxy_thread = None
self._periodic = PeriodicWorker(tt.Timeout(pr.NOTIFY_PERIOD),
[self._notify_topics])

View File

@ -64,10 +64,12 @@ class Proxy(object):
# value is valid...
_RETRY_INT_OPTS = frozenset(['max_retries'])
def __init__(self, topic, exchange_name, type_handlers, on_wait=None,
**kwargs):
def __init__(self, topic, exchange, type_handlers,
on_wait=None, url=None,
transport=None, transport_options=None,
retry_options=None):
self._topic = topic
self._exchange_name = exchange_name
self._exchange_name = exchange
self._on_wait = on_wait
self._running = threading_utils.Event()
self._dispatcher = dispatcher.TypeDispatcher(type_handlers)
@ -76,18 +78,13 @@ class Proxy(object):
# running, otherwise requeue them.
lambda data, message: not self.is_running)
# TODO(harlowja): make these keyword arguments explict...
url = kwargs.get('url')
transport = kwargs.get('transport')
transport_opts = kwargs.get('transport_options')
ensure_options = self._DEFAULT_RETRY_OPTIONS.copy()
if 'retry_options' in kwargs and kwargs['retry_options'] is not None:
if retry_options is not None:
# Override the defaults with any user provided values...
usr_retry_options = kwargs['retry_options']
for k in set(six.iterkeys(ensure_options)):
if k in usr_retry_options:
if k in retry_options:
# Ensure that the right type is passed in...
val = usr_retry_options[k]
val = retry_options[k]
if k in self._RETRY_INT_OPTS:
tmp_val = int(val)
else:
@ -100,14 +97,14 @@ class Proxy(object):
self._ensure_options = ensure_options
self._drain_events_timeout = DRAIN_EVENTS_PERIOD
if transport == 'memory' and transport_opts:
polling_interval = transport_opts.get('polling_interval')
if transport == 'memory' and transport_options:
polling_interval = transport_options.get('polling_interval')
if polling_interval is not None:
self._drain_events_timeout = polling_interval
# create connection
self._conn = kombu.Connection(url, transport=transport,
transport_options=transport_opts)
transport_options=transport_options)
# create exchange
self._exchange = kombu.Exchange(name=self._exchange_name,

View File

@ -45,8 +45,10 @@ def delayed(executor):
class Server(object):
"""Server implementation that waits for incoming tasks requests."""
def __init__(self, topic, exchange, executor, endpoints, **kwargs):
handlers = {
def __init__(self, topic, exchange, executor, endpoints,
url=None, transport=None, transport_options=None,
retry_options=None):
type_handlers = {
pr.NOTIFY: [
delayed(executor)(self._process_notify),
functools.partial(pr.Notify.validate, response=False),
@ -56,8 +58,10 @@ class Server(object):
pr.Request.validate,
],
}
self._proxy = proxy.Proxy(topic, exchange, handlers,
on_wait=None, **kwargs)
self._proxy = proxy.Proxy(topic, exchange, type_handlers,
url=url, transport=transport,
transport_options=transport_options,
retry_options=retry_options)
self._topic = topic
self._endpoints = dict([(endpoint.name, endpoint)
for endpoint in endpoints])

View File

@ -84,10 +84,13 @@ class TestWorkerTaskExecutor(test.MockTestCase):
def test_creation(self):
ex = self.executor(reset_master_mock=False)
master_mock_calls = [
mock.call.Proxy(self.executor_uuid, self.executor_exchange,
mock.ANY, ex._on_wait, url=self.broker_url)
mock.ANY, on_wait=ex._on_wait,
url=self.broker_url, transport=mock.ANY,
transport_options=mock.ANY,
retry_options=mock.ANY
)
]
self.assertEqual(self.master_mock.mock_calls, master_mock_calls)
@ -250,8 +253,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
self.assertEqual(expected_calls, self.master_mock.mock_calls)
def test_execute_task_topic_not_found(self):
workers_info = {self.executor_topic: ['<unknown>']}
ex = self.executor(workers_info=workers_info)
ex = self.executor()
ex.execute_task(self.task, self.task_uuid, self.task_args)
expected_calls = [

View File

@ -28,7 +28,7 @@ class TestProxy(test.MockTestCase):
super(TestProxy, self).setUp()
self.topic = 'test-topic'
self.broker_url = 'test-url'
self.exchange_name = 'test-exchange'
self.exchange = 'test-exchange'
self.timeout = 5
self.de_period = proxy.DRAIN_EVENTS_PERIOD
@ -72,7 +72,7 @@ class TestProxy(test.MockTestCase):
self.resetMasterMock()
def _queue_name(self, topic):
return "%s_%s" % (self.exchange_name, topic)
return "%s_%s" % (self.exchange, topic)
def proxy_start_calls(self, calls, exc_type=mock.ANY):
return [
@ -119,7 +119,7 @@ class TestProxy(test.MockTestCase):
def proxy(self, reset_master_mock=False, **kwargs):
proxy_kwargs = dict(topic=self.topic,
exchange_name=self.exchange_name,
exchange=self.exchange,
url=self.broker_url,
type_handlers={})
proxy_kwargs.update(kwargs)
@ -134,7 +134,7 @@ class TestProxy(test.MockTestCase):
master_mock_calls = [
mock.call.Connection(self.broker_url, transport=None,
transport_options=None),
mock.call.Exchange(name=self.exchange_name,
mock.call.Exchange(name=self.exchange,
durable=False,
auto_delete=True)
]
@ -147,7 +147,7 @@ class TestProxy(test.MockTestCase):
master_mock_calls = [
mock.call.Connection(self.broker_url, transport='memory',
transport_options=transport_opts),
mock.call.Exchange(name=self.exchange_name,
mock.call.Exchange(name=self.exchange,
durable=False,
auto_delete=True)
]

View File

@ -86,7 +86,9 @@ class TestServer(test.MockTestCase):
# check calls
master_mock_calls = [
mock.call.Proxy(self.server_topic, self.server_exchange,
mock.ANY, url=self.broker_url, on_wait=mock.ANY)
mock.ANY, url=self.broker_url,
transport=mock.ANY, transport_options=mock.ANY,
retry_options=mock.ANY)
]
self.master_mock.assert_has_calls(master_mock_calls)
self.assertEqual(len(s._endpoints), 3)
@ -97,7 +99,9 @@ class TestServer(test.MockTestCase):
# check calls
master_mock_calls = [
mock.call.Proxy(self.server_topic, self.server_exchange,
mock.ANY, url=self.broker_url, on_wait=mock.ANY)
mock.ANY, url=self.broker_url,
transport=mock.ANY, transport_options=mock.ANY,
retry_options=mock.ANY)
]
self.master_mock.assert_has_calls(master_mock_calls)
self.assertEqual(len(s._endpoints), len(self.endpoints))

View File

@ -82,7 +82,8 @@ class TestWorker(test.MockTestCase):
master_mock_calls = [
mock.call.executor_class(10),
mock.call.Server(self.topic, self.exchange,
self.executor_inst_mock, [], url=self.broker_url)
self.executor_inst_mock, [],
url=self.broker_url)
]
self.assertEqual(self.master_mock.mock_calls, master_mock_calls)