OpenStack Networking (Neutron)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

6746 lines
329 KiB

# Copyright (c) 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import copy
import functools
import itertools
import eventlet
import mock
import netaddr
from neutron_lib.callbacks import exceptions
from neutron_lib.callbacks import registry
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.db import api as lib_db_api
from neutron_lib import exceptions as lib_exc
from neutron_lib import fixture
from neutron_lib.plugins import directory
from neutron_lib.utils import helpers
from neutron_lib.utils import net
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_utils import importutils
from oslo_utils import netutils
from oslo_utils import uuidutils
from sqlalchemy import orm
import testtools
from testtools import matchers
import webob.exc
import neutron
from neutron.api import api_common
from neutron.api import extensions
from neutron.api.v2 import router
from neutron.common import exceptions as n_exc
from neutron.common import ipv6_utils
from neutron.common import test_lib
from neutron.common import utils
from neutron.db import api as db_api
from neutron.db import db_base_plugin_common
from neutron.db import ipam_backend_mixin
from neutron.db.models import l3 as l3_models
from neutron.db.models import securitygroup as sg_models
from neutron.db import models_v2
from neutron.db import rbac_db_models
from neutron.db import standard_attr
from neutron.ipam.drivers.neutrondb_ipam import driver as ipam_driver
from neutron.ipam import exceptions as ipam_exc
from neutron.objects import router as l3_obj
from neutron.tests import base
from neutron.tests import tools
from neutron.tests.unit.api import test_extensions
from neutron.tests.unit import testlib_api
# Dotted path of the core plugin that setUp() loads when no plugin is given.
DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
# Sample device_owner values: one built from the compute prefix, and one
# that is the DHCP owner (i.e. not a compute owner).
DEVICE_OWNER_COMPUTE = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'fake'
DEVICE_OWNER_NOT_COMPUTE = constants.DEVICE_OWNER_DHCP
# Tenant ID that setUp() stores as self._tenant_id for created resources.
TEST_TENANT_ID = '46f70361-ba71-4bd0-9769-3573fd227c4b'
def optional_ctx(obj, fallback, **kwargs):
    """Return a context manager that provides ``obj`` or a fallback.

    :param obj: an already-existing object; when truthy it is yielded as-is.
    :param fallback: callable returning a context manager; it is invoked
        with ``**kwargs`` only when ``obj`` is falsy.
    :returns: a context manager.
    """
    if obj:
        @contextlib.contextmanager
        def _yield_existing():
            yield obj

        return _yield_existing()
    return fallback(**kwargs)
def _fake_get_pagination_helper(self, request):
    """Return an emulated pagination helper for ``request``.

    The helper is built from the request and ``self._primary_key``.
    """
    helper_cls = api_common.PaginationEmulatedHelper
    return helper_cls(request, self._primary_key)
def _fake_get_sorting_helper(self, request):
    """Return an emulated sorting helper for ``request``.

    The helper is built from the request and ``self._attr_info``.
    """
    helper_cls = api_common.SortingEmulatedHelper
    return helper_cls(request, self._attr_info)
# TODO(banix): Move the following method to ML2 db test module when ML2
# mechanism driver unit tests are corrected to use Ml2PluginV2TestCase
# instead of directly using NeutronDbPluginV2TestCase
def _get_create_db_method(resource):
    """Return the name of the plugin method that creates ``resource``.

    Prefers ``_create_<resource>_db`` when the loaded core plugin has such
    an attribute, otherwise falls back to ``create_<resource>``.
    """
    ml2_name = '_create_%s_db' % resource
    core_plugin = directory.get_plugin()
    if hasattr(core_plugin, ml2_name):
        return ml2_name
    return 'create_%s' % resource
class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
fmt = 'json'
resource_prefix_map = {}
block_dhcp_notifier = True
def setUp(self, plugin=None, service_plugins=None,
          ext_mgr=None):
    """Load the core plugin, apply config overrides and build the API.

    :param plugin: core plugin path; DB_PLUGIN_KLASS is used when falsy.
    :param service_plugins: optional mapping; for every (key, default)
        pair, the value found in test_lib.test_config (or the default) is
        added to the 'service_plugins' option.
    :param ext_mgr: optional extension manager; when given, an extension
        middleware is stored as self.ext_api.
    """
    super(NeutronDbPluginV2TestCase, self).setUp()

    conf = cfg.CONF
    conf.set_override('notify_nova_on_port_status_changes', False)
    conf.set_override('allow_overlapping_ips', True)

    # Reset the extension manager singleton so every test loads the
    # extensions that match its own plugin.
    extensions.PluginAwareExtensionManager._instance = None
    # The attribute map may be altered while extensions load; the fixture
    # is installed before that happens.
    self.useFixture(fixture.APIDefinitionFixture())
    self._tenant_id = TEST_TENANT_ID

    if not plugin:
        plugin = DB_PLUGIN_KLASS
    if self.block_dhcp_notifier:
        mock.patch('neutron.api.rpc.agentnotifiers.dhcp_rpc_agent_api.'
                   'DhcpAgentNotifyAPI').start()

    # Update the plugin
    self.setup_coreplugin(plugin, load_plugins=False)
    requested_plugins = (service_plugins or {}).items()
    conf.set_override(
        'service_plugins',
        [test_lib.test_config.get(key, default)
         for key, default in requested_plugins]
    )
    conf.set_override('base_mac', "12:34:56:78:00:00")
    conf.set_override('max_dns_nameservers', 2)
    conf.set_override('max_subnet_host_routes', 2)
    self.api = router.APIRouter()

    # Set the default status
    self.net_create_status = 'ACTIVE'
    self.port_create_status = 'ACTIVE'

    def _plugin_declares(flag_template):
        # The flags are name-mangled private attributes of the plugin
        # class, so the class name has to be spliced into the name.
        core_plugin = directory.get_plugin()
        flag_name = flag_template % core_plugin.__class__.__name__
        return getattr(core_plugin, flag_name, False)

    self._skip_native_bulk = not _plugin_declares(
        "_%s__native_bulk_support")
    self._skip_native_pagination = not _plugin_declares(
        "_%s__native_pagination_support")
    self.plugin = directory.get_plugin()
    self._skip_native_sorting = not _plugin_declares(
        "_%s__native_sorting_support")

    if ext_mgr:
        self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
def setup_config(self):
    """Pass neutron.conf plus any test_config 'config_files' to the parent."""
    # Create the default configurations
    cli_args = ['--config-file', base.etcdir('neutron.conf')]
    # If test_config specifies some config-file, use it, as well
    extra_files = test_lib.test_config.get('config_files', [])
    for path in extra_files:
        cli_args += ['--config-file', path]
    super(NeutronDbPluginV2TestCase, self).setup_config(args=cli_args)
def _req(self, method, resource, data=None, fmt=None, id=None, params=None,
         action=None, subresource=None, sub_id=None, context=None,
         headers=None):
    """Build a request object for the given resource path.

    The path is made of the non-empty values among resource, id,
    subresource, sub_id and action (in that order), followed by
    ``.<fmt>``, and is prefixed with the entry for ``resource`` in
    ``self.resource_prefix_map`` when one exists. ``data`` is serialized
    unless it is None.
    """
    fmt = fmt or self.fmt
    segments = [part for part in
                (resource, id, subresource, sub_id, action) if part]
    path = '/%s.%s' % ('/'.join(segments), fmt)

    prefix = self.resource_prefix_map.get(resource)
    if prefix:
        path = prefix + path

    content_type = 'application/%s' % fmt
    # An empty dict is a valid body, so only None means "no body".
    body = self.serialize(data) if data is not None else None
    return testlib_api.create_request(path, body, content_type, method,
                                      query_string=params, context=context,
                                      headers=headers)
def new_create_request(self, resource, data, fmt=None, id=None,
                       subresource=None, context=None):
    """Return a POST request carrying ``data`` for ``resource``."""
    return self._req(
        'POST', resource, data, fmt,
        id=id, subresource=subresource, context=context
    )
def new_list_request(self, resource, fmt=None, params=None,
                     subresource=None):
    """Return a body-less GET request listing ``resource``."""
    return self._req('GET', resource, None, fmt,
                     params=params, subresource=subresource)
def new_show_request(self, resource, id, fmt=None,
                     subresource=None, fields=None, sub_id=None):
    """Return a GET request for one resource.

    When ``fields`` is non-empty, each entry becomes a ``fields=<name>``
    query parameter.
    """
    params = None
    if fields:
        params = "&".join("fields=%s" % field for field in fields)
    return self._req('GET', resource, None, fmt, id=id,
                     params=params, subresource=subresource, sub_id=sub_id)
def new_delete_request(self, resource, id, fmt=None, subresource=None,
                       sub_id=None, data=None, headers=None):
    """Return a DELETE request for one resource, optionally with a body."""
    return self._req('DELETE', resource, data, fmt, id=id,
                     subresource=subresource, sub_id=sub_id,
                     headers=headers)
def new_update_request(self, resource, data, id, fmt=None,
                       subresource=None, context=None, sub_id=None,
                       headers=None):
    """Return a PUT request updating one resource with ``data``."""
    return self._req('PUT', resource, data, fmt,
                     id=id,
                     subresource=subresource,
                     sub_id=sub_id,
                     context=context,
                     headers=headers)
def new_action_request(self, resource, data, id, action, fmt=None,
                       subresource=None, sub_id=None):
    """Return a PUT request that invokes ``action`` on one resource."""
    return self._req('PUT', resource, data, fmt, id=id, action=action,
                     subresource=subresource, sub_id=sub_id)
def deserialize(self, content_type, response):
    """Return the 'body' entry of the deserialized response body.

    The deserializer is looked up in ``self._deserializers`` under
    ``application/<content_type>``.
    """
    deserializer = self._deserializers['application/%s' % content_type]
    parsed = deserializer.deserialize(response.body)
    return parsed['body']
