Add recent neutron extentions and IB support

Additions to Mellanox Plugin:
Add ML2 compatibility support.
Enable Infiniband Network Type.
Keep the compatibility to Linux Bridge Plugin to allow Network node deployment via Linux Bridge L2 Agent.
Add support for Host Port Binding and L3/DHCP Agent Scheduler.

Implements: blueprint mlnx-plugin-improvments

Change-Id: Ieca518bbd07a39258cd14ceeecd3f9d3e2b6f219
This commit is contained in:
Irena Berezovsky 2013-08-11 09:19:03 +03:00
parent 6eae300755
commit 58ca4c40b9
13 changed files with 482 additions and 111 deletions

View File

@ -31,7 +31,7 @@
# (StrOpt) Type of Network Interface to allocate for VM:
# direct or hosdev according to libvirt terminology
# vnic_type = direct
# vnic_type = mlnx_direct
# (StrOpt) Eswitch daemon end point connection url
# daemon_endpoint = 'tcp://127.0.0.1:5001'
@ -44,3 +44,8 @@
[agent]
# Agent's polling interval in seconds
# polling_interval = 2
# (BoolOpt) Enable server RPC compatibility with old (pre-havana)
# agents.
#
# rpc_support_old_agents = True

View File

@ -68,6 +68,7 @@ AGENT_TYPE_LINUXBRIDGE = 'Linux bridge agent'
AGENT_TYPE_NEC = 'NEC plugin agent'
AGENT_TYPE_L3 = 'L3 agent'
AGENT_TYPE_LOADBALANCER = 'Loadbalancer agent'
AGENT_TYPE_MLNX = 'Mellanox plugin agent'
L2_AGENT_TOPIC = 'N/A'
PAGINATION_INFINITE = 'infinite'

View File

