Add RPC for network ports
This adds an RPC to request notifications for ports on a network. This will be used by a subsequent patch to trigger port update notifications whenever resources unrelated to a port change (e.g. let an instance's port get notified if a HA DHCP port on a given network goes down). Change-Id: Ib316028dc9cfd8dd368787e7a25058af49f3e928
This commit is contained in:
parent
ed7dbd8e93
commit
7ac47391fd
@ -13,7 +13,9 @@
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.common import topics
|
||||
from neutron.db import api as db_api
|
||||
from neutron.extensions import portbindings
|
||||
from neutron.plugins.ml2 import rpc as ml2_rpc
|
||||
from neutron_lib import constants
|
||||
from opflexagent import rpc as o_rpc
|
||||
from oslo_log import log
|
||||
|
||||
@ -104,6 +106,32 @@ class AIMMappingRPCMixin(ha_ip_db.HAIPOwnerDbMixin):
|
||||
LOG.exception(e)
|
||||
return None
|
||||
|
||||
# REVISIT: this should exist in the mechanism driver, and should
|
||||
# be addressed by any patch that refactors this RPC class
|
||||
def notify_filtered_ports_per_network(self, context, **kwargs):
|
||||
LOG.debug("APIC AIM handling get_ports_for_network for: %s", kwargs)
|
||||
try:
|
||||
host_id = kwargs.get('host')
|
||||
network_id = kwargs.get('network')
|
||||
core_plugin = self._core_plugin
|
||||
filters = {'network_id': [network_id]}
|
||||
ports_to_update = core_plugin.get_ports(context, filters)
|
||||
# Exclude DHCP and LBaaS ports -- these are possible triggers
|
||||
# for the network notifaction that resulted in the agents calling
|
||||
# this RPC. Updates for those ports and will be handled from their
|
||||
# own port notifications, if needed.
|
||||
for p in ports_to_update:
|
||||
if (p.get(portbindings.HOST_ID) == host_id) and not (
|
||||
p['device_owner'].startswith(
|
||||
constants.DEVICE_OWNER_DHCP) or
|
||||
p['device_owner'].startswith(
|
||||
constants.DEVICE_OWNER_LOADBALANCERV2)):
|
||||
self._send_port_update_notification(context, p['id'])
|
||||
except Exception as e:
|
||||
LOG.error("An exception has occurred while requesting ports "
|
||||
"for network %s", kwargs.get('network'))
|
||||
LOG.exception(e)
|
||||
|
||||
# Child class needs to support:
|
||||
# - self._send_port_update_notification(context, port)
|
||||
def ip_address_owner_update(self, context, **kwargs):
|
||||
|
@ -221,9 +221,10 @@ class AIMBaseTestCase(test_nr_base.CommonNeutronBaseTestCase,
|
||||
return super(AIMBaseTestCase, self)._bind_port_to_host(
|
||||
port_id, host, data=data)
|
||||
|
||||
def _bind_dhcp_port_to_host(self, port_id, host):
|
||||
def _bind_other_port_to_host(self, port_id, host,
|
||||
owner=n_constants.DEVICE_OWNER_DHCP):
|
||||
data = {'port': {'binding:host_id': host,
|
||||
'device_owner': 'network:dhcp',
|
||||
'device_owner': owner,
|
||||
'device_id': 'someid'}}
|
||||
return super(AIMBaseTestCase, self)._bind_port_to_host(
|
||||
port_id, host, data=data)
|
||||
@ -3225,6 +3226,46 @@ class TestPolicyTarget(AIMBaseTestCase):
|
||||
def test_get_gbp_details_no_pt_no_as_unrouted(self):
|
||||
self._do_test_gbp_details_no_pt(use_as=False, routed=False)
|
||||
|
||||
def test_notify_filtered_ports_per_network(self, pre_vrf=None):
|
||||
l3p = self.create_l3_policy(name='myl3')['l3_policy']
|
||||
l2p = self.create_l2_policy(name='myl2',
|
||||
l3_policy_id=l3p['id'])['l2_policy']
|
||||
ptg = self.create_policy_target_group(
|
||||
name="ptg1", l2_policy_id=l2p['id'])['policy_target_group']
|
||||
net_id = l2p['network_id']
|
||||
|
||||
pt1 = self.create_policy_target(name="pt1",
|
||||
policy_target_group_id=ptg['id'])['policy_target']
|
||||
pt2 = self.create_policy_target(name="pt2",
|
||||
policy_target_group_id=ptg['id'])['policy_target']
|
||||
pt3 = self.create_policy_target(name="pt3",
|
||||
policy_target_group_id=ptg['id'])['policy_target']
|
||||
with mock.patch.object(self.driver,
|
||||
'_send_port_update_notification') as notify:
|
||||
ctx = self._neutron_admin_context
|
||||
self._bind_port_to_host(pt1['port_id'], 'h1')
|
||||
self.driver.notify_filtered_ports_per_network(ctx,
|
||||
host='h1', network=net_id)
|
||||
notify.assert_called_with(mock.ANY, pt1['port_id'])
|
||||
notify.reset_mock()
|
||||
self._bind_other_port_to_host(pt2['port_id'], 'h2')
|
||||
self.driver.notify_filtered_ports_per_network(ctx,
|
||||
host='h2', network=net_id)
|
||||
notify.assert_not_called()
|
||||
self._bind_other_port_to_host(pt3['port_id'], 'h3',
|
||||
n_constants.DEVICE_OWNER_LOADBALANCERV2)
|
||||
self.driver.notify_filtered_ports_per_network(ctx,
|
||||
host='h3', network=net_id)
|
||||
notify.assert_not_called()
|
||||
# test non-existing host
|
||||
self.driver.notify_filtered_ports_per_network(ctx,
|
||||
host='h4', network=net_id)
|
||||
notify.assert_not_called()
|
||||
# test non-existing network
|
||||
self.driver.notify_filtered_ports_per_network(ctx,
|
||||
host='h3', network='foo')
|
||||
notify.assert_not_called()
|
||||
|
||||
def test_ip_address_owner_update(self):
|
||||
l3p = self.create_l3_policy(name='myl3')['l3_policy']
|
||||
l2p = self.create_l2_policy(name='myl2',
|
||||
@ -3436,7 +3477,7 @@ class TestPolicyTargetDvs(AIMBaseTestCase):
|
||||
self.agent_conf = AGENT_CONF
|
||||
pt2 = self.create_policy_target(
|
||||
policy_target_group_id=ptg['id'])['policy_target']
|
||||
newp2 = self._bind_dhcp_port_to_host(pt2['port_id'], 'h1')
|
||||
newp2 = self._bind_other_port_to_host(pt2['port_id'], 'h1')
|
||||
vif_details = newp2['port']['binding:vif_details']
|
||||
self.assertIsNone(vif_details.get('dvs_port_group_name'))
|
||||
port_key = newp2['port']['binding:vif_details'].get('dvs_port_key')
|
||||
|
Loading…
Reference in New Issue
Block a user