neutron/neutron/tests/unit/extensions/test_segment.py

1248 lines
56 KiB
Python

# Copyright (c) 2016 Hewlett Packard Enterprise Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import netaddr
from neutron_lib import constants
from neutron_lib import exceptions as n_exc
from oslo_utils import uuidutils
import webob.exc
from neutron.api.v2 import attributes
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron import context
from neutron.db import agents_db
from neutron.db import agentschedulers_db
from neutron.db import db_base_plugin_v2
from neutron.db import portbindings_db
from neutron.db import segments_db
from neutron.extensions import ip_allocation
from neutron.extensions import l2_adjacency
from neutron.extensions import portbindings
from neutron.extensions import segment as ext_segment
from neutron.plugins.common import constants as p_constants
from neutron.plugins.ml2 import config
from neutron.services.segments import db
from neutron.services.segments import exceptions as segment_exc
from neutron.tests.common import helpers
from neutron.tests.unit.db import test_db_base_plugin_v2
SERVICE_PLUGIN_KLASS = 'neutron.services.segments.plugin.Plugin'
TEST_PLUGIN_KLASS = (
'neutron.tests.unit.extensions.test_segment.SegmentTestPlugin')
DHCP_HOSTA = 'dhcp-host-a'
DHCP_HOSTB = 'dhcp-host-b'
HTTP_NOT_FOUND = 404
class SegmentTestExtensionManager(object):
def get_resources(self):
# Add the resources to the global attribute map
# This is done here as the setup process won't
# initialize the main API router which extends
# the global attribute map
attributes.RESOURCE_ATTRIBUTE_MAP.update(
ext_segment.RESOURCE_ATTRIBUTE_MAP)
return ext_segment.Segment.get_resources()
def get_actions(self):
return []
def get_request_extensions(self):
return []
class SegmentTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
def setUp(self, plugin=None):
if not plugin:
plugin = TEST_PLUGIN_KLASS
service_plugins = {'segments_plugin_name': SERVICE_PLUGIN_KLASS}
ext_mgr = SegmentTestExtensionManager()
super(SegmentTestCase, self).setUp(plugin=plugin, ext_mgr=ext_mgr,
service_plugins=service_plugins)
def _create_segment(self, fmt, expected_res_status=None, **kwargs):
segment = {'segment': {}}
for k, v in kwargs.items():
segment['segment'][k] = str(v)
segment_req = self.new_create_request(
'segments', segment, fmt)
segment_res = segment_req.get_response(self.ext_api)
if expected_res_status:
self.assertEqual(segment_res.status_int, expected_res_status)
return segment_res
def _make_segment(self, fmt, **kwargs):
res = self._create_segment(fmt, **kwargs)
if res.status_int >= webob.exc.HTTPClientError.code:
res.charset = 'utf8'
raise webob.exc.HTTPClientError(
code=res.status_int, explanation=str(res))
return self.deserialize(fmt, res)
def segment(self, **kwargs):
kwargs.setdefault('network_type', 'net_type')
return self._make_segment(
self.fmt, tenant_id=self._tenant_id, **kwargs)
def _test_create_segment(self, expected=None, **kwargs):
keys = kwargs.copy()
segment = self.segment(**keys)
self._validate_resource(segment, keys, 'segment')
if expected:
self._compare_resource(segment, expected, 'segment')
return segment
class SegmentTestPlugin(db_base_plugin_v2.NeutronDbPluginV2,
portbindings_db.PortBindingMixin,
db.SegmentDbMixin):
__native_pagination_support = True
__native_sorting_support = True
supported_extension_aliases = ["segment", "binding", "ip_allocation"]
def get_plugin_description(self):
return "Network Segments"
@classmethod
def get_plugin_type(cls):
return "segments"
def create_port(self, context, port):
port_dict = super(SegmentTestPlugin, self).create_port(context, port)
self._process_portbindings_create_and_update(
context, port['port'], port_dict)
return port_dict
def update_port(self, context, id, port):
port_dict = super(SegmentTestPlugin, self).update_port(
context, id, port)
self._process_portbindings_create_and_update(
context, port['port'], port_dict)
return port_dict
class TestSegment(SegmentTestCase):
def test_create_segment(self):
with self.network() as network:
network = network['network']
expected_segment = {'network_id': network['id'],
'physical_network': 'phys_net',
'network_type': 'net_type',
'segmentation_id': 200}
self._test_create_segment(network_id=network['id'],
physical_network='phys_net',
segmentation_id=200,
expected=expected_segment)
def test_create_segment_non_existent_network(self):
exc = self.assertRaises(webob.exc.HTTPClientError,
self._test_create_segment,
network_id=uuidutils.generate_uuid(),
physical_network='phys_net',
segmentation_id=200)
self.assertEqual(HTTP_NOT_FOUND, exc.code)
self.assertIn('NetworkNotFound', exc.explanation)
def test_create_segment_no_phys_net(self):
with self.network() as network:
network = network['network']
expected_segment = {'network_id': network['id'],
'physical_network': None,
'network_type': 'net_type',
'segmentation_id': 200}
self._test_create_segment(network_id=network['id'],
segmentation_id=200,
expected=expected_segment)
def test_create_segment_no_segmentation_id(self):
with self.network() as network:
network = network['network']
expected_segment = {'network_id': network['id'],
'physical_network': 'phys_net',
'network_type': 'net_type',
'segmentation_id': None}
self._test_create_segment(network_id=network['id'],
physical_network='phys_net',
expected=expected_segment)
def test_delete_segment(self):
with self.network() as network:
network = network['network']
segment = self.segment(network_id=network['id'])
self._delete('segments', segment['segment']['id'])
self._show('segments', segment['segment']['id'],
expected_code=webob.exc.HTTPNotFound.code)
def test_get_segment(self):
with self.network() as network:
network = network['network']
segment = self._test_create_segment(network_id=network['id'],
physical_network='phys_net',
segmentation_id=200)
req = self.new_show_request('segments', segment['segment']['id'])
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(segment['segment']['id'], res['segment']['id'])
def test_list_segments(self):
with self.network() as network:
network = network['network']
self._test_create_segment(network_id=network['id'],
physical_network='phys_net1',
segmentation_id=200)
self._test_create_segment(network_id=network['id'],
physical_network='phys_net2',
segmentation_id=200)
res = self._list('segments')
self.assertEqual(2, len(res['segments']))
class TestSegmentML2(SegmentTestCase):
def setUp(self):
super(TestSegmentML2, self).setUp(plugin='ml2')
def test_segment_notification_on_create_network(self):
with mock.patch.object(registry, 'notify') as notify:
with self.network():
pass
notify.assert_any_call(resources.SEGMENT,
events.PRECOMMIT_CREATE,
context=mock.ANY,
segment=mock.ANY,
trigger=mock.ANY)
class TestSegmentSubnetAssociation(SegmentTestCase):
def test_basic_association(self):
with self.network() as network:
net = network['network']
segment = self._test_create_segment(network_id=net['id'])
segment_id = segment['segment']['id']
with self.subnet(network=network, segment_id=segment_id) as subnet:
subnet = subnet['subnet']
request = self.new_show_request('subnets', subnet['id'])
response = request.get_response(self.api)
res = self.deserialize(self.fmt, response)
self.assertEqual(segment_id,
res['subnet']['segment_id'])
def test_association_network_mismatch(self):
with self.network() as network1:
with self.network() as network2:
net = network1['network']
segment = self._test_create_segment(network_id=net['id'])
res = self._create_subnet(self.fmt,
net_id=network2['network']['id'],
tenant_id=network2['network']['tenant_id'],
gateway_ip=constants.ATTR_NOT_SPECIFIED,
cidr='10.0.0.0/24',
segment_id=segment['segment']['id'])
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_association_segment_not_found(self):
with self.network() as network:
net = network['network']
segment_id = uuidutils.generate_uuid()
res = self._create_subnet(self.fmt,
net_id=net['id'],
tenant_id=net['tenant_id'],
gateway_ip=constants.ATTR_NOT_SPECIFIED,
cidr='10.0.0.0/24',
segment_id=segment_id)
self.assertEqual(webob.exc.HTTPNotFound.code, res.status_int)
def test_only_some_subnets_associated_not_allowed(self):
with self.network() as network:
with self.subnet(network=network):
net = network['network']
segment = self._test_create_segment(network_id=net['id'])
res = self._create_subnet(self.fmt,
net_id=net['id'],
tenant_id=net['tenant_id'],
gateway_ip=constants.ATTR_NOT_SPECIFIED,
cidr='10.0.1.0/24',
segment_id=segment['segment']['id'])
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_association_to_dynamic_segment_not_allowed(self):
cxt = context.get_admin_context()
with self.network() as network:
net = network['network']
# Can't create a dynamic segment through the API
segment = {segments_db.NETWORK_TYPE: 'phys_net',
segments_db.PHYSICAL_NETWORK: 'net_type',
segments_db.SEGMENTATION_ID: 200}
segments_db.add_network_segment(cxt,
network_id=net['id'],
segment=segment,
is_dynamic=True)
res = self._create_subnet(self.fmt,
net_id=net['id'],
tenant_id=net['tenant_id'],
gateway_ip=constants.ATTR_NOT_SPECIFIED,
cidr='10.0.0.0/24',
segment_id=segment['id'])
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
class HostSegmentMappingTestCase(SegmentTestCase):
_mechanism_drivers = ['logger']
def setUp(self, plugin=None):
config.cfg.CONF.set_override('mechanism_drivers',
self._mechanism_drivers,
group='ml2')
if not plugin:
plugin = 'ml2'
super(HostSegmentMappingTestCase, self).setUp(plugin=plugin)
db.subscribe()
def _get_segments_for_host(self, host):
ctx = context.get_admin_context()
segments_host_list = ctx.session.query(
db.SegmentHostMapping).filter_by(host=host)
return {seg_host['segment_id']: seg_host
for seg_host in segments_host_list}
def _register_agent(self, host, mappings=None, plugin=None,
start_flag=True):
helpers.register_ovs_agent(host=host, bridge_mappings=mappings,
plugin=self.plugin, start_flag=start_flag)
def _test_one_segment_one_host(self, host):
physical_network = 'phys_net1'
with self.network() as network:
network = network['network']
segment = self._test_create_segment(
network_id=network['id'], physical_network=physical_network,
segmentation_id=200, network_type=p_constants.TYPE_VLAN)['segment']
self._register_agent(host, mappings={physical_network: 'br-eth-1'},
plugin=self.plugin)
segments_host_db = self._get_segments_for_host(host)
self.assertEqual(1, len(segments_host_db))
self.assertEqual(segment['id'],
segments_host_db[segment['id']]['segment_id'])
self.assertEqual(host, segments_host_db[segment['id']]['host'])
return segment
class TestMl2HostSegmentMappingNoAgent(HostSegmentMappingTestCase):
def setUp(self, plugin=None):
if not plugin:
plugin = TEST_PLUGIN_KLASS
super(TestMl2HostSegmentMappingNoAgent, self).setUp(plugin=plugin)
def test_update_segment_host_mapping(self):
ctx = context.get_admin_context()
host = 'host1'
physnets = ['phys_net1']
with self.network() as network:
network = network['network']
segment = self._test_create_segment(
network_id=network['id'], physical_network='phys_net1',
segmentation_id=200, network_type=p_constants.TYPE_VLAN)['segment']
self._test_create_segment(
network_id=network['id'], physical_network='phys_net2',
segmentation_id=201, network_type=p_constants.TYPE_VLAN)['segment']
segments = db.get_segments_with_phys_nets(ctx, physnets)
segment_ids = {segment['id'] for segment in segments}
db.update_segment_host_mapping(ctx, host, segment_ids)
segments_host_db = self._get_segments_for_host(host)
self.assertEqual(1, len(segments_host_db))
self.assertEqual(segment['id'],
segments_host_db[segment['id']]['segment_id'])
self.assertEqual(host, segments_host_db[segment['id']]['host'])
def test_map_segment_to_hosts(self):
ctx = context.get_admin_context()
hosts = {'host1', 'host2', 'host3'}
with self.network() as network:
network = network['network']
segment = self._test_create_segment(
network_id=network['id'], physical_network='phys_net1',
segmentation_id=200, network_type=p_constants.TYPE_VLAN)['segment']
db.map_segment_to_hosts(ctx, segment['id'], hosts)
updated_segment = self.plugin.get_segment(ctx, segment['id'])
self.assertEqual(hosts, set(updated_segment['hosts']))
def test_get_all_hosts_mapped_with_segments(self):
ctx = context.get_admin_context()
hosts = set()
with self.network() as network:
network_id = network['network']['id']
for i in range(1, 3):
host = "host%s" % i
segment = self._test_create_segment(
network_id=network_id, physical_network='phys_net%s' % i,
segmentation_id=200 + i, network_type=p_constants.TYPE_VLAN)
db.update_segment_host_mapping(
ctx, host, {segment['segment']['id']})
hosts.add(host)
# Now they are 2 hosts with segment being mapped.
actual_hosts = db.get_hosts_mapped_with_segments(ctx)
self.assertEqual(hosts, actual_hosts)
class TestMl2HostSegmentMappingOVS(HostSegmentMappingTestCase):
_mechanism_drivers = ['openvswitch', 'logger']
mock_path = 'neutron.services.segments.db.update_segment_host_mapping'
def test_new_agent(self):
host = 'host1'
self._test_one_segment_one_host(host)
def test_updated_agent_changed_physical_networks(self):
host = 'host1'
physical_networks = ['phys_net1', 'phys_net2']
networks = []
segments = []
for i in range(len(physical_networks)):
with self.network() as network:
networks.append(network['network'])
segments.append(self._test_create_segment(
network_id=networks[i]['id'],
physical_network=physical_networks[i],
segmentation_id=200,
network_type=p_constants.TYPE_VLAN)['segment'])
self._register_agent(host, mappings={physical_networks[0]: 'br-eth-1',
physical_networks[1]: 'br-eth-2'},
plugin=self.plugin)
segments_host_db = self._get_segments_for_host(host)
self.assertEqual(len(physical_networks), len(segments_host_db))
for segment in segments:
self.assertEqual(segment['id'],
segments_host_db[segment['id']]['segment_id'])
self.assertEqual(host, segments_host_db[segment['id']]['host'])
self._register_agent(host, mappings={physical_networks[0]: 'br-eth-1'},
plugin=self.plugin)
segments_host_db = self._get_segments_for_host(host)
self.assertEqual(1, len(segments_host_db))
self.assertEqual(segments[0]['id'],
segments_host_db[segments[0]['id']]['segment_id'])
self.assertEqual(host, segments_host_db[segments[0]['id']]['host'])
def test_same_segment_two_hosts(self):
host1 = 'host1'
host2 = 'host2'
physical_network = 'phys_net1'
segment = self._test_one_segment_one_host(host1)
self._register_agent(host2, mappings={physical_network: 'br-eth-1'},
plugin=self.plugin)
segments_host_db = self._get_segments_for_host(host2)
self.assertEqual(1, len(segments_host_db))
self.assertEqual(segment['id'],
segments_host_db[segment['id']]['segment_id'])
self.assertEqual(host2, segments_host_db[segment['id']]['host'])
def test_update_agent_only_change_agent_host_mapping(self):
host1 = 'host1'
host2 = 'host2'
physical_network = 'phys_net1'
with self.network() as network:
network = network['network']
segment1 = self._test_create_segment(
network_id=network['id'],
physical_network=physical_network,
segmentation_id=200,
network_type=p_constants.TYPE_VLAN)['segment']
self._register_agent(host1, mappings={physical_network: 'br-eth-1'},
plugin=self.plugin)
self._register_agent(host2, mappings={physical_network: 'br-eth-1'},
plugin=self.plugin)
# Update agent at host2 should only change mapping with host2.
other_phys_net = 'phys_net2'
segment2 = self._test_create_segment(
network_id=network['id'],
physical_network=other_phys_net,
segmentation_id=201,
network_type=p_constants.TYPE_VLAN)['segment']
self._register_agent(host2, mappings={other_phys_net: 'br-eth-2'},
plugin=self.plugin)
# We should have segment1 map to host1 and segment2 map to host2 now
segments_host_db1 = self._get_segments_for_host(host1)
self.assertEqual(1, len(segments_host_db1))
self.assertEqual(segment1['id'],
segments_host_db1[segment1['id']]['segment_id'])
self.assertEqual(host1, segments_host_db1[segment1['id']]['host'])
segments_host_db2 = self._get_segments_for_host(host2)
self.assertEqual(1, len(segments_host_db2))
self.assertEqual(segment2['id'],
segments_host_db2[segment2['id']]['segment_id'])
self.assertEqual(host2, segments_host_db2[segment2['id']]['host'])
def test_new_segment_after_host_reg(self):
host1 = 'host1'
physical_network = 'phys_net1'
segment = self._test_one_segment_one_host(host1)
with self.network() as network:
network = network['network']
segment2 = self._test_create_segment(
network_id=network['id'], physical_network=physical_network,
segmentation_id=200, network_type=p_constants.TYPE_VLAN)['segment']
segments_host_db = self._get_segments_for_host(host1)
self.assertEqual(set((segment['id'], segment2['id'])),
set(segments_host_db))
def test_segment_deletion_removes_host_mapping(self):
host = 'host1'
segment = self._test_one_segment_one_host(host)
self._delete('segments', segment['id'])
segments_host_db = self._get_segments_for_host(host)
self.assertFalse(segments_host_db)
@mock.patch(mock_path)
def test_agent_with_no_mappings(self, mock):
host = 'host1'
physical_network = 'phys_net1'
with self.network() as network:
network = network['network']
self._test_create_segment(
network_id=network['id'], physical_network=physical_network,
segmentation_id=200, network_type=p_constants.TYPE_VLAN)
self._register_agent(host, plugin=self.plugin)
segments_host_db = self._get_segments_for_host(host)
self.assertFalse(segments_host_db)
self.assertFalse(mock.mock_calls)
class TestMl2HostSegmentMappingLinuxBridge(TestMl2HostSegmentMappingOVS):
_mechanism_drivers = ['linuxbridge', 'logger']
def _register_agent(self, host, mappings=None, plugin=None):
helpers.register_linuxbridge_agent(host=host,
bridge_mappings=mappings,
plugin=self.plugin)
class TestMl2HostSegmentMappingMacvtap(TestMl2HostSegmentMappingOVS):
_mechanism_drivers = ['macvtap', 'logger']
def _register_agent(self, host, mappings=None, plugin=None):
helpers.register_macvtap_agent(host=host, interface_mappings=mappings,
plugin=self.plugin)
class TestMl2HostSegmentMappingSriovNicSwitch(TestMl2HostSegmentMappingOVS):
_mechanism_drivers = ['sriovnicswitch', 'logger']
def _register_agent(self, host, mappings=None, plugin=None):
helpers.register_sriovnicswitch_agent(host=host,
device_mappings=mappings,
plugin=self.plugin)
class NoSupportHostSegmentMappingPlugin(db_base_plugin_v2.NeutronDbPluginV2,
db.SegmentDbMixin,
agents_db.AgentDbMixin):
__native_pagination_support = True
__native_sorting_support = True
supported_extension_aliases = []
class TestHostSegmentMappingNoSupportFromPlugin(HostSegmentMappingTestCase):
mock_path = 'neutron.services.segments.db.update_segment_host_mapping'
def setUp(self):
plugin = ('neutron.tests.unit.extensions.test_segment.'
'NoSupportHostSegmentMappingPlugin')
super(TestHostSegmentMappingNoSupportFromPlugin, self).setUp(
plugin=plugin)
@mock.patch(mock_path)
def test_host_segments_not_updated(self, mock):
host = 'host1'
physical_network = 'phys_net1'
with self.network() as network:
network = network['network']
self._test_create_segment(network_id=network['id'],
physical_network=physical_network,
segmentation_id=200,
network_type=p_constants.TYPE_VLAN)
self._register_agent(host, mappings={physical_network: 'br-eth-1'},
plugin=self.plugin)
segments_host_db = self._get_segments_for_host(host)
self.assertFalse(segments_host_db)
self.assertFalse(mock.mock_calls)
class TestMl2HostSegmentMappingAgentServerSynch(HostSegmentMappingTestCase):
_mechanism_drivers = ['openvswitch', 'logger']
mock_path = 'neutron.services.segments.db.update_segment_host_mapping'
@mock.patch(mock_path)
def test_starting_server_processes_agents(self, mock_function):
host = 'agent_updating_starting_server'
physical_network = 'phys_net1'
self._register_agent(host, mappings={physical_network: 'br-eth-1'},
plugin=self.plugin, start_flag=False)
self.assertTrue(host in db.reported_hosts)
self.assertEqual(1, mock_function.call_count)
expected_call = mock.call(mock.ANY, host, set())
mock_function.assert_has_calls([expected_call])
@mock.patch(mock_path)
def test_starting_agent_is_processed(self, mock_function):
host = 'starting_agent'
physical_network = 'phys_net1'
self._register_agent(host, mappings={physical_network: 'br-eth-1'},
plugin=self.plugin, start_flag=False)
self.assertTrue(host in db.reported_hosts)
self._register_agent(host, mappings={physical_network: 'br-eth-1'},
plugin=self.plugin, start_flag=True)
self.assertTrue(host in db.reported_hosts)
self.assertEqual(2, mock_function.call_count)
expected_call = mock.call(mock.ANY, host, set())
mock_function.assert_has_calls([expected_call, expected_call])
@mock.patch(mock_path)
def test_no_starting_agent_is_not_processed(self, mock_function):
host = 'agent_with_no_start_update'
physical_network = 'phys_net1'
self._register_agent(host, mappings={physical_network: 'br-eth-1'},
plugin=self.plugin, start_flag=False)
self.assertTrue(host in db.reported_hosts)
mock_function.reset_mock()
self._register_agent(host, mappings={physical_network: 'br-eth-1'},
plugin=self.plugin, start_flag=False)
self.assertTrue(host in db.reported_hosts)
mock_function.assert_not_called()
class TestSegmentAwareIpam(SegmentTestCase):
def _setup_host_mappings(self, mappings=()):
ctx = context.get_admin_context()
with ctx.session.begin(subtransactions=True):
for segment_id, host in mappings:
record = db.SegmentHostMapping(
segment_id=segment_id,
host=host)
ctx.session.add(record)
def _create_test_segment_with_subnet(self,
network=None,
cidr='2001:db8:0:0::/64',
physnet='physnet'):
"""Creates one network with one segment and one subnet"""
if not network:
with self.network() as network:
pass
segment = self._test_create_segment(
network_id=network['network']['id'],
physical_network=physnet)
ip_version = netaddr.IPNetwork(cidr).version if cidr else None
with self.subnet(network=network,
segment_id=segment['segment']['id'],
ip_version=ip_version,
cidr=cidr) as subnet:
self._validate_l2_adjacency(network['network']['id'],
is_adjacent=False)
return network, segment, subnet
def _create_test_segments_with_subnets(self, num):
"""Creates one network with num segments and num subnets"""
with self.network() as network:
segments, subnets = [], []
for i in range(num):
cidr = '2001:db8:0:%s::/64' % i
physnet = 'physnet%s' % i
_net, segment, subnet = self._create_test_segment_with_subnet(
network=network, cidr=cidr, physnet=physnet)
segments.append(segment)
subnets.append(subnet)
return network, segments, subnets
def test_port_create_with_segment_subnets(self):
"""No binding information is provided, defer IP allocation"""
network, segment, subnet = self._create_test_segment_with_subnet()
response = self._create_port(self.fmt,
net_id=network['network']['id'],
tenant_id=network['network']['tenant_id'])
res = self.deserialize(self.fmt, response)
# Don't allocate IPs in this case because we didn't give binding info
self.assertEqual(0, len(res['port']['fixed_ips']))
def _assert_one_ip_in_subnet(self, response, cidr):
res = self.deserialize(self.fmt, response)
self.assertEqual(1, len(res['port']['fixed_ips']))
ip = res['port']['fixed_ips'][0]['ip_address']
ip_net = netaddr.IPNetwork(cidr)
self.assertIn(ip, ip_net)
def test_port_create_with_binding_information(self):
"""Binding information is provided, subnets are on segments"""
network, segments, subnets = self._create_test_segments_with_subnets(3)
# Map the host to the middle segment (by mocking host/segment mapping)
self._setup_host_mappings([
(segments[1]['segment']['id'], 'fakehost'),
(segments[1]['segment']['id'], 'otherhost'),
(segments[0]['segment']['id'], 'thirdhost')])
response = self._create_port(self.fmt,
net_id=network['network']['id'],
tenant_id=network['network']['tenant_id'],
arg_list=(portbindings.HOST_ID,),
**{portbindings.HOST_ID: 'fakehost'})
res = self.deserialize(self.fmt, response)
self._validate_immediate_ip_allocation(res['port']['id'])
# Since host mapped to middle segment, IP must come from middle subnet
self._assert_one_ip_in_subnet(response, subnets[1]['subnet']['cidr'])
def test_port_create_with_binding_and_no_subnets(self):
"""Binding information is provided, no subnets."""
with self.network() as network:
segment = self._test_create_segment(
network_id=network['network']['id'],
physical_network='physnet')
# Map the host to the segment
self._setup_host_mappings([(segment['segment']['id'], 'fakehost')])
response = self._create_port(self.fmt,
net_id=network['network']['id'],
tenant_id=network['network']['tenant_id'],
arg_list=(portbindings.HOST_ID,),
**{portbindings.HOST_ID: 'fakehost'})
res = self.deserialize(self.fmt, response)
# No subnets, so no allocation. But, it shouldn't be an error.
self.assertEqual(0, len(res['port']['fixed_ips']))
def test_port_create_with_binding_information_fallback(self):
"""Binding information is provided, subnets not on segments"""
with self.network() as network:
with self.subnet(network=network,
ip_version=6,
cidr='2001:db8:0:0::/64') as subnet:
segment = self._test_create_segment(
network_id=network['network']['id'],
physical_network='physnet')
self._validate_l2_adjacency(network['network']['id'], is_adjacent=True)
# Map the host to the segment
self._setup_host_mappings([(segment['segment']['id'], 'fakehost')])
response = self._create_port(self.fmt,
net_id=network['network']['id'],
tenant_id=network['network']['tenant_id'],
arg_list=(portbindings.HOST_ID,),
**{portbindings.HOST_ID: 'fakehost'})
res = self.deserialize(self.fmt, response)
self._validate_immediate_ip_allocation(res['port']['id'])
# Since the subnet is not on a segment, fall back to it
self._assert_one_ip_in_subnet(response, subnet['subnet']['cidr'])
def test_port_create_on_unconnected_host(self):
"""Binding information provided, host not connected to any segment"""
network, segment, _subnet = self._create_test_segment_with_subnet()
response = self._create_port(self.fmt,
net_id=network['network']['id'],
tenant_id=network['network']['tenant_id'],
arg_list=(portbindings.HOST_ID,),
**{portbindings.HOST_ID: 'fakehost'})
res = self.deserialize(self.fmt, response)
self.assertEqual(webob.exc.HTTPConflict.code, response.status_int)
self.assertEqual(segment_exc.HostNotConnectedToAnySegment.__name__,
res['NeutronError']['type'])
# Ensure that mapping the segment to other hosts doesn't trip it up
self._setup_host_mappings([(segment['segment']['id'], 'otherhost')])
response = self._create_port(self.fmt,
net_id=network['network']['id'],
tenant_id=network['network']['tenant_id'],
arg_list=(portbindings.HOST_ID,),
**{portbindings.HOST_ID: 'fakehost'})
res = self.deserialize(self.fmt, response)
self.assertEqual(webob.exc.HTTPConflict.code, response.status_int)
self.assertEqual(segment_exc.HostNotConnectedToAnySegment.__name__,
res['NeutronError']['type'])
def test_port_create_on_multiconnected_host(self):
"""Binding information provided, host connected to multiple segments"""
network, segments, subnets = self._create_test_segments_with_subnets(2)
# This host is bound to multiple hosts
self._setup_host_mappings([(segments[0]['segment']['id'], 'fakehost'),
(segments[1]['segment']['id'], 'fakehost')])
response = self._create_port(self.fmt,
net_id=network['network']['id'],
tenant_id=network['network']['tenant_id'],
arg_list=(portbindings.HOST_ID,),
**{portbindings.HOST_ID: 'fakehost'})
res = self.deserialize(self.fmt, response)
self.assertEqual(webob.exc.HTTPConflict.code, response.status_int)
self.assertEqual(segment_exc.HostConnectedToMultipleSegments.__name__,
res['NeutronError']['type'])
def test_port_update_excludes_hosts_on_segments(self):
"""No binding information is provided, subnets on segments"""
with self.network() as network:
segment = self._test_create_segment(
network_id=network['network']['id'],
physical_network='physnet')
# Create a port with no IP address (since there is no subnet)
port = self._create_deferred_ip_port(network)
# Create the subnet and try to update the port to get an IP
with self.subnet(network=network,
segment_id=segment['segment']['id']) as subnet:
# Try requesting an IP (but the only subnet is on a segment)
data = {'port': {
'fixed_ips': [{'subnet_id': subnet['subnet']['id']}]}}
port_id = port['port']['id']
port_req = self.new_update_request('ports', data, port_id)
response = port_req.get_response(self.api)
# Gets bad request because there are no eligible subnets.
self.assertEqual(webob.exc.HTTPBadRequest.code, response.status_int)
def test_port_without_ip_not_deferred(self):
"""Ports without addresses on non-routed networks are not deferred"""
with self.network() as network:
pass
# Create a bound port with no IP address (since there is no subnet)
response = self._create_port(self.fmt,
net_id=network['network']['id'],
tenant_id=network['network']['tenant_id'],
arg_list=(portbindings.HOST_ID,),
**{portbindings.HOST_ID: 'fakehost'})
port = self.deserialize(self.fmt, response)
request = self.new_show_request('ports', port['port']['id'])
response = self.deserialize(self.fmt, request.get_response(self.api))
self.assertEqual(ip_allocation.IP_ALLOCATION_IMMEDIATE,
response['port'][ip_allocation.IP_ALLOCATION])
def test_port_update_is_host_aware(self):
"""Binding information is provided, subnets on segments"""
with self.network() as network:
segment = self._test_create_segment(
network_id=network['network']['id'],
physical_network='physnet')
# Map the host to the segment
self._setup_host_mappings([(segment['segment']['id'], 'fakehost')])
# Create a bound port with no IP address (since there is no subnet)
response = self._create_port(self.fmt,
net_id=network['network']['id'],
tenant_id=network['network']['tenant_id'],
arg_list=(portbindings.HOST_ID,),
**{portbindings.HOST_ID: 'fakehost'})
port = self.deserialize(self.fmt, response)
# Create the subnet and try to update the port to get an IP
with self.subnet(network=network,
segment_id=segment['segment']['id']) as subnet:
self._validate_deferred_ip_allocation(port['port']['id'])
self._validate_l2_adjacency(network['network']['id'],
is_adjacent=False)
# Try requesting an IP (but the only subnet is on a segment)
data = {'port': {
'fixed_ips': [{'subnet_id': subnet['subnet']['id']}]}}
port_id = port['port']['id']
port_req = self.new_update_request('ports', data, port_id)
response = port_req.get_response(self.api)
# Since port is bound and there is a mapping to segment, it succeeds.
self.assertEqual(webob.exc.HTTPOk.code, response.status_int)
self._assert_one_ip_in_subnet(response, subnet['subnet']['cidr'])
def _validate_l2_adjacency(self, network_id, is_adjacent):
request = self.new_show_request('networks', network_id)
response = self.deserialize(self.fmt, request.get_response(self.api))
self.assertEqual(is_adjacent,
response['network'][l2_adjacency.L2_ADJACENCY])
def _validate_deferred_ip_allocation(self, port_id):
request = self.new_show_request('ports', port_id)
response = self.deserialize(self.fmt, request.get_response(self.api))
self.assertEqual(ip_allocation.IP_ALLOCATION_DEFERRED,
response['port'][ip_allocation.IP_ALLOCATION])
ips = response['port']['fixed_ips']
self.assertEqual(0, len(ips))
def _validate_immediate_ip_allocation(self, port_id):
request = self.new_show_request('ports', port_id)
response = self.deserialize(self.fmt, request.get_response(self.api))
self.assertEqual(ip_allocation.IP_ALLOCATION_IMMEDIATE,
response['port'][ip_allocation.IP_ALLOCATION])
ips = response['port']['fixed_ips']
self.assertNotEqual(0, len(ips))
def _create_deferred_ip_port(self, network):
response = self._create_port(self.fmt,
net_id=network['network']['id'],
tenant_id=network['network']['tenant_id'])
port = self.deserialize(self.fmt, response)
ips = port['port']['fixed_ips']
self.assertEqual(0, len(ips))
return port
def test_port_update_deferred_allocation(self):
"""Binding information is provided on update, subnets on segments"""
network, segment, subnet = self._create_test_segment_with_subnet()
# Map the host to the segment
self._setup_host_mappings([(segment['segment']['id'], 'fakehost')])
port = self._create_deferred_ip_port(network)
self._validate_deferred_ip_allocation(port['port']['id'])
# Try requesting an IP (but the only subnet is on a segment)
data = {'port': {portbindings.HOST_ID: 'fakehost'}}
port_id = port['port']['id']
port_req = self.new_update_request('ports', data, port_id)
response = port_req.get_response(self.api)
# Port update succeeds and allocates a new IP address.
self.assertEqual(webob.exc.HTTPOk.code, response.status_int)
self._assert_one_ip_in_subnet(response, subnet['subnet']['cidr'])
def test_port_update_deferred_allocation_no_segments(self):
"""Binding information is provided, subnet created after port"""
with self.network() as network:
pass
port = self._create_deferred_ip_port(network)
# Create the subnet and try to update the port to get an IP
with self.subnet(network=network) as subnet:
data = {'port': {portbindings.HOST_ID: 'fakehost'}}
port_id = port['port']['id']
port_req = self.new_update_request('ports', data, port_id)
response = port_req.get_response(self.api)
self.assertEqual(webob.exc.HTTPOk.code, response.status_int)
self._assert_one_ip_in_subnet(response, subnet['subnet']['cidr'])
def test_port_update_deferred_allocation_no_segments_manual_alloc(self):
"""Binding information is provided, subnet created after port"""
with self.network() as network:
pass
port = self._create_deferred_ip_port(network)
# Create the subnet and try to update the port to get an IP
with self.subnet(network=network) as subnet:
data = {'port': {
portbindings.HOST_ID: 'fakehost',
'fixed_ips': [{'subnet_id': subnet['subnet']['id']}]}}
port_id = port['port']['id']
port_req = self.new_update_request('ports', data, port_id)
response = port_req.get_response(self.api)
self.assertEqual(webob.exc.HTTPOk.code, response.status_int)
self._assert_one_ip_in_subnet(response, subnet['subnet']['cidr'])
# Do a show to be sure that only one IP is recorded
port_req = self.new_show_request('ports', port_id)
response = port_req.get_response(self.api)
self.assertEqual(webob.exc.HTTPOk.code, response.status_int)
self._assert_one_ip_in_subnet(response, subnet['subnet']['cidr'])
def test_port_update_deferred_allocation_no_segments_empty_alloc(self):
"""Binding information is provided, subnet created after port"""
with self.network() as network:
pass
port = self._create_deferred_ip_port(network)
# Create the subnet and update the port but specify no IPs
with self.subnet(network=network):
data = {'port': {
portbindings.HOST_ID: 'fakehost',
'fixed_ips': []}}
port_id = port['port']['id']
port_req = self.new_update_request('ports', data, port_id)
response = port_req.get_response(self.api)
self.assertEqual(webob.exc.HTTPOk.code, response.status_int)
res = self.deserialize(self.fmt, response)
# Since I specifically requested no IP addresses, I shouldn't get one.
self.assertEqual(0, len(res['port']['fixed_ips']))
def test_port_update_deferred_allocation_no_host_mapping(self):
"""Binding information is provided on update, subnets on segments"""
network, segment, subnet = self._create_test_segment_with_subnet()
port = self._create_deferred_ip_port(network)
self._validate_deferred_ip_allocation(port['port']['id'])
# Try requesting an IP (but the only subnet is on a segment)
data = {'port': {portbindings.HOST_ID: 'fakehost'}}
port_id = port['port']['id']
port_req = self.new_update_request('ports', data, port_id)
response = port_req.get_response(self.api)
res = self.deserialize(self.fmt, response)
# Gets conflict because it can't map the host to a segment
self.assertEqual(webob.exc.HTTPConflict.code, response.status_int)
self.assertEqual(segment_exc.HostNotConnectedToAnySegment.__name__,
res['NeutronError']['type'])
def test_port_update_deferred_allocation_multiple_host_mapping(self):
"""Binding information is provided on update, subnets on segments"""
network, segments, _s = self._create_test_segments_with_subnets(2)
port = self._create_deferred_ip_port(network)
self._validate_deferred_ip_allocation(port['port']['id'])
# This host is bound to multiple segments
self._setup_host_mappings([(segments[0]['segment']['id'], 'fakehost'),
(segments[1]['segment']['id'], 'fakehost')])
# Try requesting an IP (but the only subnet is on a segment)
data = {'port': {portbindings.HOST_ID: 'fakehost'}}
port_id = port['port']['id']
port_req = self.new_update_request('ports', data, port_id)
response = port_req.get_response(self.api)
res = self.deserialize(self.fmt, response)
# Gets conflict because it can't map the host to a segment
self.assertEqual(webob.exc.HTTPConflict.code, response.status_int)
self.assertEqual(segment_exc.HostConnectedToMultipleSegments.__name__,
res['NeutronError']['type'])
def test_port_update_allocate_no_segments(self):
"""Binding information is provided, subnet created after port"""
with self.network() as network:
pass
# Create a bound port with no IP address (since there is not subnet)
port = self._create_deferred_ip_port(network)
# Create the subnet and try to update the port to get an IP
with self.subnet(network=network) as subnet:
# Try requesting an IP (but the only subnet is on a segment)
data = {'port': {
'fixed_ips': [{'subnet_id': subnet['subnet']['id']}]}}
port_id = port['port']['id']
port_req = self.new_update_request('ports', data, port_id)
response = port_req.get_response(self.api)
# Since port is bound and there is a mapping to segment, it succeeds.
self.assertEqual(webob.exc.HTTPOk.code, response.status_int)
self._assert_one_ip_in_subnet(response, subnet['subnet']['cidr'])
def test_port_update_deferred_allocation_no_ips(self):
"""Binding information is provided on update, subnets on segments"""
network, segments, subnets = self._create_test_segments_with_subnets(2)
self._setup_host_mappings([(segments[0]['segment']['id'], 'fakehost2'),
(segments[1]['segment']['id'], 'fakehost')])
port = self._create_deferred_ip_port(network)
# Update the subnet on the second segment to be out of IPs
subnet_data = {'subnet': {'allocation_pools': []}}
subnet_req = self.new_update_request('subnets',
subnet_data,
subnets[1]['subnet']['id'])
subnet_response = subnet_req.get_response(self.api)
res = self.deserialize(self.fmt, subnet_response)
# Try requesting an IP (but the subnet ran out of ips)
data = {'port': {portbindings.HOST_ID: 'fakehost'}}
port_id = port['port']['id']
port_req = self.new_update_request('ports', data, port_id)
response = port_req.get_response(self.api)
res = self.deserialize(self.fmt, response)
# Since port is bound and there is a mapping to segment, it succeeds.
self.assertEqual(webob.exc.HTTPConflict.code, response.status_int)
self.assertEqual(n_exc.IpAddressGenerationFailure.__name__,
res['NeutronError']['type'])
def test_port_update_fails_if_host_on_wrong_segment(self):
"""Update a port with existing IPs to a host where they don't work"""
network, segments, subnets = self._create_test_segments_with_subnets(2)
self._setup_host_mappings([(segments[0]['segment']['id'], 'fakehost2'),
(segments[1]['segment']['id'], 'fakehost')])
# Create a bound port with an IP address
response = self._create_port(self.fmt,
net_id=network['network']['id'],
tenant_id=network['network']['tenant_id'],
arg_list=(portbindings.HOST_ID,),
**{portbindings.HOST_ID: 'fakehost'})
self._assert_one_ip_in_subnet(response, subnets[1]['subnet']['cidr'])
port = self.deserialize(self.fmt, response)
# Now, try to update binding to a host on the other segment
data = {'port': {portbindings.HOST_ID: 'fakehost2'}}
port_req = self.new_update_request('ports', data, port['port']['id'])
response = port_req.get_response(self.api)
# It fails since the IP address isn't compatible with the new segment
self.assertEqual(webob.exc.HTTPConflict.code, response.status_int)
def test_port_update_fails_if_host_on_good_segment(self):
"""Update a port with existing IPs to a host where they don't work"""
network, segments, subnets = self._create_test_segments_with_subnets(2)
self._setup_host_mappings([(segments[0]['segment']['id'], 'fakehost2'),
(segments[1]['segment']['id'], 'fakehost1'),
(segments[1]['segment']['id'], 'fakehost')])
# Create a bound port with an IP address
response = self._create_port(self.fmt,
net_id=network['network']['id'],
tenant_id=network['network']['tenant_id'],
arg_list=(portbindings.HOST_ID,),
**{portbindings.HOST_ID: 'fakehost'})
self._assert_one_ip_in_subnet(response, subnets[1]['subnet']['cidr'])
port = self.deserialize(self.fmt, response)
# Now, try to update binding to another host in same segment
data = {'port': {portbindings.HOST_ID: 'fakehost1'}}
port_req = self.new_update_request('ports', data, port['port']['id'])
response = port_req.get_response(self.api)
# Since the new host is in the same segment, it succeeds.
self.assertEqual(webob.exc.HTTPOk.code, response.status_int)
class TestSegmentAwareIpamML2(TestSegmentAwareIpam):
def setUp(self):
super(TestSegmentAwareIpamML2, self).setUp(plugin='ml2')
class TestDhcpAgentSegmentScheduling(HostSegmentMappingTestCase):
_mechanism_drivers = ['openvswitch', 'logger']
mock_path = 'neutron.services.segments.db.update_segment_host_mapping'
def setUp(self):
super(TestDhcpAgentSegmentScheduling, self).setUp()
self.dhcp_agent_db = agentschedulers_db.DhcpAgentSchedulerDbMixin()
self.ctx = context.get_admin_context()
def _test_create_network_and_segment(self, phys_net):
with self.network() as net:
network = net['network']
segment = self._test_create_segment(network_id=network['id'],
physical_network=phys_net,
segmentation_id=200,
network_type='vxlan')
dhcp_agents = self.dhcp_agent_db.get_dhcp_agents_hosting_networks(
self.ctx, [network['id']])
self.assertEqual(0, len(dhcp_agents))
return network, segment['segment']
def _test_create_subnet(self, network, segment, cidr=None,
enable_dhcp=True):
cidr = cidr or '10.0.0.0/24'
ip_version = 4
with self.subnet(network={'network': network},
segment_id=segment['id'],
ip_version=ip_version,
cidr=cidr,
enable_dhcp=enable_dhcp) as subnet:
pass
return subnet['subnet']
def _register_dhcp_agents(self, hosts=None):
hosts = hosts or [DHCP_HOSTA, DHCP_HOSTB]
for host in hosts:
helpers.register_dhcp_agent(host)
def test_network_scheduling_on_segment_creation(self):
self._register_dhcp_agents()
self._test_create_network_and_segment('phys_net1')
def test_segment_scheduling_no_host_mapping(self):
self._register_dhcp_agents()
network, segment = self._test_create_network_and_segment('phys_net1')
self._test_create_subnet(network, segment)
dhcp_agents = self.dhcp_agent_db.get_dhcp_agents_hosting_networks(
self.ctx, [network['id']])
self.assertEqual(0, len(dhcp_agents))
def test_segment_scheduling_with_host_mapping(self):
phys_net1 = 'phys_net1'
self._register_dhcp_agents()
network, segment = self._test_create_network_and_segment(phys_net1)
self._register_agent(DHCP_HOSTA,
mappings={phys_net1: 'br-eth-1'},
plugin=self.plugin)
self._test_create_subnet(network, segment)
dhcp_agents = self.dhcp_agent_db.get_dhcp_agents_hosting_networks(
self.ctx, [network['id']])
self.assertEqual(1, len(dhcp_agents))
self.assertEqual(DHCP_HOSTA, dhcp_agents[0]['host'])
def test_segment_scheduling_with_multiple_host_mappings(self):
phys_net1 = 'phys_net1'
phys_net2 = 'phys_net2'
self._register_dhcp_agents([DHCP_HOSTA, DHCP_HOSTB, 'MEHA', 'MEHB'])
network, segment1 = self._test_create_network_and_segment(phys_net1)
segment2 = self._test_create_segment(network_id=network['id'],
physical_network=phys_net2,
segmentation_id=200,
network_type='vxlan')['segment']
self._register_agent(DHCP_HOSTA,
mappings={phys_net1: 'br-eth-1'},
plugin=self.plugin)
self._register_agent(DHCP_HOSTB,
mappings={phys_net2: 'br-eth-1'},
plugin=self.plugin)
self._test_create_subnet(network, segment1)
self._test_create_subnet(network, segment2, cidr='11.0.0.0/24')
dhcp_agents = self.dhcp_agent_db.get_dhcp_agents_hosting_networks(
self.ctx, [network['id']])
self.assertEqual(2, len(dhcp_agents))
agent_hosts = [agent['host'] for agent in dhcp_agents]
self.assertIn(DHCP_HOSTA, agent_hosts)
self.assertIn(DHCP_HOSTB, agent_hosts)