Add check that task provides all results it should

Value returned from execute method of task should contain all
results that are mappend with task.rebind. To verify that we
check that the value can be indexed with all necessary indexes.
If that fails, we put warning to log.

Change-Id: I21aac7b36885668155c2d7bd2eedcf4eeba22b11
This commit is contained in:
Ivan A. Melnikov
2013-09-12 19:53:33 +04:00
parent 15b2af47ae
commit 6272c83b36
3 changed files with 90 additions and 15 deletions

View File

@@ -17,16 +17,38 @@
# under the License.
import contextlib
import logging
from taskflow import exceptions
from taskflow.openstack.common import uuidutils
from taskflow.persistence import logbook
from taskflow import states
from taskflow.utils import misc
from taskflow.utils import threading_utils
LOG = logging.getLogger(__name__)
STATES_WITH_RESULTS = (states.SUCCESS, states.REVERTING, states.FAILURE)
def _item_from_result(result, index, name):
if index is None:
return result
try:
return result[index]
except (IndexError, KeyError, ValueError, TypeError):
# NOTE(harlowja): The result that the uuid returned can not be
# accessed in the manner that the index is requesting. Perhaps
# the result is a dictionary-like object and that key does
# not exist (key error), or the result is a tuple/list and a
# non-numeric key is being requested (index error), or there
# was no result and an attempt to index into None is being
# requested (type error).
raise exceptions.NotFound("Unable to find result %r" % name)
class Storage(object):
"""Interface between engines and logbook
@@ -98,6 +120,24 @@ class Storage(object):
"""Get state of task with given uuid"""
return self._taskdetail_by_uuid(uuid).state
def _check_all_results_provided(self, uuid, task_name, data):
"""Warn if task did not provide some of results
This may happen if task returns shorter tuple or list or dict
without all needed keys. It may also happen if task returns
result of wrong type.
"""
result_mapping = self._result_mappings.get(uuid, None)
if result_mapping is None:
return
for name, index in result_mapping.items():
try:
_item_from_result(data, index, name)
except exceptions.NotFound:
LOG.warning("Task %s did not supply result "
"with index %r (name %s)",
task_name, index, name)
def save(self, uuid, data, state=states.SUCCESS):
"""Put result for task with id 'uuid' to storage"""
td = self._taskdetail_by_uuid(uuid)
@@ -105,6 +145,10 @@ class Storage(object):
td.results = data
self._with_connection(self._save_task_detail, task_detail=td)
# Warn if result was incomplete
if not isinstance(data, misc.Failure):
self._check_all_results_provided(uuid, td.name, data)
def get(self, uuid):
"""Get result for task with id 'uuid' to storage"""
td = self._taskdetail_by_uuid(uuid)
@@ -158,21 +202,8 @@ class Storage(object):
for uuid, index in indexes:
try:
result = self.get(uuid)
if index is None:
return result
else:
return result[index]
return _item_from_result(result, index, name)
except exceptions.NotFound:
# NOTE(harlowja): No result was found for the given uuid.
pass
except (KeyError, IndexError, TypeError):
# NOTE(harlowja): The result that the uuid returned can not be
# accessed in the manner that the index is requesting. Perhaps
# the result is a dictionary-like object and that key does
# not exist (key error), or the result is a tuple/list and a
# non-numeric key is being requested (index error), or there
# was no result and an attempt to index into None is being
# requested (type error).
pass
raise exceptions.NotFound("Unable to find result %r" % name)

View File

@@ -17,6 +17,7 @@
# under the License.
import contextlib
import mock
from taskflow import exceptions
from taskflow.persistence.backends import impl_memory
@@ -41,6 +42,12 @@ class StorageTest(test.TestCase):
with contextlib.closing(be.get_connection()) as conn:
conn.clear_all()
def test_non_saving_storage(self):
_lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
s = storage.Storage(flow_detail=flow_detail) # no backend
s.add_task('42', 'my task')
self.assertEquals(s.get_uuid_by_name('my task'), '42')
def test_add_task(self):
s = self._get_storage()
s.add_task('42', 'my task')
@@ -179,3 +186,40 @@ class StorageTest(test.TestCase):
s = self._get_storage()
s.set_flow_state(states.SUCCESS)
self.assertEquals(s.get_flow_state(), states.SUCCESS)
@mock.patch.object(storage.LOG, 'warning')
def test_result_is_checked(self, mocked_warning):
s = self._get_storage()
s.add_task('42', 'my task')
s.set_result_mapping('42', {'result': 'key'})
s.save('42', {})
mocked_warning.assert_called_once_with(
mock.ANY, 'my task', 'key', 'result')
with self.assertRaisesRegexp(exceptions.NotFound,
'^Unable to find result'):
s.fetch('result')
@mock.patch.object(storage.LOG, 'warning')
def test_empty_result_is_checked(self, mocked_warning):
s = self._get_storage()
s.add_task('42', 'my task')
s.set_result_mapping('42', {'a': 0})
s.save('42', ())
mocked_warning.assert_called_once_with(
mock.ANY, 'my task', 0, 'a')
with self.assertRaisesRegexp(exceptions.NotFound,
'^Unable to find result'):
s.fetch('a')
@mock.patch.object(storage.LOG, 'warning')
def test_short_result_is_checked(self, mocked_warning):
s = self._get_storage()
s.add_task('42', 'my task')
s.set_result_mapping('42', {'a': 0, 'b': 1})
s.save('42', ['result'])
mocked_warning.assert_called_once_with(
mock.ANY, 'my task', 1, 'b')
self.assertEquals(s.fetch('a'), 'result')
with self.assertRaisesRegexp(exceptions.NotFound,
'^Unable to find result'):
s.fetch('b')

View File

@@ -6,7 +6,7 @@ hacking>=0.5.3,<0.6
coverage
distribute>=0.6.24
mox
mock>=1.0
nose
nose-exclude
openstack.nose_plugin>=0.7