Merge "Expose only ensure_atom from storage"

This commit is contained in:
Jenkins
2014-10-18 20:16:40 +00:00
committed by Gerrit Code Review
6 changed files with 81 additions and 66 deletions

View File

@@ -24,7 +24,6 @@ from taskflow.engines.action_engine import executor
from taskflow.engines.action_engine import runtime
from taskflow.engines import base
from taskflow import exceptions as exc
from taskflow import retry
from taskflow import states
from taskflow import storage as atom_storage
from taskflow.utils import lock_utils
@@ -175,11 +174,7 @@ class ActionEngine(base.EngineBase):
# a resuming state (and then to suspended).
self._change_state(states.RESUMING) # does nothing in PENDING state
for node in self._compilation.execution_graph.nodes_iter():
version = misc.get_version_string(node)
if isinstance(node, retry.Retry):
self.storage.ensure_retry(node.name, version, node.save_as)
else:
self.storage.ensure_task(node.name, version, node.save_as)
self.storage.ensure_atom(node)
if node.inject:
self.storage.inject_atom_args(node.name, node.inject)
self._change_state(states.SUSPENDED) # does nothing in PENDING state

View File

@@ -23,7 +23,9 @@ import six
from taskflow import exceptions
from taskflow.openstack.common import uuidutils
from taskflow.persistence import logbook
from taskflow import retry
from taskflow import states
from taskflow import task
from taskflow.utils import lock_utils
from taskflow.utils import misc
from taskflow.utils import reflection
@@ -166,8 +168,25 @@ class Storage(object):
with contextlib.closing(self._backend.get_connection()) as conn:
functor(conn, *args, **kwargs)
def ensure_task(self, task_name, task_version=None, result_mapping=None):
"""Ensure that there is taskdetail that corresponds the task.
def ensure_atom(self, atom):
"""Ensure that there is an atomdetail in storage for the given atom.
Returns uuid for the atomdetail that is/was created.
"""
if isinstance(atom, task.BaseTask):
return self._ensure_task(atom.name,
misc.get_version_string(atom),
atom.save_as)
elif isinstance(atom, retry.Retry):
return self._ensure_retry(atom.name,
misc.get_version_string(atom),
atom.save_as)
else:
raise TypeError("Object of type 'atom' expected."
" Got %s, %r." % (type(atom), atom))
def _ensure_task(self, task_name, task_version, result_mapping):
"""Ensures there is a taskdetail that corresponds to the task info.
If task does not exist, adds a record for it. Added task will have
PENDING state. Sets result mapping for the task from result_mapping
@@ -194,9 +213,8 @@ class Storage(object):
self._set_result_mapping(task_name, result_mapping)
return task_id
def ensure_retry(self, retry_name, retry_version=None,
result_mapping=None):
"""Ensure that there is atom detail that corresponds the retry.
def _ensure_retry(self, retry_name, retry_version, result_mapping):
"""Ensures there is a retrydetail that corresponds to the retry info.
If retry does not exist, adds a record for it. Added retry
will have PENDING state. Sets result mapping for the retry from

View File

@@ -38,7 +38,7 @@ class _RunnerTestMixin(object):
store = storage.SingleThreadedStorage(flow_detail)
# This ensures the tasks exist in storage...
for task in compilation.execution_graph:
store.ensure_task(task.name)
store.ensure_atom(task)
if initial_state:
store.set_flow_state(initial_state)
task_notifier = misc.Notifier()

View File

