Merge "Server-side push notifications for ML2"

This commit is contained in:
Jenkins 2017-01-20 08:41:34 +00:00 committed by Gerrit Code Review
commit 0c05d49949
4 changed files with 233 additions and 0 deletions

View File

@ -10,7 +10,11 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.objects import network
from neutron.objects import ports
from neutron.objects.qos import policy
from neutron.objects import securitygroup
from neutron.objects import subnet
from neutron.objects import trunk
@ -18,12 +22,21 @@ from neutron.objects import trunk
TRUNK = trunk.Trunk.obj_name()
QOS_POLICY = policy.QosPolicy.obj_name()
SUBPORT = trunk.SubPort.obj_name()
PORT = ports.Port.obj_name()
NETWORK = network.Network.obj_name()
SECURITYGROUP = securitygroup.SecurityGroup.obj_name()
SECURITYGROUPRULE = securitygroup.SecurityGroupRule.obj_name()
_VALID_CLS = (
policy.QosPolicy,
trunk.Trunk,
trunk.SubPort,
ports.Port,
subnet.Subnet,
network.Network,
securitygroup.SecurityGroup,
securitygroup.SecurityGroupRule,
)
_TYPE_TO_CLS_MAP = {cls.obj_name(): cls for cls in _VALID_CLS}

View File

@ -0,0 +1,122 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import traceback
from oslo_log import log as logging
from neutron._i18n import _LE
from neutron.api.rpc.callbacks import events as rpc_events
from neutron.api.rpc.handlers import resources_rpc
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.db import api as db_api
from neutron.objects import network
from neutron.objects import ports
from neutron.objects import securitygroup
from neutron.objects import subnet
LOG = logging.getLogger(__name__)
class _ObjectChangeHandler(object):
def __init__(self, resource, object_class, resource_push_api):
self._resource = resource
self._obj_class = object_class
self._resource_push_api = resource_push_api
for event in (events.AFTER_CREATE, events.AFTER_UPDATE,
events.AFTER_DELETE):
registry.subscribe(self.handle_event, resource, event)
@staticmethod
def _is_session_semantic_violated(context, resource, event):
"""Return True and print an ugly error on transaction violation.
This code is to print ugly errors when AFTER_CREATE/UPDATE
event transaction semantics are violated by other parts of
the code.
"""
if not context.session.is_active:
return False
stack = traceback.extract_stack()
stack = "".join(traceback.format_list(stack))
LOG.error(_LE("This handler is supposed to handle AFTER "
"events, as in 'AFTER it's committed', "
"not BEFORE. Offending resource event: "
"%(r)s, %(e)s. Location:\n%(l)s"),
{'r': resource, 'e': event, 'l': stack})
return True
def handle_event(self, resource, event, trigger,
context, *args, **kwargs):
"""Callback handler for resource change that pushes change to RPC.
We always retrieve the latest state and ignore what was in the
payload to ensure that we don't get any stale data.
"""
if self._is_session_semantic_violated(context, resource, event):
return
resource_id = self._extract_resource_id(kwargs)
# attempt to get regardless of event type so concurrent delete
# after create/update is the same code-path as a delete event
with db_api.context_manager.independent.reader.using(context):
obj = self._obj_class.get_object(context, id=resource_id)
# CREATE events are always treated as UPDATE events to ensure
# listeners are written to handle out-of-order messages
if obj is None:
rpc_event = rpc_events.DELETED
# construct a fake object with the right ID so we can
# have a payload for the delete message.
obj = self._obj_class(id=resource_id)
else:
rpc_event = rpc_events.UPDATED
LOG.debug("Dispatching RPC callback event %s for %s %s.",
rpc_event, self._resource, resource_id)
self._resource_push_api.push(context, [obj], rpc_event)
def _extract_resource_id(self, callback_kwargs):
id_kwarg = '%s_id' % self._resource
if id_kwarg in callback_kwargs:
return callback_kwargs[id_kwarg]
if self._resource in callback_kwargs:
return callback_kwargs[self._resource]['id']
raise RuntimeError("Couldn't find resource ID in callback event")
class OVOServerRpcInterface(object):
"""ML2 server-side RPC interface.
Generates RPC callback notifications on ML2 object changes.
TODO(kevinbenton): interface to query server for these objects
"""
def __init__(self):
self._rpc_pusher = resources_rpc.ResourcesPushRpcApi()
self._setup_change_handlers()
LOG.debug("ML2 OVO RPC backend initialized.")
def _setup_change_handlers(self):
"""Setup all of the local callback listeners for resource changes."""
resource_objclass_map = {
resources.PORT: ports.Port,
resources.SUBNET: subnet.Subnet,
resources.NETWORK: network.Network,
resources.SECURITY_GROUP: securitygroup.SecurityGroup,
resources.SECURITY_GROUP_RULE: securitygroup.SecurityGroupRule,
}
self._resource_handlers = {
res: _ObjectChangeHandler(res, obj_class, self._rpc_pusher)
for res, obj_class in resource_objclass_map.items()
}

