
Currently, DVR router namespaces are created only when there is a valid VM port on the compute node, or for the gateway-port on the service node. But when an LBaaS VIP port is created the l3 agent does not create a DVR namespace to service the VIP port. This fix enables DVR namespaces to be created to service the LBaaS VIP port. Also, this fix enables L2 Agent running in DVR mode, to add-in OVS rules to enable packets to be routed to such LBaaS VIP Ports which are resident on DVR routed interfaces. Therefore, with this fix both East-West and North-South traffic will be serviced by DVR for LBaas VIP Ports. DocImpact Authored-by: Swaminathan Vasudevan <swaminathan.vasudevan@hp.com> Co-Authored-By: Vivekanandan Narasimhan <vivekanandan.narasimhan@hp.com> Change-Id: I698b971d50721fb0512a11569f7d3139d0d456f3 Closes-Bug: #1356464
774 lines
35 KiB
Python
774 lines
35 KiB
Python
# Copyright (c) 2013 OpenStack Foundation
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import contextlib
|
|
import mock
|
|
import testtools
|
|
import uuid
|
|
import webob
|
|
|
|
from neutron.common import constants
|
|
from neutron.common import exceptions as exc
|
|
from neutron.common import utils
|
|
from neutron import context
|
|
from neutron.extensions import multiprovidernet as mpnet
|
|
from neutron.extensions import portbindings
|
|
from neutron.extensions import providernet as pnet
|
|
from neutron import manager
|
|
from neutron.plugins.common import constants as service_constants
|
|
from neutron.plugins.ml2.common import exceptions as ml2_exc
|
|
from neutron.plugins.ml2 import config
|
|
from neutron.plugins.ml2 import db as ml2_db
|
|
from neutron.plugins.ml2 import driver_api
|
|
from neutron.plugins.ml2 import driver_context
|
|
from neutron.plugins.ml2 import plugin as ml2_plugin
|
|
from neutron.tests.unit import _test_extension_portbindings as test_bindings
|
|
from neutron.tests.unit.ml2.drivers import mechanism_logger as mech_logger
|
|
from neutron.tests.unit.ml2.drivers import mechanism_test as mech_test
|
|
from neutron.tests.unit import test_db_plugin as test_plugin
|
|
from neutron.tests.unit import test_extension_allowedaddresspairs as test_pair
|
|
from neutron.tests.unit import test_extension_extradhcpopts as test_dhcpopts
|
|
from neutron.tests.unit import test_security_groups_rpc as test_sg_rpc
|
|
|
|
|
|
config.cfg.CONF.import_opt('network_vlan_ranges',
|
|
'neutron.plugins.ml2.drivers.type_vlan',
|
|
group='ml2_type_vlan')
|
|
|
|
|
|
PLUGIN_NAME = 'neutron.plugins.ml2.plugin.Ml2Plugin'
|
|
|
|
|
|
class Ml2PluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
|
|
|
|
_plugin_name = PLUGIN_NAME
|
|
_mechanism_drivers = ['logger', 'test']
|
|
|
|
def setUp(self):
|
|
# We need a L3 service plugin
|
|
l3_plugin = ('neutron.tests.unit.test_l3_plugin.'
|
|
'TestL3NatServicePlugin')
|
|
service_plugins = {'l3_plugin_name': l3_plugin}
|
|
# Enable the test mechanism driver to ensure that
|
|
# we can successfully call through to all mechanism
|
|
# driver apis.
|
|
config.cfg.CONF.set_override('mechanism_drivers',
|
|
self._mechanism_drivers,
|
|
group='ml2')
|
|
self.physnet = 'physnet1'
|
|
self.vlan_range = '1:100'
|
|
self.phys_vrange = ':'.join([self.physnet, self.vlan_range])
|
|
config.cfg.CONF.set_override('network_vlan_ranges', [self.phys_vrange],
|
|
group='ml2_type_vlan')
|
|
super(Ml2PluginV2TestCase, self).setUp(PLUGIN_NAME,
|
|
service_plugins=service_plugins)
|
|
self.port_create_status = 'DOWN'
|
|
self.driver = ml2_plugin.Ml2Plugin()
|
|
self.context = context.get_admin_context()
|
|
|
|
|
|
class TestMl2BulkToggleWithBulkless(Ml2PluginV2TestCase):
|
|
|
|
_mechanism_drivers = ['logger', 'test', 'bulkless']
|
|
|
|
def test_bulk_disable_with_bulkless_driver(self):
|
|
self.assertTrue(self._skip_native_bulk)
|
|
|
|
|
|
class TestMl2BulkToggleWithoutBulkless(Ml2PluginV2TestCase):
|
|
|
|
_mechanism_drivers = ['logger', 'test']
|
|
|
|
def test_bulk_enabled_with_bulk_drivers(self):
|
|
self.assertFalse(self._skip_native_bulk)
|
|
|
|
|
|
class TestMl2BasicGet(test_plugin.TestBasicGet,
|
|
Ml2PluginV2TestCase):
|
|
pass
|
|
|
|
|
|
class TestMl2V2HTTPResponse(test_plugin.TestV2HTTPResponse,
|
|
Ml2PluginV2TestCase):
|
|
pass
|
|
|
|
|
|
class TestMl2NetworksV2(test_plugin.TestNetworksV2,
|
|
Ml2PluginV2TestCase):
|
|
pass
|
|
|
|
|
|
class TestMl2SubnetsV2(test_plugin.TestSubnetsV2,
|
|
Ml2PluginV2TestCase):
|
|
pass
|
|
|
|
|
|
class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
|
|
|
|
def test_update_port_status_build(self):
|
|
with self.port() as port:
|
|
self.assertEqual('DOWN', port['port']['status'])
|
|
self.assertEqual('DOWN', self.port_create_status)
|
|
|
|
def test_update_non_existent_port(self):
|
|
ctx = context.get_admin_context()
|
|
plugin = manager.NeutronManager.get_plugin()
|
|
data = {'port': {'admin_state_up': False}}
|
|
self.assertRaises(exc.PortNotFound, plugin.update_port, ctx,
|
|
'invalid-uuid', data)
|
|
|
|
def test_delete_non_existent_port(self):
|
|
ctx = context.get_admin_context()
|
|
plugin = manager.NeutronManager.get_plugin()
|
|
with mock.patch.object(ml2_plugin.LOG, 'debug') as log_debug:
|
|
plugin.delete_port(ctx, 'invalid-uuid', l3_port_check=False)
|
|
log_debug.assert_has_calls([
|
|
mock.call(_("Deleting port %s"), 'invalid-uuid'),
|
|
mock.call(_("The port '%s' was deleted"), 'invalid-uuid')
|
|
])
|
|
|
|
def test_delete_port_no_notify_in_disassociate_floatingips(self):
|
|
ctx = context.get_admin_context()
|
|
plugin = manager.NeutronManager.get_plugin()
|
|
l3plugin = manager.NeutronManager.get_service_plugins().get(
|
|
service_constants.L3_ROUTER_NAT)
|
|
with contextlib.nested(
|
|
self.port(do_delete=False),
|
|
mock.patch.object(l3plugin, 'disassociate_floatingips'),
|
|
mock.patch.object(l3plugin, 'notify_routers_updated')
|
|
) as (port, disassociate_floatingips, notify):
|
|
|
|
port_id = port['port']['id']
|
|
plugin.delete_port(ctx, port_id)
|
|
|
|
# check that no notification was requested while under
|
|
# transaction
|
|
disassociate_floatingips.assert_has_calls([
|
|
mock.call(ctx, port_id, do_notify=False)
|
|
])
|
|
|
|
# check that notifier was still triggered
|
|
notify.assert_has_calls([
|
|
mock.call(ctx, disassociate_floatingips.return_value)
|
|
])
|
|
|
|
def test_check_if_compute_port_serviced_by_dvr(self):
|
|
self.assertTrue(utils.is_dvr_serviced('compute:None'))
|
|
|
|
def test_check_if_lbaas_vip_port_serviced_by_dvr(self):
|
|
self.assertTrue(utils.is_dvr_serviced(
|
|
constants.DEVICE_OWNER_LOADBALANCER))
|
|
|
|
def test_check_if_port_not_serviced_by_dvr(self):
|
|
self.assertFalse(utils.is_dvr_serviced(
|
|
constants.DEVICE_OWNER_ROUTER_INTF))
|
|
|
|
def test_disassociate_floatingips_do_notify_returns_nothing(self):
|
|
ctx = context.get_admin_context()
|
|
l3plugin = manager.NeutronManager.get_service_plugins().get(
|
|
service_constants.L3_ROUTER_NAT)
|
|
with self.port() as port:
|
|
|
|
port_id = port['port']['id']
|
|
# check that nothing is returned when notifications are handled
|
|
# by the called method
|
|
self.assertIsNone(l3plugin.disassociate_floatingips(ctx, port_id))
|
|
|
|
|
|
class TestMl2DvrPortsV2(TestMl2PortsV2):
|
|
def setUp(self):
|
|
super(TestMl2DvrPortsV2, self).setUp()
|
|
extensions = ['router',
|
|
constants.L3_AGENT_SCHEDULER_EXT_ALIAS,
|
|
constants.L3_DISTRIBUTED_EXT_ALIAS]
|
|
self.plugin = manager.NeutronManager.get_plugin()
|
|
self.l3plugin = mock.Mock()
|
|
type(self.l3plugin).supported_extension_aliases = (
|
|
mock.PropertyMock(return_value=extensions))
|
|
self.service_plugins = {'L3_ROUTER_NAT': self.l3plugin}
|
|
|
|
def test_delete_last_vm_port(self):
|
|
fip_set = set()
|
|
ns_to_delete = {'host': 'vmhost', 'agent_id': 'vm_l3_agent',
|
|
'router_id': 'my_router'}
|
|
|
|
with contextlib.nested(
|
|
mock.patch.object(manager.NeutronManager,
|
|
'get_service_plugins',
|
|
return_value=self.service_plugins),
|
|
self.port(do_delete=False, device_owner='compute:None'),
|
|
mock.patch.object(self.l3plugin, 'notify_routers_updated'),
|
|
mock.patch.object(self.l3plugin, 'disassociate_floatingips',
|
|
return_value=fip_set),
|
|
mock.patch.object(self.l3plugin, 'dvr_deletens_if_no_vm',
|
|
return_value=[ns_to_delete]),
|
|
mock.patch.object(self.l3plugin, 'remove_router_from_l3_agent')
|
|
) as (get_service_plugin, port, notify, disassociate_floatingips,
|
|
ddinv, remove_router_from_l3_agent):
|
|
|
|
port_id = port['port']['id']
|
|
self.plugin.delete_port(self.context, port_id)
|
|
|
|
notify.assert_has_calls([mock.call(self.context, fip_set)])
|
|
remove_router_from_l3_agent.assert_has_calls([
|
|
mock.call(self.context, ns_to_delete['agent_id'],
|
|
ns_to_delete['router_id'])
|
|
])
|
|
|
|
def test_delete_last_vm_port_with_floatingip(self):
|
|
ns_to_delete = {'host': 'vmhost', 'agent_id': 'vm_l3_agent',
|
|
'router_id': 'my_router'}
|
|
fip_set = set([ns_to_delete['router_id']])
|
|
|
|
with contextlib.nested(
|
|
mock.patch.object(manager.NeutronManager,
|
|
'get_service_plugins',
|
|
return_value=self.service_plugins),
|
|
self.port(do_delete=False, device_owner='compute:None'),
|
|
mock.patch.object(self.l3plugin, 'notify_routers_updated'),
|
|
mock.patch.object(self.l3plugin, 'disassociate_floatingips',
|
|
return_value=fip_set),
|
|
mock.patch.object(self.l3plugin, 'dvr_deletens_if_no_vm',
|
|
return_value=[ns_to_delete]),
|
|
mock.patch.object(self.l3plugin, 'remove_router_from_l3_agent')
|
|
) as (get_service_plugins, port, notify, disassociate_floatingips,
|
|
ddinv, remove_router_from_l3_agent):
|
|
|
|
port_id = port['port']['id']
|
|
self.plugin.delete_port(self.context, port_id)
|
|
|
|
notify.assert_has_calls([mock.call(self.context, fip_set)])
|
|
remove_router_from_l3_agent.assert_has_calls([
|
|
mock.call(self.context, ns_to_delete['agent_id'],
|
|
ns_to_delete['router_id'])
|
|
])
|
|
|
|
|
|
class TestMl2PortBinding(Ml2PluginV2TestCase,
|
|
test_bindings.PortBindingsTestCase):
|
|
# Test case does not set binding:host_id, so ml2 does not attempt
|
|
# to bind port
|
|
VIF_TYPE = portbindings.VIF_TYPE_UNBOUND
|
|
HAS_PORT_FILTER = False
|
|
ENABLE_SG = True
|
|
FIREWALL_DRIVER = test_sg_rpc.FIREWALL_HYBRID_DRIVER
|
|
|
|
def setUp(self, firewall_driver=None):
|
|
test_sg_rpc.set_firewall_driver(self.FIREWALL_DRIVER)
|
|
config.cfg.CONF.set_override(
|
|
'enable_security_group', self.ENABLE_SG,
|
|
group='SECURITYGROUP')
|
|
super(TestMl2PortBinding, self).setUp()
|
|
|
|
def _check_port_binding_profile(self, port, profile=None):
|
|
self.assertIn('id', port)
|
|
self.assertIn(portbindings.PROFILE, port)
|
|
value = port[portbindings.PROFILE]
|
|
self.assertEqual(profile or {}, value)
|
|
|
|
def test_create_port_binding_profile(self):
|
|
self._test_create_port_binding_profile({'a': 1, 'b': 2})
|
|
|
|
def test_update_port_binding_profile(self):
|
|
self._test_update_port_binding_profile({'c': 3})
|
|
|
|
def test_create_port_binding_profile_too_big(self):
|
|
s = 'x' * 5000
|
|
profile_arg = {portbindings.PROFILE: {'d': s}}
|
|
try:
|
|
with self.port(expected_res_status=400,
|
|
arg_list=(portbindings.PROFILE,),
|
|
**profile_arg):
|
|
pass
|
|
except webob.exc.HTTPClientError:
|
|
pass
|
|
|
|
def test_remove_port_binding_profile(self):
|
|
profile = {'e': 5}
|
|
profile_arg = {portbindings.PROFILE: profile}
|
|
with self.port(arg_list=(portbindings.PROFILE,),
|
|
**profile_arg) as port:
|
|
self._check_port_binding_profile(port['port'], profile)
|
|
port_id = port['port']['id']
|
|
profile_arg = {portbindings.PROFILE: None}
|
|
port = self._update('ports', port_id,
|
|
{'port': profile_arg})['port']
|
|
self._check_port_binding_profile(port)
|
|
port = self._show('ports', port_id)['port']
|
|
self._check_port_binding_profile(port)
|
|
|
|
def test_return_on_concurrent_delete_and_binding(self):
|
|
# create a port and delete it so we have an expired mechanism context
|
|
with self.port() as port:
|
|
plugin = manager.NeutronManager.get_plugin()
|
|
binding = ml2_db.get_locked_port_and_binding(self.context.session,
|
|
port['port']['id'])[1]
|
|
binding['host'] = 'test'
|
|
mech_context = driver_context.PortContext(
|
|
plugin, self.context, port['port'],
|
|
plugin.get_network(self.context, port['port']['network_id']),
|
|
binding)
|
|
with contextlib.nested(
|
|
mock.patch('neutron.plugins.ml2.plugin.'
|
|
'db.get_locked_port_and_binding',
|
|
return_value=(None, None)),
|
|
mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin._make_port_dict')
|
|
) as (glpab_mock, mpd_mock):
|
|
plugin._bind_port_if_needed(mech_context)
|
|
# called during deletion to get port
|
|
self.assertTrue(glpab_mock.mock_calls)
|
|
# should have returned before calling _make_port_dict
|
|
self.assertFalse(mpd_mock.mock_calls)
|
|
|
|
def test_port_binding_profile_not_changed(self):
|
|
profile = {'e': 5}
|
|
profile_arg = {portbindings.PROFILE: profile}
|
|
with self.port(arg_list=(portbindings.PROFILE,),
|
|
**profile_arg) as port:
|
|
self._check_port_binding_profile(port['port'], profile)
|
|
port_id = port['port']['id']
|
|
state_arg = {'admin_state_up': True}
|
|
port = self._update('ports', port_id,
|
|
{'port': state_arg})['port']
|
|
self._check_port_binding_profile(port, profile)
|
|
port = self._show('ports', port_id)['port']
|
|
self._check_port_binding_profile(port, profile)
|
|
|
|
|
|
class TestMl2PortBindingNoSG(TestMl2PortBinding):
|
|
HAS_PORT_FILTER = False
|
|
ENABLE_SG = False
|
|
FIREWALL_DRIVER = test_sg_rpc.FIREWALL_NOOP_DRIVER
|
|
|
|
|
|
class TestMl2PortBindingHost(Ml2PluginV2TestCase,
|
|
test_bindings.PortBindingsHostTestCaseMixin):
|
|
pass
|
|
|
|
|
|
class TestMl2PortBindingVnicType(Ml2PluginV2TestCase,
|
|
test_bindings.PortBindingsVnicTestCaseMixin):
|
|
pass
|
|
|
|
|
|
class TestMultiSegmentNetworks(Ml2PluginV2TestCase):
|
|
|
|
def setUp(self, plugin=None):
|
|
super(TestMultiSegmentNetworks, self).setUp()
|
|
|
|
def test_create_network_provider(self):
|
|
data = {'network': {'name': 'net1',
|
|
pnet.NETWORK_TYPE: 'vlan',
|
|
pnet.PHYSICAL_NETWORK: 'physnet1',
|
|
pnet.SEGMENTATION_ID: 1,
|
|
'tenant_id': 'tenant_one'}}
|
|
network_req = self.new_create_request('networks', data)
|
|
network = self.deserialize(self.fmt,
|
|
network_req.get_response(self.api))
|
|
self.assertEqual('vlan', network['network'][pnet.NETWORK_TYPE])
|
|
self.assertEqual('physnet1', network['network'][pnet.PHYSICAL_NETWORK])
|
|
self.assertEqual(1, network['network'][pnet.SEGMENTATION_ID])
|
|
self.assertNotIn(mpnet.SEGMENTS, network['network'])
|
|
|
|
def test_create_network_single_multiprovider(self):
|
|
data = {'network': {'name': 'net1',
|
|
mpnet.SEGMENTS:
|
|
[{pnet.NETWORK_TYPE: 'vlan',
|
|
pnet.PHYSICAL_NETWORK: 'physnet1',
|
|
pnet.SEGMENTATION_ID: 1}],
|
|
'tenant_id': 'tenant_one'}}
|
|
net_req = self.new_create_request('networks', data)
|
|
network = self.deserialize(self.fmt, net_req.get_response(self.api))
|
|
self.assertEqual('vlan', network['network'][pnet.NETWORK_TYPE])
|
|
self.assertEqual('physnet1', network['network'][pnet.PHYSICAL_NETWORK])
|
|
self.assertEqual(1, network['network'][pnet.SEGMENTATION_ID])
|
|
self.assertNotIn(mpnet.SEGMENTS, network['network'])
|
|
|
|
# Tests get_network()
|
|
net_req = self.new_show_request('networks', network['network']['id'])
|
|
network = self.deserialize(self.fmt, net_req.get_response(self.api))
|
|
self.assertEqual('vlan', network['network'][pnet.NETWORK_TYPE])
|
|
self.assertEqual('physnet1', network['network'][pnet.PHYSICAL_NETWORK])
|
|
self.assertEqual(1, network['network'][pnet.SEGMENTATION_ID])
|
|
self.assertNotIn(mpnet.SEGMENTS, network['network'])
|
|
|
|
def test_create_network_multiprovider(self):
|
|
data = {'network': {'name': 'net1',
|
|
mpnet.SEGMENTS:
|
|
[{pnet.NETWORK_TYPE: 'vlan',
|
|
pnet.PHYSICAL_NETWORK: 'physnet1',
|
|
pnet.SEGMENTATION_ID: 1},
|
|
{pnet.NETWORK_TYPE: 'vlan',
|
|
pnet.PHYSICAL_NETWORK: 'physnet1',
|
|
pnet.SEGMENTATION_ID: 2}],
|
|
'tenant_id': 'tenant_one'}}
|
|
network_req = self.new_create_request('networks', data)
|
|
network = self.deserialize(self.fmt,
|
|
network_req.get_response(self.api))
|
|
tz = network['network'][mpnet.SEGMENTS]
|
|
for tz in data['network'][mpnet.SEGMENTS]:
|
|
for field in [pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
|
|
pnet.SEGMENTATION_ID]:
|
|
self.assertEqual(tz.get(field), tz.get(field))
|
|
|
|
# Tests get_network()
|
|
net_req = self.new_show_request('networks', network['network']['id'])
|
|
network = self.deserialize(self.fmt, net_req.get_response(self.api))
|
|
tz = network['network'][mpnet.SEGMENTS]
|
|
for tz in data['network'][mpnet.SEGMENTS]:
|
|
for field in [pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
|
|
pnet.SEGMENTATION_ID]:
|
|
self.assertEqual(tz.get(field), tz.get(field))
|
|
|
|
def test_create_network_with_provider_and_multiprovider_fail(self):
|
|
data = {'network': {'name': 'net1',
|
|
mpnet.SEGMENTS:
|
|
[{pnet.NETWORK_TYPE: 'vlan',
|
|
pnet.PHYSICAL_NETWORK: 'physnet1',
|
|
pnet.SEGMENTATION_ID: 1}],
|
|
pnet.NETWORK_TYPE: 'vlan',
|
|
pnet.PHYSICAL_NETWORK: 'physnet1',
|
|
pnet.SEGMENTATION_ID: 1,
|
|
'tenant_id': 'tenant_one'}}
|
|
|
|
network_req = self.new_create_request('networks', data)
|
|
res = network_req.get_response(self.api)
|
|
self.assertEqual(400, res.status_int)
|
|
|
|
def test_create_network_duplicate_full_segments(self):
|
|
data = {'network': {'name': 'net1',
|
|
mpnet.SEGMENTS:
|
|
[{pnet.NETWORK_TYPE: 'vlan',
|
|
pnet.PHYSICAL_NETWORK: 'physnet1',
|
|
pnet.SEGMENTATION_ID: 1},
|
|
{pnet.NETWORK_TYPE: 'vlan',
|
|
pnet.PHYSICAL_NETWORK: 'physnet1',
|
|
pnet.SEGMENTATION_ID: 1}],
|
|
'tenant_id': 'tenant_one'}}
|
|
network_req = self.new_create_request('networks', data)
|
|
res = network_req.get_response(self.api)
|
|
self.assertEqual(400, res.status_int)
|
|
|
|
def test_create_network_duplicate_partial_segments(self):
|
|
data = {'network': {'name': 'net1',
|
|
mpnet.SEGMENTS:
|
|
[{pnet.NETWORK_TYPE: 'vlan',
|
|
pnet.PHYSICAL_NETWORK: 'physnet1'},
|
|
{pnet.NETWORK_TYPE: 'vlan',
|
|
pnet.PHYSICAL_NETWORK: 'physnet1'}],
|
|
'tenant_id': 'tenant_one'}}
|
|
network_req = self.new_create_request('networks', data)
|
|
res = network_req.get_response(self.api)
|
|
self.assertEqual(201, res.status_int)
|
|
|
|
def test_release_segment_no_type_driver(self):
|
|
segment = {driver_api.NETWORK_TYPE: 'faketype',
|
|
driver_api.PHYSICAL_NETWORK: 'physnet1',
|
|
driver_api.ID: 1}
|
|
with mock.patch('neutron.plugins.ml2.managers.LOG') as log:
|
|
self.driver.type_manager.release_segment(session=None,
|
|
segment=segment)
|
|
log.error.assert_called_once_with(
|
|
"Failed to release segment '%s' because "
|
|
"network type is not supported.", segment)
|
|
|
|
def test_create_provider_fail(self):
|
|
segment = {pnet.NETWORK_TYPE: None,
|
|
pnet.PHYSICAL_NETWORK: 'phys_net',
|
|
pnet.SEGMENTATION_ID: None}
|
|
with testtools.ExpectedException(exc.InvalidInput):
|
|
self.driver.type_manager._process_provider_create(segment)
|
|
|
|
def test_create_network_plugin(self):
|
|
data = {'network': {'name': 'net1',
|
|
'admin_state_up': True,
|
|
'shared': False,
|
|
pnet.NETWORK_TYPE: 'vlan',
|
|
pnet.PHYSICAL_NETWORK: 'physnet1',
|
|
pnet.SEGMENTATION_ID: 1,
|
|
'tenant_id': 'tenant_one'}}
|
|
|
|
def raise_mechanism_exc(*args, **kwargs):
|
|
raise ml2_exc.MechanismDriverError(
|
|
method='create_network_postcommit')
|
|
|
|
with mock.patch('neutron.plugins.ml2.managers.MechanismManager.'
|
|
'create_network_precommit', new=raise_mechanism_exc):
|
|
with testtools.ExpectedException(ml2_exc.MechanismDriverError):
|
|
self.driver.create_network(self.context, data)
|
|
|
|
def test_extend_dictionary_no_segments(self):
|
|
network = dict(name='net_no_segment', id='5', tenant_id='tenant_one')
|
|
self.driver.type_manager._extend_network_dict_provider(self.context,
|
|
network)
|
|
self.assertIsNone(network[pnet.NETWORK_TYPE])
|
|
self.assertIsNone(network[pnet.PHYSICAL_NETWORK])
|
|
self.assertIsNone(network[pnet.SEGMENTATION_ID])
|
|
|
|
|
|
class TestMl2AllowedAddressPairs(Ml2PluginV2TestCase,
|
|
test_pair.TestAllowedAddressPairs):
|
|
def setUp(self, plugin=None):
|
|
super(test_pair.TestAllowedAddressPairs, self).setUp(
|
|
plugin=PLUGIN_NAME)
|
|
|
|
|
|
class DHCPOptsTestCase(test_dhcpopts.TestExtraDhcpOpt):
|
|
|
|
def setUp(self, plugin=None):
|
|
super(test_dhcpopts.ExtraDhcpOptDBTestCase, self).setUp(
|
|
plugin=PLUGIN_NAME)
|
|
|
|
|
|
class Ml2PluginV2FaultyDriverTestCase(test_plugin.NeutronDbPluginV2TestCase):
|
|
|
|
def setUp(self):
|
|
# Enable the test mechanism driver to ensure that
|
|
# we can successfully call through to all mechanism
|
|
# driver apis.
|
|
config.cfg.CONF.set_override('mechanism_drivers',
|
|
['test', 'logger'],
|
|
group='ml2')
|
|
super(Ml2PluginV2FaultyDriverTestCase, self).setUp(PLUGIN_NAME)
|
|
self.port_create_status = 'DOWN'
|
|
|
|
|
|
class TestFaultyMechansimDriver(Ml2PluginV2FaultyDriverTestCase):
|
|
|
|
def test_create_network_faulty(self):
|
|
|
|
with mock.patch.object(mech_test.TestMechanismDriver,
|
|
'create_network_postcommit',
|
|
side_effect=ml2_exc.MechanismDriverError):
|
|
tenant_id = str(uuid.uuid4())
|
|
data = {'network': {'name': 'net1',
|
|
'tenant_id': tenant_id}}
|
|
req = self.new_create_request('networks', data)
|
|
res = req.get_response(self.api)
|
|
self.assertEqual(500, res.status_int)
|
|
error = self.deserialize(self.fmt, res)
|
|
self.assertEqual('MechanismDriverError',
|
|
error['NeutronError']['type'])
|
|
query_params = "tenant_id=%s" % tenant_id
|
|
nets = self._list('networks', query_params=query_params)
|
|
self.assertFalse(nets['networks'])
|
|
|
|
def test_delete_network_faulty(self):
|
|
|
|
with mock.patch.object(mech_test.TestMechanismDriver,
|
|
'delete_network_postcommit',
|
|
side_effect=ml2_exc.MechanismDriverError):
|
|
with mock.patch.object(mech_logger.LoggerMechanismDriver,
|
|
'delete_network_postcommit') as dnp:
|
|
|
|
data = {'network': {'name': 'net1',
|
|
'tenant_id': 'tenant_one'}}
|
|
network_req = self.new_create_request('networks', data)
|
|
network_res = network_req.get_response(self.api)
|
|
self.assertEqual(201, network_res.status_int)
|
|
network = self.deserialize(self.fmt, network_res)
|
|
net_id = network['network']['id']
|
|
req = self.new_delete_request('networks', net_id)
|
|
res = req.get_response(self.api)
|
|
self.assertEqual(204, res.status_int)
|
|
# Test if other mechanism driver was called
|
|
self.assertTrue(dnp.called)
|
|
self._show('networks', net_id,
|
|
expected_code=webob.exc.HTTPNotFound.code)
|
|
|
|
def test_update_network_faulty(self):
|
|
|
|
with mock.patch.object(mech_test.TestMechanismDriver,
|
|
'update_network_postcommit',
|
|
side_effect=ml2_exc.MechanismDriverError):
|
|
with mock.patch.object(mech_logger.LoggerMechanismDriver,
|
|
'update_network_postcommit') as unp:
|
|
|
|
data = {'network': {'name': 'net1',
|
|
'tenant_id': 'tenant_one'}}
|
|
network_req = self.new_create_request('networks', data)
|
|
network_res = network_req.get_response(self.api)
|
|
self.assertEqual(201, network_res.status_int)
|
|
network = self.deserialize(self.fmt, network_res)
|
|
net_id = network['network']['id']
|
|
|
|
new_name = 'a_brand_new_name'
|
|
data = {'network': {'name': new_name}}
|
|
req = self.new_update_request('networks', data, net_id)
|
|
res = req.get_response(self.api)
|
|
self.assertEqual(500, res.status_int)
|
|
error = self.deserialize(self.fmt, res)
|
|
self.assertEqual('MechanismDriverError',
|
|
error['NeutronError']['type'])
|
|
# Test if other mechanism driver was called
|
|
self.assertTrue(unp.called)
|
|
net = self._show('networks', net_id)
|
|
self.assertEqual(new_name, net['network']['name'])
|
|
|
|
self._delete('networks', net_id)
|
|
|
|
def test_create_subnet_faulty(self):
|
|
|
|
with mock.patch.object(mech_test.TestMechanismDriver,
|
|
'create_subnet_postcommit',
|
|
side_effect=ml2_exc.MechanismDriverError):
|
|
|
|
with self.network() as network:
|
|
net_id = network['network']['id']
|
|
data = {'subnet': {'network_id': net_id,
|
|
'cidr': '10.0.20.0/24',
|
|
'ip_version': '4',
|
|
'name': 'subnet1',
|
|
'tenant_id':
|
|
network['network']['tenant_id'],
|
|
'gateway_ip': '10.0.20.1'}}
|
|
req = self.new_create_request('subnets', data)
|
|
res = req.get_response(self.api)
|
|
self.assertEqual(500, res.status_int)
|
|
error = self.deserialize(self.fmt, res)
|
|
self.assertEqual('MechanismDriverError',
|
|
error['NeutronError']['type'])
|
|
query_params = "network_id=%s" % net_id
|
|
subnets = self._list('subnets', query_params=query_params)
|
|
self.assertFalse(subnets['subnets'])
|
|
|
|
def test_delete_subnet_faulty(self):
|
|
|
|
with mock.patch.object(mech_test.TestMechanismDriver,
|
|
'delete_subnet_postcommit',
|
|
side_effect=ml2_exc.MechanismDriverError):
|
|
with mock.patch.object(mech_logger.LoggerMechanismDriver,
|
|
'delete_subnet_postcommit') as dsp:
|
|
|
|
with self.network() as network:
|
|
data = {'subnet': {'network_id':
|
|
network['network']['id'],
|
|
'cidr': '10.0.20.0/24',
|
|
'ip_version': '4',
|
|
'name': 'subnet1',
|
|
'tenant_id':
|
|
network['network']['tenant_id'],
|
|
'gateway_ip': '10.0.20.1'}}
|
|
subnet_req = self.new_create_request('subnets', data)
|
|
subnet_res = subnet_req.get_response(self.api)
|
|
self.assertEqual(201, subnet_res.status_int)
|
|
subnet = self.deserialize(self.fmt, subnet_res)
|
|
subnet_id = subnet['subnet']['id']
|
|
|
|
req = self.new_delete_request('subnets', subnet_id)
|
|
res = req.get_response(self.api)
|
|
self.assertEqual(204, res.status_int)
|
|
# Test if other mechanism driver was called
|
|
self.assertTrue(dsp.called)
|
|
self._show('subnets', subnet_id,
|
|
expected_code=webob.exc.HTTPNotFound.code)
|
|
|
|
def test_update_subnet_faulty(self):
|
|
|
|
with mock.patch.object(mech_test.TestMechanismDriver,
|
|
'update_subnet_postcommit',
|
|
side_effect=ml2_exc.MechanismDriverError):
|
|
with mock.patch.object(mech_logger.LoggerMechanismDriver,
|
|
'update_subnet_postcommit') as usp:
|
|
|
|
with self.network() as network:
|
|
data = {'subnet': {'network_id':
|
|
network['network']['id'],
|
|
'cidr': '10.0.20.0/24',
|
|
'ip_version': '4',
|
|
'name': 'subnet1',
|
|
'tenant_id':
|
|
network['network']['tenant_id'],
|
|
'gateway_ip': '10.0.20.1'}}
|
|
subnet_req = self.new_create_request('subnets', data)
|
|
subnet_res = subnet_req.get_response(self.api)
|
|
self.assertEqual(201, subnet_res.status_int)
|
|
subnet = self.deserialize(self.fmt, subnet_res)
|
|
subnet_id = subnet['subnet']['id']
|
|
new_name = 'a_brand_new_name'
|
|
data = {'subnet': {'name': new_name}}
|
|
req = self.new_update_request('subnets', data, subnet_id)
|
|
res = req.get_response(self.api)
|
|
self.assertEqual(500, res.status_int)
|
|
error = self.deserialize(self.fmt, res)
|
|
self.assertEqual('MechanismDriverError',
|
|
error['NeutronError']['type'])
|
|
# Test if other mechanism driver was called
|
|
self.assertTrue(usp.called)
|
|
subnet = self._show('subnets', subnet_id)
|
|
self.assertEqual(new_name, subnet['subnet']['name'])
|
|
|
|
self._delete('subnets', subnet['subnet']['id'])
|
|
|
|
def test_create_port_faulty(self):
|
|
|
|
with mock.patch.object(mech_test.TestMechanismDriver,
|
|
'create_port_postcommit',
|
|
side_effect=ml2_exc.MechanismDriverError):
|
|
|
|
with self.network() as network:
|
|
net_id = network['network']['id']
|
|
data = {'port': {'network_id': net_id,
|
|
'tenant_id':
|
|
network['network']['tenant_id'],
|
|
'name': 'port1',
|
|
'admin_state_up': 1,
|
|
'fixed_ips': []}}
|
|
req = self.new_create_request('ports', data)
|
|
res = req.get_response(self.api)
|
|
self.assertEqual(500, res.status_int)
|
|
error = self.deserialize(self.fmt, res)
|
|
self.assertEqual('MechanismDriverError',
|
|
error['NeutronError']['type'])
|
|
query_params = "network_id=%s" % net_id
|
|
ports = self._list('ports', query_params=query_params)
|
|
self.assertFalse(ports['ports'])
|
|
|
|
def test_update_port_faulty(self):
|
|
|
|
with mock.patch.object(mech_test.TestMechanismDriver,
|
|
'update_port_postcommit',
|
|
side_effect=ml2_exc.MechanismDriverError):
|
|
with mock.patch.object(mech_logger.LoggerMechanismDriver,
|
|
'update_port_postcommit') as upp:
|
|
|
|
with self.network() as network:
|
|
data = {'port': {'network_id': network['network']['id'],
|
|
'tenant_id':
|
|
network['network']['tenant_id'],
|
|
'name': 'port1',
|
|
'admin_state_up': 1,
|
|
'fixed_ips': []}}
|
|
port_req = self.new_create_request('ports', data)
|
|
port_res = port_req.get_response(self.api)
|
|
self.assertEqual(201, port_res.status_int)
|
|
port = self.deserialize(self.fmt, port_res)
|
|
port_id = port['port']['id']
|
|
|
|
new_name = 'a_brand_new_name'
|
|
data = {'port': {'name': new_name}}
|
|
req = self.new_update_request('ports', data, port_id)
|
|
res = req.get_response(self.api)
|
|
self.assertEqual(500, res.status_int)
|
|
error = self.deserialize(self.fmt, res)
|
|
self.assertEqual('MechanismDriverError',
|
|
error['NeutronError']['type'])
|
|
# Test if other mechanism driver was called
|
|
self.assertTrue(upp.called)
|
|
port = self._show('ports', port_id)
|
|
self.assertEqual(new_name, port['port']['name'])
|
|
|
|
self._delete('ports', port['port']['id'])
|