From 901b1e6320ac5dc849b9d394b3ba5d3411fc2352 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 24 Jan 2014 18:46:44 -0800 Subject: [PATCH] Retain the same api for all helpers Functionality in one helper method seems to not exist in the other helper methods, even though they call into the same methods, this seems like a bad inconsistency to have so we should fix that and have a consistent api for all of these helper methods instead. Change-Id: I72aa66e5920a6093fc9df0c261c950a9cb225b76 --- taskflow/engines/__init__.py | 1 + taskflow/engines/helpers.py | 124 ++++++++++++++++++++++++++--------- 2 files changed, 94 insertions(+), 31 deletions(-) diff --git a/taskflow/engines/__init__.py b/taskflow/engines/__init__.py index fa56cc7e..5bc17e36 100644 --- a/taskflow/engines/__init__.py +++ b/taskflow/engines/__init__.py @@ -23,3 +23,4 @@ from taskflow.engines.helpers import load # noqa from taskflow.engines.helpers import load_from_detail # noqa from taskflow.engines.helpers import load_from_factory # noqa from taskflow.engines.helpers import run # noqa +from taskflow.engines.helpers import save_factory_details # noqa diff --git a/taskflow/engines/helpers.py b/taskflow/engines/helpers.py index 687d64b1..08d9795f 100644 --- a/taskflow/engines/helpers.py +++ b/taskflow/engines/helpers.py @@ -16,6 +16,8 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib + import six import stevedore.driver @@ -29,6 +31,22 @@ from taskflow.utils import reflection ENGINES_NAMESPACE = 'taskflow.engines' +def _fetch_validate_factory(flow_factory): + if isinstance(flow_factory, six.string_types): + factory_fun = importutils.import_class(flow_factory) + factory_name = flow_factory + else: + factory_fun = flow_factory + factory_name = reflection.get_callable_name(flow_factory) + try: + reimported = importutils.import_class(factory_name) + assert reimported == factory_fun + except (ImportError, AssertionError): + raise ValueError('Flow factory %r is not reimportable by name %s' + % (factory_fun, factory_name)) + return (factory_name, factory_fun) + + def load(flow, store=None, flow_detail=None, book=None, engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE): """Load flow into engine. @@ -47,7 +65,8 @@ def load(flow, store=None, flow_detail=None, book=None, :param flow: flow to load :param store: dict -- data to put to storage to satisfy flow requirements - :param flow_detail: FlowDetail that holds state of the flow + :param flow_detail: FlowDetail that holds the state of the flow (if one is + not provided then one will be created for you in the provided backend) :param book: LogBook to create flow detail in if flow_detail is None :param engine_conf: engine type and configuration configuration :param backend: storage backend to use or configuration @@ -85,7 +104,8 @@ def load(flow, store=None, flow_detail=None, book=None, return engine -def run(flow, store=None, engine_conf=None, backend=None): +def run(flow, store=None, flow_detail=None, book=None, + engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE): """Run the flow. This function load the flow into engine (with 'load' function) @@ -102,18 +122,65 @@ def run(flow, store=None, engine_conf=None, backend=None): :param flow: flow to run :param store: dict -- data to put to storage to satisfy flow requirements + :param flow_detail: FlowDetail that holds the state of the flow (if one is + not provided then one will be created for you in the provided backend) + :param book: LogBook to create flow detail in if flow_detail is None :param engine_conf: engine type and configuration configuration :param backend: storage backend to use or configuration + :param namespace: driver namespace for stevedore (default is fine + if you don't know what is it) :returns: dictionary of all named task results (see Storage.fetch_all) """ - engine = load(flow, store=store, engine_conf=engine_conf, backend=backend) + engine = load(flow, store=store, flow_detail=flow_detail, book=book, + engine_conf=engine_conf, backend=backend, + namespace=namespace) engine.run() return engine.storage.fetch_all() +def save_factory_details(flow_detail, + flow_factory, factory_args, factory_kwargs, + backend=None): + """Saves the given factories reimportable name, args, kwargs into the + flow detail. + + This function saves the factory name, arguments, and keyword arguments + into the given flow details object and if a backend is provided it will + also ensure that the backend saves the flow details after being updated. + + :param flow_detail: FlowDetail that holds state of the flow to load + :param flow_factory: function or string: function that creates the flow + :param factory_args: list or tuple of factory positional arguments + :param factory_kwargs: dict of factory keyword arguments + :param backend: storage backend to use or configuration + """ + if not factory_args: + factory_args = [] + if not factory_kwargs: + factory_kwargs = {} + factory_name, _factory_fun = _fetch_validate_factory(flow_factory) + factory_data = { + 'factory': { + 'name': factory_name, + 'args': factory_args, + 'kwargs': factory_kwargs, + }, + } + if not flow_detail.meta: + flow_detail.meta = factory_data + else: + flow_detail.meta.update(factory_data) + if backend is not None: + if isinstance(backend, dict): + backend = p_backends.fetch(backend) + with contextlib.closing(backend.get_connection()) as conn: + conn.update_flow_details(flow_detail) + + def load_from_factory(flow_factory, factory_args=None, factory_kwargs=None, - store=None, book=None, engine_conf=None, backend=None): - """Load flow from factory function into engine. + store=None, book=None, engine_conf=None, backend=None, + namespace=ENGINES_NAMESPACE): + """Loads a flow from a factory function into an engine. Gets flow factory function (or name of it) and creates flow with it. Then, flow is loaded into engine with load(), and factory @@ -127,34 +194,25 @@ def load_from_factory(flow_factory, factory_args=None, factory_kwargs=None, :param book: LogBook to create flow detail in :param engine_conf: engine type and configuration configuration :param backend: storage backend to use or configuration + :param namespace: driver namespace for stevedore (default is fine + if you don't know what is it) :returns: engine """ - if isinstance(flow_factory, six.string_types): - factory_fun = importutils.import_class(flow_factory) - factory_name = flow_factory - else: - factory_fun = flow_factory - factory_name = reflection.get_callable_name(flow_factory) - try: - reimported = importutils.import_class(factory_name) - assert reimported == factory_fun - except (ImportError, AssertionError): - raise ValueError('Flow factory %r is not reimportable by name %s' - % (factory_fun, factory_name)) - - args = factory_args or [] - kwargs = factory_kwargs or {} - flow = factory_fun(*args, **kwargs) - factory_data = dict(name=factory_name, args=args, kwargs=kwargs) - + _factory_name, factory_fun = _fetch_validate_factory(flow_factory) + if not factory_args: + factory_args = [] + if not factory_kwargs: + factory_kwargs = {} + flow = factory_fun(*factory_args, **factory_kwargs) if isinstance(backend, dict): backend = p_backends.fetch(backend) - flow_detail = p_utils.create_flow_detail(flow, book=book, backend=backend, - meta={'factory': factory_data}) - return load(flow=flow, flow_detail=flow_detail, - store=store, book=book, - engine_conf=engine_conf, backend=backend) + flow_detail = p_utils.create_flow_detail(flow, book=book, backend=backend) + save_factory_details(flow_detail, + flow_factory, factory_args, factory_kwargs, + backend=backend) + return load(flow=flow, store=store, flow_detail=flow_detail, book=book, + engine_conf=engine_conf, backend=backend, namespace=namespace) def flow_from_detail(flow_detail): @@ -182,8 +240,9 @@ def flow_from_detail(flow_detail): return factory_fun(*args, **kwargs) -def load_from_detail(flow_detail, store=None, engine_conf=None, backend=None): - """Reload flow previously loaded with load_form_factory. +def load_from_detail(flow_detail, store=None, engine_conf=None, backend=None, + namespace=ENGINES_NAMESPACE): + """Reload flow previously loaded with load_form_factory function. Gets flow factory name from metadata, calls it to recreate the flow and loads flow into engine with load(). @@ -192,8 +251,11 @@ def load_from_detail(flow_detail, store=None, engine_conf=None, backend=None): :param store: dict -- data to put to storage to satisfy flow requirements :param engine_conf: engine type and configuration configuration :param backend: storage backend to use or configuration + :param namespace: driver namespace for stevedore (default is fine + if you don't know what is it) :returns: engine """ flow = flow_from_detail(flow_detail) return load(flow, flow_detail=flow_detail, - store=store, engine_conf=engine_conf, backend=backend) + store=store, engine_conf=engine_conf, backend=backend, + namespace=namespace)