@@ -24,7 +24,7 @@ from taskflow.persistence import logbook
from taskflow import states
from taskflow import storage
from taskflow import test
from taskflow.test import mock
from taskflow.tests import utils as test_utils
from taskflow.utils import misc
from taskflow.utils import persistence_utils as p_utils
@@ -59,7 +59,7 @@ class StorageTestMixin(object):
def test_non_saving_storage(self):
_lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
s = storage.SingleThreadedStorage(flow_detail=flow_detail)
s.ensure_task('my_task')
s.ensure_atom(test_utils.NoopTask('my_task'))
self.assertTrue(uuidutils.is_uuid_like(s.get_atom_uuid('my_task')))
def test_flow_name_and_uuid(self):
@@ -70,14 +70,14 @@ class StorageTestMixin(object):
def test_ensure_task(self):
s = self._get_storage()
s.ensure_task('my task')
s.ensure_atom(test_utils.NoopTask('my task'))
self.assertEqual(s.get_atom_state('my task'), states.PENDING)
self.assertTrue(uuidutils.is_uuid_like(s.get_atom_uuid('my task')))
def test_get_tasks_states(self):
s = self._get_storage()
s.ensure_task('my task')
s.ensure_task('my task2')
s.ensure_atom(test_utils.NoopTask('my task'))
s.ensure_atom(test_utils.NoopTask('my task2'))
s.save('my task', 'foo')
expected = {
'my task': (states.SUCCESS, states.EXECUTE),
@@ -88,7 +88,9 @@ class StorageTestMixin(object):
def test_ensure_task_flow_detail(self):
_lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
s = self._get_storage(flow_detail)
s.ensure_task('my task', '3.11')
t = test_utils.NoopTask('my task')
t.version = (3, 11)
s.ensure_atom(t)
td = flow_detail.find(s.get_atom_uuid('my task'))
self.assertIsNotNone(td)
self.assertEqual(td.name, 'my task')
@@ -107,12 +109,12 @@ class StorageTestMixin(object):
td = logbook.TaskDetail(name='my_task', uuid='42')
flow_detail.add(td)
s = self._get_storage(flow_detail)
s.ensure_task('my_task')
s.ensure_atom(test_utils.NoopTask('my_task'))
self.assertEqual('42', s.get_atom_uuid('my_task'))
def test_save_and_get(self):
s = self._get_storage()
s.ensure_task('my task')
s.ensure_atom(test_utils.NoopTask('my task'))
s.save('my task', 5)
self.assertEqual(s.get('my task'), 5)
self.assertEqual(s.fetch_all(), {})
@@ -120,7 +122,7 @@ class StorageTestMixin(object):
def test_save_and_get_other_state(self):
s = self._get_storage()
s.ensure_task('my task')
s.ensure_atom(test_utils.NoopTask('my task'))
s.save('my task', 5, states.FAILURE)
self.assertEqual(s.get('my task'), 5)
self.assertEqual(s.get_atom_state('my task'), states.FAILURE)
@@ -128,7 +130,7 @@ class StorageTestMixin(object):
def test_save_and_get_cached_failure(self):
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
s = self._get_storage()
s.ensure_task('my task')
s.ensure_atom(test_utils.NoopTask('my task'))
s.save('my task', failure, states.FAILURE)
self.assertEqual(s.get('my task'), failure)
self.assertEqual(s.get_atom_state('my task'), states.FAILURE)
@@ -138,7 +140,7 @@ class StorageTestMixin(object):
def test_save_and_get_non_cached_failure(self):
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
s = self._get_storage()
s.ensure_task('my task')
s.ensure_atom(test_utils.NoopTask('my task'))
s.save('my task', failure, states.FAILURE)
self.assertEqual(s.get('my task'), failure)
s._failures['my task'] = None
@@ -148,7 +150,7 @@ class StorageTestMixin(object):
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
s = self._get_storage()
s.ensure_task('my task')
s.ensure_atom(test_utils.NoopTask('my task'))
s.save('my task', failure, states.FAILURE)
s.set_atom_state('my task', states.REVERTING)
@@ -160,7 +162,7 @@ class StorageTestMixin(object):
def test_get_failure_after_reload(self):
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
s = self._get_storage()
s.ensure_task('my task')
s.ensure_atom(test_utils.NoopTask('my task'))
s.save('my task', failure, states.FAILURE)
s2 = self._get_storage(s._flowdetail)
self.assertTrue(s2.has_failures())
@@ -170,12 +172,12 @@ class StorageTestMixin(object):
def test_get_non_existing_var(self):
s = self._get_storage()
s.ensure_task('my task')
s.ensure_atom(test_utils.NoopTask('my task'))
self.assertRaises(exceptions.NotFound, s.get, 'my task')
def test_reset(self):
s = self._get_storage()
s.ensure_task('my task')
s.ensure_atom(test_utils.NoopTask('my task'))
s.save('my task', 5)
s.reset('my task')
self.assertEqual(s.get_atom_state('my task'), states.PENDING)
@@ -183,13 +185,13 @@ class StorageTestMixin(object):
def test_reset_unknown_task(self):
s = self._get_storage()
s.ensure_task('my task')
s.ensure_atom(test_utils.NoopTask('my task'))
self.assertEqual(s.reset('my task'), None)
def test_fetch_by_name(self):
s = self._get_storage()
name = 'my result'
s.ensure_task('my task', '1.0', {name: None})
s.ensure_atom(test_utils.NoopTask('my task', provides=name))
s.save('my task', 5)
self.assertEqual(s.fetch(name), 5)
self.assertEqual(s.fetch_all(), {name: 5})
@@ -202,7 +204,7 @@ class StorageTestMixin(object):
def test_task_metadata_update_with_none(self):
s = self._get_storage()
s.ensure_task('my task')
s.ensure_atom(test_utils.NoopTask('my task'))
s.update_atom_metadata('my task', None)
self.assertEqual(s.get_task_progress('my task'), 0.0)
s.set_task_progress('my task', 0.5)
@@ -212,13 +214,13 @@ class StorageTestMixin(object):
def test_default_task_progress(self):
s = self._get_storage()
s.ensure_task('my task')
s.ensure_atom(test_utils.NoopTask('my task'))
self.assertEqual(s.get_task_progress('my task'), 0.0)
self.assertEqual(s.get_task_progress_details('my task'), None)
def test_task_progress(self):
s = self._get_storage()
s.ensure_task('my task')
s.ensure_atom(test_utils.NoopTask('my task'))
s.set_task_progress('my task', 0.5, {'test_data': 11})
self.assertEqual(s.get_task_progress('my task'), 0.5)
@@ -243,7 +245,7 @@ class StorageTestMixin(object):
def test_task_progress_erase(self):
s = self._get_storage()
s.ensure_task('my task')
s.ensure_atom(test_utils.NoopTask('my task'))
s.set_task_progress('my task', 0.8, {})
self.assertEqual(s.get_task_progress('my task'), 0.8)
@@ -252,24 +254,22 @@ class StorageTestMixin(object):
def test_fetch_result_not_ready(self):
s = self._get_storage()
name = 'my result'
s.ensure_task('my task', result_mapping={name: None})
s.ensure_atom(test_utils.NoopTask('my task', provides=name))
self.assertRaises(exceptions.NotFound, s.get, name)
self.assertEqual(s.fetch_all(), {})
def test_save_multiple_results(self):
s = self._get_storage()
result_mapping = {'foo': 0, 'bar': 1, 'whole': None}
s.ensure_task('my task', result_mapping=result_mapping)
s.ensure_atom(test_utils.NoopTask('my task', provides=['foo', 'bar']))
s.save('my task', ('spam', 'eggs'))
self.assertEqual(s.fetch_all(), {
'foo': 'spam',
'bar': 'eggs',
'whole': ('spam', 'eggs')
})
def test_mapping_none(self):
s = self._get_storage()
s.ensure_task('my task')
s.ensure_atom(test_utils.NoopTask('my task'))
s.save('my task', 5)
self.assertEqual(s.fetch_all(), {})
@@ -313,7 +313,7 @@ class StorageTestMixin(object):
s = self._get_storage(threaded=True)
def ensure_my_task():
s.ensure_task('my_task', result_mapping={})
s.ensure_atom(test_utils.NoopTask('my_task'))
threads = []
for i in range(0, self.thread_count):
@@ -356,7 +356,7 @@ class StorageTestMixin(object):
def test_set_and_get_task_state(self):
s = self._get_storage()
state = states.PENDING
s.ensure_task('my task')
s.ensure_atom(test_utils.NoopTask('my task'))
s.set_atom_state('my task', state)
self.assertEqual(s.get_atom_state('my task'), state)
@@ -367,7 +367,7 @@ class StorageTestMixin(object):
def test_task_by_name(self):
s = self._get_storage()
s.ensure_task('my task')
s.ensure_atom(test_utils.NoopTask('my task'))
self.assertTrue(uuidutils.is_uuid_like(s.get_atom_uuid('my task')))
def test_transient_storage_fetch_all(self):
@@ -422,52 +422,44 @@ class StorageTestMixin(object):
s.set_flow_state(states.SUCCESS)
self.assertEqual(s.get_flow_state(), states.SUCCESS)
@mock.patch.object(storage.LOG, 'warning')
def test_result_is_checked(self, mocked_warning):
def test_result_is_checked(self):
s = self._get_storage()
s.ensure_task('my task', result_mapping={'result': 'key'})
s.ensure_atom(test_utils.NoopTask('my task', provides=set(['result'])))
s.save('my task', {})
mocked_warning.assert_called_once_with(
mock.ANY, 'my task', 'key', 'result')
self.assertRaisesRegexp(exceptions.NotFound,
'^Unable to find result', s.fetch, 'result')
@mock.patch.object(storage.LOG, 'warning')
def test_empty_result_is_checked(self, mocked_warning):
def test_empty_result_is_checked(self):
s = self._get_storage()
s.ensure_task('my task', result_mapping={'a': 0})
s.ensure_atom(test_utils.NoopTask('my task', provides=['a']))
s.save('my task', ())
mocked_warning.assert_called_once_with(
mock.ANY, 'my task', 0, 'a')
self.assertRaisesRegexp(exceptions.NotFound,
'^Unable to find result', s.fetch, 'a')
@mock.patch.object(storage.LOG, 'warning')
def test_short_result_is_checked(self, mocked_warning):
def test_short_result_is_checked(self):
s = self._get_storage()
s.ensure_task('my task', result_mapping={'a': 0, 'b': 1})
s.ensure_atom(test_utils.NoopTask('my task', provides=['a', 'b']))
s.save('my task', ['result'])
mocked_warning.assert_called_once_with(
mock.ANY, 'my task', 1, 'b')
self.assertEqual(s.fetch('a'), 'result')
self.assertRaisesRegexp(exceptions.NotFound,
'^Unable to find result', s.fetch, 'b')
def test_ensure_retry(self):
s = self._get_storage()
s.ensure_retry('my retry')
s.ensure_atom(test_utils.NoopRetry('my retry'))
history = s.get_retry_history('my retry')
self.assertEqual(history, [])
def test_ensure_retry_and_task_with_same_name(self):
s = self._get_storage()
s.ensure_task('my retry')
s.ensure_atom(test_utils.NoopTask('my retry'))
self.assertRaisesRegexp(exceptions.Duplicate,
'^Atom detail', s.ensure_retry, 'my retry')
'^Atom detail', s.ensure_atom,
test_utils.NoopRetry('my retry'))
def test_save_retry_results(self):
s = self._get_storage()
s.ensure_retry('my retry')
s.ensure_atom(test_utils.NoopRetry('my retry'))
s.save('my retry', 'a')
s.save('my retry', 'b')
history = s.get_retry_history('my retry')
@@ -475,7 +467,7 @@ class StorageTestMixin(object):
def test_save_retry_results_with_mapping(self):
s = self._get_storage()
s.ensure_retry('my retry', result_mapping={'x': 0})
s.ensure_atom(test_utils.NoopRetry('my retry', provides=['x']))
s.save('my retry', 'a')
s.save('my retry', 'b')
history = s.get_retry_history('my retry')
@@ -485,7 +477,7 @@ class StorageTestMixin(object):
def test_cleanup_retry_history(self):
s = self._get_storage()
s.ensure_retry('my retry', result_mapping={'x': 0})
s.ensure_atom(test_utils.NoopRetry('my retry', provides=['x']))
s.save('my retry', 'a')
s.save('my retry', 'b')
s.cleanup_retry_history('my retry', states.REVERTED)
@@ -496,7 +488,7 @@ class StorageTestMixin(object):
def test_cached_retry_failure(self):
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
s = self._get_storage()
s.ensure_retry('my retry', result_mapping={'x': 0})
s.ensure_atom(test_utils.NoopRetry('my retry', provides=['x']))
s.save('my retry', 'a')
s.save('my retry', failure, states.FAILURE)
history = s.get_retry_history('my retry')
@@ -511,14 +503,14 @@ class StorageTestMixin(object):
def test_save_task_intention(self):
s = self._get_storage()
s.ensure_task('my task')
s.ensure_atom(test_utils.NoopTask('my task'))
s.set_atom_intention('my task', states.REVERT)
intention = s.get_atom_intention('my task')
self.assertEqual(intention, states.REVERT)
def test_save_retry_intention(self):
s = self._get_storage()
s.ensure_retry('my retry')
s.ensure_atom(test_utils.NoopTask('my retry'))
s.set_atom_intention('my retry', states.RETRY)
intention = s.get_atom_intention('my retry')
self.assertEqual(intention, states.RETRY)

View File

@@ -32,7 +32,7 @@ class TestWorker(test.MockTestCase):
self.exchange = 'test-exchange'
self.topic = 'test-topic'
self.threads_count = 5
self.endpoint_count = 21
self.endpoint_count = 22
# patch classes
self.executor_mock, self.executor_inst_mock = self.patchClass(

View File

@@ -70,6 +70,16 @@ def zookeeper_available(min_version, timeout=3):
kazoo_utils.finalize_client(client)
class NoopRetry(retry.AlwaysRevert):
pass
class NoopTask(task.Task):
def execute(self):
pass
class DummyTask(task.Task):
def execute(self, context, *args, **kwargs):