From 442c7d4850fce3af591ff05ca4db1b0d06263e81 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 28 Apr 2014 16:16:17 -0700 Subject: [PATCH] Allow the WBE to use a preexisting executor A worker task executor is a good candidate for reuse since it maintains worker knowledge that is valuable to be retained across engine runs (tasks on which workers for example). In order for it to be reused we need a way for the WBE to be able to receive and reuse a previously existing executor. Change-Id: Ia9a8f4c544b74e12e2cbd6bd941945da1111499c --- doc/source/workers.rst | 7 +++++++ taskflow/engines/worker_based/engine.py | 21 +++++++++++---------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/doc/source/workers.rst b/doc/source/workers.rst index 01787712..83b92c6d 100644 --- a/doc/source/workers.rst +++ b/doc/source/workers.rst @@ -299,6 +299,12 @@ For complete parameters and object usage please see eng = taskflow.engines.load(flow, engine_conf=engine_conf) eng.run() +Additional supported keyword arguments: + +* ``executor``: a class that provides a + :py:class:`~taskflow.engines.worker_based.executor.WorkerTaskExecutor` + interface; it will be used for executing, reverting and waiting for remote tasks. + Limitations =========== @@ -327,3 +333,4 @@ Interfaces .. automodule:: taskflow.engines.worker_based.worker .. automodule:: taskflow.engines.worker_based.engine .. automodule:: taskflow.engines.worker_based.proxy +.. automodule:: taskflow.engines.worker_based.executor diff --git a/taskflow/engines/worker_based/engine.py b/taskflow/engines/worker_based/engine.py index 0c7702e1..a552222c 100644 --- a/taskflow/engines/worker_based/engine.py +++ b/taskflow/engines/worker_based/engine.py @@ -37,16 +37,17 @@ class WorkerBasedActionEngine(engine.ActionEngine): _storage_cls = t_storage.SingleThreadedStorage def _task_executor_cls(self): - return executor.WorkerTaskExecutor(**self._executor_config) + if self._executor is not None: + return self._executor + return executor.WorkerTaskExecutor( + uuid=self._flow_detail.uuid, + url=self._conf.get('url'), + exchange=self._conf.get('exchange', 'default'), + topics=self._conf.get('topics', []), + transport=self._conf.get('transport'), + transport_options=self._conf.get('transport_options')) - def __init__(self, flow, flow_detail, backend, conf): - self._executor_config = { - 'uuid': flow_detail.uuid, - 'url': conf.get('url'), - 'exchange': conf.get('exchange', 'default'), - 'topics': conf.get('topics', []), - 'transport': conf.get('transport'), - 'transport_options': conf.get('transport_options') - } + def __init__(self, flow, flow_detail, backend, conf, **kwargs): super(WorkerBasedActionEngine, self).__init__( flow, flow_detail, backend, conf) + self._executor = kwargs.get('executor')