Merge "Prepare for unit test reorg"

This commit is contained in:
Jenkins 2015-04-06 13:37:14 +00:00 committed by Gerrit Code Review
commit aa943f930d
9 changed files with 469 additions and 559 deletions

View File

@ -1,226 +0,0 @@
# Copyright (c) 2015 Openstack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.agent.l3 import router_info
from neutron.agent.linux import ip_lib
from neutron.common import constants as l3_constants
from neutron.common import exceptions as n_exc
from neutron.openstack.common import uuidutils
from neutron.tests import base
_uuid = uuidutils.generate_uuid
class BasicRouterTestCaseFramework(base.BaseTestCase):
def _create_router(self, router=None, **kwargs):
if not router:
router = mock.MagicMock()
self.agent_conf = mock.Mock()
# NOTE The use_namespaces config will soon be deprecated
self.agent_conf.use_namespaces = True
self.router_id = _uuid()
return router_info.RouterInfo(self.router_id,
router,
self.agent_conf,
mock.sentinel.interface_driver,
**kwargs)
class TestBasicRouterOperations(BasicRouterTestCaseFramework):
def test_get_floating_ips(self):
router = mock.MagicMock()
router.get.return_value = [mock.sentinel.floating_ip]
ri = self._create_router(router)
fips = ri.get_floating_ips()
self.assertEqual([mock.sentinel.floating_ip], fips)
def test_process_floating_ip_nat_rules(self):
ri = self._create_router()
fips = [{'fixed_ip_address': mock.sentinel.ip,
'floating_ip_address': mock.sentinel.fip}]
ri.get_floating_ips = mock.Mock(return_value=fips)
ri.iptables_manager = mock.MagicMock()
ipv4_nat = ri.iptables_manager.ipv4['nat']
ri.floating_forward_rules = mock.Mock(
return_value=[(mock.sentinel.chain, mock.sentinel.rule)])
ri.process_floating_ip_nat_rules()
# Be sure that the rules are cleared first and apply is called last
self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'),
ipv4_nat.mock_calls[0])
self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1])
# Be sure that add_rule is called somewhere in the middle
ipv4_nat.add_rule.assert_called_once_with(mock.sentinel.chain,
mock.sentinel.rule,
tag='floating_ip')
def test_process_floating_ip_nat_rules_removed(self):
ri = self._create_router()
ri.get_floating_ips = mock.Mock(return_value=[])
ri.iptables_manager = mock.MagicMock()
ipv4_nat = ri.iptables_manager.ipv4['nat']
ri.process_floating_ip_nat_rules()
# Be sure that the rules are cleared first and apply is called last
self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'),
ipv4_nat.mock_calls[0])
self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1])
# Be sure that add_rule is called somewhere in the middle
self.assertFalse(ipv4_nat.add_rule.called)
def _test_add_fip_addr_to_device_error(self, device):
ri = self._create_router()
ip = '15.1.2.3'
result = ri._add_fip_addr_to_device(
{'id': mock.sentinel.id, 'floating_ip_address': ip}, device)
device.addr.add.assert_called_with(ip + '/32')
return result
def test__add_fip_addr_to_device(self):
result = self._test_add_fip_addr_to_device_error(mock.Mock())
self.assertTrue(result)
def test__add_fip_addr_to_device_error(self):
device = mock.Mock()
device.addr.add.side_effect = RuntimeError
result = self._test_add_fip_addr_to_device_error(device)
self.assertFalse(result)
def test_process_snat_dnat_for_fip(self):
ri = self._create_router()
ri.process_floating_ip_nat_rules = mock.Mock(side_effect=Exception)
self.assertRaises(n_exc.FloatingIpSetupException,
ri.process_snat_dnat_for_fip)
ri.process_floating_ip_nat_rules.assert_called_once_with()
def test_put_fips_in_error_state(self):
ri = self._create_router()
ri.router = mock.Mock()
ri.router.get.return_value = [{'id': mock.sentinel.id1},
{'id': mock.sentinel.id2}]
statuses = ri.put_fips_in_error_state()
expected = [{mock.sentinel.id1: l3_constants.FLOATINGIP_STATUS_ERROR,
mock.sentinel.id2: l3_constants.FLOATINGIP_STATUS_ERROR}]
self.assertNotEqual(expected, statuses)
def test_configure_fip_addresses(self):
ri = self._create_router()
ri.process_floating_ip_addresses = mock.Mock(
side_effect=Exception)
self.assertRaises(n_exc.FloatingIpSetupException,
ri.configure_fip_addresses,
mock.sentinel.interface_name)
ri.process_floating_ip_addresses.assert_called_once_with(
mock.sentinel.interface_name)
def test_get_router_cidrs_returns_cidrs(self):
ri = self._create_router()
addresses = ['15.1.2.2/24', '15.1.2.3/32']
device = mock.MagicMock()
device.addr.list.return_value = [{'cidr': addresses[0]},
{'cidr': addresses[1]}]
self.assertEqual(set(addresses), ri.get_router_cidrs(device))
@mock.patch.object(ip_lib, 'IPDevice')
class TestFloatingIpWithMockDevice(BasicRouterTestCaseFramework):
def test_process_floating_ip_addresses_remap(self, IPDevice):
fip_id = _uuid()
fip = {
'id': fip_id, 'port_id': _uuid(),
'floating_ip_address': '15.1.2.3',
'fixed_ip_address': '192.168.0.2'
}
IPDevice.return_value = device = mock.Mock()
device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}]
ri = self._create_router()
ri.get_floating_ips = mock.Mock(return_value=[fip])
fip_statuses = ri.process_floating_ip_addresses(
mock.sentinel.interface_name)
self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ACTIVE},
fip_statuses)
self.assertFalse(device.addr.add.called)
self.assertFalse(device.addr.delete.called)
def test_process_router_with_disabled_floating_ip(self, IPDevice):
fip_id = _uuid()
fip = {
'id': fip_id, 'port_id': _uuid(),
'floating_ip_address': '15.1.2.3',
'fixed_ip_address': '192.168.0.2'
}
ri = self._create_router()
ri.floating_ips = [fip]
ri.get_floating_ips = mock.Mock(return_value=[])
fip_statuses = ri.process_floating_ip_addresses(
mock.sentinel.interface_name)
self.assertIsNone(fip_statuses.get(fip_id))
def test_process_router_floating_ip_with_device_add_error(self, IPDevice):
IPDevice.return_value = device = mock.Mock(side_effect=RuntimeError)
device.addr.list.return_value = []
fip_id = _uuid()
fip = {
'id': fip_id, 'port_id': _uuid(),
'floating_ip_address': '15.1.2.3',
'fixed_ip_address': '192.168.0.2'
}
ri = self._create_router()
ri.add_floating_ip = mock.Mock(
return_value=l3_constants.FLOATINGIP_STATUS_ERROR)
ri.get_floating_ips = mock.Mock(return_value=[fip])
fip_statuses = ri.process_floating_ip_addresses(
mock.sentinel.interface_name)
self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ERROR},
fip_statuses)
# TODO(mrsmith): refactor for DVR cases
def test_process_floating_ip_addresses_remove(self, IPDevice):
IPDevice.return_value = device = mock.Mock()
device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}]
ri = self._create_router()
ri.remove_floating_ip = mock.Mock()
ri.router.get = mock.Mock(return_value=[])
fip_statuses = ri.process_floating_ip_addresses(
mock.sentinel.interface_name)
self.assertEqual({}, fip_statuses)
ri.remove_floating_ip.assert_called_once_with(device, '15.1.2.3/32')

