Retry on unassigned ofport instead of treating it as a failure

Open vSwitch will return '[]' when querying an interface's ofport
when the ofport has not yet been assigned. This doesn't signal a
failure, but the get_port_ofport code was treating it as such.
This patch uses a decorator from python-retrying which has been
added as a dependency of oslo_concurrency and therefore packaged
everywhere. The call to fetch the ofport is retried until the
vsctl_timeout is reached and, on failure, INVALID_OFPORT is
returned.

The add_port function will attempt to delete the port if
INVALID_OFPORT is returned from get_port_ofport. add_port is also
extended to take optional Interface options so that the
add_tunnel_port and add_patch_port functions can reuse it instead
of just duplicating its functionality.

Closes-Bug: #1341020
Change-Id: Ifc52d8589c7aafd360893cb9c1cdcbf43b04ee2c
This commit is contained in:
Terry Wilson 2014-07-11 17:55:30 -06:00
parent 2fccc7ed2c
commit cd78c7cc11
7 changed files with 106 additions and 104 deletions

View File

@ -19,6 +19,8 @@ import operator
from oslo.config import cfg
from oslo.serialization import jsonutils
from oslo.utils import excutils
import retrying
import six
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
@ -43,6 +45,35 @@ cfg.CONF.register_opts(OPTS)
LOG = logging.getLogger(__name__)
def _ofport_result_pending(result):
"""Return True if ovs-vsctl indicates the result is still pending."""
# ovs-vsctl can return '[]' for an ofport that has not yet been assigned
try:
int(result)
return False
except (ValueError, TypeError):
return True
def _ofport_retry(fn):
"""Decorator for retrying when OVS has yet to assign an ofport.
The instance's vsctl_timeout is used as the max waiting time. This relies
on the fact that instance methods receive self as the first argument.
"""
@six.wraps(fn)
def wrapped(*args, **kwargs):
self = args[0]
new_fn = retrying.retry(
retry_on_result=_ofport_result_pending,
stop_max_delay=self.vsctl_timeout * 1000,
wait_exponential_multiplier=10,
wait_exponential_max=1000,
retry_on_exception=lambda _: False)(fn)
return new_fn(*args, **kwargs)
return wrapped
class VifPort:
def __init__(self, port_name, ofport, vif_id, vif_mac, switch):
self.port_name = port_name
@ -145,10 +176,16 @@ class OVSBridge(BaseOVS):
self.destroy()
self.create()
def add_port(self, port_name):
self.run_vsctl(["--", "--may-exist", "add-port", self.br_name,
port_name])
return self.get_port_ofport(port_name)
def add_port(self, port_name, *interface_options):
args = ["--", "--may-exist", "add-port", self.br_name, port_name]
if interface_options:
args += ['--', 'set', 'Interface', port_name]
args += ['%s=%s' % kv for kv in interface_options]
self.run_vsctl(args)
ofport = self.get_port_ofport(port_name)
if ofport == INVALID_OFPORT:
self.delete_port(port_name)
return ofport
def replace_port(self, port_name, *interface_attr_tuples):
"""Replace existing port or create it, and configure port interface."""
@ -188,15 +225,20 @@ class OVSBridge(BaseOVS):
def remove_all_flows(self):
self.run_ofctl("del-flows", [])
@_ofport_retry
def _get_port_ofport(self, port_name):
return self.db_get_val("Interface", port_name, "ofport")
def get_port_ofport(self, port_name):
ofport = self.db_get_val("Interface", port_name, "ofport")
# This can return a non-integer string, like '[]' so ensure a
# common failure case
"""Get the port's assigned ofport, retrying if not yet assigned."""
ofport = INVALID_OFPORT
try:
int(ofport)
return ofport
except (ValueError, TypeError):
return INVALID_OFPORT
ofport = self._get_port_ofport(port_name)
except retrying.RetryError as e:
LOG.exception(_LE("Timed out retrieving ofport on port %(pname)s. "
"Exception: %(exception)s"),
{'pname': port_name, 'exception': e})
return ofport
def get_datapath_id(self):
return self.db_get_val('Bridge',
@ -231,34 +273,25 @@ class OVSBridge(BaseOVS):
tunnel_type=constants.TYPE_GRE,
vxlan_udp_port=constants.VXLAN_UDP_PORT,
dont_fragment=True):
vsctl_command = ["--", "--may-exist", "add-port", self.br_name,
port_name]
vsctl_command.extend(["--", "set", "Interface", port_name,
"type=%s" % tunnel_type])
if tunnel_type == constants.TYPE_VXLAN:
# Only set the VXLAN UDP port if it's not the default
if vxlan_udp_port != constants.VXLAN_UDP_PORT:
vsctl_command.append("options:dst_port=%s" % vxlan_udp_port)
vsctl_command.append(("options:df_default=%s" %
bool(dont_fragment)).lower())
vsctl_command.extend(["options:remote_ip=%s" % remote_ip,
"options:local_ip=%s" % local_ip,
"options:in_key=flow",
"options:out_key=flow"])
self.run_vsctl(vsctl_command)
ofport = self.get_port_ofport(port_name)
options = [('type', tunnel_type)]
if (tunnel_type == constants.TYPE_VXLAN and
ofport == INVALID_OFPORT):
LOG.error(_LE('Unable to create VXLAN tunnel port. Please ensure '
'that an openvswitch version that supports VXLAN is '
'installed.'))
return ofport
vxlan_udp_port != constants.VXLAN_UDP_PORT):
# Only set the VXLAN UDP port if it's not the default
options.append(("options:dst_port", vxlan_udp_port))
options.extend([
("options:df_default", str(bool(dont_fragment)).lower()),
("options:remote_ip", remote_ip),
("options:local_ip", local_ip),
("options:in_key", "flow"),
("options:out_key", "flow")])
return self.add_port(port_name, *options)
def add_patch_port(self, local_name, remote_name):
self.run_vsctl(["add-port", self.br_name, local_name,
"--", "set", "Interface", local_name,
"type=patch", "options:peer=%s" % remote_name])
return self.get_port_ofport(local_name)
options = [
('type', 'patch'),
('options:peer', remote_name)
]
return self.add_port(local_name, *options)
def db_get_map(self, table, record, column, check_error=False):
output = self.run_vsctl(["get", table, record, column], check_error)

View File

@ -648,23 +648,17 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
LOG.debug("No VIF port for port %s defined on agent.", port_id)
def _setup_tunnel_port(self, br, port_name, remote_ip, tunnel_type):
ofport_str = br.add_tunnel_port(port_name,
remote_ip,
self.local_ip,
tunnel_type,
self.vxlan_udp_port,
self.dont_fragment)
ofport = -1
try:
ofport = int(ofport_str)
except (TypeError, ValueError):
LOG.exception(_LE("ofport should have a value that can be "
"interpreted as an integer"))
if ofport < 0:
ofport = br.add_tunnel_port(port_name,
remote_ip,
self.local_ip,
tunnel_type,
self.vxlan_udp_port,
self.dont_fragment)
if ofport == ovs_lib.INVALID_OFPORT:
LOG.error(_LE("Failed to set-up %(type)s tunnel port to %(ip)s"),
{'type': tunnel_type, 'ip': remote_ip})
return 0
ofport = int(ofport)
self.tun_ofports[tunnel_type][remote_ip] = ofport
br.check_in_port_add_tunnel_port(tunnel_type, ofport)
return ofport

View File

@ -763,11 +763,12 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
cfg.CONF.OVS.int_peer_patch_port, cfg.CONF.OVS.tun_peer_patch_port)
self.patch_int_ofport = self.tun_br.add_patch_port(
cfg.CONF.OVS.tun_peer_patch_port, cfg.CONF.OVS.int_peer_patch_port)
if int(self.patch_tun_ofport) < 0 or int(self.patch_int_ofport) < 0:
if ovs_lib.INVALID_OFPORT in (self.patch_tun_ofport,
self.patch_int_ofport):
LOG.error(_LE("Failed to create OVS patch port. Cannot have "
"tunneling enabled on this agent, since this "
"version of OVS does not support tunnels or "
"patch ports. Agent terminated!"))
"version of OVS does not support tunnels or patch "
"ports. Agent terminated!"))
exit(1)
self.tun_br.remove_all_flows()
@ -1042,13 +1043,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
tunnel_type,
self.vxlan_udp_port,
self.dont_fragment)
ofport_int = -1
try:
ofport_int = int(ofport)
except (TypeError, ValueError):
LOG.exception(_LE("ofport should have a value that can be "
"interpreted as an integer"))
if ofport_int < 0:
if ofport == ovs_lib.INVALID_OFPORT:
LOG.error(_LE("Failed to set-up %(type)s tunnel port to %(ip)s"),
{'type': tunnel_type, 'ip': remote_ip})
return 0

View File

@ -363,11 +363,16 @@ class OVS_Lib_Test(base.BaseTestCase):
"actions=normal",
root_helper=self.root_helper)
def _set_timeout(self, val):
self.TO = '--timeout=%d' % val
self.br.vsctl_timeout = val
def _test_get_port_ofport(self, ofport, expected_result):
pname = "tap99"
self._set_timeout(0) # Don't waste precious time retrying
self.execute.return_value = ofport
self.assertEqual(self.br.get_port_ofport(pname), expected_result)
self.execute.assert_called_once_with(
self.execute.assert_called_with(
["ovs-vsctl", self.TO, "get", "Interface", pname, "ofport"],
root_helper=self.root_helper)
@ -380,6 +385,10 @@ class OVS_Lib_Test(base.BaseTestCase):
def test_get_port_ofport_returns_invalid_ofport_for_none(self):
self._test_get_port_ofport(None, ovs_lib.INVALID_OFPORT)
def test_get_port_ofport_returns_invalid_for_invalid(self):
self._test_get_port_ofport(ovs_lib.INVALID_OFPORT,
ovs_lib.INVALID_OFPORT)
def test_get_datapath_id(self):
datapath_id = '"0000b67f4fbcc149"'
self.execute.return_value = datapath_id
@ -539,7 +548,8 @@ class OVS_Lib_Test(base.BaseTestCase):
ofport = "6"
# Each element is a tuple of (expected mock call, return_value)
command = ["ovs-vsctl", self.TO, "add-port", self.BR_NAME, pname]
command = ["ovs-vsctl", self.TO, "--", "--may-exist", "add-port",
self.BR_NAME, pname]
command.extend(["--", "set", "Interface", pname])
command.extend(["type=patch", "options:peer=" + peer])
expected_calls_and_values = [

View File

@ -29,6 +29,7 @@ from oslo.config import cfg
from oslo.utils import importutils
import testtools
from neutron.agent.linux import ovs_lib
from neutron.common import constants as n_const
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
@ -748,25 +749,13 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase):
{'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
self.assertEqual(ofport, 0)
def test__setup_tunnel_port_error_not_int(self):
with contextlib.nested(
mock.patch.object(self.agent.int_br, 'add_tunnel_port',
return_value=None),
mock.patch.object(self.mod_agent.LOG, 'exception'),
mock.patch.object(self.mod_agent.LOG, 'error')
) as (add_tunnel_port_fn, log_exc_fn, log_error_fn):
ofport = self.agent._setup_tunnel_port(
self.agent.int_br, 'gre-1', 'remote_ip', p_const.TYPE_GRE)
add_tunnel_port_fn.assert_called_once_with(
'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
self.agent.vxlan_udp_port, self.agent.dont_fragment)
log_exc_fn.assert_called_once_with(
_("ofport should have a value that can be "
"interpreted as an integer"))
log_error_fn.assert_called_once_with(
_("Failed to set-up %(type)s tunnel port to %(ip)s"),
{'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
self.assertEqual(ofport, 0)
def test_setup_tunnel_port_returns_zero_for_failed_port_add(self):
with mock.patch.object(self.agent.int_br, 'add_tunnel_port',
return_value=ovs_lib.INVALID_OFPORT):
result = self.agent._setup_tunnel_port(self.agent.int_br, 'gre-1',
'remote_ip',
p_const.TYPE_GRE)
self.assertEqual(0, result)
def test_tunnel_sync(self):
self.agent.local_ip = 'agent_ip'

View File

@ -1316,10 +1316,10 @@ class TestOvsNeutronAgent(base.BaseTestCase):
constants.DEFAULT_OVSDBMON_RESPAWN)
mock_loop.assert_called_once_with(polling_manager=mock.ANY)
def test__setup_tunnel_port_error_negative(self):
def test_setup_tunnel_port_invalid_ofport(self):
with contextlib.nested(
mock.patch.object(self.agent.tun_br, 'add_tunnel_port',
return_value='-1'),
return_value=ovs_lib.INVALID_OFPORT),
mock.patch.object(ovs_neutron_agent.LOG, 'error')
) as (add_tunnel_port_fn, log_error_fn):
ofport = self.agent._setup_tunnel_port(
@ -1332,27 +1332,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
{'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
self.assertEqual(ofport, 0)
def test__setup_tunnel_port_error_not_int(self):
with contextlib.nested(
mock.patch.object(self.agent.tun_br, 'add_tunnel_port',
return_value=None),
mock.patch.object(ovs_neutron_agent.LOG, 'exception'),
mock.patch.object(ovs_neutron_agent.LOG, 'error')
) as (add_tunnel_port_fn, log_exc_fn, log_error_fn):
ofport = self.agent._setup_tunnel_port(
self.agent.tun_br, 'gre-1', 'remote_ip', p_const.TYPE_GRE)
add_tunnel_port_fn.assert_called_once_with(
'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
self.agent.vxlan_udp_port, self.agent.dont_fragment)
log_exc_fn.assert_called_once_with(
_("ofport should have a value that can be "
"interpreted as an integer"))
log_error_fn.assert_called_once_with(
_("Failed to set-up %(type)s tunnel port to %(ip)s"),
{'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
self.assertEqual(ofport, 0)
def test__setup_tunnel_port_error_negative_df_disabled(self):
def test_setup_tunnel_port_error_negative_df_disabled(self):
with contextlib.nested(
mock.patch.object(self.agent.tun_br, 'add_tunnel_port',
return_value='-1'),

View File

@ -15,6 +15,7 @@ Jinja2>=2.6 # BSD License (3 clause)
keystonemiddleware>=1.0.0
netaddr>=0.7.12
python-neutronclient>=2.3.6,<3
retrying>=1.2.3,!=1.3.0 # Apache-2.0
SQLAlchemy>=0.9.7,<=0.9.99
WebOb>=1.2.3
python-keystoneclient>=0.11.1