Rely on worker count for HashRing caching
The current code looks at a hash ring node's created_at/updated_at
columns and tries to determine whether the node has been updated by
checking whether updated_at - created_at > 1 second (the method that
initially populates the two columns leaves them only microseconds
apart). Unfortunately, because notify() ends up calling the hash ring
node's touch_node(), a node can be updated less than a second after
it was created, which means we keep preventing caching for much
longer than intended.
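
For illustration only, the removed heuristic amounted to the check
below (a simplified sketch; _looks_like_startup is a hypothetical
name, the real code is in the diff further down):

    import datetime

    def _looks_like_startup(node):
        # created_at and updated_at start out only microseconds
        # apart, so a delta under one second is read as "this node
        # was just created" and caching is disallowed.
        return (node.updated_at - node.created_at <
                datetime.timedelta(seconds=1))

    # Problem: notify() -> touch_node() can set updated_at less than
    # a second after created_at, leaving the node looking freshly
    # created and caching disabled.
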
When using a SQLite in-memory database, this continual re-creation
of the HashRing objects for every processed event exposes an issue
where rows that should be in the database just *aren't*.
This patch instead limits hash ring nodes to API workers and
prevents caching only until the number of nodes equals the number of
API workers on the host. The switch from spawning hash ring nodes
where !is_maintenance to is_api_worker is primarily because it is
difficult to get a list of *all* workers from which to subtract the
maintenance worker, so that _wait_startup_before_caching could wait
for that specific number of workers. In practice, this means that
RpcWorker and ServiceWorker workers no longer process HashRing
events.
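
A minimal sketch of the new gate (the full version, with the DB
lookup and debug logging, is in the diff further down):

    def _wait_startup_before_caching(active_nodes, api_workers):
        # Keep re-reading the ring from the database until every API
        # worker on this host has registered its hash ring node.
        return len(active_nodes) < api_workers
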
A note on bug 1903008: while this change greatly reduces the
likelihood of the issue occurring, we still have some work to do in
order to fully understand why it rubs the database backend the wrong
way. For that reason, this change is marked as related to the bug
instead of closing it.

Related-Bug: #1894117
Related-Bug: #1903008
Change-Id: Ia198d45f49bddda549a0e70a3374b8339f88887b
(cherry picked from commit c4007b0833)

@@ -22,6 +22,7 @@ from tooz import hashring
 from neutron.common.ovn import constants
 from neutron.common.ovn import exceptions
 from neutron.db import ovn_hash_ring_db as db_hash_ring
+from neutron import service
 from neutron_lib import context
 
 LOG = log.getLogger(__name__)

@@ -32,7 +33,7 @@ class HashRingManager(object):
     def __init__(self, group_name):
         self._hash_ring = None
         self._last_time_loaded = None
-        self._cache_startup_timeout = True
+        self._check_hashring_startup = True
         self._group = group_name
         self.admin_ctx = context.get_admin_context()
 

@@ -41,28 +42,26 @@ class HashRingManager(object):
         # NOTE(lucasagomes): Some events are processed at the service's
         # startup time and since many services may be started concurrently
         # we do not want to use a cached hash ring at that point. This
-        # method checks if the created_at and updated_at columns from the
-        # nodes in the ring from this host is equal, and if so it means
-        # that the service just started.
+        # method ensures that we start allowing the use of cached HashRings
+        # once the number of HashRing nodes >= the number of api workers.
 
         # If the startup timeout already expired, there's no reason to
         # keep reading from the DB. At this point this will always
         # return False
-        if not self._cache_startup_timeout:
+        if not self._check_hashring_startup:
             return False
 
+        api_workers = service._get_api_workers()
         nodes = db_hash_ring.get_active_nodes(
             self.admin_ctx,
             constants.HASH_RING_CACHE_TIMEOUT, self._group, from_host=True)
-        # created_at and updated_at differ in microseonds so we compare their
-        # difference is less than a second to be safe on slow machines
-        dont_cache = nodes and (
-            nodes[0].updated_at - nodes[0].created_at < datetime.timedelta(
-                seconds=1))
-        if not dont_cache:
-            self._cache_startup_timeout = False
-
-        return dont_cache
+
+        if len(nodes) >= api_workers:
+            LOG.debug("Allow caching, nodes %s>=%s", len(nodes), api_workers)
+            self._check_hashring_startup = False
+            return False
+        LOG.debug("Disallow caching, nodes %s<%s", len(nodes), api_workers)
+        return True
 
     def _load_hash_ring(self, refresh=False):
         cache_timeout = timeutils.utcnow() - datetime.timedelta(

@@ -58,6 +58,7 @@ from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker
 from neutron.services.qos.drivers.ovn import driver as qos_driver
 from neutron.services.segments import db as segment_service_db
 from neutron.services.trunk.drivers.ovn import trunk_driver
+import neutron.wsgi
 
 LOG = log.getLogger(__name__)
 

@@ -248,16 +249,22 @@ class OVNMechanismDriver(api.MechanismDriver):
         else:
             raise
 
+    @staticmethod
+    def should_post_fork_initialize(worker_class):
+        return worker_class in (neutron.wsgi.WorkerService,
+                                worker.MaintenanceWorker)
+
     def post_fork_initialize(self, resource, event, trigger, payload=None):
-        # NOTE(rtheis): This will initialize all workers (API, RPC,
-        # plugin service and OVN) with OVN IDL connections.
+        # Initialize API/Maintenance workers with OVN IDL connections
+        worker_class = ovn_utils.get_method_class(trigger)
+        if not self.should_post_fork_initialize(worker_class):
+            return
+
         self._post_fork_event.clear()
         self._wait_for_pg_drop_event()
         self._ovn_client_inst = None
 
-        is_maintenance = (ovn_utils.get_method_class(trigger) ==
-                          worker.MaintenanceWorker)
-        if not is_maintenance:
+        if worker_class == neutron.wsgi.WorkerService:
             admin_context = n_context.get_admin_context()
             self.node_uuid = ovn_hash_ring_db.add_node(admin_context,
                                                        self.hash_ring_group)

@@ -284,7 +291,7 @@ class OVNMechanismDriver(api.MechanismDriver):
         # Now IDL connections can be safely used.
         self._post_fork_event.set()
 
-        if is_maintenance:
+        if worker_class == worker.MaintenanceWorker:
             # Call the synchronization task if its maintenance worker
             # This sync neutron DB to OVN-NB DB only in inconsistent states
             self.nb_synchronizer = ovn_db_sync.OvnNbSynchronizer(

@@ -43,11 +43,13 @@ from neutron.db import models # noqa
 from neutron import manager
 from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker
 from neutron.plugins.ml2.drivers import type_geneve # noqa
+from neutron import service # noqa
 from neutron.tests import base
 from neutron.tests.common import base as common_base
 from neutron.tests.common import helpers
 from neutron.tests.functional.resources import process
 from neutron.tests.unit.plugins.ml2 import test_plugin
+import neutron.wsgi
 
 LOG = log.getLogger(__name__)
 

@@ -173,6 +175,7 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
         ovn_conf.cfg.CONF.set_override('dns_servers',
                                        ['10.10.10.10'],
                                        group='ovn')
+        ovn_conf.cfg.CONF.set_override('api_workers', 1)
 
         self.addCleanup(exts.PluginAwareExtensionManager.clear_instance)
         super(TestOVNFunctionalBase, self).setUp()

@@ -282,6 +285,8 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
         if self.maintenance_worker:
             trigger_cls.trigger.__self__.__class__ = worker.MaintenanceWorker
             cfg.CONF.set_override('neutron_sync_mode', 'off', 'ovn')
+        else:
+            trigger_cls.trigger.__self__.__class__ = neutron.wsgi.WorkerService
 
         self.addCleanup(self._collect_processes_logs)
         self.addCleanup(self.stop)

@@ -15,6 +15,7 @@
 from unittest import mock
 
+import fixtures as og_fixtures
 from oslo_config import cfg
 from oslo_utils import uuidutils
 
 from neutron.common.ovn import constants as ovn_const

@@ -200,12 +201,14 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
         self._test_port_binding_and_status(port['id'], 'unbind', 'DOWN')
 
     def test_distributed_lock(self):
+        api_workers = 11
+        cfg.CONF.set_override('api_workers', api_workers)
         row_event = DistributedLockTestEvent()
         self.mech_driver._nb_ovn.idl.notify_handler.watch_event(row_event)
         worker_list = [self.mech_driver._nb_ovn, ]
 
         # Create 10 fake workers
-        for _ in range(10):
+        for _ in range(api_workers - len(worker_list)):
             node_uuid = uuidutils.generate_uuid()
             db_hash_ring.add_node(
                 self.context, ovn_const.HASH_RING_ML2_GROUP, node_uuid)

@@ -252,7 +255,7 @@ class TestNBDbMonitorOverSsl(TestNBDbMonitor):
         return 'ssl'
 
 
-class OvnIdlProbeInterval(base.TestOVNFunctionalBase):
+class TestOvnIdlProbeInterval(base.TestOVNFunctionalBase):
     def setUp(self):
         # skip parent setUp, we don't need it, but we do need grandparent
         # pylint: disable=bad-super-call

@@ -14,7 +14,6 @@
 # under the License.
 
 import datetime
-import time
 from unittest import mock
 
 from neutron_lib import context

@@ -24,6 +23,7 @@ from neutron.common.ovn import constants
 from neutron.common.ovn import exceptions
 from neutron.common.ovn import hash_ring_manager
 from neutron.db import ovn_hash_ring_db as db_hash_ring
+from neutron import service
 from neutron.tests.unit import testlib_api
 
 HASH_RING_TEST_GROUP = 'test_group'

@@ -110,24 +110,21 @@ class TestHashRingManager(testlib_api.SqlTestCaseLight):
         # The ring should re-balance and as it was before
         self._verify_hashes(hash_dict_before)
 
-    def test__wait_startup_before_caching(self):
+    @mock.patch.object(service, '_get_api_workers', return_value=2)
+    def test__wait_startup_before_caching(self, api_workers):
         db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-1')
-        db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-2')
 
-        # Assert it will return True until created_at != updated_at
+        # Assert it will return True until we equal api_workers
         self.assertTrue(self.hash_ring_manager._wait_startup_before_caching)
-        self.assertTrue(self.hash_ring_manager._cache_startup_timeout)
+        self.assertTrue(self.hash_ring_manager._check_hashring_startup)
 
-        # Touch the nodes (== update the updated_at column)
-        time.sleep(1)
-        db_hash_ring.touch_nodes_from_host(
-            self.admin_ctx, HASH_RING_TEST_GROUP)
+        db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-2')
 
         # Assert it's now False. Waiting is not needed anymore
         self.assertFalse(self.hash_ring_manager._wait_startup_before_caching)
-        self.assertFalse(self.hash_ring_manager._cache_startup_timeout)
+        self.assertFalse(self.hash_ring_manager._check_hashring_startup)
 
-        # Now assert that since the _cache_startup_timeout has been
+        # Now assert that since the _check_hashring_startup has been
         # flipped, we no longer will read from the database
         with mock.patch.object(hash_ring_manager.db_hash_ring,
                                'get_active_nodes') as get_nodes_mock: