Use push-notificates for OVSPluginAPI

Replace the calls to the OVSPluginAPI info retrieval functions
with reads directly from the push notification cache.

Since we now depend on the cache for the source of truth, the
'port_update'/'port_delete'/'network_update' handlers are configured
to be called whenever the cache receives a corresponding resource update.
The OVS agent will no longer subscribe to topic notifications for ports
or networks from the legacy notification API.

Partially-Implements: blueprint push-notifications
Change-Id: Ib2234ec1f5d328649c6bb1c3fe07799d3e351f48
This commit is contained in:
Kevin Benton
2017-01-22 17:01:47 -08:00
parent fb825f2b9c
commit c3db9d6b0b
4 changed files with 133 additions and 6 deletions

View File

@@ -16,13 +16,22 @@
from datetime import datetime from datetime import datetime
import itertools import itertools
import netaddr
from neutron_lib import constants from neutron_lib import constants
from oslo_log import log as logging
import oslo_messaging import oslo_messaging
from oslo_utils import uuidutils from oslo_utils import uuidutils
from neutron.agent import resource_cache
from neutron.api.rpc.callbacks import resources
from neutron.callbacks import events as callback_events
from neutron.callbacks import registry
from neutron.common import constants as n_const from neutron.common import constants as n_const
from neutron.common import rpc as n_rpc from neutron.common import rpc as n_rpc
from neutron.common import topics from neutron.common import topics
from neutron import objects
LOG = logging.getLogger(__name__)
def create_consumers(endpoints, prefix, topic_details, start_listening=True): def create_consumers(endpoints, prefix, topic_details, start_listening=True):
@@ -145,3 +154,115 @@ class PluginApi(object):
cctxt = self.client.prepare(version='1.4') cctxt = self.client.prepare(version='1.4')
return cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip, return cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip,
tunnel_type=tunnel_type, host=host) tunnel_type=tunnel_type, host=host)
def create_cache_for_l2_agent():
"""Create a push-notifications cache for L2 agent related resources."""
objects.register_objects()
resource_types = [
resources.PORT,
resources.SECURITYGROUP,
resources.SECURITYGROUPRULE,
resources.NETWORK,
resources.SUBNET
]
rcache = resource_cache.RemoteResourceCache(resource_types)
rcache.start_watcher()
# TODO(kevinbenton): ensure flood uses filters or that this has a long
# timeout before Pike release.
rcache.bulk_flood_cache()
return rcache
class CacheBackedPluginApi(PluginApi):
def __init__(self, *args, **kwargs):
super(CacheBackedPluginApi, self).__init__(*args, **kwargs)
self.remote_resource_cache = create_cache_for_l2_agent()
def register_legacy_notification_callbacks(self, legacy_interface):
"""Emulates the server-side notifications from ml2 AgentNotifierApi.
legacy_interface is an object with 'delete'/'update' methods for
core resources.
"""
self._legacy_interface = legacy_interface
for e in (callback_events.AFTER_UPDATE, callback_events.AFTER_DELETE):
for r in (resources.PORT, resources.NETWORK):
registry.subscribe(self._legacy_notifier, r, e)
def _legacy_notifier(self, rtype, event, trigger, context, resource_id,
**kwargs):
"""Checks if legacy interface is expecting calls for resource.
looks for port_update, network_delete, etc and calls them with
the payloads the handlers are expecting (an ID).
"""
rtype = rtype.lower() # all legacy handlers don't camelcase
is_delete = event == callback_events.AFTER_DELETE
suffix = 'delete' if is_delete else 'update'
method = "%s_%s" % (rtype, suffix)
if not hasattr(self._legacy_interface, method):
# TODO(kevinbenton): once these notifications are stable, emit
# a deprecation warning for legacy handlers
return
payload = {rtype: {'id': resource_id}, '%s_id' % rtype: resource_id}
getattr(self._legacy_interface, method)(context, **payload)
def get_devices_details_list_and_failed_devices(self, context, devices,
agent_id, host=None):
result = {'devices': [], 'failed_devices': []}
for device in devices:
try:
result['devices'].append(
self.get_device_details(context, device, agent_id, host))
except Exception:
LOG.exception("Failed to get details for device %s", device)
result['failed_devices'].append(device)
return result
def get_device_details(self, context, device, agent_id, host=None):
port_obj = self.remote_resource_cache.get_resource_by_id(
resources.PORT, device)
if not port_obj:
LOG.debug("Device %s does not exist in cache.", device)
return {'device': device}
if not port_obj.binding_levels:
LOG.warning("Device %s is not bound.", port_obj)
return {'device': device}
segment = port_obj.binding_levels[-1].segment
net = self.remote_resource_cache.get_resource_by_id(
resources.NETWORK, port_obj.network_id)
net_qos_policy_id = net.qos_policy_id
# match format of old RPC interface
mac_addr = str(netaddr.EUI(str(port_obj.mac_address),
dialect=netaddr.mac_unix_expanded))
entry = {
'device': device,
'network_id': port_obj.network_id,
'port_id': port_obj.id,
'mac_address': mac_addr,
'admin_state_up': port_obj.admin_state_up,
'network_type': segment.network_type,
'segmentation_id': segment.segmentation_id,
'physical_network': segment.physical_network,
'fixed_ips': [{'subnet_id': o.subnet_id,
'ip_address': str(o.ip_address)}
for o in port_obj.fixed_ips],
'device_owner': port_obj.device_owner,
'allowed_address_pairs': [{'mac_address': o.mac_address,
'ip_address': o.ip_address}
for o in port_obj.allowed_address_pairs],
'port_security_enabled': port_obj.security.port_security_enabled,
'qos_policy_id': port_obj.qos_policy_id,
'network_qos_policy_id': net_qos_policy_id,
'profile': port_obj.binding.profile,
'security_groups': list(port_obj.security_group_ids)
}
LOG.debug("Returning: %s", entry)
return entry
def get_devices_details_list(self, context, devices, agent_id, host=None):
return [self.get_device_details(context, device, agent_id, host)
for device in devices]