@ -24,6 +24,7 @@ import eventlet
from oslo.config import cfg
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import config as logging_config
from neutron.common import constants as q_constants
from neutron.common import topics
@ -31,6 +32,7 @@ from neutron.common import utils as q_utils
from neutron import context
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common.rpc import common as rpc_common
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.mlnx.agent import utils
from neutron.plugins.mlnx.common import config # noqa
@ -100,8 +102,9 @@ class EswitchManager(object):
net_map = self.network_map[network_id]
net_map['ports'].append({'port_id': port_id, 'port_mac': port_mac})
if network_type == constants.TYPE_VLAN:
LOG.info(_('Binding VLAN ID %(seg_id)s'
if network_type in (constants.TYPE_VLAN,
constants.TYPE_IB):
LOG.info(_('Binding Segmentation ID %(seg_id)s'
'to eSwitch for vNIC mac_address %(mac)s'),
{'seg_id': seg_id,
'mac': port_mac})
@ -109,8 +112,6 @@ class EswitchManager(object):
seg_id,
port_mac)
self.utils.port_up(physical_network, port_mac)
elif network_type == constants.TYPE_IB:
LOG.debug(_('Network Type IB currently not supported'))
else:
LOG.error(_('Unsupported network type %s'), network_type)
@ -131,7 +132,7 @@ class EswitchManager(object):
if network_type == constants.TYPE_VLAN:
LOG.debug(_("creating VLAN Network"))
elif network_type == constants.TYPE_IB:
LOG.debug(_("currently IB network provisioning is not supported"))
LOG.debug(_("creating IB Network"))
else:
LOG.error(_("Unknown network type %(network_type) "
"for network %(network_id)"),
@ -146,14 +147,18 @@ class EswitchManager(object):
self.network_map[network_id] = data
class MlnxEswitchRpcCallbacks():
class MlnxEswitchRpcCallbacks(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'
# history
# 1.1 Support Security Group RPC
RPC_API_VERSION = '1.1'
def __init__(self, context, eswitch):
def __init__(self, context, agent):
self.context = context
self.eswitch = eswitch
self.agent = agent
self.eswitch = agent.eswitch
self.sg_agent = agent
def network_delete(self, context, **kwargs):
LOG.debug(_("network_delete received"))
@ -167,22 +172,39 @@ class MlnxEswitchRpcCallbacks():
def port_update(self, context, **kwargs):
LOG.debug(_("port_update received"))
port = kwargs.get('port')
vlan_id = kwargs.get('vlan_id')
physical_network = kwargs.get('physical_network')
net_type = kwargs.get('network_type')
segmentation_id = kwargs.get('segmentation_id')
if not segmentation_id:
# compatibility with pre-Havana RPC vlan_id encoding
segmentation_id = kwargs.get('vlan_id')
physical_network = kwargs.get('physical_network')
net_id = port['network_id']
if self.eswitch.vnic_port_exists(port['mac_address']):
if 'security_groups' in port:
self.sg_agent.refresh_firewall()
try:
if port['admin_state_up']:
self.eswitch.port_up(net_id,
net_type,
physical_network,
vlan_id,
segmentation_id,
port['id'],
port['mac_address'])
# update plugin about port status
self.agent.plugin_rpc.update_device_up(self.context,
port['mac_address'],
self.agent.agent_id)
else:
self.eswitch.port_down(net_id,
physical_network,
port['mac_address'])
# update plugin about port status
self.agent.plugin_rpc.update_device_down(
self.context,
port['mac_address'],
self.agent.agent_id)
except rpc_common.Timeout:
LOG.error(_("RPC timeout while updating port %s"), port['id'])
else:
LOG.debug(_("No port %s defined on agent."), port['id'])
@ -196,9 +218,14 @@ class MlnxEswitchRpcCallbacks():
return dispatcher.RpcDispatcher([self])
class MlnxEswitchNeutronAgent(object):
class MlnxEswitchPluginApi(agent_rpc.PluginApi,
sg_rpc.SecurityGroupServerRpcApiMixin):
pass
class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin):
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'
#RPC_API_VERSION = '1.0'
def __init__(self, interface_mapping):
self._polling_interval = cfg.CONF.AGENT.polling_interval
@ -208,9 +235,10 @@ class MlnxEswitchNeutronAgent(object):
'host': cfg.CONF.host,
'topic': q_constants.L2_AGENT_TOPIC,
'configurations': interface_mapping,
'agent_type': 'eSwitch agent',
'agent_type': q_constants.AGENT_TYPE_MLNX,
'start_flag': True}
self._setup_rpc()
self.init_firewall()
def _setup_eswitches(self, interface_mapping):
daemon = cfg.CONF.ESWITCH.daemon_endpoint
@ -229,17 +257,21 @@ class MlnxEswitchNeutronAgent(object):
def _setup_rpc(self):
self.agent_id = 'mlnx-agent.%s' % socket.gethostname()
LOG.info(_("RPC agent_id: %s"), self.agent_id)
self.topic = topics.AGENT
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
self.plugin_rpc = MlnxEswitchPluginApi(topics.PLUGIN)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
# RPC network init
self.context = context.get_admin_context_without_session()
# Handle updates from service
self.callbacks = MlnxEswitchRpcCallbacks(self.context, self.eswitch)
self.callbacks = MlnxEswitchRpcCallbacks(self.context,
self)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE]]
[topics.NETWORK, topics.DELETE],
[topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.topic,
consumers)
@ -262,10 +294,10 @@ class MlnxEswitchNeutronAgent(object):
def process_network_ports(self, port_info):
resync_a = False
resync_b = False
if 'added' in port_info:
if port_info.get('added'):
LOG.debug(_("ports added!"))
resync_a = self.treat_devices_added(port_info['added'])
if 'removed' in port_info:
if port_info.get('removed'):
LOG.debug(_("ports removed!"))
resync_b = self.treat_devices_removed(port_info['removed'])
# If one of the above opertaions fails => resync with plugin
@ -334,9 +366,9 @@ class MlnxEswitchNeutronAgent(object):
continue
if dev_details['exists']:
LOG.info(_("Port %s updated."), device)
self.eswitch.port_release(device)
else:
LOG.debug(_("Device %s not defined on plugin"), device)
self.eswitch.port_release(device)
return resync
def daemon_loop(self):
@ -356,7 +388,7 @@ class MlnxEswitchNeutronAgent(object):
port_info = self.update_ports(ports)
# notify plugin about port deltas
if port_info:
LOG.debug(_("Agent loop has new devices!"))
LOG.debug(_("Agent loop process devices!"))
# If treat devices fails - must resync with plugin
sync = self.process_network_ports(port_info)
ports = port_info['current']

View File

@ -14,7 +14,9 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import topics
from neutron.openstack.common import log as logging
from neutron.openstack.common.rpc import proxy
@ -22,17 +24,21 @@ from neutron.openstack.common.rpc import proxy
LOG = logging.getLogger(__name__)
class AgentNotifierApi(proxy.RpcProxy):
class AgentNotifierApi(proxy.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin):
"""Agent side of the Embedded Switch RPC API.
API version history:
1.0 - Initial version.
1.1 - Added get_active_networks_info, create_dhcp_port,
and update_dhcp_port methods.
"""
BASE_RPC_API_VERSION = '1.0'
BASE_RPC_API_VERSION = '1.1'
def __init__(self, topic):
super(AgentNotifierApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.topic = topic
self.topic_network_delete = topics.get_topic_name(topic,
topics.NETWORK,
topics.DELETE)
@ -50,10 +56,12 @@ class AgentNotifierApi(proxy.RpcProxy):
def port_update(self, context, port, physical_network,
network_type, vlan_id):
LOG.debug(_("Sending update port message"))
self.fanout_cast(context,
self.make_msg('port_update',
port=port,
physical_network=physical_network,
network_type=network_type,
vlan_id=vlan_id),
kwargs = {'port': port,
'network_type': network_type,
'physical_network': physical_network,
'segmentation_id': vlan_id}
if cfg.CONF.AGENT.rpc_support_old_agents:
kwargs['vlan_id'] = vlan_id
msg = self.make_msg('port_update', **kwargs)
self.fanout_cast(context, msg,
topic=self.topic_port_update)

View File

@ -40,7 +40,8 @@ eswitch_opts = [
help=_("List of <physical_network>:<physical_interface>")),
cfg.StrOpt('vnic_type',
default=constants.VIF_TYPE_DIRECT,
help=_("type of VM network interface: direct or hosdev")),
help=_("type of VM network interface: mlnx_direct or "
"hostdev")),
cfg.StrOpt('daemon_endpoint',
default='tcp://127.0.0.1:5001',
help=_('eswitch daemon end point')),
@ -53,6 +54,8 @@ agent_opts = [
cfg.IntOpt('polling_interval', default=2,
help=_("The number of seconds the agent will wait between "
"polling for local device changes.")),
cfg.BoolOpt('rpc_support_old_agents', default=True,
help=_("Enable server RPC compatibility with old agents")),
]

View File

@ -25,7 +25,7 @@ TYPE_VLAN = 'vlan'
TYPE_IB = 'ib'
TYPE_NONE = 'none'
VIF_TYPE_DIRECT = 'direct'
VIF_TYPE_DIRECT = 'mlnx_direct'
VIF_TYPE_HOSTDEV = 'hostdev'
VNIC_TYPE = 'vnic_type'

View File

@ -20,6 +20,8 @@ from sqlalchemy.orm import exc
from neutron.common import exceptions as q_exc
import neutron.db.api as db
from neutron.db import models_v2
from neutron.db import securitygroups_db as sg_db
from neutron import manager
from neutron.openstack.common import log as logging
from neutron.plugins.mlnx.common import config # noqa
from neutron.plugins.mlnx.db import mlnx_models_v2
@ -115,6 +117,7 @@ def reserve_network(session):
with session.begin(subtransactions=True):
entry = (session.query(mlnx_models_v2.SegmentationIdAllocation).
filter_by(allocated=False).
with_lockmode('update').
first())
if not entry:
raise q_exc.NoNetworkAvailable()
@ -133,6 +136,7 @@ def reserve_specific_network(session, physical_network, segmentation_id):
entry = (session.query(mlnx_models_v2.SegmentationIdAllocation).
filter_by(physical_network=physical_network,
segmentation_id=segmentation_id).
with_lockmode('update').
one())
if entry.allocated:
raise q_exc.VlanIdInUse(vlan_id=segmentation_id,
@ -194,9 +198,8 @@ def add_network_binding(session, network_id, network_type,
def get_network_binding(session, network_id):
qry = session.query(mlnx_models_v2.NetworkBinding)
qry = qry.filter_by(network_id=network_id)
return qry.first()
return (session.query(mlnx_models_v2.NetworkBinding).
filter_by(network_id=network_id).first())
def add_port_profile_binding(session, port_id, vnic_type):
@ -206,18 +209,35 @@ def add_port_profile_binding(session, port_id, vnic_type):
def get_port_profile_binding(session, port_id):
qry = session.query(mlnx_models_v2.PortProfileBinding)
return qry.filter_by(port_id=port_id).first()
return (session.query(mlnx_models_v2.PortProfileBinding).
filter_by(port_id=port_id).first())
def get_port_from_device(device):
"""Get port from database."""
LOG.debug(_("get_port_from_device() called"))
session = db.get_session()
ports = session.query(models_v2.Port).all()
for port in ports:
if port['id'].startswith(device):
return port
sg_binding_port = sg_db.SecurityGroupPortBinding.port_id
query = session.query(models_v2.Port,
sg_db.SecurityGroupPortBinding.security_group_id)
query = query.outerjoin(sg_db.SecurityGroupPortBinding,
models_v2.Port.id == sg_binding_port)
query = query.filter(models_v2.Port.id.startswith(device))
port_and_sgs = query.all()
if not port_and_sgs:
return
port = port_and_sgs[0][0]
plugin = manager.NeutronManager.get_plugin()
port_dict = plugin._make_port_dict(port)
port_dict['security_groups'] = [
sg_id for port_in_db, sg_id in port_and_sgs if sg_id
]
port_dict['security_group_rules'] = []
port_dict['security_group_source_groups'] = []
port_dict['fixed_ips'] = [ip['ip_address']
for ip in port['fixed_ips']]
return port_dict
def get_port_from_device_mac(device_mac):

View File

@ -20,18 +20,23 @@ import sys
from oslo.config import cfg
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.common import constants as q_const
from neutron.common import exceptions as q_exc
from neutron.common import topics
from neutron.common import utils
from neutron.db import agents_db
from neutron.db import agentschedulers_db
from neutron.db import db_base_plugin_v2
from neutron.db import l3_db
from neutron.db import extraroute_db
from neutron.db import l3_gwmode_db
from neutron.db import portbindings_db
from neutron.db import quota_db # noqa
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import portbindings
from neutron.extensions import providernet as provider
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.plugins.common import utils as plugin_utils
@ -44,14 +49,25 @@ LOG = logging.getLogger(__name__)
class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
l3_db.L3_NAT_db_mixin,
agents_db.AgentDbMixin,
sg_db_rpc.SecurityGroupServerRpcMixin):
extraroute_db.ExtraRoute_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin,
agentschedulers_db.L3AgentSchedulerDbMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
portbindings_db.PortBindingMixin):
"""Realization of Neutron API on Mellanox HCA embedded switch technology.
Current plugin provides embedded HCA Switch connectivity.
Code is based on the Linux Bridge plugin content to
support consistency with L3 & DHCP Agents.
A new VLAN is created for each network. An agent is relied upon
to perform the actual HCA configuration on each host.
The provider extension is also supported.
The port binding extension enables an external application relay
information to and from the plugin.
"""
# This attribute specifies whether the plugin supports or not
@ -59,8 +75,11 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
# is qualified by class
__native_bulk_support = True
_supported_extension_aliases = ["provider", "router", "binding",
"agent", "quotas", "security-group"]
_supported_extension_aliases = ["provider", "router", "ext-gw-mode",
"binding", "quotas", "security-group",
"agent", "extraroute",
"l3_agent_scheduler",
"dhcp_agent_scheduler"]
@property
def supported_extension_aliases(self):
@ -70,11 +89,6 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
self._aliases = aliases
return self._aliases
network_view = "extension:provider_network:view"
network_set = "extension:provider_network:set"
binding_view = "extension:port_binding:view"
binding_set = "extension:port_binding:set"
def __init__(self):
"""Start Mellanox Neutron Plugin."""
db.initialize()
@ -82,20 +96,37 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
db.sync_network_states(self.network_vlan_ranges)
self._set_tenant_network_type()
self.vnic_type = cfg.CONF.ESWITCH.vnic_type
self.base_binding_dict = {
portbindings.VIF_TYPE: self.vnic_type,
portbindings.CAPABILITIES: {
portbindings.CAP_PORT_FILTER:
'security-group' in self.supported_extension_aliases}}
self._setup_rpc()
self.network_scheduler = importutils.import_object(
cfg.CONF.network_scheduler_driver
)
self.router_scheduler = importutils.import_object(
cfg.CONF.router_scheduler_driver
)
LOG.debug(_("Mellanox Embedded Switch Plugin initialisation complete"))
def _setup_rpc(self):
# RPC support
self.topic = topics.PLUGIN
self.conn = rpc.create_connection(new=True)
self.notifier = agent_notify_api.AgentNotifierApi(topics.AGENT)
self.callbacks = rpc_callbacks.MlnxRpcCallbacks()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
fanout=False)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
self.notifier = agent_notify_api.AgentNotifierApi(topics.AGENT)
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
)
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
l3_rpc_agent_api.L3AgentNotify
)
def _parse_network_vlan_ranges(self):
try:
@ -219,16 +250,36 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
raise q_exc.InvalidInput(error_message=msg)
return physical_network
def _check_port_binding_for_net_type(self, vnic_type, net_type):
if net_type == constants.TYPE_VLAN:
return vnic_type in (constants.VIF_TYPE_DIRECT,
constants.VIF_TYPE_HOSTDEV)
elif net_type == constants.TYPE_IB:
return vnic_type == constants.VIF_TYPE_HOSTDEV
return False
def _process_port_binding_create(self, context, attrs):
binding_profile = attrs.get(portbindings.PROFILE)
binding_profile_set = attributes.is_attr_set(binding_profile)
net_binding = db.get_network_binding(context.session,
attrs.get('network_id'))
net_type = net_binding.network_type
if not binding_profile_set:
return self.vnic_type
if constants.VNIC_TYPE in binding_profile:
req_vnic_type = binding_profile[constants.VNIC_TYPE]
if req_vnic_type in (constants.VIF_TYPE_DIRECT,
vnic_type = binding_profile[constants.VNIC_TYPE]
if vnic_type in (constants.VIF_TYPE_DIRECT,
constants.VIF_TYPE_HOSTDEV):
return req_vnic_type
if self._check_port_binding_for_net_type(vnic_type,
net_type):
self.base_binding_dict[portbindings.VIF_TYPE] = vnic_type
return vnic_type
else:
msg = (_("unsupported vnic type %(vnic_type)s "
"for network type %(net_type)s") %
{'vnic_type': vnic_type, 'net_type': net_type})
else:
msg = _("invalid vnic_type on port_create")
else:
@ -241,6 +292,11 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
network['network'])
session = context.session
with session.begin(subtransactions=True):
#set up default security groups
tenant_id = self._get_tenant_id_for_create(
context, network['network'])
self._ensure_default_security_group(context, tenant_id)
if not network_type:
# tenant network
network_type = self.tenant_network_type
@ -272,7 +328,9 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
return net
def update_network(self, context, net_id, network):
LOG.debug(_("update network"))
provider._raise_if_updates_provider_attributes(network['network'])
session = context.session
with session.begin(subtransactions=True):
net = super(MellanoxEswitchPlugin, self).update_network(context,
@ -306,16 +364,16 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
self._extend_network_dict_provider(context, net)
return self._fields(net, fields)
def get_networks(self, context, filters=None, fields=None):
def get_networks(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None, page_reverse=False):
session = context.session
with session.begin(subtransactions=True):
nets = super(MellanoxEswitchPlugin, self).get_networks(context,
filters,
None)
nets = super(MellanoxEswitchPlugin,
self).get_networks(context, filters, None, sorts,
limit, marker, page_reverse)
for net in nets:
self._extend_network_dict_provider(context, net)
# TODO(rkukura): Filter on extended provider attributes.
nets = self._filter_nets_l3(context, nets, filters)
return [self._fields(net, fields) for net in nets]
def _extend_port_dict_binding(self, context, port):
@ -323,9 +381,6 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
port['id'])
if port_binding:
port[portbindings.VIF_TYPE] = port_binding.vnic_type
port[portbindings.CAPABILITIES] = {
portbindings.CAP_PORT_FILTER:
'security-group' in self.supported_extension_aliases}
binding = db.get_network_binding(context.session,
port['network_id'])
fabric = binding.physical_network
@ -334,38 +389,76 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
def create_port(self, context, port):
LOG.debug(_("create_port with %s"), port)
vnic_type = self._process_port_binding_create(context, port['port'])
port = super(MellanoxEswitchPlugin, self).create_port(context, port)
session = context.session
port_data = port['port']
with session.begin(subtransactions=True):
self._ensure_default_security_group_on_port(context, port)
sgids = self._get_security_groups_on_port(context, port)
# Set port status as 'DOWN'. This will be updated by agent
port['port']['status'] = q_const.PORT_STATUS_DOWN
vnic_type = self._process_port_binding_create(context,
port['port'])
port = super(MellanoxEswitchPlugin,
self).create_port(context, port)
self._process_portbindings_create_and_update(context,
port_data,
port)
db.add_port_profile_binding(context.session, port['id'], vnic_type)
self._process_port_create_security_group(
context, port, sgids)
self.notify_security_groups_member_updated(context, port)
return self._extend_port_dict_binding(context, port)
def get_port(self, context, id, fields=None):
port = super(MellanoxEswitchPlugin, self).get_port(context, id, fields)
return self._fields(self._extend_port_dict_binding(context, port),
port = super(MellanoxEswitchPlugin, self).get_port(context,
id,
fields)
self._extend_port_dict_binding(context, port)
return self._fields(port, fields)
def get_ports(self, context, filters=None, fields=None):
ports = super(MellanoxEswitchPlugin, self).get_ports(
context, filters, fields)
return [self._fields(self._extend_port_dict_binding(context, port),
fields) for port in ports]
def get_ports(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None, page_reverse=False):
res_ports = []
ports = super(MellanoxEswitchPlugin,
self).get_ports(context, filters, fields, sorts,
limit, marker, page_reverse)
for port in ports:
port = self._extend_port_dict_binding(context, port)
res_ports.append(self._fields(port, fields))
return res_ports
def update_port(self, context, port_id, port):
original_port = super(MellanoxEswitchPlugin, self).get_port(context,
port_id)
original_port = self.get_port(context, port_id)
session = context.session
need_port_update_notify = False
with session.begin(subtransactions=True):
port = super(MellanoxEswitchPlugin, self).update_port(context,
port_id,
port)
if original_port['admin_state_up'] != port['admin_state_up']:
updated_port = super(MellanoxEswitchPlugin, self).update_port(
context, port_id, port)
self._process_portbindings_create_and_update(context,
port['port'],
updated_port)
need_port_update_notify = self.update_security_group_on_port(
context, port_id, port, original_port, updated_port)
need_port_update_notify |= self.is_security_group_member_updated(
context, original_port, updated_port)
if original_port['admin_state_up'] != updated_port['admin_state_up']:
need_port_update_notify = True
if need_port_update_notify:
binding = db.get_network_binding(context.session,
port['network_id'])
self.notifier.port_update(context, port,
updated_port['network_id'])
self.notifier.port_update(context, updated_port,
binding.physical_network,
binding.network_type,
binding.segmentation_id)
return self._extend_port_dict_binding(context, port)
return self._extend_port_dict_binding(context, updated_port)
def delete_port(self, context, port_id, l3_port_check=True):
# if needed, check to see if this is a port owned by
@ -376,6 +469,8 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
session = context.session
with session.begin(subtransactions=True):
self.disassociate_floatingips(context, port_id)
port = self.get_port(context, port_id)
self._delete_port_security_group_bindings(context, port_id)
super(MellanoxEswitchPlugin, self).delete_port(context, port_id)
return super(MellanoxEswitchPlugin, self).delete_port(context,
port_id)
self.notify_security_groups_member_updated(context, port)

View File

@ -14,6 +14,7 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from neutron.common import constants as q_const
from neutron.common import rpc as q_rpc
@ -64,13 +65,16 @@ class MlnxRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
port['device'] = device
else:
port = db.get_port_from_device_mac(device)
if port:
port['device'] = device
return port
def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details."""
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
LOG.debug("Device %s details requested from %s", device, agent_id)
LOG.debug(_("Device %(device)s details requested from %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
port = self.get_port_from_device(device)
if port:
binding = db.get_network_binding(db_api.get_session(),
@ -78,13 +82,17 @@ class MlnxRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
entry = {'device': device,
'physical_network': binding.physical_network,
'network_type': binding.network_type,
'vlan_id': binding.segmentation_id,
'segmentation_id': binding.segmentation_id,
'network_id': port['network_id'],
'port_mac': port['mac_address'],
'port_id': port['id'],
'admin_state_up': port['admin_state_up']}
# Set the port status to UP
db.set_port_status(port['id'], q_const.PORT_STATUS_ACTIVE)
if cfg.CONF.AGENT.rpc_support_old_agents:
entry['vlan_id'] = binding.segmentation_id
new_status = (q_const.PORT_STATUS_ACTIVE if port['admin_state_up']
else q_const.PORT_STATUS_DOWN)
if port['status'] != new_status:
db.set_port_status(port['id'], new_status)
else:
entry = {'device': device}
LOG.debug("%s can not be found in database", device)
@ -96,10 +104,11 @@ class MlnxRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
device = kwargs.get('device')
LOG.debug(_("Device %(device)s no longer exists on %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
port = db.get_port_from_device(device)
port = self.get_port_from_device(device)
if port:
entry = {'device': device,
'exists': True}
if port['status'] != q_const.PORT_STATUS_DOWN:
# Set port status to DOWN
db.set_port_status(port['id'], q_const.PORT_STATUS_DOWN)
else:

View File

@ -0,0 +1,32 @@
# Copyright (c) 2013 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from neutron.tests.unit.mlnx import test_mlnx_plugin
from neutron.tests.unit.openvswitch import test_agent_scheduler
class MlnxAgentSchedulerTestCase(
test_agent_scheduler.OvsAgentSchedulerTestCase):
plugin_str = test_mlnx_plugin.PLUGIN_NAME
class MlnxL3AgentNotifierTestCase(
test_agent_scheduler.OvsL3AgentNotifierTestCase):
plugin_str = test_mlnx_plugin.PLUGIN_NAME
class MlnxDhcpAgentNotifierTestCase(
test_agent_scheduler.OvsDhcpAgentNotifierTestCase):
plugin_str = test_mlnx_plugin.PLUGIN_NAME

View File

@ -16,6 +16,8 @@
from neutron.plugins.mlnx.common import constants
from neutron.tests.unit import _test_extension_portbindings as test_bindings
from neutron.tests.unit import test_db_plugin as test_plugin
from neutron.tests.unit import test_security_groups_rpc as test_sg_rpc
PLUGIN_NAME = ('neutron.plugins.mlnx.mlnx_plugin.MellanoxEswitchPlugin')
@ -25,6 +27,7 @@ class MlnxPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
def setUp(self):
super(MlnxPluginV2TestCase, self).setUp(self._plugin_name)
self.port_create_status = 'DOWN'
class TestMlnxBasicGet(test_plugin.TestBasicGet, MlnxPluginV2TestCase):
@ -49,3 +52,14 @@ class TestMlnxPortBinding(MlnxPluginV2TestCase,
test_bindings.PortBindingsTestCase):
VIF_TYPE = constants.VIF_TYPE_DIRECT
HAS_PORT_FILTER = False
class TestMlnxPortBindingNoSG(TestMlnxPortBinding):
HAS_PORT_FILTER = False
FIREWALL_DRIVER = test_sg_rpc.FIREWALL_NOOP_DRIVER
class TestMlnxPortBindingHost(
MlnxPluginV2TestCase,
test_bindings.PortBindingsHostTestCaseMixin):
pass

View File

@ -0,0 +1,103 @@
# Copyright (c) 2013 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import webob.exc
from neutron.api.v2 import attributes
from neutron.extensions import securitygroup as ext_sg
from neutron.plugins.mlnx.db import mlnx_db_v2 as mlnx_db
from neutron.tests.unit import test_extension_security_group as test_sg
from neutron.tests.unit import test_security_groups_rpc as test_sg_rpc
PLUGIN_NAME = ('neutron.plugins.mlnx.'
'mlnx_plugin.MellanoxEswitchPlugin')
AGENT_NAME = ('neutron.plugins.mlnx.'
'agent.eswitch_neutron_agent.MlnxEswitchNeutronAgent')
NOTIFIER = ('neutron.plugins.mlnx.'
'agent_notify_api.AgentNotifierApi')
class MlnxSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
_plugin_name = PLUGIN_NAME
def setUp(self, plugin=None):
test_sg_rpc.set_firewall_driver(test_sg_rpc.FIREWALL_IPTABLES_DRIVER)
notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start()
self.notifier = mock.Mock()
notifier_cls.return_value = self.notifier
self._attribute_map_bk_ = {}
for item in attributes.RESOURCE_ATTRIBUTE_MAP:
self._attribute_map_bk_[item] = (attributes.
RESOURCE_ATTRIBUTE_MAP[item].
copy())
super(MlnxSecurityGroupsTestCase, self).setUp(PLUGIN_NAME)
self.addCleanup(mock.patch.stopall)
def tearDown(self):
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk_
super(MlnxSecurityGroupsTestCase, self).tearDown()
class TestMlnxSecurityGroups(MlnxSecurityGroupsTestCase,
test_sg.TestSecurityGroups,
test_sg_rpc.SGNotificationTestMixin):
pass
class TestMlnxSecurityGroupsXML(TestMlnxSecurityGroups):
fmt = 'xml'
class TestMlnxSecurityGroupsDB(MlnxSecurityGroupsTestCase):
def test_security_group_get_port_from_device(self):
with self.network() as n:
with self.subnet(n):
with self.security_group() as sg:
security_group_id = sg['security_group']['id']
res = self._create_port(self.fmt, n['network']['id'])
port = self.deserialize(self.fmt, res)
fixed_ips = port['port']['fixed_ips']
data = {'port': {'fixed_ips': fixed_ips,
'name': port['port']['name'],
ext_sg.SECURITYGROUPS:
[security_group_id]}}
req = self.new_update_request('ports', data,
port['port']['id'])
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
res = self.deserialize(self.fmt,
req.get_response(self.api))
port_id = res['port']['id']
device_id = port_id[:8]
port_dict = mlnx_db.get_port_from_device(device_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS])
self.assertEqual([], port_dict['security_group_rules'])
self.assertEqual([fixed_ips[0]['ip_address']],
port_dict['fixed_ips'])
self._delete('ports', port['port']['id'])
def test_security_group_get_port_from_device_with_no_port(self):
port_dict = mlnx_db.get_port_from_device('bad_device_id')
self.assertEqual(None, port_dict)
class TestMlnxSecurityGroupsDBXML(TestMlnxSecurityGroupsDB):
fmt = 'xml'

View File

@ -19,6 +19,7 @@
Unit Tests for Mellanox RPC (major reuse of linuxbridge rpc unit tests)
"""
from oslo.config import cfg
import stubout
from neutron.agent import rpc as agent_rpc
@ -31,9 +32,11 @@ from neutron.tests import base
class rpcApiTestCase(base.BaseTestCase):
def _test_mlnx_api(self, rpcapi, topic, method, rpc_method, **kwargs):
def _test_mlnx_api(self, rpcapi, topic, method, rpc_method,
expected_msg=None, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if method == 'call' else None
if not expected_msg:
expected_msg = rpcapi.make_msg(method, **kwargs)
expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
@ -53,11 +56,11 @@ class rpcApiTestCase(base.BaseTestCase):
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
self.assertEqual(expected_retval, retval)
expected_args = [ctxt, topic, expected_msg]
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
self.assertEqual(expected_arg, arg)
def test_delete_network(self):
rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT)
@ -69,12 +72,58 @@ class rpcApiTestCase(base.BaseTestCase):
network_id='fake_request_spec')
def test_port_update(self):
cfg.CONF.set_override('rpc_support_old_agents', False, 'AGENT')
rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT)
expected_msg = rpcapi.make_msg('port_update',
port='fake_port',
network_type='vlan',
physical_network='fake_net',
segmentation_id='fake_vlan_id')
self._test_mlnx_api(rpcapi,
topics.get_topic_name(topics.AGENT,
topics.PORT,
topics.UPDATE),
'port_update', rpc_method='fanout_cast',
expected_msg=expected_msg,
port='fake_port',
network_type='vlan',
physical_network='fake_net',
vlan_id='fake_vlan_id')
def test_port_update_ib(self):
cfg.CONF.set_override('rpc_support_old_agents', False, 'AGENT')
rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT)
expected_msg = rpcapi.make_msg('port_update',
port='fake_port',
network_type='ib',
physical_network='fake_net',
segmentation_id='fake_vlan_id')
self._test_mlnx_api(rpcapi,
topics.get_topic_name(topics.AGENT,
topics.PORT,
topics.UPDATE),
'port_update', rpc_method='fanout_cast',
expected_msg=expected_msg,
port='fake_port',
network_type='ib',
physical_network='fake_net',
vlan_id='fake_vlan_id')
def test_port_update_old_agent(self):
cfg.CONF.set_override('rpc_support_old_agents', True, 'AGENT')
rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT)
expected_msg = rpcapi.make_msg('port_update',
port='fake_port',
network_type='vlan',
physical_network='fake_net',
segmentation_id='fake_vlan_id',
vlan_id='fake_vlan_id')
self._test_mlnx_api(rpcapi,
topics.get_topic_name(topics.AGENT,
topics.PORT,
topics.UPDATE),
'port_update', rpc_method='fanout_cast',
expected_msg=expected_msg,
port='fake_port',
network_type='vlan',
physical_network='fake_net',