Browse Source

Remove integration and configuration tests

This patch propose to use heat tempest plugin for Integration tests
(with no co-gating potential) and configuration tests.
Also we will remove test_autoscaling_lb from tempest plugin as well,
since it's no longer been used.
Remove senario base, since we now move all senario tests to plugin.

Change-Id: Ice6a0e1fe8ce2e1de5253c674d11949b0f8a6e31
changes/00/529800/17
ricolin 4 years ago committed by Zane Bitter
parent
commit
aed1e6f368
  1. 30
      heat_integrationtests/common/config.py
  2. 202
      heat_integrationtests/common/remote_client.py
  3. 28
      heat_integrationtests/common/test.py
  4. 101
      heat_integrationtests/functional/test_create_update_neutron_port.py
  5. 127
      heat_integrationtests/functional/test_create_update_neutron_subnet.py
  6. 275
      heat_integrationtests/functional/test_create_update_neutron_trunk.py
  7. 65
      heat_integrationtests/functional/test_encrypted_parameter.py
  8. 87
      heat_integrationtests/functional/test_encryption_vol_type.py
  9. 149
      heat_integrationtests/functional/test_nova_server_networks.py
  10. 107
      heat_integrationtests/functional/test_os_wait_condition.py
  11. 144
      heat_integrationtests/functional/test_remote_stack.py
  12. 11
      heat_integrationtests/prepare_test_env.sh
  13. 0
      heat_integrationtests/scenario/__init__.py
  14. 63
      heat_integrationtests/scenario/scenario_base.py
  15. 65
      heat_integrationtests/scenario/templates/app_server_neutron.yaml
  16. 5
      heat_integrationtests/scenario/templates/boot_config_none_env.yaml
  17. 35
      heat_integrationtests/scenario/templates/netcat-webapp.yaml
  18. 113
      heat_integrationtests/scenario/templates/test_autoscaling_lb_neutron.yaml
  19. 97
      heat_integrationtests/scenario/templates/test_server_cfn_init.yaml
  20. 107
      heat_integrationtests/scenario/templates/test_server_signal.yaml
  21. 173
      heat_integrationtests/scenario/templates/test_server_software_config.yaml
  22. 118
      heat_integrationtests/scenario/templates/test_volumes_create_from_backup.yaml
  23. 124
      heat_integrationtests/scenario/templates/test_volumes_delete_snapshot.yaml
  24. 110
      heat_integrationtests/scenario/test_autoscaling_lb.py
  25. 122
      heat_integrationtests/scenario/test_server_cfn_init.py
  26. 85
      heat_integrationtests/scenario/test_server_signal.py
  27. 171
      heat_integrationtests/scenario/test_server_software_config.py
  28. 129
      heat_integrationtests/scenario/test_volumes.py
  29. 1
      test-requirements.txt

30
heat_integrationtests/common/config.py

