Merge "Rely on worker count for HashRing caching" into stable/ussuri

This commit is contained in:
Zuul 2021-05-06 22:39:45 +00:00 committed by Gerrit Code Review
commit 3322cd3294
5 changed files with 43 additions and 32 deletions

View File

@@ -23,6 +23,7 @@ from tooz import hashring
from neutron.common.ovn import constants from neutron.common.ovn import constants
from neutron.common.ovn import exceptions from neutron.common.ovn import exceptions
from neutron.db import ovn_hash_ring_db as db_hash_ring from neutron.db import ovn_hash_ring_db as db_hash_ring
from neutron import service
from neutron_lib import context from neutron_lib import context
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@@ -33,7 +34,7 @@ class HashRingManager(object):
def __init__(self, group_name): def __init__(self, group_name):
self._hash_ring = None self._hash_ring = None
self._last_time_loaded = None self._last_time_loaded = None
self._cache_startup_timeout = True self._check_hashring_startup = True
self._group = group_name self._group = group_name
self.admin_ctx = context.get_admin_context() self.admin_ctx = context.get_admin_context()
@@ -42,28 +43,26 @@ class HashRingManager(object):
# NOTE(lucasagomes): Some events are processed at the service's # NOTE(lucasagomes): Some events are processed at the service's
# startup time and since many services may be started concurrently # startup time and since many services may be started concurrently
# we do not want to use a cached hash ring at that point. This # we do not want to use a cached hash ring at that point. This
# method checks if the created_at and updated_at columns from the # method ensures that we start allowing the use of cached HashRings
# nodes in the ring from this host is equal, and if so it means # once the number of HashRing nodes >= the number of api workers.
# that the service just started.
# If the startup timeout already expired, there's no reason to # If the startup timeout already expired, there's no reason to
# keep reading from the DB. At this point this will always # keep reading from the DB. At this point this will always
# return False # return False
if not self._cache_startup_timeout: if not self._check_hashring_startup:
return False return False
api_workers = service._get_api_workers()
nodes = db_hash_ring.get_active_nodes( nodes = db_hash_ring.get_active_nodes(
self.admin_ctx, self.admin_ctx,
constants.HASH_RING_CACHE_TIMEOUT, self._group, from_host=True) constants.HASH_RING_CACHE_TIMEOUT, self._group, from_host=True)
# created_at and updated_at differ in microseconds so we check that their
# difference is less than a second to be safe on slow machines
dont_cache = nodes and (
nodes[0].updated_at - nodes[0].created_at < datetime.timedelta(
seconds=1))
if not dont_cache:
self._cache_startup_timeout = False
return dont_cache if len(nodes) >= api_workers:
LOG.debug("Allow caching, nodes %s>=%s", len(nodes), api_workers)
self._check_hashring_startup = False
return False
LOG.debug("Disallow caching, nodes %s<%s", len(nodes), api_workers)
return True
def _load_hash_ring(self, refresh=False): def _load_hash_ring(self, refresh=False):
cache_timeout = timeutils.utcnow() - datetime.timedelta( cache_timeout = timeutils.utcnow() - datetime.timedelta(

View File

@@ -57,6 +57,7 @@ from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker
from neutron.services.qos.drivers.ovn import driver as qos_driver from neutron.services.qos.drivers.ovn import driver as qos_driver
from neutron.services.segments import db as segment_service_db from neutron.services.segments import db as segment_service_db
from neutron.services.trunk.drivers.ovn import trunk_driver from neutron.services.trunk.drivers.ovn import trunk_driver
import neutron.wsgi
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@@ -220,15 +221,21 @@ class OVNMechanismDriver(api.MechanismDriver):
atexit.register(self._clean_hash_ring) atexit.register(self._clean_hash_ring)
signal.signal(signal.SIGTERM, self._clean_hash_ring) signal.signal(signal.SIGTERM, self._clean_hash_ring)
@staticmethod
def should_post_fork_initialize(worker_class):
return worker_class in (neutron.wsgi.WorkerService,
worker.MaintenanceWorker)
def post_fork_initialize(self, resource, event, trigger, payload=None): def post_fork_initialize(self, resource, event, trigger, payload=None):
# NOTE(rtheis): This will initialize all workers (API, RPC, # Initialize API/Maintenance workers with OVN IDL connections
# plugin service and OVN) with OVN IDL connections. worker_class = ovn_utils.get_method_class(trigger)
if not self.should_post_fork_initialize(worker_class):
return
self._post_fork_event.clear() self._post_fork_event.clear()
self._ovn_client_inst = None self._ovn_client_inst = None
is_maintenance = (ovn_utils.get_method_class(trigger) == if worker_class == neutron.wsgi.WorkerService:
worker.MaintenanceWorker)
if not is_maintenance:
admin_context = n_context.get_admin_context() admin_context = n_context.get_admin_context()
self.node_uuid = ovn_hash_ring_db.add_node(admin_context, self.node_uuid = ovn_hash_ring_db.add_node(admin_context,
self.hash_ring_group) self.hash_ring_group)
@@ -255,7 +262,7 @@ class OVNMechanismDriver(api.MechanismDriver):
# Now IDL connections can be safely used. # Now IDL connections can be safely used.
self._post_fork_event.set() self._post_fork_event.set()
if is_maintenance: if worker_class == worker.MaintenanceWorker:
# Call the synchronization task if it's a maintenance worker # Call the synchronization task if it's a maintenance worker
# This sync neutron DB to OVN-NB DB only in inconsistent states # This sync neutron DB to OVN-NB DB only in inconsistent states
self.nb_synchronizer = ovn_db_sync.OvnNbSynchronizer( self.nb_synchronizer = ovn_db_sync.OvnNbSynchronizer(

View File

@@ -42,11 +42,13 @@ from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
from neutron.db import models # noqa from neutron.db import models # noqa
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker
from neutron.plugins.ml2.drivers import type_geneve # noqa from neutron.plugins.ml2.drivers import type_geneve # noqa
from neutron import service # noqa
from neutron.tests import base from neutron.tests import base
from neutron.tests.common import base as common_base from neutron.tests.common import base as common_base
from neutron.tests.common import helpers from neutron.tests.common import helpers
from neutron.tests.functional.resources import process from neutron.tests.functional.resources import process
from neutron.tests.unit.plugins.ml2 import test_plugin from neutron.tests.unit.plugins.ml2 import test_plugin
import neutron.wsgi
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@@ -172,6 +174,7 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
ovn_conf.cfg.CONF.set_override('dns_servers', ovn_conf.cfg.CONF.set_override('dns_servers',
['10.10.10.10'], ['10.10.10.10'],
group='ovn') group='ovn')
ovn_conf.cfg.CONF.set_override('api_workers', 1)
self.addCleanup(exts.PluginAwareExtensionManager.clear_instance) self.addCleanup(exts.PluginAwareExtensionManager.clear_instance)
super(TestOVNFunctionalBase, self).setUp() super(TestOVNFunctionalBase, self).setUp()
@@ -277,6 +280,8 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
if self.maintenance_worker: if self.maintenance_worker:
trigger_cls.trigger.__self__.__class__ = worker.MaintenanceWorker trigger_cls.trigger.__self__.__class__ = worker.MaintenanceWorker
cfg.CONF.set_override('neutron_sync_mode', 'off', 'ovn') cfg.CONF.set_override('neutron_sync_mode', 'off', 'ovn')
else:
trigger_cls.trigger.__self__.__class__ = neutron.wsgi.WorkerService
self.addCleanup(self._collect_processes_logs) self.addCleanup(self._collect_processes_logs)
self.addCleanup(self.stop) self.addCleanup(self.stop)

View File

@@ -15,6 +15,7 @@
import mock import mock
import fixtures as og_fixtures import fixtures as og_fixtures
from oslo_config import cfg
from oslo_utils import uuidutils from oslo_utils import uuidutils
from neutron.common.ovn import constants as ovn_const from neutron.common.ovn import constants as ovn_const
@@ -200,12 +201,14 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
self._test_port_binding_and_status(port['id'], 'unbind', 'DOWN') self._test_port_binding_and_status(port['id'], 'unbind', 'DOWN')
def test_distributed_lock(self): def test_distributed_lock(self):
api_workers = 11
cfg.CONF.set_override('api_workers', api_workers)
row_event = DistributedLockTestEvent() row_event = DistributedLockTestEvent()
self.mech_driver._nb_ovn.idl.notify_handler.watch_event(row_event) self.mech_driver._nb_ovn.idl.notify_handler.watch_event(row_event)
worker_list = [self.mech_driver._nb_ovn, ] worker_list = [self.mech_driver._nb_ovn, ]
# Create 10 fake workers # Create 10 fake workers
for _ in range(10): for _ in range(api_workers - len(worker_list)):
node_uuid = uuidutils.generate_uuid() node_uuid = uuidutils.generate_uuid()
db_hash_ring.add_node( db_hash_ring.add_node(
self.context, ovn_const.HASH_RING_ML2_GROUP, node_uuid) self.context, ovn_const.HASH_RING_ML2_GROUP, node_uuid)
@@ -252,7 +255,7 @@ class TestNBDbMonitorOverSsl(TestNBDbMonitor):
return 'ssl' return 'ssl'
class OvnIdlProbeInterval(base.TestOVNFunctionalBase): class TestOvnIdlProbeInterval(base.TestOVNFunctionalBase):
def setUp(self): def setUp(self):
# skip parent setUp, we don't need it, but we do need grandparent # skip parent setUp, we don't need it, but we do need grandparent
# pylint: disable=bad-super-call # pylint: disable=bad-super-call

View File

@@ -14,7 +14,6 @@
# under the License. # under the License.
import datetime import datetime
import time
import mock import mock
from neutron_lib import context from neutron_lib import context
@@ -24,6 +23,7 @@ from neutron.common.ovn import constants
from neutron.common.ovn import exceptions from neutron.common.ovn import exceptions
from neutron.common.ovn import hash_ring_manager from neutron.common.ovn import hash_ring_manager
from neutron.db import ovn_hash_ring_db as db_hash_ring from neutron.db import ovn_hash_ring_db as db_hash_ring
from neutron import service
from neutron.tests.unit import testlib_api from neutron.tests.unit import testlib_api
HASH_RING_TEST_GROUP = 'test_group' HASH_RING_TEST_GROUP = 'test_group'
@@ -110,24 +110,21 @@ class TestHashRingManager(testlib_api.SqlTestCaseLight):
# The ring should re-balance and as it was before # The ring should re-balance and as it was before
self._verify_hashes(hash_dict_before) self._verify_hashes(hash_dict_before)
def test__wait_startup_before_caching(self): @mock.patch.object(service, '_get_api_workers', return_value=2)
def test__wait_startup_before_caching(self, api_workers):
db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-1') db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-1')
db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-2')
# Assert it will return True until created_at != updated_at # Assert it will return True until we equal api_workers
self.assertTrue(self.hash_ring_manager._wait_startup_before_caching) self.assertTrue(self.hash_ring_manager._wait_startup_before_caching)
self.assertTrue(self.hash_ring_manager._cache_startup_timeout) self.assertTrue(self.hash_ring_manager._check_hashring_startup)
# Touch the nodes (== update the updated_at column) db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-2')
time.sleep(1)
db_hash_ring.touch_nodes_from_host(
self.admin_ctx, HASH_RING_TEST_GROUP)
# Assert it's now False. Waiting is not needed anymore # Assert it's now False. Waiting is not needed anymore
self.assertFalse(self.hash_ring_manager._wait_startup_before_caching) self.assertFalse(self.hash_ring_manager._wait_startup_before_caching)
self.assertFalse(self.hash_ring_manager._cache_startup_timeout) self.assertFalse(self.hash_ring_manager._check_hashring_startup)
# Now assert that since the _cache_startup_timeout has been # Now assert that since the _check_hashring_startup has been
# flipped, we no longer will read from the database # flipped, we no longer will read from the database
with mock.patch.object(hash_ring_manager.db_hash_ring, with mock.patch.object(hash_ring_manager.db_hash_ring,
'get_active_nodes') as get_nodes_mock: 'get_active_nodes') as get_nodes_mock: