ML2: Bind ports outside transactions

The ML2 plugin now calls the bind_port() operation on the registered
mechanism drivers outside of any enclosing DB transaction. Ports are
created or updated in one transaction, then a binding is established
if possible, and finally a second transaction commits the binding
result.

With [re]binding moved outside the DB transaction that triggered it,
it is now possible that multiple threads or processes will
concurrently try to bind the same port, or that the port will be
updated between transactions. Concurrent attempts to bind the same
port are allowed to proceed, which results are used is resolved in the
second transaction, and binding is retried if necessary.

Improvements to the Cisco Nexus driver and unit tests from Rich Curran
needed due to the binding changes are also included.

Closes-Bug: 1276391
Closes-Bug: 1335226
Change-Id: I65dafc330d6e812dad0667d2383858504d0ba299
This commit is contained in:
Robert Kukura
2014-03-11 21:54:35 -04:00
parent 019444170b
commit bc2076304a
12 changed files with 395 additions and 195 deletions

View File

@@ -12,7 +12,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from eventlet import greenthread
from oslo.config import cfg
from oslo.db import exception as os_db_exception
@@ -58,6 +60,8 @@ from neutron.plugins.ml2 import rpc
LOG = log.getLogger(__name__)
MAX_BIND_TRIES = 10
# REVISIT(rkukura): Move this and other network_type constants to
# providernet.py?
TYPE_MULTI_SEGMENT = 'multi-segment'
@@ -211,69 +215,200 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def _process_port_binding(self, mech_context, attrs):
binding = mech_context._binding
port = mech_context.current
self._update_port_dict_binding(port, binding)
changes = False
host = attrs and attrs.get(portbindings.HOST_ID)
host_set = attributes.is_attr_set(host)
if (attributes.is_attr_set(host) and
binding.host != host):
binding.host = host
changes = True
vnic_type = attrs and attrs.get(portbindings.VNIC_TYPE)
vnic_type_set = attributes.is_attr_set(vnic_type)
# CLI can't send {}, so treat None as {}
profile = attrs and attrs.get(portbindings.PROFILE)
profile_set = profile is not attributes.ATTR_NOT_SPECIFIED
if profile_set and not profile:
profile = {}
if binding.vif_type != portbindings.VIF_TYPE_UNBOUND:
if (not host_set and not vnic_type_set and not profile_set and
binding.segment):
return False
self._delete_port_binding(mech_context)
# Return True only if an agent notification is needed.
# This will happen if a new host, vnic_type, or profile was specified
# that differs from the current one. Note that host_set is True
# even if the host is an empty string
ret_value = ((host_set and binding.get('host') != host) or
(vnic_type_set and
binding.get('vnic_type') != vnic_type) or
(profile_set and self._get_profile(binding) != profile))
if host_set:
binding.host = host
port[portbindings.HOST_ID] = host
if vnic_type_set:
if (attributes.is_attr_set(vnic_type) and
binding.vnic_type != vnic_type):
binding.vnic_type = vnic_type
port[portbindings.VNIC_TYPE] = vnic_type
changes = True
if profile_set:
# CLI can't send {}, so treat None as {}.
profile = attrs and attrs.get(portbindings.PROFILE) or {}
if (profile is not attributes.ATTR_NOT_SPECIFIED and
self._get_profile(binding) != profile):
binding.profile = jsonutils.dumps(profile)
if len(binding.profile) > models.BINDING_PROFILE_LEN:
msg = _("binding:profile value too large")
raise exc.InvalidInput(error_message=msg)
port[portbindings.PROFILE] = profile
changes = True
# To try to [re]bind if host is non-empty.
if binding.host:
self.mechanism_manager.bind_port(mech_context)
self._update_port_dict_binding(port, binding)
# Unbind the port if needed.
if changes:
binding.vif_type = portbindings.VIF_TYPE_UNBOUND
binding.vif_details = ''
binding.driver = None
binding.segment = None
# Update the port status if requested by the bound driver.
if binding.segment and mech_context._new_port_status:
# REVISIT(rkukura): This function is currently called
# inside a transaction with the port either newly
# created or locked for update. After the fix for bug
# 1276391 is merged, this will no longer be true, and
# the port status update will need to be handled in
# the transaction that commits the new binding.
port_db = db.get_port(mech_context._plugin_context.session,
port['id'])
port_db.status = mech_context._new_port_status
port['status'] = mech_context._new_port_status
self._update_port_dict_binding(port, binding)
return changes
return ret_value
def _bind_port_if_needed(self, context, allow_notify=False,
need_notify=False):
plugin_context = context._plugin_context
port_id = context._port['id']
# Since the mechanism driver bind_port() calls must be made
# outside a DB transaction locking the port state, it is
# possible (but unlikely) that the port's state could change
# concurrently while these calls are being made. If another
# thread or process succeeds in binding the port before this
# thread commits its results, the already commited results are
# used. If attributes such as binding:host_id,
# binding:profile, or binding:vnic_type are updated
# concurrently, this loop retries binding using the new
# values.
count = 0
while True:
# First, determine whether it is necessary and possible to
# bind the port.
binding = context._binding
if (binding.vif_type != portbindings.VIF_TYPE_UNBOUND
or not binding.host):
# We either don't need to bind the port, or can't, so
# notify if needed and return.
if allow_notify and need_notify:
self._notify_port_updated(context)
return context
# Limit binding attempts to avoid any possibility of
# infinite looping and to ensure an error is logged
# instead. This does not need to be tunable because no
# more than a couple attempts should ever be required in
# normal operation. Log at info level if not 1st attempt.
count += 1
if count > MAX_BIND_TRIES:
LOG.error(_("Failed to commit binding results for %(port)s "
"after %(max)s tries"),
{'port': port_id, 'max': MAX_BIND_TRIES})
return context
if count > 1:
greenthread.sleep(0) # yield
LOG.info(_("Attempt %(count)s to bind port %(port)s"),
{'count': count, 'port': port_id})
# The port isn't already bound and the necessary
# information is available, so attempt to bind the port.
bind_context = self._bind_port(context)
# Now try to commit result of attempting to bind the port.
new_context, did_commit = self._commit_port_binding(
plugin_context, port_id, binding, bind_context)
if not new_context:
# The port has been deleted concurrently, so just
# return the unbound result from the initial
# transaction that completed before the deletion.
return context._port
# Need to notify if we succeed and our results were
# committed.
if did_commit and (new_context._binding.vif_type !=
portbindings.VIF_TYPE_BINDING_FAILED):
need_notify = True
context = new_context
def _bind_port(self, orig_context):
# Construct a new PortContext from the one from the previous
# transaction.
port = orig_context._port
orig_binding = orig_context._binding
new_binding = models.PortBinding(
host=orig_binding.host,
vnic_type=orig_binding.vnic_type,
profile=orig_binding.profile,
vif_type=portbindings.VIF_TYPE_UNBOUND,
vif_details=''
)
self._update_port_dict_binding(port, new_binding)
new_context = driver_context.PortContext(
self, orig_context._plugin_context, port,
orig_context._network_context._network, new_binding)
# Attempt to bind the port and return the context with the
# result.
self.mechanism_manager.bind_port(new_context)
return new_context
def _commit_port_binding(self, plugin_context, port_id, orig_binding,
new_context):
session = plugin_context.session
new_binding = new_context._binding
# After we've attempted to bind the port, we begin a
# transaction, get the current port state, and decide whether
# to commit the binding results.
#
# REVISIT: Serialize this operation with a semaphore to
# prevent deadlock waiting to acquire a DB lock held by
# another thread in the same process, leading to 'lock wait
# timeout' errors.
with contextlib.nested(lockutils.lock('db-access'),
session.begin(subtransactions=True)):
# Get the current port state and build a new PortContext
# reflecting this state as original state for subsequent
# mechanism driver update_port_*commit() calls.
port_db, cur_binding = db.get_locked_port_and_binding(session,
port_id)
if not port_db:
# The port has been deleted concurrently.
return
oport = self._make_port_dict(port_db)
port = self._make_port_dict(port_db)
network = self.get_network(plugin_context, port['network_id'])
cur_context = driver_context.PortContext(
self, plugin_context, port, network, cur_binding,
original_port=oport)
# Commit our binding results only if port has not been
# successfully bound concurrently by another thread or
# process and no binding inputs have been changed.
commit = ((cur_binding.vif_type in
[portbindings.VIF_TYPE_UNBOUND,
portbindings.VIF_TYPE_BINDING_FAILED]) and
orig_binding.host == cur_binding.host and
orig_binding.vnic_type == cur_binding.vnic_type and
orig_binding.profile == cur_binding.profile)
if commit:
# Update the port's binding state with our binding
# results.
cur_binding.vif_type = new_binding.vif_type
cur_binding.vif_details = new_binding.vif_details
cur_binding.driver = new_binding.driver
cur_binding.segment = new_binding.segment
# REVISIT(rkukura): The binding:profile attribute is
# supposed to be input-only, but the Mellanox driver
# currently modifies it while binding. Remove this
# code when the Mellanox driver has been updated to
# use binding:vif_details instead.
if cur_binding.profile != new_binding.profile:
cur_binding.profile = new_binding.profile
# Update PortContext's port dictionary to reflect the
# updated binding state.
self._update_port_dict_binding(port, cur_binding)
# Update the port status if requested by the bound driver.
if new_binding.segment and new_context._new_port_status:
port_db.status = new_context._new_port_status
port['status'] = new_context._new_port_status
# Call the mechanism driver precommit methods, commit
# the results, and call the postcommit methods.
self.mechanism_manager.update_port_precommit(cur_context)
if commit:
self.mechanism_manager.update_port_postcommit(cur_context)
# Continue, using the port state as of the transaction that
# just finished, whether that transaction committed new
# results or discovered concurrent port state changes.
return (cur_context, commit)
def _update_port_dict_binding(self, port, binding):
port[portbindings.HOST_ID] = binding.host
@@ -304,15 +439,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
'port': binding.port_id})
return {}
def _delete_port_binding(self, mech_context):
binding = mech_context._binding
binding.vif_type = portbindings.VIF_TYPE_UNBOUND
binding.vif_details = ''
binding.driver = None
binding.segment = None
port = mech_context.current
self._update_port_dict_binding(port, binding)
def _ml2_extend_port_dict_binding(self, port_res, port_db):
# None when called during unit tests for other plugins.
if port_db.port_binding:
@@ -457,9 +583,18 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
session = context.session
while True:
try:
with session.begin(subtransactions=True):
# REVISIT(rkukura): Its not clear that
# with_lockmode('update') is really needed in this
# transaction, and if not, the semaphore can also be
# removed.
#
# REVISIT: Serialize this operation with a semaphore
# to prevent deadlock waiting to acquire a DB lock
# held by another thread in the same process, leading
# to 'lock wait timeout' errors.
with contextlib.nested(lockutils.lock('db-access'),
session.begin(subtransactions=True)):
self._process_l3_delete(context, id)
# Get ports to auto-delete.
ports = (session.query(models_v2.Port).
enable_eagerloads(False).
@@ -577,7 +712,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
LOG.debug(_("Deleting subnet %s"), id)
session = context.session
while True:
with session.begin(subtransactions=True):
# REVISIT: Serialize this operation with a semaphore to
# prevent deadlock waiting to acquire a DB lock held by
# another thread in the same process, leading to 'lock
# wait timeout' errors.
with contextlib.nested(lockutils.lock('db-access'),
session.begin(subtransactions=True)):
subnet = self.get_subnet(context, id)
# Get ports to auto-deallocate
allocated = (session.query(models_v2.IPAllocation).
@@ -644,8 +784,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
result = super(Ml2Plugin, self).create_port(context, port)
self._process_port_create_security_group(context, result, sgids)
network = self.get_network(context, result['network_id'])
binding = db.add_port_binding(session, result['id'])
mech_context = driver_context.PortContext(self, context, result,
network)
network, binding)
self._process_port_binding(mech_context, attrs)
result[addr_pair.ADDRESS_PAIRS] = (
self._process_create_allowed_address_pairs(
@@ -662,20 +803,34 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
LOG.error(_("mechanism_manager.create_port_postcommit "
"failed, deleting port '%s'"), result['id'])
self.delete_port(context, result['id'])
# REVISIT(rkukura): Is there any point in calling this before
# a binding has been succesfully established?
self.notify_security_groups_member_updated(context, result)
return result
try:
bound_context = self._bind_port_if_needed(mech_context)
except ml2_exc.MechanismDriverError:
with excutils.save_and_reraise_exception():
LOG.error(_("_bind_port_if_needed "
"failed, deleting port '%s'"), result['id'])
self.delete_port(context, result['id'])
return bound_context._port
def update_port(self, context, id, port):
attrs = port['port']
need_port_update_notify = False
session = context.session
with session.begin(subtransactions=True):
try:
port_db = (session.query(models_v2.Port).
enable_eagerloads(False).
filter_by(id=id).with_lockmode('update').one())
except sa_exc.NoResultFound:
# REVISIT: Serialize this operation with a semaphore to
# prevent deadlock waiting to acquire a DB lock held by
# another thread in the same process, leading to 'lock wait
# timeout' errors.
with contextlib.nested(lockutils.lock('db-access'),
session.begin(subtransactions=True)):
port_db, binding = db.get_locked_port_and_binding(session, id)
if not port_db:
raise exc.PortNotFound(port_id=id)
original_port = self._make_port_dict(port_db)
updated_port = super(Ml2Plugin, self).update_port(context, id,
@@ -691,7 +846,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
need_port_update_notify |= self._update_extra_dhcp_opts_on_port(
context, id, port, updated_port)
mech_context = driver_context.PortContext(
self, context, updated_port, network,
self, context, updated_port, network, binding,
original_port=original_port)
need_port_update_notify |= self._process_port_binding(
mech_context, attrs)
@@ -709,10 +864,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
if original_port['admin_state_up'] != updated_port['admin_state_up']:
need_port_update_notify = True
if need_port_update_notify:
self._notify_port_updated(mech_context)
return updated_port
bound_port = self._bind_port_if_needed(
mech_context,
allow_notify=True,
need_notify=need_port_update_notify)
return bound_port._port
def delete_port(self, context, id, l3_port_check=True):
LOG.debug(_("Deleting port %s"), id)
@@ -722,15 +878,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
l3plugin.prevent_l3_port_deletion(context, id)
session = context.session
# REVISIT: Serialize this operation with a semaphore to prevent
# undesired eventlet yields leading to 'lock wait timeout' errors
# REVISIT: Serialize this operation with a semaphore to
# prevent deadlock waiting to acquire a DB lock held by
# another thread in the same process, leading to 'lock wait
# timeout' errors.
with contextlib.nested(lockutils.lock('db-access'),
session.begin(subtransactions=True)):
try:
port_db = (session.query(models_v2.Port).
enable_eagerloads(False).
filter_by(id=id).with_lockmode('update').one())
except sa_exc.NoResultFound:
port_db, binding = db.get_locked_port_and_binding(session, id)
if not port_db:
# the port existed when l3plugin.prevent_l3_port_deletion
# was called but now is already gone
LOG.debug(_("The port '%s' was deleted"), id)
@@ -739,7 +894,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
network = self.get_network(context, port['network_id'])
mech_context = driver_context.PortContext(self, context, port,
network)
network, binding)
self.mechanism_manager.delete_port_precommit(mech_context)
self._delete_port_security_group_bindings(context, id)
LOG.debug(_("Calling base delete_port"))
@@ -762,11 +917,34 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
LOG.error(_("mechanism_manager.delete_port_postcommit failed"))
self.notify_security_groups_member_updated(context, port)
def get_bound_port_context(self, plugin_context, port_id):
session = plugin_context.session
with session.begin(subtransactions=True):
try:
port_db = (session.query(models_v2.Port).
enable_eagerloads(False).
filter(models_v2.Port.id.startswith(port_id)).
one())
except sa_exc.NoResultFound:
return
except exc.MultipleResultsFound:
LOG.error(_("Multiple ports have port_id starting with %s"),
port_id)
return
port = self._make_port_dict(port_db)
network = self.get_network(plugin_context, port['network_id'])
port_context = driver_context.PortContext(
self, plugin_context, port, network, port_db.port_binding)
return self._bind_port_if_needed(port_context)
def update_port_status(self, context, port_id, status):
updated = False
session = context.session
# REVISIT: Serialize this operation with a semaphore to prevent
# undesired eventlet yields leading to 'lock wait timeout' errors
# REVISIT: Serialize this operation with a semaphore to
# prevent deadlock waiting to acquire a DB lock held by
# another thread in the same process, leading to 'lock wait
# timeout' errors.
with contextlib.nested(lockutils.lock('db-access'),
session.begin(subtransactions=True)):
port = db.get_port(session, port_id)
@@ -781,7 +959,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
network = self.get_network(context,
original_port['network_id'])
mech_context = driver_context.PortContext(
self, context, updated_port, network,
self, context, updated_port, network, port.port_binding,
original_port=original_port)
self.mechanism_manager.update_port_precommit(mech_context)
updated = True