Move the "ovn_hash_ring" clean up to maintenance worker
The "ovn_hash_ring" procedure to clean up the stale/old registers is now executed on the ``HashRingHealthCheckPeriodics`` class, tha is executed on the ``MaintenanceWorker`` process. In a HA scenario, if several servers are rebooted at the same time, the "ovn_hash_ring" clean up operation can clash with API worker method "_load_hash_ring", that executed a SQL read from this table. In some high loaded environments, if the OVN database takes time to be locally cached, this read operation is executed thousand of times; basically any time an OVN database event occurs. In order to avoid/skip a deadlock when deleting the "ovn_hash_ring" table, this clean up is executed in a periodic task. If this task succeeds, the task is stopped. If the task raises a database exception, it is processed again. Now the "ovn_hash_ring" registers are retrieved using the "created_at" time as a filter. The initial time is taken when the OVN mechanism driver is initilized, before any API worker is spawned and any new "ovn_hash_ring" register has been created (an API worker, when started, will create a new "ovn_hash_ring" register). Any stale/old register stored in this table will be ignored; that means any register created before the OVN mechanism driver was started. Closes-Bug: #1990174 Change-Id: I07c4cb6e20b8a84e4ace7a8e34555aced5b5da9f
@@ -285,6 +285,9 @@ LB_EXT_IDS_VIP_PORT_ID_KEY = 'neutron:vip_port_id'
 HASH_RING_NODES_TIMEOUT = 60
 HASH_RING_TOUCH_INTERVAL = 30
 HASH_RING_CACHE_TIMEOUT = 30
+# NOTE(ralonsoh): the OVN hash ring clean up should not match with the node
+# touch process, to avoid any database lock.
+HASH_RING_CLEANUP_INTERVAL = int(HASH_RING_TOUCH_INTERVAL * 1.5)
 HASH_RING_ML2_GROUP = 'mechanism_driver'

 # Maximum chassis count where a gateway port can be hosted
@@ -30,13 +30,14 @@ LOG = log.getLogger(__name__)

 class HashRingManager(object):

-    def __init__(self, group_name):
+    def __init__(self, group_name, init_time):
         self._hash_ring = None
         self._last_time_loaded = None
         self._check_hashring_startup = True
         self._group = group_name
         # Flag to rate limit the caching log
         self._prev_num_nodes = -1
+        self._init_time = init_time
         self.admin_ctx = context.get_admin_context()

     @property
@@ -56,7 +57,8 @@ class HashRingManager(object):
         api_workers = service._get_api_workers()
         nodes = db_hash_ring.get_active_nodes(
             self.admin_ctx,
-            constants.HASH_RING_CACHE_TIMEOUT, self._group, from_host=True)
+            constants.HASH_RING_CACHE_TIMEOUT, self._group, self._init_time,
+            from_host=True)

         num_nodes = len(nodes)
         if num_nodes >= api_workers:
@@ -87,8 +89,8 @@ class HashRingManager(object):
                 not self._hash_ring.nodes or
                 cache_timeout >= self._last_time_loaded):
             nodes = db_hash_ring.get_active_nodes(
-                self.admin_ctx,
-                constants.HASH_RING_NODES_TIMEOUT, self._group)
+                self.admin_ctx, constants.HASH_RING_NODES_TIMEOUT,
+                self._group, self._init_time)
             self._hash_ring = hashring.HashRing({node.node_uuid
                                                  for node in nodes})
             self._last_time_loaded = timeutils.utcnow()
@@ -37,11 +37,19 @@ def add_node(context, group_name, node_uuid=None):
     return node_uuid


-def remove_nodes_from_host(context, group_name):
-    with db_api.CONTEXT_WRITER.using(context):
-        context.session.query(ovn_models.OVNHashRing).filter(
-            ovn_models.OVNHashRing.hostname == CONF.host,
-            ovn_models.OVNHashRing.group_name == group_name).delete()
+@db_api.CONTEXT_WRITER
+def remove_nodes_from_host(context, group_name, created_before=None):
+    query = context.session.query(ovn_models.OVNHashRing).filter(
+        ovn_models.OVNHashRing.hostname == CONF.host,
+        ovn_models.OVNHashRing.group_name == group_name)
+    if created_before:
+        query = query.filter(
+            ovn_models.OVNHashRing.created_at < created_before)
+    # NOTE(ralonsoh): with "synchronize_session=False", the SQL action will
+    # be performed after the transaction commit. However, SQLAlchemy won't
+    # try to find those registers in the active session and won't update
+    # it after the deletion. This is the most efficient execution.
+    query.delete(synchronize_session=False)


 def _touch(context, **filter_args):
