From b1677dcb80ce8b83aadb2180efad3527a96bd3bc Mon Sep 17 00:00:00 2001
From: Robert Kukura
Date: Tue, 11 Mar 2014 21:54:35 -0400
Subject: [PATCH] ML2: Bind ports outside transactions

The ML2 plugin now calls the bind_port() operation on the registered
mechanism drivers outside of any enclosing DB transaction. Ports are
created or updated in one transaction, a binding is then established
if possible, and finally a second transaction commits the binding
result.

With [re]binding moved outside the DB transaction that triggered it,
it is now possible for multiple threads or processes to concurrently
try to bind the same port, or for the port to be updated between the
two transactions. Concurrent attempts to bind the same port are
allowed to proceed; the second transaction resolves which attempt's
results are used, and binding is retried if necessary.

Also included are improvements to the Cisco Nexus driver and its unit
tests, from Rich Curran, that are needed due to the binding changes.

Closes-Bug: 1276391
Closes-Bug: 1335226
Change-Id: I65dafc330d6e812dad0667d2383858504d0ba299
---
 neutron/plugins/ml2/db.py                      |  42 ++-
 neutron/plugins/ml2/driver_api.py              |  21 +-
 neutron/plugins/ml2/driver_context.py          |   5 +-
 neutron/plugins/ml2/drivers/mech_agent.py      |   9 +-
 .../ml2/drivers/mech_bigswitch/driver.py       |   4 +-
 neutron/plugins/ml2/managers.py                |   5 +-
 neutron/plugins/ml2/plugin.py                  | 350 +++++++++++++-----
 neutron/plugins/ml2/rpc.py                     |  94 ++---
 .../drivers/cisco/nexus/test_cisco_mech.py     |  43 ++-
 .../drivers/cisco/nexus/test_cisco_nexus.py    |   3 +-
 .../tests/unit/ml2/drivers/mechanism_test.py   |  12 +-
 .../unit/ml2/drivers/test_bigswitch_mech.py    |   2 +-
 12 files changed, 395 insertions(+), 195 deletions(-)
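To make the new control flow concrete before diving into the diff, here is a
minimal sketch of the create -> bind -> commit sequence the patch introduces.
It is illustrative only: create_in_db, try_bind, and commit_if_unchanged are
hypothetical helpers, not functions from this tree.

    # Hypothetical sketch of the three-phase flow this patch introduces;
    # create_in_db, try_bind, and commit_if_unchanged do not exist in Neutron.
    def create_port(context, port):
        # Phase 1: create the port plus an unbound binding in one transaction.
        with context.session.begin(subtransactions=True):
            result = create_in_db(context, port)

        # Phase 2: call mechanism drivers with no transaction open, so slow
        # or remote bind_port() work cannot hold DB locks.
        bind_result = try_bind(context, result)

        # Phase 3: a second transaction re-reads the port and commits the
        # binding only if nothing changed concurrently; otherwise it retries.
        with context.session.begin(subtransactions=True):
            committed = commit_if_unchanged(context, result, bind_result)
        return result
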
diff --git a/neutron/plugins/ml2/db.py b/neutron/plugins/ml2/db.py
index 4cf8eed328b..176be08da2b 100644
--- a/neutron/plugins/ml2/db.py
+++ b/neutron/plugins/ml2/db.py
@@ -56,20 +56,38 @@ def get_network_segments(session, network_id):
             for record in records]
 
 
-def ensure_port_binding(session, port_id):
+def add_port_binding(session, port_id):
     with session.begin(subtransactions=True):
-        try:
-            record = (session.query(models.PortBinding).
-                      filter_by(port_id=port_id).
-                      one())
-        except exc.NoResultFound:
-            record = models.PortBinding(
-                port_id=port_id,
-                vif_type=portbindings.VIF_TYPE_UNBOUND)
-            session.add(record)
+        record = models.PortBinding(
+            port_id=port_id,
+            vif_type=portbindings.VIF_TYPE_UNBOUND)
+        session.add(record)
     return record
 
 
+def get_locked_port_and_binding(session, port_id):
+    """Get port and port binding records for update within transaction."""
+
+    try:
+        # REVISIT(rkukura): We need the Port and PortBinding records
+        # to both be added to the session and locked for update. A
+        # single joined query should work, but the combination of left
+        # outer joins and postgresql doesn't seem to work.
+        port = (session.query(models_v2.Port).
+                enable_eagerloads(False).
+                filter_by(id=port_id).
+                with_lockmode('update').
+                one())
+        binding = (session.query(models.PortBinding).
+                   enable_eagerloads(False).
+                   filter_by(port_id=port_id).
+                   with_lockmode('update').
+                   one())
+        return port, binding
+    except exc.NoResultFound:
+        return None, None
+
+
 def get_port(session, port_id):
     """Get port record for update within transcation."""
@@ -133,4 +151,8 @@ def get_port_binding_host(port_id):
         LOG.debug(_("No binding found for port %(port_id)s"),
                   {'port_id': port_id})
         return
+    except exc.MultipleResultsFound:
+        LOG.error(_("Multiple ports have port_id starting with %s"),
+                  port_id)
+        return
     return query.host
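with_lockmode('update') makes SQLAlchemy append FOR UPDATE to the SELECT, so
both rows stay locked until the enclosing transaction commits or rolls back.
A standalone sketch of the same two-query pattern follows; Port and
PortBinding here stand for any two related declarative models, not the
Neutron ones, and note that SQLAlchemy 0.9+ renames this to with_for_update().

    # Standalone sketch of the two-query row-locking pattern used above.
    from sqlalchemy.orm import exc


    def get_locked_pair(session, port_id, Port, PortBinding):
        try:
            # Each query emits SELECT ... FOR UPDATE; the caller must hold
            # an open transaction, or the locks are released immediately.
            port = (session.query(Port).
                    filter_by(id=port_id).
                    with_lockmode('update').   # with_for_update() in 0.9+
                    one())
            binding = (session.query(PortBinding).
                       filter_by(port_id=port_id).
                       with_lockmode('update').
                       one())
            return port, binding
        except exc.NoResultFound:
            # Port or binding was deleted concurrently.
            return None, None
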
diff --git a/neutron/plugins/ml2/driver_api.py b/neutron/plugins/ml2/driver_api.py
index 2384b0cf9d6..30dab693b0c 100644
--- a/neutron/plugins/ml2/driver_api.py
+++ b/neutron/plugins/ml2/driver_api.py
@@ -588,10 +588,21 @@ class MechanismDriver(object):
 
         :param context: PortContext instance describing the port
 
-        Called inside transaction context on session, prior to
-        create_port_precommit or update_port_precommit, to
-        attempt to establish a port binding. If the driver is able to
-        bind the port, it calls context.set_binding with the binding
-        details.
+        Called outside any transaction to attempt to establish a port
+        binding using this mechanism driver. If the driver is able to
+        bind the port, it must call context.set_binding() with the
+        binding details. If the binding results are committed after
+        bind_port() returns, they will be seen by all mechanism
+        drivers as update_port_precommit() and
+        update_port_postcommit() calls.
+
+        Note that if some other thread or process concurrently binds
+        or updates the port, these binding results will not be
+        committed, and update_port_precommit() and
+        update_port_postcommit() will not be called on the mechanism
+        drivers with these results. Because binding results can be
+        discarded rather than committed, drivers should avoid making
+        persistent state changes in bind_port(), or else must ensure
+        that such state changes are eventually cleaned up.
         """
         pass
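A minimal driver honoring this contract could look like the following sketch.
It is not an in-tree driver: the supported network types and the chosen VIF
type are assumptions made for illustration.

    # Illustrative mechanism driver sketch; the network types and VIF type
    # chosen here are assumptions, not any real driver's policy.
    from neutron.extensions import portbindings
    from neutron.plugins.ml2 import driver_api as api


    class SketchMechanismDriver(api.MechanismDriver):
        def initialize(self):
            pass

        def bind_port(self, context):
            # No DB transaction is open here, so slow or remote calls are
            # safe, but any external state created now may be thrown away
            # if the commit loses the race - keep this side-effect free.
            for segment in context.network.network_segments:
                if segment[api.NETWORK_TYPE] in ('flat', 'vlan'):
                    context.set_binding(segment[api.ID],
                                        portbindings.VIF_TYPE_OVS,
                                        {portbindings.CAP_PORT_FILTER: False})
                    return
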
diff --git a/neutron/plugins/ml2/driver_context.py b/neutron/plugins/ml2/driver_context.py
index 0c1180619f8..08f9b12c932 100644
--- a/neutron/plugins/ml2/driver_context.py
+++ b/neutron/plugins/ml2/driver_context.py
@@ -69,15 +69,14 @@ class SubnetContext(MechanismDriverContext, api.SubnetContext):
 
 class PortContext(MechanismDriverContext, api.PortContext):
 
-    def __init__(self, plugin, plugin_context, port, network,
+    def __init__(self, plugin, plugin_context, port, network, binding,
                  original_port=None):
         super(PortContext, self).__init__(plugin, plugin_context)
         self._port = port
         self._original_port = original_port
         self._network_context = NetworkContext(plugin, plugin_context,
                                                network)
-        self._binding = db.ensure_port_binding(plugin_context.session,
-                                               port['id'])
+        self._binding = binding
         if original_port:
             self._original_bound_segment_id = self._binding.segment
             self._original_bound_driver = self._binding.driver
diff --git a/neutron/plugins/ml2/drivers/mech_agent.py b/neutron/plugins/ml2/drivers/mech_agent.py
index d0aad3ae967..9bde154ba59 100644
--- a/neutron/plugins/ml2/drivers/mech_agent.py
+++ b/neutron/plugins/ml2/drivers/mech_agent.py
@@ -82,10 +82,11 @@ class AgentMechanismDriverBase(api.MechanismDriver):
         :param agent: agents_db entry describing agent to bind
         :returns: True iff segment has been bound for agent
 
-        Called inside transaction during bind_port() so that derived
-        MechanismDrivers can use agent_db data along with built-in
-        knowledge of the corresponding agent's capabilities to attempt
-        to bind to the specified network segment for the agent.
+        Called outside any transaction during bind_port() so that
+        derived MechanismDrivers can use agent_db data along with
+        built-in knowledge of the corresponding agent's capabilities
+        to attempt to bind to the specified network segment for the
+        agent.
 
         If the segment can be bound for the agent, this function must
         call context.set_binding() with appropriate values and then
diff --git a/neutron/plugins/ml2/drivers/mech_bigswitch/driver.py b/neutron/plugins/ml2/drivers/mech_bigswitch/driver.py
index c40b4a099fe..259b14673c8 100644
--- a/neutron/plugins/ml2/drivers/mech_bigswitch/driver.py
+++ b/neutron/plugins/ml2/drivers/mech_bigswitch/driver.py
@@ -85,8 +85,8 @@ class BigSwitchMechanismDriver(plugin.NeutronRestProxyV2Base,
         port = self._prepare_port_for_controller(context)
         if port:
             try:
-                self.servers.rest_update_port(port["network"]["tenant_id"],
-                                              port["network"]["id"], port)
+                self.async_port_create(port["network"]["tenant_id"],
+                                       port["network"]["id"], port)
             except servermanager.RemoteRestError as e:
                 with excutils.save_and_reraise_exception() as ctxt:
                     if (cfg.CONF.RESTPROXY.auto_sync_on_failure and
diff --git a/neutron/plugins/ml2/managers.py b/neutron/plugins/ml2/managers.py
index 13df6732e2f..d3518f39e7e 100644
--- a/neutron/plugins/ml2/managers.py
+++ b/neutron/plugins/ml2/managers.py
@@ -439,9 +439,8 @@ class MechanismManager(stevedore.named.NamedExtensionManager):
 
         :param context: PortContext instance describing the port
 
-        Called inside transaction context on session, prior to
-        create_port_precommit or update_port_precommit, to
-        attempt to establish a port binding.
+        Called outside any transaction to attempt to establish a port
+        binding.
         """
         binding = context._binding
         LOG.debug(_("Attempting to bind port %(port)s on host %(host)s "
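For context, MechanismManager.bind_port() (whose body is mostly unchanged by
this hunk) drives the per-driver attempt loop. Roughly, as a hedged sketch
rather than the verbatim manager code, it behaves like this:

    # Hedged sketch of MechanismManager.bind_port()'s driver loop; the real
    # method also logs binding successes and details.
    import logging

    from neutron.extensions import portbindings

    LOG = logging.getLogger(__name__)


    def bind_port(manager, context):
        binding = context._binding
        for driver in manager.ordered_mech_drivers:   # config-file order
            try:
                driver.obj.bind_port(context)
                if binding.segment:                   # this driver bound it
                    binding.driver = driver.name
                    return
            except Exception:
                # A misbehaving driver must not stop the others from trying.
                LOG.exception("Mechanism driver %s failed in bind_port",
                              driver.name)
        binding.vif_type = portbindings.VIF_TYPE_BINDING_FAILED
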
diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py
index 9407b31ca90..0eb7b809ebf 100644
--- a/neutron/plugins/ml2/plugin.py
+++ b/neutron/plugins/ml2/plugin.py
@@ -12,7 +12,9 @@
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+
 import contextlib
+from eventlet import greenthread
 
 from oslo.config import cfg
 from oslo.db import exception as os_db_exception
@@ -58,6 +60,8 @@ from neutron.plugins.ml2 import rpc
 
 LOG = log.getLogger(__name__)
 
+MAX_BIND_TRIES = 10
+
 # REVISIT(rkukura): Move this and other network_type constants to
 # providernet.py?
 TYPE_MULTI_SEGMENT = 'multi-segment'
@@ -211,69 +215,200 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
     def _process_port_binding(self, mech_context, attrs):
         binding = mech_context._binding
         port = mech_context.current
-        self._update_port_dict_binding(port, binding)
+        changes = False
 
         host = attrs and attrs.get(portbindings.HOST_ID)
-        host_set = attributes.is_attr_set(host)
+        if (attributes.is_attr_set(host) and
+            binding.host != host):
+            binding.host = host
+            changes = True
 
         vnic_type = attrs and attrs.get(portbindings.VNIC_TYPE)
-        vnic_type_set = attributes.is_attr_set(vnic_type)
-
-        # CLI can't send {}, so treat None as {}
-        profile = attrs and attrs.get(portbindings.PROFILE)
-        profile_set = profile is not attributes.ATTR_NOT_SPECIFIED
-        if profile_set and not profile:
-            profile = {}
-
-        if binding.vif_type != portbindings.VIF_TYPE_UNBOUND:
-            if (not host_set and not vnic_type_set and not profile_set and
-                binding.segment):
-                return False
-            self._delete_port_binding(mech_context)
-
-        # Return True only if an agent notification is needed.
-        # This will happen if a new host, vnic_type, or profile was specified
-        # that differs from the current one. Note that host_set is True
-        # even if the host is an empty string
-        ret_value = ((host_set and binding.get('host') != host) or
-                     (vnic_type_set and
-                      binding.get('vnic_type') != vnic_type) or
-                     (profile_set and self._get_profile(binding) != profile))
-
-        if host_set:
-            binding.host = host
-            port[portbindings.HOST_ID] = host
-
-        if vnic_type_set:
+        if (attributes.is_attr_set(vnic_type) and
+            binding.vnic_type != vnic_type):
             binding.vnic_type = vnic_type
-            port[portbindings.VNIC_TYPE] = vnic_type
+            changes = True
 
-        if profile_set:
+        # CLI can't send {}, so treat None as {}.
+        profile = attrs and attrs.get(portbindings.PROFILE) or {}
+        if (profile is not attributes.ATTR_NOT_SPECIFIED and
+            self._get_profile(binding) != profile):
             binding.profile = jsonutils.dumps(profile)
             if len(binding.profile) > models.BINDING_PROFILE_LEN:
                 msg = _("binding:profile value too large")
                 raise exc.InvalidInput(error_message=msg)
-            port[portbindings.PROFILE] = profile
+            changes = True
 
-        # To try to [re]bind if host is non-empty.
-        if binding.host:
-            self.mechanism_manager.bind_port(mech_context)
-            self._update_port_dict_binding(port, binding)
+        # Unbind the port if needed.
+        if changes:
+            binding.vif_type = portbindings.VIF_TYPE_UNBOUND
+            binding.vif_details = ''
+            binding.driver = None
+            binding.segment = None
 
-        # Update the port status if requested by the bound driver.
-        if binding.segment and mech_context._new_port_status:
-            # REVISIT(rkukura): This function is currently called
-            # inside a transaction with the port either newly
-            # created or locked for update. After the fix for bug
-            # 1276391 is merged, this will no longer be true, and
-            # the port status update will need to be handled in
-            # the transaction that commits the new binding.
-            port_db = db.get_port(mech_context._plugin_context.session,
-                                  port['id'])
-            port_db.status = mech_context._new_port_status
-            port['status'] = mech_context._new_port_status
+        self._update_port_dict_binding(port, binding)
+        return changes
 
-        return ret_value
+    def _bind_port_if_needed(self, context, allow_notify=False,
+                             need_notify=False):
+        plugin_context = context._plugin_context
+        port_id = context._port['id']
+
+        # Since the mechanism driver bind_port() calls must be made
+        # outside a DB transaction locking the port state, it is
+        # possible (but unlikely) that the port's state could change
+        # concurrently while these calls are being made. If another
+        # thread or process succeeds in binding the port before this
+        # thread commits its results, the already committed results are
+        # used. If attributes such as binding:host_id,
+        # binding:profile, or binding:vnic_type are updated
+        # concurrently, this loop retries binding using the new
+        # values.
+        count = 0
+        while True:
+            # First, determine whether it is necessary and possible to
+            # bind the port.
+            binding = context._binding
+            if (binding.vif_type != portbindings.VIF_TYPE_UNBOUND
+                or not binding.host):
+                # We either don't need to bind the port, or can't, so
+                # notify if needed and return.
+                if allow_notify and need_notify:
+                    self._notify_port_updated(context)
+                return context
+
+            # Limit binding attempts to avoid any possibility of
+            # infinite looping and to ensure an error is logged
+            # instead. This does not need to be tunable because no
+            # more than a couple attempts should ever be required in
+            # normal operation. Log at info level if not 1st attempt.
+            count += 1
+            if count > MAX_BIND_TRIES:
+                LOG.error(_("Failed to commit binding results for %(port)s "
+                            "after %(max)s tries"),
+                          {'port': port_id, 'max': MAX_BIND_TRIES})
+                return context
+            if count > 1:
+                greenthread.sleep(0)  # yield
+                LOG.info(_("Attempt %(count)s to bind port %(port)s"),
+                         {'count': count, 'port': port_id})
+
+            # The port isn't already bound and the necessary
+            # information is available, so attempt to bind the port.
+            bind_context = self._bind_port(context)
+
+            # Now try to commit result of attempting to bind the port.
+            new_context, did_commit = self._commit_port_binding(
+                plugin_context, port_id, binding, bind_context)
+            if not new_context:
+                # The port has been deleted concurrently, so just
+                # return the unbound result from the initial
+                # transaction that completed before the deletion.
+                return context._port
+            # Need to notify if we succeed and our results were
+            # committed.
+            if did_commit and (new_context._binding.vif_type !=
+                               portbindings.VIF_TYPE_BINDING_FAILED):
+                need_notify = True
+            context = new_context
+
+    def _bind_port(self, orig_context):
+        # Construct a new PortContext from the one from the previous
+        # transaction.
+        port = orig_context._port
+        orig_binding = orig_context._binding
+        new_binding = models.PortBinding(
+            host=orig_binding.host,
+            vnic_type=orig_binding.vnic_type,
+            profile=orig_binding.profile,
+            vif_type=portbindings.VIF_TYPE_UNBOUND,
+            vif_details=''
+        )
+        self._update_port_dict_binding(port, new_binding)
+        new_context = driver_context.PortContext(
+            self, orig_context._plugin_context, port,
+            orig_context._network_context._network, new_binding)
+
+        # Attempt to bind the port and return the context with the
+        # result.
+        self.mechanism_manager.bind_port(new_context)
+        return new_context
+
+    def _commit_port_binding(self, plugin_context, port_id, orig_binding,
+                             new_context):
+        session = plugin_context.session
+        new_binding = new_context._binding
+
+        # After we've attempted to bind the port, we begin a
+        # transaction, get the current port state, and decide whether
+        # to commit the binding results.
+        #
+        # REVISIT: Serialize this operation with a semaphore to
+        # prevent deadlock waiting to acquire a DB lock held by
+        # another thread in the same process, leading to 'lock wait
+        # timeout' errors.
+        with contextlib.nested(lockutils.lock('db-access'),
+                               session.begin(subtransactions=True)):
+            # Get the current port state and build a new PortContext
+            # reflecting this state as original state for subsequent
+            # mechanism driver update_port_*commit() calls.
+            port_db, cur_binding = db.get_locked_port_and_binding(session,
+                                                                  port_id)
+            if not port_db:
+                # The port has been deleted concurrently.
+                return
+            oport = self._make_port_dict(port_db)
+            port = self._make_port_dict(port_db)
+            network = self.get_network(plugin_context, port['network_id'])
+            cur_context = driver_context.PortContext(
+                self, plugin_context, port, network, cur_binding,
+                original_port=oport)
+
+            # Commit our binding results only if port has not been
+            # successfully bound concurrently by another thread or
+            # process and no binding inputs have been changed.
+            commit = ((cur_binding.vif_type in
+                       [portbindings.VIF_TYPE_UNBOUND,
+                        portbindings.VIF_TYPE_BINDING_FAILED]) and
+                      orig_binding.host == cur_binding.host and
+                      orig_binding.vnic_type == cur_binding.vnic_type and
+                      orig_binding.profile == cur_binding.profile)
+
+            if commit:
+                # Update the port's binding state with our binding
+                # results.
+                cur_binding.vif_type = new_binding.vif_type
+                cur_binding.vif_details = new_binding.vif_details
+                cur_binding.driver = new_binding.driver
+                cur_binding.segment = new_binding.segment
+
+                # REVISIT(rkukura): The binding:profile attribute is
+                # supposed to be input-only, but the Mellanox driver
+                # currently modifies it while binding. Remove this
+                # code when the Mellanox driver has been updated to
+                # use binding:vif_details instead.
+                if cur_binding.profile != new_binding.profile:
+                    cur_binding.profile = new_binding.profile
+
+                # Update PortContext's port dictionary to reflect the
+                # updated binding state.
+                self._update_port_dict_binding(port, cur_binding)
+
+                # Update the port status if requested by the bound driver.
+                if new_binding.segment and new_context._new_port_status:
+                    port_db.status = new_context._new_port_status
+                    port['status'] = new_context._new_port_status
+
+                # Call the mechanism driver precommit methods, commit
+                # the results, and call the postcommit methods.
+                self.mechanism_manager.update_port_precommit(cur_context)
+        if commit:
+            self.mechanism_manager.update_port_postcommit(cur_context)
+
+        # Continue, using the port state as of the transaction that
+        # just finished, whether that transaction committed new
+        # results or discovered concurrent port state changes.
+        return (cur_context, commit)
 
     def _update_port_dict_binding(self, port, binding):
         port[portbindings.HOST_ID] = binding.host
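The _bind_port_if_needed()/_commit_port_binding() pair above is an
optimistic-concurrency loop: do the expensive work with no locks held, then
commit only if the inputs are unchanged. Stripped of the Neutron specifics,
the pattern looks like the following sketch; read_state, compute_binding, and
commit_if_inputs_match are hypothetical helpers.

    # Generic sketch of the optimistic bind-then-commit retry loop.
    import eventlet

    MAX_TRIES = 10


    def bind_with_retries(port_id):
        # Transaction 1 equivalent: snapshot the binding inputs (host,
        # vnic_type, profile) that the eventual commit must still match.
        state = read_state(port_id)               # hypothetical helper
        for attempt in range(1, MAX_TRIES + 1):
            if attempt > 1:
                eventlet.sleep(0)                 # yield; let a winner finish
            result = compute_binding(state)       # slow work, no DB locks held
            # Transaction 2 equivalent: re-read under lock and commit only
            # if the inputs are unchanged and nobody else bound the port.
            state, committed = commit_if_inputs_match(port_id, state, result)
            if state is None or committed or state.is_bound:
                return state    # deleted, we won, or another thread won
        raise RuntimeError("gave up binding %s after %d tries"
                           % (port_id, MAX_TRIES))
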
@@ -304,15 +439,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                        'port': binding.port_id})
         return {}
 
-    def _delete_port_binding(self, mech_context):
-        binding = mech_context._binding
-        binding.vif_type = portbindings.VIF_TYPE_UNBOUND
-        binding.vif_details = ''
-        binding.driver = None
-        binding.segment = None
-        port = mech_context.current
-        self._update_port_dict_binding(port, binding)
-
     def _ml2_extend_port_dict_binding(self, port_res, port_db):
         # None when called during unit tests for other plugins.
         if port_db.port_binding:
@@ -457,9 +583,18 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
         session = context.session
         while True:
             try:
-                with session.begin(subtransactions=True):
+                # REVISIT(rkukura): It's not clear that
+                # with_lockmode('update') is really needed in this
+                # transaction, and if not, the semaphore can also be
+                # removed.
+                #
+                # REVISIT: Serialize this operation with a semaphore
+                # to prevent deadlock waiting to acquire a DB lock
+                # held by another thread in the same process, leading
+                # to 'lock wait timeout' errors.
+                with contextlib.nested(lockutils.lock('db-access'),
+                                       session.begin(subtransactions=True)):
                     self._process_l3_delete(context, id)
-
                     # Get ports to auto-delete.
                     ports = (session.query(models_v2.Port).
                              enable_eagerloads(False).
@@ -577,7 +712,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
         LOG.debug(_("Deleting subnet %s"), id)
         session = context.session
         while True:
-            with session.begin(subtransactions=True):
+            # REVISIT: Serialize this operation with a semaphore to
+            # prevent deadlock waiting to acquire a DB lock held by
+            # another thread in the same process, leading to 'lock
+            # wait timeout' errors.
+            with contextlib.nested(lockutils.lock('db-access'),
+                                   session.begin(subtransactions=True)):
                 subnet = self.get_subnet(context, id)
                 # Get ports to auto-deallocate
                 allocated = (session.query(models_v2.IPAllocation).
@@ -644,8 +784,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             result = super(Ml2Plugin, self).create_port(context, port)
             self._process_port_create_security_group(context, result, sgids)
             network = self.get_network(context, result['network_id'])
+            binding = db.add_port_binding(session, result['id'])
             mech_context = driver_context.PortContext(self, context, result,
-                                                      network)
+                                                      network, binding)
             self._process_port_binding(mech_context, attrs)
             result[addr_pair.ADDRESS_PAIRS] = (
                 self._process_create_allowed_address_pairs(
@@ -662,20 +803,34 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                 LOG.error(_("mechanism_manager.create_port_postcommit "
                             "failed, deleting port '%s'"), result['id'])
                 self.delete_port(context, result['id'])
+
+        # REVISIT(rkukura): Is there any point in calling this before
+        # a binding has been successfully established?
         self.notify_security_groups_member_updated(context, result)
-        return result
+
+        try:
+            bound_context = self._bind_port_if_needed(mech_context)
+        except ml2_exc.MechanismDriverError:
+            with excutils.save_and_reraise_exception():
+                LOG.error(_("_bind_port_if_needed "
+                            "failed, deleting port '%s'"), result['id'])
+                self.delete_port(context, result['id'])
+        return bound_context._port
 
     def update_port(self, context, id, port):
         attrs = port['port']
         need_port_update_notify = False
         session = context.session
-        with session.begin(subtransactions=True):
-            try:
-                port_db = (session.query(models_v2.Port).
-                           enable_eagerloads(False).
-                           filter_by(id=id).with_lockmode('update').one())
-            except sa_exc.NoResultFound:
+
+        # REVISIT: Serialize this operation with a semaphore to
+        # prevent deadlock waiting to acquire a DB lock held by
+        # another thread in the same process, leading to 'lock wait
+        # timeout' errors.
+        with contextlib.nested(lockutils.lock('db-access'),
+                               session.begin(subtransactions=True)):
+            port_db, binding = db.get_locked_port_and_binding(session, id)
+            if not port_db:
                 raise exc.PortNotFound(port_id=id)
             original_port = self._make_port_dict(port_db)
             updated_port = super(Ml2Plugin, self).update_port(context, id,
@@ -691,7 +846,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                 need_port_update_notify |= self._update_extra_dhcp_opts_on_port(
                     context, id, port, updated_port)
             mech_context = driver_context.PortContext(
-                self, context, updated_port, network,
+                self, context, updated_port, network, binding,
                 original_port=original_port)
             need_port_update_notify |= self._process_port_binding(
                 mech_context, attrs)
@@ -709,10 +864,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
         if original_port['admin_state_up'] != updated_port['admin_state_up']:
             need_port_update_notify = True
 
-        if need_port_update_notify:
-            self._notify_port_updated(mech_context)
-
-        return updated_port
+        bound_port = self._bind_port_if_needed(
+            mech_context,
+            allow_notify=True,
+            need_notify=need_port_update_notify)
+        return bound_port._port
 
     def delete_port(self, context, id, l3_port_check=True):
         LOG.debug(_("Deleting port %s"), id)
@@ -722,15 +878,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             l3plugin.prevent_l3_port_deletion(context, id)
 
         session = context.session
-        # REVISIT: Serialize this operation with a semaphore to prevent
-        # undesired eventlet yields leading to 'lock wait timeout' errors
+        # REVISIT: Serialize this operation with a semaphore to
+        # prevent deadlock waiting to acquire a DB lock held by
+        # another thread in the same process, leading to 'lock wait
+        # timeout' errors.
         with contextlib.nested(lockutils.lock('db-access'),
                                session.begin(subtransactions=True)):
-            try:
-                port_db = (session.query(models_v2.Port).
-                           enable_eagerloads(False).
-                           filter_by(id=id).with_lockmode('update').one())
-            except sa_exc.NoResultFound:
+            port_db, binding = db.get_locked_port_and_binding(session, id)
+            if not port_db:
                 # the port existed when l3plugin.prevent_l3_port_deletion
                 # was called but now is already gone
                 LOG.debug(_("The port '%s' was deleted"), id)
@@ -739,7 +894,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
 
             network = self.get_network(context, port['network_id'])
             mech_context = driver_context.PortContext(self, context, port,
-                                                      network)
+                                                      network, binding)
             self.mechanism_manager.delete_port_precommit(mech_context)
             self._delete_port_security_group_bindings(context, id)
             LOG.debug(_("Calling base delete_port"))
@@ -762,11 +917,34 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             LOG.error(_("mechanism_manager.delete_port_postcommit failed"))
         self.notify_security_groups_member_updated(context, port)
 
+    def get_bound_port_context(self, plugin_context, port_id):
+        session = plugin_context.session
+        with session.begin(subtransactions=True):
+            try:
+                port_db = (session.query(models_v2.Port).
+                           enable_eagerloads(False).
+                           filter(models_v2.Port.id.startswith(port_id)).
+                           one())
+            except sa_exc.NoResultFound:
+                return
+            except exc.MultipleResultsFound:
+                LOG.error(_("Multiple ports have port_id starting with %s"),
+                          port_id)
+                return
+            port = self._make_port_dict(port_db)
+            network = self.get_network(plugin_context, port['network_id'])
+            port_context = driver_context.PortContext(
+                self, plugin_context, port, network, port_db.port_binding)
+
+        return self._bind_port_if_needed(port_context)
+
     def update_port_status(self, context, port_id, status):
         updated = False
         session = context.session
-        # REVISIT: Serialize this operation with a semaphore to prevent
-        # undesired eventlet yields leading to 'lock wait timeout' errors
+        # REVISIT: Serialize this operation with a semaphore to
+        # prevent deadlock waiting to acquire a DB lock held by
+        # another thread in the same process, leading to 'lock wait
+        # timeout' errors.
         with contextlib.nested(lockutils.lock('db-access'),
                                session.begin(subtransactions=True)):
             port = db.get_port(session, port_id)
@@ -781,7 +959,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             network = self.get_network(context,
                                        original_port['network_id'])
             mech_context = driver_context.PortContext(
-                self, context, updated_port, network, port.port_binding,
+                self, context, updated_port, network, port.port_binding,
                 original_port=original_port)
             self.mechanism_manager.update_port_precommit(mech_context)
             updated = True
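The recurring contextlib.nested(lockutils.lock('db-access'), session.begin())
idiom above serializes, within one process, every code path that takes row
locks, so one green thread cannot sit on a row lock another needs while never
yielding back to it. A minimal standalone sketch of the idiom, assuming the
oslo-incubator lockutils copy that plugin.py imports and a hypothetical
do_locked_work helper:

    # Sketch of the process-local serialization idiom used above.
    import contextlib

    from neutron.openstack.common import lockutils


    def serialized_update(session, port_id):
        # lockutils.lock() yields a process-local semaphore here ('db-access'
        # is the name the patch uses); contextlib.nested is Python 2 only,
        # matching the tree at the time.
        with contextlib.nested(lockutils.lock('db-access'),
                               session.begin(subtransactions=True)):
            do_locked_work(session, port_id)      # hypothetical helper
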
diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py
index f03ababf083..2876231b815 100644
--- a/neutron/plugins/ml2/rpc.py
+++ b/neutron/plugins/ml2/rpc.py
@@ -17,9 +17,9 @@ from neutron.agent import securitygroups_rpc as sg_rpc
 from neutron.common import constants as q_const
 from neutron.common import rpc as n_rpc
 from neutron.common import topics
-from neutron.db import api as db_api
 from neutron.db import dhcp_rpc_base
 from neutron.db import securitygroups_rpc_base as sg_db_rpc
+from neutron.extensions import portbindings
 from neutron import manager
 from neutron.openstack.common import log
 from neutron.openstack.common import uuidutils
@@ -83,64 +83,43 @@ class RpcCallbacks(n_rpc.RpcCallback,
                     {'device': device, 'agent_id': agent_id})
         port_id = self._device_to_port_id(device)
-        session = db_api.get_session()
-        with session.begin(subtransactions=True):
-            port = db.get_port(session, port_id)
-            if not port:
-                LOG.warning(_("Device %(device)s requested by agent "
-                              "%(agent_id)s not found in database"),
-                            {'device': device, 'agent_id': agent_id})
-                return {'device': device}
+        plugin = manager.NeutronManager.get_plugin()
+        port_context = plugin.get_bound_port_context(rpc_context, port_id)
+        if not port_context:
+            LOG.warning(_("Device %(device)s requested by agent "
+                          "%(agent_id)s not found in database"),
+                        {'device': device, 'agent_id': agent_id})
+            return {'device': device}
 
-            segments = db.get_network_segments(session, port.network_id)
-            if not segments:
-                LOG.warning(_("Device %(device)s requested by agent "
-                              "%(agent_id)s has network %(network_id)s with "
-                              "no segments"),
-                            {'device': device,
-                             'agent_id': agent_id,
-                             'network_id': port.network_id})
-                return {'device': device}
+        segment = port_context.bound_segment
+        port = port_context.current
 
-            binding = db.ensure_port_binding(session, port.id)
-            if not binding.segment:
-                LOG.warning(_("Device %(device)s requested by agent "
-                              "%(agent_id)s on network %(network_id)s not "
-                              "bound, vif_type: %(vif_type)s"),
-                            {'device': device,
-                             'agent_id': agent_id,
-                             'network_id': port.network_id,
-                             'vif_type': binding.vif_type})
-                return {'device': device}
+        if not segment:
+            LOG.warning(_("Device %(device)s requested by agent "
+                          "%(agent_id)s on network %(network_id)s not "
+                          "bound, vif_type: %(vif_type)s"),
+                        {'device': device,
+                         'agent_id': agent_id,
+                         'network_id': port['network_id'],
+                         'vif_type': port[portbindings.VIF_TYPE]})
+            return {'device': device}
 
-            segment = self._find_segment(segments, binding.segment)
-            if not segment:
-                LOG.warning(_("Device %(device)s requested by agent "
-                              "%(agent_id)s on network %(network_id)s "
-                              "invalid segment, vif_type: %(vif_type)s"),
-                            {'device': device,
-                             'agent_id': agent_id,
-                             'network_id': port.network_id,
-                             'vif_type': binding.vif_type})
-                return {'device': device}
+        new_status = (q_const.PORT_STATUS_BUILD if port['admin_state_up']
+                      else q_const.PORT_STATUS_DOWN)
+        if port['status'] != new_status:
+            plugin.update_port_status(rpc_context,
+                                      port_id,
+                                      new_status)
 
-            new_status = (q_const.PORT_STATUS_BUILD if port.admin_state_up
-                          else q_const.PORT_STATUS_DOWN)
-            if port.status != new_status:
-                plugin = manager.NeutronManager.get_plugin()
-                plugin.update_port_status(rpc_context,
-                                          port_id,
-                                          new_status)
-                port.status = new_status
-            entry = {'device': device,
-                     'network_id': port.network_id,
-                     'port_id': port.id,
-                     'admin_state_up': port.admin_state_up,
-                     'network_type': segment[api.NETWORK_TYPE],
-                     'segmentation_id': segment[api.SEGMENTATION_ID],
-                     'physical_network': segment[api.PHYSICAL_NETWORK]}
-            LOG.debug(_("Returning: %s"), entry)
-            return entry
+        entry = {'device': device,
+                 'network_id': port['network_id'],
+                 'port_id': port_id,
+                 'admin_state_up': port['admin_state_up'],
+                 'network_type': segment[api.NETWORK_TYPE],
+                 'segmentation_id': segment[api.SEGMENTATION_ID],
+                 'physical_network': segment[api.PHYSICAL_NETWORK]}
+        LOG.debug(_("Returning: %s"), entry)
+        return entry
 
     def get_devices_details_list(self, rpc_context, **kwargs):
         return [
@@ -152,11 +131,6 @@ class RpcCallbacks(n_rpc.RpcCallback,
             for device in kwargs.pop('devices', [])
         ]
 
-    def _find_segment(self, segments, segment_id):
-        for segment in segments:
-            if segment[api.ID] == segment_id:
-                return segment
-
     def update_device_down(self, rpc_context, **kwargs):
         """Device no longer exists on agent."""
         # TODO(garyk) - live migration and port status
diff --git a/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_mech.py b/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_mech.py
index 55717ebe2cc..5968d2422d7 100644
--- a/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_mech.py
+++ b/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_mech.py
@@ -19,7 +19,6 @@ import mock
 import webob.exc as wexc
 
 from neutron.api.v2 import base
-from neutron.common import constants as n_const
 from neutron import context
 from neutron.extensions import portbindings
 from neutron import manager
@@ -123,15 +122,33 @@ class CiscoML2MechanismTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
                 new_callable=mock.PropertyMock).start()
         self.mock_original_bound_segment.return_value = None
 
-        mock_status = mock.patch.object(
+        # Use _is_status_active method to determine bind state.
+        def _mock_check_bind_state(port_context):
+            if (port_context[portbindings.VIF_TYPE] !=
+                portbindings.VIF_TYPE_UNBOUND):
+                return True
+            else:
+                return False
+
+        self.mock_status = mock.patch.object(
             mech_cisco_nexus.CiscoNexusMechanismDriver,
             '_is_status_active').start()
-        mock_status.return_value = n_const.PORT_STATUS_ACTIVE
+        self.mock_status.side_effect = _mock_check_bind_state
 
         super(CiscoML2MechanismTestCase, self).setUp(ML2_PLUGIN)
 
         self.port_create_status = 'DOWN'
 
+    def _create_deviceowner_mock(self):
+        # Mock deviceowner method for UT's that expect update precommit
+        # failures. This allows control of delete_port_pre/postcommit()
+        # actions.
+        mock_deviceowner = mock.patch.object(
+            mech_cisco_nexus.CiscoNexusMechanismDriver,
+            '_is_deviceowner_compute').start()
+        mock_deviceowner.return_value = False
+        self.addCleanup(mock_deviceowner.stop)
+
     @contextlib.contextmanager
     def _patch_ncclient(self, attr, value):
         """Configure an attribute on the mock ncclient module.
@@ -223,7 +240,8 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase,
     @contextlib.contextmanager
     def _create_resources(self, name=NETWORK_NAME, cidr=CIDR_1,
                           device_id=DEVICE_ID_1,
-                          host_id=COMP_HOST_NAME):
+                          host_id=COMP_HOST_NAME,
+                          expected_failure=False):
         """Create network, subnet, and port resources for test cases.
 
         Create a network, subnet, port and then update the port, yield the
@@ -233,18 +251,23 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase,
         :param cidr: cidr address of subnetwork to be created.
         :param device_id: Device ID to use for port to be created/updated.
         :param host_id: Host ID to use for port create/update.
-
+        :param expected_failure: Set to True when an update_port_precommit
+            failure is expected. Results in no actions being taken in
+            delete_port_pre/postcommit() methods.
         """
         with self.network(name=name) as network:
             with self.subnet(network=network, cidr=cidr) as subnet:
                 with self.port(subnet=subnet, cidr=cidr) as port:
+
                     data = {'port': {portbindings.HOST_ID: host_id,
                                      'device_id': device_id,
-                                     'device_owner': 'compute:none',
+                                     'device_owner': DEVICE_OWNER,
                                      'admin_state_up': True}}
                     req = self.new_update_request('ports', data,
                                                   port['port']['id'])
                     yield req.get_response(self.api)
+                    if expected_failure:
+                        self._create_deviceowner_mock()
 
     def _assertExpectedHTTP(self, status, exc):
         """Confirm that an HTTP status corresponds to an expected exception.
@@ -578,7 +601,8 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase,
         a fictitious host name during port creation.
 
         """
-        with self._create_resources(host_id='fake_host') as result:
+        with self._create_resources(host_id='fake_host',
+                                    expected_failure=True) as result:
             self._assertExpectedHTTP(result.status_int,
                                      c_exc.NexusComputeHostNotConfigured)
 
@@ -586,10 +610,11 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase,
         """Test handling of a NexusMissingRequiredFields exception.
 
         Test the Cisco NexusMissingRequiredFields exception by using
-        empty host_id and device_id values during port creation.
+        an empty device_id value during port creation.
 
         """
-        with self._create_resources(device_id='', host_id='') as result:
+        with self._create_resources(device_id='',
+                                    expected_failure=True) as result:
             self._assertExpectedHTTP(result.status_int,
                                      c_exc.NexusMissingRequiredFields)
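The setUp() change earlier in this file swaps a constant return_value for a
side_effect callable, so the mocked _is_status_active answer now depends on
the port context it is called with. This is standard mock-library behavior,
shown here with hypothetical names:

    # Minimal demonstration of return_value vs. side_effect on a Mock.
    import mock

    fixed = mock.Mock(return_value=True)
    assert fixed('anything') is True          # same answer for every call


    def _by_state(port_context):
        # Answer depends on the argument, like _mock_check_bind_state above.
        return port_context.get('binding:vif_type') != 'unbound'

    dynamic = mock.Mock(side_effect=_by_state)
    assert dynamic({'binding:vif_type': 'ovs'}) is True
    assert dynamic({'binding:vif_type': 'unbound'}) is False
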
diff --git a/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_nexus.py b/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_nexus.py
index 31573b82b09..482c4500b18 100644
--- a/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_nexus.py
+++ b/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_nexus.py
@@ -74,7 +74,8 @@ class FakePortContext(object):
             'status': PORT_STATE,
             'device_id': device_id,
             'device_owner': DEVICE_OWNER,
-            portbindings.HOST_ID: host_name
+            portbindings.HOST_ID: host_name,
+            portbindings.VIF_TYPE: portbindings.VIF_TYPE_OVS
         }
         self._network = network_context
         self._segment = network_context.network_segments
diff --git a/neutron/tests/unit/ml2/drivers/mechanism_test.py b/neutron/tests/unit/ml2/drivers/mechanism_test.py
index 6a0ca1e8630..090dfad4370 100644
--- a/neutron/tests/unit/ml2/drivers/mechanism_test.py
+++ b/neutron/tests/unit/ml2/drivers/mechanism_test.py
@@ -142,17 +142,7 @@ class TestMechanismDriver(api.MechanismDriver):
         self._check_port_context(context, False)
 
     def bind_port(self, context):
-        # REVISIT(rkukura): The upcoming fix for bug 1276391 will
-        # ensure the MDs see the unbinding of the port as a port
-        # update prior to re-binding, at which point this should be
-        # removed.
-        self.bound_ports.discard(context.current['id'])
-
-        # REVISIT(rkukura): Currently, bind_port() is called as part
-        # of either a create or update transaction. The fix for bug
-        # 1276391 will change it to be called outside any transaction,
-        # so the context.original* will no longer be available.
-        self._check_port_context(context, context.original is not None)
+        self._check_port_context(context, False)
 
         host = context.current.get(portbindings.HOST_ID, None)
         segment = context.network.network_segments[0][api.ID]
diff --git a/neutron/tests/unit/ml2/drivers/test_bigswitch_mech.py b/neutron/tests/unit/ml2/drivers/test_bigswitch_mech.py
index c037fb3a259..b3536c856bb 100644
--- a/neutron/tests/unit/ml2/drivers/test_bigswitch_mech.py
+++ b/neutron/tests/unit/ml2/drivers/test_bigswitch_mech.py
@@ -112,7 +112,7 @@ class TestBigSwitchMechDriverPortsV2(test_db_plugin.TestPortsV2,
 
     def test_udpate404_triggers_background_sync(self):
         with contextlib.nested(
-            mock.patch(SERVER_POOL + '.rest_update_port',
+            mock.patch(DRIVER + '.async_port_create',
                        side_effect=servermanager.RemoteRestError(
                            reason=servermanager.NXNETWORK, status=404)),
             mock.patch(DRIVER + '._send_all_data'),