diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index c3494f24..72227e3b 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -38,37 +38,36 @@ class StorageTest(test.TestCase): self.backend = impl_memory.MemoryBackend(conf={}) self.thread_count = 50 - def _run_many_threads(self, threads): + def tearDown(self): + with contextlib.closing(self.backend) as be: + with contextlib.closing(be.get_connection()) as conn: + conn.clear_all() + super(StorageTest, self).tearDown() + + @staticmethod + def _run_many_threads(threads): for t in threads: t.start() for t in threads: t.join() - def _get_storage(self, threaded=False): - _lb, flow_detail = p_utils.temporary_flow_detail(self.backend) + def _get_storage(self, flow_detail=None, threaded=False): + if flow_detail is None: + _lb, flow_detail = p_utils.temporary_flow_detail(self.backend) + storage_cls = storage.SingleThreadedStorage if threaded: - return storage.MultiThreadedStorage(backend=self.backend, - flow_detail=flow_detail) - else: - return storage.SingleThreadedStorage(backend=self.backend, - flow_detail=flow_detail) - - def tearDown(self): - super(StorageTest, self).tearDown() - with contextlib.closing(self.backend) as be: - with contextlib.closing(be.get_connection()) as conn: - conn.clear_all() + storage_cls = storage.MultiThreadedStorage + return storage_cls(flow_detail=flow_detail, backend=self.backend) def test_non_saving_storage(self): _lb, flow_detail = p_utils.temporary_flow_detail(self.backend) s = storage.SingleThreadedStorage(flow_detail=flow_detail) s.ensure_task('my_task') - self.assertTrue( - uuidutils.is_uuid_like(s.get_task_uuid('my_task'))) + self.assertTrue(uuidutils.is_uuid_like(s.get_task_uuid('my_task'))) def test_flow_name_and_uuid(self): - fd = logbook.FlowDetail(name='test-fd', uuid='aaaa') - s = storage.SingleThreadedStorage(flow_detail=fd) + flow_detail = logbook.FlowDetail(name='test-fd', uuid='aaaa') + s = self._get_storage(flow_detail) self.assertEqual(s.flow_name, 'test-fd') self.assertEqual(s.flow_uuid, 'aaaa') @@ -76,8 +75,7 @@ class StorageTest(test.TestCase): s = self._get_storage() s.ensure_task('my task') self.assertEqual(s.get_task_state('my task'), states.PENDING) - self.assertTrue( - uuidutils.is_uuid_like(s.get_task_uuid('my task'))) + self.assertTrue(uuidutils.is_uuid_like(s.get_task_uuid('my task'))) def test_get_tasks_states(self): s = self._get_storage() @@ -90,10 +88,9 @@ class StorageTest(test.TestCase): } self.assertEqual(s.get_tasks_states(['my task', 'my task2']), expected) - def test_ensure_task_fd(self): + def test_ensure_task_flow_detail(self): _lb, flow_detail = p_utils.temporary_flow_detail(self.backend) - s = storage.SingleThreadedStorage(backend=self.backend, - flow_detail=flow_detail) + s = self._get_storage(flow_detail) s.ensure_task('my task', '3.11') td = flow_detail.find(s.get_task_uuid('my task')) self.assertIsNotNone(td) @@ -105,18 +102,14 @@ class StorageTest(test.TestCase): _lb, flow_detail = p_utils.temporary_flow_detail(self.backend) td = logbook.TaskDetail(name='my_task', uuid='42') flow_detail.add(td) - - s = storage.SingleThreadedStorage(backend=self.backend, - flow_detail=flow_detail) + s = self._get_storage(flow_detail) self.assertEqual('42', s.get_task_uuid('my_task')) def test_ensure_existing_task(self): _lb, flow_detail = p_utils.temporary_flow_detail(self.backend) td = logbook.TaskDetail(name='my_task', uuid='42') flow_detail.add(td) - - s = storage.SingleThreadedStorage(backend=self.backend, - flow_detail=flow_detail) + s = self._get_storage(flow_detail) s.ensure_task('my_task') self.assertEqual('42', s.get_task_uuid('my_task')) @@ -136,38 +129,45 @@ class StorageTest(test.TestCase): self.assertEqual(s.get_task_state('my task'), states.FAILURE) def test_save_and_get_failure(self): - fail = misc.Failure(exc_info=(RuntimeError, RuntimeError(), None)) + failure = misc.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_task('my task') - s.save('my task', fail, states.FAILURE) - self.assertEqual(s.get('my task'), fail) + s.save('my task', failure, states.FAILURE) + self.assertEqual(s.get('my task'), failure) self.assertEqual(s.get_task_state('my task'), states.FAILURE) - self.assertIs(s.has_failures(), True) - self.assertEqual(s.get_failures(), {'my task': fail}) + self.assertTrue(s.has_failures()) + self.assertEqual(s.get_failures(), {'my task': failure}) + + def test_save_and_get_non_cached_failure(self): + failure = misc.Failure.from_exception(RuntimeError('Woot!')) + s = self._get_storage() + s.ensure_task('my task') + s.save('my task', failure, states.FAILURE) + self.assertEqual(s.get('my task'), failure) + s._failures['my task'] = None + self.assertEqual(s.get('my task'), failure) def test_get_failure_from_reverted_task(self): - fail = misc.Failure(exc_info=(RuntimeError, RuntimeError(), None)) + failure = misc.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_task('my task') - s.save('my task', fail, states.FAILURE) + s.save('my task', failure, states.FAILURE) s.set_task_state('my task', states.REVERTING) - self.assertEqual(s.get('my task'), fail) + self.assertEqual(s.get('my task'), failure) s.set_task_state('my task', states.REVERTED) - self.assertEqual(s.get('my task'), fail) + self.assertEqual(s.get('my task'), failure) def test_get_failure_after_reload(self): - fail = misc.Failure(exc_info=(RuntimeError, RuntimeError(), None)) + failure = misc.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_task('my task') - s.save('my task', fail, states.FAILURE) - - s2 = storage.SingleThreadedStorage(backend=self.backend, - flow_detail=s._flowdetail) + s.save('my task', failure, states.FAILURE) + s2 = self._get_storage(s._flowdetail) self.assertIs(s2.has_failures(), True) - self.assertEqual(s2.get_failures(), {'my task': fail}) - self.assertEqual(s2.get('my task'), fail) + self.assertEqual(s2.get_failures(), {'my task': failure}) + self.assertEqual(s2.get('my task'), failure) self.assertEqual(s2.get_task_state('my task'), states.FAILURE) def test_get_non_existing_var(self): @@ -230,6 +230,16 @@ class StorageTest(test.TestCase): "^Name 'xxx' is not mapped", s.fetch, 'xxx') + def test_task_metadata_update_with_none(self): + s = self._get_storage() + s.ensure_task('my task') + s.update_task_metadata('my task', None) + self.assertEqual(s.get_task_progress('my task'), 0.0) + s.set_task_progress('my task', 0.5) + self.assertEqual(s.get_task_progress('my task'), 0.5) + s.update_task_metadata('my task', None) + self.assertEqual(s.get_task_progress('my task'), 0.5) + def test_default_task_progress(self): s = self._get_storage() s.ensure_task('my task') @@ -322,7 +332,7 @@ class StorageTest(test.TestCase): }) # imagine we are resuming, so we need to make new # storage from same flow details - s2 = storage.SingleThreadedStorage(s._flowdetail, backend=self.backend) + s2 = self._get_storage(s._flowdetail) # injected data should still be there: self.assertEqual(s2.fetch_all(), { 'foo': 'bar', @@ -387,7 +397,7 @@ class StorageTest(test.TestCase): self.assertEqual(self.thread_count, len(s.fetch_all())) self.assertEqual(1, len(s._flowdetail)) - def test_fetch_meapped_args(self): + def test_fetch_mapped_args(self): s = self._get_storage() s.inject({'foo': 'bar', 'spam': 'eggs'}) self.assertEqual(s.fetch_mapped_args({'viking': 'spam'}), @@ -414,8 +424,7 @@ class StorageTest(test.TestCase): def test_task_by_name(self): s = self._get_storage() s.ensure_task('my task') - self.assertTrue( - uuidutils.is_uuid_like(s.get_task_uuid('my task'))) + self.assertTrue(uuidutils.is_uuid_like(s.get_task_uuid('my task'))) def test_unknown_task_by_name(self): s = self._get_storage() @@ -428,11 +437,11 @@ class StorageTest(test.TestCase): self.assertEqual(s.get_flow_state(), states.PENDING) def test_get_flow_state(self): - _lb, fd = p_utils.temporary_flow_detail(backend=self.backend) - fd.state = states.FAILURE + _lb, flow_detail = p_utils.temporary_flow_detail(backend=self.backend) + flow_detail.state = states.FAILURE with contextlib.closing(self.backend.get_connection()) as conn: - fd.update(conn.update_flow_details(fd)) - s = storage.SingleThreadedStorage(flow_detail=fd, backend=self.backend) + flow_detail.update(conn.update_flow_details(flow_detail)) + s = self._get_storage(flow_detail) self.assertEqual(s.get_flow_state(), states.FAILURE) def test_set_and_get_flow_state(self):