def _create_bulk_from_list(self, fmt, resource, objects, **kwargs):
"""Creates a bulk request from a list of objects."""
collection = "%ss" % resource
req_data = {collection: objects}
req = self.new_create_request(collection, req_data, fmt)
if ('set_context' in kwargs and
kwargs['set_context'] is True and
'tenant_id' in kwargs):
# create a specific auth context for this request
req.environ['neutron.context'] = context.Context(
'', kwargs['tenant_id'])
elif 'context' in kwargs:
req.environ['neutron.context'] = kwargs['context']
return req.get_response(self.api)
def _create_bulk(self, fmt, number, resource, data, name='test', **kwargs):
"""Creates a bulk request for any kind of resource."""
objects = []
collection = "%ss" % resource
for i in range(number):
obj = copy.deepcopy(data)
obj[resource]['name'] = "%s_%s" % (name, i)
if 'override' in kwargs and i in kwargs['override']:
obj[resource].update(kwargs['override'][i])
objects.append(obj)
req_data = {collection: objects}
req = self.new_create_request(collection, req_data, fmt)
if ('set_context' in kwargs and
kwargs['set_context'] is True and
'tenant_id' in kwargs):
# create a specific auth context for this request
req.environ['neutron.context'] = context.Context(
'', kwargs['tenant_id'])
elif 'context' in kwargs:
req.environ['neutron.context'] = kwargs['context']
return req.get_response(self.api)
def _create_network(self, fmt, name, admin_state_up,
arg_list=None, set_context=False, tenant_id=None,
**kwargs):
tenant_id = tenant_id or self._tenant_id
data = {'network': {'name': name,
'admin_state_up': admin_state_up,
'tenant_id': tenant_id}}
for arg in (('admin_state_up', 'tenant_id', 'shared',
'vlan_transparent',
'availability_zone_hints') + (arg_list or ())):
# Arg must be present
if arg in kwargs:
data['network'][arg] = kwargs[arg]
network_req = self.new_create_request('networks', data, fmt)
if set_context and tenant_id:
# create a specific auth context for this request
network_req.environ['neutron.context'] = context.Context(
'', tenant_id)
return network_req.get_response(self.api)
def _create_network_bulk(self, fmt, number, name,
admin_state_up, **kwargs):
base_data = {'network': {'admin_state_up': admin_state_up,
'tenant_id': self._tenant_id}}
return self._create_bulk(fmt, number, 'network', base_data, **kwargs)
def _create_subnet(self, fmt, net_id, cidr,
expected_res_status=None, **kwargs):
data = {'subnet': {'network_id': net_id,
'ip_version': 4,
'tenant_id': self._tenant_id}}
if cidr:
data['subnet']['cidr'] = cidr
for arg in ('ip_version', 'tenant_id', 'subnetpool_id', 'prefixlen',
'enable_dhcp', 'allocation_pools', 'segment_id',
'dns_nameservers', 'host_routes',
'shared', 'ipv6_ra_mode', 'ipv6_address_mode',
'service_types'):
# Arg must be present and not null (but can be false)
if kwargs.get(arg) is not None:
data['subnet'][arg] = kwargs[arg]
if ('gateway_ip' in kwargs and
kwargs['gateway_ip'] is not constants.ATTR_NOT_SPECIFIED):
data['subnet']['gateway_ip'] = kwargs['gateway_ip']
subnet_req = self.new_create_request('subnets', data, fmt)
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
# create a specific auth context for this request
subnet_req.environ['neutron.context'] = context.Context(
'', kwargs['tenant_id'])
subnet_res = subnet_req.get_response(self.api)
if expected_res_status:
self.assertEqual(expected_res_status, subnet_res.status_int)
return subnet_res
def _create_subnet_bulk(self, fmt, number, net_id, name,
ip_version=4, **kwargs):
base_data = {'subnet': {'network_id': net_id,
'ip_version': ip_version,
'tenant_id': self._tenant_id}}
# auto-generate cidrs as they should not overlap
overrides = dict((k, v)
for (k, v) in zip(range(number),
[{'cidr': "10.0.%s.0/24" % num}
for num in range(number)]))
kwargs.update({'override': overrides})
return self._create_bulk(fmt, number, 'subnet', base_data, **kwargs)
def _create_subnetpool(self, fmt, prefixes,
expected_res_status=None, admin=False, **kwargs):
subnetpool = {'subnetpool': {'prefixes': prefixes}}
for k, v in kwargs.items():
subnetpool['subnetpool'][k] = str(v)
api = self._api_for_resource('subnetpools')
subnetpools_req = self.new_create_request('subnetpools',
subnetpool, fmt)
if not admin:
neutron_context = context.Context('', kwargs['tenant_id'])
subnetpools_req.environ['neutron.context'] = neutron_context
subnetpool_res = subnetpools_req.get_response(api)
if expected_res_status:
self.assertEqual(expected_res_status, subnetpool_res.status_int)
return subnetpool_res
def _create_port(self, fmt, net_id, expected_res_status=None,
arg_list=None, set_context=False, tenant_id=None,
**kwargs):
tenant_id = tenant_id or self._tenant_id
data = {'port': {'network_id': net_id,
'tenant_id': tenant_id}}
for arg in (('admin_state_up', 'device_id',
'mac_address', 'name', 'fixed_ips',
'tenant_id', 'device_owner', 'security_groups') +
(arg_list or ())):
# Arg must be present
if arg in kwargs:
data['port'][arg] = kwargs[arg]
# create a dhcp port device id if one hasn't been supplied
if ('device_owner' in kwargs and
kwargs['device_owner'] == constants.DEVICE_OWNER_DHCP and
'host' in kwargs and
'device_id' not in kwargs):
device_id = utils.get_dhcp_agent_device_id(net_id, kwargs['host'])
data['port']['device_id'] = device_id
port_req = self.new_create_request('ports', data, fmt)
if set_context and tenant_id:
# create a specific auth context for this request
port_req.environ['neutron.context'] = context.Context(
'', tenant_id)
port_res = port_req.get_response(self.api)
if expected_res_status:
self.assertEqual(expected_res_status, port_res.status_int)
return port_res
def _list_ports(self, fmt, expected_res_status=None,
net_id=None, **kwargs):
query_params = []
if net_id:
query_params.append("network_id=%s" % net_id)
if kwargs.get('device_owner'):
query_params.append("device_owner=%s" % kwargs.get('device_owner'))
port_req = self.new_list_request('ports', fmt, '&'.join(query_params))
if ('set_context' in kwargs and
kwargs['set_context'] is True and
'tenant_id' in kwargs):
# create a specific auth context for this request
port_req.environ['neutron.context'] = context.Context(
'', kwargs['tenant_id'])
port_res = port_req.get_response(self.api)
if expected_res_status:
self.assertEqual(expected_res_status, port_res.status_int)
return port_res
def _create_port_bulk(self, fmt, number, net_id, name,
admin_state_up, **kwargs):
base_data = {'port': {'network_id': net_id,
'admin_state_up': admin_state_up,
'tenant_id': self._tenant_id}}
return self._create_bulk(fmt, number, 'port', base_data, **kwargs)
def _make_network(self, fmt, name, admin_state_up, **kwargs):
    """Create a network and return the deserialized response body.

    :raises webob.exc.HTTPClientError: carrying the response code when
        the status is at or above HTTPClientError.code.
    """
    response = self._create_network(fmt, name, admin_state_up, **kwargs)
    # TODO(salvatore-orlando): do exception handling in this test module
    # in a uniform way (we do it differently for ports, subnets, and nets
    # Things can go wrong - raise HTTP exc with res code only
    # so it can be caught by unit tests
    status = response.status_int
    if status >= webob.exc.HTTPClientError.code:
        raise webob.exc.HTTPClientError(code=status)
    return self.deserialize(fmt, response)
def _make_subnet(self, fmt, network, gateway, cidr, subnetpool_id=None,
                 allocation_pools=None, ip_version=4, enable_dhcp=True,
                 dns_nameservers=None, host_routes=None, shared=None,
                 ipv6_ra_mode=None, ipv6_address_mode=None,
                 tenant_id=None, set_context=False, segment_id=None):
    """Create a subnet on ``network`` and return the deserialized body.

    The tenant defaults to the owning network's tenant.

    :raises webob.exc.HTTPClientError: carrying the response code when
        the status is at or above HTTPClientError.code.
    """
    parent = network['network']
    response = self._create_subnet(
        fmt,
        net_id=parent['id'],
        cidr=cidr,
        subnetpool_id=subnetpool_id,
        segment_id=segment_id,
        gateway_ip=gateway,
        tenant_id=(tenant_id or parent['tenant_id']),
        allocation_pools=allocation_pools,
        ip_version=ip_version,
        enable_dhcp=enable_dhcp,
        dns_nameservers=dns_nameservers,
        host_routes=host_routes,
        shared=shared,
        ipv6_ra_mode=ipv6_ra_mode,
        ipv6_address_mode=ipv6_address_mode,
        set_context=set_context)
    # Things can go wrong - raise HTTP exc with res code only
    # so it can be caught by unit tests
    status = response.status_int
    if status >= webob.exc.HTTPClientError.code:
        raise webob.exc.HTTPClientError(code=status)
    return self.deserialize(fmt, response)
def _make_v6_subnet(self, network, ra_addr_mode, ipv6_pd=False):
cidr = 'fe80::/64'
gateway = 'fe80::1'
subnetpool_id = None
if ipv6_pd:
cidr = None
gateway = None
subnetpool_id = constants.IPV6_PD_POOL_ID
cfg.CONF.set_override('ipv6_pd_enabled', True)
return (self._make_subnet(self.fmt, network, gateway=gateway,
subnetpool_id=subnetpool_id,
cidr=cidr, ip_version=6,
ipv6_ra_mode=ra_addr_mode,
ipv6_address_mode=ra_addr_mode))
def _make_subnetpool(self, fmt, prefixes, admin=False, **kwargs):
    """Create a subnetpool and return the deserialized response body.

    :raises webob.exc.HTTPClientError: carrying the response code when
        the status is at or above HTTPClientError.code.
    """
    response = self._create_subnetpool(fmt, prefixes, None, admin,
                                       **kwargs)
    # Things can go wrong - raise HTTP exc with res code only
    # so it can be caught by unit tests
    status = response.status_int
    if status >= webob.exc.HTTPClientError.code:
        raise webob.exc.HTTPClientError(code=status)
    return self.deserialize(fmt, response)
def _make_port(self, fmt, net_id, expected_res_status=None, **kwargs):
    """Create a port and return the deserialized response body.

    :raises webob.exc.HTTPClientError: carrying the response code when
        the status is at or above HTTPClientError.code.
    """
    response = self._create_port(fmt, net_id, expected_res_status,
                                 **kwargs)
    # Things can go wrong - raise HTTP exc with res code only
    # so it can be caught by unit tests
    status = response.status_int
    if status >= webob.exc.HTTPClientError.code:
        raise webob.exc.HTTPClientError(code=status)
    return self.deserialize(fmt, response)
def _api_for_resource(self, resource):
if resource in ['networks', 'subnets', 'ports', 'subnetpools']:
return self.api
else:
return self.ext_api
def _delete(self, collection, id,
            expected_code=webob.exc.HTTPNoContent.code,
            neutron_context=None, headers=None):
    """DELETE one resource and assert the response status code."""
    request = self.new_delete_request(collection, id, headers=headers)
    if neutron_context:
        # create a specific auth context for this request
        request.environ['neutron.context'] = neutron_context
    api = self._api_for_resource(collection)
    response = request.get_response(api)
    self.assertEqual(expected_code, response.status_int)
def _show_response(self, resource, id, neutron_context=None):
req = self.new_show_request(resource, id)
if neutron_context:
# create a specific auth context for this request
req.environ['neutron.context'] = neutron_context
return req.get_response(self._api_for_resource(resource))
def _show(self, resource, id,
          expected_code=webob.exc.HTTPOk.code,
          neutron_context=None):
    """GET one resource, assert the status and return the parsed body."""
    response = self._show_response(resource, id,
                                   neutron_context=neutron_context)
    self.assertEqual(expected_code, response.status_int)
    return self.deserialize(self.fmt, response)
def _update(self, resource, id, new_data,
            expected_code=webob.exc.HTTPOk.code,
            neutron_context=None, headers=None):
    """PUT ``new_data`` to one resource and return the parsed body.

    The response status is asserted against ``expected_code``.
    """
    request = self.new_update_request(resource, new_data, id,
                                      headers=headers)
    if neutron_context:
        # create a specific auth context for this request
        request.environ['neutron.context'] = neutron_context
    api = self._api_for_resource(resource)
    response = request.get_response(api)
    self.assertEqual(expected_code, response.status_int)
    return self.deserialize(self.fmt, response)
def _list(self, resource, fmt=None, neutron_context=None,
          query_params=None):
    """GET a collection, assert a 200 status and return the parsed body."""
    fmt = fmt or self.fmt
    request = self.new_list_request(resource, fmt, query_params)
    if neutron_context:
        request.environ['neutron.context'] = neutron_context
    api = self._api_for_resource(resource)
    response = request.get_response(api)
    self.assertEqual(webob.exc.HTTPOk.code, response.status_int)
    return self.deserialize(fmt, response)
def _fail_second_call(self, patched_plugin, orig, *args, **kwargs):
"""Invoked by test cases for injecting failures in plugin."""
def second_call(*args, **kwargs):
raise lib_exc.NeutronException()
patched_plugin.side_effect = second_call
return orig(*args, **kwargs)
def _validate_behavior_on_bulk_failure(
        self, res, collection,
        errcode=webob.exc.HTTPClientError.code):
    """Assert a bulk call failed and left ``collection`` empty.

    Checks that ``res`` carries ``errcode``, then lists ``collection``
    and asserts that the listing succeeds with zero items.
    """
    self.assertEqual(errcode, res.status_int)
    listing = self.new_list_request(collection).get_response(self.api)
    self.assertEqual(webob.exc.HTTPOk.code, listing.status_int)
    remaining = self.deserialize(self.fmt, listing)[collection]
    self.assertEqual(0, len(remaining))
def _validate_behavior_on_bulk_success(self, res, collection,
                                       names=('test_0', 'test_1')):
    """Assert a bulk create succeeded and returned the expected names.

    :param res: response of the bulk create request.
    :param collection: key of the created items in the response body.
    :param names: expected item names, in response order. Defaults to
        the two names produced by a two-item _create_bulk call; None is
        treated like the default.
    """
    # The default is an immutable tuple (not a shared list) and the
    # parameter is honoured instead of hard-coding 'test_0'/'test_1'.
    if names is None:
        names = ('test_0', 'test_1')
    self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
    items = self.deserialize(self.fmt, res)[collection]
    self.assertEqual(len(names), len(items))
    for expected_name, item in zip(names, items):
        self.assertEqual(expected_name, item['name'])
