Merge "Routed networks IPv4 inventory in Nova GRP"

This commit is contained in:
Jenkins 2017-01-24 22:37:41 +00:00 committed by Gerrit Code Review
commit 571b925c7e
6 changed files with 932 additions and 10 deletions

View File

@ -26,6 +26,7 @@ ROUTER_INTERFACE = 'router_interface'
SECURITY_GROUP = 'security_group'
SECURITY_GROUP_RULE = 'security_group_rule'
SEGMENT = 'segment'
SEGMENT_HOST_MAPPING = 'segment_host_mapping'
SUBNET = 'subnet'
SUBNETS = 'subnets'
SUBNET_GATEWAY = 'subnet_gateway'

View File

@ -41,6 +41,10 @@ class NetworkQosBindingNotFound(e.NotFound):
"could not be found.")
class PlacementEndpointNotFound(e.NotFound):
message = _("Placement API endpoint not found")
class PlacementResourceProviderNotFound(e.NotFound):
message = _("Placement resource provider not found %(resource_provider)s.")

View File

@ -275,6 +275,9 @@ def _update_segment_host_mapping_for_agent(resource, event, trigger,
segment['id'] for segment in segments
if check_segment_for_agent(segment, agent)}
update_segment_host_mapping(context, host, current_segment_ids)
registry.notify(resources.SEGMENT_HOST_MAPPING, events.AFTER_CREATE,
plugin, context=context, host=host,
current_segment_ids=current_segment_ids)
def _add_segment_host_mapping_for_segment(resource, event, trigger,

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
from keystoneauth1 import exceptions as ks_exc
from keystoneauth1 import loading as ks_loading
from keystoneauth1 import session
@ -27,6 +29,16 @@ LOG = logging.getLogger(__name__)
PLACEMENT_API_WITH_AGGREGATES = 'placement 1.1'
def check_placement_api_available(f):
@functools.wraps(f)
def wrapper(self, *a, **k):
try:
return f(self, *a, **k)
except ks_exc.EndpointNotFound:
raise n_exc.PlacementEndpointNotFound()
return wrapper
class PlacementAPIClient(object):
"""Client class for placement ReST API."""
@ -55,6 +67,7 @@ class PlacementAPIClient(object):
return self._client.delete(url, endpoint_filter=self.ks_filter,
**kwargs)
@check_placement_api_available
def create_resource_provider(self, resource_provider):
"""Create a resource provider.
@ -64,6 +77,7 @@ class PlacementAPIClient(object):
url = '/resource_providers'
self._post(url, resource_provider)
@check_placement_api_available
def delete_resource_provider(self, resource_provider_uuid):
"""Delete a resource provider.
@ -73,6 +87,7 @@ class PlacementAPIClient(object):
url = '/resource_providers/%s' % resource_provider_uuid
self._delete(url)
@check_placement_api_available
def create_inventory(self, resource_provider_uuid, inventory):
"""Create an inventory.
@ -86,6 +101,7 @@ class PlacementAPIClient(object):
url = '/resource_providers/%s/inventories' % resource_provider_uuid
self._post(url, inventory)
@check_placement_api_available
def get_inventory(self, resource_provider_uuid, resource_class):
"""Get resource provider inventory.
@ -112,6 +128,7 @@ class PlacementAPIClient(object):
else:
raise
@check_placement_api_available
def update_inventory(self, resource_provider_uuid, inventory,
resource_class):
"""Update an inventory.
@ -134,6 +151,7 @@ class PlacementAPIClient(object):
resource_provider=resource_provider_uuid,
resource_class=resource_class)
@check_placement_api_available
def associate_aggregates(self, resource_provider_uuid, aggregates):
"""Associate a list of aggregates with a resource provider.
@ -147,6 +165,7 @@ class PlacementAPIClient(object):
headers={'openstack-api-version':
PLACEMENT_API_WITH_AGGREGATES})
@check_placement_api_available
def list_aggregates(self, resource_provider_uuid):
"""List resource provider aggregates.

View File

@ -14,20 +14,37 @@
# License for the specific language governing permissions and limitations
# under the License.
from keystoneauth1 import loading as ks_loading
import netaddr
from neutron_lib import constants
from neutron_lib.plugins import directory
from novaclient import client as nova_client
from novaclient import exceptions as nova_exc
from oslo_config import cfg
from oslo_log import log
from neutron._i18n import _
from neutron._i18n import _, _LE, _LI
from neutron.api.v2 import attributes
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import exceptions as n_exc
from neutron.db import common_db_mixin
from neutron.db import models_v2
from neutron.extensions import ip_allocation
from neutron.extensions import l2_adjacency
from neutron.extensions import segment
from neutron.notifiers import batch_notifier
from neutron.services.segments import db
from neutron.services.segments import exceptions
from neutron.services.segments import placement_client
LOG = log.getLogger(__name__)
NOVA_API_VERSION = '2.41'
IPV4_RESOURCE_CLASS = 'IPV4_ADDRESS'
SEGMENT_NAME_STUB = 'Neutron segment id %s'
MAX_INVENTORY_UPDATE_RETRIES = 10
def _extend_network_dict_binding(plugin, network_res, network_db):
@ -67,6 +84,7 @@ class Plugin(db.SegmentDbMixin, segment.SegmentPluginBase):
attributes.SUBNETS, [_extend_subnet_dict_binding])
common_db_mixin.CommonDbMixin.register_dict_extend_funcs(
attributes.PORTS, [_extend_port_dict_binding])
self.nova_updater = NovaSegmentNotifier()
registry.subscribe(
self._prevent_segment_delete_with_subnet_associated,
@ -90,3 +108,297 @@ class Plugin(db.SegmentDbMixin, segment.SegmentPluginBase):
reason = _("The segment is still associated with subnet(s) "
"%s") % ", ".join(subnet_ids)
raise exceptions.SegmentInUse(segment_id=segment_id, reason=reason)
class Event(object):
def __init__(self, method, segment_ids, total=None, reserved=None,
segment_host_mappings=None, host=None):
self.method = method
if isinstance(segment_ids, set):
self.segment_ids = segment_ids
else:
self.segment_id = segment_ids
self.total = total
self.reserved = reserved
self.segment_host_mappings = segment_host_mappings
self.host = host
class NovaSegmentNotifier(object):
def __init__(self):
self.p_client, self.n_client = self._get_clients()
self.batch_notifier = batch_notifier.BatchNotifier(
cfg.CONF.send_events_interval, self._send_notifications)
registry.subscribe(self._notify_subnet_created, resources.SUBNET,
events.AFTER_CREATE)
registry.subscribe(self._notify_subnet_updated, resources.SUBNET,
events.AFTER_UPDATE)
registry.subscribe(self._notify_subnet_deleted, resources.SUBNET,
events.AFTER_DELETE)
registry.subscribe(self._notify_host_addition_to_aggregate,
resources.SEGMENT_HOST_MAPPING, events.AFTER_CREATE)
registry.subscribe(self._notify_port_created_or_deleted,
resources.PORT, events.AFTER_CREATE)
registry.subscribe(self._notify_port_updated, resources.PORT,
events.AFTER_UPDATE)
registry.subscribe(self._notify_port_created_or_deleted,
resources.PORT, events.AFTER_DELETE)
def _get_clients(self):
p_client = placement_client.PlacementAPIClient()
n_auth = ks_loading.load_auth_from_conf_options(cfg.CONF, 'nova')
n_session = ks_loading.load_session_from_conf_options(
cfg.CONF,
'nova',
auth=n_auth)
extensions = [
ext for ext in nova_client.discover_extensions(NOVA_API_VERSION)
if ext.name == "server_external_events"]
n_client = nova_client.Client(
NOVA_API_VERSION,
session=n_session,
region_name=cfg.CONF.nova.region_name,
endpoint_type=cfg.CONF.nova.endpoint_type,
extensions=extensions)
return p_client, n_client
def _send_notifications(self, batched_events):
for event in batched_events:
try:
event.method(event)
except n_exc.PlacementEndpointNotFound:
LOG.debug('Placement API was not found when trying to '
'update routed networks IPv4 inventories')
return
def _notify_subnet_created(self, resource, event, trigger, context,
subnet, **kwargs):
segment_id = subnet.get('segment_id')
if not segment_id or subnet['ip_version'] != constants.IP_VERSION_4:
return
total, reserved = self._calculate_inventory_total_and_reserved(subnet)
if total:
query = context.session.query(
db.SegmentHostMapping).filter_by(segment_id=segment_id)
self.batch_notifier.queue_event(Event(
self._create_or_update_nova_inventory, segment_id, total=total,
reserved=reserved, segment_host_mappings=query.all()))
def _create_or_update_nova_inventory(self, event):
try:
self._update_nova_inventory(event)
except n_exc.PlacementResourceProviderNotFound:
self._create_nova_inventory(event.segment_id, event.total,
event.reserved,
event.segment_host_mappings)
def _update_nova_inventory(self, event):
for count in range(MAX_INVENTORY_UPDATE_RETRIES):
ipv4_inventory = self.p_client.get_inventory(event.segment_id,
IPV4_RESOURCE_CLASS)
if event.total:
ipv4_inventory['total'] += event.total
if event.reserved:
ipv4_inventory['reserved'] += event.reserved
try:
self.p_client.update_inventory(event.segment_id,
ipv4_inventory,
IPV4_RESOURCE_CLASS)
return
except n_exc.PlacementInventoryUpdateConflict:
LOG.debug('Re-trying to update Nova IPv4 inventory for '
'routed network segment: %s', event.segment_id)
LOG.error(_LE('Failed to update Nova IPv4 inventory for routed '
'network segment: %s'), event.segment_id)
def _create_nova_inventory(self, segment_id, total, reserved,
segment_host_mappings):
name = SEGMENT_NAME_STUB % segment_id
resource_provider = {'name': name, 'uuid': segment_id}
self.p_client.create_resource_provider(resource_provider)
aggregate = self.n_client.aggregates.create(name, None)
self.p_client.associate_aggregates(segment_id, [aggregate.uuid])
for mapping in segment_host_mappings:
self.n_client.aggregates.add_host(aggregate.id, mapping['host'])
ipv4_inventory = {'total': total, 'reserved': reserved,
'min_unit': 1, 'max_unit': 1, 'step_size': 1,
'allocation_ratio': 1.0,
'resource_class': IPV4_RESOURCE_CLASS}
self.p_client.create_inventory(segment_id, ipv4_inventory)
def _calculate_inventory_total_and_reserved(self, subnet):
total = 0
reserved = 0
allocation_pools = subnet.get('allocation_pools') or []
for pool in allocation_pools:
total += int(netaddr.IPAddress(pool['end']) -
netaddr.IPAddress(pool['start'])) + 1
if total:
if subnet['gateway_ip']:
total += 1
reserved += 1
if subnet['enable_dhcp']:
reserved += 1
return total, reserved
def _notify_subnet_updated(self, resource, event, trigger, context,
subnet, original_subnet, **kwargs):
segment_id = subnet.get('segment_id')
if not segment_id or subnet['ip_version'] != constants.IP_VERSION_4:
return
filters = {'segment_id': [segment_id],
'ip_version': [constants.IP_VERSION_4]}
if not subnet['allocation_pools']:
plugin = directory.get_plugin()
alloc_pools = [s['allocation_pools'] for s in
plugin.get_subnets(context, filters=filters)]
if not any(alloc_pools):
self.batch_notifier.queue_event(Event(
self._delete_nova_inventory, segment_id))
return
original_total, original_reserved = (
self._calculate_inventory_total_and_reserved(original_subnet))
updated_total, updated_reserved = (
self._calculate_inventory_total_and_reserved(subnet))
total = updated_total - original_total
reserved = updated_reserved - original_reserved
if total or reserved:
segment_host_mappings = None
if not original_subnet['allocation_pools']:
segment_host_mappings = context.session.query(
db.SegmentHostMapping).filter_by(
segment_id=segment_id).all()
self.batch_notifier.queue_event(Event(
self._create_or_update_nova_inventory, segment_id, total=total,
reserved=reserved,
segment_host_mappings=segment_host_mappings))
def _notify_subnet_deleted(self, resource, event, trigger, context,
subnet, **kwargs):
segment_id = subnet.get('segment_id')
if not segment_id or subnet['ip_version'] != constants.IP_VERSION_4:
return
total, reserved = self._calculate_inventory_total_and_reserved(subnet)
if total:
filters = {'segment_id': [segment_id], 'ip_version': [4]}
plugin = directory.get_plugin()
if plugin.get_subnets_count(context, filters=filters) > 0:
self.batch_notifier.queue_event(Event(
self._update_nova_inventory, segment_id, total=-total,
reserved=-reserved))
else:
self.batch_notifier.queue_event(Event(
self._delete_nova_inventory, segment_id))
def _get_aggregate_id(self, segment_id):
aggregate_uuid = self.p_client.list_aggregates(
segment_id)['aggregates'][0]
aggregates = self.n_client.aggregates.list()
for aggregate in aggregates:
if aggregate.uuid == aggregate_uuid:
return aggregate.id
def _delete_nova_inventory(self, event):
aggregate_id = self._get_aggregate_id(event.segment_id)
aggregate = self.n_client.aggregates.get_details(
aggregate_id)
for host in aggregate.hosts:
self.n_client.aggregates.remove_host(aggregate_id,
host)
self.n_client.aggregates.delete(aggregate_id)
self.p_client.delete_resource_provider(event.segment_id)
def _notify_host_addition_to_aggregate(self, resource, event, trigger,
context, host, current_segment_ids,
**kwargs):
query = context.session.query(models_v2.Subnet).filter(
models_v2.Subnet.segment_id.in_(current_segment_ids))
segment_ids = {subnet['segment_id'] for subnet in query}
self.batch_notifier.queue_event(Event(self._add_host_to_aggregate,
segment_ids, host=host))
def _add_host_to_aggregate(self, event):
for segment_id in event.segment_ids:
try:
aggregate_id = self._get_aggregate_id(segment_id)
except n_exc.PlacementAggregateNotFound:
LOG.info(_LI('When adding host %(host)s, aggregate not found '
'for routed network segment %(segment_id)s'),
{'host': event.host, 'segment_id': segment_id})
continue
try:
self.n_client.aggregates.add_host(aggregate_id, event.host)
except nova_exc.Conflict:
LOG.info(_LI('Host %(host)s already exists in aggregate for '
'routed network segment %(segment_id)s'),
{'host': event.host, 'segment_id': segment_id})
def _notify_port_created_or_deleted(self, resource, event, trigger,
context, port, **kwargs):
if not self._does_port_require_nova_inventory_update(port):
return
ipv4_subnets_number, segment_id = (
self._get_ipv4_subnets_number_and_segment_id(port, context))
if segment_id:
if event == events.AFTER_DELETE:
ipv4_subnets_number = -ipv4_subnets_number
self.batch_notifier.queue_event(Event(self._update_nova_inventory,
segment_id, reserved=ipv4_subnets_number))
def _notify_port_updated(self, resource, event, trigger, context,
**kwargs):
port = kwargs.get('port')
original_port = kwargs.get('original_port')
does_original_port_require_nova_inventory_update = (
self._does_port_require_nova_inventory_update(original_port))
does_port_require_nova_inventory_update = (
self._does_port_require_nova_inventory_update(port))
if not (does_original_port_require_nova_inventory_update or
does_port_require_nova_inventory_update):
return
original_port_ipv4_subnets_number, segment_id = (
self._get_ipv4_subnets_number_and_segment_id(original_port,
context))
if not segment_id:
return
port_ipv4_subnets_number = len(self._get_ipv4_subnet_ids(port))
if not does_original_port_require_nova_inventory_update:
original_port_ipv4_subnets_number = 0
if not does_port_require_nova_inventory_update:
port_ipv4_subnets_number = 0
update = port_ipv4_subnets_number - original_port_ipv4_subnets_number
if update:
self.batch_notifier.queue_event(Event(self._update_nova_inventory,
segment_id, reserved=update))
def _get_ipv4_subnets_number_and_segment_id(self, port, context):
ipv4_subnet_ids = self._get_ipv4_subnet_ids(port)
if not ipv4_subnet_ids:
return 0, None
segment_id = context.session.query(
models_v2.Subnet).filter_by(id=ipv4_subnet_ids[0]).one()[
'segment_id']
if not segment_id:
return 0, None
return len(ipv4_subnet_ids), segment_id
def _does_port_require_nova_inventory_update(self, port):
device_owner = port.get('device_owner')
if (device_owner.startswith(constants.DEVICE_OWNER_COMPUTE_PREFIX) or
device_owner == constants.DEVICE_OWNER_DHCP):
return False
return True
def _get_ipv4_subnet_ids(self, port):
ipv4_subnet_ids = []
for ip in port.get('fixed_ips', []):
if netaddr.IPAddress(
ip['ip_address']).version == constants.IP_VERSION_4:
ipv4_subnet_ids.append(ip['subnet_id'])
return ipv4_subnet_ids

View File

@ -12,11 +12,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
from keystoneauth1 import exceptions as ks_exc
import mock
import netaddr
from neutron_lib import constants
from neutron_lib import exceptions as n_exc
from neutron_lib.plugins import directory
from novaclient import exceptions as nova_exc
from oslo_config import cfg
from oslo_utils import uuidutils
import webob.exc
@ -44,6 +48,7 @@ from neutron.plugins.ml2 import config
from neutron.services.segments import db
from neutron.services.segments import exceptions as segment_exc
from neutron.services.segments import placement_client
from neutron.services.segments import plugin as seg_plugin
from neutron.tests import base
from neutron.tests.common import helpers
from neutron.tests.unit.db import test_db_base_plugin_v2
@ -796,7 +801,8 @@ class TestMl2HostSegmentMappingAgentServerSynch(HostSegmentMappingTestCase):
mock_function.assert_not_called()
class TestSegmentAwareIpam(SegmentTestCase):
class SegmentAwareIpamTestCase(SegmentTestCase):
def _setup_host_mappings(self, mappings=()):
ctx = context.get_admin_context()
for segment_id, host in mappings:
@ -808,6 +814,13 @@ class TestSegmentAwareIpam(SegmentTestCase):
cidr='2001:db8:0:0::/64',
physnet='physnet'):
"""Creates one network with one segment and one subnet"""
network, segment = self._create_test_network_and_segment(network,
physnet)
subnet = self._create_test_subnet_with_segment(network, segment, cidr)
return network, segment, subnet
def _create_test_network_and_segment(self, network=None,
physnet='physnet'):
if not network:
with self.network() as network:
pass
@ -816,15 +829,29 @@ class TestSegmentAwareIpam(SegmentTestCase):
network_id=network['network']['id'],
physical_network=physnet,
network_type=p_constants.TYPE_VLAN)
return network, segment
def _create_test_subnet_with_segment(self, network, segment,
cidr='2001:db8:0:0::/64',
allocation_pools=None):
ip_version = netaddr.IPNetwork(cidr).version if cidr else None
with self.subnet(network=network,
segment_id=segment['segment']['id'],
ip_version=ip_version,
cidr=cidr) as subnet:
cidr=cidr,
allocation_pools=allocation_pools) as subnet:
self._validate_l2_adjacency(network['network']['id'],
is_adjacent=False)
return network, segment, subnet
return subnet
def _validate_l2_adjacency(self, network_id, is_adjacent):
request = self.new_show_request('networks', network_id)
response = self.deserialize(self.fmt, request.get_response(self.api))
self.assertEqual(is_adjacent,
response['network'][l2_adjacency.L2_ADJACENCY])
class TestSegmentAwareIpam(SegmentAwareIpamTestCase):
def _create_test_segments_with_subnets(self, num):
"""Creates one network with num segments and num subnets"""
@ -1101,12 +1128,6 @@ class TestSegmentAwareIpam(SegmentTestCase):
self.assertEqual(webob.exc.HTTPOk.code, response.status_int)
self._assert_one_ip_in_subnet(response, subnet['subnet']['cidr'])
def _validate_l2_adjacency(self, network_id, is_adjacent):
request = self.new_show_request('networks', network_id)
response = self.deserialize(self.fmt, request.get_response(self.api))
self.assertEqual(is_adjacent,
response['network'][l2_adjacency.L2_ADJACENCY])
def _validate_deferred_ip_allocation(self, port_id):
request = self.new_show_request('ports', port_id)
response = self.deserialize(self.fmt, request.get_response(self.api))
@ -1394,6 +1415,562 @@ class TestSegmentAwareIpamML2(TestSegmentAwareIpam):
super(TestSegmentAwareIpamML2, self).setUp(plugin='ml2')
class TestNovaSegmentNotifier(SegmentAwareIpamTestCase):
_mechanism_drivers = ['openvswitch', 'logger']
def setUp(self):
config.cfg.CONF.set_override('mechanism_drivers',
self._mechanism_drivers,
group='ml2')
config.cfg.CONF.set_override('network_vlan_ranges',
['physnet:200:209', 'physnet0:200:209',
'physnet1:200:209', 'physnet2:200:209'],
group='ml2_type_vlan')
super(TestNovaSegmentNotifier, self).setUp(plugin='ml2')
self.segments_plugin = directory.get_plugin(ext_segment.SEGMENTS)
nova_updater = self.segments_plugin.nova_updater
nova_updater.p_client = mock.MagicMock()
self.mock_p_client = nova_updater.p_client
nova_updater.n_client = mock.MagicMock()
self.mock_n_client = nova_updater.n_client
self.batch_notifier = nova_updater.batch_notifier
self.batch_notifier._waiting_to_send = True
def _calculate_inventory_total_and_reserved(self, subnet):
total = 0
reserved = 0
allocation_pools = subnet.get('allocation_pools') or []
for pool in allocation_pools:
total += int(netaddr.IPAddress(pool['end']) -
netaddr.IPAddress(pool['start'])) + 1
if total:
if subnet['gateway_ip']:
total += 1
reserved += 1
if subnet['enable_dhcp']:
reserved += 1
return total, reserved
def _assert_inventory_creation(self, segment_id, aggregate, subnet):
self.batch_notifier._notify()
self.mock_p_client.get_inventory.assert_called_with(
segment_id, seg_plugin.IPV4_RESOURCE_CLASS)
self.mock_p_client.update_inventory.assert_not_called()
name = seg_plugin.SEGMENT_NAME_STUB % segment_id
resource_provider = {'name': name, 'uuid': segment_id}
self.mock_p_client.create_resource_provider.assert_called_with(
resource_provider)
self.mock_n_client.aggregates.create.assert_called_with(name, None)
self.mock_p_client.associate_aggregates.assert_called_with(
segment_id, [aggregate.uuid])
self.mock_n_client.aggregates.add_host.assert_called_with(aggregate.id,
'fakehost')
total, reserved = self._calculate_inventory_total_and_reserved(
subnet['subnet'])
inventory, _ = self._get_inventory(total, reserved)
self.mock_p_client.create_inventory.assert_called_with(
segment_id, inventory)
self.assertEqual(
inventory['total'],
self.mock_p_client.create_inventory.call_args[0][1]['total'])
self.assertEqual(
inventory['reserved'],
self.mock_p_client.create_inventory.call_args[0][1]['reserved'])
self.mock_p_client.reset_mock()
self.mock_p_client.get_inventory.side_effect = None
self.mock_n_client.reset_mock()
def _test_first_subnet_association_with_segment(self, cidr='10.0.0.0/24',
allocation_pools=None):
network, segment = self._create_test_network_and_segment()
segment_id = segment['segment']['id']
self._setup_host_mappings([(segment_id, 'fakehost')])
self.mock_p_client.get_inventory.side_effect = (
neutron_exc.PlacementResourceProviderNotFound(
resource_provider=segment_id,
resource_class=seg_plugin.IPV4_RESOURCE_CLASS))
aggregate = mock.MagicMock()
aggregate.uuid = uuidutils.generate_uuid()
aggregate.id = 1
self.mock_n_client.aggregates.create.return_value = aggregate
subnet = self._create_test_subnet_with_segment(
network, segment, cidr=cidr, allocation_pools=allocation_pools)
self._assert_inventory_creation(segment_id, aggregate, subnet)
return network, segment, subnet
def test_first_subnet_association_with_segment(self):
self._test_first_subnet_association_with_segment()
def _assert_inventory_update(self, segment_id, inventory, subnet=None,
original_subnet=None):
self.batch_notifier._notify()
self.mock_p_client.get_inventory.assert_called_with(
segment_id, seg_plugin.IPV4_RESOURCE_CLASS)
original_total = original_reserved = total = reserved = 0
if original_subnet:
original_total, original_reserved = (
self._calculate_inventory_total_and_reserved(original_subnet))
if subnet:
total, reserved = self._calculate_inventory_total_and_reserved(
subnet)
inventory['total'] += total - original_total
inventory['reserved'] += reserved - original_reserved
self.mock_p_client.update_inventory.assert_called_with(segment_id,
inventory, seg_plugin.IPV4_RESOURCE_CLASS)
self.assertEqual(
inventory['total'],
self.mock_p_client.update_inventory.call_args[0][1]['total'])
self.assertEqual(
inventory['reserved'],
self.mock_p_client.update_inventory.call_args[0][1]['reserved'])
self.mock_p_client.reset_mock()
self.mock_n_client.reset_mock()
def _get_inventory(self, total, reserved):
inventory = {'total': total, 'reserved': reserved, 'min_unit': 1,
'max_unit': 1, 'step_size': 1, 'allocation_ratio': 1.0,
'resource_class': seg_plugin.IPV4_RESOURCE_CLASS}
return inventory, copy.deepcopy(inventory)
def _test_second_subnet_association_with_segment(self):
network, segment, first_subnet = (
self._test_first_subnet_association_with_segment())
segment_id = segment['segment']['id']
# Associate an IPv6 subnet with the segment
self._create_test_subnet_with_segment(network, segment)
first_total, first_reserved = (
self._calculate_inventory_total_and_reserved(
first_subnet['subnet']))
inventory, original_inventory = self._get_inventory(first_total,
first_reserved)
self.mock_p_client.get_inventory.return_value = inventory
second_subnet = self._create_test_subnet_with_segment(
network, segment, cidr='10.0.1.0/24')
self._assert_inventory_update(segment_id, original_inventory,
subnet=second_subnet['subnet'])
return segment_id, first_subnet, second_subnet
def test_second_subnet_association_with_segment(self):
self._test_second_subnet_association_with_segment()
def test_delete_last_ipv4_subnet(self):
network, segment, subnet = (
self._test_first_subnet_association_with_segment())
# Associate an IPv6 subnet with the segment
self._create_test_subnet_with_segment(network, segment)
segment_id = segment['segment']['id']
aggregate = mock.MagicMock()
aggregate.uuid = uuidutils.generate_uuid()
aggregate.id = 1
aggregate.hosts = ['fakehost1']
self.mock_p_client.list_aggregates.return_value = {
'aggregates': [aggregate.uuid]}
self.mock_n_client.aggregates.list.return_value = [aggregate]
self.mock_n_client.aggregates.get_details.return_value = aggregate
self._delete('subnets', subnet['subnet']['id'])
self.batch_notifier._notify()
self._assert_inventory_delete(segment_id, aggregate)
def _assert_inventory_delete(self, segment_id, aggregate):
self.mock_p_client.list_aggregates.assert_called_with(segment_id)
self.assertEqual(1, self.mock_n_client.aggregates.list.call_count)
self.mock_n_client.aggregates.get_details.assert_called_with(
aggregate.id)
calls = [mock.call(aggregate.id, host) for host in aggregate.hosts]
self.mock_n_client.aggregates.remove_host.assert_has_calls(calls)
self.mock_n_client.aggregates.delete.assert_called_with(aggregate.id)
self.mock_p_client.delete_resource_provider.assert_called_with(
segment_id)
self.mock_p_client.reset_mock()
self.mock_n_client.reset_mock()
def test_delete_ipv4_subnet(self):
segment_id, first_subnet, second_subnet = (
self._test_second_subnet_association_with_segment())
first_total, first_reserved = (
self._calculate_inventory_total_and_reserved(
first_subnet['subnet']))
second_total, second_reserved = (
self._calculate_inventory_total_and_reserved(
second_subnet['subnet']))
inventory, original_inventory = self._get_inventory(
first_total + second_total, first_reserved + second_reserved)
self.mock_p_client.get_inventory.return_value = inventory
self._delete('subnets', first_subnet['subnet']['id'])
self._assert_inventory_update(segment_id, original_inventory,
original_subnet=first_subnet['subnet'])
def _test_update_ipv4_subnet_allocation_pools(self, allocation_pools,
new_allocation_pools):
network, segment, original_subnet = (
self._test_first_subnet_association_with_segment(
cidr='10.0.0.0/24', allocation_pools=allocation_pools))
segment_id = segment['segment']['id']
self.mock_p_client.reset_mock()
self.mock_n_client.reset_mock()
total, reserved = self._calculate_inventory_total_and_reserved(
original_subnet['subnet'])
inventory, original_inventory = self._get_inventory(total, reserved)
self.mock_p_client.get_inventory.return_value = inventory
subnet_data = {'subnet': {'allocation_pools': new_allocation_pools}}
subnet_req = self.new_update_request('subnets',
subnet_data,
original_subnet['subnet']['id'])
subnet = self.deserialize(self.fmt, subnet_req.get_response(self.api))
self._assert_inventory_update(
segment_id, original_inventory, subnet=subnet['subnet'],
original_subnet=original_subnet['subnet'])
def test_update_ipv4_subnet_expand_allocation_pool(self):
self._test_update_ipv4_subnet_allocation_pools(
[{'start': '10.0.0.2', 'end': '10.0.0.100'}],
[{'start': '10.0.0.2', 'end': '10.0.0.254'}])
def test_update_ipv4_subnet_add_allocation_pool(self):
self._test_update_ipv4_subnet_allocation_pools(
[{'start': '10.0.0.2', 'end': '10.0.0.100'}],
[{'start': '10.0.0.2', 'end': '10.0.0.100'},
{'start': '10.0.0.200', 'end': '10.0.0.254'}])
def test_update_ipv4_subnet_contract_allocation_pool(self):
self._test_update_ipv4_subnet_allocation_pools(
[{'start': '10.0.0.2', 'end': '10.0.0.254'}],
[{'start': '10.0.0.2', 'end': '10.0.0.100'}])
def test_update_ipv4_subnet_remove_allocation_pool(self):
self._test_update_ipv4_subnet_allocation_pools(
[{'start': '10.0.0.2', 'end': '10.0.0.100'},
{'start': '10.0.0.200', 'end': '10.0.0.254'}],
[{'start': '10.0.0.2', 'end': '10.0.0.100'}])
def _test_update_ipv4_subnet_delete_allocation_pools(self):
segment_id, first_subnet, second_subnet = (
self._test_second_subnet_association_with_segment())
first_total, first_reserved = (
self._calculate_inventory_total_and_reserved(
first_subnet['subnet']))
second_total, second_reserved = (
self._calculate_inventory_total_and_reserved(
second_subnet['subnet']))
inventory, original_inventory = self._get_inventory(
first_total + second_total, first_reserved + second_reserved)
self.mock_p_client.get_inventory.return_value = inventory
subnet_data = {'subnet': {'allocation_pools': []}}
subnet_req = self.new_update_request('subnets',
subnet_data,
first_subnet['subnet']['id'])
subnet_req.get_response(self.api)
self._assert_inventory_update(segment_id, original_inventory,
original_subnet=first_subnet['subnet'])
return segment_id, second_subnet
def test_update_ipv4_subnet_delete_allocation_pools(self):
self._test_update_ipv4_subnet_delete_allocation_pools()
def test_update_ipv4_subnet_delete_restore_last_allocation_pool(self):
segment_id, subnet = (
self._test_update_ipv4_subnet_delete_allocation_pools())
self.mock_p_client.reset_mock()
self.mock_n_client.reset_mock()
allocation_pools = subnet['subnet']['allocation_pools']
aggregate = mock.MagicMock()
aggregate.uuid = uuidutils.generate_uuid()
aggregate.id = 1
aggregate.hosts = ['fakehost1']
self.mock_p_client.list_aggregates.return_value = {
'aggregates': [aggregate.uuid]}
self.mock_n_client.aggregates.list.return_value = [aggregate]
self.mock_n_client.aggregates.get_details.return_value = aggregate
subnet_data = {'subnet': {'allocation_pools': []}}
self._update('subnets', subnet['subnet']['id'], subnet_data)
self.batch_notifier._notify()
self._assert_inventory_delete(segment_id, aggregate)
self.mock_p_client.get_inventory.side_effect = (
neutron_exc.PlacementResourceProviderNotFound(
resource_provider=segment_id,
resource_class=seg_plugin.IPV4_RESOURCE_CLASS))
aggregate.hosts = []
self.mock_n_client.aggregates.create.return_value = aggregate
subnet_data = {'subnet': {'allocation_pools': allocation_pools}}
subnet = self._update('subnets', subnet['subnet']['id'], subnet_data)
self._assert_inventory_creation(segment_id, aggregate, subnet)
def test_add_host_to_segment_aggregate(self):
db.subscribe()
network, segment, first_subnet = (
self._test_first_subnet_association_with_segment())
segment_id = segment['segment']['id']
aggregate = mock.MagicMock()
aggregate.uuid = uuidutils.generate_uuid()
aggregate.id = 1
aggregate.hosts = ['fakehost1']
self.mock_p_client.list_aggregates.return_value = {
'aggregates': [aggregate.uuid]}
self.mock_n_client.aggregates.list.return_value = [aggregate]
host = 'otherfakehost'
helpers.register_ovs_agent(host=host,
bridge_mappings={'physnet': 'br-eth-1'},
plugin=self.plugin, start_flag=True)
self.batch_notifier._notify()
self.mock_p_client.list_aggregates.assert_called_with(segment_id)
self.assertEqual(1, self.mock_n_client.aggregates.list.call_count)
self.mock_n_client.aggregates.add_host.assert_called_with(aggregate.id,
host)
def test_add_host_to_non_existent_segment_aggregate(self):
db.subscribe()
network, segment, first_subnet = (
self._test_first_subnet_association_with_segment())
with mock.patch.object(seg_plugin.LOG, 'info') as log:
segment_id = segment['segment']['id']
aggregate = mock.MagicMock()
aggregate.uuid = uuidutils.generate_uuid()
aggregate.id = 1
aggregate.hosts = ['fakehost1']
self.mock_p_client.list_aggregates.side_effect = (
neutron_exc.PlacementAggregateNotFound(
resource_provider=segment_id))
self.mock_n_client.aggregates.list.return_value = [aggregate]
host = 'otherfakehost'
helpers.register_ovs_agent(host=host,
bridge_mappings={'physnet': 'br-eth-1'},
plugin=self.plugin, start_flag=True)
self.batch_notifier._notify()
self.mock_p_client.list_aggregates.assert_called_with(segment_id)
self.assertTrue(log.called)
self.mock_n_client.aggregates.add_host.assert_not_called()
def test_add_host_segment_aggregate_conflict(self):
db.subscribe()
network, segment, first_subnet = (
self._test_first_subnet_association_with_segment())
with mock.patch.object(seg_plugin.LOG, 'info') as log:
segment_id = segment['segment']['id']
aggregate = mock.MagicMock()
aggregate.uuid = uuidutils.generate_uuid()
aggregate.id = 1
aggregate.hosts = ['fakehost1']
self.mock_p_client.list_aggregates.return_value = {
'aggregates': [aggregate.uuid]}
self.mock_n_client.aggregates.add_host.side_effect = (
nova_exc.Conflict(nova_exc.Conflict.http_status))
self.mock_n_client.aggregates.list.return_value = [aggregate]
host = 'otherfakehost'
helpers.register_ovs_agent(host=host,
bridge_mappings={'physnet': 'br-eth-1'},
plugin=self.plugin, start_flag=True)
self.batch_notifier._notify()
self.mock_p_client.list_aggregates.assert_called_with(segment_id)
self.mock_n_client.aggregates.add_host.assert_called_with(
aggregate.id, host)
self.assertTrue(log.called)
def _assert_inventory_update_port(self, segment_id, inventory,
num_fixed_ips):
inventory['reserved'] += num_fixed_ips
self.mock_p_client.get_inventory.assert_called_with(
segment_id, seg_plugin.IPV4_RESOURCE_CLASS)
self.mock_p_client.update_inventory.assert_called_with(segment_id,
inventory, seg_plugin.IPV4_RESOURCE_CLASS)
self.assertEqual(
inventory['total'],
self.mock_p_client.update_inventory.call_args[0][1]['total'])
self.assertEqual(
inventory['reserved'],
self.mock_p_client.update_inventory.call_args[0][1]['reserved'])
self.mock_p_client.reset_mock()
self.mock_n_client.reset_mock()
def _create_test_port(self, network_id, tenant_id, subnet, **kwargs):
port = self._make_port(self.fmt, network_id, tenant_id=tenant_id,
arg_list=(portbindings.HOST_ID,), **kwargs)
self.batch_notifier._notify()
return port
def _test_create_port(self, **kwargs):
network, segment, subnet = (
self._test_first_subnet_association_with_segment())
total, reserved = self._calculate_inventory_total_and_reserved(
subnet['subnet'])
inventory, original_inventory = self._get_inventory(total, reserved)
self.mock_p_client.get_inventory.return_value = inventory
port = self._create_test_port(network['network']['id'],
network['network']['tenant_id'], subnet,
**kwargs)
return segment['segment']['id'], original_inventory, port
def test_create_bound_port(self):
kwargs = {portbindings.HOST_ID: 'fakehost'}
segment_id, original_inventory, _ = self._test_create_port(**kwargs)
self._assert_inventory_update_port(segment_id, original_inventory, 1)
def test_create_bound_port_compute_owned(self):
kwargs = {portbindings.HOST_ID: 'fakehost',
'device_owner': constants.DEVICE_OWNER_COMPUTE_PREFIX}
self._test_create_port(**kwargs)
self.mock_p_client.get_inventory.assert_not_called()
self.mock_p_client.update_inventory.assert_not_called()
def test_create_bound_port_dhcp_owned(self):
kwargs = {portbindings.HOST_ID: 'fakehost',
'device_owner': constants.DEVICE_OWNER_DHCP}
self._test_create_port(**kwargs)
self.mock_p_client.get_inventory.assert_not_called()
self.mock_p_client.update_inventory.assert_not_called()
def test_create_unbound_port(self):
self._test_create_port()
self.mock_p_client.get_inventory.assert_not_called()
self.mock_p_client.update_inventory.assert_not_called()
def test_delete_bound_port(self):
kwargs = {portbindings.HOST_ID: 'fakehost'}
segment_id, before_create_inventory, port = self._test_create_port(
**kwargs)
self.mock_p_client.reset_mock()
inventory, original_inventory = self._get_inventory(
before_create_inventory['total'],
before_create_inventory['reserved'] + 1)
self.mock_p_client.get_inventory.return_value = inventory
self._delete('ports', port['port']['id'])
self.batch_notifier._notify()
self._assert_inventory_update_port(segment_id, original_inventory, -1)
def _create_port_for_update_test(self, num_fixed_ips=1, dhcp_owned=False,
compute_owned=False):
segment_id, first_subnet, second_subnet = (
self._test_second_subnet_association_with_segment())
first_total, first_reserved = (
self._calculate_inventory_total_and_reserved(
first_subnet['subnet']))
second_total, second_reserved = (
self._calculate_inventory_total_and_reserved(
second_subnet['subnet']))
inventory, original_inventory = self._get_inventory(
first_total + second_total, first_reserved + second_reserved)
self.mock_p_client.get_inventory.return_value = inventory
kwargs = {portbindings.HOST_ID: 'fakehost',
'fixed_ips': [{'subnet_id': first_subnet['subnet']['id']}]}
created_fixed_ips = num_fixed_ips
if num_fixed_ips > 1:
kwargs['fixed_ips'].append(
{'subnet_id': second_subnet['subnet']['id']})
if dhcp_owned:
kwargs['device_owner'] = constants.DEVICE_OWNER_DHCP
if compute_owned:
kwargs['device_owner'] = constants.DEVICE_OWNER_COMPUTE_PREFIX
port = self._create_test_port(first_subnet['subnet']['network_id'],
first_subnet['subnet']['tenant_id'],
first_subnet, **kwargs)
if dhcp_owned or compute_owned:
self.mock_p_client.get_inventory.assert_not_called()
self.mock_p_client.update_inventory.assert_not_called()
else:
self._assert_inventory_update_port(segment_id, original_inventory,
created_fixed_ips)
return first_subnet, second_subnet, port
def _port_update(self, first_subnet, second_subnet, fixed_ips_subnets,
port, reserved_increment_before=1,
reserved_increment_after=1, dhcp_owned=False,
compute_owned=False):
first_total, first_reserved = (
self._calculate_inventory_total_and_reserved(
first_subnet['subnet']))
second_total, second_reserved = (
self._calculate_inventory_total_and_reserved(
second_subnet['subnet']))
inventory, original_inventory = self._get_inventory(
first_total + second_total,
first_reserved + second_reserved + reserved_increment_before)
self.mock_p_client.get_inventory.return_value = inventory
port_data = {'port': {'device_owner': ''}}
if fixed_ips_subnets:
port_data['port']['fixed_ips'] = []
for subnet in fixed_ips_subnets:
port_data['port']['fixed_ips'].append(
{'subnet_id': subnet['subnet']['id']})
if dhcp_owned:
port_data['port']['device_owner'] = constants.DEVICE_OWNER_DHCP
if compute_owned:
port_data['port']['device_owner'] = (
constants.DEVICE_OWNER_COMPUTE_PREFIX)
self._update('ports', port['port']['id'], port_data)
self.batch_notifier._notify()
self._assert_inventory_update_port(
first_subnet['subnet']['segment_id'], original_inventory,
reserved_increment_after)
def test_update_port_add_fixed_ip(self):
first_subnet, second_subnet, port = self._create_port_for_update_test()
self._port_update(first_subnet, second_subnet,
[first_subnet, second_subnet], port)
def test_update_port_remove_fixed_ip(self):
first_subnet, second_subnet, port = self._create_port_for_update_test(
num_fixed_ips=2)
self._port_update(first_subnet, second_subnet,
[first_subnet], port, reserved_increment_before=2,
reserved_increment_after=-1)
def test_update_port_change_to_dhcp_owned(self):
first_subnet, second_subnet, port = self._create_port_for_update_test()
self._port_update(first_subnet, second_subnet, [], port,
reserved_increment_after=-1, dhcp_owned=True)
def test_update_port_change_to_no_dhcp_owned(self):
first_subnet, second_subnet, port = self._create_port_for_update_test(
dhcp_owned=True)
self._port_update(first_subnet, second_subnet, [], port,
reserved_increment_before=0,
reserved_increment_after=1)
def test_update_port_change_to_compute_owned(self):
first_subnet, second_subnet, port = self._create_port_for_update_test()
self._port_update(first_subnet, second_subnet, [], port,
reserved_increment_after=-1, compute_owned=True)
def test_update_port_change_to_no_compute_owned(self):
first_subnet, second_subnet, port = self._create_port_for_update_test(
compute_owned=True)
self._port_update(first_subnet, second_subnet, [], port,
reserved_increment_before=0,
reserved_increment_after=1)
def test_placement_api_inventory_update_conflict(self):
with mock.patch.object(seg_plugin.LOG, 'debug') as log_debug:
with mock.patch.object(seg_plugin.LOG, 'error') as log_error:
event = seg_plugin.Event(mock.ANY, mock.ANY, total=1,
reserved=0)
inventory, original_inventory = self._get_inventory(100, 2)
self.mock_p_client.get_inventory.return_value = inventory
self.mock_p_client.update_inventory.side_effect = (
neutron_exc.PlacementInventoryUpdateConflict(
resource_provider=mock.ANY,
resource_class=seg_plugin.IPV4_RESOURCE_CLASS))
self.segments_plugin.nova_updater._update_nova_inventory(event)
self.assertEqual(seg_plugin.MAX_INVENTORY_UPDATE_RETRIES,
self.mock_p_client.get_inventory.call_count)
self.assertEqual(
seg_plugin.MAX_INVENTORY_UPDATE_RETRIES,
self.mock_p_client.update_inventory.call_count)
self.assertEqual(
seg_plugin.MAX_INVENTORY_UPDATE_RETRIES,
log_debug.call_count)
self.assertTrue(log_error.called)
def test_placement_api_not_available(self):
with mock.patch.object(seg_plugin.LOG, 'debug') as log:
event = seg_plugin.Event(
self.segments_plugin.nova_updater._update_nova_inventory,
mock.ANY, total=1, reserved=0)
self.mock_p_client.get_inventory.side_effect = (
neutron_exc.PlacementEndpointNotFound())
self.segments_plugin.nova_updater._send_notifications([event])
self.assertTrue(log.called)
class TestDhcpAgentSegmentScheduling(HostSegmentMappingTestCase):
_mechanism_drivers = ['openvswitch', 'logger']
@ -1613,3 +2190,9 @@ class PlacementAPIClientTestCase(base.DietTestCase):
self.mock_request.side_effect = ks_exc.NotFound
self.assertRaises(neutron_exc.PlacementAggregateNotFound,
self.client.list_aggregates, rp_uuid)
def test_placement_api_not_found(self):
rp_uuid = uuidutils.generate_uuid()
self.mock_request.side_effect = ks_exc.EndpointNotFound
self.assertRaises(neutron_exc.PlacementEndpointNotFound,
self.client.list_aggregates, rp_uuid)