Integration scenario test refactoring

- Added new base class for scenario tests with common parameters
initialization in the setUpClass
- Added short description for every test
- Scenario tests structure has become more similar

Change-Id: Icdecb1f16290038f20471991a5f998de2c0a27ea
This commit is contained in:
Anastasia Kuznetsova 2015-02-25 12:50:34 +04:00
parent 8d78c26e2b
commit b2c2bc3fa4
5 changed files with 208 additions and 142 deletions

View File

@ -0,0 +1,49 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from heat_integrationtests.common import test
class ScenarioTestsBase(test.HeatIntegrationTest):
"This class define common parameters for scenario tests"
def setUp(self):
super(ScenarioTestsBase, self).setUp()
self.client = self.orchestration_client
self.sub_dir = 'templates'
self.assign_keypair()
self.net = self._get_default_network()
if not self.conf.image_ref:
raise self.skipException("No image configured to test")
if not self.conf.instance_type:
raise self.skipException("No flavor configured to test")
def launch_stack(self, template_name, expected_status='CREATE_COMPLETE',
parameters=None, **kwargs):
template = self._load_template(__file__, template_name, self.sub_dir)
parameters = parameters or {}
if kwargs.get('add_parameters'):
parameters.update(kwargs['add_parameters'])
stack_id = self.stack_create(
stack_name=kwargs.get('stack_name'),
template=template,
files=kwargs.get('files'),
parameters=parameters,
environment=kwargs.get('environment'),
expected_status=expected_status
)
return stack_id

View File

@ -10,59 +10,62 @@
# License for the specific language governing permissions and limitations
# under the License.
from heat_integrationtests.common import test
from heat_integrationtests.scenario import scenario_base
class NeutronAutoscalingTest(test.HeatIntegrationTest):
class NeutronAutoscalingTest(scenario_base.ScenarioTestsBase):
"""
The class is responsible for testing of neutron resources autoscaling.
"""
def setUp(self):
super(NeutronAutoscalingTest, self).setUp()
self.client = self.orchestration_client
if not self.conf.minimal_image_ref:
raise self.skipException("No minimal image configured to test")
if not self.conf.instance_type:
raise self.skipException("No flavor configured to test")
if not self.conf.fixed_subnet_name:
raise self.skipException("No sub-network configured to test")
self.template_name = 'test_neutron_autoscaling.yaml'
def test_neutron_autoscaling(self):
"""
Check autoscaling of load balancer members in heat.
Check autoscaling of load balancer members in Heat.
The alternative scenario is the following:
1. Initialize environment variables.
2. Create a stack with a load balancer.
3. Check that the load balancer created
1. Launch a stack with a load balancer.
2. Check that the load balancer created
one load balancer member for stack.
4. Update stack definition: increase desired capacity of stack.
5. Check that number of members in load balancer was increased.
3. Update stack definition: increase desired capacity of stack.
4. Check that number of members in load balancer was increased.
"""
# Init env variables
env = {'parameters': {"image_id": self.conf.minimal_image_ref,
"capacity": "1",
"instance_type": self.conf.instance_type,
"fixed_subnet_name": self.conf.fixed_subnet_name,
}}
parameters = {
"image_id": self.conf.minimal_image_ref,
"capacity": "1",
"instance_type": self.conf.instance_type,
"fixed_subnet_name": self.conf.fixed_subnet_name,
}
template = self._load_template(__file__,
'test_neutron_autoscaling.yaml',
'templates')
# Create stack
stack_id = self.stack_create(template=template,
environment=env)
# Launch stack
stack_id = self.launch_stack(
template_name=self.template_name,
parameters=parameters
)
# Check number of members
members = self.network_client.list_members()
self.assertEqual(1, len(members["members"]))
# Increase desired capacity and update the stack
env["parameters"]["capacity"] = "2"
self.update_stack(stack_id,
template=template,
environment=env)
template = self._load_template(
__file__, self.template_name, self.sub_dir
)
parameters["capacity"] = "2"
self.update_stack(
stack_id,
template=template,
parameters=parameters
)
# Check number of members
upd_members = self.network_client.list_members()
self.assertEqual(2, len(upd_members["members"]))

