Multiple port binding for ML2

Functionality is added to the ML2 plugin to handle multiple port
bindings

Co-Authored-By: Anindita Das <anindita.das@intel.com>
Co-Authored-By: Miguel Lavalle <miguel.lavalle@huawei.com>

Partial-Bug: #1580880

Change-Id: Ie31d4e27e3f55edfe334c4029ca9ed685e684c39
changes/51/414251/74
Jakub Libosvar 5 years ago committed by Miguel Lavalle
parent e8168345ef
commit f7b62a7f29
  1. 6
      neutron/agent/rpc.py
  2. 20
      neutron/common/exceptions.py
  3. 35
      neutron/common/utils.py
  4. 77
      neutron/extensions/portbindings_extended.py
  5. 18
      neutron/objects/ports.py
  6. 5
      neutron/plugins/ml2/driver_context.py
  7. 4
      neutron/plugins/ml2/models.py
  8. 324
      neutron/plugins/ml2/plugin.py
  9. 9
      neutron/services/logapi/common/validators.py
  10. 9
      neutron/services/qos/drivers/manager.py
  11. 1
      neutron/tests/contrib/hooks/api_all_extensions
  12. 32
      neutron/tests/unit/common/test_utils.py
  13. 2
      neutron/tests/unit/objects/test_objects.py
  14. 34
      neutron/tests/unit/objects/test_ports.py
  15. 41
      neutron/tests/unit/plugins/ml2/test_plugin.py
  16. 313
      neutron/tests/unit/plugins/ml2/test_port_binding.py
  17. 2
      neutron/tests/unit/services/logapi/common/test_validators.py
  18. 2
      neutron/tests/unit/services/qos/drivers/test_manager.py

@ -29,6 +29,7 @@ from neutron.agent import resource_cache
from neutron.api.rpc.callbacks import resources
from neutron.common import constants as n_const
from neutron.common import rpc as n_rpc
from neutron.common import utils
from neutron import objects
LOG = logging.getLogger(__name__)
@ -239,6 +240,9 @@ class CacheBackedPluginApi(PluginApi):
# match format of old RPC interface
mac_addr = str(netaddr.EUI(str(port_obj.mac_address),
dialect=netaddr.mac_unix_expanded))
binding = utils.get_port_binding_by_status_and_host(
port_obj.binding, constants.ACTIVE, raise_if_not_found=True,
port_id=port_obj.id)
entry = {
'device': device,
'network_id': port_obj.network_id,
@ -259,7 +263,7 @@ class CacheBackedPluginApi(PluginApi):
'port_security_enabled', True),
'qos_policy_id': port_obj.qos_policy_id,
'network_qos_policy_id': net_qos_policy_id,
'profile': port_obj.binding.profile,
'profile': binding.profile,
'security_groups': list(port_obj.security_group_ids)
}
LOG.debug("Returning: %s", entry)

@ -334,3 +334,23 @@ class FilterIDForIPNotFound(e.NotFound):
class FailedToAddQdiscToDevice(e.NeutronException):
message = _("Failed to add %(direction)s qdisc "
"to device %(device)s.")
class PortBindingNotFound(e.NotFound):
message = _("Binding for port %(port_id)s for host %(host)s could not be "
"found.")
class PortBindingAlreadyActive(e.Conflict):
message = _("Binding for port %(port_id)s on host %(host)s is already "
"active.")
class PortBindingAlreadyExists(e.Conflict):
message = _("Binding for port %(port_id)s on host %(host)s already "
"exists.")
class PortBindingError(e.NeutronException):
message = _("Binding for port %(port_id)s on host %(host)s could not be "
"created or updated.")

@ -33,6 +33,7 @@ import uuid
import eventlet
from eventlet.green import subprocess
import netaddr
from neutron_lib.api.definitions import portbindings_extended as pb_ext
from neutron_lib import constants as n_const
from neutron_lib.utils import helpers
from oslo_config import cfg
@ -43,6 +44,7 @@ import six
import neutron
from neutron._i18n import _
from neutron.common import exceptions
from neutron.db import api as db_api
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
@ -783,3 +785,36 @@ def bytes_to_bits(value):
def bits_to_kilobits(value, base):
# NOTE(slaweq): round up that even 1 bit will give 1 kbit as a result
return int((value + (base - 1)) / base)
def get_port_binding_by_status_and_host(bindings, status, host='',
raise_if_not_found=False,
port_id=None):
"""Returns from an iterable the binding with the specified status and host.
The input iterable can contain zero or one binding in status ACTIVE
and zero or many bindings in status INACTIVE. As a consequence, to
unequivocally retrieve an inactive binding, the caller must specify a non
empty value for host. If host is the empty string, the first binding
satisfying the specified status will be returned. If no binding is found
with the specified status and host, None is returned or PortBindingNotFound
is raised if raise_if_not_found is True
:param bindings: An iterable containing port bindings
:param status: The status of the port binding to return. Possible values
are ACTIVE or INACTIVE as defined in
:file:`neutron_lib/constants.py`.
:param host: str representing the host of the binding to return.
:param raise_if_not_found: If a binding is not found and this parameter is
True, a PortBindingNotFound exception is raised
:param port_id: The id of the binding's port
:returns: The searched for port binding or None if it is not found
:raises: PortBindingNotFound if the binding is not found and
raise_if_not_found is True
"""
for binding in bindings:
if binding[pb_ext.STATUS] == status:
if not host or binding[pb_ext.HOST] == host:
return binding
if raise_if_not_found:
raise exceptions.PortBindingNotFound(port_id=port_id, host=host)

