Small storage tests clean-up

* Used misc.Failure.from_exception instead of misc.Failure(exc_info=(...));
* Fixed spelling error;
* Added tests for uncovered code;
* Renamed fd -> flow_detail.

Change-Id: I2d2b7e555e9bf6d0b7331b078fd973e32f202f66
This commit is contained in:
Stanislav Kudriashev
2014-02-07 15:41:42 +02:00
parent a4f88963f9
commit 98cabed9cd

View File

@@ -38,37 +38,36 @@ class StorageTest(test.TestCase):
self.backend = impl_memory.MemoryBackend(conf={})
self.thread_count = 50
def _run_many_threads(self, threads):
def tearDown(self):
with contextlib.closing(self.backend) as be:
with contextlib.closing(be.get_connection()) as conn:
conn.clear_all()
super(StorageTest, self).tearDown()
@staticmethod
def _run_many_threads(threads):
for t in threads:
t.start()
for t in threads:
t.join()
def _get_storage(self, threaded=False):
_lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
def _get_storage(self, flow_detail=None, threaded=False):
if flow_detail is None:
_lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
storage_cls = storage.SingleThreadedStorage
if threaded:
return storage.MultiThreadedStorage(backend=self.backend,
flow_detail=flow_detail)
else:
return storage.SingleThreadedStorage(backend=self.backend,
flow_detail=flow_detail)
def tearDown(self):
super(StorageTest, self).tearDown()
with contextlib.closing(self.backend) as be:
with contextlib.closing(be.get_connection()) as conn:
conn.clear_all()
storage_cls = storage.MultiThreadedStorage
return storage_cls(flow_detail=flow_detail, backend=self.backend)
def test_non_saving_storage(self):
_lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
s = storage.SingleThreadedStorage(flow_detail=flow_detail)
s.ensure_task('my_task')
self.assertTrue(
uuidutils.is_uuid_like(s.get_task_uuid('my_task')))
self.assertTrue(uuidutils.is_uuid_like(s.get_task_uuid('my_task')))
def test_flow_name_and_uuid(self):
fd = logbook.FlowDetail(name='test-fd', uuid='aaaa')
s = storage.SingleThreadedStorage(flow_detail=fd)
flow_detail = logbook.FlowDetail(name='test-fd', uuid='aaaa')
s = self._get_storage(flow_detail)
self.assertEqual(s.flow_name, 'test-fd')
self.assertEqual(s.flow_uuid, 'aaaa')
@@ -76,8 +75,7 @@ class StorageTest(test.TestCase):
s = self._get_storage()
s.ensure_task('my task')
self.assertEqual(s.get_task_state('my task'), states.PENDING)
self.assertTrue(
uuidutils.is_uuid_like(s.get_task_uuid('my task')))
self.assertTrue(uuidutils.is_uuid_like(s.get_task_uuid('my task')))
def test_get_tasks_states(self):
s = self._get_storage()
@@ -90,10 +88,9 @@ class StorageTest(test.TestCase):
}
self.assertEqual(s.get_tasks_states(['my task', 'my task2']), expected)
def test_ensure_task_fd(self):
def test_ensure_task_flow_detail(self):
_lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
s = storage.SingleThreadedStorage(backend=self.backend,
flow_detail=flow_detail)
s = self._get_storage(flow_detail)
s.ensure_task('my task', '3.11')
td = flow_detail.find(s.get_task_uuid('my task'))
self.assertIsNotNone(td)
@@ -105,18 +102,14 @@ class StorageTest(test.TestCase):
_lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
td = logbook.TaskDetail(name='my_task', uuid='42')
flow_detail.add(td)
s = storage.SingleThreadedStorage(backend=self.backend,
flow_detail=flow_detail)
s = self._get_storage(flow_detail)
self.assertEqual('42', s.get_task_uuid('my_task'))
def test_ensure_existing_task(self):
_lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
td = logbook.TaskDetail(name='my_task', uuid='42')
flow_detail.add(td)
s = storage.SingleThreadedStorage(backend=self.backend,
flow_detail=flow_detail)
s = self._get_storage(flow_detail)
s.ensure_task('my_task')
self.assertEqual('42', s.get_task_uuid('my_task'))
@@ -136,38 +129,45 @@ class StorageTest(test.TestCase):
self.assertEqual(s.get_task_state('my task'), states.FAILURE)
def test_save_and_get_failure(self):
fail = misc.Failure(exc_info=(RuntimeError, RuntimeError(), None))
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
s = self._get_storage()
s.ensure_task('my task')
s.save('my task', fail, states.FAILURE)
self.assertEqual(s.get('my task'), fail)
s.save('my task', failure, states.FAILURE)
self.assertEqual(s.get('my task'), failure)
self.assertEqual(s.get_task_state('my task'), states.FAILURE)
self.assertIs(s.has_failures(), True)
self.assertEqual(s.get_failures(), {'my task': fail})
self.assertTrue(s.has_failures())
self.assertEqual(s.get_failures(), {'my task': failure})
def test_save_and_get_non_cached_failure(self):
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
s = self._get_storage()
s.ensure_task('my task')
s.save('my task', failure, states.FAILURE)
self.assertEqual(s.get('my task'), failure)
s._failures['my task'] = None
self.assertEqual(s.get('my task'), failure)
def test_get_failure_from_reverted_task(self):
fail = misc.Failure(exc_info=(RuntimeError, RuntimeError(), None))
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
s = self._get_storage()
s.ensure_task('my task')
s.save('my task', fail, states.FAILURE)
s.save('my task', failure, states.FAILURE)
s.set_task_state('my task', states.REVERTING)
self.assertEqual(s.get('my task'), fail)
self.assertEqual(s.get('my task'), failure)
s.set_task_state('my task', states.REVERTED)
self.assertEqual(s.get('my task'), fail)
self.assertEqual(s.get('my task'), failure)
def test_get_failure_after_reload(self):
fail = misc.Failure(exc_info=(RuntimeError, RuntimeError(), None))
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
s = self._get_storage()
s.ensure_task('my task')
s.save('my task', fail, states.FAILURE)
s2 = storage.SingleThreadedStorage(backend=self.backend,
flow_detail=s._flowdetail)
s.save('my task', failure, states.FAILURE)
s2 = self._get_storage(s._flowdetail)
self.assertIs(s2.has_failures(), True)
self.assertEqual(s2.get_failures(), {'my task': fail})
self.assertEqual(s2.get('my task'), fail)
self.assertEqual(s2.get_failures(), {'my task': failure})
self.assertEqual(s2.get('my task'), failure)
self.assertEqual(s2.get_task_state('my task'), states.FAILURE)
def test_get_non_existing_var(self):
@@ -230,6 +230,16 @@ class StorageTest(test.TestCase):
"^Name 'xxx' is not mapped",
s.fetch, 'xxx')
def test_task_metadata_update_with_none(self):
s = self._get_storage()
s.ensure_task('my task')
s.update_task_metadata('my task', None)
self.assertEqual(s.get_task_progress('my task'), 0.0)
s.set_task_progress('my task', 0.5)
self.assertEqual(s.get_task_progress('my task'), 0.5)
s.update_task_metadata('my task', None)
self.assertEqual(s.get_task_progress('my task'), 0.5)
def test_default_task_progress(self):
s = self._get_storage()
s.ensure_task('my task')
@@ -322,7 +332,7 @@ class StorageTest(test.TestCase):
})
# imagine we are resuming, so we need to make new
# storage from same flow details
s2 = storage.SingleThreadedStorage(s._flowdetail, backend=self.backend)
s2 = self._get_storage(s._flowdetail)
# injected data should still be there:
self.assertEqual(s2.fetch_all(), {
'foo': 'bar',
@@ -387,7 +397,7 @@ class StorageTest(test.TestCase):
self.assertEqual(self.thread_count, len(s.fetch_all()))
self.assertEqual(1, len(s._flowdetail))
def test_fetch_meapped_args(self):
def test_fetch_mapped_args(self):
s = self._get_storage()
s.inject({'foo': 'bar', 'spam': 'eggs'})
self.assertEqual(s.fetch_mapped_args({'viking': 'spam'}),
@@ -414,8 +424,7 @@ class StorageTest(test.TestCase):
def test_task_by_name(self):
s = self._get_storage()
s.ensure_task('my task')
self.assertTrue(
uuidutils.is_uuid_like(s.get_task_uuid('my task')))
self.assertTrue(uuidutils.is_uuid_like(s.get_task_uuid('my task')))
def test_unknown_task_by_name(self):
s = self._get_storage()
@@ -428,11 +437,11 @@ class StorageTest(test.TestCase):
self.assertEqual(s.get_flow_state(), states.PENDING)
def test_get_flow_state(self):
_lb, fd = p_utils.temporary_flow_detail(backend=self.backend)
fd.state = states.FAILURE
_lb, flow_detail = p_utils.temporary_flow_detail(backend=self.backend)
flow_detail.state = states.FAILURE
with contextlib.closing(self.backend.get_connection()) as conn:
fd.update(conn.update_flow_details(fd))
s = storage.SingleThreadedStorage(flow_detail=fd, backend=self.backend)
flow_detail.update(conn.update_flow_details(flow_detail))
s = self._get_storage(flow_detail)
self.assertEqual(s.get_flow_state(), states.FAILURE)
def test_set_and_get_flow_state(self):