def _test_list_resources(self, resource, items, neutron_context=None,
query_params=None):
res = self._list('%ss' % resource,
neutron_context=neutron_context,
query_params=query_params)
resource = resource.replace('-', '_')
self.assertItemsEqual([i['id'] for i in res['%ss' % resource]],
[i[resource]['id'] for i in items])
@contextlib.contextmanager
def network(self, name='net1',
            admin_state_up=True,
            fmt=None,
            **kwargs):
    """Context manager that creates a network and yields its parsed body.

    Nothing is done on exit.
    """
    yield self._make_network(fmt or self.fmt, name,
                             admin_state_up, **kwargs)
@contextlib.contextmanager
def subnet(self, network=None,
           gateway_ip=constants.ATTR_NOT_SPECIFIED,
           cidr='10.0.0.0/24',
           subnetpool_id=None,
           segment_id=None,
           fmt=None,
           ip_version=4,
           allocation_pools=None,
           enable_dhcp=True,
           dns_nameservers=None,
           host_routes=None,
           shared=None,
           ipv6_ra_mode=None,
           ipv6_address_mode=None,
           tenant_id=None,
           service_types=None,
           set_context=False):
    """Context manager that creates a subnet and yields its parsed body.

    A network is created through self.network() when ``network`` is
    falsy. ``cidr`` and a specified, non-None ``gateway_ip`` are
    converted to netaddr objects before the subnet is created.

    NOTE(review): ``service_types`` is accepted but never forwarded to
    _make_subnet, so it currently has no effect.
    """
    if cidr:
        cidr = netaddr.IPNetwork(cidr)
    else:
        cidr = None
    gateway_given = (gateway_ip is not None and
                     gateway_ip != constants.ATTR_NOT_SPECIFIED)
    if gateway_given:
        gateway_ip = netaddr.IPAddress(gateway_ip)

    with optional_ctx(network, self.network,
                      set_context=set_context,
                      tenant_id=tenant_id) as network_to_use:
        yield self._make_subnet(fmt or self.fmt,
                                network_to_use,
                                gateway_ip,
                                cidr,
                                subnetpool_id,
                                allocation_pools,
                                ip_version,
                                enable_dhcp,
                                dns_nameservers,
                                host_routes,
                                segment_id=segment_id,
                                shared=shared,
                                ipv6_ra_mode=ipv6_ra_mode,
                                ipv6_address_mode=ipv6_address_mode,
                                tenant_id=tenant_id,
                                set_context=set_context)
@contextlib.contextmanager
def subnetpool(self, prefixes, admin=False, **kwargs):
    """Context manager that creates a subnetpool and yields its body.

    Nothing is done on exit.
    """
    yield self._make_subnetpool(self.fmt, prefixes, admin, **kwargs)
@contextlib.contextmanager
def port(self, subnet=None, fmt=None, set_context=False, tenant_id=None,
         **kwargs):
    """Context manager that creates a port and yields its parsed body.

    The port is created on the network of ``subnet``; a subnet is
    created through self.subnet() when none is supplied.
    """
    subnet_cm = optional_ctx(subnet, self.subnet,
                             set_context=set_context, tenant_id=tenant_id)
    with subnet_cm as subnet_to_use:
        net_id = subnet_to_use['subnet']['network_id']
        yield self._make_port(fmt or self.fmt, net_id,
                              set_context=set_context,
                              tenant_id=tenant_id,
                              **kwargs)
def _test_list_with_sort(self, resource,
items, sorts, resources=None, query_params=''):
query_str = query_params
for key, direction in sorts:
query_str = query_str + "&sort_key=%s&sort_dir=%s" % (key,
direction)
if not resources:
resources = '%ss' % resource
req = self.new_list_request(resources,
params=query_str)
api = self._api_for_resource(resources)
res = self.deserialize(self.fmt, req.get_response(api))
resource = resource.replace('-', '_')
resources = resources.replace('-', '_')
expected_res = [item[resource]['id'] for item in items]
self.assertEqual(expected_res, [n['id'] for n in res[resources]])
def _test_list_with_pagination(self, resource, items, sort,
                               limit, expected_page_num,
                               resources=None,
                               query_params='',
                               verify_key='id'):
    """Walk a paginated listing forwards and check pages and contents.

    Follows the 'next' links until none is returned, asserting that no
    page exceeds ``limit``, that every page with a 'next' link is full,
    that ``expected_page_num`` pages were fetched and that the collected
    ``verify_key`` values match those of ``items`` in order.
    """
    resources = resources or '%ss' % resource
    prefix = query_params + '&' if query_params else ''
    query_str = prefix + "limit=%s&sort_key=%s&sort_dir=%s" % (
        limit, sort[0], sort[1])
    req = self.new_list_request(resources, params=query_str)
    api = self._api_for_resource(resources)

    # Response bodies use underscores where resource names use dashes.
    resource = resource.replace('-', '_')
    resources = resources.replace('-', '_')
    links_key = '%s_links' % resources

    collected = []
    pages_seen = 0
    while req:
        pages_seen += 1
        res = self.deserialize(self.fmt, req.get_response(api))
        page = res[resources]
        self.assertThat(len(page), matchers.LessThan(limit + 1))
        collected.extend(page)
        # Stop unless this page advertises a 'next' link.
        req = None
        if links_key in res:
            for link in res[links_key]:
                if link['rel'] != 'next':
                    continue
                content_type = 'application/%s' % self.fmt
                req = testlib_api.create_request(link['href'],
                                                 '', content_type)
                self.assertEqual(len(res[resources]), limit)

    self.assertEqual(expected_page_num, pages_seen)
    self.assertEqual([item[resource][verify_key] for item in items],
                     [entry[verify_key] for entry in collected])
def _test_list_with_pagination_reverse(self, resource, items, sort,
                                       limit, expected_page_num,
                                       resources=None,
                                       query_params=''):
    """Walk a paginated listing backwards from the last expected item.

    Starts with the id of the last entry of ``items`` as marker and
    follows the 'previous' links, asserting page sizes, the number of
    pages and that the collected ids equal ``items`` in reverse order.
    """
    resources = resources or '%ss' % resource
    resource = resource.replace('-', '_')
    api = self._api_for_resource(resources)

    last_item = items[-1][resource]
    marker = last_item['id']
    prefix = query_params + '&' if query_params else ''
    query_str = prefix + (
        "limit=%s&page_reverse=True&sort_key=%s&sort_dir=%s&marker=%s"
    ) % (limit, sort[0], sort[1], marker)
    req = self.new_list_request(resources, params=query_str)

    # The marker item itself is not returned, so seed the result with it.
    collected = [items[-1][resource]]
    pages_seen = 0
    resources = resources.replace('-', '_')
    links_key = '%s_links' % resources
    while req:
        pages_seen += 1
        res = self.deserialize(self.fmt, req.get_response(api))
        page = res[resources]
        self.assertThat(len(page), matchers.LessThan(limit + 1))
        page.reverse()
        collected.extend(page)
        # Stop unless this page advertises a 'previous' link.
        req = None
        if links_key in res:
            for link in res[links_key]:
                if link['rel'] != 'previous':
                    continue
                content_type = 'application/%s' % self.fmt
                req = testlib_api.create_request(link['href'],
                                                 '', content_type)
                self.assertEqual(len(res[resources]), limit)

    self.assertEqual(expected_page_num, pages_seen)
    expected_ids = [item[resource]['id'] for item in reversed(items)]
    self.assertEqual(expected_ids, [entry['id'] for entry in collected])
def _compare_resource(self, observed_res, expected_res, res_name):
    """Compare the observed and expected resources (ie compare subnets).

    :param observed_res: parsed response body containing ``res_name``.
    :param expected_res: mapping of attribute name to expected value.
    :param res_name: key of the resource inside ``observed_res``.

    List values are compared regardless of order; everything else must
    be equal.
    """
    for k in expected_res:
        self.assertIn(k, observed_res[res_name])
        if isinstance(expected_res[k], list):
            # Lists may hold dicts, which cannot be ordered on Python 3
            # without a key, so sort the same way _validate_resource does.
            self.assertEqual(
                sorted(expected_res[k], key=helpers.safe_sort_key),
                sorted(observed_res[res_name][k],
                       key=helpers.safe_sort_key))
        else:
            self.assertEqual(expected_res[k], observed_res[res_name][k])
def _validate_resource(self, resource, keys, res_name):
    """Assert ``resource[res_name]`` holds every expected key and value.

    ``keys`` maps attribute names to expected values. List values are
    compared after sorting both sides with helpers.safe_sort_key.
    """
    for attr, expected in keys.items():
        observed = resource[res_name]
        self.assertIn(attr, observed)
        if not isinstance(expected, list):
            self.assertEqual(expected, observed[attr])
            continue
        self.assertEqual(
            sorted(expected, key=helpers.safe_sort_key),
            sorted(observed[attr], key=helpers.safe_sort_key))
class TestBasicGet(NeutronDbPluginV2TestCase):
    """Fetch a single network directly through the DB plugin."""

    def test_single_get_admin(self):
        """A network can be fetched by id with an admin context."""
        plugin = neutron.db.db_base_plugin_v2.NeutronDbPluginV2()
        with self.network() as network:
            net_id = network['network']['id']
            ctx = context.get_admin_context()
            n = plugin._get_network(ctx, net_id)
            self.assertEqual(net_id, n.id)

    def test_single_get_tenant(self):
        """A network can be fetched by id with its owner's context."""
        plugin = neutron.db.db_base_plugin_v2.NeutronDbPluginV2()
        with self.network() as network:
            net_id = network['network']['id']
            # Use the owning tenant's context rather than the admin one;
            # otherwise this test only repeats test_single_get_admin.
            ctx = context.Context('', self._tenant_id)
            n = plugin._get_network(ctx, net_id)
            self.assertEqual(net_id, n.id)
class TestV2HTTPResponse(NeutronDbPluginV2TestCase):
    """Check the HTTP status codes returned for basic network requests."""

    def test_create_returns_201(self):
        response = self._create_network(self.fmt, 'net2', True)
        self.assertEqual(webob.exc.HTTPCreated.code, response.status_int)

    def test_list_returns_200(self):
        response = self.new_list_request('networks').get_response(self.api)
        self.assertEqual(webob.exc.HTTPOk.code, response.status_int)

    def _check_list_with_fields(self, res, field_name):
        """Assert ``res`` lists one network exposing only ``field_name``."""
        self.assertEqual(webob.exc.HTTPOk.code, res.status_int)
        networks = self.deserialize(self.fmt, res)['networks']
        # further checks: 1 networks
        self.assertEqual(1, len(networks))
        # 1 field in the network record
        self.assertEqual(1, len(networks[0]))
        # and that field is the requested one
        self.assertIn(field_name, networks[0])

    def test_list_with_fields(self):
        self._create_network(self.fmt, 'some_net', True)
        request = self.new_list_request('networks', params="fields=name")
        self._check_list_with_fields(request.get_response(self.api), 'name')

    def test_list_with_fields_noadmin(self):
        tenant_id = 'some_tenant'
        self._create_network(self.fmt, 'some_net', True,
                             tenant_id=tenant_id, set_context=True)
        request = self.new_list_request('networks', params="fields=name")
        request.environ['neutron.context'] = context.Context('', tenant_id)
        self._check_list_with_fields(request.get_response(self.api), 'name')

    def test_list_with_fields_noadmin_and_policy_field(self):
        """If a field used by policy is selected, do not duplicate it.

        Verifies that if the field parameter explicitly specifies a field
        which is used by the policy engine, then it is not duplicated
        in the response.
        """
        tenant_id = 'some_tenant'
        self._create_network(self.fmt, 'some_net', True,
                             tenant_id=tenant_id, set_context=True)
        request = self.new_list_request('networks',
                                        params="fields=tenant_id")
        request.environ['neutron.context'] = context.Context('', tenant_id)
        self._check_list_with_fields(request.get_response(self.api),
                                     'tenant_id')

    def test_show_returns_200(self):
        with self.network() as net:
            request = self.new_show_request('networks',
                                            net['network']['id'])
            response = request.get_response(self.api)
            self.assertEqual(webob.exc.HTTPOk.code, response.status_int)

    def test_delete_returns_204(self):
        created = self._create_network(self.fmt, 'net1', True)
        net_id = self.deserialize(self.fmt, created)['network']['id']
        request = self.new_delete_request('networks', net_id)
        response = request.get_response(self.api)
        self.assertEqual(webob.exc.HTTPNoContent.code, response.status_int)

    def test_delete_with_req_body_returns_400(self):
        created = self._create_network(self.fmt, 'net1', True)
        net_id = self.deserialize(self.fmt, created)['network']['id']
        body = {"network": {"id": net_id}}
        request = self.new_delete_request('networks', net_id, data=body)
        response = request.get_response(self.api)
        self.assertEqual(webob.exc.HTTPBadRequest.code,
                         response.status_int)

    def test_update_returns_200(self):
        with self.network() as net:
            request = self.new_update_request(
                'networks',
                {'network': {'name': 'steve'}},
                net['network']['id'])
            response = request.get_response(self.api)
            self.assertEqual(webob.exc.HTTPOk.code, response.status_int)

    def test_update_invalid_json_400(self):
        with self.network() as net:
            # The body is a string that is not a valid update document.
            request = self.new_update_request(
                'networks',
                '{{"name": "aaa"}}',
                net['network']['id'])
            response = request.get_response(self.api)
            self.assertEqual(webob.exc.HTTPClientError.code,
                             response.status_int)

    def test_bad_route_404(self):
        request = self.new_list_request('doohickeys')
        response = request.get_response(self.api)
        self.assertEqual(webob.exc.HTTPNotFound.code, response.status_int)