View File

@ -14,6 +14,9 @@ import mock
from neutron.agent.common import config as agent_config
from neutron.agent.l3 import router_info
from neutron.agent.linux import ip_lib
from neutron.common import constants as l3_constants
from neutron.common import exceptions as n_exc
from neutron.openstack.common import uuidutils
from neutron.tests import base
@ -104,3 +107,205 @@ class TestRouterInfo(base.BaseTestCase):
expected = [['ip', 'route', 'delete', 'to', '110.100.30.0/24',
'via', '10.100.10.30']]
self._check_agent_method_called(expected)
class BasicRouterTestCaseFramework(base.BaseTestCase):
def _create_router(self, router=None, **kwargs):
if not router:
router = mock.MagicMock()
self.agent_conf = mock.Mock()
# NOTE The use_namespaces config will soon be deprecated
self.agent_conf.use_namespaces = True
self.router_id = _uuid()
return router_info.RouterInfo(self.router_id,
router,
self.agent_conf,
mock.sentinel.interface_driver,
**kwargs)
class TestBasicRouterOperations(BasicRouterTestCaseFramework):
def test_get_floating_ips(self):
router = mock.MagicMock()
router.get.return_value = [mock.sentinel.floating_ip]
ri = self._create_router(router)
fips = ri.get_floating_ips()
self.assertEqual([mock.sentinel.floating_ip], fips)
def test_process_floating_ip_nat_rules(self):
ri = self._create_router()
fips = [{'fixed_ip_address': mock.sentinel.ip,
'floating_ip_address': mock.sentinel.fip}]
ri.get_floating_ips = mock.Mock(return_value=fips)
ri.iptables_manager = mock.MagicMock()
ipv4_nat = ri.iptables_manager.ipv4['nat']
ri.floating_forward_rules = mock.Mock(
return_value=[(mock.sentinel.chain, mock.sentinel.rule)])
ri.process_floating_ip_nat_rules()
# Be sure that the rules are cleared first and apply is called last
self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'),
ipv4_nat.mock_calls[0])
self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1])
# Be sure that add_rule is called somewhere in the middle
ipv4_nat.add_rule.assert_called_once_with(mock.sentinel.chain,
mock.sentinel.rule,
tag='floating_ip')
def test_process_floating_ip_nat_rules_removed(self):
ri = self._create_router()
ri.get_floating_ips = mock.Mock(return_value=[])
ri.iptables_manager = mock.MagicMock()
ipv4_nat = ri.iptables_manager.ipv4['nat']
ri.process_floating_ip_nat_rules()
# Be sure that the rules are cleared first and apply is called last
self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'),
ipv4_nat.mock_calls[0])
self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1])
# Be sure that add_rule is called somewhere in the middle
self.assertFalse(ipv4_nat.add_rule.called)
def _test_add_fip_addr_to_device_error(self, device):
ri = self._create_router()
ip = '15.1.2.3'
result = ri._add_fip_addr_to_device(
{'id': mock.sentinel.id, 'floating_ip_address': ip}, device)
device.addr.add.assert_called_with(ip + '/32')
return result
def test__add_fip_addr_to_device(self):
result = self._test_add_fip_addr_to_device_error(mock.Mock())
self.assertTrue(result)
def test__add_fip_addr_to_device_error(self):
device = mock.Mock()
device.addr.add.side_effect = RuntimeError
result = self._test_add_fip_addr_to_device_error(device)
self.assertFalse(result)
def test_process_snat_dnat_for_fip(self):
ri = self._create_router()
ri.process_floating_ip_nat_rules = mock.Mock(side_effect=Exception)
self.assertRaises(n_exc.FloatingIpSetupException,
ri.process_snat_dnat_for_fip)
ri.process_floating_ip_nat_rules.assert_called_once_with()
def test_put_fips_in_error_state(self):
ri = self._create_router()
ri.router = mock.Mock()
ri.router.get.return_value = [{'id': mock.sentinel.id1},
{'id': mock.sentinel.id2}]
statuses = ri.put_fips_in_error_state()
expected = [{mock.sentinel.id1: l3_constants.FLOATINGIP_STATUS_ERROR,
mock.sentinel.id2: l3_constants.FLOATINGIP_STATUS_ERROR}]
self.assertNotEqual(expected, statuses)
def test_configure_fip_addresses(self):
ri = self._create_router()
ri.process_floating_ip_addresses = mock.Mock(
side_effect=Exception)
self.assertRaises(n_exc.FloatingIpSetupException,
ri.configure_fip_addresses,
mock.sentinel.interface_name)
ri.process_floating_ip_addresses.assert_called_once_with(
mock.sentinel.interface_name)
def test_get_router_cidrs_returns_cidrs(self):
ri = self._create_router()
addresses = ['15.1.2.2/24', '15.1.2.3/32']
device = mock.MagicMock()
device.addr.list.return_value = [{'cidr': addresses[0]},
{'cidr': addresses[1]}]
self.assertEqual(set(addresses), ri.get_router_cidrs(device))
@mock.patch.object(ip_lib, 'IPDevice')
class TestFloatingIpWithMockDevice(BasicRouterTestCaseFramework):
def test_process_floating_ip_addresses_remap(self, IPDevice):
fip_id = _uuid()
fip = {
'id': fip_id, 'port_id': _uuid(),
'floating_ip_address': '15.1.2.3',
'fixed_ip_address': '192.168.0.2'
}
IPDevice.return_value = device = mock.Mock()
device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}]
ri = self._create_router()
ri.get_floating_ips = mock.Mock(return_value=[fip])
fip_statuses = ri.process_floating_ip_addresses(
mock.sentinel.interface_name)
self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ACTIVE},
fip_statuses)
self.assertFalse(device.addr.add.called)
self.assertFalse(device.addr.delete.called)
def test_process_router_with_disabled_floating_ip(self, IPDevice):
fip_id = _uuid()
fip = {
'id': fip_id, 'port_id': _uuid(),
'floating_ip_address': '15.1.2.3',
'fixed_ip_address': '192.168.0.2'
}
ri = self._create_router()
ri.floating_ips = [fip]
ri.get_floating_ips = mock.Mock(return_value=[])
fip_statuses = ri.process_floating_ip_addresses(
mock.sentinel.interface_name)
self.assertIsNone(fip_statuses.get(fip_id))
def test_process_router_floating_ip_with_device_add_error(self, IPDevice):
IPDevice.return_value = device = mock.Mock(side_effect=RuntimeError)
device.addr.list.return_value = []
fip_id = _uuid()
fip = {
'id': fip_id, 'port_id': _uuid(),
'floating_ip_address': '15.1.2.3',
'fixed_ip_address': '192.168.0.2'
}
ri = self._create_router()
ri.add_floating_ip = mock.Mock(
return_value=l3_constants.FLOATINGIP_STATUS_ERROR)
ri.get_floating_ips = mock.Mock(return_value=[fip])
fip_statuses = ri.process_floating_ip_addresses(
mock.sentinel.interface_name)
self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ERROR},
fip_statuses)
# TODO(mrsmith): refactor for DVR cases
def test_process_floating_ip_addresses_remove(self, IPDevice):
IPDevice.return_value = device = mock.Mock()
device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}]
ri = self._create_router()
ri.remove_floating_ip = mock.Mock()
ri.router.get = mock.Mock(return_value=[])
fip_statuses = ri.process_floating_ip_addresses(
mock.sentinel.interface_name)
self.assertEqual({}, fip_statuses)
ri.remove_floating_ip.assert_called_once_with(device, '15.1.2.3/32')

