Expose only ensure_atom from storage
Move the storage ensuring logic from being split across the engine and the storage layer and expose only a single `ensure_atom` function that does the work instead. This also removes the access to the `ensure_task` and `ensure_retry` methods as the internals of the `ensure_atom` function is now the only location that needs to use these two functions. This reduces the need to do type specific atom checks in the non-storage components (which we want to reduce overall). Breaking change: removes the public methods named `ensure_task` and `ensure_retry` (which should not be used externally anyway) from the storage object and makes those internal/private methods instead. Change-Id: I3a0f1f0dd777a1633b4937e16b50030275c84d1d
This commit is contained in:
@@ -24,7 +24,6 @@ from taskflow.engines.action_engine import executor
|
||||
from taskflow.engines.action_engine import runtime
|
||||
from taskflow.engines import base
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow import retry
|
||||
from taskflow import states
|
||||
from taskflow import storage as atom_storage
|
||||
from taskflow.utils import lock_utils
|
||||
@@ -175,11 +174,7 @@ class ActionEngine(base.EngineBase):
|
||||
# a resuming state (and then to suspended).
|
||||
self._change_state(states.RESUMING) # does nothing in PENDING state
|
||||
for node in self._compilation.execution_graph.nodes_iter():
|
||||
version = misc.get_version_string(node)
|
||||
if isinstance(node, retry.Retry):
|
||||
self.storage.ensure_retry(node.name, version, node.save_as)
|
||||
else:
|
||||
self.storage.ensure_task(node.name, version, node.save_as)
|
||||
self.storage.ensure_atom(node)
|
||||
if node.inject:
|
||||
self.storage.inject_atom_args(node.name, node.inject)
|
||||
self._change_state(states.SUSPENDED) # does nothing in PENDING state
|
||||
|
||||
@@ -23,7 +23,9 @@ import six
|
||||
from taskflow import exceptions
|
||||
from taskflow.openstack.common import uuidutils
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow import retry
|
||||
from taskflow import states
|
||||
from taskflow import task
|
||||
from taskflow.utils import lock_utils
|
||||
from taskflow.utils import misc
|
||||
from taskflow.utils import reflection
|
||||
@@ -94,8 +96,25 @@ class Storage(object):
|
||||
with contextlib.closing(self._backend.get_connection()) as conn:
|
||||
functor(conn, *args, **kwargs)
|
||||
|
||||
def ensure_task(self, task_name, task_version=None, result_mapping=None):
|
||||
"""Ensure that there is taskdetail that corresponds the task.
|
||||
def ensure_atom(self, atom):
|
||||
"""Ensure that there is an atomdetail in storage for the given atom.
|
||||
|
||||
Returns uuid for the atomdetail that is/was created.
|
||||
"""
|
||||
if isinstance(atom, task.BaseTask):
|
||||
return self._ensure_task(atom.name,
|
||||
misc.get_version_string(atom),
|
||||
atom.save_as)
|
||||
elif isinstance(atom, retry.Retry):
|
||||
return self._ensure_retry(atom.name,
|
||||
misc.get_version_string(atom),
|
||||
atom.save_as)
|
||||
else:
|
||||
raise TypeError("Object of type 'atom' expected."
|
||||
" Got %s, %r." % (type(atom), atom))
|
||||
|
||||
def _ensure_task(self, task_name, task_version, result_mapping):
|
||||
"""Ensures there is a taskdetail that corresponds to the task info.
|
||||
|
||||
If task does not exist, adds a record for it. Added task will have
|
||||
PENDING state. Sets result mapping for the task from result_mapping
|
||||
@@ -122,9 +141,8 @@ class Storage(object):
|
||||
self._set_result_mapping(task_name, result_mapping)
|
||||
return task_id
|
||||
|
||||
def ensure_retry(self, retry_name, retry_version=None,
|
||||
result_mapping=None):
|
||||
"""Ensure that there is atom detail that corresponds the retry.
|
||||
def _ensure_retry(self, retry_name, retry_version, result_mapping):
|
||||
"""Ensures there is a retrydetail that corresponds to the retry info.
|
||||
|
||||
If retry does not exist, adds a record for it. Added retry
|
||||
will have PENDING state. Sets result mapping for the retry from
|
||||
|
||||
@@ -38,7 +38,7 @@ class _RunnerTestMixin(object):
|
||||
store = storage.SingleThreadedStorage(flow_detail)
|
||||
# This ensures the tasks exist in storage...
|
||||
for task in compilation.execution_graph:
|
||||
store.ensure_task(task.name)
|
||||
store.ensure_atom(task)
|
||||
if initial_state:
|
||||
store.set_flow_state(initial_state)
|
||||
task_notifier = misc.Notifier()
|
||||
|
||||
@@ -24,7 +24,7 @@ from taskflow.persistence import logbook
|
||||
from taskflow import states
|
||||
from taskflow import storage
|
||||
from taskflow import test
|
||||
from taskflow.test import mock
|
||||
from taskflow.tests import utils as test_utils
|
||||
from taskflow.utils import misc
|
||||
from taskflow.utils import persistence_utils as p_utils
|
||||
|
||||
@@ -59,7 +59,7 @@ class StorageTestMixin(object):
|
||||
def test_non_saving_storage(self):
|
||||
_lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
|
||||
s = storage.SingleThreadedStorage(flow_detail=flow_detail)
|
||||
s.ensure_task('my_task')
|
||||
s.ensure_atom(test_utils.NoopTask('my_task'))
|
||||
self.assertTrue(uuidutils.is_uuid_like(s.get_atom_uuid('my_task')))
|
||||
|
||||
def test_flow_name_and_uuid(self):
|
||||
@@ -70,14 +70,14 @@ class StorageTestMixin(object):
|
||||
|
||||
def test_ensure_task(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
self.assertEqual(s.get_atom_state('my task'), states.PENDING)
|
||||
self.assertTrue(uuidutils.is_uuid_like(s.get_atom_uuid('my task')))
|
||||
|
||||
def test_get_tasks_states(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_task('my task2')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
s.ensure_atom(test_utils.NoopTask('my task2'))
|
||||
s.save('my task', 'foo')
|
||||
expected = {
|
||||
'my task': (states.SUCCESS, states.EXECUTE),
|
||||
@@ -88,7 +88,9 @@ class StorageTestMixin(object):
|
||||
def test_ensure_task_flow_detail(self):
|
||||
_lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
|
||||
s = self._get_storage(flow_detail)
|
||||
s.ensure_task('my task', '3.11')
|
||||
t = test_utils.NoopTask('my task')
|
||||
t.version = (3, 11)
|
||||
s.ensure_atom(t)
|
||||
td = flow_detail.find(s.get_atom_uuid('my task'))
|
||||
self.assertIsNotNone(td)
|
||||
self.assertEqual(td.name, 'my task')
|
||||
@@ -107,12 +109,12 @@ class StorageTestMixin(object):
|
||||
td = logbook.TaskDetail(name='my_task', uuid='42')
|
||||
flow_detail.add(td)
|
||||
s = self._get_storage(flow_detail)
|
||||
s.ensure_task('my_task')
|
||||
s.ensure_atom(test_utils.NoopTask('my_task'))
|
||||
self.assertEqual('42', s.get_atom_uuid('my_task'))
|
||||
|
||||
def test_save_and_get(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
s.save('my task', 5)
|
||||
self.assertEqual(s.get('my task'), 5)
|
||||
self.assertEqual(s.fetch_all(), {})
|
||||
@@ -120,7 +122,7 @@ class StorageTestMixin(object):
|
||||
|
||||
def test_save_and_get_other_state(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
s.save('my task', 5, states.FAILURE)
|
||||
self.assertEqual(s.get('my task'), 5)
|
||||
self.assertEqual(s.get_atom_state('my task'), states.FAILURE)
|
||||
@@ -128,7 +130,7 @@ class StorageTestMixin(object):
|
||||
def test_save_and_get_cached_failure(self):
|
||||
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
s.save('my task', failure, states.FAILURE)
|
||||
self.assertEqual(s.get('my task'), failure)
|
||||
self.assertEqual(s.get_atom_state('my task'), states.FAILURE)
|
||||
@@ -138,7 +140,7 @@ class StorageTestMixin(object):
|
||||
def test_save_and_get_non_cached_failure(self):
|
||||
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
s.save('my task', failure, states.FAILURE)
|
||||
self.assertEqual(s.get('my task'), failure)
|
||||
s._failures['my task'] = None
|
||||
@@ -148,7 +150,7 @@ class StorageTestMixin(object):
|
||||
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
|
||||
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
s.save('my task', failure, states.FAILURE)
|
||||
|
||||
s.set_atom_state('my task', states.REVERTING)
|
||||
@@ -160,7 +162,7 @@ class StorageTestMixin(object):
|
||||
def test_get_failure_after_reload(self):
|
||||
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
s.save('my task', failure, states.FAILURE)
|
||||
s2 = self._get_storage(s._flowdetail)
|
||||
self.assertTrue(s2.has_failures())
|
||||
@@ -170,12 +172,12 @@ class StorageTestMixin(object):
|
||||
|
||||
def test_get_non_existing_var(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
self.assertRaises(exceptions.NotFound, s.get, 'my task')
|
||||
|
||||
def test_reset(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
s.save('my task', 5)
|
||||
s.reset('my task')
|
||||
self.assertEqual(s.get_atom_state('my task'), states.PENDING)
|
||||
@@ -183,13 +185,13 @@ class StorageTestMixin(object):
|
||||
|
||||
def test_reset_unknown_task(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
self.assertEqual(s.reset('my task'), None)
|
||||
|
||||
def test_fetch_by_name(self):
|
||||
s = self._get_storage()
|
||||
name = 'my result'
|
||||
s.ensure_task('my task', '1.0', {name: None})
|
||||
s.ensure_atom(test_utils.NoopTask('my task', provides=name))
|
||||
s.save('my task', 5)
|
||||
self.assertEqual(s.fetch(name), 5)
|
||||
self.assertEqual(s.fetch_all(), {name: 5})
|
||||
@@ -202,7 +204,7 @@ class StorageTestMixin(object):
|
||||
|
||||
def test_task_metadata_update_with_none(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
s.update_atom_metadata('my task', None)
|
||||
self.assertEqual(s.get_task_progress('my task'), 0.0)
|
||||
s.set_task_progress('my task', 0.5)
|
||||
@@ -212,13 +214,13 @@ class StorageTestMixin(object):
|
||||
|
||||
def test_default_task_progress(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
self.assertEqual(s.get_task_progress('my task'), 0.0)
|
||||
self.assertEqual(s.get_task_progress_details('my task'), None)
|
||||
|
||||
def test_task_progress(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
|
||||
s.set_task_progress('my task', 0.5, {'test_data': 11})
|
||||
self.assertEqual(s.get_task_progress('my task'), 0.5)
|
||||
@@ -243,7 +245,7 @@ class StorageTestMixin(object):
|
||||
|
||||
def test_task_progress_erase(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
|
||||
s.set_task_progress('my task', 0.8, {})
|
||||
self.assertEqual(s.get_task_progress('my task'), 0.8)
|
||||
@@ -252,24 +254,22 @@ class StorageTestMixin(object):
|
||||
def test_fetch_result_not_ready(self):
|
||||
s = self._get_storage()
|
||||
name = 'my result'
|
||||
s.ensure_task('my task', result_mapping={name: None})
|
||||
s.ensure_atom(test_utils.NoopTask('my task', provides=name))
|
||||
self.assertRaises(exceptions.NotFound, s.get, name)
|
||||
self.assertEqual(s.fetch_all(), {})
|
||||
|
||||
def test_save_multiple_results(self):
|
||||
s = self._get_storage()
|
||||
result_mapping = {'foo': 0, 'bar': 1, 'whole': None}
|
||||
s.ensure_task('my task', result_mapping=result_mapping)
|
||||
s.ensure_atom(test_utils.NoopTask('my task', provides=['foo', 'bar']))
|
||||
s.save('my task', ('spam', 'eggs'))
|
||||
self.assertEqual(s.fetch_all(), {
|
||||
'foo': 'spam',
|
||||
'bar': 'eggs',
|
||||
'whole': ('spam', 'eggs')
|
||||
})
|
||||
|
||||
def test_mapping_none(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
s.save('my task', 5)
|
||||
self.assertEqual(s.fetch_all(), {})
|
||||
|
||||
@@ -313,7 +313,7 @@ class StorageTestMixin(object):
|
||||
s = self._get_storage(threaded=True)
|
||||
|
||||
def ensure_my_task():
|
||||
s.ensure_task('my_task', result_mapping={})
|
||||
s.ensure_atom(test_utils.NoopTask('my_task'))
|
||||
|
||||
threads = []
|
||||
for i in range(0, self.thread_count):
|
||||
@@ -356,7 +356,7 @@ class StorageTestMixin(object):
|
||||
def test_set_and_get_task_state(self):
|
||||
s = self._get_storage()
|
||||
state = states.PENDING
|
||||
s.ensure_task('my task')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
s.set_atom_state('my task', state)
|
||||
self.assertEqual(s.get_atom_state('my task'), state)
|
||||
|
||||
@@ -367,7 +367,7 @@ class StorageTestMixin(object):
|
||||
|
||||
def test_task_by_name(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
self.assertTrue(uuidutils.is_uuid_like(s.get_atom_uuid('my task')))
|
||||
|
||||
def test_transient_storage_fetch_all(self):
|
||||
@@ -422,69 +422,44 @@ class StorageTestMixin(object):
|
||||
s.set_flow_state(states.SUCCESS)
|
||||
self.assertEqual(s.get_flow_state(), states.SUCCESS)
|
||||
|
||||
@mock.patch.object(storage.LOG, 'warning')
|
||||
def test_result_is_checked(self, mocked_warning):
|
||||
def test_result_is_checked(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task', result_mapping={'result': 'key'})
|
||||
s.ensure_atom(test_utils.NoopTask('my task', provides=set(['result'])))
|
||||
s.save('my task', {})
|
||||
mocked_warning.assert_called_once_with(
|
||||
mock.ANY, 'my task', 'key', 'result')
|
||||
self.assertRaisesRegexp(exceptions.NotFound,
|
||||
'^Unable to find result', s.fetch, 'result')
|
||||
|
||||
@mock.patch.object(storage.LOG, 'warning')
|
||||
def test_empty_result_is_checked(self, mocked_warning):
|
||||
def test_empty_result_is_checked(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task', result_mapping={'a': 0})
|
||||
s.ensure_atom(test_utils.NoopTask('my task', provides=['a']))
|
||||
s.save('my task', ())
|
||||
mocked_warning.assert_called_once_with(
|
||||
mock.ANY, 'my task', 0, 'a')
|
||||
self.assertRaisesRegexp(exceptions.NotFound,
|
||||
'^Unable to find result', s.fetch, 'a')
|
||||
|
||||
@mock.patch.object(storage.LOG, 'warning')
|
||||
def test_short_result_is_checked(self, mocked_warning):
|
||||
def test_short_result_is_checked(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task', result_mapping={'a': 0, 'b': 1})
|
||||
s.ensure_atom(test_utils.NoopTask('my task', provides=['a', 'b']))
|
||||
s.save('my task', ['result'])
|
||||
mocked_warning.assert_called_once_with(
|
||||
mock.ANY, 'my task', 1, 'b')
|
||||
self.assertEqual(s.fetch('a'), 'result')
|
||||
self.assertRaisesRegexp(exceptions.NotFound,
|
||||
'^Unable to find result', s.fetch, 'b')
|
||||
|
||||
@mock.patch.object(storage.LOG, 'warning')
|
||||
def test_multiple_providers_are_checked(self, mocked_warning):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task', result_mapping={'result': 'key'})
|
||||
self.assertEqual(mocked_warning.mock_calls, [])
|
||||
s.ensure_task('my other task', result_mapping={'result': 'key'})
|
||||
mocked_warning.assert_called_once_with(
|
||||
mock.ANY, 'result')
|
||||
|
||||
@mock.patch.object(storage.LOG, 'warning')
|
||||
def test_multiple_providers_with_inject_are_checked(self, mocked_warning):
|
||||
s = self._get_storage()
|
||||
s.inject({'result': 'DONE'})
|
||||
self.assertEqual(mocked_warning.mock_calls, [])
|
||||
s.ensure_task('my other task', result_mapping={'result': 'key'})
|
||||
mocked_warning.assert_called_once_with(mock.ANY, 'result')
|
||||
|
||||
def test_ensure_retry(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_retry('my retry')
|
||||
s.ensure_atom(test_utils.NoopRetry('my retry'))
|
||||
history = s.get_retry_history('my retry')
|
||||
self.assertEqual(history, [])
|
||||
|
||||
def test_ensure_retry_and_task_with_same_name(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my retry')
|
||||
s.ensure_atom(test_utils.NoopTask('my retry'))
|
||||
self.assertRaisesRegexp(exceptions.Duplicate,
|
||||
'^Atom detail', s.ensure_retry, 'my retry')
|
||||
'^Atom detail', s.ensure_atom,
|
||||
test_utils.NoopRetry('my retry'))
|
||||
|
||||
def test_save_retry_results(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_retry('my retry')
|
||||
s.ensure_atom(test_utils.NoopRetry('my retry'))
|
||||
s.save('my retry', 'a')
|
||||
s.save('my retry', 'b')
|
||||
history = s.get_retry_history('my retry')
|
||||
@@ -492,7 +467,7 @@ class StorageTestMixin(object):
|
||||
|
||||
def test_save_retry_results_with_mapping(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_retry('my retry', result_mapping={'x': 0})
|
||||
s.ensure_atom(test_utils.NoopRetry('my retry', provides=['x']))
|
||||
s.save('my retry', 'a')
|
||||
s.save('my retry', 'b')
|
||||
history = s.get_retry_history('my retry')
|
||||
@@ -502,7 +477,7 @@ class StorageTestMixin(object):
|
||||
|
||||
def test_cleanup_retry_history(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_retry('my retry', result_mapping={'x': 0})
|
||||
s.ensure_atom(test_utils.NoopRetry('my retry', provides=['x']))
|
||||
s.save('my retry', 'a')
|
||||
s.save('my retry', 'b')
|
||||
s.cleanup_retry_history('my retry', states.REVERTED)
|
||||
@@ -513,7 +488,7 @@ class StorageTestMixin(object):
|
||||
def test_cached_retry_failure(self):
|
||||
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
|
||||
s = self._get_storage()
|
||||
s.ensure_retry('my retry', result_mapping={'x': 0})
|
||||
s.ensure_atom(test_utils.NoopRetry('my retry', provides=['x']))
|
||||
s.save('my retry', 'a')
|
||||
s.save('my retry', failure, states.FAILURE)
|
||||
history = s.get_retry_history('my retry')
|
||||
@@ -528,14 +503,14 @@ class StorageTestMixin(object):
|
||||
|
||||
def test_save_task_intention(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.ensure_atom(test_utils.NoopTask('my task'))
|
||||
s.set_atom_intention('my task', states.REVERT)
|
||||
intention = s.get_atom_intention('my task')
|
||||
self.assertEqual(intention, states.REVERT)
|
||||
|
||||
def test_save_retry_intention(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_retry('my retry')
|
||||
s.ensure_atom(test_utils.NoopTask('my retry'))
|
||||
s.set_atom_intention('my retry', states.RETRY)
|
||||
intention = s.get_atom_intention('my retry')
|
||||
self.assertEqual(intention, states.RETRY)
|
||||
|
||||
@@ -32,7 +32,7 @@ class TestWorker(test.MockTestCase):
|
||||
self.exchange = 'test-exchange'
|
||||
self.topic = 'test-topic'
|
||||
self.threads_count = 5
|
||||
self.endpoint_count = 21
|
||||
self.endpoint_count = 22
|
||||
|
||||
# patch classes
|
||||
self.executor_mock, self.executor_inst_mock = self.patchClass(
|
||||
|
||||
@@ -70,6 +70,16 @@ def zookeeper_available(min_version, timeout=3):
|
||||
kazoo_utils.finalize_client(client)
|
||||
|
||||
|
||||
class NoopRetry(retry.AlwaysRevert):
|
||||
pass
|
||||
|
||||
|
||||
class NoopTask(task.Task):
|
||||
|
||||
def execute(self):
|
||||
pass
|
||||
|
||||
|
||||
class DummyTask(task.Task):
|
||||
|
||||
def execute(self, context, *args, **kwargs):
|
||||
|
||||
Reference in New Issue
Block a user