Browse Source

ML2: Bind ports outside transactions

The ML2 plugin now calls the bind_port() operation on the registered
mechanism drivers outside of any enclosing DB transaction. Ports are
created or updated in one transaction, then a binding is established
if possible, and finally a second transaction commits the binding
result.

With [re]binding moved outside the DB transaction that triggered it,
it is now possible that multiple threads or processes will
concurrently try to bind the same port, or that the port will be
updated between transactions. Concurrent attempts to bind the same
port are allowed to proceed, which results are used is resolved in the
second transaction, and binding is retried if necessary.

Improvements to the Cisco Nexus driver and unit tests from Rich Curran
needed due to the binding changes are also included.

Closes-Bug: 1276391
Closes-Bug: 1335226
Change-Id: I65dafc330d6e812dad0667d2383858504d0ba299
tags/2014.2.b2
Robert Kukura 6 years ago
parent
commit
b1677dcb80
12 changed files with 402 additions and 202 deletions
  1. +32
    -10
      neutron/plugins/ml2/db.py
  2. +16
    -5
      neutron/plugins/ml2/driver_api.py
  3. +2
    -3
      neutron/plugins/ml2/driver_context.py
  4. +5
    -4
      neutron/plugins/ml2/drivers/mech_agent.py
  5. +2
    -2
      neutron/plugins/ml2/drivers/mech_bigswitch/driver.py
  6. +2
    -3
      neutron/plugins/ml2/managers.py
  7. +267
    -89
      neutron/plugins/ml2/plugin.py
  8. +38
    -64
      neutron/plugins/ml2/rpc.py
  9. +34
    -9
      neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_mech.py
  10. +2
    -1
      neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_nexus.py
  11. +1
    -11
      neutron/tests/unit/ml2/drivers/mechanism_test.py
  12. +1
    -1
      neutron/tests/unit/ml2/drivers/test_bigswitch_mech.py

+ 32
- 10
neutron/plugins/ml2/db.py View File

@@ -56,20 +56,38 @@ def get_network_segments(session, network_id):
for record in records]


def ensure_port_binding(session, port_id):
def add_port_binding(session, port_id):
with session.begin(subtransactions=True):
try:
record = (session.query(models.PortBinding).
filter_by(port_id=port_id).
one())
except exc.NoResultFound:
record = models.PortBinding(
port_id=port_id,
vif_type=portbindings.VIF_TYPE_UNBOUND)
session.add(record)
record = models.PortBinding(
port_id=port_id,
vif_type=portbindings.VIF_TYPE_UNBOUND)
session.add(record)
return record


def get_locked_port_and_binding(session, port_id):
"""Get port and port binding records for update within transaction."""

try:
# REVISIT(rkukura): We need the Port and PortBinding records
# to both be added to the session and locked for update. A
# single joined query should work, but the combination of left
# outer joins and postgresql doesn't seem to work.
port = (session.query(models_v2.Port).
enable_eagerloads(False).
filter_by(id=port_id).
with_lockmode('update').
one())
binding = (session.query(models.PortBinding).
enable_eagerloads(False).
filter_by(port_id=port_id).
with_lockmode('update').
one())
return port, binding
except exc.NoResultFound:
return None, None


def get_port(session, port_id):
"""Get port record for update within transcation."""

@@ -133,4 +151,8 @@ def get_port_binding_host(port_id):
LOG.debug(_("No binding found for port %(port_id)s"),
{'port_id': port_id})
return
except exc.MultipleResultsFound:
LOG.error(_("Multiple ports have port_id starting with %s"),
port_id)
return
return query.host

+ 16
- 5
neutron/plugins/ml2/driver_api.py View File

@@ -588,10 +588,21 @@ class MechanismDriver(object):

:param context: PortContext instance describing the port

Called inside transaction context on session, prior to
create_port_precommit or update_port_precommit, to
attempt to establish a port binding. If the driver is able to
bind the port, it calls context.set_binding with the binding
details.
Called outside any transaction to attempt to establish a port
binding using this mechanism driver. If the driver is able to
bind the port, it must call context.set_binding() with the
binding details. If the binding results are committed after
bind_port() returns, they will be seen by all mechanism
drivers as update_port_precommit() and
update_port_postcommit() calls.

