From eedc3353c8c362ba6df62182e20a8bb4f6ad6afe Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Sat, 28 Jun 2014 21:46:46 -0700 Subject: [PATCH] Expose only `ensure_atom` from storage Move the storage ensuring logic from being split across the engine and the storage layer and expose only a single `ensure_atom` function that does the work instead. This also removes the access to the `ensure_task` and `ensure_retry` methods as the internals of the `ensure_atom` function is now the only location that needs to use these two functions. This reduces the need to do type specific atom checks in the non-storage components (which we want to reduce overall). Breaking change: removes the public methods named `ensure_task` and `ensure_retry` (which should not be used externally anyway) from the storage object and makes those internal/private methods instead. Change-Id: I3a0f1f0dd777a1633b4937e16b50030275c84d1d --- taskflow/engines/action_engine/engine.py | 7 +- taskflow/storage.py | 28 ++++- .../tests/unit/action_engine/test_runner.py | 2 +- taskflow/tests/unit/test_storage.py | 115 +++++++----------- .../tests/unit/worker_based/test_worker.py | 2 +- taskflow/tests/utils.py | 10 ++ 6 files changed, 81 insertions(+), 83 deletions(-) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 9bf62429..8e7b3cff 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -24,7 +24,6 @@ from taskflow.engines.action_engine import executor from taskflow.engines.action_engine import runtime from taskflow.engines import base from taskflow import exceptions as exc -from taskflow import retry from taskflow import states from taskflow import storage as atom_storage from taskflow.utils import lock_utils @@ -175,11 +174,7 @@ class ActionEngine(base.EngineBase): # a resuming state (and then to suspended). self._change_state(states.RESUMING) # does nothing in PENDING state for node in self._compilation.execution_graph.nodes_iter(): - version = misc.get_version_string(node) - if isinstance(node, retry.Retry): - self.storage.ensure_retry(node.name, version, node.save_as) - else: - self.storage.ensure_task(node.name, version, node.save_as) + self.storage.ensure_atom(node) if node.inject: self.storage.inject_atom_args(node.name, node.inject) self._change_state(states.SUSPENDED) # does nothing in PENDING state diff --git a/taskflow/storage.py b/taskflow/storage.py index 31a8868f..22f0edd4 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -23,7 +23,9 @@ import six from taskflow import exceptions from taskflow.openstack.common import uuidutils from taskflow.persistence import logbook +from taskflow import retry from taskflow import states +from taskflow import task from taskflow.utils import lock_utils from taskflow.utils import misc from taskflow.utils import reflection @@ -94,8 +96,25 @@ class Storage(object): with contextlib.closing(self._backend.get_connection()) as conn: functor(conn, *args, **kwargs) - def ensure_task(self, task_name, task_version=None, result_mapping=None): - """Ensure that there is taskdetail that corresponds the task. + def ensure_atom(self, atom): + """Ensure that there is an atomdetail in storage for the given atom. + + Returns uuid for the atomdetail that is/was created. + """ + if isinstance(atom, task.BaseTask): + return self._ensure_task(atom.name, + misc.get_version_string(atom), + atom.save_as) + elif isinstance(atom, retry.Retry): + return self._ensure_retry(atom.name, + misc.get_version_string(atom), + atom.save_as) + else: + raise TypeError("Object of type 'atom' expected." + " Got %s, %r." % (type(atom), atom)) + + def _ensure_task(self, task_name, task_version, result_mapping): + """Ensures there is a taskdetail that corresponds to the task info. If task does not exist, adds a record for it. Added task will have PENDING state. Sets result mapping for the task from result_mapping @@ -122,9 +141,8 @@ class Storage(object): self._set_result_mapping(task_name, result_mapping) return task_id - def ensure_retry(self, retry_name, retry_version=None, - result_mapping=None): - """Ensure that there is atom detail that corresponds the retry. + def _ensure_retry(self, retry_name, retry_version, result_mapping): + """Ensures there is a retrydetail that corresponds to the retry info. If retry does not exist, adds a record for it. Added retry will have PENDING state. Sets result mapping for the retry from diff --git a/taskflow/tests/unit/action_engine/test_runner.py b/taskflow/tests/unit/action_engine/test_runner.py index 2e18f6b6..48cddf6b 100644 --- a/taskflow/tests/unit/action_engine/test_runner.py +++ b/taskflow/tests/unit/action_engine/test_runner.py @@ -38,7 +38,7 @@ class _RunnerTestMixin(object): store = storage.SingleThreadedStorage(flow_detail) # This ensures the tasks exist in storage... for task in compilation.execution_graph: - store.ensure_task(task.name) + store.ensure_atom(task) if initial_state: store.set_flow_state(initial_state) task_notifier = misc.Notifier() diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index 94d73012..7d3b55b6 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -24,7 +24,7 @@ from taskflow.persistence import logbook from taskflow import states from taskflow import storage from taskflow import test -from taskflow.test import mock +from taskflow.tests import utils as test_utils from taskflow.utils import misc from taskflow.utils import persistence_utils as p_utils @@ -59,7 +59,7 @@ class StorageTestMixin(object): def test_non_saving_storage(self): _lb, flow_detail = p_utils.temporary_flow_detail(self.backend) s = storage.SingleThreadedStorage(flow_detail=flow_detail) - s.ensure_task('my_task') + s.ensure_atom(test_utils.NoopTask('my_task')) self.assertTrue(uuidutils.is_uuid_like(s.get_atom_uuid('my_task'))) def test_flow_name_and_uuid(self): @@ -70,14 +70,14 @@ class StorageTestMixin(object): def test_ensure_task(self): s = self._get_storage() - s.ensure_task('my task') + s.ensure_atom(test_utils.NoopTask('my task')) self.assertEqual(s.get_atom_state('my task'), states.PENDING) self.assertTrue(uuidutils.is_uuid_like(s.get_atom_uuid('my task'))) def test_get_tasks_states(self): s = self._get_storage() - s.ensure_task('my task') - s.ensure_task('my task2') + s.ensure_atom(test_utils.NoopTask('my task')) + s.ensure_atom(test_utils.NoopTask('my task2')) s.save('my task', 'foo') expected = { 'my task': (states.SUCCESS, states.EXECUTE), @@ -88,7 +88,9 @@ class StorageTestMixin(object): def test_ensure_task_flow_detail(self): _lb, flow_detail = p_utils.temporary_flow_detail(self.backend) s = self._get_storage(flow_detail) - s.ensure_task('my task', '3.11') + t = test_utils.NoopTask('my task') + t.version = (3, 11) + s.ensure_atom(t) td = flow_detail.find(s.get_atom_uuid('my task')) self.assertIsNotNone(td) self.assertEqual(td.name, 'my task') @@ -107,12 +109,12 @@ class StorageTestMixin(object): td = logbook.TaskDetail(name='my_task', uuid='42') flow_detail.add(td) s = self._get_storage(flow_detail) - s.ensure_task('my_task') + s.ensure_atom(test_utils.NoopTask('my_task')) self.assertEqual('42', s.get_atom_uuid('my_task')) def test_save_and_get(self): s = self._get_storage() - s.ensure_task('my task') + s.ensure_atom(test_utils.NoopTask('my task')) s.save('my task', 5) self.assertEqual(s.get('my task'), 5) self.assertEqual(s.fetch_all(), {}) @@ -120,7 +122,7 @@ class StorageTestMixin(object): def test_save_and_get_other_state(self): s = self._get_storage() - s.ensure_task('my task') + s.ensure_atom(test_utils.NoopTask('my task')) s.save('my task', 5, states.FAILURE) self.assertEqual(s.get('my task'), 5) self.assertEqual(s.get_atom_state('my task'), states.FAILURE) @@ -128,7 +130,7 @@ class StorageTestMixin(object): def test_save_and_get_cached_failure(self): failure = misc.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() - s.ensure_task('my task') + s.ensure_atom(test_utils.NoopTask('my task')) s.save('my task', failure, states.FAILURE) self.assertEqual(s.get('my task'), failure) self.assertEqual(s.get_atom_state('my task'), states.FAILURE) @@ -138,7 +140,7 @@ class StorageTestMixin(object): def test_save_and_get_non_cached_failure(self): failure = misc.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() - s.ensure_task('my task') + s.ensure_atom(test_utils.NoopTask('my task')) s.save('my task', failure, states.FAILURE) self.assertEqual(s.get('my task'), failure) s._failures['my task'] = None @@ -148,7 +150,7 @@ class StorageTestMixin(object): failure = misc.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() - s.ensure_task('my task') + s.ensure_atom(test_utils.NoopTask('my task')) s.save('my task', failure, states.FAILURE) s.set_atom_state('my task', states.REVERTING) @@ -160,7 +162,7 @@ class StorageTestMixin(object): def test_get_failure_after_reload(self): failure = misc.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() - s.ensure_task('my task') + s.ensure_atom(test_utils.NoopTask('my task')) s.save('my task', failure, states.FAILURE) s2 = self._get_storage(s._flowdetail) self.assertTrue(s2.has_failures()) @@ -170,12 +172,12 @@ class StorageTestMixin(object): def test_get_non_existing_var(self): s = self._get_storage() - s.ensure_task('my task') + s.ensure_atom(test_utils.NoopTask('my task')) self.assertRaises(exceptions.NotFound, s.get, 'my task') def test_reset(self): s = self._get_storage() - s.ensure_task('my task') + s.ensure_atom(test_utils.NoopTask('my task')) s.save('my task', 5) s.reset('my task') self.assertEqual(s.get_atom_state('my task'), states.PENDING) @@ -183,13 +185,13 @@ class StorageTestMixin(object): def test_reset_unknown_task(self): s = self._get_storage() - s.ensure_task('my task') + s.ensure_atom(test_utils.NoopTask('my task')) self.assertEqual(s.reset('my task'), None) def test_fetch_by_name(self): s = self._get_storage() name = 'my result' - s.ensure_task('my task', '1.0', {name: None}) + s.ensure_atom(test_utils.NoopTask('my task', provides=name)) s.save('my task', 5) self.assertEqual(s.fetch(name), 5) self.assertEqual(s.fetch_all(), {name: 5}) @@ -202,7 +204,7 @@ class StorageTestMixin(object): def test_task_metadata_update_with_none(self): s = self._get_storage() - s.ensure_task('my task') + s.ensure_atom(test_utils.NoopTask('my task')) s.update_atom_metadata('my task', None) self.assertEqual(s.get_task_progress('my task'), 0.0) s.set_task_progress('my task', 0.5) @@ -212,13 +214,13 @@ class StorageTestMixin(object): def test_default_task_progress(self): s = self._get_storage() - s.ensure_task('my task') + s.ensure_atom(test_utils.NoopTask('my task')) self.assertEqual(s.get_task_progress('my task'), 0.0) self.assertEqual(s.get_task_progress_details('my task'), None) def test_task_progress(self): s = self._get_storage() - s.ensure_task('my task') + s.ensure_atom(test_utils.NoopTask('my task')) s.set_task_progress('my task', 0.5, {'test_data': 11}) self.assertEqual(s.get_task_progress('my task'), 0.5) @@ -243,7 +245,7 @@ class StorageTestMixin(object): def test_task_progress_erase(self): s = self._get_storage() - s.ensure_task('my task') + s.ensure_atom(test_utils.NoopTask('my task')) s.set_task_progress('my task', 0.8, {}) self.assertEqual(s.get_task_progress('my task'), 0.8) @@ -252,24 +254,22 @@ class StorageTestMixin(object): def test_fetch_result_not_ready(self): s = self._get_storage() name = 'my result' - s.ensure_task('my task', result_mapping={name: None}) + s.ensure_atom(test_utils.NoopTask('my task', provides=name)) self.assertRaises(exceptions.NotFound, s.get, name) self.assertEqual(s.fetch_all(), {}) def test_save_multiple_results(self): s = self._get_storage() - result_mapping = {'foo': 0, 'bar': 1, 'whole': None} - s.ensure_task('my task', result_mapping=result_mapping) + s.ensure_atom(test_utils.NoopTask('my task', provides=['foo', 'bar'])) s.save('my task', ('spam', 'eggs')) self.assertEqual(s.fetch_all(), { 'foo': 'spam', 'bar': 'eggs', - 'whole': ('spam', 'eggs') }) def test_mapping_none(self): s = self._get_storage() - s.ensure_task('my task') + s.ensure_atom(test_utils.NoopTask('my task')) s.save('my task', 5) self.assertEqual(s.fetch_all(), {}) @@ -313,7 +313,7 @@ class StorageTestMixin(object): s = self._get_storage(threaded=True) def ensure_my_task(): - s.ensure_task('my_task', result_mapping={}) + s.ensure_atom(test_utils.NoopTask('my_task')) threads = [] for i in range(0, self.thread_count): @@ -356,7 +356,7 @@ class StorageTestMixin(object): def test_set_and_get_task_state(self): s = self._get_storage() state = states.PENDING - s.ensure_task('my task') + s.ensure_atom(test_utils.NoopTask('my task')) s.set_atom_state('my task', state) self.assertEqual(s.get_atom_state('my task'), state) @@ -367,7 +367,7 @@ class StorageTestMixin(object): def test_task_by_name(self): s = self._get_storage() - s.ensure_task('my task') + s.ensure_atom(test_utils.NoopTask('my task')) self.assertTrue(uuidutils.is_uuid_like(s.get_atom_uuid('my task'))) def test_transient_storage_fetch_all(self): @@ -422,69 +422,44 @@ class StorageTestMixin(object): s.set_flow_state(states.SUCCESS) self.assertEqual(s.get_flow_state(), states.SUCCESS) - @mock.patch.object(storage.LOG, 'warning') - def test_result_is_checked(self, mocked_warning): + def test_result_is_checked(self): s = self._get_storage() - s.ensure_task('my task', result_mapping={'result': 'key'}) + s.ensure_atom(test_utils.NoopTask('my task', provides=set(['result']))) s.save('my task', {}) - mocked_warning.assert_called_once_with( - mock.ANY, 'my task', 'key', 'result') self.assertRaisesRegexp(exceptions.NotFound, '^Unable to find result', s.fetch, 'result') - @mock.patch.object(storage.LOG, 'warning') - def test_empty_result_is_checked(self, mocked_warning): + def test_empty_result_is_checked(self): s = self._get_storage() - s.ensure_task('my task', result_mapping={'a': 0}) + s.ensure_atom(test_utils.NoopTask('my task', provides=['a'])) s.save('my task', ()) - mocked_warning.assert_called_once_with( - mock.ANY, 'my task', 0, 'a') self.assertRaisesRegexp(exceptions.NotFound, '^Unable to find result', s.fetch, 'a') - @mock.patch.object(storage.LOG, 'warning') - def test_short_result_is_checked(self, mocked_warning): + def test_short_result_is_checked(self): s = self._get_storage() - s.ensure_task('my task', result_mapping={'a': 0, 'b': 1}) + s.ensure_atom(test_utils.NoopTask('my task', provides=['a', 'b'])) s.save('my task', ['result']) - mocked_warning.assert_called_once_with( - mock.ANY, 'my task', 1, 'b') self.assertEqual(s.fetch('a'), 'result') self.assertRaisesRegexp(exceptions.NotFound, '^Unable to find result', s.fetch, 'b') - @mock.patch.object(storage.LOG, 'warning') - def test_multiple_providers_are_checked(self, mocked_warning): - s = self._get_storage() - s.ensure_task('my task', result_mapping={'result': 'key'}) - self.assertEqual(mocked_warning.mock_calls, []) - s.ensure_task('my other task', result_mapping={'result': 'key'}) - mocked_warning.assert_called_once_with( - mock.ANY, 'result') - - @mock.patch.object(storage.LOG, 'warning') - def test_multiple_providers_with_inject_are_checked(self, mocked_warning): - s = self._get_storage() - s.inject({'result': 'DONE'}) - self.assertEqual(mocked_warning.mock_calls, []) - s.ensure_task('my other task', result_mapping={'result': 'key'}) - mocked_warning.assert_called_once_with(mock.ANY, 'result') - def test_ensure_retry(self): s = self._get_storage() - s.ensure_retry('my retry') + s.ensure_atom(test_utils.NoopRetry('my retry')) history = s.get_retry_history('my retry') self.assertEqual(history, []) def test_ensure_retry_and_task_with_same_name(self): s = self._get_storage() - s.ensure_task('my retry') + s.ensure_atom(test_utils.NoopTask('my retry')) self.assertRaisesRegexp(exceptions.Duplicate, - '^Atom detail', s.ensure_retry, 'my retry') + '^Atom detail', s.ensure_atom, + test_utils.NoopRetry('my retry')) def test_save_retry_results(self): s = self._get_storage() - s.ensure_retry('my retry') + s.ensure_atom(test_utils.NoopRetry('my retry')) s.save('my retry', 'a') s.save('my retry', 'b') history = s.get_retry_history('my retry') @@ -492,7 +467,7 @@ class StorageTestMixin(object): def test_save_retry_results_with_mapping(self): s = self._get_storage() - s.ensure_retry('my retry', result_mapping={'x': 0}) + s.ensure_atom(test_utils.NoopRetry('my retry', provides=['x'])) s.save('my retry', 'a') s.save('my retry', 'b') history = s.get_retry_history('my retry') @@ -502,7 +477,7 @@ class StorageTestMixin(object): def test_cleanup_retry_history(self): s = self._get_storage() - s.ensure_retry('my retry', result_mapping={'x': 0}) + s.ensure_atom(test_utils.NoopRetry('my retry', provides=['x'])) s.save('my retry', 'a') s.save('my retry', 'b') s.cleanup_retry_history('my retry', states.REVERTED) @@ -513,7 +488,7 @@ class StorageTestMixin(object): def test_cached_retry_failure(self): failure = misc.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() - s.ensure_retry('my retry', result_mapping={'x': 0}) + s.ensure_atom(test_utils.NoopRetry('my retry', provides=['x'])) s.save('my retry', 'a') s.save('my retry', failure, states.FAILURE) history = s.get_retry_history('my retry') @@ -528,14 +503,14 @@ class StorageTestMixin(object): def test_save_task_intention(self): s = self._get_storage() - s.ensure_task('my task') + s.ensure_atom(test_utils.NoopTask('my task')) s.set_atom_intention('my task', states.REVERT) intention = s.get_atom_intention('my task') self.assertEqual(intention, states.REVERT) def test_save_retry_intention(self): s = self._get_storage() - s.ensure_retry('my retry') + s.ensure_atom(test_utils.NoopTask('my retry')) s.set_atom_intention('my retry', states.RETRY) intention = s.get_atom_intention('my retry') self.assertEqual(intention, states.RETRY) diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py index 9d08db21..d37e817f 100644 --- a/taskflow/tests/unit/worker_based/test_worker.py +++ b/taskflow/tests/unit/worker_based/test_worker.py @@ -32,7 +32,7 @@ class TestWorker(test.MockTestCase): self.exchange = 'test-exchange' self.topic = 'test-topic' self.threads_count = 5 - self.endpoint_count = 21 + self.endpoint_count = 22 # patch classes self.executor_mock, self.executor_inst_mock = self.patchClass( diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index d7c85b95..e7c3fc69 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -70,6 +70,16 @@ def zookeeper_available(min_version, timeout=3): kazoo_utils.finalize_client(client) +class NoopRetry(retry.AlwaysRevert): + pass + + +class NoopTask(task.Task): + + def execute(self): + pass + + class DummyTask(task.Task): def execute(self, context, *args, **kwargs):