Revert "Move the "ovn_hash_ring" clean up to maintenance worker"

This reverts commit 819a1bb3e6.

Reason for revert:
This patch does not take care of the "ovn_hash_ring" registers created by
other controller nodes; the sketch below illustrates the problem. We need
to revert it to find a better solution.

Change-Id: I5a547c6baab8e5307a531480b0e10859256ad25d
Rodolfo Alonso 2022-09-23 14:09:26 +00:00
parent 819a1bb3e6
commit 3f20cabc4f
11 changed files with 28 additions and 114 deletions
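
For context (not part of the diff below): the filter that this revert removes again worked on the rows' creation time. Each controller passed its own start time ("init_time") into get_active_nodes(), which then discarded every register created before that instant. A small, self-contained sketch of why that hides registers from other controllers; the dict rows and timestamps are purely illustrative, not the real ovn_hash_ring schema:

    import datetime

    # Illustrative registers only; the real table also stores node_uuid,
    # hostname, group_name and updated_at.
    rows = [
        {'node': 'controller-B-worker',
         'created_at': datetime.datetime(2022, 9, 23, 10, 0)},
        {'node': 'controller-A-worker',
         'created_at': datetime.datetime(2022, 9, 23, 10, 5)},
    ]

    # Controller A started at 10:05 and, with the reverted code, filtered
    # on its own init_time: created_at >= init_time.
    init_time_a = datetime.datetime(2022, 9, 23, 10, 5)
    active_seen_by_a = [r for r in rows if r['created_at'] >= init_time_a]

    # Controller B's register disappears from A's view although B keeps
    # touching it (updated_at), so A builds its hash ring without B.
    assert [r['node'] for r in active_seen_by_a] == ['controller-A-worker']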


@@ -285,9 +285,6 @@ LB_EXT_IDS_VIP_PORT_ID_KEY = 'neutron:vip_port_id'
 HASH_RING_NODES_TIMEOUT = 60
 HASH_RING_TOUCH_INTERVAL = 30
 HASH_RING_CACHE_TIMEOUT = 30
-# NOTE(ralonsoh): the OVN hash ring clean up should not match with the node
-# touch process, to avoid any database lock.
-HASH_RING_CLEANUP_INTERVAL = int(HASH_RING_TOUCH_INTERVAL * 1.5)
 HASH_RING_ML2_GROUP = 'mechanism_driver'
 # Maximum chassis count where a gateway port can be hosted
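
For reference, with the values above the removed interval evaluated to 45 seconds, i.e. the (now removed) clean-up periodic was deliberately offset from the 30-second touch periodic so both would not hit the "ovn_hash_ring" table at the same time:

    # Values as defined in the hunk above; the cleanup constant is the one
    # being removed by this revert.
    HASH_RING_TOUCH_INTERVAL = 30
    HASH_RING_CLEANUP_INTERVAL = int(HASH_RING_TOUCH_INTERVAL * 1.5)  # == 45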


@@ -30,14 +30,13 @@ LOG = log.getLogger(__name__)
 class HashRingManager(object):

-    def __init__(self, group_name, init_time):
+    def __init__(self, group_name):
         self._hash_ring = None
         self._last_time_loaded = None
         self._check_hashring_startup = True
         self._group = group_name
         # Flag to rate limit the caching log
         self._prev_num_nodes = -1
-        self._init_time = init_time
         self.admin_ctx = context.get_admin_context()

     @property
@@ -57,8 +56,7 @@ class HashRingManager(object):
         api_workers = service._get_api_workers()
         nodes = db_hash_ring.get_active_nodes(
             self.admin_ctx,
-            constants.HASH_RING_CACHE_TIMEOUT, self._group, self._init_time,
-            from_host=True)
+            constants.HASH_RING_CACHE_TIMEOUT, self._group, from_host=True)

         num_nodes = len(nodes)
         if num_nodes >= api_workers:
@@ -89,8 +87,8 @@
                 not self._hash_ring.nodes or
                 cache_timeout >= self._last_time_loaded):
             nodes = db_hash_ring.get_active_nodes(
-                self.admin_ctx, constants.HASH_RING_NODES_TIMEOUT,
-                self._group, self._init_time)
+                self.admin_ctx,
+                constants.HASH_RING_NODES_TIMEOUT, self._group)
             self._hash_ring = hashring.HashRing({node.node_uuid
                                                  for node in nodes})
             self._last_time_loaded = timeutils.utcnow()


@@ -37,19 +37,11 @@ def add_node(context, group_name, node_uuid=None):
     return node_uuid


-@db_api.CONTEXT_WRITER
-def remove_nodes_from_host(context, group_name, created_before=None):
-    query = context.session.query(ovn_models.OVNHashRing).filter(
-        ovn_models.OVNHashRing.hostname == CONF.host,
-        ovn_models.OVNHashRing.group_name == group_name)
-    if created_before:
-        query = query.filter(
-            ovn_models.OVNHashRing.created_at < created_before)
-    # NOTE(ralonsoh): with "synchronize_session=False", the SQL action will
-    # be performed after the transaction commit. However, SQLAlchemy won't
-    # try to find those registers in the active session and won't update
-    # it after the deletion. This is the most efficient execution.
-    query.delete(synchronize_session=False)
+def remove_nodes_from_host(context, group_name):
+    with db_api.CONTEXT_WRITER.using(context):
+        context.session.query(ovn_models.OVNHashRing).filter(
+            ovn_models.OVNHashRing.hostname == CONF.host,
+            ovn_models.OVNHashRing.group_name == group_name).delete()


 def _touch(context, **filter_args):
@@ -66,12 +58,10 @@
     _touch(context, node_uuid=node_uuid)


-def get_active_nodes(context, interval, group_name, created_at,
-                     from_host=False):
+def get_active_nodes(context, interval, group_name, from_host=False):
     limit = timeutils.utcnow() - datetime.timedelta(seconds=interval)
     with db_api.CONTEXT_READER.using(context):
         query = context.session.query(ovn_models.OVNHashRing).filter(
-            ovn_models.OVNHashRing.created_at >= created_at,
             ovn_models.OVNHashRing.updated_at >= limit,
             ovn_models.OVNHashRing.group_name == group_name)
         if from_host:
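
After the revert the two helpers go back to their simpler signatures: remove_nodes_from_host() deletes every register owned by the local host (no created_before filter, and without the synchronize_session=False shortcut), and get_active_nodes() no longer takes a created_at argument. A minimal usage sketch, assuming an admin context and the ML2 group used elsewhere in this commit:

    from neutron_lib import context

    from neutron.common.ovn import constants as ovn_const
    from neutron.db import ovn_hash_ring_db as db_hash_ring

    ctx = context.get_admin_context()

    # Drop every "ovn_hash_ring" register belonging to this host (CONF.host)
    # for the mechanism_driver group.
    db_hash_ring.remove_nodes_from_host(ctx, ovn_const.HASH_RING_ML2_GROUP)

    # Registers touched within the last 60 seconds, no matter which
    # controller created them or when.
    nodes = db_hash_ring.get_active_nodes(
        ctx, interval=ovn_const.HASH_RING_NODES_TIMEOUT,
        group_name=ovn_const.HASH_RING_ML2_GROUP)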


@@ -120,9 +120,12 @@ class OVNMechanismDriver(api.MechanismDriver):
         self._maintenance_thread = None
         self.node_uuid = None
         self.hash_ring_group = ovn_const.HASH_RING_ML2_GROUP
-        self.init_time = timeutils.utcnow()
         self.sg_enabled = ovn_acl.is_sg_enabled()
+        # NOTE(lucasagomes): _clean_hash_ring() must be called before
+        # self.subscribe() to avoid processes racing when adding or
+        # deleting nodes from the Hash Ring during service initialization
         ovn_conf.register_opts()
+        self._clean_hash_ring()
         self._post_fork_event = threading.Event()
         if cfg.CONF.SECURITYGROUP.firewall_driver:
             LOG.warning('Firewall driver configuration is ignored')
@@ -391,7 +394,7 @@
             maintenance.DBInconsistenciesPeriodics(self._ovn_client))
         self._maintenance_thread.add_periodics(
             maintenance.HashRingHealthCheckPeriodics(
-                self.hash_ring_group, self.init_time))
+                self.hash_ring_group))
         self._maintenance_thread.start()

     def _create_security_group_precommit(self, resource, event, trigger,
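
The NOTE(lucasagomes) comment restored above is about ordering rather than about the cleanup itself: this host's registers must be wiped before the driver subscribes to callbacks, otherwise rows added by freshly started workers could race with the delete. A toy, self-contained illustration of that ordering constraint (none of these names exist in neutron):

    ring = set()

    def clean_hash_ring():
        # Safe only while nothing is registering nodes yet.
        ring.clear()

    def subscribe():
        # From this point on, workers start adding their node UUIDs.
        ring.add('worker-node-uuid')

    # Restored order: clean first, then subscribe.
    clean_hash_ring()
    subscribe()
    assert ring == {'worker-node-uuid'}  # the fresh registration survives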


@@ -26,7 +26,6 @@ from neutron_lib import constants as n_const
 from neutron_lib import context as n_context
 from neutron_lib import exceptions as n_exc
 from oslo_config import cfg
-from oslo_db import exception as db_exc
 from oslo_log import log
 from oslo_utils import timeutils
 from ovsdbapp.backend.ovs_idl import event as row_event
@@ -822,9 +821,8 @@ class DBInconsistenciesPeriodics(SchemaAwarePeriodicsBase):
 class HashRingHealthCheckPeriodics(object):

-    def __init__(self, group, created_time):
+    def __init__(self, group):
         self._group = group
-        self._created_time = created_time
         self.ctx = n_context.get_admin_context()

     @periodics.periodic(spacing=ovn_const.HASH_RING_TOUCH_INTERVAL)
@@ -833,16 +831,3 @@
         # here because we want the maintenance tasks from each instance to
         # execute this task.
         hash_ring_db.touch_nodes_from_host(self.ctx, self._group)
-
-    @periodics.periodic(spacing=ovn_const.HASH_RING_CLEANUP_INTERVAL)
-    def clean_up_hash_ring_nodes(self):
-        try:
-            hash_ring_db.remove_nodes_from_host(
-                self.ctx, self._group, created_before=self._created_time)
-        except db_exc.DBError as exc:
-            LOG.info('The "ovn_hash_ring" table was not cleaned; the '
-                     'operation will be retried. Error: %s',
-                     str(exc))
-            return
-        raise periodics.NeverAgain()
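
Two periodics are involved here. The touch task above keeps running every HASH_RING_TOUCH_INTERVAL (30 s) in each instance's maintenance thread, while the removed clean_up_hash_ring_nodes task was a run-once job: raising periodics.NeverAgain() after a successful pass tells futurist to stop scheduling it. A minimal sketch of that run-once pattern, with a hypothetical stand-in for the real DB call:

    from futurist import periodics

    def _cleanup():
        """Hypothetical stand-in for hash_ring_db.remove_nodes_from_host()."""

    @periodics.periodic(spacing=45)  # the removed task used HASH_RING_CLEANUP_INTERVAL
    def clean_up_once():
        try:
            _cleanup()
        except Exception:
            return                    # returning keeps the task scheduled; retry next run
        raise periodics.NeverAgain()  # success: futurist never schedules it again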


@@ -688,7 +688,7 @@ class OvnIdlDistributedLock(BaseOvnIdl):
         self.notify_handler = OvnDbNotifyHandler(driver)
         self._node_uuid = self.driver.node_uuid
         self._hash_ring = hash_ring_manager.HashRingManager(
-            self.driver.hash_ring_group, self.driver.init_time)
+            self.driver.hash_ring_group)
         self._last_touch = None
         # This is a map of tables that may be new after OVN database is updated
         self._tables_to_register = {


@@ -360,6 +360,9 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
         trigger_cls.trigger.__self__.__class__ = neutron.wsgi.WorkerService

         self.addCleanup(self.stop)
+        # NOTE(ralonsoh): do not access to the DB at exit when the SQL
+        # connection is already closed, to avoid useless exception messages.
+        mock.patch.object(self.mech_driver, '_clean_hash_ring').start()
         self.mech_driver.pre_fork_initialize(
             mock.ANY, mock.ANY, trigger_cls.trigger)


@@ -237,7 +237,6 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
     def _create_workers(self, row_event, worker_num):
         self.mech_driver.nb_ovn.idl.notify_handler.watch_event(row_event)
         worker_list = [self.mech_driver.nb_ovn]
-        init_time = timeutils.utcnow()

         # Create 10 fake workers
         for _ in range(worker_num):
@@ -246,8 +245,7 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
                 self.context, ovn_const.HASH_RING_ML2_GROUP, node_uuid)
             fake_driver = mock.MagicMock(
                 node_uuid=node_uuid,
-                hash_ring_group=ovn_const.HASH_RING_ML2_GROUP,
-                init_time=init_time)
+                hash_ring_group=ovn_const.HASH_RING_ML2_GROUP)
             _idl = ovsdb_monitor.OvnNbIdl.from_server(
                 self.ovsdb_server_mgr.get_ovsdb_connection_path(),
                 self.nb_api.schema_helper, fake_driver)
@@ -267,8 +265,7 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
            len(db_hash_ring.get_active_nodes(
                self.context,
                interval=ovn_const.HASH_RING_NODES_TIMEOUT,
-                group_name=ovn_const.HASH_RING_ML2_GROUP,
-                created_at=self.mech_driver.init_time)))
+                group_name=ovn_const.HASH_RING_ML2_GROUP)))
         return worker_list


@@ -33,9 +33,8 @@ class TestHashRingManager(testlib_api.SqlTestCaseLight):
     def setUp(self):
         super(TestHashRingManager, self).setUp()
-        init_time = timeutils.utcnow()
         self.hash_ring_manager = hash_ring_manager.HashRingManager(
-            HASH_RING_TEST_GROUP, init_time)
+            HASH_RING_TEST_GROUP)
         self.admin_ctx = context.get_admin_context()

     def _verify_hashes(self, hash_dict):


@@ -121,7 +121,6 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
         self.assertEqual(node_db.created_at, node_db.updated_at)

     def test_active_nodes(self):
-        created_at = timeutils.utcnow()
         self._add_nodes_and_assert_exists(count=3)

         # Add another node from a different host
@@ -131,8 +130,7 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
         # Assert all nodes are active (within 60 seconds)
         self.assertEqual(4, len(ovn_hash_ring_db.get_active_nodes(
-            self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP,
-            created_at=created_at)))
+            self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP)))

         # Subtract 60 seconds from utcnow() and touch the nodes from our host
         time.sleep(1)
@@ -145,13 +143,11 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
         # Now assert that all nodes from our host are seeing as offline.
         # Only the node from another host should be active
         active_nodes = ovn_hash_ring_db.get_active_nodes(
-            self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP,
-            created_at=created_at)
+            self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP)
         self.assertEqual(1, len(active_nodes))
         self.assertEqual(another_host_node, active_nodes[0].node_uuid)

     def test_active_nodes_from_host(self):
-        created_at = timeutils.utcnow()
         self._add_nodes_and_assert_exists(count=3)

         # Add another node from a different host
@@ -163,7 +159,7 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
         # Assert only the 3 nodes from this host is returned
         active_nodes = ovn_hash_ring_db.get_active_nodes(
             self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP,
-            from_host=True, created_at=created_at)
+            from_host=True)
         self.assertEqual(3, len(active_nodes))
         self.assertNotIn(another_host_id, active_nodes)
@@ -189,21 +185,18 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
         self.assertEqual(node_db.created_at, node_db.updated_at)

     def test_active_nodes_different_groups(self):
-        created_at = timeutils.utcnow()
         another_group = 'another_test_group'
         self._add_nodes_and_assert_exists(count=3)
         self._add_nodes_and_assert_exists(count=2, group_name=another_group)

         active_nodes = ovn_hash_ring_db.get_active_nodes(
-            self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP,
-            created_at=created_at)
+            self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP)
         self.assertEqual(3, len(active_nodes))
         for node in active_nodes:
             self.assertEqual(HASH_RING_TEST_GROUP, node.group_name)

         active_nodes = ovn_hash_ring_db.get_active_nodes(
-            self.admin_ctx, interval=60, group_name=another_group,
-            created_at=created_at)
+            self.admin_ctx, interval=60, group_name=another_group)
         self.assertEqual(2, len(active_nodes))
         for node in active_nodes:
             self.assertEqual(another_group, node.group_name)


@@ -13,22 +13,17 @@
 # License for the specific language governing permissions and limitations
 # under the License.

-import datetime
 from unittest import mock

 from futurist import periodics
 from neutron_lib import context
-from neutron_lib.db import api as db_api
 from oslo_config import cfg
 from oslo_utils import timeutils
 from oslo_utils import uuidutils

 from neutron.common.ovn import constants
 from neutron.common.ovn import utils
-from neutron.conf import common as common_conf
 from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
-from neutron.db.models import ovn as ovn_models
-from neutron.db import ovn_hash_ring_db
 from neutron.db import ovn_revision_numbers_db
 from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import maintenance
 from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_db_sync
@@ -710,49 +705,3 @@ class TestDBInconsistenciesPeriodics(testlib_api.SqlTestCaseLight,
         expected_calls = [mock.call('Logical_Switch_Port', lsp0.uuid,
                                     ('type', constants.LSP_TYPE_VIRTUAL))]
         nb_idl.db_set.assert_has_calls(expected_calls)
-
-
-class TestHashRingHealthCheckPeriodics(testlib_api.SqlTestCaseLight):
-
-    def setUp(self):
-        super().setUp()
-        common_conf.register_core_common_config_opts()
-        self.ctx = context.get_admin_context()
-        self.group = uuidutils.generate_uuid()
-        self.created_time = timeutils.utcnow()
-        self.hr_check_periodics = maintenance.HashRingHealthCheckPeriodics(
-            self.group, self.created_time)
-
-    def test_clean_up_hash_ring_nodes(self):
-        num_nodes = 10
-        utc_zero = datetime.datetime.fromtimestamp(0)
-        # This loop will create "ovn_hash_ring" registers from
-        # utcnow - (num_nodes/2) to utcnow + (num_nodes/2) - 1. That means
-        # we'll have old/stale registers and new registers.
-        for idx in range(- int(num_nodes / 2), int(num_nodes / 2)):
-            _uuid = ovn_hash_ring_db.add_node(self.ctx, self.group)
-            new_time = self.created_time + datetime.timedelta(seconds=idx)
-            with db_api.CONTEXT_WRITER.using(self.ctx):
-                self.ctx.session.query(ovn_models.OVNHashRing).filter(
-                    ovn_models.OVNHashRing.node_uuid == _uuid).update(
-                    {'updated_at': new_time, 'created_at': new_time})
-
-        all_nodes = ovn_hash_ring_db.get_active_nodes(
-            self.ctx, 10, self.group, utc_zero)
-        # "num_nodes" registers created
-        self.assertEqual(num_nodes, len(all_nodes))
-
-        # Only "num_nodes/2" registers are active (created_at is updated)
-        active_nodes = ovn_hash_ring_db.get_active_nodes(
-            self.ctx, 10, self.group, self.created_time)
-        self.assertEqual(int(num_nodes / 2), len(active_nodes))
-
-        self.assertRaises(periodics.NeverAgain,
-                          self.hr_check_periodics.clean_up_hash_ring_nodes)
-        all_nodes = ovn_hash_ring_db.get_active_nodes(
-            self.ctx, 10, self.group, utc_zero)
-        # Only active registers remain in the table.
-        self.assertEqual(int(num_nodes / 2), len(all_nodes))
-        # Only active registers remain in the table.
-        active_nodes = ovn_hash_ring_db.get_active_nodes(
-            self.ctx, 10, self.group, self.created_time)
-        self.assertEqual(int(num_nodes / 2), len(active_nodes))