OpenStack Networking (Neutron)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

4682 lines
228 KiB

# Copyright 2012 VMware, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import contextlib
import copy
import mock
import netaddr
from neutron_lib.api.definitions import external_net as extnet_apidef
from neutron_lib.api.definitions import l3 as l3_apidef
from neutron_lib.api.definitions import portbindings
from neutron_lib.callbacks import events
from neutron_lib.callbacks import exceptions
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants as lib_constants
from neutron_lib import context
from neutron_lib.db import resource_extend
from neutron_lib import exceptions as n_exc
from neutron_lib.exceptions import l3 as l3_exc
from neutron_lib.plugins import constants as plugin_constants
from neutron_lib.plugins import directory
from neutron_lib.tests.unit import fake_notifier
from oslo_config import cfg
from oslo_utils import importutils
from oslo_utils import uuidutils
from sqlalchemy import orm
import testtools
from webob import exc
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import l3_rpc
from neutron.db import common_db_mixin
from neutron.db import db_base_plugin_v2
from neutron.db import dns_db
from neutron.db import external_net_db
from neutron.db import l3_agentschedulers_db
from neutron.db import l3_attrs_db
from neutron.db import l3_db
from neutron.db import l3_dvr_db
from neutron.db import l3_dvrscheduler_db
from neutron.db import l3_hamode_db
from neutron.db.models import l3 as l3_models
from neutron.db import models_v2
from neutron.extensions import l3
from neutron.services.revisions import revision_plugin
from neutron.tests import base
from neutron.tests.common import helpers
from neutron.tests.unit.api import test_extensions
from neutron.tests.unit.api.v2 import test_base
from neutron.tests.unit.db import test_db_base_plugin_v2
from neutron.tests.unit.extensions import base as test_extensions_base
from neutron.tests.unit.extensions import test_agent
from neutron.tests.unit.plugins.ml2 import base as ml2_base
from neutron.tests.unit import testlib_api
_uuid = uuidutils.generate_uuid
_get_path = test_base._get_path
DEVICE_OWNER_COMPUTE = lib_constants.DEVICE_OWNER_COMPUTE_PREFIX + 'fake'
class L3TestExtensionManager(object):
def get_resources(self):
return l3.L3.get_resources()
def get_actions(self):
return []
def get_request_extensions(self):
return []
class L3NatExtensionTestCase(test_extensions_base.ExtensionTestCase):
fmt = 'json'
def setUp(self):
super(L3NatExtensionTestCase, self).setUp()
self.setup_extension(
'neutron.services.l3_router.l3_router_plugin.L3RouterPlugin',
plugin_constants.L3, l3.L3, '', allow_pagination=True,
allow_sorting=True, supported_extension_aliases=['router'],
use_quota=True)
def test_router_create(self):
router_id = _uuid()
tenant_id = _uuid()
data = {'router': {'name': 'router1', 'admin_state_up': True,
'tenant_id': tenant_id, 'project_id': tenant_id,
'external_gateway_info': None}}
return_value = copy.deepcopy(data['router'])
return_value.update({'status': "ACTIVE", 'id': router_id})
instance = self.plugin.return_value
instance.create_router.return_value = return_value
instance.get_routers_count.return_value = 0
res = self.api.post(_get_path('routers', fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt)
instance.create_router.assert_called_with(mock.ANY,
router=data)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
res = self.deserialize(res)
self.assertIn('router', res)
router = res['router']
self.assertEqual(router_id, router['id'])
self.assertEqual("ACTIVE", router['status'])
self.assertTrue(router['admin_state_up'])
def test_router_list(self):
router_id = _uuid()
return_value = [{'name': 'router1', 'admin_state_up': True,
'tenant_id': _uuid(), 'id': router_id}]
instance = self.plugin.return_value
instance.get_routers.return_value = return_value
res = self.api.get(_get_path('routers', fmt=self.fmt))
instance.get_routers.assert_called_with(mock.ANY, fields=mock.ANY,
filters=mock.ANY,
sorts=mock.ANY,
limit=mock.ANY,
marker=mock.ANY,
page_reverse=mock.ANY)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('routers', res)
self.assertEqual(1, len(res['routers']))
self.assertEqual(router_id, res['routers'][0]['id'])
def test_router_update(self):
router_id = _uuid()
update_data = {'router': {'admin_state_up': False}}
return_value = {'name': 'router1', 'admin_state_up': False,
'tenant_id': _uuid(),
'status': "ACTIVE", 'id': router_id}
instance = self.plugin.return_value
instance.update_router.return_value = return_value
res = self.api.put(_get_path('routers', id=router_id,
fmt=self.fmt),
self.serialize(update_data))
instance.update_router.assert_called_with(mock.ANY, router_id,
router=update_data)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('router', res)
router = res['router']
self.assertEqual(router_id, router['id'])
self.assertEqual("ACTIVE", router['status'])
self.assertFalse(router['admin_state_up'])
def test_router_get(self):
router_id = _uuid()
return_value = {'name': 'router1', 'admin_state_up': False,
'tenant_id': _uuid(),
'status': "ACTIVE", 'id': router_id}
instance = self.plugin.return_value
instance.get_router.return_value = return_value
res = self.api.get(_get_path('routers', id=router_id,
fmt=self.fmt))
instance.get_router.assert_called_with(mock.ANY, router_id,
fields=mock.ANY)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('router', res)
router = res['router']
self.assertEqual(router_id, router['id'])
self.assertEqual("ACTIVE", router['status'])
self.assertFalse(router['admin_state_up'])
def test_router_delete(self):
router_id = _uuid()
res = self.api.delete(_get_path('routers', id=router_id))
instance = self.plugin.return_value
instance.delete_router.assert_called_with(mock.ANY, router_id)
self.assertEqual(exc.HTTPNoContent.code, res.status_int)
def test_router_add_interface(self):
router_id = _uuid()
subnet_id = _uuid()
port_id = _uuid()
interface_data = {'subnet_id': subnet_id}
return_value = copy.deepcopy(interface_data)
return_value['port_id'] = port_id
instance = self.plugin.return_value
instance.add_router_interface.return_value = return_value
path = _get_path('routers', id=router_id,
action="add_router_interface",
fmt=self.fmt)
res = self.api.put(path, self.serialize(interface_data))
instance.add_router_interface.assert_called_with(mock.ANY, router_id,
interface_data)
self.assertEqual(exc.HTTPOk.code, res.status_int)
res = self.deserialize(res)
self.assertIn('port_id', res)
self.assertEqual(port_id, res['port_id'])
self.assertEqual(subnet_id, res['subnet_id'])
def test_router_add_interface_empty_body(self):
router_id = _uuid()
instance = self.plugin.return_value
path = _get_path('routers', id=router_id,
action="add_router_interface",
fmt=self.fmt)
res = self.api.put(path)
self.assertEqual(exc.HTTPOk.code, res.status_int)
instance.add_router_interface.assert_called_with(mock.ANY, router_id)
class TestL3PluginBaseAttributes(object):
IP_UPDATE_NOT_ALLOWED_LIST = [
lib_constants.DEVICE_OWNER_ROUTER_INTF,
lib_constants.DEVICE_OWNER_ROUTER_HA_INTF,
lib_constants.DEVICE_OWNER_HA_REPLICATED_INT,
lib_constants.DEVICE_OWNER_ROUTER_SNAT,
lib_constants.DEVICE_OWNER_DVR_INTERFACE]
def router_supports_scheduling(self, context, router_id):
return True
# This base plugin class is for tests.
class TestL3NatBasePlugin(TestL3PluginBaseAttributes,
db_base_plugin_v2.NeutronDbPluginV2,
external_net_db.External_net_db_mixin):
__native_pagination_support = True
__native_sorting_support = True
def create_network(self, context, network):
session = context.session
with session.begin(subtransactions=True):
net = super(TestL3NatBasePlugin, self).create_network(context,
network)
self._process_l3_create(context, net, network['network'])
return net
def update_network(self, context, id, network):
session = context.session
with session.begin(subtransactions=True):
net = super(TestL3NatBasePlugin, self).update_network(context, id,
network)
self._process_l3_update(context, net, network['network'])
return net
def delete_port(self, context, id, l3_port_check=True):
plugin = directory.get_plugin(plugin_constants.L3)
if plugin:
if l3_port_check:
plugin.prevent_l3_port_deletion(context, id)
plugin.disassociate_floatingips(context, id)
return super(TestL3NatBasePlugin, self).delete_port(context, id)
def update_port(self, context, id, port):
original_port = self.get_port(context, id)
session = context.session
with session.begin(subtransactions=True):
new_port = super(TestL3NatBasePlugin, self).update_port(
context, id, port)
# Notifications must be sent after the above transaction is complete
kwargs = {
'context': context,
'port': new_port,
'original_port': original_port,
}
registry.notify(resources.PORT, events.AFTER_UPDATE, self, **kwargs)
return new_port
# This plugin class is for tests with plugin that integrates L3.
class TestL3NatIntPlugin(TestL3NatBasePlugin,
l3_db.L3_NAT_db_mixin, dns_db.DNSDbMixin):
__native_pagination_support = True
__native_sorting_support = True
supported_extension_aliases = ["external-net", "router", "dns-integration"]
# This plugin class is for tests with plugin that integrates L3 and L3 agent
# scheduling.
class TestL3NatIntAgentSchedulingPlugin(TestL3NatIntPlugin,
l3_agentschedulers_db.
L3AgentSchedulerDbMixin,
l3_hamode_db.L3_HA_NAT_db_mixin):
supported_extension_aliases = ["external-net", "router",
"l3_agent_scheduler"]
router_scheduler = importutils.import_object(
cfg.CONF.router_scheduler_driver)
# This plugin class is for tests with plugin not supporting L3.
class TestNoL3NatPlugin(TestL3NatBasePlugin):
__native_pagination_support = True
__native_sorting_support = True
supported_extension_aliases = ["external-net"]
# A L3 routing service plugin class for tests with plugins that
# delegate away L3 routing functionality
class TestL3NatServicePlugin(TestL3PluginBaseAttributes,
common_db_mixin.CommonDbMixin,
l3_dvr_db.L3_NAT_with_dvr_db_mixin,
l3_db.L3_NAT_db_mixin, dns_db.DNSDbMixin):
__native_pagination_support = True
__native_sorting_support = True
supported_extension_aliases = ["router", "dns-integration"]
@classmethod
def get_plugin_type(cls):
return plugin_constants.L3
def get_plugin_description(self):
return "L3 Routing Service Plugin for testing"
# A L3 routing with L3 agent scheduling service plugin class for tests with
# plugins that delegate away L3 routing functionality
class TestL3NatAgentSchedulingServicePlugin(TestL3NatServicePlugin,
l3_dvrscheduler_db.
L3_DVRsch_db_mixin,
l3_hamode_db.L3_HA_NAT_db_mixin):
supported_extension_aliases = ["router", "l3_agent_scheduler"]
def __init__(self):
super(TestL3NatAgentSchedulingServicePlugin, self).__init__()
self.router_scheduler = importutils.import_object(
cfg.CONF.router_scheduler_driver)
self.agent_notifiers.update(
{lib_constants.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
class L3NatTestCaseMixin(object):
def _create_router(self, fmt, tenant_id, name=None,
admin_state_up=None, set_context=False,
arg_list=None, **kwargs):
tenant_id = tenant_id or _uuid()
data = {'router': {'tenant_id': tenant_id}}
if name:
data['router']['name'] = name
if admin_state_up is not None:
data['router']['admin_state_up'] = admin_state_up
flavor_id = kwargs.get('flavor_id', None)
if flavor_id:
data['router']['flavor_id'] = flavor_id
for arg in (('admin_state_up', 'tenant_id',
'availability_zone_hints') +
(arg_list or ())):
# Arg must be present and not empty
if arg in kwargs:
data['router'][arg] = kwargs[arg]
if 'distributed' in kwargs:
data['router']['distributed'] = bool(kwargs['distributed'])
router_req = self.new_create_request('routers', data, fmt)
if set_context and tenant_id:
# create a specific auth context for this request
router_req.environ['neutron.context'] = context.Context(
'', tenant_id)
return router_req.get_response(self.ext_api)
def _make_router(self, fmt, tenant_id, name=None, admin_state_up=None,
external_gateway_info=None, set_context=False,
arg_list=None, **kwargs):
if external_gateway_info:
arg_list = ('external_gateway_info', ) + (arg_list or ())
res = self._create_router(fmt, tenant_id, name,
admin_state_up, set_context,
arg_list=arg_list,
external_gateway_info=external_gateway_info,
**kwargs)
return self.deserialize(fmt, res)
def _add_external_gateway_to_router(self, router_id, network_id,
expected_code=exc.HTTPOk.code,
neutron_context=None, ext_ips=None,
**kwargs):
ext_ips = ext_ips or []
body = {'router':
{'external_gateway_info': {'network_id': network_id}}}
if ext_ips:
body['router']['external_gateway_info'][
'external_fixed_ips'] = ext_ips
if 'policy_id' in kwargs:
body['router']['external_gateway_info'][
'qos_policy_id'] = kwargs.get('policy_id')
return self._update('routers', router_id, body,
expected_code=expected_code,
neutron_context=neutron_context)
def _remove_external_gateway_from_router(self, router_id, network_id,
expected_code=exc.HTTPOk.code,
external_gw_info=None):
return self._update('routers', router_id,
{'router': {'external_gateway_info':
external_gw_info}},
expected_code=expected_code)
def _router_interface_action(self, action, router_id, subnet_id, port_id,
expected_code=exc.HTTPOk.code,
expected_body=None,
tenant_id=None,
msg=None):
interface_data = {}
if subnet_id is not None:
interface_data.update({'subnet_id': subnet_id})
if port_id is not None:
interface_data.update({'port_id': port_id})
req = self.new_action_request('routers', interface_data, router_id,
"%s_router_interface" % action)
# if tenant_id was specified, create a tenant context for this request
if tenant_id:
req.environ['neutron.context'] = context.Context(
'', tenant_id)
res = req.get_response(self.ext_api)
self.assertEqual(expected_code, res.status_int, msg)
response = self.deserialize(self.fmt, res)
if expected_body:
self.assertEqual(expected_body, response, msg)
return response
@contextlib.contextmanager
def router(self, name='router1', admin_state_up=True,
fmt=None, tenant_id=None,
external_gateway_info=None, set_context=False,
**kwargs):
router = self._make_router(fmt or self.fmt, tenant_id, name,
admin_state_up, external_gateway_info,
set_context, **kwargs)
yield router
def _set_net_external(self, net_id):
self._update('networks', net_id,
{'network': {extnet_apidef.EXTERNAL: True}})
def _create_floatingip(self, fmt, network_id, port_id=None,
fixed_ip=None, set_context=False,
floating_ip=None, subnet_id=None,
tenant_id=None, **kwargs):
tenant_id = tenant_id or self._tenant_id
data = {'floatingip': {'floating_network_id': network_id,
'tenant_id': tenant_id}}
if port_id:
data['floatingip']['port_id'] = port_id
if fixed_ip:
data['floatingip']['fixed_ip_address'] = fixed_ip
if floating_ip:
data['floatingip']['floating_ip_address'] = floating_ip
if subnet_id:
data['floatingip']['subnet_id'] = subnet_id
data['floatingip'].update(kwargs)
floatingip_req = self.new_create_request('floatingips', data, fmt)
if set_context and tenant_id:
# create a specific auth context for this request
floatingip_req.environ['neutron.context'] = context.Context(
'', tenant_id)
return floatingip_req.get_response(self.ext_api)
def _make_floatingip(self, fmt, network_id, port_id=None,
fixed_ip=None, set_context=False, tenant_id=None,
floating_ip=None, http_status=exc.HTTPCreated.code,
**kwargs):
res = self._create_floatingip(fmt, network_id, port_id,
fixed_ip, set_context, floating_ip,
tenant_id=tenant_id, **kwargs)
self.assertEqual(http_status, res.status_int)
return self.deserialize(fmt, res)
def _validate_floating_ip(self, fip):
body = self._list('floatingips')
self.assertEqual(1, len(body['floatingips']))
self.assertEqual(body['floatingips'][0]['id'],
fip['floatingip']['id'])
body = self._show('floatingips', fip['floatingip']['id'])
self.assertEqual(body['floatingip']['id'],
fip['floatingip']['id'])
@contextlib.contextmanager
def floatingip_with_assoc(self, port_id=None, fmt=None, fixed_ip=None,
public_cidr='11.0.0.0/24', set_context=False,
tenant_id=None, flavor_id=None, **kwargs):
with self.subnet(cidr=public_cidr,
set_context=set_context,
tenant_id=tenant_id) as public_sub:
self._set_net_external(public_sub['subnet']['network_id'])
args_list = {'set_context': set_context,
'tenant_id': tenant_id}
if flavor_id:
args_list['flavor_id'] = flavor_id
private_port = None
if port_id:
private_port = self._show('ports', port_id)
with test_db_base_plugin_v2.optional_ctx(
private_port, self.port,
set_context=set_context,
tenant_id=tenant_id) as private_port:
with self.router(**args_list) as r:
sid = private_port['port']['fixed_ips'][0]['subnet_id']
private_sub = {'subnet': {'id': sid}}
floatingip = None
self._add_external_gateway_to_router(
r['router']['id'],
public_sub['subnet']['network_id'])
self._router_interface_action(
'add', r['router']['id'],
private_sub['subnet']['id'], None)
floatingip = self._make_floatingip(
fmt or self.fmt,
public_sub['subnet']['network_id'],
port_id=private_port['port']['id'],
fixed_ip=fixed_ip,
tenant_id=tenant_id,
set_context=set_context,
**kwargs)
yield floatingip
if floatingip:
self._delete('floatingips',
floatingip['floatingip']['id'])
@contextlib.contextmanager
def floatingip_no_assoc_with_public_sub(
self, private_sub, fmt=None, set_context=False,
public_sub=None, flavor_id=None, **kwargs):
self._set_net_external(public_sub['subnet']['network_id'])
args_list = {}
if flavor_id:
# NOTE(manjeets) Flavor id None is not accepted
# and return Flavor None not found error. So for
# neutron testing this argument should not be passed
# at all to router.
args_list['flavor_id'] = flavor_id
with self.router(**args_list) as r:
floatingip = None
self._add_external_gateway_to_router(
r['router']['id'],
public_sub['subnet']['network_id'])
self._router_interface_action('add', r['router']['id'],
private_sub['subnet']['id'],
None)
floatingip = self._make_floatingip(
fmt or self.fmt,
public_sub['subnet']['network_id'],
set_context=set_context,
**kwargs)
yield floatingip, r
if floatingip:
self._delete('floatingips',
floatingip['floatingip']['id'])
@contextlib.contextmanager
def floatingip_no_assoc(self, private_sub, fmt=None,
set_context=False, flavor_id=None, **kwargs):
with self.subnet(cidr='12.0.0.0/24') as public_sub:
with self.floatingip_no_assoc_with_public_sub(
private_sub, fmt, set_context, public_sub,
flavor_id, **kwargs) as (f, r):
# Yield only the floating ip object
yield f
class ExtraAttributesMixinTestCase(testlib_api.SqlTestCase):
def setUp(self):
super(ExtraAttributesMixinTestCase, self).setUp()
self.mixin = l3_attrs_db.ExtraAttributesMixin()
directory.add_plugin(plugin_constants.L3, self.mixin)
self.ctx = context.get_admin_context()
self.router = l3_models.Router()
with self.ctx.session.begin():
self.ctx.session.add(self.router)
def _get_default_api_values(self):
return {k: v.get('transform_from_db', lambda x: x)(v['default'])
for k, v in l3_attrs_db.get_attr_info().items()}
def test_set_extra_attr_key_bad(self):
with testtools.ExpectedException(RuntimeError):
with self.ctx.session.begin():
self.mixin.set_extra_attr_value(self.ctx, self.router,
'bad', 'value')
def test_set_attrs_and_extend_no_transaction(self):
with testtools.ExpectedException(RuntimeError):
self.mixin.set_extra_attr_value(self.ctx, self.router,
'ha_vr_id', 99)
def test__extend_extra_router_dict_defaults(self):
rdict = {}
self.mixin._extend_extra_router_dict(rdict, self.router)
self.assertEqual(self._get_default_api_values(), rdict)
def test_set_attrs_and_extend(self):
with self.ctx.session.begin():
self.mixin.set_extra_attr_value(self.ctx, self.router,
'ha_vr_id', 99)
self.mixin.set_extra_attr_value(self.ctx, self.router,
'availability_zone_hints',
['x', 'y', 'z'])
expected = self._get_default_api_values()
expected.update({'ha_vr_id': 99,
'availability_zone_hints': ['x', 'y', 'z']})
rdict = {}
self.mixin._extend_extra_router_dict(rdict, self.router)
self.assertEqual(expected, rdict)
with self.ctx.session.begin():
self.mixin.set_extra_attr_value(self.ctx, self.router,
'availability_zone_hints',
['z', 'y', 'z'])
expected['availability_zone_hints'] = ['z', 'y', 'z']
self.mixin._extend_extra_router_dict(rdict, self.router)
self.assertEqual(expected, rdict)
class L3NatTestCaseBase(L3NatTestCaseMixin):
def test_router_create(self):
name = 'router1'
tenant_id = _uuid()
expected_value = [('name', name), ('tenant_id', tenant_id),
('admin_state_up', True), ('status', 'ACTIVE'),
('external_gateway_info', None)]
with self.router(name='router1', admin_state_up=True,
tenant_id=tenant_id) as router:
for k, v in expected_value:
self.assertEqual(router['router'][k], v)
def test_router_create_call_extensions(self):
self.extension_called = False
def _extend_router_dict_test_attr(*args, **kwargs):
self.extension_called = True
resource_extend.register_funcs(
l3_apidef.ROUTERS, [_extend_router_dict_test_attr])
self.assertFalse(self.extension_called)
with self.router():
self.assertTrue(self.extension_called)
def test_router_create_with_gwinfo(self):
with self.subnet() as s:
self._set_net_external(s['subnet']['network_id'])
data = {'router': {'tenant_id': _uuid()}}
data['router']['name'] = 'router1'
data['router']['external_gateway_info'] = {
'network_id': s['subnet']['network_id']}
router_req = self.new_create_request('routers', data, self.fmt)
res = router_req.get_response(self.ext_api)
router = self.deserialize(self.fmt, res)
self.assertEqual(
s['subnet']['network_id'],
router['router']['external_gateway_info']['network_id'])
def test_router_create_with_gwinfo_ext_ip(self):
with self.subnet() as s:
self._set_net_external(s['subnet']['network_id'])
ext_info = {
'network_id': s['subnet']['network_id'],
'external_fixed_ips': [{'ip_address': '10.0.0.99'}]
}
res = self._create_router(
self.fmt, _uuid(), arg_list=('external_gateway_info',),
external_gateway_info=ext_info
)
router = self.deserialize(self.fmt, res)
self.assertEqual(
[{'ip_address': '10.0.0.99', 'subnet_id': s['subnet']['id']}],
router['router']['external_gateway_info'][
'external_fixed_ips'])
def test_router_create_with_gwinfo_ext_ip_subnet(self):
with self.network() as n:
with self.subnet(network=n) as v1,\
self.subnet(network=n, cidr='1.0.0.0/24') as v2,\
self.subnet(network=n, cidr='2.0.0.0/24') as v3:
subnets = (v1, v2, v3)
self._set_net_external(n['network']['id'])
for s in subnets:
ext_info = {
'network_id': n['network']['id'],
'external_fixed_ips': [
{'subnet_id': s['subnet']['id']}]
}
res = self._create_router(
self.fmt, _uuid(), arg_list=('external_gateway_info',),
external_gateway_info=ext_info
)
router = self.deserialize(self.fmt, res)
ext_ips = router['router']['external_gateway_info'][
'external_fixed_ips']
self.assertEqual(
[{'subnet_id': s['subnet']['id'],
'ip_address': mock.ANY}], ext_ips)
def test_router_create_with_gwinfo_ext_ip_non_admin(self):
with self.subnet() as s:
self._set_net_external(s['subnet']['network_id'])
ext_info = {
'network_id': s['subnet']['network_id'],
'external_fixed_ips': [{'ip_address': '10.0.0.99'}]
}
res = self._create_router(
self.fmt, _uuid(), arg_list=('external_gateway_info',),
set_context=True, external_gateway_info=ext_info
)
self.assertEqual(exc.HTTPForbidden.code, res.status_int)
def test_create_routers_native_quotas(self):
tenant_id = _uuid()
quota = 1
cfg.CONF.set_override('quota_router', quota, group='QUOTAS')
res = self._create_router(self.fmt, tenant_id)
self.assertEqual(exc.HTTPCreated.code, res.status_int)
res = self._create_router(self.fmt, tenant_id)
self.assertEqual(exc.HTTPConflict.code, res.status_int)
def test_router_list(self):
with self.router() as v1, self.router() as v2, self.router() as v3:
routers = (v1, v2, v3)
self._test_list_resources('router', routers)
def test_router_list_with_parameters(self):
with self.router(name='router1') as router1,\
self.router(name='router2') as router2:
query_params = 'name=router1'
self._test_list_resources('router', [router1],
query_params=query_params)
query_params = 'name=router2'
self._test_list_resources('router', [router2],
query_params=query_params)
query_params = 'name=router3'
self._test_list_resources('router', [],
query_params=query_params)
def test_router_list_with_sort(self):
with self.router(name='router1') as router1,\
self.router(name='router2') as router2,\
self.router(name='router3') as router3:
self._test_list_with_sort('router', (router3, router2, router1),
[('name', 'desc')])
def test_router_list_with_pagination(self):
with self.router(name='router1') as router1,\
self.router(name='router2') as router2,\
self.router(name='router3') as router3:
self._test_list_with_pagination('router',
(router1, router2, router3),
('name', 'asc'), 2, 2)
def test_router_list_with_pagination_reverse(self):
with self.router(name='router1') as router1,\
self.router(name='router2') as router2,\
self.router(name='router3') as router3:
self._test_list_with_pagination_reverse('router',
(router1, router2,
router3),
('name', 'asc'), 2, 2)
def test_router_update(self):
rname1 = "yourrouter"
rname2 = "nachorouter"
with self.router(name=rname1) as r:
body = self._show('routers', r['router']['id'])
self.assertEqual(body['router']['name'], rname1)
body = self._update('routers', r['router']['id'],
{'router': {'name': rname2}})
body = self._show('routers', r['router']['id'])
self.assertEqual(body['router']['name'], rname2)
def test_router_update_gateway(self):
with self.router() as r:
with self.subnet() as s1:
with self.subnet() as s2:
self._set_net_external(s1['subnet']['network_id'])
self._add_external_gateway_to_router(
r['router']['id'],
s1['subnet']['network_id'])
body = self._show('routers', r['router']['id'])
net_id = (body['router']
['external_gateway_info']['network_id'])
self.assertEqual(net_id, s1['subnet']['network_id'])
self._set_net_external(s2['subnet']['network_id'])
self._add_external_gateway_to_router(
r['router']['id'],
s2['subnet']['network_id'])
body = self._show('routers', r['router']['id'])
net_id = (body['router']
['external_gateway_info']['network_id'])
self.assertEqual(net_id, s2['subnet']['network_id'])
# Validate that we can clear the gateway with
# an empty dict, in any other case, we fall back
# on None as default value
self._remove_external_gateway_from_router(
r['router']['id'],
s2['subnet']['network_id'],
external_gw_info={})
def test_router_update_gateway_with_external_ip_used_by_gw(self):
with self.router() as r:
with self.subnet() as s:
self._set_net_external(s['subnet']['network_id'])
self._add_external_gateway_to_router(
r['router']['id'],
s['subnet']['network_id'],
ext_ips=[{'ip_address': s['subnet']['gateway_ip']}],
expected_code=exc.HTTPBadRequest.code)
def test_router_update_gateway_with_invalid_external_ip(self):
with self.router() as r:
with self.subnet() as s:
self._set_net_external(s['subnet']['network_id'])
self._add_external_gateway_to_router(
r['router']['id'],
s['subnet']['network_id'],
ext_ips=[{'ip_address': '99.99.99.99'}],
expected_code=exc.HTTPBadRequest.code)
def test_router_update_gateway_with_invalid_external_subnet(self):
with self.subnet() as s1,\
self.subnet(cidr='1.0.0.0/24') as s2,\
self.router() as r:
self._set_net_external(s1['subnet']['network_id'])
self._add_external_gateway_to_router(
r['router']['id'],
s1['subnet']['network_id'],
# this subnet is not on the same network so this should fail
ext_ips=[{'subnet_id': s2['subnet']['id']}],
expected_code=exc.HTTPBadRequest.code)
def test_router_update_gateway_with_different_external_subnet(self):
with self.network() as n:
with self.subnet(network=n) as s1,\
self.subnet(network=n, cidr='1.0.0.0/24') as s2,\
self.router() as r:
self._set_net_external(n['network']['id'])
res1 = self._add_external_gateway_to_router(
r['router']['id'],
n['network']['id'],
ext_ips=[{'subnet_id': s1['subnet']['id']}])
res2 = self._add_external_gateway_to_router(
r['router']['id'],
n['network']['id'],
ext_ips=[{'subnet_id': s2['subnet']['id']}])
fip1 = res1['router']['external_gateway_info']['external_fixed_ips'][0]
fip2 = res2['router']['external_gateway_info']['external_fixed_ips'][0]
self.assertEqual(s1['subnet']['id'], fip1['subnet_id'])
self.assertEqual(s2['subnet']['id'], fip2['subnet_id'])
self.assertNotEqual(fip1['subnet_id'], fip2['subnet_id'])
self.assertNotEqual(fip1['ip_address'], fip2['ip_address'])
def test_router_update_gateway_with_existed_floatingip(self):
with self.subnet() as subnet:
self._set_net_external(subnet['subnet']['network_id'])
with self.floatingip_with_assoc() as fip:
self._add_external_gateway_to_router(
fip['floatingip']['router_id'],
subnet['subnet']['network_id'],
expected_code=exc.HTTPConflict.code)
def test_router_update_gateway_to_empty_with_existed_floatingip(self):
with self.floatingip_with_assoc() as fip:
self._remove_external_gateway_from_router(
fip['floatingip']['router_id'], None,
expected_code=exc.HTTPConflict.code)
def test_router_update_gateway_add_multiple_prefixes_ipv6(self):
with self.network() as n:
with self.subnet(network=n) as s1, \
self.subnet(network=n, ip_version=lib_constants.IP_VERSION_6,
cidr='2001:db8::/32') \
as s2, (self.router()) as r:
self._set_net_external(n['network']['id'])
res1 = self._add_external_gateway_to_router(
r['router']['id'],
n['network']['id'],
ext_ips=[{'subnet_id': s1['subnet']['id']}])
fip1 = (res1['router']['external_gateway_info']
['external_fixed_ips'][0])
self.assertEqual(s1['subnet']['id'], fip1['subnet_id'])
res2 = self._add_external_gateway_to_router(
r['router']['id'],
n['network']['id'],
ext_ips=[{'ip_address': fip1['ip_address'],
'subnet_id': s1['subnet']['id']},
{'subnet_id': s2['subnet']['id']}])
self.assertEqual(fip1, res2['router']['external_gateway_info']
['external_fixed_ips'][0])
fip2 = (res2['router']['external_gateway_info']
['external_fixed_ips'][1])
self.assertEqual(s2['subnet']['id'], fip2['subnet_id'])
self.assertNotEqual(fip1['subnet_id'],
fip2['subnet_id'])
self.assertNotEqual(fip1['ip_address'],
fip2['ip_address'])
def test_router_concurrent_delete_upon_subnet_create(self):
with self.network() as n:
with self.subnet(network=n) as s1, self.router() as r:
self._set_net_external(n['network']['id'])
self._add_external_gateway_to_router(
r['router']['id'],
n['network']['id'],
ext_ips=[{'subnet_id': s1['subnet']['id']}])
plugin = directory.get_plugin(plugin_constants.L3)
mock.patch.object(
plugin, 'update_router',
side_effect=l3_exc.RouterNotFound(router_id='1')).start()
# ensure the router disappearing doesn't interfere with subnet
# creation
self._create_subnet(self.fmt, net_id=n['network']['id'],
ip_version=lib_constants.IP_VERSION_6,
cidr='2001:db8::/32',
expected_res_status=(exc.HTTPCreated.code))
def test_router_update_gateway_upon_subnet_create_ipv6(self):
with self.network() as n:
with self.subnet(network=n) as s1, self.router() as r:
self._set_net_external(n['network']['id'])
res1 = self._add_external_gateway_to_router(
r['router']['id'],
n['network']['id'],
ext_ips=[{'subnet_id': s1['subnet']['id']}])
fip1 = (res1['router']['external_gateway_info']
['external_fixed_ips'][0])
sres = self._create_subnet(self.fmt, net_id=n['network']['id'],
ip_version=lib_constants.IP_VERSION_6,
cidr='2001:db8::/32',
expected_res_status=(
exc.HTTPCreated.code))
s2 = self.deserialize(self.fmt, sres)
res2 = self._show('routers', r['router']['id'])
self.assertEqual(fip1, res2['router']['external_gateway_info']
['external_fixed_ips'][0])
fip2 = (res2['router']['external_gateway_info']
['external_fixed_ips'][1])
self.assertEqual(s2['subnet']['id'], fip2['subnet_id'])
self.assertNotEqual(fip1['subnet_id'], fip2['subnet_id'])
self.assertNotEqual(fip1['ip_address'], fip2['ip_address'])
def test_router_update_gateway_upon_subnet_create_max_ips_ipv6(self):
"""Create subnet should not cause excess fixed IPs on router gw
If a router gateway port has the maximum of one IPv4 and one IPv6
fixed, create subnet should not add any more IP addresses to the port
(unless this is the subnet is a SLAAC/DHCPv6-stateless subnet in which
case the addresses are added automatically)
"""
with self.router() as r, self.network() as n:
with self.subnet(cidr='10.0.0.0/24', network=n) as s1, (
self.subnet(ip_version=lib_constants.IP_VERSION_6,
cidr='2001:db8::/64',
network=n)) as s2:
self._set_net_external(n['network']['id'])
self._add_external_gateway_to_router(
r['router']['id'],
n['network']['id'],
ext_ips=[{'subnet_id': s1['subnet']['id']},
{'subnet_id': s2['subnet']['id']}],
expected_code=exc.HTTPOk.code)
res1 = self._show('routers', r['router']['id'])
original_fips = (res1['router']['external_gateway_info']
['external_fixed_ips'])
# Add another IPv4 subnet - a fip SHOULD NOT be added
# to the external gateway port as it already has a v4 address
self._create_subnet(self.fmt, net_id=n['network']['id'],
cidr='10.0.1.0/24')
res2 = self._show('routers', r['router']['id'])
self.assertEqual(original_fips,
res2['router']['external_gateway_info']
['external_fixed_ips'])
# Add a SLAAC subnet - a fip from this subnet SHOULD be added
# to the external gateway port
s3 = self.deserialize(self.fmt,
self._create_subnet(self.fmt,
net_id=n['network']['id'],
ip_version=lib_constants.IP_VERSION_6,
cidr='2001:db8:1::/64',
ipv6_ra_mode=lib_constants.IPV6_SLAAC,
ipv6_address_mode=lib_constants.IPV6_SLAAC))
res3 = self._show('routers', r['router']['id'])
fips = (res3['router']['external_gateway_info']
['external_fixed_ips'])
fip_subnet_ids = [fip['subnet_id'] for fip in fips]
self.assertIn(s1['subnet']['id'], fip_subnet_ids)
self.assertIn(s2['subnet']['id'], fip_subnet_ids)
self.assertIn(s3['subnet']['id'], fip_subnet_ids)
self._remove_external_gateway_from_router(
r['router']['id'],
n['network']['id'])
def _test_router_add_interface_subnet(self, router, subnet, msg=None):
exp_notifications = ['router.create.start',
'router.create.end',
'network.create.start',
'network.create.end',
'subnet.create.start',
'subnet.create.end',
'router.interface.create',
'router.interface.delete']
body = self._router_interface_action('add',
router['router']['id'],
subnet['subnet']['id'],
None)
self.assertIn('port_id', body, msg)
# fetch port and confirm device_id
r_port_id = body['port_id']
port = self._show('ports', r_port_id)
self.assertEqual(port['port']['device_id'],
router['router']['id'], msg)
self._router_interface_action('remove',
router['router']['id'],
subnet['subnet']['id'],
None)
self._show('ports', r_port_id,
expected_code=exc.HTTPNotFound.code)
self.assertEqual(
set(exp_notifications),
set(n['event_type'] for n in fake_notifier.NOTIFICATIONS), msg)
for n in fake_notifier.NOTIFICATIONS:
if n['event_type'].startswith('router.interface.'):
payload = n['payload']['router_interface']
self.assertIn('id', payload)
self.assertEqual(payload['id'], router['router']['id'])
self.assertIn('tenant_id', payload)
rtid = router['router']['tenant_id']
# tolerate subnet tenant deliberately set to '' in the
# nsx metadata access case
self.assertIn(payload['tenant_id'], [rtid, ''], msg)
def test_router_add_interface_bad_values(self):
with self.router() as r:
exp_code = exc.HTTPBadRequest.code
self._router_interface_action('add',
r['router']['id'],
False,
None,
expected_code=exp_code)
self._router_interface_action('add',
r['router']['id'],
None,
False,
expected_code=exp_code)
def test_router_add_interface_subnet(self):
fake_notifier.reset()
with self.router() as r:
with self.network() as n:
with self.subnet(network=n) as s:
self._test_router_add_interface_subnet(r, s)
def test_router_delete_race_with_interface_add(self):
# this test depends on protection from the revision plugin so
# we have to initialize it
revision_plugin.RevisionPlugin()
with self.router() as r, self.subnet() as s:
def jam_in_interface(*args, **kwargs):
self._router_interface_action('add', r['router']['id'],
s['subnet']['id'], None)
# unsubscribe now that the evil is done
registry.unsubscribe(jam_in_interface, resources.ROUTER,
events.PRECOMMIT_DELETE)
registry.subscribe(jam_in_interface, resources.ROUTER,
events.PRECOMMIT_DELETE)
self._delete('routers', r['router']['id'],
expected_code=exc.HTTPConflict.code)
def test_router_add_interface_ipv6_subnet(self):
"""Test router-interface-add for valid ipv6 subnets.
Verify the valid use-cases of an IPv6 subnet where we
are allowed to associate to the Neutron Router are successful.
"""
slaac = lib_constants.IPV6_SLAAC
stateful = lib_constants.DHCPV6_STATEFUL
stateless = lib_constants.DHCPV6_STATELESS
use_cases = [{'msg': 'IPv6 Subnet Modes (slaac, none)',
'ra_mode': slaac, 'address_mode': None},
{'msg': 'IPv6 Subnet Modes (none, none)',
'ra_mode': None, 'address_mode': None},
{'msg': 'IPv6 Subnet Modes (dhcpv6-stateful, none)',
'ra_mode': stateful, 'address_mode': None},
{'msg': 'IPv6 Subnet Modes (dhcpv6-stateless, none)',
'ra_mode': stateless, 'address_mode': None},
{'msg': 'IPv6 Subnet Modes (slaac, slaac)',
'ra_mode': slaac, 'address_mode': slaac},
{'msg': 'IPv6 Subnet Modes (dhcpv6-stateful,'
'dhcpv6-stateful)', 'ra_mode': stateful,
'address_mode': stateful},
{'msg': 'IPv6 Subnet Modes (dhcpv6-stateless,'
'dhcpv6-stateless)', 'ra_mode': stateless,
'address_mode': stateless}]
for uc in use_cases:
fake_notifier.reset()
with self.router() as r, self.network() as n:
with self.subnet(network=n, cidr='fd00::1/64',
gateway_ip='fd00::1',
ip_version=lib_constants.IP_VERSION_6,
ipv6_ra_mode=uc['ra_mode'],
ipv6_address_mode=uc['address_mode']) as s:
self._test_router_add_interface_subnet(r, s, uc['msg'])
def test_router_add_interface_multiple_ipv4_subnets(self):
"""Test router-interface-add for multiple ipv4 subnets.
Verify that adding multiple ipv4 subnets from the same network
to a router places them all on different router interfaces.
"""
with self.router() as r, self.network() as n:
with self.subnet(network=n, cidr='10.0.0.0/24') as s1, (
self.subnet(network=n, cidr='10.0.1.0/24')) as s2:
body = self._router_interface_action('add',
r['router']['id'],
s1['subnet']['id'],
None)
pid1 = body['port_id']
body = self._router_interface_action('add',
r['router']['id'],
s2['subnet']['id'],
None)
pid2 = body['port_id']
self.assertNotEqual(pid1, pid2)
self._router_interface_action('remove', r['router']['id'],
s1['subnet']['id'], None)
self._router_interface_action('remove', r['router']['id'],
s2['subnet']['id'], None)
def test_router_add_interface_multiple_ipv6_subnets_same_net(self):
"""Test router-interface-add for multiple ipv6 subnets on a network.
Verify that adding multiple ipv6 subnets from the same network
to a router places them all on the same router interface.
"""
with self.router() as r, self.network() as n:
with (self.subnet(network=n, cidr='fd00::1/64',
ip_version=lib_constants.IP_VERSION_6)
) as s1, self.subnet(network=n, cidr='fd01::1/64',
ip_version=lib_constants.IP_VERSION_6
) as s2:
body = self._router_interface_action('add',
r['router']['id'],
s1['subnet']['id'],
None)
pid1 = body['port_id']
body = self._router_interface_action('add',
r['router']['id'],
s2['subnet']['id'],
None)
pid2 = body['port_id']
self.assertEqual(pid1, pid2)
port = self._show('ports', pid1)
self.assertEqual(2, len(port['port']['fixed_ips']))
port_subnet_ids = [fip['subnet_id'] for fip in
port['port']['fixed_ips']]
self.assertIn(s1['subnet']['id'], port_subnet_ids)
self.assertIn(s2['subnet']['id'], port_subnet_ids)
self._router_interface_action('remove', r['router']['id'],
s1['subnet']['id'], None)
self._router_interface_action('remove', r['router']['id'],
s2['subnet']['id'], None)
def test_router_add_interface_multiple_ipv6_subnets_different_net(self):
"""Test router-interface-add for ipv6 subnets on different networks.
Verify that adding multiple ipv6 subnets from different networks
to a router places them on different router interfaces.
"""
with self.router() as r, self.network() as n1, self.network() as n2:
with (self.subnet(network=n1, cidr='fd00::1/64',
ip_version=lib_constants.IP_VERSION_6)
) as s1, self.subnet(network=n2, cidr='fd01::1/64',
ip_version=lib_constants.IP_VERSION_6
) as s2:
body = self._router_interface_action('add',
r['router']['id'],
s1['subnet']['id'],
None)
pid1 = body['port_id']
body = self._router_interface_action('add',
r['router']['id'],
s2['subnet']['id'],
None)
pid2 = body['port_id']
self.assertNotEqual(pid1, pid2)
self._router_interface_action('remove', r['router']['id'],
s1['subnet']['id'], None)
self._router_interface_action('remove', r['router']['id'],
s2['subnet']['id'], None)
def test_router_add_iface_ipv6_ext_ra_subnet_returns_400(self):
"""Test router-interface-add for in-valid ipv6 subnets.
Verify that an appropriate error message is displayed when
an IPv6 subnet configured to use an external_router for Router
Advertisements (i.e., ipv6_ra_mode is None and ipv6_address_mode
is not None) is attempted to associate with a Neutron Router.
"""
use_cases = [{'msg': 'IPv6 Subnet Modes (none, slaac)',
'ra_mode': None,
'address_mode': lib_constants.IPV6_SLAAC},
{'msg': 'IPv6 Subnet Modes (none, dhcpv6-stateful)',
'ra_mode': None,
'address_mode': lib_constants.DHCPV6_STATEFUL},
{'msg': 'IPv6 Subnet Modes (none, dhcpv6-stateless)',
'ra_mode': None,
'address_mode': lib_constants.DHCPV6_STATELESS}]
for uc in use_cases:
with self.router() as r, self.network() as n:
with self.subnet(network=n, cidr='fd00::1/64',
gateway_ip='fd00::1',
ip_version=lib_constants.IP_VERSION_6,
ipv6_ra_mode=uc['ra_mode'],
ipv6_address_mode=uc['address_mode']) as s:
exp_code = exc.HTTPBadRequest.code
self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None,
expected_code=exp_code,
msg=uc['msg'])
def test_router_add_interface_ipv6_subnet_without_gateway_ip(self):
with self.router() as r:
with self.subnet(ip_version=lib_constants.IP_VERSION_6,
cidr='fe80::/64',
gateway_ip=None) as s:
error_code = exc.HTTPBadRequest.code
self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None,
expected_code=error_code)
def test_router_add_interface_subnet_with_bad_tenant_returns_404(self):
tenant_id = _uuid()
with self.router(tenant_id=tenant_id, set_context=True) as r:
with self.network(tenant_id=tenant_id, set_context=True) as n:
with self.subnet(network=n, set_context=True) as s:
err_code = exc.HTTPNotFound.code
self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None,
expected_code=err_code,
tenant_id='bad_tenant')
body = self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None)
self.assertIn('port_id', body)
self._router_interface_action('remove',
r['router']['id'],
s['subnet']['id'],
None,
expected_code=err_code,
tenant_id='bad_tenant')
def test_router_add_interface_by_subnet_other_tenant_subnet_returns_400(
self):
router_tenant_id = _uuid()
with self.router(tenant_id=router_tenant_id, set_context=True) as r:
with self.network(shared=True) as n:
with self.subnet(network=n) as s:
err_code = exc.HTTPBadRequest.code
self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None,
expected_code=err_code,
tenant_id=router_tenant_id)
def _test_router_add_interface_by_port_allocation_pool(
self, out_of_pool=False, router_action_as_admin=False,
expected_code=exc.HTTPOk.code):
router_tenant_id = _uuid()
with self.router(tenant_id=router_tenant_id, set_context=True) as r:
with self.network(shared=True) as n:
with self.subnet(network=n) as s1, (
self.subnet(network=n, cidr='fd00::/64',
ip_version=lib_constants.IP_VERSION_6)
) as s2, (
self.subnet(network=n, cidr='fd01::/64',
ip_version=lib_constants.IP_VERSION_6)
) as s3:
fixed_ips = [{'subnet_id': s1['subnet']['id']},
{'subnet_id': s2['subnet']['id']},
{'subnet_id': s3['subnet']['id']}]
if out_of_pool:
fixed_ips[1] = {'subnet_id': s2['subnet']['id'],
'ip_address':
s2['subnet']['gateway_ip']}
with self.port(subnet=s1, fixed_ips=fixed_ips,
tenant_id=router_tenant_id) as p:
kwargs = {'expected_code': expected_code}
if not router_action_as_admin:
kwargs['tenant_id'] = router_tenant_id
self._router_interface_action(
'add', r['router']['id'], None, p['port']['id'],
**kwargs)
def test_router_add_interface_by_port_other_tenant_address_in_pool(
self):
self._test_router_add_interface_by_port_allocation_pool()
def test_router_add_interface_by_port_other_tenant_address_out_of_pool(
self):
self._test_router_add_interface_by_port_allocation_pool(
out_of_pool=True, expected_code=exc.HTTPBadRequest.code)
def test_router_add_interface_by_port_admin_address_out_of_pool(
self):
self._test_router_add_interface_by_port_allocation_pool(
out_of_pool=True, router_action_as_admin=True)
def test_router_add_interface_subnet_with_port_from_other_tenant(self):
tenant_id = _uuid()
other_tenant_id = _uuid()
with self.router(tenant_id=tenant_id) as r,\
self.network(tenant_id=tenant_id) as n1,\
self.network(tenant_id=other_tenant_id) as n2:
with self.subnet(network=n1, cidr='10.0.0.0/24') as s1,\
self.subnet(network=n2, cidr='10.1.0.0/24') as s2:
body = self._router_interface_action(
'add',
r['router']['id'],
s2['subnet']['id'],
None)
self.assertIn('port_id', body)
self._router_interface_action(
'add',
r['router']['id'],
s1['subnet']['id'],
None,
tenant_id=tenant_id)
self.assertIn('port_id', body)
def test_router_add_interface_port(self):
orig_update_port = self.plugin.update_port
with self.router() as r, (
self.port()) as p, (
mock.patch.object(self.plugin, 'update_port')) as update_port:
update_port.side_effect = orig_update_port
body = self._router_interface_action('add',
r['router']['id'],
None,
p['port']['id'])
self.assertIn('port_id', body)
self.assertEqual(p['port']['id'], body['port_id'])
expected_port_update = {
'device_owner': lib_constants.DEVICE_OWNER_ROUTER_INTF,
'device_id': r['router']['id']}
update_port.assert_any_call(
mock.ANY, p['port']['id'], {'port': expected_port_update})
# fetch port and confirm device_id
body = self._show('ports', p['port']['id'])
self.assertEqual(r['router']['id'], body['port']['device_id'])
# clean-up
self._router_interface_action('remove',
r['router']['id'],
None,
p['port']['id'])
def test_update_router_interface_port_ip_not_allowed(self):
with self.router() as r, (
self.port()) as p:
body = self._router_interface_action('add',
r['router']['id'],
None,
p['port']['id'])
self.assertIn('port_id', body)
self.assertEqual(p['port']['id'], body['port_id'])
body = self._show('ports', p['port']['id'])
self.assertEqual(r['router']['id'], body['port']['device_id'])
data = {'port': {'fixed_ips': [
{'ip_address': '1.1.1.1'},
{'ip_address': '2.2.2.2'}]}}
self._update('ports', p['port']['id'], data,
neutron_context=context.get_admin_context(),
expected_code=exc.HTTPBadRequest.code)
self._router_interface_action('remove',
r['router']['id'],
None,
p['port']['id'])
def test_router_add_interface_delete_port_after_failure(self):
with self.router() as r, self.subnet(enable_dhcp=False) as s:
plugin = directory.get_plugin()
# inject a failure in the update port that happens at the end
# to ensure the port gets deleted
with mock.patch.object(
plugin, 'update_port',
side_effect=n_exc.InvalidInput(error_message='x')):
self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None,
exc.HTTPBadRequest.code)
self.assertFalse(plugin.get_ports(context.get_admin_context()))
def test_router_add_interface_dup_port(self):
'''This tests that if multiple routers add one port as their
interfaces. Only the first router's interface would be added
to this port. All the later requests would return exceptions.
'''
with self.router() as r1, self.router() as r2, self.network() as n:
with self.subnet(network=n) as s:
with self.port(subnet=s) as p:
self._router_interface_action('add',
r1['router']['id'],
None,
p['port']['id'])
# mock out the sequential check
plugin = 'neutron.db.l3_db.L3_NAT_dbonly_mixin'
check_p = mock.patch(plugin + '._check_router_port',
port_id=p['port']['id'],
device_id=r2['router']['id'],
return_value=p['port'])
checkport = check_p.start()
# do regular checkport after first skip
checkport.side_effect = check_p.stop()
self._router_interface_action('add',
r2['router']['id'],
None,
p['port']['id'],
exc.HTTPConflict.code)
# clean-up
self._router_interface_action('remove',
r1['router']['id'],
None,
p['port']['id'])
def test_update_router_interface_port_ipv6_subnet_ext_ra(self):
use_cases = [{'msg': 'IPv6 Subnet Modes (none, slaac)',
'ra_mode': None,
'address_mode': lib_constants.IPV6_SLAAC},
{'msg': 'IPv6 Subnet Modes (none, dhcpv6-stateful)',
'ra_mode': None,
'address_mode': lib_constants.DHCPV6_STATEFUL},
{'msg': 'IPv6 Subnet Modes (none, dhcpv6-stateless)',
'ra_mode': None,
'address_mode': lib_constants.DHCPV6_STATELESS}]
for uc in use_cases:
with self.network() as network, self.router() as router:
with self.subnet(
network=network, cidr='fd00::/64',
ip_version=lib_constants.IP_VERSION_6,
ipv6_ra_mode=uc['ra_mode'],
ipv6_address_mode=uc['address_mode']) as subnet:
fixed_ips = [{'subnet_id': subnet['subnet']['id']}]
with self.port(subnet=subnet, fixed_ips=fixed_ips) as port:
self._router_interface_action(
'add',
router['router']['id'],
None,
port['port']['id'],
expected_code=exc.HTTPBadRequest.code,
msg=uc['msg'])
def _assert_body_port_id_and_update_port(self, body, mock_update_port,
port_id, device_id):
self.assertNotIn('port_id', body)
expected_port_update_before_update = {
'device_owner': lib_constants.DEVICE_OWNER_ROUTER_INTF,
'device_id': device_id}
expected_port_update_after_fail = {
'device_owner': '',
'device_id': ''}
mock_update_port.assert_has_calls(
[mock.call(
mock.ANY,
port_id,
{'port': expected_port_update_before_update}),
mock.call(
mock.ANY,
port_id,
{'port': expected_port_update_after_fail})],
any_order=False)
# fetch port and confirm device_id and device_owner
body = self._show('ports', port_id)
self.assertEqual('', body['port']['device_owner'])
self.assertEqual('', body['port']['device_id'])
def test_router_add_interface_multiple_ipv4_subnet_port_returns_400(self):
"""Test adding router port with multiple IPv4 subnets fails.
Multiple IPv4 subnets are not allowed on a single router port.
Ensure that adding a port with multiple IPv4 subnets to a router fails.
"""
with self.network() as n, self.router() as r:
with self.subnet(network=n, cidr='10.0.0.0/24') as s1, (
self.subnet(network=n, cidr='10.0.1.0/24')) as s2:
fixed_ips = [{'subnet_id': s1['subnet']['id']},
{'subnet_id': s2['subnet']['id']}]
orig_update_port = self.plugin.update_port
with self.port(subnet=s1, fixed_ips=fixed_ips) as p, (
mock.patch.object(self.plugin,
'update_port')) as update_port:
update_port.side_effect = orig_update_port
exp_code = exc.HTTPBadRequest.code
body = self._router_interface_action(
'add', r['router']['id'], None, p['port']['id'],
expected_code=exp_code)
self._assert_body_port_id_and_update_port(
body, update_port, p['port']['id'], r['router']['id'])
def test_router_add_interface_ipv6_port_existing_network_returns_400(self):
"""Ensure unique IPv6 router ports per network id.
Adding a router port containing one or more IPv6 subnets with the same
network id as an existing router port should fail. This is so
there is no ambiguity regarding on which port to add an IPv6 subnet
when executing router-interface-add with a subnet and no port.
"""
with self.network() as n, self.router() as r:
with self.subnet(network=n, cidr='fd00::/64',
ip_version=lib_constants.IP_VERSION_6) as s1, (
self.subnet(network=n, cidr='fd01::/64',
ip_version=lib_constants.IP_VERSION_6)) as s2:
orig_update_port = self.plugin.update_port
with self.port(subnet=s1) as p, (
mock.patch.object(self.plugin,
'update_port')) as update_port:
update_port.side_effect = orig_update_port
self._router_interface_action('add',
r['router']['id'],
s2['subnet']['id'],
None)
exp_code = exc.HTTPBadRequest.code
body = self._router_interface_action(
'add', r['router']['id'], None, p['port']['id'],
expected_code=exp_code)
self._assert_body_port_id_and_update_port(
body, update_port, p['port']['id'], r['router']['id'])
self._router_interface_action('remove',
r['router']['id'],
s2['subnet']['id'],
None)
def test_router_add_interface_multiple_ipv6_subnet_port(self):
"""A port with multiple IPv6 subnets can be added to a router
Create a port with multiple associated IPv6 subnets and attach
it to a router. The action should succeed.
"""
with self.network() as n, self.router() as r:
with self.subnet(network=n, cidr='fd00::/64',
ip_version=lib_constants.IP_VERSION_6) as s1, (
self.subnet(network=n, cidr='fd01::/64',
ip_version=lib_constants.IP_VERSION_6)) as s2:
fixed_ips = [{'subnet_id': s1['subnet']['id']},
{'subnet_id': s2['subnet']['id']}]
with self.port(subnet=s1, fixed_ips=fixed_ips) as p:
self._router_interface_action('add',
r['router']['id'],
None,
p['port']['id'])
self._router_interface_action('remove',
r['router']['id'],
None,
p['port']['id'])
def test_router_add_interface_empty_port_and_subnet_ids(self):
with self.router() as r:
self._router_interface_action('add', r['router']['id'],
None, None,
expected_code=exc.
HTTPBadRequest.code)
def test_router_add_interface_port_bad_tenant_returns_404(self):
tenant_id = _uuid()
with self.router(tenant_id=tenant_id, set_context=True) as r:
with self.network(tenant_id=tenant_id, set_context=True) as n:
with self.subnet(tenant_id=tenant_id, network=n,
set_context=True) as s:
with self.port(tenant_id=tenant_id, subnet=s,
set_context=True) as p:
err_code = exc.HTTPNotFound.code
self._router_interface_action('add',
r['router']['id'],
None,
p['port']['id'],
expected_code=err_code,
tenant_id='bad_tenant')
self._router_interface_action('add',
r['router']['id'],
None,
p['port']['id'],
tenant_id=tenant_id)
# clean-up should fail as well
self._router_interface_action('remove',
r['router']['id'],
None,
p['port']['id'],
expected_code=err_code,
tenant_id='bad_tenant')
def test_router_add_interface_port_without_ips(self):
with self.network() as network, self.router() as r:
# Create a router port without ips
p = self._make_port(self.fmt, network['network']['id'],
device_owner=lib_constants.DEVICE_OWNER_ROUTER_INTF)
err_code = exc.HTTPBadRequest.code
self._router_interface_action('add',
r['router']['id'],
None,
p['port']['id'],
expected_code=err_code)
def test_router_add_interface_dup_subnet1_returns_400(self):
with self.router() as r:
with self.subnet() as s:
self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None)
self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None,
expected_code=exc.
HTTPBadRequest.code)
def test_router_add_interface_dup_subnet2_returns_400(self):
with self.router() as r:
with self.subnet() as s1, self.subnet(cidr='1.0.0.0/24') as s2:
with self.port(subnet=s1) as p1, self.port(subnet=s2) as p2:
orig_update_port = self.plugin.update_port
with self.port(subnet=s1) as p3, (
mock.patch.object(self.plugin,
'update_port')) as update_port:
update_port.side_effect = orig_update_port
for p in [p1, p2]:
self._router_interface_action('add',
r['router']['id'],
None,
p['port']['id'])
body = self._router_interface_action(
'add', r['router']['id'], None, p3['port']['id'],
expected_code=exc.HTTPBadRequest.code)
self._assert_body_port_id_and_update_port(
body, update_port, p3['port']['id'],
r['router']['id'])
def test_router_add_interface_overlapped_cidr_returns_400(self):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s1, self.subnet(
cidr='10.0.2.0/24') as s2:
self._router_interface_action('add',
r['router']['id'],
s1['subnet']['id'],
None)
self._router_interface_action('add',
r['router']['id'],
s2['subnet']['id'],
None)
def try_overlapped_cidr(cidr):
with self.subnet(cidr=cidr) as s3:
self._router_interface_action('add',
r['router']['id'],
s3['subnet']['id'],
None,
expected_code=exc.
HTTPBadRequest.code)
# another subnet with same cidr
try_overlapped_cidr('10.0.1.0/24')
try_overlapped_cidr('10.0.2.0/24')
# another subnet with overlapped cidr including s1
try_overlapped_cidr('10.0.0.0/16')
# another subnet with overlapped cidr including s2
try_overlapped_cidr('10.0.2.128/28')
def test_router_add_interface_no_data_returns_400(self):
with self.router() as r:
self._router_interface_action('add',
r['router']['id'],
None,
None,
expected_code=exc.
HTTPBadRequest.code)
def test_router_add_interface_with_both_ids_returns_400(self):
with self.router() as r:
with self.subnet() as s:
with self.port(subnet=s) as p:
self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
p['port']['id'],
expected_code=exc.
HTTPBadRequest.code)
def test_router_add_interface_cidr_overlapped_with_gateway(self):
with self.router() as r, self.network() as ext_net:
with self.subnet(cidr='10.0.1.0/24') as s1, self.subnet(
network=ext_net, cidr='10.0.0.0/16') as s2:
ext_net_id = ext_net['network']['id']
self._set_net_external(ext_net_id)
self._add_external_gateway_to_router(
r['router']['id'], ext_net_id)
res = self._router_interface_action(
'add', r['router']['id'], s1['subnet']['id'], None,
expected_code=exc.HTTPBadRequest.code)
expected_msg = ("Bad router request: Cidr 10.0.1.0/24 of "
"subnet %(internal_subnet_id)s overlaps with "
"cidr 10.0.0.0/16 of subnet "
"%(external_subnet_id)s.") % {
"external_subnet_id": s2['subnet']['id'],
"internal_subnet_id": s1['subnet']['id']}
self.assertEqual(expected_msg, res['NeutronError']['message'])
# External network have multiple subnets.
with self.subnet(network=ext_net,
cidr='192.168.1.0/24') as s3, \
self.subnet(cidr='192.168.1.0/24') as s4:
res = self._router_interface_action(
'add', r['router']['id'], s4['subnet']['id'], None,
expected_code=exc.HTTPBadRequest.code)
expected_msg = (
"Bad router request: Cidr 192.168.1.0/24 of subnet "
"%(internal_subnet_id)s overlaps with cidr "
"192.168.1.0/24 of subnet %(external_subnet_id)s.") % {
"external_subnet_id": s3['subnet']['id'],
"internal_subnet_id": s4['subnet']['id']}
self.assertEqual(expected_msg,
res['NeutronError']['message'])
def test_router_set_gateway_cidr_overlapped_with_subnets(self):
with self.router() as r, self.network() as ext_net:
with self.subnet(network=ext_net, cidr='10.0.1.0/24') as s1, \
self.subnet(network=ext_net, cidr='10.0.2.0/24') as s2, \
self.subnet(cidr='10.0.2.0/24') as s3:
ext_net_id = ext_net['network']['id']
self._set_net_external(ext_net_id)
self._router_interface_action(
'add', r['router']['id'],
s3['subnet']['id'], None)
res = self._add_external_gateway_to_router(
r['router']['id'], ext_net_id,
ext_ips=[{'subnet_id': s1['subnet']['id']}],
expected_code=exc.HTTPBadRequest.code)
expected_msg = (
"Bad router request: Cidr 10.0.2.0/24 of subnet "
"%(external_subnet_id)s overlaps with cidr 10.0.2.0/24 of "
"subnet %(internal_subnet_id)s.") % {
"external_subnet_id": s2["subnet"]["id"],
"internal_subnet_id": s3["subnet"]["id"]}
self.assertEqual(expected_msg, res['NeutronError']['message'])
def test_router_add_interface_by_port_cidr_overlapped_with_gateway(self):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s1, self.subnet(
cidr='10.0.0.0/16') as s2:
with self.port(subnet=s1) as p:
self._set_net_external(s2['subnet']['network_id'])
self._add_external_gateway_to_router(
r['router']['id'],
s2['subnet']['network_id'])
res = self._router_interface_action(
'add', r['router']['id'], None, p['port']['id'],
expected_code=exc.HTTPBadRequest.code)
expected_msg = (
"Bad router request: Cidr 10.0.1.0/24 of subnet "
"%(internal_subnet_id)s overlaps with cidr "
"10.0.0.0/16 of subnet %(external_subnet_id)s.") % {
"external_subnet_id": s2['subnet']['id'],
"internal_subnet_id": s1['subnet']['id']}
self.assertEqual(expected_msg,
res['NeutronError']['message'])
def test_router_add_gateway_dup_subnet1_returns_400(self):
with self.router() as r:
with self.subnet() as s:
self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None)
self._set_net_external(s['subnet']['network_id'])
self._add_external_gateway_to_router(
r['router']['id'],
s['subnet']['network_id'],
expected_code=exc.HTTPBadRequest.code)
def test_router_add_gateway_dup_subnet2_returns_400(self):
with self.router() as r:
with self.subnet() as s:
self._set_net_external(s['subnet']['network_id'])
self._add_external_gateway_to_router(
r['router']['id'],
s['subnet']['network_id'])
self._router_interface_action('add',
r['router']['id'],
s['subnet']['id'],
None,
expected_code=exc.
HTTPBadRequest.code)
def test_router_add_gateway_multiple_subnets_ipv6(self):
"""Ensure external gateway set doesn't add excess IPs on router gw
Setting the gateway of a router to an external network with more than
one IPv4 and one IPv6 subnet should only add an address from the first
IPv4 subnet, an address from the first IPv6-stateful subnet, and an
address from each IPv6-stateless (SLAAC and DHCPv6-stateless) subnet
"""
with self.router() as r, self.network() as n:
with self.subnet(
cidr='10.0.0.0/24', network=n) as s1, (
self.subnet(
cidr='10.0.1.0/24', network=n)) as s2, (
self.subnet(
cidr='2001:db8::/64', network=n,
ip_version=lib_constants.IP_VERSION_6,
ipv6_ra_mode=lib_constants.IPV6_SLAAC,
ipv6_address_mode=lib_constants.IPV6_SLAAC)) as s3, (
self.subnet(
cidr='2001:db8:1::/64', network=n,
ip_version=lib_constants.IP_VERSION_6,
ipv6_ra_mode=lib_constants.DHCPV6_STATEFUL,
ipv6_address_mode=lib_constants.DHCPV6_STATEFUL)) as s4, (
self.subnet(
cidr='2001:db8:2::/64', network=n,
ip_version=lib_constants.IP_VERSION_6,
ipv6_ra_mode=lib_constants.DHCPV6_STATELESS,
ipv6_address_mode=lib_constants.DHCPV6_STATELESS)) as s5:
self._set_net_external(n['network']['id'])
self._add_external_gateway_to_router(
r['router']['id'],
n['network']['id'])
res = self._show('routers', r['router']['id'])
fips = (res['router']['external_gateway_info']
['external_fixed_ips'])
fip_subnet_ids = {fip['subnet_id'] for fip in fips}
# one of s1 or s2 should be in the list.
if s1['subnet']['id'] in fip_subnet_ids:
self.assertEqual({s1['subnet']['id'],
s3['subnet']['id'],
s4['subnet']['id'],
s5['subnet']['id']},
fip_subnet_ids)
else:
self.assertEqual({s2['subnet']['id'],
s3['subnet']['id'],
s4['subnet']['id'],
s5['subnet']['id']},
fip_subnet_ids)
self._remove_external_gateway_from_router(
r['router']['id'],
n['network']['id'])
def test_router_add_and_remove_gateway(self):
with self.router() as r:
with self.subnet() as s:
self._set_net_external(s['subnet']['network_id'])
self._add_external_gateway_to_router(
r['router']['id'],
s['subnet']['network_id'])
body = self._show('routers', r['router']['id'])
net_id = body['router']['external_gateway_info']['network_id']
self.assertEqual(net_id, s['subnet']['network_id'])
self._remove_external_gateway_from_router(
r['router']['id'],
s['subnet']['network_id'])
body = self._show('routers', r['router']['id'])
gw_info = body['router']['external_gateway_info']
self.assertIsNone(gw_info)
def test_router_add_and_remove_gateway_tenant_ctx(self):
with self.router(tenant_id='noadmin',
set_context=True) as r:
with self.subnet() as s:
self._set_net_external(s['subnet']['network_id'])
ctx = context.Context('', 'noadmin')
self._add_external_gateway_to_router(
r['router']['id'],
s['subnet']['network_id'],
neutron_context=ctx)
body = self._show('routers', r['router']['id'])
net_id = body['router']['external_gateway_info']['network_id']
self.assertEqual(net_id, s['subnet']['network_id'])
self._remove_external_gateway_from_router(
r['router']['id'],
s['subnet']['network_id'])
body = self._show('routers', r['router']['id'])
gw_info = body['router']['external_gateway_info']
self.assertIsNone(gw_info)
def test_create_router_port_with_device_id_of_other_tenants_router(self):
with self.router() as admin_router:
with self.network(tenant_id='tenant_a',
set_context=True) as n:
with self.subnet(network=n):
for device_owner in lib_constants.ROUTER_INTERFACE_OWNERS:
self._create_port(
self.fmt, n['network']['id'],
tenant_id='tenant_a',
device_id=admin_router['router']['id'],
device_owner=device_owner,
set_context=<