diff --git a/neutron/plugins/ml2/db.py b/neutron/plugins/ml2/db.py index 4cf8eed328b..176be08da2b 100644 --- a/neutron/plugins/ml2/db.py +++ b/neutron/plugins/ml2/db.py @@ -56,20 +56,38 @@ def get_network_segments(session, network_id): for record in records] -def ensure_port_binding(session, port_id): +def add_port_binding(session, port_id): with session.begin(subtransactions=True): - try: - record = (session.query(models.PortBinding). - filter_by(port_id=port_id). - one()) - except exc.NoResultFound: - record = models.PortBinding( - port_id=port_id, - vif_type=portbindings.VIF_TYPE_UNBOUND) - session.add(record) + record = models.PortBinding( + port_id=port_id, + vif_type=portbindings.VIF_TYPE_UNBOUND) + session.add(record) return record +def get_locked_port_and_binding(session, port_id): + """Get port and port binding records for update within transaction.""" + + try: + # REVISIT(rkukura): We need the Port and PortBinding records + # to both be added to the session and locked for update. A + # single joined query should work, but the combination of left + # outer joins and postgresql doesn't seem to work. + port = (session.query(models_v2.Port). + enable_eagerloads(False). + filter_by(id=port_id). + with_lockmode('update'). + one()) + binding = (session.query(models.PortBinding). + enable_eagerloads(False). + filter_by(port_id=port_id). + with_lockmode('update'). + one()) + return port, binding + except exc.NoResultFound: + return None, None + + def get_port(session, port_id): """Get port record for update within transcation.""" @@ -133,4 +151,8 @@ def get_port_binding_host(port_id): LOG.debug(_("No binding found for port %(port_id)s"), {'port_id': port_id}) return + except exc.MultipleResultsFound: + LOG.error(_("Multiple ports have port_id starting with %s"), + port_id) + return return query.host diff --git a/neutron/plugins/ml2/driver_api.py b/neutron/plugins/ml2/driver_api.py index 2384b0cf9d6..30dab693b0c 100644 --- a/neutron/plugins/ml2/driver_api.py +++ b/neutron/plugins/ml2/driver_api.py @@ -588,10 +588,21 @@ class MechanismDriver(object): :param context: PortContext instance describing the port - Called inside transaction context on session, prior to - create_port_precommit or update_port_precommit, to - attempt to establish a port binding. If the driver is able to - bind the port, it calls context.set_binding with the binding - details. + Called outside any transaction to attempt to establish a port + binding using this mechanism driver. If the driver is able to + bind the port, it must call context.set_binding() with the + binding details. If the binding results are committed after + bind_port() returns, they will be seen by all mechanism + drivers as update_port_precommit() and + update_port_postcommit() calls. + + Note that if some other thread or process concurrently binds + or updates the port, these binding results will not be + committed, and update_port_precommit() and + update_port_postcommit() will not be called on the mechanism + drivers with these results. Because binding results can be + discarded rather than committed, drivers should avoid making + persistent state changes in bind_port(), or else must ensure + that such state changes are eventually cleaned up. """ pass diff --git a/neutron/plugins/ml2/driver_context.py b/neutron/plugins/ml2/driver_context.py index 0c1180619f8..08f9b12c932 100644 --- a/neutron/plugins/ml2/driver_context.py +++ b/neutron/plugins/ml2/driver_context.py @@ -69,15 +69,14 @@ class SubnetContext(MechanismDriverContext, api.SubnetContext): class PortContext(MechanismDriverContext, api.PortContext): - def __init__(self, plugin, plugin_context, port, network, + def __init__(self, plugin, plugin_context, port, network, binding, original_port=None): super(PortContext, self).__init__(plugin, plugin_context) self._port = port self._original_port = original_port self._network_context = NetworkContext(plugin, plugin_context, network) - self._binding = db.ensure_port_binding(plugin_context.session, - port['id']) + self._binding = binding if original_port: self._original_bound_segment_id = self._binding.segment self._original_bound_driver = self._binding.driver diff --git a/neutron/plugins/ml2/drivers/mech_agent.py b/neutron/plugins/ml2/drivers/mech_agent.py index d0aad3ae967..9bde154ba59 100644 --- a/neutron/plugins/ml2/drivers/mech_agent.py +++ b/neutron/plugins/ml2/drivers/mech_agent.py @@ -82,10 +82,11 @@ class AgentMechanismDriverBase(api.MechanismDriver): :param agent: agents_db entry describing agent to bind :returns: True iff segment has been bound for agent - Called inside transaction during bind_port() so that derived - MechanismDrivers can use agent_db data along with built-in - knowledge of the corresponding agent's capabilities to attempt - to bind to the specified network segment for the agent. + Called outside any transaction during bind_port() so that + derived MechanismDrivers can use agent_db data along with + built-in knowledge of the corresponding agent's capabilities + to attempt to bind to the specified network segment for the + agent. If the segment can be bound for the agent, this function must call context.set_binding() with appropriate values and then diff --git a/neutron/plugins/ml2/drivers/mech_bigswitch/driver.py b/neutron/plugins/ml2/drivers/mech_bigswitch/driver.py index c40b4a099fe..259b14673c8 100644 --- a/neutron/plugins/ml2/drivers/mech_bigswitch/driver.py +++ b/neutron/plugins/ml2/drivers/mech_bigswitch/driver.py @@ -85,8 +85,8 @@ class BigSwitchMechanismDriver(plugin.NeutronRestProxyV2Base, port = self._prepare_port_for_controller(context) if port: try: - self.servers.rest_update_port(port["network"]["tenant_id"], - port["network"]["id"], port) + self.async_port_create(port["network"]["tenant_id"], + port["network"]["id"], port) except servermanager.RemoteRestError as e: with excutils.save_and_reraise_exception() as ctxt: if (cfg.CONF.RESTPROXY.auto_sync_on_failure and diff --git a/neutron/plugins/ml2/managers.py b/neutron/plugins/ml2/managers.py index 13df6732e2f..d3518f39e7e 100644 --- a/neutron/plugins/ml2/managers.py +++ b/neutron/plugins/ml2/managers.py @@ -439,9 +439,8 @@ class MechanismManager(stevedore.named.NamedExtensionManager): :param context: PortContext instance describing the port - Called inside transaction context on session, prior to - create_port_precommit or update_port_precommit, to - attempt to establish a port binding. + Called outside any transaction to attempt to establish a port + binding. """ binding = context._binding LOG.debug(_("Attempting to bind port %(port)s on host %(host)s " diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 9407b31ca90..0eb7b809ebf 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -12,7 +12,9 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + import contextlib +from eventlet import greenthread from oslo.config import cfg from oslo.db import exception as os_db_exception @@ -58,6 +60,8 @@ from neutron.plugins.ml2 import rpc LOG = log.getLogger(__name__) +MAX_BIND_TRIES = 10 + # REVISIT(rkukura): Move this and other network_type constants to # providernet.py? TYPE_MULTI_SEGMENT = 'multi-segment' @@ -211,69 +215,200 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, def _process_port_binding(self, mech_context, attrs): binding = mech_context._binding port = mech_context.current - self._update_port_dict_binding(port, binding) + changes = False host = attrs and attrs.get(portbindings.HOST_ID) - host_set = attributes.is_attr_set(host) + if (attributes.is_attr_set(host) and + binding.host != host): + binding.host = host + changes = True vnic_type = attrs and attrs.get(portbindings.VNIC_TYPE) - vnic_type_set = attributes.is_attr_set(vnic_type) - - # CLI can't send {}, so treat None as {} - profile = attrs and attrs.get(portbindings.PROFILE) - profile_set = profile is not attributes.ATTR_NOT_SPECIFIED - if profile_set and not profile: - profile = {} - - if binding.vif_type != portbindings.VIF_TYPE_UNBOUND: - if (not host_set and not vnic_type_set and not profile_set and - binding.segment): - return False - self._delete_port_binding(mech_context) - - # Return True only if an agent notification is needed. - # This will happen if a new host, vnic_type, or profile was specified - # that differs from the current one. Note that host_set is True - # even if the host is an empty string - ret_value = ((host_set and binding.get('host') != host) or - (vnic_type_set and - binding.get('vnic_type') != vnic_type) or - (profile_set and self._get_profile(binding) != profile)) - - if host_set: - binding.host = host - port[portbindings.HOST_ID] = host - - if vnic_type_set: + if (attributes.is_attr_set(vnic_type) and + binding.vnic_type != vnic_type): binding.vnic_type = vnic_type - port[portbindings.VNIC_TYPE] = vnic_type + changes = True - if profile_set: + # CLI can't send {}, so treat None as {}. + profile = attrs and attrs.get(portbindings.PROFILE) or {} + if (profile is not attributes.ATTR_NOT_SPECIFIED and + self._get_profile(binding) != profile): binding.profile = jsonutils.dumps(profile) if len(binding.profile) > models.BINDING_PROFILE_LEN: msg = _("binding:profile value too large") raise exc.InvalidInput(error_message=msg) - port[portbindings.PROFILE] = profile + changes = True - # To try to [re]bind if host is non-empty. - if binding.host: - self.mechanism_manager.bind_port(mech_context) - self._update_port_dict_binding(port, binding) + # Unbind the port if needed. + if changes: + binding.vif_type = portbindings.VIF_TYPE_UNBOUND + binding.vif_details = '' + binding.driver = None + binding.segment = None - # Update the port status if requested by the bound driver. - if binding.segment and mech_context._new_port_status: - # REVISIT(rkukura): This function is currently called - # inside a transaction with the port either newly - # created or locked for update. After the fix for bug - # 1276391 is merged, this will no longer be true, and - # the port status update will need to be handled in - # the transaction that commits the new binding. - port_db = db.get_port(mech_context._plugin_context.session, - port['id']) - port_db.status = mech_context._new_port_status - port['status'] = mech_context._new_port_status + self._update_port_dict_binding(port, binding) + return changes - return ret_value + def _bind_port_if_needed(self, context, allow_notify=False, + need_notify=False): + plugin_context = context._plugin_context + port_id = context._port['id'] + + # Since the mechanism driver bind_port() calls must be made + # outside a DB transaction locking the port state, it is + # possible (but unlikely) that the port's state could change + # concurrently while these calls are being made. If another + # thread or process succeeds in binding the port before this + # thread commits its results, the already commited results are + # used. If attributes such as binding:host_id, + # binding:profile, or binding:vnic_type are updated + # concurrently, this loop retries binding using the new + # values. + count = 0 + while True: + # First, determine whether it is necessary and possible to + # bind the port. + binding = context._binding + if (binding.vif_type != portbindings.VIF_TYPE_UNBOUND + or not binding.host): + # We either don't need to bind the port, or can't, so + # notify if needed and return. + if allow_notify and need_notify: + self._notify_port_updated(context) + return context + + # Limit binding attempts to avoid any possibility of + # infinite looping and to ensure an error is logged + # instead. This does not need to be tunable because no + # more than a couple attempts should ever be required in + # normal operation. Log at info level if not 1st attempt. + count += 1 + if count > MAX_BIND_TRIES: + LOG.error(_("Failed to commit binding results for %(port)s " + "after %(max)s tries"), + {'port': port_id, 'max': MAX_BIND_TRIES}) + return context + if count > 1: + greenthread.sleep(0) # yield + LOG.info(_("Attempt %(count)s to bind port %(port)s"), + {'count': count, 'port': port_id}) + + # The port isn't already bound and the necessary + # information is available, so attempt to bind the port. + bind_context = self._bind_port(context) + + # Now try to commit result of attempting to bind the port. + new_context, did_commit = self._commit_port_binding( + plugin_context, port_id, binding, bind_context) + if not new_context: + # The port has been deleted concurrently, so just + # return the unbound result from the initial + # transaction that completed before the deletion. + return context._port + # Need to notify if we succeed and our results were + # committed. + if did_commit and (new_context._binding.vif_type != + portbindings.VIF_TYPE_BINDING_FAILED): + need_notify = True + context = new_context + + def _bind_port(self, orig_context): + # Construct a new PortContext from the one from the previous + # transaction. + port = orig_context._port + orig_binding = orig_context._binding + new_binding = models.PortBinding( + host=orig_binding.host, + vnic_type=orig_binding.vnic_type, + profile=orig_binding.profile, + vif_type=portbindings.VIF_TYPE_UNBOUND, + vif_details='' + ) + self._update_port_dict_binding(port, new_binding) + new_context = driver_context.PortContext( + self, orig_context._plugin_context, port, + orig_context._network_context._network, new_binding) + + # Attempt to bind the port and return the context with the + # result. + self.mechanism_manager.bind_port(new_context) + return new_context + + def _commit_port_binding(self, plugin_context, port_id, orig_binding, + new_context): + session = plugin_context.session + new_binding = new_context._binding + + # After we've attempted to bind the port, we begin a + # transaction, get the current port state, and decide whether + # to commit the binding results. + # + # REVISIT: Serialize this operation with a semaphore to + # prevent deadlock waiting to acquire a DB lock held by + # another thread in the same process, leading to 'lock wait + # timeout' errors. + with contextlib.nested(lockutils.lock('db-access'), + session.begin(subtransactions=True)): + # Get the current port state and build a new PortContext + # reflecting this state as original state for subsequent + # mechanism driver update_port_*commit() calls. + port_db, cur_binding = db.get_locked_port_and_binding(session, + port_id) + if not port_db: + # The port has been deleted concurrently. + return + oport = self._make_port_dict(port_db) + port = self._make_port_dict(port_db) + network = self.get_network(plugin_context, port['network_id']) + cur_context = driver_context.PortContext( + self, plugin_context, port, network, cur_binding, + original_port=oport) + + # Commit our binding results only if port has not been + # successfully bound concurrently by another thread or + # process and no binding inputs have been changed. + commit = ((cur_binding.vif_type in + [portbindings.VIF_TYPE_UNBOUND, + portbindings.VIF_TYPE_BINDING_FAILED]) and + orig_binding.host == cur_binding.host and + orig_binding.vnic_type == cur_binding.vnic_type and + orig_binding.profile == cur_binding.profile) + + if commit: + # Update the port's binding state with our binding + # results. + cur_binding.vif_type = new_binding.vif_type + cur_binding.vif_details = new_binding.vif_details + cur_binding.driver = new_binding.driver + cur_binding.segment = new_binding.segment + + # REVISIT(rkukura): The binding:profile attribute is + # supposed to be input-only, but the Mellanox driver + # currently modifies it while binding. Remove this + # code when the Mellanox driver has been updated to + # use binding:vif_details instead. + if cur_binding.profile != new_binding.profile: + cur_binding.profile = new_binding.profile + + # Update PortContext's port dictionary to reflect the + # updated binding state. + self._update_port_dict_binding(port, cur_binding) + + # Update the port status if requested by the bound driver. + if new_binding.segment and new_context._new_port_status: + port_db.status = new_context._new_port_status + port['status'] = new_context._new_port_status + + # Call the mechanism driver precommit methods, commit + # the results, and call the postcommit methods. + self.mechanism_manager.update_port_precommit(cur_context) + if commit: + self.mechanism_manager.update_port_postcommit(cur_context) + + # Continue, using the port state as of the transaction that + # just finished, whether that transaction committed new + # results or discovered concurrent port state changes. + return (cur_context, commit) def _update_port_dict_binding(self, port, binding): port[portbindings.HOST_ID] = binding.host @@ -304,15 +439,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, 'port': binding.port_id}) return {} - def _delete_port_binding(self, mech_context): - binding = mech_context._binding - binding.vif_type = portbindings.VIF_TYPE_UNBOUND - binding.vif_details = '' - binding.driver = None - binding.segment = None - port = mech_context.current - self._update_port_dict_binding(port, binding) - def _ml2_extend_port_dict_binding(self, port_res, port_db): # None when called during unit tests for other plugins. if port_db.port_binding: @@ -457,9 +583,18 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, session = context.session while True: try: - with session.begin(subtransactions=True): + # REVISIT(rkukura): Its not clear that + # with_lockmode('update') is really needed in this + # transaction, and if not, the semaphore can also be + # removed. + # + # REVISIT: Serialize this operation with a semaphore + # to prevent deadlock waiting to acquire a DB lock + # held by another thread in the same process, leading + # to 'lock wait timeout' errors. + with contextlib.nested(lockutils.lock('db-access'), + session.begin(subtransactions=True)): self._process_l3_delete(context, id) - # Get ports to auto-delete. ports = (session.query(models_v2.Port). enable_eagerloads(False). @@ -577,7 +712,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, LOG.debug(_("Deleting subnet %s"), id) session = context.session while True: - with session.begin(subtransactions=True): + # REVISIT: Serialize this operation with a semaphore to + # prevent deadlock waiting to acquire a DB lock held by + # another thread in the same process, leading to 'lock + # wait timeout' errors. + with contextlib.nested(lockutils.lock('db-access'), + session.begin(subtransactions=True)): subnet = self.get_subnet(context, id) # Get ports to auto-deallocate allocated = (session.query(models_v2.IPAllocation). @@ -644,8 +784,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, result = super(Ml2Plugin, self).create_port(context, port) self._process_port_create_security_group(context, result, sgids) network = self.get_network(context, result['network_id']) + binding = db.add_port_binding(session, result['id']) mech_context = driver_context.PortContext(self, context, result, - network) + network, binding) self._process_port_binding(mech_context, attrs) result[addr_pair.ADDRESS_PAIRS] = ( self._process_create_allowed_address_pairs( @@ -662,20 +803,34 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, LOG.error(_("mechanism_manager.create_port_postcommit " "failed, deleting port '%s'"), result['id']) self.delete_port(context, result['id']) + + # REVISIT(rkukura): Is there any point in calling this before + # a binding has been succesfully established? self.notify_security_groups_member_updated(context, result) - return result + + try: + bound_context = self._bind_port_if_needed(mech_context) + except ml2_exc.MechanismDriverError: + with excutils.save_and_reraise_exception(): + LOG.error(_("_bind_port_if_needed " + "failed, deleting port '%s'"), result['id']) + self.delete_port(context, result['id']) + return bound_context._port def update_port(self, context, id, port): attrs = port['port'] need_port_update_notify = False session = context.session - with session.begin(subtransactions=True): - try: - port_db = (session.query(models_v2.Port). - enable_eagerloads(False). - filter_by(id=id).with_lockmode('update').one()) - except sa_exc.NoResultFound: + + # REVISIT: Serialize this operation with a semaphore to + # prevent deadlock waiting to acquire a DB lock held by + # another thread in the same process, leading to 'lock wait + # timeout' errors. + with contextlib.nested(lockutils.lock('db-access'), + session.begin(subtransactions=True)): + port_db, binding = db.get_locked_port_and_binding(session, id) + if not port_db: raise exc.PortNotFound(port_id=id) original_port = self._make_port_dict(port_db) updated_port = super(Ml2Plugin, self).update_port(context, id, @@ -691,7 +846,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, need_port_update_notify |= self._update_extra_dhcp_opts_on_port( context, id, port, updated_port) mech_context = driver_context.PortContext( - self, context, updated_port, network, + self, context, updated_port, network, binding, original_port=original_port) need_port_update_notify |= self._process_port_binding( mech_context, attrs) @@ -709,10 +864,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, if original_port['admin_state_up'] != updated_port['admin_state_up']: need_port_update_notify = True - if need_port_update_notify: - self._notify_port_updated(mech_context) - - return updated_port + bound_port = self._bind_port_if_needed( + mech_context, + allow_notify=True, + need_notify=need_port_update_notify) + return bound_port._port def delete_port(self, context, id, l3_port_check=True): LOG.debug(_("Deleting port %s"), id) @@ -722,15 +878,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, l3plugin.prevent_l3_port_deletion(context, id) session = context.session - # REVISIT: Serialize this operation with a semaphore to prevent - # undesired eventlet yields leading to 'lock wait timeout' errors + # REVISIT: Serialize this operation with a semaphore to + # prevent deadlock waiting to acquire a DB lock held by + # another thread in the same process, leading to 'lock wait + # timeout' errors. with contextlib.nested(lockutils.lock('db-access'), session.begin(subtransactions=True)): - try: - port_db = (session.query(models_v2.Port). - enable_eagerloads(False). - filter_by(id=id).with_lockmode('update').one()) - except sa_exc.NoResultFound: + port_db, binding = db.get_locked_port_and_binding(session, id) + if not port_db: # the port existed when l3plugin.prevent_l3_port_deletion # was called but now is already gone LOG.debug(_("The port '%s' was deleted"), id) @@ -739,7 +894,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, network = self.get_network(context, port['network_id']) mech_context = driver_context.PortContext(self, context, port, - network) + network, binding) self.mechanism_manager.delete_port_precommit(mech_context) self._delete_port_security_group_bindings(context, id) LOG.debug(_("Calling base delete_port")) @@ -762,11 +917,34 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, LOG.error(_("mechanism_manager.delete_port_postcommit failed")) self.notify_security_groups_member_updated(context, port) + def get_bound_port_context(self, plugin_context, port_id): + session = plugin_context.session + with session.begin(subtransactions=True): + try: + port_db = (session.query(models_v2.Port). + enable_eagerloads(False). + filter(models_v2.Port.id.startswith(port_id)). + one()) + except sa_exc.NoResultFound: + return + except exc.MultipleResultsFound: + LOG.error(_("Multiple ports have port_id starting with %s"), + port_id) + return + port = self._make_port_dict(port_db) + network = self.get_network(plugin_context, port['network_id']) + port_context = driver_context.PortContext( + self, plugin_context, port, network, port_db.port_binding) + + return self._bind_port_if_needed(port_context) + def update_port_status(self, context, port_id, status): updated = False session = context.session - # REVISIT: Serialize this operation with a semaphore to prevent - # undesired eventlet yields leading to 'lock wait timeout' errors + # REVISIT: Serialize this operation with a semaphore to + # prevent deadlock waiting to acquire a DB lock held by + # another thread in the same process, leading to 'lock wait + # timeout' errors. with contextlib.nested(lockutils.lock('db-access'), session.begin(subtransactions=True)): port = db.get_port(session, port_id) @@ -781,7 +959,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, network = self.get_network(context, original_port['network_id']) mech_context = driver_context.PortContext( - self, context, updated_port, network, + self, context, updated_port, network, port.port_binding, original_port=original_port) self.mechanism_manager.update_port_precommit(mech_context) updated = True diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index f03ababf083..2876231b815 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -17,9 +17,9 @@ from neutron.agent import securitygroups_rpc as sg_rpc from neutron.common import constants as q_const from neutron.common import rpc as n_rpc from neutron.common import topics -from neutron.db import api as db_api from neutron.db import dhcp_rpc_base from neutron.db import securitygroups_rpc_base as sg_db_rpc +from neutron.extensions import portbindings from neutron import manager from neutron.openstack.common import log from neutron.openstack.common import uuidutils @@ -83,64 +83,43 @@ class RpcCallbacks(n_rpc.RpcCallback, {'device': device, 'agent_id': agent_id}) port_id = self._device_to_port_id(device) - session = db_api.get_session() - with session.begin(subtransactions=True): - port = db.get_port(session, port_id) - if not port: - LOG.warning(_("Device %(device)s requested by agent " - "%(agent_id)s not found in database"), - {'device': device, 'agent_id': agent_id}) - return {'device': device} + plugin = manager.NeutronManager.get_plugin() + port_context = plugin.get_bound_port_context(rpc_context, port_id) + if not port_context: + LOG.warning(_("Device %(device)s requested by agent " + "%(agent_id)s not found in database"), + {'device': device, 'agent_id': agent_id}) + return {'device': device} - segments = db.get_network_segments(session, port.network_id) - if not segments: - LOG.warning(_("Device %(device)s requested by agent " - "%(agent_id)s has network %(network_id)s with " - "no segments"), - {'device': device, - 'agent_id': agent_id, - 'network_id': port.network_id}) - return {'device': device} + segment = port_context.bound_segment + port = port_context.current - binding = db.ensure_port_binding(session, port.id) - if not binding.segment: - LOG.warning(_("Device %(device)s requested by agent " - "%(agent_id)s on network %(network_id)s not " - "bound, vif_type: %(vif_type)s"), - {'device': device, - 'agent_id': agent_id, - 'network_id': port.network_id, - 'vif_type': binding.vif_type}) - return {'device': device} + if not segment: + LOG.warning(_("Device %(device)s requested by agent " + "%(agent_id)s on network %(network_id)s not " + "bound, vif_type: %(vif_type)s"), + {'device': device, + 'agent_id': agent_id, + 'network_id': port['network_id'], + 'vif_type': port[portbindings.VIF_TYPE]}) + return {'device': device} - segment = self._find_segment(segments, binding.segment) - if not segment: - LOG.warning(_("Device %(device)s requested by agent " - "%(agent_id)s on network %(network_id)s " - "invalid segment, vif_type: %(vif_type)s"), - {'device': device, - 'agent_id': agent_id, - 'network_id': port.network_id, - 'vif_type': binding.vif_type}) - return {'device': device} + new_status = (q_const.PORT_STATUS_BUILD if port['admin_state_up'] + else q_const.PORT_STATUS_DOWN) + if port['status'] != new_status: + plugin.update_port_status(rpc_context, + port_id, + new_status) - new_status = (q_const.PORT_STATUS_BUILD if port.admin_state_up - else q_const.PORT_STATUS_DOWN) - if port.status != new_status: - plugin = manager.NeutronManager.get_plugin() - plugin.update_port_status(rpc_context, - port_id, - new_status) - port.status = new_status - entry = {'device': device, - 'network_id': port.network_id, - 'port_id': port.id, - 'admin_state_up': port.admin_state_up, - 'network_type': segment[api.NETWORK_TYPE], - 'segmentation_id': segment[api.SEGMENTATION_ID], - 'physical_network': segment[api.PHYSICAL_NETWORK]} - LOG.debug(_("Returning: %s"), entry) - return entry + entry = {'device': device, + 'network_id': port['network_id'], + 'port_id': port_id, + 'admin_state_up': port['admin_state_up'], + 'network_type': segment[api.NETWORK_TYPE], + 'segmentation_id': segment[api.SEGMENTATION_ID], + 'physical_network': segment[api.PHYSICAL_NETWORK]} + LOG.debug(_("Returning: %s"), entry) + return entry def get_devices_details_list(self, rpc_context, **kwargs): return [ @@ -152,11 +131,6 @@ class RpcCallbacks(n_rpc.RpcCallback, for device in kwargs.pop('devices', []) ] - def _find_segment(self, segments, segment_id): - for segment in segments: - if segment[api.ID] == segment_id: - return segment - def update_device_down(self, rpc_context, **kwargs): """Device no longer exists on agent.""" # TODO(garyk) - live migration and port status diff --git a/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_mech.py b/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_mech.py index 55717ebe2cc..5968d2422d7 100644 --- a/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_mech.py +++ b/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_mech.py @@ -19,7 +19,6 @@ import mock import webob.exc as wexc from neutron.api.v2 import base -from neutron.common import constants as n_const from neutron import context from neutron.extensions import portbindings from neutron import manager @@ -123,15 +122,33 @@ class CiscoML2MechanismTestCase(test_db_plugin.NeutronDbPluginV2TestCase): new_callable=mock.PropertyMock).start() self.mock_original_bound_segment.return_value = None - mock_status = mock.patch.object( + # Use _is_status_active method to determine bind state. + def _mock_check_bind_state(port_context): + if (port_context[portbindings.VIF_TYPE] != + portbindings.VIF_TYPE_UNBOUND): + return True + else: + return False + + self.mock_status = mock.patch.object( mech_cisco_nexus.CiscoNexusMechanismDriver, '_is_status_active').start() - mock_status.return_value = n_const.PORT_STATUS_ACTIVE + self.mock_status.side_effect = _mock_check_bind_state super(CiscoML2MechanismTestCase, self).setUp(ML2_PLUGIN) self.port_create_status = 'DOWN' + def _create_deviceowner_mock(self): + # Mock deviceowner method for UT's that expect update precommit + # failures. This allows control of delete_port_pre/postcommit() + # actions. + mock_deviceowner = mock.patch.object( + mech_cisco_nexus.CiscoNexusMechanismDriver, + '_is_deviceowner_compute').start() + mock_deviceowner.return_value = False + self.addCleanup(mock_deviceowner.stop) + @contextlib.contextmanager def _patch_ncclient(self, attr, value): """Configure an attribute on the mock ncclient module. @@ -223,7 +240,8 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase, @contextlib.contextmanager def _create_resources(self, name=NETWORK_NAME, cidr=CIDR_1, device_id=DEVICE_ID_1, - host_id=COMP_HOST_NAME): + host_id=COMP_HOST_NAME, + expected_failure=False): """Create network, subnet, and port resources for test cases. Create a network, subnet, port and then update the port, yield the @@ -233,18 +251,23 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase, :param cidr: cidr address of subnetwork to be created. :param device_id: Device ID to use for port to be created/updated. :param host_id: Host ID to use for port create/update. - + :param expected_failure: Set to True when an update_port_precommit + failure is expected. Results in no actions being taken in + delete_port_pre/postcommit() methods. """ with self.network(name=name) as network: with self.subnet(network=network, cidr=cidr) as subnet: with self.port(subnet=subnet, cidr=cidr) as port: + data = {'port': {portbindings.HOST_ID: host_id, 'device_id': device_id, - 'device_owner': 'compute:none', + 'device_owner': DEVICE_OWNER, 'admin_state_up': True}} req = self.new_update_request('ports', data, port['port']['id']) yield req.get_response(self.api) + if expected_failure: + self._create_deviceowner_mock() def _assertExpectedHTTP(self, status, exc): """Confirm that an HTTP status corresponds to an expected exception. @@ -578,7 +601,8 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase, a fictitious host name during port creation. """ - with self._create_resources(host_id='fake_host') as result: + with self._create_resources(host_id='fake_host', + expected_failure=True) as result: self._assertExpectedHTTP(result.status_int, c_exc.NexusComputeHostNotConfigured) @@ -586,10 +610,11 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase, """Test handling of a NexusMissingRequiredFields exception. Test the Cisco NexusMissingRequiredFields exception by using - empty host_id and device_id values during port creation. + empty device_id value during port creation. """ - with self._create_resources(device_id='', host_id='') as result: + with self._create_resources(device_id='', + expected_failure=True) as result: self._assertExpectedHTTP(result.status_int, c_exc.NexusMissingRequiredFields) diff --git a/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_nexus.py b/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_nexus.py index 31573b82b09..482c4500b18 100644 --- a/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_nexus.py +++ b/neutron/tests/unit/ml2/drivers/cisco/nexus/test_cisco_nexus.py @@ -74,7 +74,8 @@ class FakePortContext(object): 'status': PORT_STATE, 'device_id': device_id, 'device_owner': DEVICE_OWNER, - portbindings.HOST_ID: host_name + portbindings.HOST_ID: host_name, + portbindings.VIF_TYPE: portbindings.VIF_TYPE_OVS } self._network = network_context self._segment = network_context.network_segments diff --git a/neutron/tests/unit/ml2/drivers/mechanism_test.py b/neutron/tests/unit/ml2/drivers/mechanism_test.py index 6a0ca1e8630..090dfad4370 100644 --- a/neutron/tests/unit/ml2/drivers/mechanism_test.py +++ b/neutron/tests/unit/ml2/drivers/mechanism_test.py @@ -142,17 +142,7 @@ class TestMechanismDriver(api.MechanismDriver): self._check_port_context(context, False) def bind_port(self, context): - # REVISIT(rkukura): The upcoming fix for bug 1276391 will - # ensure the MDs see the unbinding of the port as a port - # update prior to re-binding, at which point this should be - # removed. - self.bound_ports.discard(context.current['id']) - - # REVISIT(rkukura): Currently, bind_port() is called as part - # of either a create or update transaction. The fix for bug - # 1276391 will change it to be called outside any transaction, - # so the context.original* will no longer be available. - self._check_port_context(context, context.original is not None) + self._check_port_context(context, False) host = context.current.get(portbindings.HOST_ID, None) segment = context.network.network_segments[0][api.ID] diff --git a/neutron/tests/unit/ml2/drivers/test_bigswitch_mech.py b/neutron/tests/unit/ml2/drivers/test_bigswitch_mech.py index c037fb3a259..b3536c856bb 100644 --- a/neutron/tests/unit/ml2/drivers/test_bigswitch_mech.py +++ b/neutron/tests/unit/ml2/drivers/test_bigswitch_mech.py @@ -112,7 +112,7 @@ class TestBigSwitchMechDriverPortsV2(test_db_plugin.TestPortsV2, def test_udpate404_triggers_background_sync(self): with contextlib.nested( - mock.patch(SERVER_POOL + '.rest_update_port', + mock.patch(DRIVER + '.async_port_create', side_effect=servermanager.RemoteRestError( reason=servermanager.NXNETWORK, status=404)), mock.patch(DRIVER + '._send_all_data'),