@@ -58,10 +66,12 @@ def touch_node(context, node_uuid):
     _touch(context, node_uuid=node_uuid)


-def get_active_nodes(context, interval, group_name, from_host=False):
+def get_active_nodes(context, interval, group_name, created_at,
+                     from_host=False):
     limit = timeutils.utcnow() - datetime.timedelta(seconds=interval)
     with db_api.CONTEXT_READER.using(context):
         query = context.session.query(ovn_models.OVNHashRing).filter(
+            ovn_models.OVNHashRing.created_at >= created_at,
             ovn_models.OVNHashRing.updated_at >= limit,
             ovn_models.OVNHashRing.group_name == group_name)
         if from_host:
@@ -120,12 +120,9 @@ class OVNMechanismDriver(api.MechanismDriver):
         self._maintenance_thread = None
         self.node_uuid = None
         self.hash_ring_group = ovn_const.HASH_RING_ML2_GROUP
+        self.init_time = timeutils.utcnow()
         self.sg_enabled = ovn_acl.is_sg_enabled()
-        # NOTE(lucasagomes): _clean_hash_ring() must be called before
-        # self.subscribe() to avoid processes racing when adding or
-        # deleting nodes from the Hash Ring during service initialization
         ovn_conf.register_opts()
-        self._clean_hash_ring()
         self._post_fork_event = threading.Event()
         if cfg.CONF.SECURITYGROUP.firewall_driver:
             LOG.warning('Firewall driver configuration is ignored')
@@ -394,7 +391,7 @@ class OVNMechanismDriver(api.MechanismDriver):
             maintenance.DBInconsistenciesPeriodics(self._ovn_client))
         self._maintenance_thread.add_periodics(
             maintenance.HashRingHealthCheckPeriodics(
-                self.hash_ring_group))
+                self.hash_ring_group, self.init_time))
         self._maintenance_thread.start()

     def _create_security_group_precommit(self, resource, event, trigger,
@@ -26,6 +26,7 @@ from neutron_lib import constants as n_const
 from neutron_lib import context as n_context
 from neutron_lib import exceptions as n_exc
 from oslo_config import cfg
+from oslo_db import exception as db_exc
 from oslo_log import log
 from oslo_utils import timeutils
 from ovsdbapp.backend.ovs_idl import event as row_event
@@ -821,8 +822,9 @@ class DBInconsistenciesPeriodics(SchemaAwarePeriodicsBase):

 class HashRingHealthCheckPeriodics(object):

-    def __init__(self, group):
+    def __init__(self, group, created_time):
         self._group = group
+        self._created_time = created_time
         self.ctx = n_context.get_admin_context()

     @periodics.periodic(spacing=ovn_const.HASH_RING_TOUCH_INTERVAL)
@@ -831,3 +833,16 @@ class HashRingHealthCheckPeriodics(object):
         # here because we want the maintenance tasks from each instance to
         # execute this task.
         hash_ring_db.touch_nodes_from_host(self.ctx, self._group)
+
+    @periodics.periodic(spacing=ovn_const.HASH_RING_CLEANUP_INTERVAL)
+    def clean_up_hash_ring_nodes(self):
+        try:
+            hash_ring_db.remove_nodes_from_host(
+                self.ctx, self._group, created_before=self._created_time)
+        except db_exc.DBError as exc:
+            LOG.info('The "ovn_hash_ring" table was not cleaned; the '
+                     'operation will be retried. Error: %s',
+                     str(exc))
+            return
+
+        raise periodics.NeverAgain()
@@ -688,7 +688,7 @@ class OvnIdlDistributedLock(BaseOvnIdl):
         self.notify_handler = OvnDbNotifyHandler(driver)
         self._node_uuid = self.driver.node_uuid
         self._hash_ring = hash_ring_manager.HashRingManager(
-            self.driver.hash_ring_group)
+            self.driver.hash_ring_group, self.driver.init_time)
         self._last_touch = None
         # This is a map of tables that may be new after OVN database is updated
         self._tables_to_register = {
@@ -360,9 +360,6 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
         trigger_cls.trigger.__self__.__class__ = neutron.wsgi.WorkerService

         self.addCleanup(self.stop)
-        # NOTE(ralonsoh): do not access to the DB at exit when the SQL
-        # connection is already closed, to avoid useless exception messages.
-        mock.patch.object(self.mech_driver, '_clean_hash_ring').start()
         self.mech_driver.pre_fork_initialize(
             mock.ANY, mock.ANY, trigger_cls.trigger)

@@ -237,6 +237,7 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
     def _create_workers(self, row_event, worker_num):
         self.mech_driver.nb_ovn.idl.notify_handler.watch_event(row_event)
         worker_list = [self.mech_driver.nb_ovn]
+        init_time = timeutils.utcnow()

         # Create 10 fake workers
         for _ in range(worker_num):
@@ -245,7 +246,8 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
                 self.context, ovn_const.HASH_RING_ML2_GROUP, node_uuid)
             fake_driver = mock.MagicMock(
                 node_uuid=node_uuid,
-                hash_ring_group=ovn_const.HASH_RING_ML2_GROUP)
+                hash_ring_group=ovn_const.HASH_RING_ML2_GROUP,
+                init_time=init_time)
             _idl = ovsdb_monitor.OvnNbIdl.from_server(
                 self.ovsdb_server_mgr.get_ovsdb_connection_path(),
                 self.nb_api.schema_helper, fake_driver)