View File

@ -1,95 +0,0 @@
# Copyright 2014 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import mock
from neutron.agent.linux import external_process
from neutron.tests import base
TEST_UUID = 'test-uuid'
TEST_SERVICE = 'testsvc'
TEST_PID = 1234
class BaseTestProcessMonitor(base.BaseTestCase):
def setUp(self):
super(BaseTestProcessMonitor, self).setUp()
self.log_patch = mock.patch("neutron.agent.linux.external_process."
"LOG.error")
self.error_log = self.log_patch.start()
self.spawn_patch = mock.patch("eventlet.spawn")
self.eventlent_spawn = self.spawn_patch.start()
# create a default process monitor
self.create_child_process_monitor('respawn')
def create_child_process_monitor(self, action):
conf = mock.Mock()
conf.AGENT.check_child_processes_action = action
conf.AGENT.check_child_processes = True
self.pmonitor = external_process.ProcessMonitor(
config=conf,
resource_type='test')
def get_monitored_process(self, uuid, service=None):
monitored_process = mock.Mock()
self.pmonitor.register(uuid=uuid,
service_name=service,
monitored_process=monitored_process)
return monitored_process
class TestProcessMonitor(BaseTestProcessMonitor):
def test_error_logged(self):
pm = self.get_monitored_process(TEST_UUID)
pm.active = False
self.pmonitor._check_child_processes()
self.assertTrue(self.error_log.called)
def test_exit_handler(self):
self.create_child_process_monitor('exit')
pm = self.get_monitored_process(TEST_UUID)
pm.active = False
with mock.patch.object(external_process.ProcessMonitor,
'_exit_handler') as exit_handler:
self.pmonitor._check_child_processes()
exit_handler.assert_called_once_with(TEST_UUID, None)
def test_register(self):
pm = self.get_monitored_process(TEST_UUID)
self.assertEqual(len(self.pmonitor._monitored_processes), 1)
self.assertIn(pm, self.pmonitor._monitored_processes.values())
def test_register_same_service_twice(self):
self.get_monitored_process(TEST_UUID)
self.get_monitored_process(TEST_UUID)
self.assertEqual(len(self.pmonitor._monitored_processes), 1)
def test_register_different_service_types(self):
self.get_monitored_process(TEST_UUID)
self.get_monitored_process(TEST_UUID, TEST_SERVICE)
self.assertEqual(len(self.pmonitor._monitored_processes), 2)
def test_unregister(self):
self.get_monitored_process(TEST_UUID)
self.pmonitor.unregister(TEST_UUID, None)
self.assertEqual(len(self.pmonitor._monitored_processes), 0)
def test_unregister_unknown_process(self):
self.pmonitor.unregister(TEST_UUID, None)
self.assertEqual(len(self.pmonitor._monitored_processes), 0)