Note that if some other thread or process concurrently binds
or updates the port, these binding results will not be
committed, and update_port_precommit() and
update_port_postcommit() will not be called on the mechanism
drivers with these results. Because binding results can be
discarded rather than committed, drivers should avoid making
persistent state changes in bind_port(), or else must ensure
that such state changes are eventually cleaned up.
"""
pass

+ 2
- 3
neutron/plugins/ml2/driver_context.py View File

@@ -69,15 +69,14 @@ class SubnetContext(MechanismDriverContext, api.SubnetContext):

class PortContext(MechanismDriverContext, api.PortContext):

def __init__(self, plugin, plugin_context, port, network,
def __init__(self, plugin, plugin_context, port, network, binding,
original_port=None):
super(PortContext, self).__init__(plugin, plugin_context)
self._port = port
self._original_port = original_port
self._network_context = NetworkContext(plugin, plugin_context,
network)
self._binding = db.ensure_port_binding(plugin_context.session,
port['id'])
self._binding = binding
if original_port:
self._original_bound_segment_id = self._binding.segment
self._original_bound_driver = self._binding.driver


+ 5
- 4
neutron/plugins/ml2/drivers/mech_agent.py View File

@@ -82,10 +82,11 @@ class AgentMechanismDriverBase(api.MechanismDriver):
:param agent: agents_db entry describing agent to bind
:returns: True iff segment has been bound for agent

Called inside transaction during bind_port() so that derived
MechanismDrivers can use agent_db data along with built-in
knowledge of the corresponding agent's capabilities to attempt
to bind to the specified network segment for the agent.
Called outside any transaction during bind_port() so that
derived MechanismDrivers can use agent_db data along with
built-in knowledge of the corresponding agent's capabilities
to attempt to bind to the specified network segment for the
agent.

If the segment can be bound for the agent, this function must
call context.set_binding() with appropriate values and then


+ 2
- 2
neutron/plugins/ml2/drivers/mech_bigswitch/driver.py View File

@@ -85,8 +85,8 @@ class BigSwitchMechanismDriver(plugin.NeutronRestProxyV2Base,
port = self._prepare_port_for_controller(context)
if port:
try:
self.servers.rest_update_port(port["network"]["tenant_id"],
port["network"]["id"], port)
self.async_port_create(port["network"]["tenant_id"],
port["network"]["id"], port)
except servermanager.RemoteRestError as e:
with excutils.save_and_reraise_exception() as ctxt:
if (cfg.CONF.RESTPROXY.auto_sync_on_failure and


+ 2
- 3
neutron/plugins/ml2/managers.py View File

@@ -439,9 +439,8 @@ class MechanismManager(stevedore.named.NamedExtensionManager):

:param context: PortContext instance describing the port

Called inside transaction context on session, prior to
create_port_precommit or update_port_precommit, to
attempt to establish a port binding.
Called outside any transaction to attempt to establish a port
binding.
"""
binding = context._binding
LOG.debug(_("Attempting to bind port %(port)s on host %(host)s "


+ 267
- 89
neutron/plugins/ml2/plugin.py View File

@@ -12,7 +12,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import contextlib
from eventlet import greenthread

from oslo.config import cfg
from oslo.db import exception as os_db_exception
@@ -58,6 +60,8 @@ from neutron.plugins.ml2 import rpc

LOG = log.getLogger(__name__)

MAX_BIND_TRIES = 10

# REVISIT(rkukura): Move this and other network_type constants to
# providernet.py?
TYPE_MULTI_SEGMENT = 'multi-segment'
@@ -211,69 +215,200 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def _process_port_binding(self, mech_context, attrs):
binding = mech_context._binding
port = mech_context.current
self._update_port_dict_binding(port, binding)
changes = False

host = attrs and attrs.get(portbindings.HOST_ID)
host_set = attributes.is_attr_set(host)

vnic_type = attrs and attrs.get(portbindings.VNIC_TYPE)
vnic_type_set = attributes.is_attr_set(vnic_type)

# CLI can't send {}, so treat None as {}
profile = attrs and attrs.get(portbindings.PROFILE)
profile_set = profile is not attributes.ATTR_NOT_SPECIFIED
if profile_set and not profile:
profile = {}

if binding.vif_type != portbindings.VIF_TYPE_UNBOUND:
if (not host_set and not vnic_type_set and not profile_set and
binding.segment):
return False
self._delete_port_binding(mech_context)

# Return True only if an agent notification is needed.
# This will happen if a new host, vnic_type, or profile was specified
# that differs from the current one. Note that host_set is True
# even if the host is an empty string
ret_value = ((host_set and binding.get('host') != host) or
(vnic_type_set and
binding.get('vnic_type') != vnic_type) or
(profile_set and self._get_profile(binding) != profile))

if host_set:
if (attributes.is_attr_set(host) and
binding.host != host):
binding.host = host
port[portbindings.HOST_ID] = host
changes = True

if vnic_type_set:
vnic_type = attrs and attrs.get(portbindings.VNIC_TYPE)
if (attributes.is_attr_set(vnic_type) and
binding.vnic_type != vnic_type):
binding.vnic_type = vnic_type
port[portbindings.VNIC_TYPE] = vnic_type
changes = True

if profile_set:
# CLI can't send {}, so treat None as {}.
profile = attrs and attrs.get(portbindings.PROFILE) or {}
if (profile is not attributes.ATTR_NOT_SPECIFIED and
self._get_profile(binding) != profile):
binding.profile = jsonutils.dumps(profile)
if len(binding.profile) > models.BINDING_PROFILE_LEN:
msg = _("binding:profile value too large")
raise exc.InvalidInput(error_message=msg)
port[portbindings.PROFILE] = profile

# To try to [re]bind if host is non-empty.
if binding.host:
self.mechanism_manager.bind_port(mech_context)
self._update_port_dict_binding(port, binding)

# Update the port status if requested by the bound driver.
if binding.segment and mech_context._new_port_status:
# REVISIT(rkukura): This function is currently called
# inside a transaction with the port either newly
# created or locked for update. After the fix for bug
# 1276391 is merged, this will no longer be true, and
# the port status update will need to be handled in
# the transaction that commits the new binding.
port_db = db.get_port(mech_context._plugin_context.session,
port['id'])
port_db.status = mech_context._new_port_status
port['status'] = mech_context._new_port_status

return ret_value
changes = True

# Unbind the port if needed.
if changes:
binding.vif_type = portbindings.VIF_TYPE_UNBOUND
binding.vif_details = ''
binding.driver = None
binding.segment = None

self._update_port_dict_binding(port, binding)
return changes

def _bind_port_if_needed(self, context, allow_notify=False,
need_notify=False):
plugin_context = context._plugin_context
port_id = context._port['id']

# Since the mechanism driver bind_port() calls must be made
# outside a DB transaction locking the port state, it is
# possible (but unlikely) that the port's state could change
# concurrently while these calls are being made. If another
# thread or process succeeds in binding the port before this
# thread commits its results, the already commited results are
# used. If attributes such as binding:host_id,
# binding:profile, or binding:vnic_type are updated
# concurrently, this loop retries binding using the new
# values.
count = 0
while True:
# First, determine whether it is necessary and possible to
# bind the port.
binding = context._binding
if (binding.vif_type != portbindings.VIF_TYPE_UNBOUND
or not binding.host):
# We either don't need to bind the port, or can't, so
# notify if needed and return.
if allow_notify and need_notify:
self._notify_port_updated(context)
return context

# Limit binding attempts to avoid any possibility of
# infinite looping and to ensure an error is logged
# instead. This does not need to be tunable because no
# more than a couple attempts should ever be required in
# normal operation. Log at info level if not 1st attempt.
count += 1
if count > MAX_BIND_TRIES:
LOG.error(_("Failed to commit binding results for %(port)s "
"after %(max)s tries"),
{'port': port_id, 'max': MAX_BIND_TRIES})
return context
if count > 1:
greenthread.sleep(0) # yield
LOG.info(_("Attempt %(count)s to bind port %(port)s"),
{'count': count, 'port': port_id})

# The port isn't already bound and the necessary
# information is available, so attempt to bind the port.
bind_context = self._bind_port(context)

# Now try to commit result of attempting to bind the port.
new_context, did_commit = self._commit_port_binding(
plugin_context, port_id, binding, bind_context)
if not new_context:
# The port has been deleted concurrently, so just
# return the unbound result from the initial
# transaction that completed before the deletion.
return context._port
# Need to notify if we succeed and our results were
# committed.
if did_commit and (new_context._binding.vif_type !=
portbindings.VIF_TYPE_BINDING_FAILED):
need_notify = True
context = new_context

def _bind_port(self, orig_context):
# Construct a new PortContext from the one from the previous
# transaction.
port = orig_context._port
orig_binding = orig_context._binding
new_binding = models.PortBinding(
host=orig_binding.host,
vnic_type=orig_binding.vnic_type,
profile=orig_binding.profile,
vif_type=portbindings.VIF_TYPE_UNBOUND,
vif_details=''
)
self._update_port_dict_binding(port, new_binding)
new_context = driver_context.PortContext(
self, orig_context._plugin_context, port,
orig_context._network_context._network, new_binding)

# Attempt to bind the port and return the context with the
# result.
self.mechanism_manager.bind_port(new_context)
return new_context

def _commit_port_binding(self, plugin_context, port_id, orig_binding,
new_context):
session = plugin_context.session
new_binding = new_context._binding

# After we've attempted to bind the port, we begin a
# transaction, get the current port state, and decide whether
# to commit the binding results.
#
# REVISIT: Serialize this operation with a semaphore to
# prevent deadlock waiting to acquire a DB lock held by
# another thread in the same process, leading to 'lock wait
# timeout' errors.
with contextlib.nested(lockutils.lock('db-access'),
session.begin(subtransactions=True)):
# Get the current port state and build a new PortContext
# reflecting this state as original state for subsequent
# mechanism driver update_port_*commit() calls.
port_db, cur_binding = db.get_locked_port_and_binding(session,
port_id)
if not port_db:
# The port has been deleted concurrently.
return
oport = self._make_port_dict(port_db)
port = self._make_port_dict(port_db)
network = self.get_network(plugin_context, port['network_id'])
cur_context = driver_context.PortContext(
self, plugin_context, port, network, cur_binding,
original_port=oport)

# Commit our binding results only if port has not been
# successfully bound concurrently by another thread or
# process and no binding inputs have been changed.
commit = ((cur_binding.vif_type in
[portbindings.VIF_TYPE_UNBOUND,
portbindings.VIF_TYPE_BINDING_FAILED]) and
orig_binding.host == cur_binding.host and
orig_binding.vnic_type == cur_binding.vnic_type and
orig_binding.profile == cur_binding.profile)

if commit:
# Update the port's binding state with our binding
# results.
cur_binding.vif_type = new_binding.vif_type
cur_binding.vif_details = new_binding.vif_details
cur_binding.driver = new_binding.driver
cur_binding.segment = new_binding.segment

# REVISIT(rkukura): The binding:profile attribute is
# supposed to be input-only, but the Mellanox driver
# currently modifies it while binding. Remove this
# code when the Mellanox driver has been updated to
# use binding:vif_details instead.
if cur_binding.profile != new_binding.profile:
cur_binding.profile = new_binding.profile

# Update PortContext's port dictionary to reflect the
# updated binding state.
self._update_port_dict_binding(port, cur_binding)

# Update the port status if requested by the bound driver.
if new_binding.segment and new_context._new_port_status:
port_db.status = new_context._new_port_status
port['status'] = new_context._new_port_status

# Call the mechanism driver precommit methods, commit
# the results, and call the postcommit methods.
self.mechanism_manager.update_port_precommit(cur_context)
if commit:
self.mechanism_manager.update_port_postcommit(cur_context)

# Continue, using the port state as of the transaction that
# just finished, whether that transaction committed new
# results or discovered concurrent port state changes.
return (cur_context, commit)

def _update_port_dict_binding(self, port, binding):
port[portbindings.HOST_ID] = binding.host
@@ -304,15 +439,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
'port': binding.port_id})
return {}