class TestPortsV2(NeutronDbPluginV2TestCase):
def test_create_port_json(self):
    """A created port has the default attributes and one in-range IP."""
    expected = {'admin_state_up': True,
                'status': self.port_create_status}
    with self.network(shared=True) as network:
        with self.subnet(network=network) as subnet:
            with self.port(name='myname') as port:
                created = port['port']
                for attr, value in expected.items():
                    self.assertEqual(created[attr], value)
                self.assertIn('mac_address', created)
                ips = created['fixed_ips']
                subnet_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr'])
                self.assertEqual(1, len(ips))
                self.assertIn(netaddr.IPAddress(ips[0]['ip_address']),
                              subnet_ip_net)
                self.assertEqual('myname', created['name'])
def test_create_port_as_admin(self):
    """Without a tenant context, a port for another tenant is created."""
    with self.network() as network:
        port_args = dict(tenant_id='bad_tenant_id',
                         device_id='fake_device',
                         device_owner='fake_owner',
                         fixed_ips=[],
                         set_context=False)
        self._create_port(self.fmt, network['network']['id'],
                          webob.exc.HTTPCreated.code, **port_args)
def test_create_port_bad_tenant(self):
    """Under another tenant's context, port creation returns 404."""
    with self.network() as network:
        port_args = dict(tenant_id='bad_tenant_id',
                         device_id='fake_device',
                         device_owner='fake_owner',
                         fixed_ips=[],
                         set_context=True)
        self._create_port(self.fmt, network['network']['id'],
                          webob.exc.HTTPNotFound.code, **port_args)
def test_create_port_public_network(self):
    """Another tenant can create a port on a shared network."""
    expected = {'admin_state_up': True,
                'status': self.port_create_status}
    with self.network(shared=True) as network:
        port_res = self._create_port(self.fmt,
                                     network['network']['id'],
                                     webob.exc.HTTPCreated.code,
                                     tenant_id='another_tenant',
                                     set_context=True)
        created = self.deserialize(self.fmt, port_res)['port']
        for attr, value in expected.items():
            self.assertEqual(created[attr], value)
        self.assertIn('mac_address', created)
        self._delete('ports', created['id'])
def test_create_port_None_values(self):
    """A None device_owner, name or device_id yields a client error."""
    nullable = ['device_owner', 'name', 'device_id']
    with self.network() as network:
        for none_attr in nullable:
            # test with each as None and rest as ''
            kwargs = dict.fromkeys(nullable, '')
            kwargs[none_attr] = None
            self._create_port(self.fmt,
                              network['network']['id'],
                              webob.exc.HTTPClientError.code,
                              tenant_id='tenant_id',
                              fixed_ips=[],
                              set_context=False,
                              **kwargs)
def test_create_port_public_network_with_ip(self):
    """Create a port on a shared network holding a 10.0.0.0/24 subnet.

    The port is requested as tenant 'another_tenant'; its first fixed IP
    must lie inside the subnet CIDR.  The port is deleted afterwards.
    """
    subnet_cidr = netaddr.IPNetwork('10.0.0.0/24')
    expected_attrs = [('admin_state_up', True),
                      ('status', self.port_create_status)]
    with self.network(shared=True) as network,\
            self.subnet(network=network, cidr=str(subnet_cidr)):
        response = self._create_port(self.fmt,
                                     network['network']['id'],
                                     webob.exc.HTTPCreated.code,
                                     tenant_id='another_tenant',
                                     set_context=True)
        port_data = self.deserialize(self.fmt, response)['port']
        for attr, expected in expected_attrs:
            self.assertEqual(port_data[attr], expected)
        first_ip = port_data['fixed_ips'][0]['ip_address']
        self.assertIn(first_ip, subnet_cidr)
        self.assertIn('mac_address', port_data)
        self._delete('ports', port_data['id'])
def test_create_port_anticipating_allocation(self):
    """Create a port with two fixed-IP requests on the same subnet.

    One request names only the subnet, the other additionally pins the
    address 10.0.0.2.
    """
    with self.network(shared=True) as network,\
            self.subnet(network=network, cidr='10.0.0.0/24') as subnet:
        subnet_id = subnet['subnet']['id']
        requested_ips = [{'subnet_id': subnet_id},
                         {'subnet_id': subnet_id,
                          'ip_address': '10.0.0.2'}]
        self._create_port(self.fmt, network['network']['id'],
                          webob.exc.HTTPCreated.code,
                          fixed_ips=requested_ips)
def test_create_port_public_network_with_invalid_ip_no_subnet_id(self,
        expected_error='InvalidIpForNetwork'):
    """Request address 1.1.1.1 on a network whose subnet is 10.0.0.0/24.

    No subnet_id is given in the fixed IP.  The error type in the
    response must equal ``expected_error`` and the message must match
    ``lib_exc.InvalidIpForNetwork``.

    :param expected_error: error type expected in the response body.
    """
    with self.network(shared=True) as network,\
            self.subnet(network=network, cidr='10.0.0.0/24'):
        requested_ips = [{'ip_address': '1.1.1.1'}]
        response = self._create_port(self.fmt,
                                     network['network']['id'],
                                     webob.exc.HTTPBadRequest.code,
                                     fixed_ips=requested_ips,
                                     set_context=True)
        body = self.deserialize(self.fmt, response)
        expected_msg = str(
            lib_exc.InvalidIpForNetwork(ip_address='1.1.1.1'))
        self.assertEqual(expected_error, body['NeutronError']['type'])
        self.assertEqual(expected_msg, body['NeutronError']['message'])
def test_create_port_public_network_with_invalid_ip_and_subnet_id(self,
        expected_error='InvalidIpForSubnet'):
    """Request address 1.1.1.1 on an explicit 10.0.0.0/24 subnet.

    The fixed IP names both the subnet and an address.  The error type
    in the response must equal ``expected_error`` and the message must
    match ``lib_exc.InvalidIpForSubnet``.

    :param expected_error: error type expected in the response body.
    """
    with self.network(shared=True) as network,\
            self.subnet(network=network, cidr='10.0.0.0/24') as subnet:
        requested_ips = [{'subnet_id': subnet['subnet']['id'],
                          'ip_address': '1.1.1.1'}]
        response = self._create_port(self.fmt,
                                     network['network']['id'],
                                     webob.exc.HTTPBadRequest.code,
                                     fixed_ips=requested_ips,
                                     set_context=True)
        body = self.deserialize(self.fmt, response)
        expected_msg = str(
            lib_exc.InvalidIpForSubnet(ip_address='1.1.1.1'))
        self.assertEqual(expected_error, body['NeutronError']['type'])
        self.assertEqual(expected_msg, body['NeutronError']['message'])
def test_create_ports_bulk_native(self):
    """Bulk-create two ports, validate the response and delete them.

    Skipped when ``self._skip_native_bulk`` is set.
    """
    if self._skip_native_bulk:
        self.skipTest("Plugin does not support native bulk port create")
    with self.network() as network:
        response = self._create_port_bulk(self.fmt, 2,
                                          network['network']['id'],
                                          'test', True)
        self._validate_behavior_on_bulk_success(response, 'ports')
        created_ports = self.deserialize(self.fmt, response)['ports']
        for created in created_ports:
            self._delete('ports', created['id'])
def test_create_ports_bulk_emulated(self):
    """Bulk-create two ports while native bulk support is hidden.

    ``hasattr`` is patched so that any attribute name ending in
    ``__native_bulk_support`` is reported as missing.
    """
    builtin_hasattr = hasattr

    # ensures the API choose the emulation code path
    def hide_native_bulk(item, attr):
        if attr.endswith('__native_bulk_support'):
            return False
        return builtin_hasattr(item, attr)

    with mock.patch('six.moves.builtins.hasattr',
                    new=hide_native_bulk),\
            self.network() as network:
        response = self._create_port_bulk(self.fmt, 2,
                                          network['network']['id'],
                                          'test', True)
        self._validate_behavior_on_bulk_success(response, 'ports')
        created_ports = self.deserialize(self.fmt, response)['ports']
        for created in created_ports:
            self._delete('ports', created['id'])
def test_create_ports_bulk_wrong_input(self):
    """A bulk create with one invalid item must leave no ports behind.

    The item at index 1 carries ``admin_state_up='doh'``; the request
    has to fail with a client error and a later port listing must be
    empty.
    """
    with self.network() as network:
        bad_item = {1: {'admin_state_up': 'doh'}}
        bulk_res = self._create_port_bulk(self.fmt, 2,
                                          network['network']['id'],
                                          'test', True,
                                          override=bad_item)
        self.assertEqual(webob.exc.HTTPClientError.code,
                         bulk_res.status_int)
        list_res = self.new_list_request('ports').get_response(self.api)
        self.assertEqual(webob.exc.HTTPOk.code, list_res.status_int)
        listed = self.deserialize(self.fmt, list_res)
        self.assertEqual(0, len(listed['ports']))
def test_get_ports_count(self):
    """The plugin reports four ports for the tenant after creating four."""
    with self.port(), self.port(), self.port(), self.port() as last_port:
        tenant_id = last_port['port']['tenant_id']
        tenant_ctx = context.Context(user_id=None, tenant_id=tenant_id,
                                     is_admin=False)
        plugin = directory.get_plugin()
        port_count = plugin.get_ports_count(
            tenant_ctx, filters={'tenant_id': [tenant_id]})
        self.assertEqual(4, port_count)
def test_create_ports_bulk_emulated_plugin_failure(self):
    """Emulated bulk create with a fault injected into the plugin.

    ``hasattr`` is patched so that names ending in
    ``__native_bulk_support`` are reported as missing, and the plugin
    method named by ``_get_create_db_method('port')`` is replaced by a
    mock whose calls are routed through ``_fail_second_call``.
    """
    builtin_hasattr = hasattr

    # ensures the API choose the emulation code path
    def hide_native_bulk(item, attr):
        if attr.endswith('__native_bulk_support'):
            return False
        return builtin_hasattr(item, attr)

    with mock.patch('six.moves.builtins.hasattr',
                    new=hide_native_bulk):
        original_create = directory.get_plugin().create_port
        db_method = _get_create_db_method('port')
        with mock.patch.object(directory.get_plugin(),
                               db_method) as patched_plugin:

            def fail_on_second(*args, **kwargs):
                return self._fail_second_call(patched_plugin,
                                              original_create,
                                              *args, **kwargs)

            patched_plugin.side_effect = fail_on_second
            with self.network() as network:
                response = self._create_port_bulk(
                    self.fmt, 2, network['network']['id'],
                    'test', True)
                # We expect a 500 as we injected a fault in the plugin
                self._validate_behavior_on_bulk_failure(
                    response, 'ports', webob.exc.HTTPServerError.code
                )
