[OVN] Improve initial hash ring setup
When using WSGI module, the multiprocess event is not shared between the WSGI workers. This new implementation retrieves the WSGI start time to define the OVN hash ring register creation time. That will be used to filter out the stale registers. Depends-On: https://review.opendev.org/c/openstack/devstack/+/936669 Closes-Bug: #2083570 Change-Id: Id9f851f33c2cb3d2c2759a3c66adf2599a3122fe
This commit is contained in:
parent
bedb19bb22
commit
865097c689
@ -46,6 +46,7 @@ Create a ``/etc/neutron/neutron-api-uwsgi.ini`` file with the content below:
|
||||
master = true
|
||||
processes = 2
|
||||
wsgi-file = <path-to-neutron-bin-dir>/neutron-api
|
||||
start-time = %t
|
||||
|
||||
.. end
|
||||
|
||||
@ -160,3 +161,9 @@ in processing agents heartbeats.
|
||||
If OVN ML2 plugin is used without any additional agents, neutron requires
|
||||
no worker for RPC message processing. Set both rpc_workers and
|
||||
rpc_state_report_workers to 0, to disable RPC workers.
|
||||
|
||||
.. note::
|
||||
ML2/OVN uses the ``[uwsgi]start-time = %t`` parameter to create the OVN hash
|
||||
ring registers during the initialization process. This value is populated
|
||||
by the uWSGi process with the start time. For more information, check
|
||||
`Configuring uWSGI <https://uwsgi-docs.readthedocs.io/en/latest/Configuration.html>_`.
|
||||
|
@ -17,7 +17,7 @@
|
||||
# when needed.
|
||||
|
||||
"""Utilities and helper functions."""
|
||||
|
||||
import datetime
|
||||
import functools
|
||||
import hashlib
|
||||
import hmac
|
||||
@ -1114,3 +1114,8 @@ def read_file(path: str) -> str:
|
||||
return file.read()
|
||||
except FileNotFoundError:
|
||||
return ''
|
||||
|
||||
|
||||
def ts_to_datetime(timestamp):
|
||||
"""Converts timestamp (in seconds) to datetime"""
|
||||
return datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
|
||||
|
32
neutron/common/wsgi_utils.py
Normal file
32
neutron/common/wsgi_utils.py
Normal file
@ -0,0 +1,32 @@
|
||||
# Copyright (c) 2024 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
def get_start_time():
|
||||
"""Return the 'start-time=%t' config varible in the WSGI config
|
||||
|
||||
This variable contains the start time of the WSGI server. Check
|
||||
https://uwsgi-docs.readthedocs.io/en/latest/Configuration.html
|
||||
#magic-variables
|
||||
"""
|
||||
try:
|
||||
# pylint: disable=import-outside-toplevel
|
||||
import uwsgi
|
||||
start_time = uwsgi.opt.get('start-time')
|
||||
if not start_time:
|
||||
return
|
||||
return int(start_time.decode(encoding='utf-8'))
|
||||
except ImportError:
|
||||
return
|
@ -30,26 +30,48 @@ LOG = log.getLogger(__name__)
|
||||
# NOTE(ralonsoh): this was migrated from networking-ovn to neutron and should
|
||||
# be refactored to be integrated in a OVO.
|
||||
@db_api.retry_if_session_inactive()
|
||||
def add_node(context, group_name, node_uuid=None):
|
||||
def add_node(context, group_name, node_uuid=None, created_at=None):
|
||||
if node_uuid is None:
|
||||
node_uuid = uuidutils.generate_uuid()
|
||||
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
context.session.add(ovn_models.OVNHashRing(
|
||||
node_uuid=node_uuid, hostname=CONF.host, group_name=group_name))
|
||||
kwargs = {'node_uuid': node_uuid,
|
||||
'hostname': CONF.host,
|
||||
'group_name': group_name}
|
||||
if created_at:
|
||||
kwargs['created_at'] = created_at
|
||||
context.session.add(ovn_models.OVNHashRing(**kwargs))
|
||||
LOG.info('Node %s from host "%s" and group "%s" added to the Hash Ring',
|
||||
node_uuid, CONF.host, group_name)
|
||||
return node_uuid
|
||||
|
||||
|
||||
@db_api.retry_if_session_inactive()
|
||||
def remove_nodes_from_host(context, group_name):
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
context.session.query(ovn_models.OVNHashRing).filter(
|
||||
@db_api.CONTEXT_READER
|
||||
def get_nodes(context, group_name, created_at=None):
|
||||
query = context.session.query(ovn_models.OVNHashRing).filter(
|
||||
ovn_models.OVNHashRing.group_name == group_name)
|
||||
if created_at:
|
||||
query = query.filter(
|
||||
ovn_models.OVNHashRing.created_at == created_at)
|
||||
return query.all()
|
||||
|
||||
|
||||
@db_api.retry_if_session_inactive()
|
||||
def remove_nodes_from_host(context, group_name, created_at=None):
|
||||
with (db_api.CONTEXT_WRITER.using(context)):
|
||||
query = context.session.query(ovn_models.OVNHashRing).filter(
|
||||
ovn_models.OVNHashRing.hostname == CONF.host,
|
||||
ovn_models.OVNHashRing.group_name == group_name).delete()
|
||||
LOG.info('Nodes from host "%s" and group "%s" removed from the Hash Ring',
|
||||
CONF.host, group_name)
|
||||
ovn_models.OVNHashRing.group_name == group_name)
|
||||
if created_at:
|
||||
query = query.filter(
|
||||
ovn_models.OVNHashRing.created_at != created_at)
|
||||
query.delete()
|
||||
msg = ('Nodes from host "%s" and group "%s" removed from the Hash Ring' %
|
||||
(CONF.host, group_name))
|
||||
if created_at:
|
||||
msg += ' created at %s' % str(created_at)
|
||||
LOG.info(msg)
|
||||
|
||||
|
||||
@db_api.retry_if_session_inactive()
|
||||
|
@ -31,13 +31,13 @@ from neutron_lib.callbacks import registry
|
||||
from neutron_lib.callbacks import resources
|
||||
from neutron_lib import constants as const
|
||||
from neutron_lib import context as n_context
|
||||
from neutron_lib.db import api as db_api
|
||||
from neutron_lib import exceptions as n_exc
|
||||
from neutron_lib.exceptions import availability_zone as az_exc
|
||||
from neutron_lib.placement import utils as place_utils
|
||||
from neutron_lib.plugins import directory
|
||||
from neutron_lib.plugins.ml2 import api
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_config import cfg
|
||||
from oslo_db import exception as os_db_exc
|
||||
from oslo_log import log
|
||||
@ -52,6 +52,8 @@ from neutron.common.ovn import constants as ovn_const
|
||||
from neutron.common.ovn import exceptions as ovn_exceptions
|
||||
from neutron.common.ovn import extensions as ovn_extensions
|
||||
from neutron.common.ovn import utils as ovn_utils
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron.common import wsgi_utils
|
||||
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
|
||||
from neutron.db import ovn_hash_ring_db
|
||||
from neutron.db import ovn_revision_numbers_db
|
||||
@ -122,6 +124,10 @@ class OVNMechanismDriver(api.MechanismDriver):
|
||||
self._maintenance_thread = None
|
||||
self._hash_ring_thread = None
|
||||
self._hash_ring_probe_event = multiprocessing.Event()
|
||||
self._start_time = wsgi_utils.get_start_time()
|
||||
if self._start_time:
|
||||
LOG.info('Server start time: %s',
|
||||
str(n_utils.ts_to_datetime(self._start_time)))
|
||||
self.node_uuid = None
|
||||
self.hash_ring_group = ovn_const.HASH_RING_ML2_GROUP
|
||||
self.sg_enabled = ovn_acl.is_sg_enabled()
|
||||
@ -308,37 +314,64 @@ class OVNMechanismDriver(api.MechanismDriver):
|
||||
worker.MaintenanceWorker,
|
||||
service.RpcWorker)
|
||||
|
||||
@lockutils.synchronized('hash_ring_probe_lock', external=True)
|
||||
def _setup_hash_ring(self):
|
||||
"""Setup the hash ring.
|
||||
|
||||
The first worker to acquire the lock is responsible for cleaning
|
||||
the hash ring from previous runs as well as start the probing
|
||||
thread for this host. Subsequently workers just need to register
|
||||
themselves to the hash ring.
|
||||
The first worker to execute this method will remove the hash ring from
|
||||
previous runs as well as start the probing thread for this host.
|
||||
Subsequently workers just need to register themselves to the hash ring.
|
||||
"""
|
||||
# Attempt to remove the node from the ring when the worker stops
|
||||
sh = oslo_service.SignalHandler()
|
||||
atexit.register(self._remove_node_from_hash_ring)
|
||||
sh.add_handler("SIGTERM", self._remove_node_from_hash_ring)
|
||||
|
||||
if self._start_time:
|
||||
self._setup_hash_ring_start_time()
|
||||
else:
|
||||
self._setup_hash_ring_event()
|
||||
|
||||
def _register_hash_ring_maintenance(self):
|
||||
self._hash_ring_thread = maintenance.MaintenanceThread()
|
||||
self._hash_ring_thread.add_periodics(
|
||||
maintenance.HashRingHealthCheckPeriodics(
|
||||
self.hash_ring_group))
|
||||
self._hash_ring_thread.start()
|
||||
LOG.info('Hash Ring probing thread has started')
|
||||
|
||||
def _setup_hash_ring_event(self):
|
||||
LOG.debug('Hash Ring setup using multiprocess event lock')
|
||||
admin_context = n_context.get_admin_context()
|
||||
if not self._hash_ring_probe_event.is_set():
|
||||
# Clear existing entries
|
||||
# Clear existing entries. This code section should be executed
|
||||
# only once per node (chassis); the multiprocess event should be
|
||||
# set just after the ``is_set`` check.
|
||||
self._hash_ring_probe_event.set()
|
||||
ovn_hash_ring_db.remove_nodes_from_host(admin_context,
|
||||
self.hash_ring_group)
|
||||
self.node_uuid = ovn_hash_ring_db.add_node(admin_context,
|
||||
self.hash_ring_group)
|
||||
self._hash_ring_thread = maintenance.MaintenanceThread()
|
||||
self._hash_ring_thread.add_periodics(
|
||||
maintenance.HashRingHealthCheckPeriodics(
|
||||
self.hash_ring_group))
|
||||
self._hash_ring_thread.start()
|
||||
LOG.info("Hash Ring probing thread has started")
|
||||
self._hash_ring_probe_event.set()
|
||||
else:
|
||||
self.node_uuid = ovn_hash_ring_db.add_node(admin_context,
|
||||
self.hash_ring_group)
|
||||
self._register_hash_ring_maintenance()
|
||||
self.node_uuid = ovn_hash_ring_db.add_node(admin_context,
|
||||
self.hash_ring_group)
|
||||
|
||||
def _setup_hash_ring_start_time(self):
|
||||
LOG.debug('Hash Ring setup using WSGI start time')
|
||||
admin_context = n_context.get_admin_context()
|
||||
with db_api.CONTEXT_WRITER.using(admin_context):
|
||||
# Delete all node registers without created_at=self._start_time
|
||||
created_at = n_utils.ts_to_datetime(self._start_time)
|
||||
ovn_hash_ring_db.remove_nodes_from_host(
|
||||
admin_context, self.hash_ring_group, created_at=created_at)
|
||||
self.node_uuid = ovn_hash_ring_db.add_node(
|
||||
admin_context, self.hash_ring_group, created_at=created_at)
|
||||
newer_nodes = ovn_hash_ring_db.get_nodes(
|
||||
admin_context, self.hash_ring_group, created_at=created_at)
|
||||
LOG.debug('Hash Ring setup, this worker has detected %s OVN hash'
|
||||
'ring registers in the database', len(newer_nodes))
|
||||
|
||||
if len(newer_nodes) == 1:
|
||||
# If only one register per host is present, that means this worker
|
||||
# is the first one to register itself.
|
||||
self._register_hash_ring_maintenance()
|
||||
|
||||
def post_fork_initialize(self, resource, event, trigger, payload=None):
|
||||
# Initialize API/Maintenance workers with OVN IDL connections
|
||||
|
@ -23,8 +23,10 @@ import netaddr
|
||||
|
||||
from neutron_lib.api.definitions import portbindings
|
||||
from neutron_lib import constants
|
||||
from neutron_lib.db import api as db_api
|
||||
from neutron_lib.exceptions import agent as agent_exc
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import timeutils
|
||||
from oslo_utils import uuidutils
|
||||
from ovsdbapp.backend.ovs_idl import event
|
||||
|
||||
@ -32,6 +34,7 @@ from neutron.common.ovn import constants as ovn_const
|
||||
from neutron.common.ovn import utils
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
|
||||
from neutron.db import ovn_hash_ring_db
|
||||
from neutron.db import ovn_revision_numbers_db as db_rev
|
||||
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovsdb_monitor
|
||||
from neutron.tests import base as tests_base
|
||||
@ -57,6 +60,40 @@ VHOSTUSER_VIF_DETAILS = {
|
||||
}
|
||||
|
||||
|
||||
class TestOVNMechanismDriver(base.TestOVNFunctionalBase):
|
||||
|
||||
def test__setup_hash_ring_start_time(self):
|
||||
# Create a differentiated OVN hash ring name.
|
||||
ring_group = uuidutils.generate_uuid()
|
||||
self.mech_driver.hash_ring_group = ring_group
|
||||
|
||||
# Create several OVN hash registers left by a previous execution.
|
||||
created_at = timeutils.utcnow() - datetime.timedelta(1)
|
||||
with db_api.CONTEXT_WRITER.using(self.context):
|
||||
for _ in range(3):
|
||||
self.node_uuid = ovn_hash_ring_db.add_node(
|
||||
self.context, ring_group, created_at=created_at)
|
||||
|
||||
# Check the existing OVN hash ring registers.
|
||||
ovn_hrs = ovn_hash_ring_db.get_nodes(self.context, ring_group)
|
||||
self.assertEqual(3, len(ovn_hrs))
|
||||
|
||||
start_time = timeutils.utcnow()
|
||||
self.mech_driver._start_time = int(start_time.timestamp())
|
||||
with mock.patch.object(self.mech_driver,
|
||||
'_register_hash_ring_maintenance') as \
|
||||
mock_register_maintenance:
|
||||
for _ in range(3):
|
||||
self.mech_driver._setup_hash_ring_start_time()
|
||||
|
||||
ovn_hrs = ovn_hash_ring_db.get_nodes(self.context, ring_group)
|
||||
self.assertEqual(3, len(ovn_hrs))
|
||||
for ovn_hr in ovn_hrs:
|
||||
self.assertEqual(int(start_time.timestamp()),
|
||||
ovn_hr.created_at.timestamp())
|
||||
mock_register_maintenance.assert_called_once()
|
||||
|
||||
|
||||
class TestPortBinding(base.TestOVNFunctionalBase):
|
||||
|
||||
def setUp(self, **kwargs):
|
||||
|
8
releasenotes/notes/wsgi_start-time-101ce9c9a36b8a4f.yaml
Normal file
8
releasenotes/notes/wsgi_start-time-101ce9c9a36b8a4f.yaml
Normal file
@ -0,0 +1,8 @@
|
||||
---
|
||||
other:
|
||||
- |
|
||||
The Neutron API using the WSGI module requires a new configuration
|
||||
parameter: ``[uwsgi]start-time=%t``. The uWSGI process will populate this
|
||||
value when executed, defining the start time of the Neutron API. This value
|
||||
will be used by Neutron ML2/OVN to create the OVN hash ring registers per
|
||||
worker.
|
Loading…
x
Reference in New Issue
Block a user