Merge "[OVN] Create a OVN hash ring maintenance thread per worker"

This commit is contained in:
Zuul 2024-12-23 20:46:36 +00:00 committed by Gerrit Code Review
commit d7ae83e35e
9 changed files with 137 additions and 102 deletions

View File

@ -32,6 +32,7 @@ class HashRingManager:
def __init__(self, group_name):
self._hash_ring = None
self._node_last_touch = {}
self._last_time_loaded = None
self._check_hashring_startup = True
self._group = group_name
@ -92,6 +93,8 @@ class HashRingManager:
constants.HASH_RING_NODES_TIMEOUT, self._group)
self._hash_ring = hashring.HashRing({node.node_uuid
for node in nodes})
self._node_last_touch = {node.node_uuid: node.updated_at
for node in nodes}
self._last_time_loaded = timeutils.utcnow()
self._offline_node_count = db_hash_ring.count_offline_nodes(
self.admin_ctx, constants.HASH_RING_NODES_TIMEOUT,
@ -112,7 +115,8 @@ class HashRingManager:
try:
# We need to pop the value from the set. If empty,
# KeyError is raised
return self._hash_ring[key].pop()
node_uuid = self._hash_ring[key].pop()
return node_uuid, self._node_last_touch[node_uuid]
except KeyError:
raise exceptions.HashRingIsEmpty(
key=key, node_count=self._offline_node_count)

View File

@ -57,6 +57,14 @@ def get_nodes(context, group_name, created_at=None):
return query.all()
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_READER
def get_node(context, group_name, node_uuid):
    """Fetch the single hash ring register matching a group and node UUID.

    :param context: request context whose ``session`` runs the query.
    :param group_name: value matched against ``OVNHashRing.group_name``.
    :param node_uuid: value matched against ``OVNHashRing.node_uuid``.
    :returns: the one ``OVNHashRing`` row selected by both filters.
    :raises: whatever ``Query.one()`` raises when the filters do not select
        exactly one row.
    """
    ring = ovn_models.OVNHashRing
    # Both conditions are passed to a single ``filter`` call, as before.
    criteria = (ring.group_name == group_name, ring.node_uuid == node_uuid)
    return context.session.query(ring).filter(*criteria).one()
@db_api.retry_if_session_inactive()
def remove_nodes_from_host(context, group_name, created_at=None):
with (db_api.CONTEXT_WRITER.using(context)):
@ -91,21 +99,17 @@ def cleanup_old_nodes(context, days):
LOG.info('Cleaned up Hash Ring nodes older than %d days', days)
@db_api.retry_if_session_inactive()
def _touch(context, updated_at=None, **filter_args):
@db_api.CONTEXT_WRITER
def touch_node(context, node_uuid, updated_at=None):
# NOTE(ralonsoh): there are several mechanisms to update the node OVN hash
# ring register. This method does not retry the DB operation in case of
# failure but relies on the success of later calls. That avoids blocking
# the DB needlessly.
if updated_at is None:
updated_at = timeutils.utcnow()
with db_api.CONTEXT_WRITER.using(context):
context.session.query(ovn_models.OVNHashRing).filter_by(
**filter_args).update({'updated_at': updated_at})
def touch_nodes_from_host(context, group_name):
_touch(context, hostname=CONF.host, group_name=group_name)
def touch_node(context, node_uuid):
_touch(context, node_uuid=node_uuid)
context.session.query(ovn_models.OVNHashRing).filter(
ovn_models.OVNHashRing.node_uuid == node_uuid).update(
{'updated_at': updated_at})
def _get_nodes_query(context, interval, group_name, offline=False,

View File

@ -326,52 +326,52 @@ class OVNMechanismDriver(api.MechanismDriver):
atexit.register(self._remove_node_from_hash_ring)
sh.add_handler("SIGTERM", self._remove_node_from_hash_ring)
admin_context = n_context.get_admin_context()
if self._start_time:
self._setup_hash_ring_start_time()
self._setup_hash_ring_start_time(admin_context)
else:
self._setup_hash_ring_event()
self._setup_hash_ring_event(admin_context)
self._register_hash_ring_maintenance()
def _register_hash_ring_maintenance(self):
"""Maintenance method for the node OVN hash ring register
The ``self.node_uuid`` value must be set before calling this method.
"""
self._hash_ring_thread = maintenance.MaintenanceThread()
self._hash_ring_thread.add_periodics(
maintenance.HashRingHealthCheckPeriodics(
self.hash_ring_group))
self.hash_ring_group, self.node_uuid))
self._hash_ring_thread.start()
LOG.info('Hash Ring probing thread has started')
LOG.info('Hash Ring probing thread for node %s has started',
self.node_uuid)
def _setup_hash_ring_event(self):
def _setup_hash_ring_event(self, context):
LOG.debug('Hash Ring setup using multiprocess event lock')
admin_context = n_context.get_admin_context()
if not self._hash_ring_probe_event.is_set():
# Clear existing entries. This code section should be executed
# only once per node (chassis); the multiprocess event should be
# set just after the ``is_set`` check.
self._hash_ring_probe_event.set()
ovn_hash_ring_db.remove_nodes_from_host(admin_context,
ovn_hash_ring_db.remove_nodes_from_host(context,
self.hash_ring_group)
self._register_hash_ring_maintenance()
self.node_uuid = ovn_hash_ring_db.add_node(admin_context,
self.node_uuid = ovn_hash_ring_db.add_node(context,
self.hash_ring_group)
def _setup_hash_ring_start_time(self):
@db_api.retry_if_session_inactive()
@db_api.CONTEXT_WRITER
def _setup_hash_ring_start_time(self, context):
LOG.debug('Hash Ring setup using WSGI start time')
admin_context = n_context.get_admin_context()
with db_api.CONTEXT_WRITER.using(admin_context):
# Delete all node registers without created_at=self._start_time
created_at = n_utils.ts_to_datetime(self._start_time)
ovn_hash_ring_db.remove_nodes_from_host(
admin_context, self.hash_ring_group, created_at=created_at)
self.node_uuid = ovn_hash_ring_db.add_node(
admin_context, self.hash_ring_group, created_at=created_at)
newer_nodes = ovn_hash_ring_db.get_nodes(
admin_context, self.hash_ring_group, created_at=created_at)
LOG.debug('Hash Ring setup, this worker has detected %s OVN hash'
'ring registers in the database', len(newer_nodes))
if len(newer_nodes) == 1:
# If only one register per host is present, that means this worker
# is the first one to register itself.
self._register_hash_ring_maintenance()
# Delete all node registers without created_at=self._start_time
created_at = n_utils.ts_to_datetime(self._start_time)
ovn_hash_ring_db.remove_nodes_from_host(
context, self.hash_ring_group, created_at=created_at)
self.node_uuid = ovn_hash_ring_db.add_node(
context, self.hash_ring_group, created_at=created_at)
newer_nodes = ovn_hash_ring_db.get_nodes(
context, self.hash_ring_group, created_at=created_at)
LOG.debug('Hash Ring setup, this worker has detected %s OVN hash '
'ring registers in the database', len(newer_nodes))
def post_fork_initialize(self, resource, event, trigger, payload=None):
# Initialize API/Maintenance workers with OVN IDL connections

View File

@ -1111,16 +1111,17 @@ class DBInconsistenciesPeriodics(SchemaAwarePeriodicsBase):
class HashRingHealthCheckPeriodics:
def __init__(self, group):
def __init__(self, group, node_uuid):
self._group = group
self._node_uuid = node_uuid
self.ctx = n_context.get_admin_context()
@periodics.periodic(spacing=ovn_const.HASH_RING_TOUCH_INTERVAL)
def touch_hash_ring_nodes(self):
def touch_hash_ring_node(self):
# NOTE(lucasagomes): Note that we do not rely on the OVSDB lock
# here because we want the maintenance tasks from each instance to
# execute this task.
hash_ring_db.touch_nodes_from_host(self.ctx, self._group)
hash_ring_db.touch_node(self.ctx, self._node_uuid)
# Check the number of the nodes in the ring and log a message in
# case they are out of sync. See LP #2024205 for more information

View File

@ -824,7 +824,8 @@ class OvnIdlDistributedLock(BaseOvnIdl):
try:
self.notify_handler.notify(event, row, updates, global_=True)
try:
target_node = self._hash_ring.get_node(str(row.uuid))
target_node, node_last_touch = self._hash_ring.get_node(
str(row.uuid))
except exceptions.HashRingIsEmpty as e:
LOG.error('HashRing is empty, error: %s', e)
return
@ -833,6 +834,7 @@ class OvnIdlDistributedLock(BaseOvnIdl):
# If the worker hasn't been health checked by the maintenance
# thread (see bug #1834498), indicate that it's alive here
self._last_touch = node_last_touch
time_now = timeutils.utcnow()
touch_timeout = time_now - datetime.timedelta(
seconds=ovn_const.HASH_RING_TOUCH_INTERVAL)
@ -843,7 +845,6 @@ class OvnIdlDistributedLock(BaseOvnIdl):
try:
ctx = neutron_context.get_admin_context()
ovn_hash_ring_db.touch_node(ctx, self._node_uuid)
self._last_touch = time_now
except Exception:
LOG.exception('Hash Ring node %s failed to heartbeat',
self._node_uuid)

View File

@ -80,18 +80,14 @@ class TestOVNMechanismDriver(base.TestOVNFunctionalBase):
start_time = timeutils.utcnow()
self.mech_driver._start_time = int(start_time.timestamp())
with mock.patch.object(self.mech_driver,
'_register_hash_ring_maintenance') as \
mock_register_maintenance:
for _ in range(3):
self.mech_driver._setup_hash_ring_start_time()
for _ in range(3):
self.mech_driver._setup_hash_ring_start_time(self.context)
ovn_hrs = ovn_hash_ring_db.get_nodes(self.context, ring_group)
self.assertEqual(3, len(ovn_hrs))
for ovn_hr in ovn_hrs:
self.assertEqual(int(start_time.timestamp()),
ovn_hr.created_at.timestamp())
mock_register_maintenance.assert_called_once()
class TestPortBinding(base.TestOVNFunctionalBase):

View File

@ -38,19 +38,23 @@ class TestHashRingManager(testlib_api.SqlTestCaseLight):
self.admin_ctx = context.get_admin_context()
def _verify_hashes(self, hash_dict):
for uuid_, target_node in hash_dict.items():
self.assertEqual(target_node,
self.hash_ring_manager.get_node(uuid_))
for node, target_node in hash_dict.items():
self.assertEqual(target_node.node_uuid,
self.hash_ring_manager.get_node(node)[0])
self.assertEqual(target_node.updated_at,
self.hash_ring_manager.get_node(node)[1])
def test_get_node(self):
# Use pre-defined UUIDs to make the hashes predictable
node_1_uuid = db_hash_ring.add_node(
self.admin_ctx, HASH_RING_TEST_GROUP, 'node-1')
node_2_uuid = db_hash_ring.add_node(
self.admin_ctx, HASH_RING_TEST_GROUP, 'node-2')
db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-1')
node1 = db_hash_ring.get_node(self.admin_ctx, HASH_RING_TEST_GROUP,
'node-1')
db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-2')
node2 = db_hash_ring.get_node(self.admin_ctx, HASH_RING_TEST_GROUP,
'node-2')
hash_dict_before = {'fake-uuid': node_1_uuid,
'fake-uuid-0': node_2_uuid}
hash_dict_before = {'fake-uuid': node1,
'fake-uuid-0': node2}
self._verify_hashes(hash_dict_before)
def test_get_node_no_active_nodes(self):
@ -60,15 +64,19 @@ class TestHashRingManager(testlib_api.SqlTestCaseLight):
def test_ring_rebalance(self):
# Use pre-defined UUIDs to make the hashes predictable
node_1_uuid = db_hash_ring.add_node(
self.admin_ctx, HASH_RING_TEST_GROUP, 'node-1')
node_2_uuid = db_hash_ring.add_node(
self.admin_ctx, HASH_RING_TEST_GROUP, 'node-2')
db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-1')
node1 = db_hash_ring.get_node(self.admin_ctx, HASH_RING_TEST_GROUP,
'node-1')
db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP, 'node-2')
node2 = db_hash_ring.get_node(self.admin_ctx, HASH_RING_TEST_GROUP,
'node-2')
# Add another node from a different host
with mock.patch.object(db_hash_ring, 'CONF') as mock_conf:
mock_conf.host = 'another-host-52359446-c366'
another_host_node = db_hash_ring.add_node(
db_hash_ring.add_node(self.admin_ctx, HASH_RING_TEST_GROUP,
'another-host')
node_other = db_hash_ring.get_node(
self.admin_ctx, HASH_RING_TEST_GROUP, 'another-host')
# Assert all nodes are alive in the ring
@ -76,9 +84,9 @@ class TestHashRingManager(testlib_api.SqlTestCaseLight):
self.assertEqual(3, len(self.hash_ring_manager._hash_ring.nodes))
# Hash certain values against the nodes
hash_dict_before = {'fake-uuid': node_1_uuid,
'fake-uuid-0': node_2_uuid,
'fake-uuid-ABCDE': another_host_node}
hash_dict_before = {'fake-uuid': node1,
'fake-uuid-0': node2,
'fake-uuid-ABCDE': node_other}
self._verify_hashes(hash_dict_before)
# Mock utcnow() as if the HASH_RING_NODES_TIMEOUT has expired
@ -87,27 +95,34 @@ class TestHashRingManager(testlib_api.SqlTestCaseLight):
seconds=constants.HASH_RING_NODES_TIMEOUT)
with mock.patch.object(timeutils, 'utcnow') as mock_utcnow:
mock_utcnow.return_value = fake_utcnow
db_hash_ring.touch_nodes_from_host(
self.admin_ctx, HASH_RING_TEST_GROUP)
for _node in [node1, node2]:
db_hash_ring.touch_node(self.admin_ctx, _node.node_uuid)
# Now assert that the ring was re-balanced and only the node from
# another host is marked as alive
self.hash_ring_manager.refresh()
self.assertEqual([another_host_node],
self.assertEqual([node_other.node_uuid],
list(self.hash_ring_manager._hash_ring.nodes.keys()))
# Now only the node from another host is alive, all values should hash to it
hash_dict_after_rebalance = {'fake-uuid': another_host_node,
'fake-uuid-0': another_host_node,
'fake-uuid-ABCDE': another_host_node}
hash_dict_after_rebalance = {'fake-uuid': node_other,
'fake-uuid-0': node_other,
'fake-uuid-ABCDE': node_other}
self._verify_hashes(hash_dict_after_rebalance)
# Now touch the nodes so they appear active again
db_hash_ring.touch_nodes_from_host(
self.admin_ctx, HASH_RING_TEST_GROUP)
for _node in [node1, node2]:
db_hash_ring.touch_node(self.admin_ctx, _node.node_uuid)
self.hash_ring_manager.refresh()
# The ring should re-balance and be as it was before
node1 = db_hash_ring.get_node(self.admin_ctx, HASH_RING_TEST_GROUP,
'node-1')
node2 = db_hash_ring.get_node(self.admin_ctx, HASH_RING_TEST_GROUP,
'node-2')
hash_dict_before = {'fake-uuid': node1,
'fake-uuid-0': node2,
'fake-uuid-ABCDE': node_other}
self._verify_hashes(hash_dict_before)
@mock.patch.object(hash_ring_manager.LOG, 'debug')

View File

@ -89,7 +89,7 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
self.assertIsNotNone(self._get_node_row(another_host_node))
def test_touch_nodes_from_host(self):
nodes = self._add_nodes_and_assert_exists(count=3)
node_uuids = self._add_nodes_and_assert_exists(count=3)
# Add another node from a different host
with mock.patch.object(ovn_hash_ring_db, 'CONF') as mock_conf:
@ -97,8 +97,8 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
another_host_node = self._add_nodes_and_assert_exists()[0]
# Assert that updated_at isn't updated yet
for node in nodes:
node_db = self._get_node_row(node)
for node_uuid in node_uuids:
node_db = self._get_node_row(node_uuid)
self.assertEqual(node_db.created_at, node_db.updated_at)
# Assert the same for the node from another host
@ -107,12 +107,12 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
# Touch the nodes from our host
time.sleep(1)
ovn_hash_ring_db.touch_nodes_from_host(self.admin_ctx,
HASH_RING_TEST_GROUP)
for node_uuid in node_uuids:
ovn_hash_ring_db.touch_node(self.admin_ctx, node_uuid)
# Assert that updated_at is now updated
for node in nodes:
node_db = self._get_node_row(node)
for node_uuid in node_uuids:
node_db = self._get_node_row(node_uuid)
self.assertGreater(node_db.updated_at, node_db.created_at)
# Assert that the node from another host hasn't been touched
@ -121,7 +121,7 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
self.assertEqual(node_db.created_at, node_db.updated_at)
def test_active_nodes(self):
self._add_nodes_and_assert_exists(count=3)
node_uuids = self._add_nodes_and_assert_exists(count=3)
# Add another node from a different host
with mock.patch.object(ovn_hash_ring_db, 'CONF') as mock_conf:
@ -137,8 +137,8 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
fake_utcnow = timeutils.utcnow() - datetime.timedelta(seconds=60)
with mock.patch.object(timeutils, 'utcnow') as mock_utcnow:
mock_utcnow.return_value = fake_utcnow
ovn_hash_ring_db.touch_nodes_from_host(self.admin_ctx,
HASH_RING_TEST_GROUP)
for node_uuid in node_uuids:
ovn_hash_ring_db.touch_node(self.admin_ctx, node_uuid)
# Now assert that all nodes from our host are seen as offline.
# Only the node from another host should be active
@ -230,8 +230,8 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
# Touch the nodes from group1
time.sleep(1)
ovn_hash_ring_db.touch_nodes_from_host(self.admin_ctx,
HASH_RING_TEST_GROUP)
for node_uuid in group1:
ovn_hash_ring_db.touch_node(self.admin_ctx, node_uuid)
# Assert that updated_at was updated for group1
for node in group1:
@ -244,7 +244,7 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
self.assertEqual(node_db.created_at, node_db.updated_at)
def test_count_offline_nodes(self):
self._add_nodes_and_assert_exists(count=3)
node_uuids = self._add_nodes_and_assert_exists(count=3)
# Assert no nodes are considered offline
self.assertEqual(0, ovn_hash_ring_db.count_offline_nodes(
@ -255,16 +255,16 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
fake_utcnow = timeutils.utcnow() - datetime.timedelta(seconds=60)
with mock.patch.object(timeutils, 'utcnow') as mock_utcnow:
mock_utcnow.return_value = fake_utcnow
ovn_hash_ring_db.touch_nodes_from_host(self.admin_ctx,
HASH_RING_TEST_GROUP)
for node_uuid in node_uuids:
ovn_hash_ring_db.touch_node(self.admin_ctx, node_uuid)
# Now assert that all nodes from our host are seen as offline
self.assertEqual(3, ovn_hash_ring_db.count_offline_nodes(
self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP))
# Touch the nodes again without faking utcnow()
ovn_hash_ring_db.touch_nodes_from_host(self.admin_ctx,
HASH_RING_TEST_GROUP)
for node_uuid in node_uuids:
ovn_hash_ring_db.touch_node(self.admin_ctx, node_uuid)
# Assert no nodes are considered offline
self.assertEqual(0, ovn_hash_ring_db.count_offline_nodes(
@ -288,15 +288,15 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
def test_cleanup_old_nodes(self):
# Add 2 new nodes
self._add_nodes_and_assert_exists(count=2)
node_uuids = self._add_nodes_and_assert_exists(count=2)
# Subtract 5 days from utcnow() and touch the nodes to make
# them appear stale
fake_utcnow = timeutils.utcnow() - datetime.timedelta(days=5)
with mock.patch.object(timeutils, 'utcnow') as mock_utcnow:
mock_utcnow.return_value = fake_utcnow
ovn_hash_ring_db.touch_nodes_from_host(self.admin_ctx,
HASH_RING_TEST_GROUP)
for node_uuid in node_uuids:
ovn_hash_ring_db.touch_node(self.admin_ctx, node_uuid)
# Add 3 new nodes
self._add_nodes_and_assert_exists(count=3)

View File

@ -220,7 +220,8 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
self.mock_get_node = mock.patch.object(
hash_ring_manager.HashRingManager,
'get_node', return_value=self.node_uuid).start()
'get_node',
return_value=(self.node_uuid, timeutils.utcnow())).start()
self.mock_update_tables = mock.patch.object(
self.idl, 'update_tables').start()
@ -231,9 +232,17 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
self.assertEqual(2, len(self.idl.notify_handler.mock_calls))
@mock.patch.object(ovn_hash_ring_db, 'touch_node')
def test_notify(self, mock_touch_node):
def test_notify_updated_node(self, mock_touch_node):
self.idl.notify(self.fake_event, self.fake_row)
mock_touch_node.assert_not_called()
self._assert_has_notify_calls()
@mock.patch.object(ovn_hash_ring_db, 'touch_node')
def test_notify_not_updated_node(self, mock_touch_node):
updated_at = timeutils.utcnow() - datetime.timedelta(
seconds=ovn_const.HASH_RING_CACHE_TIMEOUT + 10)
self.mock_get_node.return_value = (self.node_uuid, updated_at)
self.idl.notify(self.fake_event, self.fake_row)
mock_touch_node.assert_called_once_with(mock.ANY, self.node_uuid)
self._assert_has_notify_calls()
@ -266,6 +275,9 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
@mock.patch.object(ovsdb_monitor.LOG, 'exception')
@mock.patch.object(ovn_hash_ring_db, 'touch_node')
def test_notify_touch_node_exception(self, mock_touch_node, mock_log):
updated_at = timeutils.utcnow() - datetime.timedelta(
seconds=ovn_const.HASH_RING_CACHE_TIMEOUT + 10)
self.mock_get_node.return_value = (self.node_uuid, updated_at)
mock_touch_node.side_effect = Exception('BoOooOmmMmmMm')
self.idl.notify(self.fake_event, self.fake_row)
@ -277,7 +289,8 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
self._assert_has_notify_calls()
def test_notify_different_node(self):
self.mock_get_node.return_value = 'different-node-uuid'
self.mock_get_node.return_value = ('different-node-uuid',
timeutils.utcnow())
self.idl.notify('fake-event', self.fake_row)
# Assert that notify() wasn't called for a different node uuid
self.idl.notify_handler.notify.assert_called_once_with(
@ -400,7 +413,8 @@ class TestOvnNbIdlNotifyHandler(test_mech_driver.OVNMechanismDriverTestCase):
self.mech_driver.set_port_status_up = mock.Mock()
self.mech_driver.set_port_status_down = mock.Mock()
self._mock_hash_ring = mock.patch.object(
self.idl._hash_ring, 'get_node', return_value=self.idl._node_uuid)
self.idl._hash_ring, 'get_node',
return_value=(self.idl._node_uuid, timeutils.utcnow()))
self._mock_hash_ring.start()
def _test_lsp_helper(self, event, new_row_json, old_row_json=None,
@ -560,7 +574,7 @@ class TestOvnSbIdlNotifyHandler(test_mech_driver.OVNMechanismDriverTestCase):
}
self._mock_hash_ring = mock.patch.object(
self.sb_idl._hash_ring, 'get_node',
return_value=self.sb_idl._node_uuid)
return_value=(self.sb_idl._node_uuid, timeutils.utcnow()))
self._mock_hash_ring.start()
def _test_chassis_helper(self, event, new_row_json, old_row_json=None):