Merge "Add check that task provides all results it should"
This commit is contained in:
@@ -17,16 +17,38 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import contextlib
|
import contextlib
|
||||||
|
import logging
|
||||||
|
|
||||||
from taskflow import exceptions
|
from taskflow import exceptions
|
||||||
from taskflow.openstack.common import uuidutils
|
from taskflow.openstack.common import uuidutils
|
||||||
from taskflow.persistence import logbook
|
from taskflow.persistence import logbook
|
||||||
from taskflow import states
|
from taskflow import states
|
||||||
|
from taskflow.utils import misc
|
||||||
from taskflow.utils import threading_utils
|
from taskflow.utils import threading_utils
|
||||||
|
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
STATES_WITH_RESULTS = (states.SUCCESS, states.REVERTING, states.FAILURE)
|
STATES_WITH_RESULTS = (states.SUCCESS, states.REVERTING, states.FAILURE)
|
||||||
|
|
||||||
|
|
||||||
|
def _item_from_result(result, index, name):
|
||||||
|
if index is None:
|
||||||
|
return result
|
||||||
|
try:
|
||||||
|
return result[index]
|
||||||
|
except (IndexError, KeyError, ValueError, TypeError):
|
||||||
|
# NOTE(harlowja): The result that the uuid returned can not be
|
||||||
|
# accessed in the manner that the index is requesting. Perhaps
|
||||||
|
# the result is a dictionary-like object and that key does
|
||||||
|
# not exist (key error), or the result is a tuple/list and a
|
||||||
|
# non-numeric key is being requested (index error), or there
|
||||||
|
# was no result and an attempt to index into None is being
|
||||||
|
# requested (type error).
|
||||||
|
raise exceptions.NotFound("Unable to find result %r" % name)
|
||||||
|
|
||||||
|
|
||||||
class Storage(object):
|
class Storage(object):
|
||||||
"""Interface between engines and logbook
|
"""Interface between engines and logbook
|
||||||
|
|
||||||
@@ -98,6 +120,24 @@ class Storage(object):
|
|||||||
"""Get state of task with given uuid"""
|
"""Get state of task with given uuid"""
|
||||||
return self._taskdetail_by_uuid(uuid).state
|
return self._taskdetail_by_uuid(uuid).state
|
||||||
|
|
||||||
|
def _check_all_results_provided(self, uuid, task_name, data):
|
||||||
|
"""Warn if task did not provide some of results
|
||||||
|
|
||||||
|
This may happen if task returns shorter tuple or list or dict
|
||||||
|
without all needed keys. It may also happen if task returns
|
||||||
|
result of wrong type.
|
||||||
|
"""
|
||||||
|
result_mapping = self._result_mappings.get(uuid, None)
|
||||||
|
if result_mapping is None:
|
||||||
|
return
|
||||||
|
for name, index in result_mapping.items():
|
||||||
|
try:
|
||||||
|
_item_from_result(data, index, name)
|
||||||
|
except exceptions.NotFound:
|
||||||
|
LOG.warning("Task %s did not supply result "
|
||||||
|
"with index %r (name %s)",
|
||||||
|
task_name, index, name)
|
||||||
|
|
||||||
def save(self, uuid, data, state=states.SUCCESS):
|
def save(self, uuid, data, state=states.SUCCESS):
|
||||||
"""Put result for task with id 'uuid' to storage"""
|
"""Put result for task with id 'uuid' to storage"""
|
||||||
td = self._taskdetail_by_uuid(uuid)
|
td = self._taskdetail_by_uuid(uuid)
|
||||||
@@ -105,6 +145,10 @@ class Storage(object):
|
|||||||
td.results = data
|
td.results = data
|
||||||
self._with_connection(self._save_task_detail, task_detail=td)
|
self._with_connection(self._save_task_detail, task_detail=td)
|
||||||
|
|
||||||
|
# Warn if result was incomplete
|
||||||
|
if not isinstance(data, misc.Failure):
|
||||||
|
self._check_all_results_provided(uuid, td.name, data)
|
||||||
|
|
||||||
def get(self, uuid):
|
def get(self, uuid):
|
||||||
"""Get result for task with id 'uuid' to storage"""
|
"""Get result for task with id 'uuid' to storage"""
|
||||||
td = self._taskdetail_by_uuid(uuid)
|
td = self._taskdetail_by_uuid(uuid)
|
||||||
@@ -158,21 +202,8 @@ class Storage(object):
|
|||||||
for uuid, index in indexes:
|
for uuid, index in indexes:
|
||||||
try:
|
try:
|
||||||
result = self.get(uuid)
|
result = self.get(uuid)
|
||||||
if index is None:
|
return _item_from_result(result, index, name)
|
||||||
return result
|
|
||||||
else:
|
|
||||||
return result[index]
|
|
||||||
except exceptions.NotFound:
|
except exceptions.NotFound:
|
||||||
# NOTE(harlowja): No result was found for the given uuid.
|
|
||||||
pass
|
|
||||||
except (KeyError, IndexError, TypeError):
|
|
||||||
# NOTE(harlowja): The result that the uuid returned can not be
|
|
||||||
# accessed in the manner that the index is requesting. Perhaps
|
|
||||||
# the result is a dictionary-like object and that key does
|
|
||||||
# not exist (key error), or the result is a tuple/list and a
|
|
||||||
# non-numeric key is being requested (index error), or there
|
|
||||||
# was no result and an attempt to index into None is being
|
|
||||||
# requested (type error).
|
|
||||||
pass
|
pass
|
||||||
raise exceptions.NotFound("Unable to find result %r" % name)
|
raise exceptions.NotFound("Unable to find result %r" % name)
|
||||||
|
|
||||||
|
|||||||
@@ -17,6 +17,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import contextlib
|
import contextlib
|
||||||
|
import mock
|
||||||
|
|
||||||
from taskflow import exceptions
|
from taskflow import exceptions
|
||||||
from taskflow.persistence.backends import impl_memory
|
from taskflow.persistence.backends import impl_memory
|
||||||
@@ -41,6 +42,12 @@ class StorageTest(test.TestCase):
|
|||||||
with contextlib.closing(be.get_connection()) as conn:
|
with contextlib.closing(be.get_connection()) as conn:
|
||||||
conn.clear_all()
|
conn.clear_all()
|
||||||
|
|
||||||
|
def test_non_saving_storage(self):
|
||||||
|
_lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
|
||||||
|
s = storage.Storage(flow_detail=flow_detail) # no backend
|
||||||
|
s.add_task('42', 'my task')
|
||||||
|
self.assertEquals(s.get_uuid_by_name('my task'), '42')
|
||||||
|
|
||||||
def test_add_task(self):
|
def test_add_task(self):
|
||||||
s = self._get_storage()
|
s = self._get_storage()
|
||||||
s.add_task('42', 'my task')
|
s.add_task('42', 'my task')
|
||||||
@@ -179,3 +186,40 @@ class StorageTest(test.TestCase):
|
|||||||
s = self._get_storage()
|
s = self._get_storage()
|
||||||
s.set_flow_state(states.SUCCESS)
|
s.set_flow_state(states.SUCCESS)
|
||||||
self.assertEquals(s.get_flow_state(), states.SUCCESS)
|
self.assertEquals(s.get_flow_state(), states.SUCCESS)
|
||||||
|
|
||||||
|
@mock.patch.object(storage.LOG, 'warning')
|
||||||
|
def test_result_is_checked(self, mocked_warning):
|
||||||
|
s = self._get_storage()
|
||||||
|
s.add_task('42', 'my task')
|
||||||
|
s.set_result_mapping('42', {'result': 'key'})
|
||||||
|
s.save('42', {})
|
||||||
|
mocked_warning.assert_called_once_with(
|
||||||
|
mock.ANY, 'my task', 'key', 'result')
|
||||||
|
with self.assertRaisesRegexp(exceptions.NotFound,
|
||||||
|
'^Unable to find result'):
|
||||||
|
s.fetch('result')
|
||||||
|
|
||||||
|
@mock.patch.object(storage.LOG, 'warning')
|
||||||
|
def test_empty_result_is_checked(self, mocked_warning):
|
||||||
|
s = self._get_storage()
|
||||||
|
s.add_task('42', 'my task')
|
||||||
|
s.set_result_mapping('42', {'a': 0})
|
||||||
|
s.save('42', ())
|
||||||
|
mocked_warning.assert_called_once_with(
|
||||||
|
mock.ANY, 'my task', 0, 'a')
|
||||||
|
with self.assertRaisesRegexp(exceptions.NotFound,
|
||||||
|
'^Unable to find result'):
|
||||||
|
s.fetch('a')
|
||||||
|
|
||||||
|
@mock.patch.object(storage.LOG, 'warning')
|
||||||
|
def test_short_result_is_checked(self, mocked_warning):
|
||||||
|
s = self._get_storage()
|
||||||
|
s.add_task('42', 'my task')
|
||||||
|
s.set_result_mapping('42', {'a': 0, 'b': 1})
|
||||||
|
s.save('42', ['result'])
|
||||||
|
mocked_warning.assert_called_once_with(
|
||||||
|
mock.ANY, 'my task', 1, 'b')
|
||||||
|
self.assertEquals(s.fetch('a'), 'result')
|
||||||
|
with self.assertRaisesRegexp(exceptions.NotFound,
|
||||||
|
'^Unable to find result'):
|
||||||
|
s.fetch('b')
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ hacking>=0.5.3,<0.6
|
|||||||
|
|
||||||
coverage
|
coverage
|
||||||
distribute>=0.6.24
|
distribute>=0.6.24
|
||||||
mox
|
mock>=1.0
|
||||||
nose
|
nose
|
||||||
nose-exclude
|
nose-exclude
|
||||||
openstack.nose_plugin>=0.7
|
openstack.nose_plugin>=0.7
|
||||||
|
|||||||
Reference in New Issue
Block a user