View File

@ -40,8 +40,7 @@ from neutron.tests import fake_notifier
from neutron.tests.unit import testlib_api
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
EXTDIR = os.path.join(ROOTDIR, 'unit/extensions')
EXTDIR = os.path.join(base.ROOTDIR, 'unit/extensions')
_uuid = uuidutils.generate_uuid

View File

@ -5452,3 +5452,63 @@ class NeutronDbPluginV2AsMixinTestCase(testlib_api.SqlTestCase):
self.net_data['network']['status'] = 'BUILD'
net = self.plugin.create_network(self.context, self.net_data)
self.assertEqual(net['status'], 'BUILD')
class TestNetworks(testlib_api.SqlTestCase):
def setUp(self):
super(TestNetworks, self).setUp()
self._tenant_id = 'test-tenant'
# Update the plugin
self.setup_coreplugin(DB_PLUGIN_KLASS)
def _create_network(self, plugin, ctx, shared=True):
network = {'network': {'name': 'net',
'shared': shared,
'admin_state_up': True,
'tenant_id': self._tenant_id}}
created_network = plugin.create_network(ctx, network)
return (network, created_network['id'])
def _create_port(self, plugin, ctx, net_id, device_owner, tenant_id):
port = {'port': {'name': 'port',
'network_id': net_id,
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
'admin_state_up': True,
'device_id': 'device_id',
'device_owner': device_owner,
'tenant_id': tenant_id}}
plugin.create_port(ctx, port)
def _test_update_shared_net_used(self,
device_owner,
expected_exception=None):
plugin = manager.NeutronManager.get_plugin()
ctx = context.get_admin_context()
network, net_id = self._create_network(plugin, ctx)
self._create_port(plugin,
ctx,
net_id,
device_owner,
self._tenant_id + '1')
network['network']['shared'] = False
if (expected_exception):
with testlib_api.ExpectedException(expected_exception):
plugin.update_network(ctx, net_id, network)
else:
plugin.update_network(ctx, net_id, network)
def test_update_shared_net_used_fails(self):
self._test_update_shared_net_used('', n_exc.InvalidSharedSetting)
def test_update_shared_net_used_as_router_gateway(self):
self._test_update_shared_net_used(
constants.DEVICE_OWNER_ROUTER_GW)
def test_update_shared_net_used_by_floating_ip(self):
self._test_update_shared_net_used(
constants.DEVICE_OWNER_FLOATINGIP)

