Merge "Use push-notificates for OVSPluginAPI"

This commit is contained in:
Jenkins 2017-05-27 05:11:49 +00:00 committed by Gerrit Code Review
commit de92596b9b
4 changed files with 133 additions and 6 deletions

View File

@ -16,13 +16,22 @@
from datetime import datetime
import itertools
import netaddr
from neutron_lib import constants
from oslo_log import log as logging
import oslo_messaging
from oslo_utils import uuidutils
from neutron.agent import resource_cache
from neutron.api.rpc.callbacks import resources
from neutron.callbacks import events as callback_events
from neutron.callbacks import registry
from neutron.common import constants as n_const
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron import objects
LOG = logging.getLogger(__name__)
def create_consumers(endpoints, prefix, topic_details, start_listening=True):
@ -145,3 +154,115 @@ class PluginApi(object):
cctxt = self.client.prepare(version='1.4')
return cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip,
tunnel_type=tunnel_type, host=host)
def create_cache_for_l2_agent():
"""Create a push-notifications cache for L2 agent related resources."""
objects.register_objects()
resource_types = [
resources.PORT,
resources.SECURITYGROUP,
resources.SECURITYGROUPRULE,
resources.NETWORK,
resources.SUBNET
]
rcache = resource_cache.RemoteResourceCache(resource_types)
rcache.start_watcher()
# TODO(kevinbenton): ensure flood uses filters or that this has a long
# timeout before Pike release.
rcache.bulk_flood_cache()
return rcache
class CacheBackedPluginApi(PluginApi):
def __init__(self, *args, **kwargs):
super(CacheBackedPluginApi, self).__init__(*args, **kwargs)
self.remote_resource_cache = create_cache_for_l2_agent()
def register_legacy_notification_callbacks(self, legacy_interface):
"""Emulates the server-side notifications from ml2 AgentNotifierApi.
legacy_interface is an object with 'delete'/'update' methods for
core resources.
"""
self._legacy_interface = legacy_interface
for e in (callback_events.AFTER_UPDATE, callback_events.AFTER_DELETE):
for r in (resources.PORT, resources.NETWORK):
registry.subscribe(self._legacy_notifier, r, e)
def _legacy_notifier(self, rtype, event, trigger, context, resource_id,
**kwargs):
"""Checks if legacy interface is expecting calls for resource.
looks for port_update, network_delete, etc and calls them with
the payloads the handlers are expecting (an ID).
"""
rtype = rtype.lower() # all legacy handlers don't camelcase
is_delete = event == callback_events.AFTER_DELETE
suffix = 'delete' if is_delete else 'update'
method = "%s_%s" % (rtype, suffix)
if not hasattr(self._legacy_interface, method):
# TODO(kevinbenton): once these notifications are stable, emit
# a deprecation warning for legacy handlers
return
payload = {rtype: {'id': resource_id}, '%s_id' % rtype: resource_id}
getattr(self._legacy_interface, method)(context, **payload)
def get_devices_details_list_and_failed_devices(self, context, devices,
agent_id, host=None):
result = {'devices': [], 'failed_devices': []}
for device in devices:
try:
result['devices'].append(
self.get_device_details(context, device, agent_id, host))
except Exception:
LOG.exception("Failed to get details for device %s", device)
result['failed_devices'].append(device)
return result
def get_device_details(self, context, device, agent_id, host=None):
port_obj = self.remote_resource_cache.get_resource_by_id(
resources.PORT, device)
if not port_obj:
LOG.debug("Device %s does not exist in cache.", device)
return {'device': device}
if not port_obj.binding_levels:
LOG.warning("Device %s is not bound.", port_obj)
return {'device': device}
segment = port_obj.binding_levels[-1].segment
net = self.remote_resource_cache.get_resource_by_id(
resources.NETWORK, port_obj.network_id)
net_qos_policy_id = net.qos_policy_id
# match format of old RPC interface
mac_addr = str(netaddr.EUI(str(port_obj.mac_address),
dialect=netaddr.mac_unix_expanded))
entry = {
'device': device,
'network_id': port_obj.network_id,
'port_id': port_obj.id,
'mac_address': mac_addr,
'admin_state_up': port_obj.admin_state_up,
'network_type': segment.network_type,
'segmentation_id': segment.segmentation_id,
'physical_network': segment.physical_network,
'fixed_ips': [{'subnet_id': o.subnet_id,
'ip_address': str(o.ip_address)}
for o in port_obj.fixed_ips],
'device_owner': port_obj.device_owner,
'allowed_address_pairs': [{'mac_address': o.mac_address,
'ip_address': o.ip_address}
for o in port_obj.allowed_address_pairs],
'port_security_enabled': port_obj.security.port_security_enabled,
'qos_policy_id': port_obj.qos_policy_id,
'network_qos_policy_id': net_qos_policy_id,
'profile': port_obj.binding.profile,
'security_groups': list(port_obj.security_group_ids)
}
LOG.debug("Returning: %s", entry)
return entry
def get_devices_details_list(self, context, devices, agent_id, host=None):
return [self.get_device_details(context, device, agent_id, host)
for device in devices]

