Merge "Split engine service test cases (2)"

This commit is contained in:
Jenkins 2015-05-06 04:47:29 +00:00 committed by Gerrit Code Review
commit 4fcfdf1e71
2 changed files with 240 additions and 182 deletions

View File

@ -0,0 +1,240 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
import mock
from oslo_messaging.rpc import dispatcher
import six
from heat.common import exception
from heat.common import template_format
from heat.engine import service
from heat.engine import stack
from heat.tests import common
from heat.tests.engine import tools
from heat.tests import utils
class SnapshotServiceTest(common.HeatTestCase):
# TODO(Qiming): Rework this test to handle OS::Nova::Server which
# has a real snapshot support.
def setUp(self):
super(SnapshotServiceTest, self).setUp()
self.ctx = utils.dummy_context()
self.engine = service.EngineService('a-host', 'a-topic')
self.engine.create_periodic_tasks()
utils.setup_dummy_db()
def _create_stack(self, stack_name):
t = template_format.parse(tools.wp_template)
stk = utils.parse_stack(t, stack_name=stack_name)
stk.state_set(stk.CREATE, stk.COMPLETE, 'mock completion')
return stk
def test_show_snapshot_not_found(self):
stk = self._create_stack('stack_snapshot_not_found')
snapshot_id = str(uuid.uuid4())
ex = self.assertRaises(dispatcher.ExpectedException,
self.engine.show_snapshot,
self.ctx, stk.identifier(),
snapshot_id)
expected = 'Snapshot with id %s not found' % snapshot_id
self.assertEqual(exception.NotFound, ex.exc_info[0])
self.assertIn(expected, six.text_type(ex.exc_info[1]))
def test_show_snapshot_not_belong_to_stack(self):
stk1 = self._create_stack('stack_snaphot_not_belong_to_stack_1')
snapshot1 = self.engine.stack_snapshot(
self.ctx, stk1.identifier(), 'snap1')
self.engine.thread_group_mgr.groups[stk1.id].wait()
snapshot_id = snapshot1['id']
stk2 = self._create_stack('stack_snaphot_not_belong_to_stack_2')
ex = self.assertRaises(dispatcher.ExpectedException,
self.engine.show_snapshot,
self.ctx, stk2.identifier(),
snapshot_id)
expected = ('The Snapshot (%(snapshot)s) for Stack (%(stack)s) '
'could not be found') % {'snapshot': snapshot_id,
'stack': stk2.name}
self.assertEqual(exception.SnapshotNotFound, ex.exc_info[0])
self.assertIn(expected, six.text_type(ex.exc_info[1]))
@mock.patch.object(stack.Stack, 'load')
def test_create_snapshot(self, mock_load):
stk = self._create_stack('stack_snapshot_create')
mock_load.return_value = stk
snapshot = self.engine.stack_snapshot(
self.ctx, stk.identifier(), 'snap1')
self.assertIsNotNone(snapshot['id'])
self.assertIsNotNone(snapshot['creation_time'])
self.assertEqual('snap1', snapshot['name'])
self.assertEqual("IN_PROGRESS", snapshot['status'])
self.engine.thread_group_mgr.groups[stk.id].wait()
snapshot = self.engine.show_snapshot(
self.ctx, stk.identifier(), snapshot['id'])
self.assertEqual("COMPLETE", snapshot['status'])
self.assertEqual("SNAPSHOT", snapshot['data']['action'])
self.assertEqual("COMPLETE", snapshot['data']['status'])
self.assertEqual(stk.id, snapshot['data']['id'])
self.assertIsNotNone(stk.updated_time)
self.assertIsNotNone(snapshot['creation_time'])
mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
@mock.patch.object(stack.Stack, 'load')
def test_create_snapshot_action_in_progress(self, mock_load):
stack_name = 'stack_snapshot_action_in_progress'
stk = self._create_stack(stack_name)
mock_load.return_value = stk
stk.state_set(stk.UPDATE, stk.IN_PROGRESS, 'test_override')
ex = self.assertRaises(dispatcher.ExpectedException,
self.engine.stack_snapshot,
self.ctx, stk.identifier(), 'snap_none')
self.assertEqual(exception.ActionInProgress, ex.exc_info[0])
msg = ("Stack %(stack)s already has an action (%(action)s) "
"in progress.") % {'stack': stack_name, 'action': stk.action}
self.assertEqual(msg, six.text_type(ex.exc_info[1]))
mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
@mock.patch.object(stack.Stack, 'load')
def test_delete_snapshot_not_found(self, mock_load):
stk = self._create_stack('stack_snapshot_delete_not_found')
mock_load.return_value = stk
snapshot_id = str(uuid.uuid4())
ex = self.assertRaises(dispatcher.ExpectedException,
self.engine.delete_snapshot,
self.ctx, stk.identifier(), snapshot_id)
self.assertEqual(exception.NotFound, ex.exc_info[0])
mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
@mock.patch.object(stack.Stack, 'load')
def test_delete_snapshot_not_belong_to_stack(self, mock_load):
stk1 = self._create_stack('stack_snapshot_delete_not_belong_1')
mock_load.return_value = stk1
snapshot1 = self.engine.stack_snapshot(
self.ctx, stk1.identifier(), 'snap1')
self.engine.thread_group_mgr.groups[stk1.id].wait()
snapshot_id = snapshot1['id']
mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
mock_load.reset_mock()
stk2 = self._create_stack('stack_snapshot_delete_not_belong_2')
mock_load.return_value = stk2
ex = self.assertRaises(dispatcher.ExpectedException,
self.engine.delete_snapshot,
self.ctx,
stk2.identifier(),
snapshot_id)
expected = ('The Snapshot (%(snapshot)s) for Stack (%(stack)s) '
'could not be found') % {'snapshot': snapshot_id,
'stack': stk2.name}
self.assertEqual(exception.SnapshotNotFound, ex.exc_info[0])
self.assertIn(expected, six.text_type(ex.exc_info[1]))
mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
mock_load.reset_mock()
@mock.patch.object(stack.Stack, 'load')
def test_delete_snapshot(self, mock_load):
stk = self._create_stack('stack_snapshot_delete_normal')
mock_load.return_value = stk
snapshot = self.engine.stack_snapshot(
self.ctx, stk.identifier(), 'snap1')
self.engine.thread_group_mgr.groups[stk.id].wait()
snapshot_id = snapshot['id']
self.engine.delete_snapshot(self.ctx, stk.identifier(), snapshot_id)
self.engine.thread_group_mgr.groups[stk.id].wait()
ex = self.assertRaises(dispatcher.ExpectedException,
self.engine.show_snapshot, self.ctx,
stk.identifier(), snapshot_id)
self.assertEqual(exception.NotFound, ex.exc_info[0])
self.assertTrue(2, mock_load.call_count)
@mock.patch.object(stack.Stack, 'load')
def test_list_snapshots(self, mock_load):
stk = self._create_stack('stack_snapshot_list')
mock_load.return_value = stk
snapshot = self.engine.stack_snapshot(
self.ctx, stk.identifier(), 'snap1')
self.assertIsNotNone(snapshot['id'])
self.assertEqual("IN_PROGRESS", snapshot['status'])
self.engine.thread_group_mgr.groups[stk.id].wait()
snapshots = self.engine.stack_list_snapshots(
self.ctx, stk.identifier())
expected = {
"id": snapshot["id"],
"name": "snap1",
"status": "COMPLETE",
"status_reason": "Stack SNAPSHOT completed successfully",
"data": stk.prepare_abandon(),
"creation_time": snapshot['creation_time']}
self.assertEqual([expected], snapshots)
mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
@mock.patch.object(stack.Stack, 'load')
def test_restore_snapshot(self, mock_load):
stk = self._create_stack('stack_snapshot_restore_normal')
mock_load.return_value = stk
snapshot = self.engine.stack_snapshot(
self.ctx, stk.identifier(), 'snap1')
self.engine.thread_group_mgr.groups[stk.id].wait()
snapshot_id = snapshot['id']
self.engine.stack_restore(self.ctx, stk.identifier(), snapshot_id)
self.engine.thread_group_mgr.groups[stk.id].wait()
self.assertEqual((stk.RESTORE, stk.COMPLETE), stk.state)
self.assertEqual(2, mock_load.call_count)
@mock.patch.object(stack.Stack, 'load')
def test_restore_snapshot_other_stack(self, mock_load):
stk1 = self._create_stack('stack_snapshot_restore_other_stack_1')
mock_load.return_value = stk1
snapshot1 = self.engine.stack_snapshot(
self.ctx, stk1.identifier(), 'snap1')
self.engine.thread_group_mgr.groups[stk1.id].wait()
snapshot_id = snapshot1['id']
mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
mock_load.reset_mock()
stk2 = self._create_stack('stack_snapshot_restore_other_stack_1')
mock_load.return_value = stk2
ex = self.assertRaises(dispatcher.ExpectedException,
self.engine.stack_restore,
self.ctx, stk2.identifier(),
snapshot_id)
expected = ('The Snapshot (%(snapshot)s) for Stack (%(stack)s) '
'could not be found') % {'snapshot': snapshot_id,
'stack': stk2.name}
self.assertEqual(exception.SnapshotNotFound, ex.exc_info[0])
self.assertIn(expected, six.text_type(ex.exc_info[1]))
mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)