View File

@ -1,82 +0,0 @@
# Copyright (c) 2014 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron import context
from neutron import manager
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import testlib_api
class TestNetworks(testlib_api.SqlTestCase):
def setUp(self):
super(TestNetworks, self).setUp()
self._tenant_id = 'test-tenant'
# Update the plugin
self.setup_coreplugin(test_db_plugin.DB_PLUGIN_KLASS)
def _create_network(self, plugin, ctx, shared=True):
network = {'network': {'name': 'net',
'shared': shared,
'admin_state_up': True,
'tenant_id': self._tenant_id}}
created_network = plugin.create_network(ctx, network)
return (network, created_network['id'])
def _create_port(self, plugin, ctx, net_id, device_owner, tenant_id):
port = {'port': {'name': 'port',
'network_id': net_id,
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
'admin_state_up': True,
'device_id': 'device_id',
'device_owner': device_owner,
'tenant_id': tenant_id}}
plugin.create_port(ctx, port)
def _test_update_shared_net_used(self,
device_owner,
expected_exception=None):
plugin = manager.NeutronManager.get_plugin()
ctx = context.get_admin_context()
network, net_id = self._create_network(plugin, ctx)
self._create_port(plugin,
ctx,
net_id,
device_owner,
self._tenant_id + '1')
network['network']['shared'] = False
if (expected_exception):
with testlib_api.ExpectedException(expected_exception):
plugin.update_network(ctx, net_id, network)
else:
plugin.update_network(ctx, net_id, network)
def test_update_shared_net_used_fails(self):
self._test_update_shared_net_used('', n_exc.InvalidSharedSetting)
def test_update_shared_net_used_as_router_gateway(self):
self._test_update_shared_net_used(
constants.DEVICE_OWNER_ROUTER_GW)
def test_update_shared_net_used_by_floating_ip(self):
self._test_update_shared_net_used(
constants.DEVICE_OWNER_FLOATINGIP)