@@ -265,7 +267,8 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
                 len(db_hash_ring.get_active_nodes(
                     self.context,
                     interval=ovn_const.HASH_RING_NODES_TIMEOUT,
-                    group_name=ovn_const.HASH_RING_ML2_GROUP)))
+                    group_name=ovn_const.HASH_RING_ML2_GROUP,
+                    created_at=self.mech_driver.init_time)))

         return worker_list

@@ -33,8 +33,9 @@ class TestHashRingManager(testlib_api.SqlTestCaseLight):

     def setUp(self):
         super(TestHashRingManager, self).setUp()
+        init_time = timeutils.utcnow()
         self.hash_ring_manager = hash_ring_manager.HashRingManager(
-            HASH_RING_TEST_GROUP)
+            HASH_RING_TEST_GROUP, init_time)
         self.admin_ctx = context.get_admin_context()

     def _verify_hashes(self, hash_dict):
@@ -121,6 +121,7 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
         self.assertEqual(node_db.created_at, node_db.updated_at)

     def test_active_nodes(self):
+        created_at = timeutils.utcnow()
         self._add_nodes_and_assert_exists(count=3)

         # Add another node from a different host
@@ -130,7 +131,8 @@ class TestHashRing(testlib_api.SqlTestCaseLight):

         # Assert all nodes are active (within 60 seconds)
         self.assertEqual(4, len(ovn_hash_ring_db.get_active_nodes(
-            self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP)))
+            self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP,
+            created_at=created_at)))

         # Subtract 60 seconds from utcnow() and touch the nodes from our host
         time.sleep(1)
@@ -143,11 +145,13 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
         # Now assert that all nodes from our host are seeing as offline.
         # Only the node from another host should be active
         active_nodes = ovn_hash_ring_db.get_active_nodes(
-            self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP)
+            self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP,
+            created_at=created_at)
         self.assertEqual(1, len(active_nodes))
         self.assertEqual(another_host_node, active_nodes[0].node_uuid)

     def test_active_nodes_from_host(self):
+        created_at = timeutils.utcnow()
         self._add_nodes_and_assert_exists(count=3)

         # Add another node from a different host
@@ -159,7 +163,7 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
         # Assert only the 3 nodes from this host is returned
         active_nodes = ovn_hash_ring_db.get_active_nodes(
             self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP,
-            from_host=True)
+            from_host=True, created_at=created_at)
         self.assertEqual(3, len(active_nodes))
         self.assertNotIn(another_host_id, active_nodes)

@@ -185,18 +189,21 @@ class TestHashRing(testlib_api.SqlTestCaseLight):
         self.assertEqual(node_db.created_at, node_db.updated_at)

     def test_active_nodes_different_groups(self):