View File

@ -14,55 +14,32 @@ import json
import logging
from heat_integrationtests.common import exceptions
from heat_integrationtests.common import test
from heat_integrationtests.scenario import scenario_base
LOG = logging.getLogger(__name__)
class CfnInitIntegrationTest(test.HeatIntegrationTest):
class CfnInitIntegrationTest(scenario_base.ScenarioTestsBase):
"""
The class is responsible for testing cfn-init and cfn-signal workability
"""
def setUp(self):
super(CfnInitIntegrationTest, self).setUp()
if not self.conf.image_ref:
raise self.skipException("No image configured to test")
self.assign_keypair()
self.client = self.orchestration_client
self.template_name = 'test_server_cfn_init.yaml'
self.sub_dir = 'templates'
def launch_stack(self):
net = self._get_default_network()
parameters = {
'key_name': self.keypair_name,
'flavor': self.conf.instance_type,
'image': self.conf.image_ref,
'timeout': self.conf.build_timeout,
'subnet': net['subnets'][0],
}
# create the stack
template = self._load_template(__file__, self.template_name,
self.sub_dir)
return self.stack_create(template=template,
parameters=parameters)
def check_stack(self, sid):
self._wait_for_resource_status(
sid, 'WaitHandle', 'CREATE_COMPLETE')
self._wait_for_resource_status(
sid, 'SmokeSecurityGroup', 'CREATE_COMPLETE')
self._wait_for_resource_status(
sid, 'SmokeKeys', 'CREATE_COMPLETE')
self._wait_for_resource_status(
sid, 'CfnUser', 'CREATE_COMPLETE')
self._wait_for_resource_status(
sid, 'SmokeServer', 'CREATE_COMPLETE')
# Check status of all resources
for res in ('WaitHandle', 'SmokeSecurityGroup', 'SmokeKeys',
'CfnUser', 'SmokeServer'):
self._wait_for_resource_status(
sid, res, 'CREATE_COMPLETE')
server_resource = self.client.resources.get(sid, 'SmokeServer')
server_id = server_resource.physical_resource_id
server = self.compute_client.servers.get(server_id)
server_ip = server.networks[self.conf.network_for_ssh][0]
# Check that created server is reachable
if not self._ping_ip_address(server_ip):
self._log_console_output(servers=[server])
self.fail(
@ -80,6 +57,7 @@ class CfnInitIntegrationTest(test.HeatIntegrationTest):
# logs to be compared
self._log_console_output(servers=[server])
# Check stack status
self._wait_for_stack_status(sid, 'CREATE_COMPLETE')
stack = self.client.stacks.get(sid)
@ -94,9 +72,8 @@ class CfnInitIntegrationTest(test.HeatIntegrationTest):
self._stack_output(stack, 'WaitConditionStatus'))
self.assertEqual('smoke test complete', wait_status['smoke_status'])
# Check that the user can authenticate with the generated keypair
if self.keypair:
# Check that the user can authenticate with the generated
# keypair
try:
linux_client = self.get_remote_client(
server_ip, username='ec2-user')
@ -107,5 +84,30 @@ class CfnInitIntegrationTest(test.HeatIntegrationTest):
raise e
def test_server_cfn_init(self):
sid = self.launch_stack()
self.check_stack(sid)
"""
Check cfn-init and cfn-signal availability on the created server.
The alternative scenario is the following:
1. Create a stack with a server and configured security group.
2. Check that all stack resources were created.
3. Check that created server is reachable.
4. Check that stack was created successfully.
5. Check that is it possible to connect to server
via generated keypair.
"""
parameters = {
"key_name": self.keypair_name,
"flavor": self.conf.instance_type,
"image": self.conf.image_ref,
"timeout": self.conf.build_timeout,
"subnet": self.net["subnets"][0],
}
# Launch stack
stack_id = self.launch_stack(
template_name="test_server_cfn_init.yaml",
parameters=parameters
)
# Check stack
self.check_stack(stack_id)

View File

