charm-neutron-api/unit_tests/test_neutron_api_hooks.py
Liam Young 20ace1288c Use chelper generate_ha_relation_data for ha rel
Use the generate_ha_relation_data helper from charmhelpers to
generate the data to send down the relation to the hacluster
charm.

This results in a few changes in behaviour:

1) The charm will no longer specify a nic name to bind the vip. This
   is because Pacemaker VIP resources are able to automatically
   detect and configure correct iface and netmask parameters based
   on local configuration of the unit.
2) The original iface named VIP resource will be stopped and deleted
   prior to the creation of the new short hash named VIP resource.

Change-Id: I473fc8a8c00e0fa2fd39e7d187f63334acbe6462
2018-12-04 18:24:10 +00:00

852 lines
33 KiB
Python

# Copyright 2016 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from mock import MagicMock, patch, call
from test_utils import CharmTestCase
# python-apt is not installed by test-requirements, yet some charmhelpers
# modules import it, so register a stand-in module before they are loaded.
sys.modules['apt'] = MagicMock()

# Load the utils module while hookenv.config is mocked to answer 'neutron'.
with patch('charmhelpers.core.hookenv.config') as config:
    config.return_value = 'neutron'
    import neutron_api_utils as utils

# Keep the real helpers aside and substitute mocks while the hooks module is
# imported; the originals are put back at the end of this section.
_reg, _map = utils.register_configs, utils.restart_map
utils.register_configs, utils.restart_map = MagicMock(), MagicMock()

with patch('charmhelpers.contrib.hardening.harden.harden') as mock_dec:
    # Calling the patched harden(...) yields a decorator whose wrapper only
    # forwards its arguments to the decorated function.
    mock_dec.side_effect = (lambda *dargs, **dkwargs: lambda f:
                            lambda *args, **kwargs: f(*args, **kwargs))
    import neutron_api_hooks as hooks

# NOTE(review): presumably stops the Hooks object from persisting charm
# config when a hook is executed - confirm against charmhelpers.
hooks.hooks._config_save = False

utils.register_configs, utils.restart_map = _reg, _map
# Names handed to CharmTestCase.setUp together with the hooks module; the
# tests below then use each of them as ``self.<name>``.
# Every name appears exactly once: 'generate_ha_relation_data' used to be
# listed twice, which is redundant.
TO_PATCH = [
    'api_port',
    'apt_update',
    'apt_install',
    'config',
    'CONFIGS',
    'check_call',
    'add_source',
    'configure_installation_source',
    'determine_packages',
    'determine_ports',
    'do_openstack_upgrade',
    'dvr_router_present',
    'local_unit',
    'l3ha_router_present',
    'execd_preinstall',
    'filter_installed_packages',
    'get_dns_domain',
    'get_dvr',
    'get_l3ha',
    'get_l2population',
    'get_overlay_network_type',
    'is_clustered',
    'is_elected_leader',
    'is_qos_requested_and_valid',
    'is_vlan_trunking_requested_and_valid',
    'log',
    'migrate_neutron_database',
    'neutron_ready',
    'open_port',
    'openstack_upgrade_available',
    'os_release',
    'relation_get',
    'relation_ids',
    'relation_set',
    'related_units',
    'unit_get',
    'update_nrpe_config',
    'service_reload',
    'neutron_plugin_attribute',
    'IdentityServiceContext',
    'force_etcd_restart',
    'status_set',
    'get_relation_ip',
    'generate_ha_relation_data',
    'is_nsg_logging_enabled',
    'remove_old_packages',
    'services',
    'service_restart',
]
# Neutron configuration directory and the main configuration file inside it.
NEUTRON_CONF_DIR = "/etc/neutron"
NEUTRON_CONF = '{}/neutron.conf'.format(NEUTRON_CONF_DIR)
from random import randrange
def _mock_nuage_npa(plugin, attr, net_manager=None):
plugins = {
'vsp': {
'config': '/etc/neutron/plugins/nuage/nuage_plugin.ini',
'driver': 'neutron.plugins.nuage.plugin.NuagePlugin',
'contexts': [],
'services': [],
'packages': [],
'server_packages': ['neutron-server',
'neutron-plugin-nuage'],
'server_services': ['neutron-server']
},
}
return plugins[plugin][attr]
class DummyContext():
    """Callable test double that hands back a preset value when invoked."""

    def __init__(self, return_value):
        # Kept by reference (not copied) and returned by every call.
        self.return_value = return_value

    def __call__(self):
        """Return the value supplied at construction time."""
        return self.return_value
