OpenStack Networking (Neutron)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
neutron/neutron/tests/unit/db/test_db_base_plugin_v2.py

5635 lines
275 KiB

# Copyright (c) 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import copy
import itertools
import mock
import netaddr
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_utils import importutils
import six
from sqlalchemy import orm
from testtools import matchers
import webob.exc
import neutron
from neutron.api import api_common
from neutron.api import extensions
from neutron.api.v2 import attributes
from neutron.api.v2 import router
from neutron.callbacks import exceptions
from neutron.callbacks import registry
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.common import ipv6_utils
from neutron.common import test_lib
from neutron.common import utils
from neutron import context
from neutron.db import db_base_plugin_common
from neutron.db import db_base_plugin_v2
from neutron.db import ipam_non_pluggable_backend as non_ipam
from neutron.db import models_v2
from neutron import manager
from neutron.tests import base
from neutron.tests import tools
from neutron.tests.unit.api import test_extensions
from neutron.tests.unit import testlib_api
# Import path of the core plugin that NeutronDbPluginV2TestCase.setUp() loads
# when the caller does not pass an explicit ``plugin``.
DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
# Sample ``device_owner`` values for port tests: one literal compute owner and
# the DHCP owner constant standing in for a non-compute owner.
# NOTE(review): neither is referenced in the visible part of this module;
# presumably they are used by tests further down the file - confirm.
DEVICE_OWNER_COMPUTE = 'compute:None'
DEVICE_OWNER_NOT_COMPUTE = constants.DEVICE_OWNER_DHCP
def optional_ctx(obj, fallback):
    """Return a context manager for ``obj``, or the result of ``fallback()``.

    :param obj: an already-created resource; when truthy it is wrapped in a
        context manager that simply yields it.
    :param fallback: zero-argument callable invoked when ``obj`` is falsy;
        its return value is handed back unchanged.
    :returns: a context manager yielding ``obj``, or whatever ``fallback()``
        returned.
    """
    if obj:
        @contextlib.contextmanager
        def _yield_existing():
            # Nothing to set up or tear down: the caller owns ``obj``.
            yield obj

        return _yield_existing()

    return fallback()
def _fake_get_pagination_helper(self, request):
    """Build an emulated pagination helper for ``request``.

    Written as an unbound method: ``self`` must expose ``_primary_key``.
    NOTE(review): presumably patched onto the API controller so tests can
    force emulated pagination - confirm against the callers.
    """
    primary_key = self._primary_key
    return api_common.PaginationEmulatedHelper(request, primary_key)
def _fake_get_sorting_helper(self, request):
    """Build an emulated sorting helper for ``request``.

    Written as an unbound method: ``self`` must expose ``_attr_info``.
    NOTE(review): presumably patched onto the API controller so tests can
    force emulated sorting - confirm against the callers.
    """
    attr_info = self._attr_info
    return api_common.SortingEmulatedHelper(request, attr_info)
# TODO(banix): Move the following method to ML2 db test module when ML2
# mechanism driver unit tests are corrected to use Ml2PluginV2TestCase
# instead of directly using NeutronDbPluginV2TestCase
def _get_create_db_method(resource):
    """Return the name of the plugin method that creates ``resource``.

    Prefers ``_create_<resource>_db`` when the loaded core plugin has such
    an attribute, and falls back to ``create_<resource>`` otherwise.

    :param resource: singular resource name, e.g. ``'port'``.
    :returns: the method name as a string (not the bound method).
    """
    db_method_name = '_create_%s_db' % resource
    core_plugin = manager.NeutronManager.get_plugin()
    return (db_method_name if hasattr(core_plugin, db_method_name)
            else 'create_%s' % resource)