@ -13,7 +13,7 @@
import six
from heat_integrationtests.common import exceptions
from heat_integrationtests.common import test
from heat_integrationtests.scenario import scenario_base
CFG1_SH = '''#!/bin/sh
echo "Writing to /tmp/$bar"
@ -39,46 +39,17 @@ $::deploy_server_id during $::deploy_action",
}'''
class SoftwareConfigIntegrationTest(test.HeatIntegrationTest):
class SoftwareConfigIntegrationTest(scenario_base.ScenarioTestsBase):
def setUp(self):
super(SoftwareConfigIntegrationTest, self).setUp()
if self.conf.skip_software_config_tests:
self.skipTest('Testing software config disabled in conf, '
'skipping')
self.client = self.orchestration_client
self.template_name = 'test_server_software_config.yaml'
self.sub_dir = 'templates'
self.stack_name = self._stack_rand_name()
self.maxDiff = None
def launch_stack(self):
net = self._get_default_network()
self.parameters = {
'key_name': self.keypair_name,
'flavor': self.conf.instance_type,
'image': self.conf.image_ref,
'network': net['id']
}
# create the stack
self.template = self._load_template(__file__, self.template_name,
self.sub_dir)
self.stack_create(
stack_name=self.stack_name,
template=self.template,
parameters=self.parameters,
files={
'cfg1.sh': CFG1_SH,
'cfg3.pp': CFG3_PP
},
expected_status=None)
self.stack = self.client.stacks.get(self.stack_name)
self.stack_identifier = '%s/%s' % (self.stack_name, self.stack.id)
def check_stack(self):
sid = self.stack_identifier
def check_stack(self, sid):
# Check that all stack resources were created
for res in ('cfg2a', 'cfg2b', 'cfg1', 'cfg3', 'server'):
self._wait_for_resource_status(
sid, res, 'CREATE_COMPLETE')
@ -87,14 +58,15 @@ class SoftwareConfigIntegrationTest(test.HeatIntegrationTest):
server_id = server_resource.physical_resource_id
server = self.compute_client.servers.get(server_id)
# Waiting for each deployment to contribute their
# config to resource
try:
# wait for each deployment to contribute their
# config to resource
for res in ('dep2b', 'dep1', 'dep3'):
self._wait_for_resource_status(
sid, res, 'CREATE_IN_PROGRESS')
server_metadata = self.client.resources.metadata(sid, 'server')
server_metadata = self.client.resources.metadata(
sid, 'server')
deployments = dict((d['name'], d) for d in
server_metadata['deployments'])
@ -106,11 +78,13 @@ class SoftwareConfigIntegrationTest(test.HeatIntegrationTest):
self._log_console_output(servers=[server])
raise e
# Check that stack was fully created
self._wait_for_stack_status(sid, 'CREATE_COMPLETE')
complete_server_metadata = self.client.resources.metadata(
sid, 'server')
# ensure any previously available deployments haven't changed so
# Ensure any previously available deployments haven't changed so
# config isn't re-triggered
complete_deployments = dict((d['name'], d) for d in
complete_server_metadata['deployments'])
@ -153,6 +127,39 @@ class SoftwareConfigIntegrationTest(test.HeatIntegrationTest):
self.assertNotEqual(dep1_dep.updated_time, dep1_dep.creation_time)
def test_server_software_config(self):
self.assign_keypair()
self.launch_stack()
self.check_stack()
"""
Check that passed files with scripts are executed on created server.
The alternative scenario is the following:
1. Create a stack and pass files with scripts.
2. Check that all stack resources are created successfully.
3. Wait for all deployments.
4. Check that stack was created.
5. Check stack outputs.
"""
parameters = {
'key_name': self.keypair_name,
'flavor': self.conf.instance_type,
'image': self.conf.image_ref,
'network': self.net['id']
}
files = {
'cfg1.sh': CFG1_SH,
'cfg3.pp': CFG3_PP
}
# Launch stack
stack_id = self.launch_stack(
stack_name=self.stack_name,
template_name='test_server_software_config.yaml',
parameters=parameters,
files=files,
expected_status=None
)
self.stack_identifier = '%s/%s' % (self.stack_name, stack_id)
# Check stack
self.check_stack(self.stack_identifier)

View File

