Make SecurityGroupsRpcCallback a separate callback class

RPC has a version of itself. In Neutron a plugin implements
several RPC interface, so a single RPC version doesn't work.
In Mixin callback class approach, RPC versioning depends on
each plugin implementation and it makes harder to maintain
RPC version appropriately. This patch series replaces mixin
RPC callback of server side with a separate class.

This commit handles server-side callback of security group
RPC interface.
* The server-side callback of Security group RPC is moved to
  api/rpc/handler and db/securitygroups_rpc_base now only
  contains a mixin class to add agent-based security group
  implementation with db operations.
* get_port_from_device method in server-side callback class
  is moved to a mixin class of plugin implementation
  (SecurityGroupServerRpcMixin) because it involves DB lookup
  and is tightly coupled with plugin implementation rather
  than RPC interface definition.

Most unit tests for SGServerRpcCallBackTestCase were skipped
in the base class before, but now they are no longer skipped.

The following items will be planned in later patches
to avoid drastic changes in a single patch.
* Merge security group RPC API and agent callback classes in
  agent/securitygroups_rpc into api/rpc/handlers/securitygroup_rpc
* Remove completely duplicated db access code in get_port_from_device
  and get_port_and_sgs

Partial-Bug: #1359416
Change-Id: Ia6535217d2e3b849a95667c1b53dd09675002892
This commit is contained in:
Akihiro Motoki 2014-08-23 18:16:18 +09:00 committed by shihanzhang
parent 974178601d
commit 2781fce238
22 changed files with 338 additions and 276 deletions

View File

@ -0,0 +1,59 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import rpc as n_rpc
from neutron import manager
# TODO(amotoki): Move security group RPC API and agent callback
# from securitygroups_rpc.py.
class SecurityGroupServerRpcCallback(n_rpc.RpcCallback):
"""Callback for SecurityGroup agent RPC in plugin implementations.
Subclass which inherits this class must implement get_port_from_device().
"""
# API version history:
# 1.1 - Initial version
# NOTE: RPC_API_VERSION must not be overridden in subclasses
# to keep RPC API version consistent across plugins.
RPC_API_VERSION = '1.1'
@property
def plugin(self):
return manager.NeutronManager.get_plugin()
def security_group_rules_for_devices(self, context, **kwargs):
"""Callback method to return security group rules for each port.
also convert remote_group_id rule
to source_ip_prefix and dest_ip_prefix rule
:params devices: list of devices
:returns: port correspond to the devices with security group rules
"""
devices = kwargs.get('devices')
ports = {}
for device in devices:
port = self.plugin.get_port_from_device(device)
if not port:
continue
if port['device_owner'].startswith('network:'):
continue
ports[port['id']] = port
return self.plugin.security_group_rules_for_ports(context, ports)

View File

