Merge "Retain the same api for all helpers"

This commit is contained in:
Jenkins
2014-02-04 19:55:22 +00:00
committed by Gerrit Code Review
2 changed files with 94 additions and 31 deletions

View File

@@ -23,3 +23,4 @@ from taskflow.engines.helpers import load # noqa
from taskflow.engines.helpers import load_from_detail # noqa
from taskflow.engines.helpers import load_from_factory # noqa
from taskflow.engines.helpers import run # noqa
from taskflow.engines.helpers import save_factory_details # noqa

View File

@@ -16,6 +16,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import six
import stevedore.driver
@@ -29,6 +31,22 @@ from taskflow.utils import reflection
ENGINES_NAMESPACE = 'taskflow.engines'
def _fetch_validate_factory(flow_factory):
if isinstance(flow_factory, six.string_types):
factory_fun = importutils.import_class(flow_factory)
factory_name = flow_factory
else:
factory_fun = flow_factory
factory_name = reflection.get_callable_name(flow_factory)
try:
reimported = importutils.import_class(factory_name)
assert reimported == factory_fun
except (ImportError, AssertionError):
raise ValueError('Flow factory %r is not reimportable by name %s'
% (factory_fun, factory_name))
return (factory_name, factory_fun)
def load(flow, store=None, flow_detail=None, book=None,
engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE):
"""Load flow into engine.
@@ -47,7 +65,8 @@ def load(flow, store=None, flow_detail=None, book=None,
:param flow: flow to load
:param store: dict -- data to put to storage to satisfy flow requirements
:param flow_detail: FlowDetail that holds state of the flow
:param flow_detail: FlowDetail that holds the state of the flow (if one is
not provided then one will be created for you in the provided backend)
:param book: LogBook to create flow detail in if flow_detail is None
:param engine_conf: engine type and configuration configuration
:param backend: storage backend to use or configuration
@@ -85,7 +104,8 @@ def load(flow, store=None, flow_detail=None, book=None,
return engine
def run(flow, store=None, engine_conf=None, backend=None):
def run(flow, store=None, flow_detail=None, book=None,
engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE):
"""Run the flow.
This function load the flow into engine (with 'load' function)
@@ -102,18 +122,65 @@ def run(flow, store=None, engine_conf=None, backend=None):
:param flow: flow to run
:param store: dict -- data to put to storage to satisfy flow requirements
:param flow_detail: FlowDetail that holds the state of the flow (if one is
not provided then one will be created for you in the provided backend)
:param book: LogBook to create flow detail in if flow_detail is None
:param engine_conf: engine type and configuration configuration
:param backend: storage backend to use or configuration
:param namespace: driver namespace for stevedore (default is fine
if you don't know what is it)
:returns: dictionary of all named task results (see Storage.fetch_all)
"""
engine = load(flow, store=store, engine_conf=engine_conf, backend=backend)
engine = load(flow, store=store, flow_detail=flow_detail, book=book,
engine_conf=engine_conf, backend=backend,
namespace=namespace)
engine.run()
return engine.storage.fetch_all()
def save_factory_details(flow_detail,
flow_factory, factory_args, factory_kwargs,
backend=None):
"""Saves the given factories reimportable name, args, kwargs into the
flow detail.
This function saves the factory name, arguments, and keyword arguments
into the given flow details object and if a backend is provided it will
also ensure that the backend saves the flow details after being updated.
:param flow_detail: FlowDetail that holds state of the flow to load
:param flow_factory: function or string: function that creates the flow
:param factory_args: list or tuple of factory positional arguments
:param factory_kwargs: dict of factory keyword arguments
:param backend: storage backend to use or configuration
"""
if not factory_args:
factory_args = []
if not factory_kwargs:
factory_kwargs = {}
factory_name, _factory_fun = _fetch_validate_factory(flow_factory)
factory_data = {
'factory': {
'name': factory_name,
'args': factory_args,
'kwargs': factory_kwargs,
},
}
if not flow_detail.meta:
flow_detail.meta = factory_data
else:
flow_detail.meta.update(factory_data)
if backend is not None:
if isinstance(backend, dict):
backend = p_backends.fetch(backend)
with contextlib.closing(backend.get_connection()) as conn:
conn.update_flow_details(flow_detail)
def load_from_factory(flow_factory, factory_args=None, factory_kwargs=None,
store=None, book=None, engine_conf=None, backend=None):
"""Load flow from factory function into engine.
store=None, book=None, engine_conf=None, backend=None,
namespace=ENGINES_NAMESPACE):
"""Loads a flow from a factory function into an engine.
Gets flow factory function (or name of it) and creates flow with
it. Then, flow is loaded into engine with load(), and factory
@@ -127,34 +194,25 @@ def load_from_factory(flow_factory, factory_args=None, factory_kwargs=None,
:param book: LogBook to create flow detail in
:param engine_conf: engine type and configuration configuration
:param backend: storage backend to use or configuration
:param namespace: driver namespace for stevedore (default is fine
if you don't know what is it)
:returns: engine
"""
if isinstance(flow_factory, six.string_types):
factory_fun = importutils.import_class(flow_factory)
factory_name = flow_factory
else:
factory_fun = flow_factory
factory_name = reflection.get_callable_name(flow_factory)
try:
reimported = importutils.import_class(factory_name)
assert reimported == factory_fun
except (ImportError, AssertionError):
raise ValueError('Flow factory %r is not reimportable by name %s'
% (factory_fun, factory_name))
args = factory_args or []
kwargs = factory_kwargs or {}
flow = factory_fun(*args, **kwargs)
factory_data = dict(name=factory_name, args=args, kwargs=kwargs)
_factory_name, factory_fun = _fetch_validate_factory(flow_factory)
if not factory_args:
factory_args = []
if not factory_kwargs:
factory_kwargs = {}
flow = factory_fun(*factory_args, **factory_kwargs)
if isinstance(backend, dict):
backend = p_backends.fetch(backend)
flow_detail = p_utils.create_flow_detail(flow, book=book, backend=backend,
meta={'factory': factory_data})
return load(flow=flow, flow_detail=flow_detail,
store=store, book=book,
engine_conf=engine_conf, backend=backend)
flow_detail = p_utils.create_flow_detail(flow, book=book, backend=backend)
save_factory_details(flow_detail,
flow_factory, factory_args, factory_kwargs,
backend=backend)
return load(flow=flow, store=store, flow_detail=flow_detail, book=book,
engine_conf=engine_conf, backend=backend, namespace=namespace)
def flow_from_detail(flow_detail):
@@ -182,8 +240,9 @@ def flow_from_detail(flow_detail):
return factory_fun(*args, **kwargs)
def load_from_detail(flow_detail, store=None, engine_conf=None, backend=None):
"""Reload flow previously loaded with load_form_factory.
def load_from_detail(flow_detail, store=None, engine_conf=None, backend=None,
namespace=ENGINES_NAMESPACE):
"""Reload flow previously loaded with load_form_factory function.
Gets flow factory name from metadata, calls it to recreate the flow
and loads flow into engine with load().
@@ -192,8 +251,11 @@ def load_from_detail(flow_detail, store=None, engine_conf=None, backend=None):
:param store: dict -- data to put to storage to satisfy flow requirements
:param engine_conf: engine type and configuration configuration
:param backend: storage backend to use or configuration
:param namespace: driver namespace for stevedore (default is fine
if you don't know what is it)
:returns: engine
"""
flow = flow_from_detail(flow_detail)
return load(flow, flow_detail=flow_detail,
store=store, engine_conf=engine_conf, backend=backend)
store=store, engine_conf=engine_conf, backend=backend,
namespace=namespace)