Add a rebinding chance in _bind_port_if_needed

Make _bind_port_if_needed attempt the binding at least once when the
port's binding status passed in is already binding_failed.

This is the second attempt to introduce the patch (the first one was
reverted due to a regression that broke Ironic), now with the
notification properly sent even when the binding attempt fails.

The patch also fixes several cases where we attempted to notify with a
binding context that had not been committed to the database.

The patch changes _bind_port_if_needed and _attempt_binding so that
_commit_port_binding is called only with the binding's final state
(see the sketch below):
1. Successful binding: _commit_port_binding is called right away.
2. Unsuccessful binding: _commit_port_binding is called only on the
   final attempt to bind the port.
This avoids the need for reverts, which would really complicate things
even more.

Co-Authored-By: Yalei Wang <yalei.wang@intel.com>
Co-Authored-By: Nir Magnezi <nmagnezi@redhat.com>
Co-Authored-By: John Schwarz <jschwarz@redhat.com>
Change-Id: I437290affd8eb87177d0626bf7935a165859cbdd
Closes-Bug: #1399249
(cherry picked from commit 63fe3a418c)
Ihar Hrachyshka 2015-03-09 18:05:18 +01:00 committed by Rossella Sblendido
parent 30cbef504f
commit f860bdd4b4
8 changed files with 252 additions and 66 deletions


