Merge "Refactor OVSDB native lib to be more customizable"
This commit is contained in:
@@ -16,13 +16,13 @@ import os
|
|||||||
import threading
|
import threading
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
|
from debtcollector import removals
|
||||||
from ovs.db import idl
|
from ovs.db import idl
|
||||||
from ovs import poller
|
from ovs import poller
|
||||||
import six
|
import six
|
||||||
from six.moves import queue as Queue
|
from six.moves import queue as Queue
|
||||||
import tenacity
|
|
||||||
|
|
||||||
from neutron.agent.ovsdb.native import helpers
|
from neutron._i18n import _
|
||||||
from neutron.agent.ovsdb.native import idlutils
|
from neutron.agent.ovsdb.native import idlutils
|
||||||
|
|
||||||
|
|
||||||
@@ -54,16 +54,52 @@ class TransactionQueue(Queue.Queue, object):
|
|||||||
|
|
||||||
|
|
||||||
class Connection(object):
|
class Connection(object):
|
||||||
def __init__(self, connection, timeout, schema_name, idl_class=None):
|
__rm_args = {'version': 'Ocata', 'removal_version': 'Pike',
|
||||||
|
'message': _('Use an idl_factory function instead')}
|
||||||
|
|
||||||
|
@removals.removed_kwarg('connection', **__rm_args)
|
||||||
|
@removals.removed_kwarg('schema_name', **__rm_args)
|
||||||
|
@removals.removed_kwarg('idl_class', **__rm_args)
|
||||||
|
def __init__(self, connection=None, timeout=None, schema_name=None,
|
||||||
|
idl_class=None, idl_factory=None):
|
||||||
|
"""Create a connection to an OVSDB server using the OVS IDL
|
||||||
|
|
||||||
|
:param connection: (deprecated) An OVSDB connection string
|
||||||
|
:param timeout: The timeout value for OVSDB operations (required)
|
||||||
|
:param schema_name: (deprecated) The name ovs the OVSDB schema to use
|
||||||
|
:param idl_class: (deprecated) An Idl subclass. Defaults to idl.Idl
|
||||||
|
:param idl_factory: A factory function that produces an Idl instance
|
||||||
|
|
||||||
|
The signature of this class is changing. It is recommended to pass in
|
||||||
|
a timeout and idl_factory
|
||||||
|
"""
|
||||||
|
assert timeout is not None
|
||||||
self.idl = None
|
self.idl = None
|
||||||
self.connection = connection
|
|
||||||
self.timeout = timeout
|
self.timeout = timeout
|
||||||
self.txns = TransactionQueue(1)
|
self.txns = TransactionQueue(1)
|
||||||
self.lock = threading.Lock()
|
self.lock = threading.Lock()
|
||||||
self.schema_name = schema_name
|
if idl_factory:
|
||||||
self.idl_class = idl_class or idl.Idl
|
if connection or schema_name:
|
||||||
self._schema_filter = None
|
raise TypeError(_('Connection: Takes either idl_factory, or '
|
||||||
|
'connection and schema_name. Both given'))
|
||||||
|
self.idl_factory = idl_factory
|
||||||
|
else:
|
||||||
|
if not connection or not schema_name:
|
||||||
|
raise TypeError(_('Connection: Takes either idl_factory, or '
|
||||||
|
'connection and schema_name. Neither given'))
|
||||||
|
self.idl_factory = self._idl_factory
|
||||||
|
self.connection = connection
|
||||||
|
self.schema_name = schema_name
|
||||||
|
self.idl_class = idl_class or idl.Idl
|
||||||
|
self._schema_filter = None
|
||||||
|
|
||||||
|
@removals.remove(**__rm_args)
|
||||||
|
def _idl_factory(self):
|
||||||
|
helper = self.get_schema_helper()
|
||||||
|
self.update_schema_helper(helper)
|
||||||
|
return self.idl_class(self.connection, helper)
|
||||||
|
|
||||||
|
@removals.removed_kwarg('table_name_list', **__rm_args)
|
||||||
def start(self, table_name_list=None):
|
def start(self, table_name_list=None):
|
||||||
"""
|
"""
|
||||||
:param table_name_list: A list of table names for schema_helper to
|
:param table_name_list: A list of table names for schema_helper to
|
||||||
@@ -77,36 +113,24 @@ class Connection(object):
|
|||||||
if self.idl is not None:
|
if self.idl is not None:
|
||||||
return
|
return
|
||||||
|
|
||||||
helper = self.get_schema_helper()
|
self.idl = self.idl_factory()
|
||||||
self.update_schema_helper(helper)
|
|
||||||
|
|
||||||
self.idl = self.idl_class(self.connection, helper)
|
|
||||||
idlutils.wait_for_change(self.idl, self.timeout)
|
idlutils.wait_for_change(self.idl, self.timeout)
|
||||||
self.poller = poller.Poller()
|
self.poller = poller.Poller()
|
||||||
self.thread = threading.Thread(target=self.run)
|
self.thread = threading.Thread(target=self.run)
|
||||||
self.thread.setDaemon(True)
|
self.thread.setDaemon(True)
|
||||||
self.thread.start()
|
self.thread.start()
|
||||||
|
|
||||||
|
@removals.remove(
|
||||||
|
version='Ocata', removal_version='Pike',
|
||||||
|
message=_("Use idlutils.get_schema_helper(conn, schema, retry=True)"))
|
||||||
def get_schema_helper(self):
|
def get_schema_helper(self):
|
||||||
"""Retrieve the schema helper object from OVSDB"""
|
"""Retrieve the schema helper object from OVSDB"""
|
||||||
try:
|
return idlutils.get_schema_helper(self.connection, self.schema_name,
|
||||||
helper = idlutils.get_schema_helper(self.connection,
|
retry=True)
|
||||||
self.schema_name)
|
|
||||||
except Exception:
|
|
||||||
# We may have failed do to set-manager not being called
|
|
||||||
helpers.enable_connection_uri(self.connection)
|
|
||||||
|
|
||||||
# There is a small window for a race, so retry up to a second
|
|
||||||
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=0.01),
|
|
||||||
stop=tenacity.stop_after_delay(1),
|
|
||||||
reraise=True)
|
|
||||||
def do_get_schema_helper():
|
|
||||||
return idlutils.get_schema_helper(self.connection,
|
|
||||||
self.schema_name)
|
|
||||||
helper = do_get_schema_helper()
|
|
||||||
|
|
||||||
return helper
|
|
||||||
|
|
||||||
|
@removals.remove(
|
||||||
|
version='Ocata', removal_version='Pike',
|
||||||
|
message=_("Use an idl_factory and ovs.db.SchemaHelper for filtering"))
|
||||||
def update_schema_helper(self, helper):
|
def update_schema_helper(self, helper):
|
||||||
if self._schema_filter:
|
if self._schema_filter:
|
||||||
for table_name in self._schema_filter:
|
for table_name in self._schema_filter:
|
||||||
|
|||||||
@@ -18,14 +18,17 @@ import time
|
|||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
from neutron_lib import exceptions
|
from neutron_lib import exceptions
|
||||||
|
from oslo_utils import excutils
|
||||||
from ovs.db import idl
|
from ovs.db import idl
|
||||||
from ovs import jsonrpc
|
from ovs import jsonrpc
|
||||||
from ovs import poller
|
from ovs import poller
|
||||||
from ovs import stream
|
from ovs import stream
|
||||||
import six
|
import six
|
||||||
|
import tenacity
|
||||||
|
|
||||||
from neutron._i18n import _
|
from neutron._i18n import _
|
||||||
from neutron.agent.ovsdb import api
|
from neutron.agent.ovsdb import api
|
||||||
|
from neutron.agent.ovsdb.native import helpers
|
||||||
|
|
||||||
|
|
||||||
RowLookup = collections.namedtuple('RowLookup',
|
RowLookup = collections.namedtuple('RowLookup',
|
||||||
@@ -97,7 +100,7 @@ class ExceptionResult(object):
|
|||||||
self.tb = tb
|
self.tb = tb
|
||||||
|
|
||||||
|
|
||||||
def get_schema_helper(connection, schema_name):
|
def _get_schema_helper(connection, schema_name):
|
||||||
err, strm = stream.Stream.open_block(
|
err, strm = stream.Stream.open_block(
|
||||||
stream.Stream.open(connection))
|
stream.Stream.open(connection))
|
||||||
if err:
|
if err:
|
||||||
@@ -115,6 +118,26 @@ def get_schema_helper(connection, schema_name):
|
|||||||
return idl.SchemaHelper(None, resp.result)
|
return idl.SchemaHelper(None, resp.result)
|
||||||
|
|
||||||
|
|
||||||
|
def get_schema_helper(connection, schema_name, retry=True):
|
||||||
|
try:
|
||||||
|
return _get_schema_helper(connection, schema_name)
|
||||||
|
except Exception:
|
||||||
|
with excutils.save_and_reraise_exception(reraise=False) as ctx:
|
||||||
|
if not retry:
|
||||||
|
ctx.reraise = True
|
||||||
|
# We may have failed due to set-manager not being called
|
||||||
|
helpers.enable_connection_uri(connection)
|
||||||
|
|
||||||
|
# There is a small window for a race, so retry up to a second
|
||||||
|
@tenacity.retry(wait=tenacity.wait_exponential(multiplier=0.01),
|
||||||
|
stop=tenacity.stop_after_delay(1),
|
||||||
|
reraise=True)
|
||||||
|
def do_get_schema_helper():
|
||||||
|
return _get_schema_helper(connection, schema_name)
|
||||||
|
|
||||||
|
return do_get_schema_helper()
|
||||||
|
|
||||||
|
|
||||||
def wait_for_change(_idl, timeout, seqno=None):
|
def wait_for_change(_idl, timeout, seqno=None):
|
||||||
if seqno is None:
|
if seqno is None:
|
||||||
seqno = _idl.change_seqno
|
seqno = _idl.change_seqno
|
||||||
|
|||||||
@@ -13,18 +13,35 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from neutron.agent.ovsdb.native import connection
|
from neutron.agent.ovsdb.native import connection
|
||||||
|
from neutron.agent.ovsdb.native import idlutils
|
||||||
from neutron.tests.functional import base
|
from neutron.tests.functional import base
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
from ovs.db import idl
|
||||||
|
|
||||||
|
|
||||||
class OVSDBConnectionTestCase(base.BaseSudoTestCase):
|
class OVSDBConnectionTestCase(base.BaseSudoTestCase):
|
||||||
def setUp(self):
|
|
||||||
super(OVSDBConnectionTestCase, self).setUp()
|
def test_limit_tables(self):
|
||||||
self.connection = connection.Connection(
|
self.connection = connection.Connection(
|
||||||
cfg.CONF.OVS.ovsdb_connection,
|
cfg.CONF.OVS.ovsdb_connection,
|
||||||
cfg.CONF.ovs_vsctl_timeout, 'Open_vSwitch')
|
cfg.CONF.ovs_vsctl_timeout, 'Open_vSwitch')
|
||||||
|
|
||||||
def test_limit_tables(self):
|
|
||||||
tables = ['Open_vSwitch', 'Bridge', 'Port']
|
tables = ['Open_vSwitch', 'Bridge', 'Port']
|
||||||
self.connection.start(table_name_list=tables)
|
self.connection.start(table_name_list=tables)
|
||||||
self.assertItemsEqual(tables, self.connection.idl.tables.keys())
|
self.assertItemsEqual(tables, self.connection.idl.tables.keys())
|
||||||
|
|
||||||
|
def test_idl_factory(self):
|
||||||
|
tables = ['Open_vSwitch', 'Bridge', 'Port']
|
||||||
|
|
||||||
|
def _idl_factory():
|
||||||
|
connection = cfg.CONF.OVS.ovsdb_connection
|
||||||
|
helper = idlutils.get_schema_helper(connection, 'Open_vSwitch')
|
||||||
|
for table in tables:
|
||||||
|
helper.register_table(table)
|
||||||
|
return idl.Idl(connection, helper)
|
||||||
|
|
||||||
|
self.connection = connection.Connection(
|
||||||
|
idl_factory=_idl_factory,
|
||||||
|
timeout=cfg.CONF.ovs_vsctl_timeout,
|
||||||
|
)
|
||||||
|
self.connection.start(table_name_list=tables)
|
||||||
|
self.assertItemsEqual(tables, self.connection.idl.tables.keys())
|
||||||
|
|||||||
@@ -86,8 +86,8 @@ class TestOVSNativeConnection(base.BaseTestCase):
|
|||||||
@mock.patch.object(connection, 'threading')
|
@mock.patch.object(connection, 'threading')
|
||||||
@mock.patch.object(connection.idlutils, 'wait_for_change')
|
@mock.patch.object(connection.idlutils, 'wait_for_change')
|
||||||
@mock.patch.object(connection, 'idl')
|
@mock.patch.object(connection, 'idl')
|
||||||
@mock.patch.object(connection.helpers, 'enable_connection_uri')
|
@mock.patch.object(idlutils.helpers, 'enable_connection_uri')
|
||||||
@mock.patch.object(connection.idlutils, 'get_schema_helper')
|
@mock.patch.object(connection.idlutils, '_get_schema_helper')
|
||||||
def test_do_get_schema_helper_retry(self, mock_get_schema_helper,
|
def test_do_get_schema_helper_retry(self, mock_get_schema_helper,
|
||||||
mock_enable_conn,
|
mock_enable_conn,
|
||||||
mock_idl,
|
mock_idl,
|
||||||
|
|||||||
Reference in New Issue
Block a user