def _delete_port_binding(self, mech_context):
binding = mech_context._binding
binding.vif_type = portbindings.VIF_TYPE_UNBOUND
binding.vif_details = ''
binding.driver = None
binding.segment = None
port = mech_context.current
self._update_port_dict_binding(port, binding)

def _ml2_extend_port_dict_binding(self, port_res, port_db):
# None when called during unit tests for other plugins.
if port_db.port_binding:
@@ -457,9 +583,18 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
session = context.session
while True:
try:
with session.begin(subtransactions=True):
# REVISIT(rkukura): Its not clear that
# with_lockmode('update') is really needed in this
# transaction, and if not, the semaphore can also be
# removed.
#
# REVISIT: Serialize this operation with a semaphore
# to prevent deadlock waiting to acquire a DB lock
# held by another thread in the same process, leading
# to 'lock wait timeout' errors.
with contextlib.nested(lockutils.lock('db-access'),
session.begin(subtransactions=True)):
self._process_l3_delete(context, id)

# Get ports to auto-delete.
ports = (session.query(models_v2.Port).
enable_eagerloads(False).
@@ -577,7 +712,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
LOG.debug(_("Deleting subnet %s"), id)
session = context.session
while True:
with session.begin(subtransactions=True):
# REVISIT: Serialize this operation with a semaphore to
# prevent deadlock waiting to acquire a DB lock held by
# another thread in the same process, leading to 'lock
# wait timeout' errors.
with contextlib.nested(lockutils.lock('db-access'),
session.begin(subtransactions=True)):
subnet = self.get_subnet(context, id)
# Get ports to auto-deallocate
allocated = (session.query(models_v2.IPAllocation).
@@ -644,8 +784,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
result = super(Ml2Plugin, self).create_port(context, port)
self._process_port_create_security_group(context, result, sgids)
network = self.get_network(context, result['network_id'])
binding = db.add_port_binding(session, result['id'])
mech_context = driver_context.PortContext(self, context, result,
network)
network, binding)
self._process_port_binding(mech_context, attrs)
result[addr_pair.ADDRESS_PAIRS] = (
self._process_create_allowed_address_pairs(
@@ -662,20 +803,34 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
LOG.error(_("mechanism_manager.create_port_postcommit "
"failed, deleting port '%s'"), result['id'])
self.delete_port(context, result['id'])

# REVISIT(rkukura): Is there any point in calling this before
# a binding has been succesfully established?
self.notify_security_groups_member_updated(context, result)
return result

try:
bound_context = self._bind_port_if_needed(mech_context)
except ml2_exc.MechanismDriverError:
with excutils.save_and_reraise_exception():
LOG.error(_("_bind_port_if_needed "
"failed, deleting port '%s'"), result['id'])
self.delete_port(context, result['id'])
return bound_context._port

def update_port(self, context, id, port):
attrs = port['port']
need_port_update_notify = False

session = context.session
with session.begin(subtransactions=True):
try:
port_db = (session.query(models_v2.Port).
enable_eagerloads(False).
filter_by(id=id).with_lockmode('update').one())
except sa_exc.NoResultFound:

# REVISIT: Serialize this operation with a semaphore to
# prevent deadlock waiting to acquire a DB lock held by
# another thread in the same process, leading to 'lock wait
# timeout' errors.
with contextlib.nested(lockutils.lock('db-access'),
session.begin(subtransactions=True)):
port_db, binding = db.get_locked_port_and_binding(session, id)
if not port_db:
raise exc.PortNotFound(port_id=id)
original_port = self._make_port_dict(port_db)
updated_port = super(Ml2Plugin, self).update_port(context, id,
@@ -691,7 +846,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
need_port_update_notify |= self._update_extra_dhcp_opts_on_port(
context, id, port, updated_port)
mech_context = driver_context.PortContext(
self, context, updated_port, network,
self, context, updated_port, network, binding,
original_port=original_port)
need_port_update_notify |= self._process_port_binding(
mech_context, attrs)
@@ -709,10 +864,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
if original_port['admin_state_up'] != updated_port['admin_state_up']:
need_port_update_notify = True

if need_port_update_notify:
self._notify_port_updated(mech_context)

return updated_port
bound_port = self._bind_port_if_needed(
mech_context,
allow_notify=True,
need_notify=need_port_update_notify)
return bound_port._port

def delete_port(self, context, id, l3_port_check=True):
LOG.debug(_("Deleting port %s"), id)
@@ -722,15 +878,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
l3plugin.prevent_l3_port_deletion(context, id)

session = context.session
# REVISIT: Serialize this operation with a semaphore to prevent
# undesired eventlet yields leading to 'lock wait timeout' errors
# REVISIT: Serialize this operation with a semaphore to
# prevent deadlock waiting to acquire a DB lock held by
# another thread in the same process, leading to 'lock wait
# timeout' errors.
with contextlib.nested(lockutils.lock('db-access'),
session.begin(subtransactions=True)):
try:
port_db = (session.query(models_v2.Port).
enable_eagerloads(False).
filter_by(id=id).with_lockmode('update').one())
except sa_exc.NoResultFound:
port_db, binding = db.get_locked_port_and_binding(session, id)
if not port_db:
# the port existed when l3plugin.prevent_l3_port_deletion
# was called but now is already gone
LOG.debug(_("The port '%s' was deleted"), id)
@@ -739,7 +894,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,

network = self.get_network(context, port['network_id'])
mech_context = driver_context.PortContext(self, context, port,
network)
network, binding)
self.mechanism_manager.delete_port_precommit(mech_context)
self._delete_port_security_group_bindings(context, id)
LOG.debug(_("Calling base delete_port"))
@@ -762,11 +917,34 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
LOG.error(_("mechanism_manager.delete_port_postcommit failed"))
self.notify_security_groups_member_updated(context, port)

def get_bound_port_context(self, plugin_context, port_id):
session = plugin_context.session
with session.begin(subtransactions=True):
try:
port_db = (session.query(models_v2.Port).
enable_eagerloads(False).
filter(models_v2.Port.id.startswith(port_id)).
one())
except sa_exc.NoResultFound:
return
except exc.MultipleResultsFound:
LOG.error(_("Multiple ports have port_id starting with %s"),
port_id)
return
port = self._make_port_dict(port_db)
network = self.get_network(plugin_context, port['network_id'])
port_context = driver_context.PortContext(
self, plugin_context, port, network, port_db.port_binding)

return self._bind_port_if_needed(port_context)

def update_port_status(self, context, port_id, status):
updated = False
session = context.session
# REVISIT: Serialize this operation with a semaphore to prevent
# undesired eventlet yields leading to 'lock wait timeout' errors
# REVISIT: Serialize this operation with a semaphore to
# prevent deadlock waiting to acquire a DB lock held by
# another thread in the same process, leading to 'lock wait
# timeout' errors.
with contextlib.nested(lockutils.lock('db-access'),
session.begin(subtransactions=True)):
port = db.get_port(session, port_id)
@@ -781,7 +959,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
network = self.get_network(context,
original_port['network_id'])
mech_context = driver_context.PortContext(
self, context, updated_port, network,
self, context, updated_port, network, port.port_binding,
original_port=original_port)
self.mechanism_manager.update_port_precommit(mech_context)
updated = True


+ 38
- 64
neutron/plugins/ml2/rpc.py View File

@@ -17,9 +17,9 @@ from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import constants as q_const
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db import api as db_api
from neutron.db import dhcp_rpc_base
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import portbindings
from neutron import manager
from neutron.openstack.common import log
from neutron.openstack.common import uuidutils
@@ -83,64 +83,43 @@ class RpcCallbacks(n_rpc.RpcCallback,
{'device': device, 'agent_id': agent_id})
port_id = self._device_to_port_id(device)

session = db_api.get_session()
with session.begin(subtransactions=True):
port = db.get_port(session, port_id)
if not port:
LOG.warning(_("Device %(device)s requested by agent "
"%(agent_id)s not found in database"),
{'device': device, 'agent_id': agent_id})
return {'device': device}

segments = db.get_network_segments(session, port.network_id)
if not segments:
LOG.warning(_("Device %(device)s requested by agent "
"%(agent_id)s has network %(network_id)s with "
"no segments"),
{'device': device,
'agent_id': agent_id,
'network_id': port.network_id})
return {'device': device}

binding = db.ensure_port_binding(session, port.id)
if not binding.segment:
LOG.warning(_("Device %(device)s requested by agent "
"%(agent_id)s on network %(network_id)s not "
"bound, vif_type: %(vif_type)s"),
{'device': device,
'agent_id': agent_id,
'network_id': port.network_id,
'vif_type': binding.vif_type})
return {'device': device}

segment = self._find_segment(segments, binding.segment)
if not segment:
LOG.warning(_("Device %(device)s requested by agent "
"%(agent_id)s on network %(network_id)s "
"invalid segment, vif_type: %(vif_type)s"),
{'device': device,
'agent_id': agent_id,
'network_id': port.network_id,
'vif_type': binding.vif_type})
return {'device': device}

new_status = (q_const.PORT_STATUS_BUILD if port.admin_state_up
else q_const.PORT_STATUS_DOWN)
if port.status != new_status:
plugin = manager.NeutronManager.get_plugin()
plugin.update_port_status(rpc_context,
port_id,
new_status)
port.status = new_status
entry = {'device': device,
'network_id': port.network_id,
'port_id': port.id,
'admin_state_up': port.admin_state_up,
'network_type': segment[api.NETWORK_TYPE],
'segmentation_id': segment[api.SEGMENTATION_ID],
'physical_network': segment[api.PHYSICAL_NETWORK]}
LOG.debug(_("Returning: %s"), entry)
return entry
plugin = manager.NeutronManager.get_plugin()
port_context = plugin.get_bound_port_context(rpc_context, port_id)
if not port_context:
LOG.warning(_("Device %(device)s requested by agent "
"%(agent_id)s not found in database"),
{'device': device, 'agent_id': agent_id})
return {'device': device}

segment = port_context.bound_segment
port = port_context.current

if not segment:
LOG.warning(_("Device %(device)s requested by agent "
"%(agent_id)s on network %(network_id)s not "
"bound, vif_type: %(vif_type)s"),
{'device': device,
'agent_id': agent_id,
'network_id': port['network_id'],
'vif_type': port[portbindings.VIF_TYPE]})
return {'device': device}

new_status = (q_const.PORT_STATUS_BUILD if port['admin_state_up']
else q_const.PORT_STATUS_DOWN)
if port['status'] != new_status:
plugin.update_port_status(rpc_context,
port_id,
new_status)

entry = {'device': device,
'network_id': port['network_id'],
'port_id': port_id,
'admin_state_up': port['admin_state_up'],
'network_type': segment[api.NETWORK_TYPE],
'segmentation_id': segment[api.SEGMENTATION_ID],
'physical_network': segment[api.PHYSICAL_NETWORK]}
LOG.debug(_("Returning: %s"), entry)
return entry

def get_devices_details_list(self, rpc_context, **kwargs):
return [
@@ -152,11 +131,6 @@ class RpcCallbacks(n_rpc.RpcCallback,
for device in kwargs.pop('devices', [])
]

def _find_segment(self, segments, segment_id):
for segment in segments:
if segment[api.ID] == segment_id:
return segment

def update_device_down(self, rpc_context, **kwargs):
"""Device no longer exists on agent."""
# TODO(garyk) - live migration and port status


+ 34
- 9
neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_mech.py View File

@@ -19,7 +19,6 @@ import mock
import webob.exc as wexc

from neutron.api.v2 import base
from neutron.common import constants as n_const
from neutron import context
from neutron.extensions import portbindings
from neutron import manager
@@ -123,15 +122,33 @@ class CiscoML2MechanismTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
new_callable=mock.PropertyMock).start()
self.mock_original_bound_segment.return_value = None

mock_status = mock.patch.object(
# Use _is_status_active method to determine bind state.
def _mock_check_bind_state(port_context):
if (port_context[portbindings.VIF_TYPE] !=
portbindings.VIF_TYPE_UNBOUND):
return True
else:
return False

self.mock_status = mock.patch.object(
mech_cisco_nexus.CiscoNexusMechanismDriver,
'_is_status_active').start()
mock_status.return_value = n_const.PORT_STATUS_ACTIVE
self.mock_status.side_effect = _mock_check_bind_state

super(CiscoML2MechanismTestCase, self).setUp(ML2_PLUGIN)

self.port_create_status = 'DOWN'

def _create_deviceowner_mock(self):
# Mock deviceowner method for UT's that expect update precommit
# failures. This allows control of delete_port_pre/postcommit()
# actions.
mock_deviceowner = mock.patch.object(
mech_cisco_nexus.CiscoNexusMechanismDriver,
'_is_deviceowner_compute').start()
mock_deviceowner.return_value = False
self.addCleanup(mock_deviceowner.stop)

@contextlib.contextmanager
def _patch_ncclient(self, attr, value):
"""Configure an attribute on the mock ncclient module.
@@ -223,7 +240,8 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase,
@contextlib.contextmanager
def _create_resources(self, name=NETWORK_NAME, cidr=CIDR_1,
device_id=DEVICE_ID_1,
host_id=COMP_HOST_NAME):
host_id=COMP_HOST_NAME,
expected_failure=False):
"""Create network, subnet, and port resources for test cases.

Create a network, subnet, port and then update the port, yield the
@@ -233,18 +251,23 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase,
:param cidr: cidr address of subnetwork to be created.
:param device_id: Device ID to use for port to be created/updated.
:param host_id: Host ID to use for port create/update.

:param expected_failure: Set to True when an update_port_precommit
failure is expected. Results in no actions being taken in
delete_port_pre/postcommit() methods.
"""
with self.network(name=name) as network:
with self.subnet(network=network, cidr=cidr) as subnet:
with self.port(subnet=subnet, cidr=cidr) as port:

data = {'port': {portbindings.HOST_ID: host_id,
'device_id': device_id,
'device_owner': 'compute:none',
'device_owner': DEVICE_OWNER,
'admin_state_up': True}}
req = self.new_update_request('ports', data,
port['port']['id'])
yield req.get_response(self.api)
if expected_failure:
self._create_deviceowner_mock()

def _assertExpectedHTTP(self, status, exc):
"""Confirm that an HTTP status corresponds to an expected exception.
@@ -578,7 +601,8 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase,
a fictitious host name during port creation.

"""
with self._create_resources(host_id='fake_host') as result:
with self._create_resources(host_id='fake_host',
expected_failure=True) as result:
self._assertExpectedHTTP(result.status_int,
c_exc.NexusComputeHostNotConfigured)

@@ -586,10 +610,11 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase,
"""Test handling of a NexusMissingRequiredFields exception.

Test the Cisco NexusMissingRequiredFields exception by using
empty host_id and device_id values during port creation.
empty device_id value during port creation.

"""
with self._create_resources(device_id='', host_id='') as result:
with self._create_resources(device_id='',
expected_failure=True) as result:
self._assertExpectedHTTP(result.status_int,
c_exc.NexusMissingRequiredFields)



+ 2
- 1
neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_nexus.py View File

@@ -74,7 +74,8 @@ class FakePortContext(object):
'status': PORT_STATE,
'device_id': device_id,
'device_owner': DEVICE_OWNER,
portbindings.HOST_ID: host_name
portbindings.HOST_ID: host_name,
portbindings.VIF_TYPE: portbindings.VIF_TYPE_OVS
}
self._network = network_context
self._segment = network_context.network_segments


+ 1
- 11
neutron/tests/unit/ml2/drivers/mechanism_test.py View File

@@ -142,17 +142,7 @@ class TestMechanismDriver(api.MechanismDriver):
self._check_port_context(context, False)

def bind_port(self, context):
# REVISIT(rkukura): The upcoming fix for bug 1276391 will
# ensure the MDs see the unbinding of the port as a port
# update prior to re-binding, at which point this should be
# removed.
self.bound_ports.discard(context.current['id'])

# REVISIT(rkukura): Currently, bind_port() is called as part
# of either a create or update transaction. The fix for bug
# 1276391 will change it to be called outside any transaction,
# so the context.original* will no longer be available.
self._check_port_context(context, context.original is not None)
self._check_port_context(context, False)

host = context.current.get(portbindings.HOST_ID, None)
segment = context.network.network_segments[0][api.ID]


+ 1
- 1
neutron/tests/unit/ml2/drivers/test_bigswitch_mech.py View File

@@ -112,7 +112,7 @@ class TestBigSwitchMechDriverPortsV2(test_db_plugin.TestPortsV2,

def test_udpate404_triggers_background_sync(self):
with contextlib.nested(
mock.patch(SERVER_POOL + '.rest_update_port',
mock.patch(DRIVER + '.async_port_create',
side_effect=servermanager.RemoteRestError(
reason=servermanager.NXNETWORK, status=404)),
mock.patch(DRIVER + '._send_all_data'),


Loading…
Cancel
Save