@@ -264,18 +264,32 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def _bind_port_if_needed(self, context, allow_notify=False,
need_notify=False):
# Binding limit does not need to be tunable because no
# more than a couple of attempts should ever be required in
# normal operation.
for count in range(1, MAX_BIND_TRIES + 1):
if count > 1:
# yield for binding retries so that we give other threads a
# chance to do their work
greenthread.sleep(0)
# multiple attempts shouldn't happen very often so we log each
# attempt after the 1st.
greenthread.sleep(0) # yield
LOG.info(_LI("Attempt %(count)s to bind port %(port)s"),
{'count': count, 'port': context.current['id']})
context, need_notify, try_again = self._attempt_binding(
bind_context, need_notify, try_again = self._attempt_binding(
context, need_notify)
if count == MAX_BIND_TRIES or not try_again:
if self._should_bind_port(context):
# At this point, we attempted to bind a port and reached
# its final binding state. Binding either succeeded or
# exhausted all attempts, thus no need to try again.
# Now, the port and its binding state should be committed.
context, need_notify, try_again = (
self._commit_port_binding(context, bind_context,
need_notify, try_again))
else:
context = bind_context
if not try_again:
if allow_notify and need_notify:
self._notify_port_updated(context)
@@ -286,50 +300,26 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
{'port': context.current['id'], 'max': MAX_BIND_TRIES})
return context
def _attempt_binding(self, context, need_notify):
# Since the mechanism driver bind_port() calls must be made
# outside a DB transaction locking the port state, it is
# possible (but unlikely) that the port's state could change
# concurrently while these calls are being made. If another
# thread or process succeeds in binding the port before this
# thread commits its results, the already committed results are
# used. If attributes such as binding:host_id,
# binding:profile, or binding:vnic_type are updated
# concurrently, the try_again flag is returned to indicate that
# the commit was unsuccessful.
plugin_context = context._plugin_context
port_id = context.current['id']
binding = context._binding
try_again = False
# First, determine whether it is necessary and possible to
# bind the port.
if (binding.vif_type != portbindings.VIF_TYPE_UNBOUND
or not binding.host):
# We either don't need to bind the port or can't
return context, need_notify, try_again
def _should_bind_port(self, context):
return (context._binding.host and context._binding.vif_type
in (portbindings.VIF_TYPE_UNBOUND,
portbindings.VIF_TYPE_BINDING_FAILED))
# The port isn't already bound and the necessary
# information is available, so attempt to bind the port.
bind_context = self._bind_port(context)
# Now try to commit result of attempting to bind the port.
new_context, did_commit = self._commit_port_binding(
plugin_context, port_id, binding, bind_context)
if not new_context:
# The port has been deleted concurrently, so just
# return the unbound result from the initial
# transaction that completed before the deletion.
LOG.debug("Port %s has been deleted concurrently",
port_id)
need_notify = False
return context, need_notify, try_again
# Need to notify if we succeed and our results were
# committed.
if did_commit and (new_context._binding.vif_type !=
portbindings.VIF_TYPE_BINDING_FAILED):
need_notify = True
return new_context, need_notify, try_again
try_again = True
return new_context, need_notify, try_again
def _attempt_binding(self, context, need_notify):
try_again = False
if self._should_bind_port(context):
bind_context = self._bind_port(context)
if bind_context.vif_type != portbindings.VIF_TYPE_BINDING_FAILED:
# Binding succeeded. Suggest notifying of successful binding.
need_notify = True
else:
# Current attempt binding failed, try to bind again.
try_again = True
context = bind_context
return context, need_notify, try_again
def _bind_port(self, orig_context):
# Construct a new PortContext from the one from the previous
@@ -353,10 +343,13 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.mechanism_manager.bind_port(new_context)
return new_context
def _commit_port_binding(self, plugin_context, port_id, orig_binding,
new_context):
def _commit_port_binding(self, orig_context, bind_context,
need_notify, try_again):
port_id = orig_context.current['id']
plugin_context = orig_context._plugin_context
session = plugin_context.session
new_binding = new_context._binding
orig_binding = orig_context._binding
new_binding = bind_context._binding
# After we've attempted to bind the port, we begin a
# transaction, get the current port state, and decide whether
@@ -367,12 +360,24 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# mechanism driver update_port_*commit() calls.
port_db, cur_binding = db.get_locked_port_and_binding(session,
port_id)
# Since the mechanism driver bind_port() calls must be made
# outside a DB transaction locking the port state, it is
# possible (but unlikely) that the port's state could change
# concurrently while these calls are being made. If another
# thread or process succeeds in binding the port before this
# thread commits its results, the already committed results are
# used. If attributes such as binding:host_id, binding:profile,
# or binding:vnic_type are updated concurrently, the try_again
# flag is returned to indicate that the commit was unsuccessful.
if not port_db:
# The port has been deleted concurrently.
return (None, False)
# The port has been deleted concurrently, so just
# return the unbound result from the initial
# transaction that completed before the deletion.
LOG.debug("Port %s has been deleted concurrently", port_id)
return orig_context, False, False
oport = self._make_port_dict(port_db)
port = self._make_port_dict(port_db)
network = new_context.network.current
network = bind_context.network.current
if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
# REVISIT(rkukura): The PortBinding instance from the
# ml2_port_bindings table, returned as cur_binding
@@ -416,29 +421,34 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
cur_binding.vif_type = new_binding.vif_type
cur_binding.vif_details = new_binding.vif_details
db.clear_binding_levels(session, port_id, cur_binding.host)
db.set_binding_levels(session, new_context._binding_levels)
cur_context._binding_levels = new_context._binding_levels
db.set_binding_levels(session, bind_context._binding_levels)
cur_context._binding_levels = bind_context._binding_levels
# Update PortContext's port dictionary to reflect the
# updated binding state.
self._update_port_dict_binding(port, cur_binding)
# Update the port status if requested by the bound driver.
if (new_context._binding_levels and
new_context._new_port_status):
port_db.status = new_context._new_port_status
port['status'] = new_context._new_port_status
if (bind_context._binding_levels and
bind_context._new_port_status):
port_db.status = bind_context._new_port_status
port['status'] = bind_context._new_port_status
# Call the mechanism driver precommit methods, commit
# the results, and call the postcommit methods.
self.mechanism_manager.update_port_precommit(cur_context)
if commit:
# Continue, using the port state as of the transaction that
# just finished, whether that transaction committed new
# results or discovered concurrent port state changes.
# Also, trigger a notification for a successful binding commit.
self.mechanism_manager.update_port_postcommit(cur_context)
need_notify = True
try_again = False
else:
try_again = True
# Continue, using the port state as of the transaction that
# just finished, whether that transaction committed new
# results or discovered concurrent port state changes.
return (cur_context, commit)
return cur_context, need_notify, try_again
def _update_port_dict_binding(self, port, binding):
port[portbindings.VNIC_TYPE] = binding.vnic_type


