diff --git a/neutron/common/ovn/hash_ring_manager.py b/neutron/common/ovn/hash_ring_manager.py
index 95ea7255b8a..1d1886b53a6 100644
--- a/neutron/common/ovn/hash_ring_manager.py
+++ b/neutron/common/ovn/hash_ring_manager.py
@@ -22,6 +22,7 @@ from tooz import hashring
 from neutron.common.ovn import constants
 from neutron.common.ovn import exceptions
 from neutron.db import ovn_hash_ring_db as db_hash_ring
+from neutron import service
 from neutron_lib import context
 
 LOG = log.getLogger(__name__)
@@ -32,7 +33,7 @@ class HashRingManager(object):
     def __init__(self, group_name):
         self._hash_ring = None
         self._last_time_loaded = None
-        self._cache_startup_timeout = True
+        self._check_hashring_startup = True
         self._group = group_name
         self.admin_ctx = context.get_admin_context()
 
@@ -41,28 +42,26 @@ class HashRingManager(object):
         # NOTE(lucasagomes): Some events are processed at the service's
         # startup time and since many services may be started concurrently
         # we do not want to use a cached hash ring at that point. This
-        # method checks if the created_at and updated_at columns from the
-        # nodes in the ring from this host is equal, and if so it means
-        # that the service just started.
+        # method ensures that we start allowing the use of cached HashRings
+        # once the number of HashRing nodes >= the number of api workers.
 
         # If the startup timeout already expired, there's no reason to
         # keep reading from the DB. At this point this will always
         # return False
-        if not self._cache_startup_timeout:
+        if not self._check_hashring_startup:
             return False
 
+        api_workers = service._get_api_workers()
         nodes = db_hash_ring.get_active_nodes(
             self.admin_ctx,
             constants.HASH_RING_CACHE_TIMEOUT, self._group,
             from_host=True)
-        # created_at and updated_at differ in microseonds so we compare their
-        # difference is less than a second to be safe on slow machines
-        dont_cache = nodes and (
-            nodes[0].updated_at - nodes[0].created_at < datetime.timedelta(
-                seconds=1))
-        if not dont_cache:
-            self._cache_startup_timeout = False
-        return dont_cache
+        if len(nodes) >= api_workers:
+            LOG.debug("Allow caching, nodes %s>=%s", len(nodes), api_workers)
+            self._check_hashring_startup = False
+            return False
+        LOG.debug("Disallow caching, nodes %s<%s", len(nodes), api_workers)
+        return True
 
     def _load_hash_ring(self, refresh=False):
         cache_timeout = timeutils.utcnow() - datetime.timedelta(
diff --git a/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py b/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py
index ac94c04e6bf..bfbe364b030 100644
--- a/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py
+++ b/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py
@@ -58,6 +58,7 @@ from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker
 from neutron.services.qos.drivers.ovn import driver as qos_driver
 from neutron.services.segments import db as segment_service_db
 from neutron.services.trunk.drivers.ovn import trunk_driver
+import neutron.wsgi
 
 
 LOG = log.getLogger(__name__)
@@ -248,16 +249,22 @@ class OVNMechanismDriver(api.MechanismDriver):
         else:
             raise
 
+    @staticmethod
+    def should_post_fork_initialize(worker_class):
+        return worker_class in (neutron.wsgi.WorkerService,
+                                worker.MaintenanceWorker)
+
     def post_fork_initialize(self, resource, event, trigger, payload=None):
-        # NOTE(rtheis): This will initialize all workers (API, RPC,
-        # plugin service and OVN) with OVN IDL connections.
+        # Initialize API/Maintenance workers with OVN IDL connections
+        worker_class = ovn_utils.get_method_class(trigger)
+        if not self.should_post_fork_initialize(worker_class):
+            return
+
         self._post_fork_event.clear()
         self._wait_for_pg_drop_event()
         self._ovn_client_inst = None
 
-        is_maintenance = (ovn_utils.get_method_class(trigger) ==
-                          worker.MaintenanceWorker)
-        if not is_maintenance:
+        if worker_class == neutron.wsgi.WorkerService:
             admin_context = n_context.get_admin_context()
             self.node_uuid = ovn_hash_ring_db.add_node(admin_context,
                                                        self.hash_ring_group)
@@ -284,7 +291,7 @@ class OVNMechanismDriver(api.MechanismDriver):
         # Now IDL connections can be safely used.
         self._post_fork_event.set()
 
-        if is_maintenance:
+        if worker_class == worker.MaintenanceWorker:
             # Call the synchronization task if its maintenance worker
             # This sync neutron DB to OVN-NB DB only in inconsistent states
             self.nb_synchronizer = ovn_db_sync.OvnNbSynchronizer(
diff --git a/neutron/tests/functional/base.py b/neutron/tests/functional/base.py
index 0f186482570..ab8e0404938 100644
--- a/neutron/tests/functional/base.py
+++ b/neutron/tests/functional/base.py
@@ -43,11 +43,13 @@ from neutron.db import models  # noqa
 from neutron import manager
 from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker
 from neutron.plugins.ml2.drivers import type_geneve  # noqa
+from neutron import service  # noqa
 from neutron.tests import base
 from neutron.tests.common import base as common_base
 from neutron.tests.common import helpers
 from neutron.tests.functional.resources import process
 from neutron.tests.unit.plugins.ml2 import test_plugin
+import neutron.wsgi
 
 
 LOG = log.getLogger(__name__)
@@ -173,6 +175,7 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
         ovn_conf.cfg.CONF.set_override('dns_servers',
                                        ['10.10.10.10'],
                                        group='ovn')
+        ovn_conf.cfg.CONF.set_override('api_workers', 1)
 
         self.addCleanup(exts.PluginAwareExtensionManager.clear_instance)
         super(TestOVNFunctionalBase, self).setUp()
@@ -282,6 +285,8 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
         if self.maintenance_worker:
             trigger_cls.trigger.__self__.__class__ = worker.MaintenanceWorker
             cfg.CONF.set_override('neutron_sync_mode', 'off', 'ovn')
+        else:
+            trigger_cls.trigger.__self__.__class__ = neutron.wsgi.WorkerService
 
         self.addCleanup(self._collect_processes_logs)
         self.addCleanup(self.stop)
diff --git a/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py b/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py
index 19fd2bd7588..53e05564286 100644
--- a/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py
+++ b/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py
@@ -15,6 +15,7 @@
 from unittest import mock
 
 import fixtures as og_fixtures
+from oslo_config import cfg
 from oslo_utils import uuidutils
 
 from neutron.common.ovn import constants as ovn_const
@@ -200,12 +201,14 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
         self._test_port_binding_and_status(port['id'], 'unbind', 'DOWN')
 
     def test_distributed_lock(self):
+        api_workers = 11
+        cfg.CONF.set_override('api_workers', api_workers)
         row_event = DistributedLockTestEvent()
         self.mech_driver._nb_ovn.idl.notify_handler.watch_event(row_event)
         worker_list = [self.mech_driver._nb_ovn, ]
         # Create 10 fake workers
-        for _ in range(10):
+        for _ in range(api_workers - len(worker_list)):
             node_uuid = uuidutils.generate_uuid()
             db_hash_ring.add_node(
                 self.context,
                 ovn_const.HASH_RING_ML2_GROUP, node_uuid)
@@ -252,7 +255,7 @@ class TestNBDbMonitorOverSsl(TestNBDbMonitor):
         return 'ssl'
 
 
-class OvnIdlProbeInterval(base.TestOVNFunctionalBase):
+class TestOvnIdlProbeInterval(base.TestOVNFunctionalBase):
     def setUp(self):
         # skip parent setUp, we don't need it, but we do need grandparent
         # pylint: disable=bad-super-call
diff --git a/neutron/tests/unit/common/ovn/test_hash_ring_manager.py b/neutron/tests/unit/common/ovn/test_hash_ring_manager.py
index 052cd64d6f1..f4b9b971418 100644
--- a/neutron/tests/unit/common/ovn/test_hash_ring_manager.py
+++ b/neutron/tests/unit/common/ovn/test_hash_ring_manager.py
@@ -14,7 +14,6 @@
 # under the License.
 
 import datetime
-import time
 from unittest import mock
 
 from neutron_lib import context
@@ -24,6 +23,7 @@ from neutron.common.ovn import constants
 from neutron.common.ovn import exceptions
 from neutron.common.ovn import hash_ring_manager
 from neutron.db import ovn_hash_ring_db as db_hash_ring
+from neutron import service
 from neutron.tests.unit import testlib_api
 
 HASH_RING_TEST_GROUP = 'test_group'
@@ -110,24 +110,21 @@ class TestHashRingManager(testlib_api.SqlTestCaseLight):
         # The ring should re-balance and as it was before
         self._verify_hashes(hash_dict_before)
 
-    def test__wait_startup_before_caching(self):
+    @mock.patch.object(service, '_get_api_workers', return_value=2)
+    def test__wait_startup_before_caching(self, api_workers):
         db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-1')
-        db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-2')
 
-        # Assert it will return True until created_at != updated_at
+        # Assert it will return True until the node count equals api_workers
         self.assertTrue(self.hash_ring_manager._wait_startup_before_caching)
-        self.assertTrue(self.hash_ring_manager._cache_startup_timeout)
+        self.assertTrue(self.hash_ring_manager._check_hashring_startup)
 
-        # Touch the nodes (== update the updated_at column)
-        time.sleep(1)
-        db_hash_ring.touch_nodes_from_host(
-            self.admin_ctx, HASH_RING_TEST_GROUP)
+        db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-2')
 
         # Assert it's now False. Waiting is not needed anymore
         self.assertFalse(self.hash_ring_manager._wait_startup_before_caching)
-        self.assertFalse(self.hash_ring_manager._cache_startup_timeout)
+        self.assertFalse(self.hash_ring_manager._check_hashring_startup)
 
-        # Now assert that since the _cache_startup_timeout has been
+        # Now assert that since the _check_hashring_startup has been
         # flipped, we no longer will read from the database
         with mock.patch.object(hash_ring_manager.db_hash_ring,
                                'get_active_nodes') as get_nodes_mock:
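
Reviewer note: the heart of this change is the new startup gate in HashRingManager._wait_startup_before_caching, which keeps reading the database until the number of hash-ring nodes registered by this host reaches the number of API workers, then flips a one-way flag and serves the cached ring from that point on. Below is a minimal standalone sketch of that logic for reference only; HashRingStartupGate, api_worker_count and get_active_node_count are hypothetical stand-ins for HashRingManager, service._get_api_workers() and db_hash_ring.get_active_nodes(), not part of the patch.

# Illustrative sketch only -- not part of the patch. Names are hypothetical
# stand-ins for the real Neutron objects named above.
class HashRingStartupGate(object):
    def __init__(self, api_worker_count, get_active_node_count):
        # get_active_node_count plays the role of
        # len(db_hash_ring.get_active_nodes(...)) in the real code.
        self._api_worker_count = api_worker_count
        self._get_active_node_count = get_active_node_count
        self._check_startup = True

    @property
    def wait_startup_before_caching(self):
        # Once the check has passed, never consult the DB again,
        # mirroring the one-way _check_hashring_startup flag.
        if not self._check_startup:
            return False
        if self._get_active_node_count() >= self._api_worker_count:
            self._check_startup = False
            return False
        return True


if __name__ == '__main__':
    nodes = ['node-1']
    gate = HashRingStartupGate(2, lambda: len(nodes))
    assert gate.wait_startup_before_caching       # 1 node < 2 workers: keep waiting
    nodes.append('node-2')
    assert not gate.wait_startup_before_caching   # ring fully populated
    assert not gate.wait_startup_before_caching   # flag flipped; DB not read again

Because the flag only ever flips one way, once every worker has registered its node the property short-circuits without touching the database, which is what makes caching safe after startup.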