View File

@ -80,6 +80,7 @@ from neutron.plugins.ml2 import driver_context
from neutron.plugins.ml2.extensions import qos as qos_ext
from neutron.plugins.ml2 import managers
from neutron.plugins.ml2 import models
from neutron.plugins.ml2 import ovo_rpc
from neutron.plugins.ml2 import rpc
from neutron.quota import resource_registry
from neutron.services.qos import qos_consts
@ -242,6 +243,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@log_helpers.log_method_call
def _start_rpc_notifiers(self):
"""Initialize RPC notifiers for agents."""
self.ovo_notifier = ovo_rpc.OVOServerRpcInterface()
self.notifier = rpc.AgentNotifierApi(topics.AGENT)
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()

View File

@ -0,0 +1,96 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron_lib.plugins import directory
from neutron import context
from neutron.objects import network
from neutron.objects import securitygroup
from neutron.objects import subnet
from neutron.tests.unit.plugins.ml2 import test_plugin
class OVOServerRpcInterfaceTestCase(test_plugin.Ml2PluginV2TestCase):
def setUp(self):
super(OVOServerRpcInterfaceTestCase, self).setUp()
self.plugin = directory.get_plugin()
self.ctx = context.get_admin_context()
self.received = []
receive = lambda s, ctx, obs, evt: self.received.append((obs[0], evt))
mock.patch('neutron.api.rpc.handlers.resources_rpc.'
'ResourcesPushRpcApi.push', new=receive).start()
def _assert_object_received(self, ovotype, oid=None, event=None):
for obj, evt in self.received:
if isinstance(obj, ovotype):
if (obj.id == oid or not oid) and (not event or event == evt):
return obj
self.fail("Could not find OVO %s with ID %s in %s" %
(ovotype, oid, self.received))
def test_network_lifecycle(self):
with self.network() as n:
self._assert_object_received(network.Network,
n['network']['id'],
'updated')
self.plugin.delete_network(self.ctx, n['network']['id'])
self._assert_object_received(network.Network,
n['network']['id'],
'deleted')
def test_subnet_lifecycle(self):
with self.subnet() as s:
self._assert_object_received(subnet.Subnet,
s['subnet']['id'],
'updated')
self.plugin.delete_subnet(self.ctx, s['subnet']['id'])
self._assert_object_received(subnet.Subnet,
s['subnet']['id'],
'deleted')
def test_securitygroup_and_rule_lifecycle(self):
# making a network makes a default security group
with self.network() as n:
sg = self._assert_object_received(securitygroup.SecurityGroup,
event='updated')
self.assertEqual(sg.tenant_id, n['network']['tenant_id'])
sgr = self.plugin.create_security_group_rule(self.ctx,
{'security_group_rule': {'security_group_id': sg.id,
'tenant_id': sg.tenant_id,
'port_range_min': None,
'port_range_max': None,
'remote_ip_prefix': None,
'remote_group_id': None,
'protocol': None,
'direction': None,
'ethertype': 'IPv4'}})
self._assert_object_received(
securitygroup.SecurityGroupRule, sgr['id'], 'updated')
self.plugin.delete_security_group_rule(self.ctx, sgr['id'])
self._assert_object_received(
securitygroup.SecurityGroupRule, sgr['id'], 'deleted')
self.plugin.delete_security_group(self.ctx, sg.id)
self._assert_object_received(securitygroup.SecurityGroup, sg.id,
'deleted')
def test_transaction_state_error_doesnt_notify(self):
# running in a transaction should cause it to skip notification since
# fresh reads aren't possible.
with self.ctx.session.begin():
self.plugin.create_security_group(
self.ctx, {'security_group': {'tenant_id': 'test',
'description': 'desc',
'name': 'test'}})
self.assertEqual([], self.received)