@@ -113,6 +113,13 @@ def kill_agent(agent_id):
'heartbeat_timestamp': hour_ago}})
def revive_agent(agent_id):
now = timeutils.utcnow()
FakePlugin().update_agent(
context.get_admin_context(), agent_id,
{'agent': {'started_at': now, 'heartbeat_timestamp': now}})
def set_agent_admin_state(agent_id, admin_state_up=False):
FakePlugin().update_agent(
context.get_admin_context(),


@@ -0,0 +1,71 @@
# Copyright (c) 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import constants
from neutron import context
from neutron.db import agents_db
from neutron.extensions import portbindings
from neutron.tests.common import helpers
from neutron.tests.unit.plugins.ml2 import base as ml2_test_base
DEVICE_OWNER_COMPUTE = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'fake'
class TestMl2PortBinding(ml2_test_base.ML2TestFramework,
agents_db.AgentDbMixin):
def setUp(self):
super(TestMl2PortBinding, self).setUp()
self.admin_context = context.get_admin_context()
self.host_args = {portbindings.HOST_ID: helpers.HOST,
'admin_state_up': True}
def test_port_bind_successfully(self):
helpers.register_ovs_agent(host=helpers.HOST)
with self.network() as network:
with self.subnet(network=network) as subnet:
with self.port(
subnet=subnet, device_owner=DEVICE_OWNER_COMPUTE,
arg_list=(portbindings.HOST_ID, 'admin_state_up',),
**self.host_args) as port:
# Note: Port creation invokes _bind_port_if_needed(),
# therefore it is all we need in order to test a successful
# binding
self.assertEqual(port['port']['binding:vif_type'],
portbindings.VIF_TYPE_OVS)
def test_port_bind_retry(self):
agent = helpers.register_ovs_agent(host=helpers.HOST)
helpers.kill_agent(agent_id=agent.id)
with self.network() as network:
with self.subnet(network=network) as subnet:
with self.port(
subnet=subnet, device_owner=DEVICE_OWNER_COMPUTE,
arg_list=(portbindings.HOST_ID, 'admin_state_up',),
**self.host_args) as port:
# Since the agent is dead, expect binding to fail
self.assertEqual(port['port']['binding:vif_type'],
portbindings.VIF_TYPE_BINDING_FAILED)
helpers.revive_agent(agent.id)
# When an agent starts, the RPC call get_device_details()
# will invoke get_bound_port_context(), which eventually uses
# _bind_port_if_needed()
bound_context = self.plugin.get_bound_port_context(
self.admin_context, port['port']['id'], helpers.HOST)
# Since the agent is back online, expect binding to succeed
self.assertEqual(bound_context.vif_type,
portbindings.VIF_TYPE_OVS)
self.assertEqual(bound_context.current['binding:vif_type'],
portbindings.VIF_TYPE_OVS)


@@ -187,8 +187,9 @@ class TestMechanismDriver(api.MechanismDriver):
self._check_port_context(context, False)
def update_port_precommit(self, context):
if (context.original_top_bound_segment and
not context.top_bound_segment):
if ((context.original_top_bound_segment and
not context.top_bound_segment) or
(context.host == "host-fail")):
self.bound_ports.remove((context.original['id'],
context.original_host))
self._check_port_context(context, True)
@@ -234,3 +235,8 @@ class TestMechanismDriver(api.MechanismDriver):
portbindings.VIF_TYPE_OVS,
{portbindings.CAP_PORT_FILTER: False})
self.bound_ports.add((context.current['id'], host))
elif host == "host-fail":
context.set_binding(None,
portbindings.VIF_TYPE_BINDING_FAILED,
{portbindings.CAP_PORT_FILTER: False})
self.bound_ports.add((context.current['id'], host))