View File

@ -4618,185 +4618,3 @@ class ThreadGroupManagerStopTest(common.HeatTestCase):
self.assertIn(thread, done)
self.assertNotIn(stack_id, thm.groups)
self.assertNotIn(stack_id, thm.events)
class SnapshotServiceTest(common.HeatTestCase):
# TODO(Qiming): Rework this test to handle OS::Nova::Server which
# has a real snapshot support.
def setUp(self):
super(SnapshotServiceTest, self).setUp()
self.ctx = utils.dummy_context()
self.m.ReplayAll()
self.engine = service.EngineService('a-host', 'a-topic')
self.engine.create_periodic_tasks()
utils.setup_dummy_db()
self.addCleanup(self.m.VerifyAll)
def _create_stack(self, stub=True):
stack = tools.get_stack('stack', self.ctx)
sid = stack.store()
s = stack_object.Stack.get_by_id(self.ctx, sid)
stack.state_set(stack.CREATE, stack.COMPLETE, 'mock completion')
if stub:
self.m.StubOutWithMock(parser.Stack, 'load')
parser.Stack.load(self.ctx,
stack=s).MultipleTimes().AndReturn(stack)
return stack
def test_show_snapshot_not_found(self):
stack1 = self._create_stack(stub=False)
snapshot_id = str(uuid.uuid4())
ex = self.assertRaises(dispatcher.ExpectedException,
self.engine.show_snapshot,
self.ctx, stack1.identifier(),
snapshot_id)
expected = 'Snapshot with id %s not found' % snapshot_id
self.assertEqual(exception.NotFound, ex.exc_info[0])
self.assertIn(expected, six.text_type(ex.exc_info[1]))
def test_show_snapshot_not_belong_to_stack(self):
stack1 = self._create_stack(stub=False)
snapshot1 = self.engine.stack_snapshot(
self.ctx, stack1.identifier(), 'snap1')
self.engine.thread_group_mgr.groups[stack1.id].wait()
snapshot_id = snapshot1['id']
stack2 = self._create_stack(stub=False)
ex = self.assertRaises(dispatcher.ExpectedException,
self.engine.show_snapshot,
self.ctx, stack2.identifier(),
snapshot_id)
expected = ('The Snapshot (%(snapshot)s) for Stack (%(stack)s) '
'could not be found') % {'snapshot': snapshot_id,
'stack': stack2.name}
self.assertEqual(exception.SnapshotNotFound, ex.exc_info[0])
self.assertIn(expected, six.text_type(ex.exc_info[1]))
def test_create_snapshot(self):
stack = self._create_stack()
self.m.ReplayAll()
snapshot = self.engine.stack_snapshot(
self.ctx, stack.identifier(), 'snap1')
self.assertIsNotNone(snapshot['id'])
self.assertIsNotNone(snapshot['creation_time'])
self.assertEqual('snap1', snapshot['name'])
self.assertEqual("IN_PROGRESS", snapshot['status'])
self.engine.thread_group_mgr.groups[stack.id].wait()
snapshot = self.engine.show_snapshot(
self.ctx, stack.identifier(), snapshot['id'])
self.assertEqual("COMPLETE", snapshot['status'])
self.assertEqual("SNAPSHOT", snapshot['data']['action'])
self.assertEqual("COMPLETE", snapshot['data']['status'])
self.assertEqual(stack.id, snapshot['data']['id'])
self.assertIsNotNone(stack.updated_time)
self.assertIsNotNone(snapshot['creation_time'])
def test_create_snapshot_action_in_progress(self):
stack = self._create_stack()
self.m.ReplayAll()
stack.state_set(stack.UPDATE, stack.IN_PROGRESS, 'test_override')
ex = self.assertRaises(dispatcher.ExpectedException,
self.engine.stack_snapshot,
self.ctx, stack.identifier(), 'snap_none')
self.assertEqual(exception.ActionInProgress, ex.exc_info[0])
msg = ("Stack stack already has an action (%(action)s) "
"in progress.") % {'action': stack.action}
self.assertEqual(msg, six.text_type(ex.exc_info[1]))
def test_delete_snapshot_not_found(self):
stack = self._create_stack()
self.m.ReplayAll()
snapshot_id = str(uuid.uuid4())
ex = self.assertRaises(dispatcher.ExpectedException,
self.engine.delete_snapshot,
self.ctx, stack.identifier(), snapshot_id)
self.assertEqual(exception.NotFound, ex.exc_info[0])
def test_delete_snapshot_not_belong_to_stack(self):
stack1 = self._create_stack()
self.m.ReplayAll()
snapshot1 = self.engine.stack_snapshot(
self.ctx, stack1.identifier(), 'snap1')
self.engine.thread_group_mgr.groups[stack1.id].wait()
snapshot_id = snapshot1['id']
self.m.UnsetStubs()
stack2 = self._create_stack()
self.m.ReplayAll()
ex = self.assertRaises(dispatcher.ExpectedException,
self.engine.delete_snapshot,
self.ctx,
stack2.identifier(),
snapshot_id)
expected = ('The Snapshot (%(snapshot)s) for Stack (%(stack)s) '
'could not be found') % {'snapshot': snapshot_id,
'stack': stack2.name}
self.assertEqual(exception.SnapshotNotFound, ex.exc_info[0])
self.assertIn(expected, six.text_type(ex.exc_info[1]))
def test_delete_snapshot(self):
stack = self._create_stack()
self.m.ReplayAll()
snapshot = self.engine.stack_snapshot(
self.ctx, stack.identifier(), 'snap1')
self.engine.thread_group_mgr.groups[stack.id].wait()
snapshot_id = snapshot['id']
self.engine.delete_snapshot(self.ctx, stack.identifier(), snapshot_id)
self.engine.thread_group_mgr.groups[stack.id].wait()
ex = self.assertRaises(dispatcher.ExpectedException,
self.engine.show_snapshot, self.ctx,
stack.identifier(), snapshot_id)
self.assertEqual(exception.NotFound, ex.exc_info[0])
def test_list_snapshots(self):
stack = self._create_stack()
self.m.ReplayAll()
snapshot = self.engine.stack_snapshot(
self.ctx, stack.identifier(), 'snap1')
self.assertIsNotNone(snapshot['id'])
self.assertEqual("IN_PROGRESS", snapshot['status'])
self.engine.thread_group_mgr.groups[stack.id].wait()
snapshots = self.engine.stack_list_snapshots(
self.ctx, stack.identifier())
expected = {
"id": snapshot["id"],
"name": "snap1",
"status": "COMPLETE",
"status_reason": "Stack SNAPSHOT completed successfully",
"data": stack.prepare_abandon(),
"creation_time": snapshot['creation_time']}
self.assertEqual([expected], snapshots)
def test_restore_snapshot(self):
stack = self._create_stack()
self.m.ReplayAll()
snapshot = self.engine.stack_snapshot(
self.ctx, stack.identifier(), 'snap1')
self.engine.thread_group_mgr.groups[stack.id].wait()
snapshot_id = snapshot['id']
self.engine.stack_restore(self.ctx, stack.identifier(), snapshot_id)
self.engine.thread_group_mgr.groups[stack.id].wait()
self.assertEqual((stack.RESTORE, stack.COMPLETE), stack.state)
def test_restore_snapshot_other_stack(self):
stack1 = self._create_stack()
self.m.ReplayAll()
snapshot1 = self.engine.stack_snapshot(
self.ctx, stack1.identifier(), 'snap1')
self.engine.thread_group_mgr.groups[stack1.id].wait()
snapshot_id = snapshot1['id']
self.m.UnsetStubs()
stack2 = self._create_stack()
self.m.ReplayAll()
ex = self.assertRaises(dispatcher.ExpectedException,
self.engine.stack_restore,
self.ctx,
stack2.identifier(),
snapshot_id)
expected = ('The Snapshot (%(snapshot)s) for Stack (%(stack)s) '
'could not be found') % {'snapshot': snapshot_id,
'stack': stack2.name}
self.assertEqual(exception.SnapshotNotFound, ex.exc_info[0])
self.assertIn(expected, six.text_type(ex.exc_info[1]))