Merge "ML2: Bind ports outside transactions"
This commit is contained in:
commit
b014ae30c6
|
@ -56,20 +56,38 @@ def get_network_segments(session, network_id):
|
|||
for record in records]
|
||||
|
||||
|
||||
def ensure_port_binding(session, port_id):
|
||||
def add_port_binding(session, port_id):
|
||||
with session.begin(subtransactions=True):
|
||||
try:
|
||||
record = (session.query(models.PortBinding).
|
||||
filter_by(port_id=port_id).
|
||||
one())
|
||||
except exc.NoResultFound:
|
||||
record = models.PortBinding(
|
||||
port_id=port_id,
|
||||
vif_type=portbindings.VIF_TYPE_UNBOUND)
|
||||
session.add(record)
|
||||
record = models.PortBinding(
|
||||
port_id=port_id,
|
||||
vif_type=portbindings.VIF_TYPE_UNBOUND)
|
||||
session.add(record)
|
||||
return record
|
||||
|
||||
|
||||
def get_locked_port_and_binding(session, port_id):
|
||||
"""Get port and port binding records for update within transaction."""
|
||||
|
||||
try:
|
||||
# REVISIT(rkukura): We need the Port and PortBinding records
|
||||
# to both be added to the session and locked for update. A
|
||||
# single joined query should work, but the combination of left
|
||||
# outer joins and postgresql doesn't seem to work.
|
||||
port = (session.query(models_v2.Port).
|
||||
enable_eagerloads(False).
|
||||
filter_by(id=port_id).
|
||||
with_lockmode('update').
|
||||
one())
|
||||
binding = (session.query(models.PortBinding).
|
||||
enable_eagerloads(False).
|
||||
filter_by(port_id=port_id).
|
||||
with_lockmode('update').
|
||||
one())
|
||||
return port, binding
|
||||
except exc.NoResultFound:
|
||||
return None, None
|
||||
|
||||
|
||||
def get_port(session, port_id):
|
||||
"""Get port record for update within transcation."""
|
||||
|
||||
|
@ -133,4 +151,8 @@ def get_port_binding_host(port_id):
|
|||
LOG.debug(_("No binding found for port %(port_id)s"),
|
||||
{'port_id': port_id})
|
||||
return
|
||||
except exc.MultipleResultsFound:
|
||||
LOG.error(_("Multiple ports have port_id starting with %s"),
|
||||
port_id)
|
||||
return
|
||||
return query.host
|
||||
|
|
|
@ -588,10 +588,21 @@ class MechanismDriver(object):
|
|||
|
||||
:param context: PortContext instance describing the port
|
||||
|
||||
Called inside transaction context on session, prior to
|
||||
create_port_precommit or update_port_precommit, to
|
||||
attempt to establish a port binding. If the driver is able to
|
||||
bind the port, it calls context.set_binding with the binding
|
||||
details.
|
||||
Called outside any transaction to attempt to establish a port
|
||||
binding using this mechanism driver. If the driver is able to
|
||||
bind the port, it must call context.set_binding() with the
|
||||
binding details. If the binding results are committed after
|
||||
bind_port() returns, they will be seen by all mechanism
|
||||
drivers as update_port_precommit() and
|
||||
update_port_postcommit() calls.
|
||||
|
||||
Note that if some other thread or process concurrently binds
|
||||
or updates the port, these binding results will not be
|
||||
committed, and update_port_precommit() and
|
||||
update_port_postcommit() will not be called on the mechanism
|
||||
drivers with these results. Because binding results can be
|
||||
discarded rather than committed, drivers should avoid making
|
||||
persistent state changes in bind_port(), or else must ensure
|
||||
that such state changes are eventually cleaned up.
|
||||
"""
|
||||
pass
|
||||
|
|
|
@ -69,15 +69,14 @@ class SubnetContext(MechanismDriverContext, api.SubnetContext):
|
|||
|
||||
class PortContext(MechanismDriverContext, api.PortContext):
|
||||
|
||||
def __init__(self, plugin, plugin_context, port, network,
|
||||
def __init__(self, plugin, plugin_context, port, network, binding,
|
||||
original_port=None):
|
||||
super(PortContext, self).__init__(plugin, plugin_context)
|
||||
self._port = port
|
||||
self._original_port = original_port
|
||||
self._network_context = NetworkContext(plugin, plugin_context,
|
||||
network)
|
||||
self._binding = db.ensure_port_binding(plugin_context.session,
|
||||
port['id'])
|
||||
self._binding = binding
|
||||
if original_port:
|
||||
self._original_bound_segment_id = self._binding.segment
|
||||
self._original_bound_driver = self._binding.driver
|
||||
|
|
|
@ -82,10 +82,11 @@ class AgentMechanismDriverBase(api.MechanismDriver):
|
|||
:param agent: agents_db entry describing agent to bind
|
||||
:returns: True iff segment has been bound for agent
|
||||
|
||||
Called inside transaction during bind_port() so that derived
|
||||
MechanismDrivers can use agent_db data along with built-in
|
||||
knowledge of the corresponding agent's capabilities to attempt
|
||||
to bind to the specified network segment for the agent.
|
||||
Called outside any transaction during bind_port() so that
|
||||
derived MechanismDrivers can use agent_db data along with
|
||||
built-in knowledge of the corresponding agent's capabilities
|
||||
to attempt to bind to the specified network segment for the
|
||||
agent.
|
||||
|
||||
If the segment can be bound for the agent, this function must
|
||||
call context.set_binding() with appropriate values and then
|
||||
|
|
|
@ -85,8 +85,8 @@ class BigSwitchMechanismDriver(plugin.NeutronRestProxyV2Base,
|
|||
port = self._prepare_port_for_controller(context)
|
||||
if port:
|
||||
try:
|
||||
self.servers.rest_update_port(port["network"]["tenant_id"],
|
||||
port["network"]["id"], port)
|
||||
self.async_port_create(port["network"]["tenant_id"],
|
||||
port["network"]["id"], port)
|
||||
except servermanager.RemoteRestError as e:
|
||||
with excutils.save_and_reraise_exception() as ctxt:
|
||||
if (cfg.CONF.RESTPROXY.auto_sync_on_failure and
|
||||
|
|
|
@ -439,9 +439,8 @@ class MechanismManager(stevedore.named.NamedExtensionManager):
|
|||
|
||||
:param context: PortContext instance describing the port
|
||||
|
||||
Called inside transaction context on session, prior to
|
||||
create_port_precommit or update_port_precommit, to
|
||||
attempt to establish a port binding.
|
||||
Called outside any transaction to attempt to establish a port
|
||||
binding.
|
||||
"""
|
||||
binding = context._binding
|
||||
LOG.debug(_("Attempting to bind port %(port)s on host %(host)s "
|
||||
|
|
|
@ -12,7 +12,9 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
from eventlet import greenthread
|
||||
|
||||
from oslo.config import cfg
|
||||
from oslo.db import exception as os_db_exception
|
||||
|
@ -58,6 +60,8 @@ from neutron.plugins.ml2 import rpc
|
|||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
MAX_BIND_TRIES = 10
|
||||
|
||||
# REVISIT(rkukura): Move this and other network_type constants to
|
||||
# providernet.py?
|
||||
TYPE_MULTI_SEGMENT = 'multi-segment'
|
||||
|
@ -211,69 +215,200 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
def _process_port_binding(self, mech_context, attrs):
|
||||
binding = mech_context._binding
|
||||
port = mech_context.current
|
||||
self._update_port_dict_binding(port, binding)
|
||||
changes = False
|
||||
|
||||
host = attrs and attrs.get(portbindings.HOST_ID)
|
||||
host_set = attributes.is_attr_set(host)
|
||||
if (attributes.is_attr_set(host) and
|
||||
binding.host != host):
|
||||
binding.host = host
|
||||
changes = True
|
||||
|
||||
vnic_type = attrs and attrs.get(portbindings.VNIC_TYPE)
|
||||
vnic_type_set = attributes.is_attr_set(vnic_type)
|
||||
|
||||
# CLI can't send {}, so treat None as {}
|
||||
profile = attrs and attrs.get(portbindings.PROFILE)
|
||||
profile_set = profile is not attributes.ATTR_NOT_SPECIFIED
|
||||
if profile_set and not profile:
|
||||
profile = {}
|
||||
|
||||
if binding.vif_type != portbindings.VIF_TYPE_UNBOUND:
|
||||
if (not host_set and not vnic_type_set and not profile_set and
|
||||
binding.segment):
|
||||
return False
|
||||
self._delete_port_binding(mech_context)
|
||||
|
||||
# Return True only if an agent notification is needed.
|
||||
# This will happen if a new host, vnic_type, or profile was specified
|
||||
# that differs from the current one. Note that host_set is True
|
||||
# even if the host is an empty string
|
||||
ret_value = ((host_set and binding.get('host') != host) or
|
||||
(vnic_type_set and
|
||||
binding.get('vnic_type') != vnic_type) or
|
||||
(profile_set and self._get_profile(binding) != profile))
|
||||
|
||||
if host_set:
|
||||
binding.host = host
|
||||
port[portbindings.HOST_ID] = host
|
||||
|
||||
if vnic_type_set:
|
||||
if (attributes.is_attr_set(vnic_type) and
|
||||
binding.vnic_type != vnic_type):
|
||||
binding.vnic_type = vnic_type
|
||||
port[portbindings.VNIC_TYPE] = vnic_type
|
||||
changes = True
|
||||
|
||||
if profile_set:
|
||||
# CLI can't send {}, so treat None as {}.
|
||||
profile = attrs and attrs.get(portbindings.PROFILE) or {}
|
||||
if (profile is not attributes.ATTR_NOT_SPECIFIED and
|
||||
self._get_profile(binding) != profile):
|
||||
binding.profile = jsonutils.dumps(profile)
|
||||
if len(binding.profile) > models.BINDING_PROFILE_LEN:
|
||||
msg = _("binding:profile value too large")
|
||||
raise exc.InvalidInput(error_message=msg)
|
||||
port[portbindings.PROFILE] = profile
|
||||
changes = True
|
||||
|
||||
# To try to [re]bind if host is non-empty.
|
||||
if binding.host:
|
||||
self.mechanism_manager.bind_port(mech_context)
|
||||
self._update_port_dict_binding(port, binding)
|
||||
# Unbind the port if needed.
|
||||
if changes:
|
||||
binding.vif_type = portbindings.VIF_TYPE_UNBOUND
|
||||
binding.vif_details = ''
|
||||
binding.driver = None
|
||||
binding.segment = None
|
||||
|
||||
# Update the port status if requested by the bound driver.
|
||||
if binding.segment and mech_context._new_port_status:
|
||||
# REVISIT(rkukura): This function is currently called
|
||||
# inside a transaction with the port either newly
|
||||
# created or locked for update. After the fix for bug
|
||||
# 1276391 is merged, this will no longer be true, and
|
||||
# the port status update will need to be handled in
|
||||
# the transaction that commits the new binding.
|
||||
port_db = db.get_port(mech_context._plugin_context.session,
|
||||
port['id'])
|
||||
port_db.status = mech_context._new_port_status
|
||||
port['status'] = mech_context._new_port_status
|
||||
self._update_port_dict_binding(port, binding)
|
||||
return changes
|
||||
|
||||
return ret_value
|
||||
def _bind_port_if_needed(self, context, allow_notify=False,
|
||||
need_notify=False):
|
||||
plugin_context = context._plugin_context
|
||||
port_id = context._port['id']
|
||||
|
||||
# Since the mechanism driver bind_port() calls must be made
|
||||
# outside a DB transaction locking the port state, it is
|
||||
# possible (but unlikely) that the port's state could change
|
||||
# concurrently while these calls are being made. If another
|
||||
# thread or process succeeds in binding the port before this
|
||||
# thread commits its results, the already commited results are
|
||||
# used. If attributes such as binding:host_id,
|
||||
# binding:profile, or binding:vnic_type are updated
|
||||
# concurrently, this loop retries binding using the new
|
||||
# values.
|
||||
count = 0
|
||||
while True:
|
||||
# First, determine whether it is necessary and possible to
|
||||
# bind the port.
|
||||
binding = context._binding
|
||||
if (binding.vif_type != portbindings.VIF_TYPE_UNBOUND
|
||||
or not binding.host):
|
||||
# We either don't need to bind the port, or can't, so
|
||||
# notify if needed and return.
|
||||
if allow_notify and need_notify:
|
||||
self._notify_port_updated(context)
|
||||
return context
|
||||
|
||||
# Limit binding attempts to avoid any possibility of
|
||||
# infinite looping and to ensure an error is logged
|
||||
# instead. This does not need to be tunable because no
|
||||
# more than a couple attempts should ever be required in
|
||||
# normal operation. Log at info level if not 1st attempt.
|
||||
count += 1
|
||||
if count > MAX_BIND_TRIES:
|
||||
LOG.error(_("Failed to commit binding results for %(port)s "
|
||||
"after %(max)s tries"),
|
||||
{'port': port_id, 'max': MAX_BIND_TRIES})
|
||||
return context
|
||||
if count > 1:
|
||||
greenthread.sleep(0) # yield
|
||||
LOG.info(_("Attempt %(count)s to bind port %(port)s"),
|
||||
{'count': count, 'port': port_id})
|
||||
|
||||
# The port isn't already bound and the necessary
|
||||
# information is available, so attempt to bind the port.
|
||||
bind_context = self._bind_port(context)
|
||||
|
||||
# Now try to commit result of attempting to bind the port.
|
||||
new_context, did_commit = self._commit_port_binding(
|
||||
plugin_context, port_id, binding, bind_context)
|
||||
if not new_context:
|
||||
# The port has been deleted concurrently, so just
|
||||
# return the unbound result from the initial
|
||||
# transaction that completed before the deletion.
|
||||
return context._port
|
||||
# Need to notify if we succeed and our results were
|
||||
# committed.
|
||||
if did_commit and (new_context._binding.vif_type !=
|
||||
portbindings.VIF_TYPE_BINDING_FAILED):
|
||||
need_notify = True
|
||||
context = new_context
|
||||
|
||||
def _bind_port(self, orig_context):
|
||||
# Construct a new PortContext from the one from the previous
|
||||
# transaction.
|
||||
port = orig_context._port
|
||||
orig_binding = orig_context._binding
|
||||
new_binding = models.PortBinding(
|
||||
host=orig_binding.host,
|
||||
vnic_type=orig_binding.vnic_type,
|
||||
profile=orig_binding.profile,
|
||||
vif_type=portbindings.VIF_TYPE_UNBOUND,
|
||||
vif_details=''
|
||||
)
|
||||
self._update_port_dict_binding(port, new_binding)
|
||||
new_context = driver_context.PortContext(
|
||||
self, orig_context._plugin_context, port,
|
||||
orig_context._network_context._network, new_binding)
|
||||
|
||||
# Attempt to bind the port and return the context with the
|
||||
# result.
|
||||
self.mechanism_manager.bind_port(new_context)
|
||||
return new_context
|
||||
|
||||
def _commit_port_binding(self, plugin_context, port_id, orig_binding,
|
||||
new_context):
|
||||
session = plugin_context.session
|
||||
new_binding = new_context._binding
|
||||
|
||||
# After we've attempted to bind the port, we begin a
|
||||
# transaction, get the current port state, and decide whether
|
||||
# to commit the binding results.
|
||||
#
|
||||
# REVISIT: Serialize this operation with a semaphore to
|
||||
# prevent deadlock waiting to acquire a DB lock held by
|
||||
# another thread in the same process, leading to 'lock wait
|
||||
# timeout' errors.
|
||||
with contextlib.nested(lockutils.lock('db-access'),
|
||||
session.begin(subtransactions=True)):
|
||||
# Get the current port state and build a new PortContext
|
||||
# reflecting this state as original state for subsequent
|
||||
# mechanism driver update_port_*commit() calls.
|
||||
port_db, cur_binding = db.get_locked_port_and_binding(session,
|
||||
port_id)
|
||||
if not port_db:
|
||||
# The port has been deleted concurrently.
|
||||
return
|
||||
oport = self._make_port_dict(port_db)
|
||||
port = self._make_port_dict(port_db)
|
||||
network = self.get_network(plugin_context, port['network_id'])
|
||||
cur_context = driver_context.PortContext(
|
||||
self, plugin_context, port, network, cur_binding,
|
||||
original_port=oport)
|
||||
|
||||
# Commit our binding results only if port has not been
|
||||
# successfully bound concurrently by another thread or
|
||||
# process and no binding inputs have been changed.
|
||||
commit = ((cur_binding.vif_type in
|
||||
[portbindings.VIF_TYPE_UNBOUND,
|
||||
portbindings.VIF_TYPE_BINDING_FAILED]) and
|
||||
orig_binding.host == cur_binding.host and
|
||||
orig_binding.vnic_type == cur_binding.vnic_type and
|
||||
orig_binding.profile == cur_binding.profile)
|
||||
|
||||
if commit:
|
||||
# Update the port's binding state with our binding
|
||||
# results.
|
||||
cur_binding.vif_type = new_binding.vif_type
|
||||
cur_binding.vif_details = new_binding.vif_details
|
||||
cur_binding.driver = new_binding.driver
|
||||
cur_binding.segment = new_binding.segment
|
||||
|
||||
# REVISIT(rkukura): The binding:profile attribute is
|
||||
# supposed to be input-only, but the Mellanox driver
|
||||
# currently modifies it while binding. Remove this
|
||||
# code when the Mellanox driver has been updated to
|
||||
# use binding:vif_details instead.
|
||||
if cur_binding.profile != new_binding.profile:
|
||||
cur_binding.profile = new_binding.profile
|
||||
|
||||
# Update PortContext's port dictionary to reflect the
|
||||
# updated binding state.
|
||||
self._update_port_dict_binding(port, cur_binding)
|
||||
|
||||
# Update the port status if requested by the bound driver.
|
||||
if new_binding.segment and new_context._new_port_status:
|
||||
port_db.status = new_context._new_port_status
|
||||
port['status'] = new_context._new_port_status
|
||||
|
||||
# Call the mechanism driver precommit methods, commit
|
||||
# the results, and call the postcommit methods.
|
||||
self.mechanism_manager.update_port_precommit(cur_context)
|
||||
if commit:
|
||||
self.mechanism_manager.update_port_postcommit(cur_context)
|
||||
|
||||
# Continue, using the port state as of the transaction that
|
||||
# just finished, whether that transaction committed new
|
||||
# results or discovered concurrent port state changes.
|
||||
return (cur_context, commit)
|
||||
|
||||
def _update_port_dict_binding(self, port, binding):
|
||||
port[portbindings.HOST_ID] = binding.host
|
||||
|
@ -304,15 +439,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
'port': binding.port_id})
|
||||
return {}
|
||||
|
||||
def _delete_port_binding(self, mech_context):
|
||||
binding = mech_context._binding
|
||||
binding.vif_type = portbindings.VIF_TYPE_UNBOUND
|
||||
binding.vif_details = ''
|
||||
binding.driver = None
|
||||
binding.segment = None
|
||||
port = mech_context.current
|
||||
self._update_port_dict_binding(port, binding)
|
||||
|
||||
def _ml2_extend_port_dict_binding(self, port_res, port_db):
|
||||
# None when called during unit tests for other plugins.
|
||||
if port_db.port_binding:
|
||||
|
@ -457,9 +583,18 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
session = context.session
|
||||
while True:
|
||||
try:
|
||||
with session.begin(subtransactions=True):
|
||||
# REVISIT(rkukura): Its not clear that
|
||||
# with_lockmode('update') is really needed in this
|
||||
# transaction, and if not, the semaphore can also be
|
||||
# removed.
|
||||
#
|
||||
# REVISIT: Serialize this operation with a semaphore
|
||||
# to prevent deadlock waiting to acquire a DB lock
|
||||
# held by another thread in the same process, leading
|
||||
# to 'lock wait timeout' errors.
|
||||
with contextlib.nested(lockutils.lock('db-access'),
|
||||
session.begin(subtransactions=True)):
|
||||
self._process_l3_delete(context, id)
|
||||
|
||||
# Get ports to auto-delete.
|
||||
ports = (session.query(models_v2.Port).
|
||||
enable_eagerloads(False).
|
||||
|
@ -577,7 +712,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
LOG.debug(_("Deleting subnet %s"), id)
|
||||
session = context.session
|
||||
while True:
|
||||
with session.begin(subtransactions=True):
|
||||
# REVISIT: Serialize this operation with a semaphore to
|
||||
# prevent deadlock waiting to acquire a DB lock held by
|
||||
# another thread in the same process, leading to 'lock
|
||||
# wait timeout' errors.
|
||||
with contextlib.nested(lockutils.lock('db-access'),
|
||||
session.begin(subtransactions=True)):
|
||||
subnet = self.get_subnet(context, id)
|
||||
# Get ports to auto-deallocate
|
||||
allocated = (session.query(models_v2.IPAllocation).
|
||||
|
@ -644,8 +784,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
result = super(Ml2Plugin, self).create_port(context, port)
|
||||
self._process_port_create_security_group(context, result, sgids)
|
||||
network = self.get_network(context, result['network_id'])
|
||||
binding = db.add_port_binding(session, result['id'])
|
||||
mech_context = driver_context.PortContext(self, context, result,
|
||||
network)
|
||||
network, binding)
|
||||
self._process_port_binding(mech_context, attrs)
|
||||
result[addr_pair.ADDRESS_PAIRS] = (
|
||||
self._process_create_allowed_address_pairs(
|
||||
|
@ -662,20 +803,34 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
LOG.error(_("mechanism_manager.create_port_postcommit "
|
||||
"failed, deleting port '%s'"), result['id'])
|
||||
self.delete_port(context, result['id'])
|
||||
|
||||
# REVISIT(rkukura): Is there any point in calling this before
|
||||
# a binding has been succesfully established?
|
||||
self.notify_security_groups_member_updated(context, result)
|
||||
return result
|
||||
|
||||
try:
|
||||
bound_context = self._bind_port_if_needed(mech_context)
|
||||
except ml2_exc.MechanismDriverError:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_("_bind_port_if_needed "
|
||||
"failed, deleting port '%s'"), result['id'])
|
||||
self.delete_port(context, result['id'])
|
||||
return bound_context._port
|
||||
|
||||
def update_port(self, context, id, port):
|
||||
attrs = port['port']
|
||||
need_port_update_notify = False
|
||||
|
||||
session = context.session
|
||||
with session.begin(subtransactions=True):
|
||||
try:
|
||||
port_db = (session.query(models_v2.Port).
|
||||
enable_eagerloads(False).
|
||||
filter_by(id=id).with_lockmode('update').one())
|
||||
except sa_exc.NoResultFound:
|
||||
|
||||
# REVISIT: Serialize this operation with a semaphore to
|
||||
# prevent deadlock waiting to acquire a DB lock held by
|
||||
# another thread in the same process, leading to 'lock wait
|
||||
# timeout' errors.
|
||||
with contextlib.nested(lockutils.lock('db-access'),
|
||||
session.begin(subtransactions=True)):
|
||||
port_db, binding = db.get_locked_port_and_binding(session, id)
|
||||
if not port_db:
|
||||
raise exc.PortNotFound(port_id=id)
|
||||
original_port = self._make_port_dict(port_db)
|
||||
updated_port = super(Ml2Plugin, self).update_port(context, id,
|
||||
|
@ -691,7 +846,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
need_port_update_notify |= self._update_extra_dhcp_opts_on_port(
|
||||
context, id, port, updated_port)
|
||||
mech_context = driver_context.PortContext(
|
||||
self, context, updated_port, network,
|
||||
self, context, updated_port, network, binding,
|
||||
original_port=original_port)
|
||||
need_port_update_notify |= self._process_port_binding(
|
||||
mech_context, attrs)
|
||||
|
@ -709,10 +864,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
if original_port['admin_state_up'] != updated_port['admin_state_up']:
|
||||
need_port_update_notify = True
|
||||
|
||||
if need_port_update_notify:
|
||||
self._notify_port_updated(mech_context)
|
||||
|
||||
return updated_port
|
||||
bound_port = self._bind_port_if_needed(
|
||||
mech_context,
|
||||
allow_notify=True,
|
||||
need_notify=need_port_update_notify)
|
||||
return bound_port._port
|
||||
|
||||
def delete_port(self, context, id, l3_port_check=True):
|
||||
LOG.debug(_("Deleting port %s"), id)
|
||||
|
@ -722,15 +878,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
l3plugin.prevent_l3_port_deletion(context, id)
|
||||
|
||||
session = context.session
|
||||
# REVISIT: Serialize this operation with a semaphore to prevent
|
||||
# undesired eventlet yields leading to 'lock wait timeout' errors
|
||||
# REVISIT: Serialize this operation with a semaphore to
|
||||
# prevent deadlock waiting to acquire a DB lock held by
|
||||
# another thread in the same process, leading to 'lock wait
|
||||
# timeout' errors.
|
||||
with contextlib.nested(lockutils.lock('db-access'),
|
||||
session.begin(subtransactions=True)):
|
||||
try:
|
||||
port_db = (session.query(models_v2.Port).
|
||||
enable_eagerloads(False).
|
||||
filter_by(id=id).with_lockmode('update').one())
|
||||
except sa_exc.NoResultFound:
|
||||
port_db, binding = db.get_locked_port_and_binding(session, id)
|
||||
if not port_db:
|
||||
# the port existed when l3plugin.prevent_l3_port_deletion
|
||||
# was called but now is already gone
|
||||
LOG.debug(_("The port '%s' was deleted"), id)
|
||||
|
@ -739,7 +894,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
|
||||
network = self.get_network(context, port['network_id'])
|
||||
mech_context = driver_context.PortContext(self, context, port,
|
||||
network)
|
||||
network, binding)
|
||||
self.mechanism_manager.delete_port_precommit(mech_context)
|
||||
self._delete_port_security_group_bindings(context, id)
|
||||
LOG.debug(_("Calling base delete_port"))
|
||||
|
@ -762,11 +917,34 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
LOG.error(_("mechanism_manager.delete_port_postcommit failed"))
|
||||
self.notify_security_groups_member_updated(context, port)
|
||||
|
||||
def get_bound_port_context(self, plugin_context, port_id):
|
||||
session = plugin_context.session
|
||||
with session.begin(subtransactions=True):
|
||||
try:
|
||||
port_db = (session.query(models_v2.Port).
|
||||
enable_eagerloads(False).
|
||||
filter(models_v2.Port.id.startswith(port_id)).
|
||||
one())
|
||||
except sa_exc.NoResultFound:
|
||||
return
|
||||
except exc.MultipleResultsFound:
|
||||
LOG.error(_("Multiple ports have port_id starting with %s"),
|
||||
port_id)
|
||||
return
|
||||
port = self._make_port_dict(port_db)
|
||||
network = self.get_network(plugin_context, port['network_id'])
|
||||
port_context = driver_context.PortContext(
|
||||
self, plugin_context, port, network, port_db.port_binding)
|
||||
|
||||
return self._bind_port_if_needed(port_context)
|
||||
|
||||
def update_port_status(self, context, port_id, status):
|
||||
updated = False
|
||||
session = context.session
|
||||
# REVISIT: Serialize this operation with a semaphore to prevent
|
||||
# undesired eventlet yields leading to 'lock wait timeout' errors
|
||||
# REVISIT: Serialize this operation with a semaphore to
|
||||
# prevent deadlock waiting to acquire a DB lock held by
|
||||
# another thread in the same process, leading to 'lock wait
|
||||
# timeout' errors.
|
||||
with contextlib.nested(lockutils.lock('db-access'),
|
||||
session.begin(subtransactions=True)):
|
||||
port = db.get_port(session, port_id)
|
||||
|
@ -781,7 +959,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
network = self.get_network(context,
|
||||
original_port['network_id'])
|
||||
mech_context = driver_context.PortContext(
|
||||
self, context, updated_port, network,
|
||||
self, context, updated_port, network, port.port_binding,
|
||||
original_port=original_port)
|
||||
self.mechanism_manager.update_port_precommit(mech_context)
|
||||
updated = True
|
||||
|
|
|
@ -17,9 +17,9 @@ from neutron.agent import securitygroups_rpc as sg_rpc
|
|||
from neutron.common import constants as q_const
|
||||
from neutron.common import rpc as n_rpc
|
||||
from neutron.common import topics
|
||||
from neutron.db import api as db_api
|
||||
from neutron.db import dhcp_rpc_base
|
||||
from neutron.db import securitygroups_rpc_base as sg_db_rpc
|
||||
from neutron.extensions import portbindings
|
||||
from neutron import manager
|
||||
from neutron.openstack.common import log
|
||||
from neutron.openstack.common import uuidutils
|
||||
|
@ -83,64 +83,43 @@ class RpcCallbacks(n_rpc.RpcCallback,
|
|||
{'device': device, 'agent_id': agent_id})
|
||||
port_id = self._device_to_port_id(device)
|
||||
|
||||
session = db_api.get_session()
|
||||
with session.begin(subtransactions=True):
|
||||
port = db.get_port(session, port_id)
|
||||
if not port:
|
||||
LOG.warning(_("Device %(device)s requested by agent "
|
||||
"%(agent_id)s not found in database"),
|
||||
{'device': device, 'agent_id': agent_id})
|
||||
return {'device': device}
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
port_context = plugin.get_bound_port_context(rpc_context, port_id)
|
||||
if not port_context:
|
||||
LOG.warning(_("Device %(device)s requested by agent "
|
||||
"%(agent_id)s not found in database"),
|
||||
{'device': device, 'agent_id': agent_id})
|
||||
return {'device': device}
|
||||
|
||||
segments = db.get_network_segments(session, port.network_id)
|
||||
if not segments:
|
||||
LOG.warning(_("Device %(device)s requested by agent "
|
||||
"%(agent_id)s has network %(network_id)s with "
|
||||
"no segments"),
|
||||
{'device': device,
|
||||
'agent_id': agent_id,
|
||||
'network_id': port.network_id})
|
||||
return {'device': device}
|
||||
segment = port_context.bound_segment
|
||||
port = port_context.current
|
||||
|
||||
binding = db.ensure_port_binding(session, port.id)
|
||||
if not binding.segment:
|
||||
LOG.warning(_("Device %(device)s requested by agent "
|
||||
"%(agent_id)s on network %(network_id)s not "
|
||||
"bound, vif_type: %(vif_type)s"),
|
||||
{'device': device,
|
||||
'agent_id': agent_id,
|
||||
'network_id': port.network_id,
|
||||
'vif_type': binding.vif_type})
|
||||
return {'device': device}
|
||||
if not segment:
|
||||
LOG.warning(_("Device %(device)s requested by agent "
|
||||
"%(agent_id)s on network %(network_id)s not "
|
||||
"bound, vif_type: %(vif_type)s"),
|
||||
{'device': device,
|
||||
'agent_id': agent_id,
|
||||
'network_id': port['network_id'],
|
||||
'vif_type': port[portbindings.VIF_TYPE]})
|
||||
return {'device': device}
|
||||
|
||||
segment = self._find_segment(segments, binding.segment)
|
||||
if not segment:
|
||||
LOG.warning(_("Device %(device)s requested by agent "
|
||||
"%(agent_id)s on network %(network_id)s "
|
||||
"invalid segment, vif_type: %(vif_type)s"),
|
||||
{'device': device,
|
||||
'agent_id': agent_id,
|
||||
'network_id': port.network_id,
|
||||
'vif_type': binding.vif_type})
|
||||
return {'device': device}
|
||||
new_status = (q_const.PORT_STATUS_BUILD if port['admin_state_up']
|
||||
else q_const.PORT_STATUS_DOWN)
|
||||
if port['status'] != new_status:
|
||||
plugin.update_port_status(rpc_context,
|
||||
port_id,
|
||||
new_status)
|
||||
|
||||
new_status = (q_const.PORT_STATUS_BUILD if port.admin_state_up
|
||||
else q_const.PORT_STATUS_DOWN)
|
||||
if port.status != new_status:
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
plugin.update_port_status(rpc_context,
|
||||
port_id,
|
||||
new_status)
|
||||
port.status = new_status
|
||||
entry = {'device': device,
|
||||
'network_id': port.network_id,
|
||||
'port_id': port.id,
|
||||
'admin_state_up': port.admin_state_up,
|
||||
'network_type': segment[api.NETWORK_TYPE],
|
||||
'segmentation_id': segment[api.SEGMENTATION_ID],
|
||||
'physical_network': segment[api.PHYSICAL_NETWORK]}
|
||||
LOG.debug(_("Returning: %s"), entry)
|
||||
return entry
|
||||
entry = {'device': device,
|
||||
'network_id': port['network_id'],
|
||||
'port_id': port_id,
|
||||
'admin_state_up': port['admin_state_up'],
|
||||
'network_type': segment[api.NETWORK_TYPE],
|
||||
'segmentation_id': segment[api.SEGMENTATION_ID],
|
||||
'physical_network': segment[api.PHYSICAL_NETWORK]}
|
||||
LOG.debug(_("Returning: %s"), entry)
|
||||
return entry
|
||||
|
||||
def get_devices_details_list(self, rpc_context, **kwargs):
|
||||
return [
|
||||
|
@ -152,11 +131,6 @@ class RpcCallbacks(n_rpc.RpcCallback,
|
|||
for device in kwargs.pop('devices', [])
|
||||
]
|
||||
|
||||
def _find_segment(self, segments, segment_id):
|
||||
for segment in segments:
|
||||
if segment[api.ID] == segment_id:
|
||||
return segment
|
||||
|
||||
def update_device_down(self, rpc_context, **kwargs):
|
||||
"""Device no longer exists on agent."""
|
||||
# TODO(garyk) - live migration and port status
|
||||
|
|
|
@ -19,7 +19,6 @@ import mock
|
|||
import webob.exc as wexc
|
||||
|
||||
from neutron.api.v2 import base
|
||||
from neutron.common import constants as n_const
|
||||
from neutron import context
|
||||
from neutron.extensions import portbindings
|
||||
from neutron import manager
|
||||
|
@ -123,15 +122,33 @@ class CiscoML2MechanismTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
|
|||
new_callable=mock.PropertyMock).start()
|
||||
self.mock_original_bound_segment.return_value = None
|
||||
|
||||
mock_status = mock.patch.object(
|
||||
# Use _is_status_active method to determine bind state.
|
||||
def _mock_check_bind_state(port_context):
|
||||
if (port_context[portbindings.VIF_TYPE] !=
|
||||
portbindings.VIF_TYPE_UNBOUND):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
self.mock_status = mock.patch.object(
|
||||
mech_cisco_nexus.CiscoNexusMechanismDriver,
|
||||
'_is_status_active').start()
|
||||
mock_status.return_value = n_const.PORT_STATUS_ACTIVE
|
||||
self.mock_status.side_effect = _mock_check_bind_state
|
||||
|
||||
super(CiscoML2MechanismTestCase, self).setUp(ML2_PLUGIN)
|
||||
|
||||
self.port_create_status = 'DOWN'
|
||||
|
||||
def _create_deviceowner_mock(self):
|
||||
# Mock deviceowner method for UT's that expect update precommit
|
||||
# failures. This allows control of delete_port_pre/postcommit()
|
||||
# actions.
|
||||
mock_deviceowner = mock.patch.object(
|
||||
mech_cisco_nexus.CiscoNexusMechanismDriver,
|
||||
'_is_deviceowner_compute').start()
|
||||
mock_deviceowner.return_value = False
|
||||
self.addCleanup(mock_deviceowner.stop)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _patch_ncclient(self, attr, value):
|
||||
"""Configure an attribute on the mock ncclient module.
|
||||
|
@ -223,7 +240,8 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase,
|
|||
@contextlib.contextmanager
|
||||
def _create_resources(self, name=NETWORK_NAME, cidr=CIDR_1,
|
||||
device_id=DEVICE_ID_1,
|
||||
host_id=COMP_HOST_NAME):
|
||||
host_id=COMP_HOST_NAME,
|
||||
expected_failure=False):
|
||||
"""Create network, subnet, and port resources for test cases.
|
||||
|
||||
Create a network, subnet, port and then update the port, yield the
|
||||
|
@ -233,18 +251,23 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase,
|
|||
:param cidr: cidr address of subnetwork to be created.
|
||||
:param device_id: Device ID to use for port to be created/updated.
|
||||
:param host_id: Host ID to use for port create/update.
|
||||
|
||||
:param expected_failure: Set to True when an update_port_precommit
|
||||
failure is expected. Results in no actions being taken in
|
||||
delete_port_pre/postcommit() methods.
|
||||
"""
|
||||
with self.network(name=name) as network:
|
||||
with self.subnet(network=network, cidr=cidr) as subnet:
|
||||
with self.port(subnet=subnet, cidr=cidr) as port:
|
||||
|
||||
data = {'port': {portbindings.HOST_ID: host_id,
|
||||
'device_id': device_id,
|
||||
'device_owner': 'compute:none',
|
||||
'device_owner': DEVICE_OWNER,
|
||||
'admin_state_up': True}}
|
||||
req = self.new_update_request('ports', data,
|
||||
port['port']['id'])
|
||||
yield req.get_response(self.api)
|
||||
if expected_failure:
|
||||
self._create_deviceowner_mock()
|
||||
|
||||
def _assertExpectedHTTP(self, status, exc):
|
||||
"""Confirm that an HTTP status corresponds to an expected exception.
|
||||
|
@ -578,7 +601,8 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase,
|
|||
a fictitious host name during port creation.
|
||||
|
||||
"""
|
||||
with self._create_resources(host_id='fake_host') as result:
|
||||
with self._create_resources(host_id='fake_host',
|
||||
expected_failure=True) as result:
|
||||
self._assertExpectedHTTP(result.status_int,
|
||||
c_exc.NexusComputeHostNotConfigured)
|
||||
|
||||
|
@ -586,10 +610,11 @@ class TestCiscoPortsV2(CiscoML2MechanismTestCase,
|
|||
"""Test handling of a NexusMissingRequiredFields exception.
|
||||
|
||||
Test the Cisco NexusMissingRequiredFields exception by using
|
||||
empty host_id and device_id values during port creation.
|
||||
empty device_id value during port creation.
|
||||
|
||||
"""
|
||||
with self._create_resources(device_id='', host_id='') as result:
|
||||
with self._create_resources(device_id='',
|
||||
expected_failure=True) as result:
|
||||
self._assertExpectedHTTP(result.status_int,
|
||||
c_exc.NexusMissingRequiredFields)
|
||||
|
||||
|
|
|
@ -74,7 +74,8 @@ class FakePortContext(object):
|
|||
'status': PORT_STATE,
|
||||
'device_id': device_id,
|
||||
'device_owner': DEVICE_OWNER,
|
||||
portbindings.HOST_ID: host_name
|
||||
portbindings.HOST_ID: host_name,
|
||||
portbindings.VIF_TYPE: portbindings.VIF_TYPE_OVS
|
||||
}
|
||||
self._network = network_context
|
||||
self._segment = network_context.network_segments
|
||||
|
|
|
@ -142,17 +142,7 @@ class TestMechanismDriver(api.MechanismDriver):
|
|||
self._check_port_context(context, False)
|
||||
|
||||
def bind_port(self, context):
|
||||
# REVISIT(rkukura): The upcoming fix for bug 1276391 will
|
||||
# ensure the MDs see the unbinding of the port as a port
|
||||
# update prior to re-binding, at which point this should be
|
||||
# removed.
|
||||
self.bound_ports.discard(context.current['id'])
|
||||
|
||||
# REVISIT(rkukura): Currently, bind_port() is called as part
|
||||
# of either a create or update transaction. The fix for bug
|
||||
# 1276391 will change it to be called outside any transaction,
|
||||
# so the context.original* will no longer be available.
|
||||
self._check_port_context(context, context.original is not None)
|
||||
self._check_port_context(context, False)
|
||||
|
||||
host = context.current.get(portbindings.HOST_ID, None)
|
||||
segment = context.network.network_segments[0][api.ID]
|
||||
|
|
|
@ -112,7 +112,7 @@ class TestBigSwitchMechDriverPortsV2(test_db_plugin.TestPortsV2,
|
|||
|
||||
def test_udpate404_triggers_background_sync(self):
|
||||
with contextlib.nested(
|
||||
mock.patch(SERVER_POOL + '.rest_update_port',
|
||||
mock.patch(DRIVER + '.async_port_create',
|
||||
side_effect=servermanager.RemoteRestError(
|
||||
reason=servermanager.NXNETWORK, status=404)),
|
||||
mock.patch(DRIVER + '._send_all_data'),
|
||||
|
|
Loading…
Reference in New Issue