diff --git a/neutron/common/ovn/constants.py b/neutron/common/ovn/constants.py
index 66a870ef890..2ee6a101e73 100644
--- a/neutron/common/ovn/constants.py
+++ b/neutron/common/ovn/constants.py
@@ -285,6 +285,9 @@ LB_EXT_IDS_VIP_PORT_ID_KEY = 'neutron:vip_port_id'
 HASH_RING_NODES_TIMEOUT = 60
 HASH_RING_TOUCH_INTERVAL = 30
 HASH_RING_CACHE_TIMEOUT = 30
+# NOTE(ralonsoh): the OVN hash ring cleanup should not run at the same time
+# as the node touch process, to avoid any database lock contention.
+HASH_RING_CLEANUP_INTERVAL = int(HASH_RING_TOUCH_INTERVAL * 1.5)
 HASH_RING_ML2_GROUP = 'mechanism_driver'
 
 # Maximum chassis count where a gateway port can be hosted
diff --git a/neutron/common/ovn/hash_ring_manager.py b/neutron/common/ovn/hash_ring_manager.py
index 9c72269854b..b45ed0ed65c 100644
--- a/neutron/common/ovn/hash_ring_manager.py
+++ b/neutron/common/ovn/hash_ring_manager.py
@@ -30,13 +30,14 @@ LOG = log.getLogger(__name__)
 class HashRingManager(object):
 
-    def __init__(self, group_name):
+    def __init__(self, group_name, init_time):
         self._hash_ring = None
         self._last_time_loaded = None
         self._check_hashring_startup = True
         self._group = group_name
         # Flag to rate limit the caching log
         self._prev_num_nodes = -1
+        self._init_time = init_time
         self.admin_ctx = context.get_admin_context()
 
     @property
@@ -56,7 +57,8 @@ class HashRingManager(object):
         api_workers = service._get_api_workers()
         nodes = db_hash_ring.get_active_nodes(
             self.admin_ctx,
-            constants.HASH_RING_CACHE_TIMEOUT, self._group, from_host=True)
+            constants.HASH_RING_CACHE_TIMEOUT, self._group, self._init_time,
+            from_host=True)
         num_nodes = len(nodes)
 
         if num_nodes >= api_workers:
@@ -87,8 +89,8 @@ class HashRingManager(object):
                 not self._hash_ring.nodes or
                 cache_timeout >= self._last_time_loaded):
             nodes = db_hash_ring.get_active_nodes(
-                self.admin_ctx,
-                constants.HASH_RING_NODES_TIMEOUT, self._group)
+                self.admin_ctx, constants.HASH_RING_NODES_TIMEOUT,
+                self._group, self._init_time)
             self._hash_ring = hashring.HashRing({node.node_uuid
                                                  for node in nodes})
             self._last_time_loaded = timeutils.utcnow()
diff --git a/neutron/db/ovn_hash_ring_db.py b/neutron/db/ovn_hash_ring_db.py
index 1cea2f6bef2..6468c0711cc 100644
--- a/neutron/db/ovn_hash_ring_db.py
+++ b/neutron/db/ovn_hash_ring_db.py
@@ -37,11 +37,19 @@ def add_node(context, group_name, node_uuid=None):
     return node_uuid
 
 
-def remove_nodes_from_host(context, group_name):
-    with db_api.CONTEXT_WRITER.using(context):
-        context.session.query(ovn_models.OVNHashRing).filter(
-            ovn_models.OVNHashRing.hostname == CONF.host,
-            ovn_models.OVNHashRing.group_name == group_name).delete()
+@db_api.CONTEXT_WRITER
+def remove_nodes_from_host(context, group_name, created_before=None):
+    query = context.session.query(ovn_models.OVNHashRing).filter(
+        ovn_models.OVNHashRing.hostname == CONF.host,
+        ovn_models.OVNHashRing.group_name == group_name)
+    if created_before:
+        query = query.filter(
+            ovn_models.OVNHashRing.created_at < created_before)
+    # NOTE(ralonsoh): with "synchronize_session=False", the DELETE is
+    # executed directly in the database; SQLAlchemy won't try to find
+    # the matching records in the active session in order to update them
+    # after the deletion. This is the most efficient execution strategy.
+    query.delete(synchronize_session=False)
 
 
 def _touch(context, **filter_args):
@@ -58,10 +66,12 @@ def touch_node(context, node_uuid):
     _touch(context, node_uuid=node_uuid)
 
 
-def get_active_nodes(context, interval, group_name, from_host=False):
+def get_active_nodes(context, interval, group_name, created_at,
+                     from_host=False):
     limit = timeutils.utcnow() - datetime.timedelta(seconds=interval)
     with db_api.CONTEXT_READER.using(context):
         query = context.session.query(ovn_models.OVNHashRing).filter(
+            ovn_models.OVNHashRing.created_at >= created_at,
             ovn_models.OVNHashRing.updated_at >= limit,
             ovn_models.OVNHashRing.group_name == group_name)
         if from_host:
diff --git a/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py b/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py
index ffea23296ca..b8203638adf 100644
--- a/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py
+++ b/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py
@@ -120,12 +120,9 @@ class OVNMechanismDriver(api.MechanismDriver):
         self._maintenance_thread = None
         self.node_uuid = None
         self.hash_ring_group = ovn_const.HASH_RING_ML2_GROUP
+        self.init_time = timeutils.utcnow()
         self.sg_enabled = ovn_acl.is_sg_enabled()
-        # NOTE(lucasagomes): _clean_hash_ring() must be called before
-        # self.subscribe() to avoid processes racing when adding or
-        # deleting nodes from the Hash Ring during service initialization
         ovn_conf.register_opts()
-        self._clean_hash_ring()
         self._post_fork_event = threading.Event()
         if cfg.CONF.SECURITYGROUP.firewall_driver:
             LOG.warning('Firewall driver configuration is ignored')
@@ -394,7 +391,7 @@ class OVNMechanismDriver(api.MechanismDriver):
             maintenance.DBInconsistenciesPeriodics(self._ovn_client))
         self._maintenance_thread.add_periodics(
             maintenance.HashRingHealthCheckPeriodics(
-                self.hash_ring_group))
+                self.hash_ring_group, self.init_time))
         self._maintenance_thread.start()
 
     def _create_security_group_precommit(self, resource, event, trigger,
diff --git a/neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/maintenance.py b/neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/maintenance.py
index de19ebf57f5..f703a1c324d 100644
--- a/neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/maintenance.py
+++ b/neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/maintenance.py
@@ -26,6 +26,7 @@ from neutron_lib import constants as n_const
 from neutron_lib import context as n_context
 from neutron_lib import exceptions as n_exc
 from oslo_config import cfg
+from oslo_db import exception as db_exc
 from oslo_log import log
 from oslo_utils import timeutils
 from ovsdbapp.backend.ovs_idl import event as row_event
@@ -821,8 +822,9 @@ class DBInconsistenciesPeriodics(SchemaAwarePeriodicsBase):
 
 
 class HashRingHealthCheckPeriodics(object):
 
-    def __init__(self, group):
+    def __init__(self, group, created_time):
         self._group = group
+        self._created_time = created_time
         self.ctx = n_context.get_admin_context()
 
     @periodics.periodic(spacing=ovn_const.HASH_RING_TOUCH_INTERVAL)
@@ -831,3 +833,16 @@ class HashRingHealthCheckPeriodics(object):
         # here because we want the maintenance tasks from each instance to
         # execute this task.
         hash_ring_db.touch_nodes_from_host(self.ctx, self._group)
+
+    @periodics.periodic(spacing=ovn_const.HASH_RING_CLEANUP_INTERVAL)
+    def clean_up_hash_ring_nodes(self):
+        try:
+            hash_ring_db.remove_nodes_from_host(
+                self.ctx, self._group, created_before=self._created_time)
+        except db_exc.DBError as exc:
+            LOG.info('The "ovn_hash_ring" table was not cleaned; the '
+                     'operation will be retried. Error: %s',
+                     str(exc))
+            return
+
+        raise periodics.NeverAgain()
diff --git a/neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/ovsdb_monitor.py b/neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/ovsdb_monitor.py
index ed7dbf914f3..96db9ce5135 100644
--- a/neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/ovsdb_monitor.py
+++ b/neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/ovsdb_monitor.py
@@ -688,7 +688,7 @@ class OvnIdlDistributedLock(BaseOvnIdl):
         self.notify_handler = OvnDbNotifyHandler(driver)
         self._node_uuid = self.driver.node_uuid
         self._hash_ring = hash_ring_manager.HashRingManager(
-            self.driver.hash_ring_group)
+            self.driver.hash_ring_group, self.driver.init_time)
         self._last_touch = None
         # This is a map of tables that may be new after OVN database is updated
         self._tables_to_register = {
diff --git a/neutron/tests/functional/base.py b/neutron/tests/functional/base.py
index 55ecf7a256d..0dc05d7bafe 100644
--- a/neutron/tests/functional/base.py
+++ b/neutron/tests/functional/base.py
@@ -360,9 +360,6 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
             trigger_cls.trigger.__self__.__class__ = neutron.wsgi.WorkerService
 
         self.addCleanup(self.stop)
-        # NOTE(ralonsoh): do not access to the DB at exit when the SQL
-        # connection is already closed, to avoid useless exception messages.
-        mock.patch.object(self.mech_driver, '_clean_hash_ring').start()
         self.mech_driver.pre_fork_initialize(
             mock.ANY, mock.ANY, trigger_cls.trigger)
 
diff --git a/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py b/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py
index 8854afeae99..6a46a6d65b4 100644
--- a/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py
+++ b/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py
@@ -237,6 +237,7 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
     def _create_workers(self, row_event, worker_num):
         self.mech_driver.nb_ovn.idl.notify_handler.watch_event(row_event)
         worker_list = [self.mech_driver.nb_ovn]
+        init_time = timeutils.utcnow()
 
         # Create 10 fake workers
         for _ in range(worker_num):
@@ -245,7 +246,8 @@
                 self.context, ovn_const.HASH_RING_ML2_GROUP, node_uuid)
             fake_driver = mock.MagicMock(
                 node_uuid=node_uuid,
-                hash_ring_group=ovn_const.HASH_RING_ML2_GROUP)
+                hash_ring_group=ovn_const.HASH_RING_ML2_GROUP,
+                init_time=init_time)
             _idl = ovsdb_monitor.OvnNbIdl.from_server(
                 self.ovsdb_server_mgr.get_ovsdb_connection_path(),
                 self.nb_api.schema_helper, fake_driver)
@@ -265,7 +267,8 @@
                 len(db_hash_ring.get_active_nodes(
                     self.context,
                     interval=ovn_const.HASH_RING_NODES_TIMEOUT,
-                    group_name=ovn_const.HASH_RING_ML2_GROUP)))
+                    group_name=ovn_const.HASH_RING_ML2_GROUP,
+                    created_at=self.mech_driver.init_time)))
 
         return worker_list
diff --git a/neutron/tests/unit/common/ovn/test_hash_ring_manager.py b/neutron/tests/unit/common/ovn/test_hash_ring_manager.py
index 482dba57d3c..12699ce40cf 100644
--- a/neutron/tests/unit/common/ovn/test_hash_ring_manager.py
+++ b/neutron/tests/unit/common/ovn/test_hash_ring_manager.py
@@ -33,8 +33,9 @@ class TestHashRingManager(testlib_api.SqlTestCaseLight):
 
     def setUp(self):
         super(TestHashRingManager, self).setUp()
+        init_time = timeutils.utcnow()
         self.hash_ring_manager = hash_ring_manager.HashRingManager(
-            HASH_RING_TEST_GROUP)
+            HASH_RING_TEST_GROUP, init_time)
         self.admin_ctx = context.get_admin_context()
 
     def _verify_hashes(self, hash_dict):
diff --git a/neutron/tests/unit/db/test_ovn_hash_ring_db.py b/neutron/tests/unit/db/test_ovn_hash_ring_db.py
index 477ee5d560b..39d05946db9 100644
--- a/neutron/tests/unit/db/test_ovn_hash_ring_db.py
+++ b/neutron/tests/unit/db/test_ovn_hash_ring_db.py
@@ -121,6 +121,7 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
         self.assertEqual(node_db.created_at, node_db.updated_at)
 
     def test_active_nodes(self):
+        created_at = timeutils.utcnow()
         self._add_nodes_and_assert_exists(count=3)
 
         # Add another node from a different host
@@ -130,7 +131,8 @@
 
         # Assert all nodes are active (within 60 seconds)
         self.assertEqual(4, len(ovn_hash_ring_db.get_active_nodes(
-            self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP)))
+            self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP,
+            created_at=created_at)))
 
         # Subtract 60 seconds from utcnow() and touch the nodes from our host
         time.sleep(1)
@@ -143,11 +145,13 @@
         # Now assert that all nodes from our host are seeing as offline.
         # Only the node from another host should be active
         active_nodes = ovn_hash_ring_db.get_active_nodes(
-            self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP)
+            self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP,
+            created_at=created_at)
         self.assertEqual(1, len(active_nodes))
         self.assertEqual(another_host_node, active_nodes[0].node_uuid)
 
     def test_active_nodes_from_host(self):
+        created_at = timeutils.utcnow()
         self._add_nodes_and_assert_exists(count=3)
 
         # Add another node from a different host
@@ -159,7 +163,7 @@
         # Assert only the 3 nodes from this host is returned
         active_nodes = ovn_hash_ring_db.get_active_nodes(
             self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP,
-            from_host=True)
+            from_host=True, created_at=created_at)
         self.assertEqual(3, len(active_nodes))
         self.assertNotIn(another_host_id, active_nodes)
 
@@ -185,18 +189,21 @@
         self.assertEqual(node_db.created_at, node_db.updated_at)
 
     def test_active_nodes_different_groups(self):
+        created_at = timeutils.utcnow()
         another_group = 'another_test_group'
         self._add_nodes_and_assert_exists(count=3)
         self._add_nodes_and_assert_exists(count=2, group_name=another_group)
 
         active_nodes = ovn_hash_ring_db.get_active_nodes(
-            self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP)
+            self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP,
+            created_at=created_at)
         self.assertEqual(3, len(active_nodes))
         for node in active_nodes:
             self.assertEqual(HASH_RING_TEST_GROUP, node.group_name)
 
         active_nodes = ovn_hash_ring_db.get_active_nodes(
-            self.admin_ctx, interval=60, group_name=another_group)
+            self.admin_ctx, interval=60, group_name=another_group,
+            created_at=created_at)
         self.assertEqual(2, len(active_nodes))
         for node in active_nodes:
             self.assertEqual(another_group, node.group_name)
diff --git a/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_maintenance.py b/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_maintenance.py
index 508b5c1880f..2941f09226a 100644
--- a/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_maintenance.py
+++ b/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_maintenance.py
@@ -13,17 +13,22 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import datetime
 from unittest import mock
 
 from futurist import periodics
 from neutron_lib import context
 from neutron_lib.db import api as db_api
 from oslo_config import cfg
+from oslo_utils import timeutils
 from oslo_utils import uuidutils
 
 from neutron.common.ovn import constants
 from neutron.common.ovn import utils
+from neutron.conf import common as common_conf
 from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
+from neutron.db.models import ovn as ovn_models
+from neutron.db import ovn_hash_ring_db
 from neutron.db import ovn_revision_numbers_db
 from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import maintenance
 from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_db_sync
@@ -705,3 +710,49 @@ class TestDBInconsistenciesPeriodics(testlib_api.SqlTestCaseLight,
         expected_calls = [mock.call('Logical_Switch_Port', lsp0.uuid,
                                     ('type', constants.LSP_TYPE_VIRTUAL))]
         nb_idl.db_set.assert_has_calls(expected_calls)
+
+
+class TestHashRingHealthCheckPeriodics(testlib_api.SqlTestCaseLight):
+
+    def setUp(self):
+        super().setUp()
+        common_conf.register_core_common_config_opts()
+        self.ctx = context.get_admin_context()
+        self.group = uuidutils.generate_uuid()
+        self.created_time = timeutils.utcnow()
+        self.hr_check_periodics = maintenance.HashRingHealthCheckPeriodics(
+            self.group, self.created_time)
+
+    def test_clean_up_hash_ring_nodes(self):
+        num_nodes = 10
+        utc_zero = datetime.datetime.fromtimestamp(0)
+        # This loop creates "ovn_hash_ring" records with timestamps from
+        # utcnow - (num_nodes/2) to utcnow + (num_nodes/2) - 1 seconds. That
+        # means we'll have both old/stale records and new ones.
+        for idx in range(-int(num_nodes / 2), int(num_nodes / 2)):
+            _uuid = ovn_hash_ring_db.add_node(self.ctx, self.group)
+            new_time = self.created_time + datetime.timedelta(seconds=idx)
+            with db_api.CONTEXT_WRITER.using(self.ctx):
+                self.ctx.session.query(ovn_models.OVNHashRing).filter(
+                    ovn_models.OVNHashRing.node_uuid == _uuid).update(
+                    {'updated_at': new_time, 'created_at': new_time})
+
+        all_nodes = ovn_hash_ring_db.get_active_nodes(
+            self.ctx, 10, self.group, utc_zero)
+        # "num_nodes" records were created
+        self.assertEqual(num_nodes, len(all_nodes))
+        # Only "num_nodes/2" records are active (created_at >= created_time)
+        active_nodes = ovn_hash_ring_db.get_active_nodes(
+            self.ctx, 10, self.group, self.created_time)
+        self.assertEqual(int(num_nodes / 2), len(active_nodes))
+
+        self.assertRaises(periodics.NeverAgain,
+                          self.hr_check_periodics.clean_up_hash_ring_nodes)
+        all_nodes = ovn_hash_ring_db.get_active_nodes(
+            self.ctx, 10, self.group, utc_zero)
+        # Only the active records remain in the table.
+        self.assertEqual(int(num_nodes / 2), len(all_nodes))
+        # All remaining records still match the active-node criteria.
+        active_nodes = ovn_hash_ring_db.get_active_nodes(
+            self.ctx, 10, self.group, self.created_time)
+        self.assertEqual(int(num_nodes / 2), len(active_nodes))
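
Reviewer note (not part of the patch): the invariant this change introduces is
that a hash ring node only counts as active if it was touched recently
(updated_at within the interval) *and* was registered by the currently running
service instance (created_at >= init_time); the new one-shot periodic task then
deletes this host's pre-restart rows. Below is a minimal standalone sketch of
that gating, using a hypothetical Node dataclass and plain lists instead of the
Neutron SQLAlchemy model and database queries:

    import dataclasses
    import datetime


    @dataclasses.dataclass
    class Node:
        node_uuid: str
        created_at: datetime.datetime
        updated_at: datetime.datetime


    def get_active_nodes(nodes, interval, created_at, now):
        # Mirrors the patched ovn_hash_ring_db.get_active_nodes(): active
        # means touched within "interval" seconds AND created at or after
        # this service instance's start time.
        limit = now - datetime.timedelta(seconds=interval)
        return [n for n in nodes
                if n.updated_at >= limit and n.created_at >= created_at]


    def remove_stale_nodes(nodes, created_before):
        # Mirrors remove_nodes_from_host(created_before=init_time): rows
        # created before the current run started are leftovers from a
        # previous process. (The real helper also filters by hostname,
        # which is omitted here.)
        return [n for n in nodes if n.created_at >= created_before]


    init_time = datetime.datetime.utcnow()
    stale = Node('stale', init_time - datetime.timedelta(hours=1),
                 init_time - datetime.timedelta(seconds=5))
    fresh = Node('fresh', init_time, init_time)

    # "stale" was touched recently but belongs to a previous run, so it is
    # neither active nor kept by the cleanup.
    assert get_active_nodes([stale, fresh], 60, init_time, init_time) == [fresh]
    assert remove_stale_nodes([stale, fresh], init_time) == [fresh]

The cleanup spacing (1.5x the touch interval, per the new constant) keeps the
two periodic tasks from firing at the same time, and raising NeverAgain after
a successful pass makes the cleanup effectively one-shot per worker.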