def test_create_ports_bulk_native_plugin_failure(self):
    """Native bulk create with a fault injected into the plugin.

    Skipped when ``self._skip_native_bulk`` is set.  The plugin method
    named by ``_get_create_db_method('port')`` is replaced by a mock
    whose calls are routed through ``_fail_second_call``.
    """
    if self._skip_native_bulk:
        self.skipTest("Plugin does not support native bulk port create")
    admin_ctx = context.get_admin_context()
    with self.network() as network:
        plugin = directory.get_plugin()
        original_create = plugin.create_port
        db_method = _get_create_db_method('port')
        with mock.patch.object(plugin, db_method) as patched_plugin:

            def fail_on_second(*args, **kwargs):
                return self._fail_second_call(patched_plugin,
                                              original_create,
                                              *args, **kwargs)

            patched_plugin.side_effect = fail_on_second
            response = self._create_port_bulk(
                self.fmt, 2, network['network']['id'],
                'test', True, context=admin_ctx)
            # We expect a 500 as we injected a fault in the plugin
            self._validate_behavior_on_bulk_failure(
                response, 'ports', webob.exc.HTTPServerError.code)
def test_list_ports(self):
    """List ports after creating three of them."""
    # for this test we need to enable overlapping ips
    cfg.CONF.set_default('allow_overlapping_ips', True)
    with self.port() as v1, self.port() as v2, self.port() as v3:
        self._test_list_resources('port', (v1, v2, v3))
def test_list_ports_filtered_by_fixed_ip(self):
    """List ports filtered by fixed-IP address and subnet.

    The query carries the first port's address, the extra address
    192.168.126.5 and the first port's subnet id; only the first port
    is expected in the result.
    """
    # for this test we need to enable overlapping ips
    cfg.CONF.set_default('allow_overlapping_ips', True)
    with self.port() as port1, self.port():
        first_fixed_ip = port1['port']['fixed_ips'][0]
        query_template = """
fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
""".strip()
        query_params = query_template % (first_fixed_ip['ip_address'],
                                         '192.168.126.5',
                                         first_fixed_ip['subnet_id'])
        self._test_list_resources('port', [port1],
                                  query_params=query_params)
def test_list_ports_public_network(self):
    """List ports of two tenants that share one network.

    A listing without an explicit context must return both ports; a
    listing with a tenant context must return only that tenant's port.
    """
    with self.network(shared=True) as network,\
            self.subnet(network) as subnet,\
            self.port(subnet, tenant_id='tenant_1') as port1,\
            self.port(subnet, tenant_id='tenant_2') as port2:
        # Admin request - must return both ports
        self._test_list_resources('port', [port1, port2])
        # Tenant_1 request - must return single port
        tenant_ctx = context.Context('', 'tenant_1')
        self._test_list_resources('port', [port1],
                                  neutron_context=tenant_ctx)
        # Tenant_2 request - must return single port
        tenant_ctx = context.Context('', 'tenant_2')
        self._test_list_resources('port', [port2],
                                  neutron_context=tenant_ctx)
def test_list_ports_for_network_owner(self):
    """List ports as the network's tenant and as another tenant.

    The network belongs to 'tenant_1' and holds one port per tenant.
    'tenant_1' must see both ports, 'tenant_2' only its own.
    """
    with self.network(tenant_id='tenant_1') as network,\
            self.subnet(network) as subnet,\
            self.port(subnet, tenant_id='tenant_1') as port1,\
            self.port(subnet, tenant_id='tenant_2') as port2:
        port1_id = port1['port']['id']
        port2_id = port2['port']['id']

        # network owner request, should return all ports
        owner_res = self._list_ports(
            'json', set_context=True, tenant_id='tenant_1')
        owner_ports = self.deserialize('json', owner_res)['ports']
        owner_port_ids = [p['id'] for p in owner_ports]
        self.assertEqual(2, len(owner_ports))
        self.assertIn(port1_id, owner_port_ids)
        self.assertIn(port2_id, owner_port_ids)

        # another tenant request, only return ports belong to it
        other_res = self._list_ports(
            'json', set_context=True, tenant_id='tenant_2')
        other_ports = self.deserialize('json', other_res)['ports']
        other_port_ids = [p['id'] for p in other_ports]
        self.assertEqual(1, len(other_ports))
        self.assertNotIn(port1_id, other_port_ids)
        self.assertIn(port2_id, other_port_ids)
def test_list_ports_with_sort_native(self):
    """Sort ports by admin_state_up ascending, mac_address descending.

    Skipped when ``self._skip_native_sorting`` is set.
    """
    if self._skip_native_sorting:
        self.skipTest("Skip test for not implemented sorting feature")
    cfg.CONF.set_default('allow_overlapping_ips', True)
    sort_keys = [('admin_state_up', 'asc'),
                 ('mac_address', 'desc')]
    with self.port(admin_state_up='True',
                   mac_address='00:00:00:00:00:01') as port1,\
            self.port(admin_state_up='False',
                      mac_address='00:00:00:00:00:02') as port2,\
            self.port(admin_state_up='False',
                      mac_address='00:00:00:00:00:03') as port3:
        expected_order = (port3, port2, port1)
        self._test_list_with_sort('port', expected_order, sort_keys)
def test_list_ports_with_sort_emulated(self):
    """Sort ports with the controller's sorting helper replaced.

    ``Controller._get_sorting_helper`` is patched with
    ``_fake_get_sorting_helper`` before the same sort scenario as the
    native variant is run.
    """
    mock.patch(
        'neutron.api.v2.base.Controller._get_sorting_helper',
        new=_fake_get_sorting_helper).start()
    cfg.CONF.set_default('allow_overlapping_ips', True)
    sort_keys = [('admin_state_up', 'asc'),
                 ('mac_address', 'desc')]
    with self.port(admin_state_up='True',
                   mac_address='00:00:00:00:00:01') as port1,\
            self.port(admin_state_up='False',
                      mac_address='00:00:00:00:00:02') as port2,\
            self.port(admin_state_up='False',
                      mac_address='00:00:00:00:00:03') as port3:
        expected_order = (port3, port2, port1)
        self._test_list_with_sort('port', expected_order, sort_keys)
def test_list_ports_with_pagination_native(self):
    """Paginate three ports ordered by mac_address ascending.

    Skipped when ``self._skip_native_pagination`` is set.
    """
    if self._skip_native_pagination:
        self.skipTest("Skip test for not implemented pagination feature")
    cfg.CONF.set_default('allow_overlapping_ips', True)
    with self.port(mac_address='00:00:00:00:00:01') as port1,\
            self.port(mac_address='00:00:00:00:00:02') as port2,\
            self.port(mac_address='00:00:00:00:00:03') as port3:
        ordered_ports = (port1, port2, port3)
        self._test_list_with_pagination('port', ordered_ports,
                                        ('mac_address', 'asc'), 2, 2)
def test_list_ports_with_pagination_emulated(self):
    """Paginate ports with the controller's pagination helper replaced.

    ``Controller._get_pagination_helper`` is patched with
    ``_fake_get_pagination_helper`` before the pagination scenario runs.
    """
    mock.patch(
        'neutron.api.v2.base.Controller._get_pagination_helper',
        new=_fake_get_pagination_helper).start()
    cfg.CONF.set_default('allow_overlapping_ips', True)
    with self.port(mac_address='00:00:00:00:00:01') as port1,\
            self.port(mac_address='00:00:00:00:00:02') as port2,\
            self.port(mac_address='00:00:00:00:00:03') as port3:
        ordered_ports = (port1, port2, port3)
        self._test_list_with_pagination('port', ordered_ports,
                                        ('mac_address', 'asc'), 2, 2)
def test_list_ports_with_pagination_reverse_native(self):
    """Run the reverse-pagination check on three ports sorted by MAC.

    Skipped when ``self._skip_native_pagination`` is set.
    """
    if self._skip_native_pagination:
        self.skipTest("Skip test for not implemented pagination feature")
    cfg.CONF.set_default('allow_overlapping_ips', True)
    with self.port(mac_address='00:00:00:00:00:01') as port1,\
            self.port(mac_address='00:00:00:00:00:02') as port2,\
            self.port(mac_address='00:00:00:00:00:03') as port3:
        ordered_ports = (port1, port2, port3)
        self._test_list_with_pagination_reverse(
            'port', ordered_ports, ('mac_address', 'asc'), 2, 2)
def test_list_ports_with_pagination_reverse_emulated(self):
    """Reverse-pagination check with the pagination helper replaced.

    ``Controller._get_pagination_helper`` is patched with
    ``_fake_get_pagination_helper`` before the scenario runs.
    """
    mock.patch(
        'neutron.api.v2.base.Controller._get_pagination_helper',
        new=_fake_get_pagination_helper).start()
    cfg.CONF.set_default('allow_overlapping_ips', True)
    with self.port(mac_address='00:00:00:00:00:01') as port1,\
            self.port(mac_address='00:00:00:00:00:02') as port2,\
            self.port(mac_address='00:00:00:00:00:03') as port3:
        ordered_ports = (port1, port2, port3)
        self._test_list_with_pagination_reverse(
            'port', ordered_ports, ('mac_address', 'asc'), 2, 2)
def test_show_port(self):
    """Showing a port returns the id of the port that was created."""
    with self.port() as port:
        port_id = port['port']['id']
        show_req = self.new_show_request('ports', port_id, self.fmt)
        shown = self.deserialize(self.fmt,
                                 show_req.get_response(self.api))
        self.assertEqual(port_id, shown['port']['id'])
def test_delete_port(self):
    """After deleting a port, showing it must yield HTTP 404."""
    with self.port() as port:
        port_id = port['port']['id']
        self._delete('ports', port_id)
        self._show('ports', port_id,
                   expected_code=webob.exc.HTTPNotFound.code)
def test_delete_port_public_network(self):
    """Delete a port that 'another_tenant' created on a shared network.

    Showing the port afterwards must yield HTTP 404.
    """
    with self.network(shared=True) as network:
        create_res = self._create_port(self.fmt,
                                       network['network']['id'],
                                       webob.exc.HTTPCreated.code,
                                       tenant_id='another_tenant',
                                       set_context=True)
        port_id = self.deserialize(self.fmt, create_res)['port']['id']
        self._delete('ports', port_id)
        self._show('ports', port_id,
                   expected_code=webob.exc.HTTPNotFound.code)
def test_delete_port_by_network_owner(self):
    """The network's tenant deletes a port that belongs to another tenant.

    The network is owned by 'tenant_1', the port by 'tenant_2'; the
    delete is issued with a 'tenant_1' context and the port must be
    gone afterwards.
    """
    with self.network(tenant_id='tenant_1') as network,\
            self.subnet(network) as subnet,\
            self.port(subnet, tenant_id='tenant_2') as port:
        port_id = port['port']['id']
        owner_ctx = context.Context('', 'tenant_1')
        self._delete('ports', port_id, neutron_context=owner_ctx)
        self._show('ports', port_id,
                   expected_code=webob.exc.HTTPNotFound.code)
def test_update_port_with_stale_subnet(self):
    """Updating a port onto a subnet that was just deleted must fail.

    ``IpamBackendMixin._ipam_get_subnets`` is patched to keep returning
    the subnet after it has been deleted; the port update is expected to
    answer HTTP 404.
    """
    with self.network(shared=True) as network:
        port = self._make_port(self.fmt, network['network']['id'])
        subnet = self._make_subnet(self.fmt, network,
                                   '10.0.0.1', '10.0.0.0/24')
        subnet_id = subnet['subnet']['id']
        update_body = {'port': {'fixed_ips': [{'subnet_id': subnet_id}]}}
        # mock _get_subnets, to return this subnet
        stale_patch = mock.patch.object(
            ipam_backend_mixin.IpamBackendMixin,
            '_ipam_get_subnets',
            return_value=[subnet['subnet']])
        stale_patch.start()
        # Delete subnet, to mock the subnet as stale.
        self._delete('subnets', subnet_id)
        self._show('subnets', subnet_id,
                   expected_code=webob.exc.HTTPNotFound.code)
        # Though _get_subnets returns the subnet, subnet was deleted later
        # while ipam is updating the port. So port update should fail.
        update_req = self.new_update_request('ports', update_body,
                                             port['port']['id'])
        update_res = update_req.get_response(self.api)
        self.assertEqual(webob.exc.HTTPNotFound.code,
                         update_res.status_int)
