diff --git a/taskflow/engines/__init__.py b/taskflow/engines/__init__.py index bdec4e4c..fa56cc7e 100644 --- a/taskflow/engines/__init__.py +++ b/taskflow/engines/__init__.py @@ -18,5 +18,8 @@ # promote helpers to this module namespace -from taskflow.engines.helpers import load # noqa -from taskflow.engines.helpers import run # noqa +from taskflow.engines.helpers import flow_from_detail # noqa +from taskflow.engines.helpers import load # noqa +from taskflow.engines.helpers import load_from_detail # noqa +from taskflow.engines.helpers import load_from_factory # noqa +from taskflow.engines.helpers import run # noqa diff --git a/taskflow/engines/helpers.py b/taskflow/engines/helpers.py index 71e469b4..95c2d9b0 100644 --- a/taskflow/engines/helpers.py +++ b/taskflow/engines/helpers.py @@ -19,8 +19,10 @@ import six import stevedore.driver +from taskflow.openstack.common import importutils from taskflow.persistence import backends as p_backends from taskflow.utils import persistence_utils as p_utils +from taskflow.utils import reflection # NOTE(imelnikov): this is the entrypoint namespace, not the module namespace. @@ -98,7 +100,7 @@ def run(flow, store=None, engine_conf=None, backend=None): can be backend itself, or a dictionary that is passed to taskflow.persistence.backends.fetch to obtain backend. - :param flow: flow to load + :param flow: flow to run :param store: dict -- data to put to storage to satisfy flow requirements :param engine_conf: engine type and configuration configuration :param backend: storage backend to use or configuration @@ -107,3 +109,91 @@ def run(flow, store=None, engine_conf=None, backend=None): engine = load(flow, store=store, engine_conf=engine_conf, backend=backend) engine.run() return engine.storage.fetch_all() + + +def load_from_factory(flow_factory, factory_args=None, factory_kwargs=None, + store=None, book=None, engine_conf=None, backend=None): + """Load flow from factory function into engine + + Gets flow factory function (or name of it) and creates flow with + it. Then, flow is loaded into engine with load(), and factory + function fully qualified name is saved to flow metadata so that + it can be later resumed with resume. + + :param flow_factory: function or string: function that creates the flow + :param factory_args: list or tuple of factory positional arguments + :param factory_kwargs: dict of factory keyword arguments + :param store: dict -- data to put to storage to satisfy flow requirements + :param book: LogBook to create flow detail in + :param engine_conf: engine type and configuration configuration + :param backend: storage backend to use or configuration + :returns: engine + """ + + if isinstance(flow_factory, six.string_types): + factory_fun = importutils.import_class(flow_factory) + factory_name = flow_factory + else: + factory_fun = flow_factory + factory_name = reflection.get_callable_name(flow_factory) + try: + reimported = importutils.import_class(factory_name) + assert reimported == factory_fun + except (ImportError, AssertionError): + raise ValueError('Flow factory %r is not reimportable by name %s' + % (factory_fun, factory_name)) + + args = factory_args or [] + kwargs = factory_kwargs or {} + flow = factory_fun(*args, **kwargs) + factory_data = dict(name=factory_name, args=args, kwargs=kwargs) + + if isinstance(backend, dict): + backend = p_backends.fetch(backend) + flow_detail = p_utils.create_flow_detail(flow, book=book, backend=backend, + meta={'factory': factory_data}) + return load(flow=flow, flow_detail=flow_detail, + store=store, book=book, + engine_conf=engine_conf, backend=backend) + + +def flow_from_detail(flow_detail): + """Recreate flow previously loaded with load_form_factory + + Gets flow factory name from metadata, calls it to recreate the flow + + :param flow_detail: FlowDetail that holds state of the flow to load + """ + try: + factory_data = flow_detail.meta['factory'] + except (KeyError, AttributeError, TypeError): + raise ValueError('Cannot reconstruct flow %s %s: ' + 'no factory information saved.' + % (flow_detail.name, flow_detail.uuid)) + + try: + factory_fun = importutils.import_class(factory_data['name']) + except (KeyError, ImportError): + raise ImportError('Could not import factory for flow %s %s' + % (flow_detail.name, flow_detail.uuid)) + + args = factory_data.get('args', ()) + kwargs = factory_data.get('kwargs', {}) + return factory_fun(*args, **kwargs) + + +def load_from_detail(flow_detail, store=None, engine_conf=None, backend=None): + """Reload flow previously loaded with load_form_factory + + Gets flow factory name from metadata, calls it to recreate the flow + and loads flow into engine with load(). + + :param flow_detail: FlowDetail that holds state of the flow to load + :param store: dict -- data to put to storage to satisfy flow requirements + :param engine_conf: engine type and configuration configuration + :param backend: storage backend to use or configuration + :returns: engine + """ + flow = flow_from_detail(flow_detail) + return load(flow, flow_detail=flow_detail, + store=store, engine_conf=engine_conf, backend=backend) diff --git a/taskflow/examples/resume_many_flows/resume_all.py b/taskflow/examples/resume_many_flows/resume_all.py index 82be3073..94e3b48a 100644 --- a/taskflow/examples/resume_many_flows/resume_all.py +++ b/taskflow/examples/resume_many_flows/resume_all.py @@ -34,7 +34,6 @@ import taskflow.engines from taskflow import states -import my_flows # noqa import my_utils # noqa @@ -43,9 +42,8 @@ FINISHED_STATES = (states.SUCCESS, states.FAILURE, states.REVERTED) def resume(flowdetail, backend): print('Resuming flow %s %s' % (flowdetail.name, flowdetail.uuid)) - engine = taskflow.engines.load(my_flows.flow_factory(), - flow_detail=flowdetail, - backend=backend) + engine = taskflow.engines.load_from_detail(flow_detail=flowdetail, + backend=backend) engine.run() diff --git a/taskflow/examples/resume_many_flows/run_flow.py b/taskflow/examples/resume_many_flows/run_flow.py index 523c9846..e3409e13 100644 --- a/taskflow/examples/resume_many_flows/run_flow.py +++ b/taskflow/examples/resume_many_flows/run_flow.py @@ -30,20 +30,14 @@ sys.path.insert(0, top_dir) sys.path.insert(0, self_dir) import taskflow.engines -from taskflow.utils import persistence_utils as p_utils import my_flows # noqa import my_utils # noqa backend = my_utils.get_backend() -logbook = p_utils.temporary_log_book(backend) - -flow = my_flows.flow_factory() - -flowdetail = p_utils.create_flow_detail(flow, logbook, backend) -engine = taskflow.engines.load(flow, flow_detail=flowdetail, - backend=backend) - -print('Running flow %s %s' % (flowdetail.name, flowdetail.uuid)) +engine = taskflow.engines.load_from_factory(my_flows.flow_factory, + backend=backend) +print('Running flow %s %s' % (engine.storage.flow_name, + engine.storage.flow_uuid)) engine.run() diff --git a/taskflow/tests/unit/test_engine_helpers.py b/taskflow/tests/unit/test_engine_helpers.py new file mode 100644 index 00000000..71b9310c --- /dev/null +++ b/taskflow/tests/unit/test_engine_helpers.py @@ -0,0 +1,113 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from taskflow import test +from taskflow.tests import utils as test_utils +from taskflow.utils import persistence_utils as p_utils + +import taskflow.engines + + +class FlowFromDetailTestCase(test.TestCase): + def test_no_meta(self): + _lb, flow_detail = p_utils.temporary_flow_detail() + self.assertIs(flow_detail.meta, None) + expected_msg = '^Cannot .* no factory information saved.$' + with self.assertRaisesRegexp(ValueError, expected_msg): + taskflow.engines.flow_from_detail(flow_detail) + + def test_no_factory_in_meta(self): + _lb, flow_detail = p_utils.temporary_flow_detail() + flow_detail.meta = {} + expected_msg = '^Cannot .* no factory information saved.$' + with self.assertRaisesRegexp(ValueError, expected_msg): + taskflow.engines.flow_from_detail(flow_detail) + + def test_no_importable_function(self): + _lb, flow_detail = p_utils.temporary_flow_detail() + flow_detail.meta = dict(factory=dict( + name='you can not import me, i contain spaces' + )) + expected_msg = '^Could not import factory' + with self.assertRaisesRegexp(ImportError, expected_msg): + taskflow.engines.flow_from_detail(flow_detail) + + def test_no_arg_factory(self): + name = 'some.test.factory' + _lb, flow_detail = p_utils.temporary_flow_detail() + flow_detail.meta = dict(factory=dict(name=name)) + + with mock.patch('taskflow.openstack.common.importutils.import_class', + return_value=lambda: 'RESULT') as mock_import: + result = taskflow.engines.flow_from_detail(flow_detail) + mock_import.assert_called_onec_with(name) + self.assertEquals(result, 'RESULT') + + def test_factory_with_arg(self): + name = 'some.test.factory' + _lb, flow_detail = p_utils.temporary_flow_detail() + flow_detail.meta = dict(factory=dict(name=name, args=['foo'])) + + with mock.patch('taskflow.openstack.common.importutils.import_class', + return_value=lambda x: 'RESULT %s' % x) as mock_import: + result = taskflow.engines.flow_from_detail(flow_detail) + mock_import.assert_called_onec_with(name) + self.assertEquals(result, 'RESULT foo') + + +def my_flow_factory(task_name): + return test_utils.DummyTask(name=task_name) + + +class LoadFromFactoryTestCase(test.TestCase): + + def test_non_reimportable(self): + def factory(): + pass + with self.assertRaisesRegexp(ValueError, + 'Flow factory .* is not reimportable'): + taskflow.engines.load_from_factory(factory) + + def test_it_works(self): + engine = taskflow.engines.load_from_factory( + my_flow_factory, factory_kwargs={'task_name': 'test1'}) + self.assertIsInstance(engine._flow, test_utils.DummyTask) + + fd = engine.storage._flowdetail + self.assertEquals(fd.name, 'test1') + self.assertEquals(fd.meta.get('factory'), { + 'name': '%s.my_flow_factory' % __name__, + 'args': [], + 'kwargs': {'task_name': 'test1'}, + }) + + def test_it_works_by_name(self): + factory_name = '%s.my_flow_factory' % __name__ + engine = taskflow.engines.load_from_factory( + factory_name, factory_kwargs={'task_name': 'test1'}) + self.assertIsInstance(engine._flow, test_utils.DummyTask) + + fd = engine.storage._flowdetail + self.assertEquals(fd.name, 'test1') + self.assertEquals(fd.meta.get('factory'), { + 'name': factory_name, + 'args': [], + 'kwargs': {'task_name': 'test1'}, + }) diff --git a/taskflow/utils/persistence_utils.py b/taskflow/utils/persistence_utils.py index 9b5ef0d3..17133fe1 100644 --- a/taskflow/utils/persistence_utils.py +++ b/taskflow/utils/persistence_utils.py @@ -58,7 +58,7 @@ def temporary_flow_detail(backend=None): return book, book.find(flow_id) -def create_flow_detail(flow, book=None, backend=None): +def create_flow_detail(flow, book=None, backend=None, meta=None): """Creates a flow detail for the given flow and adds it to the provided logbook (if provided) and then uses the given backend (if provided) to save the logbook then returns the created flow detail. @@ -73,7 +73,12 @@ def create_flow_detail(flow, book=None, backend=None): except AttributeError: LOG.warn("Flow %s does not have a uuid attribute, creating one.", flow) flow_id = uuidutils.generate_uuid() + flow_detail = logbook.FlowDetail(name=flow_name, uuid=flow_id) + if meta is not None: + if flow_detail.meta is None: + flow_detail.meta = {} + flow_detail.meta.update(meta) if backend is not None and book is None: LOG.warn("No logbook provided for flow %s, creating one.", flow)