diff --git a/doc/source/engines.rst b/doc/source/engines.rst index be19aa45..e3680672 100644 --- a/doc/source/engines.rst +++ b/doc/source/engines.rst @@ -118,9 +118,9 @@ might look like:: ... flow = make_flow() - engine = engines.load(flow, engine_conf=my_conf, - backend=my_persistence_conf) - engine.run + eng = engines.load(flow, engine='serial', backend=my_persistence_conf) + eng.run() + ... .. automodule:: taskflow.engines.helpers @@ -129,11 +129,8 @@ Usage ===== To select which engine to use and pass parameters to an engine you should use -the ``engine_conf`` parameter any helper factory function accepts. It may be: - -* A string, naming the engine type. -* A dictionary, naming engine type with key ``'engine'`` and possibly - type-specific engine configuration parameters. +the ``engine`` parameter any engine helper function accepts and for any engine +specific options use the ``kwargs`` parameter. Types ===== diff --git a/doc/source/workers.rst b/doc/source/workers.rst index 9c2f2b9c..caac6aa0 100644 --- a/doc/source/workers.rst +++ b/doc/source/workers.rst @@ -273,32 +273,26 @@ For complete parameters and object usage please see .. code:: python - engine_conf = { - 'engine': 'worker-based', - 'url': 'amqp://guest:guest@localhost:5672//', - 'exchange': 'test-exchange', - 'topics': ['topic1', 'topic2'], - } flow = lf.Flow('simple-linear').add(...) - eng = taskflow.engines.load(flow, engine_conf=engine_conf) + eng = taskflow.engines.load(flow, engine='worker-based', + url='amqp://guest:guest@localhost:5672//', + exchange='test-exchange', + topics=['topic1', 'topic2']) eng.run() **Example with filesystem transport:** .. code:: python - engine_conf = { - 'engine': 'worker-based', - 'exchange': 'test-exchange', - 'topics': ['topic1', 'topic2'], - 'transport': 'filesystem', - 'transport_options': { - 'data_folder_in': '/tmp/test', - 'data_folder_out': '/tmp/test', - }, - } flow = lf.Flow('simple-linear').add(...) - eng = taskflow.engines.load(flow, engine_conf=engine_conf) + eng = taskflow.engines.load(flow, engine='worker-based', + exchange='test-exchange', + topics=['topic1', 'topic2'], + transport='filesystem', + transport_options={ + 'data_folder_in': '/tmp/in', + 'data_folder_out': '/tmp/out', + }) eng.run() Additional supported keyword arguments: diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py index c881346e..f7546c3e 100644 --- a/taskflow/conductors/base.py +++ b/taskflow/conductors/base.py @@ -17,7 +17,7 @@ import threading import six -import taskflow.engines +from taskflow import engines from taskflow import exceptions as excp from taskflow.utils import lock_utils @@ -34,10 +34,15 @@ class Conductor(object): period of time will finish up the prior failed conductors work. """ - def __init__(self, name, jobboard, engine_conf, persistence): + def __init__(self, name, jobboard, persistence, + engine=None, engine_options=None): self._name = name self._jobboard = jobboard - self._engine_conf = engine_conf + self._engine = engine + if not engine_options: + self._engine_options = {} + else: + self._engine_options = engine_options.copy() self._persistence = persistence self._lock = threading.RLock() @@ -83,10 +88,10 @@ class Conductor(object): store = dict(job.details["store"]) else: store = {} - return taskflow.engines.load_from_detail(flow_detail, - store=store, - engine_conf=self._engine_conf, - backend=self._persistence) + return engines.load_from_detail(flow_detail, store=store, + engine=self._engine, + backend=self._persistence, + **self._engine_options) @lock_utils.locked def connect(self): diff --git a/taskflow/conductors/single_threaded.py b/taskflow/conductors/single_threaded.py index 23994e79..84038ef1 100644 --- a/taskflow/conductors/single_threaded.py +++ b/taskflow/conductors/single_threaded.py @@ -51,11 +51,11 @@ class SingleThreadedConductor(base.Conductor): upon the jobboard capabilities to automatically abandon these jobs. """ - def __init__(self, name, jobboard, engine_conf, persistence, - wait_timeout=None): - super(SingleThreadedConductor, self).__init__(name, jobboard, - engine_conf, - persistence) + def __init__(self, name, jobboard, persistence, + engine=None, engine_options=None, wait_timeout=None): + super(SingleThreadedConductor, self).__init__( + name, jobboard, persistence, + engine=engine, engine_options=engine_options) if wait_timeout is None: wait_timeout = WAIT_TIMEOUT if isinstance(wait_timeout, (int, float) + six.string_types): diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 69f05bd3..22db3ed7 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -59,8 +59,8 @@ class ActionEngine(base.EngineBase): _compiler_factory = compiler.PatternCompiler _task_executor_factory = executor.SerialTaskExecutor - def __init__(self, flow, flow_detail, backend, conf): - super(ActionEngine, self).__init__(flow, flow_detail, backend, conf) + def __init__(self, flow, flow_detail, backend, options): + super(ActionEngine, self).__init__(flow, flow_detail, backend, options) self._runtime = None self._compiled = False self._compilation = None @@ -230,12 +230,6 @@ class ParallelActionEngine(ActionEngine): _storage_factory = atom_storage.MultiThreadedStorage def _task_executor_factory(self): - return executor.ParallelTaskExecutor(executor=self._executor, - max_workers=self._max_workers) - - def __init__(self, flow, flow_detail, backend, conf, - executor=None, max_workers=None): - super(ParallelActionEngine, self).__init__(flow, flow_detail, - backend, conf) - self._executor = executor - self._max_workers = max_workers + return executor.ParallelTaskExecutor( + executor=self._options.get('executor'), + max_workers=self._options.get('max_workers')) diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py index 4bfcbabc..63389fe0 100644 --- a/taskflow/engines/base.py +++ b/taskflow/engines/base.py @@ -32,17 +32,22 @@ class EngineBase(object): occur related to the tasks the engine contains. """ - def __init__(self, flow, flow_detail, backend, conf): + def __init__(self, flow, flow_detail, backend, options): self._flow = flow self._flow_detail = flow_detail self._backend = backend - if not conf: - self._conf = {} + if not options: + self._options = {} else: - self._conf = dict(conf) + self._options = dict(options) self.notifier = misc.Notifier() self.task_notifier = misc.Notifier() + @property + def options(self): + """The options that were passed to this engine on construction.""" + return self._options + @misc.cachedproperty def storage(self): """The storage unit for this flow.""" diff --git a/taskflow/engines/helpers.py b/taskflow/engines/helpers.py index bfbaaa53..a6236591 100644 --- a/taskflow/engines/helpers.py +++ b/taskflow/engines/helpers.py @@ -15,6 +15,7 @@ # under the License. import contextlib +import warnings from oslo.utils import importutils import six @@ -30,6 +31,40 @@ from taskflow.utils import reflection # NOTE(imelnikov): this is the entrypoint namespace, not the module namespace. ENGINES_NAMESPACE = 'taskflow.engines' +# The default entrypoint engine type looked for when it is not provided. +ENGINE_DEFAULT = 'default' + + +def _extract_engine(**kwargs): + """Extracts the engine kind and any associated options.""" + options = {} + kind = kwargs.pop('engine', None) + engine_conf = kwargs.pop('engine_conf', None) + if engine_conf is not None: + warnings.warn("Using the 'engine_conf' argument is" + " deprecated and will be removed in a future version," + " please use the 'engine' argument instead.", + DeprecationWarning) + if isinstance(engine_conf, six.string_types): + kind = engine_conf + else: + options.update(engine_conf) + kind = options.pop('engine', None) + if not kind: + kind = ENGINE_DEFAULT + # See if it's a URI and if so, extract any further options... + try: + pieces = misc.parse_uri(kind) + except (TypeError, ValueError): + pass + else: + kind = pieces['scheme'] + options = misc.merge_uri(pieces, options.copy()) + # Merge in any leftover **kwargs into the options, this makes it so that + # the provided **kwargs override any URI or engine_conf specific options. + options.update(kwargs) + return (kind, options) + def _fetch_factory(factory_name): try: @@ -56,49 +91,43 @@ def _fetch_validate_factory(flow_factory): def load(flow, store=None, flow_detail=None, book=None, - engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE, - **kwargs): + engine_conf=None, backend=None, + namespace=ENGINES_NAMESPACE, engine=ENGINE_DEFAULT, **kwargs): """Load a flow into an engine. - This function creates and prepares engine to run the - flow. All that is left is to run the engine with 'run()' method. + This function creates and prepares an engine to run the provided flow. All + that is left after this returns is to run the engine with the + engines ``run()`` method. - Which engine to load is specified in 'engine_conf' parameter. It - can be a string that names engine type or a dictionary which holds - engine type (with 'engine' key) and additional engine-specific - configuration. + Which engine to load is specified via the ``engine`` parameter. It + can be a string that names the engine type to use, or a string that + is a URI with a scheme that names the engine type to use and further + options contained in the URI's host, port, and query parameters... - Which storage backend to use is defined by backend parameter. It + Which storage backend to use is defined by the backend parameter. It can be backend itself, or a dictionary that is passed to - taskflow.persistence.backends.fetch to obtain backend. + ``taskflow.persistence.backends.fetch()`` to obtain a viable backend. :param flow: flow to load :param store: dict -- data to put to storage to satisfy flow requirements :param flow_detail: FlowDetail that holds the state of the flow (if one is not provided then one will be created for you in the provided backend) :param book: LogBook to create flow detail in if flow_detail is None - :param engine_conf: engine type and configuration configuration - :param backend: storage backend to use or configuration - :param namespace: driver namespace for stevedore (default is fine - if you don't know what is it) + :param engine_conf: engine type or URI and options (**deprecated**) + :param backend: storage backend to use or configuration that defines it + :param namespace: driver namespace for stevedore (or empty for default) + :param engine: string engine type or URI string with scheme that contains + the engine type and any URI specific components that will + become part of the engine options. + :param kwargs: arbitrary keyword arguments passed as options (merged with + any extracted ``engine`` and ``engine_conf`` options), + typically used for any engine specific options that do not + fit as any of the existing arguments. :returns: engine """ - if engine_conf is None: - engine_conf = {'engine': 'default'} - - # NOTE(imelnikov): this allows simpler syntax. - if isinstance(engine_conf, six.string_types): - engine_conf = {'engine': engine_conf} - - engine_name = engine_conf['engine'] - try: - pieces = misc.parse_uri(engine_name) - except (TypeError, ValueError): - pass - else: - engine_name = pieces['scheme'] - engine_conf = misc.merge_uri(pieces, engine_conf.copy()) + kind, options = _extract_engine(engine_conf=engine_conf, + engine=engine, **kwargs) if isinstance(backend, dict): backend = p_backends.fetch(backend) @@ -109,13 +138,12 @@ def load(flow, store=None, flow_detail=None, book=None, try: mgr = stevedore.driver.DriverManager( - namespace, engine_name, + namespace, kind, invoke_on_load=True, - invoke_args=(flow, flow_detail, backend, engine_conf), - invoke_kwds=kwargs) + invoke_args=(flow, flow_detail, backend, options)) engine = mgr.driver except RuntimeError as e: - raise exc.NotFound("Could not find engine %s" % (engine_name), e) + raise exc.NotFound("Could not find engine '%s'" % (kind), e) else: if store: engine.storage.inject(store) @@ -123,35 +151,20 @@ def load(flow, store=None, flow_detail=None, book=None, def run(flow, store=None, flow_detail=None, book=None, - engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE, **kwargs): + engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE, + engine=ENGINE_DEFAULT, **kwargs): """Run the flow. - This function load the flow into engine (with 'load' function) - and runs the engine. + This function loads the flow into an engine (with the :func:`load() ` + function) and runs the engine. - Which engine to load is specified in 'engine_conf' parameter. It - can be a string that names engine type or a dictionary which holds - engine type (with 'engine' key) and additional engine-specific - configuration. + The arguments are interpreted as for :func:`load() `. - Which storage backend to use is defined by backend parameter. It - can be backend itself, or a dictionary that is passed to - taskflow.persistence.backends.fetch to obtain backend. - - :param flow: flow to run - :param store: dict -- data to put to storage to satisfy flow requirements - :param flow_detail: FlowDetail that holds the state of the flow (if one is - not provided then one will be created for you in the provided backend) - :param book: LogBook to create flow detail in if flow_detail is None - :param engine_conf: engine type and configuration configuration - :param backend: storage backend to use or configuration - :param namespace: driver namespace for stevedore (default is fine - if you don't know what is it) - :returns: dictionary of all named task results (see Storage.fetch_all) + :returns: dictionary of all named results (see ``storage.fetch_all()``) """ engine = load(flow, store=store, flow_detail=flow_detail, book=book, engine_conf=engine_conf, backend=backend, - namespace=namespace, **kwargs) + namespace=namespace, engine=engine, **kwargs) engine.run() return engine.storage.fetch_all() @@ -196,23 +209,21 @@ def save_factory_details(flow_detail, def load_from_factory(flow_factory, factory_args=None, factory_kwargs=None, store=None, book=None, engine_conf=None, backend=None, - namespace=ENGINES_NAMESPACE, **kwargs): + namespace=ENGINES_NAMESPACE, engine=ENGINE_DEFAULT, + **kwargs): """Loads a flow from a factory function into an engine. Gets flow factory function (or name of it) and creates flow with - it. Then, flow is loaded into engine with load(), and factory - function fully qualified name is saved to flow metadata so that - it can be later resumed with resume. + it. Then, the flow is loaded into an engine with the :func:`load() ` + function, and the factory function fully qualified name is saved to flow + metadata so that it can be later resumed. :param flow_factory: function or string: function that creates the flow :param factory_args: list or tuple of factory positional arguments :param factory_kwargs: dict of factory keyword arguments - :param store: dict -- data to put to storage to satisfy flow requirements - :param book: LogBook to create flow detail in - :param engine_conf: engine type and configuration configuration - :param backend: storage backend to use or configuration - :param namespace: driver namespace for stevedore (default is fine - if you don't know what is it) + + Further arguments are interpreted as for :func:`load() `. + :returns: engine """ @@ -230,7 +241,7 @@ def load_from_factory(flow_factory, factory_args=None, factory_kwargs=None, backend=backend) return load(flow=flow, store=store, flow_detail=flow_detail, book=book, engine_conf=engine_conf, backend=backend, namespace=namespace, - **kwargs) + engine=engine, **kwargs) def flow_from_detail(flow_detail): @@ -261,21 +272,21 @@ def flow_from_detail(flow_detail): def load_from_detail(flow_detail, store=None, engine_conf=None, backend=None, - namespace=ENGINES_NAMESPACE, **kwargs): + namespace=ENGINES_NAMESPACE, engine=ENGINE_DEFAULT, + **kwargs): """Reloads an engine previously saved. - This reloads the flow using the flow_from_detail() function and then calls - into the load() function to create an engine from that flow. + This reloads the flow using the + :func:`flow_from_detail() ` function and then calls + into the :func:`load() ` function to create an engine from that flow. :param flow_detail: FlowDetail that holds state of the flow to load - :param store: dict -- data to put to storage to satisfy flow requirements - :param engine_conf: engine type and configuration configuration - :param backend: storage backend to use or configuration - :param namespace: driver namespace for stevedore (default is fine - if you don't know what is it) + + Further arguments are interpreted as for :func:`load() `. + :returns: engine """ flow = flow_from_detail(flow_detail) return load(flow, flow_detail=flow_detail, store=store, engine_conf=engine_conf, backend=backend, - namespace=namespace, **kwargs) + namespace=namespace, engine=engine, **kwargs) diff --git a/taskflow/engines/worker_based/engine.py b/taskflow/engines/worker_based/engine.py index 0c1b8ead..aefce23f 100644 --- a/taskflow/engines/worker_based/engine.py +++ b/taskflow/engines/worker_based/engine.py @@ -23,7 +23,7 @@ from taskflow import storage as t_storage class WorkerBasedActionEngine(engine.ActionEngine): """Worker based action engine. - Specific backend configuration: + Specific backend options: :param exchange: broker exchange exchange name in which executor / worker communication is performed @@ -45,19 +45,15 @@ class WorkerBasedActionEngine(engine.ActionEngine): _storage_factory = t_storage.SingleThreadedStorage def _task_executor_factory(self): - if self._executor is not None: - return self._executor - return executor.WorkerTaskExecutor( - uuid=self._flow_detail.uuid, - url=self._conf.get('url'), - exchange=self._conf.get('exchange', 'default'), - topics=self._conf.get('topics', []), - transport=self._conf.get('transport'), - transport_options=self._conf.get('transport_options'), - transition_timeout=self._conf.get('transition_timeout', - pr.REQUEST_TIMEOUT)) - - def __init__(self, flow, flow_detail, backend, conf, **kwargs): - super(WorkerBasedActionEngine, self).__init__( - flow, flow_detail, backend, conf) - self._executor = kwargs.get('executor') + try: + return self._options['executor'] + except KeyError: + return executor.WorkerTaskExecutor( + uuid=self._flow_detail.uuid, + url=self._options.get('url'), + exchange=self._options.get('exchange', 'default'), + topics=self._options.get('topics', []), + transport=self._options.get('transport'), + transport_options=self._options.get('transport_options'), + transition_timeout=self._options.get('transition_timeout', + pr.REQUEST_TIMEOUT)) diff --git a/taskflow/examples/calculate_in_parallel.py b/taskflow/examples/calculate_in_parallel.py index 0215f956..7ab32fae 100644 --- a/taskflow/examples/calculate_in_parallel.py +++ b/taskflow/examples/calculate_in_parallel.py @@ -93,5 +93,5 @@ flow = lf.Flow('root').add( # The result here will be all results (from all tasks) which is stored in an # in-memory storage location that backs this engine since it is not configured # with persistence storage. -result = taskflow.engines.run(flow, engine_conf='parallel') +result = taskflow.engines.run(flow, engine='parallel') print(result) diff --git a/taskflow/examples/create_parallel_volume.py b/taskflow/examples/create_parallel_volume.py index de511adf..5185330b 100644 --- a/taskflow/examples/create_parallel_volume.py +++ b/taskflow/examples/create_parallel_volume.py @@ -64,13 +64,9 @@ VOLUME_COUNT = 5 # time difference that this causes. SERIAL = False if SERIAL: - engine_conf = { - 'engine': 'serial', - } + engine = 'serial' else: - engine_conf = { - 'engine': 'parallel', - } + engine = 'parallel' class VolumeCreator(task.Task): @@ -106,7 +102,7 @@ for i in range(0, VOLUME_COUNT): # Show how much time the overall engine loading and running takes. with show_time(name=flow.name.title()): - eng = engines.load(flow, engine_conf=engine_conf) + eng = engines.load(flow, engine=engine) # This context manager automatically adds (and automatically removes) a # helpful set of state transition notification printing helper utilities # that show you exactly what transitions the engine is going through diff --git a/taskflow/examples/delayed_return.py b/taskflow/examples/delayed_return.py index 46578621..bc44a897 100644 --- a/taskflow/examples/delayed_return.py +++ b/taskflow/examples/delayed_return.py @@ -74,7 +74,7 @@ class Bye(task.Task): def return_from_flow(pool): wf = lf.Flow("root").add(Hi("hi"), Bye("bye")) - eng = taskflow.engines.load(wf, engine_conf='serial') + eng = taskflow.engines.load(wf, engine='serial') f = futures.Future() watcher = PokeFutureListener(eng, f, 'hi') watcher.register() diff --git a/taskflow/examples/fake_billing.py b/taskflow/examples/fake_billing.py index 9a421f92..ac15dbae 100644 --- a/taskflow/examples/fake_billing.py +++ b/taskflow/examples/fake_billing.py @@ -170,7 +170,7 @@ flow.add(sub_flow) store = { 'request': misc.AttrDict(user="bob", id="1.35"), } -eng = engines.load(flow, engine_conf='serial', store=store) +eng = engines.load(flow, engine='serial', store=store) # This context manager automatically adds (and automatically removes) a # helpful set of state transition notification printing helper utilities diff --git a/taskflow/examples/graph_flow.py b/taskflow/examples/graph_flow.py index 99dfdd45..9f28dc71 100644 --- a/taskflow/examples/graph_flow.py +++ b/taskflow/examples/graph_flow.py @@ -81,11 +81,11 @@ store = { } result = taskflow.engines.run( - flow, engine_conf='serial', store=store) + flow, engine='serial', store=store) print("Single threaded engine result %s" % result) result = taskflow.engines.run( - flow, engine_conf='parallel', store=store) + flow, engine='parallel', store=store) print("Multi threaded engine result %s" % result) diff --git a/taskflow/examples/persistence_example.py b/taskflow/examples/persistence_example.py index 720914cd..fe5968fe 100644 --- a/taskflow/examples/persistence_example.py +++ b/taskflow/examples/persistence_example.py @@ -91,20 +91,15 @@ else: blowup = True with eu.get_backend(backend_uri) as backend: - # Now we can run. - engine_config = { - 'backend': backend, - 'engine_conf': 'serial', - 'book': logbook.LogBook("my-test"), - } - # Make a flow that will blowup if the file doesn't exist previously, if it # did exist, assume we won't blowup (and therefore this shows the undo # and redo that a flow will go through). + book = logbook.LogBook("my-test") flow = make_flow(blowup=blowup) eu.print_wrapped("Running") try: - eng = engines.load(flow, **engine_config) + eng = engines.load(flow, engine='serial', + backend=backend, book=book) eng.run() if not blowup: eu.rm_path(persist_path) @@ -115,4 +110,4 @@ with eu.get_backend(backend_uri) as backend: traceback.print_exc(file=sys.stdout) eu.print_wrapped("Book contents") - print(p_utils.pformat(engine_config['book'])) + print(p_utils.pformat(book)) diff --git a/taskflow/examples/resume_vm_boot.py b/taskflow/examples/resume_vm_boot.py index acdf42b5..203cb882 100644 --- a/taskflow/examples/resume_vm_boot.py +++ b/taskflow/examples/resume_vm_boot.py @@ -235,11 +235,9 @@ with eu.get_backend() as backend: flow_id = None # Set up how we want our engine to run, serial, parallel... - engine_conf = { - 'engine': 'parallel', - } + executor = None if e_utils.EVENTLET_AVAILABLE: - engine_conf['executor'] = e_utils.GreenExecutor(5) + executor = e_utils.GreenExecutor(5) # Create/fetch a logbook that will track the workflows work. book = None @@ -255,15 +253,15 @@ with eu.get_backend() as backend: book = p_utils.temporary_log_book(backend) engine = engines.load_from_factory(create_flow, backend=backend, book=book, - engine_conf=engine_conf) + engine='parallel', + executor=executor) print("!! Your tracking id is: '%s+%s'" % (book.uuid, engine.storage.flow_uuid)) print("!! Please submit this on later runs for tracking purposes") else: # Attempt to load from a previously partially completed flow. - engine = engines.load_from_detail(flow_detail, - backend=backend, - engine_conf=engine_conf) + engine = engines.load_from_detail(flow_detail, backend=backend, + engine='parallel', executor=executor) # Make me my vm please! eu.print_wrapped('Running') diff --git a/taskflow/examples/resume_volume_create.py b/taskflow/examples/resume_volume_create.py index 0fe502e4..275fa6b8 100644 --- a/taskflow/examples/resume_volume_create.py +++ b/taskflow/examples/resume_volume_create.py @@ -143,13 +143,9 @@ with example_utils.get_backend() as backend: flow_detail = find_flow_detail(backend, book_id, flow_id) # Load and run. - engine_conf = { - 'engine': 'serial', - } engine = engines.load(flow, flow_detail=flow_detail, - backend=backend, - engine_conf=engine_conf) + backend=backend, engine='serial') engine.run() # How to use. diff --git a/taskflow/examples/wbe_simple_linear.py b/taskflow/examples/wbe_simple_linear.py index bfec2d86..a15b48fa 100644 --- a/taskflow/examples/wbe_simple_linear.py +++ b/taskflow/examples/wbe_simple_linear.py @@ -69,19 +69,16 @@ WORKER_CONF = { 'taskflow.tests.utils:TaskMultiArgOneReturn' ], } -ENGINE_CONF = { - 'engine': 'worker-based', -} -def run(engine_conf): +def run(engine_options): flow = lf.Flow('simple-linear').add( utils.TaskOneArgOneReturn(provides='result1'), utils.TaskMultiArgOneReturn(provides='result2') ) eng = engines.load(flow, store=dict(x=111, y=222, z=333), - engine_conf=engine_conf) + engine='worker-based', **engine_options) eng.run() return eng.storage.fetch_all() @@ -115,8 +112,7 @@ if __name__ == "__main__": }) worker_conf = dict(WORKER_CONF) worker_conf.update(shared_conf) - engine_conf = dict(ENGINE_CONF) - engine_conf.update(shared_conf) + engine_options = dict(shared_conf) workers = [] worker_topics = [] @@ -135,8 +131,8 @@ if __name__ == "__main__": # Now use those workers to do something. print('Executing some work.') - engine_conf['topics'] = worker_topics - result = run(engine_conf) + engine_options['topics'] = worker_topics + result = run(engine_options) print('Execution finished.') # This is done so that the test examples can work correctly # even when the keys change order (which will happen in various diff --git a/taskflow/examples/wrapped_exception.py b/taskflow/examples/wrapped_exception.py index 7679a150..dff6b2b4 100644 --- a/taskflow/examples/wrapped_exception.py +++ b/taskflow/examples/wrapped_exception.py @@ -93,7 +93,7 @@ def run(**store): try: with utils.wrap_all_failures(): taskflow.engines.run(flow, store=store, - engine_conf='parallel') + engine='parallel') except exceptions.WrappedFailure as ex: unknown_failures = [] for failure in ex: diff --git a/taskflow/tests/unit/conductor/test_conductor.py b/taskflow/tests/unit/conductor/test_conductor.py index b43ba035..216fa387 100644 --- a/taskflow/tests/unit/conductor/test_conductor.py +++ b/taskflow/tests/unit/conductor/test_conductor.py @@ -63,11 +63,8 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): board = impl_zookeeper.ZookeeperJobBoard(name, {}, client=client, persistence=persistence) - engine_conf = { - 'engine': 'default', - } - conductor = stc.SingleThreadedConductor(name, board, engine_conf, - persistence, wait_timeout) + conductor = stc.SingleThreadedConductor(name, board, persistence, + wait_timeout=wait_timeout) return misc.AttrDict(board=board, client=client, persistence=persistence, diff --git a/taskflow/tests/unit/test_arguments_passing.py b/taskflow/tests/unit/test_arguments_passing.py index 5e9fc3a8..fb4744bd 100644 --- a/taskflow/tests/unit/test_arguments_passing.py +++ b/taskflow/tests/unit/test_arguments_passing.py @@ -155,15 +155,14 @@ class SingleThreadedEngineTest(ArgumentsPassingTest, def _make_engine(self, flow, flow_detail=None): return taskflow.engines.load(flow, flow_detail=flow_detail, - engine_conf='serial', + engine='serial', backend=self.backend) class MultiThreadedEngineTest(ArgumentsPassingTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): - engine_conf = dict(engine='parallel') return taskflow.engines.load(flow, flow_detail=flow_detail, - engine_conf=engine_conf, + engine='parallel', backend=self.backend, executor=executor) diff --git a/taskflow/tests/unit/test_engine_helpers.py b/taskflow/tests/unit/test_engine_helpers.py index 2353d77d..4087d839 100644 --- a/taskflow/tests/unit/test_engine_helpers.py +++ b/taskflow/tests/unit/test_engine_helpers.py @@ -24,17 +24,30 @@ from taskflow.utils import persistence_utils as p_utils class EngineLoadingTestCase(test.TestCase): - def test_default_load(self): + def _make_dummy_flow(self): f = linear_flow.Flow('test') f.add(test_utils.TaskOneReturn("run-1")) + return f + + def test_default_load(self): + f = self._make_dummy_flow() e = taskflow.engines.load(f) self.assertIsNotNone(e) def test_unknown_load(self): - f = linear_flow.Flow('test') - f.add(test_utils.TaskOneReturn("run-1")) + f = self._make_dummy_flow() self.assertRaises(exc.NotFound, taskflow.engines.load, f, - engine_conf='not_really_any_engine') + engine='not_really_any_engine') + + def test_options_empty(self): + f = self._make_dummy_flow() + e = taskflow.engines.load(f) + self.assertEqual({}, e.options) + + def test_options_passthrough(self): + f = self._make_dummy_flow() + e = taskflow.engines.load(f, pass_1=1, pass_2=2) + self.assertEqual({'pass_1': 1, 'pass_2': 2}, e.options) class FlowFromDetailTestCase(test.TestCase): diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 243635fb..4ce291d1 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -550,7 +550,7 @@ class SingleThreadedEngineTest(EngineTaskTest, def _make_engine(self, flow, flow_detail=None): return taskflow.engines.load(flow, flow_detail=flow_detail, - engine_conf='serial', + engine='serial', backend=self.backend) def test_correct_load(self): @@ -570,16 +570,14 @@ class MultiThreadedEngineTest(EngineTaskTest, EngineCheckingTaskTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): - engine_conf = dict(engine='parallel') return taskflow.engines.load(flow, flow_detail=flow_detail, - engine_conf=engine_conf, backend=self.backend, - executor=executor) + executor=executor, + engine='parallel') def test_correct_load(self): engine = self._make_engine(utils.TaskNoRequiresNoReturns) self.assertIsInstance(engine, eng.ParallelActionEngine) - self.assertIs(engine._executor, None) def test_using_common_executor(self): flow = utils.TaskNoRequiresNoReturns(name='task1') @@ -587,7 +585,7 @@ class MultiThreadedEngineTest(EngineTaskTest, try: e1 = self._make_engine(flow, executor=executor) e2 = self._make_engine(flow, executor=executor) - self.assertIs(e1._executor, e2._executor) + self.assertIs(e1.options['executor'], e2.options['executor']) finally: executor.shutdown(wait=True) @@ -604,11 +602,9 @@ class ParallelEngineWithEventletTest(EngineTaskTest, def _make_engine(self, flow, flow_detail=None, executor=None): if executor is None: executor = eu.GreenExecutor() - engine_conf = dict(engine='parallel', - executor=executor) return taskflow.engines.load(flow, flow_detail=flow_detail, - engine_conf=engine_conf, - backend=self.backend) + backend=self.backend, engine='parallel', + executor=executor) class WorkerBasedEngineTest(EngineTaskTest, @@ -647,15 +643,12 @@ class WorkerBasedEngineTest(EngineTaskTest, super(WorkerBasedEngineTest, self).tearDown() def _make_engine(self, flow, flow_detail=None): - engine_conf = { - 'engine': 'worker-based', - 'exchange': self.exchange, - 'topics': [self.topic], - 'transport': self.transport, - } return taskflow.engines.load(flow, flow_detail=flow_detail, - engine_conf=engine_conf, - backend=self.backend) + backend=self.backend, + engine='worker-based', + exchange=self.exchange, + topics=[self.topic], + transport=self.transport) def test_correct_load(self): engine = self._make_engine(utils.TaskNoRequiresNoReturns) diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index fb1b3802..54400435 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -758,7 +758,7 @@ class SingleThreadedEngineTest(RetryTest, def _make_engine(self, flow, flow_detail=None): return taskflow.engines.load(flow, flow_detail=flow_detail, - engine_conf='serial', + engine='serial', backend=self.backend) @@ -766,8 +766,7 @@ class MultiThreadedEngineTest(RetryTest, RetryParallelExecutionTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): - engine_conf = dict(engine='parallel') return taskflow.engines.load(flow, flow_detail=flow_detail, - engine_conf=engine_conf, + engine='parallel', backend=self.backend, executor=executor) diff --git a/taskflow/tests/unit/test_suspend_flow.py b/taskflow/tests/unit/test_suspend_flow.py index bb953449..7b9875c9 100644 --- a/taskflow/tests/unit/test_suspend_flow.py +++ b/taskflow/tests/unit/test_suspend_flow.py @@ -168,16 +168,15 @@ class SingleThreadedEngineTest(SuspendFlowTest, def _make_engine(self, flow, flow_detail=None): return taskflow.engines.load(flow, flow_detail=flow_detail, - engine_conf='serial', + engine='serial', backend=self.backend) class MultiThreadedEngineTest(SuspendFlowTest, test.TestCase): def _make_engine(self, flow, flow_detail=None, executor=None): - engine_conf = dict(engine='parallel') return taskflow.engines.load(flow, flow_detail=flow_detail, - engine_conf=engine_conf, + engine='parallel', backend=self.backend, executor=executor) @@ -189,8 +188,7 @@ class ParallelEngineWithEventletTest(SuspendFlowTest, def _make_engine(self, flow, flow_detail=None, executor=None): if executor is None: executor = eu.GreenExecutor() - engine_conf = dict(engine='parallel') return taskflow.engines.load(flow, flow_detail=flow_detail, - engine_conf=engine_conf, + engine='parallel', backend=self.backend, executor=executor)