@ -36,6 +36,27 @@ DIRECTION_IP_PREFIX = {'ingress': 'source_ip_prefix',
class SecurityGroupServerRpcMixin(sg_db.SecurityGroupDbMixin): class SecurityGroupServerRpcMixin(sg_db.SecurityGroupDbMixin):
"""Mixin class to add agent-based security group implementation."""
def get_port_from_device(self, device):
"""Get port dict from device name on an agent.
Subclass must provide this method.
:param device: device name which identifies a port on the agent side.
What is specified in "device" depends on a plugin agent implementation.
For example, it is a port ID in OVS agent and netdev name in Linux
Bridge agent.
:return: port dict returned by DB plugin get_port(). In addition,
it must contain the following fields in the port dict returned.
- device
- security_groups
- security_group_rules,
- security_group_source_groups
- fixed_ips
"""
raise NotImplementedError(_("%s must implement get_port_from_device.")
% self.__class__.__name__)
def create_security_group_rule(self, context, security_group_rule): def create_security_group_rule(self, context, security_group_rule):
bulk_rule = {'security_group_rules': [security_group_rule]} bulk_rule = {'security_group_rules': [security_group_rule]}
@ -128,33 +149,6 @@ class SecurityGroupServerRpcMixin(sg_db.SecurityGroupDbMixin):
self.notifier.security_groups_member_updated( self.notifier.security_groups_member_updated(
context, port.get(ext_sg.SECURITYGROUPS)) context, port.get(ext_sg.SECURITYGROUPS))
class SecurityGroupServerRpcCallbackMixin(object):
"""A mix-in that enable SecurityGroup agent support in plugin
implementations.
"""
def security_group_rules_for_devices(self, context, **kwargs):
"""Return security group rules for each port.
also convert remote_group_id rule
to source_ip_prefix and dest_ip_prefix rule
:params devices: list of devices
:returns: port correspond to the devices with security group rules
"""
devices = kwargs.get('devices')
ports = {}
for device in devices:
port = self.get_port_from_device(device)
if not port:
continue
if port['device_owner'].startswith('network:'):
continue
ports[port['id']] = port
return self._security_group_rules_for_ports(context, ports)
def _select_rules_for_ports(self, context, ports): def _select_rules_for_ports(self, context, ports):
if not ports: if not ports:
return [] return []
@ -354,7 +348,7 @@ class SecurityGroupServerRpcCallbackMixin(object):
self._add_ingress_ra_rule(port, ips_ra) self._add_ingress_ra_rule(port, ips_ra)
self._add_ingress_dhcp_rule(port, ips_dhcp) self._add_ingress_dhcp_rule(port, ips_dhcp)
def _security_group_rules_for_ports(self, context, ports): def security_group_rules_for_ports(self, context, ports):
rules_in_db = self._select_rules_for_ports(context, ports) rules_in_db = self._select_rules_for_ports(context, ports)
for (binding, rule_in_db) in rules_in_db: for (binding, rule_in_db) in rules_in_db:
port_id = binding['port_id'] port_id = binding['port_id']

View File

@ -56,6 +56,7 @@ from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api import extensions as neutron_extensions from neutron.api import extensions as neutron_extensions
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as const from neutron.common import constants as const
from neutron.common import exceptions from neutron.common import exceptions
from neutron.common import rpc as n_rpc from neutron.common import rpc as n_rpc
@ -72,7 +73,7 @@ from neutron.db import extradhcpopt_db
from neutron.db import l3_db from neutron.db import l3_db
from neutron.db import models_v2 from neutron.db import models_v2
from neutron.db import securitygroups_db as sg_db from neutron.db import securitygroups_db as sg_db
from neutron.db import securitygroups_rpc_base as sg_rpc_base from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import allowedaddresspairs as addr_pair from neutron.extensions import allowedaddresspairs as addr_pair
from neutron.extensions import external_net from neutron.extensions import external_net
from neutron.extensions import extra_dhcp_opt as edo_ext from neutron.extensions import extra_dhcp_opt as edo_ext
@ -113,10 +114,7 @@ class AgentNotifierApi(n_rpc.RpcProxy,
topic=self.topic_port_update) topic=self.topic_port_update)
class RestProxyCallbacks(n_rpc.RpcCallback, class SecurityGroupServerRpcMixin(sg_db_rpc.SecurityGroupServerRpcMixin):
sg_rpc_base.SecurityGroupServerRpcCallbackMixin):
RPC_API_VERSION = '1.1'
def get_port_from_device(self, device): def get_port_from_device(self, device):
port_id = re.sub(r"^tap", "", device) port_id = re.sub(r"^tap", "", device)
@ -454,7 +452,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
addr_pair_db.AllowedAddressPairsMixin, addr_pair_db.AllowedAddressPairsMixin,
extradhcpopt_db.ExtraDhcpOptMixin, extradhcpopt_db.ExtraDhcpOptMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin, agentschedulers_db.DhcpAgentSchedulerDbMixin,
sg_rpc_base.SecurityGroupServerRpcMixin): SecurityGroupServerRpcMixin):
_supported_extension_aliases = ["external-net", "router", "binding", _supported_extension_aliases = ["external-net", "router", "binding",
"router_rules", "extra_dhcp_opt", "quotas", "router_rules", "extra_dhcp_opt", "quotas",
@ -509,7 +507,7 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
self.agent_notifiers[const.AGENT_TYPE_DHCP] = ( self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
self._dhcp_agent_notifier self._dhcp_agent_notifier
) )
self.endpoints = [RestProxyCallbacks(), self.endpoints = [securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(), dhcp_rpc.DhcpRpcCallback(),
agents_db.AgentExtRpcCallback()] agents_db.AgentExtRpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints, self.conn.create_consumer(self.topic, self.endpoints,

View File

@ -30,6 +30,7 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc from neutron.api.rpc.handlers import l3_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as q_const from neutron.common import constants as q_const
from neutron.common import rpc as n_rpc from neutron.common import rpc as n_rpc
from neutron.common import topics from neutron.common import topics
@ -57,6 +58,7 @@ LOG = logging.getLogger(__name__)
PLUGIN_VERSION = 0.88 PLUGIN_VERSION = 0.88
AGENT_OWNER_PREFIX = "network:" AGENT_OWNER_PREFIX = "network:"
NOS_DRIVER = 'neutron.plugins.brocade.nos.nosdriver.NOSdriver' NOS_DRIVER = 'neutron.plugins.brocade.nos.nosdriver.NOSdriver'
TAP_PREFIX_LEN = 3
SWITCH_OPTS = [cfg.StrOpt('address', default='', SWITCH_OPTS = [cfg.StrOpt('address', default='',
help=_('The address of the host to SSH to')), help=_('The address of the host to SSH to')),
@ -77,8 +79,7 @@ cfg.CONF.register_opts(SWITCH_OPTS, "SWITCH")
cfg.CONF.register_opts(PHYSICAL_INTERFACE_OPTS, "PHYSICAL_INTERFACE") cfg.CONF.register_opts(PHYSICAL_INTERFACE_OPTS, "PHYSICAL_INTERFACE")
class BridgeRpcCallbacks(n_rpc.RpcCallback, class BridgeRpcCallbacks(n_rpc.RpcCallback):
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
"""Agent callback.""" """Agent callback."""
RPC_API_VERSION = '1.2' RPC_API_VERSION = '1.2'
@ -86,32 +87,6 @@ class BridgeRpcCallbacks(n_rpc.RpcCallback,
# history # history
# 1.1 Support Security Group RPC # 1.1 Support Security Group RPC
# 1.2 Support get_devices_details_list # 1.2 Support get_devices_details_list
TAP_PREFIX_LEN = 3
@classmethod
def get_port_from_device(cls, device):
"""Get port from the brocade specific db."""
# TODO(shh) context is not being passed as
# an argument to this function;
#
# need to be fixed in:
# file: neutron/db/securtygroups_rpc_base.py
# function: securitygroup_rules_for_devices()
# which needs to pass context to us
# Doing what other plugins are doing
session = db.get_session()
port = brocade_db.get_port_from_device(
session, device[cls.TAP_PREFIX_LEN:])
# TODO(shiv): need to extend the db model to include device owners
# make it appears that the device owner is of type network
if port:
port['device'] = device
port['device_owner'] = AGENT_OWNER_PREFIX
port['binding:vif_type'] = 'bridge'
return port
def get_device_details(self, rpc_context, **kwargs): def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details.""" """Agent requests device details."""
@ -120,7 +95,7 @@ class BridgeRpcCallbacks(n_rpc.RpcCallback,
device = kwargs.get('device') device = kwargs.get('device')
LOG.debug(_("Device %(device)s details requested from %(agent_id)s"), LOG.debug(_("Device %(device)s details requested from %(agent_id)s"),
{'device': device, 'agent_id': agent_id}) {'device': device, 'agent_id': agent_id})
port = brocade_db.get_port(rpc_context, device[self.TAP_PREFIX_LEN:]) port = brocade_db.get_port(rpc_context, device[TAP_PREFIX_LEN:])
if port: if port:
entry = {'device': device, entry = {'device': device,
'vlan_id': port.vlan_id, 'vlan_id': port.vlan_id,
@ -163,6 +138,34 @@ class BridgeRpcCallbacks(n_rpc.RpcCallback,
return entry return entry
class SecurityGroupServerRpcMixin(sg_db_rpc.SecurityGroupServerRpcMixin):
@classmethod
def get_port_from_device(cls, device):
"""Get port from the brocade specific db."""
# TODO(shh) context is not being passed as
# an argument to this function;
#
# need to be fixed in:
# file: neutron/db/securtygroups_rpc_base.py
# function: securitygroup_rules_for_devices()
# which needs to pass context to us
# Doing what other plugins are doing
session = db.get_session()
port = brocade_db.get_port_from_device(
session, device[TAP_PREFIX_LEN:])
# TODO(shiv): need to extend the db model to include device owners
# make it appears that the device owner is of type network
if port:
port['device'] = device
port['device_owner'] = AGENT_OWNER_PREFIX
port['binding:vif_type'] = 'bridge'
return port
class AgentNotifierApi(n_rpc.RpcProxy, class AgentNotifierApi(n_rpc.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin): sg_rpc.SecurityGroupAgentRpcApiMixin):
"""Agent side of the linux bridge rpc API. """Agent side of the linux bridge rpc API.
@ -205,7 +208,7 @@ class AgentNotifierApi(n_rpc.RpcProxy,
class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2, class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
external_net_db.External_net_db_mixin, external_net_db.External_net_db_mixin,
extraroute_db.ExtraRoute_db_mixin, extraroute_db.ExtraRoute_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin, SecurityGroupServerRpcMixin,
l3_agentschedulers_db.L3AgentSchedulerDbMixin, l3_agentschedulers_db.L3AgentSchedulerDbMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin, agentschedulers_db.DhcpAgentSchedulerDbMixin,
portbindings_base.PortBindingBaseMixin): portbindings_base.PortBindingBaseMixin):
@ -262,6 +265,7 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
is_admin=False) is_admin=False)
self.conn = n_rpc.create_connection(new=True) self.conn = n_rpc.create_connection(new=True)
self.endpoints = [BridgeRpcCallbacks(), self.endpoints = [BridgeRpcCallbacks(),
securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(), dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback(), l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()] agents_db.AgentExtRpcCallback()]

View File

@ -22,6 +22,7 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc from neutron.api.rpc.handlers import l3_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes from neutron.api.v2 import attributes
from neutron.common import constants as q_const from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc from neutron.common import exceptions as n_exc
@ -52,24 +53,16 @@ from neutron.plugins.linuxbridge.db import l2network_db_v2 as db
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
# Device names start with "tap"
TAP_PREFIX_LEN = 3
class LinuxBridgeRpcCallbacks(n_rpc.RpcCallback,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin class LinuxBridgeRpcCallbacks(n_rpc.RpcCallback):
):
# history # history
# 1.1 Support Security Group RPC # 1.1 Support Security Group RPC
# 1.2 Support get_devices_details_list # 1.2 Support get_devices_details_list
RPC_API_VERSION = '1.2' RPC_API_VERSION = '1.2'
# Device names start with "tap"
TAP_PREFIX_LEN = 3
@classmethod
def get_port_from_device(cls, device):
port = db.get_port_from_device(device[cls.TAP_PREFIX_LEN:])
if port:
port['device'] = device
return port
def get_device_details(self, rpc_context, **kwargs): def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details.""" """Agent requests device details."""
@ -77,7 +70,8 @@ class LinuxBridgeRpcCallbacks(n_rpc.RpcCallback,
device = kwargs.get('device') device = kwargs.get('device')
LOG.debug(_("Device %(device)s details requested from %(agent_id)s"), LOG.debug(_("Device %(device)s details requested from %(agent_id)s"),
{'device': device, 'agent_id': agent_id}) {'device': device, 'agent_id': agent_id})
port = self.get_port_from_device(device) plugin = manager.NeutronManager.get_plugin()
port = plugin.get_port_from_device(device)
if port: if port:
binding = db.get_network_binding(db_api.get_session(), binding = db.get_network_binding(db_api.get_session(),
port['network_id']) port['network_id'])
@ -117,10 +111,10 @@ class LinuxBridgeRpcCallbacks(n_rpc.RpcCallback,
agent_id = kwargs.get('agent_id') agent_id = kwargs.get('agent_id')
device = kwargs.get('device') device = kwargs.get('device')
host = kwargs.get('host') host = kwargs.get('host')
port = self.get_port_from_device(device)
LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"), LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"),
{'device': device, 'agent_id': agent_id}) {'device': device, 'agent_id': agent_id})
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
port = plugin.get_port_from_device(device)
if port: if port:
entry = {'device': device, entry = {'device': device,
'exists': True} 'exists': True}
@ -143,10 +137,10 @@ class LinuxBridgeRpcCallbacks(n_rpc.RpcCallback,
agent_id = kwargs.get('agent_id') agent_id = kwargs.get('agent_id')
device = kwargs.get('device') device = kwargs.get('device')
host = kwargs.get('host') host = kwargs.get('host')
port = self.get_port_from_device(device)
LOG.debug(_("Device %(device)s up on %(agent_id)s"), LOG.debug(_("Device %(device)s up on %(agent_id)s"),
{'device': device, 'agent_id': agent_id}) {'device': device, 'agent_id': agent_id})
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
port = plugin.get_port_from_device(device)
if port: if port:
if (host and if (host and
not plugin.get_port_host(rpc_context, port['id']) == host): not plugin.get_port_host(rpc_context, port['id']) == host):
@ -283,6 +277,7 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN} svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = n_rpc.create_connection(new=True) self.conn = n_rpc.create_connection(new=True)
self.endpoints = [LinuxBridgeRpcCallbacks(), self.endpoints = [LinuxBridgeRpcCallbacks(),
securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(), dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback(), l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()] agents_db.AgentExtRpcCallback()]
@ -542,3 +537,10 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.notifier.port_update(context, port, self.notifier.port_update(context, port,
binding.physical_network, binding.physical_network,
binding.vlan_id) binding.vlan_id)
@classmethod
def get_port_from_device(cls, device):
port = db.get_port_from_device(device[TAP_PREFIX_LEN:])
if port:
port['device'] = device
return port

View File

@ -25,6 +25,7 @@ from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import dvr_rpc from neutron.api.rpc.handlers import dvr_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes from neutron.api.v2 import attributes
from neutron.common import constants as const from neutron.common import constants as const
from neutron.common import exceptions as exc from neutron.common import exceptions as exc
@ -51,6 +52,7 @@ from neutron.openstack.common import importutils
from neutron.openstack.common import jsonutils from neutron.openstack.common import jsonutils
from neutron.openstack.common import lockutils from neutron.openstack.common import lockutils
from neutron.openstack.common import log from neutron.openstack.common import log
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants as service_constants from neutron.plugins.common import constants as service_constants
from neutron.plugins.ml2.common import exceptions as ml2_exc from neutron.plugins.ml2.common import exceptions as ml2_exc
from neutron.plugins.ml2 import config # noqa from neutron.plugins.ml2 import config # noqa
@ -69,6 +71,9 @@ MAX_BIND_TRIES = 10
# providernet.py? # providernet.py?
TYPE_MULTI_SEGMENT = 'multi-segment' TYPE_MULTI_SEGMENT = 'multi-segment'
TAP_DEVICE_PREFIX = 'tap'
TAP_DEVICE_PREFIX_LENGTH = 3
class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
dvr_mac_db.DVRDbMixin, dvr_mac_db.DVRDbMixin,
@ -136,6 +141,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def start_rpc_listeners(self): def start_rpc_listeners(self):
self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager), self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager),
securitygroups_rpc.SecurityGroupServerRpcCallback(),
dvr_rpc.DVRServerRpcCallback(), dvr_rpc.DVRServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(), dhcp_rpc.DhcpRpcCallback(),
agents_db.AgentExtRpcCallback()] agents_db.AgentExtRpcCallback()]
@ -1083,3 +1089,25 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
else: else:
port_host = db.get_port_binding_host(port_id) port_host = db.get_port_binding_host(port_id)
return (port_host == host) return (port_host == host)
def get_port_from_device(self, device):
port_id = self._device_to_port_id(device)
port = db.get_port_and_sgs(port_id)
if port:
port['device'] = device
return port
def _device_to_port_id(self, device):
# REVISIT(rkukura): Consider calling into MechanismDrivers to
# process device names, or having MechanismDrivers supply list
# of device prefixes to strip.
if device.startswith(TAP_DEVICE_PREFIX):
return device[TAP_DEVICE_PREFIX_LENGTH:]
else:
# REVISIT(irenab): Consider calling into bound MD to
# handle the get_device_details RPC, then remove the 'else' clause
if not uuidutils.is_uuid_like(device):
port = db.get_port_from_device_mac(device)
if port:
return port.id
return device

View File

@ -20,13 +20,10 @@ from neutron.common import exceptions
from neutron.common import rpc as n_rpc from neutron.common import rpc as n_rpc
from neutron.common import topics from neutron.common import topics
from neutron.common import utils from neutron.common import utils
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import portbindings from neutron.extensions import portbindings
from neutron import manager from neutron import manager
from neutron.openstack.common import log from neutron.openstack.common import log
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants as service_constants from neutron.plugins.common import constants as service_constants
from neutron.plugins.ml2 import db
from neutron.plugins.ml2 import driver_api as api from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import type_tunnel from neutron.plugins.ml2.drivers import type_tunnel
# REVISIT(kmestery): Allow the type and mechanism drivers to supply the # REVISIT(kmestery): Allow the type and mechanism drivers to supply the
@ -34,12 +31,8 @@ from neutron.plugins.ml2.drivers import type_tunnel
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
TAP_DEVICE_PREFIX = 'tap'
TAP_DEVICE_PREFIX_LENGTH = 3
class RpcCallbacks(n_rpc.RpcCallback, class RpcCallbacks(n_rpc.RpcCallback,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin,
type_tunnel.TunnelRpcCallbackMixin): type_tunnel.TunnelRpcCallbackMixin):
RPC_API_VERSION = '1.3' RPC_API_VERSION = '1.3'
@ -53,30 +46,6 @@ class RpcCallbacks(n_rpc.RpcCallback,
self.setup_tunnel_callback_mixin(notifier, type_manager) self.setup_tunnel_callback_mixin(notifier, type_manager)
super(RpcCallbacks, self).__init__() super(RpcCallbacks, self).__init__()
@classmethod
def _device_to_port_id(cls, device):
# REVISIT(rkukura): Consider calling into MechanismDrivers to
# process device names, or having MechanismDrivers supply list
# of device prefixes to strip.
if device.startswith(TAP_DEVICE_PREFIX):
return device[TAP_DEVICE_PREFIX_LENGTH:]
else:
# REVISIT(irenab): Consider calling into bound MD to
# handle the get_device_details RPC, then remove the 'else' clause
if not uuidutils.is_uuid_like(device):
port = db.get_port_from_device_mac(device)
if port:
return port.id
return device
@classmethod
def get_port_from_device(cls, device):
port_id = cls._device_to_port_id(device)
port = db.get_port_and_sgs(port_id)
if port:
port['device'] = device
return port
def get_device_details(self, rpc_context, **kwargs): def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details.""" """Agent requests device details."""
agent_id = kwargs.get('agent_id') agent_id = kwargs.get('agent_id')
@ -85,9 +54,9 @@ class RpcCallbacks(n_rpc.RpcCallback,
LOG.debug("Device %(device)s details requested by agent " LOG.debug("Device %(device)s details requested by agent "
"%(agent_id)s with host %(host)s", "%(agent_id)s with host %(host)s",
{'device': device, 'agent_id': agent_id, 'host': host}) {'device': device, 'agent_id': agent_id, 'host': host})
port_id = self._device_to_port_id(device)
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
port_id = plugin._device_to_port_id(device)
port_context = plugin.get_bound_port_context(rpc_context, port_context = plugin.get_bound_port_context(rpc_context,
port_id, port_id,
host) host)
@ -152,7 +121,7 @@ class RpcCallbacks(n_rpc.RpcCallback,
"%(agent_id)s"), "%(agent_id)s"),
{'device': device, 'agent_id': agent_id}) {'device': device, 'agent_id': agent_id})
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
port_id = self._device_to_port_id(device) port_id = plugin._device_to_port_id(device)
port_exists = True port_exists = True
if (host and not plugin.port_bound_to_host(rpc_context, if (host and not plugin.port_bound_to_host(rpc_context,
port_id, host)): port_id, host)):
@ -177,7 +146,7 @@ class RpcCallbacks(n_rpc.RpcCallback,
LOG.debug(_("Device %(device)s up at agent %(agent_id)s"), LOG.debug(_("Device %(device)s up at agent %(agent_id)s"),
{'device': device, 'agent_id': agent_id}) {'device': device, 'agent_id': agent_id})
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
port_id = self._device_to_port_id(device) port_id = plugin._device_to_port_id(device)
if (host and not plugin.port_bound_to_host(rpc_context, if (host and not plugin.port_bound_to_host(rpc_context,
port_id, host)): port_id, host)):
LOG.debug(_("Device %(device)s not bound to the" LOG.debug(_("Device %(device)s not bound to the"

View File

@ -22,6 +22,7 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc from neutron.api.rpc.handlers import l3_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes from neutron.api.v2 import attributes
from neutron.common import constants as q_const from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc from neutron.common import exceptions as n_exc
@ -51,6 +52,9 @@ from neutron.plugins.mlnx import rpc_callbacks
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
#to be compatible with Linux Bridge Agent on Network Node
TAP_PREFIX_LEN = 3
class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2, class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
external_net_db.External_net_db_mixin, external_net_db.External_net_db_mixin,
@ -122,6 +126,7 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN} svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = n_rpc.create_connection(new=True) self.conn = n_rpc.create_connection(new=True)
self.endpoints = [rpc_callbacks.MlnxRpcCallbacks(), self.endpoints = [rpc_callbacks.MlnxRpcCallbacks(),
securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(), dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback(), l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()] agents_db.AgentExtRpcCallback()]
@ -515,3 +520,20 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
# now that we've left db transaction, we are safe to notify # now that we've left db transaction, we are safe to notify
self.notify_routers_updated(context, router_ids) self.notify_routers_updated(context, router_ids)
self.notify_security_groups_member_updated(context, port) self.notify_security_groups_member_updated(context, port)
@classmethod
def get_port_from_device(cls, device):
"""Get port according to device.
To maintain compatibility with Linux Bridge L2 Agent for DHCP/L3
services get device either by linux bridge plugin
device name convention or by mac address
"""
port = db.get_port_from_device(device[TAP_PREFIX_LEN:])
if port:
port['device'] = device
else:
port = db.get_port_from_device_mac(device)
if port:
port['device'] = device
return port

View File

@ -17,40 +17,18 @@ from oslo.config import cfg
from neutron.common import constants as q_const from neutron.common import constants as q_const
from neutron.common import rpc as n_rpc from neutron.common import rpc as n_rpc
from neutron.db import api as db_api from neutron.db import api as db_api
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
from neutron.plugins.mlnx.db import mlnx_db_v2 as db from neutron.plugins.mlnx.db import mlnx_db_v2 as db
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class MlnxRpcCallbacks(n_rpc.RpcCallback, class MlnxRpcCallbacks(n_rpc.RpcCallback):
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
# History # History
# 1.1 Support Security Group RPC # 1.1 Support Security Group RPC
# 1.2 Support get_devices_details_list # 1.2 Support get_devices_details_list
RPC_API_VERSION = '1.2' RPC_API_VERSION = '1.2'
#to be compatible with Linux Bridge Agent on Network Node
TAP_PREFIX_LEN = 3
@classmethod
def get_port_from_device(cls, device):
"""Get port according to device.
To maintain compatibility with Linux Bridge L2 Agent for DHCP/L3
services get device either by linux bridge plugin
device name convention or by mac address
"""
port = db.get_port_from_device(device[cls.TAP_PREFIX_LEN:])
if port:
port['device'] = device
else:
port = db.get_port_from_device_mac(device)
if port:
port['device'] = device
return port
def get_device_details(self, rpc_context, **kwargs): def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details.""" """Agent requests device details."""
agent_id = kwargs.get('agent_id') agent_id = kwargs.get('agent_id')

View File

@ -19,6 +19,7 @@ from neutron.api import extensions as neutron_extensions
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc from neutron.api.rpc.handlers import l3_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes as attrs from neutron.api.v2 import attributes as attrs
from neutron.common import constants as const from neutron.common import constants as const
from neutron.common import exceptions as n_exc from neutron.common import exceptions as n_exc
@ -53,10 +54,23 @@ from neutron.plugins.nec import packet_filter
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class SecurityGroupServerRpcMixin(sg_db_rpc.SecurityGroupServerRpcMixin):
@staticmethod
def get_port_from_device(device):
port = ndb.get_port_from_device(device)
if port:
port['device'] = device
LOG.debug("NECPluginV2.get_port_from_device() called, "
"device=%(device)s => %(ret)s.",
{'device': device, 'ret': port})
return port
class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2, class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
external_net_db.External_net_db_mixin, external_net_db.External_net_db_mixin,
nec_router.RouterMixin, nec_router.RouterMixin,
sg_db_rpc.SecurityGroupServerRpcMixin, SecurityGroupServerRpcMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin, agentschedulers_db.DhcpAgentSchedulerDbMixin,
nec_router.L3AgentSchedulerDbMixin, nec_router.L3AgentSchedulerDbMixin,
packet_filter.PacketFilterMixin, packet_filter.PacketFilterMixin,
@ -143,7 +157,7 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
) )
# NOTE: callback_sg is referred to from the sg unit test. # NOTE: callback_sg is referred to from the sg unit test.
self.callback_sg = SecurityGroupServerRpcCallback() self.callback_sg = securitygroups_rpc.SecurityGroupServerRpcCallback()
self.endpoints = [ self.endpoints = [
NECPluginV2RPCCallbacks(self.safe_reference), NECPluginV2RPCCallbacks(self.safe_reference),
dhcp_rpc.DhcpRpcCallback(), dhcp_rpc.DhcpRpcCallback(),
@ -680,23 +694,6 @@ class NECPluginV2AgentNotifierApi(n_rpc.RpcProxy,
topic=self.topic_port_update) topic=self.topic_port_update)
class SecurityGroupServerRpcCallback(
n_rpc.RpcCallback,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
RPC_API_VERSION = sg_rpc.SG_RPC_VERSION
@staticmethod
def get_port_from_device(device):
port = ndb.get_port_from_device(device)
if port:
port['device'] = device
LOG.debug(_("NECPluginV2RPCCallbacks.get_port_from_device() called, "
"device=%(device)s => %(ret)s."),
{'device': device, 'ret': port})
return port
class NECPluginV2RPCCallbacks(n_rpc.RpcCallback): class NECPluginV2RPCCallbacks(n_rpc.RpcCallback):
RPC_API_VERSION = '1.0' RPC_API_VERSION = '1.0'

View File

@ -23,6 +23,7 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc from neutron.api.rpc.handlers import l3_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as q_const from neutron.common import constants as q_const
from neutron.common import exceptions as nexception from neutron.common import exceptions as nexception
from neutron.common import rpc as n_rpc from neutron.common import rpc as n_rpc
@ -51,10 +52,7 @@ LOG = logging.getLogger(__name__)
IPv6 = 6 IPv6 = 6
class NVSDPluginRpcCallbacks(n_rpc.RpcCallback, class SecurityGroupServerRpcMixin(sg_db_rpc.SecurityGroupServerRpcMixin):
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
RPC_API_VERSION = '1.1'
@staticmethod @staticmethod
def get_port_from_device(device): def get_port_from_device(device):
@ -88,7 +86,7 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
external_net_db.External_net_db_mixin, external_net_db.External_net_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin, l3_gwmode_db.L3_NAT_db_mixin,
portbindings_base.PortBindingBaseMixin, portbindings_base.PortBindingBaseMixin,
sg_db_rpc.SecurityGroupServerRpcMixin): SecurityGroupServerRpcMixin):
"""L2 Virtual Network Plugin. """L2 Virtual Network Plugin.
@ -159,7 +157,7 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.agent_notifiers[q_const.AGENT_TYPE_L3] = ( self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
l3_rpc_agent_api.L3AgentNotifyAPI() l3_rpc_agent_api.L3AgentNotifyAPI()
) )
self.endpoints = [NVSDPluginRpcCallbacks(), self.endpoints = [securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(), dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback(), l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()] agents_db.AgentExtRpcCallback()]

View File

@ -22,6 +22,7 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc from neutron.api.rpc.handlers import l3_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.api.v2 import attributes from neutron.api.v2 import attributes
from neutron.common import constants as q_const from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc from neutron.common import exceptions as n_exc
@ -58,8 +59,7 @@ from neutron.plugins.openvswitch import ovs_db_v2
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class OVSRpcCallbacks(n_rpc.RpcCallback, class OVSRpcCallbacks(n_rpc.RpcCallback):
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
# history # history
# 1.0 Initial version # 1.0 Initial version
@ -73,13 +73,6 @@ class OVSRpcCallbacks(n_rpc.RpcCallback,
self.notifier = notifier self.notifier = notifier
self.tunnel_type = tunnel_type self.tunnel_type = tunnel_type
@classmethod
def get_port_from_device(cls, device):
port = ovs_db_v2.get_port_from_device(device)
if port:
port['device'] = device
return port
def get_device_details(self, rpc_context, **kwargs): def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details.""" """Agent requests device details."""
agent_id = kwargs.get('agent_id') agent_id = kwargs.get('agent_id')
@ -183,6 +176,16 @@ class OVSRpcCallbacks(n_rpc.RpcCallback,
return entry return entry
class SecurityGroupServerRpcMixin(sg_db_rpc.SecurityGroupServerRpcMixin):
@classmethod
def get_port_from_device(cls, device):
port = ovs_db_v2.get_port_from_device(device)
if port:
port['device'] = device
return port
class AgentNotifierApi(n_rpc.RpcProxy, class AgentNotifierApi(n_rpc.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin): sg_rpc.SecurityGroupAgentRpcApiMixin):
'''Agent side of the openvswitch rpc API. '''Agent side of the openvswitch rpc API.
@ -236,7 +239,7 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
external_net_db.External_net_db_mixin, external_net_db.External_net_db_mixin,
extraroute_db.ExtraRoute_db_mixin, extraroute_db.ExtraRoute_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin, l3_gwmode_db.L3_NAT_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin, SecurityGroupServerRpcMixin,
l3_agentschedulers_db.L3AgentSchedulerDbMixin, l3_agentschedulers_db.L3AgentSchedulerDbMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin, agentschedulers_db.DhcpAgentSchedulerDbMixin,
portbindings_db.PortBindingMixin, portbindings_db.PortBindingMixin,
@ -344,6 +347,7 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
l3_rpc_agent_api.L3AgentNotifyAPI() l3_rpc_agent_api.L3AgentNotifyAPI()
) )
self.endpoints = [OVSRpcCallbacks(self.notifier, self.tunnel_type), self.endpoints = [OVSRpcCallbacks(self.notifier, self.tunnel_type),
securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(), dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback(), l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()] agents_db.AgentExtRpcCallback()]

View File

@ -22,6 +22,7 @@ from ryu.app import rest_nw_id
from neutron.agent import securitygroups_rpc as sg_rpc from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import dhcp_rpc from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc from neutron.api.rpc.handlers import l3_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as q_const from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc from neutron.common import exceptions as n_exc
from neutron.common import rpc as n_rpc from neutron.common import rpc as n_rpc
@ -45,8 +46,17 @@ from neutron.plugins.ryu.db import api_v2 as db_api_v2
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class RyuRpcCallbacks(n_rpc.RpcCallback, class SecurityGroupServerRpcMixin(sg_db_rpc.SecurityGroupServerRpcMixin):
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
@classmethod
def get_port_from_device(cls, device):
port = db_api_v2.get_port_from_device(device)
if port:
port['device'] = device
return port
class RyuRpcCallbacks(n_rpc.RpcCallback):
RPC_API_VERSION = '1.1' RPC_API_VERSION = '1.1'
@ -58,13 +68,6 @@ class RyuRpcCallbacks(n_rpc.RpcCallback,
LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr) LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr)
return self.ofp_rest_api_addr return self.ofp_rest_api_addr
@classmethod
def get_port_from_device(cls, device):
port = db_api_v2.get_port_from_device(device)
if port:
port['device'] = device
return port
class AgentNotifierApi(n_rpc.RpcProxy, class AgentNotifierApi(n_rpc.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin): sg_rpc.SecurityGroupAgentRpcApiMixin):
@ -88,7 +91,7 @@ class RyuNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
external_net_db.External_net_db_mixin, external_net_db.External_net_db_mixin,
extraroute_db.ExtraRoute_db_mixin, extraroute_db.ExtraRoute_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin, l3_gwmode_db.L3_NAT_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin, SecurityGroupServerRpcMixin,
portbindings_base.PortBindingBaseMixin): portbindings_base.PortBindingBaseMixin):
_supported_extension_aliases = ["external-net", "router", "ext-gw-mode", _supported_extension_aliases = ["external-net", "router", "ext-gw-mode",
@ -138,6 +141,7 @@ class RyuNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.conn = n_rpc.create_connection(new=True) self.conn = n_rpc.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT) self.notifier = AgentNotifierApi(topics.AGENT)
self.endpoints = [RyuRpcCallbacks(self.ofp_api_host), self.endpoints = [RyuRpcCallbacks(self.ofp_api_host),
securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(), dhcp_rpc.DhcpRpcCallback(),
l3_rpc.L3RpcCallback()] l3_rpc.L3RpcCallback()]
for svc_topic in self.service_topics.values(): for svc_topic in self.service_topics.values():

View File

@ -36,7 +36,7 @@ class RestProxySecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase,
self.startHttpPatch() self.startHttpPatch()
class TestSecServerRpcCallBack(test_sg_rpc.SGServerRpcCallBackMixinTestCase, class TestSecServerRpcCallBack(test_sg_rpc.SGServerRpcCallBackTestCase,
RestProxySecurityGroupsTestCase): RestProxySecurityGroupsTestCase):
pass pass

View File

@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import contextlib
import mock import mock
from oslo.config import cfg from oslo.config import cfg
@ -95,11 +93,9 @@ class TestLinuxBridgePluginRpcCallbacks(test_plugin.NeutronDbPluginV2TestCase):
self.callbacks = lb_neutron_plugin.LinuxBridgeRpcCallbacks() self.callbacks = lb_neutron_plugin.LinuxBridgeRpcCallbacks()
def test_update_device_down(self): def test_update_device_down(self):
with contextlib.nested( with mock.patch.object(manager.NeutronManager, "get_plugin") as gp:
mock.patch.object(self.callbacks, "get_port_from_device", plugin = gp.return_value
return_value=None), plugin.get_port_from_device.return_value = None
mock.patch.object(manager.NeutronManager, "get_plugin")
) as (gpfd, gp):
self.assertEqual( self.assertEqual(
self.callbacks.update_device_down("fake_context", self.callbacks.update_device_down("fake_context",
agent_id="123", agent_id="123",
@ -107,7 +103,8 @@ class TestLinuxBridgePluginRpcCallbacks(test_plugin.NeutronDbPluginV2TestCase):
host="host"), host="host"),
{'device': 'device', 'exists': False} {'device': 'device', 'exists': False}
) )
gpfd.return_value = {'id': 'fakeid', plugin.get_port_from_device.return_value = {
'id': 'fakeid',
'status': q_const.PORT_STATUS_ACTIVE} 'status': q_const.PORT_STATUS_ACTIVE}
self.assertEqual( self.assertEqual(
self.callbacks.update_device_down("fake_context", self.callbacks.update_device_down("fake_context",
@ -118,15 +115,13 @@ class TestLinuxBridgePluginRpcCallbacks(test_plugin.NeutronDbPluginV2TestCase):
) )
def test_update_device_up(self): def test_update_device_up(self):
with contextlib.nested( with mock.patch.object(manager.NeutronManager, "get_plugin") as gp:
mock.patch.object(self.callbacks, "get_port_from_device", plugin = gp.return_value
return_value=None), plugin.get_port_from_device.return_value = {
mock.patch.object(manager.NeutronManager, "get_plugin") 'id': 'fakeid',
) as (gpfd, gp):
gpfd.return_value = {'id': 'fakeid',
'status': q_const.PORT_STATUS_ACTIVE} 'status': q_const.PORT_STATUS_ACTIVE}
self.callbacks.update_device_up("fake_context", self.callbacks.update_device_up("fake_context",
agent_id="123", agent_id="123",
device="device", device="device",
host="host") host="host")
gpfd.assert_called_once_with('device') plugin.get_port_from_device.assert_called_once_with('device')

View File

@ -42,7 +42,8 @@ class RpcCallbacksTestCase(base.BaseTestCase):
} }
def _test_update_device_up(self, extensions, kwargs): def _test_update_device_up(self, extensions, kwargs):
with mock.patch.object(self.callbacks, '_device_to_port_id'): with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin'
'._device_to_port_id'):
type(self.l3plugin).supported_extension_aliases = ( type(self.l3plugin).supported_extension_aliases = (
mock.PropertyMock(return_value=extensions)) mock.PropertyMock(return_value=extensions))
self.callbacks.update_device_up(mock.ANY, **kwargs) self.callbacks.update_device_up(mock.ANY, **kwargs)

View File

@ -74,8 +74,7 @@ class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase,
req.get_response(self.api)) req.get_response(self.api))
port_id = res['port']['id'] port_id = res['port']['id']
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
callbacks = plugin.endpoints[0] port_dict = plugin.get_port_from_device(port_id)
port_dict = callbacks.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id']) self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id], self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS]) port_dict[ext_sg.SECURITYGROUPS])
@ -86,7 +85,7 @@ class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase,
def test_security_group_get_port_from_device_with_no_port(self): def test_security_group_get_port_from_device_with_no_port(self):
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id') port_dict = plugin.get_port_from_device('bad_device_id')
self.assertIsNone(port_dict) self.assertIsNone(port_dict)
@ -96,11 +95,11 @@ class TestMl2SecurityGroupsXML(TestMl2SecurityGroups):
class TestMl2SGServerRpcCallBack( class TestMl2SGServerRpcCallBack(
Ml2SecurityGroupsTestCase, Ml2SecurityGroupsTestCase,
test_sg_rpc.SGServerRpcCallBackMixinTestCase): test_sg_rpc.SGServerRpcCallBackTestCase):
pass pass
class TestMl2SGServerRpcCallBackXML( class TestMl2SGServerRpcCallBackXML(
Ml2SecurityGroupsTestCase, Ml2SecurityGroupsTestCase,
test_sg_rpc.SGServerRpcCallBackMixinTestCaseXML): test_sg_rpc.SGServerRpcCallBackTestCaseXML):
pass pass

View File

@ -51,13 +51,13 @@ class NecSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
class TestNecSGServerRpcCallBack( class TestNecSGServerRpcCallBack(
test_sg_rpc.SGServerRpcCallBackMixinTestCase, test_sg_rpc.SGServerRpcCallBackTestCase,
NecSecurityGroupsTestCase): NecSecurityGroupsTestCase):
pass pass
class TestNecSGServerRpcCallBackXML( class TestNecSGServerRpcCallBackXML(
test_sg_rpc.SGServerRpcCallBackMixinTestCaseXML, test_sg_rpc.SGServerRpcCallBackTestCaseXML,
NecSecurityGroupsTestCase): NecSecurityGroupsTestCase):
pass pass
@ -84,7 +84,7 @@ class TestNecSecurityGroups(NecSecurityGroupsTestCase,
req.get_response(self.api)) req.get_response(self.api))
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
port_dict = plugin.callback_sg.get_port_from_device(port_id) port_dict = plugin.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id']) self.assertEqual(port_id, port_dict['id'])
self.assertEqual([sg_id], self.assertEqual([sg_id],
port_dict[ext_sg.SECURITYGROUPS]) port_dict[ext_sg.SECURITYGROUPS])

View File

@ -68,15 +68,13 @@ class OneConvergenceSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
class TestOneConvergenceSGServerRpcCallBack( class TestOneConvergenceSGServerRpcCallBack(
OneConvergenceSecurityGroupsTestCase, OneConvergenceSecurityGroupsTestCase,
test_sg_rpc.SGServerRpcCallBackMixinTestCase): test_sg_rpc.SGServerRpcCallBackTestCase):
pass pass
class TestOneConvergenceSGServerRpcCallBackXML( class TestOneConvergenceSGServerRpcCallBackXML(
OneConvergenceSecurityGroupsTestCase, OneConvergenceSecurityGroupsTestCase,
test_sg_rpc.SGServerRpcCallBackMixinTestCaseXML): test_sg_rpc.SGServerRpcCallBackTestCaseXML):
pass pass
@ -103,8 +101,7 @@ class TestOneConvergenceSecurityGroups(OneConvergenceSecurityGroupsTestCase,
req.get_response(self.api)) req.get_response(self.api))
port_id = res['port']['id'] port_id = res['port']['id']
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
callbacks = plugin.endpoints[0] port_dict = plugin.get_port_from_device(port_id)
port_dict = callbacks.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id']) self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id], self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS]) port_dict[ext_sg.SECURITYGROUPS])
@ -116,7 +113,7 @@ class TestOneConvergenceSecurityGroups(OneConvergenceSecurityGroupsTestCase,
def test_security_group_get_port_from_device_with_no_port(self): def test_security_group_get_port_from_device_with_no_port(self):
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id') port_dict = plugin.get_port_from_device('bad_device_id')
self.assertIsNone(port_dict) self.assertIsNone(port_dict)

View File

@ -50,13 +50,13 @@ class OpenvswitchSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
class TestOpenvswitchSGServerRpcCallBack( class TestOpenvswitchSGServerRpcCallBack(
OpenvswitchSecurityGroupsTestCase, OpenvswitchSecurityGroupsTestCase,
test_sg_rpc.SGServerRpcCallBackMixinTestCase): test_sg_rpc.SGServerRpcCallBackTestCase):
pass pass
class TestOpenvswitchSGServerRpcCallBackXML( class TestOpenvswitchSGServerRpcCallBackXML(
OpenvswitchSecurityGroupsTestCase, OpenvswitchSecurityGroupsTestCase,
test_sg_rpc.SGServerRpcCallBackMixinTestCaseXML): test_sg_rpc.SGServerRpcCallBackTestCaseXML):
pass pass
@ -82,8 +82,7 @@ class TestOpenvswitchSecurityGroups(OpenvswitchSecurityGroupsTestCase,
req.get_response(self.api)) req.get_response(self.api))
port_id = res['port']['id'] port_id = res['port']['id']
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
callbacks = plugin.endpoints[0] port_dict = plugin.get_port_from_device(port_id)
port_dict = callbacks.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id']) self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id], self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS]) port_dict[ext_sg.SECURITYGROUPS])
@ -94,7 +93,7 @@ class TestOpenvswitchSecurityGroups(OpenvswitchSecurityGroupsTestCase,
def test_security_group_get_port_from_device_with_no_port(self): def test_security_group_get_port_from_device_with_no_port(self):
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id') port_dict = plugin.get_port_from_device('bad_device_id')
self.assertIsNone(port_dict) self.assertIsNone(port_dict)

View File

@ -71,7 +71,7 @@ class TestRyuSecurityGroups(RyuSecurityGroupsTestCase,
req.get_response(self.api)) req.get_response(self.api))
port_id = res['port']['id'] port_id = res['port']['id']
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
port_dict = plugin.endpoints[0].get_port_from_device(port_id) port_dict = plugin.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id']) self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id], self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS]) port_dict[ext_sg.SECURITYGROUPS])
@ -82,7 +82,7 @@ class TestRyuSecurityGroups(RyuSecurityGroupsTestCase,
def test_security_group_get_port_from_device_with_no_port(self): def test_security_group_get_port_from_device_with_no_port(self):
plugin = manager.NeutronManager.get_plugin() plugin = manager.NeutronManager.get_plugin()
port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id') port_dict = plugin.get_port_from_device('bad_device_id')
self.assertIsNone(port_dict) self.assertIsNone(port_dict)

View File

@ -25,6 +25,7 @@ from neutron.agent import firewall as firewall_base
from neutron.agent.linux import iptables_manager from neutron.agent.linux import iptables_manager
from neutron.agent import rpc as agent_rpc from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as const from neutron.common import constants as const
from neutron.common import ipv6_utils as ipv6 from neutron.common import ipv6_utils as ipv6
from neutron.common import rpc as n_rpc from neutron.common import rpc as n_rpc
@ -45,7 +46,38 @@ FAKE_IP = {const.IPv4: '10.0.0.1',
'IPv6_LLA': 'fe80::123'} 'IPv6_LLA': 'fe80::123'}
class FakeSGCallback(sg_db_rpc.SecurityGroupServerRpcCallbackMixin): TEST_PLUGIN_CLASS = ('neutron.tests.unit.test_security_groups_rpc.'
'SecurityGroupRpcTestPlugin')
class SecurityGroupRpcTestPlugin(test_sg.SecurityGroupTestPlugin,
sg_db_rpc.SecurityGroupServerRpcMixin):
def __init__(self):
super(SecurityGroupRpcTestPlugin, self).__init__()
self.notifier = mock.Mock()
self.devices = {}
def create_port(self, context, port):
result = super(SecurityGroupRpcTestPlugin,
self).create_port(context, port)
self.devices[result['id']] = result
self.notify_security_groups_member_updated(context, result)
return result
def update_port(self, context, id, port):
original_port = self.get_port(context, id)
updated_port = super(SecurityGroupRpcTestPlugin,
self).update_port(context, id, port)
self.devices[id] = updated_port
self.update_security_group_on_port(
context, id, port, original_port, updated_port)
def delete_port(self, context, id):
port = self.get_port(context, id)
super(SecurityGroupRpcTestPlugin, self).delete_port(context, id)
self.notify_security_groups_member_updated(context, port)
del self.devices[id]
def get_port_from_device(self, device): def get_port_from_device(self, device):
device = self.devices.get(device) device = self.devices.get(device)
if device: if device:
@ -56,13 +88,15 @@ class FakeSGCallback(sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
return device return device
class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase): class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
def setUp(self, plugin=None): def setUp(self, plugin=None):
plugin = plugin or TEST_PLUGIN_CLASS
cfg.CONF.set_default('firewall_driver', cfg.CONF.set_default('firewall_driver',
'neutron.agent.firewall.NoopFirewallDriver', 'neutron.agent.firewall.NoopFirewallDriver',
group='SECURITYGROUP') group='SECURITYGROUP')
super(SGServerRpcCallBackMixinTestCase, self).setUp(plugin) super(SGServerRpcCallBackTestCase, self).setUp(plugin)
self.rpc = FakeSGCallback() self.notifier = manager.NeutronManager.get_plugin().notifier
self.rpc = securitygroups_rpc.SecurityGroupServerRpcCallback()
def _test_security_group_port(self, device_owner, gw_ip, def _test_security_group_port(self, device_owner, gw_ip,
cidr, ip_version, ip_address): cidr, ip_version, ip_address):
@ -71,9 +105,6 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
gateway_ip=gw_ip, gateway_ip=gw_ip,
cidr=cidr, cidr=cidr,
ip_version=ip_version) as subnet: ip_version=ip_version) as subnet:
with mock.patch.object(
self.notifier,
'security_groups_provider_updated') as mock_notifier:
kwargs = { kwargs = {
'fixed_ips': [{'subnet_id': subnet['subnet']['id'], 'fixed_ips': [{'subnet_id': subnet['subnet']['id'],
'ip_address': ip_address}]} 'ip_address': ip_address}]}
@ -89,75 +120,58 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
res = self.deserialize(self.fmt, res = self.deserialize(self.fmt,
req.get_response(self.api)) req.get_response(self.api))
self._delete('ports', port_id) self._delete('ports', port_id)
return mock_notifier
def test_notify_security_group_ipv6_gateway_port_added(self): def test_notify_security_group_ipv6_gateway_port_added(self):
if getattr(self, "notifier", None) is None: self._test_security_group_port(
self.skipTest("Notifier mock is not set so security group "
"RPC calls can't be tested")
mock_notifier = self._test_security_group_port(
const.DEVICE_OWNER_ROUTER_INTF, const.DEVICE_OWNER_ROUTER_INTF,
'2001:0db8::1', '2001:0db8::1',
'2001:0db8::/64', '2001:0db8::/64',
6, 6,
'2001:0db8::1') '2001:0db8::1')
self.assertTrue(mock_notifier.called) self.assertTrue(self.notifier.security_groups_provider_updated.called)
def test_notify_security_group_ipv6_normal_port_added(self): def test_notify_security_group_ipv6_normal_port_added(self):
if getattr(self, "notifier", None) is None: self._test_security_group_port(
self.skipTest("Notifier mock is not set so security group "
"RPC calls can't be tested")
mock_notifier = self._test_security_group_port(
None, None,
'2001:0db8::1', '2001:0db8::1',
'2001:0db8::/64', '2001:0db8::/64',
6, 6,
'2001:0db8::3') '2001:0db8::3')
self.assertFalse(mock_notifier.called) self.assertFalse(self.notifier.security_groups_provider_updated.called)
def test_notify_security_group_ipv4_dhcp_port_added(self): def test_notify_security_group_ipv4_dhcp_port_added(self):
if getattr(self, "notifier", None) is None: self._test_security_group_port(
self.skipTest("Notifier mock is not set so security group "
"RPC calls can't be tested")
mock_notifier = self._test_security_group_port(
const.DEVICE_OWNER_DHCP, const.DEVICE_OWNER_DHCP,
'192.168.1.1', '192.168.1.1',
'192.168.1.0/24', '192.168.1.0/24',
4, 4,
'192.168.1.2') '192.168.1.2')
self.assertTrue(mock_notifier.called) self.assertTrue(self.notifier.security_groups_provider_updated.called)
def test_notify_security_group_ipv4_gateway_port_added(self): def test_notify_security_group_ipv4_gateway_port_added(self):
if getattr(self, "notifier", None) is None: self._test_security_group_port(
self.skipTest("Notifier mock is not set so security group "
"RPC calls can't be tested")
mock_notifier = self._test_security_group_port(
const.DEVICE_OWNER_ROUTER_INTF, const.DEVICE_OWNER_ROUTER_INTF,
'192.168.1.1', '192.168.1.1',
'192.168.1.0/24', '192.168.1.0/24',
4, 4,
'192.168.1.1') '192.168.1.1')
self.assertFalse(mock_notifier.called) self.assertFalse(self.notifier.security_groups_provider_updated.called)
def test_notify_security_group_ipv4_normal_port_added(self): def test_notify_security_group_ipv4_normal_port_added(self):
if getattr(self, "notifier", None) is None: self._test_security_group_port(
self.skipTest("Notifier mock is not set so security group "
"RPC calls can't be tested")
mock_notifier = self._test_security_group_port(
None, None,
'192.168.1.1', '192.168.1.1',
'192.168.1.0/24', '192.168.1.0/24',
4, 4,
'192.168.1.3') '192.168.1.3')
self.assertFalse(mock_notifier.called) self.assertFalse(self.notifier.security_groups_provider_updated.called)
def test_security_group_rules_for_devices_ipv4_ingress(self): def test_security_group_rules_for_devices_ipv4_ingress(self):
fake_prefix = FAKE_PREFIX[const.IPv4] fake_prefix = FAKE_PREFIX[const.IPv4]
with self.network() as n: with self.network() as n:
with contextlib.nested(self.subnet(n), with contextlib.nested(
self.security_group()) as (subnet_v4, self.subnet(n),
sg1): self.security_group()) as (subnet_v4, sg1):
sg1_id = sg1['security_group']['id'] sg1_id = sg1['security_group']['id']
rule1 = self._build_security_group_rule( rule1 = self._build_security_group_rule(
sg1_id, sg1_id,
@ -829,7 +843,7 @@ class SGServerRpcCallBackMixinTestCase(test_sg.SecurityGroupDBTestCase):
self._delete('ports', port_id2) self._delete('ports', port_id2)
class SGServerRpcCallBackMixinTestCaseXML(SGServerRpcCallBackMixinTestCase): class SGServerRpcCallBackTestCaseXML(SGServerRpcCallBackTestCase):
fmt = 'xml' fmt = 'xml'