Deprecate engine_conf and prefer engine instead
To avoid having one set of options coming from `engine_conf` and another set of options coming from `kwargs` and another set coming from `engine_conf` if it is a URI just start to shift toward `engine_conf` being deprecated and `engine` being a string type only (or a URI with additional query parameters) and having any additional **kwargs that are provided just get merged into the final engine options. This adds a new helper function that handles all these various options and adds in a keyword argument `engine` that will be shifted to in a future version (in that future version we can also then remove the `engine_conf` and just stick to a smaller set of option mechanisms). It also adjusts all examples to use this new and more easier to understand format and adjusts tests, conductor interface to use this new more easily understandable style of getting an engine. Change-Id: Ic7617057338e0c63775cf38a24643cff6e454950
This commit is contained in:
@@ -118,9 +118,9 @@ might look like::
|
||||
|
||||
...
|
||||
flow = make_flow()
|
||||
engine = engines.load(flow, engine_conf=my_conf,
|
||||
backend=my_persistence_conf)
|
||||
engine.run
|
||||
eng = engines.load(flow, engine='serial', backend=my_persistence_conf)
|
||||
eng.run()
|
||||
...
|
||||
|
||||
|
||||
.. automodule:: taskflow.engines.helpers
|
||||
@@ -129,11 +129,8 @@ Usage
|
||||
=====
|
||||
|
||||
To select which engine to use and pass parameters to an engine you should use
|
||||
the ``engine_conf`` parameter any helper factory function accepts. It may be:
|
||||
|
||||
* A string, naming the engine type.
|
||||
* A dictionary, naming engine type with key ``'engine'`` and possibly
|
||||
type-specific engine configuration parameters.
|
||||
the ``engine`` parameter any engine helper function accepts and for any engine
|
||||
specific options use the ``kwargs`` parameter.
|
||||
|
||||
Types
|
||||
=====
|
||||
|
||||
@@ -273,32 +273,26 @@ For complete parameters and object usage please see
|
||||
|
||||
.. code:: python
|
||||
|
||||
engine_conf = {
|
||||
'engine': 'worker-based',
|
||||
'url': 'amqp://guest:guest@localhost:5672//',
|
||||
'exchange': 'test-exchange',
|
||||
'topics': ['topic1', 'topic2'],
|
||||
}
|
||||
flow = lf.Flow('simple-linear').add(...)
|
||||
eng = taskflow.engines.load(flow, engine_conf=engine_conf)
|
||||
eng = taskflow.engines.load(flow, engine='worker-based',
|
||||
url='amqp://guest:guest@localhost:5672//',
|
||||
exchange='test-exchange',
|
||||
topics=['topic1', 'topic2'])
|
||||
eng.run()
|
||||
|
||||
**Example with filesystem transport:**
|
||||
|
||||
.. code:: python
|
||||
|
||||
engine_conf = {
|
||||
'engine': 'worker-based',
|
||||
'exchange': 'test-exchange',
|
||||
'topics': ['topic1', 'topic2'],
|
||||
'transport': 'filesystem',
|
||||
'transport_options': {
|
||||
'data_folder_in': '/tmp/test',
|
||||
'data_folder_out': '/tmp/test',
|
||||
},
|
||||
}
|
||||
flow = lf.Flow('simple-linear').add(...)
|
||||
eng = taskflow.engines.load(flow, engine_conf=engine_conf)
|
||||
eng = taskflow.engines.load(flow, engine='worker-based',
|
||||
exchange='test-exchange',
|
||||
topics=['topic1', 'topic2'],
|
||||
transport='filesystem',
|
||||
transport_options={
|
||||
'data_folder_in': '/tmp/in',
|
||||
'data_folder_out': '/tmp/out',
|
||||
})
|
||||
eng.run()
|
||||
|
||||
Additional supported keyword arguments:
|
||||
|
||||
@@ -17,7 +17,7 @@ import threading
|
||||
|
||||
import six
|
||||
|
||||
import taskflow.engines
|
||||
from taskflow import engines
|
||||
from taskflow import exceptions as excp
|
||||
from taskflow.utils import lock_utils
|
||||
|
||||
@@ -34,10 +34,15 @@ class Conductor(object):
|
||||
period of time will finish up the prior failed conductors work.
|
||||
"""
|
||||
|
||||
def __init__(self, name, jobboard, engine_conf, persistence):
|
||||
def __init__(self, name, jobboard, persistence,
|
||||
engine=None, engine_options=None):
|
||||
self._name = name
|
||||
self._jobboard = jobboard
|
||||
self._engine_conf = engine_conf
|
||||
self._engine = engine
|
||||
if not engine_options:
|
||||
self._engine_options = {}
|
||||
else:
|
||||
self._engine_options = engine_options.copy()
|
||||
self._persistence = persistence
|
||||
self._lock = threading.RLock()
|
||||
|
||||
@@ -83,10 +88,10 @@ class Conductor(object):
|
||||
store = dict(job.details["store"])
|
||||
else:
|
||||
store = {}
|
||||
return taskflow.engines.load_from_detail(flow_detail,
|
||||
store=store,
|
||||
engine_conf=self._engine_conf,
|
||||
backend=self._persistence)
|
||||
return engines.load_from_detail(flow_detail, store=store,
|
||||
engine=self._engine,
|
||||
backend=self._persistence,
|
||||
**self._engine_options)
|
||||
|
||||
@lock_utils.locked
|
||||
def connect(self):
|
||||
|
||||
@@ -51,11 +51,11 @@ class SingleThreadedConductor(base.Conductor):
|
||||
upon the jobboard capabilities to automatically abandon these jobs.
|
||||
"""
|
||||
|
||||
def __init__(self, name, jobboard, engine_conf, persistence,
|
||||
wait_timeout=None):
|
||||
super(SingleThreadedConductor, self).__init__(name, jobboard,
|
||||
engine_conf,
|
||||
persistence)
|
||||
def __init__(self, name, jobboard, persistence,
|
||||
engine=None, engine_options=None, wait_timeout=None):
|
||||
super(SingleThreadedConductor, self).__init__(
|
||||
name, jobboard, persistence,
|
||||
engine=engine, engine_options=engine_options)
|
||||
if wait_timeout is None:
|
||||
wait_timeout = WAIT_TIMEOUT
|
||||
if isinstance(wait_timeout, (int, float) + six.string_types):
|
||||
|
||||
@@ -59,8 +59,8 @@ class ActionEngine(base.EngineBase):
|
||||
_compiler_factory = compiler.PatternCompiler
|
||||
_task_executor_factory = executor.SerialTaskExecutor
|
||||
|
||||
def __init__(self, flow, flow_detail, backend, conf):
|
||||
super(ActionEngine, self).__init__(flow, flow_detail, backend, conf)
|
||||
def __init__(self, flow, flow_detail, backend, options):
|
||||
super(ActionEngine, self).__init__(flow, flow_detail, backend, options)
|
||||
self._runtime = None
|
||||
self._compiled = False
|
||||
self._compilation = None
|
||||
@@ -230,12 +230,6 @@ class ParallelActionEngine(ActionEngine):
|
||||
_storage_factory = atom_storage.MultiThreadedStorage
|
||||
|
||||
def _task_executor_factory(self):
|
||||
return executor.ParallelTaskExecutor(executor=self._executor,
|
||||
max_workers=self._max_workers)
|
||||
|
||||
def __init__(self, flow, flow_detail, backend, conf,
|
||||
executor=None, max_workers=None):
|
||||
super(ParallelActionEngine, self).__init__(flow, flow_detail,
|
||||
backend, conf)
|
||||
self._executor = executor
|
||||
self._max_workers = max_workers
|
||||
return executor.ParallelTaskExecutor(
|
||||
executor=self._options.get('executor'),
|
||||
max_workers=self._options.get('max_workers'))
|
||||
|
||||
@@ -32,17 +32,22 @@ class EngineBase(object):
|
||||
occur related to the tasks the engine contains.
|
||||
"""
|
||||
|
||||
def __init__(self, flow, flow_detail, backend, conf):
|
||||
def __init__(self, flow, flow_detail, backend, options):
|
||||
self._flow = flow
|
||||
self._flow_detail = flow_detail
|
||||
self._backend = backend
|
||||
if not conf:
|
||||
self._conf = {}
|
||||
if not options:
|
||||
self._options = {}
|
||||
else:
|
||||
self._conf = dict(conf)
|
||||
self._options = dict(options)
|
||||
self.notifier = misc.Notifier()
|
||||
self.task_notifier = misc.Notifier()
|
||||
|
||||
@property
|
||||
def options(self):
|
||||
"""The options that were passed to this engine on construction."""
|
||||
return self._options
|
||||
|
||||
@misc.cachedproperty
|
||||
def storage(self):
|
||||
"""The storage unit for this flow."""
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import warnings
|
||||
|
||||
from oslo.utils import importutils
|
||||
import six
|
||||
@@ -30,6 +31,40 @@ from taskflow.utils import reflection
|
||||
# NOTE(imelnikov): this is the entrypoint namespace, not the module namespace.
|
||||
ENGINES_NAMESPACE = 'taskflow.engines'
|
||||
|
||||
# The default entrypoint engine type looked for when it is not provided.
|
||||
ENGINE_DEFAULT = 'default'
|
||||
|
||||
|
||||
def _extract_engine(**kwargs):
|
||||
"""Extracts the engine kind and any associated options."""
|
||||
options = {}
|
||||
kind = kwargs.pop('engine', None)
|
||||
engine_conf = kwargs.pop('engine_conf', None)
|
||||
if engine_conf is not None:
|
||||
warnings.warn("Using the 'engine_conf' argument is"
|
||||
" deprecated and will be removed in a future version,"
|
||||
" please use the 'engine' argument instead.",
|
||||
DeprecationWarning)
|
||||
if isinstance(engine_conf, six.string_types):
|
||||
kind = engine_conf
|
||||
else:
|
||||
options.update(engine_conf)
|
||||
kind = options.pop('engine', None)
|
||||
if not kind:
|
||||
kind = ENGINE_DEFAULT
|
||||
# See if it's a URI and if so, extract any further options...
|
||||
try:
|
||||
pieces = misc.parse_uri(kind)
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
else:
|
||||
kind = pieces['scheme']
|
||||
options = misc.merge_uri(pieces, options.copy())
|
||||
# Merge in any leftover **kwargs into the options, this makes it so that
|
||||
# the provided **kwargs override any URI or engine_conf specific options.
|
||||
options.update(kwargs)
|
||||
return (kind, options)
|
||||
|
||||
|
||||
def _fetch_factory(factory_name):
|
||||
try:
|
||||
@@ -56,49 +91,43 @@ def _fetch_validate_factory(flow_factory):
|
||||
|
||||
|
||||
def load(flow, store=None, flow_detail=None, book=None,
|
||||
engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE,
|
||||
**kwargs):
|
||||
engine_conf=None, backend=None,
|
||||
namespace=ENGINES_NAMESPACE, engine=ENGINE_DEFAULT, **kwargs):
|
||||
"""Load a flow into an engine.
|
||||
|
||||
This function creates and prepares engine to run the
|
||||
flow. All that is left is to run the engine with 'run()' method.
|
||||
This function creates and prepares an engine to run the provided flow. All
|
||||
that is left after this returns is to run the engine with the
|
||||
engines ``run()`` method.
|
||||
|
||||
Which engine to load is specified in 'engine_conf' parameter. It
|
||||
can be a string that names engine type or a dictionary which holds
|
||||
engine type (with 'engine' key) and additional engine-specific
|
||||
configuration.
|
||||
Which engine to load is specified via the ``engine`` parameter. It
|
||||
can be a string that names the engine type to use, or a string that
|
||||
is a URI with a scheme that names the engine type to use and further
|
||||
options contained in the URI's host, port, and query parameters...
|
||||
|
||||
Which storage backend to use is defined by backend parameter. It
|
||||
Which storage backend to use is defined by the backend parameter. It
|
||||
can be backend itself, or a dictionary that is passed to
|
||||
taskflow.persistence.backends.fetch to obtain backend.
|
||||
``taskflow.persistence.backends.fetch()`` to obtain a viable backend.
|
||||
|
||||
:param flow: flow to load
|
||||
:param store: dict -- data to put to storage to satisfy flow requirements
|
||||
:param flow_detail: FlowDetail that holds the state of the flow (if one is
|
||||
not provided then one will be created for you in the provided backend)
|
||||
:param book: LogBook to create flow detail in if flow_detail is None
|
||||
:param engine_conf: engine type and configuration configuration
|
||||
:param backend: storage backend to use or configuration
|
||||
:param namespace: driver namespace for stevedore (default is fine
|
||||
if you don't know what is it)
|
||||
:param engine_conf: engine type or URI and options (**deprecated**)
|
||||
:param backend: storage backend to use or configuration that defines it
|
||||
:param namespace: driver namespace for stevedore (or empty for default)
|
||||
:param engine: string engine type or URI string with scheme that contains
|
||||
the engine type and any URI specific components that will
|
||||
become part of the engine options.
|
||||
:param kwargs: arbitrary keyword arguments passed as options (merged with
|
||||
any extracted ``engine`` and ``engine_conf`` options),
|
||||
typically used for any engine specific options that do not
|
||||
fit as any of the existing arguments.
|
||||
:returns: engine
|
||||
"""
|
||||
|
||||
if engine_conf is None:
|
||||
engine_conf = {'engine': 'default'}
|
||||
|
||||
# NOTE(imelnikov): this allows simpler syntax.
|
||||
if isinstance(engine_conf, six.string_types):
|
||||
engine_conf = {'engine': engine_conf}
|
||||
|
||||
engine_name = engine_conf['engine']
|
||||
try:
|
||||
pieces = misc.parse_uri(engine_name)
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
else:
|
||||
engine_name = pieces['scheme']
|
||||
engine_conf = misc.merge_uri(pieces, engine_conf.copy())
|
||||
kind, options = _extract_engine(engine_conf=engine_conf,
|
||||
engine=engine, **kwargs)
|
||||
|
||||
if isinstance(backend, dict):
|
||||
backend = p_backends.fetch(backend)
|
||||
@@ -109,13 +138,12 @@ def load(flow, store=None, flow_detail=None, book=None,
|
||||
|
||||
try:
|
||||
mgr = stevedore.driver.DriverManager(
|
||||
namespace, engine_name,
|
||||
namespace, kind,
|
||||
invoke_on_load=True,
|
||||
invoke_args=(flow, flow_detail, backend, engine_conf),
|
||||
invoke_kwds=kwargs)
|
||||
invoke_args=(flow, flow_detail, backend, options))
|
||||
engine = mgr.driver
|
||||
except RuntimeError as e:
|
||||
raise exc.NotFound("Could not find engine %s" % (engine_name), e)
|
||||
raise exc.NotFound("Could not find engine '%s'" % (kind), e)
|
||||
else:
|
||||
if store:
|
||||
engine.storage.inject(store)
|
||||
@@ -123,35 +151,20 @@ def load(flow, store=None, flow_detail=None, book=None,
|
||||
|
||||
|
||||
def run(flow, store=None, flow_detail=None, book=None,
|
||||
engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE, **kwargs):
|
||||
engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE,
|
||||
engine=ENGINE_DEFAULT, **kwargs):
|
||||
"""Run the flow.
|
||||
|
||||
This function load the flow into engine (with 'load' function)
|
||||
and runs the engine.
|
||||
This function loads the flow into an engine (with the :func:`load() <load>`
|
||||
function) and runs the engine.
|
||||
|
||||
Which engine to load is specified in 'engine_conf' parameter. It
|
||||
can be a string that names engine type or a dictionary which holds
|
||||
engine type (with 'engine' key) and additional engine-specific
|
||||
configuration.
|
||||
The arguments are interpreted as for :func:`load() <load>`.
|
||||
|
||||
Which storage backend to use is defined by backend parameter. It
|
||||
can be backend itself, or a dictionary that is passed to
|
||||
taskflow.persistence.backends.fetch to obtain backend.
|
||||
|
||||
:param flow: flow to run
|
||||
:param store: dict -- data to put to storage to satisfy flow requirements
|
||||
:param flow_detail: FlowDetail that holds the state of the flow (if one is
|
||||
not provided then one will be created for you in the provided backend)
|
||||
:param book: LogBook to create flow detail in if flow_detail is None
|
||||
:param engine_conf: engine type and configuration configuration
|
||||
:param backend: storage backend to use or configuration
|
||||
:param namespace: driver namespace for stevedore (default is fine
|
||||
if you don't know what is it)
|
||||
:returns: dictionary of all named task results (see Storage.fetch_all)
|
||||
:returns: dictionary of all named results (see ``storage.fetch_all()``)
|
||||
"""
|
||||
engine = load(flow, store=store, flow_detail=flow_detail, book=book,
|
||||
engine_conf=engine_conf, backend=backend,
|
||||
namespace=namespace, **kwargs)
|
||||
namespace=namespace, engine=engine, **kwargs)
|
||||
engine.run()
|
||||
return engine.storage.fetch_all()
|
||||
|
||||
@@ -196,23 +209,21 @@ def save_factory_details(flow_detail,
|
||||
|
||||
def load_from_factory(flow_factory, factory_args=None, factory_kwargs=None,
|
||||
store=None, book=None, engine_conf=None, backend=None,
|
||||
namespace=ENGINES_NAMESPACE, **kwargs):
|
||||
namespace=ENGINES_NAMESPACE, engine=ENGINE_DEFAULT,
|
||||
**kwargs):
|
||||
"""Loads a flow from a factory function into an engine.
|
||||
|
||||
Gets flow factory function (or name of it) and creates flow with
|
||||
it. Then, flow is loaded into engine with load(), and factory
|
||||
function fully qualified name is saved to flow metadata so that
|
||||
it can be later resumed with resume.
|
||||
it. Then, the flow is loaded into an engine with the :func:`load() <load>`
|
||||
function, and the factory function fully qualified name is saved to flow
|
||||
metadata so that it can be later resumed.
|
||||
|
||||
:param flow_factory: function or string: function that creates the flow
|
||||
:param factory_args: list or tuple of factory positional arguments
|
||||
:param factory_kwargs: dict of factory keyword arguments
|
||||
:param store: dict -- data to put to storage to satisfy flow requirements
|
||||
:param book: LogBook to create flow detail in
|
||||
:param engine_conf: engine type and configuration configuration
|
||||
:param backend: storage backend to use or configuration
|
||||
:param namespace: driver namespace for stevedore (default is fine
|
||||
if you don't know what is it)
|
||||
|
||||
Further arguments are interpreted as for :func:`load() <load>`.
|
||||
|
||||
:returns: engine
|
||||
"""
|
||||
|
||||
@@ -230,7 +241,7 @@ def load_from_factory(flow_factory, factory_args=None, factory_kwargs=None,
|
||||
backend=backend)
|
||||
return load(flow=flow, store=store, flow_detail=flow_detail, book=book,
|
||||
engine_conf=engine_conf, backend=backend, namespace=namespace,
|
||||
**kwargs)
|
||||
engine=engine, **kwargs)
|
||||
|
||||
|
||||
def flow_from_detail(flow_detail):
|
||||
@@ -261,21 +272,21 @@ def flow_from_detail(flow_detail):
|
||||
|
||||
|
||||
def load_from_detail(flow_detail, store=None, engine_conf=None, backend=None,
|
||||
namespace=ENGINES_NAMESPACE, **kwargs):
|
||||
namespace=ENGINES_NAMESPACE, engine=ENGINE_DEFAULT,
|
||||
**kwargs):
|
||||
"""Reloads an engine previously saved.
|
||||
|
||||
This reloads the flow using the flow_from_detail() function and then calls
|
||||
into the load() function to create an engine from that flow.
|
||||
This reloads the flow using the
|
||||
:func:`flow_from_detail() <flow_from_detail>` function and then calls
|
||||
into the :func:`load() <load>` function to create an engine from that flow.
|
||||
|
||||
:param flow_detail: FlowDetail that holds state of the flow to load
|
||||
:param store: dict -- data to put to storage to satisfy flow requirements
|
||||
:param engine_conf: engine type and configuration configuration
|
||||
:param backend: storage backend to use or configuration
|
||||
:param namespace: driver namespace for stevedore (default is fine
|
||||
if you don't know what is it)
|
||||
|
||||
Further arguments are interpreted as for :func:`load() <load>`.
|
||||
|
||||
:returns: engine
|
||||
"""
|
||||
flow = flow_from_detail(flow_detail)
|
||||
return load(flow, flow_detail=flow_detail,
|
||||
store=store, engine_conf=engine_conf, backend=backend,
|
||||
namespace=namespace, **kwargs)
|
||||
namespace=namespace, engine=engine, **kwargs)
|
||||
|
||||
@@ -23,7 +23,7 @@ from taskflow import storage as t_storage
|
||||
class WorkerBasedActionEngine(engine.ActionEngine):
|
||||
"""Worker based action engine.
|
||||
|
||||
Specific backend configuration:
|
||||
Specific backend options:
|
||||
|
||||
:param exchange: broker exchange exchange name in which executor / worker
|
||||
communication is performed
|
||||
@@ -45,19 +45,15 @@ class WorkerBasedActionEngine(engine.ActionEngine):
|
||||
_storage_factory = t_storage.SingleThreadedStorage
|
||||
|
||||
def _task_executor_factory(self):
|
||||
if self._executor is not None:
|
||||
return self._executor
|
||||
try:
|
||||
return self._options['executor']
|
||||
except KeyError:
|
||||
return executor.WorkerTaskExecutor(
|
||||
uuid=self._flow_detail.uuid,
|
||||
url=self._conf.get('url'),
|
||||
exchange=self._conf.get('exchange', 'default'),
|
||||
topics=self._conf.get('topics', []),
|
||||
transport=self._conf.get('transport'),
|
||||
transport_options=self._conf.get('transport_options'),
|
||||
transition_timeout=self._conf.get('transition_timeout',
|
||||
url=self._options.get('url'),
|
||||
exchange=self._options.get('exchange', 'default'),
|
||||
topics=self._options.get('topics', []),
|
||||
transport=self._options.get('transport'),
|
||||
transport_options=self._options.get('transport_options'),
|
||||
transition_timeout=self._options.get('transition_timeout',
|
||||
pr.REQUEST_TIMEOUT))
|
||||
|
||||
def __init__(self, flow, flow_detail, backend, conf, **kwargs):
|
||||
super(WorkerBasedActionEngine, self).__init__(
|
||||
flow, flow_detail, backend, conf)
|
||||
self._executor = kwargs.get('executor')
|
||||
|
||||
@@ -93,5 +93,5 @@ flow = lf.Flow('root').add(
|
||||
# The result here will be all results (from all tasks) which is stored in an
|
||||
# in-memory storage location that backs this engine since it is not configured
|
||||
# with persistence storage.
|
||||
result = taskflow.engines.run(flow, engine_conf='parallel')
|
||||
result = taskflow.engines.run(flow, engine='parallel')
|
||||
print(result)
|
||||
|
||||
@@ -64,13 +64,9 @@ VOLUME_COUNT = 5
|
||||
# time difference that this causes.
|
||||
SERIAL = False
|
||||
if SERIAL:
|
||||
engine_conf = {
|
||||
'engine': 'serial',
|
||||
}
|
||||
engine = 'serial'
|
||||
else:
|
||||
engine_conf = {
|
||||
'engine': 'parallel',
|
||||
}
|
||||
engine = 'parallel'
|
||||
|
||||
|
||||
class VolumeCreator(task.Task):
|
||||
@@ -106,7 +102,7 @@ for i in range(0, VOLUME_COUNT):
|
||||
|
||||
# Show how much time the overall engine loading and running takes.
|
||||
with show_time(name=flow.name.title()):
|
||||
eng = engines.load(flow, engine_conf=engine_conf)
|
||||
eng = engines.load(flow, engine=engine)
|
||||
# This context manager automatically adds (and automatically removes) a
|
||||
# helpful set of state transition notification printing helper utilities
|
||||
# that show you exactly what transitions the engine is going through
|
||||
|
||||
@@ -74,7 +74,7 @@ class Bye(task.Task):
|
||||
|
||||
def return_from_flow(pool):
|
||||
wf = lf.Flow("root").add(Hi("hi"), Bye("bye"))
|
||||
eng = taskflow.engines.load(wf, engine_conf='serial')
|
||||
eng = taskflow.engines.load(wf, engine='serial')
|
||||
f = futures.Future()
|
||||
watcher = PokeFutureListener(eng, f, 'hi')
|
||||
watcher.register()
|
||||
|
||||
@@ -170,7 +170,7 @@ flow.add(sub_flow)
|
||||
store = {
|
||||
'request': misc.AttrDict(user="bob", id="1.35"),
|
||||
}
|
||||
eng = engines.load(flow, engine_conf='serial', store=store)
|
||||
eng = engines.load(flow, engine='serial', store=store)
|
||||
|
||||
# This context manager automatically adds (and automatically removes) a
|
||||
# helpful set of state transition notification printing helper utilities
|
||||
|
||||
@@ -81,11 +81,11 @@ store = {
|
||||
}
|
||||
|
||||
result = taskflow.engines.run(
|
||||
flow, engine_conf='serial', store=store)
|
||||
flow, engine='serial', store=store)
|
||||
|
||||
print("Single threaded engine result %s" % result)
|
||||
|
||||
result = taskflow.engines.run(
|
||||
flow, engine_conf='parallel', store=store)
|
||||
flow, engine='parallel', store=store)
|
||||
|
||||
print("Multi threaded engine result %s" % result)
|
||||
|
||||
@@ -91,20 +91,15 @@ else:
|
||||
blowup = True
|
||||
|
||||
with eu.get_backend(backend_uri) as backend:
|
||||
# Now we can run.
|
||||
engine_config = {
|
||||
'backend': backend,
|
||||
'engine_conf': 'serial',
|
||||
'book': logbook.LogBook("my-test"),
|
||||
}
|
||||
|
||||
# Make a flow that will blowup if the file doesn't exist previously, if it
|
||||
# did exist, assume we won't blowup (and therefore this shows the undo
|
||||
# and redo that a flow will go through).
|
||||
book = logbook.LogBook("my-test")
|
||||
flow = make_flow(blowup=blowup)
|
||||
eu.print_wrapped("Running")
|
||||
try:
|
||||
eng = engines.load(flow, **engine_config)
|
||||
eng = engines.load(flow, engine='serial',
|
||||
backend=backend, book=book)
|
||||
eng.run()
|
||||
if not blowup:
|
||||
eu.rm_path(persist_path)
|
||||
@@ -115,4 +110,4 @@ with eu.get_backend(backend_uri) as backend:
|
||||
traceback.print_exc(file=sys.stdout)
|
||||
|
||||
eu.print_wrapped("Book contents")
|
||||
print(p_utils.pformat(engine_config['book']))
|
||||
print(p_utils.pformat(book))
|
||||
|
||||
@@ -235,11 +235,9 @@ with eu.get_backend() as backend:
|
||||
flow_id = None
|
||||
|
||||
# Set up how we want our engine to run, serial, parallel...
|
||||
engine_conf = {
|
||||
'engine': 'parallel',
|
||||
}
|
||||
executor = None
|
||||
if e_utils.EVENTLET_AVAILABLE:
|
||||
engine_conf['executor'] = e_utils.GreenExecutor(5)
|
||||
executor = e_utils.GreenExecutor(5)
|
||||
|
||||
# Create/fetch a logbook that will track the workflows work.
|
||||
book = None
|
||||
@@ -255,15 +253,15 @@ with eu.get_backend() as backend:
|
||||
book = p_utils.temporary_log_book(backend)
|
||||
engine = engines.load_from_factory(create_flow,
|
||||
backend=backend, book=book,
|
||||
engine_conf=engine_conf)
|
||||
engine='parallel',
|
||||
executor=executor)
|
||||
print("!! Your tracking id is: '%s+%s'" % (book.uuid,
|
||||
engine.storage.flow_uuid))
|
||||
print("!! Please submit this on later runs for tracking purposes")
|
||||
else:
|
||||
# Attempt to load from a previously partially completed flow.
|
||||
engine = engines.load_from_detail(flow_detail,
|
||||
backend=backend,
|
||||
engine_conf=engine_conf)
|
||||
engine = engines.load_from_detail(flow_detail, backend=backend,
|
||||
engine='parallel', executor=executor)
|
||||
|
||||
# Make me my vm please!
|
||||
eu.print_wrapped('Running')
|
||||
|
||||
@@ -143,13 +143,9 @@ with example_utils.get_backend() as backend:
|
||||
flow_detail = find_flow_detail(backend, book_id, flow_id)
|
||||
|
||||
# Load and run.
|
||||
engine_conf = {
|
||||
'engine': 'serial',
|
||||
}
|
||||
engine = engines.load(flow,
|
||||
flow_detail=flow_detail,
|
||||
backend=backend,
|
||||
engine_conf=engine_conf)
|
||||
backend=backend, engine='serial')
|
||||
engine.run()
|
||||
|
||||
# How to use.
|
||||
|
||||
@@ -69,19 +69,16 @@ WORKER_CONF = {
|
||||
'taskflow.tests.utils:TaskMultiArgOneReturn'
|
||||
],
|
||||
}
|
||||
ENGINE_CONF = {
|
||||
'engine': 'worker-based',
|
||||
}
|
||||
|
||||
|
||||
def run(engine_conf):
|
||||
def run(engine_options):
|
||||
flow = lf.Flow('simple-linear').add(
|
||||
utils.TaskOneArgOneReturn(provides='result1'),
|
||||
utils.TaskMultiArgOneReturn(provides='result2')
|
||||
)
|
||||
eng = engines.load(flow,
|
||||
store=dict(x=111, y=222, z=333),
|
||||
engine_conf=engine_conf)
|
||||
engine='worker-based', **engine_options)
|
||||
eng.run()
|
||||
return eng.storage.fetch_all()
|
||||
|
||||
@@ -115,8 +112,7 @@ if __name__ == "__main__":
|
||||
})
|
||||
worker_conf = dict(WORKER_CONF)
|
||||
worker_conf.update(shared_conf)
|
||||
engine_conf = dict(ENGINE_CONF)
|
||||
engine_conf.update(shared_conf)
|
||||
engine_options = dict(shared_conf)
|
||||
workers = []
|
||||
worker_topics = []
|
||||
|
||||
@@ -135,8 +131,8 @@ if __name__ == "__main__":
|
||||
|
||||
# Now use those workers to do something.
|
||||
print('Executing some work.')
|
||||
engine_conf['topics'] = worker_topics
|
||||
result = run(engine_conf)
|
||||
engine_options['topics'] = worker_topics
|
||||
result = run(engine_options)
|
||||
print('Execution finished.')
|
||||
# This is done so that the test examples can work correctly
|
||||
# even when the keys change order (which will happen in various
|
||||
|
||||
@@ -93,7 +93,7 @@ def run(**store):
|
||||
try:
|
||||
with utils.wrap_all_failures():
|
||||
taskflow.engines.run(flow, store=store,
|
||||
engine_conf='parallel')
|
||||
engine='parallel')
|
||||
except exceptions.WrappedFailure as ex:
|
||||
unknown_failures = []
|
||||
for failure in ex:
|
||||
|
||||
@@ -63,11 +63,8 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
|
||||
board = impl_zookeeper.ZookeeperJobBoard(name, {},
|
||||
client=client,
|
||||
persistence=persistence)
|
||||
engine_conf = {
|
||||
'engine': 'default',
|
||||
}
|
||||
conductor = stc.SingleThreadedConductor(name, board, engine_conf,
|
||||
persistence, wait_timeout)
|
||||
conductor = stc.SingleThreadedConductor(name, board, persistence,
|
||||
wait_timeout=wait_timeout)
|
||||
return misc.AttrDict(board=board,
|
||||
client=client,
|
||||
persistence=persistence,
|
||||
|
||||
@@ -155,15 +155,14 @@ class SingleThreadedEngineTest(ArgumentsPassingTest,
|
||||
def _make_engine(self, flow, flow_detail=None):
|
||||
return taskflow.engines.load(flow,
|
||||
flow_detail=flow_detail,
|
||||
engine_conf='serial',
|
||||
engine='serial',
|
||||
backend=self.backend)
|
||||
|
||||
|
||||
class MultiThreadedEngineTest(ArgumentsPassingTest,
|
||||
test.TestCase):
|
||||
def _make_engine(self, flow, flow_detail=None, executor=None):
|
||||
engine_conf = dict(engine='parallel')
|
||||
return taskflow.engines.load(flow, flow_detail=flow_detail,
|
||||
engine_conf=engine_conf,
|
||||
engine='parallel',
|
||||
backend=self.backend,
|
||||
executor=executor)
|
||||
|
||||
@@ -24,17 +24,30 @@ from taskflow.utils import persistence_utils as p_utils
|
||||
|
||||
|
||||
class EngineLoadingTestCase(test.TestCase):
|
||||
def test_default_load(self):
|
||||
def _make_dummy_flow(self):
|
||||
f = linear_flow.Flow('test')
|
||||
f.add(test_utils.TaskOneReturn("run-1"))
|
||||
return f
|
||||
|
||||
def test_default_load(self):
|
||||
f = self._make_dummy_flow()
|
||||
e = taskflow.engines.load(f)
|
||||
self.assertIsNotNone(e)
|
||||
|
||||
def test_unknown_load(self):
|
||||
f = linear_flow.Flow('test')
|
||||
f.add(test_utils.TaskOneReturn("run-1"))
|
||||
f = self._make_dummy_flow()
|
||||
self.assertRaises(exc.NotFound, taskflow.engines.load, f,
|
||||
engine_conf='not_really_any_engine')
|
||||
engine='not_really_any_engine')
|
||||
|
||||
def test_options_empty(self):
|
||||
f = self._make_dummy_flow()
|
||||
e = taskflow.engines.load(f)
|
||||
self.assertEqual({}, e.options)
|
||||
|
||||
def test_options_passthrough(self):
|
||||
f = self._make_dummy_flow()
|
||||
e = taskflow.engines.load(f, pass_1=1, pass_2=2)
|
||||
self.assertEqual({'pass_1': 1, 'pass_2': 2}, e.options)
|
||||
|
||||
|
||||
class FlowFromDetailTestCase(test.TestCase):
|
||||
|
||||
@@ -550,7 +550,7 @@ class SingleThreadedEngineTest(EngineTaskTest,
|
||||
def _make_engine(self, flow, flow_detail=None):
|
||||
return taskflow.engines.load(flow,
|
||||
flow_detail=flow_detail,
|
||||
engine_conf='serial',
|
||||
engine='serial',
|
||||
backend=self.backend)
|
||||
|
||||
def test_correct_load(self):
|
||||
@@ -570,16 +570,14 @@ class MultiThreadedEngineTest(EngineTaskTest,
|
||||
EngineCheckingTaskTest,
|
||||
test.TestCase):
|
||||
def _make_engine(self, flow, flow_detail=None, executor=None):
|
||||
engine_conf = dict(engine='parallel')
|
||||
return taskflow.engines.load(flow, flow_detail=flow_detail,
|
||||
engine_conf=engine_conf,
|
||||
backend=self.backend,
|
||||
executor=executor)
|
||||
executor=executor,
|
||||
engine='parallel')
|
||||
|
||||
def test_correct_load(self):
|
||||
engine = self._make_engine(utils.TaskNoRequiresNoReturns)
|
||||
self.assertIsInstance(engine, eng.ParallelActionEngine)
|
||||
self.assertIs(engine._executor, None)
|
||||
|
||||
def test_using_common_executor(self):
|
||||
flow = utils.TaskNoRequiresNoReturns(name='task1')
|
||||
@@ -587,7 +585,7 @@ class MultiThreadedEngineTest(EngineTaskTest,
|
||||
try:
|
||||
e1 = self._make_engine(flow, executor=executor)
|
||||
e2 = self._make_engine(flow, executor=executor)
|
||||
self.assertIs(e1._executor, e2._executor)
|
||||
self.assertIs(e1.options['executor'], e2.options['executor'])
|
||||
finally:
|
||||
executor.shutdown(wait=True)
|
||||
|
||||
@@ -604,11 +602,9 @@ class ParallelEngineWithEventletTest(EngineTaskTest,
|
||||
def _make_engine(self, flow, flow_detail=None, executor=None):
|
||||
if executor is None:
|
||||
executor = eu.GreenExecutor()
|
||||
engine_conf = dict(engine='parallel',
|
||||
executor=executor)
|
||||
return taskflow.engines.load(flow, flow_detail=flow_detail,
|
||||
engine_conf=engine_conf,
|
||||
backend=self.backend)
|
||||
backend=self.backend, engine='parallel',
|
||||
executor=executor)
|
||||
|
||||
|
||||
class WorkerBasedEngineTest(EngineTaskTest,
|
||||
@@ -647,15 +643,12 @@ class WorkerBasedEngineTest(EngineTaskTest,
|
||||
super(WorkerBasedEngineTest, self).tearDown()
|
||||
|
||||
def _make_engine(self, flow, flow_detail=None):
|
||||
engine_conf = {
|
||||
'engine': 'worker-based',
|
||||
'exchange': self.exchange,
|
||||
'topics': [self.topic],
|
||||
'transport': self.transport,
|
||||
}
|
||||
return taskflow.engines.load(flow, flow_detail=flow_detail,
|
||||
engine_conf=engine_conf,
|
||||
backend=self.backend)
|
||||
backend=self.backend,
|
||||
engine='worker-based',
|
||||
exchange=self.exchange,
|
||||
topics=[self.topic],
|
||||
transport=self.transport)
|
||||
|
||||
def test_correct_load(self):
|
||||
engine = self._make_engine(utils.TaskNoRequiresNoReturns)
|
||||
|
||||
@@ -758,7 +758,7 @@ class SingleThreadedEngineTest(RetryTest,
|
||||
def _make_engine(self, flow, flow_detail=None):
|
||||
return taskflow.engines.load(flow,
|
||||
flow_detail=flow_detail,
|
||||
engine_conf='serial',
|
||||
engine='serial',
|
||||
backend=self.backend)
|
||||
|
||||
|
||||
@@ -766,8 +766,7 @@ class MultiThreadedEngineTest(RetryTest,
|
||||
RetryParallelExecutionTest,
|
||||
test.TestCase):
|
||||
def _make_engine(self, flow, flow_detail=None, executor=None):
|
||||
engine_conf = dict(engine='parallel')
|
||||
return taskflow.engines.load(flow, flow_detail=flow_detail,
|
||||
engine_conf=engine_conf,
|
||||
engine='parallel',
|
||||
backend=self.backend,
|
||||
executor=executor)
|
||||
|
||||
@@ -168,16 +168,15 @@ class SingleThreadedEngineTest(SuspendFlowTest,
|
||||
def _make_engine(self, flow, flow_detail=None):
|
||||
return taskflow.engines.load(flow,
|
||||
flow_detail=flow_detail,
|
||||
engine_conf='serial',
|
||||
engine='serial',
|
||||
backend=self.backend)
|
||||
|
||||
|
||||
class MultiThreadedEngineTest(SuspendFlowTest,
|
||||
test.TestCase):
|
||||
def _make_engine(self, flow, flow_detail=None, executor=None):
|
||||
engine_conf = dict(engine='parallel')
|
||||
return taskflow.engines.load(flow, flow_detail=flow_detail,
|
||||
engine_conf=engine_conf,
|
||||
engine='parallel',
|
||||
backend=self.backend,
|
||||
executor=executor)
|
||||
|
||||
@@ -189,8 +188,7 @@ class ParallelEngineWithEventletTest(SuspendFlowTest,
|
||||
def _make_engine(self, flow, flow_detail=None, executor=None):
|
||||
if executor is None:
|
||||
executor = eu.GreenExecutor()
|
||||
engine_conf = dict(engine='parallel')
|
||||
return taskflow.engines.load(flow, flow_detail=flow_detail,
|
||||
engine_conf=engine_conf,
|
||||
engine='parallel',
|
||||
backend=self.backend,
|
||||
executor=executor)
|
||||
|
||||
Reference in New Issue
Block a user