Wait for vswitchd to add interfaces in native ovsdb
ovs-vsctl, unless --no-wait is passed, will wait until ovs-vswitchd has reacted to a successful transaction. This patch implements the same logic, waiting for next_cfg to be incremented and checking that any added interfaces have actually been assigned ofports. Closes-Bug: #1604816 Closes-Bug: #1604370 Related-Bug: #1604115 Change-Id: I638b82c13394f150c0bd23301285bd3375e66139
This commit is contained in:
parent
0ead749dde
commit
11dc21d3a6
|
@ -120,10 +120,7 @@ class BaseOVS(object):
|
|||
|
||||
self.ovsdb.add_br(bridge_name,
|
||||
datapath_type).execute()
|
||||
br = OVSBridge(bridge_name)
|
||||
# Don't return until vswitchd sets up the internal port
|
||||
br.get_port_ofport(bridge_name)
|
||||
return br
|
||||
return OVSBridge(bridge_name)
|
||||
|
||||
def delete_bridge(self, bridge_name):
|
||||
self.ovsdb.del_br(bridge_name).execute()
|
||||
|
@ -221,8 +218,6 @@ class OVSBridge(BaseOVS):
|
|||
if secure_mode:
|
||||
txn.add(self.ovsdb.set_fail_mode(self.br_name,
|
||||
FAILMODE_SECURE))
|
||||
# Don't return until vswitchd sets up the internal port
|
||||
self.get_port_ofport(self.br_name)
|
||||
|
||||
def destroy(self):
|
||||
self.delete_bridge(self.br_name)
|
||||
|
@ -248,8 +243,6 @@ class OVSBridge(BaseOVS):
|
|||
if interface_attr_tuples:
|
||||
txn.add(self.ovsdb.db_set('Interface', port_name,
|
||||
*interface_attr_tuples))
|
||||
# Don't return until the port has been assigned by vswitchd
|
||||
self.get_port_ofport(port_name)
|
||||
|
||||
def delete_port(self, port_name):
|
||||
self.ovsdb.del_port(port_name, self.br_name).execute()
|
||||
|
|
|
@ -14,13 +14,14 @@
|
|||
|
||||
import time
|
||||
|
||||
from neutron_lib import exceptions
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
from ovs.db import idl
|
||||
from six.moves import queue as Queue
|
||||
|
||||
from neutron._i18n import _
|
||||
from neutron._i18n import _, _LE
|
||||
from neutron.agent.ovsdb import api
|
||||
from neutron.agent.ovsdb.native import commands as cmd
|
||||
from neutron.agent.ovsdb.native import connection
|
||||
|
@ -32,9 +33,13 @@ cfg.CONF.import_opt('ovs_vsctl_timeout', 'neutron.agent.common.ovs_lib')
|
|||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VswitchdInterfaceAddException(exceptions.NeutronException):
|
||||
message = _("Failed to add interfaces: %(ifaces)s")
|
||||
|
||||
|
||||
class Transaction(api.Transaction):
|
||||
def __init__(self, api, ovsdb_connection, timeout,
|
||||
check_error=False, log_errors=False):
|
||||
check_error=False, log_errors=True):
|
||||
self.api = api
|
||||
self.check_error = check_error
|
||||
self.log_errors = log_errors
|
||||
|
@ -42,6 +47,10 @@ class Transaction(api.Transaction):
|
|||
self.results = Queue.Queue(1)
|
||||
self.ovsdb_connection = ovsdb_connection
|
||||
self.timeout = timeout
|
||||
self.expected_ifaces = set()
|
||||
|
||||
def __str__(self):
|
||||
return ", ".join(str(cmd) for cmd in self.commands)
|
||||
|
||||
def add(self, command):
|
||||
"""Add a command to the transaction
|
||||
|
@ -61,23 +70,29 @@ class Transaction(api.Transaction):
|
|||
_("Commands %(commands)s exceeded timeout %(timeout)d "
|
||||
"seconds") % {'commands': self.commands,
|
||||
'timeout': self.timeout})
|
||||
if self.check_error:
|
||||
if isinstance(result, idlutils.ExceptionResult):
|
||||
if self.log_errors:
|
||||
LOG.error(result.tb)
|
||||
if isinstance(result, idlutils.ExceptionResult):
|
||||
if self.log_errors:
|
||||
LOG.error(result.tb)
|
||||
if self.check_error:
|
||||
raise result.ex
|
||||
return result
|
||||
|
||||
def pre_commit(self, txn):
|
||||
pass
|
||||
|
||||
def post_commit(self, txn):
|
||||
pass
|
||||
|
||||
def do_commit(self):
|
||||
start_time = time.time()
|
||||
self.start_time = time.time()
|
||||
attempts = 0
|
||||
while True:
|
||||
elapsed_time = time.time() - start_time
|
||||
if attempts > 0 and elapsed_time > self.timeout:
|
||||
if attempts > 0 and self.timeout_exceeded():
|
||||
raise RuntimeError("OVS transaction timed out")
|
||||
attempts += 1
|
||||
# TODO(twilson) Make sure we don't loop longer than vsctl_timeout
|
||||
txn = idl.Transaction(self.api.idl)
|
||||
self.pre_commit(txn)
|
||||
for i, command in enumerate(self.commands):
|
||||
LOG.debug("Running txn command(idx=%(idx)s): %(cmd)s",
|
||||
{'idx': i, 'cmd': command})
|
||||
|
@ -92,9 +107,8 @@ class Transaction(api.Transaction):
|
|||
status = txn.commit_block()
|
||||
if status == txn.TRY_AGAIN:
|
||||
LOG.debug("OVSDB transaction returned TRY_AGAIN, retrying")
|
||||
idlutils.wait_for_change(
|
||||
self.api.idl, self.timeout - elapsed_time,
|
||||
seqno)
|
||||
idlutils.wait_for_change(self.api.idl, self.time_remaining(),
|
||||
seqno)
|
||||
continue
|
||||
elif status == txn.ERROR:
|
||||
msg = _("OVSDB Error: %s") % txn.get_error()
|
||||
|
@ -109,9 +123,67 @@ class Transaction(api.Transaction):
|
|||
return
|
||||
elif status == txn.UNCHANGED:
|
||||
LOG.debug("Transaction caused no change")
|
||||
elif status == txn.SUCCESS:
|
||||
self.post_commit(txn)
|
||||
|
||||
return [cmd.result for cmd in self.commands]
|
||||
|
||||
def elapsed_time(self):
|
||||
return time.time() - self.start_time
|
||||
|
||||
def time_remaining(self):
|
||||
return self.timeout - self.elapsed_time()
|
||||
|
||||
def timeout_exceeded(self):
|
||||
return self.elapsed_time() > self.timeout
|
||||
|
||||
|
||||
class NeutronOVSDBTransaction(Transaction):
|
||||
def pre_commit(self, txn):
|
||||
self.api._ovs.increment('next_cfg')
|
||||
txn.expected_ifaces = set()
|
||||
|
||||
def post_commit(self, txn):
|
||||
# ovs-vsctl only logs these failures and does not return nonzero
|
||||
try:
|
||||
self.do_post_commit(txn)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Post-commit checks failed"))
|
||||
|
||||
def do_post_commit(self, txn):
|
||||
next_cfg = txn.get_increment_new_value()
|
||||
while not self.timeout_exceeded():
|
||||
self.api.idl.run()
|
||||
if self.vswitchd_has_completed(next_cfg):
|
||||
failed = self.post_commit_failed_interfaces(txn)
|
||||
if failed:
|
||||
raise VswitchdInterfaceAddException(
|
||||
ifaces=", ".join(failed))
|
||||
break
|
||||
self.ovsdb_connection.poller.timer_wait(
|
||||
self.time_remaining() * 1000)
|
||||
self.api.idl.wait(self.ovsdb_connection.poller)
|
||||
self.ovsdb_connection.poller.block()
|
||||
else:
|
||||
raise api.TimeoutException(
|
||||
_("Commands %(commands)s exceeded timeout %(timeout)d "
|
||||
"seconds post-commit") % {'commands': self.commands,
|
||||
'timeout': self.timeout})
|
||||
|
||||
def post_commit_failed_interfaces(self, txn):
|
||||
failed = []
|
||||
for iface_uuid in txn.expected_ifaces:
|
||||
uuid = txn.get_insert_uuid(iface_uuid)
|
||||
if uuid:
|
||||
ifaces = self.api.idl.tables['Interface']
|
||||
iface = ifaces.rows.get(uuid)
|
||||
if iface and (not iface.ofport or iface.ofport == -1):
|
||||
failed.append(iface.name)
|
||||
return failed
|
||||
|
||||
def vswitchd_has_completed(self, next_cfg):
|
||||
return self.api._ovs.cur_cfg >= next_cfg
|
||||
|
||||
|
||||
class OvsdbIdl(api.API):
|
||||
|
||||
|
@ -133,9 +205,9 @@ class OvsdbIdl(api.API):
|
|||
return list(self._tables['Open_vSwitch'].rows.values())[0]
|
||||
|
||||
def transaction(self, check_error=False, log_errors=True, **kwargs):
|
||||
return Transaction(self, OvsdbIdl.ovsdb_connection,
|
||||
self.context.vsctl_timeout,
|
||||
check_error, log_errors)
|
||||
return NeutronOVSDBTransaction(self, OvsdbIdl.ovsdb_connection,
|
||||
self.context.vsctl_timeout,
|
||||
check_error, log_errors)
|
||||
|
||||
def add_br(self, name, may_exist=True, datapath_type=None):
|
||||
return cmd.AddBridgeCommand(self, name, may_exist, datapath_type)
|
||||
|
|
|
@ -303,6 +303,8 @@ class AddPortCommand(BaseCommand):
|
|||
br.ports = ports
|
||||
|
||||
iface = txn.insert(self.api._tables['Interface'])
|
||||
# NOTE(twilson) The OVS lib's __getattr__ breaks iface.uuid here
|
||||
txn.expected_ifaces.add(iface.__dict__['uuid'])
|
||||
iface.name = self.port
|
||||
port.verify('interfaces')
|
||||
ifaces = getattr(port, 'interfaces', [])
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
# Copyright (c) 2016 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from neutron.agent.common import ovs_lib
|
||||
from neutron.agent.ovsdb import api
|
||||
from neutron.agent.ovsdb import impl_idl
|
||||
from neutron.tests import base as test_base
|
||||
from neutron.tests.common import net_helpers
|
||||
from neutron.tests.functional import base
|
||||
|
||||
|
||||
# NOTE(twilson) functools.partial does not work for this
|
||||
def trpatch(*args, **kwargs):
|
||||
def wrapped(fn):
|
||||
return mock.patch.object(impl_idl.NeutronOVSDBTransaction,
|
||||
*args, **kwargs)(fn)
|
||||
return wrapped
|
||||
|
||||
|
||||
class ImplIdlTestCase(base.BaseSudoTestCase):
|
||||
def setUp(self):
|
||||
super(ImplIdlTestCase, self).setUp()
|
||||
self.config(group='OVS', ovsdb_interface='native')
|
||||
self.ovs = ovs_lib.BaseOVS()
|
||||
self.brname = test_base.get_rand_device_name(net_helpers.BR_PREFIX)
|
||||
# Make sure exceptions pass through by calling do_post_commit directly
|
||||
mock.patch.object(
|
||||
impl_idl.NeutronOVSDBTransaction, "post_commit",
|
||||
side_effect=impl_idl.NeutronOVSDBTransaction.do_post_commit,
|
||||
autospec=True).start()
|
||||
|
||||
def _add_br(self):
|
||||
# NOTE(twilson) we will be raising exceptions with add_br, so schedule
|
||||
# cleanup before that.
|
||||
self.addCleanup(self.ovs.delete_bridge, self.brname)
|
||||
ovsdb = self.ovs.ovsdb
|
||||
with ovsdb.transaction(check_error=True) as tr:
|
||||
tr.add(ovsdb.add_br(self.brname))
|
||||
return tr
|
||||
|
||||
def _add_br_and_test(self):
|
||||
self._add_br()
|
||||
ofport = self.ovs.db_get_val("Interface", self.brname, "ofport")
|
||||
self.assertTrue(int(ofport))
|
||||
self.assertTrue(ofport > -1)
|
||||
|
||||
def test_post_commit_vswitchd_completed_no_failures(self):
|
||||
self._add_br_and_test()
|
||||
|
||||
@trpatch("vswitchd_has_completed", return_value=True)
|
||||
@trpatch("post_commit_failed_interfaces", return_value=["failed_if1"])
|
||||
@trpatch("timeout_exceeded", return_value=False)
|
||||
def test_post_commit_vswitchd_completed_failures(self, *args):
|
||||
self.assertRaises(impl_idl.VswitchdInterfaceAddException, self._add_br)
|
||||
|
||||
@trpatch("vswitchd_has_completed", return_value=False)
|
||||
def test_post_commit_vswitchd_incomplete_timeout(self, *args):
|
||||
# Due to timing issues we may rarely hit the global timeout, which
|
||||
# raises RuntimeError to match the vsctl implementation
|
||||
self.ovs.vsctl_timeout = 3
|
||||
self.assertRaises((api.TimeoutException, RuntimeError), self._add_br)
|
|
@ -24,7 +24,15 @@ from neutron.tests import base
|
|||
class TransactionTestCase(base.BaseTestCase):
|
||||
def test_commit_raises_exception_on_timeout(self):
|
||||
with mock.patch.object(queue, 'Queue') as mock_queue:
|
||||
transaction = impl_idl.Transaction(mock.sentinel, mock.Mock(), 0)
|
||||
transaction = impl_idl.NeutronOVSDBTransaction(mock.sentinel,
|
||||
mock.Mock(), 0)
|
||||
mock_queue.return_value.get.side_effect = queue.Empty
|
||||
with testtools.ExpectedException(api.TimeoutException):
|
||||
transaction.commit()
|
||||
|
||||
def test_post_commit_does_not_raise_exception(self):
|
||||
with mock.patch.object(impl_idl.NeutronOVSDBTransaction,
|
||||
"do_post_commit", side_effect=Exception):
|
||||
transaction = impl_idl.NeutronOVSDBTransaction(mock.sentinel,
|
||||
mock.Mock(), 0)
|
||||
transaction.post_commit(mock.Mock())
|
||||
|
|
Loading…
Reference in New Issue