@ -96,32 +96,9 @@ HeatGroup = [
cfg.StrOpt('floating_network_name',
default='public',
help="Visible floating network name "),
cfg.StrOpt('boot_config_env',
default=('heat_integrationtests/scenario/templates'
'/boot_config_none_env.yaml'),
help="Path to environment file which defines the "
"resource type Heat::InstallConfigAgent. Needs to "
"be appropriate for the image_ref."),
cfg.StrOpt('fixed_subnet_name',
default='heat-subnet',
help="Visible fixed sub-network name "),
cfg.IntOpt('ssh_timeout',
default=300,
help="Timeout in seconds to wait for authentication to "
"succeed."),
cfg.IntOpt('ip_version_for_ssh',
default=4,
help="IP version used for SSH connections."),
cfg.IntOpt('ssh_channel_timeout',
default=60,
help="Timeout in seconds to wait for output from ssh "
"channel."),
cfg.IntOpt('tenant_network_mask_bits',
default=28,
help="The mask bits for tenant ipv4 subnets"),
cfg.BoolOpt('skip_scenario_tests',
default=False,
help="Skip all scenario tests"),
cfg.BoolOpt('skip_functional_tests',
default=False,
help="Skip all functional tests"),
@ -129,10 +106,6 @@ HeatGroup = [
help="List of functional test class or class.method "
"names to skip ex. AutoscalingGroupTest, "
"InstanceGroupBasicTest.test_size_updates_work"),
cfg.ListOpt('skip_scenario_test_list',
help="List of scenario test class or class.method "
"names to skip ex. NeutronLoadBalancerTest, "
"AodhAlarmTest.test_alarm"),
cfg.ListOpt('skip_test_stack_action_list',
help="List of stack actions in tests to skip "
"ex. ABANDON, ADOPT, SUSPEND, RESUME"),
@ -140,9 +113,6 @@ HeatGroup = [
default=True,
help="Test features that are only present for stacks with "
"convergence enabled."),
cfg.IntOpt('volume_size',
default=1,
help='Default size in GB for volumes created by volumes tests'),
cfg.IntOpt('connectivity_timeout',
default=120,
help="Timeout in seconds to wait for connectivity to "

202
heat_integrationtests/common/remote_client.py

@ -1,202 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import select
import socket
import time
from oslo_log import log as logging
import paramiko
import six
from heat_integrationtests.common import exceptions
LOG = logging.getLogger(__name__)
class Client(object):
def __init__(self, host, username, password=None, timeout=300, pkey=None,
channel_timeout=10, look_for_keys=False, key_filename=None):
self.host = host
self.username = username
self.password = password
if isinstance(pkey, six.string_types):
pkey = paramiko.RSAKey.from_private_key(
six.moves.cStringIO(str(pkey)))
self.pkey = pkey
self.look_for_keys = look_for_keys
self.key_filename = key_filename
self.timeout = int(timeout)
self.channel_timeout = float(channel_timeout)
self.buf_size = 1024
def _get_ssh_connection(self, sleep=1.5, backoff=1):
"""Returns an ssh connection to the specified host."""
bsleep = sleep
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(
paramiko.AutoAddPolicy())
_start_time = time.time()
if self.pkey is not None:
LOG.info("Creating ssh connection to '%s' as '%s'"
" with public key authentication",
self.host, self.username)
else:
LOG.info("Creating ssh connection to '%s' as '%s'"
" with password %s",
self.host, self.username, str(self.password))
attempts = 0
while True:
try:
ssh.connect(self.host, username=self.username,
password=self.password,
look_for_keys=self.look_for_keys,
key_filename=self.key_filename,
timeout=self.channel_timeout, pkey=self.pkey)
LOG.info("ssh connection to %s@%s successfuly created",
self.username, self.host)
return ssh
except (socket.error,
paramiko.SSHException) as e:
if self._is_timed_out(_start_time):
LOG.exception("Failed to establish authenticated ssh"
" connection to %s@%s after %d attempts",
self.username, self.host, attempts)
raise exceptions.SSHTimeout(host=self.host,
user=self.username,
password=self.password)
bsleep += backoff
attempts += 1
LOG.warning("Failed to establish authenticated ssh"
" connection to %s@%s (%s). Number attempts: %s."
" Retry after %d seconds.",
self.username, self.host, e, attempts, bsleep)
time.sleep(bsleep)
def _is_timed_out(self, start_time):
return (time.time() - self.timeout) > start_time
def exec_command(self, cmd):
"""Execute the specified command on the server.
Note that this method is reading whole command outputs to memory, thus
shouldn't be used for large outputs.
:returns: data read from standard output of the command.
:raises: SSHExecCommandFailed if command returns nonzero
status. The exception contains command status stderr content.
"""
ssh = self._get_ssh_connection()
transport = ssh.get_transport()
channel = transport.open_session()
channel.fileno() # Register event pipe
channel.exec_command(cmd)
channel.shutdown_write()
out_data = []
err_data = []
poll = select.poll()
poll.register(channel, select.POLLIN)
start_time = time.time()
while True:
ready = poll.poll(self.channel_timeout)
if not any(ready):
if not self._is_timed_out(start_time):
continue
raise exceptions.TimeoutException(
"Command: '{0}' executed on host '{1}'.".format(
cmd, self.host))
if not ready[0]: # If there is nothing to read.
continue
out_chunk = err_chunk = None
if channel.recv_ready():
out_chunk = channel.recv(self.buf_size)
out_data += out_chunk,
if channel.recv_stderr_ready():
err_chunk = channel.recv_stderr(self.buf_size)
err_data += err_chunk,
if channel.closed and not err_chunk and not out_chunk:
break
exit_status = channel.recv_exit_status()
if 0 != exit_status:
raise exceptions.SSHExecCommandFailed(
command=cmd, exit_status=exit_status,
strerror=''.join(err_data))
return ''.join(out_data)
def test_connection_auth(self):
"""Raises an exception when we can not connect to server via ssh."""
connection = self._get_ssh_connection()
connection.close()
class RemoteClient(object):
# NOTE(afazekas): It should always get an address instead of server
def __init__(self, server, username, password=None, pkey=None,
conf=None):
self.conf = conf
ssh_timeout = self.conf.ssh_timeout
network = self.conf.network_for_ssh
ip_version = self.conf.ip_version_for_ssh
ssh_channel_timeout = self.conf.ssh_channel_timeout
if isinstance(server, six.string_types):
ip_address = server
else:
addresses = server['addresses'][network]
for address in addresses:
if address['version'] == ip_version:
ip_address = address['addr']
break
else:
raise exceptions.ServerUnreachable()
self.ssh_client = Client(ip_address, username, password,
ssh_timeout, pkey=pkey,
channel_timeout=ssh_channel_timeout)
def exec_command(self, cmd):
return self.ssh_client.exec_command(cmd)
def validate_authentication(self):
"""Validate ssh connection and authentication.
This method raises an Exception when the validation fails.
"""
self.ssh_client.test_connection_auth()
def get_partitions(self):
# Return the contents of /proc/partitions
command = 'cat /proc/partitions'
output = self.exec_command(command)
return output
def get_boot_time(self):
cmd = 'cut -f1 -d. /proc/uptime'
boot_secs = self.exec_command(cmd)
boot_time = time.time() - int(boot_secs)
return time.localtime(boot_time)
def write_to_console(self, message):
message = re.sub("([$\\`])", "\\\\\\\\\\1", message)
# usually to /dev/ttyS0
cmd = 'sudo sh -c "echo \\"%s\\" >/dev/console"' % message
return self.exec_command(cmd)
def ping_host(self, host):
cmd = 'ping -c1 -w1 %s' % host
return self.exec_command(cmd)
def get_ip_list(self):
cmd = "/bin/ip address"
return self.exec_command(cmd)

28
heat_integrationtests/common/test.py

@ -19,7 +19,6 @@ import time
import fixtures
from heatclient import exc as heat_exceptions
from keystoneauth1 import exceptions as kc_exceptions
from neutronclient.common import exceptions as network_exceptions
from oslo_log import log as logging
from oslo_utils import timeutils
import six
@ -30,7 +29,6 @@ import testtools
from heat_integrationtests.common import clients
from heat_integrationtests.common import config
from heat_integrationtests.common import exceptions
from heat_integrationtests.common import remote_client
LOG = logging.getLogger(__name__)
_LOG_FORMAT = "%(levelname)8s [%(name)s] %(message)s"
@ -116,25 +114,6 @@ class HeatIntegrationTest(testscenarios.WithScenarios,
def setup_clients_for_admin(self):
self.setup_clients(self.conf, True)
def get_remote_client(self, server_or_ip, username, private_key=None):
if isinstance(server_or_ip, six.string_types):
ip = server_or_ip
else:
network_name_for_ssh = self.conf.network_for_ssh
ip = server_or_ip.networks[network_name_for_ssh][0]
if private_key is None:
private_key = self.keypair.private_key
linux_client = remote_client.RemoteClient(ip, username,
pkey=private_key,
conf=self.conf)
try:
linux_client.validate_authentication()
except exceptions.SSHTimeout:
LOG.exception('ssh connection to %s failed', ip)
raise
return linux_client
def check_connectivity(self, check_ip):
def try_connect(ip):
try:
@ -199,13 +178,6 @@ class HeatIntegrationTest(testscenarios.WithScenarios,
if net['name'] == net_name:
return net
def is_network_extension_supported(self, extension_alias):
try:
self.network_client.show_extension(extension_alias)
except network_exceptions.NeutronClientException:
return False
return True
def is_service_available(self, service_type):
try:
self.identity_client.get_endpoint_url(

101
heat_integrationtests/functional/test_create_update_neutron_port.py

@ -1,101 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from heat_integrationtests.functional import functional_base
test_template = '''
heat_template_version: 2015-04-30
description: Test template to create port wit ip_address.
parameters:
mac:
type: string
default: 00-00-00-00-BB-BB
resources:
net:
type: OS::Neutron::Net
subnet:
type: OS::Neutron::Subnet
properties:
enable_dhcp: false
network: { get_resource: net }
cidr: 11.11.11.0/24
port:
type: OS::Neutron::Port
properties:
network: {get_resource: net}
mac_address: {get_param: mac}
fixed_ips:
- subnet: {get_resource: subnet}
ip_address: 11.11.11.11
test:
depends_on: port
type: OS::Heat::TestResource
properties:
value: Test1
fail: False
outputs:
port_ip:
value: {get_attr: [port, fixed_ips, 0, ip_address]}
mac_address:
value: {get_attr: [port, mac_address]}
'''
class UpdatePortTest(functional_base.FunctionalTestsBase):
def get_port_id_and_outputs(self, stack_identifier):
resources = self.client.resources.list(stack_identifier)
port_id = [res.physical_resource_id for res in resources
if res.resource_name == 'port']
stack = self.client.stacks.get(stack_identifier)
port_ip = self._stack_output(stack, 'port_ip')
port_mac = self._stack_output(stack, 'mac_address')
return port_id[0], port_ip, port_mac
def test_update_remove_ip(self):
# create with defined ip_address
stack_identifier = self.stack_create(template=test_template)
_id, _ip, _mac = self.get_port_id_and_outputs(stack_identifier)
# remove ip_address property and update stack
templ_no_ip = test_template.replace('ip_address: 11.11.11.11', '')
self.update_stack(stack_identifier, templ_no_ip)
new_id, new_ip, new_mac = self.get_port_id_and_outputs(
stack_identifier)
# port should be updated with the same id
self.assertEqual(_id, new_id)
self.assertEqual(_mac, new_mac)
def test_update_with_mac_address(self):
if not self.conf.admin_username or not self.conf.admin_password:
self.skipTest('No admin creds found, skipping')
# Setup admin clients for updating mac_address
self.setup_clients_for_admin()
# Create with default mac_address and defined ip_address
stack_identifier = self.stack_create(template=test_template)
_id, _ip, _mac = self.get_port_id_and_outputs(stack_identifier)
# Update with another 'mac' parameter
parameters = {'mac': '00-00-00-00-AA-AA'}
self.update_stack(stack_identifier, test_template,
parameters=parameters)
new_id, new_ip, new_mac = self.get_port_id_and_outputs(
stack_identifier)
# mac_address should be different
self.assertEqual(_id, new_id)
self.assertEqual(_ip, new_ip)
self.assertNotEqual(_mac, new_mac)

127
heat_integrationtests/functional/test_create_update_neutron_subnet.py

@ -1,127 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from heat_integrationtests.functional import functional_base
test_template = '''
heat_template_version: 2015-04-30
description: Test template to create/update subnet with allocation_pools.
resources:
net:
type: OS::Neutron::Net
subnet:
type: OS::Neutron::Subnet
properties:
network: { get_resource: net }
cidr: 11.11.11.0/24
gateway_ip: 11.11.11.5
allocation_pools: [{start: 11.11.11.10, end: 11.11.11.250}]
outputs:
alloc_pools:
value: {get_attr: [subnet, allocation_pools]}
gateway_ip:
value: {get_attr: [subnet, gateway_ip]}
'''
class UpdateSubnetTest(functional_base.FunctionalTestsBase):
def get_outputs(self, stack_identifier, output_key):
stack = self.client.stacks.get(stack_identifier)
output = self._stack_output(stack, output_key)
return output
def test_update_allocation_pools(self):
stack_identifier = self.stack_create(template=test_template)
alloc_pools = self.get_outputs(stack_identifier, 'alloc_pools')
self.assertEqual([{'start': '11.11.11.10', 'end': '11.11.11.250'}],
alloc_pools)
# Update allocation_pools with a new range
templ_other_pool = test_template.replace(
'allocation_pools: [{start: 11.11.11.10, end: 11.11.11.250}]',
'allocation_pools: [{start: 11.11.11.10, end: 11.11.11.100}]')
self.update_stack(stack_identifier, templ_other_pool)
new_alloc_pools = self.get_outputs(stack_identifier, 'alloc_pools')
# the new pools should be the new range
self.assertEqual([{'start': '11.11.11.10', 'end': '11.11.11.100'}],
new_alloc_pools)
def test_update_allocation_pools_to_empty(self):
stack_identifier = self.stack_create(template=test_template)
alloc_pools = self.get_outputs(stack_identifier, 'alloc_pools')
self.assertEqual([{'start': '11.11.11.10', 'end': '11.11.11.250'}],
alloc_pools)
# Update allocation_pools with []
templ_empty_pools = test_template.replace(
'allocation_pools: [{start: 11.11.11.10, end: 11.11.11.250}]',
'allocation_pools: []')
self.update_stack(stack_identifier, templ_empty_pools)
new_alloc_pools = self.get_outputs(stack_identifier, 'alloc_pools')
# new_alloc_pools should be []
self.assertEqual([], new_alloc_pools)
def test_update_to_no_allocation_pools(self):
stack_identifier = self.stack_create(template=test_template)
alloc_pools = self.get_outputs(stack_identifier, 'alloc_pools')
self.assertEqual([{'start': '11.11.11.10', 'end': '11.11.11.250'}],
alloc_pools)
# Remove the allocation_pools from template
templ_no_pools = test_template.replace(
'allocation_pools: [{start: 11.11.11.10, end: 11.11.11.250}]',
'')
self.update_stack(stack_identifier, templ_no_pools)
last_alloc_pools = self.get_outputs(stack_identifier, 'alloc_pools')
# last_alloc_pools should be []
self.assertEqual([], last_alloc_pools)
def test_update_gateway_ip(self):
stack_identifier = self.stack_create(template=test_template)
gw_ip = self.get_outputs(stack_identifier, 'gateway_ip')
self.assertEqual('11.11.11.5', gw_ip)
# Update gateway_ip
templ_other_gw_ip = test_template.replace(
'gateway_ip: 11.11.11.5', 'gateway_ip: 11.11.11.9')
self.update_stack(stack_identifier, templ_other_gw_ip)
new_gw_ip = self.get_outputs(stack_identifier, 'gateway_ip')
# the gateway_ip should be the new one
self.assertEqual('11.11.11.9', new_gw_ip)
def test_update_gateway_ip_to_empty(self):
stack_identifier = self.stack_create(template=test_template)
gw_ip = self.get_outputs(stack_identifier, 'gateway_ip')
self.assertEqual('11.11.11.5', gw_ip)
# Update gateway_ip to null(resolve to '')
templ_empty_gw_ip = test_template.replace(
'gateway_ip: 11.11.11.5', 'gateway_ip: null')
self.update_stack(stack_identifier, templ_empty_gw_ip)
new_gw_ip = self.get_outputs(stack_identifier, 'gateway_ip')
# new gateway_ip should be None
self.assertIsNone(new_gw_ip)
def test_update_to_no_gateway_ip(self):
stack_identifier = self.stack_create(template=test_template)
gw_ip = self.get_outputs(stack_identifier, 'gateway_ip')
self.assertEqual('11.11.11.5', gw_ip)
# Remove the gateway from template
templ_no_gw_ip = test_template.replace(
'gateway_ip: 11.11.11.5', '')
self.update_stack(stack_identifier, templ_no_gw_ip)
new_gw_ip = self.get_outputs(stack_identifier, 'gateway_ip')
# new gateway_ip should be None
self.assertIsNone(new_gw_ip)

275
heat_integrationtests/functional/test_create_update_neutron_trunk.py

@ -1,275 +0,0 @@
# Copyright (c) 2017 Ericsson.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import yaml
from heat_integrationtests.functional import functional_base
test_template = '''
heat_template_version: pike
description: Test template to create, update, delete trunk.
resources:
parent_net:
type: OS::Neutron::Net
trunk_net_one:
type: OS::Neutron::Net
trunk_net_two:
type: OS::Neutron::Net
parent_subnet:
type: OS::Neutron::Subnet
properties:
network: { get_resource: parent_net }
cidr: 10.0.0.0/16
trunk_subnet_one:
type: OS::Neutron::Subnet
properties:
network: { get_resource: trunk_net_one }
cidr: 10.10.0.0/16
trunk_subnet_two:
type: OS::Neutron::Subnet
properties:
network: { get_resource: trunk_net_two }
cidr: 10.20.0.0/16
parent_port:
type: OS::Neutron::Port
properties:
network: { get_resource: parent_net }
name: trunk_parent_port
sub_port_one:
type: OS::Neutron::Port
properties:
network: { get_resource: trunk_net_one }
name: trunk_sub_port_one
sub_port_two:
type: OS::Neutron::Port
properties:
network: { get_resource: trunk_net_two }
name: trunk_sub_port_two
trunk:
type: OS::Neutron::Trunk
properties:
name: test_trunk
port: { get_resource: parent_port }
sub_ports:
outputs:
trunk_parent_port:
value: { get_attr: [trunk, port_id] }
'''
class UpdateTrunkTest(functional_base.FunctionalTestsBase):
@staticmethod
def _sub_ports_dict_to_set(sub_ports):
new_sub_ports = copy.deepcopy(sub_ports)
# NOTE(lajos katona): In the template we have to give the sub port as
# port, but from trunk_details we receive back them with port_id.
# As an extra trunk_details contains the mac_address as well which is
# useless here.
# So here we have to make sure that the dictionary (input from
# template or output from trunk_details) have the same keys:
if any('mac_address' in d for d in new_sub_ports):
for sp in new_sub_ports:
sp['port'] = sp['port_id']
del sp['port_id']
del sp['mac_address']
# NOTE(lajos katona): We receive lists (trunk_details['sub_ports'] and
# the input to the template) and we can't be sure that the order is the
# same, so by using sets we can compare them.
sub_ports_set = {frozenset(d.items()) for d in new_sub_ports}
return sub_ports_set
def test_add_first_sub_port(self):
stack_identifier = self.stack_create(template=test_template)
parsed_template = yaml.safe_load(test_template)
new_sub_port = [{'port': {'get_resource': 'sub_port_one'},
'segmentation_id': 10,
'segmentation_type': 'vlan'}]
parsed_template['resources']['trunk']['properties'][
'sub_ports'] = new_sub_port
updated_template = yaml.safe_dump(parsed_template)
self.update_stack(stack_identifier, updated_template)
# Fix the port_id in the template for assertion
new_sub_port[0]['port'] = self.get_physical_resource_id(
stack_identifier, 'sub_port_one')
parent_id = self.get_stack_output(
stack_identifier, 'trunk_parent_port')
parent_port = self.network_client.show_port(parent_id)['port']
trunk_sub_port = parent_port['trunk_details']['sub_ports']
self.assertEqual(self._sub_ports_dict_to_set(new_sub_port),
self._sub_ports_dict_to_set(trunk_sub_port))
def test_add_a_second_sub_port(self):
parsed_template = yaml.safe_load(test_template)
sub_ports = [{'port': {'get_resource': 'sub_port_one'},
'segmentation_type': 'vlan',
'segmentation_id': 10}, ]
parsed_template['resources']['trunk']['properties'][
'sub_ports'] = sub_ports
template_with_sub_ports = yaml.safe_dump(parsed_template)
stack_identifier = self.stack_create(template=template_with_sub_ports)
new_sub_port = {'port': {'get_resource': 'sub_port_two'},
'segmentation_id': 20,
'segmentation_type': 'vlan'}
parsed_template['resources']['trunk']['properties'][
'sub_ports'].append(new_sub_port)
updated_template = yaml.safe_dump(parsed_template)
self.update_stack(stack_identifier, updated_template)
# Fix the port_ids in the templates for assertion
sub_ports[0]['port'] = self.get_physical_resource_id(
stack_identifier, 'sub_port_one')
new_sub_port['port'] = self.get_physical_resource_id(
stack_identifier, 'sub_port_two')
expected_sub_ports = [sub_ports[0], new_sub_port]
parent_id = self.get_stack_output(
stack_identifier, 'trunk_parent_port')
parent_port = self.network_client.show_port(parent_id)['port']
trunk_sub_ports = parent_port['trunk_details']['sub_ports']
self.assertEqual(self._sub_ports_dict_to_set(expected_sub_ports),
self._sub_ports_dict_to_set(trunk_sub_ports))
def test_remove_sub_port_from_trunk(self):
sub_ports = [{'port': {'get_resource': 'sub_port_one'},
'segmentation_type': 'vlan',
'segmentation_id': 10},
{'port': {'get_resource': 'sub_port_two'},
'segmentation_type': 'vlan',
'segmentation_id': 20}]
parsed_template = yaml.safe_load(test_template)
parsed_template['resources']['trunk']['properties'][
'sub_ports'] = sub_ports
template_with_sub_ports = yaml.safe_dump(parsed_template)
stack_identifier = self.stack_create(template=template_with_sub_ports)
sub_port_to_be_removed = {'port': {'get_resource': 'sub_port_two'},
'segmentation_type': 'vlan',
'segmentation_id': 20}
parsed_template['resources']['trunk'][
'properties']['sub_ports'].remove(sub_port_to_be_removed)
updated_template = yaml.safe_dump(parsed_template)
self.update_stack(stack_identifier, updated_template)
# Fix the port_ids in the templates for assertion
sub_ports[0]['port'] = self.get_physical_resource_id(
stack_identifier, 'sub_port_one')
expected_sub_ports = [sub_ports[0]]
parent_id = self.get_stack_output(
stack_identifier, 'trunk_parent_port')
parent_port = self.network_client.show_port(parent_id)['port']
trunk_sub_ports = parent_port['trunk_details']['sub_ports']
self.assertEqual(self._sub_ports_dict_to_set(expected_sub_ports),
self._sub_ports_dict_to_set(trunk_sub_ports))
def test_remove_last_sub_port_from_trunk(self):
sub_ports = [{'port': {'get_resource': 'sub_port_one'},
'segmentation_type': 'vlan',
'segmentation_id': 10}]
parsed_template = yaml.safe_load(test_template)
parsed_template['resources']['trunk']['properties'][
'sub_ports'] = sub_ports
template_with_sub_ports = yaml.safe_dump(parsed_template)
stack_identifier = self.stack_create(template=template_with_sub_ports)
sub_port_to_be_removed = {'port': {'get_resource': 'sub_port_one'},
'segmentation_type': 'vlan',
'segmentation_id': 10}
parsed_template['resources']['trunk'][
'properties']['sub_ports'] = []
updated_template = yaml.safe_dump(parsed_template)
self.update_stack(stack_identifier, updated_template)
sub_port_to_be_removed['port'] = self.get_physical_resource_id(
stack_identifier, 'sub_port_one')
parent_id = self.get_stack_output(
stack_identifier, 'trunk_parent_port')
parent_port = self.network_client.show_port(parent_id)['port']
trunk_sub_ports = parent_port['trunk_details']['sub_ports']
self.assertNotEqual(
self._sub_ports_dict_to_set([sub_port_to_be_removed]),
self._sub_ports_dict_to_set(trunk_sub_ports))
self.assertFalse(trunk_sub_ports,
'The returned sub ports (%s) in trunk_details is '
'not empty!' % trunk_sub_ports)
def test_update_existing_sub_port_on_trunk(self):
sub_ports = [{'port': {'get_resource': 'sub_port_one'},
'segmentation_type': 'vlan',
'segmentation_id': 10}]
parsed_template = yaml.safe_load(test_template)
parsed_template['resources']['trunk']['properties'][
'sub_ports'] = sub_ports
template_with_sub_ports = yaml.safe_dump(parsed_template)
stack_identifier = self.stack_create(template=template_with_sub_ports)
sub_port_id = self.get_physical_resource_id(
stack_identifier, 'sub_port_one')
parsed_template['resources']['trunk']['properties']['sub_ports'][0][
'segmentation_id'] = 99
updated_template = yaml.safe_dump(parsed_template)
self.update_stack(stack_identifier, updated_template)
updated_sub_port = {'port': sub_port_id,
'segmentation_type': 'vlan',
'segmentation_id': 99}
parent_id = self.get_stack_output(
stack_identifier, 'trunk_parent_port')
parent_port = self.network_client.show_port(parent_id)['port']
trunk_sub_ports = parent_port['trunk_details']['sub_ports']
self.assertEqual(self._sub_ports_dict_to_set([updated_sub_port]),
self._sub_ports_dict_to_set(trunk_sub_ports))
def test_update_trunk_name_and_description(self):
new_name = 'pineapple'
new_description = 'This is a test trunk'
stack_identifier = self.stack_create(template=test_template)
parsed_template = yaml.safe_load(test_template)
parsed_template['resources']['trunk']['properties']['name'] = new_name
parsed_template['resources']['trunk']['properties'][
'description'] = new_description
updated_template = yaml.safe_dump(parsed_template)
self.update_stack(stack_identifier, template=updated_template)
parent_id = self.get_stack_output(
stack_identifier, 'trunk_parent_port')
parent_port = self.network_client.show_port(parent_id)['port']
trunk_id = parent_port['trunk_details']['trunk_id']
trunk = self.network_client.show_trunk(trunk_id)['trunk']
self.assertEqual(new_name, trunk['name'])
self.assertEqual(new_description, trunk['description'])

65
heat_integrationtests/functional/test_encrypted_parameter.py

@ -1,65 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from heat_integrationtests.functional import functional_base
class EncryptedParametersTest(functional_base.FunctionalTestsBase):
template = '''
heat_template_version: 2014-10-16
parameters:
image:
type: string
flavor:
type: string
network:
type: string
foo:
type: string
description: 'parameter with encryption turned on'
hidden: true
default: secret
resources:
server_with_encrypted_property:
type: OS::Nova::Server
properties:
name: { get_param: foo }
image: { get_param: image }
flavor: { get_param: flavor }
networks: [{network: {get_param: network} }]
outputs:
encrypted_foo_param:
description: 'encrypted param'
value: { get_param: foo }
'''
def test_db_encryption(self):
# Create a stack with the value of 'foo' to be encrypted
foo_param = 'my_encrypted_foo'
parameters = {
"image": self.conf.minimal_image_ref,
"flavor": self.conf.minimal_instance_type,
'network': self.conf.fixed_network_name,
"foo": foo_param
}
stack_identifier = self.stack_create(
template=self.template,
parameters=parameters
)
stack = self.client.stacks.get(stack_identifier)
# Verify the output value for 'foo' parameter
for out in stack.outputs:
if out['output_key'] == 'encrypted_foo_param':
self.assertEqual(foo_param, out['output_value'])

87
heat_integrationtests/functional/test_encryption_vol_type.py

@ -1,87 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from heat_integrationtests.functional import functional_base
test_encryption_vol_type = {
'heat_template_version': '2015-04-30',
'description': 'Test template to create encryption volume type.',
'resources': {
'my_volume_type': {
'type': 'OS::Cinder::VolumeType',
'properties': {
'name': 'LUKS'
}
},
'my_encrypted_vol_type': {
'type': 'OS::Cinder::EncryptedVolumeType',
'properties': {
'provider': 'nova.volume.encryptors.luks.LuksEncryptor',
'control_location': 'front-end',
'cipher': 'aes-xts-plain64',
'key_size': 512,
'volume_type': {'get_resource': 'my_volume_type'}
}
}
}
}
class EncryptionVolTypeTest(functional_base.FunctionalTestsBase):
def setUp(self):
super(EncryptionVolTypeTest, self).setUp()
if not self.conf.admin_username or not self.conf.admin_password:
self.skipTest('No admin creds found, skipping')
# cinder security policy usage of volume type is limited
# to being used by administrators only.
# Switch to admin
self.setup_clients_for_admin()
def check_stack(self, sid):
vt = 'my_volume_type'
e_vt = 'my_encrypted_vol_type'
# check if only two resources are present.
expected_resources = {vt: 'OS::Cinder::VolumeType',
e_vt: 'OS::Cinder::EncryptedVolumeType'}
self.assertEqual(expected_resources,
self.list_resources(sid))
e_vt_obj = self.client.resources.get(sid, e_vt)
my_encrypted_vol_type_tmpl_prop = test_encryption_vol_type[
'resources']['my_encrypted_vol_type']['properties']
# check if the phy rsrc specs was created in accordance with template.
phy_rsrc_specs = self.volume_client.volume_encryption_types.get(
e_vt_obj.physical_resource_id)
self.assertEqual(my_encrypted_vol_type_tmpl_prop['key_size'],
phy_rsrc_specs.key_size)
self.assertEqual(my_encrypted_vol_type_tmpl_prop['provider'],
phy_rsrc_specs.provider)
self.assertEqual(my_encrypted_vol_type_tmpl_prop['cipher'],
phy_rsrc_specs.cipher)
self.assertEqual(my_encrypted_vol_type_tmpl_prop['control_location'],
phy_rsrc_specs.control_location)
def test_create_update(self):
stack_identifier = self.stack_create(
template=test_encryption_vol_type)
self.check_stack(stack_identifier)
# Change some properties and trigger update.
my_encrypted_vol_type_tmpl_prop = test_encryption_vol_type[
'resources']['my_encrypted_vol_type']['properties']
my_encrypted_vol_type_tmpl_prop['key_size'] = 256
my_encrypted_vol_type_tmpl_prop['cipher'] = 'aes-cbc-essiv'
self.update_stack(stack_identifier, test_encryption_vol_type)
self.check_stack(stack_identifier)

149
heat_integrationtests/functional/test_nova_server_networks.py

@ -1,149 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from heat_integrationtests.functional import functional_base
server_with_sub_fixed_ip_template = '''
heat_template_version: 2016-04-08
description: Test template to test nova server with subnet and fixed_ip.
parameters:
flavor:
type: string
image:
type: string
resources:
net:
type: OS::Neutron::Net
properties:
name: my_net
subnet:
type: OS::Neutron::Subnet
properties:
network: {get_resource: net}
cidr: 11.11.11.0/24
security_group:
type: OS::Neutron::SecurityGroup
properties:
name: the_sg
description: Ping and SSH
rules:
- protocol: icmp
- protocol: tcp
port_range_min: 22
port_range_max: 22
server:
type: OS::Nova::Server
properties:
image: {get_param: image}
flavor: {get_param: flavor}
networks:
- subnet: {get_resource: subnet}
fixed_ip: 11.11.11.11
security_groups:
- {get_resource: security_group}
outputs:
networks:
value: {get_attr: [server, networks]}
'''
server_with_port_template = '''
heat_template_version: 2016-04-08
description: Test template to test nova server with port.
parameters:
flavor:
type: string
image:
type: string
resources:
net:
type: OS::Neutron::Net
properties:
name: server_with_port_net
subnet:
type: OS::Neutron::Subnet
properties:
network: {get_resource: net}
cidr: 11.11.11.0/24
port:
type: OS::Neutron::Port
properties:
network: {get_resource: net}
fixed_ips:
- subnet: {get_resource: subnet}
ip_address: 11.11.11.11
server:
type: OS::Nova::Server
properties:
image: {get_param: image}
flavor: {get_param: flavor}
networks:
- port: {get_resource: port}
'''
class CreateServerTest(functional_base.FunctionalTestsBase):
def get_outputs(self, stack_identifier, output_key):
stack = self.client.stacks.get(stack_identifier)
return self._stack_output(stack, output_key)
def test_create_server_with_subnet_fixed_ip_sec_group(self):
parms = {'flavor': self.conf.minimal_instance_type,
'image': self.conf.minimal_image_ref}
stack_identifier = self.stack_create(
template=server_with_sub_fixed_ip_template,
stack_name='server_with_sub_ip',
parameters=parms)
networks = self.get_outputs(stack_identifier, 'networks')
self.assertEqual(['11.11.11.11'], networks['my_net'])
server_resource = self.client.resources.get(
stack_identifier, 'server')
server_id = server_resource.physical_resource_id
server = self.compute_client.servers.get(server_id)
self.assertEqual([{"name": "the_sg"}], server.security_groups)
def test_create_update_server_with_subnet(self):
parms = {'flavor': self.conf.minimal_instance_type,
'image': self.conf.minimal_image_ref}
template = server_with_sub_fixed_ip_template.replace(
'fixed_ip: 11.11.11.11',
'fixed_ip: 11.11.11.22').replace(
'name: my_net', 'name: your_net')
stack_identifier = self.stack_create(
template=template,
stack_name='create_server_with_sub_ip',
parameters=parms)
networks = self.get_outputs(stack_identifier, 'networks')
self.assertEqual(['11.11.11.22'], networks['your_net'])
# update the server only with subnet, we won't pass
# both port_id and net_id to attach interface, then update success
template_only_subnet = template.replace(
'fixed_ip: 11.11.11.22', '')
self.update_stack(stack_identifier,
template_only_subnet,
parameters=parms)
new_networks = self.get_outputs(stack_identifier, 'networks')
self.assertNotEqual(['11.11.11.22'], new_networks['your_net'])
def test_create_server_with_port(self):
parms = {'flavor': self.conf.minimal_instance_type,
'image': self.conf.minimal_image_ref}
# We just want to make sure we can create the server, no need to assert
# anything
self.stack_create(
template=server_with_port_template,
stack_name='server_with_port',
parameters=parms)

107
heat_integrationtests/functional/test_os_wait_condition.py

@ -1,107 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from heat_integrationtests.functional import functional_base
class OSWaitCondition(functional_base.FunctionalTestsBase):
template = '''
heat_template_version: 2013-05-23
parameters:
flavor:
type: string
image:
type: string
network:
type: string
timeout:
type: number
default: 60
resources:
instance1:
type: OS::Nova::Server
properties:
flavor: {get_param: flavor}
image: {get_param: image}
networks:
- network: {get_param: network}
user_data_format: RAW
user_data:
str_replace:
template: '#!/bin/sh
wc_notify --data-binary ''{"status": "SUCCESS"}''
# signals with reason
wc_notify --data-binary ''{"status": "SUCCESS", "reason":
"signal2"}''
# signals with data
wc_notify --data-binary ''{"status": "SUCCESS", "reason":
"signal3", "data": "data3"}''
wc_notify --data-binary ''{"status": "SUCCESS", "reason":
"signal4", "data": "data4"}''
# check signals with the same number
wc_notify --data-binary ''{"status": "SUCCESS", "id": "5"}''
wc_notify --data-binary ''{"status": "SUCCESS", "id": "5"}''
# loop for 20 signals without reasons and data
for i in `seq 1 20`; do wc_notify --data-binary ''{"status":
"SUCCESS"}'' & done
wait
'
params:
wc_notify:
get_attr: [wait_handle, curl_cli]
wait_condition:
type: OS::Heat::WaitCondition
depends_on: instance1
properties:
count: 25
handle: {get_resource: wait_handle}
timeout: {get_param: timeout}
wait_handle:
type: OS::Heat::WaitConditionHandle
outputs:
curl_cli:
value:
get_attr: [wait_handle, curl_cli]
wc_data:
value:
get_attr: [wait_condition, data]
'''
def setUp(self):
super(OSWaitCondition, self).setUp()
if not self.conf.minimal_image_ref:
raise self.skipException("No minimal image configured to test")
if not self.conf.minimal_instance_type:
raise self.skipException("No minimal flavor configured to test")
def test_create_stack_with_multi_signal_waitcondition(self):
params = {'flavor': self.conf.minimal_instance_type,
'image': self.conf.minimal_image_ref,
'network': self.conf.fixed_network_name,
'timeout': 120}
self.stack_create(template=self.template, parameters=params)

144
heat_integrationtests/functional/test_remote_stack.py

@ -1,144 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from heatclient import exc
import six
from heat_integrationtests.functional import functional_base
class RemoteStackTest(functional_base.FunctionalTestsBase):
template = '''
heat_template_version: 2013-05-23
resources:
my_stack:
type: OS::Heat::Stack
properties:
context:
region_name: RegionOne
template:
get_file: remote_stack.yaml
outputs:
key:
value: {get_attr: [my_stack, outputs]}
'''
remote_template = '''
heat_template_version: 2013-05-23
resources:
random1:
type: OS::Heat::RandomString
outputs:
remote_key:
value: {get_attr: [random1, value]}
'''
def setUp(self):
super(RemoteStackTest, self).setUp()
# replacing the template region with the one from the config
self.template = self.template.replace('RegionOne',
self.conf.region)
def test_remote_stack_alone(self):
stack_id = self.stack_create(template=self.remote_template)
expected_resources = {'random1': 'OS::Heat::RandomString'}
self.assertEqual(expected_resources, self.list_resources(stack_id))
stack = self.client.stacks.get(stack_id)
output_value = self._stack_output(stack, 'remote_key')
self.assertEqual(32, len(output_value))
def test_stack_create(self):
files = {'remote_stack.yaml': self.remote_template}
stack_id = self.stack_create(files=files)
expected_resources = {'my_stack': 'OS::Heat::Stack'}
self.assertEqual(expected_resources, self.list_resources(stack_id))
stack = self.client.stacks.get(stack_id)
output = self._stack_output(stack, 'key')
parent_output_value = output['remote_key']
self.assertEqual(32, len(parent_output_value))
rsrc = self.client.resources.get(stack_id, 'my_stack')
remote_id = rsrc.physical_resource_id
rstack = self.client.stacks.get(remote_id)
self.assertEqual(remote_id, rstack.id)
remote_output_value = self._stack_output(rstack, 'remote_key')
self.assertEqual(32, len(remote_output_value))
self.assertEqual(parent_output_value, remote_output_value)
remote_resources = {'random1': 'OS::Heat::RandomString'}
self.assertEqual(remote_resources, self.list_resources(remote_id))
def test_stack_create_bad_region(self):
tmpl_bad_region = self.template.replace(self.conf.region, 'DARKHOLE')
files = {'remote_stack.yaml': self.remote_template}
kwargs = {
'template': tmpl_bad_region,
'files': files
}
ex = self.assertRaises(exc.HTTPBadRequest, self.stack_create, **kwargs)
error_msg = ('ERROR: Cannot establish connection to Heat endpoint '
'at region "DARKHOLE" due to "publicURL endpoint for '
'orchestration service in DARKHOLE region not found"')
self.assertEqual(error_msg, six.text_type(ex))