View File

@@ -80,7 +80,7 @@ class _mac_mydialect(netaddr.mac_unix):
word_fmt = '%.2x' word_fmt = '%.2x'
class OVSPluginApi(agent_rpc.PluginApi): class OVSPluginApi(agent_rpc.CacheBackedPluginApi):
pass pass
@@ -360,6 +360,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def setup_rpc(self): def setup_rpc(self):
self.plugin_rpc = OVSPluginApi(topics.PLUGIN) self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
# allow us to receive port_update/delete callbacks from the cache
self.plugin_rpc.register_legacy_notification_callbacks(self)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN) self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN) self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS) self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
@@ -367,13 +369,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# RPC network init # RPC network init
self.context = context.get_admin_context_without_session() self.context = context.get_admin_context_without_session()
# Define the listening consumers for the agent # Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE], consumers = [[constants.TUNNEL, topics.UPDATE],
[topics.PORT, topics.DELETE],
[constants.TUNNEL, topics.UPDATE],
[constants.TUNNEL, topics.DELETE], [constants.TUNNEL, topics.DELETE],
[topics.SECURITY_GROUP, topics.UPDATE], [topics.SECURITY_GROUP, topics.UPDATE],
[topics.DVR, topics.UPDATE], [topics.DVR, topics.UPDATE]]
[topics.NETWORK, topics.UPDATE]]
if self.l2_pop: if self.l2_pop:
consumers.append([topics.L2POPULATION, topics.UPDATE]) consumers.append([topics.L2POPULATION, topics.UPDATE])
self.connection = agent_rpc.create_consumers([self], self.connection = agent_rpc.create_consumers([self],

View File

@@ -42,6 +42,7 @@ from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent \
NOTIFIER = 'neutron.plugins.ml2.rpc.AgentNotifierApi' NOTIFIER = 'neutron.plugins.ml2.rpc.AgentNotifierApi'
PULLAPI = 'neutron.api.rpc.handlers.resources_rpc.ResourcesPullRpcApi'
OVS_LINUX_KERN_VERS_WITHOUT_VXLAN = "3.12.0" OVS_LINUX_KERN_VERS_WITHOUT_VXLAN = "3.12.0"
FAKE_MAC = '00:11:22:33:44:55' FAKE_MAC = '00:11:22:33:44:55'
@@ -101,6 +102,7 @@ class TestOvsNeutronAgent(object):
def setUp(self): def setUp(self):
super(TestOvsNeutronAgent, self).setUp() super(TestOvsNeutronAgent, self).setUp()
self.useFixture(test_vlanmanager.LocalVlanManagerFixture()) self.useFixture(test_vlanmanager.LocalVlanManagerFixture())
mock.patch(PULLAPI).start()
notifier_p = mock.patch(NOTIFIER) notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start() notifier_cls = notifier_p.start()
self.notifier = mock.Mock() self.notifier = mock.Mock()
@@ -2165,6 +2167,7 @@ class AncillaryBridgesTest(object):
'neutron.agent.ovsdb.impl_idl._connection') 'neutron.agent.ovsdb.impl_idl._connection')
conn_patcher.start() conn_patcher.start()
self.addCleanup(conn_patcher.stop) self.addCleanup(conn_patcher.stop)
mock.patch(PULLAPI).start()
notifier_p = mock.patch(NOTIFIER) notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start() notifier_cls = notifier_p.start()
self.notifier = mock.Mock() self.notifier = mock.Mock()
@@ -2284,6 +2287,7 @@ class TestOvsDvrNeutronAgent(object):
def setUp(self): def setUp(self):
super(TestOvsDvrNeutronAgent, self).setUp() super(TestOvsDvrNeutronAgent, self).setUp()
mock.patch(PULLAPI).start()
notifier_p = mock.patch(NOTIFIER) notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start() notifier_cls = notifier_p.start()
self.notifier = mock.Mock() self.notifier = mock.Mock()

View File

@@ -80,6 +80,9 @@ class TunnelTest(object):
conn_patcher = mock.patch( conn_patcher = mock.patch(
'neutron.agent.ovsdb.impl_idl._connection') 'neutron.agent.ovsdb.impl_idl._connection')
conn_patcher.start() conn_patcher.start()
mock.patch(
'neutron.api.rpc.handlers.resources_rpc.ResourcesPullRpcApi'
).start()
self.addCleanup(conn_patcher.stop) self.addCleanup(conn_patcher.stop)
cfg.CONF.set_default('firewall_driver', cfg.CONF.set_default('firewall_driver',
'neutron.agent.firewall.NoopFirewallDriver', 'neutron.agent.firewall.NoopFirewallDriver',