View File

@ -80,7 +80,7 @@ class _mac_mydialect(netaddr.mac_unix):
word_fmt = '%.2x'
class OVSPluginApi(agent_rpc.PluginApi):
class OVSPluginApi(agent_rpc.CacheBackedPluginApi):
pass
@ -360,6 +360,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def setup_rpc(self):
self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
# allow us to receive port_update/delete callbacks from the cache
self.plugin_rpc.register_legacy_notification_callbacks(self)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
@ -367,13 +369,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# RPC network init
self.context = context.get_admin_context_without_session()
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.PORT, topics.DELETE],
[constants.TUNNEL, topics.UPDATE],
consumers = [[constants.TUNNEL, topics.UPDATE],
[constants.TUNNEL, topics.DELETE],
[topics.SECURITY_GROUP, topics.UPDATE],
[topics.DVR, topics.UPDATE],
[topics.NETWORK, topics.UPDATE]]
[topics.DVR, topics.UPDATE]]
if self.l2_pop:
consumers.append([topics.L2POPULATION, topics.UPDATE])
self.connection = agent_rpc.create_consumers([self],

View File

@ -42,6 +42,7 @@ from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent \
NOTIFIER = 'neutron.plugins.ml2.rpc.AgentNotifierApi'
PULLAPI = 'neutron.api.rpc.handlers.resources_rpc.ResourcesPullRpcApi'
OVS_LINUX_KERN_VERS_WITHOUT_VXLAN = "3.12.0"
FAKE_MAC = '00:11:22:33:44:55'
@ -101,6 +102,7 @@ class TestOvsNeutronAgent(object):
def setUp(self):
super(TestOvsNeutronAgent, self).setUp()
self.useFixture(test_vlanmanager.LocalVlanManagerFixture())
mock.patch(PULLAPI).start()
notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start()
self.notifier = mock.Mock()
@ -2165,6 +2167,7 @@ class AncillaryBridgesTest(object):
'neutron.agent.ovsdb.impl_idl._connection')
conn_patcher.start()
self.addCleanup(conn_patcher.stop)
mock.patch(PULLAPI).start()
notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start()
self.notifier = mock.Mock()
@ -2284,6 +2287,7 @@ class TestOvsDvrNeutronAgent(object):
def setUp(self):
super(TestOvsDvrNeutronAgent, self).setUp()
mock.patch(PULLAPI).start()
notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start()
self.notifier = mock.Mock()

View File

@ -80,6 +80,9 @@ class TunnelTest(object):
conn_patcher = mock.patch(
'neutron.agent.ovsdb.impl_idl._connection')
conn_patcher.start()
mock.patch(
'neutron.api.rpc.handlers.resources_rpc.ResourcesPullRpcApi'
).start()
self.addCleanup(conn_patcher.stop)
cfg.CONF.set_default('firewall_driver',
'neutron.agent.firewall.NoopFirewallDriver',