class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
    """Base test case exercising a Neutron core plugin through the v2 API.

    setUp() loads the requested core plugin, applies the configuration
    overrides the tests rely on and builds an API router.  The remaining
    helpers build requests against that router for networks, subnets,
    subnet pools and ports, and validate the responses.
    """

    # Serialization format used when a helper is not given an explicit one.
    fmt = 'json'
    # Maps a resource (collection) name to a URL prefix prepended by _req().
    resource_prefix_map = {}

    def setUp(self, plugin=None, service_plugins=None,
              ext_mgr=None):
        """Configure the plugin under test and build the API router.

        :param plugin: core plugin class path; DB_PLUGIN_KLASS when falsy.
        :param service_plugins: optional mapping whose (key, default) pairs
            are resolved through ``test_lib.test_config.get``.
        :param ext_mgr: optional extension manager; when given, an
            extension-aware API is stored in ``self.ext_api``.
        """
        super(NeutronDbPluginV2TestCase, self).setUp()
        cfg.CONF.set_override('notify_nova_on_port_status_changes', False)
        cfg.CONF.set_override('allow_overlapping_ips', True)
        # Make sure at each test according extensions for the plugin is loaded
        extensions.PluginAwareExtensionManager._instance = None
        # Save the attributes map in case the plugin will alter it
        # loading extensions
        self.useFixture(tools.AttributeMapMemento())
        self._tenant_id = 'test-tenant'

        if not plugin:
            plugin = DB_PLUGIN_KLASS

        # Update the plugin
        self.setup_coreplugin(plugin)
        cfg.CONF.set_override(
            'service_plugins',
            [test_lib.test_config.get(key, default)
             for key, default in six.iteritems(service_plugins or {})]
        )

        cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
        cfg.CONF.set_override('max_dns_nameservers', 2)
        cfg.CONF.set_override('max_subnet_host_routes', 2)
        cfg.CONF.set_override('allow_pagination', True)
        cfg.CONF.set_override('allow_sorting', True)
        self.api = router.APIRouter()
        # Set the default status
        self.net_create_status = 'ACTIVE'
        self.port_create_status = 'ACTIVE'

        def _plugin_declares(feature):
            # Plugins advertise native support through a name-mangled class
            # attribute: _<ClassName>__native_<feature>_support.
            plugin_obj = manager.NeutronManager.get_plugin()
            attr_name = ("_%s__native_%s_support"
                         % (plugin_obj.__class__.__name__, feature))
            return getattr(plugin_obj, attr_name, False)

        self._skip_native_bulk = not _plugin_declares('bulk')
        # Pagination and sorting additionally require the config switch.
        self._skip_native_pagination = not (
            cfg.CONF.allow_pagination and _plugin_declares('pagination'))
        self.plugin = manager.NeutronManager.get_plugin()
        self._skip_native_sorting = not (
            cfg.CONF.allow_sorting and _plugin_declares('sorting'))

        if ext_mgr:
            self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)

    def tearDown(self):
        """Drop the references created in setUp() before the base cleanup."""
        self.api = None
        self._deserializers = None
        self._skip_native_bulk = None
        self._skip_native_pagination = None
        # Must match the attribute assigned in setUp(); the previous
        # spelling ('_skip_native_sortin') left the real flag untouched.
        self._skip_native_sorting = None
        self.ext_api = None
        super(NeutronDbPluginV2TestCase, self).tearDown()

    def setup_config(self):
        """Load neutron.conf plus any config files listed in test_config."""
        # Create the default configurations
        args = ['--config-file', base.etcdir('neutron.conf')]
        # If test_config specifies some config-file, use it, as well
        for config_file in test_lib.test_config.get('config_files', []):
            args.extend(['--config-file', config_file])
        super(NeutronDbPluginV2TestCase, self).setup_config(args=args)

    def _req(self, method, resource, data=None, fmt=None, id=None, params=None,
             action=None, subresource=None, sub_id=None, context=None):
        """Build a request for ``resource`` without sending it.

        The path is ``/<resource>[/<id>][/<subresource>][/<sub_id>]
        [/<action>].<fmt>``, optionally prefixed via resource_prefix_map.

        :param data: request body; serialized unless it is None.
        :param params: query string passed through unchanged.
        :returns: whatever ``testlib_api.create_request`` returns.
        """
        fmt = fmt or self.fmt
        segments = (resource, id, subresource, sub_id, action)
        path = '/%s.%s' % ('/'.join(p for p in segments if p), fmt)

        prefix = self.resource_prefix_map.get(resource)
        if prefix:
            path = prefix + path

        content_type = 'application/%s' % fmt
        # An empty dict is a valid body, hence the explicit None test.
        body = self.serialize(data) if data is not None else None
        return testlib_api.create_request(path, body, content_type, method,
                                          query_string=params, context=context)

    def new_create_request(self, resource, data, fmt=None, id=None,
                           subresource=None, context=None):
        """Build a POST request carrying ``data`` for ``resource``."""
        return self._req('POST', resource, data, fmt, id=id,
                         subresource=subresource, context=context)

    def new_list_request(self, resource, fmt=None, params=None,
                         subresource=None):
        """Build a GET request for a collection (no id, no body)."""
        return self._req(
            'GET', resource, None, fmt, params=params, subresource=subresource
        )

    def new_show_request(self, resource, id, fmt=None,
                         subresource=None, fields=None):
        """Build a GET request for one resource.

        :param fields: optional iterable of field names, turned into
            repeated ``fields=<name>`` query parameters.
        """
        if fields:
            params = "&".join(["fields=%s" % x for x in fields])
        else:
            params = None
        return self._req('GET', resource, None, fmt, id=id,
                         params=params, subresource=subresource)

    def new_delete_request(self, resource, id, fmt=None, subresource=None,
                           sub_id=None):
        """Build a DELETE request for one resource or sub-resource."""
        return self._req(
            'DELETE',
            resource,
            None,
            fmt,
            id=id,
            subresource=subresource,
            sub_id=sub_id
        )

    def new_update_request(self, resource, data, id, fmt=None,
                           subresource=None, context=None):
        """Build a PUT request updating one resource with ``data``."""
        return self._req(
            'PUT', resource, data, fmt, id=id, subresource=subresource,
            context=context
        )

    def new_action_request(self, resource, data, id, action, fmt=None,
                           subresource=None):
        """Build a PUT request targeting ``action`` on one resource."""
        return self._req(
            'PUT',
            resource,
            data,
            fmt,
            id=id,
            action=action,
            subresource=subresource
        )

    def deserialize(self, content_type, response):
        """Decode ``response.body`` using the deserializer for the format.

        :param content_type: short format name (e.g. ``'json'``); it is
            expanded to ``application/<content_type>`` for the lookup.
        :returns: the ``'body'`` entry of the deserialized payload.
        """
        ctype = 'application/%s' % content_type
        return self._deserializers[ctype].deserialize(response.body)['body']

    def _create_bulk_from_list(self, fmt, resource, objects, **kwargs):
        """Creates a bulk request from a list of objects.

        Recognized kwargs: ``set_context`` (must be exactly True) together
        with ``tenant_id`` to run the request as that tenant, or ``context``
        to attach a ready-made context.  Other kwargs are ignored.

        :returns: the response obtained from ``self.api``.
        """
        collection = "%ss" % resource
        req_data = {collection: objects}
        req = self.new_create_request(collection, req_data, fmt)
        if kwargs.get('set_context') is True and 'tenant_id' in kwargs:
            # create a specific auth context for this request
            req.environ['neutron.context'] = context.Context(
                '', kwargs['tenant_id'])
        elif 'context' in kwargs:
            req.environ['neutron.context'] = kwargs['context']
        return req.get_response(self.api)

    def _create_bulk(self, fmt, number, resource, data, name='test', **kwargs):
        """Creates a bulk request for any kind of resource.

        Builds ``number`` deep copies of ``data`` named ``<name>_<index>``,
        applies ``kwargs['override'][index]`` where present, and submits
        them through _create_bulk_from_list() (which also handles the
        ``set_context``/``tenant_id``/``context`` kwargs).
        """
        overrides = kwargs.get('override', {})
        objects = []
        for i in range(number):
            obj = copy.deepcopy(data)
            obj[resource]['name'] = "%s_%s" % (name, i)
            if i in overrides:
                obj[resource].update(overrides[i])
            objects.append(obj)
        return self._create_bulk_from_list(fmt, resource, objects, **kwargs)

    def _create_network(self, fmt, name, admin_state_up,
                        arg_list=None, **kwargs):
        """POST a network and return the raw response.

        :param arg_list: extra attribute names (tuple) that may be copied
            from kwargs into the request body.
        :param kwargs: attribute values; ``set_context`` with ``tenant_id``
            makes the request run as that tenant.
        """
        data = {'network': {'name': name,
                            'admin_state_up': admin_state_up,
                            'tenant_id': self._tenant_id}}
        for arg in (('admin_state_up', 'tenant_id', 'shared',
                     'vlan_transparent') + (arg_list or ())):
            # Arg must be present
            if arg in kwargs:
                data['network'][arg] = kwargs[arg]
        network_req = self.new_create_request('networks', data, fmt)
        if kwargs.get('set_context') and 'tenant_id' in kwargs:
            # create a specific auth context for this request
            network_req.environ['neutron.context'] = context.Context(
                '', kwargs['tenant_id'])
        return network_req.get_response(self.api)

    def _create_network_bulk(self, fmt, number, name,
                             admin_state_up, **kwargs):
        """Bulk-create ``number`` networks and return the raw response.

        NOTE: ``name`` is accepted but not forwarded, so the networks get
        _create_bulk()'s default ``test_<index>`` names.
        """
        base_data = {'network': {'admin_state_up': admin_state_up,
                                 'tenant_id': self._tenant_id}}
        return self._create_bulk(fmt, number, 'network', base_data, **kwargs)

    def _create_subnet(self, fmt, net_id, cidr,
                       expected_res_status=None, **kwargs):
        """POST a subnet and return the raw response.

        :param expected_res_status: when truthy, the response status is
            asserted to equal it.
        :param kwargs: optional subnet attributes; None values are skipped,
            and ``gateway_ip`` is sent unless it is ATTR_NOT_SPECIFIED.
        """
        data = {'subnet': {'network_id': net_id,
                           'cidr': cidr,
                           'ip_version': 4,
                           'tenant_id': self._tenant_id}}
        for arg in ('ip_version', 'tenant_id',
                    'enable_dhcp', 'allocation_pools',
                    'dns_nameservers', 'host_routes',
                    'shared', 'ipv6_ra_mode', 'ipv6_address_mode'):
            # Arg must be present and not null (but can be false)
            if kwargs.get(arg) is not None:
                data['subnet'][arg] = kwargs[arg]

        # gateway_ip may legitimately be None, so it is checked against the
        # dedicated sentinel rather than filtered with the loop above.
        if ('gateway_ip' in kwargs and
                kwargs['gateway_ip'] is not attributes.ATTR_NOT_SPECIFIED):
            data['subnet']['gateway_ip'] = kwargs['gateway_ip']

        subnet_req = self.new_create_request('subnets', data, fmt)
        if kwargs.get('set_context') and 'tenant_id' in kwargs:
            # create a specific auth context for this request
            subnet_req.environ['neutron.context'] = context.Context(
                '', kwargs['tenant_id'])
        subnet_res = subnet_req.get_response(self.api)
        if expected_res_status:
            self.assertEqual(subnet_res.status_int, expected_res_status)
        return subnet_res

    def _create_subnet_bulk(self, fmt, number, net_id, name,
                            ip_version=4, **kwargs):
        """Bulk-create ``number`` subnets on ``net_id``.

        Any ``override`` kwarg is replaced with generated CIDRs.
        NOTE: ``name`` is accepted but not forwarded to _create_bulk().
        """
        base_data = {'subnet': {'network_id': net_id,
                                'ip_version': ip_version,
                                'tenant_id': self._tenant_id}}
        # auto-generate cidrs as they should not overlap
        kwargs['override'] = {num: {'cidr': "10.0.%s.0/24" % num}
                              for num in range(number)}
        return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs)

    def _create_subnetpool(self, fmt, prefixes,
                           expected_res_status=None, admin=False, **kwargs):
        """POST a subnet pool and return the raw response.

        Every kwarg is sent as an attribute, converted with ``str()``.

        :param admin: when False the request runs as
            ``kwargs['tenant_id']``.
        :raises KeyError: if ``admin`` is False and no ``tenant_id`` kwarg
            was supplied.
        """
        subnetpool = {'subnetpool': {'prefixes': prefixes}}
        for k, v in kwargs.items():
            subnetpool['subnetpool'][k] = str(v)

        api = self._api_for_resource('subnetpools')
        subnetpools_req = self.new_create_request('subnetpools',
                                                  subnetpool, fmt)
        if not admin:
            neutron_context = context.Context('', kwargs['tenant_id'])
            subnetpools_req.environ['neutron.context'] = neutron_context
        subnetpool_res = subnetpools_req.get_response(api)
        if expected_res_status:
            self.assertEqual(subnetpool_res.status_int, expected_res_status)
        return subnetpool_res

    def _create_port(self, fmt, net_id, expected_res_status=None,
                     arg_list=None, **kwargs):
        """POST a port on ``net_id`` and return the raw response.

        :param expected_res_status: when truthy, the response status is
            asserted to equal it.
        :param arg_list: extra attribute names (tuple) that may be copied
            from kwargs into the request body.
        """
        data = {'port': {'network_id': net_id,
                         'tenant_id': self._tenant_id}}

        for arg in (('admin_state_up', 'device_id',
                     'mac_address', 'name', 'fixed_ips',
                     'tenant_id', 'device_owner', 'security_groups') +
                    (arg_list or ())):
            # Arg must be present
            if arg in kwargs:
                data['port'][arg] = kwargs[arg]
        # create a dhcp port device id if one hasn't been supplied
        if ('device_owner' in kwargs and
                kwargs['device_owner'] == constants.DEVICE_OWNER_DHCP and
                'host' in kwargs and
                'device_id' not in kwargs):
            device_id = utils.get_dhcp_agent_device_id(net_id, kwargs['host'])
            data['port']['device_id'] = device_id
        port_req = self.new_create_request('ports', data, fmt)
        if kwargs.get('set_context') and 'tenant_id' in kwargs:
            # create a specific auth context for this request
            port_req.environ['neutron.context'] = context.Context(
                '', kwargs['tenant_id'])

        port_res = port_req.get_response(self.api)
        if expected_res_status:
            self.assertEqual(port_res.status_int, expected_res_status)
        return port_res

    def _list_ports(self, fmt, expected_res_status=None,
                    net_id=None, **kwargs):
        """GET the ports collection, optionally filtered.

        Filters on ``net_id`` and on the ``device_owner`` kwarg when they
        are truthy.  ``set_context`` (exactly True) with ``tenant_id`` runs
        the request as that tenant.
        """
        query_params = []
        if net_id:
            query_params.append("network_id=%s" % net_id)
        if kwargs.get('device_owner'):
            query_params.append("device_owner=%s" % kwargs.get('device_owner'))
        port_req = self.new_list_request('ports', fmt, '&'.join(query_params))
        if kwargs.get('set_context') is True and 'tenant_id' in kwargs:
            # create a specific auth context for this request
            port_req.environ['neutron.context'] = context.Context(
                '', kwargs['tenant_id'])

        port_res = port_req.get_response(self.api)
        if expected_res_status:
            self.assertEqual(port_res.status_int, expected_res_status)
        return port_res

    def _create_port_bulk(self, fmt, number, net_id, name,
                          admin_state_up, **kwargs):
        """Bulk-create ``number`` ports on ``net_id``.

        NOTE: ``name`` is accepted but not forwarded to _create_bulk().
        """
        base_data = {'port': {'network_id': net_id,
                              'admin_state_up': admin_state_up,
                              'tenant_id': self._tenant_id}}
        return self._create_bulk(fmt, number, 'port', base_data, **kwargs)

    def _deserialize_or_raise(self, fmt, res):
        """Return the deserialized body, or raise on an error status.

        Things can go wrong - raise HTTP exc with res code only so it can
        be caught by unit tests.

        :raises webob.exc.HTTPClientError: carrying ``res.status_int`` when
            it is at or above ``webob.exc.HTTPClientError.code``.
        """
        if res.status_int >= webob.exc.HTTPClientError.code:
            raise webob.exc.HTTPClientError(code=res.status_int)
        return self.deserialize(fmt, res)

    def _make_network(self, fmt, name, admin_state_up, **kwargs):
        """Create a network and return its deserialized body.

        :raises webob.exc.HTTPClientError: on an error response.
        """
        res = self._create_network(fmt, name, admin_state_up, **kwargs)
        # TODO(salvatore-orlando): do exception handling in this test module
        # in a uniform way (we do it differently for ports, subnets, and nets
        return self._deserialize_or_raise(fmt, res)

    def _make_subnet(self, fmt, network, gateway, cidr,
                     allocation_pools=None, ip_version=4, enable_dhcp=True,
                     dns_nameservers=None, host_routes=None, shared=None,
                     ipv6_ra_mode=None, ipv6_address_mode=None,
                     tenant_id=None, set_context=False):
        """Create a subnet on ``network`` and return its deserialized body.

        :param network: deserialized network body (``{'network': {...}}``);
            its id and, unless ``tenant_id`` is given, its tenant are used.
        :raises webob.exc.HTTPClientError: on an error response.
        """
        net = network['network']
        res = self._create_subnet(fmt,
                                  net_id=net['id'],
                                  cidr=cidr,
                                  gateway_ip=gateway,
                                  tenant_id=(tenant_id or net['tenant_id']),
                                  allocation_pools=allocation_pools,
                                  ip_version=ip_version,
                                  enable_dhcp=enable_dhcp,
                                  dns_nameservers=dns_nameservers,
                                  host_routes=host_routes,
                                  shared=shared,
                                  ipv6_ra_mode=ipv6_ra_mode,
                                  ipv6_address_mode=ipv6_address_mode,
                                  set_context=set_context)
        return self._deserialize_or_raise(fmt, res)

    def _make_subnetpool(self, fmt, prefixes, admin=False, **kwargs):
        """Create a subnet pool and return its deserialized body.

        :raises webob.exc.HTTPClientError: on an error response.
        """
        res = self._create_subnetpool(fmt,
                                      prefixes,
                                      None,
                                      admin,
                                      **kwargs)
        return self._deserialize_or_raise(fmt, res)

    def _make_port(self, fmt, net_id, expected_res_status=None, **kwargs):
        """Create a port on ``net_id`` and return its deserialized body.

        :raises webob.exc.HTTPClientError: on an error response.
        """
        res = self._create_port(fmt, net_id, expected_res_status, **kwargs)
        return self._deserialize_or_raise(fmt, res)

    def _api_for_resource(self, resource):
        """Return the core API for core collections, else the extension API."""
        if resource in ['networks', 'subnets', 'ports', 'subnetpools']:
            return self.api
        return self.ext_api

    def _delete(self, collection, id,
                expected_code=webob.exc.HTTPNoContent.code,
                neutron_context=None):
        """DELETE one resource and assert the response status."""
        req = self.new_delete_request(collection, id)
        if neutron_context:
            # create a specific auth context for this request
            req.environ['neutron.context'] = neutron_context
        res = req.get_response(self._api_for_resource(collection))
        self.assertEqual(res.status_int, expected_code)

    def _show_response(self, resource, id, neutron_context=None):
        """GET one resource and return the raw response."""
        req = self.new_show_request(resource, id)
        if neutron_context:
            # create a specific auth context for this request
            req.environ['neutron.context'] = neutron_context
        return req.get_response(self._api_for_resource(resource))

    def _show(self, resource, id,
              expected_code=webob.exc.HTTPOk.code,
              neutron_context=None):
        """GET one resource, assert the status, return the decoded body."""
        res = self._show_response(resource, id,
                                  neutron_context=neutron_context)
        self.assertEqual(expected_code, res.status_int)
        return self.deserialize(self.fmt, res)

    def _update(self, resource, id, new_data,
                expected_code=webob.exc.HTTPOk.code,
                neutron_context=None):
        """PUT ``new_data``, assert the status, return the decoded body."""
        req = self.new_update_request(resource, new_data, id)
        if neutron_context:
            # create a specific auth context for this request
            req.environ['neutron.context'] = neutron_context
        res = req.get_response(self._api_for_resource(resource))
        self.assertEqual(res.status_int, expected_code)
        return self.deserialize(self.fmt, res)

    def _list(self, resource, fmt=None, neutron_context=None,
              query_params=None):
        """GET a collection, assert a 200, return the decoded body."""
        fmt = fmt or self.fmt
        req = self.new_list_request(resource, fmt, query_params)
        if neutron_context:
            req.environ['neutron.context'] = neutron_context
        res = req.get_response(self._api_for_resource(resource))
        self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
        return self.deserialize(fmt, res)

    def _fail_second_call(self, patched_plugin, orig, *args, **kwargs):
        """Invoked by test cases for injecting failures in plugin.

        Forwards this call to ``orig`` and re-arms ``patched_plugin`` so
        that its next invocation raises NeutronException.
        """
        def second_call(*args, **kwargs):
            raise n_exc.NeutronException()

        patched_plugin.side_effect = second_call
        return orig(*args, **kwargs)

    def _validate_behavior_on_bulk_failure(
            self, res, collection,
            errcode=webob.exc.HTTPClientError.code):
        """Assert a failed bulk create left ``collection`` empty."""
        self.assertEqual(res.status_int, errcode)
        req = self.new_list_request(collection)
        res = req.get_response(self.api)
        self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
        items = self.deserialize(self.fmt, res)
        self.assertEqual(len(items[collection]), 0)

    def _validate_behavior_on_bulk_success(self, res, collection,
                                           names=None):
        """Assert a bulk create returned 201 and the expected names.

        :param names: expected resource names, in response order; defaults
            to ``('test_0', 'test_1')``.
        """
        if names is None:
            # Immutable default instead of a shared mutable list argument.
            names = ('test_0', 'test_1')
        self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
        items = self.deserialize(self.fmt, res)[collection]
        self.assertEqual(len(items), len(names))
        for item, expected_name in zip(items, names):
            self.assertEqual(item['name'], expected_name)

    def _test_list_resources(self, resource, items, neutron_context=None,
                             query_params=None):
        """Assert listing ``resource`` returns exactly the ids in ``items``.

        :param items: deserialized bodies, each keyed by the resource name.
        """
        res = self._list('%ss' % resource,
                         neutron_context=neutron_context,
                         query_params=query_params)
        # Response keys use underscores where the URL uses dashes.
        resource = resource.replace('-', '_')
        self.assertItemsEqual([i['id'] for i in res['%ss' % resource]],
                              [i[resource]['id'] for i in items])

    @contextlib.contextmanager
    def network(self, name='net1',
                admin_state_up=True,
                fmt=None,
                **kwargs):
        """Context manager yielding a freshly created network body."""
        network = self._make_network(fmt or self.fmt, name,
                                     admin_state_up, **kwargs)
        yield network

    @contextlib.contextmanager
    def subnet(self, network=None,
               gateway_ip=attributes.ATTR_NOT_SPECIFIED,
               cidr='10.0.0.0/24',
               fmt=None,
               ip_version=4,
               allocation_pools=None,
               enable_dhcp=True,
               dns_nameservers=None,
               host_routes=None,
               shared=None,
               ipv6_ra_mode=None,
               ipv6_address_mode=None,
               tenant_id=None,
               set_context=False):
        """Context manager yielding a freshly created subnet body.

        A network is created through self.network() when none is passed.
        """
        with optional_ctx(network, self.network) as network_to_use:
            subnet = self._make_subnet(fmt or self.fmt,
                                       network_to_use,
                                       gateway_ip,
                                       cidr,
                                       allocation_pools,
                                       ip_version,
                                       enable_dhcp,
                                       dns_nameservers,
                                       host_routes,
                                       shared=shared,
                                       ipv6_ra_mode=ipv6_ra_mode,
                                       ipv6_address_mode=ipv6_address_mode,
                                       tenant_id=tenant_id,
                                       set_context=set_context)
            yield subnet

    @contextlib.contextmanager
    def subnetpool(self, prefixes, admin=False, **kwargs):
        """Context manager yielding a freshly created subnet pool body."""
        subnetpool = self._make_subnetpool(self.fmt,
                                           prefixes,
                                           admin,
                                           **kwargs)
        yield subnetpool

    @contextlib.contextmanager
    def port(self, subnet=None, fmt=None, **kwargs):
        """Context manager yielding a freshly created port body.

        A subnet (and its network) is created through self.subnet() when
        none is passed; the port is created on that subnet's network.
        """
        with optional_ctx(subnet, self.subnet) as subnet_to_use:
            net_id = subnet_to_use['subnet']['network_id']
            port = self._make_port(fmt or self.fmt, net_id, **kwargs)
            yield port

    def _test_list_with_sort(self, resource,
                             items, sorts, resources=None, query_params=''):
        """Assert a sorted listing returns ``items`` in the given order.

        :param sorts: iterable of ``(sort_key, sort_dir)`` pairs.
        :param resources: collection name; ``<resource>s`` when falsy.
        """
        query_str = query_params
        for key, direction in sorts:
            query_str = query_str + "&sort_key=%s&sort_dir=%s" % (key,
                                                                  direction)
        if not resources:
            resources = '%ss' % resource
        req = self.new_list_request(resources,
                                    params=query_str)
        api = self._api_for_resource(resources)
        res = self.deserialize(self.fmt, req.get_response(api))
        # Response keys use underscores where the URL uses dashes.
        resource = resource.replace('-', '_')
        resources = resources.replace('-', '_')
        expected_res = [item[resource]['id'] for item in items]
        self.assertEqual(expected_res, [n['id'] for n in res[resources]])

    def _test_list_with_pagination(self, resource, items, sort,
                                   limit, expected_page_num,
                                   resources=None,
                                   query_params='',
                                   verify_key='id'):
        """Walk the 'next' links and check page count and item order.

        :param sort: ``(sort_key, sort_dir)`` pair.
        :param limit: page size requested from the API.
        :param expected_page_num: number of pages the walk must take.
        :param verify_key: attribute compared between ``items`` and the
            concatenated pages.
        """
        if not resources:
            resources = '%ss' % resource
        query_str = query_params + '&' if query_params else ''
        query_str = query_str + ("limit=%s&sort_key=%s&"
                                 "sort_dir=%s") % (limit, sort[0], sort[1])
        req = self.new_list_request(resources, params=query_str)
        items_res = []
        page_num = 0
        api = self._api_for_resource(resources)
        resource = resource.replace('-', '_')
        resources = resources.replace('-', '_')
        links_key = '%s_links' % resources
        while req:
            page_num = page_num + 1
            res = self.deserialize(self.fmt, req.get_response(api))
            self.assertThat(len(res[resources]),
                            matchers.LessThan(limit + 1))
            items_res = items_res + res[resources]
            # The loop only continues if a 'next' link is found below.
            req = None
            if links_key in res:
                for link in res[links_key]:
                    if link['rel'] == 'next':
                        content_type = 'application/%s' % self.fmt
                        req = testlib_api.create_request(link['href'],
                                                         '', content_type)
                        # A page that has a successor must be full.
                        self.assertEqual(len(res[resources]),
                                         limit)
        self.assertEqual(expected_page_num, page_num)
        self.assertEqual([item[resource][verify_key] for item in items],
                         [n[verify_key] for n in items_res])

    def _test_list_with_pagination_reverse(self, resource, items, sort,
                                           limit, expected_page_num,
                                           resources=None,
                                           query_params=''):
        """Walk the 'previous' links backwards from the last item.

        The last element of ``items`` is used as the marker; the pages are
        reversed and concatenated, and the result must equal ``items`` in
        reverse order.
        """
        if not resources:
            resources = '%ss' % resource
        resource = resource.replace('-', '_')
        api = self._api_for_resource(resources)
        marker = items[-1][resource]['id']
        query_str = query_params + '&' if query_params else ''
        query_str = query_str + ("limit=%s&page_reverse=True&"
                                 "sort_key=%s&sort_dir=%s&"
                                 "marker=%s") % (limit, sort[0], sort[1],
                                                 marker)
        req = self.new_list_request(resources, params=query_str)
        # The marker item itself is not returned, so seed the result with it.
        item_res = [items[-1][resource]]
        page_num = 0
        resources = resources.replace('-', '_')
        links_key = '%s_links' % resources
        while req:
            page_num = page_num + 1
            res = self.deserialize(self.fmt, req.get_response(api))
            self.assertThat(len(res[resources]),
                            matchers.LessThan(limit + 1))
            res[resources].reverse()
            item_res = item_res + res[resources]
            # The loop only continues if a 'previous' link is found below.
            req = None
            if links_key in res:
                for link in res[links_key]:
                    if link['rel'] == 'previous':
                        content_type = 'application/%s' % self.fmt
                        req = testlib_api.create_request(link['href'],
                                                         '', content_type)
                        # A page that has a predecessor must be full.
                        self.assertEqual(len(res[resources]),
                                         limit)
        self.assertEqual(expected_page_num, page_num)
        expected_res = [item[resource]['id'] for item in items]
        expected_res.reverse()
        self.assertEqual(expected_res, [n['id'] for n in item_res])

    def _compare_resource(self, observed_res, expected_res, res_name):
        """Compare the observed and expected resources (ie compare subnets).

        Every key of ``expected_res`` must be present in
        ``observed_res[res_name]`` with an equal value; list values are
        compared after sorting.
        """
        observed = observed_res[res_name]
        for k in expected_res:
            self.assertIn(k, observed)
            if isinstance(expected_res[k], list):
                self.assertEqual(sorted(observed[k]),
                                 sorted(expected_res[k]))
            else:
                self.assertEqual(observed[k], expected_res[k])

    def _validate_resource(self, resource, keys, res_name):
        """Check ``resource[res_name]`` against the ``keys`` mapping.

        Same contract as _compare_resource(), kept under this name for
        existing callers.
        """
        self._compare_resource(resource, keys, res_name)
class TestBasicGet(NeutronDbPluginV2TestCase):
    """Direct ``_get_network`` lookups against a fresh plugin instance."""

    def _assert_network_found(self, ctx):
        """Create a network and fetch it by id using ``ctx``."""
        plugin = neutron.db.db_base_plugin_v2.NeutronDbPluginV2()
        with self.network() as network:
            net_id = network['network']['id']
            n = plugin._get_network(ctx, net_id)
            self.assertEqual(net_id, n.id)

    def test_single_get_admin(self):
        """An admin context can fetch the network."""
        self._assert_network_found(context.get_admin_context())

    def test_single_get_tenant(self):
        """The owning tenant's (non-admin) context can fetch the network.

        The network helper creates the network with ``self._tenant_id``,
        so the lookup is done with a context for that same tenant rather
        than repeating the admin case.
        """
        self._assert_network_found(context.Context('', self._tenant_id))
class TestV2HTTPResponse(NeutronDbPluginV2TestCase):
    """Status codes and field filtering returned by the v2 network API."""

    def test_create_returns_201(self):
        """Creating a network answers 201 Created."""
        response = self._create_network(self.fmt, 'net2', True)
        self.assertEqual(response.status_int, webob.exc.HTTPCreated.code)

    def test_list_returns_200(self):
        """Listing networks answers 200 OK."""
        response = self.new_list_request('networks').get_response(self.api)
        self.assertEqual(response.status_int, webob.exc.HTTPOk.code)

    def _check_list_with_fields(self, res, field_name):
        """Assert ``res`` lists one network exposing only ``field_name``."""
        self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
        networks = self.deserialize(self.fmt, res)['networks']
        # further checks: 1 networks
        self.assertEqual(len(networks), 1)
        only_network = networks[0]
        # 1 field in the network record
        self.assertEqual(len(only_network), 1)
        # and that field is the requested one
        self.assertIn(field_name, only_network)

    def test_list_with_fields(self):
        """``fields=name`` restricts the listing to the name attribute."""
        self._create_network(self.fmt, 'some_net', True)
        list_req = self.new_list_request('networks', params="fields=name")
        self._check_list_with_fields(list_req.get_response(self.api), 'name')

    def test_list_with_fields_noadmin(self):
        """Field filtering also works for a tenant-scoped context."""
        tenant_id = 'some_tenant'
        self._create_network(self.fmt, 'some_net', True,
                             tenant_id=tenant_id, set_context=True)
        list_req = self.new_list_request('networks', params="fields=name")
        list_req.environ['neutron.context'] = context.Context('', tenant_id)
        self._check_list_with_fields(list_req.get_response(self.api), 'name')

    def test_list_with_fields_noadmin_and_policy_field(self):
        """If a field used by policy is selected, do not duplicate it.

        Requests ``tenant_id`` explicitly as a tenant-scoped user and
        checks that the response record still holds exactly that one
        field.
        """
        tenant_id = 'some_tenant'
        self._create_network(self.fmt, 'some_net', True,
                             tenant_id=tenant_id, set_context=True)
        list_req = self.new_list_request('networks',
                                         params="fields=tenant_id")
        list_req.environ['neutron.context'] = context.Context('', tenant_id)
        self._check_list_with_fields(list_req.get_response(self.api),
                                     'tenant_id')

    def test_show_returns_200(self):
        """Showing an existing network answers 200 OK."""
        with self.network() as net:
            show_req = self.new_show_request('networks',
                                             net['network']['id'])
            response = show_req.get_response(self.api)
            self.assertEqual(response.status_int, webob.exc.HTTPOk.code)

    def test_delete_returns_204(self):
        """Deleting an existing network answers 204 No Content."""
        created = self._create_network(self.fmt, 'net1', True)
        net_id = self.deserialize(self.fmt, created)['network']['id']
        delete_req = self.new_delete_request('networks', net_id)
        response = delete_req.get_response(self.api)
        self.assertEqual(response.status_int, webob.exc.HTTPNoContent.code)

    def test_update_returns_200(self):
        """Updating a network's name answers 200 OK."""
        with self.network() as net:
            body = {'network': {'name': 'steve'}}
            update_req = self.new_update_request('networks', body,
                                                 net['network']['id'])
            response = update_req.get_response(self.api)
            self.assertEqual(response.status_int, webob.exc.HTTPOk.code)

    def test_update_invalid_json_400(self):
        """A malformed update body answers 400."""
        with self.network() as net:
            malformed_body = '{{"name": "aaa"}}'
            update_req = self.new_update_request('networks', malformed_body,
                                                 net['network']['id'])
            response = update_req.get_response(self.api)
            self.assertEqual(response.status_int,
                             webob.exc.HTTPClientError.code)

    def test_bad_route_404(self):
        """Listing an unknown collection answers 404 Not Found."""
        response = self.new_list_request('doohickeys').get_response(self.api)
        self.assertEqual(response.status_int, webob.exc.HTTPNotFound.code)
class TestPortsV2(NeutronDbPluginV2TestCase):
def test_create_port_json(self):
    """A default port is up, has a MAC and gets the first subnet address."""
    expected_attrs = (('admin_state_up', True),
                      ('status', self.port_create_status))
    with self.port(name='myname') as created:
        port = created['port']
        for attr, value in expected_attrs:
            self.assertEqual(port[attr], value)
        self.assertIn('mac_address', port)
        fixed_ips = port['fixed_ips']
        self.assertEqual(len(fixed_ips), 1)
        self.assertEqual(fixed_ips[0]['ip_address'], '10.0.0.2')
        self.assertEqual('myname', port['name'])
def test_create_port_as_admin(self):
    """Without a tenant context, a port for another tenant is created."""
    port_attrs = dict(tenant_id='bad_tenant_id',
                      device_id='fake_device',
                      device_owner='fake_owner',
                      fixed_ips=[],
                      set_context=False)
    with self.network() as net:
        # _create_port asserts the 201 itself via expected_res_status.
        self._create_port(self.fmt, net['network']['id'],
                          webob.exc.HTTPCreated.code, **port_attrs)
def test_create_port_bad_tenant(self):
    """Running as a foreign tenant, port creation answers 404."""
    port_attrs = dict(tenant_id='bad_tenant_id',
                      device_id='fake_device',
                      device_owner='fake_owner',
                      fixed_ips=[],
                      set_context=True)
    with self.network() as net:
        # _create_port asserts the 404 itself via expected_res_status.
        self._create_port(self.fmt, net['network']['id'],
                          webob.exc.HTTPNotFound.code, **port_attrs)
def test_create_port_public_network(self):
    """Another tenant can create a port on a shared network."""
    expected_attrs = (('admin_state_up', True),
                      ('status', self.port_create_status))
    with self.network(shared=True) as net:
        response = self._create_port(self.fmt,
                                     net['network']['id'],
                                     webob.exc.HTTPCreated.code,
                                     tenant_id='another_tenant',
                                     set_context=True)
        port = self.deserialize(self.fmt, response)['port']
        for attr, value in expected_attrs:
            self.assertEqual(port[attr], value)
        self.assertIn('mac_address', port)
        self._delete('ports', port['id'])
def test_create_port_public_network_with_ip(self):
    """A port on a shared network with a subnet gets that subnet's IP."""
    with self.network(shared=True) as net:
        with self.subnet(network=net, cidr='10.0.0.0/24') as subnet:
            expected_ips = [{'subnet_id': subnet['subnet']['id'],
                             'ip_address': '10.0.0.2'}]
            expected_attrs = (('admin_state_up', True),
                              ('status', self.port_create_status),
                              ('fixed_ips', expected_ips))
            response = self._create_port(self.fmt,
                                         net['network']['id'],
                                         webob.exc.HTTPCreated.code,
                                         tenant_id='another_tenant',
                                         set_context=True)
            port = self.deserialize(self.fmt, response)['port']
            for attr, value in expected_attrs:
                self.assertEqual(port[attr], value)
            self.assertIn('mac_address', port)
            self._delete('ports', port['id'])
def test_create_port_anticipating_allocation(self):
    """A subnet-only entry followed by an explicit IP is still accepted.

    The second fixed IP asks for 10.0.0.2 on the same subnet as the first,
    unspecified, entry; creation must still answer 201.
    """
    with self.network(shared=True) as net:
        with self.subnet(network=net, cidr='10.0.0.0/24') as subnet:
            subnet_id = subnet['subnet']['id']
            requested_ips = [{'subnet_id': subnet_id},
                             {'subnet_id': subnet_id,
                              'ip_address': '10.0.0.2'}]
            self._create_port(self.fmt, net['network']['id'],
                              webob.exc.HTTPCreated.code,
                              fixed_ips=requested_ips)
def test_create_port_public_network_with_invalid_ip_no_subnet_id(
        self, expected_error='InvalidIpForNetwork'):
    """An off-network IP without subnet_id answers 400.

    :param expected_error: NeutronError type expected in the response
        body (parameterized, presumably so subclasses can override it).
    """
    bad_ip = '1.1.1.1'
    with self.network(shared=True) as net:
        with self.subnet(network=net, cidr='10.0.0.0/24'):
            response = self._create_port(self.fmt,
                                         net['network']['id'],
                                         webob.exc.HTTPBadRequest.code,
                                         fixed_ips=[{'ip_address': bad_ip}],
                                         set_context=True)
            error = self.deserialize(self.fmt, response)['NeutronError']
            expected_msg = str(n_exc.InvalidIpForNetwork(ip_address=bad_ip))
            self.assertEqual(expected_error, error['type'])
            self.assertEqual(expected_msg, error['message'])
def test_create_port_public_network_with_invalid_ip_and_subnet_id(
        self, expected_error='InvalidIpForSubnet'):
    """An IP outside the named subnet answers 400.

    :param expected_error: NeutronError type expected in the response
        body (parameterized, presumably so subclasses can override it).
    """
    bad_ip = '1.1.1.1'
    with self.network(shared=True) as net:
        with self.subnet(network=net, cidr='10.0.0.0/24') as subnet:
            requested_ips = [{'subnet_id': subnet['subnet']['id'],
                              'ip_address': bad_ip}]
            response = self._create_port(self.fmt,
                                         net['network']['id'],
                                         webob.exc.HTTPBadRequest.code,
                                         fixed_ips=requested_ips,
                                         set_context=True)
            error = self.deserialize(self.fmt, response)['NeutronError']
            expected_msg = str(n_exc.InvalidIpForSubnet(ip_address=bad_ip))
            self.assertEqual(expected_error, error['type'])
            self.assertEqual(expected_msg, error['message'])
def test_create_ports_bulk_native(self):
    """Native bulk creation of two ports succeeds (skipped if unsupported)."""
    if self._skip_native_bulk:
        self.skipTest("Plugin does not support native bulk port create")
    with self.network() as net:
        response = self._create_port_bulk(self.fmt, 2,
                                          net['network']['id'],
                                          'test', True)
        self._validate_behavior_on_bulk_success(response, 'ports')
        # Clean up the ports created by the bulk request.
        created_ports = self.deserialize(self.fmt, response)['ports']
        for created in created_ports:
            self._delete('ports', created['id'])
def test_create_ports_bulk_emulated(self):
    """Bulk creation succeeds when native bulk support is hidden."""
    builtin_hasattr = hasattr

    # ensures the API choose the emulation code path
    def fakehasattr(item, attr):
        if not attr.endswith('__native_bulk_support'):
            return builtin_hasattr(item, attr)
        return False

    with mock.patch('six.moves.builtins.hasattr', new=fakehasattr):
        with self.network() as net:
            response = self._create_port_bulk(self.fmt, 2,
                                              net['network']['id'],
                                              'test', True)
            self._validate_behavior_on_bulk_success(response, 'ports')
            # Clean up the ports created by the bulk request.
            created_ports = self.deserialize(self.fmt, response)['ports']
            for created in created_ports:
                self._delete('ports', created['id'])
def test_create_ports_bulk_wrong_input(self):
    """Bulk create with an invalid port must fail and leave no ports.

    One port of the bulk request (override index 1) carries a
    non-boolean ``admin_state_up``.  The request must be answered with a
    client error, and a following port list must succeed and be empty.
    """
    with self.network() as net:
        overrides = {1: {'admin_state_up': 'doh'}}
        res = self._create_port_bulk(self.fmt, 2, net['network']['id'],
                                     'test', True,
                                     override=overrides)
        # assertEqual takes (expected, observed), so failure messages
        # label the values correctly.
        self.assertEqual(webob.exc.HTTPClientError.code, res.status_int)
        req = self.new_list_request('ports')
        res = req.get_response(self.api)
        self.assertEqual(webob.exc.HTTPOk.code, res.status_int)
        ports = self.deserialize(self.fmt, res)
        self.assertEqual(0, len(ports['ports']))
def test_create_ports_bulk_emulated_plugin_failure(self):
    """Emulated bulk create must report a server error on plugin failure.

    ``hasattr`` is patched so ``__native_bulk_support`` looks absent, and
    the plugin method named by ``_get_create_db_method('port')`` is
    patched so that every call goes through ``self._fail_second_call``.
    """
    builtin_hasattr = hasattr

    # Ensures the API chooses the emulation code path.
    def _hide_native_bulk(item, attr):
        if not attr.endswith('__native_bulk_support'):
            return builtin_hasattr(item, attr)
        return False

    with mock.patch('six.moves.builtins.hasattr',
                    new=_hide_native_bulk):
        orig = manager.NeutronManager.get_plugin().create_port
        db_method_name = _get_create_db_method('port')
        with mock.patch.object(manager.NeutronManager.get_plugin(),
                               db_method_name) as patched_plugin:
            patched_plugin.side_effect = (
                lambda *args, **kwargs: self._fail_second_call(
                    patched_plugin, orig, *args, **kwargs))
            with self.network() as net:
                bulk_res = self._create_port_bulk(
                    self.fmt, 2, net['network']['id'], 'test', True)
                # We expect a 500 as we injected a fault in the plugin
                self._validate_behavior_on_bulk_failure(
                    bulk_res, 'ports', webob.exc.HTTPServerError.code)
def test_create_ports_bulk_native_plugin_failure(self):
    """Native bulk create must report a server error on plugin failure.

    Skipped when ``self._skip_native_bulk`` is set.  The plugin method
    named by ``_get_create_db_method('port')`` is patched so that every
    call goes through ``self._fail_second_call``.
    """
    if self._skip_native_bulk:
        self.skipTest("Plugin does not support native bulk port create")
    admin_ctx = context.get_admin_context()
    with self.network() as net:
        plugin = manager.NeutronManager.get_plugin()
        orig = plugin.create_port
        db_method_name = _get_create_db_method('port')
        with mock.patch.object(plugin, db_method_name) as patched_plugin:
            patched_plugin.side_effect = (
                lambda *args, **kwargs: self._fail_second_call(
                    patched_plugin, orig, *args, **kwargs))
            bulk_res = self._create_port_bulk(
                self.fmt, 2, net['network']['id'], 'test', True,
                context=admin_ctx)
            # We expect a 500 as we injected a fault in the plugin
            self._validate_behavior_on_bulk_failure(
                bulk_res, 'ports', webob.exc.HTTPServerError.code)
def test_list_ports(self):
    """Create three ports and pass them to ``_test_list_resources``."""
    # Overlapping IPs need to be enabled for this test.
    cfg.CONF.set_default('allow_overlapping_ips', True)
    with self.port() as first, self.port() as second, \
            self.port() as third:
        self._test_list_resources('port', (first, second, third))
def test_list_ports_filtered_by_fixed_ip(self):
    """List ports filtered by fixed-IP address and subnet id.

    The query carries the first port's address, an extra address
    (192.168.126.5) and the first port's subnet id; only the first port
    is expected in the listing.
    """
    # Overlapping IPs need to be enabled for this test.
    cfg.CONF.set_default('allow_overlapping_ips', True)
    with self.port() as wanted_port, self.port():
        first_ip = wanted_port['port']['fixed_ips'][0]
        filter_values = (first_ip['ip_address'],
                         '192.168.126.5',
                         first_ip['subnet_id'])
        query_params = """
fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
""".strip() % filter_values
        self._test_list_resources('port', [wanted_port],
                                  query_params=query_params)
def test_list_ports_public_network(self):
    """List ports on a shared network as admin and as each tenant.

    Two ports owned by different tenants are created on one shared
    network.  The default request must list both; a request made with a
    tenant's context must list only that tenant's port.
    """
    with self.network(shared=True) as network:
        with self.subnet(network) as subnet:
            with self.port(subnet, tenant_id='tenant_1') as port1,\
                    self.port(subnet, tenant_id='tenant_2') as port2:
                # Admin request - must return both ports
                self._test_list_resources('port', [port1, port2])
                # Tenant request - must return only the tenant's port
                per_tenant = (('tenant_1', port1), ('tenant_2', port2))
                for tenant_id, own_port in per_tenant:
                    tenant_ctx = context.Context('', tenant_id)
                    self._test_list_resources(
                        'port', [own_port], neutron_context=tenant_ctx)
def test_list_ports_with_sort_native(self):
    """Sort ports by admin_state_up ascending, then mac_address descending.

    Skipped when ``self._skip_native_sorting`` is set.
    """
    if self._skip_native_sorting:
        self.skipTest("Skip test for not implemented sorting feature")
    cfg.CONF.set_default('allow_overlapping_ips', True)
    sort_spec = [('admin_state_up', 'asc'), ('mac_address', 'desc')]
    with self.port(admin_state_up='True',
                   mac_address='00:00:00:00:00:01') as up_port,\
            self.port(admin_state_up='False',
                      mac_address='00:00:00:00:00:02') as down_low,\
            self.port(admin_state_up='False',
                      mac_address='00:00:00:00:00:03') as down_high:
        expected_order = (down_high, down_low, up_port)
        self._test_list_with_sort('port', expected_order, sort_spec)
def test_list_ports_with_sort_emulated(self):
    """Sort ports with the controller's sorting helper replaced.

    ``Controller._get_sorting_helper`` is patched with
    ``_fake_get_sorting_helper`` before the ports are listed by
    admin_state_up ascending, then mac_address descending.
    """
    mock.patch(
        'neutron.api.v2.base.Controller._get_sorting_helper',
        new=_fake_get_sorting_helper).start()
    cfg.CONF.set_default('allow_overlapping_ips', True)
    sort_spec = [('admin_state_up', 'asc'), ('mac_address', 'desc')]
    with self.port(admin_state_up='True',
                   mac_address='00:00:00:00:00:01') as up_port,\
            self.port(admin_state_up='False',
                      mac_address='00:00:00:00:00:02') as down_low,\
            self.port(admin_state_up='False',
                      mac_address='00:00:00:00:00:03') as down_high:
        expected_order = (down_high, down_low, up_port)
        self._test_list_with_sort('port', expected_order, sort_spec)
def test_list_ports_with_pagination_native(self):
    """Paginate three ports ordered by ascending mac_address.

    Skipped when ``self._skip_native_pagination`` is set.
    """
    if self._skip_native_pagination:
        self.skipTest("Skip test for not implemented pagination feature")
    cfg.CONF.set_default('allow_overlapping_ips', True)
    sort_key = ('mac_address', 'asc')
    with self.port(mac_address='00:00:00:00:00:01') as first,\
            self.port(mac_address='00:00:00:00:00:02') as second,\
            self.port(mac_address='00:00:00:00:00:03') as third:
        self._test_list_with_pagination(
            'port', (first, second, third), sort_key, 2, 2)
def test_list_ports_with_pagination_emulated(self):
    """Paginate three ports with the pagination helper replaced.

    ``Controller._get_pagination_helper`` is patched with
    ``_fake_get_pagination_helper`` before the ports are paginated by
    ascending mac_address.
    """
    mock.patch(
        'neutron.api.v2.base.Controller._get_pagination_helper',
        new=_fake_get_pagination_helper).start()
    cfg.CONF.set_default('allow_overlapping_ips', True)
    sort_key = ('mac_address', 'asc')
    with self.port(mac_address='00:00:00:00:00:01') as first,\
            self.port(mac_address='00:00:00:00:00:02') as second,\
            self.port(mac_address='00:00:00:00:00:03') as third:
        self._test_list_with_pagination(
            'port', (first, second, third), sort_key, 2, 2)
def test_list_ports_with_pagination_reverse_native(self):
    """Reverse-paginate three ports ordered by ascending mac_address.

    Skipped when ``self._skip_native_pagination`` is set.
    """
    if self._skip_native_pagination:
        self.skipTest("Skip test for not implemented pagination feature")
    cfg.CONF.set_default('allow_overlapping_ips', True)
    sort_key = ('mac_address', 'asc')
    with self.port(mac_address='00:00:00:00:00:01') as first,\
            self.port(mac_address='00:00:00:00:00:02') as second,\
            self.port(mac_address='00:00:00:00:00:03') as third:
        self._test_list_with_pagination_reverse(
            'port', (first, second, third), sort_key, 2, 2)
def test_list_ports_with_pagination_reverse_emulated(self):
    """Reverse-paginate three ports with the pagination helper replaced.

    ``Controller._get_pagination_helper`` is patched with
    ``_fake_get_pagination_helper`` before the ports are paginated in
    reverse by ascending mac_address.
    """
    mock.patch(
        'neutron.api.v2.base.Controller._get_pagination_helper',
        new=_fake_get_pagination_helper).start()
    cfg.CONF.set_default('allow_overlapping_ips', True)
    sort_key = ('mac_address', 'asc')
    with self.port(mac_address='00:00:00:00:00:01') as first,\
            self.port(mac_address='00:00:00:00:00:02') as second,\
            self.port(mac_address='00:00:00:00:00:03') as third:
        self._test_list_with_pagination_reverse(
            'port', (first, second, third), sort_key, 2, 2)
def test_show_port(self):
    """Show a port and check that the returned id matches."""
    with self.port() as port:
        port_id = port['port']['id']
        show_req = self.new_show_request('ports', port_id, self.fmt)
        shown = self.deserialize(self.fmt,
                                 show_req.get_response(self.api))
        self.assertEqual(port_id, shown['port']['id'])
def test_delete_port(self):
    """Delete a port and check that showing it then yields not-found."""
    with self.port() as port:
        port_id = port['port']['id']
        self._delete('ports', port_id)
        not_found = webob.exc.HTTPNotFound.code
        self._show('ports', port_id, expected_code=not_found)
def test_delete_port_public_network(self):
    """Delete another tenant's port that lives on a shared network.

    The port is created for ``another_tenant`` on a shared network, then
    deleted, and a following show is expected to yield not-found.
    """
    with self.network(shared=True) as shared_net:
        create_res = self._create_port(self.fmt,
                                       shared_net['network']['id'],
                                       webob.exc.HTTPCreated.code,
                                       tenant_id='another_tenant',
                                       set_context=True)
        created = self.deserialize(self.fmt, create_res)
        self._delete('ports', created['port']['id'])
        not_found = webob.exc.HTTPNotFound.code
        self._show('ports', created['port']['id'],
                   expected_code=not_found)
def test_update_port(self):
    """Update a port's admin_state_up and check the returned value."""
    with self.port() as port:
        data = {'port': {'admin_state_up': False}}
        req = self.new_update_request('ports', data, port['port']['id'])
        res = self.deserialize(self.fmt, req.get_response(self.api))
        # assertEqual takes (expected, observed).
        self.assertEqual(data['port']['admin_state_up'],
                         res['port']['admin_state_up'])
def update_port_mac(self, port, updated_fixed_ips=None):
    """Send a port update that changes the port's MAC address.

    The last octet of the current MAC is set to '01', or to '00' when it
    already is '01'.  If ``updated_fixed_ips`` is truthy it is included
    in the same update.

    :param port: port dict providing 'mac_address' and 'id'.
    :param updated_fixed_ips: optional fixed_ips value for the update.
    :returns: tuple of (API response, new MAC address).
    """
    octets = port['mac_address'].split(':')
    octets[5] = '00' if octets[5] == '01' else '01'
    new_mac = ':'.join(octets)
    port_body = {'mac_address': new_mac}
    if updated_fixed_ips:
        port_body['fixed_ips'] = updated_fixed_ips
    update_req = self.new_update_request('ports', {'port': port_body},
                                         port['id'])
    return update_req.get_response(self.api), new_mac
def _check_v6_auto_address_address(self, port, subnet):
    """Check the port's first fixed IP on an auto-address IPv6 subnet.

    Nothing is checked unless ``ipv6_utils.is_auto_address_subnet``
    accepts the subnet.  Otherwise the port's first fixed IP must equal
    the EUI-64 address derived from the subnet CIDR and the port MAC.

    :param port: port dict wrapped under the 'port' key.
    :param subnet: subnet dict wrapped under the 'subnet' key.
    """
    if not ipv6_utils.is_auto_address_subnet(subnet['subnet']):
        return
    # Reuse the class helper instead of repeating the EUI-64 derivation.
    eui_addr = self._calc_ipv6_addr_by_EUI64(port, subnet)
    # assertEqual takes (expected, observed).
    self.assertEqual(eui_addr,
                     port['port']['fixed_ips'][0]['ip_address'])
def check_update_port_mac(
        self, expected_status=webob.exc.HTTPOk.code,
        expected_error='StateInvalid', subnet=None,
        device_owner=DEVICE_OWNER_COMPUTE, updated_fixed_ips=None,
        host_arg=None, arg_list=None):
    """Create a port, change its MAC address and verify the outcome.

    :param expected_status: HTTP status the update must return.
    :param expected_error: NeutronError type expected when the expected
        status is not HTTP OK.
    :param subnet: optional subnet the port is created on.
    :param device_owner: device owner of the created port.
    :param updated_fixed_ips: optional fixed IPs sent with the MAC update.
    :param host_arg: extra keyword arguments passed to ``self.port``;
        ``None`` means no extra arguments.
    :param arg_list: ``arg_list`` value passed to ``self.port``;
        ``None`` means an empty list.
    """
    # Build fresh containers per call; mutable default arguments would be
    # shared between calls.
    host_arg = {} if host_arg is None else host_arg
    arg_list = [] if arg_list is None else arg_list
    with self.port(device_owner=device_owner, subnet=subnet,
                   arg_list=arg_list, **host_arg) as port:
        self.assertIn('mac_address', port['port'])
        res, new_mac = self.update_port_mac(
            port['port'], updated_fixed_ips=updated_fixed_ips)
        self.assertEqual(expected_status, res.status_int)
        if expected_status == webob.exc.HTTPOk.code:
            result = self.deserialize(self.fmt, res)
            self.assertIn('port', result)
            self.assertEqual(new_mac, result['port']['mac_address'])
            if subnet and subnet['subnet']['ip_version'] == 6:
                self._check_v6_auto_address_address(port, subnet)
        else:
            error = self.deserialize(self.fmt, res)
            self.assertEqual(expected_error,
                             error['NeutronError']['type'])
def test_update_port_mac(self):
    """Run ``check_update_port_mac`` with its default expectations."""
    # Sub-classes for plugins/drivers that support mac address update
    # override this method.
    self.check_update_port_mac()
def test_update_port_mac_ip(self):
    """Update a port's MAC address and fixed IP in the same request."""
    with self.subnet() as subnet:
        new_ip = {'subnet_id': subnet['subnet']['id'],
                  'ip_address': '10.0.0.3'}
        self.check_update_port_mac(subnet=subnet,
                                   updated_fixed_ips=[new_ip])
def test_update_port_mac_v6_slaac(self):
    """Update the MAC of a port on an IPv6 SLAAC subnet."""
    subnet_args = {'gateway_ip': 'fe80::1',
                   'cidr': '2607:f0d0:1002:51::/64',
                   'ip_version': 6,
                   'ipv6_address_mode': constants.IPV6_SLAAC}
    with self.subnet(**subnet_args) as subnet:
        is_auto = ipv6_utils.is_auto_address_subnet(subnet['subnet'])
        self.assertTrue(is_auto)
        self.check_update_port_mac(subnet=subnet)
def test_update_port_mac_bad_owner(self):
    """MAC update on a non-compute port must fail with a conflict."""
    conflict = webob.exc.HTTPConflict.code
    self.check_update_port_mac(
        expected_status=conflict,
        expected_error='UnsupportedPortDeviceOwner',
        device_owner=DEVICE_OWNER_NOT_COMPUTE)
def check_update_port_mac_used(self, expected_error='MacAddressInUse'):
    """Try to give a port the MAC of another port on the same subnet.

    The update must be answered with a conflict whose NeutronError type
    equals ``expected_error``.
    """
    with self.subnet() as subnet:
        with self.port(subnet=subnet) as port, \
                self.port(subnet=subnet) as other_port:
            self.assertIn('mac_address', port['port'])
            taken_mac = other_port['port']['mac_address']
            update_req = self.new_update_request(
                'ports', {'port': {'mac_address': taken_mac}},
                port['port']['id'])
            response = update_req.get_response(self.api)
            self.assertEqual(webob.exc.HTTPConflict.code,
                             response.status_int)
            body = self.deserialize(self.fmt, response)
            self.assertEqual(expected_error,
                             body['NeutronError']['type'])
def test_update_port_mac_used(self):
    """Run ``check_update_port_mac_used`` with its default error type."""
    self.check_update_port_mac_used()
def test_update_port_not_admin(self):
    """A non-admin tenant can update its own port's admin_state_up.

    Network and port are created for tenant ``not_admin`` and the update
    is issued with a context for the same tenant.
    """
    res = self._create_network(self.fmt, 'net1', True,
                               tenant_id='not_admin',
                               set_context=True)
    net1 = self.deserialize(self.fmt, res)
    res = self._create_port(self.fmt, net1['network']['id'],
                            tenant_id='not_admin', set_context=True)
    port = self.deserialize(self.fmt, res)
    data = {'port': {'admin_state_up': False}}
    neutron_context = context.Context('', 'not_admin')
    port = self._update('ports', port['port']['id'], data,
                        neutron_context=neutron_context)
    # assertEqual takes (expected, observed).
    self.assertEqual(False, port['port']['admin_state_up'])
def test_update_device_id_unchanged(self):
    """Update a port while resending its current device_id."""
    with self.port() as port:
        data = {'port': {'admin_state_up': True,
                         'device_id': port['port']['device_id']}}
        req = self.new_update_request('ports', data, port['port']['id'])
        res = self.deserialize(self.fmt, req.get_response(self.api))
        # assertEqual takes (expected, observed).
        self.assertEqual(True, res['port']['admin_state_up'])
def test_update_device_id_null(self):
    """Setting device_id to None must be answered with a client error."""
    with self.port() as port:
        data = {'port': {'device_id': None}}
        req = self.new_update_request('ports', data, port['port']['id'])
        res = req.get_response(self.api)
        # assertEqual takes (expected, observed).
        self.assertEqual(webob.exc.HTTPClientError.code, res.status_int)
def test_delete_network_if_port_exists(self):
    """Deleting a network that still has a port must give a conflict."""
    with self.port() as port:
        req = self.new_delete_request('networks',
                                      port['port']['network_id'])
        res = req.get_response(self.api)
        # assertEqual takes (expected, observed).
        self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_delete_network_port_exists_owned_by_network(self):
    """A network with only a DHCP-owned port can be deleted.

    The delete request is expected to return the no-content status.
    """
    res = self._create_network(fmt=self.fmt, name='net',
                               admin_state_up=True)
    network = self.deserialize(self.fmt, res)
    network_id = network['network']['id']
    self._create_port(self.fmt, network_id,
                      device_owner=constants.DEVICE_OWNER_DHCP)
    req = self.new_delete_request('networks', network_id)
    res = req.get_response(self.api)
    # assertEqual takes (expected, observed).
    self.assertEqual(webob.exc.HTTPNoContent.code, res.status_int)
def test_update_port_delete_ip(self):
    """Update a port with an empty fixed_ips list.

    The response must echo both the new admin_state_up and the empty
    fixed_ips list.
    """
    with self.subnet() as subnet:
        with self.port(subnet=subnet) as port:
            data = {'port': {'admin_state_up': False,
                             'fixed_ips': []}}
            req = self.new_update_request('ports',
                                          data, port['port']['id'])
            res = self.deserialize(self.fmt, req.get_response(self.api))
            # assertEqual takes (expected, observed).
            self.assertEqual(data['port']['admin_state_up'],
                             res['port']['admin_state_up'])
            self.assertEqual(data['port']['fixed_ips'],
                             res['port']['fixed_ips'])
def test_no_more_port_exception(self):
    """Port creation on a 10.0.0.0/31 subnet must fail with a conflict.

    The error message must match ``IpAddressGenerationFailure`` for the
    subnet's network.
    """
    with self.subnet(cidr='10.0.0.0/31', enable_dhcp=False) as subnet:
        # Named net_id rather than id to avoid shadowing the builtin.
        net_id = subnet['subnet']['network_id']
        res = self._create_port(self.fmt, net_id)
        data = self.deserialize(self.fmt, res)
        msg = str(n_exc.IpAddressGenerationFailure(net_id=net_id))
        # assertEqual takes (expected, observed).
        self.assertEqual(msg, data['NeutronError']['message'])
        self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_update_port_update_ip(self):
    """Test update of port IP.

    Check that a configured IP 10.0.0.2 is replaced by 10.0.0.10.
    """
    with self.subnet() as subnet:
        subnet_id = subnet['subnet']['id']
        with self.port(subnet=subnet) as port:
            ips = port['port']['fixed_ips']
            # assertEqual takes (expected, observed).
            self.assertEqual(1, len(ips))
            self.assertEqual('10.0.0.2', ips[0]['ip_address'])
            self.assertEqual(subnet_id, ips[0]['subnet_id'])
            data = {'port': {'fixed_ips': [{'subnet_id': subnet_id,
                                            'ip_address': "10.0.0.10"}]}}
            req = self.new_update_request('ports', data,
                                          port['port']['id'])
            res = self.deserialize(self.fmt, req.get_response(self.api))
            ips = res['port']['fixed_ips']
            self.assertEqual(1, len(ips))
            self.assertEqual('10.0.0.10', ips[0]['ip_address'])
            self.assertEqual(subnet_id, ips[0]['subnet_id'])
def test_update_port_update_ip_address_only(self):
    """Update a port with one fixed IP given by address only.

    The update requests 10.0.0.10 with a subnet_id and 10.0.0.2 without
    one; both addresses must be reported on the subnet afterwards.
    """
    with self.subnet() as subnet:
        subnet_id = subnet['subnet']['id']
        with self.port(subnet=subnet) as port:
            ips = port['port']['fixed_ips']
            # assertEqual takes (expected, observed).
            self.assertEqual(1, len(ips))
            self.assertEqual('10.0.0.2', ips[0]['ip_address'])
            self.assertEqual(subnet_id, ips[0]['subnet_id'])
            data = {'port': {'fixed_ips': [{'subnet_id': subnet_id,
                                            'ip_address': "10.0.0.10"},
                                           {'ip_address': "10.0.0.2"}]}}
            req = self.new_update_request('ports', data,
                                          port['port']['id'])
            res = self.deserialize(self.fmt, req.get_response(self.api))
            ips = res['port']['fixed_ips']
            self.assertEqual(2, len(ips))
            self.assertIn({'ip_address': '10.0.0.2',
                           'subnet_id': subnet_id}, ips)
            self.assertIn({'ip_address': '10.0.0.10',
                           'subnet_id': subnet_id}, ips)
def test_update_port_update_ips(self):
    """Update IP and associate new IP on port.

    Check a port update with the specified subnet_id's. A IP address
    will be allocated for each subnet_id.
    """
    with self.subnet() as subnet:
        subnet_id = subnet['subnet']['id']
        with self.port(subnet=subnet) as port:
            data = {'port': {'admin_state_up': False,
                             'fixed_ips': [{'subnet_id': subnet_id,
                                            'ip_address': '10.0.0.3'}]}}
            req = self.new_update_request('ports', data,
                                          port['port']['id'])
            res = self.deserialize(self.fmt, req.get_response(self.api))
            # assertEqual takes (expected, observed).
            self.assertEqual(data['port']['admin_state_up'],
                             res['port']['admin_state_up'])
            ips = res['port']['fixed_ips']
            self.assertEqual(1, len(ips))
            self.assertEqual('10.0.0.3', ips[0]['ip_address'])
            self.assertEqual(subnet_id, ips[0]['subnet_id'])
def test_update_port_add_additional_ip(self):
    """Test update of port with additional IP."""
    with self.subnet() as subnet:
        subnet_id = subnet['subnet']['id']
        with self.port(subnet=subnet) as port:
            # Two entries for the same subnet request two addresses.
            data = {'port': {'admin_state_up': False,
                             'fixed_ips': [{'subnet_id': subnet_id},
                                           {'subnet_id': subnet_id}]}}
            req = self.new_update_request('ports', data,
                                          port['port']['id'])
            res = self.deserialize(self.fmt, req.get_response(self.api))
            # assertEqual takes (expected, observed).
            self.assertEqual(data['port']['admin_state_up'],
                             res['port']['admin_state_up'])
            ips = res['port']['fixed_ips']
            self.assertEqual(2, len(ips))
            self.assertIn({'ip_address': '10.0.0.3',
                           'subnet_id': subnet_id}, ips)
            self.assertIn({'ip_address': '10.0.0.4',
                           'subnet_id': subnet_id}, ips)
def test_update_port_invalid_fixed_ip_address_v6_slaac(self):
    """Reject an explicit fixed IP update on an IPv6 SLAAC subnet.

    The port first gets its EUI-64 address; updating it to an explicit
    address on the SLAAC subnet must fail with a client error of type
    ``InvalidInput``.
    """
    with self.subnet(
        cidr='2607:f0d0:1002:51::/64',
        ip_version=6,
        ipv6_address_mode=constants.IPV6_SLAAC,
        gateway_ip=attributes.ATTR_NOT_SPECIFIED) as subnet:
        subnet_id = subnet['subnet']['id']
        with self.port(subnet=subnet) as port:
            ips = port['port']['fixed_ips']
            # assertEqual takes (expected, observed).
            self.assertEqual(1, len(ips))
            # Reuse the class helper for the EUI-64 derivation.
            eui_addr = self._calc_ipv6_addr_by_EUI64(port, subnet)
            self.assertEqual(eui_addr, ips[0]['ip_address'])
            self.assertEqual(subnet_id, ips[0]['subnet_id'])

            data = {'port': {'fixed_ips': [{'subnet_id': subnet_id,
                                            'ip_address':
                                            '2607:f0d0:1002:51::5'}]}}
            req = self.new_update_request('ports', data,
                                          port['port']['id'])
            res = req.get_response(self.api)
            err = self.deserialize(self.fmt, res)
            self.assertEqual(webob.exc.HTTPClientError.code,
                             res.status_int)
            self.assertEqual('InvalidInput', err['NeutronError']['type'])
def test_requested_duplicate_mac(self):
    """Creating a port with an already used MAC must give a conflict."""
    with self.port() as port:
        mac = port['port']['mac_address']
        # check that MAC address matches base MAC
        base_mac = cfg.CONF.base_mac[0:2]
        self.assertTrue(mac.startswith(base_mac))
        kwargs = {"mac_address": mac}
        net_id = port['port']['network_id']
        res = self._create_port(self.fmt, net_id=net_id, **kwargs)
        # assertEqual takes (expected, observed).
        self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_mac_generation(self):
    """A generated MAC starts with the first three base_mac octets."""
    cfg.CONF.set_override('base_mac', "12:34:56:00:00:00")
    with self.port() as port:
        generated_mac = port['port']['mac_address']
        has_prefix = generated_mac.startswith("12:34:56")
        self.assertTrue(has_prefix)
def test_mac_generation_4octet(self):
    """A generated MAC starts with the first four base_mac octets."""
    cfg.CONF.set_override('base_mac', "12:34:56:78:00:00")
    with self.port() as port:
        generated_mac = port['port']['mac_address']
        has_prefix = generated_mac.startswith("12:34:56:78")
        self.assertTrue(has_prefix)
def test_bad_mac_format(self):
    """``_check_base_mac_format`` must raise for a malformed base_mac."""
    cfg.CONF.set_override('base_mac', "bad_mac")
    try:
        self.plugin._check_base_mac_format()
    except Exception:
        # Any exception counts as the expected rejection.
        pass
    else:
        self.fail("No exception for illegal base_mac format")
def test_mac_exhaustion(self):
    """Port creation gives up after mac_generation_retries attempts.

    ``_create_port_with_mac`` is patched to always raise
    ``MacAddressInUse``; with three retries configured, the request must
    end in service-unavailable after exactly three calls.
    """
    # rather than actually consuming all MAC (would take a LONG time)
    # we try to allocate an already allocated mac address
    cfg.CONF.set_override('mac_generation_retries', 3)

    res = self._create_network(fmt=self.fmt, name='net1',
                               admin_state_up=True)
    network = self.deserialize(self.fmt, res)
    net_id = network['network']['id']

    error = n_exc.MacAddressInUse(net_id=net_id, mac='00:11:22:33:44:55')
    with mock.patch.object(
            neutron.db.db_base_plugin_v2.NeutronDbPluginV2,
            '_create_port_with_mac', side_effect=error) as create_mock:
        res = self._create_port(self.fmt, net_id=net_id)
        # assertEqual takes (expected, observed).
        self.assertEqual(webob.exc.HTTPServiceUnavailable.code,
                         res.status_int)
        self.assertEqual(3, create_mock.call_count)
def test_requested_duplicate_ip(self):
    """Requesting an already allocated fixed IP must give a conflict."""
    with self.subnet() as subnet:
        subnet_id = subnet['subnet']['id']
        with self.port(subnet=subnet) as port:
            ips = port['port']['fixed_ips']
            # assertEqual takes (expected, observed).
            self.assertEqual(1, len(ips))
            self.assertEqual('10.0.0.2', ips[0]['ip_address'])
            self.assertEqual(subnet_id, ips[0]['subnet_id'])
            # Check configuring of duplicate IP
            kwargs = {"fixed_ips": [{'subnet_id': subnet_id,
                                     'ip_address': ips[0]['ip_address']}]}
            net_id = port['port']['network_id']
            res = self._create_port(self.fmt, net_id=net_id, **kwargs)
            self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_requested_subnet_id(self):
    """Request a port IP by subnet_id only.

    With 10.0.0.2 held by the first port, the second port is expected to
    get 10.0.0.3 on the same subnet.
    """
    with self.subnet() as subnet:
        subnet_id = subnet['subnet']['id']
        with self.port(subnet=subnet) as port:
            ips = port['port']['fixed_ips']
            # assertEqual takes (expected, observed).
            self.assertEqual(1, len(ips))
            self.assertEqual('10.0.0.2', ips[0]['ip_address'])
            self.assertEqual(subnet_id, ips[0]['subnet_id'])
            # Request a IP from specific subnet
            kwargs = {"fixed_ips": [{'subnet_id': subnet_id}]}
            net_id = port['port']['network_id']
            res = self._create_port(self.fmt, net_id=net_id, **kwargs)
            port2 = self.deserialize(self.fmt, res)
            ips = port2['port']['fixed_ips']
            self.assertEqual(1, len(ips))
            self.assertEqual('10.0.0.3', ips[0]['ip_address'])
            self.assertEqual(subnet_id, ips[0]['subnet_id'])
            self._delete('ports', port2['port']['id'])
def test_requested_subnet_id_not_on_network(self):
    """Reject a fixed IP request for a subnet of a different network.

    A second network with its own subnet is created; requesting that
    subnet for a port on the first network must give a client error.
    """
    with self.subnet() as subnet:
        with self.port(subnet=subnet) as port:
            # Create new network
            res = self._create_network(fmt=self.fmt, name='net2',
                                       admin_state_up=True)
            network2 = self.deserialize(self.fmt, res)
            subnet2 = self._make_subnet(self.fmt, network2, "1.1.1.1",
                                        "1.1.1.0/24", ip_version=4)
            # Request a IP from specific subnet
            kwargs = {"fixed_ips": [{'subnet_id':
                                     subnet2['subnet']['id']}]}
            net_id = port['port']['network_id']
            res = self._create_port(self.fmt, net_id=net_id, **kwargs)
            # assertEqual takes (expected, observed).
            self.assertEqual(webob.exc.HTTPClientError.code,
                             res.status_int)
def test_overlapping_subnets(self):
    """Creating a second subnet with cidr 10.0.0.225/28 must fail.

    The subnet is requested on the network of an existing default subnet
    and a client error is expected.
    """
    with self.subnet() as subnet:
        tenant_id = subnet['subnet']['tenant_id']
        net_id = subnet['subnet']['network_id']
        res = self._create_subnet(self.fmt,
                                  tenant_id=tenant_id,
                                  net_id=net_id,
                                  cidr='10.0.0.225/28',
                                  ip_version=4,
                                  gateway_ip=attributes.ATTR_NOT_SPECIFIED)
        # assertEqual takes (expected, observed).
        self.assertEqual(webob.exc.HTTPClientError.code, res.status_int)
def test_requested_subnet_id_v4_and_v6(self):
    """Allocate one IPv4 and one IPv6 address per port.

    The first port names both subnets in fixed_ips; the second is
    created without fixed_ips.  Each must get an address from both the
    IPv4 subnet and the IPv6 subnet.
    """
    with self.subnet() as subnet:
        # Get a IPv4 and IPv6 address
        tenant_id = subnet['subnet']['tenant_id']
        net_id = subnet['subnet']['network_id']
        res = self._create_subnet(
            self.fmt,
            tenant_id=tenant_id,
            net_id=net_id,
            cidr='2607:f0d0:1002:51::/124',
            ip_version=6,
            gateway_ip=attributes.ATTR_NOT_SPECIFIED)
        subnet2 = self.deserialize(self.fmt, res)
        v4_id = subnet['subnet']['id']
        v6_id = subnet2['subnet']['id']
        kwargs = {"fixed_ips":
                  [{'subnet_id': v4_id},
                   {'subnet_id': v6_id}]}
        res = self._create_port(self.fmt, net_id=net_id, **kwargs)
        port3 = self.deserialize(self.fmt, res)
        ips = port3['port']['fixed_ips']
        # assertEqual takes (expected, observed).
        self.assertEqual(2, len(ips))
        self.assertIn({'ip_address': '10.0.0.2',
                       'subnet_id': v4_id}, ips)
        self.assertIn({'ip_address': '2607:f0d0:1002:51::2',
                       'subnet_id': v6_id}, ips)
        res = self._create_port(self.fmt, net_id=net_id)
        port4 = self.deserialize(self.fmt, res)
        # Check that a v4 and a v6 address are allocated
        ips = port4['port']['fixed_ips']
        self.assertEqual(2, len(ips))
        self.assertIn({'ip_address': '10.0.0.3',
                       'subnet_id': v4_id}, ips)
        self.assertIn({'ip_address': '2607:f0d0:1002:51::3',
                       'subnet_id': v6_id}, ips)
        self._delete('ports', port3['port']['id'])
        self._delete('ports', port4['port']['id'])
def test_requested_invalid_fixed_ip_address_v6_slaac(self):
    """Reject an explicit fixed IP when creating a port on SLAAC subnet.

    The create request names an address on the IPv6 SLAAC subnet and a
    client error is expected.
    """
    with self.subnet(gateway_ip='fe80::1',
                     cidr='2607:f0d0:1002:51::/64',
                     ip_version=6,
                     ipv6_address_mode=constants.IPV6_SLAAC) as subnet:
        kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
                                 'ip_address': '2607:f0d0:1002:51::5'}]}
        net_id = subnet['subnet']['network_id']
        res = self._create_port(self.fmt, net_id=net_id, **kwargs)
        # assertEqual takes (expected, observed).
        self.assertEqual(webob.exc.HTTPClientError.code,
                         res.status_int)
@mock.patch.object(non_ipam.IpamNonPluggableBackend,
                   '_allocate_specific_ip')
def test_requested_fixed_ip_address_v6_slaac_router_iface(
        self, alloc_specific_ip):
    """A router interface port may request a fixed IP on a SLAAC subnet.

    The port must report the requested address fe80::1, and the patched
    ``_allocate_specific_ip`` must not have been called.

    :param alloc_specific_ip: mock injected by the ``mock.patch.object``
        decorator.
    """
    with self.subnet(gateway_ip='fe80::1',
                     cidr='fe80::/64',
                     ip_version=6,
                     ipv6_address_mode=constants.IPV6_SLAAC) as subnet:
        kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
                                 'ip_address': 'fe80::1'}]}
        net_id = subnet['subnet']['network_id']
        device_owner = constants.DEVICE_OWNER_ROUTER_INTF
        res = self._create_port(self.fmt, net_id=net_id,
                                device_owner=device_owner, **kwargs)
        port = self.deserialize(self.fmt, res)
        # assertEqual takes (expected, observed).
        self.assertEqual(1, len(port['port']['fixed_ips']))
        self.assertEqual('fe80::1',
                         port['port']['fixed_ips'][0]['ip_address'])
        self.assertFalse(alloc_specific_ip.called)
def test_requested_subnet_id_v6_slaac(self):
    """A port requesting a SLAAC subnet by id gets the EUI-64 address."""
    with self.subnet(gateway_ip='fe80::1',
                     cidr='2607:f0d0:1002:51::/64',
                     ip_version=6,
                     ipv6_address_mode=constants.IPV6_SLAAC) as subnet:
        with self.port(subnet,
                       fixed_ips=[{'subnet_id':
                                   subnet['subnet']['id']}]) as port:
            # Reuse the class helper for the EUI-64 derivation.
            eui_addr = self._calc_ipv6_addr_by_EUI64(port, subnet)
            # assertEqual takes (expected, observed).
            self.assertEqual(eui_addr,
                             port['port']['fixed_ips'][0]['ip_address'])
def test_requested_subnet_id_v4_and_v6_slaac(self):
    """Request an IPv4 subnet and an IPv6 SLAAC subnet for one port.

    The port must get 10.0.0.2 on the IPv4 subnet and the EUI-64 address
    on the SLAAC subnet.
    """
    with self.network() as network:
        with self.subnet(network) as subnet,\
                self.subnet(
                    network,
                    cidr='2607:f0d0:1002:51::/64',
                    ip_version=6,
                    gateway_ip='fe80::1',
                    ipv6_address_mode=constants.IPV6_SLAAC) as subnet2:
            with self.port(
                subnet,
                fixed_ips=[{'subnet_id': subnet['subnet']['id']},
                           {'subnet_id': subnet2['subnet']['id']}]
            ) as port:
                ips = port['port']['fixed_ips']
                # assertEqual takes (expected, observed).
                self.assertEqual(2, len(ips))
                self.assertIn({'ip_address': '10.0.0.2',
                               'subnet_id': subnet['subnet']['id']}, ips)
                # Reuse the class helper for the EUI-64 derivation.
                eui_addr = self._calc_ipv6_addr_by_EUI64(port, subnet2)
                self.assertIn({'ip_address': eui_addr,
                               'subnet_id': subnet2['subnet']['id']}, ips)
def test_create_router_port_ipv4_and_ipv6_slaac_no_fixed_ips(self):
    """A router port created without fixed_ips gets only an IPv4 address.

    The network carries one IPv4 subnet and one IPv6 SLAAC subnet.
    """
    slaac_args = {'cidr': '2607:f0d0:1002:51::/64',
                  'ip_version': 6,
                  'gateway_ip': 'fe80::1',
                  'ipv6_address_mode': constants.IPV6_SLAAC}
    with self.network() as network:
        # Create an IPv4 and an IPv6 SLAAC subnet on the network
        with self.subnet(network), self.subnet(network, **slaac_args):
            # Create a router port without specifying fixed_ips
            router_port = self._make_port(
                self.fmt, network['network']['id'],
                device_owner=constants.DEVICE_OWNER_ROUTER_INTF)
            # Router port should only have an IPv4 address
            allocated = router_port['port']['fixed_ips']
            self.assertEqual(1, len(allocated))
            self.assertEqual('10.0.0.2', allocated[0]['ip_address'])
def _make_v6_subnet(self, network, ra_addr_mode):
return (self._make_subnet(self.fmt, network, gateway='fe80::1',
cidr='fe80::/64', ip_version=6,
ipv6_ra_mode=ra_addr_mode,
ipv6_address_mode=ra_addr_mode))
@staticmethod
def _calc_ipv6_addr_by_EUI64(port, subnet):
    """Return the EUI-64 address for a port on a subnet, as a string.

    :param port: port dict wrapped under 'port', providing 'mac_address'.
    :param subnet: subnet dict wrapped under 'subnet', providing 'cidr'.
    """
    mac = port['port']['mac_address']
    cidr = subnet['subnet']['cidr']
    eui_addr = ipv6_utils.get_ipv6_addr_by_EUI64(cidr, mac)
    return str(eui_addr)
def test_ip_allocation_for_ipv6_subnet_slaac_address_mode(self):
    """A port on a SLAAC subnet gets exactly the EUI-64 address."""
    net_res = self._create_network(fmt=self.fmt, name='net',
                                   admin_state_up=True)
    network = self.deserialize(self.fmt, net_res)
    slaac_subnet = self._make_v6_subnet(network, constants.IPV6_SLAAC)
    port = self._make_port(self.fmt, network['network']['id'])
    allocated = port['port']['fixed_ips']
    self.assertEqual(1, len(allocated))
    expected_addr = self._calc_ipv6_addr_by_EUI64(port, slaac_subnet)
    self.assertEqual(expected_addr, allocated[0]['ip_address'])
def _test_create_port_with_ipv6_subnet_in_fixed_ips(self, addr_mode):
    """Test port create with an IPv6 subnet incl in fixed IPs.

    For SLAAC the EUI-64 address is expected; for any other
    ``addr_mode`` the expected address is fe80::2.
    """
    with self.network(name='net') as network:
        v6_subnet = self._make_v6_subnet(network, addr_mode)
        requested = [{'subnet_id': v6_subnet['subnet']['id']}]
        with self.port(subnet=v6_subnet, fixed_ips=requested) as port:
            expected_addr = (
                self._calc_ipv6_addr_by_EUI64(port, v6_subnet)
                if addr_mode == constants.IPV6_SLAAC else 'fe80::2')
            allocated = port['port']['fixed_ips']
            self.assertEqual(1, len(allocated))
            self.assertEqual(expected_addr, allocated[0]['ip_address'])
def test_create_port_with_ipv6_slaac_subnet_in_fixed_ips(self):
    """Run the fixed-IPs port-create check in SLAAC address mode."""
    slaac_mode = constants.IPV6_SLAAC
    self._test_create_port_with_ipv6_subnet_in_fixed_ips(
        addr_mode=slaac_mode)
def test_create_port_with_ipv6_dhcp_stateful_subnet_in_fixed_ips(self):
    """Run the fixed-IPs port-create check in DHCPv6-stateful mode."""
    stateful_mode = constants.DHCPV6_STATEFUL
    self._test_create_port_with_ipv6_subnet_in_fixed_ips(
        addr_mode=stateful_mode)
def test_create_port_with_multiple_ipv4_and_ipv6_subnets(self):
    """Test port create with multiple IPv4, IPv6 DHCP/SLAAC subnets."""
    net_res = self._create_network(fmt=self.fmt, name='net',
                                   admin_state_up=True)
    network = self.deserialize(self.fmt, net_res)
    slaac = constants.IPV6_SLAAC
    stateful = constants.DHCPV6_STATEFUL
    sub_dicts = [
        {'gateway': '10.0.0.1', 'cidr': '10.0.0.0/24',
         'ip_version': 4, 'ra_addr_mode': None},
        {'gateway': '10.0.1.1', 'cidr': '10.0.1.0/24',
         'ip_version': 4, 'ra_addr_mode': None},
        {'gateway': 'fe80::1', 'cidr': 'fe80::/64',
         'ip_version': 6, 'ra_addr_mode': slaac},
        {'gateway': 'fe81::1', 'cidr': 'fe81::/64',
         'ip_version': 6, 'ra_addr_mode': slaac},
        {'gateway': 'fe82::1', 'cidr': 'fe82::/64',
         'ip_version': 6, 'ra_addr_mode': stateful},
        {'gateway': 'fe83::1', 'cidr': 'fe83::/64',
         'ip_version': 6, 'ra_addr_mode': stateful}]
    # Maps each created subnet id to the spec it was created from.
    subnets = {}
    for spec in sub_dicts:
        created = self._make_subnet(
            self.fmt, network,
            gateway=spec['gateway'],
            cidr=spec['cidr'],
            ip_version=spec['ip_version'],
            ipv6_ra_mode=spec['ra_addr_mode'],
            ipv6_address_mode=spec['ra_addr_mode'])
        subnets[created['subnet']['id']] = spec
    port_res = self._create_port(self.fmt,
                                 net_id=network['network']['id'])
    port = self.deserialize(self.fmt, port_res)
    # Since the create port request was made without a list of fixed IPs,
    # the port should be associated with addresses for one of the
    # IPv4 subnets, one of the DHCPv6 subnets, and both of the IPv6
    # SLAAC subnets.
    allocated = port['port']['fixed_ips']
    self.assertEqual(4, len(allocated))
    allocated_modes = [subnets[ip['subnet_id']]['ra_addr_mode']
                       for ip in allocated
                       if ip['subnet_id'] in subnets]
    self.assertEqual(1, allocated_modes.count(None))
    self.assertEqual(1, allocated_modes.count(stateful))
    self.assertEqual(2, allocated_modes.count(slaac))
def test_delete_port_with_ipv6_slaac_address(self):
    """Test that a port with an IPv6 SLAAC address can be deleted."""
    net_res = self._create_network(fmt=self.fmt, name='net',
                                   admin_state_up=True)
    network = self.deserialize(self.fmt, net_res)
    # Create a port that has an associated IPv6 SLAAC address
    self._make_v6_subnet(network, constants.IPV6_SLAAC)
    port_res = self._create_port(self.fmt,
                                 net_id=network['network']['id'])
    port = self.deserialize(self.fmt, port_res)
    self.assertEqual(1, len(port['port']['fixed_ips']))
    # Confirm that the port can be deleted
    port_id = port['port']['id']
    self._delete('ports', port_id)
    self._show('ports', port_id,
               expected_code=webob.exc.HTTPNotFound.code)
def test_update_port_with_ipv6_slaac_subnet_in_fixed_ips(self):
    """Test port update with an IPv6 SLAAC subnet in fixed IPs."""
    net_res = self._create_network(fmt=self.fmt, name='net',
                                   admin_state_up=True)
    network = self.deserialize(self.fmt, net_res)
    # Create a port using an IPv4 subnet and an IPv6 SLAAC subnet
    self._make_subnet(self.fmt, network, gateway='10.0.0.1',
                      cidr='10.0.0.0/24', ip_version=4)
    slaac_subnet = self._make_v6_subnet(network, constants.IPV6_SLAAC)
    port_res = self._create_port(self.fmt,
                                 net_id=network['network']['id'])
    port = self.deserialize(self.fmt, port_res)
    self.assertEqual(2, len(port['port']['fixed_ips']))
    # Update port including only the IPv6 SLAAC subnet
    only_slaac = [{'subnet_id': slaac_subnet['subnet']['id']}]
    update_req = self.new_update_request(
        'ports', {'port': {'fixed_ips': only_slaac}}, port['port']['id'])
    updated = self.deserialize(self.fmt,
                               update_req.get_response(self.api))
    # Port should only have an address corresponding to IPv6 SLAAC subnet
    remaining = updated['port']['fixed_ips']
    self.assertEqual(1, len(remaining))
    expected_addr = self._calc_ipv6_addr_by_EUI64(port, slaac_subnet)
    self.assertEqual(expected_addr, remaining[0]['ip_address'])
def test_update_port_excluding_ipv6_slaac_subnet_from_fixed_ips(self):
    """Verify a SLAAC address survives an update omitting its subnet."""
    net_res = self._create_network(fmt=self.fmt, name='net',
                                   admin_state_up=True)
    network = self.deserialize(self.fmt, net_res)
    # Start from a port on a network that has both an IPv4 subnet and
    # an IPv6 SLAAC subnet; two fixed IPs are expected
    subnet_v4 = self._make_subnet(self.fmt, network, gateway='10.0.0.1',
                                  cidr='10.0.0.0/24', ip_version=4)
    subnet_v6 = self._make_v6_subnet(network, constants.IPV6_SLAAC)
    port_res = self._create_port(self.fmt,
                                 net_id=network['network']['id'])
    port = self.deserialize(self.fmt, port_res)
    self.assertEqual(2, len(port['port']['fixed_ips']))
    # Update naming only the IPv4 subnet, with an explicit address
    v4_only = [{'subnet_id': subnet_v4['subnet']['id'],
                'ip_address': "10.0.0.10"}]
    update_req = self.new_update_request(
        'ports', {'port': {'fixed_ips': v4_only}}, port['port']['id'])
    updated = self.deserialize(self.fmt,
                               update_req.get_response(self.api))
    # The SLAAC entry must still be listed next to the IPv4 one
    fixed_ips = updated['port']['fixed_ips']
    self.assertEqual(2, len(fixed_ips))
    slaac_addr = self._calc_ipv6_addr_by_EUI64(port, subnet_v6)
    slaac_entry = {'subnet_id': subnet_v6['subnet']['id'],
                   'ip_address': slaac_addr}
    self.assertIn(slaac_entry, fixed_ips)
def test_ip_allocation_for_ipv6_2_subnet_slaac_mode(self):
    """Verify a port gets one EUI-64 address per SLAAC subnet.

    Two IPv6 /64 subnets with ipv6_ra_mode set to SLAAC are created on
    the same network. The port's fixed IPs must equal the set of
    addresses computed from each subnet CIDR and the port's MAC.
    """
    net_res = self._create_network(fmt=self.fmt, name='net',
                                   admin_state_up=True)
    network = self.deserialize(self.fmt, net_res)
    # Subnets are created in this order: 2001:100::/64, 2001:200::/64
    slaac_subnets = [
        self._make_subnet(self.fmt, network,
                          gateway=gateway,
                          cidr=cidr,
                          ip_version=6,
                          ipv6_ra_mode=constants.IPV6_SLAAC)
        for gateway, cidr in (('2001:100::1', '2001:100::0/64'),
                              ('2001:200::1', '2001:200::0/64'))
    ]
    port = self._make_port(self.fmt, network['network']['id'])
    mac = port['port']['mac_address']
    cidrs = [made['subnet']['cidr'] for made in slaac_subnets]
    expected_addrs = {
        str(ipv6_utils.get_ipv6_addr_by_EUI64(cidr, mac))
        for cidr in cidrs
    }
    allocated_addrs = {entry['ip_address']
                       for entry in port['port']['fixed_ips']}
    self.assertEqual(expected_addrs, allocated_addrs)
def test_range_allocation(self):
    """Test that auto-allocation uses every non-gateway host of a /29.

    For each subnet, a port is created requesting five fixed IPs on
    that subnet without naming any address. The addresses handed back
    must be exactly the five listed ones, i.e. the /29 host range minus
    the subnet's gateway IP. Two gateway positions are covered: one in
    the middle of the host range and one at its end.
    """
    # (gateway_ip, cidr, addresses the port is expected to receive)
    cases = (
        ('10.0.0.3', '10.0.0.0/29',
         ['10.0.0.1', '10.0.0.2', '10.0.0.4', '10.0.0.5', '10.0.0.6']),
        ('11.0.0.6', '11.0.0.0/29',
         ['11.0.0.1', '11.0.0.2', '11.0.0.3', '11.0.0.4', '11.0.0.5']),
    )
    for gateway_ip, cidr, expected_ips in cases:
        with self.subnet(gateway_ip=gateway_ip, cidr=cidr) as subnet:
            subnet_id = subnet['subnet']['id']
            # One subnet-only request per address we expect back
            kwargs = {"fixed_ips": [{'subnet_id': subnet_id}
                                    for _ in expected_ips]}
            net_id = subnet['subnet']['network_id']
            res = self._create_port(self.fmt, net_id=net_id, **kwargs)
            port = self.deserialize(self.fmt, res)
            ips = port['port']['fixed_ips']
            self.assertEqual(len(expected_ips), len(ips))
            for ip in ips:
                self.assertEqual(subnet_id, ip['subnet_id'])
            # Allocation order is not asserted; comparing sorted lists
            # still catches duplicates and unexpected addresses
            self.assertEqual(sorted(expected_ips),
                             sorted(ip['ip_address'] for ip in ips))
            self._delete('ports', port['port']['id'])
def test_requested_invalid_fixed_ips(self):
    """Test port creation with invalid and gateway fixed IP requests.

    With one port already present on the subnet, the test checks that:

    * an unknown subnet_id in fixed_ips yields HTTPNotFound;
    * an address outside the named subnet yields a client error;
    * addresses given without a subnet_id that are off-subnet, or are
      the network or broadcast address, yield a client error;
    * explicitly requesting 10.0.0.1 succeeds (the original comment
      describes this as the gateway address; the default subnet's
      CIDR and gateway are not visible here -- confirm against
      self.subnet()).
    """
    with self.subnet() as subnet:
        with self.port(subnet=subnet) as port:
            subnet_id = subnet['subnet']['id']
            # Every request below targets the existing port's network
            net_id = port['port']['network_id']

            ips = port['port']['fixed_ips']
            self.assertEqual(1, len(ips))
            self.assertEqual('10.0.0.2', ips[0]['ip_address'])
            self.assertEqual(subnet_id, ips[0]['subnet_id'])

            # Test invalid subnet_id
            kwargs = {"fixed_ips":
                      [{'subnet_id': subnet_id},
                       {'subnet_id':
                        '00000000-ffff-ffff-ffff-000000000000'}]}
            res = self._create_port(self.fmt, net_id=net_id, **kwargs)
            # Only the status code matters for rejected requests, so
            # the error body is not deserialized
            self.assertEqual(webob.exc.HTTPNotFound.code, res.status_int)

            # Test invalid IP address on specified subnet_id
            kwargs = {"fixed_ips":
                      [{'subnet_id': subnet_id,
                        'ip_address': '1.1.1.1'}]}
            res = self._create_port(self.fmt, net_id=net_id, **kwargs)
            self.assertEqual(webob.exc.HTTPClientError.code,
                             res.status_int)

            # Test invalid addresses - IP's not on subnet or network
            # address or broadcast address
            bad_ips = ['1.1.1.1', '10.0.0.0', '10.0.0.255']
            for ip in bad_ips:
                kwargs = {"fixed_ips": [{'ip_address': ip}]}
                res = self._create_port(self.fmt, net_id=net_id,
                                        **kwargs)
                self.assertEqual(webob.exc.HTTPClientError.code,
                                 res.status_int)

            # Enable allocation of gateway address
            kwargs = {"fixed_ips":
                      [{'subnet_id': subnet_id,
                        'ip_address': '10.0.0.1'}]}
            res = self._create_port(self.fmt, net_id=net_id, **kwargs)
            port2 = self.deserialize(self.fmt, res)
            ips = port2['port']['fixed_ips']
            self.assertEqual(1, len(ips))
            self.assertEqual('10.0.0.1', ips[0]['ip_address'])
            self.assertEqual(subnet_id, ips[0]['subnet_id'])
            self._delete('ports', port2['port']['id'])
def test_invalid_ip(self):
    """Test that a malformed fixed IP address is rejected.

    '1011.0.0.5' is not a valid IPv4 address (its first octet is out
    of range), so creating a port with it must return a client error.
    """
    with self.subnet() as subnet:
        # Allocate specific IP
        kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
                                 'ip_address': '1011.0.0.5'}]}
        net_id = subnet['subnet']['network_id']
        res = self._create_port(self.fmt, net_id=net_id, **kwargs)
        self.assertEqual(webob.exc.HTTPClientError.code, res.status_int)
def test_requested_split(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ports_to_delete = []
ips = port['port']['fixed_ips']
self.assertEqual(len(ips), 1)
self.assertEqual(ips[0]['ip_address'], '10.0.0.2')
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
# Allocate specific IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.5'}]}
net_id = port['port']['network_id