Create notifications for dependent ports
Changes to certain neutron resources, such as DHCP or LBaaS ports, require notiications be generated to ports who depend on those changes (e.g. if a DHCP agent port state changes, the host route to the metadata agent needs updating). This patch checks if: o an update or delete to a DHCP or LBaaS port occurs o the port belongs to an opflex type network If both of the above conditions are met, the plugin generates port update notifications to all the ports on that network, so they can update their parameters using the request_endpoint_details_list RPC. Change-Id: Ida893b1c331c4778a1276de54a379627d54b7d23
This commit is contained in:
parent
2d2ef04040
commit
0db062f60b
@ -37,6 +37,7 @@ from neutron.db import models_v2
|
||||
from neutron.db import rbac_db_models
|
||||
from neutron.db import segments_db
|
||||
from neutron.extensions import portbindings
|
||||
from neutron.extensions import providernet as provider
|
||||
from neutron import manager
|
||||
from neutron.plugins.common import constants as pconst
|
||||
from neutron.plugins.ml2 import driver_api as api
|
||||
@ -331,6 +332,37 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
|
||||
if not self.aim.get(aim_ctx, ap):
|
||||
self.aim.create(aim_ctx, ap)
|
||||
|
||||
def _is_dependent_port_change(self, port):
|
||||
# For now, we are only handling changes to DHCP
|
||||
# ports and LBaaS ports. This can be expanded in
|
||||
# the future to handle other changes, if needed
|
||||
return bool(port and any(port['device_owner'].startswith(x) for x in
|
||||
(n_constants.DEVICE_OWNER_DHCP,
|
||||
n_constants.DEVICE_OWNER_LOADBALANCERV2)))
|
||||
|
||||
def _notify_if_dependent_port_change(self, context, port):
|
||||
# Under some scenarios, opflex agents might not get
|
||||
# triggers to update EP files, even though parameters
|
||||
# contained in the files have changed (e.g. metadata
|
||||
# route when using HA DHCP). We ensure that the EP files
|
||||
# get updated by triggering port update notifications
|
||||
# to the agents, which tells them to go get all their EP
|
||||
# files for the ports they own on that network.
|
||||
if self._is_dependent_port_change(port):
|
||||
plugin_context = context._plugin_context
|
||||
net = self.plugin.get_network(plugin_context, port['network_id'])
|
||||
# Notifications are only needed to opflex networks, which will
|
||||
# have opflex agents
|
||||
if not net or not self._is_opflex_type(net[provider.NETWORK_TYPE]):
|
||||
return
|
||||
|
||||
filters = {'network_id': [net['id']]}
|
||||
ports_to_update = self.plugin.get_ports(plugin_context, filters)
|
||||
# Exclude ports that triggered this notification
|
||||
affected_port_ids = [p['id'] for p in ports_to_update
|
||||
if not self._is_dependent_port_change(p)]
|
||||
self._notify_port_update_bulk(plugin_context, affected_port_ids)
|
||||
|
||||
def create_network_precommit(self, context):
|
||||
current = context.current
|
||||
LOG.debug("APIC AIM MD creating network: %s", current)
|
||||
@ -1410,6 +1442,8 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
|
||||
context.bottom_bound_segment,
|
||||
context.host
|
||||
)
|
||||
# REVISIT: it may be possible to move this to the precommit
|
||||
self._notify_if_dependent_port_change(context, port)
|
||||
|
||||
def delete_port_precommit(self, context):
|
||||
port = context.current
|
||||
@ -1556,6 +1590,8 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
|
||||
context.bottom_bound_segment,
|
||||
context.host
|
||||
)
|
||||
# REVISIT: it may be possible to move this to the precommit
|
||||
self._notify_if_dependent_port_change(context, port)
|
||||
|
||||
def create_floatingip(self, context, current):
|
||||
if current['port_id']:
|
||||
|
@ -15,9 +15,7 @@ from apic_ml2.neutron.db import port_ha_ipaddress_binding as ha_ip_db
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.common import topics
|
||||
from neutron.db import api as db_api
|
||||
from neutron.extensions import portbindings
|
||||
from neutron.plugins.ml2 import rpc as ml2_rpc
|
||||
from neutron_lib import constants
|
||||
from opflexagent import rpc as o_rpc
|
||||
from oslo_log import log
|
||||
|
||||
@ -107,32 +105,6 @@ class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
|
||||
LOG.exception(e)
|
||||
return None
|
||||
|
||||
# REVISIT: this should exist in the mechanism driver, and should
|
||||
# be addressed by any patch that refactors this RPC class
|
||||
def notify_filtered_ports_per_network(self, context, **kwargs):
|
||||
LOG.debug("APIC AIM handling get_ports_for_network for: %s", kwargs)
|
||||
try:
|
||||
host_id = kwargs.get('host')
|
||||
network_id = kwargs.get('network')
|
||||
core_plugin = self._core_plugin
|
||||
filters = {'network_id': [network_id]}
|
||||
ports_to_update = core_plugin.get_ports(context, filters)
|
||||
# Exclude DHCP and LBaaS ports -- these are possible triggers
|
||||
# for the network notifaction that resulted in the agents calling
|
||||
# this RPC. Updates for those ports and will be handled from their
|
||||
# own port notifications, if needed.
|
||||
for p in ports_to_update:
|
||||
if (p.get(portbindings.HOST_ID) == host_id) and not (
|
||||
p['device_owner'].startswith(
|
||||
constants.DEVICE_OWNER_DHCP) or
|
||||
p['device_owner'].startswith(
|
||||
constants.DEVICE_OWNER_LOADBALANCERV2)):
|
||||
self._send_port_update_notification(context, p['id'])
|
||||
except Exception as e:
|
||||
LOG.error("An exception has occurred while requesting ports "
|
||||
"for network %s", kwargs.get('network'))
|
||||
LOG.exception(e)
|
||||
|
||||
# Child class needs to support:
|
||||
# - self._send_port_update_notification(context, port)
|
||||
def ip_address_owner_update(self, context, **kwargs):
|
||||
|
@ -293,9 +293,10 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase,
|
||||
self.fmt)
|
||||
return self.deserialize(self.fmt, req.get_response(self.api))
|
||||
|
||||
def _bind_dhcp_port_to_host(self, port_id, host):
|
||||
def _bind_other_port_to_host(self, port_id, host,
|
||||
owner=n_constants.DEVICE_OWNER_DHCP):
|
||||
data = {'port': {'binding:host_id': host,
|
||||
'device_owner': 'network:dhcp',
|
||||
'device_owner': owner,
|
||||
'device_id': 'someid'}}
|
||||
# Create EP with bound port
|
||||
req = self.new_update_request('ports', data, port_id,
|
||||
@ -3695,6 +3696,45 @@ class TestPortBinding(ApicAimTestCase):
|
||||
port['binding:vif_details'])
|
||||
self.assertEqual(n_constants.PORT_STATUS_ACTIVE, port['status'])
|
||||
|
||||
def test_bind_dependent_port_triggers_port_notify(self):
|
||||
# Test that a port update to a DHCP or LBaaS port triggers a
|
||||
# port update notification to a compute port on the same net,
|
||||
# but not to a compute port on a different net
|
||||
self._register_agent('host1', AGENT_CONF_OPFLEX)
|
||||
self._register_agent('host2', AGENT_CONF_OPFLEX)
|
||||
net1 = self._make_network(self.fmt, 'net1', True)
|
||||
self._make_subnet(self.fmt, net1, '10.0.1.1', '10.0.1.0/24')
|
||||
net2 = self._make_network(self.fmt, 'net2', True)
|
||||
self._make_subnet(self.fmt, net1, '20.0.1.1', '20.0.1.0/24')
|
||||
|
||||
p1 = self._make_port(self.fmt, net1['network']['id'])['port']
|
||||
p1 = self._bind_port_to_host(p1['id'], 'host1')['port']
|
||||
self.assertEqual(net1['network']['id'], p1['network_id'])
|
||||
self.assertEqual('ovs', p1['binding:vif_type'])
|
||||
self.assertEqual({'port_filter': False, 'ovs_hybrid_plug': False},
|
||||
p1['binding:vif_details'])
|
||||
p2 = self._make_port(self.fmt, net2['network']['id'])['port']
|
||||
p2 = self._bind_port_to_host(p2['id'], 'host2')['port']
|
||||
self.assertEqual(net2['network']['id'], p2['network_id'])
|
||||
self.assertEqual('ovs', p2['binding:vif_type'])
|
||||
self.assertEqual({'port_filter': False, 'ovs_hybrid_plug': False},
|
||||
p2['binding:vif_details'])
|
||||
with mock.patch.object(self.driver,
|
||||
'_notify_port_update_bulk') as notify:
|
||||
p3 = self._make_port(self.fmt, net1['network']['id'],
|
||||
device_owner=n_constants.DEVICE_OWNER_DHCP)['port']
|
||||
p3 = self._bind_other_port_to_host(p3['id'], 'host1',
|
||||
owner=n_constants.DEVICE_OWNER_DHCP)['port']
|
||||
mock_calls = [mock.call(mock.ANY, p1['id']),
|
||||
mock.call(mock.ANY, p1['id'])]
|
||||
notify.has_calls(mock_calls)
|
||||
notify.reset_mock()
|
||||
p4 = self._make_port(self.fmt, net1['network']['id'],
|
||||
device_owner=n_constants.DEVICE_OWNER_LOADBALANCERV2)['port']
|
||||
p4 = self._bind_other_port_to_host(p4['id'], 'host1',
|
||||
owner=n_constants.DEVICE_OWNER_LOADBALANCERV2)['port']
|
||||
notify.has_calls(mock_calls)
|
||||
|
||||
# TODO(rkukura): Add tests for opflex, local and unsupported
|
||||
# network_type values.
|
||||
|
||||
@ -3792,7 +3832,7 @@ class TestPortBindingDvs(ApicAimTestCase):
|
||||
self.driver.dvs_notifier.reset_mock()
|
||||
p2 = self._make_port(self.fmt, net['network']['id'])['port']
|
||||
self.assertEqual(net['network']['id'], p2['network_id'])
|
||||
newp2 = self._bind_dhcp_port_to_host(p2['id'], 'h1')
|
||||
newp2 = self._bind_other_port_to_host(p2['id'], 'h1')
|
||||
# Called on the network's tenant
|
||||
vif_det = newp2['port']['binding:vif_details']
|
||||
self.assertIsNone(vif_det.get('dvs_port_group_name', None))
|
||||
@ -3828,7 +3868,7 @@ class TestPortBindingDvs(ApicAimTestCase):
|
||||
p2 = self._make_port(self.fmt, net['network']['id'])['port']
|
||||
self.assertEqual(net['network']['id'], p2['network_id'])
|
||||
# Bind port to trigger path binding
|
||||
newp2 = self._bind_dhcp_port_to_host(p2['id'], 'h1')
|
||||
newp2 = self._bind_other_port_to_host(p2['id'], 'h1')
|
||||
# Called on the network's tenant
|
||||
vif_det = newp2['port']['binding:vif_details']
|
||||
self.assertIsNone(vif_det.get('dvs_port_group_name', None))
|
||||
|
@ -3228,46 +3228,6 @@ class TestPolicyTarget(AIMBaseTestCase):
|
||||
def test_get_gbp_details_no_pt_no_as_unrouted(self):
|
||||
self._do_test_gbp_details_no_pt(use_as=False, routed=False)
|
||||
|
||||
def test_notify_filtered_ports_per_network(self, pre_vrf=None):
|
||||
l3p = self.create_l3_policy(name='myl3')['l3_policy']
|
||||
l2p = self.create_l2_policy(name='myl2',
|
||||
l3_policy_id=l3p['id'])['l2_policy']
|
||||
ptg = self.create_policy_target_group(
|
||||
name="ptg1", l2_policy_id=l2p['id'])['policy_target_group']
|
||||
net_id = l2p['network_id']
|
||||
|
||||
pt1 = self.create_policy_target(name="pt1",
|
||||
policy_target_group_id=ptg['id'])['policy_target']
|
||||
pt2 = self.create_policy_target(name="pt2",
|
||||
policy_target_group_id=ptg['id'])['policy_target']
|
||||
pt3 = self.create_policy_target(name="pt3",
|
||||
policy_target_group_id=ptg['id'])['policy_target']
|
||||
with mock.patch.object(self.driver,
|
||||
'_send_port_update_notification') as notify:
|
||||
ctx = self._neutron_admin_context
|
||||
self._bind_port_to_host(pt1['port_id'], 'h1')
|
||||
self.driver.notify_filtered_ports_per_network(ctx,
|
||||
host='h1', network=net_id)
|
||||
notify.assert_called_with(mock.ANY, pt1['port_id'])
|
||||
notify.reset_mock()
|
||||
self._bind_other_port_to_host(pt2['port_id'], 'h2')
|
||||
self.driver.notify_filtered_ports_per_network(ctx,
|
||||
host='h2', network=net_id)
|
||||
notify.assert_not_called()
|
||||
self._bind_other_port_to_host(pt3['port_id'], 'h3',
|
||||
n_constants.DEVICE_OWNER_LOADBALANCERV2)
|
||||
self.driver.notify_filtered_ports_per_network(ctx,
|
||||
host='h3', network=net_id)
|
||||
notify.assert_not_called()
|
||||
# test non-existing host
|
||||
self.driver.notify_filtered_ports_per_network(ctx,
|
||||
host='h4', network=net_id)
|
||||
notify.assert_not_called()
|
||||
# test non-existing network
|
||||
self.driver.notify_filtered_ports_per_network(ctx,
|
||||
host='h3', network='foo')
|
||||
notify.assert_not_called()
|
||||
|
||||
def test_ip_address_owner_update(self):
|
||||
l3p = self.create_l3_policy(name='myl3')['l3_policy']
|
||||
l2p = self.create_l2_policy(name='myl2',
|
||||
|
Loading…
Reference in New Issue
Block a user