Merge "Rely on worker count for HashRing caching" into stable/train

Gerrit change ref: changes/39/792539/1
Zuul, 3 weeks ago; committed by Gerrit Code Review
Commit 653dda5b78
5 changed files with 44 additions and 32 deletions
  1. networking_ovn/common/hash_ring_manager.py (+13, -14)
  2. networking_ovn/ml2/mech_driver.py (+13, -6)
  3. networking_ovn/tests/functional/base.py (+5, -0)
  4. networking_ovn/tests/functional/test_ovsdb_monitor.py (+5, -2)
  5. networking_ovn/tests/unit/common/test_hash_ring_manager.py (+8, -10)

networking_ovn/common/hash_ring_manager.py (+13, -14)

@@ -23,6 +23,7 @@ from tooz import hashring
 from networking_ovn.common import constants
 from networking_ovn.common import exceptions
 from networking_ovn.db import hash_ring as db_hash_ring
+from neutron import service
 LOG = log.getLogger(__name__)
@@ -32,7 +33,7 @@ class HashRingManager(object):
     def __init__(self, group_name):
         self._hash_ring = None
         self._last_time_loaded = None
-        self._cache_startup_timeout = True
+        self._check_hashring_startup = True
         self._group = group_name
     @property
@@ -40,27 +41,25 @@ class HashRingManager(object):
         # NOTE(lucasagomes): Some events are processed at the service's
         # startup time and since many services may be started concurrently
         # we do not want to use a cached hash ring at that point. This
-        # method checks if the created_at and updated_at columns from the
-        # nodes in the ring from this host is equal, and if so it means
-        # that the service just started.
+        # method ensures that we start allowing the use of cached HashRings
+        # once the number of HashRing nodes >= the number of api workers.
         # If the startup timeout already expired, there's no reason to
         # keep reading from the DB. At this point this will always
         # return False
-        if not self._cache_startup_timeout:
+        if not self._check_hashring_startup:
             return False
+        api_workers = service._get_api_workers()
         nodes = db_hash_ring.get_active_nodes(
            constants.HASH_RING_CACHE_TIMEOUT, self._group, from_host=True)
-        # created_at and updated_at differ in microseonds so we compare their
-        # difference is less than a second to be safe on slow machines
-        dont_cache = nodes and (
-            nodes[0].updated_at - nodes[0].created_at < datetime.timedelta(
-                seconds=1))
-        if not dont_cache:
-            self._cache_startup_timeout = False
-        return dont_cache
+        if len(nodes) >= api_workers:
+            LOG.debug("Allow caching, nodes %s>=%s", len(nodes), api_workers)
+            self._check_hashring_startup = False
+            return False
+        LOG.debug("Disallow caching, nodes %s<%s", len(nodes), api_workers)
+        return True
     def _load_hash_ring(self, refresh=False):
         cache_timeout = timeutils.utcnow() - datetime.timedelta(


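For readers skimming the diff: the property above used to decide whether a cached hash ring could be trusted by comparing each node's created_at and updated_at timestamps; the patch replaces that with a count check against the configured number of API workers. Below is a minimal, self-contained sketch of that gate, not the networking_ovn code itself; get_active_node_count and get_api_worker_count are hypothetical stand-ins for db_hash_ring.get_active_nodes() and neutron's service._get_api_workers().

# Standalone sketch of the gating logic introduced above (illustrative only).
class HashRingStartupGate(object):
    def __init__(self):
        # Keep re-checking the DB until enough ring members have registered.
        self._check_startup = True

    def wait_before_caching(self, get_active_node_count, get_api_worker_count):
        """Return True while cached hash rings should not be trusted yet."""
        if not self._check_startup:
            # The gate already flipped once; never read the DB again.
            return False
        if get_active_node_count() >= get_api_worker_count():
            # Every API worker has registered its ring node: caching is safe,
            # and the decision is sticky from now on.
            self._check_startup = False
            return False
        return True

# With 2 API workers configured, caching is only allowed once the second
# node shows up, and the result sticks afterwards.
gate = HashRingStartupGate()
print(gate.wait_before_caching(lambda: 1, lambda: 2))  # True
print(gate.wait_before_caching(lambda: 2, lambda: 2))  # False (gate flips)
print(gate.wait_before_caching(lambda: 0, lambda: 2))  # False (no DB re-check)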
networking_ovn/ml2/mech_driver.py (+13, -6)

@@ -57,6 +57,7 @@ from networking_ovn.ml2 import trunk_driver
 from networking_ovn import ovn_db_sync
 from networking_ovn.ovsdb import impl_idl_ovn
 from networking_ovn.ovsdb import worker
+import neutron.wsgi
 LOG = log.getLogger(__name__)
@@ -219,15 +220,21 @@ class OVNMechanismDriver(api.MechanismDriver):
         atexit.register(self._clean_hash_ring)
         signal.signal(signal.SIGTERM, self._clean_hash_ring)
+    @staticmethod
+    def should_post_fork_initialize(worker_class):
+        return worker_class in (neutron.wsgi.WorkerService,
+                                worker.MaintenanceWorker)
     def post_fork_initialize(self, resource, event, trigger, payload=None):
-        # NOTE(rtheis): This will initialize all workers (API, RPC,
-        # plugin service and OVN) with OVN IDL connections.
+        # Initialize API/Maintenance workers with OVN IDL connections
+        worker_class = utils.get_method_class(trigger)
+        if not self.should_post_fork_initialize(worker_class):
+            return
         self._post_fork_event.clear()
         self._ovn_client_inst = None
-        is_maintenance = (utils.get_method_class(trigger) ==
-                          worker.MaintenanceWorker)
-        if not is_maintenance:
+        if worker_class == neutron.wsgi.WorkerService:
             self.node_uuid = db_hash_ring.add_node(self.hash_ring_group)
         self._nb_ovn, self._sb_ovn = impl_idl_ovn.get_ovn_idls(self, trigger)
@@ -252,7 +259,7 @@ class OVNMechanismDriver(api.MechanismDriver):
         # Now IDL connections can be safely used.
         self._post_fork_event.set()
-        if is_maintenance:
+        if worker_class == worker.MaintenanceWorker:
             # Call the synchronization task if its maintenance worker
             # This sync neutron DB to OVN-NB DB only in inconsistent states
             self.nb_synchronizer = ovn_db_sync.OvnNbSynchronizer(


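The mech_driver change above is essentially a dispatch refactor: instead of a boolean is_maintenance flag, post_fork_initialize now derives the worker class once, bails out early for worker types that should not get OVN IDL connections, and branches on the class afterwards. A rough sketch of that control flow follows, with plain placeholder classes standing in for neutron.wsgi.WorkerService and the OVN MaintenanceWorker; the returned strings merely label the branches.

# Illustrative sketch of the new dispatch in post_fork_initialize(); the
# classes below are placeholders, not the real neutron/networking_ovn ones.
class WorkerService(object):        # stands in for neutron.wsgi.WorkerService
    pass

class MaintenanceWorker(object):    # stands in for ovsdb.worker.MaintenanceWorker
    pass

class RpcWorker(object):            # any other worker type is now skipped
    pass

def should_post_fork_initialize(worker_class):
    # Only API and maintenance workers get OVN IDL connections.
    return worker_class in (WorkerService, MaintenanceWorker)

def post_fork_initialize(worker_class):
    if not should_post_fork_initialize(worker_class):
        return 'skipped'
    if worker_class is WorkerService:
        # API workers additionally register a node in the hash ring.
        return 'idl connections + hash ring node'
    # MaintenanceWorker: IDL connections plus the NB/SB synchronization task.
    return 'idl connections + db sync'

print(post_fork_initialize(WorkerService))      # idl connections + hash ring node
print(post_fork_initialize(MaintenanceWorker))  # idl connections + db sync
print(post_fork_initialize(RpcWorker))          # skipped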
networking_ovn/tests/functional/base.py (+5, -0)

@@ -21,7 +21,9 @@ import fixtures
 import mock
 from neutron.conf.plugins.ml2 import config
 from neutron.plugins.ml2.drivers import type_geneve  # noqa
+from neutron import service  # noqa
 from neutron.tests.unit.plugins.ml2 import test_plugin
+import neutron.wsgi
 from neutron_lib import fixture
 from neutron_lib.plugins import constants
 from neutron_lib.plugins import directory
@@ -110,6 +112,7 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase):
         config.cfg.CONF.set_override('dns_servers',
                                      ['10.10.10.10'],
                                      group='ovn')
+        config.cfg.CONF.set_override('api_workers', 1)
         super(TestOVNFunctionalBase, self).setUp()
         self.test_log_dir = os.path.join(DEFAULT_LOG_DIR, self.id())
@@ -233,6 +236,8 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase):
         if self.maintenance_worker:
             trigger_cls.trigger.__self__.__class__ = worker.MaintenanceWorker
             cfg.CONF.set_override('neutron_sync_mode', 'off', 'ovn')
+        else:
+            trigger_cls.trigger.__self__.__class__ = neutron.wsgi.WorkerService
         self.addCleanup(self._collect_processes_logs)
         self.addCleanup(self.stop)


networking_ovn/tests/functional/test_ovsdb_monitor.py (+5, -2)

@@ -14,6 +14,7 @@
 import mock
 import fixtures
+from oslo_config import cfg
 from oslo_utils import uuidutils
 from networking_ovn.common import config as ovn_conf
@@ -183,12 +184,14 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
         self._test_port_binding_and_status(port['id'], 'unbind', 'DOWN')
     def test_distributed_lock(self):
+        api_workers = 11
+        cfg.CONF.set_override('api_workers', api_workers)
         row_event = DistributedLockTestEvent()
         self.mech_driver._nb_ovn.idl.notify_handler.watch_event(row_event)
         worker_list = [self.mech_driver._nb_ovn, ]
         # Create 10 fake workers
-        for _ in range(10):
+        for _ in range(api_workers - len(worker_list)):
             node_uuid = uuidutils.generate_uuid()
             db_hash_ring.add_node(ovn_const.HASH_RING_ML2_GROUP, node_uuid)
             fake_driver = mock.MagicMock(
@@ -232,7 +235,7 @@ class TestNBDbMonitorOverSsl(TestNBDbMonitor):
         return 'ssl'
-class OvnIdlProbeInterval(base.TestOVNFunctionalBase):
+class TestOvnIdlProbeInterval(base.TestOVNFunctionalBase):
     def setUp(self):
         # skip parent setUp, we don't need it, but we do need grandparent
         # pylint: disable=bad-super-call


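The functional test tweak above keeps the fake worker pool in step with the api_workers override instead of hard-coding 10: one worker (the real NB connection) already exists, so the loop only creates the remainder. A small sketch of that sizing logic, with illustrative names and uuid4 standing in for oslo's uuidutils:

# Sketch of how the test sizes its fake worker pool (names are illustrative).
import uuid

api_workers = 11
worker_list = ['real-nb-ovn-connection']   # one real worker already registered

# Create just enough fake workers so len(worker_list) == api_workers,
# mirroring one hash ring node per API worker.
for _ in range(api_workers - len(worker_list)):
    node_uuid = str(uuid.uuid4())
    worker_list.append('fake-worker-%s' % node_uuid[:8])

assert len(worker_list) == api_workers
print('created %d fake workers' % (len(worker_list) - 1))  # created 10 fake workers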
networking_ovn/tests/unit/common/test_hash_ring_manager.py (+8, -10)

@@ -14,11 +14,11 @@
 # under the License.
 import datetime
-import time
 import mock
 from oslo_utils import timeutils
+from neutron import service
 from networking_ovn.common import constants
 from networking_ovn.common import exceptions
 from networking_ovn.common import hash_ring_manager
@@ -102,23 +102,21 @@ class TestHashRingManager(db_base.DBTestCase):
         # The ring should re-balance and as it was before
         self._verify_hashes(hash_dict_before)
-    def test__wait_startup_before_caching(self):
+    @mock.patch.object(service, '_get_api_workers', return_value=2)
+    def test__wait_startup_before_caching(self, api_workers):
         db_hash_ring.add_node(HASH_RING_TEST_GROUP, 'node-1')
-        db_hash_ring.add_node(HASH_RING_TEST_GROUP, 'node-2')
-        # Assert it will return True until created_at != updated_at
+        # Assert it will return True until we equal api_workers
         self.assertTrue(self.hash_ring_manager._wait_startup_before_caching)
-        self.assertTrue(self.hash_ring_manager._cache_startup_timeout)
+        self.assertTrue(self.hash_ring_manager._check_hashring_startup)
-        # Touch the nodes (== update the updated_at column)
-        time.sleep(1)
-        db_hash_ring.touch_nodes_from_host(HASH_RING_TEST_GROUP)
+        db_hash_ring.add_node(HASH_RING_TEST_GROUP, 'node-2')
         # Assert it's now False. Waiting is not needed anymore
         self.assertFalse(self.hash_ring_manager._wait_startup_before_caching)
-        self.assertFalse(self.hash_ring_manager._cache_startup_timeout)
+        self.assertFalse(self.hash_ring_manager._check_hashring_startup)
-        # Now assert that since the _cache_startup_timeout has been
+        # Now assert that since the _check_hashring_startup has been
         # flipped, we no longer will read from the database
         with mock.patch.object(hash_ring_manager.db_hash_ring,
                                'get_active_nodes') as get_nodes_mock:

