Merge "Add the rebinding chance in _bind_port_if_needed"

Jenkins, 2016-02-15 17:13:59 +00:00 (committed by Gerrit Code Review)
8 changed files with 252 additions and 66 deletions


@@ -301,18 +301,32 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
     def _bind_port_if_needed(self, context, allow_notify=False,
                              need_notify=False):
         # Binding limit does not need to be tunable because no
         # more than a couple of attempts should ever be required in
         # normal operation.
         for count in range(1, MAX_BIND_TRIES + 1):
             if count > 1:
+                # yield for binding retries so that we give other threads a
+                # chance to do their work
+                greenthread.sleep(0)
                 # multiple attempts shouldn't happen very often so we log each
                 # attempt after the 1st.
-                greenthread.sleep(0)  # yield
                 LOG.info(_LI("Attempt %(count)s to bind port %(port)s"),
                          {'count': count, 'port': context.current['id']})
-            context, need_notify, try_again = self._attempt_binding(
+            bind_context, need_notify, try_again = self._attempt_binding(
                 context, need_notify)
+
+            if count == MAX_BIND_TRIES or not try_again:
+                if self._should_bind_port(context):
+                    # At this point, we attempted to bind a port and reached
+                    # its final binding state. Binding either succeeded or
+                    # exhausted all attempts, thus no need to try again.
+                    # Now, the port and its binding state should be committed.
+                    context, need_notify, try_again = (
+                        self._commit_port_binding(context, bind_context,
+                                                  need_notify, try_again))
+                else:
+                    context = bind_context
             if not try_again:
                 if allow_notify and need_notify:
                     self._notify_port_updated(context)
                 return context
@@ -323,50 +337,26 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                   {'port': context.current['id'], 'max': MAX_BIND_TRIES})
         return context

-    def _attempt_binding(self, context, need_notify):
-        # Since the mechanism driver bind_port() calls must be made
-        # outside a DB transaction locking the port state, it is
-        # possible (but unlikely) that the port's state could change
-        # concurrently while these calls are being made. If another
-        # thread or process succeeds in binding the port before this
-        # thread commits its results, the already committed results are
-        # used. If attributes such as binding:host_id,
-        # binding:profile, or binding:vnic_type are updated
-        # concurrently, the try_again flag is returned to indicate that
-        # the commit was unsuccessful.
-        plugin_context = context._plugin_context
-        port_id = context.current['id']
-        binding = context._binding
-        try_again = False
-        # First, determine whether it is necessary and possible to
-        # bind the port.
-        if (binding.vif_type != portbindings.VIF_TYPE_UNBOUND
-            or not binding.host):
-            # We either don't need to bind the port or can't
-            return context, need_notify, try_again
+    def _should_bind_port(self, context):
+        return (context._binding.host and context._binding.vif_type
+                in (portbindings.VIF_TYPE_UNBOUND,
+                    portbindings.VIF_TYPE_BINDING_FAILED))

-        # The port isn't already bound and the necessary
-        # information is available, so attempt to bind the port.
-        bind_context = self._bind_port(context)
-
-        # Now try to commit result of attempting to bind the port.
-        new_context, did_commit = self._commit_port_binding(
-            plugin_context, port_id, binding, bind_context)
-        if not new_context:
-            # The port has been deleted concurrently, so just
-            # return the unbound result from the initial
-            # transaction that completed before the deletion.
-            LOG.debug("Port %s has been deleted concurrently",
-                      port_id)
-            need_notify = False
-            return context, need_notify, try_again
-        # Need to notify if we succeed and our results were
-        # committed.
-        if did_commit and (new_context._binding.vif_type !=
-                           portbindings.VIF_TYPE_BINDING_FAILED):
-            need_notify = True
-            return new_context, need_notify, try_again
-        try_again = True
-        return new_context, need_notify, try_again
+    def _attempt_binding(self, context, need_notify):
+        try_again = False
+
+        if self._should_bind_port(context):
+            bind_context = self._bind_port(context)
+            if bind_context.vif_type != portbindings.VIF_TYPE_BINDING_FAILED:
+                # Binding succeeded. Suggest notifying of successful binding.
+                need_notify = True
+            else:
+                # Current attempt binding failed, try to bind again.
+                try_again = True
+            context = bind_context
+
+        return context, need_notify, try_again

     def _bind_port(self, orig_context):
         # Construct a new PortContext from the one from the previous
@@ -390,10 +380,13 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
         self.mechanism_manager.bind_port(new_context)
         return new_context

-    def _commit_port_binding(self, plugin_context, port_id, orig_binding,
-                             new_context):
+    def _commit_port_binding(self, orig_context, bind_context,
+                             need_notify, try_again):
+        port_id = orig_context.current['id']
+        plugin_context = orig_context._plugin_context
         session = plugin_context.session
-        new_binding = new_context._binding
+        orig_binding = orig_context._binding
+        new_binding = bind_context._binding

         # After we've attempted to bind the port, we begin a
         # transaction, get the current port state, and decide whether
@@ -404,12 +397,24 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             # mechanism driver update_port_*commit() calls.
             port_db, cur_binding = db.get_locked_port_and_binding(session,
                                                                   port_id)
+            # Since the mechanism driver bind_port() calls must be made
+            # outside a DB transaction locking the port state, it is
+            # possible (but unlikely) that the port's state could change
+            # concurrently while these calls are being made. If another
+            # thread or process succeeds in binding the port before this
+            # thread commits its results, the already committed results are
+            # used. If attributes such as binding:host_id, binding:profile,
+            # or binding:vnic_type are updated concurrently, the try_again
+            # flag is returned to indicate that the commit was unsuccessful.
             if not port_db:
-                # The port has been deleted concurrently.
-                return (None, False)
+                # The port has been deleted concurrently, so just
+                # return the unbound result from the initial
+                # transaction that completed before the deletion.
+                LOG.debug("Port %s has been deleted concurrently", port_id)
+                return orig_context, False, False
             oport = self._make_port_dict(port_db)
             port = self._make_port_dict(port_db)
-            network = new_context.network.current
+            network = bind_context.network.current
             if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
                 # REVISIT(rkukura): The PortBinding instance from the
                 # ml2_port_bindings table, returned as cur_binding
@@ -453,29 +458,34 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                 cur_binding.vif_type = new_binding.vif_type
                 cur_binding.vif_details = new_binding.vif_details
                 db.clear_binding_levels(session, port_id, cur_binding.host)
-                db.set_binding_levels(session, new_context._binding_levels)
-                cur_context._binding_levels = new_context._binding_levels
+                db.set_binding_levels(session, bind_context._binding_levels)
+                cur_context._binding_levels = bind_context._binding_levels

                 # Update PortContext's port dictionary to reflect the
                 # updated binding state.
                 self._update_port_dict_binding(port, cur_binding)

                 # Update the port status if requested by the bound driver.
-                if (new_context._binding_levels and
-                    new_context._new_port_status):
-                    port_db.status = new_context._new_port_status
-                    port['status'] = new_context._new_port_status
+                if (bind_context._binding_levels and
+                    bind_context._new_port_status):
+                    port_db.status = bind_context._new_port_status
+                    port['status'] = bind_context._new_port_status

                 # Call the mechanism driver precommit methods, commit
                 # the results, and call the postcommit methods.
                 self.mechanism_manager.update_port_precommit(cur_context)
         if commit:
+            # Continue, using the port state as of the transaction that
+            # just finished, whether that transaction committed new
+            # results or discovered concurrent port state changes.
+            # Also, trigger notification for successful binding commit.
             self.mechanism_manager.update_port_postcommit(cur_context)
+            need_notify = True
+            try_again = False
+        else:
+            try_again = True

-        # Continue, using the port state as of the transaction that
-        # just finished, whether that transaction committed new
-        # results or discovered concurrent port state changes.
-        return (cur_context, commit)
+        return cur_context, need_notify, try_again

     def _update_port_dict_binding(self, port, binding):
         port[portbindings.VNIC_TYPE] = binding.vnic_type
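
Taken together, the plugin.py changes defer the commit until a final binding state is reached, so a failed bind can be retried instead of being committed as permanent. Below is a minimal, self-contained sketch of that retry-then-commit control flow; every name in it (try_bind, commit, bind_if_needed, the toy vif_type strings) is an illustrative stand-in, not the actual Neutron API:

import random

MAX_BIND_TRIES = 10


def try_bind(port):
    # Stand-in for _attempt_binding(): returns (vif_type, try_again).
    if random.random() < 0.5:
        return 'ovs', False            # bind succeeded
    return 'binding_failed', True      # bind failed; worth another try


def commit(port, vif_type):
    # Stand-in for _commit_port_binding(): True if our result was
    # committed before any concurrent update beat us to it.
    return True


def bind_if_needed(port):
    vif_type = 'unbound'
    for count in range(1, MAX_BIND_TRIES + 1):
        vif_type, try_again = try_bind(port)
        if count == MAX_BIND_TRIES or not try_again:
            # A final binding state was reached (success, or retries
            # exhausted), and only now is the result committed. Deferring
            # the commit is what lets a failed bind be retried later.
            try_again = not commit(port, vif_type)
        if not try_again:
            return vif_type
    return vif_type


print(bind_if_needed('port-1'))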


@@ -142,6 +142,13 @@ def kill_agent(agent_id):
                     'heartbeat_timestamp': hour_ago}})


+def revive_agent(agent_id):
+    now = timeutils.utcnow()
+    FakePlugin().update_agent(
+        context.get_admin_context(), agent_id,
+        {'agent': {'started_at': now, 'heartbeat_timestamp': now}})
+
+
 def set_agent_admin_state(agent_id, admin_state_up=False):
     FakePlugin().update_agent(
         context.get_admin_context(),
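
The new revive_agent() mirrors the kill_agent() helper just above it: kill_agent() backdates the heartbeat by an hour so the agent reads as dead, while revive_agent() resets the timestamps to the current time. A short usage sketch, assuming only the helpers shown in this change:

# Assumes neutron.tests.common.helpers as modified above.
agent = helpers.register_ovs_agent(host=helpers.HOST)
helpers.kill_agent(agent_id=agent.id)   # heartbeat pushed an hour back
# ... exercise the binding-failure path while the agent is down ...
helpers.revive_agent(agent.id)          # timestamps reset to utcnow()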


@@ -0,0 +1,71 @@
+# Copyright (c) 2016 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.common import constants
+from neutron import context
+from neutron.db import agents_db
+from neutron.extensions import portbindings
+from neutron.tests.common import helpers
+from neutron.tests.unit.plugins.ml2 import base as ml2_test_base
+
+
+DEVICE_OWNER_COMPUTE = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'fake'
+
+
+class TestMl2PortBinding(ml2_test_base.ML2TestFramework,
+                         agents_db.AgentDbMixin):
+    def setUp(self):
+        super(TestMl2PortBinding, self).setUp()
+        self.admin_context = context.get_admin_context()
+        self.host_args = {portbindings.HOST_ID: helpers.HOST,
+                          'admin_state_up': True}
+
+    def test_port_bind_successfully(self):
+        helpers.register_ovs_agent(host=helpers.HOST)
+        with self.network() as network:
+            with self.subnet(network=network) as subnet:
+                with self.port(
+                        subnet=subnet, device_owner=DEVICE_OWNER_COMPUTE,
+                        arg_list=(portbindings.HOST_ID, 'admin_state_up',),
+                        **self.host_args) as port:
+                    # Note: port creation invokes _bind_port_if_needed(),
+                    # therefore it is all we need in order to test a
+                    # successful binding
+                    self.assertEqual(port['port']['binding:vif_type'],
+                                     portbindings.VIF_TYPE_OVS)
+
+    def test_port_bind_retry(self):
+        agent = helpers.register_ovs_agent(host=helpers.HOST)
+        helpers.kill_agent(agent_id=agent.id)
+        with self.network() as network:
+            with self.subnet(network=network) as subnet:
+                with self.port(
+                        subnet=subnet, device_owner=DEVICE_OWNER_COMPUTE,
+                        arg_list=(portbindings.HOST_ID, 'admin_state_up',),
+                        **self.host_args) as port:
+                    # Since the agent is dead, expect binding to fail
+                    self.assertEqual(port['port']['binding:vif_type'],
+                                     portbindings.VIF_TYPE_BINDING_FAILED)
+                    helpers.revive_agent(agent.id)
+                    # When an agent starts, the RPC call get_device_details()
+                    # will invoke get_bound_port_context(), which eventually
+                    # uses _bind_port_if_needed()
+                    bound_context = self.plugin.get_bound_port_context(
+                        self.admin_context, port['port']['id'], helpers.HOST)
+                    # Since the agent is back online, expect binding to succeed
+                    self.assertEqual(bound_context.vif_type,
+                                     portbindings.VIF_TYPE_OVS)
+                    self.assertEqual(bound_context.current['binding:vif_type'],
+                                     portbindings.VIF_TYPE_OVS)


@@ -187,8 +187,9 @@ class TestMechanismDriver(api.MechanismDriver):
         self._check_port_context(context, False)

     def update_port_precommit(self, context):
-        if (context.original_top_bound_segment and
-                not context.top_bound_segment):
+        if ((context.original_top_bound_segment and
+             not context.top_bound_segment) or
+                (context.host == "host-fail")):
             self.bound_ports.remove((context.original['id'],
                                      context.original_host))
         self._check_port_context(context, True)
@@ -234,3 +235,8 @@ class TestMechanismDriver(api.MechanismDriver):
                                 portbindings.VIF_TYPE_OVS,
                                 {portbindings.CAP_PORT_FILTER: False})
             self.bound_ports.add((context.current['id'], host))
+        elif host == "host-fail":
+            context.set_binding(None,
+                                portbindings.VIF_TYPE_BINDING_FAILED,
+                                {portbindings.CAP_PORT_FILTER: False})
+            self.bound_ports.add((context.current['id'], host))
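
The driver half of the test setup shows the generic way a mechanism driver reports an unsuccessful bind: it calls context.set_binding() with VIF_TYPE_BINDING_FAILED, which is exactly what feeds the new retry path in _bind_port_if_needed(). A stripped-down sketch of such a driver; the class name is hypothetical and the import paths assume the contemporary (Mitaka-era) tree:

from neutron.extensions import portbindings
from neutron.plugins.ml2 import driver_api as api


class FailOnDemandMechanismDriver(api.MechanismDriver):
    """Hypothetical driver that fails binding for one designated host."""

    def initialize(self):
        pass

    def bind_port(self, context):
        if context.host == "host-fail":
            # A failed binding result is what makes _bind_port_if_needed()
            # retry (and, after this change, recover) rather than give up.
            context.set_binding(None,
                                portbindings.VIF_TYPE_BINDING_FAILED,
                                {portbindings.CAP_PORT_FILTER: False})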


@@ -1067,6 +1067,92 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
             # should have returned before calling _make_port_dict
             self.assertFalse(mpd_mock.mock_calls)

+    def _create_port_and_bound_context(self, port_vif_type, bound_vif_type):
+        with self.port() as port:
+            plugin = manager.NeutronManager.get_plugin()
+            binding = ml2_db.get_locked_port_and_binding(self.context.session,
+                                                         port['port']['id'])[1]
+            binding['host'] = 'fake_host'
+            binding['vif_type'] = port_vif_type
+            # Generates port context to be used before the bind.
+            port_context = driver_context.PortContext(
+                plugin, self.context, port['port'],
+                plugin.get_network(self.context, port['port']['network_id']),
+                binding, None)
+            bound_context = mock.MagicMock()
+            # Bound context is how port_context is expected to look
+            # after _bind_port.
+            bound_context.vif_type = bound_vif_type
+            return plugin, port_context, bound_context
+
+    def test__attempt_binding(self):
+        # Simulate a successful binding for vif_type unbound
+        # and keep the same binding state for other vif types.
+        vif_types = [(portbindings.VIF_TYPE_BINDING_FAILED,
+                      portbindings.VIF_TYPE_BINDING_FAILED),
+                     (portbindings.VIF_TYPE_UNBOUND,
+                      portbindings.VIF_TYPE_OVS),
+                     (portbindings.VIF_TYPE_OVS,
+                      portbindings.VIF_TYPE_OVS)]
+
+        for port_vif_type, bound_vif_type in vif_types:
+            plugin, port_context, bound_context = (
+                self._create_port_and_bound_context(port_vif_type,
+                                                    bound_vif_type))
+            with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin._bind_port',
+                            return_value=bound_context) as bd_mock:
+                context, need_notify, try_again = (plugin._attempt_binding(
+                    port_context, False))
+                expected_need_notify = port_vif_type not in (
+                    portbindings.VIF_TYPE_BINDING_FAILED,
+                    portbindings.VIF_TYPE_OVS)
+
+                if bound_vif_type == portbindings.VIF_TYPE_BINDING_FAILED:
+                    expected_vif_type = port_vif_type
+                    expected_try_again = True
+                    expected_bd_mock_called = True
+                else:
+                    expected_vif_type = portbindings.VIF_TYPE_OVS
+                    expected_try_again = False
+                    expected_bd_mock_called = (port_vif_type ==
+                                               portbindings.VIF_TYPE_UNBOUND)
+
+                self.assertEqual(expected_need_notify, need_notify)
+                self.assertEqual(expected_vif_type, context.vif_type)
+                self.assertEqual(expected_try_again, try_again)
+                self.assertEqual(expected_bd_mock_called, bd_mock.called)
+
+    def test__attempt_binding_retries(self):
+        # Simulate cases of both successful and failed binding states for
+        # vif_type unbound
+        vif_types = [(portbindings.VIF_TYPE_UNBOUND,
+                      portbindings.VIF_TYPE_BINDING_FAILED),
+                     (portbindings.VIF_TYPE_UNBOUND,
+                      portbindings.VIF_TYPE_OVS)]
+
+        for port_vif_type, bound_vif_type in vif_types:
+            plugin, port_context, bound_context = (
+                self._create_port_and_bound_context(port_vif_type,
+                                                    bound_vif_type))
+            with mock.patch(
+                    'neutron.plugins.ml2.plugin.Ml2Plugin._bind_port',
+                    return_value=bound_context),\
+                mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin._commit_'
+                           'port_binding',
+                           return_value=(bound_context, True, False)),\
+                mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin.'
+                           '_attempt_binding',
+                           side_effect=plugin._attempt_binding) as at_mock:
+                plugin._bind_port_if_needed(port_context)
+                if bound_vif_type == portbindings.VIF_TYPE_BINDING_FAILED:
+                    # An unsuccessful binding attempt should be retried
+                    # MAX_BIND_TRIES times.
+                    self.assertEqual(ml2_plugin.MAX_BIND_TRIES,
+                                     at_mock.call_count)
+                else:
+                    # Successful binding should only be attempted once.
+                    self.assertEqual(1, at_mock.call_count)
+
     def test_port_binding_profile_not_changed(self):
         profile = {'e': 5}
         profile_arg = {portbindings.PROFILE: profile}
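
One subtlety in test__attempt_binding_retries() above: _attempt_binding is patched with side_effect=plugin._attempt_binding, which wraps the real method so behavior is unchanged while the mock records call_count. The same pattern in isolation, using a toy Greeter class and only the mock library these tests already import:

import mock  # on Python 3, unittest.mock behaves the same way here


class Greeter(object):
    def hello(self, name):
        return 'hello %s' % name


g = Greeter()
# g.hello is evaluated before the patch takes effect, so the mock's
# side_effect is the real bound method: calls pass through unchanged
# while the mock counts them.
with mock.patch.object(Greeter, 'hello', side_effect=g.hello) as m:
    assert g.hello('world') == 'hello world'
    assert g.hello('again') == 'hello again'
assert m.call_count == 2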


@@ -0,0 +1,6 @@
+---
+prelude: >
+    ML2: ports can now recover from the binding_failed state.
+features:
+  - Ports that failed to bind when an L2 agent was offline can now recover
+    after the agent is back online.