diff --git a/neutron/tests/unit/agent/l3/test_l3_router.py b/neutron/tests/unit/agent/l3/test_l3_router.py deleted file mode 100644 index 27d5b8030cd..00000000000 --- a/neutron/tests/unit/agent/l3/test_l3_router.py +++ /dev/null @@ -1,226 +0,0 @@ -# Copyright (c) 2015 Openstack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import mock - -from neutron.agent.l3 import router_info -from neutron.agent.linux import ip_lib -from neutron.common import constants as l3_constants -from neutron.common import exceptions as n_exc -from neutron.openstack.common import uuidutils -from neutron.tests import base - -_uuid = uuidutils.generate_uuid - - -class BasicRouterTestCaseFramework(base.BaseTestCase): - def _create_router(self, router=None, **kwargs): - if not router: - router = mock.MagicMock() - self.agent_conf = mock.Mock() - # NOTE The use_namespaces config will soon be deprecated - self.agent_conf.use_namespaces = True - self.router_id = _uuid() - return router_info.RouterInfo(self.router_id, - router, - self.agent_conf, - mock.sentinel.interface_driver, - **kwargs) - - -class TestBasicRouterOperations(BasicRouterTestCaseFramework): - - def test_get_floating_ips(self): - router = mock.MagicMock() - router.get.return_value = [mock.sentinel.floating_ip] - ri = self._create_router(router) - - fips = ri.get_floating_ips() - - self.assertEqual([mock.sentinel.floating_ip], fips) - - def test_process_floating_ip_nat_rules(self): - ri = self._create_router() - fips = [{'fixed_ip_address': mock.sentinel.ip, - 'floating_ip_address': mock.sentinel.fip}] - ri.get_floating_ips = mock.Mock(return_value=fips) - ri.iptables_manager = mock.MagicMock() - ipv4_nat = ri.iptables_manager.ipv4['nat'] - ri.floating_forward_rules = mock.Mock( - return_value=[(mock.sentinel.chain, mock.sentinel.rule)]) - - ri.process_floating_ip_nat_rules() - - # Be sure that the rules are cleared first and apply is called last - self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'), - ipv4_nat.mock_calls[0]) - self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1]) - - # Be sure that add_rule is called somewhere in the middle - ipv4_nat.add_rule.assert_called_once_with(mock.sentinel.chain, - mock.sentinel.rule, - tag='floating_ip') - - def test_process_floating_ip_nat_rules_removed(self): - ri = self._create_router() - ri.get_floating_ips = mock.Mock(return_value=[]) - ri.iptables_manager = mock.MagicMock() - ipv4_nat = ri.iptables_manager.ipv4['nat'] - - ri.process_floating_ip_nat_rules() - - # Be sure that the rules are cleared first and apply is called last - self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'), - ipv4_nat.mock_calls[0]) - self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1]) - - # Be sure that add_rule is called somewhere in the middle - self.assertFalse(ipv4_nat.add_rule.called) - - def _test_add_fip_addr_to_device_error(self, device): - ri = self._create_router() - ip = '15.1.2.3' - - result = ri._add_fip_addr_to_device( - {'id': mock.sentinel.id, 'floating_ip_address': ip}, device) - - device.addr.add.assert_called_with(ip + '/32') - return result - - def test__add_fip_addr_to_device(self): - result = self._test_add_fip_addr_to_device_error(mock.Mock()) - self.assertTrue(result) - - def test__add_fip_addr_to_device_error(self): - device = mock.Mock() - device.addr.add.side_effect = RuntimeError - result = self._test_add_fip_addr_to_device_error(device) - self.assertFalse(result) - - def test_process_snat_dnat_for_fip(self): - ri = self._create_router() - ri.process_floating_ip_nat_rules = mock.Mock(side_effect=Exception) - - self.assertRaises(n_exc.FloatingIpSetupException, - ri.process_snat_dnat_for_fip) - - ri.process_floating_ip_nat_rules.assert_called_once_with() - - def test_put_fips_in_error_state(self): - ri = self._create_router() - ri.router = mock.Mock() - ri.router.get.return_value = [{'id': mock.sentinel.id1}, - {'id': mock.sentinel.id2}] - - statuses = ri.put_fips_in_error_state() - - expected = [{mock.sentinel.id1: l3_constants.FLOATINGIP_STATUS_ERROR, - mock.sentinel.id2: l3_constants.FLOATINGIP_STATUS_ERROR}] - self.assertNotEqual(expected, statuses) - - def test_configure_fip_addresses(self): - ri = self._create_router() - ri.process_floating_ip_addresses = mock.Mock( - side_effect=Exception) - - self.assertRaises(n_exc.FloatingIpSetupException, - ri.configure_fip_addresses, - mock.sentinel.interface_name) - - ri.process_floating_ip_addresses.assert_called_once_with( - mock.sentinel.interface_name) - - def test_get_router_cidrs_returns_cidrs(self): - ri = self._create_router() - addresses = ['15.1.2.2/24', '15.1.2.3/32'] - device = mock.MagicMock() - device.addr.list.return_value = [{'cidr': addresses[0]}, - {'cidr': addresses[1]}] - self.assertEqual(set(addresses), ri.get_router_cidrs(device)) - - -@mock.patch.object(ip_lib, 'IPDevice') -class TestFloatingIpWithMockDevice(BasicRouterTestCaseFramework): - - def test_process_floating_ip_addresses_remap(self, IPDevice): - fip_id = _uuid() - fip = { - 'id': fip_id, 'port_id': _uuid(), - 'floating_ip_address': '15.1.2.3', - 'fixed_ip_address': '192.168.0.2' - } - - IPDevice.return_value = device = mock.Mock() - device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}] - ri = self._create_router() - ri.get_floating_ips = mock.Mock(return_value=[fip]) - - fip_statuses = ri.process_floating_ip_addresses( - mock.sentinel.interface_name) - self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ACTIVE}, - fip_statuses) - - self.assertFalse(device.addr.add.called) - self.assertFalse(device.addr.delete.called) - - def test_process_router_with_disabled_floating_ip(self, IPDevice): - fip_id = _uuid() - fip = { - 'id': fip_id, 'port_id': _uuid(), - 'floating_ip_address': '15.1.2.3', - 'fixed_ip_address': '192.168.0.2' - } - - ri = self._create_router() - ri.floating_ips = [fip] - ri.get_floating_ips = mock.Mock(return_value=[]) - - fip_statuses = ri.process_floating_ip_addresses( - mock.sentinel.interface_name) - - self.assertIsNone(fip_statuses.get(fip_id)) - - def test_process_router_floating_ip_with_device_add_error(self, IPDevice): - IPDevice.return_value = device = mock.Mock(side_effect=RuntimeError) - device.addr.list.return_value = [] - fip_id = _uuid() - fip = { - 'id': fip_id, 'port_id': _uuid(), - 'floating_ip_address': '15.1.2.3', - 'fixed_ip_address': '192.168.0.2' - } - ri = self._create_router() - ri.add_floating_ip = mock.Mock( - return_value=l3_constants.FLOATINGIP_STATUS_ERROR) - ri.get_floating_ips = mock.Mock(return_value=[fip]) - - fip_statuses = ri.process_floating_ip_addresses( - mock.sentinel.interface_name) - - self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ERROR}, - fip_statuses) - - # TODO(mrsmith): refactor for DVR cases - def test_process_floating_ip_addresses_remove(self, IPDevice): - IPDevice.return_value = device = mock.Mock() - device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}] - - ri = self._create_router() - ri.remove_floating_ip = mock.Mock() - ri.router.get = mock.Mock(return_value=[]) - - fip_statuses = ri.process_floating_ip_addresses( - mock.sentinel.interface_name) - self.assertEqual({}, fip_statuses) - ri.remove_floating_ip.assert_called_once_with(device, '15.1.2.3/32') diff --git a/neutron/tests/unit/agent/l3/test_router_info.py b/neutron/tests/unit/agent/l3/test_router_info.py index acb764d373c..04aa55748c3 100644 --- a/neutron/tests/unit/agent/l3/test_router_info.py +++ b/neutron/tests/unit/agent/l3/test_router_info.py @@ -14,6 +14,9 @@ import mock from neutron.agent.common import config as agent_config from neutron.agent.l3 import router_info +from neutron.agent.linux import ip_lib +from neutron.common import constants as l3_constants +from neutron.common import exceptions as n_exc from neutron.openstack.common import uuidutils from neutron.tests import base @@ -104,3 +107,205 @@ class TestRouterInfo(base.BaseTestCase): expected = [['ip', 'route', 'delete', 'to', '110.100.30.0/24', 'via', '10.100.10.30']] self._check_agent_method_called(expected) + + +class BasicRouterTestCaseFramework(base.BaseTestCase): + def _create_router(self, router=None, **kwargs): + if not router: + router = mock.MagicMock() + self.agent_conf = mock.Mock() + # NOTE The use_namespaces config will soon be deprecated + self.agent_conf.use_namespaces = True + self.router_id = _uuid() + return router_info.RouterInfo(self.router_id, + router, + self.agent_conf, + mock.sentinel.interface_driver, + **kwargs) + + +class TestBasicRouterOperations(BasicRouterTestCaseFramework): + + def test_get_floating_ips(self): + router = mock.MagicMock() + router.get.return_value = [mock.sentinel.floating_ip] + ri = self._create_router(router) + + fips = ri.get_floating_ips() + + self.assertEqual([mock.sentinel.floating_ip], fips) + + def test_process_floating_ip_nat_rules(self): + ri = self._create_router() + fips = [{'fixed_ip_address': mock.sentinel.ip, + 'floating_ip_address': mock.sentinel.fip}] + ri.get_floating_ips = mock.Mock(return_value=fips) + ri.iptables_manager = mock.MagicMock() + ipv4_nat = ri.iptables_manager.ipv4['nat'] + ri.floating_forward_rules = mock.Mock( + return_value=[(mock.sentinel.chain, mock.sentinel.rule)]) + + ri.process_floating_ip_nat_rules() + + # Be sure that the rules are cleared first and apply is called last + self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'), + ipv4_nat.mock_calls[0]) + self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1]) + + # Be sure that add_rule is called somewhere in the middle + ipv4_nat.add_rule.assert_called_once_with(mock.sentinel.chain, + mock.sentinel.rule, + tag='floating_ip') + + def test_process_floating_ip_nat_rules_removed(self): + ri = self._create_router() + ri.get_floating_ips = mock.Mock(return_value=[]) + ri.iptables_manager = mock.MagicMock() + ipv4_nat = ri.iptables_manager.ipv4['nat'] + + ri.process_floating_ip_nat_rules() + + # Be sure that the rules are cleared first and apply is called last + self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'), + ipv4_nat.mock_calls[0]) + self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1]) + + # Be sure that add_rule is called somewhere in the middle + self.assertFalse(ipv4_nat.add_rule.called) + + def _test_add_fip_addr_to_device_error(self, device): + ri = self._create_router() + ip = '15.1.2.3' + + result = ri._add_fip_addr_to_device( + {'id': mock.sentinel.id, 'floating_ip_address': ip}, device) + + device.addr.add.assert_called_with(ip + '/32') + return result + + def test__add_fip_addr_to_device(self): + result = self._test_add_fip_addr_to_device_error(mock.Mock()) + self.assertTrue(result) + + def test__add_fip_addr_to_device_error(self): + device = mock.Mock() + device.addr.add.side_effect = RuntimeError + result = self._test_add_fip_addr_to_device_error(device) + self.assertFalse(result) + + def test_process_snat_dnat_for_fip(self): + ri = self._create_router() + ri.process_floating_ip_nat_rules = mock.Mock(side_effect=Exception) + + self.assertRaises(n_exc.FloatingIpSetupException, + ri.process_snat_dnat_for_fip) + + ri.process_floating_ip_nat_rules.assert_called_once_with() + + def test_put_fips_in_error_state(self): + ri = self._create_router() + ri.router = mock.Mock() + ri.router.get.return_value = [{'id': mock.sentinel.id1}, + {'id': mock.sentinel.id2}] + + statuses = ri.put_fips_in_error_state() + + expected = [{mock.sentinel.id1: l3_constants.FLOATINGIP_STATUS_ERROR, + mock.sentinel.id2: l3_constants.FLOATINGIP_STATUS_ERROR}] + self.assertNotEqual(expected, statuses) + + def test_configure_fip_addresses(self): + ri = self._create_router() + ri.process_floating_ip_addresses = mock.Mock( + side_effect=Exception) + + self.assertRaises(n_exc.FloatingIpSetupException, + ri.configure_fip_addresses, + mock.sentinel.interface_name) + + ri.process_floating_ip_addresses.assert_called_once_with( + mock.sentinel.interface_name) + + def test_get_router_cidrs_returns_cidrs(self): + ri = self._create_router() + addresses = ['15.1.2.2/24', '15.1.2.3/32'] + device = mock.MagicMock() + device.addr.list.return_value = [{'cidr': addresses[0]}, + {'cidr': addresses[1]}] + self.assertEqual(set(addresses), ri.get_router_cidrs(device)) + + +@mock.patch.object(ip_lib, 'IPDevice') +class TestFloatingIpWithMockDevice(BasicRouterTestCaseFramework): + + def test_process_floating_ip_addresses_remap(self, IPDevice): + fip_id = _uuid() + fip = { + 'id': fip_id, 'port_id': _uuid(), + 'floating_ip_address': '15.1.2.3', + 'fixed_ip_address': '192.168.0.2' + } + + IPDevice.return_value = device = mock.Mock() + device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}] + ri = self._create_router() + ri.get_floating_ips = mock.Mock(return_value=[fip]) + + fip_statuses = ri.process_floating_ip_addresses( + mock.sentinel.interface_name) + self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ACTIVE}, + fip_statuses) + + self.assertFalse(device.addr.add.called) + self.assertFalse(device.addr.delete.called) + + def test_process_router_with_disabled_floating_ip(self, IPDevice): + fip_id = _uuid() + fip = { + 'id': fip_id, 'port_id': _uuid(), + 'floating_ip_address': '15.1.2.3', + 'fixed_ip_address': '192.168.0.2' + } + + ri = self._create_router() + ri.floating_ips = [fip] + ri.get_floating_ips = mock.Mock(return_value=[]) + + fip_statuses = ri.process_floating_ip_addresses( + mock.sentinel.interface_name) + + self.assertIsNone(fip_statuses.get(fip_id)) + + def test_process_router_floating_ip_with_device_add_error(self, IPDevice): + IPDevice.return_value = device = mock.Mock(side_effect=RuntimeError) + device.addr.list.return_value = [] + fip_id = _uuid() + fip = { + 'id': fip_id, 'port_id': _uuid(), + 'floating_ip_address': '15.1.2.3', + 'fixed_ip_address': '192.168.0.2' + } + ri = self._create_router() + ri.add_floating_ip = mock.Mock( + return_value=l3_constants.FLOATINGIP_STATUS_ERROR) + ri.get_floating_ips = mock.Mock(return_value=[fip]) + + fip_statuses = ri.process_floating_ip_addresses( + mock.sentinel.interface_name) + + self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ERROR}, + fip_statuses) + + # TODO(mrsmith): refactor for DVR cases + def test_process_floating_ip_addresses_remove(self, IPDevice): + IPDevice.return_value = device = mock.Mock() + device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}] + + ri = self._create_router() + ri.remove_floating_ip = mock.Mock() + ri.router.get = mock.Mock(return_value=[]) + + fip_statuses = ri.process_floating_ip_addresses( + mock.sentinel.interface_name) + self.assertEqual({}, fip_statuses) + ri.remove_floating_ip.assert_called_once_with(device, '15.1.2.3/32') diff --git a/neutron/tests/unit/agent/linux/test_process_monitor.py b/neutron/tests/unit/agent/linux/test_process_monitor.py deleted file mode 100644 index fe4093892f3..00000000000 --- a/neutron/tests/unit/agent/linux/test_process_monitor.py +++ /dev/null @@ -1,95 +0,0 @@ -# Copyright 2014 Red Hat Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock - -from neutron.agent.linux import external_process -from neutron.tests import base - -TEST_UUID = 'test-uuid' -TEST_SERVICE = 'testsvc' -TEST_PID = 1234 - - -class BaseTestProcessMonitor(base.BaseTestCase): - - def setUp(self): - super(BaseTestProcessMonitor, self).setUp() - self.log_patch = mock.patch("neutron.agent.linux.external_process." - "LOG.error") - self.error_log = self.log_patch.start() - - self.spawn_patch = mock.patch("eventlet.spawn") - self.eventlent_spawn = self.spawn_patch.start() - - # create a default process monitor - self.create_child_process_monitor('respawn') - - def create_child_process_monitor(self, action): - conf = mock.Mock() - conf.AGENT.check_child_processes_action = action - conf.AGENT.check_child_processes = True - self.pmonitor = external_process.ProcessMonitor( - config=conf, - resource_type='test') - - def get_monitored_process(self, uuid, service=None): - monitored_process = mock.Mock() - self.pmonitor.register(uuid=uuid, - service_name=service, - monitored_process=monitored_process) - return monitored_process - - -class TestProcessMonitor(BaseTestProcessMonitor): - - def test_error_logged(self): - pm = self.get_monitored_process(TEST_UUID) - pm.active = False - self.pmonitor._check_child_processes() - self.assertTrue(self.error_log.called) - - def test_exit_handler(self): - self.create_child_process_monitor('exit') - pm = self.get_monitored_process(TEST_UUID) - pm.active = False - with mock.patch.object(external_process.ProcessMonitor, - '_exit_handler') as exit_handler: - self.pmonitor._check_child_processes() - exit_handler.assert_called_once_with(TEST_UUID, None) - - def test_register(self): - pm = self.get_monitored_process(TEST_UUID) - self.assertEqual(len(self.pmonitor._monitored_processes), 1) - self.assertIn(pm, self.pmonitor._monitored_processes.values()) - - def test_register_same_service_twice(self): - self.get_monitored_process(TEST_UUID) - self.get_monitored_process(TEST_UUID) - self.assertEqual(len(self.pmonitor._monitored_processes), 1) - - def test_register_different_service_types(self): - self.get_monitored_process(TEST_UUID) - self.get_monitored_process(TEST_UUID, TEST_SERVICE) - self.assertEqual(len(self.pmonitor._monitored_processes), 2) - - def test_unregister(self): - self.get_monitored_process(TEST_UUID) - self.pmonitor.unregister(TEST_UUID, None) - self.assertEqual(len(self.pmonitor._monitored_processes), 0) - - def test_unregister_unknown_process(self): - self.pmonitor.unregister(TEST_UUID, None) - self.assertEqual(len(self.pmonitor._monitored_processes), 0) diff --git a/neutron/tests/unit/test_api_v2.py b/neutron/tests/unit/test_api_v2.py index dd98d9046f5..c4b706bd5ab 100644 --- a/neutron/tests/unit/test_api_v2.py +++ b/neutron/tests/unit/test_api_v2.py @@ -40,8 +40,7 @@ from neutron.tests import fake_notifier from neutron.tests.unit import testlib_api -ROOTDIR = os.path.dirname(os.path.dirname(__file__)) -EXTDIR = os.path.join(ROOTDIR, 'unit/extensions') +EXTDIR = os.path.join(base.ROOTDIR, 'unit/extensions') _uuid = uuidutils.generate_uuid diff --git a/neutron/tests/unit/test_db_plugin.py b/neutron/tests/unit/test_db_plugin.py index 26b62202c57..8a5393b4939 100644 --- a/neutron/tests/unit/test_db_plugin.py +++ b/neutron/tests/unit/test_db_plugin.py @@ -5452,3 +5452,63 @@ class NeutronDbPluginV2AsMixinTestCase(testlib_api.SqlTestCase): self.net_data['network']['status'] = 'BUILD' net = self.plugin.create_network(self.context, self.net_data) self.assertEqual(net['status'], 'BUILD') + + +class TestNetworks(testlib_api.SqlTestCase): + def setUp(self): + super(TestNetworks, self).setUp() + self._tenant_id = 'test-tenant' + + # Update the plugin + self.setup_coreplugin(DB_PLUGIN_KLASS) + + def _create_network(self, plugin, ctx, shared=True): + network = {'network': {'name': 'net', + 'shared': shared, + 'admin_state_up': True, + 'tenant_id': self._tenant_id}} + created_network = plugin.create_network(ctx, network) + return (network, created_network['id']) + + def _create_port(self, plugin, ctx, net_id, device_owner, tenant_id): + port = {'port': {'name': 'port', + 'network_id': net_id, + 'mac_address': attributes.ATTR_NOT_SPECIFIED, + 'fixed_ips': attributes.ATTR_NOT_SPECIFIED, + 'admin_state_up': True, + 'device_id': 'device_id', + 'device_owner': device_owner, + 'tenant_id': tenant_id}} + plugin.create_port(ctx, port) + + def _test_update_shared_net_used(self, + device_owner, + expected_exception=None): + plugin = manager.NeutronManager.get_plugin() + ctx = context.get_admin_context() + network, net_id = self._create_network(plugin, ctx) + + self._create_port(plugin, + ctx, + net_id, + device_owner, + self._tenant_id + '1') + + network['network']['shared'] = False + + if (expected_exception): + with testlib_api.ExpectedException(expected_exception): + plugin.update_network(ctx, net_id, network) + else: + plugin.update_network(ctx, net_id, network) + + def test_update_shared_net_used_fails(self): + self._test_update_shared_net_used('', n_exc.InvalidSharedSetting) + + def test_update_shared_net_used_as_router_gateway(self): + self._test_update_shared_net_used( + constants.DEVICE_OWNER_ROUTER_GW) + + def test_update_shared_net_used_by_floating_ip(self): + self._test_update_shared_net_used( + constants.DEVICE_OWNER_FLOATINGIP) diff --git a/neutron/tests/unit/test_db_plugin_level.py b/neutron/tests/unit/test_db_plugin_level.py deleted file mode 100644 index a0ebbb5e66e..00000000000 --- a/neutron/tests/unit/test_db_plugin_level.py +++ /dev/null @@ -1,82 +0,0 @@ -# Copyright (c) 2014 Red Hat, Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from neutron.api.v2 import attributes -from neutron.common import constants -from neutron.common import exceptions as n_exc -from neutron import context -from neutron import manager -from neutron.tests.unit import test_db_plugin -from neutron.tests.unit import testlib_api - - -class TestNetworks(testlib_api.SqlTestCase): - def setUp(self): - super(TestNetworks, self).setUp() - self._tenant_id = 'test-tenant' - - # Update the plugin - self.setup_coreplugin(test_db_plugin.DB_PLUGIN_KLASS) - - def _create_network(self, plugin, ctx, shared=True): - network = {'network': {'name': 'net', - 'shared': shared, - 'admin_state_up': True, - 'tenant_id': self._tenant_id}} - created_network = plugin.create_network(ctx, network) - return (network, created_network['id']) - - def _create_port(self, plugin, ctx, net_id, device_owner, tenant_id): - port = {'port': {'name': 'port', - 'network_id': net_id, - 'mac_address': attributes.ATTR_NOT_SPECIFIED, - 'fixed_ips': attributes.ATTR_NOT_SPECIFIED, - 'admin_state_up': True, - 'device_id': 'device_id', - 'device_owner': device_owner, - 'tenant_id': tenant_id}} - plugin.create_port(ctx, port) - - def _test_update_shared_net_used(self, - device_owner, - expected_exception=None): - plugin = manager.NeutronManager.get_plugin() - ctx = context.get_admin_context() - network, net_id = self._create_network(plugin, ctx) - - self._create_port(plugin, - ctx, - net_id, - device_owner, - self._tenant_id + '1') - - network['network']['shared'] = False - - if (expected_exception): - with testlib_api.ExpectedException(expected_exception): - plugin.update_network(ctx, net_id, network) - else: - plugin.update_network(ctx, net_id, network) - - def test_update_shared_net_used_fails(self): - self._test_update_shared_net_used('', n_exc.InvalidSharedSetting) - - def test_update_shared_net_used_as_router_gateway(self): - self._test_update_shared_net_used( - constants.DEVICE_OWNER_ROUTER_GW) - - def test_update_shared_net_used_by_floating_ip(self): - self._test_update_shared_net_used( - constants.DEVICE_OWNER_FLOATINGIP) diff --git a/neutron/tests/unit/test_extension_extended_attribute.py b/neutron/tests/unit/test_extension_extended_attribute.py deleted file mode 100644 index 7ff48c837f0..00000000000 --- a/neutron/tests/unit/test_extension_extended_attribute.py +++ /dev/null @@ -1,154 +0,0 @@ -# Copyright 2013 VMware, Inc -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Unit tests for extension extended attribute -""" - -from oslo_config import cfg -import webob.exc as webexc - -import neutron -from neutron.api import extensions -from neutron.api.v2 import attributes -from neutron.common import config -from neutron import manager -from neutron.plugins.common import constants -from neutron.plugins.ml2 import plugin as ml2_plugin -from neutron import quota -from neutron.tests import base -from neutron.tests.unit.extensions import extendedattribute as extattr -from neutron.tests.unit import test_api_v2 -from neutron.tests.unit import testlib_api -from neutron import wsgi - -_uuid = test_api_v2._uuid -_get_path = test_api_v2._get_path -extensions_path = ':'.join(neutron.tests.unit.extensions.__path__) - - -class ExtensionExtendedAttributeTestPlugin( - ml2_plugin.Ml2Plugin): - - supported_extension_aliases = [ - 'ext-obj-test', "extended-ext-attr" - ] - - def __init__(self, configfile=None): - super(ExtensionExtendedAttributeTestPlugin, self) - self.objs = [] - self.objh = {} - - def create_ext_test_resource(self, context, ext_test_resource): - obj = ext_test_resource['ext_test_resource'] - id = _uuid() - obj['id'] = id - self.objs.append(obj) - self.objh.update({id: obj}) - return obj - - def get_ext_test_resources(self, context, filters=None, fields=None): - return self.objs - - def get_ext_test_resource(self, context, id, fields=None): - return self.objh[id] - - -class ExtensionExtendedAttributeTestCase(base.BaseTestCase): - def setUp(self): - super(ExtensionExtendedAttributeTestCase, self).setUp() - plugin = ( - "neutron.tests.unit.test_extension_extended_attribute." - "ExtensionExtendedAttributeTestPlugin" - ) - - # point config file to: neutron/tests/etc/neutron.conf.test - self.config_parse() - - self.setup_coreplugin(plugin) - - ext_mgr = extensions.PluginAwareExtensionManager( - extensions_path, - {constants.CORE: ExtensionExtendedAttributeTestPlugin} - ) - ext_mgr.extend_resources("2.0", {}) - extensions.PluginAwareExtensionManager._instance = ext_mgr - - app = config.load_paste_app('extensions_test_app') - self._api = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr) - - self._tenant_id = "8c70909f-b081-452d-872b-df48e6c355d1" - # Save the global RESOURCE_ATTRIBUTE_MAP - self.saved_attr_map = {} - for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems(): - self.saved_attr_map[resource] = attrs.copy() - # Add the resources to the global attribute map - # This is done here as the setup process won't - # initialize the main API router which extends - # the global attribute map - attributes.RESOURCE_ATTRIBUTE_MAP.update( - extattr.EXTENDED_ATTRIBUTES_2_0) - self.agentscheduler_dbMinxin = manager.NeutronManager.get_plugin() - self.addCleanup(self.restore_attribute_map) - - quota.QUOTAS._driver = None - cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver', - group='QUOTAS') - - def restore_attribute_map(self): - # Restore the original RESOURCE_ATTRIBUTE_MAP - attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map - - def _do_request(self, method, path, data=None, params=None, action=None): - content_type = 'application/json' - body = None - if data is not None: # empty dict is valid - body = wsgi.Serializer().serialize(data, content_type) - - req = testlib_api.create_request( - path, body, content_type, - method, query_string=params) - res = req.get_response(self._api) - if res.status_code >= 400: - raise webexc.HTTPClientError(detail=res.body, code=res.status_code) - if res.status_code != webexc.HTTPNoContent.code: - return res.json - - def _ext_test_resource_create(self, attr=None): - data = { - "ext_test_resource": { - "tenant_id": self._tenant_id, - "name": "test", - extattr.EXTENDED_ATTRIBUTE: attr - } - } - - res = self._do_request('POST', _get_path('ext_test_resources'), data) - return res['ext_test_resource'] - - def test_ext_test_resource_create(self): - ext_test_resource = self._ext_test_resource_create() - attr = _uuid() - ext_test_resource = self._ext_test_resource_create(attr) - self.assertEqual(ext_test_resource[extattr.EXTENDED_ATTRIBUTE], attr) - - def test_ext_test_resource_get(self): - attr = _uuid() - obj = self._ext_test_resource_create(attr) - obj_id = obj['id'] - res = self._do_request('GET', _get_path( - 'ext_test_resources/{0}'.format(obj_id))) - obj2 = res['ext_test_resource'] - self.assertEqual(obj2[extattr.EXTENDED_ATTRIBUTE], attr) diff --git a/neutron/tests/unit/test_extensions.py b/neutron/tests/unit/test_extensions.py index e1407299334..ae533032b03 100644 --- a/neutron/tests/unit/test_extensions.py +++ b/neutron/tests/unit/test_extensions.py @@ -16,25 +16,36 @@ import abc import mock +from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils import routes import webob +import webob.exc as webexc import webtest +import neutron from neutron.api import extensions +from neutron.api.v2 import attributes from neutron.common import config from neutron.common import exceptions from neutron.db import db_base_plugin_v2 +from neutron import manager from neutron.plugins.common import constants +from neutron.plugins.ml2 import plugin as ml2_plugin +from neutron import quota from neutron.tests import base from neutron.tests.unit import extension_stubs as ext_stubs import neutron.tests.unit.extensions +from neutron.tests.unit.extensions import extendedattribute as extattr +from neutron.tests.unit import test_api_v2 from neutron.tests.unit import testlib_api from neutron import wsgi LOG = logging.getLogger(__name__) +_uuid = test_api_v2._uuid +_get_path = test_api_v2._get_path extensions_path = ':'.join(neutron.tests.unit.extensions.__path__) @@ -677,3 +688,118 @@ class SimpleExtensionManager(object): if self.request_ext: request_extensions.append(self.request_ext) return request_extensions + + +class ExtensionExtendedAttributeTestPlugin( + ml2_plugin.Ml2Plugin): + + supported_extension_aliases = [ + 'ext-obj-test', "extended-ext-attr" + ] + + def __init__(self, configfile=None): + super(ExtensionExtendedAttributeTestPlugin, self) + self.objs = [] + self.objh = {} + + def create_ext_test_resource(self, context, ext_test_resource): + obj = ext_test_resource['ext_test_resource'] + id = _uuid() + obj['id'] = id + self.objs.append(obj) + self.objh.update({id: obj}) + return obj + + def get_ext_test_resources(self, context, filters=None, fields=None): + return self.objs + + def get_ext_test_resource(self, context, id, fields=None): + return self.objh[id] + + +class ExtensionExtendedAttributeTestCase(base.BaseTestCase): + def setUp(self): + super(ExtensionExtendedAttributeTestCase, self).setUp() + plugin = ( + "neutron.tests.unit.test_extensions." + "ExtensionExtendedAttributeTestPlugin" + ) + + # point config file to: neutron/tests/etc/neutron.conf.test + self.config_parse() + + self.setup_coreplugin(plugin) + + ext_mgr = extensions.PluginAwareExtensionManager( + extensions_path, + {constants.CORE: ExtensionExtendedAttributeTestPlugin} + ) + ext_mgr.extend_resources("2.0", {}) + extensions.PluginAwareExtensionManager._instance = ext_mgr + + app = config.load_paste_app('extensions_test_app') + self._api = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr) + + self._tenant_id = "8c70909f-b081-452d-872b-df48e6c355d1" + # Save the global RESOURCE_ATTRIBUTE_MAP + self.saved_attr_map = {} + for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems(): + self.saved_attr_map[resource] = attrs.copy() + # Add the resources to the global attribute map + # This is done here as the setup process won't + # initialize the main API router which extends + # the global attribute map + attributes.RESOURCE_ATTRIBUTE_MAP.update( + extattr.EXTENDED_ATTRIBUTES_2_0) + self.agentscheduler_dbMinxin = manager.NeutronManager.get_plugin() + self.addCleanup(self.restore_attribute_map) + + quota.QUOTAS._driver = None + cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver', + group='QUOTAS') + + def restore_attribute_map(self): + # Restore the original RESOURCE_ATTRIBUTE_MAP + attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map + + def _do_request(self, method, path, data=None, params=None, action=None): + content_type = 'application/json' + body = None + if data is not None: # empty dict is valid + body = wsgi.Serializer().serialize(data, content_type) + + req = testlib_api.create_request( + path, body, content_type, + method, query_string=params) + res = req.get_response(self._api) + if res.status_code >= 400: + raise webexc.HTTPClientError(detail=res.body, code=res.status_code) + if res.status_code != webexc.HTTPNoContent.code: + return res.json + + def _ext_test_resource_create(self, attr=None): + data = { + "ext_test_resource": { + "tenant_id": self._tenant_id, + "name": "test", + extattr.EXTENDED_ATTRIBUTE: attr + } + } + + res = self._do_request('POST', _get_path('ext_test_resources'), data) + return res['ext_test_resource'] + + def test_ext_test_resource_create(self): + ext_test_resource = self._ext_test_resource_create() + attr = _uuid() + ext_test_resource = self._ext_test_resource_create(attr) + self.assertEqual(ext_test_resource[extattr.EXTENDED_ATTRIBUTE], attr) + + def test_ext_test_resource_get(self): + attr = _uuid() + obj = self._ext_test_resource_create(attr) + obj_id = obj['id'] + res = self._do_request('GET', _get_path( + 'ext_test_resources/{0}'.format(obj_id))) + obj2 = res['ext_test_resource'] + self.assertEqual(obj2[extattr.EXTENDED_ATTRIBUTE], attr) diff --git a/neutron/tests/unit/test_linux_external_process.py b/neutron/tests/unit/test_linux_external_process.py index 99cd7d8f2f2..cb99da8f73f 100644 --- a/neutron/tests/unit/test_linux_external_process.py +++ b/neutron/tests/unit/test_linux_external_process.py @@ -20,6 +20,83 @@ from neutron.agent.linux import utils from neutron.tests import base +TEST_UUID = 'test-uuid' +TEST_SERVICE = 'testsvc' +TEST_PID = 1234 + + +class BaseTestProcessMonitor(base.BaseTestCase): + + def setUp(self): + super(BaseTestProcessMonitor, self).setUp() + self.log_patch = mock.patch("neutron.agent.linux.external_process." + "LOG.error") + self.error_log = self.log_patch.start() + + self.spawn_patch = mock.patch("eventlet.spawn") + self.eventlent_spawn = self.spawn_patch.start() + + # create a default process monitor + self.create_child_process_monitor('respawn') + + def create_child_process_monitor(self, action): + conf = mock.Mock() + conf.AGENT.check_child_processes_action = action + conf.AGENT.check_child_processes = True + self.pmonitor = ep.ProcessMonitor( + config=conf, + resource_type='test') + + def get_monitored_process(self, uuid, service=None): + monitored_process = mock.Mock() + self.pmonitor.register(uuid=uuid, + service_name=service, + monitored_process=monitored_process) + return monitored_process + + +class TestProcessMonitor(BaseTestProcessMonitor): + + def test_error_logged(self): + pm = self.get_monitored_process(TEST_UUID) + pm.active = False + self.pmonitor._check_child_processes() + self.assertTrue(self.error_log.called) + + def test_exit_handler(self): + self.create_child_process_monitor('exit') + pm = self.get_monitored_process(TEST_UUID) + pm.active = False + with mock.patch.object(ep.ProcessMonitor, + '_exit_handler') as exit_handler: + self.pmonitor._check_child_processes() + exit_handler.assert_called_once_with(TEST_UUID, None) + + def test_register(self): + pm = self.get_monitored_process(TEST_UUID) + self.assertEqual(len(self.pmonitor._monitored_processes), 1) + self.assertIn(pm, self.pmonitor._monitored_processes.values()) + + def test_register_same_service_twice(self): + self.get_monitored_process(TEST_UUID) + self.get_monitored_process(TEST_UUID) + self.assertEqual(len(self.pmonitor._monitored_processes), 1) + + def test_register_different_service_types(self): + self.get_monitored_process(TEST_UUID) + self.get_monitored_process(TEST_UUID, TEST_SERVICE) + self.assertEqual(len(self.pmonitor._monitored_processes), 2) + + def test_unregister(self): + self.get_monitored_process(TEST_UUID) + self.pmonitor.unregister(TEST_UUID, None) + self.assertEqual(len(self.pmonitor._monitored_processes), 0) + + def test_unregister_unknown_process(self): + self.pmonitor.unregister(TEST_UUID, None) + self.assertEqual(len(self.pmonitor._monitored_processes), 0) + + class TestProcessManager(base.BaseTestCase): def setUp(self): super(TestProcessManager, self).setUp()