@ -17,17 +17,18 @@ import six
from testtools import testcase
from heat_integrationtests.common import exceptions
from heat_integrationtests.common import test
from heat_integrationtests.scenario import scenario_base
LOG = logging.getLogger(__name__)
class VolumeBackupRestoreIntegrationTest(test.HeatIntegrationTest):
class VolumeBackupRestoreIntegrationTest(scenario_base.ScenarioTestsBase):
"""
Class is responsible for testing of volume backup.
"""
def setUp(self):
super(VolumeBackupRestoreIntegrationTest, self).setUp()
self.client = self.orchestration_client
self.assign_keypair()
self.volume_description = 'A test volume description 123'
self.volume_size = self.conf.volume_size
@ -48,37 +49,8 @@ class VolumeBackupRestoreIntegrationTest(test.HeatIntegrationTest):
self.assertEqual(self.volume_description,
self._stack_output(stack, 'display_description'))
def launch_stack(self, template_name, add_parameters={}):
net = self._get_default_network()
template = self._load_template(__file__, template_name, 'templates')
parameters = {'key_name': self.keypair_name,
'instance_type': self.conf.instance_type,
'image_id': self.conf.minimal_image_ref,
'volume_description': self.volume_description,
'timeout': self.conf.build_timeout,
'network': net['id']}
parameters.update(add_parameters)
return self.stack_create(template=template,
parameters=parameters)
@testcase.skip('Skipped until failure rate '
'can be reduced ref bug #1382300')
def test_cinder_volume_create_backup_restore(self):
"""Ensure the 'Snapshot' deletion policy works.
This requires a more complex test, but it tests several aspects
of the heat cinder resources:
1. Create a volume, attach it to an instance, write some data to it
2. Delete the stack, with 'Snapshot' specified, creates a backup
3. Check the snapshot has created a volume backup
4. Create a new stack, where the volume is created from the backup
5. Verify the test data written in (1) is present in the new volume
"""
stack_identifier = self.launch_stack(
template_name='test_volumes_delete_snapshot.yaml',
add_parameters={'volume_size': self.volume_size})
stack = self.client.stacks.get(stack_identifier)
def check_stack(self, stack_id):
stack = self.client.stacks.get(stack_id)
# Verify with cinder that the volume exists, with matching details
volume_id = self._stack_output(stack, 'volume_id')
@ -89,8 +61,8 @@ class VolumeBackupRestoreIntegrationTest(test.HeatIntegrationTest):
# Delete the stack and ensure a backup is created for volume_id
# but the volume itself is gone
self.client.stacks.delete(stack_identifier)
self._wait_for_stack_status(stack_identifier, 'DELETE_COMPLETE')
self.client.stacks.delete(stack_id)
self._wait_for_stack_status(stack_id, 'DELETE_COMPLETE')
self.assertRaises(cinder_exceptions.NotFound,
self.volume_client.volumes.get,
volume_id)
@ -130,3 +102,36 @@ class VolumeBackupRestoreIntegrationTest(test.HeatIntegrationTest):
self.assertRaises(cinder_exceptions.NotFound,
self.volume_client.volumes.get,
volume_id2)
@testcase.skip('Skipped until failure rate '
'can be reduced ref bug #1382300')
def test_cinder_volume_create_backup_restore(self):
"""
Ensure the 'Snapshot' deletion policy works.
This requires a more complex test, but it tests several aspects
of the heat cinder resources:
1. Create a volume, attach it to an instance, write some data to it
2. Delete the stack, with 'Snapshot' specified, creates a backup
3. Check the snapshot has created a volume backup
4. Create a new stack, where the volume is created from the backup
5. Verify the test data written in (1) is present in the new volume
"""
parameters = {
'key_name': self.keypair_name,
'instance_type': self.conf.instance_type,
'image_id': self.conf.minimal_image_ref,
'volume_description': self.volume_description,
'timeout': self.conf.build_timeout,
'network': self.net['id']
}
# Launch stack
stack_id = self.launch_stack(
template_name='test_volumes_delete_snapshot.yaml',
parameters=parameters,
add_parameters={'volume_size': self.volume_size}
)
# Check stack
self.check_stack(stack_id)