diff --git a/taskflow/storage.py b/taskflow/storage.py index 054e2710..5e883790 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -17,16 +17,38 @@ # under the License. import contextlib +import logging from taskflow import exceptions from taskflow.openstack.common import uuidutils from taskflow.persistence import logbook from taskflow import states +from taskflow.utils import misc from taskflow.utils import threading_utils + +LOG = logging.getLogger(__name__) + + STATES_WITH_RESULTS = (states.SUCCESS, states.REVERTING, states.FAILURE) +def _item_from_result(result, index, name): + if index is None: + return result + try: + return result[index] + except (IndexError, KeyError, ValueError, TypeError): + # NOTE(harlowja): The result that the uuid returned can not be + # accessed in the manner that the index is requesting. Perhaps + # the result is a dictionary-like object and that key does + # not exist (key error), or the result is a tuple/list and a + # non-numeric key is being requested (index error), or there + # was no result and an attempt to index into None is being + # requested (type error). + raise exceptions.NotFound("Unable to find result %r" % name) + + class Storage(object): """Interface between engines and logbook @@ -98,6 +120,24 @@ class Storage(object): """Get state of task with given uuid""" return self._taskdetail_by_uuid(uuid).state + def _check_all_results_provided(self, uuid, task_name, data): + """Warn if task did not provide some of results + + This may happen if task returns shorter tuple or list or dict + without all needed keys. It may also happen if task returns + result of wrong type. + """ + result_mapping = self._result_mappings.get(uuid, None) + if result_mapping is None: + return + for name, index in result_mapping.items(): + try: + _item_from_result(data, index, name) + except exceptions.NotFound: + LOG.warning("Task %s did not supply result " + "with index %r (name %s)", + task_name, index, name) + def save(self, uuid, data, state=states.SUCCESS): """Put result for task with id 'uuid' to storage""" td = self._taskdetail_by_uuid(uuid) @@ -105,6 +145,10 @@ class Storage(object): td.results = data self._with_connection(self._save_task_detail, task_detail=td) + # Warn if result was incomplete + if not isinstance(data, misc.Failure): + self._check_all_results_provided(uuid, td.name, data) + def get(self, uuid): """Get result for task with id 'uuid' to storage""" td = self._taskdetail_by_uuid(uuid) @@ -158,21 +202,8 @@ class Storage(object): for uuid, index in indexes: try: result = self.get(uuid) - if index is None: - return result - else: - return result[index] + return _item_from_result(result, index, name) except exceptions.NotFound: - # NOTE(harlowja): No result was found for the given uuid. - pass - except (KeyError, IndexError, TypeError): - # NOTE(harlowja): The result that the uuid returned can not be - # accessed in the manner that the index is requesting. Perhaps - # the result is a dictionary-like object and that key does - # not exist (key error), or the result is a tuple/list and a - # non-numeric key is being requested (index error), or there - # was no result and an attempt to index into None is being - # requested (type error). pass raise exceptions.NotFound("Unable to find result %r" % name) diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index 8e7ed3d6..86c30d93 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -17,6 +17,7 @@ # under the License. import contextlib +import mock from taskflow import exceptions from taskflow.persistence.backends import impl_memory @@ -41,6 +42,12 @@ class StorageTest(test.TestCase): with contextlib.closing(be.get_connection()) as conn: conn.clear_all() + def test_non_saving_storage(self): + _lb, flow_detail = p_utils.temporary_flow_detail(self.backend) + s = storage.Storage(flow_detail=flow_detail) # no backend + s.add_task('42', 'my task') + self.assertEquals(s.get_uuid_by_name('my task'), '42') + def test_add_task(self): s = self._get_storage() s.add_task('42', 'my task') @@ -179,3 +186,40 @@ class StorageTest(test.TestCase): s = self._get_storage() s.set_flow_state(states.SUCCESS) self.assertEquals(s.get_flow_state(), states.SUCCESS) + + @mock.patch.object(storage.LOG, 'warning') + def test_result_is_checked(self, mocked_warning): + s = self._get_storage() + s.add_task('42', 'my task') + s.set_result_mapping('42', {'result': 'key'}) + s.save('42', {}) + mocked_warning.assert_called_once_with( + mock.ANY, 'my task', 'key', 'result') + with self.assertRaisesRegexp(exceptions.NotFound, + '^Unable to find result'): + s.fetch('result') + + @mock.patch.object(storage.LOG, 'warning') + def test_empty_result_is_checked(self, mocked_warning): + s = self._get_storage() + s.add_task('42', 'my task') + s.set_result_mapping('42', {'a': 0}) + s.save('42', ()) + mocked_warning.assert_called_once_with( + mock.ANY, 'my task', 0, 'a') + with self.assertRaisesRegexp(exceptions.NotFound, + '^Unable to find result'): + s.fetch('a') + + @mock.patch.object(storage.LOG, 'warning') + def test_short_result_is_checked(self, mocked_warning): + s = self._get_storage() + s.add_task('42', 'my task') + s.set_result_mapping('42', {'a': 0, 'b': 1}) + s.save('42', ['result']) + mocked_warning.assert_called_once_with( + mock.ANY, 'my task', 1, 'b') + self.assertEquals(s.fetch('a'), 'result') + with self.assertRaisesRegexp(exceptions.NotFound, + '^Unable to find result'): + s.fetch('b') diff --git a/tools/test-requires b/tools/test-requires index b37c8069..fdc29b93 100644 --- a/tools/test-requires +++ b/tools/test-requires @@ -6,7 +6,7 @@ hacking>=0.5.3,<0.6 coverage distribute>=0.6.24 -mox +mock>=1.0 nose nose-exclude openstack.nose_plugin>=0.7