Allow the WBE to use a preexisting executor

A worker task executor is a good candidate for reuse since
it maintains worker knowledge that is valuable to be retained
across engine runs (tasks on which workers for example). In order
for it to be reused we need a way for the WBE to be able to
receive and reuse a previously existing executor.

Change-Id: Ia9a8f4c544b74e12e2cbd6bd941945da1111499c
This commit is contained in:
Joshua Harlow
2014-04-28 16:16:17 -07:00
committed by Thomas Goirand
parent fcc6dbdbc3
commit 442c7d4850
2 changed files with 18 additions and 10 deletions

View File

@@ -299,6 +299,12 @@ For complete parameters and object usage please see
eng = taskflow.engines.load(flow, engine_conf=engine_conf)
eng.run()
Additional supported keyword arguments:
* ``executor``: a class that provides a
:py:class:`~taskflow.engines.worker_based.executor.WorkerTaskExecutor`
interface; it will be used for executing, reverting and waiting for remote tasks.
Limitations
===========
@@ -327,3 +333,4 @@ Interfaces
.. automodule:: taskflow.engines.worker_based.worker
.. automodule:: taskflow.engines.worker_based.engine
.. automodule:: taskflow.engines.worker_based.proxy
.. automodule:: taskflow.engines.worker_based.executor

View File

@@ -37,16 +37,17 @@ class WorkerBasedActionEngine(engine.ActionEngine):
_storage_cls = t_storage.SingleThreadedStorage
def _task_executor_cls(self):
return executor.WorkerTaskExecutor(**self._executor_config)
if self._executor is not None:
return self._executor
return executor.WorkerTaskExecutor(
uuid=self._flow_detail.uuid,
url=self._conf.get('url'),
exchange=self._conf.get('exchange', 'default'),
topics=self._conf.get('topics', []),
transport=self._conf.get('transport'),
transport_options=self._conf.get('transport_options'))
def __init__(self, flow, flow_detail, backend, conf):
self._executor_config = {
'uuid': flow_detail.uuid,
'url': conf.get('url'),
'exchange': conf.get('exchange', 'default'),
'topics': conf.get('topics', []),
'transport': conf.get('transport'),
'transport_options': conf.get('transport_options')
}
def __init__(self, flow, flow_detail, backend, conf, **kwargs):
super(WorkerBasedActionEngine, self).__init__(
flow, flow_detail, backend, conf)
self._executor = kwargs.get('executor')