From 1f30f2dfff722ea65c811b4b99243fae51a2d688 Mon Sep 17 00:00:00 2001
From: Terry Wilson
Date: Mon, 7 Dec 2020 20:46:53 +0000
Subject: [PATCH] Rely on worker count for HashRing caching

The current code looks at a hash ring node's created_at/updated_at
fields and tries to determine whether the node has been updated by
checking whether updated_at - created_at > 1 second (the method that
initially fills the fields leaves them differing only by
microseconds). Unfortunately, because notify() ends up calling the
hash ring node's touch_node(), a node can be updated in under a
second, meaning we will prevent caching for much longer than we
intend. When using the sqlite in-memory db, continually re-creating
the HashRing objects for every processed event exposes an issue where
rows that should be in the db just *aren't*.

This patch instead limits the hash ring nodes to api workers and
prevents caching only until the number of nodes == number of api
workers on the host.

The switch from spawning hash ring nodes where !is_maintenance to
is_api_worker is primarily because it seems to be difficult to get a
list of *all* workers from which to subtract the maintenance worker,
so that _wait_startup_before_caching can wait for that specific
number of workers. In practice, this means that RpcWorker and
ServiceWorker workers will not process HashRing events.

A note on bug 1903008: While this change will greatly reduce the
likelihood of this issue taking place, we still have some work to do
in order to fully understand why it rubs the database backend the
wrong way. Thus, we will make this change 'related to' instead of
closing the bug.

Related-Bug: #1894117
Related-Bug: #1903008
Change-Id: Ia198d45f49bddda549a0e70a3374b8339f88887b
(cherry picked from commit c4007b0833111a25d24f597161d39ee9ccd37189)
---
 neutron/common/ovn/hash_ring_manager.py       | 25 +++++++++----------
 .../drivers/ovn/mech_driver/mech_driver.py    | 19 +++++++++-----
 neutron/tests/functional/base.py              |  5 ++++
 .../mech_driver/ovsdb/test_ovsdb_monitor.py   |  7 ++++--
 .../unit/common/ovn/test_hash_ring_manager.py | 19 ++++++--------
 5 files changed, 43 insertions(+), 32 deletions(-)

diff --git a/neutron/common/ovn/hash_ring_manager.py b/neutron/common/ovn/hash_ring_manager.py
index 95ea7255b8a..1d1886b53a6 100644
--- a/neutron/common/ovn/hash_ring_manager.py
+++ b/neutron/common/ovn/hash_ring_manager.py
@@ -22,6 +22,7 @@ from tooz import hashring
 from neutron.common.ovn import constants
 from neutron.common.ovn import exceptions
 from neutron.db import ovn_hash_ring_db as db_hash_ring
+from neutron import service
 from neutron_lib import context
 
 LOG = log.getLogger(__name__)
@@ -32,7 +33,7 @@ class HashRingManager(object):
     def __init__(self, group_name):
         self._hash_ring = None
         self._last_time_loaded = None
-        self._cache_startup_timeout = True
+        self._check_hashring_startup = True
         self._group = group_name
         self.admin_ctx = context.get_admin_context()
 
@@ -41,28 +42,26 @@ def _wait_startup_before_caching(self):
         # NOTE(lucasagomes): Some events are processed at the service's
         # startup time and since many services may be started concurrently
         # we do not want to use a cached hash ring at that point. This
-        # method checks if the created_at and updated_at columns from the
-        # nodes in the ring from this host is equal, and if so it means
-        # that the service just started.
+        # method ensures that we start allowing the use of cached HashRings
+        # once the number of HashRing nodes >= the number of api workers.
         # If the startup timeout already expired, there's no reason to
         # keep reading from the DB. At this point this will always
         # return False
-        if not self._cache_startup_timeout:
+        if not self._check_hashring_startup:
             return False
 
+        api_workers = service._get_api_workers()
         nodes = db_hash_ring.get_active_nodes(
             self.admin_ctx,
             constants.HASH_RING_CACHE_TIMEOUT, self._group, from_host=True)
-        # created_at and updated_at differ in microseonds so we compare their
-        # difference is less than a second to be safe on slow machines
-        dont_cache = nodes and (
-            nodes[0].updated_at - nodes[0].created_at < datetime.timedelta(
-                seconds=1))
-        if not dont_cache:
-            self._cache_startup_timeout = False
-        return dont_cache
+        if len(nodes) >= api_workers:
+            LOG.debug("Allow caching, nodes %s>=%s", len(nodes), api_workers)
+            self._check_hashring_startup = False
+            return False
+        LOG.debug("Disallow caching, nodes %s<%s", len(nodes), api_workers)
+        return True
 
     def _load_hash_ring(self, refresh=False):
         cache_timeout = timeutils.utcnow() - datetime.timedelta(
diff --git a/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py b/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py
index ac94c04e6bf..bfbe364b030 100644
--- a/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py
+++ b/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py
@@ -58,6 +58,7 @@ from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker
 from neutron.services.qos.drivers.ovn import driver as qos_driver
 from neutron.services.segments import db as segment_service_db
 from neutron.services.trunk.drivers.ovn import trunk_driver
+import neutron.wsgi
 
 LOG = log.getLogger(__name__)
 
@@ -248,16 +249,22 @@ class OVNMechanismDriver(api.MechanismDriver):
         else:
             raise
 
+    @staticmethod
+    def should_post_fork_initialize(worker_class):
+        return worker_class in (neutron.wsgi.WorkerService,
+                                worker.MaintenanceWorker)
+
     def post_fork_initialize(self, resource, event, trigger, payload=None):
-        # NOTE(rtheis): This will initialize all workers (API, RPC,
-        # plugin service and OVN) with OVN IDL connections.
+        # Initialize API/Maintenance workers with OVN IDL connections
+        worker_class = ovn_utils.get_method_class(trigger)
+        if not self.should_post_fork_initialize(worker_class):
+            return
+
         self._post_fork_event.clear()
         self._wait_for_pg_drop_event()
         self._ovn_client_inst = None
-        is_maintenance = (ovn_utils.get_method_class(trigger) ==
-                          worker.MaintenanceWorker)
-        if not is_maintenance:
+        if worker_class == neutron.wsgi.WorkerService:
             admin_context = n_context.get_admin_context()
             self.node_uuid = ovn_hash_ring_db.add_node(admin_context,
                                                        self.hash_ring_group)
@@ -284,7 +291,7 @@ class OVNMechanismDriver(api.MechanismDriver):
         # Now IDL connections can be safely used.
         self._post_fork_event.set()
 
-        if is_maintenance:
+        if worker_class == worker.MaintenanceWorker:
             # Call the synchronization task if its maintenance worker
             # This sync neutron DB to OVN-NB DB only in inconsistent states
             self.nb_synchronizer = ovn_db_sync.OvnNbSynchronizer(
diff --git a/neutron/tests/functional/base.py b/neutron/tests/functional/base.py
index 0f186482570..ab8e0404938 100644
--- a/neutron/tests/functional/base.py
+++ b/neutron/tests/functional/base.py
@@ -43,11 +43,13 @@ from neutron.db import models  # noqa
 from neutron import manager
 from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker
 from neutron.plugins.ml2.drivers import type_geneve  # noqa
+from neutron import service  # noqa
 from neutron.tests import base
 from neutron.tests.common import base as common_base
 from neutron.tests.common import helpers
 from neutron.tests.functional.resources import process
 from neutron.tests.unit.plugins.ml2 import test_plugin
+import neutron.wsgi
 
 LOG = log.getLogger(__name__)
 
@@ -173,6 +175,7 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
         ovn_conf.cfg.CONF.set_override('dns_servers',
                                        ['10.10.10.10'],
                                        group='ovn')
+        ovn_conf.cfg.CONF.set_override('api_workers', 1)
         self.addCleanup(exts.PluginAwareExtensionManager.clear_instance)
         super(TestOVNFunctionalBase, self).setUp()
 
@@ -282,6 +285,8 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
         if self.maintenance_worker:
             trigger_cls.trigger.__self__.__class__ = worker.MaintenanceWorker
             cfg.CONF.set_override('neutron_sync_mode', 'off', 'ovn')
+        else:
+            trigger_cls.trigger.__self__.__class__ = neutron.wsgi.WorkerService
 
         self.addCleanup(self._collect_processes_logs)
         self.addCleanup(self.stop)
diff --git a/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py b/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py
index 19fd2bd7588..53e05564286 100644
--- a/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py
+++ b/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py
@@ -15,6 +15,7 @@ from unittest import mock
 
 import fixtures as og_fixtures
+from oslo_config import cfg
 from oslo_utils import uuidutils
 
 from neutron.common.ovn import constants as ovn_const
@@ -200,12 +201,14 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
         self._test_port_binding_and_status(port['id'], 'unbind', 'DOWN')
 
     def test_distributed_lock(self):
+        api_workers = 11
+        cfg.CONF.set_override('api_workers', api_workers)
         row_event = DistributedLockTestEvent()
         self.mech_driver._nb_ovn.idl.notify_handler.watch_event(row_event)
         worker_list = [self.mech_driver._nb_ovn, ]
 
         # Create 10 fake workers
-        for _ in range(10):
+        for _ in range(api_workers - len(worker_list)):
             node_uuid = uuidutils.generate_uuid()
             db_hash_ring.add_node(
                 self.context, ovn_const.HASH_RING_ML2_GROUP, node_uuid)
@@ -252,7 +255,7 @@ class TestNBDbMonitorOverSsl(TestNBDbMonitor):
         return 'ssl'
 
 
-class OvnIdlProbeInterval(base.TestOVNFunctionalBase):
+class TestOvnIdlProbeInterval(base.TestOVNFunctionalBase):
     def setUp(self):
         # skip parent setUp, we don't need it, but we do need grandparent
         # pylint: disable=bad-super-call
diff --git a/neutron/tests/unit/common/ovn/test_hash_ring_manager.py b/neutron/tests/unit/common/ovn/test_hash_ring_manager.py
index 052cd64d6f1..f4b9b971418 100644
--- a/neutron/tests/unit/common/ovn/test_hash_ring_manager.py
+++ b/neutron/tests/unit/common/ovn/test_hash_ring_manager.py
@@ -14,7 +14,6 @@
 #    under the License.
 
 import datetime
-import time
 from unittest import mock
 
 from neutron_lib import context
@@ -24,6 +23,7 @@ from neutron.common.ovn import constants
 from neutron.common.ovn import exceptions
 from neutron.common.ovn import hash_ring_manager
 from neutron.db import ovn_hash_ring_db as db_hash_ring
+from neutron import service
 from neutron.tests.unit import testlib_api
 
 HASH_RING_TEST_GROUP = 'test_group'
@@ -110,24 +110,21 @@ class TestHashRingManager(testlib_api.SqlTestCaseLight):
         # The ring should re-balance and as it was before
         self._verify_hashes(hash_dict_before)
 
-    def test__wait_startup_before_caching(self):
+    @mock.patch.object(service, '_get_api_workers', return_value=2)
+    def test__wait_startup_before_caching(self, api_workers):
         db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-1')
-        db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-2')
 
-        # Assert it will return True until created_at != updated_at
+        # Assert it will return True until the node count equals api_workers
         self.assertTrue(self.hash_ring_manager._wait_startup_before_caching)
-        self.assertTrue(self.hash_ring_manager._cache_startup_timeout)
+        self.assertTrue(self.hash_ring_manager._check_hashring_startup)
 
-        # Touch the nodes (== update the updated_at column)
-        time.sleep(1)
-        db_hash_ring.touch_nodes_from_host(
-            self.admin_ctx, HASH_RING_TEST_GROUP)
+        db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-2')
 
         # Assert it's now False. Waiting is not needed anymore
         self.assertFalse(self.hash_ring_manager._wait_startup_before_caching)
-        self.assertFalse(self.hash_ring_manager._cache_startup_timeout)
+        self.assertFalse(self.hash_ring_manager._check_hashring_startup)
 
-        # Now assert that since the _cache_startup_timeout has been
+        # Now assert that since the _check_hashring_startup has been
        # flipped, we no longer will read from the database
         with mock.patch.object(hash_ring_manager.db_hash_ring,
                                'get_active_nodes') as get_nodes_mock:
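
Reviewer's note (not part of the patch): the gist of the new startup check is
"keep re-reading the ring from the DB until every API worker on this host has
registered its node, then cache from there on". Below is a dependency-free
sketch of that logic; the FakeHashRingManager class, its injected callables,
and the walk-through are illustrative stand-ins for service._get_api_workers()
and db_hash_ring.get_active_nodes(..., from_host=True), not neutron code.

# hash_ring_startup_sketch.py - illustrative only, not neutron code.

class FakeHashRingManager(object):
    """Allows hash ring caching only once all API workers have registered."""

    def __init__(self, get_api_workers, get_active_node_count):
        # Injected callables stand in for the worker count and the
        # active-node query used by the real HashRingManager.
        self._get_api_workers = get_api_workers
        self._get_active_node_count = get_active_node_count
        self._check_hashring_startup = True

    @property
    def _wait_startup_before_caching(self):
        # Once startup has completed, never hit the DB again.
        if not self._check_hashring_startup:
            return False

        api_workers = self._get_api_workers()
        nodes = self._get_active_node_count()
        if nodes >= api_workers:
            # Every API worker has registered its node; caching is safe
            # from now on, so flip the flag permanently.
            self._check_hashring_startup = False
            return False
        return True


if __name__ == '__main__':
    registered = []
    mgr = FakeHashRingManager(lambda: 2, lambda: len(registered))

    assert mgr._wait_startup_before_caching          # 0 of 2 workers
    registered.append('node-1')
    assert mgr._wait_startup_before_caching          # 1 of 2 workers
    registered.append('node-2')
    assert not mgr._wait_startup_before_caching      # 2 of 2: allow caching
    registered.clear()
    assert not mgr._wait_startup_before_caching      # flag stays flipped

The key behavioral difference from the old timestamp-based check is that
touch_node() updates can no longer re-disable caching: once the flag flips,
it stays flipped for the life of the process.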