diff --git a/doc/source/engines.rst b/doc/source/engines.rst index 0231ae86..e98eacde 100644 --- a/doc/source/engines.rst +++ b/doc/source/engines.rst @@ -187,8 +187,16 @@ Worker-based **Engine type**: ``'worker-based'`` -For more information, please see :doc:`workers ` for more details on -how the worker based engine operates (and the design decisions behind it). +.. note:: Since this engine is significantly more complicated (and + different) then the others we thought it appropriate to devote a + whole documentation section to it. + +For further information, please refer to the the following: + +.. toctree:: + :maxdepth: 2 + + workers How they run ============ diff --git a/doc/source/index.rst b/doc/source/index.rst index 657a08be..7ab0fedd 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -29,11 +29,6 @@ Contents jobs conductors -.. toctree:: - :hidden: - - workers - Examples -------- diff --git a/doc/source/workers.rst b/doc/source/workers.rst index 373615ef..cabe7c5b 100644 --- a/doc/source/workers.rst +++ b/doc/source/workers.rst @@ -1,7 +1,3 @@ -------- -Workers -------- - Overview ======== @@ -98,9 +94,9 @@ Use-cases Design ====== -There are two communication sides, the *executor* and *worker* that communicate -using a proxy component. The proxy is designed to accept/publish messages -from/into a named exchange. +There are two communication sides, the *executor* (and associated engine +derivative) and *worker* that communicate using a proxy component. The proxy +is designed to accept/publish messages from/into a named exchange. High level architecture ----------------------- @@ -390,7 +386,7 @@ Limitations Interfaces ========== -.. automodule:: taskflow.engines.worker_based.worker .. automodule:: taskflow.engines.worker_based.engine -.. automodule:: taskflow.engines.worker_based.proxy .. automodule:: taskflow.engines.worker_based.executor +.. automodule:: taskflow.engines.worker_based.proxy +.. automodule:: taskflow.engines.worker_based.worker diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index b07c73f8..cdc63614 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -67,14 +67,16 @@ class WorkerTaskExecutor(executor.TaskExecutor): """Executes tasks on remote workers.""" def __init__(self, uuid, exchange, topics, - transition_timeout=pr.REQUEST_TIMEOUT, **kwargs): + transition_timeout=pr.REQUEST_TIMEOUT, + url=None, transport=None, transport_options=None, + retry_options=None): self._uuid = uuid self._topics = topics self._requests_cache = cache.RequestsCache() self._transition_timeout = transition_timeout self._workers_cache = cache.WorkersCache() self._workers_arrival = threading.Condition() - handlers = { + type_handlers = { pr.NOTIFY: [ self._process_notify, functools.partial(pr.Notify.validate, response=True), @@ -84,8 +86,11 @@ class WorkerTaskExecutor(executor.TaskExecutor): pr.Response.validate, ], } - self._proxy = proxy.Proxy(uuid, exchange, handlers, - self._on_wait, **kwargs) + self._proxy = proxy.Proxy(uuid, exchange, type_handlers, + on_wait=self._on_wait, url=url, + transport=transport, + transport_options=transport_options, + retry_options=retry_options) self._proxy_thread = None self._periodic = PeriodicWorker(tt.Timeout(pr.NOTIFY_PERIOD), [self._notify_topics]) diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py index 9a3b8e09..1770d562 100644 --- a/taskflow/engines/worker_based/proxy.py +++ b/taskflow/engines/worker_based/proxy.py @@ -64,10 +64,12 @@ class Proxy(object): # value is valid... _RETRY_INT_OPTS = frozenset(['max_retries']) - def __init__(self, topic, exchange_name, type_handlers, on_wait=None, - **kwargs): + def __init__(self, topic, exchange, type_handlers, + on_wait=None, url=None, + transport=None, transport_options=None, + retry_options=None): self._topic = topic - self._exchange_name = exchange_name + self._exchange_name = exchange self._on_wait = on_wait self._running = threading_utils.Event() self._dispatcher = dispatcher.TypeDispatcher(type_handlers) @@ -76,18 +78,13 @@ class Proxy(object): # running, otherwise requeue them. lambda data, message: not self.is_running) - # TODO(harlowja): make these keyword arguments explict... - url = kwargs.get('url') - transport = kwargs.get('transport') - transport_opts = kwargs.get('transport_options') ensure_options = self._DEFAULT_RETRY_OPTIONS.copy() - if 'retry_options' in kwargs and kwargs['retry_options'] is not None: + if retry_options is not None: # Override the defaults with any user provided values... - usr_retry_options = kwargs['retry_options'] for k in set(six.iterkeys(ensure_options)): - if k in usr_retry_options: + if k in retry_options: # Ensure that the right type is passed in... - val = usr_retry_options[k] + val = retry_options[k] if k in self._RETRY_INT_OPTS: tmp_val = int(val) else: @@ -100,14 +97,14 @@ class Proxy(object): self._ensure_options = ensure_options self._drain_events_timeout = DRAIN_EVENTS_PERIOD - if transport == 'memory' and transport_opts: - polling_interval = transport_opts.get('polling_interval') + if transport == 'memory' and transport_options: + polling_interval = transport_options.get('polling_interval') if polling_interval is not None: self._drain_events_timeout = polling_interval # create connection self._conn = kombu.Connection(url, transport=transport, - transport_options=transport_opts) + transport_options=transport_options) # create exchange self._exchange = kombu.Exchange(name=self._exchange_name, diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py index bb7a97d2..6e64f1cd 100644 --- a/taskflow/engines/worker_based/server.py +++ b/taskflow/engines/worker_based/server.py @@ -45,8 +45,10 @@ def delayed(executor): class Server(object): """Server implementation that waits for incoming tasks requests.""" - def __init__(self, topic, exchange, executor, endpoints, **kwargs): - handlers = { + def __init__(self, topic, exchange, executor, endpoints, + url=None, transport=None, transport_options=None, + retry_options=None): + type_handlers = { pr.NOTIFY: [ delayed(executor)(self._process_notify), functools.partial(pr.Notify.validate, response=False), @@ -56,8 +58,10 @@ class Server(object): pr.Request.validate, ], } - self._proxy = proxy.Proxy(topic, exchange, handlers, - on_wait=None, **kwargs) + self._proxy = proxy.Proxy(topic, exchange, type_handlers, + url=url, transport=transport, + transport_options=transport_options, + retry_options=retry_options) self._topic = topic self._endpoints = dict([(endpoint.name, endpoint) for endpoint in endpoints]) diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index 4e0c38bc..cdb421d1 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -84,10 +84,13 @@ class TestWorkerTaskExecutor(test.MockTestCase): def test_creation(self): ex = self.executor(reset_master_mock=False) - master_mock_calls = [ mock.call.Proxy(self.executor_uuid, self.executor_exchange, - mock.ANY, ex._on_wait, url=self.broker_url) + mock.ANY, on_wait=ex._on_wait, + url=self.broker_url, transport=mock.ANY, + transport_options=mock.ANY, + retry_options=mock.ANY + ) ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls) @@ -250,8 +253,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.assertEqual(expected_calls, self.master_mock.mock_calls) def test_execute_task_topic_not_found(self): - workers_info = {self.executor_topic: ['']} - ex = self.executor(workers_info=workers_info) + ex = self.executor() ex.execute_task(self.task, self.task_uuid, self.task_args) expected_calls = [ diff --git a/taskflow/tests/unit/worker_based/test_proxy.py b/taskflow/tests/unit/worker_based/test_proxy.py index daf9b60e..7ec91780 100644 --- a/taskflow/tests/unit/worker_based/test_proxy.py +++ b/taskflow/tests/unit/worker_based/test_proxy.py @@ -28,7 +28,7 @@ class TestProxy(test.MockTestCase): super(TestProxy, self).setUp() self.topic = 'test-topic' self.broker_url = 'test-url' - self.exchange_name = 'test-exchange' + self.exchange = 'test-exchange' self.timeout = 5 self.de_period = proxy.DRAIN_EVENTS_PERIOD @@ -72,7 +72,7 @@ class TestProxy(test.MockTestCase): self.resetMasterMock() def _queue_name(self, topic): - return "%s_%s" % (self.exchange_name, topic) + return "%s_%s" % (self.exchange, topic) def proxy_start_calls(self, calls, exc_type=mock.ANY): return [ @@ -119,7 +119,7 @@ class TestProxy(test.MockTestCase): def proxy(self, reset_master_mock=False, **kwargs): proxy_kwargs = dict(topic=self.topic, - exchange_name=self.exchange_name, + exchange=self.exchange, url=self.broker_url, type_handlers={}) proxy_kwargs.update(kwargs) @@ -134,7 +134,7 @@ class TestProxy(test.MockTestCase): master_mock_calls = [ mock.call.Connection(self.broker_url, transport=None, transport_options=None), - mock.call.Exchange(name=self.exchange_name, + mock.call.Exchange(name=self.exchange, durable=False, auto_delete=True) ] @@ -147,7 +147,7 @@ class TestProxy(test.MockTestCase): master_mock_calls = [ mock.call.Connection(self.broker_url, transport='memory', transport_options=transport_opts), - mock.call.Exchange(name=self.exchange_name, + mock.call.Exchange(name=self.exchange, durable=False, auto_delete=True) ] diff --git a/taskflow/tests/unit/worker_based/test_server.py b/taskflow/tests/unit/worker_based/test_server.py index 7da5b432..7a77e8df 100644 --- a/taskflow/tests/unit/worker_based/test_server.py +++ b/taskflow/tests/unit/worker_based/test_server.py @@ -86,7 +86,9 @@ class TestServer(test.MockTestCase): # check calls master_mock_calls = [ mock.call.Proxy(self.server_topic, self.server_exchange, - mock.ANY, url=self.broker_url, on_wait=mock.ANY) + mock.ANY, url=self.broker_url, + transport=mock.ANY, transport_options=mock.ANY, + retry_options=mock.ANY) ] self.master_mock.assert_has_calls(master_mock_calls) self.assertEqual(len(s._endpoints), 3) @@ -97,7 +99,9 @@ class TestServer(test.MockTestCase): # check calls master_mock_calls = [ mock.call.Proxy(self.server_topic, self.server_exchange, - mock.ANY, url=self.broker_url, on_wait=mock.ANY) + mock.ANY, url=self.broker_url, + transport=mock.ANY, transport_options=mock.ANY, + retry_options=mock.ANY) ] self.master_mock.assert_has_calls(master_mock_calls) self.assertEqual(len(s._endpoints), len(self.endpoints)) diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py index c8ab9185..a572beb4 100644 --- a/taskflow/tests/unit/worker_based/test_worker.py +++ b/taskflow/tests/unit/worker_based/test_worker.py @@ -82,7 +82,8 @@ class TestWorker(test.MockTestCase): master_mock_calls = [ mock.call.executor_class(10), mock.call.Server(self.topic, self.exchange, - self.executor_inst_mock, [], url=self.broker_url) + self.executor_inst_mock, [], + url=self.broker_url) ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls)