diff --git a/doc/source/workers.rst b/doc/source/workers.rst index 01787712..83b92c6d 100644 --- a/doc/source/workers.rst +++ b/doc/source/workers.rst @@ -299,6 +299,12 @@ For complete parameters and object usage please see eng = taskflow.engines.load(flow, engine_conf=engine_conf) eng.run() +Additional supported keyword arguments: + +* ``executor``: a class that provides a + :py:class:`~taskflow.engines.worker_based.executor.WorkerTaskExecutor` + interface; it will be used for executing, reverting and waiting for remote tasks. + Limitations =========== @@ -327,3 +333,4 @@ Interfaces .. automodule:: taskflow.engines.worker_based.worker .. automodule:: taskflow.engines.worker_based.engine .. automodule:: taskflow.engines.worker_based.proxy +.. automodule:: taskflow.engines.worker_based.executor diff --git a/taskflow/engines/worker_based/engine.py b/taskflow/engines/worker_based/engine.py index 0c7702e1..a552222c 100644 --- a/taskflow/engines/worker_based/engine.py +++ b/taskflow/engines/worker_based/engine.py @@ -37,16 +37,17 @@ class WorkerBasedActionEngine(engine.ActionEngine): _storage_cls = t_storage.SingleThreadedStorage def _task_executor_cls(self): - return executor.WorkerTaskExecutor(**self._executor_config) + if self._executor is not None: + return self._executor + return executor.WorkerTaskExecutor( + uuid=self._flow_detail.uuid, + url=self._conf.get('url'), + exchange=self._conf.get('exchange', 'default'), + topics=self._conf.get('topics', []), + transport=self._conf.get('transport'), + transport_options=self._conf.get('transport_options')) - def __init__(self, flow, flow_detail, backend, conf): - self._executor_config = { - 'uuid': flow_detail.uuid, - 'url': conf.get('url'), - 'exchange': conf.get('exchange', 'default'), - 'topics': conf.get('topics', []), - 'transport': conf.get('transport'), - 'transport_options': conf.get('transport_options') - } + def __init__(self, flow, flow_detail, backend, conf, **kwargs): super(WorkerBasedActionEngine, self).__init__( flow, flow_detail, backend, conf) + self._executor = kwargs.get('executor')