From 5a0b7649702e78fb425b171455e6ea235d4135ee Mon Sep 17 00:00:00 2001 From: tengqm Date: Thu, 23 Apr 2015 10:08:58 -0400 Subject: [PATCH] Split engine service test cases (2) This patch separates out the stack snapshot test cases. It also replaces mox tests with mock calls. Change-Id: I437746a3b7c82ed83c6ec0b3ff7d93a79cfa79b3 --- heat/tests/engine/test_stack_snapshot.py | 240 +++++++++++++++++++++++ heat/tests/test_engine_service.py | 182 ----------------- 2 files changed, 240 insertions(+), 182 deletions(-) create mode 100644 heat/tests/engine/test_stack_snapshot.py diff --git a/heat/tests/engine/test_stack_snapshot.py b/heat/tests/engine/test_stack_snapshot.py new file mode 100644 index 0000000000..c96b84e578 --- /dev/null +++ b/heat/tests/engine/test_stack_snapshot.py @@ -0,0 +1,240 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +import mock +from oslo_messaging.rpc import dispatcher +import six + +from heat.common import exception +from heat.common import template_format +from heat.engine import service +from heat.engine import stack +from heat.tests import common +from heat.tests.engine import tools +from heat.tests import utils + + +class SnapshotServiceTest(common.HeatTestCase): + # TODO(Qiming): Rework this test to handle OS::Nova::Server which + # has a real snapshot support. + def setUp(self): + super(SnapshotServiceTest, self).setUp() + self.ctx = utils.dummy_context() + + self.engine = service.EngineService('a-host', 'a-topic') + self.engine.create_periodic_tasks() + utils.setup_dummy_db() + + def _create_stack(self, stack_name): + t = template_format.parse(tools.wp_template) + stk = utils.parse_stack(t, stack_name=stack_name) + stk.state_set(stk.CREATE, stk.COMPLETE, 'mock completion') + + return stk + + def test_show_snapshot_not_found(self): + stk = self._create_stack('stack_snapshot_not_found') + snapshot_id = str(uuid.uuid4()) + ex = self.assertRaises(dispatcher.ExpectedException, + self.engine.show_snapshot, + self.ctx, stk.identifier(), + snapshot_id) + expected = 'Snapshot with id %s not found' % snapshot_id + self.assertEqual(exception.NotFound, ex.exc_info[0]) + self.assertIn(expected, six.text_type(ex.exc_info[1])) + + def test_show_snapshot_not_belong_to_stack(self): + stk1 = self._create_stack('stack_snaphot_not_belong_to_stack_1') + snapshot1 = self.engine.stack_snapshot( + self.ctx, stk1.identifier(), 'snap1') + self.engine.thread_group_mgr.groups[stk1.id].wait() + snapshot_id = snapshot1['id'] + + stk2 = self._create_stack('stack_snaphot_not_belong_to_stack_2') + ex = self.assertRaises(dispatcher.ExpectedException, + self.engine.show_snapshot, + self.ctx, stk2.identifier(), + snapshot_id) + expected = ('The Snapshot (%(snapshot)s) for Stack (%(stack)s) ' + 'could not be found') % {'snapshot': snapshot_id, + 'stack': stk2.name} + self.assertEqual(exception.SnapshotNotFound, ex.exc_info[0]) + self.assertIn(expected, six.text_type(ex.exc_info[1])) + + @mock.patch.object(stack.Stack, 'load') + def test_create_snapshot(self, mock_load): + stk = self._create_stack('stack_snapshot_create') + mock_load.return_value = stk + + snapshot = self.engine.stack_snapshot( + self.ctx, stk.identifier(), 'snap1') + self.assertIsNotNone(snapshot['id']) + self.assertIsNotNone(snapshot['creation_time']) + self.assertEqual('snap1', snapshot['name']) + self.assertEqual("IN_PROGRESS", snapshot['status']) + self.engine.thread_group_mgr.groups[stk.id].wait() + snapshot = self.engine.show_snapshot( + self.ctx, stk.identifier(), snapshot['id']) + self.assertEqual("COMPLETE", snapshot['status']) + self.assertEqual("SNAPSHOT", snapshot['data']['action']) + self.assertEqual("COMPLETE", snapshot['data']['status']) + self.assertEqual(stk.id, snapshot['data']['id']) + self.assertIsNotNone(stk.updated_time) + self.assertIsNotNone(snapshot['creation_time']) + mock_load.assert_called_once_with(self.ctx, stack=mock.ANY) + + @mock.patch.object(stack.Stack, 'load') + def test_create_snapshot_action_in_progress(self, mock_load): + stack_name = 'stack_snapshot_action_in_progress' + stk = self._create_stack(stack_name) + mock_load.return_value = stk + + stk.state_set(stk.UPDATE, stk.IN_PROGRESS, 'test_override') + ex = self.assertRaises(dispatcher.ExpectedException, + self.engine.stack_snapshot, + self.ctx, stk.identifier(), 'snap_none') + self.assertEqual(exception.ActionInProgress, ex.exc_info[0]) + msg = ("Stack %(stack)s already has an action (%(action)s) " + "in progress.") % {'stack': stack_name, 'action': stk.action} + self.assertEqual(msg, six.text_type(ex.exc_info[1])) + + mock_load.assert_called_once_with(self.ctx, stack=mock.ANY) + + @mock.patch.object(stack.Stack, 'load') + def test_delete_snapshot_not_found(self, mock_load): + stk = self._create_stack('stack_snapshot_delete_not_found') + mock_load.return_value = stk + + snapshot_id = str(uuid.uuid4()) + ex = self.assertRaises(dispatcher.ExpectedException, + self.engine.delete_snapshot, + self.ctx, stk.identifier(), snapshot_id) + self.assertEqual(exception.NotFound, ex.exc_info[0]) + + mock_load.assert_called_once_with(self.ctx, stack=mock.ANY) + + @mock.patch.object(stack.Stack, 'load') + def test_delete_snapshot_not_belong_to_stack(self, mock_load): + stk1 = self._create_stack('stack_snapshot_delete_not_belong_1') + mock_load.return_value = stk1 + + snapshot1 = self.engine.stack_snapshot( + self.ctx, stk1.identifier(), 'snap1') + self.engine.thread_group_mgr.groups[stk1.id].wait() + snapshot_id = snapshot1['id'] + + mock_load.assert_called_once_with(self.ctx, stack=mock.ANY) + mock_load.reset_mock() + + stk2 = self._create_stack('stack_snapshot_delete_not_belong_2') + mock_load.return_value = stk2 + + ex = self.assertRaises(dispatcher.ExpectedException, + self.engine.delete_snapshot, + self.ctx, + stk2.identifier(), + snapshot_id) + expected = ('The Snapshot (%(snapshot)s) for Stack (%(stack)s) ' + 'could not be found') % {'snapshot': snapshot_id, + 'stack': stk2.name} + self.assertEqual(exception.SnapshotNotFound, ex.exc_info[0]) + self.assertIn(expected, six.text_type(ex.exc_info[1])) + + mock_load.assert_called_once_with(self.ctx, stack=mock.ANY) + mock_load.reset_mock() + + @mock.patch.object(stack.Stack, 'load') + def test_delete_snapshot(self, mock_load): + stk = self._create_stack('stack_snapshot_delete_normal') + mock_load.return_value = stk + + snapshot = self.engine.stack_snapshot( + self.ctx, stk.identifier(), 'snap1') + self.engine.thread_group_mgr.groups[stk.id].wait() + snapshot_id = snapshot['id'] + self.engine.delete_snapshot(self.ctx, stk.identifier(), snapshot_id) + self.engine.thread_group_mgr.groups[stk.id].wait() + + ex = self.assertRaises(dispatcher.ExpectedException, + self.engine.show_snapshot, self.ctx, + stk.identifier(), snapshot_id) + self.assertEqual(exception.NotFound, ex.exc_info[0]) + + self.assertTrue(2, mock_load.call_count) + + @mock.patch.object(stack.Stack, 'load') + def test_list_snapshots(self, mock_load): + stk = self._create_stack('stack_snapshot_list') + mock_load.return_value = stk + + snapshot = self.engine.stack_snapshot( + self.ctx, stk.identifier(), 'snap1') + self.assertIsNotNone(snapshot['id']) + self.assertEqual("IN_PROGRESS", snapshot['status']) + self.engine.thread_group_mgr.groups[stk.id].wait() + + snapshots = self.engine.stack_list_snapshots( + self.ctx, stk.identifier()) + expected = { + "id": snapshot["id"], + "name": "snap1", + "status": "COMPLETE", + "status_reason": "Stack SNAPSHOT completed successfully", + "data": stk.prepare_abandon(), + "creation_time": snapshot['creation_time']} + + self.assertEqual([expected], snapshots) + mock_load.assert_called_once_with(self.ctx, stack=mock.ANY) + + @mock.patch.object(stack.Stack, 'load') + def test_restore_snapshot(self, mock_load): + stk = self._create_stack('stack_snapshot_restore_normal') + mock_load.return_value = stk + + snapshot = self.engine.stack_snapshot( + self.ctx, stk.identifier(), 'snap1') + self.engine.thread_group_mgr.groups[stk.id].wait() + snapshot_id = snapshot['id'] + self.engine.stack_restore(self.ctx, stk.identifier(), snapshot_id) + self.engine.thread_group_mgr.groups[stk.id].wait() + self.assertEqual((stk.RESTORE, stk.COMPLETE), stk.state) + self.assertEqual(2, mock_load.call_count) + + @mock.patch.object(stack.Stack, 'load') + def test_restore_snapshot_other_stack(self, mock_load): + stk1 = self._create_stack('stack_snapshot_restore_other_stack_1') + mock_load.return_value = stk1 + + snapshot1 = self.engine.stack_snapshot( + self.ctx, stk1.identifier(), 'snap1') + self.engine.thread_group_mgr.groups[stk1.id].wait() + snapshot_id = snapshot1['id'] + + mock_load.assert_called_once_with(self.ctx, stack=mock.ANY) + mock_load.reset_mock() + + stk2 = self._create_stack('stack_snapshot_restore_other_stack_1') + mock_load.return_value = stk2 + + ex = self.assertRaises(dispatcher.ExpectedException, + self.engine.stack_restore, + self.ctx, stk2.identifier(), + snapshot_id) + expected = ('The Snapshot (%(snapshot)s) for Stack (%(stack)s) ' + 'could not be found') % {'snapshot': snapshot_id, + 'stack': stk2.name} + self.assertEqual(exception.SnapshotNotFound, ex.exc_info[0]) + self.assertIn(expected, six.text_type(ex.exc_info[1])) + + mock_load.assert_called_once_with(self.ctx, stack=mock.ANY) diff --git a/heat/tests/test_engine_service.py b/heat/tests/test_engine_service.py index 6b8ff952d9..3326ec835b 100644 --- a/heat/tests/test_engine_service.py +++ b/heat/tests/test_engine_service.py @@ -4156,185 +4156,3 @@ class ThreadGroupManagerStopTest(common.HeatTestCase): self.assertIn(thread, done) self.assertNotIn(stack_id, thm.groups) self.assertNotIn(stack_id, thm.events) - - -class SnapshotServiceTest(common.HeatTestCase): - # TODO(Qiming): Rework this test to handle OS::Nova::Server which - # has a real snapshot support. - def setUp(self): - super(SnapshotServiceTest, self).setUp() - self.ctx = utils.dummy_context() - - self.m.ReplayAll() - self.engine = service.EngineService('a-host', 'a-topic') - self.engine.create_periodic_tasks() - utils.setup_dummy_db() - self.addCleanup(self.m.VerifyAll) - - def _create_stack(self, stub=True): - stack = tools.get_stack('stack', self.ctx) - sid = stack.store() - - s = stack_object.Stack.get_by_id(self.ctx, sid) - stack.state_set(stack.CREATE, stack.COMPLETE, 'mock completion') - if stub: - self.m.StubOutWithMock(parser.Stack, 'load') - parser.Stack.load(self.ctx, - stack=s).MultipleTimes().AndReturn(stack) - return stack - - def test_show_snapshot_not_found(self): - stack1 = self._create_stack(stub=False) - snapshot_id = str(uuid.uuid4()) - ex = self.assertRaises(dispatcher.ExpectedException, - self.engine.show_snapshot, - self.ctx, stack1.identifier(), - snapshot_id) - expected = 'Snapshot with id %s not found' % snapshot_id - self.assertEqual(exception.NotFound, ex.exc_info[0]) - self.assertIn(expected, six.text_type(ex.exc_info[1])) - - def test_show_snapshot_not_belong_to_stack(self): - stack1 = self._create_stack(stub=False) - snapshot1 = self.engine.stack_snapshot( - self.ctx, stack1.identifier(), 'snap1') - self.engine.thread_group_mgr.groups[stack1.id].wait() - snapshot_id = snapshot1['id'] - stack2 = self._create_stack(stub=False) - ex = self.assertRaises(dispatcher.ExpectedException, - self.engine.show_snapshot, - self.ctx, stack2.identifier(), - snapshot_id) - expected = ('The Snapshot (%(snapshot)s) for Stack (%(stack)s) ' - 'could not be found') % {'snapshot': snapshot_id, - 'stack': stack2.name} - self.assertEqual(exception.SnapshotNotFound, ex.exc_info[0]) - self.assertIn(expected, six.text_type(ex.exc_info[1])) - - def test_create_snapshot(self): - stack = self._create_stack() - self.m.ReplayAll() - snapshot = self.engine.stack_snapshot( - self.ctx, stack.identifier(), 'snap1') - self.assertIsNotNone(snapshot['id']) - self.assertIsNotNone(snapshot['creation_time']) - self.assertEqual('snap1', snapshot['name']) - self.assertEqual("IN_PROGRESS", snapshot['status']) - self.engine.thread_group_mgr.groups[stack.id].wait() - snapshot = self.engine.show_snapshot( - self.ctx, stack.identifier(), snapshot['id']) - self.assertEqual("COMPLETE", snapshot['status']) - self.assertEqual("SNAPSHOT", snapshot['data']['action']) - self.assertEqual("COMPLETE", snapshot['data']['status']) - self.assertEqual(stack.id, snapshot['data']['id']) - self.assertIsNotNone(stack.updated_time) - self.assertIsNotNone(snapshot['creation_time']) - - def test_create_snapshot_action_in_progress(self): - stack = self._create_stack() - self.m.ReplayAll() - stack.state_set(stack.UPDATE, stack.IN_PROGRESS, 'test_override') - ex = self.assertRaises(dispatcher.ExpectedException, - self.engine.stack_snapshot, - self.ctx, stack.identifier(), 'snap_none') - self.assertEqual(exception.ActionInProgress, ex.exc_info[0]) - msg = ("Stack stack already has an action (%(action)s) " - "in progress.") % {'action': stack.action} - self.assertEqual(msg, six.text_type(ex.exc_info[1])) - - def test_delete_snapshot_not_found(self): - stack = self._create_stack() - self.m.ReplayAll() - snapshot_id = str(uuid.uuid4()) - ex = self.assertRaises(dispatcher.ExpectedException, - self.engine.delete_snapshot, - self.ctx, stack.identifier(), snapshot_id) - self.assertEqual(exception.NotFound, ex.exc_info[0]) - - def test_delete_snapshot_not_belong_to_stack(self): - stack1 = self._create_stack() - self.m.ReplayAll() - snapshot1 = self.engine.stack_snapshot( - self.ctx, stack1.identifier(), 'snap1') - self.engine.thread_group_mgr.groups[stack1.id].wait() - snapshot_id = snapshot1['id'] - self.m.UnsetStubs() - stack2 = self._create_stack() - self.m.ReplayAll() - - ex = self.assertRaises(dispatcher.ExpectedException, - self.engine.delete_snapshot, - self.ctx, - stack2.identifier(), - snapshot_id) - expected = ('The Snapshot (%(snapshot)s) for Stack (%(stack)s) ' - 'could not be found') % {'snapshot': snapshot_id, - 'stack': stack2.name} - self.assertEqual(exception.SnapshotNotFound, ex.exc_info[0]) - self.assertIn(expected, six.text_type(ex.exc_info[1])) - - def test_delete_snapshot(self): - stack = self._create_stack() - self.m.ReplayAll() - snapshot = self.engine.stack_snapshot( - self.ctx, stack.identifier(), 'snap1') - self.engine.thread_group_mgr.groups[stack.id].wait() - snapshot_id = snapshot['id'] - self.engine.delete_snapshot(self.ctx, stack.identifier(), snapshot_id) - self.engine.thread_group_mgr.groups[stack.id].wait() - ex = self.assertRaises(dispatcher.ExpectedException, - self.engine.show_snapshot, self.ctx, - stack.identifier(), snapshot_id) - self.assertEqual(exception.NotFound, ex.exc_info[0]) - - def test_list_snapshots(self): - stack = self._create_stack() - self.m.ReplayAll() - snapshot = self.engine.stack_snapshot( - self.ctx, stack.identifier(), 'snap1') - self.assertIsNotNone(snapshot['id']) - self.assertEqual("IN_PROGRESS", snapshot['status']) - self.engine.thread_group_mgr.groups[stack.id].wait() - - snapshots = self.engine.stack_list_snapshots( - self.ctx, stack.identifier()) - expected = { - "id": snapshot["id"], - "name": "snap1", - "status": "COMPLETE", - "status_reason": "Stack SNAPSHOT completed successfully", - "data": stack.prepare_abandon(), - "creation_time": snapshot['creation_time']} - self.assertEqual([expected], snapshots) - - def test_restore_snapshot(self): - stack = self._create_stack() - self.m.ReplayAll() - snapshot = self.engine.stack_snapshot( - self.ctx, stack.identifier(), 'snap1') - self.engine.thread_group_mgr.groups[stack.id].wait() - snapshot_id = snapshot['id'] - self.engine.stack_restore(self.ctx, stack.identifier(), snapshot_id) - self.engine.thread_group_mgr.groups[stack.id].wait() - self.assertEqual((stack.RESTORE, stack.COMPLETE), stack.state) - - def test_restore_snapshot_other_stack(self): - stack1 = self._create_stack() - self.m.ReplayAll() - snapshot1 = self.engine.stack_snapshot( - self.ctx, stack1.identifier(), 'snap1') - self.engine.thread_group_mgr.groups[stack1.id].wait() - snapshot_id = snapshot1['id'] - self.m.UnsetStubs() - stack2 = self._create_stack() - self.m.ReplayAll() - ex = self.assertRaises(dispatcher.ExpectedException, - self.engine.stack_restore, - self.ctx, - stack2.identifier(), - snapshot_id) - expected = ('The Snapshot (%(snapshot)s) for Stack (%(stack)s) ' - 'could not be found') % {'snapshot': snapshot_id, - 'stack': stack2.name} - self.assertEqual(exception.SnapshotNotFound, ex.exc_info[0]) - self.assertIn(expected, six.text_type(ex.exc_info[1]))