Merge "Deprecate engine_conf and prefer engine instead"

Jenkins
2014-10-19 01:43:08 +00:00
committed by Gerrit Code Review
24 changed files with 209 additions and 227 deletions

View File

@@ -118,9 +118,9 @@ might look like::

     ...
     flow = make_flow()
-    engine = engines.load(flow, engine_conf=my_conf,
-                          backend=my_persistence_conf)
-    engine.run()
+    eng = engines.load(flow, engine='serial', backend=my_persistence_conf)
+    eng.run()
+    ...

 .. automodule:: taskflow.engines.helpers
@@ -129,11 +129,8 @@ Usage
 =====

 To select which engine to use and pass parameters to an engine you should use
-the ``engine_conf`` parameter any helper factory function accepts. It may be:
-
-* A string, naming the engine type.
-* A dictionary, naming engine type with key ``'engine'`` and possibly
-  type-specific engine configuration parameters.
+the ``engine`` parameter that any engine helper function accepts; for any
+engine-specific options use the ``kwargs`` parameter.

 Types
 =====
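For reference, the new calling convention looks like the following (a sketch;
``make_flow`` is a placeholder, and ``max_workers`` is one example of an
engine-specific option that the parallel engine reads from its options):

.. code:: python

    from taskflow import engines

    flow = make_flow()  # placeholder: build any flow here
    # Engine type is a plain string; engine-specific options are kwargs.
    eng = engines.load(flow, engine='parallel', max_workers=4)
    eng.run()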

View File

@@ -273,32 +273,26 @@ For complete parameters and object usage please see

 .. code:: python

-    engine_conf = {
-        'engine': 'worker-based',
-        'url': 'amqp://guest:guest@localhost:5672//',
-        'exchange': 'test-exchange',
-        'topics': ['topic1', 'topic2'],
-    }
     flow = lf.Flow('simple-linear').add(...)
-    eng = taskflow.engines.load(flow, engine_conf=engine_conf)
+    eng = taskflow.engines.load(flow, engine='worker-based',
+                                url='amqp://guest:guest@localhost:5672//',
+                                exchange='test-exchange',
+                                topics=['topic1', 'topic2'])
     eng.run()

 **Example with filesystem transport:**

 .. code:: python

-    engine_conf = {
-        'engine': 'worker-based',
-        'exchange': 'test-exchange',
-        'topics': ['topic1', 'topic2'],
-        'transport': 'filesystem',
-        'transport_options': {
-            'data_folder_in': '/tmp/test',
-            'data_folder_out': '/tmp/test',
-        },
-    }
     flow = lf.Flow('simple-linear').add(...)
-    eng = taskflow.engines.load(flow, engine_conf=engine_conf)
+    eng = taskflow.engines.load(flow, engine='worker-based',
+                                exchange='test-exchange',
+                                topics=['topic1', 'topic2'],
+                                transport='filesystem',
+                                transport_options={
+                                    'data_folder_in': '/tmp/in',
+                                    'data_folder_out': '/tmp/out',
+                                })
     eng.run()

 Additional supported keyword arguments:

View File

@@ -17,7 +17,7 @@ import threading

 import six

-import taskflow.engines
+from taskflow import engines
 from taskflow import exceptions as excp
 from taskflow.utils import lock_utils

@@ -34,10 +34,15 @@ class Conductor(object):
     period of time will finish up the prior failed conductors work.
     """

-    def __init__(self, name, jobboard, engine_conf, persistence):
+    def __init__(self, name, jobboard, persistence,
+                 engine=None, engine_options=None):
         self._name = name
         self._jobboard = jobboard
-        self._engine_conf = engine_conf
+        self._engine = engine
+        if not engine_options:
+            self._engine_options = {}
+        else:
+            self._engine_options = engine_options.copy()
         self._persistence = persistence
         self._lock = threading.RLock()

@@ -83,10 +88,10 @@ class Conductor(object):
             store = dict(job.details["store"])
         else:
             store = {}
-        return taskflow.engines.load_from_detail(flow_detail,
-                                                 store=store,
-                                                 engine_conf=self._engine_conf,
-                                                 backend=self._persistence)
+        return engines.load_from_detail(flow_detail, store=store,
+                                        engine=self._engine,
+                                        backend=self._persistence,
+                                        **self._engine_options)

     @lock_utils.locked
     def connect(self):

View File

@@ -51,11 +51,11 @@ class SingleThreadedConductor(base.Conductor):
     upon the jobboard capabilities to automatically abandon these jobs.
     """

-    def __init__(self, name, jobboard, engine_conf, persistence,
-                 wait_timeout=None):
-        super(SingleThreadedConductor, self).__init__(name, jobboard,
-                                                      engine_conf,
-                                                      persistence)
+    def __init__(self, name, jobboard, persistence,
+                 engine=None, engine_options=None, wait_timeout=None):
+        super(SingleThreadedConductor, self).__init__(
+            name, jobboard, persistence,
+            engine=engine, engine_options=engine_options)
         if wait_timeout is None:
             wait_timeout = WAIT_TIMEOUT
         if isinstance(wait_timeout, (int, float) + six.string_types):
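Under the new signature a conductor is constructed with the engine type and
options passed explicitly, roughly like this (a sketch; the jobboard and
persistence objects are assumed to be set up elsewhere, and the import path is
an assumption based on the ``stc`` alias used in the tests below):

.. code:: python

    from taskflow.conductors import single_threaded as stc

    # 'engine' picks the engine type; 'engine_options' become the engine's
    # keyword options when each claimed job's flow is loaded.
    conductor = stc.SingleThreadedConductor(
        'my-conductor', jobboard, persistence,
        engine='parallel', engine_options={'max_workers': 4})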

View File

@@ -59,8 +59,8 @@ class ActionEngine(base.EngineBase):
     _compiler_factory = compiler.PatternCompiler
     _task_executor_factory = executor.SerialTaskExecutor

-    def __init__(self, flow, flow_detail, backend, conf):
-        super(ActionEngine, self).__init__(flow, flow_detail, backend, conf)
+    def __init__(self, flow, flow_detail, backend, options):
+        super(ActionEngine, self).__init__(flow, flow_detail, backend, options)
         self._runtime = None
         self._compiled = False
         self._compilation = None

@@ -230,12 +230,6 @@ class ParallelActionEngine(ActionEngine):
     _storage_factory = atom_storage.MultiThreadedStorage

     def _task_executor_factory(self):
-        return executor.ParallelTaskExecutor(executor=self._executor,
-                                             max_workers=self._max_workers)
-
-    def __init__(self, flow, flow_detail, backend, conf,
-                 executor=None, max_workers=None):
-        super(ParallelActionEngine, self).__init__(flow, flow_detail,
-                                                   backend, conf)
-        self._executor = executor
-        self._max_workers = max_workers
+        return executor.ParallelTaskExecutor(
+            executor=self._options.get('executor'),
+            max_workers=self._options.get('max_workers'))

View File

@@ -32,17 +32,22 @@ class EngineBase(object):
     occur related to the tasks the engine contains.
     """

-    def __init__(self, flow, flow_detail, backend, conf):
+    def __init__(self, flow, flow_detail, backend, options):
         self._flow = flow
         self._flow_detail = flow_detail
         self._backend = backend
-        if not conf:
-            self._conf = {}
+        if not options:
+            self._options = {}
         else:
-            self._conf = dict(conf)
+            self._options = dict(options)
         self.notifier = misc.Notifier()
         self.task_notifier = misc.Notifier()

+    @property
+    def options(self):
+        """The options that were passed to this engine on construction."""
+        return self._options
+
     @misc.cachedproperty
     def storage(self):
         """The storage unit for this flow."""

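The new ``options`` property gives callers read access to whatever engine
options were merged together at load time; for example (a sketch, mirroring
the behavior exercised by the new tests further below):

.. code:: python

    eng = engines.load(flow, engine='parallel', max_workers=2)
    # Leftover keyword arguments end up in the engine's options dictionary.
    assert eng.options == {'max_workers': 2}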
View File

@@ -15,6 +15,7 @@
 # under the License.

 import contextlib
+import warnings

 from oslo.utils import importutils
 import six

@@ -30,6 +31,40 @@ from taskflow.utils import reflection
 # NOTE(imelnikov): this is the entrypoint namespace, not the module namespace.
 ENGINES_NAMESPACE = 'taskflow.engines'

+# The default entrypoint engine type looked for when it is not provided.
+ENGINE_DEFAULT = 'default'
+
+
+def _extract_engine(**kwargs):
+    """Extracts the engine kind and any associated options."""
+    options = {}
+    kind = kwargs.pop('engine', None)
+    engine_conf = kwargs.pop('engine_conf', None)
+    if engine_conf is not None:
+        warnings.warn("Using the 'engine_conf' argument is"
+                      " deprecated and will be removed in a future version,"
+                      " please use the 'engine' argument instead.",
+                      DeprecationWarning)
+        if isinstance(engine_conf, six.string_types):
+            kind = engine_conf
+        else:
+            options.update(engine_conf)
+            kind = options.pop('engine', None)
+    if not kind:
+        kind = ENGINE_DEFAULT
+    # See if it's a URI and if so, extract any further options...
+    try:
+        pieces = misc.parse_uri(kind)
+    except (TypeError, ValueError):
+        pass
+    else:
+        kind = pieces['scheme']
+        options = misc.merge_uri(pieces, options.copy())
+    # Merge in any leftover **kwargs into the options, this makes it so that
+    # the provided **kwargs override any URI or engine_conf specific options.
+    options.update(kwargs)
+    return (kind, options)
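A standalone approximation of this extraction, for readers who want to see
the behavior without taskflow's internals (``misc.parse_uri()`` and
``misc.merge_uri()`` do the real URI handling and differ in detail; this
sketch uses only ``six`` and the stdlib):

.. code:: python

    import warnings

    import six
    from six.moves.urllib import parse as urlparse

    def extract_engine(engine=None, engine_conf=None, **kwargs):
        options = {}
        kind = engine
        if engine_conf is not None:
            warnings.warn("'engine_conf' is deprecated, use 'engine'",
                          DeprecationWarning)
            if isinstance(engine_conf, six.string_types):
                kind = engine_conf
            else:
                # Dictionary form: the 'engine' key names the kind and the
                # remaining keys become options.
                options.update(engine_conf)
                kind = options.pop('engine', None)
        kind = kind or 'default'
        pieces = urlparse.urlparse(kind)
        if pieces.scheme:
            # e.g. 'parallel://?max_workers=4' -> kind 'parallel' plus
            # {'max_workers': '4'} (query values arrive as strings).
            kind = pieces.scheme
            options.update(dict(urlparse.parse_qsl(pieces.query)))
        options.update(kwargs)  # explicit keyword arguments win
        return kind, options

For instance, ``extract_engine(engine_conf={'engine': 'parallel',
'max_workers': 4})`` emits the deprecation warning and returns
``('parallel', {'max_workers': 4})``.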
 def _fetch_factory(factory_name):
     try:

@@ -56,49 +91,43 @@ def _fetch_validate_factory(flow_factory):

 def load(flow, store=None, flow_detail=None, book=None,
-         engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE,
-         **kwargs):
+         engine_conf=None, backend=None,
+         namespace=ENGINES_NAMESPACE, engine=ENGINE_DEFAULT, **kwargs):
     """Load a flow into an engine.

-    This function creates and prepares engine to run the
-    flow. All that is left is to run the engine with 'run()' method.
+    This function creates and prepares an engine to run the provided flow. All
+    that is left after this returns is to run the engine with the
+    engine's ``run()`` method.

-    Which engine to load is specified in 'engine_conf' parameter. It
-    can be a string that names engine type or a dictionary which holds
-    engine type (with 'engine' key) and additional engine-specific
-    configuration.
+    Which engine to load is specified via the ``engine`` parameter. It
+    can be a string that names the engine type to use, or a string that
+    is a URI with a scheme that names the engine type to use and further
+    options contained in the URI's host, port, and query parameters.

-    Which storage backend to use is defined by backend parameter. It
-    can be backend itself, or a dictionary that is passed to
-    taskflow.persistence.backends.fetch to obtain backend.
+    Which storage backend to use is defined by the backend parameter. It
+    can be the backend itself, or a dictionary that is passed to
+    ``taskflow.persistence.backends.fetch()`` to obtain a viable backend.

     :param flow: flow to load
     :param store: dict -- data to put to storage to satisfy flow requirements
     :param flow_detail: FlowDetail that holds the state of the flow (if one is
         not provided then one will be created for you in the provided backend)
     :param book: LogBook to create flow detail in if flow_detail is None
-    :param engine_conf: engine type and configuration configuration
-    :param backend: storage backend to use or configuration
-    :param namespace: driver namespace for stevedore (default is fine
-        if you don't know what is it)
+    :param engine_conf: engine type or URI and options (**deprecated**)
+    :param backend: storage backend to use or configuration that defines it
+    :param namespace: driver namespace for stevedore (or empty for default)
+    :param engine: string engine type or URI string with scheme that contains
+        the engine type and any URI-specific components that will
+        become part of the engine options.
+    :param kwargs: arbitrary keyword arguments passed as options (merged with
+        any extracted ``engine`` and ``engine_conf`` options), typically used
+        for any engine-specific options that do not fit as any of the
+        existing arguments.
     :returns: engine
     """
-    if engine_conf is None:
-        engine_conf = {'engine': 'default'}
-
-    # NOTE(imelnikov): this allows simpler syntax.
-    if isinstance(engine_conf, six.string_types):
-        engine_conf = {'engine': engine_conf}
-
-    engine_name = engine_conf['engine']
-    try:
-        pieces = misc.parse_uri(engine_name)
-    except (TypeError, ValueError):
-        pass
-    else:
-        engine_name = pieces['scheme']
-        engine_conf = misc.merge_uri(pieces, engine_conf.copy())
+    kind, options = _extract_engine(engine_conf=engine_conf,
+                                    engine=engine, **kwargs)

     if isinstance(backend, dict):
         backend = p_backends.fetch(backend)

@@ -109,13 +138,12 @@ def load(flow, store=None, flow_detail=None, book=None,
     try:
         mgr = stevedore.driver.DriverManager(
-            namespace, engine_name,
+            namespace, kind,
             invoke_on_load=True,
-            invoke_args=(flow, flow_detail, backend, engine_conf),
-            invoke_kwds=kwargs)
+            invoke_args=(flow, flow_detail, backend, options))
         engine = mgr.driver
     except RuntimeError as e:
-        raise exc.NotFound("Could not find engine %s" % (engine_name), e)
+        raise exc.NotFound("Could not find engine '%s'" % (kind), e)
     else:
         if store:
             engine.storage.inject(store)
@@ -123,35 +151,20 @@ def load(flow, store=None, flow_detail=None, book=None,

 def run(flow, store=None, flow_detail=None, book=None,
-        engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE, **kwargs):
+        engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE,
+        engine=ENGINE_DEFAULT, **kwargs):
     """Run the flow.

-    This function load the flow into engine (with 'load' function)
-    and runs the engine.
+    This function loads the flow into an engine (with the
+    :func:`load() <load>` function) and runs the engine.

-    Which engine to load is specified in 'engine_conf' parameter. It
-    can be a string that names engine type or a dictionary which holds
-    engine type (with 'engine' key) and additional engine-specific
-    configuration.
+    The arguments are interpreted as for :func:`load() <load>`.

-    Which storage backend to use is defined by backend parameter. It
-    can be backend itself, or a dictionary that is passed to
-    taskflow.persistence.backends.fetch to obtain backend.
-
-    :param flow: flow to run
-    :param store: dict -- data to put to storage to satisfy flow requirements
-    :param flow_detail: FlowDetail that holds the state of the flow (if one is
-        not provided then one will be created for you in the provided backend)
-    :param book: LogBook to create flow detail in if flow_detail is None
-    :param engine_conf: engine type and configuration configuration
-    :param backend: storage backend to use or configuration
-    :param namespace: driver namespace for stevedore (default is fine
-        if you don't know what is it)
-    :returns: dictionary of all named task results (see Storage.fetch_all)
+    :returns: dictionary of all named results (see ``storage.fetch_all()``)
     """
     engine = load(flow, store=store, flow_detail=flow_detail, book=book,
                   engine_conf=engine_conf, backend=backend,
-                  namespace=namespace, **kwargs)
+                  namespace=namespace, engine=engine, **kwargs)
     engine.run()
     return engine.storage.fetch_all()
@@ -196,23 +209,21 @@ def save_factory_details(flow_detail,

 def load_from_factory(flow_factory, factory_args=None, factory_kwargs=None,
                       store=None, book=None, engine_conf=None, backend=None,
-                      namespace=ENGINES_NAMESPACE, **kwargs):
+                      namespace=ENGINES_NAMESPACE, engine=ENGINE_DEFAULT,
+                      **kwargs):
     """Loads a flow from a factory function into an engine.

     Gets flow factory function (or name of it) and creates flow with
-    it. Then, flow is loaded into engine with load(), and factory
-    function fully qualified name is saved to flow metadata so that
-    it can be later resumed with resume.
+    it. Then, the flow is loaded into an engine with the :func:`load() <load>`
+    function, and the factory function's fully qualified name is saved to flow
+    metadata so that it can be later resumed.

     :param flow_factory: function or string: function that creates the flow
     :param factory_args: list or tuple of factory positional arguments
     :param factory_kwargs: dict of factory keyword arguments
-    :param store: dict -- data to put to storage to satisfy flow requirements
-    :param book: LogBook to create flow detail in
-    :param engine_conf: engine type and configuration configuration
-    :param backend: storage backend to use or configuration
-    :param namespace: driver namespace for stevedore (default is fine
-        if you don't know what is it)
+
+    Further arguments are interpreted as for :func:`load() <load>`.

     :returns: engine
     """

@@ -230,7 +241,7 @@ def load_from_factory(flow_factory, factory_args=None, factory_kwargs=None,
                              backend=backend)
     return load(flow=flow, store=store, flow_detail=flow_detail, book=book,
                 engine_conf=engine_conf, backend=backend, namespace=namespace,
-                **kwargs)
+                engine=engine, **kwargs)


 def flow_from_detail(flow_detail):
@@ -261,21 +272,21 @@ def flow_from_detail(flow_detail):

 def load_from_detail(flow_detail, store=None, engine_conf=None, backend=None,
-                     namespace=ENGINES_NAMESPACE, **kwargs):
+                     namespace=ENGINES_NAMESPACE, engine=ENGINE_DEFAULT,
+                     **kwargs):
     """Reloads an engine previously saved.

-    This reloads the flow using the flow_from_detail() function and then calls
-    into the load() function to create an engine from that flow.
+    This reloads the flow using the
+    :func:`flow_from_detail() <flow_from_detail>` function and then calls
+    into the :func:`load() <load>` function to create an engine from that
+    flow.

     :param flow_detail: FlowDetail that holds state of the flow to load
-    :param store: dict -- data to put to storage to satisfy flow requirements
-    :param engine_conf: engine type and configuration configuration
-    :param backend: storage backend to use or configuration
-    :param namespace: driver namespace for stevedore (default is fine
-        if you don't know what is it)
+
+    Further arguments are interpreted as for :func:`load() <load>`.

     :returns: engine
     """
     flow = flow_from_detail(flow_detail)
     return load(flow, flow_detail=flow_detail,
                 store=store, engine_conf=engine_conf, backend=backend,
-                namespace=namespace, **kwargs)
+                namespace=namespace, engine=engine, **kwargs)
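Taken together, migrating a caller off the deprecated argument looks roughly
like this (a sketch; the URI form is illustrative and its exact handling
depends on ``misc.parse_uri()``/``misc.merge_uri()``):

.. code:: python

    # Before (still works, but now emits a DeprecationWarning):
    eng = engines.load(flow, engine_conf={'engine': 'parallel',
                                          'max_workers': 4})

    # After: engine type as a plain string, options as keyword arguments.
    eng = engines.load(flow, engine='parallel', max_workers=4)

    # URI form: the scheme selects the engine type and the query string
    # supplies extra options (which arrive as strings).
    eng = engines.load(flow, engine='parallel://?max_workers=4')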

View File

@@ -23,7 +23,7 @@ from taskflow import storage as t_storage
 class WorkerBasedActionEngine(engine.ActionEngine):
     """Worker based action engine.

-    Specific backend configuration:
+    Specific backend options:

     :param exchange: broker exchange name in which executor / worker
        communication is performed

@@ -45,19 +45,15 @@ class WorkerBasedActionEngine(engine.ActionEngine):
     _storage_factory = t_storage.SingleThreadedStorage

     def _task_executor_factory(self):
-        if self._executor is not None:
-            return self._executor
-        return executor.WorkerTaskExecutor(
-            uuid=self._flow_detail.uuid,
-            url=self._conf.get('url'),
-            exchange=self._conf.get('exchange', 'default'),
-            topics=self._conf.get('topics', []),
-            transport=self._conf.get('transport'),
-            transport_options=self._conf.get('transport_options'),
-            transition_timeout=self._conf.get('transition_timeout',
-                                              pr.REQUEST_TIMEOUT))
-
-    def __init__(self, flow, flow_detail, backend, conf, **kwargs):
-        super(WorkerBasedActionEngine, self).__init__(
-            flow, flow_detail, backend, conf)
-        self._executor = kwargs.get('executor')
+        try:
+            return self._options['executor']
+        except KeyError:
+            return executor.WorkerTaskExecutor(
+                uuid=self._flow_detail.uuid,
+                url=self._options.get('url'),
+                exchange=self._options.get('exchange', 'default'),
+                topics=self._options.get('topics', []),
+                transport=self._options.get('transport'),
+                transport_options=self._options.get('transport_options'),
+                transition_timeout=self._options.get('transition_timeout',
+                                                     pr.REQUEST_TIMEOUT))
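Because the executor is now just another entry in the engine options, a
pre-built task executor can be supplied directly at load time (a sketch;
``my_executor`` is a placeholder for a compatible executor instance):

.. code:: python

    # When 'executor' is present in the options it short-circuits the
    # default WorkerTaskExecutor construction above.
    eng = engines.load(flow, engine='worker-based', executor=my_executor)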

View File

@@ -93,5 +93,5 @@ flow = lf.Flow('root').add(
 # The result here will be all results (from all tasks) which is stored in an
 # in-memory storage location that backs this engine since it is not configured
 # with persistence storage.
-result = taskflow.engines.run(flow, engine_conf='parallel')
+result = taskflow.engines.run(flow, engine='parallel')
 print(result)

View File

@@ -64,13 +64,9 @@ VOLUME_COUNT = 5
 # time difference that this causes.
 SERIAL = False
 if SERIAL:
-    engine_conf = {
-        'engine': 'serial',
-    }
+    engine = 'serial'
 else:
-    engine_conf = {
-        'engine': 'parallel',
-    }
+    engine = 'parallel'


 class VolumeCreator(task.Task):

@@ -106,7 +102,7 @@ for i in range(0, VOLUME_COUNT):
 # Show how much time the overall engine loading and running takes.
 with show_time(name=flow.name.title()):
-    eng = engines.load(flow, engine_conf=engine_conf)
+    eng = engines.load(flow, engine=engine)
     # This context manager automatically adds (and automatically removes) a
     # helpful set of state transition notification printing helper utilities
     # that show you exactly what transitions the engine is going through

View File

@@ -74,7 +74,7 @@ class Bye(task.Task):

 def return_from_flow(pool):
     wf = lf.Flow("root").add(Hi("hi"), Bye("bye"))
-    eng = taskflow.engines.load(wf, engine_conf='serial')
+    eng = taskflow.engines.load(wf, engine='serial')
     f = futures.Future()
     watcher = PokeFutureListener(eng, f, 'hi')
     watcher.register()

View File

@@ -170,7 +170,7 @@ flow.add(sub_flow)

 store = {
     'request': misc.AttrDict(user="bob", id="1.35"),
 }
-eng = engines.load(flow, engine_conf='serial', store=store)
+eng = engines.load(flow, engine='serial', store=store)

 # This context manager automatically adds (and automatically removes) a
 # helpful set of state transition notification printing helper utilities

View File

@@ -81,11 +81,11 @@ store = {
 }

 result = taskflow.engines.run(
-    flow, engine_conf='serial', store=store)
+    flow, engine='serial', store=store)
 print("Single threaded engine result %s" % result)

 result = taskflow.engines.run(
-    flow, engine_conf='parallel', store=store)
+    flow, engine='parallel', store=store)
 print("Multi threaded engine result %s" % result)

View File

@@ -91,20 +91,15 @@ else:
     blowup = True

 with eu.get_backend(backend_uri) as backend:
-    # Now we can run.
-    engine_config = {
-        'backend': backend,
-        'engine_conf': 'serial',
-        'book': logbook.LogBook("my-test"),
-    }
-
     # Make a flow that will blowup if the file doesn't exist previously, if it
     # did exist, assume we won't blowup (and therefore this shows the undo
     # and redo that a flow will go through).
+    book = logbook.LogBook("my-test")
     flow = make_flow(blowup=blowup)
     eu.print_wrapped("Running")
     try:
-        eng = engines.load(flow, **engine_config)
+        eng = engines.load(flow, engine='serial',
+                           backend=backend, book=book)
         eng.run()
         if not blowup:
             eu.rm_path(persist_path)

@@ -115,4 +110,4 @@ with eu.get_backend(backend_uri) as backend:
         traceback.print_exc(file=sys.stdout)

     eu.print_wrapped("Book contents")
-    print(p_utils.pformat(engine_config['book']))
+    print(p_utils.pformat(book))

View File

@@ -235,11 +235,9 @@ with eu.get_backend() as backend:
     flow_id = None

     # Set up how we want our engine to run, serial, parallel...
-    engine_conf = {
-        'engine': 'parallel',
-    }
+    executor = None
     if e_utils.EVENTLET_AVAILABLE:
-        engine_conf['executor'] = e_utils.GreenExecutor(5)
+        executor = e_utils.GreenExecutor(5)

     # Create/fetch a logbook that will track the workflows work.
     book = None

@@ -255,15 +253,15 @@ with eu.get_backend() as backend:
         book = p_utils.temporary_log_book(backend)
         engine = engines.load_from_factory(create_flow,
                                            backend=backend, book=book,
-                                           engine_conf=engine_conf)
+                                           engine='parallel',
+                                           executor=executor)
         print("!! Your tracking id is: '%s+%s'" % (book.uuid,
                                                    engine.storage.flow_uuid))
         print("!! Please submit this on later runs for tracking purposes")
     else:
         # Attempt to load from a previously partially completed flow.
-        engine = engines.load_from_detail(flow_detail,
-                                          backend=backend,
-                                          engine_conf=engine_conf)
+        engine = engines.load_from_detail(flow_detail, backend=backend,
+                                          engine='parallel', executor=executor)

     # Make me my vm please!
     eu.print_wrapped('Running')

View File

@@ -143,13 +143,9 @@ with example_utils.get_backend() as backend:
     flow_detail = find_flow_detail(backend, book_id, flow_id)

     # Load and run.
-    engine_conf = {
-        'engine': 'serial',
-    }
     engine = engines.load(flow,
                           flow_detail=flow_detail,
-                          backend=backend,
-                          engine_conf=engine_conf)
+                          backend=backend, engine='serial')
     engine.run()

 # How to use.

View File

@@ -69,19 +69,16 @@ WORKER_CONF = {
         'taskflow.tests.utils:TaskMultiArgOneReturn'
     ],
 }
-ENGINE_CONF = {
-    'engine': 'worker-based',
-}


-def run(engine_conf):
+def run(engine_options):
     flow = lf.Flow('simple-linear').add(
         utils.TaskOneArgOneReturn(provides='result1'),
         utils.TaskMultiArgOneReturn(provides='result2')
     )
     eng = engines.load(flow,
                        store=dict(x=111, y=222, z=333),
-                       engine_conf=engine_conf)
+                       engine='worker-based', **engine_options)
     eng.run()
     return eng.storage.fetch_all()

@@ -115,8 +112,7 @@ if __name__ == "__main__":
         })
         worker_conf = dict(WORKER_CONF)
         worker_conf.update(shared_conf)
-        engine_conf = dict(ENGINE_CONF)
-        engine_conf.update(shared_conf)
+        engine_options = dict(shared_conf)
         workers = []
         worker_topics = []

@@ -135,8 +131,8 @@ if __name__ == "__main__":
             # Now use those workers to do something.
             print('Executing some work.')
-            engine_conf['topics'] = worker_topics
-            result = run(engine_conf)
+            engine_options['topics'] = worker_topics
+            result = run(engine_options)
             print('Execution finished.')
             # This is done so that the test examples can work correctly
             # even when the keys change order (which will happen in various

View File

@@ -93,7 +93,7 @@ def run(**store):
     try:
         with utils.wrap_all_failures():
             taskflow.engines.run(flow, store=store,
-                                 engine_conf='parallel')
+                                 engine='parallel')
     except exceptions.WrappedFailure as ex:
         unknown_failures = []
         for failure in ex:

View File

@@ -63,11 +63,8 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
         board = impl_zookeeper.ZookeeperJobBoard(name, {},
                                                  client=client,
                                                  persistence=persistence)
-        engine_conf = {
-            'engine': 'default',
-        }
-        conductor = stc.SingleThreadedConductor(name, board, engine_conf,
-                                                persistence, wait_timeout)
+        conductor = stc.SingleThreadedConductor(name, board, persistence,
+                                                wait_timeout=wait_timeout)
         return misc.AttrDict(board=board,
                              client=client,
                              persistence=persistence,

View File

@@ -155,15 +155,14 @@ class SingleThreadedEngineTest(ArgumentsPassingTest,
     def _make_engine(self, flow, flow_detail=None):
         return taskflow.engines.load(flow,
                                      flow_detail=flow_detail,
-                                     engine_conf='serial',
+                                     engine='serial',
                                      backend=self.backend)


 class MultiThreadedEngineTest(ArgumentsPassingTest,
                               test.TestCase):
     def _make_engine(self, flow, flow_detail=None, executor=None):
-        engine_conf = dict(engine='parallel')
         return taskflow.engines.load(flow, flow_detail=flow_detail,
-                                     engine_conf=engine_conf,
+                                     engine='parallel',
                                      backend=self.backend,
                                      executor=executor)

View File

@@ -24,17 +24,30 @@ from taskflow.utils import persistence_utils as p_utils

 class EngineLoadingTestCase(test.TestCase):
-    def test_default_load(self):
+    def _make_dummy_flow(self):
         f = linear_flow.Flow('test')
         f.add(test_utils.TaskOneReturn("run-1"))
+        return f
+
+    def test_default_load(self):
+        f = self._make_dummy_flow()
         e = taskflow.engines.load(f)
         self.assertIsNotNone(e)

     def test_unknown_load(self):
-        f = linear_flow.Flow('test')
-        f.add(test_utils.TaskOneReturn("run-1"))
+        f = self._make_dummy_flow()
         self.assertRaises(exc.NotFound, taskflow.engines.load, f,
-                          engine_conf='not_really_any_engine')
+                          engine='not_really_any_engine')
+
+    def test_options_empty(self):
+        f = self._make_dummy_flow()
+        e = taskflow.engines.load(f)
+        self.assertEqual({}, e.options)
+
+    def test_options_passthrough(self):
+        f = self._make_dummy_flow()
+        e = taskflow.engines.load(f, pass_1=1, pass_2=2)
+        self.assertEqual({'pass_1': 1, 'pass_2': 2}, e.options)


 class FlowFromDetailTestCase(test.TestCase):

View File

@@ -550,7 +550,7 @@ class SingleThreadedEngineTest(EngineTaskTest,
     def _make_engine(self, flow, flow_detail=None):
         return taskflow.engines.load(flow,
                                      flow_detail=flow_detail,
-                                     engine_conf='serial',
+                                     engine='serial',
                                      backend=self.backend)

     def test_correct_load(self):

@@ -570,16 +570,14 @@ class MultiThreadedEngineTest(EngineTaskTest,
                               EngineCheckingTaskTest,
                               test.TestCase):
     def _make_engine(self, flow, flow_detail=None, executor=None):
-        engine_conf = dict(engine='parallel')
         return taskflow.engines.load(flow, flow_detail=flow_detail,
-                                     engine_conf=engine_conf,
                                      backend=self.backend,
-                                     executor=executor)
+                                     executor=executor,
+                                     engine='parallel')

     def test_correct_load(self):
         engine = self._make_engine(utils.TaskNoRequiresNoReturns)
         self.assertIsInstance(engine, eng.ParallelActionEngine)
-        self.assertIs(engine._executor, None)

     def test_using_common_executor(self):
         flow = utils.TaskNoRequiresNoReturns(name='task1')

@@ -587,7 +585,7 @@ class MultiThreadedEngineTest(EngineTaskTest,
         try:
             e1 = self._make_engine(flow, executor=executor)
             e2 = self._make_engine(flow, executor=executor)
-            self.assertIs(e1._executor, e2._executor)
+            self.assertIs(e1.options['executor'], e2.options['executor'])
         finally:
             executor.shutdown(wait=True)

@@ -604,11 +602,9 @@ class ParallelEngineWithEventletTest(EngineTaskTest,
     def _make_engine(self, flow, flow_detail=None, executor=None):
         if executor is None:
             executor = eu.GreenExecutor()
-        engine_conf = dict(engine='parallel',
-                           executor=executor)
         return taskflow.engines.load(flow, flow_detail=flow_detail,
-                                     engine_conf=engine_conf,
-                                     backend=self.backend)
+                                     backend=self.backend, engine='parallel',
+                                     executor=executor)


 class WorkerBasedEngineTest(EngineTaskTest,

@@ -647,15 +643,12 @@ class WorkerBasedEngineTest(EngineTaskTest,
         super(WorkerBasedEngineTest, self).tearDown()

     def _make_engine(self, flow, flow_detail=None):
-        engine_conf = {
-            'engine': 'worker-based',
-            'exchange': self.exchange,
-            'topics': [self.topic],
-            'transport': self.transport,
-        }
         return taskflow.engines.load(flow, flow_detail=flow_detail,
-                                     engine_conf=engine_conf,
-                                     backend=self.backend)
+                                     backend=self.backend,
+                                     engine='worker-based',
+                                     exchange=self.exchange,
+                                     topics=[self.topic],
+                                     transport=self.transport)

     def test_correct_load(self):
         engine = self._make_engine(utils.TaskNoRequiresNoReturns)

View File

@@ -758,7 +758,7 @@ class SingleThreadedEngineTest(RetryTest,
     def _make_engine(self, flow, flow_detail=None):
         return taskflow.engines.load(flow,
                                      flow_detail=flow_detail,
-                                     engine_conf='serial',
+                                     engine='serial',
                                      backend=self.backend)

@@ -766,8 +766,7 @@ class MultiThreadedEngineTest(RetryTest,
                               RetryParallelExecutionTest,
                               test.TestCase):
     def _make_engine(self, flow, flow_detail=None, executor=None):
-        engine_conf = dict(engine='parallel')
         return taskflow.engines.load(flow, flow_detail=flow_detail,
-                                     engine_conf=engine_conf,
+                                     engine='parallel',
                                      backend=self.backend,
                                      executor=executor)

View File

@@ -168,16 +168,15 @@ class SingleThreadedEngineTest(SuspendFlowTest,
     def _make_engine(self, flow, flow_detail=None):
         return taskflow.engines.load(flow,
                                      flow_detail=flow_detail,
-                                     engine_conf='serial',
+                                     engine='serial',
                                      backend=self.backend)


 class MultiThreadedEngineTest(SuspendFlowTest,
                               test.TestCase):
     def _make_engine(self, flow, flow_detail=None, executor=None):
-        engine_conf = dict(engine='parallel')
         return taskflow.engines.load(flow, flow_detail=flow_detail,
-                                     engine_conf=engine_conf,
+                                     engine='parallel',
                                      backend=self.backend,
                                      executor=executor)

@@ -189,8 +188,7 @@ class ParallelEngineWithEventletTest(SuspendFlowTest,
     def _make_engine(self, flow, flow_detail=None, executor=None):
         if executor is None:
             executor = eu.GreenExecutor()
-        engine_conf = dict(engine='parallel')
         return taskflow.engines.load(flow, flow_detail=flow_detail,
-                                     engine_conf=engine_conf,
+                                     engine='parallel',
                                      backend=self.backend,
                                      executor=executor)