diff --git a/taskflow/engines/__init__.py b/taskflow/engines/__init__.py index fa56cc7e..5bc17e36 100644 --- a/taskflow/engines/__init__.py +++ b/taskflow/engines/__init__.py @@ -23,3 +23,4 @@ from taskflow.engines.helpers import load # noqa from taskflow.engines.helpers import load_from_detail # noqa from taskflow.engines.helpers import load_from_factory # noqa from taskflow.engines.helpers import run # noqa +from taskflow.engines.helpers import save_factory_details # noqa diff --git a/taskflow/engines/helpers.py b/taskflow/engines/helpers.py index 687d64b1..08d9795f 100644 --- a/taskflow/engines/helpers.py +++ b/taskflow/engines/helpers.py @@ -16,6 +16,8 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib + import six import stevedore.driver @@ -29,6 +31,22 @@ from taskflow.utils import reflection ENGINES_NAMESPACE = 'taskflow.engines' +def _fetch_validate_factory(flow_factory): + if isinstance(flow_factory, six.string_types): + factory_fun = importutils.import_class(flow_factory) + factory_name = flow_factory + else: + factory_fun = flow_factory + factory_name = reflection.get_callable_name(flow_factory) + try: + reimported = importutils.import_class(factory_name) + assert reimported == factory_fun + except (ImportError, AssertionError): + raise ValueError('Flow factory %r is not reimportable by name %s' + % (factory_fun, factory_name)) + return (factory_name, factory_fun) + + def load(flow, store=None, flow_detail=None, book=None, engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE): """Load flow into engine. @@ -47,7 +65,8 @@ def load(flow, store=None, flow_detail=None, book=None, :param flow: flow to load :param store: dict -- data to put to storage to satisfy flow requirements - :param flow_detail: FlowDetail that holds state of the flow + :param flow_detail: FlowDetail that holds the state of the flow (if one is + not provided then one will be created for you in the provided backend) :param book: LogBook to create flow detail in if flow_detail is None :param engine_conf: engine type and configuration configuration :param backend: storage backend to use or configuration @@ -85,7 +104,8 @@ def load(flow, store=None, flow_detail=None, book=None, return engine -def run(flow, store=None, engine_conf=None, backend=None): +def run(flow, store=None, flow_detail=None, book=None, + engine_conf=None, backend=None, namespace=ENGINES_NAMESPACE): """Run the flow. This function load the flow into engine (with 'load' function) @@ -102,18 +122,65 @@ def run(flow, store=None, engine_conf=None, backend=None): :param flow: flow to run :param store: dict -- data to put to storage to satisfy flow requirements + :param flow_detail: FlowDetail that holds the state of the flow (if one is + not provided then one will be created for you in the provided backend) + :param book: LogBook to create flow detail in if flow_detail is None :param engine_conf: engine type and configuration configuration :param backend: storage backend to use or configuration + :param namespace: driver namespace for stevedore (default is fine + if you don't know what is it) :returns: dictionary of all named task results (see Storage.fetch_all) """ - engine = load(flow, store=store, engine_conf=engine_conf, backend=backend) + engine = load(flow, store=store, flow_detail=flow_detail, book=book, + engine_conf=engine_conf, backend=backend, + namespace=namespace) engine.run() return engine.storage.fetch_all() +def save_factory_details(flow_detail, + flow_factory, factory_args, factory_kwargs, + backend=None): + """Saves the given factories reimportable name, args, kwargs into the + flow detail. + + This function saves the factory name, arguments, and keyword arguments + into the given flow details object and if a backend is provided it will + also ensure that the backend saves the flow details after being updated. + + :param flow_detail: FlowDetail that holds state of the flow to load + :param flow_factory: function or string: function that creates the flow + :param factory_args: list or tuple of factory positional arguments + :param factory_kwargs: dict of factory keyword arguments + :param backend: storage backend to use or configuration + """ + if not factory_args: + factory_args = [] + if not factory_kwargs: + factory_kwargs = {} + factory_name, _factory_fun = _fetch_validate_factory(flow_factory) + factory_data = { + 'factory': { + 'name': factory_name, + 'args': factory_args, + 'kwargs': factory_kwargs, + }, + } + if not flow_detail.meta: + flow_detail.meta = factory_data + else: + flow_detail.meta.update(factory_data) + if backend is not None: + if isinstance(backend, dict): + backend = p_backends.fetch(backend) + with contextlib.closing(backend.get_connection()) as conn: + conn.update_flow_details(flow_detail) + + def load_from_factory(flow_factory, factory_args=None, factory_kwargs=None, - store=None, book=None, engine_conf=None, backend=None): - """Load flow from factory function into engine. + store=None, book=None, engine_conf=None, backend=None, + namespace=ENGINES_NAMESPACE): + """Loads a flow from a factory function into an engine. Gets flow factory function (or name of it) and creates flow with it. Then, flow is loaded into engine with load(), and factory @@ -127,34 +194,25 @@ def load_from_factory(flow_factory, factory_args=None, factory_kwargs=None, :param book: LogBook to create flow detail in :param engine_conf: engine type and configuration configuration :param backend: storage backend to use or configuration + :param namespace: driver namespace for stevedore (default is fine + if you don't know what is it) :returns: engine """ - if isinstance(flow_factory, six.string_types): - factory_fun = importutils.import_class(flow_factory) - factory_name = flow_factory - else: - factory_fun = flow_factory - factory_name = reflection.get_callable_name(flow_factory) - try: - reimported = importutils.import_class(factory_name) - assert reimported == factory_fun - except (ImportError, AssertionError): - raise ValueError('Flow factory %r is not reimportable by name %s' - % (factory_fun, factory_name)) - - args = factory_args or [] - kwargs = factory_kwargs or {} - flow = factory_fun(*args, **kwargs) - factory_data = dict(name=factory_name, args=args, kwargs=kwargs) - + _factory_name, factory_fun = _fetch_validate_factory(flow_factory) + if not factory_args: + factory_args = [] + if not factory_kwargs: + factory_kwargs = {} + flow = factory_fun(*factory_args, **factory_kwargs) if isinstance(backend, dict): backend = p_backends.fetch(backend) - flow_detail = p_utils.create_flow_detail(flow, book=book, backend=backend, - meta={'factory': factory_data}) - return load(flow=flow, flow_detail=flow_detail, - store=store, book=book, - engine_conf=engine_conf, backend=backend) + flow_detail = p_utils.create_flow_detail(flow, book=book, backend=backend) + save_factory_details(flow_detail, + flow_factory, factory_args, factory_kwargs, + backend=backend) + return load(flow=flow, store=store, flow_detail=flow_detail, book=book, + engine_conf=engine_conf, backend=backend, namespace=namespace) def flow_from_detail(flow_detail): @@ -182,8 +240,9 @@ def flow_from_detail(flow_detail): return factory_fun(*args, **kwargs) -def load_from_detail(flow_detail, store=None, engine_conf=None, backend=None): - """Reload flow previously loaded with load_form_factory. +def load_from_detail(flow_detail, store=None, engine_conf=None, backend=None, + namespace=ENGINES_NAMESPACE): + """Reload flow previously loaded with load_form_factory function. Gets flow factory name from metadata, calls it to recreate the flow and loads flow into engine with load(). @@ -192,8 +251,11 @@ def load_from_detail(flow_detail, store=None, engine_conf=None, backend=None): :param store: dict -- data to put to storage to satisfy flow requirements :param engine_conf: engine type and configuration configuration :param backend: storage backend to use or configuration + :param namespace: driver namespace for stevedore (default is fine + if you don't know what is it) :returns: engine """ flow = flow_from_detail(flow_detail) return load(flow, flow_detail=flow_detail, - store=store, engine_conf=engine_conf, backend=backend) + store=store, engine_conf=engine_conf, backend=backend, + namespace=namespace)