def test_port_update_with_ipam_error(self):
    """A port rename succeeds while ``deallocate`` is mocked to fail once.

    ``NeutronDbSubnet.deallocate`` is patched to raise
    ``IpAddressAllocationNotFound`` on its first call and return None on
    the second.
    """
    with self.network() as network,\
            self.subnet(), self.subnet(),\
            self.port(network=network) as port,\
            mock.patch('neutron.ipam.drivers.neutrondb_ipam.'
                       'driver.NeutronDbSubnet.deallocate') as deallocate:
        not_found = ipam_exc.IpAddressAllocationNotFound(
            ip_address='foo_i', subnet_id='foo_s')
        deallocate.side_effect = [not_found, None]
        update_body = {'port': {'name': 'fool-me'}}
        update_req = self.new_update_request('ports', update_body,
                                             port['port']['id'])
        updated = self.deserialize(self.fmt,
                                   update_req.get_response(self.api))
        self.assertEqual('fool-me', updated['port']['name'])
def test_update_port(self):
    """Setting admin_state_up to False is reflected in the response."""
    with self.port() as port:
        update_body = {'port': {'admin_state_up': False}}
        update_req = self.new_update_request('ports', update_body,
                                             port['port']['id'])
        updated = self.deserialize(self.fmt,
                                   update_req.get_response(self.api))
        self.assertEqual(updated['port']['admin_state_up'],
                         update_body['port']['admin_state_up'])
def update_port_mac(self, port, updated_fixed_ips=None):
    """Send a port update that changes the last octet of the MAC.

    The last octet becomes '01', or '00' if it already was '01'.

    :param port: port dict providing ``mac_address`` and ``id``.
    :param updated_fixed_ips: optional fixed_ips value to include in the
        update body; left out when falsy.
    :returns: tuple of (response to the update request, new MAC string).
    """
    octets = port['mac_address'].split(':')
    octets[5] = '00' if octets[5] == '01' else '01'
    new_mac = ':'.join(octets)
    port_body = {'mac_address': new_mac}
    if updated_fixed_ips:
        port_body['fixed_ips'] = updated_fixed_ips
    update_req = self.new_update_request('ports', {'port': port_body},
                                         port['id'])
    return update_req.get_response(self.api), new_mac
def _verify_ips_after_mac_change(self, orig_port, new_port):
    """Check the fixed IPs of a port after its MAC address changed.

    For every original fixed IP the subnet is fetched.  On an
    auto-address subnet the expected entry is the EUI-64 address built
    from the subnet CIDR and the new MAC; otherwise the original entry
    is expected unchanged.  The number of fixed IPs must not change.

    :param orig_port: port dict (wrapped in 'port') before the update.
    :param new_port: port dict (wrapped in 'port') after the update.
    """
    old_ips = orig_port['port']['fixed_ips']
    new_ips = new_port['port']['fixed_ips']
    for old_ip in old_ips:
        subnet = self._show('subnets', old_ip['subnet_id'])['subnet']
        expected_ip = old_ip
        if ipv6_utils.is_auto_address_subnet(subnet):
            new_mac = new_port['port']['mac_address']
            eui_addr = str(
                netutils.get_ipv6_addr_by_EUI64(subnet['cidr'], new_mac))
            expected_ip = {'ip_address': eui_addr,
                           'subnet_id': subnet['id']}
        self.assertIn(expected_ip, new_ips)
    self.assertEqual(len(old_ips), len(new_ips))
def check_update_port_mac(
        self, expected_status=webob.exc.HTTPOk.code,
        expected_error='StateInvalid', subnet=None,
        device_owner=DEVICE_OWNER_COMPUTE, updated_fixed_ips=None,
        host_arg=None, arg_list=None):
    """Create a port, change its MAC and verify the outcome.

    :param expected_status: HTTP status the update must return.
    :param expected_error: NeutronError type checked when the status is
        not HTTP 200.
    :param subnet: subnet passed through to ``self.port``.
    :param device_owner: device owner passed through to ``self.port``.
    :param updated_fixed_ips: fixed_ips sent along with the MAC update;
        when None the IPs are checked via
        ``_verify_ips_after_mac_change``, otherwise only their count.
    :param host_arg: extra keyword arguments for ``self.port``.
    :param arg_list: ``arg_list`` value for ``self.port``.
    """
    if not host_arg:
        host_arg = {}
    if not arg_list:
        arg_list = []
    with self.port(device_owner=device_owner, subnet=subnet,
                   arg_list=arg_list, **host_arg) as port:
        self.assertIn('mac_address', port['port'])
        res, new_mac = self.update_port_mac(
            port['port'], updated_fixed_ips=updated_fixed_ips)
        self.assertEqual(expected_status, res.status_int)

        if expected_status != webob.exc.HTTPOk.code:
            # Failure path: only the error type is checked.
            error = self.deserialize(self.fmt, res)
            self.assertEqual(expected_error,
                             error['NeutronError']['type'])
            return

        result = self.deserialize(self.fmt, res)
        self.assertIn('port', result)
        self.assertEqual(new_mac, result['port']['mac_address'])
        if updated_fixed_ips is not None:
            self.assertEqual(len(updated_fixed_ips),
                             len(result['port']['fixed_ips']))
        else:
            self._verify_ips_after_mac_change(port, result)
def test_update_port_mac(self):
    """Run ``check_update_port_mac`` with its default expectations."""
    # sub-classes for plugins/drivers that support mac address update
    # override this method
    self.check_update_port_mac()
def test_dhcp_port_ips_prefer_next_available_ip(self):
    """Ten DHCP ports on one subnet must get ten consecutive addresses."""
    # test to check that DHCP ports get the first available IP in the
    # allocation range
    port_count = 10
    with self.subnet() as subnet:
        allocated = []
        for _ in range(port_count):
            with self.port(device_owner=constants.DEVICE_OWNER_DHCP,
                           subnet=subnet) as port:
                first_fixed_ip = port['port']['fixed_ips'][0]
                allocated.append(first_fixed_ip['ip_address'])
        base_ip = netaddr.IPAddress(allocated[0])
        consecutive = [str(base_ip + offset)
                       for offset in range(port_count)]
        self.assertEqual(consecutive, allocated)
def test_update_port_mac_ip(self):
    """Change the MAC and request fixed IP 10.0.0.3 in the same update."""
    with self.subnet() as subnet:
        requested_ips = [{'subnet_id': subnet['subnet']['id'],
                          'ip_address': '10.0.0.3'}]
        self.check_update_port_mac(subnet=subnet,
                                   updated_fixed_ips=requested_ips)
def test_update_port_mac_v6_slaac(self):
    """Change the MAC of a port with SLAAC and IPv4 fixed IPs.

    The port is requested with one address on a SLAAC IPv6 subnet and
    two addresses on each of two IPv4 subnets of the same network.
    """
    with self.network() as network:
        pass
    # add a couple of v4 subnets to ensure they aren't interfered with
    with self.subnet(network=network) as v4_first, \
            self.subnet(network=network, cidr='7.0.0.0/24') as v4_second:
        pass
    with self.subnet(network=network,
                     gateway_ip='fe80::1',
                     cidr='2607:f0d0:1002:51::/64',
                     ip_version=6,
                     ipv6_address_mode=constants.IPV6_SLAAC) as subnet:
        self.assertTrue(
            ipv6_utils.is_auto_address_subnet(subnet['subnet']))
        v4_first_id = v4_first['subnet']['id']
        v4_second_id = v4_second['subnet']['id']
        fixed_ips_req = {
            'fixed_ips': [{'subnet_id': subnet['subnet']['id']},
                          {'subnet_id': v4_first_id},
                          {'subnet_id': v4_first_id},
                          {'subnet_id': v4_second_id},
                          {'subnet_id': v4_second_id}]
        }
        self.check_update_port_mac(subnet=subnet, host_arg=fixed_ips_req)
def test_update_port_mac_bad_owner(self):
    """A MAC update on a DEVICE_OWNER_NOT_COMPUTE port must conflict."""
    self.check_update_port_mac(
        expected_status=webob.exc.HTTPConflict.code,
        expected_error='UnsupportedPortDeviceOwner',
        device_owner=DEVICE_OWNER_NOT_COMPUTE)
def check_update_port_mac_used(self, expected_error='MacAddressInUse'):
    """Updating a port to another port's MAC must return HTTP 409.

    :param expected_error: NeutronError type expected in the response.
    """
    with self.subnet() as subnet,\
            self.port(subnet=subnet) as port,\
            self.port(subnet=subnet) as port2:
        self.assertIn('mac_address', port['port'])
        taken_mac = port2['port']['mac_address']
        update_body = {'port': {'mac_address': taken_mac}}
        update_req = self.new_update_request('ports', update_body,
                                             port['port']['id'])
        update_res = update_req.get_response(self.api)
        self.assertEqual(webob.exc.HTTPConflict.code,
                         update_res.status_int)
        error = self.deserialize(self.fmt, update_res)
        self.assertEqual(expected_error,
                         error['NeutronError']['type'])
def test_update_port_mac_used(self):
    """Run ``check_update_port_mac_used`` with its default error type."""
    self.check_update_port_mac_used()
def test_update_port_not_admin(self):
    """Tenant 'not_admin' updates a port on its own network.

    Network, port and update all use the 'not_admin' tenant; the
    updated port must report ``admin_state_up`` as False.
    """
    net_res = self._create_network(self.fmt, 'net1', True,
                                   tenant_id='not_admin',
                                   set_context=True)
    net1 = self.deserialize(self.fmt, net_res)
    port_res = self._create_port(self.fmt, net1['network']['id'],
                                 tenant_id='not_admin', set_context=True)
    created = self.deserialize(self.fmt, port_res)
    update_body = {'port': {'admin_state_up': False}}
    tenant_ctx = context.Context('', 'not_admin')
    updated = self._update('ports', created['port']['id'], update_body,
                           neutron_context=tenant_ctx)
    self.assertFalse(updated['port']['admin_state_up'])
def test_update_device_id_unchanged(self):
    """Resend the port's current device_id along with admin_state_up."""
    with self.port() as port:
        port_body = {'admin_state_up': True,
                     'device_id': port['port']['device_id']}
        update_req = self.new_update_request('ports',
                                             {'port': port_body},
                                             port['port']['id'])
        updated = self.deserialize(self.fmt,
                                   update_req.get_response(self.api))
        self.assertTrue(updated['port']['admin_state_up'])
def test_update_device_id_null(self):
    """Setting device_id to None must be answered with a client error."""
    with self.port() as port:
        update_body = {'port': {'device_id': None}}
        update_req = self.new_update_request('ports', update_body,
                                             port['port']['id'])
        update_res = update_req.get_response(self.api)
        self.assertEqual(webob.exc.HTTPClientError.code,
                         update_res.status_int)
def test_delete_network_if_port_exists(self):
    """Deleting a network that still has a port must return HTTP 409."""
    with self.port() as port:
        network_id = port['port']['network_id']
        delete_req = self.new_delete_request('networks', network_id)
        delete_res = delete_req.get_response(self.api)
        self.assertEqual(webob.exc.HTTPConflict.code,
                         delete_res.status_int)
def test_delete_network_port_exists_owned_by_network(self):
    """A network holding only a DHCP-owned port can be deleted (204)."""
    net_res = self._create_network(fmt=self.fmt, name='net',
                                   admin_state_up=True)
    network_id = self.deserialize(self.fmt, net_res)['network']['id']
    self._create_port(self.fmt, network_id,
                      device_owner=constants.DEVICE_OWNER_DHCP)
    delete_req = self.new_delete_request('networks', network_id)
    delete_res = delete_req.get_response(self.api)
    self.assertEqual(webob.exc.HTTPNoContent.code, delete_res.status_int)