class NeutronAPIHooksTests(CharmTestCase):
    """Unit tests for the hook handlers exposed by ``neutron_api_hooks``.

    ``CharmTestCase.setUp`` receives the hooks module and ``TO_PATCH``; the
    tests then drive each patched name as ``self.<name>``.

    Mock call checks use the ``assert_*`` methods throughout. The former
    ``assertTrue(mock.called_with(...))`` form could never fail, because
    ``called_with`` is not a Mock assertion: it only auto-creates a truthy
    child mock, and newer mock releases reject the attribute outright.
    """

    def setUp(self):
        """Patch the hooks module and install the default charm config."""
        super(NeutronAPIHooksTests, self).setUp(hooks, TO_PATCH)
        # Route config()/relation_get() lookups to the per-test stores.
        self.config.side_effect = self.test_config.get
        self.relation_get.side_effect = self.test_relation.get
        self.test_config.set('openstack-origin', 'distro')
        self.test_config.set('neutron-plugin', 'ovs')
        self.neutron_plugin_attribute.side_effect = _mock_nuage_npa
        self.is_nsg_logging_enabled.return_value = False

    def _fake_relids(self, rel_name):
        """Return two random ids in [0, 100), whatever *rel_name* is."""
        return [randrange(100) for _count in range(2)]

    def _call_hook(self, hookname):
        """Dispatch 'hooks/<hookname>' through ``hooks.hooks.execute``."""
        hooks.hooks.execute([
            'hooks/{}'.format(hookname)])

    def _check_plugin_api_relation_joined(self, dvr=False, l3ha=False,
                                          l2population=False, dns_domain='',
                                          extra=None):
        """Run neutron-plugin-api-relation-joined and check the data set.

        :param dvr: value returned by the mocked ``get_dvr``; also the
                    expected 'enable-dvr' value.
        :param l3ha: value returned by the mocked ``get_l3ha``; also the
                     expected 'enable-l3ha' value.
        :param l2population: value returned by the mocked
                             ``get_l2population``; also the expected
                             'l2-population' value.
        :param dns_domain: value returned by the mocked ``get_dns_domain``.
        :param extra: optional dict of expected relation keys that add to
                      or override the defaults below.
        """
        self.unit_get.return_value = '172.18.18.18'
        self.IdentityServiceContext.return_value = \
            DummyContext(return_value={})
        self.is_qos_requested_and_valid.return_value = False
        self.is_vlan_trunking_requested_and_valid.return_value = False
        self.get_dvr.return_value = dvr
        self.get_l3ha.return_value = l3ha
        self.get_l2population.return_value = l2population
        self.get_overlay_network_type.return_value = 'vxlan'
        self.get_dns_domain.return_value = dns_domain
        _relation_data = {
            'neutron-security-groups': False,
            'enable-dvr': dvr,
            'enable-l3ha': l3ha,
            'enable-qos': False,
            'enable-vlan-trunking': False,
            'addr': '172.18.18.18',
            'polling-interval': 2,
            'rpc-response-timeout': 60,
            'report-interval': 30,
            'l2-population': l2population,
            'overlay-network-type': 'vxlan',
            'service_protocol': None,
            'auth_protocol': None,
            'service_tenant': None,
            'service_port': None,
            'region': 'RegionOne',
            'service_password': None,
            'auth_port': None,
            'auth_host': None,
            'service_username': None,
            'service_host': None,
            'neutron-api-ready': 'no',
            'enable-nsg-logging': False,
        }
        _relation_data.update(extra or {})
        self._call_hook('neutron-plugin-api-relation-joined')
        self.relation_set.assert_called_with(
            relation_id=None,
            **_relation_data
        )

    def test_install_hook(self):
        """install sets the source, installs packages and opens ports."""
        _pkgs = ['foo', 'bar']
        _ports = [80, 81, 82]
        _port_calls = [call(port) for port in _ports]
        self.determine_packages.return_value = _pkgs
        self.determine_ports.return_value = _ports
        self._call_hook('install')
        self.configure_installation_source.assert_called_with(
            'distro'
        )
        self.apt_update.assert_called_with(fatal=True)
        self.apt_install.assert_has_calls([
            call(_pkgs, fatal=True),
        ])
        self.open_port.assert_has_calls(_port_calls)
        self.assertTrue(self.execd_preinstall.called)

    def test_nuage_install_hook(self):
        """install with the 'vsp' plugin installs the determined packages."""
        self.test_config.set('neutron-plugin', 'vsp')
        self.test_config.set('extra-source',
                             "deb http://10.14.4.1/nuage trusty main")
        _pkgs = ['foo', 'bar', 'python-nuagenetlib']
        # Snapshot taken before the hook runs, so the expectation does not
        # follow any in-place change the hook might make to _pkgs.
        _expected_pkgs = list(_pkgs)
        _ports = [80, 81, 82]
        _port_calls = [call(port) for port in _ports]
        self.determine_packages.return_value = _pkgs
        self.determine_ports.return_value = _ports
        self._call_hook('install')
        self.configure_installation_source.assert_called_with(
            'distro'
        )
        self.apt_update.assert_called_with(fatal=True)
        self.apt_install.assert_has_calls([
            call(_expected_pkgs, fatal=True),
        ])
        self.open_port.assert_has_calls(_port_calls)
        self.assertTrue(self.execd_preinstall.called)

    @patch.object(hooks, 'configure_https')
    def test_config_changed(self, conf_https):
        """config-changed re-runs the joined handlers and upgrades."""
        self.neutron_ready.return_value = True
        self.openstack_upgrade_available.return_value = True
        self.dvr_router_present.return_value = False
        self.l3ha_router_present.return_value = False
        self.relation_ids.side_effect = self._fake_relids
        _n_api_rel_joined = self.patch('neutron_api_relation_joined')
        _n_plugin_api_rel_joined =\
            self.patch('neutron_plugin_api_relation_joined')
        _amqp_rel_joined = self.patch('amqp_joined')
        _id_rel_joined = self.patch('identity_joined')
        _id_cluster_joined = self.patch('cluster_joined')
        _id_ha_joined = self.patch('ha_joined')
        self._call_hook('config-changed')
        self.assertTrue(_n_api_rel_joined.called)
        self.assertTrue(_n_plugin_api_rel_joined.called)
        self.assertTrue(_amqp_rel_joined.called)
        self.assertTrue(_id_rel_joined.called)
        self.assertTrue(_id_cluster_joined.called)
        self.assertTrue(_id_ha_joined.called)
        self.assertTrue(self.CONFIGS.write_all.called)
        self.assertTrue(self.do_openstack_upgrade.called)
        self.assertTrue(self.apt_install.called)

    def test_config_changed_nodvr_disprouters(self):
        """config-changed raises when DVR is off but DVR routers exist."""
        self.neutron_ready.return_value = True
        self.dvr_router_present.return_value = True
        self.get_dvr.return_value = False
        with self.assertRaises(Exception):
            self._call_hook('config-changed')

    def test_config_changed_nol3ha_harouters(self):
        """config-changed raises when L3HA is off but HA routers exist."""
        self.neutron_ready.return_value = True
        self.dvr_router_present.return_value = False
        self.l3ha_router_present.return_value = True
        self.get_l3ha.return_value = False
        with self.assertRaises(Exception):
            self._call_hook('config-changed')

    def test_config_changed_with_openstack_upgrade_action(self):
        """With action-managed-upgrade set, config-changed does not upgrade."""
        self.remove_old_packages.return_value = False
        self.openstack_upgrade_available.return_value = True
        self.test_config.set('action-managed-upgrade', True)
        self._call_hook('config-changed')
        self.assertFalse(self.do_openstack_upgrade.called)

    def test_config_changed_with_purge(self):
        """When old packages were removed, the listed service is restarted."""
        self.remove_old_packages.return_value = True
        self.services.return_value = ['neutron-server']
        self.openstack_upgrade_available.return_value = False
        self._call_hook('config-changed')
        self.remove_old_packages.assert_called_once_with()
        self.service_restart.assert_called_once_with('neutron-server')

    def test_amqp_joined(self):
        """amqp-relation-joined publishes the username and vhost."""
        self._call_hook('amqp-relation-joined')
        self.relation_set.assert_called_with(
            username='neutron',
            vhost='openstack',
            relation_id=None
        )

    @patch.object(hooks, 'neutron_plugin_api_subordinate_relation_joined')
    def test_amqp_changed(self, plugin_joined):
        """amqp-relation-changed writes neutron.conf and notifies plugins."""
        self.relation_ids.return_value = ['neutron-plugin-api-subordinate:1']
        self.CONFIGS.complete_contexts.return_value = ['amqp']
        self._call_hook('amqp-relation-changed')
        self.CONFIGS.write.assert_called_with(NEUTRON_CONF)
        self.relation_ids.assert_called_with('neutron-plugin-api-subordinate')
        plugin_joined.assert_called_with(
            relid='neutron-plugin-api-subordinate:1')

    def test_amqp_departed(self):
        """amqp-relation-departed writes neutron.conf."""
        # Same precondition as test_amqp_changed: without it
        # complete_contexts() returns a bare mock in which 'amqp' is never
        # found.
        self.CONFIGS.complete_contexts.return_value = ['amqp']
        self._call_hook('amqp-relation-departed')
        self.CONFIGS.write.assert_called_with(NEUTRON_CONF)

    def test_db_joined(self):
        """shared-db-relation-joined publishes the relation IP as hostname."""
        self.get_relation_ip.return_value = '10.0.0.1'
        self._call_hook('shared-db-relation-joined')
        self.relation_set.assert_called_with(
            username='neutron',
            database='neutron',
            hostname='10.0.0.1',
        )

    def test_db_joined_spaces(self):
        """The relation IP, not the unit_get value, is sent as hostname."""
        self.get_relation_ip.return_value = '192.168.20.1'
        self.unit_get.return_value = 'myhostname'
        self._call_hook('shared-db-relation-joined')
        self.relation_set.assert_called_with(
            username='neutron',
            database='neutron',
            hostname='192.168.20.1',
        )

    @patch.object(hooks, 'neutron_plugin_api_subordinate_relation_joined')
    @patch.object(hooks, 'conditional_neutron_migration')
    def test_shared_db_changed(self, cond_neutron_mig, plugin_joined):
        """shared-db-relation-changed writes config and may migrate."""
        self.CONFIGS.complete_contexts.return_value = ['shared-db']
        self.relation_ids.return_value = ['neutron-plugin-api-subordinate:1']
        self._call_hook('shared-db-relation-changed')
        self.assertTrue(self.CONFIGS.write_all.called)
        cond_neutron_mig.assert_called_with()
        self.relation_ids.assert_called_with('neutron-plugin-api-subordinate')
        plugin_joined.assert_called_with(
            relid='neutron-plugin-api-subordinate:1')

    def test_shared_db_changed_partial_ctxt(self):
        """Nothing is written while the shared-db context is incomplete."""
        self.CONFIGS.complete_contexts.return_value = []
        self._call_hook('shared-db-relation-changed')
        self.assertFalse(self.CONFIGS.write_all.called)

    def test_amqp_broken(self):
        """amqp-relation-broken rewrites all configs."""
        self._call_hook('amqp-relation-broken')
        self.assertTrue(self.CONFIGS.write_all.called)

    @patch.object(hooks, 'canonical_url')
    def test_identity_joined(self, _canonical_url):
        """identity-service-relation-joined publishes the endpoints."""
        _canonical_url.return_value = 'http://127.0.0.1'
        self.api_port.return_value = '9696'
        self.test_config.set('region', 'region1')
        _neutron_url = 'http://127.0.0.1:9696'
        _endpoints = {
            'neutron_service': 'neutron',
            'neutron_region': 'region1',
            'neutron_public_url': _neutron_url,
            'neutron_admin_url': _neutron_url,
            'neutron_internal_url': _neutron_url,
            'quantum_service': None,
            'quantum_region': None,
            'quantum_public_url': None,
            'quantum_admin_url': None,
            'quantum_internal_url': None,
        }
        self._call_hook('identity-service-relation-joined')
        self.relation_set.assert_called_with(
            relation_id=None,
            relation_settings=_endpoints
        )

    def test_identity_joined_partial_cluster(self):
        """With a vip set but no cluster formed, no relation data is sent."""
        self.is_clustered.return_value = False
        self.test_config.set('vip', '10.0.0.10')
        hooks.identity_joined()
        self.assertFalse(self.relation_set.called)

    @patch('charmhelpers.contrib.openstack.ip.service_name',
           lambda *args: 'neutron-api')
    @patch('charmhelpers.contrib.openstack.ip.unit_get')
    @patch('charmhelpers.contrib.openstack.ip.is_clustered')
    @patch('charmhelpers.contrib.openstack.ip.config')
    def test_identity_changed_public_name(self, _config, _is_clustered,
                                          _unit_get):
        """os-public-hostname replaces the host in the public URL only."""
        _unit_get.return_value = '127.0.0.1'
        _is_clustered.return_value = False
        _config.side_effect = self.test_config.get
        self.api_port.return_value = '9696'
        self.test_config.set('region', 'region1')
        self.test_config.set('os-public-hostname',
                             'neutron-api.example.com')
        self._call_hook('identity-service-relation-joined')
        _neutron_url = 'http://127.0.0.1:9696'
        _endpoints = {
            'neutron_service': 'neutron',
            'neutron_region': 'region1',
            'neutron_public_url': 'http://neutron-api.example.com:9696',
            'neutron_admin_url': _neutron_url,
            'neutron_internal_url': _neutron_url,
            'quantum_service': None,
            'quantum_region': None,
            'quantum_public_url': None,
            'quantum_admin_url': None,
            'quantum_internal_url': None,
        }
        self.relation_set.assert_called_with(
            relation_id=None,
            relation_settings=_endpoints
        )

    def test_identity_changed_partial_ctxt(self):
        """An incomplete identity context does not re-run neutron-api joined."""
        self.CONFIGS.complete_contexts.return_value = []
        _api_rel_joined = self.patch('neutron_api_relation_joined')
        self.relation_ids.side_effect = self._fake_relids
        self._call_hook('identity-service-relation-changed')
        self.assertFalse(_api_rel_joined.called)

    @patch.object(hooks, 'configure_https')
    def test_identity_changed(self, conf_https):
        """A complete identity context writes neutron.conf and re-joins."""
        self.CONFIGS.complete_contexts.return_value = ['identity-service']
        _api_rel_joined = self.patch('neutron_api_relation_joined')
        self.relation_ids.side_effect = self._fake_relids
        self._call_hook('identity-service-relation-changed')
        self.CONFIGS.write.assert_called_with(NEUTRON_CONF)
        self.assertTrue(_api_rel_joined.called)

    @patch.object(hooks, 'canonical_url')
    def test_neutron_api_relation_no_id_joined(self, _canonical_url):
        """neutron-api-relation-joined reports the API as not ready."""
        host = 'http://127.0.0.1'
        port = 1234
        _id_rel_joined = self.patch('identity_joined')
        self.relation_ids.side_effect = self._fake_relids
        _canonical_url.return_value = host
        self.api_port.return_value = port
        self.get_dns_domain.return_value = "dnsdomain."
        neutron_url = '%s:%s' % (host, port)
        _relation_data = {
            'enable-sriov': False,
            'neutron-plugin': 'ovs',
            'neutron-url': neutron_url,
            'neutron-security-groups': 'no',
            'neutron-api-ready': 'no',
            'dns-domain': 'dnsdomain.',
        }
        self._call_hook('neutron-api-relation-joined')
        self.relation_set.assert_called_with(
            relation_id=None,
            **_relation_data
        )
        self.assertTrue(_id_rel_joined.called)
        # Enabling security groups flips the advertised flag to 'yes'.
        self.test_config.set('neutron-security-groups', True)
        self._call_hook('neutron-api-relation-joined')
        _relation_data['neutron-security-groups'] = 'yes'
        self.relation_set.assert_called_with(
            relation_id=None,
            **_relation_data
        )

    @patch.object(hooks, 'is_api_ready')
    @patch.object(hooks, 'canonical_url')
    def test_neutron_api_relation_joined(self, _canonical_url, api_ready):
        """With the API ready and no dns domain, 'yes' is advertised."""
        api_ready.return_value = True
        host = 'http://127.0.0.1'
        port = 1234
        _canonical_url.return_value = host
        self.api_port.return_value = port
        self.get_dns_domain.return_value = ""
        neutron_url = '%s:%s' % (host, port)
        _relation_data = {
            'enable-sriov': False,
            'neutron-plugin': 'ovs',
            'neutron-url': neutron_url,
            'neutron-security-groups': 'no',
            'neutron-api-ready': 'yes',
        }
        self._call_hook('neutron-api-relation-joined')
        self.relation_set.assert_called_with(
            relation_id=None,
            **_relation_data
        )

    def test_vsd_api_relation_changed(self):
        """vsd-rest-api-relation-changed writes the nuage plugin config."""
        self.os_release.return_value = 'kilo'
        self.test_config.set('neutron-plugin', 'vsp')
        self.test_relation.set({
            'vsd-ip-address': '10.11.12.13',
            'nuage-cms-id': 'abc'
        })
        self._call_hook('vsd-rest-api-relation-changed')
        # Same path _mock_nuage_npa returns for the 'vsp' plugin 'config'.
        config_file = '/etc/neutron/plugins/nuage/nuage_plugin.ini'
        self.CONFIGS.write.assert_called_with(config_file)

    def test_vsd_api_relation_joined(self):
        """vsd-rest-api-relation-joined blocks until vsd-cms-name is set."""
        self.os_release.return_value = 'kilo'
        repo = 'cloud:trusty-kilo'
        self.test_config.set('openstack-origin', repo)
        self._call_hook('vsd-rest-api-relation-joined')
        e = "Neutron Api hook failed as vsd-cms-name" \
            " is not specified"
        self.status_set.assert_called_with(
            'blocked', e)
        self.test_config.set('vsd-cms-name', '1234567890')
        _relation_data = {
            'vsd-cms-name': '1234567890',
        }
        self._call_hook('vsd-rest-api-relation-joined')
        self.relation_set.assert_called_with(
            relation_id=None,
            **_relation_data
        )

    def test_neutron_api_relation_changed(self):
        """neutron-api-relation-changed writes neutron.conf."""
        self.CONFIGS.complete_contexts.return_value = ['shared-db']
        self._call_hook('neutron-api-relation-changed')
        self.CONFIGS.write.assert_called_with(NEUTRON_CONF)

    def test_neutron_api_relation_changed_incomplere_ctxt(self):
        """neutron.conf is written even when no context is complete."""
        self.CONFIGS.complete_contexts.return_value = []
        self._call_hook('neutron-api-relation-changed')
        self.CONFIGS.write.assert_called_with(NEUTRON_CONF)

    @patch.object(hooks, 'is_api_ready')
    def test_neutron_load_balancer_relation_joined(self, is_api_ready):
        """The load-balancer joined hook publishes the readiness flag."""
        is_api_ready.return_value = True
        self._call_hook('neutron-load-balancer-relation-joined')
        is_api_ready.assert_called_once_with(self.CONFIGS)
        _relation_data = {'neutron-api-ready': True}
        self.relation_set.assert_called_once_with(relation_id=None,
                                                  **_relation_data)

    @patch.object(hooks, 'is_api_ready')
    def test_neutron_load_balancer_relation_changed(self, is_api_ready):
        """The changed hook publishes readiness and writes neutron.conf."""
        is_api_ready.return_value = True
        self._call_hook('neutron-load-balancer-relation-changed')
        is_api_ready.assert_called_once_with(self.CONFIGS)
        _relation_data = {'neutron-api-ready': True}
        self.relation_set.assert_called_once_with(relation_id=None,
                                                  **_relation_data)
        self.CONFIGS.write.assert_called_once_with(NEUTRON_CONF)

    def test_neutron_plugin_api_relation_joined_nol2(self):
        """Default relation data with DVR, L3HA and l2-population off."""
        self._check_plugin_api_relation_joined()

    def test_neutron_plugin_api_relation_joined_nsg_logging(self):
        """Enabled security-group logging is advertised on the relation."""
        self.test_config.set('enable-security-group-logging', True)
        self.is_nsg_logging_enabled.return_value = True
        self._check_plugin_api_relation_joined(
            extra={'enable-nsg-logging': True})

    def test_neutron_plugin_api_relation_joined_dvr(self):
        """DVR and l2-population settings are advertised when enabled."""
        self._check_plugin_api_relation_joined(dvr=True, l2population=True)

    def test_neutron_plugin_api_relation_joined_l3ha(self):
        """The L3HA setting is advertised when enabled."""
        self._check_plugin_api_relation_joined(l3ha=True)

    def test_neutron_plugin_api_relation_joined_w_mtu(self):
        """A configured network-device-mtu is passed on the relation."""
        self.test_config.set('network-device-mtu', 1500)
        self._check_plugin_api_relation_joined(
            dvr=True, l3ha=True,
            extra={'network-device-mtu': 1500})

    def test_neutron_plugin_api_relation_joined_dns(self):
        """A non-empty dns domain is passed on the relation."""
        self._check_plugin_api_relation_joined(
            dns_domain='openstack.example.',
            extra={'dns-domain': 'openstack.example.'})

    @patch.object(hooks, 'check_local_db_actions_complete')
    def test_cluster_changed(self, mock_check_local_db_actions_complete):
        """cluster-relation-changed writes configs and checks db actions."""
        self._call_hook('cluster-relation-changed')
        self.assertTrue(self.CONFIGS.write_all.called)
        self.assertTrue(mock_check_local_db_actions_complete.called)

    def test_ha_relation_joined(self):
        """ha-relation-joined forwards the generated HA relation data."""
        self.generate_ha_relation_data.return_value = {'rel_data': 'data'}
        self._call_hook('ha-relation-joined')
        self.relation_set.assert_called_once_with(
            relation_id=None, rel_data='data')

    def test_ha_changed(self):
        """Once clustered, ha-relation-changed re-runs the joined handlers."""
        self.test_relation.set({
            'clustered': 'true',
        })
        self.relation_ids.side_effect = self._fake_relids
        _n_api_rel_joined = self.patch('neutron_api_relation_joined')
        _id_rel_joined = self.patch('identity_joined')
        self._call_hook('ha-relation-changed')
        self.assertTrue(_n_api_rel_joined.called)
        self.assertTrue(_id_rel_joined.called)

    def test_ha_changed_not_clustered(self):
        """Without 'clustered' set, no joined handler is re-run."""
        self.test_relation.set({
            'clustered': None,
        })
        self.relation_ids.side_effect = self._fake_relids
        _n_api_rel_joined = self.patch('neutron_api_relation_joined')
        _id_rel_joined = self.patch('identity_joined')
        self._call_hook('ha-relation-changed')
        self.assertFalse(_n_api_rel_joined.called)
        self.assertFalse(_id_rel_joined.called)

    def test_configure_https(self):
        """A complete https context enables the https frontend site."""
        self.CONFIGS.complete_contexts.return_value = ['https']
        self.relation_ids.side_effect = self._fake_relids
        _id_rel_joined = self.patch('identity_joined')
        hooks.configure_https()
        self.check_call.assert_called_with(['a2ensite',
                                            'openstack_https_frontend'])
        self.assertTrue(_id_rel_joined.called)

    def test_configure_https_nohttps(self):
        """Without an https context the https frontend site is disabled."""
        self.CONFIGS.complete_contexts.return_value = []
        self.relation_ids.side_effect = self._fake_relids
        _id_rel_joined = self.patch('identity_joined')
        hooks.configure_https()
        self.check_call.assert_called_with(['a2dissite',
                                            'openstack_https_frontend'])
        self.assertTrue(_id_rel_joined.called)

    def test_conditional_neutron_migration_leader(self):
        """An allowed leader on kilo runs the database migration."""
        self.test_relation.set({
            'allowed_units': 'neutron-api/0 neutron-api/1 neutron-api/4',
        })
        self.local_unit.return_value = 'neutron-api/1'
        self.is_elected_leader.return_value = True
        self.os_release.return_value = 'kilo'
        hooks.conditional_neutron_migration()
        self.migrate_neutron_database.assert_called_with()

    def test_conditional_neutron_migration_leader_icehouse(self):
        """On icehouse the leader does not run the migration."""
        self.test_relation.set({
            'allowed_units': 'neutron-api/0 neutron-api/1 neutron-api/4',
        })
        self.local_unit.return_value = 'neutron-api/1'
        self.is_elected_leader.return_value = True
        self.os_release.return_value = 'icehouse'
        hooks.conditional_neutron_migration()
        self.assertFalse(self.migrate_neutron_database.called)

    def test_conditional_neutron_migration_notleader(self):
        """A non-leader unit never runs the migration."""
        self.is_elected_leader.return_value = False
        self.os_release.return_value = 'icehouse'
        hooks.conditional_neutron_migration()
        self.assertFalse(self.migrate_neutron_database.called)

    def test_etcd_peer_joined(self):
        """etcd-proxy-relation-joined registers and writes both etcd files."""
        self._call_hook('etcd-proxy-relation-joined')
        self.assertTrue(self.CONFIGS.register.called)
        self.CONFIGS.write.assert_any_call('/etc/init/etcd.conf')
        self.CONFIGS.write.assert_any_call('/etc/default/etcd')

    def test_designate_peer_joined(self):
        """external-dns-relation-joined re-renders configuration."""
        self.test_relation.set({
            'endpoint': 'http://1.2.3.4:9001',
        })
        self.relation_ids.side_effect = self._fake_relids
        self._call_hook('external-dns-relation-joined')
        # NOTE(review): the hook body is not visible from this module, so
        # either rendering path is accepted; tighten to the exact call once
        # confirmed against neutron_api_hooks.
        self.assertTrue(self.CONFIGS.write.called or
                        self.CONFIGS.write_all.called)

    def test_designate_peer_departed(self):
        """external-dns-relation-departed re-renders configuration."""
        self._call_hook('external-dns-relation-departed')
        # NOTE(review): the hook body is not visible from this module, so
        # either rendering path is accepted; tighten to the exact call once
        # confirmed against neutron_api_hooks.
        self.assertTrue(self.CONFIGS.write.called or
                        self.CONFIGS.write_all.called)