diff --git a/doc/source/engines.rst b/doc/source/engines.rst index f9e2ad90..631d3d53 100644 --- a/doc/source/engines.rst +++ b/doc/source/engines.rst @@ -126,7 +126,7 @@ the ``engine_conf`` parameter any helper factory function accepts. It may be: * a string, naming engine type; * a dictionary, holding engine type with key ``'engine'`` and possibly - type-specific engine parameters. + type-specific engine configuration parameters. Single-Threaded --------------- @@ -150,7 +150,7 @@ Parallel Parallel engine schedules tasks onto different threads to run them in parallel. -Additional configuration parameters: +Additional supported keyword arguments: * ``executor``: a object that implements a :pep:`3148` compatible `executor`_ interface; it will be used for scheduling tasks. You can use instances diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 4f3d85c1..beb6536f 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -204,7 +204,7 @@ class MultiThreadedActionEngine(ActionEngine): def _task_executor_cls(self): return executor.ParallelTaskExecutor(self._executor) - def __init__(self, flow, flow_detail, backend, conf): + def __init__(self, flow, flow_detail, backend, conf, **kwargs): super(MultiThreadedActionEngine, self).__init__( flow, flow_detail, backend, conf) - self._executor = conf.get('executor', None) + self._executor = kwargs.get('executor') diff --git a/taskflow/engines/helpers.py b/taskflow/engines/helpers.py index c70c8f1e..2aeddabc 100644 --- a/taskflow/engines/helpers.py +++ b/taskflow/engines/helpers.py @@ -57,7 +57,7 @@ def load(flow, store=None, flow_detail=None, book=None, Which engine to load is specified in 'engine_conf' parameter. It can be a string that names engine type or a dictionary which holds engine type (with 'engine' key) and additional engine-specific - configuration (for example, executor for multithreaded engine). + configuration. Which storage backend to use is defined by backend parameter. It can be backend itself, or a dictionary that is passed to @@ -119,7 +119,7 @@ def run(flow, store=None, flow_detail=None, book=None, Which engine to load is specified in 'engine_conf' parameter. It can be a string that names engine type or a dictionary which holds engine type (with 'engine' key) and additional engine-specific - configuration (for example, executor for multithreaded engine). + configuration. Which storage backend to use is defined by backend parameter. It can be backend itself, or a dictionary that is passed to diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index d711a1c2..b6c6c894 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -529,11 +529,11 @@ class MultiThreadedEngineTest(EngineTaskTest, EngineCheckingTaskTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): - engine_conf = dict(engine='parallel', - executor=executor) + engine_conf = dict(engine='parallel') return taskflow.engines.load(flow, flow_detail=flow_detail, engine_conf=engine_conf, - backend=self.backend) + backend=self.backend, + executor=executor) def test_correct_load(self): engine = self._make_engine(utils.TaskNoRequiresNoReturns) diff --git a/taskflow/tests/unit/test_arguments_passing.py b/taskflow/tests/unit/test_arguments_passing.py index 0a038bd1..4e8d5bb6 100644 --- a/taskflow/tests/unit/test_arguments_passing.py +++ b/taskflow/tests/unit/test_arguments_passing.py @@ -133,8 +133,8 @@ class SingleThreadedEngineTest(ArgumentsPassingTest, class MultiThreadedEngineTest(ArgumentsPassingTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): - engine_conf = dict(engine='parallel', - executor=executor) + engine_conf = dict(engine='parallel') return taskflow.engines.load(flow, flow_detail=flow_detail, engine_conf=engine_conf, - backend=self.backend) + backend=self.backend, + executor=executor) diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index d9c60903..6953b376 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -768,8 +768,8 @@ class MultiThreadedEngineTest(RetryTest, RetryParallelExecutionTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): - engine_conf = dict(engine='parallel', - executor=executor) + engine_conf = dict(engine='parallel') return taskflow.engines.load(flow, flow_detail=flow_detail, engine_conf=engine_conf, - backend=self.backend) + backend=self.backend, + executor=executor) diff --git a/taskflow/tests/unit/test_suspend_flow.py b/taskflow/tests/unit/test_suspend_flow.py index eeda19f0..bb953449 100644 --- a/taskflow/tests/unit/test_suspend_flow.py +++ b/taskflow/tests/unit/test_suspend_flow.py @@ -175,11 +175,11 @@ class SingleThreadedEngineTest(SuspendFlowTest, class MultiThreadedEngineTest(SuspendFlowTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): - engine_conf = dict(engine='parallel', - executor=executor) + engine_conf = dict(engine='parallel') return taskflow.engines.load(flow, flow_detail=flow_detail, engine_conf=engine_conf, - backend=self.backend) + backend=self.backend, + executor=executor) @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') @@ -189,8 +189,8 @@ class ParallelEngineWithEventletTest(SuspendFlowTest, def _make_engine(self, flow, flow_detail=None, executor=None): if executor is None: executor = eu.GreenExecutor() - engine_conf = dict(engine='parallel', - executor=executor) + engine_conf = dict(engine='parallel') return taskflow.engines.load(flow, flow_detail=flow_detail, engine_conf=engine_conf, - backend=self.backend) + backend=self.backend, + executor=executor)