@ -0,0 +1,77 @@
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import portbindings_extended as pbe_ext
from neutron_lib.api import extensions as api_extensions
from neutron_lib.plugins import directory
from neutron.api import extensions
from neutron.api.v2 import base
EXT_ALIAS = pbe_ext.ALIAS
class Portbindings_extended(api_extensions.ExtensionDescriptor):
"""Extension class supporting port bindings.
This class is used by neutron's extension framework to make
metadata about the port bindings available to external applications.
With admin rights one will be able to update and read the values.
"""
@classmethod
def get_name(cls):
return pbe_ext.NAME
@classmethod
def get_alias(cls):
return pbe_ext.ALIAS
@classmethod
def get_description(cls):
return pbe_ext.DESCRIPTION
@classmethod
def get_updated(cls):
return pbe_ext.UPDATED_TIMESTAMP
@classmethod
def get_resources(cls):
plugin = directory.get_plugin()
params = pbe_ext.SUB_RESOURCE_ATTRIBUTE_MAP[
pbe_ext.COLLECTION_NAME]['parameters']
parent = pbe_ext.SUB_RESOURCE_ATTRIBUTE_MAP[
pbe_ext.COLLECTION_NAME]['parent']
controller = base.create_resource(
pbe_ext.COLLECTION_NAME,
pbe_ext.RESOURCE_NAME,
plugin,
params,
member_actions=pbe_ext.ACTION_MAP[pbe_ext.RESOURCE_NAME],
parent=parent,
allow_pagination=True,
allow_sorting=True,
)
exts = [
extensions.ResourceExtension(
pbe_ext.COLLECTION_NAME,
controller,
parent,
member_actions=pbe_ext.ACTION_MAP[pbe_ext.RESOURCE_NAME],
attr_map=params,
),
]
return exts

