
Distributed OVSDB lock: Make use of the HashRing

Changes:

* New OvnIdlDistributedLock class added. This is now the base class
  that the NB and SB OVSDB IDLs inherit from. The old OvnIdl class was
  kept because services like the Metadata agent and the Octavia driver
  still use it, and for those services the OVSDB lock seems sufficient
  (but nothing prevents us from updating them too in the future). A
  simplified sketch of the hash-ring dispatch follows this list.

* A new pre_fork_initialize() hook was added to the mechanism driver.
  This hook runs before the process is forked (to create the workers)
  and it does two things:

  - Sets a signal handler for SIGTERM, so that when a SIGTERM is sent
    the service handles it and cleans up the Hash Ring before exiting.

  - Cleans up the Hash Ring at start up. If there are leftover entries
    (after a SIGKILL, for example) those workers would be considered
    part of the Hash Ring until they time out, so at service start up
    we get rid of those dead entries right away.

* Remove OvnWorker: The OvnWorker was responsible for running the
  ovn_db_sync() code and handling the events from OVSDB. This patch
  moves ovn_db_sync() into the MaintenanceWorker, and since OVSDB
  events are now handled by all the other workers in parallel there is
  no more use for the OvnWorker, so it has been removed from the code.

* The unit tests test_notify_no_ovsdb_lock() and
  test_notify_ovsdb_lock_not_yet_contended() were merged into a single
  test called test_notify_different_target_node(). Unlike the OVSDB
  lock, which distinguishes "locked" and "lock contended" states, the
  Hash Ring only cares whether the hash matches the node_uuid of the
  instance or not.
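
For illustration, a minimal, self-contained sketch of the dispatch idea
described in the first bullet (this is not the project's HashRingManager;
the hashing scheme and names are made up to show why exactly one worker
ends up handling each event). Every worker registers a node UUID, each
OVSDB row UUID is hashed onto the ring, and only the worker whose node
the row maps to processes the event; this mirrors the notify() logic
added to OvnIdlDistributedLock in ovsdb_monitor.py below:

    # Simplified illustration only; not networking_ovn's HashRingManager.
    import bisect
    import hashlib
    import uuid


    def _hash(key):
        return int(hashlib.md5(key.encode('utf-8')).hexdigest(), 16)


    class ToyHashRing(object):
        def __init__(self, node_uuids):
            # Place every node on the ring by hashing its UUID.
            self._ring = sorted((_hash(n), n) for n in node_uuids)

        def get_node(self, key):
            # A given row UUID always maps to the same node until the
            # ring membership changes.
            if not self._ring:
                raise RuntimeError('Hash Ring is empty')
            idx = bisect.bisect(self._ring, (_hash(key),)) % len(self._ring)
            return self._ring[idx][1]


    nodes = [str(uuid.uuid4()) for _ in range(3)]   # e.g. 3 API workers
    ring = ToyHashRing(nodes)

    row_uuid = str(uuid.uuid4())                    # an OVSDB row update
    target = ring.get_node(row_uuid)

    for node in nodes:
        # Workers whose node_uuid does not match simply return without
        # acting, just like OvnIdlDistributedLock.notify().
        handled = (node == target)
        print(node, 'handles event' if handled else 'skips event')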

Closes-Bug: #1823715
Change-Id: I00b24cd1f8eaae2386d732af34365fa1f81e565a
Signed-off-by: Lucas Alvares Gomes <lucasagomes@gmail.com>
changes/08/655408/7
Lucas Alvares Gomes, 3 years ago
commit 6ef5489cf7
Changed files:

  1. networking_ovn/common/maintenance.py (34 lines changed)
  2. networking_ovn/ml2/mech_driver.py (35 lines changed)
  3. networking_ovn/ovsdb/impl_idl_ovn.py (24 lines changed)
  4. networking_ovn/ovsdb/ovsdb_monitor.py (47 lines changed)
  5. networking_ovn/ovsdb/worker.py (9 lines changed)
  6. networking_ovn/tests/functional/base.py (10 lines changed)
  7. networking_ovn/tests/functional/test_ovn_db_sync.py (4 lines changed)
  8. networking_ovn/tests/functional/test_ovsdb_monitor.py (101 lines changed)
  9. networking_ovn/tests/functional/test_router.py (8 lines changed)
  10. networking_ovn/tests/unit/ml2/test_mech_driver.py (9 lines changed)
  11. networking_ovn/tests/unit/ovsdb/test_ovsdb_monitor.py (21 lines changed)

networking_ovn/common/maintenance.py (34 lines changed)

@ -17,16 +17,15 @@ import inspect
import threading
from futurist import periodics
from neutron.common import config as n_conf
from neutron_lib import constants as n_const
from neutron_lib import context as n_context
from neutron_lib import exceptions as n_exc
from neutron_lib import worker
from oslo_log import log
from oslo_utils import timeutils
from networking_ovn.common import config as ovn_conf
from networking_ovn.common import constants as ovn_const
from networking_ovn.db import hash_ring as db_hash_ring
from networking_ovn.db import maintenance as db_maint
from networking_ovn.db import revision as db_rev
from networking_ovn import ovn_db_sync
@ -36,27 +35,6 @@ LOG = log.getLogger(__name__)
DB_CONSISTENCY_CHECK_INTERVAL = 300 # 5 minutes
class MaintenanceWorker(worker.BaseWorker):
def start(self):
super(MaintenanceWorker, self).start()
# NOTE(twilson) The super class will trigger the post_fork_initialize
# in the driver, which starts the connection/IDL notify loop which
# keeps the process from exiting
def stop(self):
"""Stop service."""
super(MaintenanceWorker, self).stop()
def wait(self):
"""Wait for service to complete."""
super(MaintenanceWorker, self).wait()
@staticmethod
def reset():
n_conf.reset_service()
class MaintenanceThread(object):
def __init__(self):
@ -407,3 +385,13 @@ class DBInconsistenciesPeriodics(object):
port.name, addresses=addresses).execute(check_error=True)
raise periodics.NeverAgain()
# The static spacing value here is half of the
# HASH_RING_NODES_TIMEOUT, we want to be able to try to touch the nodes
# at least twice before they are considered dead.
@periodics.periodic(spacing=ovn_const.HASH_RING_NODES_TIMEOUT / 2)
def touch_hash_ring_nodes(self):
# NOTE(lucasagomes): Note that we do not rely on the OVSDB lock
# here because we want the maintenance tasks from each instance to
# execute this task.
db_hash_ring.touch_nodes_from_host()
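
As a rough illustration of the spacing choice above: assuming, as the
functional tests below suggest, that a node only counts as active if it
was touched within the last HASH_RING_NODES_TIMEOUT seconds, touching
every HASH_RING_NODES_TIMEOUT / 2 seconds lets a node miss one beat and
still stay in the ring (the value below is illustrative, not the real
constant):

    import time

    HASH_RING_NODES_TIMEOUT = 60  # illustrative value only


    def is_active(last_touch, now, timeout=HASH_RING_NODES_TIMEOUT):
        # A node is alive if it was touched within the last `timeout` seconds.
        return (now - last_touch) <= timeout


    now = time.time()
    print(is_active(now - 25, now))   # True: touched within one spacing interval
    print(is_active(now - 55, now))   # True: one touch missed, still within timeout
    print(is_active(now - 65, now))   # False: timed out, considered dead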

networking_ovn/ml2/mech_driver.py (35 lines changed)

@ -12,8 +12,10 @@
# under the License.
#
import atexit
import functools
import operator
import signal
import threading
import types
import uuid
@ -47,6 +49,7 @@ from networking_ovn.common import exceptions as ovn_exc
from networking_ovn.common import maintenance
from networking_ovn.common import ovn_client
from networking_ovn.common import utils
from networking_ovn.db import hash_ring as db_hash_ring
from networking_ovn.db import revision as db_rev
from networking_ovn.ml2 import qos_driver
from networking_ovn.ml2 import trunk_driver
@ -103,6 +106,7 @@ class OVNMechanismDriver(api.MechanismDriver):
self._plugin_property = None
self._ovn_client_inst = None
self._maintenance_thread = None
self.node_uuid = None
self.sg_enabled = ovn_acl.is_sg_enabled()
self._post_fork_event = threading.Event()
if cfg.CONF.SECURITYGROUP.firewall_driver:
@ -145,10 +149,12 @@ class OVNMechanismDriver(api.MechanismDriver):
}
def subscribe(self):
registry.subscribe(self.pre_fork_initialize,
resources.PROCESS,
events.BEFORE_SPAWN)
registry.subscribe(self.post_fork_initialize,
resources.PROCESS,
events.AFTER_INIT)
registry.subscribe(self._add_segment_host_mapping_for_segment,
resources.SEGMENT,
events.AFTER_CREATE)
@ -177,13 +183,29 @@ class OVNMechanismDriver(api.MechanismDriver):
resources.SECURITY_GROUP_RULE,
events.BEFORE_DELETE)
def _clean_hash_ring(self, *args, **kwargs):
db_hash_ring.remove_nodes_from_host()
def pre_fork_initialize(self, resource, event, trigger, payload=None):
"""Pre-initialize the ML2/OVN driver."""
self._clean_hash_ring()
atexit.register(self._clean_hash_ring)
signal.signal(signal.SIGTERM, self._clean_hash_ring)
def post_fork_initialize(self, resource, event, trigger, payload=None):
# NOTE(rtheis): This will initialize all workers (API, RPC,
# plugin service and OVN) with OVN IDL connections.
self._post_fork_event.clear()
self._ovn_client_inst = None
self._nb_ovn, self._sb_ovn = impl_idl_ovn.get_ovn_idls(self,
trigger)
is_maintenance = (utils.get_method_class(trigger) ==
worker.MaintenanceWorker)
if not is_maintenance:
self.node_uuid = db_hash_ring.add_node()
self._nb_ovn, self._sb_ovn = impl_idl_ovn.get_ovn_idls(
self, trigger, binding_events=not is_maintenance)
# Override agents API methods
self.patch_plugin_merge("get_agents", get_agents)
self.patch_plugin_choose("get_agent", get_agent)
@ -193,8 +215,8 @@ class OVNMechanismDriver(api.MechanismDriver):
# Now IDL connections can be safely used.
self._post_fork_event.set()
if utils.get_method_class(trigger) == worker.OvnWorker:
# Call the synchronization task if its ovn worker
if is_maintenance:
# Call the synchronization task if its maintenance worker
# This sync neutron DB to OVN-NB DB only in inconsistent states
self.nb_synchronizer = ovn_db_sync.OvnNbSynchronizer(
self._plugin,
@ -213,7 +235,6 @@ class OVNMechanismDriver(api.MechanismDriver):
)
self.sb_synchronizer.sync()
if utils.get_method_class(trigger) == maintenance.MaintenanceWorker:
self._maintenance_thread = maintenance.MaintenanceThread()
self._maintenance_thread.add_periodics(
maintenance.DBInconsistenciesPeriodics(self._ovn_client))
@ -696,7 +717,7 @@ class OVNMechanismDriver(api.MechanismDriver):
workers, can return a sequence of worker instances.
"""
# See doc/source/design/ovn_worker.rst for more details.
return [worker.OvnWorker(), maintenance.MaintenanceWorker()]
return [worker.MaintenanceWorker()]
def _update_subport_host_if_needed(self, port_id):
parent_port = self._ovn_client.get_parent_port(port_id)
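
As a side note on the pre_fork_initialize() hunk above, the Hash Ring
cleanup is wired in three places: once immediately (to drop leftovers
from a previous SIGKILL), once via atexit for normal exits, and once as
the SIGTERM handler. A standalone sketch of that wiring (the print call
stands in for db_hash_ring.remove_nodes_from_host()):

    import atexit
    import signal


    def _clean_hash_ring(*args, **kwargs):
        # Accepts (signum, frame) when used as a signal handler and no
        # arguments when called directly or via atexit.
        print("removing this host's Hash Ring nodes")


    _clean_hash_ring()                               # start up: drop stale entries
    atexit.register(_clean_hash_ring)                # normal interpreter exit
    signal.signal(signal.SIGTERM, _clean_hash_ring)  # graceful termination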

networking_ovn/ovsdb/impl_idl_ovn.py (24 lines changed)

@ -33,7 +33,6 @@ from networking_ovn.common import exceptions as ovn_exc
from networking_ovn.common import utils
from networking_ovn.ovsdb import commands as cmd
from networking_ovn.ovsdb import ovsdb_monitor
from networking_ovn.ovsdb import worker
LOG = log.getLogger(__name__)
@ -121,7 +120,7 @@ class OvsdbConnectionUnavailable(n_exc.ServiceUnavailable):
# Retry forever to get the OVN NB and SB IDLs. Wait 2^x * 1 seconds between
# each retry, up to 180 seconds, then 180 seconds afterwards.
def get_ovn_idls(driver, trigger):
def get_ovn_idls(driver, trigger, binding_events=False):
@tenacity.retry(
wait=tenacity.wait_exponential(max=180),
reraise=True)
@ -129,28 +128,29 @@ def get_ovn_idls(driver, trigger):
trigger_class = utils.get_method_class(trigger)
LOG.info('Getting %(cls)s for %(trigger)s with retry',
{'cls': cls.__name__, 'trigger': trigger_class.__name__})
return cls(get_connection(cls, trigger, driver))
return cls(get_connection(cls, trigger, driver, binding_events))
vlog.use_python_logger(max_level=cfg.get_ovn_ovsdb_log_level())
return tuple(get_ovn_idl_retry(c) for c in (OvsdbNbOvnIdl, OvsdbSbOvnIdl))
def get_connection(db_class, trigger=None, driver=None):
# The trigger is the start() method of the worker class
def get_connection(db_class, trigger=None, driver=None, binding_events=False):
if db_class == OvsdbNbOvnIdl:
args = (cfg.get_ovn_nb_connection(), 'OVN_Northbound')
cls = ovsdb_monitor.OvnNbIdl
elif db_class == OvsdbSbOvnIdl:
args = (cfg.get_ovn_sb_connection(), 'OVN_Southbound')
cls = ovsdb_monitor.OvnSbIdl
if trigger and utils.get_method_class(trigger) == worker.OvnWorker:
idl_ = cls.from_server(*args, driver=driver)
else:
if db_class == OvsdbSbOvnIdl:
idl_ = ovsdb_monitor.BaseOvnSbIdl.from_server(*args)
if binding_events:
if db_class == OvsdbNbOvnIdl:
idl_ = ovsdb_monitor.OvnNbIdl.from_server(*args, driver=driver)
else:
idl_ = ovsdb_monitor.OvnSbIdl.from_server(*args, driver=driver)
else:
if db_class == OvsdbNbOvnIdl:
idl_ = ovsdb_monitor.BaseOvnIdl.from_server(*args)
else:
idl_ = ovsdb_monitor.BaseOvnSbIdl.from_server(*args)
return connection.Connection(idl_, timeout=cfg.get_ovn_ovsdb_timeout())
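
To summarize the branching in get_connection() above, the IDL class
picked for each combination of database and binding_events is, in
effect (a summary table, not the actual implementation):

    IDL_CLASS_FOR = {
        # (db_class, binding_events): IDL class instantiated
        ('OvsdbNbOvnIdl', True): 'ovsdb_monitor.OvnNbIdl',      # NB, with events
        ('OvsdbNbOvnIdl', False): 'ovsdb_monitor.BaseOvnIdl',   # NB, no events
        ('OvsdbSbOvnIdl', True): 'ovsdb_monitor.OvnSbIdl',      # SB, with events
        ('OvsdbSbOvnIdl', False): 'ovsdb_monitor.BaseOvnSbIdl', # SB, no events
    }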

networking_ovn/ovsdb/ovsdb_monitor.py (47 lines changed)

@ -17,6 +17,7 @@ import abc
from neutron_lib.plugins import constants
from neutron_lib.plugins import directory
from neutron_lib.utils import helpers
from oslo_config import cfg
from oslo_log import log
from ovs.stream import Stream
from ovsdbapp.backend.ovs_idl import connection
@ -27,8 +28,11 @@ from ovsdbapp import event
from networking_ovn.agent import stats
from networking_ovn.common import config as ovn_config
from networking_ovn.common import constants as ovn_const
from networking_ovn.common import exceptions
from networking_ovn.common import hash_ring_manager
from networking_ovn.common import utils
CONF = cfg.CONF
LOG = log.getLogger(__name__)
@ -338,7 +342,38 @@ class OvnIdl(BaseOvnIdl):
"""Should be called after the idl has been initialized"""
class OvnNbIdl(OvnIdl):
class OvnIdlDistributedLock(BaseOvnIdl):
def __init__(self, driver, remote, schema):
super(OvnIdlDistributedLock, self).__init__(remote, schema)
self.driver = driver
self.notify_handler = OvnDbNotifyHandler(driver)
self._hash_ring = hash_ring_manager.HashRingManager()
self._node_uuid = self.driver.node_uuid
def notify(self, event, row, updates=None):
try:
target_node = self._hash_ring.get_node(str(row.uuid))
except exceptions.HashRingIsEmpty as e:
LOG.error('HashRing is empty, error: %s', e)
return
if target_node != self._node_uuid:
return
LOG.debug('Hash Ring: Node %(node)s (host: %(hostname)s) '
'handling event "%(event)s" for row %(row)s '
'(table: %(table)s)',
{'node': self._node_uuid, 'hostname': CONF.host,
'event': event, 'row': row.uuid, 'table': row._table.name})
self.notify_handler.notify(event, row, updates)
@abc.abstractmethod
def post_connect(self):
"""Should be called after the idl has been initialized"""
class OvnNbIdl(OvnIdlDistributedLock):
def __init__(self, driver, remote, schema):
super(OvnNbIdl, self).__init__(driver, remote, schema)
@ -360,9 +395,7 @@ class OvnNbIdl(OvnIdl):
_check_and_set_ssl_files(schema_name)
helper = idlutils.get_schema_helper(connection_string, schema_name)
helper.register_all()
_idl = cls(driver, connection_string, helper)
_idl.set_lock(_idl.event_lock_name)
return _idl
return cls(driver, connection_string, helper)
def unwatch_logical_switch_port_create_events(self):
"""Unwatch the logical switch port create events.
@ -382,7 +415,7 @@ class OvnNbIdl(OvnIdl):
self.unwatch_logical_switch_port_create_events()
class OvnSbIdl(OvnIdl):
class OvnSbIdl(OvnIdlDistributedLock):
@classmethod
def from_server(cls, connection_string, schema_name, driver):
@ -393,9 +426,7 @@ class OvnSbIdl(OvnIdl):
helper.register_table('Port_Binding')
helper.register_table('Datapath_Binding')
helper.register_table('MAC_Binding')
_idl = cls(driver, connection_string, helper)
_idl.set_lock(_idl.event_lock_name)
return _idl
return cls(driver, connection_string, helper)
def post_connect(self):
"""Watch Chassis events.

networking_ovn/ovsdb/worker.py (9 lines changed)

@ -16,20 +16,21 @@ from neutron.common import config
from neutron_lib import worker
class OvnWorker(worker.BaseWorker):
class MaintenanceWorker(worker.BaseWorker):
def start(self):
super(OvnWorker, self).start()
super(MaintenanceWorker, self).start()
# NOTE(twilson) The super class will trigger the post_fork_initialize
# in the driver, which starts the connection/IDL notify loop which
# keeps the process from exiting
def stop(self):
"""Stop service."""
# TODO(numans)
super(MaintenanceWorker, self).stop()
def wait(self):
"""Wait for service to complete."""
# TODO(numans)
super(MaintenanceWorker, self).wait()
@staticmethod
def reset():

networking_ovn/tests/functional/base.py (10 lines changed)

@ -95,7 +95,7 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase):
_counter = 0
l3_plugin = 'networking_ovn.l3.l3_ovn.OVNL3RouterPlugin'
def setUp(self, ovn_worker=False):
def setUp(self, maintenance_worker=False):
config.cfg.CONF.set_override('extension_drivers',
self._extension_drivers,
group='ml2')
@ -118,7 +118,7 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase):
self.l3_plugin = directory.get_plugin(constants.L3)
self.ovsdb_server_mgr = None
self.ovn_northd_mgr = None
self.ovn_worker = ovn_worker
self.maintenance_worker = maintenance_worker
self._start_ovsdb_server_and_idls()
self._start_ovn_northd()
@ -225,8 +225,8 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase):
pass
trigger_cls = TriggerCls()
if self.ovn_worker:
trigger_cls.trigger.__self__.__class__ = worker.OvnWorker
if self.maintenance_worker:
trigger_cls.trigger.__self__.__class__ = worker.MaintenanceWorker
cfg.CONF.set_override('neutron_sync_mode', 'off', 'ovn')
self.addCleanup(self.stop)
@ -236,7 +236,7 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase):
mock.ANY, mock.ANY, trigger_cls.trigger)
def stop(self):
if self.ovn_worker:
if self.maintenance_worker:
self.mech_driver.nb_synchronizer.stop()
self.mech_driver.sb_synchronizer.stop()
self.mech_driver._nb_ovn.ovsdb_connection.stop()

networking_ovn/tests/functional/test_ovn_db_sync.py (4 lines changed)

@ -41,7 +41,7 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
def setUp(self):
ovn_config.cfg.CONF.set_override('dns_domain', 'ovn.test')
super(TestOvnNbSync, self).setUp()
super(TestOvnNbSync, self).setUp(maintenance_worker=True)
ext_mgr = test_extraroute.ExtraRouteTestExtensionManager()
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
sg_mgr = test_securitygroup.SecurityGroupTestExtensionManager()
@ -1518,7 +1518,7 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
class TestOvnSbSync(base.TestOVNFunctionalBase):
def setUp(self):
super(TestOvnSbSync, self).setUp(ovn_worker=False)
super(TestOvnSbSync, self).setUp(maintenance_worker=True)
self.segments_plugin = directory.get_plugin('segments')
self.sb_synchronizer = ovn_db_sync.OvnSbSynchronizer(
self.plugin, self.mech_driver._sb_ovn, self.mech_driver)

networking_ovn/tests/functional/test_ovsdb_monitor.py (101 lines changed)

@ -17,6 +17,8 @@ import threading
import mock
from oslo_utils import uuidutils
from networking_ovn.common import constants as ovn_const
from networking_ovn.db import hash_ring as db_hash_ring
from networking_ovn.ovsdb import ovsdb_monitor
from networking_ovn.tests.functional import base
from neutron.common import utils as n_utils
@ -48,10 +50,26 @@ class WaitForMACBindingDeleteEvent(event.RowEvent):
return self.event.wait(self.timeout)
class DistributedLockTestEvent(event.WaitEvent):
ONETIME = False
COUNTER = 0
def __init__(self):
table = 'Logical_Switch_Port'
events = (self.ROW_CREATE,)
super(DistributedLockTestEvent, self).__init__(
events, table, (), timeout=15)
self.event_name = 'DistributedLockTestEvent'
def run(self, event, row, old):
self.COUNTER += 1
self.event.set()
class TestNBDbMonitor(base.TestOVNFunctionalBase):
def setUp(self):
super(TestNBDbMonitor, self).setUp(ovn_worker=True)
super(TestNBDbMonitor, self).setUp()
self.chassis = self.add_fake_chassis('ovs-host1')
self.l3_plugin = directory.get_plugin(plugin_constants.L3)
@ -174,54 +192,41 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
self._test_port_binding_and_status(port['id'], 'bind', 'ACTIVE')
self._test_port_binding_and_status(port['id'], 'unbind', 'DOWN')
def test_ovsdb_monitor_lock(self):
"""Test case to test the ovsdb monitor lock used by OvnConnection.
This test case created another IDL connection to the NB DB using
the ovsdb_monitor.OvnConnection.
With this we will have 2 'ovsdb_monitor.OvnConnection's. At the
start the lock should be with the IDL connection created by the
'TestOVNFunctionalBase' setup() function.
The port up/down events should be handled by the first IDL connection.
Then the first IDL connection will release the lock so that the 2nd IDL
connection created in this test case gets the lock and it should
handle the port up/down events. Later when 2nd IDL connection releases
lock, first IDL connection will get the lock and handles the
port up/down events.
Please note that the "self.monitor_nb_idl_con" created by the base
class is created using 'connection.Connection' and hence it will not
contend for any lock.
"""
fake_driver = mock.MagicMock()
_idl = ovsdb_monitor.OvnNbIdl.from_server(
self.ovsdb_server_mgr.get_ovsdb_connection_path(),
'OVN_Northbound', fake_driver)
tst_ovn_conn = self.useFixture(
base.ConnectionFixture(idl=_idl, timeout=10)).connection
tst_ovn_conn.start()
port = self.create_port()
# mech_driver will release the lock to fake test driver. During chassis
# binding and unbinding, port status won't change(i.e will be DOWN)
# as mech driver can't update it.
self.mech_driver._nb_ovn.idl.set_lock(None)
n_utils.wait_until_true(lambda: tst_ovn_conn.idl.has_lock)
self.mech_driver._nb_ovn.idl.set_lock(
self.mech_driver._nb_ovn.idl.event_lock_name)
self._test_port_binding_and_status(port['id'], 'bind', 'DOWN')
self._test_port_binding_and_status(port['id'], 'unbind', 'DOWN')
def test_distributed_lock(self):
row_event = DistributedLockTestEvent()
self.mech_driver._nb_ovn.idl.notify_handler.watch_event(row_event)
worker_list = [self.mech_driver._nb_ovn, ]
# Create 10 fake workers
for _ in range(10):
node_uuid = uuidutils.generate_uuid()
db_hash_ring.add_node(node_uuid)
fake_driver = mock.MagicMock(node_uuid=node_uuid)
_idl = ovsdb_monitor.OvnNbIdl.from_server(
self.ovsdb_server_mgr.get_ovsdb_connection_path(),
'OVN_Northbound', fake_driver)
worker = self.useFixture(
base.ConnectionFixture(idl=_idl, timeout=10)).connection
worker.idl.notify_handler.watch_event(row_event)
worker.start()
worker_list.append(worker)
# Refresh the hash rings just in case
[worker.idl._hash_ring.refresh() for worker in worker_list]
# Assert we have 11 active workers in the ring
self.assertEqual(
11, len(db_hash_ring.get_active_nodes(
interval=ovn_const.HASH_RING_NODES_TIMEOUT)))
# Trigger the event
self.create_port()
# Wait for the event to complete
self.assertTrue(row_event.wait())
# Fake driver will relase the lock to mech driver. Port status will be
# updated to 'ACTIVE' for chassis binding and to 'DOWN' for chassis
# unbinding.
tst_ovn_conn.idl.set_lock(None)
n_utils.wait_until_true(lambda: self.mech_driver._nb_ovn.idl.has_lock)
self._test_port_binding_and_status(port['id'], 'bind', 'ACTIVE')
self._test_port_binding_and_status(port['id'], 'unbind', 'DOWN')
# Assert that only one worker handled the event
self.assertEqual(1, row_event.COUNTER)
class TestNBDbMonitorOverTcp(TestNBDbMonitor):

networking_ovn/tests/functional/test_router.py (8 lines changed)

@ -31,7 +31,7 @@ from networking_ovn.tests.functional import base
class TestRouter(base.TestOVNFunctionalBase):
def setUp(self):
super(TestRouter, self).setUp(ovn_worker=True)
super(TestRouter, self).setUp()
self.chassis1 = self.add_fake_chassis(
'ovs-host1', physical_nets=['physnet1', 'physnet3'])
self.chassis2 = self.add_fake_chassis(
@ -237,9 +237,9 @@ class TestRouter(base.TestOVNFunctionalBase):
self._set_redirect_chassis_to_invalid_chassis(ovn_client)
self.l3_plugin.schedule_unhosted_gateways()
# We can't test call_count for these mocks, as we have enabled
# ovn_worker which will trigger chassis events and eventually
# calling schedule_unhosted_gateways
# We can't test call_count for these mocks, as we have disabled
# maintenance_worker which will trigger chassis events
# and eventually calling schedule_unhosted_gateways
self.assertTrue(client_select.called)
self.assertTrue(plugin_select.called)

networking_ovn/tests/unit/ml2/test_mech_driver.py (9 lines changed)

@ -1609,6 +1609,15 @@ class OVNMechanismDriverTestCase(test_plugin.Ml2PluginV2TestCase):
['8.8.8.8'],
group='ovn')
super(OVNMechanismDriverTestCase, self).setUp()
# Make sure the node and target_node for the hash ring in the
# mechanism driver matches
node_uuid = uuidutils.generate_uuid()
p = mock.patch('networking_ovn.common.hash_ring_manager.'
'HashRingManager.get_node', return_value=node_uuid)
p.start()
self.addCleanup(p.stop)
self.driver.node_uuid = node_uuid
mm = directory.get_plugin().mechanism_manager
self.mech_driver = mm.mech_drivers['ovn'].obj
nb_ovn = fakes.FakeOvsdbNbOvnIdl()

networking_ovn/tests/unit/ovsdb/test_ovsdb_monitor.py (21 lines changed)

@ -93,8 +93,6 @@ class TestOvnNbIdlNotifyHandler(test_mech_driver.OVNMechanismDriverTestCase):
helper = ovs_idl.SchemaHelper(schema_json=OVN_NB_SCHEMA)
helper.register_all()
self.idl = ovsdb_monitor.OvnNbIdl(self.driver, "remote", helper)
self.idl.lock_name = self.idl.event_lock_name
self.idl.has_lock = True
self.lp_table = self.idl.tables.get('Logical_Switch_Port')
self.driver.set_port_status_up = mock.Mock()
self.driver.set_port_status_down = mock.Mock()
@ -198,18 +196,17 @@ class TestOvnNbIdlNotifyHandler(test_mech_driver.OVNMechanismDriverTestCase):
self.assertFalse(self.driver.set_port_status_up.called)
self.assertFalse(self.driver.set_port_status_down.called)
def test_notify_no_ovsdb_lock(self):
self.idl.is_lock_contended = True
@mock.patch('networking_ovn.common.hash_ring_manager.'
'HashRingManager.get_node')
def test_notify_different_target_node(self, mock_get_node):
mock_get_node.return_value = 'this-is-a-different-node'
row = fakes.FakeOvsdbRow.create_one_ovsdb_row()
self.idl.notify_handler.notify = mock.Mock()
self.idl.notify("create", mock.ANY)
self.idl.notify("create", row)
# Assert that if the target_node returned by the ring is different
# than this driver's node_uuid, notify() won't be called
self.assertFalse(self.idl.notify_handler.notify.called)
def test_notify_ovsdb_lock_not_yet_contended(self):
self.idl.is_lock_contended = False
self.idl.notify_handler.notify = mock.Mock()
self.idl.notify("create", mock.ANY)
self.assertTrue(self.idl.notify_handler.notify.called)
class TestOvnSbIdlNotifyHandler(test_mech_driver.OVNMechanismDriverTestCase):
@ -220,8 +217,6 @@ class TestOvnSbIdlNotifyHandler(test_mech_driver.OVNMechanismDriverTestCase):
sb_helper = ovs_idl.SchemaHelper(schema_json=OVN_SB_SCHEMA)
sb_helper.register_table('Chassis')
self.sb_idl = ovsdb_monitor.OvnSbIdl(self.driver, "remote", sb_helper)
self.sb_idl.lock_name = self.sb_idl.event_lock_name
self.sb_idl.has_lock = True
self.sb_idl.post_connect()
self.chassis_table = self.sb_idl.tables.get('Chassis')
self.driver.update_segment_host_mapping = mock.Mock()