def test_delete_network_port_exists_owned_by_network_race(self):
    """Delete a network while the first ``delete_port`` call is skipped.

    The plugin's ``delete_port`` is mocked; its side effect only stops
    the patch, so the first call deletes nothing and restores the real
    method for later calls.  The network delete must still return 204.
    """
    net_res = self._create_network(fmt=self.fmt, name='net',
                                   admin_state_up=True)
    network_id = self.deserialize(self.fmt, net_res)['network']['id']
    self._create_port(self.fmt, network_id,
                      device_owner=constants.DEVICE_OWNER_DHCP)
    # skip first port delete to simulate create after auto clean
    plugin = directory.get_plugin()
    patcher = mock.patch.object(plugin, 'delete_port')
    mocked_delete = patcher.start()

    def unpatch_on_first_call(*args, **kwargs):
        return patcher.stop()

    mocked_delete.side_effect = unpatch_on_first_call
    delete_req = self.new_delete_request('networks', network_id)
    delete_res = delete_req.get_response(self.api)
    self.assertEqual(webob.exc.HTTPNoContent.code, delete_res.status_int)
def test_delete_network_port_exists_owned_by_network_port_not_found(self):
    """Network delete tolerates PortNotFound from ``delete_port``.

    Tests that we continue to gracefully delete the network even if
    a neutron:dhcp-owned port was deleted concurrently.
    """
    net_res = self._create_network(fmt=self.fmt, name='net',
                                   admin_state_up=True)
    network_id = self.deserialize(self.fmt, net_res)['network']['id']
    self._create_port(self.fmt, network_id,
                      device_owner=constants.DEVICE_OWNER_DHCP)
    # Raise PortNotFound when trying to delete the port to simulate a
    # concurrent delete race; note that we actually have to delete the port
    # "out of band" otherwise deleting the network will fail because of
    # constraints in the data model.
    plugin = directory.get_plugin()
    real_delete_port = plugin.delete_port

    def fake_delete_port(context, id):
        # Delete the port for real from the database and then raise
        # PortNotFound to simulate the race.
        self.assertIsNone(real_delete_port(context, id))
        raise lib_exc.PortNotFound(port_id=id)

    patcher = mock.patch.object(plugin, 'delete_port')
    mocked_delete = patcher.start()
    mocked_delete.side_effect = fake_delete_port
    delete_req = self.new_delete_request('networks', network_id)
    delete_res = delete_req.get_response(self.api)
    self.assertEqual(webob.exc.HTTPNoContent.code, delete_res.status_int)
def test_update_port_delete_ip(self):
    """Update a port with an empty fixed_ips list.

    The response must echo ``admin_state_up`` False and the empty
    fixed_ips list.
    """
    with self.subnet() as subnet, self.port(subnet=subnet) as port:
        update_body = {'port': {'admin_state_up': False,
                                'fixed_ips': []}}
        update_req = self.new_update_request('ports', update_body,
                                             port['port']['id'])
        updated = self.deserialize(self.fmt,
                                   update_req.get_response(self.api))
        self.assertEqual(update_body['port']['admin_state_up'],
                         updated['port']['admin_state_up'])
        self.assertEqual(update_body['port']['fixed_ips'],
                         updated['port']['fixed_ips'])
def test_no_more_port_exception(self):
    """Port creation on a 10.0.0.0/31 subnet must fail with HTTP 409.

    The error message must match ``IpAddressGenerationFailure`` for the
    network.
    """
    with self.subnet(cidr='10.0.0.0/31', enable_dhcp=False) as subnet:
        network_id = subnet['subnet']['network_id']
        create_res = self._create_port(self.fmt, network_id)
        body = self.deserialize(self.fmt, create_res)
        expected_msg = str(
            lib_exc.IpAddressGenerationFailure(net_id=network_id))
        self.assertEqual(body['NeutronError']['message'], expected_msg)
        self.assertEqual(webob.exc.HTTPConflict.code,
                         create_res.status_int)
def test_create_ports_native_quotas(self):
    """With quota_port set to 1 the second port create must return 409."""
    quota = 1
    cfg.CONF.set_override('quota_port', quota, group='QUOTAS')
    with self.network() as network:
        net_id = network['network']['id']
        # First create fits the quota, the second one exceeds it.
        expected_codes = (webob.exc.HTTPCreated.code,
                          webob.exc.HTTPConflict.code)
        for expected_code in expected_codes:
            create_res = self._create_port(self.fmt, net_id)
            self.assertEqual(expected_code, create_res.status_int)
def test_create_ports_bulk_native_quotas(self):
    """A bulk create of quota + 1 ports must fail with HTTP 409.

    Skipped when ``self._skip_native_bulk`` is set.
    """
    if self._skip_native_bulk:
        self.skipTest("Plugin does not support native bulk port create")
    quota = 4
    cfg.CONF.set_override('quota_port', quota, group='QUOTAS')
    with self.network() as network:
        over_quota = quota + 1
        bulk_res = self._create_port_bulk(self.fmt, over_quota,
                                          network['network']['id'],
                                          'test', True)
        self._validate_behavior_on_bulk_failure(
            bulk_res, 'ports',
            errcode=webob.exc.HTTPConflict.code)
def test_update_port_update_ip(self):
    """Test update of port IP.

    Check that a configured IP 10.0.0.2 is replaced by 10.0.0.10.
    """
    with self.subnet() as subnet:
        subnet_id = subnet['subnet']['id']
        initial_ips = [{'ip_address': '10.0.0.2',
                        'subnet_id': subnet_id}]
        with self.port(subnet=subnet, fixed_ips=initial_ips) as port:
            ips = port['port']['fixed_ips']
            self.assertEqual(1, len(ips))
            self.assertEqual('10.0.0.2', ips[0]['ip_address'])
            self.assertEqual(ips[0]['subnet_id'], subnet_id)

            replacement = {'subnet_id': subnet_id,
                           'ip_address': "10.0.0.10"}
            update_body = {'port': {'fixed_ips': [replacement]}}
            update_req = self.new_update_request('ports', update_body,
                                                 port['port']['id'])
            updated = self.deserialize(self.fmt,
                                       update_req.get_response(self.api))
            ips = updated['port']['fixed_ips']
            self.assertEqual(1, len(ips))
            self.assertEqual('10.0.0.10', ips[0]['ip_address'])
            self.assertEqual(ips[0]['subnet_id'], subnet_id)
def test_update_port_update_ip_address_only(self):
    """Update fixed IPs where one entry carries only an ip_address.

    The port starts with 10.0.0.2.  The update asks for 10.0.0.10 with
    a subnet_id plus 10.0.0.2 without one; both must be present on the
    port afterwards, each with the subnet's id.
    """
    with self.subnet() as subnet:
        subnet_id = subnet['subnet']['id']
        ip_address = '10.0.0.2'
        initial_ips = [{'ip_address': ip_address,
                        'subnet_id': subnet_id}]
        with self.port(subnet=subnet, fixed_ips=initial_ips) as port:
            ips = port['port']['fixed_ips']
            self.assertEqual(1, len(ips))
            self.assertEqual(ip_address, ips[0]['ip_address'])
            self.assertEqual(ips[0]['subnet_id'], subnet_id)

            requested_ips = [{'subnet_id': subnet_id,
                              'ip_address': "10.0.0.10"},
                             {'ip_address': ip_address}]
            update_body = {'port': {'fixed_ips': requested_ips}}
            update_req = self.new_update_request('ports', update_body,
                                                 port['port']['id'])
            updated = self.deserialize(self.fmt,
                                       update_req.get_response(self.api))
            ips = updated['port']['fixed_ips']
            self.assertEqual(2, len(ips))
            self.assertIn({'ip_address': ip_address,
                           'subnet_id': subnet_id}, ips)
            self.assertIn({'ip_address': '10.0.0.10',
                           'subnet_id': subnet_id}, ips)
def test_update_port_update_ips(self):
    """Update a port's admin state and fixed IP in one request.

    The update sets ``admin_state_up`` to False and asks for address
    10.0.0.3 on the port's subnet.  The response must reflect the new
    admin state and carry exactly that one fixed IP.
    """
    with self.subnet() as subnet:
        with self.port(subnet=subnet) as port:
            data = {'port': {'admin_state_up': False,
                             'fixed_ips': [{'subnet_id':
                                            subnet['subnet']['id'],
                                            'ip_address': '10.0.0.3'}]}}
            req = self.new_update_request('ports', data,
                                          port['port']['id'])
            res = self.deserialize(self.fmt, req.get_response(self.api))
            self.assertEqual(data['port']['admin_state_up'],
                             res['port']['admin_state_up'])
            ips = res['port']['fixed_ips']
            self.assertEqual(1, len(ips))
            # assertEqual's third positional argument is the failure
            # message, so only the two compared values are passed here.
            self.assertEqual('10.0.0.3', ips[0]['ip_address'])
            self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
def test_update_port_add_additional_ip(self):
    """Test update of port with additional IP."""
    with self.subnet() as subnet, self.port(subnet=subnet) as port:
        subnet_id = subnet['subnet']['id']
        # Two requests on the same subnet must give two distinct addresses.
        requested_ips = [{'subnet_id': subnet_id},
                         {'subnet_id': subnet_id}]
        update_body = {'port': {'admin_state_up': False,
                                'fixed_ips': requested_ips}}
        update_req = self.new_update_request('ports', update_body,
                                             port['port']['id'])
        updated = self.deserialize(self.fmt,
                                   update_req.get_response(self.api))
        self.assertEqual(update_body['port']['admin_state_up'],
                         updated['port']['admin_state_up'])
        ips = updated['port']['fixed_ips']
        self.assertEqual(2, len(ips))
        self.assertNotEqual(ips[0]['ip_address'],
                            ips[1]['ip_address'])
        subnet_cidr = netaddr.IPNetwork(subnet['subnet']['cidr'])
        self.assertIn(ips[0]['ip_address'], subnet_cidr)
        self.assertIn(ips[1]['ip_address'], subnet_cidr)
def test_update_port_invalid_fixed_ip_address_v6_slaac(self):
    """Requesting an explicit address on a SLAAC subnet must be rejected.

    The port first gets the EUI-64 address derived from its MAC.  An
    update asking for 2607:f0d0:1002:51::5 on the same subnet must
    return a client error of type ``AllocationOnAutoAddressSubnet``.
    """
    with self.subnet(
            cidr='2607:f0d0:1002:51::/64',
            ip_version=6,
            ipv6_address_mode=constants.IPV6_SLAAC,
            gateway_ip=constants.ATTR_NOT_SPECIFIED) as subnet,\
            self.port(subnet=subnet) as port:
        ips = port['port']['fixed_ips']
        requested_ip = '2607:f0d0:1002:51::5'
        self.assertEqual(1, len(ips))
        subnet_id = subnet['subnet']['id']
        eui_addr = str(netutils.get_ipv6_addr_by_EUI64(
            subnet['subnet']['cidr'], port['port']['mac_address']))
        self.assertEqual(ips[0]['ip_address'], eui_addr)
        self.assertEqual(ips[0]['subnet_id'], subnet_id)

        update_body = {'port': {'fixed_ips': [
            {'subnet_id': subnet_id, 'ip_address': requested_ip}]}}
        update_req = self.new_update_request('ports', update_body,
                                             port['port']['id'])
        update_res = update_req.get_response(self.api)
        err = self.deserialize(self.fmt, update_res)
        self.assertEqual(webob.exc.HTTPClientError.code,
                         update_res.status_int)
        self.assertEqual('AllocationOnAutoAddressSubnet',
                         err['NeutronError']['type'])
        expected_msg = str(ipam_exc.AllocationOnAutoAddressSubnet(
            ip=requested_ip, subnet_id=subnet_id))
        self.assertEqual(err['NeutronError']['message'], expected_msg)
def test_requested_duplicate_mac(self):
    """Creating a second port with an existing MAC must return HTTP 409."""
    with self.port() as port:
        existing_mac = port['port']['mac_address']
        # check that MAC address matches base MAC
        # (only the first two characters of base_mac are compared)
        mac_prefix = cfg.CONF.base_mac[0:2]
        self.assertTrue(existing_mac.startswith(mac_prefix))
        create_res = self._create_port(
            self.fmt, net_id=port['port']['network_id'],
            mac_address=existing_mac)
        self.assertEqual(webob.exc.HTTPConflict.code,
                         create_res.status_int)