+        created_at = timeutils.utcnow()
         another_group = 'another_test_group'
         self._add_nodes_and_assert_exists(count=3)
         self._add_nodes_and_assert_exists(count=2, group_name=another_group)

         active_nodes = ovn_hash_ring_db.get_active_nodes(
-            self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP)
+            self.admin_ctx, interval=60, group_name=HASH_RING_TEST_GROUP,
+            created_at=created_at)
         self.assertEqual(3, len(active_nodes))
         for node in active_nodes:
             self.assertEqual(HASH_RING_TEST_GROUP, node.group_name)

         active_nodes = ovn_hash_ring_db.get_active_nodes(
-            self.admin_ctx, interval=60, group_name=another_group)
+            self.admin_ctx, interval=60, group_name=another_group,
+            created_at=created_at)
         self.assertEqual(2, len(active_nodes))
         for node in active_nodes:
             self.assertEqual(another_group, node.group_name)
@@ -13,17 +13,22 @@
 # License for the specific language governing permissions and limitations
 # under the License.

+import datetime
 from unittest import mock

 from futurist import periodics
 from neutron_lib import context
+from neutron_lib.db import api as db_api
 from oslo_config import cfg
+from oslo_utils import timeutils
 from oslo_utils import uuidutils

 from neutron.common.ovn import constants
 from neutron.common.ovn import utils
 from neutron.conf import common as common_conf
 from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
+from neutron.db.models import ovn as ovn_models
+from neutron.db import ovn_hash_ring_db
 from neutron.db import ovn_revision_numbers_db
 from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import maintenance
 from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_db_sync
@@ -705,3 +710,49 @@ class TestDBInconsistenciesPeriodics(testlib_api.SqlTestCaseLight,
         expected_calls = [mock.call('Logical_Switch_Port', lsp0.uuid,
                                     ('type', constants.LSP_TYPE_VIRTUAL))]
         nb_idl.db_set.assert_has_calls(expected_calls)
+
+
+class TestHashRingHealthCheckPeriodics(testlib_api.SqlTestCaseLight):
+
+    def setUp(self):
+        super().setUp()
+        common_conf.register_core_common_config_opts()
+        self.ctx = context.get_admin_context()
+        self.group = uuidutils.generate_uuid()
+        self.created_time = timeutils.utcnow()
+        self.hr_check_periodics = maintenance.HashRingHealthCheckPeriodics(
+            self.group, self.created_time)
+
+    def test_clean_up_hash_ring_nodes(self):
+        num_nodes = 10
+        utc_zero = datetime.datetime.fromtimestamp(0)
+        # This loop will create "ovn_hash_ring" registers from
+        # utcnow - (num_nodes/2) to utcnow + (num_nodes/2) - 1. That means
+        # we'll have old/stale registers and new registers.
+        for idx in range(- int(num_nodes / 2), int(num_nodes / 2)):
+            _uuid = ovn_hash_ring_db.add_node(self.ctx, self.group)
+            new_time = self.created_time + datetime.timedelta(seconds=idx)
+            with db_api.CONTEXT_WRITER.using(self.ctx):
+                self.ctx.session.query(ovn_models.OVNHashRing).filter(
+                    ovn_models.OVNHashRing.node_uuid == _uuid).update(
+                    {'updated_at': new_time, 'created_at': new_time})
+
+        all_nodes = ovn_hash_ring_db.get_active_nodes(
+            self.ctx, 10, self.group, utc_zero)
+        # "num_nodes" registers created
+        self.assertEqual(num_nodes, len(all_nodes))
+        # Only "num_nodes/2" registers are active (created_at is updated)
+        active_nodes = ovn_hash_ring_db.get_active_nodes(
+            self.ctx, 10, self.group, self.created_time)
+        self.assertEqual(int(num_nodes / 2), len(active_nodes))
+
+        self.assertRaises(periodics.NeverAgain,
+                          self.hr_check_periodics.clean_up_hash_ring_nodes)
+        all_nodes = ovn_hash_ring_db.get_active_nodes(
+            self.ctx, 10, self.group, utc_zero)
+        # Only active registers remain in the table.
+        self.assertEqual(int(num_nodes / 2), len(all_nodes))
+        # Only active registers remain in the table.
+        active_nodes = ovn_hash_ring_db.get_active_nodes(
+            self.ctx, 10, self.group, self.created_time)
+        self.assertEqual(int(num_nodes / 2), len(active_nodes))