Pass executor via kwargs instead of config
Breaking change: moves from taking an executor from configuration (it really is not configuration) and instead takes that executor instead from the additional kwargs which can be provided to a engine-specific type. Change-Id: I475f33a63ebd08f6c20a16534423c8bc3502fa3f
This commit is contained in:
parent
c379885f2e
commit
8ebce5b027
@ -126,7 +126,7 @@ the ``engine_conf`` parameter any helper factory function accepts. It may be:
|
||||
|
||||
* a string, naming engine type;
|
||||
* a dictionary, holding engine type with key ``'engine'`` and possibly
|
||||
type-specific engine parameters.
|
||||
type-specific engine configuration parameters.
|
||||
|
||||
Single-Threaded
|
||||
---------------
|
||||
@ -150,7 +150,7 @@ Parallel
|
||||
|
||||
Parallel engine schedules tasks onto different threads to run them in parallel.
|
||||
|
||||
Additional configuration parameters:
|
||||
Additional supported keyword arguments:
|
||||
|
||||
* ``executor``: a object that implements a :pep:`3148` compatible `executor`_
|
||||
interface; it will be used for scheduling tasks. You can use instances
|
||||
|
@ -204,7 +204,7 @@ class MultiThreadedActionEngine(ActionEngine):
|
||||
def _task_executor_cls(self):
|
||||
return executor.ParallelTaskExecutor(self._executor)
|
||||
|
||||
def __init__(self, flow, flow_detail, backend, conf):
|
||||
def __init__(self, flow, flow_detail, backend, conf, **kwargs):
|
||||
super(MultiThreadedActionEngine, self).__init__(
|
||||
flow, flow_detail, backend, conf)
|
||||
self._executor = conf.get('executor', None)
|
||||
self._executor = kwargs.get('executor')
|
||||
|
@ -57,7 +57,7 @@ def load(flow, store=None, flow_detail=None, book=None,
|
||||
Which engine to load is specified in 'engine_conf' parameter. It
|
||||
can be a string that names engine type or a dictionary which holds
|
||||
engine type (with 'engine' key) and additional engine-specific
|
||||
configuration (for example, executor for multithreaded engine).
|
||||
configuration.
|
||||
|
||||
Which storage backend to use is defined by backend parameter. It
|
||||
can be backend itself, or a dictionary that is passed to
|
||||
@ -119,7 +119,7 @@ def run(flow, store=None, flow_detail=None, book=None,
|
||||
Which engine to load is specified in 'engine_conf' parameter. It
|
||||
can be a string that names engine type or a dictionary which holds
|
||||
engine type (with 'engine' key) and additional engine-specific
|
||||
configuration (for example, executor for multithreaded engine).
|
||||
configuration.
|
||||
|
||||
Which storage backend to use is defined by backend parameter. It
|
||||
can be backend itself, or a dictionary that is passed to
|
||||
|
@ -529,11 +529,11 @@ class MultiThreadedEngineTest(EngineTaskTest,
|
||||
EngineCheckingTaskTest,
|
||||
test.TestCase):
|
||||
def _make_engine(self, flow, flow_detail=None, executor=None):
|
||||
engine_conf = dict(engine='parallel',
|
||||
executor=executor)
|
||||
engine_conf = dict(engine='parallel')
|
||||
return taskflow.engines.load(flow, flow_detail=flow_detail,
|
||||
engine_conf=engine_conf,
|
||||
backend=self.backend)
|
||||
backend=self.backend,
|
||||
executor=executor)
|
||||
|
||||
def test_correct_load(self):
|
||||
engine = self._make_engine(utils.TaskNoRequiresNoReturns)
|
||||
|
@ -133,8 +133,8 @@ class SingleThreadedEngineTest(ArgumentsPassingTest,
|
||||
class MultiThreadedEngineTest(ArgumentsPassingTest,
|
||||
test.TestCase):
|
||||
def _make_engine(self, flow, flow_detail=None, executor=None):
|
||||
engine_conf = dict(engine='parallel',
|
||||
executor=executor)
|
||||
engine_conf = dict(engine='parallel')
|
||||
return taskflow.engines.load(flow, flow_detail=flow_detail,
|
||||
engine_conf=engine_conf,
|
||||
backend=self.backend)
|
||||
backend=self.backend,
|
||||
executor=executor)
|
||||
|
@ -768,8 +768,8 @@ class MultiThreadedEngineTest(RetryTest,
|
||||
RetryParallelExecutionTest,
|
||||
test.TestCase):
|
||||
def _make_engine(self, flow, flow_detail=None, executor=None):
|
||||
engine_conf = dict(engine='parallel',
|
||||
executor=executor)
|
||||
engine_conf = dict(engine='parallel')
|
||||
return taskflow.engines.load(flow, flow_detail=flow_detail,
|
||||
engine_conf=engine_conf,
|
||||
backend=self.backend)
|
||||
backend=self.backend,
|
||||
executor=executor)
|
||||
|
@ -175,11 +175,11 @@ class SingleThreadedEngineTest(SuspendFlowTest,
|
||||
class MultiThreadedEngineTest(SuspendFlowTest,
|
||||
test.TestCase):
|
||||
def _make_engine(self, flow, flow_detail=None, executor=None):
|
||||
engine_conf = dict(engine='parallel',
|
||||
executor=executor)
|
||||
engine_conf = dict(engine='parallel')
|
||||
return taskflow.engines.load(flow, flow_detail=flow_detail,
|
||||
engine_conf=engine_conf,
|
||||
backend=self.backend)
|
||||
backend=self.backend,
|
||||
executor=executor)
|
||||
|
||||
|
||||
@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available')
|
||||
@ -189,8 +189,8 @@ class ParallelEngineWithEventletTest(SuspendFlowTest,
|
||||
def _make_engine(self, flow, flow_detail=None, executor=None):
|
||||
if executor is None:
|
||||
executor = eu.GreenExecutor()
|
||||
engine_conf = dict(engine='parallel',
|
||||
executor=executor)
|
||||
engine_conf = dict(engine='parallel')
|
||||
return taskflow.engines.load(flow, flow_detail=flow_detail,
|
||||
engine_conf=engine_conf,
|
||||
backend=self.backend)
|
||||
backend=self.backend,
|
||||
executor=executor)
|
||||
|
Loading…
x
Reference in New Issue
Block a user