def test_mac_generation(self):
    """With base_mac 12:34:56:00:00:00 the MAC starts with 12:34:56."""
    cfg.CONF.set_override('base_mac', "12:34:56:00:00:00")
    with self.port() as port:
        generated_mac = port['port']['mac_address']
        self.assertTrue(generated_mac.startswith("12:34:56"))
def test_mac_generation_4octet(self):
    """With base_mac 12:34:56:78:00:00 the MAC starts with 12:34:56:78."""
    cfg.CONF.set_override('base_mac', "12:34:56:78:00:00")
    with self.port() as port:
        generated_mac = port['port']['mac_address']
        self.assertTrue(generated_mac.startswith("12:34:56:78"))
def test_duplicate_mac_generation(self):
    """Port creation retries when the generated MAC is a duplicate.

    ``net.get_random_mac`` is mocked to return the same MAC twice and a
    different one on the third call; two ports must end up with the two
    distinct MACs after exactly three generator calls.
    """
    # simulate duplicate mac generation to make sure DBDuplicate is retried
    first_mac = '12:34:56:78:00:00'
    second_mac = '12:34:56:78:00:01'
    generated = [first_mac, first_mac, second_mac]
    with mock.patch.object(net, 'get_random_mac',
                           side_effect=generated) as random_mac,\
            self.subnet() as subnet,\
            self.port(subnet=subnet) as p1,\
            self.port(subnet=subnet) as p2:
        self.assertEqual('12:34:56:78:00:00',
                         p1['port']['mac_address'])
        self.assertEqual('12:34:56:78:00:01',
                         p2['port']['mac_address'])
        self.assertEqual(3, random_mac.call_count)
def test_bad_mac_format(self):
    """``_check_base_mac_format`` must raise for base_mac 'bad_mac'."""
    cfg.CONF.set_override('base_mac', "bad_mac")
    try:
        self.plugin._check_base_mac_format()
    except Exception:
        # Any exception counts as the expected rejection.
        pass
    else:
        self.fail("No exception for illegal base_mac format")
def test_is_mac_in_use(self):
    """``_is_mac_in_use`` is true only for the port's network and MAC."""
    admin_ctx = context.get_admin_context()
    with self.port() as port:
        port_data = port['port']
        net_id = port_data['network_id']
        mac = port_data['mac_address']
        self.assertTrue(
            self.plugin._is_mac_in_use(admin_ctx, net_id, mac))
        other_mac = '00:22:00:44:00:66'  # other mac, same network
        self.assertFalse(
            self.plugin._is_mac_in_use(admin_ctx, net_id, other_mac))
        # The port's id stands in for a different network uuid here.
        other_net_id = port_data['id']  # other net uuid, same mac
        self.assertFalse(
            self.plugin._is_mac_in_use(admin_ctx, other_net_id, mac))
def test_requested_duplicate_ip(self):
    """Requesting an address already held by a port must return 409."""
    with self.subnet() as subnet:
        subnet_id = subnet['subnet']['id']
        subnet_cidr = netaddr.IPNetwork(subnet['subnet']['cidr'])
        with self.port(subnet=subnet) as port:
            ips = port['port']['fixed_ips']
            self.assertEqual(1, len(ips))
            self.assertIn(ips[0]['ip_address'], subnet_cidr)
            self.assertEqual(ips[0]['subnet_id'], subnet_id)
            # Check configuring of duplicate IP
            duplicate_request = [{'subnet_id': subnet_id,
                                  'ip_address': ips[0]['ip_address']}]
            create_res = self._create_port(
                self.fmt, net_id=port['port']['network_id'],
                fixed_ips=duplicate_request)
            self.assertEqual(webob.exc.HTTPConflict.code,
                             create_res.status_int)
def test_requested_subnet_id(self):
    """A port requested by subnet_id gets one address from that subnet.

    The second port is created through ``_create_port`` and deleted at
    the end.
    """
    with self.subnet() as subnet:
        subnet_id = subnet['subnet']['id']
        subnet_cidr = netaddr.IPNetwork(subnet['subnet']['cidr'])
        with self.port(subnet=subnet) as port:
            ips = port['port']['fixed_ips']
            self.assertEqual(1, len(ips))
            self.assertIn(netaddr.IPAddress(ips[0]['ip_address']),
                          netaddr.IPSet(subnet_cidr))
            self.assertEqual(ips[0]['subnet_id'], subnet_id)
            # Request an IP from a specific subnet
            create_res = self._create_port(
                self.fmt, net_id=port['port']['network_id'],
                fixed_ips=[{'subnet_id': subnet_id}])
            port2 = self.deserialize(self.fmt, create_res)
            ips = port2['port']['fixed_ips']
            self.assertEqual(1, len(ips))
            self.assertIn(ips[0]['ip_address'], subnet_cidr)
            self.assertEqual(ips[0]['subnet_id'], subnet_id)
            self._delete('ports', port2['port']['id'])
def test_requested_subnet_id_not_on_network(self):
    """Requesting a subnet of another network must be a client error.

    A second network with subnet 1.1.1.0/24 is created; a port on the
    first network that asks for that subnet must be rejected.
    """
    with self.subnet() as subnet:
        with self.port(subnet=subnet) as port:
            # Create new network
            res = self._create_network(fmt=self.fmt, name='net2',
                                       admin_state_up=True)
            network2 = self.deserialize(self.fmt, res)
            subnet2 = self._make_subnet(self.fmt, network2, "1.1.1.1",
                                        "1.1.1.0/24", ip_version=4)
            # Request an IP from the subnet that lives on the other
            # network while creating the port on the first network.
            kwargs = {"fixed_ips": [{'subnet_id':
                                     subnet2['subnet']['id']}]}
            net_id = port['port']['network_id']
            res = self._create_port(self.fmt, net_id=net_id, **kwargs)
            self.assertEqual(webob.exc.HTTPClientError.code,
                             res.status_int)
def test_overlapping_subnets(self):
    """Expect a client error for a 10.0.0.225/28 subnet on the same net.

    NOTE(review): presumably the fixture subnet's default CIDR contains
    10.0.0.225/28, which is what makes the request an overlap -- confirm
    against the ``subnet`` fixture defaults.
    """
    with self.subnet() as subnet:
        existing = subnet['subnet']
        response = self._create_subnet(
            self.fmt,
            tenant_id=existing['tenant_id'],
            net_id=existing['network_id'],
            cidr='10.0.0.225/28',
            ip_version=4,
            gateway_ip=constants.ATTR_NOT_SPECIFIED)
        self.assertEqual(webob.exc.HTTPClientError.code,
                         response.status_int)
def test_requested_subnet_id_v4_and_v6(self):
    """Ports on a dual-stack network get one IPv4 and one IPv6 address.

    An IPv6 subnet is added to the fixture subnet's network. Two ports are
    then created: one that names both subnet IDs in ``fixed_ips`` and one
    with no ``fixed_ips`` at all. Each must end up with two fixed IPs, one
    per subnet, each inside the matching CIDR.
    """
    with self.subnet() as subnet:
        v4_subnet = subnet['subnet']
        net_id = v4_subnet['network_id']
        # Add an IPv6 subnet next to the IPv4 one
        res = self._create_subnet(
            self.fmt,
            tenant_id=v4_subnet['tenant_id'],
            net_id=net_id,
            cidr='2607:f0d0:1002:51::/124',
            ip_version=6,
            gateway_ip=constants.ATTR_NOT_SPECIFIED)
        v6_subnet = self.deserialize(self.fmt, res)['subnet']
        subnet_ids = [v4_subnet['id'], v6_subnet['id']]

        port_requests = (
            # Explicitly ask for an address from each subnet
            {"fixed_ips": [{'subnet_id': sid} for sid in subnet_ids]},
            # No fixed_ips: check that a v4 and a v6 address are allocated
            {},
        )
        created_ports = []
        for port_kwargs in port_requests:
            res = self._create_port(self.fmt, net_id=net_id,
                                    **port_kwargs)
            new_port = self.deserialize(self.fmt, res)
            created_ports.append(new_port)
            ips = new_port['port']['fixed_ips']
            self.assertEqual(2, len(ips))
            self._test_requested_port_subnet_ids(ips, subnet_ids)
            self._test_dual_stack_port_ip_addresses_in_subnets(
                ips, v4_subnet['cidr'], v6_subnet['cidr'])

        # Ports were created outside a fixture; delete them in creation
        # order once both have been verified.
        for new_port in created_ports:
            self._delete('ports', new_port['port']['id'])
def _test_requested_port_subnet_ids(self, ips, expected_subnet_ids):
    """Assert that the fixed IPs cover exactly the expected subnets.

    :param ips: iterable of fixed-IP dicts, each with a 'subnet_id' key.
    :param expected_subnet_ids: iterable of the subnet IDs that must occur.

    Both sides are compared as sets, so order and duplicates are ignored.
    """
    observed_subnet_ids = {fixed_ip['subnet_id'] for fixed_ip in ips}
    # Expected value first, consistent with the other assertEqual calls in
    # this file, so failure output labels the two sets correctly.
    self.assertEqual(set(expected_subnet_ids), observed_subnet_ids)
def _test_dual_stack_port_ip_addresses_in_subnets(self, ips, cidr_v4,
                                                  cidr_v6):
    """Assert every fixed IP lies in the CIDR of its own IP version.

    :param ips: iterable of fixed-IP dicts, each with an 'ip_address' key.
    :param cidr_v4: CIDR that version-4 addresses are checked against.
    :param cidr_v6: CIDR that all other addresses are checked against.
    """
    v4_network = netaddr.IPNetwork(cidr_v4)
    v6_network = netaddr.IPNetwork(cidr_v6)
    for fixed_ip in ips:
        parsed = netaddr.IPAddress(fixed_ip['ip_address'])
        if parsed.version == 4:
            self.assertIn(parsed, v4_network)
        else:
            self.assertIn(parsed, v6_network)
def test_create_port_invalid_fixed_ip_address_v6_pd_slaac(self):
    """Reject an explicit fixed IP on a prefix-delegated SLAAC subnet.

    The subnet is created with ``ipv6_pd=True``, its CIDR is then updated
    to 2001::/64 through the plugin, and a port asking for 2001::2 on that
    subnet must be refused with a client error.
    """
    with self.network(name='net') as network:
        pd_subnet = self._make_v6_subnet(
            network, constants.IPV6_SLAAC, ipv6_pd=True)['subnet']
        target_net_id = pd_subnet['network_id']
        pd_subnet_id = pd_subnet['id']
        # Update the subnet with the new prefix
        new_prefix = '2001::/64'
        self.plugin.update_subnet(context.get_admin_context(),
                                  pd_subnet_id,
                                  {'subnet': {'cidr': new_prefix}})
        # A PD subnet is auto-addressed, so 2001::2 cannot be requested
        requested_ips = [{'subnet_id': pd_subnet_id,
                          'ip_address': '2001::2'}]
        res = self._create_port(self.fmt, net_id=target_net_id,
                                fixed_ips=requested_ips)
        self.assertEqual(webob.exc.HTTPClientError.code,
                         res.status_int)
def test_update_port_invalid_fixed_ip_address_v6_pd_slaac(self):
with self.network(name='net') as network:
subnet = self._make_v6_subnet(
network, constants.IPV6_SLAAC, ipv6_pd=True)
net_id = subnet['subnet']['network_id']
subnet_id = subnet['subnet']['id']
# update subnet with new prefix
prefix = '2001::/64'
data = {'subnet': {'cidr': prefix}}
self.plugin.update_subnet(context.get_admin_context(),
subnet_id, data)
# create port and check for eui addr with 2001::/64 prefix.
res = self._create_port(self.fmt, net_id=net_id)
port = self.deserialize(self.fmt, res)
port_mac = port['port']['mac_address']
eui_addr = str(netutils.get_ipv6_addr_by_EUI64(
prefix, port_mac))
fixedips = [{'subnet_id': subnet_id, 'ip_address':<