Refactor OVSDB native lib to be more customizable

This change, in a backward-compatible way, allows users of the
native OVSDB library to pass in their own Idl factory function
where they can return any subclass of the Idl, any schema, and
and register whatever tables/columns they want. This way, we
do not have to keep up with changes to the upstream OVSDB Idl
library.

The Connection connection, schema_name, and idl_class arguments
are deprecated in favor of the idl_factory function.

Change-Id: I8133da63caa8937c4fce68eb2fbb73d36b894043
This commit is contained in:
Terry Wilson 2016-10-20 00:54:46 -05:00 committed by Omer Anson
parent 41bf03f927
commit 2013058e13
4 changed files with 99 additions and 35 deletions

View File

@ -16,12 +16,12 @@ import os
import threading
import traceback
from debtcollector import removals
from ovs.db import idl
from ovs import poller
from six.moves import queue as Queue
import tenacity
from neutron.agent.ovsdb.native import helpers
from neutron._i18n import _
from neutron.agent.ovsdb.native import idlutils
@ -53,16 +53,52 @@ class TransactionQueue(Queue.Queue, object):
class Connection(object):
def __init__(self, connection, timeout, schema_name, idl_class=None):
__rm_args = {'version': 'Ocata', 'removal_version': 'Pike',
'message': _('Use an idl_factory function instead')}
@removals.removed_kwarg('connection', **__rm_args)
@removals.removed_kwarg('schema_name', **__rm_args)
@removals.removed_kwarg('idl_class', **__rm_args)
def __init__(self, connection=None, timeout=None, schema_name=None,
idl_class=None, idl_factory=None):
"""Create a connection to an OVSDB server using the OVS IDL
:param connection: (deprecated) An OVSDB connection string
:param timeout: The timeout value for OVSDB operations (required)
:param schema_name: (deprecated) The name ovs the OVSDB schema to use
:param idl_class: (deprecated) An Idl subclass. Defaults to idl.Idl
:param idl_factory: A factory function that produces an Idl instance
The signature of this class is changing. It is recommended to pass in
a timeout and idl_factory
"""
assert timeout is not None
self.idl = None
self.connection = connection
self.timeout = timeout
self.txns = TransactionQueue(1)
self.lock = threading.Lock()
self.schema_name = schema_name
self.idl_class = idl_class or idl.Idl
self._schema_filter = None
if idl_factory:
if connection or schema_name:
raise TypeError(_('Connection: Takes either idl_factory, or '
'connection and schema_name. Both given'))
self.idl_factory = idl_factory
else:
if not connection or not schema_name:
raise TypeError(_('Connection: Takes either idl_factory, or '
'connection and schema_name. Neither given'))
self.idl_factory = self._idl_factory
self.connection = connection
self.schema_name = schema_name
self.idl_class = idl_class or idl.Idl
self._schema_filter = None
@removals.remove(**__rm_args)
def _idl_factory(self):
helper = self.get_schema_helper()
self.update_schema_helper(helper)
return self.idl_class(self.connection, helper)
@removals.removed_kwarg('table_name_list', **__rm_args)
def start(self, table_name_list=None):
"""
:param table_name_list: A list of table names for schema_helper to
@ -76,36 +112,24 @@ class Connection(object):
if self.idl is not None:
return
helper = self.get_schema_helper()
self.update_schema_helper(helper)
self.idl = self.idl_class(self.connection, helper)
self.idl = self.idl_factory()
idlutils.wait_for_change(self.idl, self.timeout)
self.poller = poller.Poller()
self.thread = threading.Thread(target=self.run)
self.thread.setDaemon(True)
self.thread.start()
@removals.remove(
version='Ocata', removal_version='Pike',
message=_("Use idlutils.get_schema_helper(conn, schema, retry=True)"))
def get_schema_helper(self):
"""Retrieve the schema helper object from OVSDB"""
try:
helper = idlutils.get_schema_helper(self.connection,
self.schema_name)
except Exception:
# We may have failed do to set-manager not being called
helpers.enable_connection_uri(self.connection)
# There is a small window for a race, so retry up to a second
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=0.01),
stop=tenacity.stop_after_delay(1),
reraise=True)
def do_get_schema_helper():
return idlutils.get_schema_helper(self.connection,
self.schema_name)
helper = do_get_schema_helper()
return helper
return idlutils.get_schema_helper(self.connection, self.schema_name,
retry=True)
@removals.remove(
version='Ocata', removal_version='Pike',
message=_("Use an idl_factory and ovs.db.SchemaHelper for filtering"))
def update_schema_helper(self, helper):
if self._schema_filter:
for table_name in self._schema_filter:

View File

@ -18,14 +18,17 @@ import time
import uuid
from neutron_lib import exceptions
from oslo_utils import excutils
from ovs.db import idl
from ovs import jsonrpc
from ovs import poller
from ovs import stream
import six
import tenacity
from neutron._i18n import _
from neutron.agent.ovsdb import api
from neutron.agent.ovsdb.native import helpers
RowLookup = collections.namedtuple('RowLookup',
@ -97,7 +100,7 @@ class ExceptionResult(object):
self.tb = tb
def get_schema_helper(connection, schema_name):
def _get_schema_helper(connection, schema_name):
err, strm = stream.Stream.open_block(
stream.Stream.open(connection))
if err:
@ -115,6 +118,26 @@ def get_schema_helper(connection, schema_name):
return idl.SchemaHelper(None, resp.result)
def get_schema_helper(connection, schema_name, retry=True):
try:
return _get_schema_helper(connection, schema_name)
except Exception:
with excutils.save_and_reraise_exception(reraise=False) as ctx:
if not retry:
ctx.reraise = True
# We may have failed due to set-manager not being called
helpers.enable_connection_uri(connection)
# There is a small window for a race, so retry up to a second
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=0.01),
stop=tenacity.stop_after_delay(1),
reraise=True)
def do_get_schema_helper():
return _get_schema_helper(connection, schema_name)
return do_get_schema_helper()
def wait_for_change(_idl, timeout, seqno=None):
if seqno is None:
seqno = _idl.change_seqno

View File

@ -13,18 +13,35 @@
# under the License.
from neutron.agent.ovsdb.native import connection
from neutron.agent.ovsdb.native import idlutils
from neutron.tests.functional import base
from oslo_config import cfg
from ovs.db import idl
class OVSDBConnectionTestCase(base.BaseSudoTestCase):
def setUp(self):
super(OVSDBConnectionTestCase, self).setUp()
def test_limit_tables(self):
self.connection = connection.Connection(
cfg.CONF.OVS.ovsdb_connection,
cfg.CONF.ovs_vsctl_timeout, 'Open_vSwitch')
def test_limit_tables(self):
tables = ['Open_vSwitch', 'Bridge', 'Port']
self.connection.start(table_name_list=tables)
self.assertItemsEqual(tables, self.connection.idl.tables.keys())
def test_idl_factory(self):
tables = ['Open_vSwitch', 'Bridge', 'Port']
def _idl_factory():
connection = cfg.CONF.OVS.ovsdb_connection
helper = idlutils.get_schema_helper(connection, 'Open_vSwitch')
for table in tables:
helper.register_table(table)
return idl.Idl(connection, helper)
self.connection = connection.Connection(
idl_factory=_idl_factory,
timeout=cfg.CONF.ovs_vsctl_timeout,
)
self.connection.start(table_name_list=tables)
self.assertItemsEqual(tables, self.connection.idl.tables.keys())

View File

@ -86,8 +86,8 @@ class TestOVSNativeConnection(base.BaseTestCase):
@mock.patch.object(connection, 'threading')
@mock.patch.object(connection.idlutils, 'wait_for_change')
@mock.patch.object(connection, 'idl')
@mock.patch.object(connection.helpers, 'enable_connection_uri')
@mock.patch.object(connection.idlutils, 'get_schema_helper')
@mock.patch.object(idlutils.helpers, 'enable_connection_uri')
@mock.patch.object(connection.idlutils, '_get_schema_helper')
def test_do_get_schema_helper_retry(self, mock_get_schema_helper,
mock_enable_conn,
mock_idl,