Merge "Rely on worker count for HashRing caching" into stable/victoria
This commit is contained in:
commit
9b2ea43264
|
@ -22,6 +22,7 @@ from tooz import hashring
|
|||
from neutron.common.ovn import constants
|
||||
from neutron.common.ovn import exceptions
|
||||
from neutron.db import ovn_hash_ring_db as db_hash_ring
|
||||
from neutron import service
|
||||
from neutron_lib import context
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
@ -32,7 +33,7 @@ class HashRingManager(object):
|
|||
def __init__(self, group_name):
|
||||
self._hash_ring = None
|
||||
self._last_time_loaded = None
|
||||
self._cache_startup_timeout = True
|
||||
self._check_hashring_startup = True
|
||||
self._group = group_name
|
||||
self.admin_ctx = context.get_admin_context()
|
||||
|
||||
|
@ -41,28 +42,26 @@ class HashRingManager(object):
|
|||
# NOTE(lucasagomes): Some events are processed at the service's
|
||||
# startup time and since many services may be started concurrently
|
||||
# we do not want to use a cached hash ring at that point. This
|
||||
# method checks if the created_at and updated_at columns from the
|
||||
# nodes in the ring from this host is equal, and if so it means
|
||||
# that the service just started.
|
||||
# method ensures that we start allowing the use of cached HashRings
|
||||
# once the number of HashRing nodes >= the number of api workers.
|
||||
|
||||
# If the startup timeout already expired, there's no reason to
|
||||
# keep reading from the DB. At this point this will always
|
||||
# return False
|
||||
if not self._cache_startup_timeout:
|
||||
if not self._check_hashring_startup:
|
||||
return False
|
||||
|
||||
api_workers = service._get_api_workers()
|
||||
nodes = db_hash_ring.get_active_nodes(
|
||||
self.admin_ctx,
|
||||
constants.HASH_RING_CACHE_TIMEOUT, self._group, from_host=True)
|
||||
# created_at and updated_at differ in microseconds so we compare their
|
||||
# difference is less than a second to be safe on slow machines
|
||||
dont_cache = nodes and (
|
||||
nodes[0].updated_at - nodes[0].created_at < datetime.timedelta(
|
||||
seconds=1))
|
||||
if not dont_cache:
|
||||
self._cache_startup_timeout = False
|
||||
|
||||
return dont_cache
|
||||
if len(nodes) >= api_workers:
|
||||
LOG.debug("Allow caching, nodes %s>=%s", len(nodes), api_workers)
|
||||
self._check_hashring_startup = False
|
||||
return False
|
||||
LOG.debug("Disallow caching, nodes %s<%s", len(nodes), api_workers)
|
||||
return True
|
||||
|
||||
def _load_hash_ring(self, refresh=False):
|
||||
cache_timeout = timeutils.utcnow() - datetime.timedelta(
|
||||
|
|
|
@ -58,6 +58,7 @@ from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker
|
|||
from neutron.services.qos.drivers.ovn import driver as qos_driver
|
||||
from neutron.services.segments import db as segment_service_db
|
||||
from neutron.services.trunk.drivers.ovn import trunk_driver
|
||||
import neutron.wsgi
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
@ -248,16 +249,22 @@ class OVNMechanismDriver(api.MechanismDriver):
|
|||
else:
|
||||
raise
|
||||
|
||||
@staticmethod
|
||||
def should_post_fork_initialize(worker_class):
|
||||
return worker_class in (neutron.wsgi.WorkerService,
|
||||
worker.MaintenanceWorker)
|
||||
|
||||
def post_fork_initialize(self, resource, event, trigger, payload=None):
|
||||
# NOTE(rtheis): This will initialize all workers (API, RPC,
|
||||
# plugin service and OVN) with OVN IDL connections.
|
||||
# Initialize API/Maintenance workers with OVN IDL connections
|
||||
worker_class = ovn_utils.get_method_class(trigger)
|
||||
if not self.should_post_fork_initialize(worker_class):
|
||||
return
|
||||
|
||||
self._post_fork_event.clear()
|
||||
self._wait_for_pg_drop_event()
|
||||
self._ovn_client_inst = None
|
||||
|
||||
is_maintenance = (ovn_utils.get_method_class(trigger) ==
|
||||
worker.MaintenanceWorker)
|
||||
if not is_maintenance:
|
||||
if worker_class == neutron.wsgi.WorkerService:
|
||||
admin_context = n_context.get_admin_context()
|
||||
self.node_uuid = ovn_hash_ring_db.add_node(admin_context,
|
||||
self.hash_ring_group)
|
||||
|
@ -284,7 +291,7 @@ class OVNMechanismDriver(api.MechanismDriver):
|
|||
# Now IDL connections can be safely used.
|
||||
self._post_fork_event.set()
|
||||
|
||||
if is_maintenance:
|
||||
if worker_class == worker.MaintenanceWorker:
|
||||
# Call the synchronization task if its maintenance worker
|
||||
# This sync neutron DB to OVN-NB DB only in inconsistent states
|
||||
self.nb_synchronizer = ovn_db_sync.OvnNbSynchronizer(
|
||||
|
|
|
@ -43,11 +43,13 @@ from neutron.db import models # noqa
|
|||
from neutron import manager
|
||||
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker
|
||||
from neutron.plugins.ml2.drivers import type_geneve # noqa
|
||||
from neutron import service # noqa
|
||||
from neutron.tests import base
|
||||
from neutron.tests.common import base as common_base
|
||||
from neutron.tests.common import helpers
|
||||
from neutron.tests.functional.resources import process
|
||||
from neutron.tests.unit.plugins.ml2 import test_plugin
|
||||
import neutron.wsgi
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
@ -173,6 +175,7 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
|
|||
ovn_conf.cfg.CONF.set_override('dns_servers',
|
||||
['10.10.10.10'],
|
||||
group='ovn')
|
||||
ovn_conf.cfg.CONF.set_override('api_workers', 1)
|
||||
|
||||
self.addCleanup(exts.PluginAwareExtensionManager.clear_instance)
|
||||
super(TestOVNFunctionalBase, self).setUp()
|
||||
|
@ -282,6 +285,8 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
|
|||
if self.maintenance_worker:
|
||||
trigger_cls.trigger.__self__.__class__ = worker.MaintenanceWorker
|
||||
cfg.CONF.set_override('neutron_sync_mode', 'off', 'ovn')
|
||||
else:
|
||||
trigger_cls.trigger.__self__.__class__ = neutron.wsgi.WorkerService
|
||||
|
||||
self.addCleanup(self._collect_processes_logs)
|
||||
self.addCleanup(self.stop)
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
from unittest import mock
|
||||
|
||||
import fixtures as og_fixtures
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron.common.ovn import constants as ovn_const
|
||||
|
@ -200,12 +201,14 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
|
|||
self._test_port_binding_and_status(port['id'], 'unbind', 'DOWN')
|
||||
|
||||
def test_distributed_lock(self):
|
||||
api_workers = 11
|
||||
cfg.CONF.set_override('api_workers', api_workers)
|
||||
row_event = DistributedLockTestEvent()
|
||||
self.mech_driver._nb_ovn.idl.notify_handler.watch_event(row_event)
|
||||
worker_list = [self.mech_driver._nb_ovn, ]
|
||||
|
||||
# Create 10 fake workers
|
||||
for _ in range(10):
|
||||
for _ in range(api_workers - len(worker_list)):
|
||||
node_uuid = uuidutils.generate_uuid()
|
||||
db_hash_ring.add_node(
|
||||
self.context, ovn_const.HASH_RING_ML2_GROUP, node_uuid)
|
||||
|
@ -252,7 +255,7 @@ class TestNBDbMonitorOverSsl(TestNBDbMonitor):
|
|||
return 'ssl'
|
||||
|
||||
|
||||
class OvnIdlProbeInterval(base.TestOVNFunctionalBase):
|
||||
class TestOvnIdlProbeInterval(base.TestOVNFunctionalBase):
|
||||
def setUp(self):
|
||||
# skip parent setUp, we don't need it, but we do need grandparent
|
||||
# pylint: disable=bad-super-call
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
# under the License.
|
||||
|
||||
import datetime
|
||||
import time
|
||||
from unittest import mock
|
||||
|
||||
from neutron_lib import context
|
||||
|
@ -24,6 +23,7 @@ from neutron.common.ovn import constants
|
|||
from neutron.common.ovn import exceptions
|
||||
from neutron.common.ovn import hash_ring_manager
|
||||
from neutron.db import ovn_hash_ring_db as db_hash_ring
|
||||
from neutron import service
|
||||
from neutron.tests.unit import testlib_api
|
||||
|
||||
HASH_RING_TEST_GROUP = 'test_group'
|
||||
|
@ -110,24 +110,21 @@ class TestHashRingManager(testlib_api.SqlTestCaseLight):
|
|||
# The ring should re-balance and as it was before
|
||||
self._verify_hashes(hash_dict_before)
|
||||
|
||||
def test__wait_startup_before_caching(self):
|
||||
@mock.patch.object(service, '_get_api_workers', return_value=2)
|
||||
def test__wait_startup_before_caching(self, api_workers):
|
||||
db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-1')
|
||||
db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-2')
|
||||
|
||||
# Assert it will return True until created_at != updated_at
|
||||
# Assert it will return True until we equal api_workers
|
||||
self.assertTrue(self.hash_ring_manager._wait_startup_before_caching)
|
||||
self.assertTrue(self.hash_ring_manager._cache_startup_timeout)
|
||||
self.assertTrue(self.hash_ring_manager._check_hashring_startup)
|
||||
|
||||
# Touch the nodes (== update the updated_at column)
|
||||
time.sleep(1)
|
||||
db_hash_ring.touch_nodes_from_host(
|
||||
self.admin_ctx, HASH_RING_TEST_GROUP)
|
||||
db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-2')
|
||||
|
||||
# Assert it's now False. Waiting is not needed anymore
|
||||
self.assertFalse(self.hash_ring_manager._wait_startup_before_caching)
|
||||
self.assertFalse(self.hash_ring_manager._cache_startup_timeout)
|
||||
self.assertFalse(self.hash_ring_manager._check_hashring_startup)
|
||||
|
||||
# Now assert that since the _cache_startup_timeout has been
|
||||
# Now assert that since the _check_hashring_startup has been
|
||||
# flipped, we no longer will read from the database
|
||||
with mock.patch.object(hash_ring_manager.db_hash_ring,
|
||||
'get_active_nodes') as get_nodes_mock:
|
||||
|
|
Loading…
Reference in New Issue