@ -264,7 +264,8 @@ class Port(base.NeutronDbObject):
# Version 1.1: Add data_plane_status field
# Version 1.2: Added segment_id to binding_levels
# Version 1.3: distributed_binding -> distributed_bindings
VERSION = '1.3'
# Version 1.4: Attribute binding becomes ListOfObjectsField
VERSION = '1.4'
db_model = models_v2.Port
@ -282,7 +283,7 @@ class Port(base.NeutronDbObject):
'allowed_address_pairs': obj_fields.ListOfObjectsField(
'AllowedAddressPair', nullable=True
),
'binding': obj_fields.ObjectField(
'binding': obj_fields.ListOfObjectsField(
'PortBinding', nullable=True
),
'data_plane_status': obj_fields.ObjectField(
@ -473,6 +474,19 @@ class Port(base.NeutronDbObject):
bindings = primitive.pop('distributed_bindings', [])
primitive['distributed_binding'] = (bindings[0]
if bindings else None)
if _target_version < (1, 4):
# In version 1.4 we add support for multiple port bindings.
# Previous versions only support one port binding. The following
# lines look for the active port binding, which is the only one
# needed in previous versions
if 'binding' in primitive:
original_binding = primitive['binding']
primitive['binding'] = None
for a_binding in original_binding:
if (a_binding['versioned_object.data']['status'] ==
constants.ACTIVE):
primitive['binding'] = a_binding
break
@classmethod
def get_ports_by_router(cls, context, router_id, owner, subnet):

@ -50,6 +50,11 @@ class InstanceSnapshot(object):
session.add(self._model_class(**{col: getattr(self, col)
for col in self._cols}))
def __getitem__(self, item):
if item not in self._cols:
raise KeyError(item)
return getattr(self, item)
class MechanismDriverContext(object):
"""MechanismDriver context base class."""

@ -58,8 +58,10 @@ class PortBinding(model_base.BASEV2):
port = orm.relationship(
models_v2.Port,
load_on_pending=True,
# TODO(mlavalle) change name of the relationship to reflect that it is
# now an iterable
backref=orm.backref("port_binding",
lazy='joined', uselist=False,
lazy='joined',
cascade='delete'))
revises_on_change = ('port', )

@ -23,6 +23,7 @@ from neutron_lib.api.definitions import network_mtu_writable as mtuw_apidef
from neutron_lib.api.definitions import port as port_def
from neutron_lib.api.definitions import port_security as psec
from neutron_lib.api.definitions import portbindings
from neutron_lib.api.definitions import portbindings_extended as pbe_ext
from neutron_lib.api.definitions import subnet as subnet_def
from neutron_lib.api.definitions import vlantransparent as vlan_apidef
from neutron_lib.api import extensions
@ -64,6 +65,7 @@ from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import resources_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as n_const
from neutron.common import exceptions as n_exc
from neutron.common import rpc as n_rpc
from neutron.common import utils
from neutron.db import _model_query as model_query
@ -87,6 +89,8 @@ from neutron.db import subnet_service_type_mixin
from neutron.db import vlantransparent_db
from neutron.extensions import providernet as provider
from neutron.extensions import vlantransparent
from neutron.objects import base as base_obj
from neutron.objects import ports as ports_obj
from neutron.plugins.ml2.common import exceptions as ml2_exc
from neutron.plugins.ml2 import db
from neutron.plugins.ml2 import driver_context
@ -113,7 +117,7 @@ def _ml2_port_result_filter_hook(query, filters):
if not values:
return query
bind_criteria = models.PortBinding.host.in_(values)
return query.filter(models_v2.Port.port_binding.has(bind_criteria))
return query.filter(models_v2.Port.port_binding.any(bind_criteria))
@resource_extend.has_resource_extenders
@ -161,7 +165,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
"ip-substring-filtering",
"port-security-groups-filtering",
"empty-string-filtering",
"port-mac-address-regenerate"]
"port-mac-address-regenerate",
"binding-extended"]
@property
def supported_extension_aliases(self):
@ -241,12 +246,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
**kwargs):
port_id = object_id
port = db.get_port(context, port_id)
if not port or not port.port_binding:
port_binding = utils.get_port_binding_by_status_and_host(
getattr(port, 'port_binding', []), const.ACTIVE)
if not port or not port_binding:
LOG.debug("Port %s was deleted so its status cannot be updated.",
port_id)
return
if port.port_binding.vif_type in (portbindings.VIF_TYPE_BINDING_FAILED,
portbindings.VIF_TYPE_UNBOUND):
if port_binding.vif_type in (portbindings.VIF_TYPE_BINDING_FAILED,
portbindings.VIF_TYPE_UNBOUND):
# NOTE(kevinbenton): we hit here when a port is created without
# a host ID and the dhcp agent notifies that its wiring is done
LOG.debug("Port %s cannot update to ACTIVE because it "
@ -320,13 +327,15 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
new_mac=port['mac_address'])
return mac_change
def _process_port_binding(self, mech_context, attrs):
plugin_context = mech_context._plugin_context
binding = mech_context._binding
port = mech_context.current
port_id = port['id']
changes = False
def _clear_port_binding(self, mech_context, binding, port, original_host):
binding.vif_type = portbindings.VIF_TYPE_UNBOUND
binding.vif_details = ''
db.clear_binding_levels(mech_context._plugin_context, port['id'],
original_host)
mech_context._clear_binding_levels()
def _process_port_binding_attributes(self, binding, attrs):
changes = False
host = const.ATTR_NOT_SPECIFIED
if attrs and portbindings.HOST_ID in attrs:
host = attrs.get(portbindings.HOST_ID) or ''
@ -354,23 +363,28 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
msg = _("binding:profile value too large")
raise exc.InvalidInput(error_message=msg)
changes = True
return changes, original_host
def _process_port_binding(self, mech_context, attrs):
plugin_context = mech_context._plugin_context
binding = mech_context._binding
port = mech_context.current
changes, original_host = self._process_port_binding_attributes(binding,
attrs)
# Unbind the port if needed.
if changes:
binding.vif_type = portbindings.VIF_TYPE_UNBOUND
binding.vif_details = ''
db.clear_binding_levels(plugin_context, port_id, original_host)
mech_context._clear_binding_levels()
self._clear_port_binding(mech_context, binding, port,
original_host)
port['status'] = const.PORT_STATUS_DOWN
super(Ml2Plugin, self).update_port(
mech_context._plugin_context, port_id,
{port_def.RESOURCE_NAME: {'status': const.PORT_STATUS_DOWN}})
mech_context._plugin_context, port['id'],
{port_def.RESOURCE_NAME:
{'status': const.PORT_STATUS_DOWN}})
if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
binding.vif_type = portbindings.VIF_TYPE_UNBOUND
binding.vif_details = ''
db.clear_binding_levels(plugin_context, port_id, original_host)
mech_context._clear_binding_levels()
self._clear_port_binding(mech_context, binding, port,
original_host)
binding.host = ''
self._update_port_dict_binding(port, binding)
@ -379,7 +393,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@db_api.retry_db_errors
def _bind_port_if_needed(self, context, allow_notify=False,
need_notify=False):
need_notify=False, allow_commit=True):
if not context.network.network_segments:
LOG.debug("Network %s has no segments, skipping binding",
context.network.current['id'])
@ -399,7 +413,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
context, need_notify)
if count == MAX_BIND_TRIES or not try_again:
if self._should_bind_port(context):
if self._should_bind_port(context) and allow_commit:
# At this point, we attempted to bind a port and reached
# its final binding state. Binding either succeeded or
# exhausted all attempts, thus no need to try again.
@ -488,7 +502,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# mechanism driver update_port_*commit() calls.
try:
port_db = self._get_port(plugin_context, port_id)
cur_binding = port_db.port_binding
cur_binding = utils.get_port_binding_by_status_and_host(
port_db.port_binding, const.ACTIVE)
except exc.PortNotFound:
port_db, cur_binding = None, None
if not port_db or not cur_binding:
@ -532,8 +547,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# to optimize this code to avoid fetching it.
cur_binding = db.get_distributed_port_binding_by_host(
plugin_context, port_id, orig_binding.host)
cur_context_binding = cur_binding
if new_binding.status == const.INACTIVE:
cur_context_binding = (
utils.get_port_binding_by_status_and_host(
port_db.port_binding, const.INACTIVE,
host=new_binding.host))
cur_context = driver_context.PortContext(
self, plugin_context, port, network, cur_binding, None,
self, plugin_context, port, network, cur_context_binding, None,
original_port=oport)
# Commit our binding results only if port has not been
@ -549,20 +570,24 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
if commit:
# Update the port's binding state with our binding
# results.
cur_binding.vif_type = new_binding.vif_type
cur_binding.vif_details = new_binding.vif_details
if new_binding.status == const.INACTIVE:
cur_context_binding.status = const.ACTIVE
cur_binding.status = const.INACTIVE
else:
cur_context_binding.vif_type = new_binding.vif_type
cur_context_binding.vif_details = new_binding.vif_details
db.clear_binding_levels(plugin_context, port_id,
cur_binding.host)
db.set_binding_levels(plugin_context,
bind_context._binding_levels)
# refresh context with a snapshot of updated state
cur_context._binding = driver_context.InstanceSnapshot(
cur_binding)
cur_context_binding)
cur_context._binding_levels = bind_context._binding_levels
# Update PortContext's port dictionary to reflect the
# updated binding state.
self._update_port_dict_binding(port, cur_binding)
self._update_port_dict_binding(port, cur_context_binding)
# Update the port status if requested by the bound driver.
if (bind_context._binding_levels and
@ -632,9 +657,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@resource_extend.extends([port_def.COLLECTION_NAME])
def _ml2_extend_port_dict_binding(port_res, port_db):
plugin = directory.get_plugin()
port_binding = utils.get_port_binding_by_status_and_host(
port_db.port_binding, const.ACTIVE)
# None when called during unit tests for other plugins.
if port_db.port_binding:
plugin._update_port_dict_binding(port_res, port_db.port_binding)
if port_binding:
plugin._update_port_dict_binding(port_res, port_binding)
# ML2's resource extend functions allow extension drivers that extend
# attributes for the resources to add those attributes to the result.
@ -1304,7 +1331,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
original_port=original_port)
with db_api.context_manager.writer.using(context):
port_db = self._get_port(context, id)
binding = port_db.port_binding
binding = utils.get_port_binding_by_status_and_host(
port_db.port_binding, const.ACTIVE)
if not binding:
raise exc.PortNotFound(port_id=id)
mac_address_updated = self._check_mac_update_allowed(
@ -1522,7 +1550,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
with db_api.context_manager.writer.using(context):
try:
port_db = self._get_port(context, id)
binding = port_db.port_binding
binding = utils.get_port_binding_by_status_and_host(
port_db.port_binding, const.ACTIVE,
raise_if_not_found=True, port_id=id)
except exc.PortNotFound:
LOG.debug("The port '%s' was deleted", id)
return
@ -1640,14 +1670,15 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# related attribute port_binding could disappear in
# concurrent port deletion.
# It's not an error condition.
binding = port_db.port_binding
binding = utils.get_port_binding_by_status_and_host(
port_db.port_binding, const.ACTIVE)
if not binding:
LOG.info("Binding info for port %s was not found, "
"it might have been deleted already.",
port_id)
return
levels = db.get_binding_levels(plugin_context, port_db.id,
port_db.port_binding.host)
binding.host)
port_context = driver_context.PortContext(
self, plugin_context, port, network, binding, levels)
@ -1683,7 +1714,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
plugin_context, port['id'], host)
bindlevelhost_match = host
else:
binding = port_db.port_binding
binding = utils.get_port_binding_by_status_and_host(
port_db.port_binding, const.ACTIVE)
bindlevelhost_match = binding.host if binding else None
if not binding:
LOG.info("Binding info for port %s was not found, "
@ -1768,11 +1800,13 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# listening for db events can modify the port if necessary
context.session.flush()
updated_port = self._make_port_dict(port)
levels = db.get_binding_levels(context, port.id,
port.port_binding.host)
binding = utils.get_port_binding_by_status_and_host(
port.port_binding, const.ACTIVE, raise_if_not_found=True,
port_id=port_id)
levels = db.get_binding_levels(context, port.id, binding.host)
mech_context = driver_context.PortContext(
self, context, updated_port, network, port.port_binding,
levels, original_port=original_port)
self, context, updated_port, network, binding, levels,
original_port=original_port)
self.mechanism_manager.update_port_precommit(mech_context)
updated = True
elif port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
@ -1964,3 +1998,213 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.mechanism_manager.update_network_precommit(mech_context)
elif event == events.AFTER_CREATE or event == events.AFTER_DELETE:
self.mechanism_manager.update_network_postcommit(mech_context)
@staticmethod
def _validate_compute_port(port):
if not port['device_owner'].startswith(
const.DEVICE_OWNER_COMPUTE_PREFIX):
msg = _('Invalid port %s. Operation only valid on compute '
'ports') % port['id']
raise exc.BadRequest(resource='port', msg=msg)
def _make_port_binding_dict(self, binding, fields=None):
res = {key: binding[key] for key in (
pbe_ext.HOST, pbe_ext.VIF_TYPE, pbe_ext.VNIC_TYPE,
pbe_ext.STATUS)}
if isinstance(binding, ports_obj.PortBinding):
res[pbe_ext.PROFILE] = binding.profile or {}
res[pbe_ext.VIF_DETAILS] = binding.vif_details or {}
else:
res[pbe_ext.PROFILE] = self._get_profile(binding)
res[pbe_ext.VIF_DETAILS] = self._get_vif_details(binding)
return db_utils.resource_fields(res, fields)
def _get_port_binding_attrs(self, binding, host=None):
return {portbindings.VNIC_TYPE: binding.get(pbe_ext.VNIC_TYPE),
portbindings.HOST_ID: binding.get(pbe_ext.HOST) or host,
portbindings.PROFILE: binding.get(pbe_ext.PROFILE, {})}
def _process_active_binding_change(self, changes, mech_context, port_dict,
original_host):
if changes:
self._clear_port_binding(mech_context,
mech_context._binding, port_dict,
original_host)
port_dict['status'] = const.PORT_STATUS_DOWN
super(Ml2Plugin, self).update_port(
mech_context._plugin_context, port_dict['id'],
{port_def.RESOURCE_NAME:
{'status': const.PORT_STATUS_DOWN}})
self._update_port_dict_binding(port_dict,
mech_context._binding)
mech_context._binding.persist_state_to_session(
mech_context._plugin_context.session)
@utils.transaction_guard
@db_api.retry_if_session_inactive()
def create_port_binding(self, context, port_id, binding):
attrs = binding[pbe_ext.RESOURCE_NAME]
with db_api.context_manager.writer.using(context):
port_db = self._get_port(context, port_id)
self._validate_compute_port(port_db)
if self._get_binding_for_host(port_db.port_binding,
attrs[pbe_ext.HOST]):
raise n_exc.PortBindingAlreadyExists(
port_id=port_id, host=attrs[pbe_ext.HOST])
status = const.ACTIVE
is_active_binding = True
active_binding = utils.get_port_binding_by_status_and_host(
port_db.port_binding, const.ACTIVE)
if active_binding:
status = const.INACTIVE
is_active_binding = False
network = self.get_network(context, port_db['network_id'])
port_dict = self._make_port_dict(port_db)
new_binding = models.PortBinding(
port_id=port_id,
vif_type=portbindings.VIF_TYPE_UNBOUND,
status=status)
mech_context = driver_context.PortContext(self, context, port_dict,
network, new_binding,
None)
changes, original_host = self._process_port_binding_attributes(
mech_context._binding, self._get_port_binding_attrs(attrs))
if is_active_binding:
self._process_active_binding_change(changes, mech_context,
port_dict, original_host)
bind_context = self._bind_port_if_needed(
mech_context, allow_commit=is_active_binding)
if (bind_context._binding.vif_type ==
portbindings.VIF_TYPE_BINDING_FAILED):
raise n_exc.PortBindingError(port_id=port_id,
host=attrs[pbe_ext.HOST])
bind_context._binding.port_id = port_id
bind_context._binding.status = status
if not is_active_binding:
with db_api.context_manager.writer.using(context):
bind_context._binding.persist_state_to_session(context.session)
db.set_binding_levels(context, bind_context._binding_levels)
return self._make_port_binding_dict(bind_context._binding)
@utils.transaction_guard
@db_api.retry_if_session_inactive()
def get_port_bindings(self, context, port_id, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
port = ports_obj.Port.get_object(context, id=port_id)
if not port:
raise exc.PortNotFound(port_id=port_id)
self._validate_compute_port(port)
filters = filters or {}
pager = base_obj.Pager(sorts, limit, page_reverse, marker)
bindings = ports_obj.PortBinding.get_objects(
context, _pager=pager, port_id=port_id, **filters)
return [self._make_port_binding_dict(binding, fields)
for binding in bindings]
@utils.transaction_guard
@db_api.retry_if_session_inactive()
def get_port_binding(self, context, host, port_id, fields=None):
port = ports_obj.Port.get_object(context, id=port_id)
if not port:
raise exc.PortNotFound(port_id=port_id)
self._validate_compute_port(port)
binding = ports_obj.PortBinding.get_object(context, host=host,
port_id=port_id)
if not binding:
raise n_exc.PortBindingNotFound(port_id=port_id, host=host)
return self._make_port_binding_dict(binding, fields)
def _get_binding_for_host(self, bindings, host):
for binding in bindings:
if binding.host == host:
return binding
@utils.transaction_guard
@db_api.retry_if_session_inactive()
def update_port_binding(self, context, host, port_id, binding):
attrs = binding[pbe_ext.RESOURCE_NAME]
with db_api.context_manager.writer.using(context):
port_db = self._get_port(context, port_id)
self._validate_compute_port(port_db)
original_binding = self._get_binding_for_host(port_db.port_binding,
host)
if not original_binding:
raise n_exc.PortBindingNotFound(port_id=port_id, host=host)
is_active_binding = (original_binding.status == const.ACTIVE)
network = self.get_network(context, port_db['network_id'])
port_dict = self._make_port_dict(port_db)
mech_context = driver_context.PortContext(self, context, port_dict,
network,
original_binding, None)
changes, original_host = self._process_port_binding_attributes(
mech_context._binding, self._get_port_binding_attrs(attrs,
host=host))
if is_active_binding:
self._process_active_binding_change(changes, mech_context,
port_dict, original_host)
bind_context = self._bind_port_if_needed(
mech_context, allow_commit=is_active_binding)
if (bind_context._binding.vif_type ==
portbindings.VIF_TYPE_BINDING_FAILED):
raise n_exc.PortBindingError(port_id=port_id, host=host)
if not is_active_binding:
with db_api.context_manager.writer.using(context):
bind_context._binding.persist_state_to_session(context.session)
db.set_binding_levels(context, bind_context._binding_levels)
return self._make_port_binding_dict(bind_context._binding)
@utils.transaction_guard
@db_api.retry_if_session_inactive()
def activate(self, context, host, port_id):
with db_api.context_manager.writer.using(context):
# TODO(mlavalle) Next two lines can be removed when bug #1770267 is
# fixed
if isinstance(port_id, dict):
port_id = port_id['port_id']
port_db = self._get_port(context, port_id)
self._validate_compute_port(port_db)
active_binding = utils.get_port_binding_by_status_and_host(
port_db.port_binding, const.ACTIVE)
if host == (active_binding and active_binding.host):
raise n_exc.PortBindingAlreadyActive(port_id=port_id,
host=host)
inactive_binding = utils.get_port_binding_by_status_and_host(
port_db.port_binding, const.INACTIVE, host=host)
if not inactive_binding or inactive_binding.host != host:
raise n_exc.PortBindingNotFound(port_id=port_id, host=host)
network = self.get_network(context, port_db['network_id'])
port_dict = self._make_port_dict(port_db)
levels = db.get_binding_levels(context, port_id,
active_binding.host)
original_context = driver_context.PortContext(self, context,
port_dict, network,
active_binding,
levels)
self._clear_port_binding(original_context, active_binding,
port_dict, active_binding.host)
port_dict['status'] = const.PORT_STATUS_DOWN
super(Ml2Plugin, self).update_port(
context, port_dict['id'],
{port_def.RESOURCE_NAME:
{'status': const.PORT_STATUS_DOWN}})
levels = db.get_binding_levels(context, port_id,
inactive_binding.host)
bind_context = driver_context.PortContext(self, context, port_dict,
network,
inactive_binding, levels)
for count in range(MAX_BIND_TRIES):
cur_context, _, try_again = self._commit_port_binding(
original_context, bind_context, need_notify=True,
try_again=True)
if not try_again:
self.notifier.port_delete(context, port_id)
return self._make_port_binding_dict(cur_context._binding)
raise n_exc.PortBindingError(port_id=port_id, host=host)
@utils.transaction_guard
@db_api.retry_if_session_inactive()
def delete_port_binding(self, context, host, port_id):
ports_obj.PortBinding.delete_objects(context, host=host,
port_id=port_id)

@ -14,11 +14,13 @@
# under the License.
from neutron_lib.api.definitions import portbindings
from neutron_lib import constants
from neutron_lib.plugins import constants as plugin_const
from neutron_lib.plugins import directory
from oslo_log import log as logging
from sqlalchemy.orm import exc as orm_exc
from neutron.common import utils
from neutron.db import _utils as db_utils
from neutron.db.models import securitygroup as sg_db
from neutron.objects import ports
@ -93,13 +95,16 @@ def validate_log_type_for_port(log_type, port):
log_plugin = directory.get_plugin(alias=plugin_const.LOG_API)
drivers = log_plugin.driver_manager.drivers
port_binding = utils.get_port_binding_by_status_and_host(
port.binding, constants.ACTIVE, raise_if_not_found=True,
port_id=port['id'])
for driver in drivers:
vif_type = port.binding.vif_type
vif_type = port_binding.vif_type
if vif_type not in SKIPPED_VIF_TYPES:
if not _validate_vif_type(driver, vif_type, port['id']):
continue
else:
vnic_type = port.binding.vnic_type
vnic_type = port_binding.vnic_type
if not _validate_vnic_type(driver, vnic_type, port['id']):
continue

@ -13,6 +13,7 @@
from neutron_lib.api.definitions import portbindings
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib import constants as lib_constants
from neutron_lib.services.qos import constants as qos_consts
from oslo_log import log as logging
@ -22,6 +23,7 @@ from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import constants
from neutron.common import exceptions
from neutron.common import utils
from neutron.objects.qos import policy as policy_object
@ -134,13 +136,16 @@ class QosServiceDriverManager(object):
self.rpc_notifications_required |= driver.requires_rpc_notifications
def validate_rule_for_port(self, rule, port):
port_binding = utils.get_port_binding_by_status_and_host(
port.binding, lib_constants.ACTIVE, raise_if_not_found=True,
port_id=port['id'])
for driver in self._drivers:
vif_type = port.binding.vif_type
vif_type = port_binding.vif_type
if vif_type not in SKIPPED_VIF_TYPES:
if not self._validate_vif_type(driver, vif_type, port['id']):
continue
else:
vnic_type = port.binding.vnic_type
vnic_type = port_binding.vnic_type
if not self._validate_vnic_type(driver, vnic_type, port['id']):
continue

@ -6,6 +6,7 @@ NETWORK_API_EXTENSIONS+=",allowed-address-pairs"
NETWORK_API_EXTENSIONS+=",auto-allocated-topology"
NETWORK_API_EXTENSIONS+=",availability_zone"
NETWORK_API_EXTENSIONS+=",binding"
NETWORK_API_EXTENSIONS+=",binding-extended"
NETWORK_API_EXTENSIONS+=",default-subnetpools"
NETWORK_API_EXTENSIONS+=",dhcp_agent_scheduler"
NETWORK_API_EXTENSIONS+=",dns-integration"

@ -21,14 +21,18 @@ import ddt
import eventlet
import mock
import netaddr
from neutron_lib.api.definitions import portbindings_extended as pb_ext
from neutron_lib import constants
from oslo_log import log as logging
from oslo_utils import uuidutils
import six
import testscenarios
import testtools
from neutron.common import constants as common_constants
from neutron.common import exceptions
from neutron.common import utils
from neutron.objects import ports
from neutron.tests import base
from neutron.tests.unit import tests
@ -524,3 +528,31 @@ class TestIECUnitConversions(BaseUnitConversionTest, base.BaseTestCase):
expected_kilobits,
utils.bits_to_kilobits(input_bits, self.base_unit)
)
class TestGetPortBindingByStatusAndHost(base.BaseTestCase):
def test_get_port_binding_by_status_and_host(self):
bindings = []
self.assertIsNone(utils.get_port_binding_by_status_and_host(
bindings, constants.INACTIVE))
bindings.extend([ports.PortBinding(
port_id=uuidutils.generate_uuid(), host='host-1',
status=constants.INACTIVE),
ports.PortBinding(
port_id=uuidutils.generate_uuid(), host='host-2',
status=constants.INACTIVE)])
self.assertEqual(
'host-1', utils.get_port_binding_by_status_and_host(
bindings,
constants.INACTIVE)[pb_ext.HOST])
self.assertEqual(
'host-2', utils.get_port_binding_by_status_and_host(
bindings,
constants.INACTIVE,
host='host-2')[pb_ext.HOST])
self.assertIsNone(utils.get_port_binding_by_status_and_host(
bindings, constants.ACTIVE))
self.assertRaises(exceptions.PortBindingNotFound,
utils.get_port_binding_by_status_and_host, bindings,
constants.ACTIVE, 'host', True, 'port_id')

@ -62,7 +62,7 @@ object_data = {
'NetworkPortSecurity': '1.0-b30802391a87945ee9c07582b4ff95e3',
'NetworkRBAC': '1.0-c8a67f39809c5a3c8c7f26f2f2c620b2',
'NetworkSegment': '1.0-57b7f2960971e3b95ded20cbc59244a8',
'Port': '1.3-4cb798ffc8b08f2657c0bd8afa708e9e',
'Port': '1.4-c3937b92962d5b43a09a7de2f44e0ab7',
'PortBinding': '1.0-3306deeaa6deb01e33af06777d48d578',
'PortBindingLevel': '1.1-50d47f63218f87581b6cd9a62db574e5',
'PortDataPlaneStatus': '1.0-25be74bda46c749653a10357676c0ab2',

@ -11,6 +11,7 @@
# under the License.
import mock
from neutron_lib import constants
from oslo_utils import uuidutils
import testscenarios
@ -418,3 +419,36 @@ class PortDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
binding_data = (
port_data['distributed_binding']['versioned_object.data'])
self.assertEqual(binding.host, binding_data['host'])
def test_v1_4_to_v1_3_converts_binding_to_portbinding_object(self):
port_v1_4 = self._create_test_port()
port_v1_3 = port_v1_4.obj_to_primitive(target_version='1.3')
# Port has no bindings, so binding attribute should be None
self.assertIsNone(port_v1_3['versioned_object.data']['binding'])
active_binding = ports.PortBinding(self.context, port_id=port_v1_4.id,
host='host1', vif_type='type')
inactive_binding = ports.PortBinding(
self.context, port_id=port_v1_4.id, host='host2', vif_type='type',
status=constants.INACTIVE)
active_binding.create()
inactive_binding.create()
port_v1_4 = ports.Port.get_object(self.context, id=port_v1_4.id)
port_v1_3 = port_v1_4.obj_to_primitive(target_version='1.3')
binding = port_v1_3['versioned_object.data']['binding']
# Port has active binding, so the binding attribute should point to it
self.assertEqual('host1', binding['versioned_object.data']['host'])
active_binding.delete()
port_v1_4 = ports.Port.get_object(self.context, id=port_v1_4.id)
port_v1_3 = port_v1_4.obj_to_primitive(target_version='1.3')
# Port has no active bindings, so binding attribute should be None
self.assertIsNone(port_v1_3['versioned_object.data']['binding'])
# Port with no binding attribute should be handled without raising
# exception
primitive = port_v1_4.obj_to_primitive()
primitive['versioned_object.data'].pop('binding')
port_v1_4_no_binding = port_v1_4.obj_from_primitive(primitive)
port_v1_4_no_binding.obj_to_primitive(target_version='1.3')

@ -738,9 +738,14 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
plugin = directory.get_plugin()
ups = mock.patch.object(plugin, 'update_port_status').start()
port_id = 'fake_port_id'
binding = mock.Mock(vif_type=portbindings.VIF_TYPE_OVS)
port = mock.Mock(
id=port_id, admin_state_up=False, port_binding=binding)
def getitem(key):
return constants.ACTIVE
binding = mock.MagicMock(vif_type=portbindings.VIF_TYPE_OVS)
binding.__getitem__.side_effect = getitem
port = mock.MagicMock(
id=port_id, admin_state_up=False, port_binding=[binding])
with mock.patch('neutron.plugins.ml2.plugin.db.get_port',
return_value=port):
plugin._port_provisioned('port', 'evt', 'trigger',
@ -1801,8 +1806,10 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
# create a port and delete it so we have an expired mechanism context
with self.port() as port:
plugin = directory.get_plugin()
binding = plugin._get_port(self.context,
port['port']['id']).port_binding
binding = utils.get_port_binding_by_status_and_host(
plugin._get_port(self.context,
port['port']['id']).port_binding,
constants.ACTIVE)
binding['host'] = 'test'
mech_context = driver_context.PortContext(
plugin, self.context, port['port'],
@ -1822,8 +1829,10 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
def _create_port_and_bound_context(self, port_vif_type, bound_vif_type):
with self.port() as port:
plugin = directory.get_plugin()
binding = plugin._get_port(
self.context, port['port']['id']).port_binding
binding = utils.get_port_binding_by_status_and_host(
plugin._get_port(self.context,
port['port']['id']).port_binding,
constants.ACTIVE)
binding['host'] = 'fake_host'
binding['vif_type'] = port_vif_type
# Generates port context to be used before the bind.
@ -1937,8 +1946,10 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
def test_update_port_binding_host_id_none(self):
with self.port() as port:
plugin = directory.get_plugin()
binding = plugin._get_port(
self.context, port['port']['id']).port_binding
binding = utils.get_port_binding_by_status_and_host(
plugin._get_port(self.context,
port['port']['id']).port_binding,
constants.ACTIVE)
with self.context.session.begin(subtransactions=True):
binding.host = 'test'
mech_context = driver_context.PortContext(
@ -1957,8 +1968,10 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
def test_update_port_binding_host_id_not_changed(self):
with self.port() as port:
plugin = directory.get_plugin()
binding = plugin._get_port(
self.context, port['port']['id']).port_binding
binding = utils.get_port_binding_by_status_and_host(
plugin._get_port(self.context,
port['port']['id']).port_binding,
constants.ACTIVE)
binding['host'] = 'test'
mech_context = driver_context.PortContext(
plugin, self.context, port['port'],
@ -2944,8 +2957,10 @@ class TestML2Segments(Ml2PluginV2TestCase):
# add writer here to make sure that the following operations are
# performed in the same session
with db_api.context_manager.writer.using(self.context):
binding = plugin._get_port(
self.context, port['port']['id']).port_binding
binding = utils.get_port_binding_by_status_and_host(
plugin._get_port(self.context,
port['port']['id']).port_binding,
constants.ACTIVE)
binding['host'] = 'host-ovs-no_filter'
mech_context = driver_context.PortContext(
plugin, self.context, port['port'],

@ -15,16 +15,23 @@
import mock
from neutron_lib.api.definitions import portbindings
from neutron_lib.api.definitions import portbindings_extended as pbe_ext
from neutron_lib import constants as const
from neutron_lib import context
from neutron_lib.plugins import directory
from oslo_config import cfg
from oslo_serialization import jsonutils
import webob.exc
from neutron.common import exceptions
from neutron.common import utils
from neutron.conf.plugins.ml2 import config
from neutron.conf.plugins.ml2.drivers import driver_type
from neutron.plugins.ml2 import driver_context
from neutron.plugins.ml2 import models as ml2_models
from neutron.plugins.ml2 import plugin as ml2_plugin
from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin
from neutron.tests.unit.plugins.ml2.drivers import mechanism_test
class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
@ -327,3 +334,309 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
# Get port and verify status is still DOWN.
port = self._show('ports', port_id)
self.assertEqual('DOWN', port['port']['status'])
class ExtendedPortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
host = 'host-ovs-no_filter'
def setUp(self):
# Enable the test mechanism driver to ensure that
# we can successfully call through to all mechanism
# driver apis.
config.register_ml2_plugin_opts()
cfg.CONF.set_override('mechanism_drivers',
['logger', 'test'],
'ml2')
driver_type.register_ml2_drivers_vlan_opts()
cfg.CONF.set_override('network_vlan_ranges',
['physnet1:1000:1099'],
group='ml2_type_vlan')
super(ExtendedPortBindingTestCase, self).setUp('ml2')
self.port_create_status = 'DOWN'
self.plugin = directory.get_plugin()
self.plugin.start_rpc_listeners()
def _create_port_binding(self, fmt, port_id, host, tenant_id=None,
**kwargs):
tenant_id = tenant_id or self._tenant_id
data = {'binding': {'host': host, 'tenant_id': tenant_id}}
if kwargs:
data['binding'].update(kwargs)
binding_resource = 'ports/%s/bindings' % port_id
binding_req = self.new_create_request(binding_resource, data, fmt)
return binding_req.get_response(self.api)
def _make_port_binding(self, fmt, port_id, host, **kwargs):
res = self._create_port_binding(fmt, port_id, host, **kwargs)
if res.status_int >= webob.exc.HTTPClientError.code:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
def _update_port_binding(self, fmt, port_id, host, **kwargs):
data = {'binding': kwargs}
binding_req = self.new_update_request('ports', data, port_id, fmt,
subresource='bindings',
sub_id=host)
return binding_req.get_response(self.api)
def _do_update_port_binding(self, fmt, port_id, host, **kwargs):
res = self._update_port_binding(fmt, port_id, host, **kwargs)
if res.status_int >= webob.exc.HTTPClientError.code:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
def _activate_port_binding(self, port_id, host, raw_response=True):
response = self._req('PUT', 'ports', id=port_id,
data={'port_id': port_id},
subresource='bindings', sub_id=host,
action='activate').get_response(self.api)
return self._check_code_and_serialize(response, raw_response)
def _check_code_and_serialize(self, response, raw_response):
if raw_response:
return response
if response.status_int >= webob.exc.HTTPClientError.code:
raise webob.exc.HTTPClientError(code=response.status_int)
return self.deserialize(self.fmt, response)
def _list_port_bindings(self, port_id, params=None, raw_response=True):
response = self._req(
'GET', 'ports', fmt=self.fmt, id=port_id, subresource='bindings',
params=params).get_response(self.api)
return self._check_code_and_serialize(response, raw_response)
def _show_port_binding(self, port_id, host, params=None,
raw_response=True):
response = self._req(
'GET', 'ports', fmt=self.fmt, id=port_id, subresource='bindings',
sub_id=host, params=params).get_response(self.api)
return self._check_code_and_serialize(response, raw_response)
def _delete_port_binding(self, port_id, host):
response = self._req(
'DELETE', 'ports', fmt=self.fmt, id=port_id,
subresource='bindings', sub_id=host).get_response(self.api)
return response
def _create_port_and_binding(self, **kwargs):
device_owner = '%s%s' % (const.DEVICE_OWNER_COMPUTE_PREFIX, 'nova')
with self.port(device_owner=device_owner) as port:
port_id = port['port']['id']
binding = self._make_port_binding(self.fmt, port_id, self.host,
**kwargs)['binding']
self._assert_bound_port_binding(binding)
return port['port'], binding
def _assert_bound_port_binding(self, binding):
self.assertEqual(self.host, binding[pbe_ext.HOST])
self.assertEqual(portbindings.VIF_TYPE_OVS,
binding[pbe_ext.VIF_TYPE])
self.assertEqual({'port_filter': False},
binding[pbe_ext.VIF_DETAILS])
def _assert_unbound_port_binding(self, binding):
self.assertFalse(binding[pbe_ext.HOST])
self.assertEqual(portbindings.VIF_TYPE_UNBOUND,
binding[pbe_ext.VIF_TYPE])
self.assertEqual({}, binding[pbe_ext.VIF_DETAILS])
self.assertEqual({}, binding[pbe_ext.PROFILE])
def test_create_port_binding(self):
profile = {'key1': 'value1'}
kwargs = {pbe_ext.PROFILE: profile}
port, binding = self._create_port_and_binding(**kwargs)
self._assert_bound_port_binding(binding)
self.assertEqual({"key1": "value1"}, binding[pbe_ext.PROFILE])
def test_create_duplicate_port_binding(self):
device_owner = '%s%s' % (const.DEVICE_OWNER_COMPUTE_PREFIX, 'nova')
host_arg = {portbindings.HOST_ID: self.host}
with self.port(device_owner=device_owner,
arg_list=(portbindings.HOST_ID,),
**host_arg) as port:
response = self._create_port_binding(self.fmt, port['port']['id'],
self.host)
self.assertEqual(webob.exc.HTTPConflict.code,
response.status_int)
def test_create_port_binding_failure(self):
device_owner = '%s%s' % (const.DEVICE_OWNER_COMPUTE_PREFIX, 'nova')
with self.port(device_owner=device_owner) as port:
port_id = port['port']['id']
response = self._create_port_binding(self.fmt, port_id,
'host-fail')
self.assertEqual(webob.exc.HTTPInternalServerError.code,
response.status_int)
self.assertTrue