charm-neutron-openvswitch/tests/basic_deployment.py

202 lines
8.4 KiB
Python

#!/usr/bin/python
import amulet
import time
from charmhelpers.contrib.openstack.amulet.deployment import (
OpenStackAmuletDeployment
)
from charmhelpers.contrib.openstack.amulet.utils import (
OpenStackAmuletUtils,
DEBUG, # flake8: noqa
ERROR
)
# Use DEBUG to turn on debug logging
u = OpenStackAmuletUtils(ERROR)
# XXX Tests inspecting relation data from the perspective of the
# neutron-openvswitch are missing because amulet sentries aren't created for
# subordinates Bug#1421388
class NeutronOVSBasicDeployment(OpenStackAmuletDeployment):
"""Amulet tests on a basic neutron-openvswtich deployment."""
def __init__(self, series, openstack=None, source=None, stable=False):
"""Deploy the entire test environment."""
super(NeutronOVSBasicDeployment, self).__init__(series, openstack,
source, stable)
self._add_services()
self._add_relations()
self._configure_services()
self._deploy()
self._initialize_tests()
def _add_services(self):
"""Add services
Add the services that we're testing, where neutron-openvswitch is local,
and the rest of the service are from lp branches that are
compatible with the local charm (e.g. stable or next).
"""
this_service = {'name': 'neutron-openvswitch'}
other_services = [{'name': 'nova-compute'},
{'name': 'rabbitmq-server'},
{'name': 'neutron-api'}]
super(NeutronOVSBasicDeployment, self)._add_services(this_service,
other_services)
def _add_relations(self):
"""Add all of the relations for the services."""
relations = {
'neutron-openvswitch:amqp': 'rabbitmq-server:amqp',
'neutron-openvswitch:neutron-plugin':
'nova-compute:neutron-plugin',
'neutron-openvswitch:neutron-plugin-api':
'neutron-api:neutron-plugin-api',
}
super(NeutronOVSBasicDeployment, self)._add_relations(relations)
def _configure_services(self):
"""Configure all of the services."""
configs = {}
super(NeutronOVSBasicDeployment, self)._configure_services(configs)
def _initialize_tests(self):
"""Perform final initialization before tests get run."""
# Access the sentries for inspecting service units
self.compute_sentry = self.d.sentry.unit['nova-compute/0']
self.rabbitmq_sentry = self.d.sentry.unit['rabbitmq-server/0']
self.neutron_api_sentry = self.d.sentry.unit['neutron-api/0']
def test_services(self):
"""Verify the expected services are running on the corresponding
service units."""
commands = {
self.compute_sentry: ['status nova-compute'],
self.rabbitmq_sentry: ['service rabbitmq-server status'],
self.neutron_api_sentry: ['status neutron-server'],
}
ret = u.validate_services(commands)
if ret:
amulet.raise_status(amulet.FAIL, msg=ret)
def test_rabbitmq_amqp_relation(self):
"""Verify data in rabbitmq-server/neutron-openvswitch amqp relation"""
unit = self.rabbitmq_sentry
relation = ['amqp', 'neutron-openvswitch:amqp']
expected = {
'private-address': u.valid_ip,
'password': u.not_null,
'hostname': u.valid_ip
}
ret = u.validate_relation_data(unit, relation, expected)
if ret:
message = u.relation_error('rabbitmq amqp', ret)
amulet.raise_status(amulet.FAIL, msg=message)
def test_nova_compute_relation(self):
"""Verify the nova-compute to neutron-openvswitch relation data"""
unit = self.compute_sentry
relation = ['neutron-plugin', 'neutron-openvswitch:neutron-plugin']
expected = {
'private-address': u.valid_ip,
}
ret = u.validate_relation_data(unit, relation, expected)
if ret:
message = u.relation_error('nova-compute neutron-plugin', ret)
amulet.raise_status(amulet.FAIL, msg=message)
def test_neutron_api_relation(self):
"""Verify the neutron-api to neutron-openvswitch relation data"""
unit = self.neutron_api_sentry
relation = ['neutron-plugin-api',
'neutron-openvswitch:neutron-plugin-api']
expected = {
'private-address': u.valid_ip,
}
ret = u.validate_relation_data(unit, relation, expected)
if ret:
message = u.relation_error('neutron-api neutron-plugin-api', ret)
amulet.raise_status(amulet.FAIL, msg=message)
def process_ret(self, ret=None, message=None):
if ret:
amulet.raise_status(amulet.FAIL, msg=message)
def check_ml2_setting_propagation(self, service, charm_key,
config_file_key, vpair,
section):
unit = self.compute_sentry
conf = "/etc/neutron/plugins/ml2/ml2_conf.ini"
for value in vpair:
self.d.configure(service, {charm_key: value})
time.sleep(30)
ret = u.validate_config_data(unit, conf, section,
{config_file_key: value})
msg = "Propagation error, expected %s=%s" % (config_file_key,
value)
self.process_ret(ret=ret, message=msg)
def test_l2pop_propagation(self):
"""Verify that neutron-api l2pop setting propagates to neutron-ovs"""
self.check_ml2_setting_propagation('neutron-api',
'l2-population',
'l2_population',
['False', 'True'],
'agent')
def test_nettype_propagation(self):
"""Verify that neutron-api nettype setting propagates to neutron-ovs"""
self.check_ml2_setting_propagation('neutron-api',
'overlay-network-type',
'tunnel_types',
['vxlan', 'gre'],
'agent')
def test_secgroup_propagation_local_override(self):
"""Verify disable-security-groups overrides what neutron-api says"""
unit = self.compute_sentry
conf = "/etc/neutron/plugins/ml2/ml2_conf.ini"
self.d.configure('neutron-api', {'neutron-security-groups': 'True'})
self.d.configure('neutron-openvswitch',
{'disable-security-groups': 'True'})
time.sleep(30)
ret = u.validate_config_data(unit, conf, 'securitygroup',
{'enable_security_group': 'False'})
msg = "Propagation error, expected %s=%s" % ('enable_security_group',
'False')
self.process_ret(ret=ret, message=msg)
self.d.configure('neutron-openvswitch',
{'disable-security-groups': 'False'})
self.d.configure('neutron-api', {'neutron-security-groups': 'True'})
time.sleep(30)
ret = u.validate_config_data(unit, conf, 'securitygroup',
{'enable_security_group': 'True'})
def test_z_restart_on_config_change(self):
"""Verify that the specified services are restarted when the config
is changed.
Note(coreycb): The method name with the _z_ is a little odd
but it forces the test to run last. It just makes things
easier because restarting services requires re-authorization.
"""
conf = '/etc/neutron/neutron.conf'
self.d.configure('neutron-openvswitch', {'use-syslog': 'True'})
if not u.service_restarted(self.compute_sentry,
'neutron-openvswitch-agent', conf,
pgrep_full=True, sleep_time=60):
self.d.configure('neutron-openvswitch', {'use-syslog': 'False'})
msg = ('service neutron-openvswitch-agent did not restart after '
'config change')
amulet.raise_status(amulet.FAIL, msg=msg)
self.d.configure('neutron-openvswitch', {'use-syslog': 'False'})