Re-home OvnNbIdlForLb class
Seems like it should live with the other ovsdb code in ovsdb/impl_idl_ovn.py Change-Id: Idee498bc0104533dd58b9b9a82523986a241ec05
This commit is contained in:
parent
d6f65325a0
commit
21701b03d0
|
@ -15,6 +15,7 @@
|
|||
from oslo_log import log as logging
|
||||
|
||||
from ovn_octavia_provider import driver
|
||||
from ovn_octavia_provider.ovsdb import impl_idl_ovn
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@ -28,7 +29,7 @@ def OvnProviderAgent(exit_event):
|
|||
driver.LogicalSwitchPortUpdateEvent(helper)]
|
||||
|
||||
# NOTE(mjozefcz): This API is only for handling OVSDB events!
|
||||
ovn_nb_idl_for_events = driver.OvnNbIdlForLb(
|
||||
ovn_nb_idl_for_events = impl_idl_ovn.OvnNbIdlForLb(
|
||||
event_lock_name=OVN_EVENT_LOCK_NAME)
|
||||
ovn_nb_idl_for_events.notify_handler.watch_events(events)
|
||||
ovn_nb_idl_for_events.start()
|
||||
|
|
|
@ -12,7 +12,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import atexit
|
||||
import copy
|
||||
import queue
|
||||
import re
|
||||
|
@ -29,7 +28,6 @@ from oslo_config import cfg
|
|||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
from ovs.stream import Stream
|
||||
from ovsdbapp.backend.ovs_idl import connection
|
||||
from ovsdbapp.backend.ovs_idl import event as row_event
|
||||
from ovsdbapp.backend.ovs_idl import idlutils
|
||||
import tenacity
|
||||
|
@ -43,7 +41,6 @@ from ovn_octavia_provider.common import exceptions as ovn_exc
|
|||
from ovn_octavia_provider.common import utils
|
||||
from ovn_octavia_provider.i18n import _
|
||||
from ovn_octavia_provider.ovsdb import impl_idl_ovn
|
||||
from ovn_octavia_provider.ovsdb import ovsdb_monitor
|
||||
|
||||
CONF = cfg.CONF # Gets Octavia Conf as it runs under o-api domain
|
||||
|
||||
|
@ -148,48 +145,6 @@ class LogicalSwitchPortUpdateEvent(row_event.RowEvent):
|
|||
self.driver.vip_port_update_handler(row)
|
||||
|
||||
|
||||
class OvnNbIdlForLb(ovsdb_monitor.OvnIdl):
|
||||
SCHEMA = "OVN_Northbound"
|
||||
TABLES = ('Logical_Switch', 'Load_Balancer', 'Logical_Router',
|
||||
'Logical_Switch_Port', 'Logical_Router_Port',
|
||||
'Gateway_Chassis', 'NAT')
|
||||
|
||||
def __init__(self, event_lock_name=None):
|
||||
self.conn_string = ovn_conf.get_ovn_nb_connection()
|
||||
ovsdb_monitor._check_and_set_ssl_files(self.SCHEMA)
|
||||
helper = self._get_ovsdb_helper(self.conn_string)
|
||||
for table in OvnNbIdlForLb.TABLES:
|
||||
helper.register_table(table)
|
||||
super(OvnNbIdlForLb, self).__init__(
|
||||
driver=None, remote=self.conn_string, schema=helper)
|
||||
self.event_lock_name = event_lock_name
|
||||
if self.event_lock_name:
|
||||
self.set_lock(self.event_lock_name)
|
||||
atexit.register(self.stop)
|
||||
|
||||
@tenacity.retry(
|
||||
wait=tenacity.wait_exponential(max=180),
|
||||
reraise=True)
|
||||
def _get_ovsdb_helper(self, connection_string):
|
||||
return idlutils.get_schema_helper(connection_string, self.SCHEMA)
|
||||
|
||||
def start(self):
|
||||
self.conn = connection.Connection(
|
||||
self, timeout=ovn_conf.get_ovn_ovsdb_timeout())
|
||||
return impl_idl_ovn.OvsdbNbOvnIdl(self.conn)
|
||||
|
||||
def stop(self):
|
||||
# Close the running connection if it has been initalized
|
||||
if (hasattr(self, 'conn') and not
|
||||
self.conn.stop(timeout=ovn_conf.get_ovn_ovsdb_timeout())):
|
||||
LOG.debug("Connection terminated to OvnNb "
|
||||
"but a thread is still alive")
|
||||
# complete the shutdown for the event handler
|
||||
self.notify_handler.shutdown()
|
||||
# Close the idl session
|
||||
self.close()
|
||||
|
||||
|
||||
class OvnProviderHelper(object):
|
||||
|
||||
def __init__(self):
|
||||
|
@ -201,7 +156,7 @@ class OvnProviderHelper(object):
|
|||
self._init_lb_actions()
|
||||
|
||||
# NOTE(mjozefcz): This API is only for handling octavia API requests.
|
||||
self.ovn_nbdb = OvnNbIdlForLb()
|
||||
self.ovn_nbdb = impl_idl_ovn.OvnNbIdlForLb()
|
||||
self.ovn_nbdb_api = self.ovn_nbdb.start()
|
||||
|
||||
self.helper_thread.start()
|
||||
|
|
|
@ -10,11 +10,13 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import atexit
|
||||
import contextlib
|
||||
|
||||
from neutron_lib import exceptions as n_exc
|
||||
from oslo_log import log
|
||||
from ovsdbapp.backend import ovs_idl
|
||||
from ovsdbapp.backend.ovs_idl import connection
|
||||
from ovsdbapp.backend.ovs_idl import idlutils
|
||||
from ovsdbapp.backend.ovs_idl import transaction as idl_trans
|
||||
from ovsdbapp.schema.ovn_northbound import impl_idl as nb_impl_idl
|
||||
|
@ -23,6 +25,8 @@ import tenacity
|
|||
from ovn_octavia_provider.common import config
|
||||
from ovn_octavia_provider.common import exceptions as ovn_exc
|
||||
from ovn_octavia_provider.i18n import _
|
||||
from ovn_octavia_provider.ovsdb import impl_idl_ovn
|
||||
from ovn_octavia_provider.ovsdb import ovsdb_monitor
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
@ -136,3 +140,45 @@ class OvsdbNbOvnIdl(nb_impl_idl.OvnNbApiIdlImpl, Backend):
|
|||
yield t
|
||||
except ovn_exc.RevisionConflict as e:
|
||||
LOG.info('Transaction aborted. Reason: %s', e)
|
||||
|
||||
|
||||
class OvnNbIdlForLb(ovsdb_monitor.OvnIdl):
|
||||
SCHEMA = "OVN_Northbound"
|
||||
TABLES = ('Logical_Switch', 'Load_Balancer', 'Logical_Router',
|
||||
'Logical_Switch_Port', 'Logical_Router_Port',
|
||||
'Gateway_Chassis', 'NAT')
|
||||
|
||||
def __init__(self, event_lock_name=None):
|
||||
self.conn_string = config.get_ovn_nb_connection()
|
||||
ovsdb_monitor._check_and_set_ssl_files(self.SCHEMA)
|
||||
helper = self._get_ovsdb_helper(self.conn_string)
|
||||
for table in OvnNbIdlForLb.TABLES:
|
||||
helper.register_table(table)
|
||||
super(OvnNbIdlForLb, self).__init__(
|
||||
driver=None, remote=self.conn_string, schema=helper)
|
||||
self.event_lock_name = event_lock_name
|
||||
if self.event_lock_name:
|
||||
self.set_lock(self.event_lock_name)
|
||||
atexit.register(self.stop)
|
||||
|
||||
@tenacity.retry(
|
||||
wait=tenacity.wait_exponential(max=180),
|
||||
reraise=True)
|
||||
def _get_ovsdb_helper(self, connection_string):
|
||||
return idlutils.get_schema_helper(connection_string, self.SCHEMA)
|
||||
|
||||
def start(self):
|
||||
self.conn = connection.Connection(
|
||||
self, timeout=config.get_ovn_ovsdb_timeout())
|
||||
return impl_idl_ovn.OvsdbNbOvnIdl(self.conn)
|
||||
|
||||
def stop(self):
|
||||
# Close the running connection if it has been initalized
|
||||
if ((hasattr(self, 'conn') and not
|
||||
self.conn.stop(timeout=config.get_ovn_ovsdb_timeout()))):
|
||||
LOG.debug("Connection terminated to OvnNb "
|
||||
"but a thread is still alive")
|
||||
# complete the shutdown for the event handler
|
||||
self.notify_handler.shutdown()
|
||||
# Close the idl session
|
||||
self.close()
|
||||
|
|
|
@ -34,6 +34,7 @@ from neutron.tests.functional import base
|
|||
from ovn_octavia_provider import agent as ovn_agent
|
||||
from ovn_octavia_provider.common import constants as ovn_const
|
||||
from ovn_octavia_provider import driver as ovn_driver
|
||||
from ovn_octavia_provider.ovsdb import impl_idl_ovn
|
||||
|
||||
LR_REF_KEY_HEADER = 'neutron-'
|
||||
|
||||
|
@ -1180,7 +1181,7 @@ class TestOvnOctaviaProviderAgent(TestOvnOctaviaBase):
|
|||
da_helper = ovn_driver.OvnProviderHelper()
|
||||
events = [ovn_driver.LogicalRouterPortEvent(da_helper),
|
||||
ovn_driver.LogicalSwitchPortUpdateEvent(da_helper)]
|
||||
ovn_nb_idl_for_events = ovn_driver.OvnNbIdlForLb(
|
||||
ovn_nb_idl_for_events = impl_idl_ovn.OvnNbIdlForLb(
|
||||
event_lock_name='func_test')
|
||||
ovn_nb_idl_for_events.notify_handler.watch_events(events)
|
||||
ovn_nb_idl_for_events.start()
|
||||
|
|
|
@ -0,0 +1,77 @@
|
|||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import os
|
||||
from unittest import mock
|
||||
|
||||
from neutron.tests import base
|
||||
from ovs.db import idl as ovs_idl
|
||||
from ovsdbapp.backend.ovs_idl import idlutils
|
||||
|
||||
from ovn_octavia_provider.ovsdb import impl_idl_ovn
|
||||
|
||||
basedir = os.path.dirname(os.path.abspath(__file__))
|
||||
schema_files = {
|
||||
'OVN_Northbound': os.path.join(basedir,
|
||||
'..', 'schemas', 'ovn-nb.ovsschema')}
|
||||
|
||||
|
||||
class TestOvnNbIdlForLb(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestOvnNbIdlForLb, self).setUp()
|
||||
# TODO(haleyb) - figure out why every test in this class generates
|
||||
# this warning, think it's in relation to reading this schema file:
|
||||
# sys:1: ResourceWarning: unclosed file <_io.FileIO name=1 mode='wb'
|
||||
# closefd=True> ResourceWarning: Enable tracemalloc to get the object
|
||||
# allocation traceback
|
||||
self.mock_gsh = mock.patch.object(
|
||||
idlutils, 'get_schema_helper',
|
||||
side_effect=lambda x, y: ovs_idl.SchemaHelper(
|
||||
location=schema_files['OVN_Northbound'])).start()
|
||||
self.idl = impl_idl_ovn.OvnNbIdlForLb()
|
||||
|
||||
def test__get_ovsdb_helper(self):
|
||||
self.mock_gsh.reset_mock()
|
||||
self.idl._get_ovsdb_helper('foo')
|
||||
self.mock_gsh.assert_called_once_with('foo', 'OVN_Northbound')
|
||||
|
||||
def test_start(self):
|
||||
with mock.patch('ovsdbapp.backend.ovs_idl.connection.Connection',
|
||||
side_effect=lambda x, timeout: mock.Mock()):
|
||||
idl1 = impl_idl_ovn.OvnNbIdlForLb()
|
||||
ret1 = idl1.start()
|
||||
id1 = id(ret1.ovsdb_connection)
|
||||
idl2 = impl_idl_ovn.OvnNbIdlForLb()
|
||||
ret2 = idl2.start()
|
||||
id2 = id(ret2.ovsdb_connection)
|
||||
self.assertNotEqual(id1, id2)
|
||||
|
||||
@mock.patch('ovsdbapp.backend.ovs_idl.connection.Connection')
|
||||
def test_stop(self, mock_conn):
|
||||
mock_conn.stop.return_value = False
|
||||
with (
|
||||
mock.patch.object(
|
||||
self.idl.notify_handler, 'shutdown')) as mock_notify, (
|
||||
mock.patch.object(self.idl, 'close')) as mock_close:
|
||||
self.idl.start()
|
||||
self.idl.stop()
|
||||
mock_notify.assert_called_once_with()
|
||||
mock_close.assert_called_once_with()
|
||||
|
||||
def test_setlock(self):
|
||||
with mock.patch.object(impl_idl_ovn.OvnNbIdlForLb,
|
||||
'set_lock') as set_lock:
|
||||
self.idl = impl_idl_ovn.OvnNbIdlForLb(event_lock_name='foo')
|
||||
set_lock.assert_called_once_with('foo')
|
|
@ -12,7 +12,6 @@
|
|||
# under the License.
|
||||
#
|
||||
import copy
|
||||
import os
|
||||
from unittest import mock
|
||||
|
||||
from neutron.tests import base
|
||||
|
@ -22,7 +21,6 @@ from octavia_lib.api.drivers import driver_lib
|
|||
from octavia_lib.api.drivers import exceptions
|
||||
from octavia_lib.common import constants
|
||||
from oslo_utils import uuidutils
|
||||
from ovs.db import idl as ovs_idl
|
||||
from ovsdbapp.backend.ovs_idl import idlutils
|
||||
|
||||
from ovn_octavia_provider import agent as ovn_agent
|
||||
|
@ -30,55 +28,6 @@ from ovn_octavia_provider.common import constants as ovn_const
|
|||
from ovn_octavia_provider import driver as ovn_driver
|
||||
from ovn_octavia_provider.tests.unit import fakes
|
||||
|
||||
basedir = os.path.dirname(os.path.abspath(__file__))
|
||||
schema_files = {
|
||||
'OVN_Northbound': os.path.join(basedir,
|
||||
'schemas', 'ovn-nb.ovsschema')}
|
||||
|
||||
|
||||
class TestOvnNbIdlForLb(base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestOvnNbIdlForLb, self).setUp()
|
||||
self.mock_gsh = mock.patch.object(
|
||||
idlutils, 'get_schema_helper',
|
||||
side_effect=lambda x, y: ovs_idl.SchemaHelper(
|
||||
location=schema_files['OVN_Northbound'])).start()
|
||||
self.idl = ovn_driver.OvnNbIdlForLb()
|
||||
|
||||
def test__get_ovsdb_helper(self):
|
||||
self.mock_gsh.reset_mock()
|
||||
self.idl._get_ovsdb_helper('foo')
|
||||
self.mock_gsh.assert_called_once_with('foo', 'OVN_Northbound')
|
||||
|
||||
def test_start(self):
|
||||
with mock.patch('ovsdbapp.backend.ovs_idl.connection.Connection',
|
||||
side_effect=lambda x, timeout: mock.Mock()):
|
||||
idl1 = ovn_driver.OvnNbIdlForLb()
|
||||
ret1 = idl1.start()
|
||||
id1 = id(ret1.ovsdb_connection)
|
||||
idl2 = ovn_driver.OvnNbIdlForLb()
|
||||
ret2 = idl2.start()
|
||||
id2 = id(ret2.ovsdb_connection)
|
||||
self.assertNotEqual(id1, id2)
|
||||
|
||||
@mock.patch('ovsdbapp.backend.ovs_idl.connection.Connection')
|
||||
def test_stop(self, mock_conn):
|
||||
mock_conn.stop.return_value = False
|
||||
with (
|
||||
mock.patch.object(
|
||||
self.idl.notify_handler, 'shutdown')) as mock_notify, (
|
||||
mock.patch.object(self.idl, 'close')) as mock_close:
|
||||
self.idl.start()
|
||||
self.idl.stop()
|
||||
mock_notify.assert_called_once_with()
|
||||
mock_close.assert_called_once_with()
|
||||
|
||||
def test_setlock(self):
|
||||
with mock.patch.object(ovn_driver.OvnNbIdlForLb,
|
||||
'set_lock') as set_lock:
|
||||
self.idl = ovn_driver.OvnNbIdlForLb(event_lock_name='foo')
|
||||
set_lock.assert_called_once_with('foo')
|
||||
|
||||
|
||||
class TestOvnOctaviaBase(base.BaseTestCase):
|
||||
|
||||
|
@ -98,7 +47,8 @@ class TestOvnOctaviaBase(base.BaseTestCase):
|
|||
self.vip_network_id = uuidutils.generate_uuid()
|
||||
self.vip_port_id = uuidutils.generate_uuid()
|
||||
self.vip_subnet_id = uuidutils.generate_uuid()
|
||||
ovn_nb_idl = mock.patch("ovn_octavia_provider.driver.OvnNbIdlForLb")
|
||||
ovn_nb_idl = mock.patch(
|
||||
"ovn_octavia_provider.ovsdb.impl_idl_ovn.OvnNbIdlForLb")
|
||||
self.mock_ovn_nb_idl = ovn_nb_idl.start()
|
||||
self.member_address = "192.168.2.149"
|
||||
self.vip_address = '192.148.210.109'
|
||||
|
|
Loading…
Reference in New Issue