View File

@ -1,154 +0,0 @@
# Copyright 2013 VMware, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for extension extended attribute
"""
from oslo_config import cfg
import webob.exc as webexc
import neutron
from neutron.api import extensions
from neutron.api.v2 import attributes
from neutron.common import config
from neutron import manager
from neutron.plugins.common import constants
from neutron.plugins.ml2 import plugin as ml2_plugin
from neutron import quota
from neutron.tests import base
from neutron.tests.unit.extensions import extendedattribute as extattr
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import testlib_api
from neutron import wsgi
_uuid = test_api_v2._uuid
_get_path = test_api_v2._get_path
extensions_path = ':'.join(neutron.tests.unit.extensions.__path__)
class ExtensionExtendedAttributeTestPlugin(
ml2_plugin.Ml2Plugin):
supported_extension_aliases = [
'ext-obj-test', "extended-ext-attr"
]
def __init__(self, configfile=None):
super(ExtensionExtendedAttributeTestPlugin, self)
self.objs = []
self.objh = {}
def create_ext_test_resource(self, context, ext_test_resource):
obj = ext_test_resource['ext_test_resource']
id = _uuid()
obj['id'] = id
self.objs.append(obj)
self.objh.update({id: obj})
return obj
def get_ext_test_resources(self, context, filters=None, fields=None):
return self.objs
def get_ext_test_resource(self, context, id, fields=None):
return self.objh[id]
class ExtensionExtendedAttributeTestCase(base.BaseTestCase):
def setUp(self):
super(ExtensionExtendedAttributeTestCase, self).setUp()
plugin = (
"neutron.tests.unit.test_extension_extended_attribute."
"ExtensionExtendedAttributeTestPlugin"
)
# point config file to: neutron/tests/etc/neutron.conf.test
self.config_parse()
self.setup_coreplugin(plugin)
ext_mgr = extensions.PluginAwareExtensionManager(
extensions_path,
{constants.CORE: ExtensionExtendedAttributeTestPlugin}
)
ext_mgr.extend_resources("2.0", {})
extensions.PluginAwareExtensionManager._instance = ext_mgr
app = config.load_paste_app('extensions_test_app')
self._api = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr)
self._tenant_id = "8c70909f-b081-452d-872b-df48e6c355d1"
# Save the global RESOURCE_ATTRIBUTE_MAP
self.saved_attr_map = {}
for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
self.saved_attr_map[resource] = attrs.copy()
# Add the resources to the global attribute map
# This is done here as the setup process won't
# initialize the main API router which extends
# the global attribute map
attributes.RESOURCE_ATTRIBUTE_MAP.update(
extattr.EXTENDED_ATTRIBUTES_2_0)
self.agentscheduler_dbMinxin = manager.NeutronManager.get_plugin()
self.addCleanup(self.restore_attribute_map)
quota.QUOTAS._driver = None
cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
group='QUOTAS')
def restore_attribute_map(self):
# Restore the original RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
def _do_request(self, method, path, data=None, params=None, action=None):
content_type = 'application/json'
body = None
if data is not None: # empty dict is valid
body = wsgi.Serializer().serialize(data, content_type)
req = testlib_api.create_request(
path, body, content_type,
method, query_string=params)
res = req.get_response(self._api)
if res.status_code >= 400:
raise webexc.HTTPClientError(detail=res.body, code=res.status_code)
if res.status_code != webexc.HTTPNoContent.code:
return res.json
def _ext_test_resource_create(self, attr=None):
data = {
"ext_test_resource": {
"tenant_id": self._tenant_id,
"name": "test",
extattr.EXTENDED_ATTRIBUTE: attr
}
}
res = self._do_request('POST', _get_path('ext_test_resources'), data)
return res['ext_test_resource']
def test_ext_test_resource_create(self):
ext_test_resource = self._ext_test_resource_create()
attr = _uuid()
ext_test_resource = self._ext_test_resource_create(attr)
self.assertEqual(ext_test_resource[extattr.EXTENDED_ATTRIBUTE], attr)
def test_ext_test_resource_get(self):
attr = _uuid()
obj = self._ext_test_resource_create(attr)
obj_id = obj['id']
res = self._do_request('GET', _get_path(
'ext_test_resources/{0}'.format(obj_id)))
obj2 = res['ext_test_resource']
self.assertEqual(obj2[extattr.EXTENDED_ATTRIBUTE], attr)

View File

@ -16,25 +16,36 @@
import abc
import mock
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
import routes
import webob
import webob.exc as webexc
import webtest
import neutron
from neutron.api import extensions
from neutron.api.v2 import attributes
from neutron.common import config
from neutron.common import exceptions
from neutron.db import db_base_plugin_v2
from neutron import manager
from neutron.plugins.common import constants
from neutron.plugins.ml2 import plugin as ml2_plugin
from neutron import quota
from neutron.tests import base
from neutron.tests.unit import extension_stubs as ext_stubs
import neutron.tests.unit.extensions
from neutron.tests.unit.extensions import extendedattribute as extattr
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import testlib_api
from neutron import wsgi
LOG = logging.getLogger(__name__)
_uuid = test_api_v2._uuid
_get_path = test_api_v2._get_path
extensions_path = ':'.join(neutron.tests.unit.extensions.__path__)
@ -677,3 +688,118 @@ class SimpleExtensionManager(object):
if self.request_ext:
request_extensions.append(self.request_ext)
return request_extensions
class ExtensionExtendedAttributeTestPlugin(
ml2_plugin.Ml2Plugin):
supported_extension_aliases = [
'ext-obj-test', "extended-ext-attr"
]
def __init__(self, configfile=None):
super(ExtensionExtendedAttributeTestPlugin, self)
self.objs = []
self.objh = {}
def create_ext_test_resource(self, context, ext_test_resource):
obj = ext_test_resource['ext_test_resource']
id = _uuid()
obj['id'] = id
self.objs.append(obj)
self.objh.update({id: obj})
return obj
def get_ext_test_resources(self, context, filters=None, fields=None):
return self.objs
def get_ext_test_resource(self, context, id, fields=None):
return self.objh[id]
class ExtensionExtendedAttributeTestCase(base.BaseTestCase):
def setUp(self):
super(ExtensionExtendedAttributeTestCase, self).setUp()
plugin = (
"neutron.tests.unit.test_extensions."
"ExtensionExtendedAttributeTestPlugin"
)
# point config file to: neutron/tests/etc/neutron.conf.test
self.config_parse()
self.setup_coreplugin(plugin)
ext_mgr = extensions.PluginAwareExtensionManager(
extensions_path,
{constants.CORE: ExtensionExtendedAttributeTestPlugin}
)
ext_mgr.extend_resources("2.0", {})
extensions.PluginAwareExtensionManager._instance = ext_mgr
app = config.load_paste_app('extensions_test_app')
self._api = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr)
self._tenant_id = "8c70909f-b081-452d-872b-df48e6c355d1"
# Save the global RESOURCE_ATTRIBUTE_MAP
self.saved_attr_map = {}
for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
self.saved_attr_map[resource] = attrs.copy()
# Add the resources to the global attribute map
# This is done here as the setup process won't
# initialize the main API router which extends
# the global attribute map
attributes.RESOURCE_ATTRIBUTE_MAP.update(
extattr.EXTENDED_ATTRIBUTES_2_0)
self.agentscheduler_dbMinxin = manager.NeutronManager.get_plugin()
self.addCleanup(self.restore_attribute_map)
quota.QUOTAS._driver = None
cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
group='QUOTAS')
def restore_attribute_map(self):
# Restore the original RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
def _do_request(self, method, path, data=None, params=None, action=None):
content_type = 'application/json'
body = None
if data is not None: # empty dict is valid
body = wsgi.Serializer().serialize(data, content_type)
req = testlib_api.create_request(
path, body, content_type,
method, query_string=params)
res = req.get_response(self._api)
if res.status_code >= 400:
raise webexc.HTTPClientError(detail=res.body, code=res.status_code)
if res.status_code != webexc.HTTPNoContent.code:
return res.json
def _ext_test_resource_create(self, attr=None):
data = {
"ext_test_resource": {
"tenant_id": self._tenant_id,
"name": "test",
extattr.EXTENDED_ATTRIBUTE: attr
}
}
res = self._do_request('POST', _get_path('ext_test_resources'), data)
return res['ext_test_resource']
def test_ext_test_resource_create(self):
ext_test_resource = self._ext_test_resource_create()
attr = _uuid()
ext_test_resource = self._ext_test_resource_create(attr)
self.assertEqual(ext_test_resource[extattr.EXTENDED_ATTRIBUTE], attr)
def test_ext_test_resource_get(self):
attr = _uuid()
obj = self._ext_test_resource_create(attr)
obj_id = obj['id']
res = self._do_request('GET', _get_path(
'ext_test_resources/{0}'.format(obj_id)))
obj2 = res['ext_test_resource']
self.assertEqual(obj2[extattr.EXTENDED_ATTRIBUTE], attr)

View File

@ -20,6 +20,83 @@ from neutron.agent.linux import utils
from neutron.tests import base
TEST_UUID = 'test-uuid'
TEST_SERVICE = 'testsvc'
TEST_PID = 1234
class BaseTestProcessMonitor(base.BaseTestCase):
def setUp(self):
super(BaseTestProcessMonitor, self).setUp()
self.log_patch = mock.patch("neutron.agent.linux.external_process."
"LOG.error")
self.error_log = self.log_patch.start()
self.spawn_patch = mock.patch("eventlet.spawn")
self.eventlent_spawn = self.spawn_patch.start()
# create a default process monitor
self.create_child_process_monitor('respawn')
def create_child_process_monitor(self, action):
conf = mock.Mock()
conf.AGENT.check_child_processes_action = action
conf.AGENT.check_child_processes = True
self.pmonitor = ep.ProcessMonitor(
config=conf,
resource_type='test')
def get_monitored_process(self, uuid, service=None):
monitored_process = mock.Mock()
self.pmonitor.register(uuid=uuid,
service_name=service,
monitored_process=monitored_process)
return monitored_process
class TestProcessMonitor(BaseTestProcessMonitor):
def test_error_logged(self):
pm = self.get_monitored_process(TEST_UUID)
pm.active = False
self.pmonitor._check_child_processes()
self.assertTrue(self.error_log.called)
def test_exit_handler(self):
self.create_child_process_monitor('exit')
pm = self.get_monitored_process(TEST_UUID)
pm.active = False
with mock.patch.object(ep.ProcessMonitor,
'_exit_handler') as exit_handler:
self.pmonitor._check_child_processes()
exit_handler.assert_called_once_with(TEST_UUID, None)
def test_register(self):
pm = self.get_monitored_process(TEST_UUID)
self.assertEqual(len(self.pmonitor._monitored_processes), 1)
self.assertIn(pm, self.pmonitor._monitored_processes.values())
def test_register_same_service_twice(self):
self.get_monitored_process(TEST_UUID)
self.get_monitored_process(TEST_UUID)
self.assertEqual(len(self.pmonitor._monitored_processes), 1)
def test_register_different_service_types(self):
self.get_monitored_process(TEST_UUID)
self.get_monitored_process(TEST_UUID, TEST_SERVICE)
self.assertEqual(len(self.pmonitor._monitored_processes), 2)
def test_unregister(self):
self.get_monitored_process(TEST_UUID)
self.pmonitor.unregister(TEST_UUID, None)
self.assertEqual(len(self.pmonitor._monitored_processes), 0)
def test_unregister_unknown_process(self):
self.pmonitor.unregister(TEST_UUID, None)
self.assertEqual(len(self.pmonitor._monitored_processes), 0)
class TestProcessManager(base.BaseTestCase):
def setUp(self):
super(TestProcessManager, self).setUp()