diff --git a/neutron/callbacks/resources.py b/neutron/callbacks/resources.py index 14ec195e278..defe1e36545 100644 --- a/neutron/callbacks/resources.py +++ b/neutron/callbacks/resources.py @@ -26,6 +26,7 @@ ROUTER_INTERFACE = 'router_interface' SECURITY_GROUP = 'security_group' SECURITY_GROUP_RULE = 'security_group_rule' SEGMENT = 'segment' +SEGMENT_HOST_MAPPING = 'segment_host_mapping' SUBNET = 'subnet' SUBNETS = 'subnets' SUBNET_GATEWAY = 'subnet_gateway' diff --git a/neutron/common/exceptions.py b/neutron/common/exceptions.py index 81151579baa..2553c05e266 100644 --- a/neutron/common/exceptions.py +++ b/neutron/common/exceptions.py @@ -41,6 +41,10 @@ class NetworkQosBindingNotFound(e.NotFound): "could not be found.") +class PlacementEndpointNotFound(e.NotFound): + message = _("Placement API endpoint not found") + + class PlacementResourceProviderNotFound(e.NotFound): message = _("Placement resource provider not found %(resource_provider)s.") diff --git a/neutron/services/segments/db.py b/neutron/services/segments/db.py index 8cc4ee595f7..e4134eba230 100644 --- a/neutron/services/segments/db.py +++ b/neutron/services/segments/db.py @@ -275,6 +275,9 @@ def _update_segment_host_mapping_for_agent(resource, event, trigger, segment['id'] for segment in segments if check_segment_for_agent(segment, agent)} update_segment_host_mapping(context, host, current_segment_ids) + registry.notify(resources.SEGMENT_HOST_MAPPING, events.AFTER_CREATE, + plugin, context=context, host=host, + current_segment_ids=current_segment_ids) def _add_segment_host_mapping_for_segment(resource, event, trigger, diff --git a/neutron/services/segments/placement_client.py b/neutron/services/segments/placement_client.py index 5c92680112b..3fd683ac440 100644 --- a/neutron/services/segments/placement_client.py +++ b/neutron/services/segments/placement_client.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +import functools + from keystoneauth1 import exceptions as ks_exc from keystoneauth1 import loading as ks_loading from keystoneauth1 import session @@ -27,6 +29,16 @@ LOG = logging.getLogger(__name__) PLACEMENT_API_WITH_AGGREGATES = 'placement 1.1' +def check_placement_api_available(f): + @functools.wraps(f) + def wrapper(self, *a, **k): + try: + return f(self, *a, **k) + except ks_exc.EndpointNotFound: + raise n_exc.PlacementEndpointNotFound() + return wrapper + + class PlacementAPIClient(object): """Client class for placement ReST API.""" @@ -55,6 +67,7 @@ class PlacementAPIClient(object): return self._client.delete(url, endpoint_filter=self.ks_filter, **kwargs) + @check_placement_api_available def create_resource_provider(self, resource_provider): """Create a resource provider. @@ -64,6 +77,7 @@ class PlacementAPIClient(object): url = '/resource_providers' self._post(url, resource_provider) + @check_placement_api_available def delete_resource_provider(self, resource_provider_uuid): """Delete a resource provider. @@ -73,6 +87,7 @@ class PlacementAPIClient(object): url = '/resource_providers/%s' % resource_provider_uuid self._delete(url) + @check_placement_api_available def create_inventory(self, resource_provider_uuid, inventory): """Create an inventory. @@ -86,6 +101,7 @@ class PlacementAPIClient(object): url = '/resource_providers/%s/inventories' % resource_provider_uuid self._post(url, inventory) + @check_placement_api_available def get_inventory(self, resource_provider_uuid, resource_class): """Get resource provider inventory. @@ -112,6 +128,7 @@ class PlacementAPIClient(object): else: raise + @check_placement_api_available def update_inventory(self, resource_provider_uuid, inventory, resource_class): """Update an inventory. @@ -134,6 +151,7 @@ class PlacementAPIClient(object): resource_provider=resource_provider_uuid, resource_class=resource_class) + @check_placement_api_available def associate_aggregates(self, resource_provider_uuid, aggregates): """Associate a list of aggregates with a resource provider. @@ -147,6 +165,7 @@ class PlacementAPIClient(object): headers={'openstack-api-version': PLACEMENT_API_WITH_AGGREGATES}) + @check_placement_api_available def list_aggregates(self, resource_provider_uuid): """List resource provider aggregates. diff --git a/neutron/services/segments/plugin.py b/neutron/services/segments/plugin.py index d9146cc236d..0a1afb68583 100644 --- a/neutron/services/segments/plugin.py +++ b/neutron/services/segments/plugin.py @@ -14,20 +14,37 @@ # License for the specific language governing permissions and limitations # under the License. +from keystoneauth1 import loading as ks_loading +import netaddr +from neutron_lib import constants from neutron_lib.plugins import directory +from novaclient import client as nova_client +from novaclient import exceptions as nova_exc +from oslo_config import cfg +from oslo_log import log -from neutron._i18n import _ +from neutron._i18n import _, _LE, _LI from neutron.api.v2 import attributes from neutron.callbacks import events from neutron.callbacks import registry from neutron.callbacks import resources +from neutron.common import exceptions as n_exc from neutron.db import common_db_mixin from neutron.db import models_v2 from neutron.extensions import ip_allocation from neutron.extensions import l2_adjacency from neutron.extensions import segment +from neutron.notifiers import batch_notifier from neutron.services.segments import db from neutron.services.segments import exceptions +from neutron.services.segments import placement_client + +LOG = log.getLogger(__name__) + +NOVA_API_VERSION = '2.41' +IPV4_RESOURCE_CLASS = 'IPV4_ADDRESS' +SEGMENT_NAME_STUB = 'Neutron segment id %s' +MAX_INVENTORY_UPDATE_RETRIES = 10 def _extend_network_dict_binding(plugin, network_res, network_db): @@ -67,6 +84,7 @@ class Plugin(db.SegmentDbMixin, segment.SegmentPluginBase): attributes.SUBNETS, [_extend_subnet_dict_binding]) common_db_mixin.CommonDbMixin.register_dict_extend_funcs( attributes.PORTS, [_extend_port_dict_binding]) + self.nova_updater = NovaSegmentNotifier() registry.subscribe( self._prevent_segment_delete_with_subnet_associated, @@ -90,3 +108,297 @@ class Plugin(db.SegmentDbMixin, segment.SegmentPluginBase): reason = _("The segment is still associated with subnet(s) " "%s") % ", ".join(subnet_ids) raise exceptions.SegmentInUse(segment_id=segment_id, reason=reason) + + +class Event(object): + + def __init__(self, method, segment_ids, total=None, reserved=None, + segment_host_mappings=None, host=None): + self.method = method + if isinstance(segment_ids, set): + self.segment_ids = segment_ids + else: + self.segment_id = segment_ids + self.total = total + self.reserved = reserved + self.segment_host_mappings = segment_host_mappings + self.host = host + + +class NovaSegmentNotifier(object): + + def __init__(self): + self.p_client, self.n_client = self._get_clients() + self.batch_notifier = batch_notifier.BatchNotifier( + cfg.CONF.send_events_interval, self._send_notifications) + + registry.subscribe(self._notify_subnet_created, resources.SUBNET, + events.AFTER_CREATE) + registry.subscribe(self._notify_subnet_updated, resources.SUBNET, + events.AFTER_UPDATE) + registry.subscribe(self._notify_subnet_deleted, resources.SUBNET, + events.AFTER_DELETE) + registry.subscribe(self._notify_host_addition_to_aggregate, + resources.SEGMENT_HOST_MAPPING, events.AFTER_CREATE) + registry.subscribe(self._notify_port_created_or_deleted, + resources.PORT, events.AFTER_CREATE) + registry.subscribe(self._notify_port_updated, resources.PORT, + events.AFTER_UPDATE) + registry.subscribe(self._notify_port_created_or_deleted, + resources.PORT, events.AFTER_DELETE) + + def _get_clients(self): + p_client = placement_client.PlacementAPIClient() + + n_auth = ks_loading.load_auth_from_conf_options(cfg.CONF, 'nova') + n_session = ks_loading.load_session_from_conf_options( + cfg.CONF, + 'nova', + auth=n_auth) + extensions = [ + ext for ext in nova_client.discover_extensions(NOVA_API_VERSION) + if ext.name == "server_external_events"] + n_client = nova_client.Client( + NOVA_API_VERSION, + session=n_session, + region_name=cfg.CONF.nova.region_name, + endpoint_type=cfg.CONF.nova.endpoint_type, + extensions=extensions) + + return p_client, n_client + + def _send_notifications(self, batched_events): + for event in batched_events: + try: + event.method(event) + except n_exc.PlacementEndpointNotFound: + LOG.debug('Placement API was not found when trying to ' + 'update routed networks IPv4 inventories') + return + + def _notify_subnet_created(self, resource, event, trigger, context, + subnet, **kwargs): + segment_id = subnet.get('segment_id') + if not segment_id or subnet['ip_version'] != constants.IP_VERSION_4: + return + total, reserved = self._calculate_inventory_total_and_reserved(subnet) + if total: + query = context.session.query( + db.SegmentHostMapping).filter_by(segment_id=segment_id) + self.batch_notifier.queue_event(Event( + self._create_or_update_nova_inventory, segment_id, total=total, + reserved=reserved, segment_host_mappings=query.all())) + + def _create_or_update_nova_inventory(self, event): + try: + self._update_nova_inventory(event) + except n_exc.PlacementResourceProviderNotFound: + self._create_nova_inventory(event.segment_id, event.total, + event.reserved, + event.segment_host_mappings) + + def _update_nova_inventory(self, event): + for count in range(MAX_INVENTORY_UPDATE_RETRIES): + ipv4_inventory = self.p_client.get_inventory(event.segment_id, + IPV4_RESOURCE_CLASS) + if event.total: + ipv4_inventory['total'] += event.total + if event.reserved: + ipv4_inventory['reserved'] += event.reserved + try: + self.p_client.update_inventory(event.segment_id, + ipv4_inventory, + IPV4_RESOURCE_CLASS) + return + except n_exc.PlacementInventoryUpdateConflict: + LOG.debug('Re-trying to update Nova IPv4 inventory for ' + 'routed network segment: %s', event.segment_id) + LOG.error(_LE('Failed to update Nova IPv4 inventory for routed ' + 'network segment: %s'), event.segment_id) + + def _create_nova_inventory(self, segment_id, total, reserved, + segment_host_mappings): + name = SEGMENT_NAME_STUB % segment_id + resource_provider = {'name': name, 'uuid': segment_id} + self.p_client.create_resource_provider(resource_provider) + aggregate = self.n_client.aggregates.create(name, None) + self.p_client.associate_aggregates(segment_id, [aggregate.uuid]) + for mapping in segment_host_mappings: + self.n_client.aggregates.add_host(aggregate.id, mapping['host']) + ipv4_inventory = {'total': total, 'reserved': reserved, + 'min_unit': 1, 'max_unit': 1, 'step_size': 1, + 'allocation_ratio': 1.0, + 'resource_class': IPV4_RESOURCE_CLASS} + self.p_client.create_inventory(segment_id, ipv4_inventory) + + def _calculate_inventory_total_and_reserved(self, subnet): + total = 0 + reserved = 0 + allocation_pools = subnet.get('allocation_pools') or [] + for pool in allocation_pools: + total += int(netaddr.IPAddress(pool['end']) - + netaddr.IPAddress(pool['start'])) + 1 + if total: + if subnet['gateway_ip']: + total += 1 + reserved += 1 + if subnet['enable_dhcp']: + reserved += 1 + return total, reserved + + def _notify_subnet_updated(self, resource, event, trigger, context, + subnet, original_subnet, **kwargs): + segment_id = subnet.get('segment_id') + if not segment_id or subnet['ip_version'] != constants.IP_VERSION_4: + return + filters = {'segment_id': [segment_id], + 'ip_version': [constants.IP_VERSION_4]} + if not subnet['allocation_pools']: + plugin = directory.get_plugin() + alloc_pools = [s['allocation_pools'] for s in + plugin.get_subnets(context, filters=filters)] + if not any(alloc_pools): + self.batch_notifier.queue_event(Event( + self._delete_nova_inventory, segment_id)) + return + original_total, original_reserved = ( + self._calculate_inventory_total_and_reserved(original_subnet)) + updated_total, updated_reserved = ( + self._calculate_inventory_total_and_reserved(subnet)) + total = updated_total - original_total + reserved = updated_reserved - original_reserved + if total or reserved: + segment_host_mappings = None + if not original_subnet['allocation_pools']: + segment_host_mappings = context.session.query( + db.SegmentHostMapping).filter_by( + segment_id=segment_id).all() + self.batch_notifier.queue_event(Event( + self._create_or_update_nova_inventory, segment_id, total=total, + reserved=reserved, + segment_host_mappings=segment_host_mappings)) + + def _notify_subnet_deleted(self, resource, event, trigger, context, + subnet, **kwargs): + segment_id = subnet.get('segment_id') + if not segment_id or subnet['ip_version'] != constants.IP_VERSION_4: + return + total, reserved = self._calculate_inventory_total_and_reserved(subnet) + if total: + filters = {'segment_id': [segment_id], 'ip_version': [4]} + plugin = directory.get_plugin() + if plugin.get_subnets_count(context, filters=filters) > 0: + self.batch_notifier.queue_event(Event( + self._update_nova_inventory, segment_id, total=-total, + reserved=-reserved)) + else: + self.batch_notifier.queue_event(Event( + self._delete_nova_inventory, segment_id)) + + def _get_aggregate_id(self, segment_id): + aggregate_uuid = self.p_client.list_aggregates( + segment_id)['aggregates'][0] + aggregates = self.n_client.aggregates.list() + for aggregate in aggregates: + if aggregate.uuid == aggregate_uuid: + return aggregate.id + + def _delete_nova_inventory(self, event): + aggregate_id = self._get_aggregate_id(event.segment_id) + aggregate = self.n_client.aggregates.get_details( + aggregate_id) + for host in aggregate.hosts: + self.n_client.aggregates.remove_host(aggregate_id, + host) + self.n_client.aggregates.delete(aggregate_id) + self.p_client.delete_resource_provider(event.segment_id) + + def _notify_host_addition_to_aggregate(self, resource, event, trigger, + context, host, current_segment_ids, + **kwargs): + query = context.session.query(models_v2.Subnet).filter( + models_v2.Subnet.segment_id.in_(current_segment_ids)) + segment_ids = {subnet['segment_id'] for subnet in query} + self.batch_notifier.queue_event(Event(self._add_host_to_aggregate, + segment_ids, host=host)) + + def _add_host_to_aggregate(self, event): + for segment_id in event.segment_ids: + try: + aggregate_id = self._get_aggregate_id(segment_id) + except n_exc.PlacementAggregateNotFound: + LOG.info(_LI('When adding host %(host)s, aggregate not found ' + 'for routed network segment %(segment_id)s'), + {'host': event.host, 'segment_id': segment_id}) + continue + + try: + self.n_client.aggregates.add_host(aggregate_id, event.host) + except nova_exc.Conflict: + LOG.info(_LI('Host %(host)s already exists in aggregate for ' + 'routed network segment %(segment_id)s'), + {'host': event.host, 'segment_id': segment_id}) + + def _notify_port_created_or_deleted(self, resource, event, trigger, + context, port, **kwargs): + if not self._does_port_require_nova_inventory_update(port): + return + ipv4_subnets_number, segment_id = ( + self._get_ipv4_subnets_number_and_segment_id(port, context)) + if segment_id: + if event == events.AFTER_DELETE: + ipv4_subnets_number = -ipv4_subnets_number + self.batch_notifier.queue_event(Event(self._update_nova_inventory, + segment_id, reserved=ipv4_subnets_number)) + + def _notify_port_updated(self, resource, event, trigger, context, + **kwargs): + port = kwargs.get('port') + original_port = kwargs.get('original_port') + does_original_port_require_nova_inventory_update = ( + self._does_port_require_nova_inventory_update(original_port)) + does_port_require_nova_inventory_update = ( + self._does_port_require_nova_inventory_update(port)) + if not (does_original_port_require_nova_inventory_update or + does_port_require_nova_inventory_update): + return + original_port_ipv4_subnets_number, segment_id = ( + self._get_ipv4_subnets_number_and_segment_id(original_port, + context)) + if not segment_id: + return + port_ipv4_subnets_number = len(self._get_ipv4_subnet_ids(port)) + if not does_original_port_require_nova_inventory_update: + original_port_ipv4_subnets_number = 0 + if not does_port_require_nova_inventory_update: + port_ipv4_subnets_number = 0 + update = port_ipv4_subnets_number - original_port_ipv4_subnets_number + if update: + self.batch_notifier.queue_event(Event(self._update_nova_inventory, + segment_id, reserved=update)) + + def _get_ipv4_subnets_number_and_segment_id(self, port, context): + ipv4_subnet_ids = self._get_ipv4_subnet_ids(port) + if not ipv4_subnet_ids: + return 0, None + segment_id = context.session.query( + models_v2.Subnet).filter_by(id=ipv4_subnet_ids[0]).one()[ + 'segment_id'] + if not segment_id: + return 0, None + return len(ipv4_subnet_ids), segment_id + + def _does_port_require_nova_inventory_update(self, port): + device_owner = port.get('device_owner') + if (device_owner.startswith(constants.DEVICE_OWNER_COMPUTE_PREFIX) or + device_owner == constants.DEVICE_OWNER_DHCP): + return False + return True + + def _get_ipv4_subnet_ids(self, port): + ipv4_subnet_ids = [] + for ip in port.get('fixed_ips', []): + if netaddr.IPAddress( + ip['ip_address']).version == constants.IP_VERSION_4: + ipv4_subnet_ids.append(ip['subnet_id']) + return ipv4_subnet_ids diff --git a/neutron/tests/unit/extensions/test_segment.py b/neutron/tests/unit/extensions/test_segment.py index 10855c65466..be04b901513 100644 --- a/neutron/tests/unit/extensions/test_segment.py +++ b/neutron/tests/unit/extensions/test_segment.py @@ -12,11 +12,15 @@ # License for the specific language governing permissions and limitations # under the License. +import copy from keystoneauth1 import exceptions as ks_exc + import mock import netaddr from neutron_lib import constants from neutron_lib import exceptions as n_exc +from neutron_lib.plugins import directory +from novaclient import exceptions as nova_exc from oslo_config import cfg from oslo_utils import uuidutils import webob.exc @@ -44,6 +48,7 @@ from neutron.plugins.ml2 import config from neutron.services.segments import db from neutron.services.segments import exceptions as segment_exc from neutron.services.segments import placement_client +from neutron.services.segments import plugin as seg_plugin from neutron.tests import base from neutron.tests.common import helpers from neutron.tests.unit.db import test_db_base_plugin_v2 @@ -796,7 +801,8 @@ class TestMl2HostSegmentMappingAgentServerSynch(HostSegmentMappingTestCase): mock_function.assert_not_called() -class TestSegmentAwareIpam(SegmentTestCase): +class SegmentAwareIpamTestCase(SegmentTestCase): + def _setup_host_mappings(self, mappings=()): ctx = context.get_admin_context() for segment_id, host in mappings: @@ -808,6 +814,13 @@ class TestSegmentAwareIpam(SegmentTestCase): cidr='2001:db8:0:0::/64', physnet='physnet'): """Creates one network with one segment and one subnet""" + network, segment = self._create_test_network_and_segment(network, + physnet) + subnet = self._create_test_subnet_with_segment(network, segment, cidr) + return network, segment, subnet + + def _create_test_network_and_segment(self, network=None, + physnet='physnet'): if not network: with self.network() as network: pass @@ -816,15 +829,29 @@ class TestSegmentAwareIpam(SegmentTestCase): network_id=network['network']['id'], physical_network=physnet, network_type=p_constants.TYPE_VLAN) + return network, segment + def _create_test_subnet_with_segment(self, network, segment, + cidr='2001:db8:0:0::/64', + allocation_pools=None): ip_version = netaddr.IPNetwork(cidr).version if cidr else None with self.subnet(network=network, segment_id=segment['segment']['id'], ip_version=ip_version, - cidr=cidr) as subnet: + cidr=cidr, + allocation_pools=allocation_pools) as subnet: self._validate_l2_adjacency(network['network']['id'], is_adjacent=False) - return network, segment, subnet + return subnet + + def _validate_l2_adjacency(self, network_id, is_adjacent): + request = self.new_show_request('networks', network_id) + response = self.deserialize(self.fmt, request.get_response(self.api)) + self.assertEqual(is_adjacent, + response['network'][l2_adjacency.L2_ADJACENCY]) + + +class TestSegmentAwareIpam(SegmentAwareIpamTestCase): def _create_test_segments_with_subnets(self, num): """Creates one network with num segments and num subnets""" @@ -1101,12 +1128,6 @@ class TestSegmentAwareIpam(SegmentTestCase): self.assertEqual(webob.exc.HTTPOk.code, response.status_int) self._assert_one_ip_in_subnet(response, subnet['subnet']['cidr']) - def _validate_l2_adjacency(self, network_id, is_adjacent): - request = self.new_show_request('networks', network_id) - response = self.deserialize(self.fmt, request.get_response(self.api)) - self.assertEqual(is_adjacent, - response['network'][l2_adjacency.L2_ADJACENCY]) - def _validate_deferred_ip_allocation(self, port_id): request = self.new_show_request('ports', port_id) response = self.deserialize(self.fmt, request.get_response(self.api)) @@ -1394,6 +1415,562 @@ class TestSegmentAwareIpamML2(TestSegmentAwareIpam): super(TestSegmentAwareIpamML2, self).setUp(plugin='ml2') +class TestNovaSegmentNotifier(SegmentAwareIpamTestCase): + _mechanism_drivers = ['openvswitch', 'logger'] + + def setUp(self): + config.cfg.CONF.set_override('mechanism_drivers', + self._mechanism_drivers, + group='ml2') + config.cfg.CONF.set_override('network_vlan_ranges', + ['physnet:200:209', 'physnet0:200:209', + 'physnet1:200:209', 'physnet2:200:209'], + group='ml2_type_vlan') + super(TestNovaSegmentNotifier, self).setUp(plugin='ml2') + self.segments_plugin = directory.get_plugin(ext_segment.SEGMENTS) + + nova_updater = self.segments_plugin.nova_updater + nova_updater.p_client = mock.MagicMock() + self.mock_p_client = nova_updater.p_client + nova_updater.n_client = mock.MagicMock() + self.mock_n_client = nova_updater.n_client + self.batch_notifier = nova_updater.batch_notifier + self.batch_notifier._waiting_to_send = True + + def _calculate_inventory_total_and_reserved(self, subnet): + total = 0 + reserved = 0 + allocation_pools = subnet.get('allocation_pools') or [] + for pool in allocation_pools: + total += int(netaddr.IPAddress(pool['end']) - + netaddr.IPAddress(pool['start'])) + 1 + if total: + if subnet['gateway_ip']: + total += 1 + reserved += 1 + if subnet['enable_dhcp']: + reserved += 1 + return total, reserved + + def _assert_inventory_creation(self, segment_id, aggregate, subnet): + self.batch_notifier._notify() + self.mock_p_client.get_inventory.assert_called_with( + segment_id, seg_plugin.IPV4_RESOURCE_CLASS) + self.mock_p_client.update_inventory.assert_not_called() + name = seg_plugin.SEGMENT_NAME_STUB % segment_id + resource_provider = {'name': name, 'uuid': segment_id} + self.mock_p_client.create_resource_provider.assert_called_with( + resource_provider) + self.mock_n_client.aggregates.create.assert_called_with(name, None) + self.mock_p_client.associate_aggregates.assert_called_with( + segment_id, [aggregate.uuid]) + self.mock_n_client.aggregates.add_host.assert_called_with(aggregate.id, + 'fakehost') + total, reserved = self._calculate_inventory_total_and_reserved( + subnet['subnet']) + inventory, _ = self._get_inventory(total, reserved) + self.mock_p_client.create_inventory.assert_called_with( + segment_id, inventory) + self.assertEqual( + inventory['total'], + self.mock_p_client.create_inventory.call_args[0][1]['total']) + self.assertEqual( + inventory['reserved'], + self.mock_p_client.create_inventory.call_args[0][1]['reserved']) + self.mock_p_client.reset_mock() + self.mock_p_client.get_inventory.side_effect = None + self.mock_n_client.reset_mock() + + def _test_first_subnet_association_with_segment(self, cidr='10.0.0.0/24', + allocation_pools=None): + network, segment = self._create_test_network_and_segment() + segment_id = segment['segment']['id'] + self._setup_host_mappings([(segment_id, 'fakehost')]) + self.mock_p_client.get_inventory.side_effect = ( + neutron_exc.PlacementResourceProviderNotFound( + resource_provider=segment_id, + resource_class=seg_plugin.IPV4_RESOURCE_CLASS)) + aggregate = mock.MagicMock() + aggregate.uuid = uuidutils.generate_uuid() + aggregate.id = 1 + self.mock_n_client.aggregates.create.return_value = aggregate + subnet = self._create_test_subnet_with_segment( + network, segment, cidr=cidr, allocation_pools=allocation_pools) + self._assert_inventory_creation(segment_id, aggregate, subnet) + return network, segment, subnet + + def test_first_subnet_association_with_segment(self): + self._test_first_subnet_association_with_segment() + + def _assert_inventory_update(self, segment_id, inventory, subnet=None, + original_subnet=None): + self.batch_notifier._notify() + self.mock_p_client.get_inventory.assert_called_with( + segment_id, seg_plugin.IPV4_RESOURCE_CLASS) + original_total = original_reserved = total = reserved = 0 + if original_subnet: + original_total, original_reserved = ( + self._calculate_inventory_total_and_reserved(original_subnet)) + if subnet: + total, reserved = self._calculate_inventory_total_and_reserved( + subnet) + inventory['total'] += total - original_total + inventory['reserved'] += reserved - original_reserved + self.mock_p_client.update_inventory.assert_called_with(segment_id, + inventory, seg_plugin.IPV4_RESOURCE_CLASS) + self.assertEqual( + inventory['total'], + self.mock_p_client.update_inventory.call_args[0][1]['total']) + self.assertEqual( + inventory['reserved'], + self.mock_p_client.update_inventory.call_args[0][1]['reserved']) + self.mock_p_client.reset_mock() + self.mock_n_client.reset_mock() + + def _get_inventory(self, total, reserved): + inventory = {'total': total, 'reserved': reserved, 'min_unit': 1, + 'max_unit': 1, 'step_size': 1, 'allocation_ratio': 1.0, + 'resource_class': seg_plugin.IPV4_RESOURCE_CLASS} + return inventory, copy.deepcopy(inventory) + + def _test_second_subnet_association_with_segment(self): + network, segment, first_subnet = ( + self._test_first_subnet_association_with_segment()) + segment_id = segment['segment']['id'] + # Associate an IPv6 subnet with the segment + self._create_test_subnet_with_segment(network, segment) + first_total, first_reserved = ( + self._calculate_inventory_total_and_reserved( + first_subnet['subnet'])) + inventory, original_inventory = self._get_inventory(first_total, + first_reserved) + self.mock_p_client.get_inventory.return_value = inventory + second_subnet = self._create_test_subnet_with_segment( + network, segment, cidr='10.0.1.0/24') + self._assert_inventory_update(segment_id, original_inventory, + subnet=second_subnet['subnet']) + return segment_id, first_subnet, second_subnet + + def test_second_subnet_association_with_segment(self): + self._test_second_subnet_association_with_segment() + + def test_delete_last_ipv4_subnet(self): + network, segment, subnet = ( + self._test_first_subnet_association_with_segment()) + # Associate an IPv6 subnet with the segment + self._create_test_subnet_with_segment(network, segment) + segment_id = segment['segment']['id'] + aggregate = mock.MagicMock() + aggregate.uuid = uuidutils.generate_uuid() + aggregate.id = 1 + aggregate.hosts = ['fakehost1'] + self.mock_p_client.list_aggregates.return_value = { + 'aggregates': [aggregate.uuid]} + self.mock_n_client.aggregates.list.return_value = [aggregate] + self.mock_n_client.aggregates.get_details.return_value = aggregate + self._delete('subnets', subnet['subnet']['id']) + self.batch_notifier._notify() + self._assert_inventory_delete(segment_id, aggregate) + + def _assert_inventory_delete(self, segment_id, aggregate): + self.mock_p_client.list_aggregates.assert_called_with(segment_id) + self.assertEqual(1, self.mock_n_client.aggregates.list.call_count) + self.mock_n_client.aggregates.get_details.assert_called_with( + aggregate.id) + calls = [mock.call(aggregate.id, host) for host in aggregate.hosts] + self.mock_n_client.aggregates.remove_host.assert_has_calls(calls) + self.mock_n_client.aggregates.delete.assert_called_with(aggregate.id) + self.mock_p_client.delete_resource_provider.assert_called_with( + segment_id) + self.mock_p_client.reset_mock() + self.mock_n_client.reset_mock() + + def test_delete_ipv4_subnet(self): + segment_id, first_subnet, second_subnet = ( + self._test_second_subnet_association_with_segment()) + first_total, first_reserved = ( + self._calculate_inventory_total_and_reserved( + first_subnet['subnet'])) + second_total, second_reserved = ( + self._calculate_inventory_total_and_reserved( + second_subnet['subnet'])) + inventory, original_inventory = self._get_inventory( + first_total + second_total, first_reserved + second_reserved) + self.mock_p_client.get_inventory.return_value = inventory + self._delete('subnets', first_subnet['subnet']['id']) + self._assert_inventory_update(segment_id, original_inventory, + original_subnet=first_subnet['subnet']) + + def _test_update_ipv4_subnet_allocation_pools(self, allocation_pools, + new_allocation_pools): + network, segment, original_subnet = ( + self._test_first_subnet_association_with_segment( + cidr='10.0.0.0/24', allocation_pools=allocation_pools)) + segment_id = segment['segment']['id'] + self.mock_p_client.reset_mock() + self.mock_n_client.reset_mock() + total, reserved = self._calculate_inventory_total_and_reserved( + original_subnet['subnet']) + inventory, original_inventory = self._get_inventory(total, reserved) + self.mock_p_client.get_inventory.return_value = inventory + subnet_data = {'subnet': {'allocation_pools': new_allocation_pools}} + subnet_req = self.new_update_request('subnets', + subnet_data, + original_subnet['subnet']['id']) + subnet = self.deserialize(self.fmt, subnet_req.get_response(self.api)) + self._assert_inventory_update( + segment_id, original_inventory, subnet=subnet['subnet'], + original_subnet=original_subnet['subnet']) + + def test_update_ipv4_subnet_expand_allocation_pool(self): + self._test_update_ipv4_subnet_allocation_pools( + [{'start': '10.0.0.2', 'end': '10.0.0.100'}], + [{'start': '10.0.0.2', 'end': '10.0.0.254'}]) + + def test_update_ipv4_subnet_add_allocation_pool(self): + self._test_update_ipv4_subnet_allocation_pools( + [{'start': '10.0.0.2', 'end': '10.0.0.100'}], + [{'start': '10.0.0.2', 'end': '10.0.0.100'}, + {'start': '10.0.0.200', 'end': '10.0.0.254'}]) + + def test_update_ipv4_subnet_contract_allocation_pool(self): + self._test_update_ipv4_subnet_allocation_pools( + [{'start': '10.0.0.2', 'end': '10.0.0.254'}], + [{'start': '10.0.0.2', 'end': '10.0.0.100'}]) + + def test_update_ipv4_subnet_remove_allocation_pool(self): + self._test_update_ipv4_subnet_allocation_pools( + [{'start': '10.0.0.2', 'end': '10.0.0.100'}, + {'start': '10.0.0.200', 'end': '10.0.0.254'}], + [{'start': '10.0.0.2', 'end': '10.0.0.100'}]) + + def _test_update_ipv4_subnet_delete_allocation_pools(self): + segment_id, first_subnet, second_subnet = ( + self._test_second_subnet_association_with_segment()) + first_total, first_reserved = ( + self._calculate_inventory_total_and_reserved( + first_subnet['subnet'])) + second_total, second_reserved = ( + self._calculate_inventory_total_and_reserved( + second_subnet['subnet'])) + inventory, original_inventory = self._get_inventory( + first_total + second_total, first_reserved + second_reserved) + self.mock_p_client.get_inventory.return_value = inventory + subnet_data = {'subnet': {'allocation_pools': []}} + subnet_req = self.new_update_request('subnets', + subnet_data, + first_subnet['subnet']['id']) + subnet_req.get_response(self.api) + self._assert_inventory_update(segment_id, original_inventory, + original_subnet=first_subnet['subnet']) + return segment_id, second_subnet + + def test_update_ipv4_subnet_delete_allocation_pools(self): + self._test_update_ipv4_subnet_delete_allocation_pools() + + def test_update_ipv4_subnet_delete_restore_last_allocation_pool(self): + segment_id, subnet = ( + self._test_update_ipv4_subnet_delete_allocation_pools()) + self.mock_p_client.reset_mock() + self.mock_n_client.reset_mock() + allocation_pools = subnet['subnet']['allocation_pools'] + aggregate = mock.MagicMock() + aggregate.uuid = uuidutils.generate_uuid() + aggregate.id = 1 + aggregate.hosts = ['fakehost1'] + self.mock_p_client.list_aggregates.return_value = { + 'aggregates': [aggregate.uuid]} + self.mock_n_client.aggregates.list.return_value = [aggregate] + self.mock_n_client.aggregates.get_details.return_value = aggregate + subnet_data = {'subnet': {'allocation_pools': []}} + self._update('subnets', subnet['subnet']['id'], subnet_data) + self.batch_notifier._notify() + self._assert_inventory_delete(segment_id, aggregate) + self.mock_p_client.get_inventory.side_effect = ( + neutron_exc.PlacementResourceProviderNotFound( + resource_provider=segment_id, + resource_class=seg_plugin.IPV4_RESOURCE_CLASS)) + aggregate.hosts = [] + self.mock_n_client.aggregates.create.return_value = aggregate + subnet_data = {'subnet': {'allocation_pools': allocation_pools}} + subnet = self._update('subnets', subnet['subnet']['id'], subnet_data) + self._assert_inventory_creation(segment_id, aggregate, subnet) + + def test_add_host_to_segment_aggregate(self): + db.subscribe() + network, segment, first_subnet = ( + self._test_first_subnet_association_with_segment()) + segment_id = segment['segment']['id'] + aggregate = mock.MagicMock() + aggregate.uuid = uuidutils.generate_uuid() + aggregate.id = 1 + aggregate.hosts = ['fakehost1'] + self.mock_p_client.list_aggregates.return_value = { + 'aggregates': [aggregate.uuid]} + self.mock_n_client.aggregates.list.return_value = [aggregate] + host = 'otherfakehost' + helpers.register_ovs_agent(host=host, + bridge_mappings={'physnet': 'br-eth-1'}, + plugin=self.plugin, start_flag=True) + self.batch_notifier._notify() + self.mock_p_client.list_aggregates.assert_called_with(segment_id) + self.assertEqual(1, self.mock_n_client.aggregates.list.call_count) + self.mock_n_client.aggregates.add_host.assert_called_with(aggregate.id, + host) + + def test_add_host_to_non_existent_segment_aggregate(self): + db.subscribe() + network, segment, first_subnet = ( + self._test_first_subnet_association_with_segment()) + with mock.patch.object(seg_plugin.LOG, 'info') as log: + segment_id = segment['segment']['id'] + aggregate = mock.MagicMock() + aggregate.uuid = uuidutils.generate_uuid() + aggregate.id = 1 + aggregate.hosts = ['fakehost1'] + self.mock_p_client.list_aggregates.side_effect = ( + neutron_exc.PlacementAggregateNotFound( + resource_provider=segment_id)) + self.mock_n_client.aggregates.list.return_value = [aggregate] + host = 'otherfakehost' + helpers.register_ovs_agent(host=host, + bridge_mappings={'physnet': 'br-eth-1'}, + plugin=self.plugin, start_flag=True) + self.batch_notifier._notify() + self.mock_p_client.list_aggregates.assert_called_with(segment_id) + self.assertTrue(log.called) + self.mock_n_client.aggregates.add_host.assert_not_called() + + def test_add_host_segment_aggregate_conflict(self): + db.subscribe() + network, segment, first_subnet = ( + self._test_first_subnet_association_with_segment()) + with mock.patch.object(seg_plugin.LOG, 'info') as log: + segment_id = segment['segment']['id'] + aggregate = mock.MagicMock() + aggregate.uuid = uuidutils.generate_uuid() + aggregate.id = 1 + aggregate.hosts = ['fakehost1'] + self.mock_p_client.list_aggregates.return_value = { + 'aggregates': [aggregate.uuid]} + self.mock_n_client.aggregates.add_host.side_effect = ( + nova_exc.Conflict(nova_exc.Conflict.http_status)) + self.mock_n_client.aggregates.list.return_value = [aggregate] + host = 'otherfakehost' + helpers.register_ovs_agent(host=host, + bridge_mappings={'physnet': 'br-eth-1'}, + plugin=self.plugin, start_flag=True) + self.batch_notifier._notify() + self.mock_p_client.list_aggregates.assert_called_with(segment_id) + self.mock_n_client.aggregates.add_host.assert_called_with( + aggregate.id, host) + self.assertTrue(log.called) + + def _assert_inventory_update_port(self, segment_id, inventory, + num_fixed_ips): + inventory['reserved'] += num_fixed_ips + self.mock_p_client.get_inventory.assert_called_with( + segment_id, seg_plugin.IPV4_RESOURCE_CLASS) + self.mock_p_client.update_inventory.assert_called_with(segment_id, + inventory, seg_plugin.IPV4_RESOURCE_CLASS) + self.assertEqual( + inventory['total'], + self.mock_p_client.update_inventory.call_args[0][1]['total']) + self.assertEqual( + inventory['reserved'], + self.mock_p_client.update_inventory.call_args[0][1]['reserved']) + self.mock_p_client.reset_mock() + self.mock_n_client.reset_mock() + + def _create_test_port(self, network_id, tenant_id, subnet, **kwargs): + port = self._make_port(self.fmt, network_id, tenant_id=tenant_id, + arg_list=(portbindings.HOST_ID,), **kwargs) + self.batch_notifier._notify() + return port + + def _test_create_port(self, **kwargs): + network, segment, subnet = ( + self._test_first_subnet_association_with_segment()) + total, reserved = self._calculate_inventory_total_and_reserved( + subnet['subnet']) + inventory, original_inventory = self._get_inventory(total, reserved) + self.mock_p_client.get_inventory.return_value = inventory + port = self._create_test_port(network['network']['id'], + network['network']['tenant_id'], subnet, + **kwargs) + return segment['segment']['id'], original_inventory, port + + def test_create_bound_port(self): + kwargs = {portbindings.HOST_ID: 'fakehost'} + segment_id, original_inventory, _ = self._test_create_port(**kwargs) + self._assert_inventory_update_port(segment_id, original_inventory, 1) + + def test_create_bound_port_compute_owned(self): + kwargs = {portbindings.HOST_ID: 'fakehost', + 'device_owner': constants.DEVICE_OWNER_COMPUTE_PREFIX} + self._test_create_port(**kwargs) + self.mock_p_client.get_inventory.assert_not_called() + self.mock_p_client.update_inventory.assert_not_called() + + def test_create_bound_port_dhcp_owned(self): + kwargs = {portbindings.HOST_ID: 'fakehost', + 'device_owner': constants.DEVICE_OWNER_DHCP} + self._test_create_port(**kwargs) + self.mock_p_client.get_inventory.assert_not_called() + self.mock_p_client.update_inventory.assert_not_called() + + def test_create_unbound_port(self): + self._test_create_port() + self.mock_p_client.get_inventory.assert_not_called() + self.mock_p_client.update_inventory.assert_not_called() + + def test_delete_bound_port(self): + kwargs = {portbindings.HOST_ID: 'fakehost'} + segment_id, before_create_inventory, port = self._test_create_port( + **kwargs) + self.mock_p_client.reset_mock() + inventory, original_inventory = self._get_inventory( + before_create_inventory['total'], + before_create_inventory['reserved'] + 1) + self.mock_p_client.get_inventory.return_value = inventory + self._delete('ports', port['port']['id']) + self.batch_notifier._notify() + self._assert_inventory_update_port(segment_id, original_inventory, -1) + + def _create_port_for_update_test(self, num_fixed_ips=1, dhcp_owned=False, + compute_owned=False): + segment_id, first_subnet, second_subnet = ( + self._test_second_subnet_association_with_segment()) + first_total, first_reserved = ( + self._calculate_inventory_total_and_reserved( + first_subnet['subnet'])) + second_total, second_reserved = ( + self._calculate_inventory_total_and_reserved( + second_subnet['subnet'])) + inventory, original_inventory = self._get_inventory( + first_total + second_total, first_reserved + second_reserved) + self.mock_p_client.get_inventory.return_value = inventory + kwargs = {portbindings.HOST_ID: 'fakehost', + 'fixed_ips': [{'subnet_id': first_subnet['subnet']['id']}]} + created_fixed_ips = num_fixed_ips + if num_fixed_ips > 1: + kwargs['fixed_ips'].append( + {'subnet_id': second_subnet['subnet']['id']}) + if dhcp_owned: + kwargs['device_owner'] = constants.DEVICE_OWNER_DHCP + if compute_owned: + kwargs['device_owner'] = constants.DEVICE_OWNER_COMPUTE_PREFIX + port = self._create_test_port(first_subnet['subnet']['network_id'], + first_subnet['subnet']['tenant_id'], + first_subnet, **kwargs) + if dhcp_owned or compute_owned: + self.mock_p_client.get_inventory.assert_not_called() + self.mock_p_client.update_inventory.assert_not_called() + else: + self._assert_inventory_update_port(segment_id, original_inventory, + created_fixed_ips) + return first_subnet, second_subnet, port + + def _port_update(self, first_subnet, second_subnet, fixed_ips_subnets, + port, reserved_increment_before=1, + reserved_increment_after=1, dhcp_owned=False, + compute_owned=False): + first_total, first_reserved = ( + self._calculate_inventory_total_and_reserved( + first_subnet['subnet'])) + second_total, second_reserved = ( + self._calculate_inventory_total_and_reserved( + second_subnet['subnet'])) + inventory, original_inventory = self._get_inventory( + first_total + second_total, + first_reserved + second_reserved + reserved_increment_before) + self.mock_p_client.get_inventory.return_value = inventory + port_data = {'port': {'device_owner': ''}} + if fixed_ips_subnets: + port_data['port']['fixed_ips'] = [] + for subnet in fixed_ips_subnets: + port_data['port']['fixed_ips'].append( + {'subnet_id': subnet['subnet']['id']}) + if dhcp_owned: + port_data['port']['device_owner'] = constants.DEVICE_OWNER_DHCP + if compute_owned: + port_data['port']['device_owner'] = ( + constants.DEVICE_OWNER_COMPUTE_PREFIX) + self._update('ports', port['port']['id'], port_data) + self.batch_notifier._notify() + self._assert_inventory_update_port( + first_subnet['subnet']['segment_id'], original_inventory, + reserved_increment_after) + + def test_update_port_add_fixed_ip(self): + first_subnet, second_subnet, port = self._create_port_for_update_test() + self._port_update(first_subnet, second_subnet, + [first_subnet, second_subnet], port) + + def test_update_port_remove_fixed_ip(self): + first_subnet, second_subnet, port = self._create_port_for_update_test( + num_fixed_ips=2) + self._port_update(first_subnet, second_subnet, + [first_subnet], port, reserved_increment_before=2, + reserved_increment_after=-1) + + def test_update_port_change_to_dhcp_owned(self): + first_subnet, second_subnet, port = self._create_port_for_update_test() + self._port_update(first_subnet, second_subnet, [], port, + reserved_increment_after=-1, dhcp_owned=True) + + def test_update_port_change_to_no_dhcp_owned(self): + first_subnet, second_subnet, port = self._create_port_for_update_test( + dhcp_owned=True) + self._port_update(first_subnet, second_subnet, [], port, + reserved_increment_before=0, + reserved_increment_after=1) + + def test_update_port_change_to_compute_owned(self): + first_subnet, second_subnet, port = self._create_port_for_update_test() + self._port_update(first_subnet, second_subnet, [], port, + reserved_increment_after=-1, compute_owned=True) + + def test_update_port_change_to_no_compute_owned(self): + first_subnet, second_subnet, port = self._create_port_for_update_test( + compute_owned=True) + self._port_update(first_subnet, second_subnet, [], port, + reserved_increment_before=0, + reserved_increment_after=1) + + def test_placement_api_inventory_update_conflict(self): + with mock.patch.object(seg_plugin.LOG, 'debug') as log_debug: + with mock.patch.object(seg_plugin.LOG, 'error') as log_error: + event = seg_plugin.Event(mock.ANY, mock.ANY, total=1, + reserved=0) + inventory, original_inventory = self._get_inventory(100, 2) + self.mock_p_client.get_inventory.return_value = inventory + self.mock_p_client.update_inventory.side_effect = ( + neutron_exc.PlacementInventoryUpdateConflict( + resource_provider=mock.ANY, + resource_class=seg_plugin.IPV4_RESOURCE_CLASS)) + self.segments_plugin.nova_updater._update_nova_inventory(event) + self.assertEqual(seg_plugin.MAX_INVENTORY_UPDATE_RETRIES, + self.mock_p_client.get_inventory.call_count) + self.assertEqual( + seg_plugin.MAX_INVENTORY_UPDATE_RETRIES, + self.mock_p_client.update_inventory.call_count) + self.assertEqual( + seg_plugin.MAX_INVENTORY_UPDATE_RETRIES, + log_debug.call_count) + self.assertTrue(log_error.called) + + def test_placement_api_not_available(self): + with mock.patch.object(seg_plugin.LOG, 'debug') as log: + event = seg_plugin.Event( + self.segments_plugin.nova_updater._update_nova_inventory, + mock.ANY, total=1, reserved=0) + self.mock_p_client.get_inventory.side_effect = ( + neutron_exc.PlacementEndpointNotFound()) + self.segments_plugin.nova_updater._send_notifications([event]) + self.assertTrue(log.called) + + class TestDhcpAgentSegmentScheduling(HostSegmentMappingTestCase): _mechanism_drivers = ['openvswitch', 'logger'] @@ -1613,3 +2190,9 @@ class PlacementAPIClientTestCase(base.DietTestCase): self.mock_request.side_effect = ks_exc.NotFound self.assertRaises(neutron_exc.PlacementAggregateNotFound, self.client.list_aggregates, rp_uuid) + + def test_placement_api_not_found(self): + rp_uuid = uuidutils.generate_uuid() + self.mock_request.side_effect = ks_exc.EndpointNotFound + self.assertRaises(neutron_exc.PlacementEndpointNotFound, + self.client.list_aggregates, rp_uuid)