charm-neutron-api/unit_tests/test_neutron_api_hooks.py
Hervé Beraud 48cdd1a86c Use unittest.mock instead of mock
The mock third party library was needed for mock support in py2
runtimes. Since we now only support py36 and later, we can use the
standard lib unittest.mock module instead.

Note that https://github.com/openstack/charms.openstack is used during tests
and he need `mock`, unfortunatelly it doesn't declare `mock` in its
requirements so it retrieve mock from other charm project (cross dependency).
So we depend on charms.openstack first and when
Ib1ed5b598a52375e29e247db9ab4786df5b6d142 will be merged then CI
will pass without errors.

Depends-On: Ib1ed5b598a52375e29e247db9ab4786df5b6d142
Change-Id: I25d6d26f2a4169ddc30d8eeac193d768aa345058
2021-12-15 14:05:35 +00:00

1097 lines
43 KiB
Python

# Copyright 2016 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import sys
from unittest.mock import MagicMock, patch, call
from test_utils import CharmTestCase
# python-apt is not installed as part of test-requirements but is imported by
# some charmhelpers modules so create a fake import.
sys.modules['apt'] = MagicMock()
with patch('charmhelpers.core.hookenv.config') as config:
config.return_value = 'neutron'
import neutron_api_utils as utils
_reg = utils.register_configs
_map = utils.restart_map
utils.register_configs = MagicMock()
utils.restart_map = MagicMock()
with patch('charmhelpers.contrib.hardening.harden.harden') as mock_dec:
mock_dec.side_effect = (lambda *dargs, **dkwargs: lambda f:
lambda *args, **kwargs: f(*args, **kwargs))
import neutron_api_hooks as hooks
hooks.hooks._config_save = False
utils.register_configs = _reg
utils.restart_map = _map
TO_PATCH = [
'api_port',
'apt_update',
'apt_install',
'config',
'CONFIGS',
'check_call',
'add_source',
'configure_installation_source',
'determine_packages',
'determine_ports',
'do_openstack_upgrade',
'dvr_router_present',
'local_unit',
'l3ha_router_present',
'execd_preinstall',
'filter_installed_packages',
'get_dns_domain',
'get_dvr',
'get_l3ha',
'get_l2population',
'get_overlay_network_type',
'is_clustered',
'is_leader',
'is_elected_leader',
'is_qos_requested_and_valid',
'is_vlan_trunking_requested_and_valid',
'log',
'migrate_neutron_database',
'neutron_ready',
'open_port',
'openstack_upgrade_available',
'os_release',
'relation_get',
'relation_ids',
'relation_set',
'related_units',
'unit_get',
'update_nrpe_config',
'service_reload',
'neutron_plugin_attribute',
'IdentityServiceContext',
'force_etcd_restart',
'status_set',
'get_relation_ip',
'generate_ha_relation_data',
'is_nsg_logging_enabled',
'is_nfg_logging_enabled',
'is_port_forwarding_enabled',
'is_fwaas_enabled',
'remove_old_packages',
'services',
'service_restart',
'is_db_initialised',
'maybe_do_policyd_overrides',
'maybe_do_policyd_overrides_on_config_changed',
'is_db_maintenance_mode',
]
NEUTRON_CONF_DIR = "/etc/neutron"
NEUTRON_CONF = '%s/neutron.conf' % NEUTRON_CONF_DIR
from random import randrange
def _mock_nuage_npa(plugin, attr, net_manager=None):
plugins = {
'vsp': {
'config': '/etc/neutron/plugins/nuage/nuage_plugin.ini',
'driver': 'neutron.plugins.nuage.plugin.NuagePlugin',
'contexts': [],
'services': [],
'packages': [],
'server_packages': ['neutron-server',
'neutron-plugin-nuage'],
'server_services': ['neutron-server']
},
}
return plugins[plugin][attr]
class DummyContext():
def __init__(self, return_value):
self.return_value = return_value
def __call__(self):
return self.return_value
class NeutronAPIHooksTests(CharmTestCase):
def setUp(self):
super(NeutronAPIHooksTests, self).setUp(hooks, TO_PATCH)
self.config.side_effect = self.test_config.get
self.relation_get.side_effect = self.test_relation.get
self.test_config.set('openstack-origin', 'distro')
self.test_config.set('neutron-plugin', 'ovs')
self.neutron_plugin_attribute.side_effect = _mock_nuage_npa
self.is_nsg_logging_enabled.return_value = False
self.is_nfg_logging_enabled.return_value = False
self.is_port_forwarding_enabled.return_value = False
self.is_fwaas_enabled.return_value = True
def _fake_relids(self, rel_name):
return [randrange(100) for _count in range(2)]
def _call_hook(self, hookname):
hooks.hooks.execute([
'hooks/{}'.format(hookname)])
@patch.object(hooks, 'maybe_set_os_install_release')
def test_install_hook(self, mock_maybe_set_os_install_release):
_pkgs = ['foo', 'bar']
_ports = [80, 81, 82]
_port_calls = [call(port) for port in _ports]
self.determine_packages.return_value = _pkgs
self.determine_ports.return_value = _ports
self._call_hook('install')
self.configure_installation_source.assert_called_with(
'distro'
)
mock_maybe_set_os_install_release.assert_called_once_with('distro')
self.apt_update.assert_called_with(fatal=True)
self.apt_install.assert_has_calls([
call(_pkgs, fatal=True),
])
self.open_port.assert_has_calls(_port_calls)
self.assertTrue(self.execd_preinstall.called)
def test_nuage_install_hook(self):
self.test_config.set('neutron-plugin', 'vsp')
self.test_config.set('extra-source',
"deb http://10.14.4.1/nuage trusty main")
_pkgs = ['foo', 'bar', 'python-nuagenetlib']
_expected_pkgs = list(_pkgs)
_ports = [80, 81, 82]
_port_calls = [call(port) for port in _ports]
self.determine_packages.return_value = _pkgs
self.determine_ports.return_value = _ports
self._call_hook('install')
self.configure_installation_source.assert_called_with(
'distro'
)
self.apt_update.assert_called_with(fatal=True)
self.apt_install.assert_has_calls([
call(_expected_pkgs, fatal=True),
])
self.open_port.assert_has_calls(_port_calls)
self.assertTrue(self.execd_preinstall.called)
@patch.object(hooks, 'configure_https')
def test_config_changed(self, conf_https):
self.neutron_ready.return_value = True
self.openstack_upgrade_available.return_value = True
self.dvr_router_present.return_value = False
self.l3ha_router_present.return_value = False
self.relation_ids.side_effect = self._fake_relids
_n_api_rel_joined = self.patch('neutron_api_relation_joined')
_n_plugin_api_rel_joined =\
self.patch('neutron_plugin_api_relation_joined')
_amqp_rel_joined = self.patch('amqp_joined')
_id_rel_joined = self.patch('identity_joined')
_id_cluster_joined = self.patch('cluster_joined')
_id_ha_joined = self.patch('ha_joined')
_n_plugin_api_sub_rel_joined = self.patch(
'neutron_plugin_api_subordinate_relation_joined')
self._call_hook('config-changed')
self.assertTrue(_n_api_rel_joined.called)
self.assertTrue(_n_plugin_api_rel_joined.called)
self.assertTrue(_amqp_rel_joined.called)
self.assertTrue(_id_rel_joined.called)
self.assertTrue(_id_cluster_joined.called)
self.assertTrue(_id_ha_joined.called)
self.assertTrue(self.CONFIGS.write_all.called)
self.assertTrue(self.do_openstack_upgrade.called)
self.assertTrue(self.apt_install.called)
_n_plugin_api_sub_rel_joined.assert_called()
def test_config_changed_nodvr_disprouters(self):
self.neutron_ready.return_value = True
self.dvr_router_present.return_value = True
self.get_dvr.return_value = False
with self.assertRaises(Exception):
self._call_hook('config-changed')
def test_config_changed_nol3ha_harouters(self):
self.neutron_ready.return_value = True
self.dvr_router_present.return_value = False
self.l3ha_router_present.return_value = True
self.get_l3ha.return_value = False
with self.assertRaises(Exception):
self._call_hook('config-changed')
def test_config_changed_with_openstack_upgrade_action(self):
self.remove_old_packages.return_value = False
self.openstack_upgrade_available.return_value = True
self.test_config.set('action-managed-upgrade', True)
self._call_hook('config-changed')
self.assertFalse(self.do_openstack_upgrade.called)
def test_config_changed_with_purge(self):
self.remove_old_packages.return_value = True
self.services.return_value = ['neutron-server']
self.openstack_upgrade_available.return_value = False
self._call_hook('config-changed')
self.remove_old_packages.assert_called_once_with()
self.service_restart.assert_called_once_with('neutron-server')
def test_amqp_joined(self):
self._call_hook('amqp-relation-joined')
self.relation_set.assert_called_with(
username='neutron',
vhost='openstack',
relation_id=None
)
@patch.object(hooks, 'neutron_plugin_api_subordinate_relation_joined')
def test_amqp_changed(self, plugin_joined):
self.relation_ids.return_value = ['neutron-plugin-api-subordinate:1']
self.CONFIGS.complete_contexts.return_value = ['amqp']
self._call_hook('amqp-relation-changed')
self.assertTrue(self.CONFIGS.write.called_with(NEUTRON_CONF))
self.relation_ids.assert_called_with('neutron-plugin-api-subordinate')
plugin_joined.assert_called_with(
relid='neutron-plugin-api-subordinate:1')
def test_amqp_departed(self):
self._call_hook('amqp-relation-departed')
self.assertTrue(self.CONFIGS.write.called_with(NEUTRON_CONF))
def test_db_joined(self):
self.get_relation_ip.return_value = '10.0.0.1'
self._call_hook('shared-db-relation-joined')
self.relation_set.assert_called_with(
username='neutron',
database='neutron',
hostname='10.0.0.1',
)
def test_db_joined_spaces(self):
self.get_relation_ip.return_value = '192.168.20.1'
self.unit_get.return_value = 'myhostname'
self._call_hook('shared-db-relation-joined')
self.relation_set.assert_called_with(
username='neutron',
database='neutron',
hostname='192.168.20.1',
)
@patch.object(hooks, 'neutron_plugin_api_subordinate_relation_joined')
@patch.object(hooks, 'conditional_neutron_migration')
def test_shared_db_changed(self, cond_neutron_mig, plugin_joined):
self.is_db_maintenance_mode.return_value = False
self.CONFIGS.complete_contexts.return_value = ['shared-db']
self.relation_ids.return_value = ['neutron-plugin-api-subordinate:1']
self._call_hook('shared-db-relation-changed')
self.assertTrue(self.CONFIGS.write_all.called)
cond_neutron_mig.assert_called_with()
self.relation_ids.assert_called_with('neutron-plugin-api-subordinate')
plugin_joined.assert_called_with(
relid='neutron-plugin-api-subordinate:1')
def test_shared_db_changed_partial_ctxt(self):
self.is_db_maintenance_mode.return_value = False
self.CONFIGS.complete_contexts.return_value = []
self._call_hook('shared-db-relation-changed')
self.assertFalse(self.CONFIGS.write_all.called)
def test_amqp_broken(self):
self._call_hook('amqp-relation-broken')
self.assertTrue(self.CONFIGS.write_all.called)
@patch.object(hooks, 'canonical_url')
def test_identity_joined(self, _canonical_url):
_canonical_url.return_value = 'http://127.0.0.1'
self.api_port.return_value = '9696'
self.test_config.set('region', 'region1')
_neutron_url = 'http://127.0.0.1:9696'
_endpoints = {
'neutron_service': 'neutron',
'neutron_region': 'region1',
'neutron_public_url': _neutron_url,
'neutron_admin_url': _neutron_url,
'neutron_internal_url': _neutron_url,
'quantum_service': None,
'quantum_region': None,
'quantum_public_url': None,
'quantum_admin_url': None,
'quantum_internal_url': None,
}
self._call_hook('identity-service-relation-joined')
self.relation_set.assert_called_with(
relation_id=None,
relation_settings=_endpoints
)
def test_identity_joined_partial_cluster(self):
self.is_clustered.return_value = False
self.test_config.set('vip', '10.0.0.10')
hooks.identity_joined()
self.assertFalse(self.relation_set.called)
@patch('charmhelpers.contrib.openstack.ip.service_name',
lambda *args: 'neutron-api')
@patch('charmhelpers.contrib.openstack.ip.unit_get')
@patch('charmhelpers.contrib.openstack.ip.is_clustered')
@patch('charmhelpers.contrib.openstack.ip.config')
def test_identity_changed_public_name(self, _config, _is_clustered,
_unit_get):
_unit_get.return_value = '127.0.0.1'
_is_clustered.return_value = False
_config.side_effect = self.test_config.get
self.api_port.return_value = '9696'
self.test_config.set('region', 'region1')
self.test_config.set('os-public-hostname',
'neutron-api.example.com')
self._call_hook('identity-service-relation-joined')
_neutron_url = 'http://127.0.0.1:9696'
_endpoints = {
'neutron_service': 'neutron',
'neutron_region': 'region1',
'neutron_public_url': 'http://neutron-api.example.com:9696',
'neutron_admin_url': _neutron_url,
'neutron_internal_url': _neutron_url,
'quantum_service': None,
'quantum_region': None,
'quantum_public_url': None,
'quantum_admin_url': None,
'quantum_internal_url': None,
}
self.relation_set.assert_called_with(
relation_id=None,
relation_settings=_endpoints
)
def test_identity_changed_partial_ctxt(self):
self.CONFIGS.complete_contexts.return_value = []
_api_rel_joined = self.patch('neutron_api_relation_joined')
self.relation_ids.side_effect = self._fake_relids
self._call_hook('identity-service-relation-changed')
self.assertFalse(_api_rel_joined.called)
@patch.object(hooks, 'manage_plugin')
@patch.object(hooks, 'configure_https')
def test_identity_changed(self, conf_https, mock_manage_plugin):
self.CONFIGS.complete_contexts.return_value = ['identity-service']
_api_rel_joined = self.patch('neutron_api_relation_joined')
self.relation_ids.side_effect = self._fake_relids
self._call_hook('identity-service-relation-changed')
self.assertTrue(self.CONFIGS.write.called_with(NEUTRON_CONF))
self.assertTrue(_api_rel_joined.called)
@patch.object(hooks, 'canonical_url')
def test_neutron_api_relation_no_id_joined(self, _canonical_url):
host = 'http://127.0.0.1'
port = 1234
_id_rel_joined = self.patch('identity_joined')
self.relation_ids.side_effect = self._fake_relids
_canonical_url.return_value = host
self.api_port.return_value = port
self.get_dns_domain.return_value = "dnsdomain."
neutron_url = '%s:%s' % (host, port)
_relation_data = {
'enable-sriov': False,
'enable-hardware-offload': False,
'neutron-plugin': 'ovs',
'neutron-url': neutron_url,
'neutron-security-groups': 'no',
'neutron-api-ready': 'no',
'dns-domain': 'dnsdomain.',
}
self._call_hook('neutron-api-relation-joined')
self.relation_set.assert_called_with(
relation_id=None,
**_relation_data
)
self.assertTrue(_id_rel_joined.called)
self.test_config.set('neutron-security-groups', True)
self._call_hook('neutron-api-relation-joined')
_relation_data['neutron-security-groups'] = 'yes'
self.relation_set.assert_called_with(
relation_id=None,
**_relation_data
)
@patch.object(hooks, 'is_api_ready')
@patch.object(hooks, 'canonical_url')
def test_neutron_api_relation_joined(self, _canonical_url, api_ready):
api_ready.return_value = True
host = 'http://127.0.0.1'
port = 1234
_canonical_url.return_value = host
self.api_port.return_value = port
self.get_dns_domain.return_value = ""
neutron_url = '%s:%s' % (host, port)
_relation_data = {
'enable-sriov': False,
'enable-hardware-offload': False,
'neutron-plugin': 'ovs',
'neutron-url': neutron_url,
'neutron-security-groups': 'no',
'neutron-api-ready': 'yes',
}
self._call_hook('neutron-api-relation-joined')
self.relation_set.assert_called_with(
relation_id=None,
**_relation_data
)
def test_vsd_api_relation_changed(self):
self.os_release.return_value = 'kilo'
self.test_config.set('neutron-plugin', 'vsp')
self.test_relation.set({
'vsd-ip-address': '10.11.12.13',
'nuage-cms-id': 'abc'
})
self._call_hook('vsd-rest-api-relation-changed')
config_file = '/etc/neutron/plugins/nuage/nuage_plugin.ini'
self.assertTrue(self.CONFIGS.write.called_with(config_file))
def test_vsd_api_relation_joined(self):
self.os_release.return_value = 'kilo'
repo = 'cloud:trusty-kilo'
self.test_config.set('openstack-origin', repo)
self._call_hook('vsd-rest-api-relation-joined')
e = "Neutron Api hook failed as vsd-cms-name" \
" is not specified"
self.status_set.assert_called_with(
'blocked', e)
self.test_config.set('vsd-cms-name', '1234567890')
_relation_data = {
'vsd-cms-name': '1234567890',
}
self._call_hook('vsd-rest-api-relation-joined')
self.relation_set.assert_called_with(
relation_id=None,
**_relation_data
)
def test_neutron_api_relation_changed(self):
self.CONFIGS.complete_contexts.return_value = ['shared-db']
self._call_hook('neutron-api-relation-changed')
self.assertTrue(self.CONFIGS.write.called_with(NEUTRON_CONF))
def test_neutron_api_relation_changed_incomplere_ctxt(self):
self.CONFIGS.complete_contexts.return_value = []
self._call_hook('neutron-api-relation-changed')
self.assertTrue(self.CONFIGS.write.called_with(NEUTRON_CONF))
@patch.object(hooks, 'is_api_ready')
def test_neutron_load_balancer_relation_joined(self, is_api_ready):
is_api_ready.return_value = True
self._call_hook('neutron-load-balancer-relation-joined')
is_api_ready.assert_called_once_with(self.CONFIGS)
_relation_data = {'neutron-api-ready': True}
self.relation_set.assert_called_once_with(relation_id=None,
**_relation_data)
@patch.object(hooks, 'is_api_ready')
def test_neutron_load_balancer_relation_changed(self, is_api_ready):
is_api_ready.return_value = True
self._call_hook('neutron-load-balancer-relation-changed')
is_api_ready.assert_called_once_with(self.CONFIGS)
_relation_data = {'neutron-api-ready': True}
self.relation_set.assert_called_once_with(relation_id=None,
**_relation_data)
self.CONFIGS.write.assert_called_once_with(NEUTRON_CONF)
def test_neutron_plugin_api_relation_joined_nol2(self):
self.unit_get.return_value = '172.18.18.18'
self.IdentityServiceContext.return_value = \
DummyContext(return_value={})
_relation_data = {
'neutron-security-groups': False,
'enable-dvr': False,
'enable-l3ha': False,
'enable-qos': False,
'enable-vlan-trunking': False,
'addr': '172.18.18.18',
'polling-interval': 2,
'rpc-response-timeout': 60,
'report-interval': 30,
'l2-population': False,
'overlay-network-type': 'vxlan',
'service_protocol': None,
'auth_protocol': None,
'service_tenant': None,
'service_port': None,
'region': 'RegionOne',
'service_password': None,
'auth_port': None,
'auth_host': None,
'service_username': None,
'service_host': None,
'internal_host': None,
'internal_port': None,
'internal_protocol': None,
'neutron-api-ready': 'no',
'enable-nsg-logging': False,
'enable-nfg-logging': False,
'enable-port-forwarding': False,
'enable-fwaas': True,
'global-physnet-mtu': 1500,
'physical-network-mtus': None,
}
self.is_qos_requested_and_valid.return_value = False
self.is_vlan_trunking_requested_and_valid.return_value = False
self.get_dvr.return_value = False
self.get_l3ha.return_value = False
self.get_l2population.return_value = False
self.get_overlay_network_type.return_value = 'vxlan'
self.get_dns_domain.return_value = ''
self._call_hook('neutron-plugin-api-relation-joined')
self.relation_set.assert_called_with(
relation_id=None,
**_relation_data
)
def test_neutron_plugin_api_relation_joined_nsg_logging(self):
self.unit_get.return_value = '172.18.18.18'
self.IdentityServiceContext.return_value = \
DummyContext(return_value={})
_relation_data = {
'neutron-security-groups': False,
'enable-dvr': False,
'enable-l3ha': False,
'enable-qos': False,
'enable-vlan-trunking': False,
'addr': '172.18.18.18',
'polling-interval': 2,
'rpc-response-timeout': 60,
'report-interval': 30,
'l2-population': False,
'overlay-network-type': 'vxlan',
'service_protocol': None,
'auth_protocol': None,
'service_tenant': None,
'service_port': None,
'region': 'RegionOne',
'service_password': None,
'auth_port': None,
'auth_host': None,
'service_username': None,
'service_host': None,
'internal_host': None,
'internal_port': None,
'internal_protocol': None,
'neutron-api-ready': 'no',
'enable-nsg-logging': True,
'enable-nfg-logging': False,
'enable-port-forwarding': False,
'enable-fwaas': True,
'global-physnet-mtu': 1500,
'physical-network-mtus': None,
}
self.is_qos_requested_and_valid.return_value = False
self.is_vlan_trunking_requested_and_valid.return_value = False
self.get_dvr.return_value = False
self.get_l3ha.return_value = False
self.get_l2population.return_value = False
self.get_overlay_network_type.return_value = 'vxlan'
self.get_dns_domain.return_value = ''
self.test_config.set('enable-security-group-logging', True)
self.is_nsg_logging_enabled.return_value = True
self._call_hook('neutron-plugin-api-relation-joined')
self.relation_set.assert_called_with(
relation_id=None,
**_relation_data
)
def test_neutron_plugin_api_relation_joined_nfg_logging(self):
self.unit_get.return_value = '172.18.18.18'
self.IdentityServiceContext.return_value = \
DummyContext(return_value={})
_relation_data = {
'neutron-security-groups': False,
'enable-dvr': False,
'enable-l3ha': False,
'enable-qos': False,
'enable-vlan-trunking': False,
'addr': '172.18.18.18',
'polling-interval': 2,
'rpc-response-timeout': 60,
'report-interval': 30,
'l2-population': False,
'overlay-network-type': 'vxlan',
'service_protocol': None,
'auth_protocol': None,
'service_tenant': None,
'service_port': None,
'region': 'RegionOne',
'service_password': None,
'auth_port': None,
'auth_host': None,
'service_username': None,
'service_host': None,
'internal_host': None,
'internal_port': None,
'internal_protocol': None,
'neutron-api-ready': 'no',
'enable-nsg-logging': False,
'enable-nfg-logging': True,
'enable-port-forwarding': False,
'enable-fwaas': True,
'global-physnet-mtu': 1500,
'physical-network-mtus': None,
}
self.is_qos_requested_and_valid.return_value = False
self.is_vlan_trunking_requested_and_valid.return_value = False
self.get_dvr.return_value = False
self.get_l3ha.return_value = False
self.get_l2population.return_value = False
self.get_overlay_network_type.return_value = 'vxlan'
self.get_dns_domain.return_value = ''
self.test_config.set('enable-firewall-group-logging', True)
self.is_nfg_logging_enabled.return_value = True
self._call_hook('neutron-plugin-api-relation-joined')
self.relation_set.assert_called_with(
relation_id=None,
**_relation_data)
def test_neutron_plugin_api_relation_joined_port_forwarding(self):
self.unit_get.return_value = '172.18.18.18'
self.IdentityServiceContext.return_value = \
DummyContext(return_value={})
_relation_data = {
'neutron-security-groups': False,
'enable-dvr': False,
'enable-l3ha': False,
'enable-qos': False,
'enable-vlan-trunking': False,
'addr': '172.18.18.18',
'polling-interval': 2,
'rpc-response-timeout': 60,
'report-interval': 30,
'l2-population': False,
'overlay-network-type': 'vxlan',
'service_protocol': None,
'auth_protocol': None,
'service_tenant': None,
'service_port': None,
'internal_host': None,
'internal_port': None,
'internal_protocol': None,
'region': 'RegionOne',
'service_password': None,
'auth_port': None,
'auth_host': None,
'service_username': None,
'service_host': None,
'neutron-api-ready': 'no',
'enable-nsg-logging': False,
'enable-nfg-logging': False,
'enable-port-forwarding': True,
'enable-fwaas': True,
'global-physnet-mtu': 1500,
'physical-network-mtus': None,
}
self.is_qos_requested_and_valid.return_value = False
self.is_vlan_trunking_requested_and_valid.return_value = False
self.get_dvr.return_value = False
self.get_l3ha.return_value = False
self.get_l2population.return_value = False
self.get_overlay_network_type.return_value = 'vxlan'
self.get_dns_domain.return_value = ''
self.test_config.set('enable-port-forwarding', True)
self.is_port_forwarding_enabled.return_value = True
self._call_hook('neutron-plugin-api-relation-joined')
self.relation_set.assert_called_with(
relation_id=None,
**_relation_data)
def test_neutron_plugin_api_relation_joined_dvr(self):
self.unit_get.return_value = '172.18.18.18'
self.IdentityServiceContext.return_value = \
DummyContext(return_value={})
_relation_data = {
'neutron-security-groups': False,
'enable-dvr': True,
'enable-l3ha': False,
'enable-qos': False,
'enable-vlan-trunking': False,
'addr': '172.18.18.18',
'polling-interval': 2,
'rpc-response-timeout': 60,
'report-interval': 30,
'l2-population': True,
'overlay-network-type': 'vxlan',
'service_protocol': None,
'auth_protocol': None,
'service_tenant': None,
'service_port': None,
'region': 'RegionOne',
'service_password': None,
'auth_port': None,
'auth_host': None,
'service_username': None,
'service_host': None,
'internal_host': None,
'internal_port': None,
'internal_protocol': None,
'neutron-api-ready': 'no',
'enable-nsg-logging': False,
'enable-nfg-logging': False,
'enable-port-forwarding': False,
'enable-fwaas': True,
'global-physnet-mtu': 1500,
'physical-network-mtus': None,
}
self.is_qos_requested_and_valid.return_value = False
self.is_vlan_trunking_requested_and_valid.return_value = False
self.get_dvr.return_value = True
self.get_l3ha.return_value = False
self.get_l2population.return_value = True
self.get_overlay_network_type.return_value = 'vxlan'
self.get_dns_domain.return_value = ''
self._call_hook('neutron-plugin-api-relation-joined')
self.relation_set.assert_called_with(
relation_id=None,
**_relation_data
)
def test_neutron_plugin_api_relation_joined_l3ha(self):
self.unit_get.return_value = '172.18.18.18'
self.IdentityServiceContext.return_value = \
DummyContext(return_value={})
_relation_data = {
'neutron-security-groups': False,
'enable-dvr': False,
'enable-l3ha': True,
'enable-qos': False,
'enable-vlan-trunking': False,
'addr': '172.18.18.18',
'polling-interval': 2,
'rpc-response-timeout': 60,
'report-interval': 30,
'l2-population': False,
'overlay-network-type': 'vxlan',
'service_protocol': None,
'auth_protocol': None,
'service_tenant': None,
'service_port': None,
'region': 'RegionOne',
'service_password': None,
'auth_port': None,
'auth_host': None,
'service_username': None,
'service_host': None,
'internal_host': None,
'internal_port': None,
'internal_protocol': None,
'neutron-api-ready': 'no',
'enable-nsg-logging': False,
'enable-nfg-logging': False,
'enable-port-forwarding': False,
'enable-fwaas': True,
'global-physnet-mtu': 1500,
'physical-network-mtus': None,
}
self.is_qos_requested_and_valid.return_value = False
self.is_vlan_trunking_requested_and_valid.return_value = False
self.get_dvr.return_value = False
self.get_l3ha.return_value = True
self.get_l2population.return_value = False
self.get_overlay_network_type.return_value = 'vxlan'
self.get_dns_domain.return_value = ''
self._call_hook('neutron-plugin-api-relation-joined')
self.relation_set.assert_called_with(
relation_id=None,
**_relation_data
)
def test_neutron_plugin_api_relation_joined_w_mtu(self):
self.unit_get.return_value = '172.18.18.18'
self.IdentityServiceContext.return_value = \
DummyContext(return_value={})
self.test_config.set('network-device-mtu', 1500)
_relation_data = {
'neutron-security-groups': False,
'addr': '172.18.18.18',
'polling-interval': 2,
'rpc-response-timeout': 60,
'report-interval': 30,
'l2-population': False,
'enable-qos': False,
'enable-vlan-trunking': False,
'overlay-network-type': 'vxlan',
'network-device-mtu': 1500,
'enable-l3ha': True,
'enable-dvr': True,
'service_protocol': None,
'auth_protocol': None,
'service_tenant': None,
'service_port': None,
'region': 'RegionOne',
'service_password': None,
'auth_port': None,
'auth_host': None,
'service_username': None,
'service_host': None,
'internal_host': None,
'internal_port': None,
'internal_protocol': None,
'neutron-api-ready': 'no',
'enable-nsg-logging': False,
'enable-nfg-logging': False,
'enable-port-forwarding': False,
'enable-fwaas': True,
'global-physnet-mtu': 1500,
'physical-network-mtus': None,
}
self.is_qos_requested_and_valid.return_value = False
self.is_vlan_trunking_requested_and_valid.return_value = False
self.get_dvr.return_value = True
self.get_l3ha.return_value = True
self.get_l2population.return_value = False
self.get_overlay_network_type.return_value = 'vxlan'
self.get_dns_domain.return_value = ''
self._call_hook('neutron-plugin-api-relation-joined')
self.relation_set.assert_called_with(
relation_id=None,
**_relation_data
)
def test_neutron_plugin_api_relation_joined_dns(self):
self.unit_get.return_value = '172.18.18.18'
self.IdentityServiceContext.return_value = \
DummyContext(return_value={})
_relation_data = {
'neutron-security-groups': False,
'enable-dvr': False,
'enable-l3ha': False,
'enable-qos': False,
'enable-vlan-trunking': False,
'addr': '172.18.18.18',
'polling-interval': 2,
'rpc-response-timeout': 60,
'report-interval': 30,
'l2-population': False,
'overlay-network-type': 'vxlan',
'service_protocol': None,
'auth_protocol': None,
'service_tenant': None,
'service_port': None,
'region': 'RegionOne',
'service_password': None,
'auth_port': None,
'auth_host': None,
'service_username': None,
'service_host': None,
'internal_host': None,
'internal_port': None,
'internal_protocol': None,
'neutron-api-ready': 'no',
'dns-domain': 'openstack.example.',
'enable-nsg-logging': False,
'enable-nfg-logging': False,
'enable-port-forwarding': False,
'enable-fwaas': True,
'global-physnet-mtu': 1500,
'physical-network-mtus': None,
}
self.is_qos_requested_and_valid.return_value = False
self.is_vlan_trunking_requested_and_valid.return_value = False
self.get_dvr.return_value = False
self.get_l3ha.return_value = False
self.get_l2population.return_value = False
self.get_overlay_network_type.return_value = 'vxlan'
self.get_dns_domain.return_value = 'openstack.example.'
self._call_hook('neutron-plugin-api-relation-joined')
self.relation_set.assert_called_with(
relation_id=None,
**_relation_data
)
@patch.object(hooks, 'check_local_db_actions_complete')
def test_cluster_changed(self, mock_check_local_db_actions_complete):
self._call_hook('cluster-relation-changed')
self.assertTrue(self.CONFIGS.write_all.called)
self.assertTrue(mock_check_local_db_actions_complete.called)
def test_ha_relation_joined(self):
self.generate_ha_relation_data.return_value = {'rel_data': 'data'}
self._call_hook('ha-relation-joined')
self.relation_set.assert_called_once_with(
relation_id=None, rel_data='data')
def test_ha_changed(self):
self.test_relation.set({
'clustered': 'true',
})
self.relation_ids.side_effect = self._fake_relids
_n_api_rel_joined = self.patch('neutron_api_relation_joined')
_id_rel_joined = self.patch('identity_joined')
self._call_hook('ha-relation-changed')
self.assertTrue(_n_api_rel_joined.called)
self.assertTrue(_id_rel_joined.called)
def test_ha_changed_not_clustered(self):
self.test_relation.set({
'clustered': None,
})
self.relation_ids.side_effect = self._fake_relids
_n_api_rel_joined = self.patch('neutron_api_relation_joined')
_id_rel_joined = self.patch('identity_joined')
self._call_hook('ha-relation-changed')
self.assertFalse(_n_api_rel_joined.called)
self.assertFalse(_id_rel_joined.called)
def test_configure_https(self):
self.CONFIGS.complete_contexts.return_value = ['https']
self.relation_ids.side_effect = self._fake_relids
_id_rel_joined = self.patch('identity_joined')
hooks.configure_https()
self.check_call.assert_called_with(['a2ensite',
'openstack_https_frontend'])
self.assertTrue(_id_rel_joined.called)
def test_configure_https_nohttps(self):
self.CONFIGS.complete_contexts.return_value = []
self.relation_ids.side_effect = self._fake_relids
_id_rel_joined = self.patch('identity_joined')
hooks.configure_https()
self.check_call.assert_called_with(['a2dissite',
'openstack_https_frontend'])
self.assertTrue(_id_rel_joined.called)
def test_conditional_neutron_migration_leader(self):
self.test_relation.set({
'allowed_units': 'neutron-api/0 neutron-api/1 neutron-api/4',
})
self.local_unit.return_value = 'neutron-api/1'
self.is_elected_leader.return_value = True
self.os_release.return_value = 'kilo'
hooks.conditional_neutron_migration()
self.migrate_neutron_database.assert_called()
def test_conditional_neutron_migration_leader_icehouse(self):
self.test_relation.set({
'allowed_units': 'neutron-api/0 neutron-api/1 neutron-api/4',
})
self.local_unit.return_value = 'neutron-api/1'
self.is_elected_leader.return_value = True
self.os_release.return_value = 'icehouse'
hooks.conditional_neutron_migration()
self.assertFalse(self.migrate_neutron_database.called)
def test_conditional_neutron_migration_notleader(self):
self.is_elected_leader.return_value = False
self.os_release.return_value = 'icehouse'
hooks.conditional_neutron_migration()
self.assertFalse(self.migrate_neutron_database.called)
def test_etcd_peer_joined(self):
self._call_hook('etcd-proxy-relation-joined')
self.assertTrue(self.CONFIGS.register.called)
self.CONFIGS.write.assert_any_call('/etc/init/etcd.conf')
self.CONFIGS.write.assert_any_call('/etc/default/etcd')
def test_designate_peer_joined(self):
self.test_relation.set({
'endpoint': 'http://1.2.3.4:9001',
})
self.relation_ids.side_effect = self._fake_relids
self._call_hook('external-dns-relation-joined')
self.assertTrue(self.CONFIGS.write.called_with(NEUTRON_CONF))
def test_designate_peer_departed(self):
self._call_hook('external-dns-relation-departed')
self.assertTrue(self.CONFIGS.write.called_with(NEUTRON_CONF))
def test_infoblox_peer_changed(self):
self.is_db_initialised.return_value = True
self.test_relation.set({
'dc_id': '0',
})
self.os_release.return_value = 'queens'
self.relation_ids.side_effect = self._fake_relids
self._call_hook('infoblox-neutron-relation-changed')
self.assertTrue(self.CONFIGS.write.called_with(NEUTRON_CONF))
def test_infoblox_peer_departed(self):
self._call_hook('infoblox-neutron-relation-departed')
self.assertTrue(self.CONFIGS.write.called_with(NEUTRON_CONF))
@patch.object(hooks, 'NeutronApiSDNContext')
@patch.object(hooks, 'NeutronCCContext')
@patch.object(hooks, 'manage_plugin')
def test_neutron_plugin_api_subordinate_relation(
self, _manage_plugin, _NeutronCCContext, _NeutronApiSDNContext):
_manage_plugin.return_value = True
self._call_hook('neutron-plugin-api-subordinate-relation-joined')
self.relation_set.assert_called_once_with(
**{'neutron-api-ready': 'no'}, relation_id=None)
self.CONFIGS.write_all.assert_called_once_with()
self.relation_set.reset_mock()
self.CONFIGS.reset_mock()
_manage_plugin.return_value = False
ncc_instance = _NeutronCCContext.return_value
ncc_instance.return_value = {'core_plugin': 'aPlugin'}
napisdn_instance = _NeutronApiSDNContext.return_value
napisdn_instance.is_allowed.return_value = True
self._call_hook('neutron-plugin-api-subordinate-relation-changed')
self.relation_set.assert_called_once_with(
**{'neutron-api-ready': 'no',
'neutron_config_data': json.dumps({'core_plugin': 'aPlugin'})},
relation_id=None)
self.CONFIGS.write_all.assert_called_once_with()
@patch.object(hooks, 'manage_plugin')
@patch.object(hooks, 'is_api_ready')
@patch.object(hooks, 'leader_set')
@patch.object(hooks, 'migrate_neutron_database')
@patch.object(hooks, 'leader_get')
@patch.object(hooks, 'relation_id')
@patch.object(hooks, 'is_db_initialised')
def test_neutron_plugin_api_subordinate_relation_db_migration(
self, _is_db_initialised, _relation_id, _leader_get,
_migrate_neutron_database, _leader_set, _is_api_ready,
_manage_plugin):
_is_db_initialised.return_value = True
_relation_id.return_value = 'neutron-plugin-api-subordinate:42'
self.related_units.return_value = ['aUnit']
self.is_leader.return_value = True
self.relation_get.side_effect = None
self.relation_get.return_value = 'fake-uuid'
_leader_get.return_value = (
'migrate-database-nonce-neutron-plugin-api-subordinate:42')
_is_api_ready.return_value = True
_manage_plugin.return_value = True
self._call_hook('neutron-plugin-api-subordinate-relation-changed')
_migrate_neutron_database.assert_called_once_with(upgrade=True)
self.relation_set.assert_called_once_with(
relation_id='neutron-plugin-api-subordinate:42',
**{'migrate-database-nonce': 'fake-uuid',
'neutron-api-ready': 'yes'})