Helpers to save flow factory in metadata
This change adds new helpers that, taken together, allow to resume the flows having nothing but flow detail at hands. First one, load_from_factory, gets flow factory function as a parameter and saves its fully qualified name and arguments to flow metadata. Others can be used to re-create the flow using that metadata, and load it into engine. Change-Id: Ia3cd989b3b0388ec0a9f09fe527f768eec5cc904
This commit is contained in:
parent
c866318b3e
commit
961d91ff7a
@ -18,5 +18,8 @@
|
||||
|
||||
|
||||
# promote helpers to this module namespace
|
||||
from taskflow.engines.helpers import flow_from_detail # noqa
|
||||
from taskflow.engines.helpers import load # noqa
|
||||
from taskflow.engines.helpers import load_from_detail # noqa
|
||||
from taskflow.engines.helpers import load_from_factory # noqa
|
||||
from taskflow.engines.helpers import run # noqa
|
||||
|
@ -19,8 +19,10 @@
|
||||
import six
|
||||
import stevedore.driver
|
||||
|
||||
from taskflow.openstack.common import importutils
|
||||
from taskflow.persistence import backends as p_backends
|
||||
from taskflow.utils import persistence_utils as p_utils
|
||||
from taskflow.utils import reflection
|
||||
|
||||
|
||||
# NOTE(imelnikov): this is the entrypoint namespace, not the module namespace.
|
||||
@ -98,7 +100,7 @@ def run(flow, store=None, engine_conf=None, backend=None):
|
||||
can be backend itself, or a dictionary that is passed to
|
||||
taskflow.persistence.backends.fetch to obtain backend.
|
||||
|
||||
:param flow: flow to load
|
||||
:param flow: flow to run
|
||||
:param store: dict -- data to put to storage to satisfy flow requirements
|
||||
:param engine_conf: engine type and configuration configuration
|
||||
:param backend: storage backend to use or configuration
|
||||
@ -107,3 +109,91 @@ def run(flow, store=None, engine_conf=None, backend=None):
|
||||
engine = load(flow, store=store, engine_conf=engine_conf, backend=backend)
|
||||
engine.run()
|
||||
return engine.storage.fetch_all()
|
||||
|
||||
|
||||
def load_from_factory(flow_factory, factory_args=None, factory_kwargs=None,
|
||||
store=None, book=None, engine_conf=None, backend=None):
|
||||
"""Load flow from factory function into engine
|
||||
|
||||
Gets flow factory function (or name of it) and creates flow with
|
||||
it. Then, flow is loaded into engine with load(), and factory
|
||||
function fully qualified name is saved to flow metadata so that
|
||||
it can be later resumed with resume.
|
||||
|
||||
:param flow_factory: function or string: function that creates the flow
|
||||
:param factory_args: list or tuple of factory positional arguments
|
||||
:param factory_kwargs: dict of factory keyword arguments
|
||||
:param store: dict -- data to put to storage to satisfy flow requirements
|
||||
:param book: LogBook to create flow detail in
|
||||
:param engine_conf: engine type and configuration configuration
|
||||
:param backend: storage backend to use or configuration
|
||||
:returns: engine
|
||||
"""
|
||||
|
||||
if isinstance(flow_factory, six.string_types):
|
||||
factory_fun = importutils.import_class(flow_factory)
|
||||
factory_name = flow_factory
|
||||
else:
|
||||
factory_fun = flow_factory
|
||||
factory_name = reflection.get_callable_name(flow_factory)
|
||||
try:
|
||||
reimported = importutils.import_class(factory_name)
|
||||
assert reimported == factory_fun
|
||||
except (ImportError, AssertionError):
|
||||
raise ValueError('Flow factory %r is not reimportable by name %s'
|
||||
% (factory_fun, factory_name))
|
||||
|
||||
args = factory_args or []
|
||||
kwargs = factory_kwargs or {}
|
||||
flow = factory_fun(*args, **kwargs)
|
||||
factory_data = dict(name=factory_name, args=args, kwargs=kwargs)
|
||||
|
||||
if isinstance(backend, dict):
|
||||
backend = p_backends.fetch(backend)
|
||||
flow_detail = p_utils.create_flow_detail(flow, book=book, backend=backend,
|
||||
meta={'factory': factory_data})
|
||||
return load(flow=flow, flow_detail=flow_detail,
|
||||
store=store, book=book,
|
||||
engine_conf=engine_conf, backend=backend)
|
||||
|
||||
|
||||
def flow_from_detail(flow_detail):
|
||||
"""Recreate flow previously loaded with load_form_factory
|
||||
|
||||
Gets flow factory name from metadata, calls it to recreate the flow
|
||||
|
||||
:param flow_detail: FlowDetail that holds state of the flow to load
|
||||
"""
|
||||
try:
|
||||
factory_data = flow_detail.meta['factory']
|
||||
except (KeyError, AttributeError, TypeError):
|
||||
raise ValueError('Cannot reconstruct flow %s %s: '
|
||||
'no factory information saved.'
|
||||
% (flow_detail.name, flow_detail.uuid))
|
||||
|
||||
try:
|
||||
factory_fun = importutils.import_class(factory_data['name'])
|
||||
except (KeyError, ImportError):
|
||||
raise ImportError('Could not import factory for flow %s %s'
|
||||
% (flow_detail.name, flow_detail.uuid))
|
||||
|
||||
args = factory_data.get('args', ())
|
||||
kwargs = factory_data.get('kwargs', {})
|
||||
return factory_fun(*args, **kwargs)
|
||||
|
||||
|
||||
def load_from_detail(flow_detail, store=None, engine_conf=None, backend=None):
|
||||
"""Reload flow previously loaded with load_form_factory
|
||||
|
||||
Gets flow factory name from metadata, calls it to recreate the flow
|
||||
and loads flow into engine with load().
|
||||
|
||||
:param flow_detail: FlowDetail that holds state of the flow to load
|
||||
:param store: dict -- data to put to storage to satisfy flow requirements
|
||||
:param engine_conf: engine type and configuration configuration
|
||||
:param backend: storage backend to use or configuration
|
||||
:returns: engine
|
||||
"""
|
||||
flow = flow_from_detail(flow_detail)
|
||||
return load(flow, flow_detail=flow_detail,
|
||||
store=store, engine_conf=engine_conf, backend=backend)
|
||||
|
@ -34,7 +34,6 @@ import taskflow.engines
|
||||
|
||||
from taskflow import states
|
||||
|
||||
import my_flows # noqa
|
||||
import my_utils # noqa
|
||||
|
||||
|
||||
@ -43,8 +42,7 @@ FINISHED_STATES = (states.SUCCESS, states.FAILURE, states.REVERTED)
|
||||
|
||||
def resume(flowdetail, backend):
|
||||
print('Resuming flow %s %s' % (flowdetail.name, flowdetail.uuid))
|
||||
engine = taskflow.engines.load(my_flows.flow_factory(),
|
||||
flow_detail=flowdetail,
|
||||
engine = taskflow.engines.load_from_detail(flow_detail=flowdetail,
|
||||
backend=backend)
|
||||
engine.run()
|
||||
|
||||
|
@ -30,20 +30,14 @@ sys.path.insert(0, top_dir)
|
||||
sys.path.insert(0, self_dir)
|
||||
|
||||
import taskflow.engines
|
||||
from taskflow.utils import persistence_utils as p_utils
|
||||
|
||||
import my_flows # noqa
|
||||
import my_utils # noqa
|
||||
|
||||
|
||||
backend = my_utils.get_backend()
|
||||
logbook = p_utils.temporary_log_book(backend)
|
||||
|
||||
flow = my_flows.flow_factory()
|
||||
|
||||
flowdetail = p_utils.create_flow_detail(flow, logbook, backend)
|
||||
engine = taskflow.engines.load(flow, flow_detail=flowdetail,
|
||||
engine = taskflow.engines.load_from_factory(my_flows.flow_factory,
|
||||
backend=backend)
|
||||
|
||||
print('Running flow %s %s' % (flowdetail.name, flowdetail.uuid))
|
||||
print('Running flow %s %s' % (engine.storage.flow_name,
|
||||
engine.storage.flow_uuid))
|
||||
engine.run()
|
||||
|
113
taskflow/tests/unit/test_engine_helpers.py
Normal file
113
taskflow/tests/unit/test_engine_helpers.py
Normal file
@ -0,0 +1,113 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from taskflow import test
|
||||
from taskflow.tests import utils as test_utils
|
||||
from taskflow.utils import persistence_utils as p_utils
|
||||
|
||||
import taskflow.engines
|
||||
|
||||
|
||||
class FlowFromDetailTestCase(test.TestCase):
|
||||
def test_no_meta(self):
|
||||
_lb, flow_detail = p_utils.temporary_flow_detail()
|
||||
self.assertIs(flow_detail.meta, None)
|
||||
expected_msg = '^Cannot .* no factory information saved.$'
|
||||
with self.assertRaisesRegexp(ValueError, expected_msg):
|
||||
taskflow.engines.flow_from_detail(flow_detail)
|
||||
|
||||
def test_no_factory_in_meta(self):
|
||||
_lb, flow_detail = p_utils.temporary_flow_detail()
|
||||
flow_detail.meta = {}
|
||||
expected_msg = '^Cannot .* no factory information saved.$'
|
||||
with self.assertRaisesRegexp(ValueError, expected_msg):
|
||||
taskflow.engines.flow_from_detail(flow_detail)
|
||||
|
||||
def test_no_importable_function(self):
|
||||
_lb, flow_detail = p_utils.temporary_flow_detail()
|
||||
flow_detail.meta = dict(factory=dict(
|
||||
name='you can not import me, i contain spaces'
|
||||
))
|
||||
expected_msg = '^Could not import factory'
|
||||
with self.assertRaisesRegexp(ImportError, expected_msg):
|
||||
taskflow.engines.flow_from_detail(flow_detail)
|
||||
|
||||
def test_no_arg_factory(self):
|
||||
name = 'some.test.factory'
|
||||
_lb, flow_detail = p_utils.temporary_flow_detail()
|
||||
flow_detail.meta = dict(factory=dict(name=name))
|
||||
|
||||
with mock.patch('taskflow.openstack.common.importutils.import_class',
|
||||
return_value=lambda: 'RESULT') as mock_import:
|
||||
result = taskflow.engines.flow_from_detail(flow_detail)
|
||||
mock_import.assert_called_onec_with(name)
|
||||
self.assertEquals(result, 'RESULT')
|
||||
|
||||
def test_factory_with_arg(self):
|
||||
name = 'some.test.factory'
|
||||
_lb, flow_detail = p_utils.temporary_flow_detail()
|
||||
flow_detail.meta = dict(factory=dict(name=name, args=['foo']))
|
||||
|
||||
with mock.patch('taskflow.openstack.common.importutils.import_class',
|
||||
return_value=lambda x: 'RESULT %s' % x) as mock_import:
|
||||
result = taskflow.engines.flow_from_detail(flow_detail)
|
||||
mock_import.assert_called_onec_with(name)
|
||||
self.assertEquals(result, 'RESULT foo')
|
||||
|
||||
|
||||
def my_flow_factory(task_name):
|
||||
return test_utils.DummyTask(name=task_name)
|
||||
|
||||
|
||||
class LoadFromFactoryTestCase(test.TestCase):
|
||||
|
||||
def test_non_reimportable(self):
|
||||
def factory():
|
||||
pass
|
||||
with self.assertRaisesRegexp(ValueError,
|
||||
'Flow factory .* is not reimportable'):
|
||||
taskflow.engines.load_from_factory(factory)
|
||||
|
||||
def test_it_works(self):
|
||||
engine = taskflow.engines.load_from_factory(
|
||||
my_flow_factory, factory_kwargs={'task_name': 'test1'})
|
||||
self.assertIsInstance(engine._flow, test_utils.DummyTask)
|
||||
|
||||
fd = engine.storage._flowdetail
|
||||
self.assertEquals(fd.name, 'test1')
|
||||
self.assertEquals(fd.meta.get('factory'), {
|
||||
'name': '%s.my_flow_factory' % __name__,
|
||||
'args': [],
|
||||
'kwargs': {'task_name': 'test1'},
|
||||
})
|
||||
|
||||
def test_it_works_by_name(self):
|
||||
factory_name = '%s.my_flow_factory' % __name__
|
||||
engine = taskflow.engines.load_from_factory(
|
||||
factory_name, factory_kwargs={'task_name': 'test1'})
|
||||
self.assertIsInstance(engine._flow, test_utils.DummyTask)
|
||||
|
||||
fd = engine.storage._flowdetail
|
||||
self.assertEquals(fd.name, 'test1')
|
||||
self.assertEquals(fd.meta.get('factory'), {
|
||||
'name': factory_name,
|
||||
'args': [],
|
||||
'kwargs': {'task_name': 'test1'},
|
||||
})
|
@ -58,7 +58,7 @@ def temporary_flow_detail(backend=None):
|
||||
return book, book.find(flow_id)
|
||||
|
||||
|
||||
def create_flow_detail(flow, book=None, backend=None):
|
||||
def create_flow_detail(flow, book=None, backend=None, meta=None):
|
||||
"""Creates a flow detail for the given flow and adds it to the provided
|
||||
logbook (if provided) and then uses the given backend (if provided) to
|
||||
save the logbook then returns the created flow detail.
|
||||
@ -73,7 +73,12 @@ def create_flow_detail(flow, book=None, backend=None):
|
||||
except AttributeError:
|
||||
LOG.warn("Flow %s does not have a uuid attribute, creating one.", flow)
|
||||
flow_id = uuidutils.generate_uuid()
|
||||
|
||||
flow_detail = logbook.FlowDetail(name=flow_name, uuid=flow_id)
|
||||
if meta is not None:
|
||||
if flow_detail.meta is None:
|
||||
flow_detail.meta = {}
|
||||
flow_detail.meta.update(meta)
|
||||
|
||||
if backend is not None and book is None:
|
||||
LOG.warn("No logbook provided for flow %s, creating one.", flow)
|
||||
|
Loading…
Reference in New Issue
Block a user