@@ -950,6 +950,92 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
# should have returned before calling _make_port_dict
self.assertFalse(mpd_mock.mock_calls)
def _create_port_and_bound_context(self, port_vif_type, bound_vif_type):
with self.port() as port:
plugin = manager.NeutronManager.get_plugin()
binding = ml2_db.get_locked_port_and_binding(self.context.session,
port['port']['id'])[1]
binding['host'] = 'fake_host'
binding['vif_type'] = port_vif_type
# Generates port context to be used before the bind.
port_context = driver_context.PortContext(
plugin, self.context, port['port'],
plugin.get_network(self.context, port['port']['network_id']),
binding, None)
bound_context = mock.MagicMock()
# Bound context is how port_context is expected to look
# after _bind_port.
bound_context.vif_type = bound_vif_type
return plugin, port_context, bound_context
def test__attempt_binding(self):
# Simulate a successful binding for vif_type unbound
# and keep the same binding state for other vif types.
vif_types = [(portbindings.VIF_TYPE_BINDING_FAILED,
portbindings.VIF_TYPE_BINDING_FAILED),
(portbindings.VIF_TYPE_UNBOUND,
portbindings.VIF_TYPE_OVS),
(portbindings.VIF_TYPE_OVS,
portbindings.VIF_TYPE_OVS)]
for port_vif_type, bound_vif_type in vif_types:
plugin, port_context, bound_context = (
self._create_port_and_bound_context(port_vif_type,
bound_vif_type))
with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin._bind_port',
return_value=bound_context) as bd_mock:
context, need_notify, try_again = (plugin._attempt_binding(
port_context, False))
expected_need_notify = port_vif_type not in (
portbindings.VIF_TYPE_BINDING_FAILED,
portbindings.VIF_TYPE_OVS)
if bound_vif_type == portbindings.VIF_TYPE_BINDING_FAILED:
expected_vif_type = port_vif_type
expected_try_again = True
expected_bd_mock_called = True
else:
expected_vif_type = portbindings.VIF_TYPE_OVS
expected_try_again = False
expected_bd_mock_called = (port_vif_type ==
portbindings.VIF_TYPE_UNBOUND)
self.assertEqual(expected_need_notify, need_notify)
self.assertEqual(expected_vif_type, context.vif_type)
self.assertEqual(expected_try_again, try_again)
self.assertEqual(expected_bd_mock_called, bd_mock.called)
def test__attempt_binding_retries(self):
# Simulate cases of both successful and failed binding states for
# vif_type unbound
vif_types = [(portbindings.VIF_TYPE_UNBOUND,
portbindings.VIF_TYPE_BINDING_FAILED),
(portbindings.VIF_TYPE_UNBOUND,
portbindings.VIF_TYPE_OVS)]
for port_vif_type, bound_vif_type in vif_types:
plugin, port_context, bound_context = (
self._create_port_and_bound_context(port_vif_type,
bound_vif_type))
with mock.patch(
'neutron.plugins.ml2.plugin.Ml2Plugin._bind_port',
return_value=bound_context),\
mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin._commit_'
'port_binding',
return_value=(bound_context, True, False)),\
mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin.'
'_attempt_binding',
side_effect=plugin._attempt_binding) as at_mock:
plugin._bind_port_if_needed(port_context)
if bound_vif_type == portbindings.VIF_TYPE_BINDING_FAILED:
# An unsuccessful binding attempt should be retried
# MAX_BIND_TRIES amount of times.
self.assertEqual(ml2_plugin.MAX_BIND_TRIES,
at_mock.call_count)
else:
# Successful binding should only be attempted once.
self.assertEqual(1, at_mock.call_count)
def test_port_binding_profile_not_changed(self):
profile = {'e': 5}
profile_arg = {portbindings.PROFILE: profile}


@@ -0,0 +1,6 @@
---
prelude: >
ML2: ports can now recover from the binding_failed state.
features:
- Ports that failed to bind when an L2 agent was offline can now recover after
the agent is back online.