Merge "[OVN] Improve initial hash ring setup"
This commit is contained in:
commit
79067358f4
@ -46,6 +46,7 @@ Create a ``/etc/neutron/neutron-api-uwsgi.ini`` file with the content below:
|
||||
master = true
|
||||
processes = 2
|
||||
wsgi-file = <path-to-neutron-bin-dir>/neutron-api
|
||||
start-time = %t
|
||||
|
||||
.. end
|
||||
|
||||
@ -160,3 +161,9 @@ in processing agents heartbeats.
|
||||
If OVN ML2 plugin is used without any additional agents, neutron requires
|
||||
no worker for RPC message processing. Set both rpc_workers and
|
||||
rpc_state_report_workers to 0, to disable RPC workers.
|
||||
|
||||
.. note::
|
||||
ML2/OVN uses the ``[uwsgi]start-time = %t`` parameter to create the OVN hash
|
||||
ring registers during the initialization process. This value is populated
|
||||
by the uWSGi process with the start time. For more information, check
|
||||
`Configuring uWSGI <https://uwsgi-docs.readthedocs.io/en/latest/Configuration.html>_`.
|
||||
|
@ -17,7 +17,7 @@
|
||||
# when needed.
|
||||
|
||||
"""Utilities and helper functions."""
|
||||
|
||||
import datetime
|
||||
import functools
|
||||
import hashlib
|
||||
import hmac
|
||||
@ -1114,3 +1114,8 @@ def read_file(path: str) -> str:
|
||||
return file.read()
|
||||
except FileNotFoundError:
|
||||
return ''
|
||||
|
||||
|
||||
def ts_to_datetime(timestamp):
|
||||
"""Converts timestamp (in seconds) to datetime"""
|
||||
return datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
|
||||
|
32
neutron/common/wsgi_utils.py
Normal file
32
neutron/common/wsgi_utils.py
Normal file
@ -0,0 +1,32 @@
|
||||
# Copyright (c) 2024 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
def get_start_time():
|
||||
"""Return the 'start-time=%t' config varible in the WSGI config
|
||||
|
||||
This variable contains the start time of the WSGI server. Check
|
||||
https://uwsgi-docs.readthedocs.io/en/latest/Configuration.html
|
||||
#magic-variables
|
||||
"""
|
||||
try:
|
||||
# pylint: disable=import-outside-toplevel
|
||||
import uwsgi
|
||||
start_time = uwsgi.opt.get('start-time')
|
||||
if not start_time:
|
||||
return
|
||||
return int(start_time.decode(encoding='utf-8'))
|
||||
except ImportError:
|
||||
return
|
@ -30,26 +30,48 @@ LOG = log.getLogger(__name__)
|
||||
# NOTE(ralonsoh): this was migrated from networking-ovn to neutron and should
|
||||
# be refactored to be integrated in a OVO.
|
||||
@db_api.retry_if_session_inactive()
|
||||
def add_node(context, group_name, node_uuid=None):
|
||||
def add_node(context, group_name, node_uuid=None, created_at=None):
|
||||
if node_uuid is None:
|
||||
node_uuid = uuidutils.generate_uuid()
|
||||
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
context.session.add(ovn_models.OVNHashRing(
|
||||
node_uuid=node_uuid, hostname=CONF.host, group_name=group_name))
|
||||
kwargs = {'node_uuid': node_uuid,
|
||||
'hostname': CONF.host,
|
||||
'group_name': group_name}
|
||||
if created_at:
|
||||
kwargs['created_at'] = created_at
|
||||
context.session.add(ovn_models.OVNHashRing(**kwargs))
|
||||
LOG.info('Node %s from host "%s" and group "%s" added to the Hash Ring',
|
||||
node_uuid, CONF.host, group_name)
|
||||
return node_uuid
|
||||
|
||||
|
||||
@db_api.retry_if_session_inactive()
|
||||
def remove_nodes_from_host(context, group_name):
|
||||
with db_api.CONTEXT_WRITER.using(context):
|
||||
context.session.query(ovn_models.OVNHashRing).filter(
|
||||
@db_api.CONTEXT_READER
|
||||
def get_nodes(context, group_name, created_at=None):
|
||||
query = context.session.query(ovn_models.OVNHashRing).filter(
|
||||
ovn_models.OVNHashRing.group_name == group_name)
|
||||
if created_at:
|
||||
query = query.filter(
|
||||
ovn_models.OVNHashRing.created_at == created_at)
|
||||
return query.all()
|
||||
|
||||
|
||||
@db_api.retry_if_session_inactive()
|
||||
def remove_nodes_from_host(context, group_name, created_at=None):
|
||||
with (db_api.CONTEXT_WRITER.using(context)):
|
||||
query = context.session.query(ovn_models.OVNHashRing).filter(
|
||||
ovn_models.OVNHashRing.hostname == CONF.host,
|
||||
ovn_models.OVNHashRing.group_name == group_name).delete()
|
||||
LOG.info('Nodes from host "%s" and group "%s" removed from the Hash Ring',
|
||||
CONF.host, group_name)
|
||||
ovn_models.OVNHashRing.group_name == group_name)
|
||||
if created_at:
|
||||
query = query.filter(
|
||||
ovn_models.OVNHashRing.created_at != created_at)
|
||||
query.delete()
|
||||
msg = ('Nodes from host "%s" and group "%s" removed from the Hash Ring' %
|
||||
(CONF.host, group_name))
|
||||
if created_at:
|
||||
msg += ' created at %s' % str(created_at)
|
||||
LOG.info(msg)
|
||||
|
||||
|
||||
@db_api.retry_if_session_inactive()
|
||||
|
@ -31,13 +31,13 @@ from neutron_lib.callbacks import registry
|
||||
from neutron_lib.callbacks import resources
|
||||
from neutron_lib import constants as const
|
||||
from neutron_lib import context as n_context
|
||||
from neutron_lib.db import api as db_api
|
||||
from neutron_lib import exceptions as n_exc
|
||||
from neutron_lib.exceptions import availability_zone as az_exc
|
||||
from neutron_lib.placement import utils as place_utils
|
||||
from neutron_lib.plugins import directory
|
||||
from neutron_lib.plugins.ml2 import api
|
||||
from neutron_lib.utils import helpers
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_config import cfg
|
||||
from oslo_db import exception as os_db_exc
|
||||
from oslo_log import log
|
||||
@ -52,6 +52,8 @@ from neutron.common.ovn import constants as ovn_const
|
||||
from neutron.common.ovn import exceptions as ovn_exceptions
|
||||
from neutron.common.ovn import extensions as ovn_extensions
|
||||
from neutron.common.ovn import utils as ovn_utils
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron.common import wsgi_utils
|
||||
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
|
||||
from neutron.db import ovn_hash_ring_db
|
||||
from neutron.db import ovn_revision_numbers_db
|
||||
@ -122,6 +124,10 @@ class OVNMechanismDriver(api.MechanismDriver):
|
||||
self._maintenance_thread = None
|
||||
self._hash_ring_thread = None
|
||||
self._hash_ring_probe_event = multiprocessing.Event()
|
||||
self._start_time = wsgi_utils.get_start_time()
|
||||
if self._start_time:
|
||||
LOG.info('Server start time: %s',
|
||||
str(n_utils.ts_to_datetime(self._start_time)))
|
||||
self.node_uuid = None
|
||||
self.hash_ring_group = ovn_const.HASH_RING_ML2_GROUP
|
||||
self.sg_enabled = ovn_acl.is_sg_enabled()
|
||||
@ -308,37 +314,64 @@ class OVNMechanismDriver(api.MechanismDriver):
|
||||
worker.MaintenanceWorker,
|
||||
service.RpcWorker)
|
||||
|
||||
@lockutils.synchronized('hash_ring_probe_lock', external=True)
|
||||
def _setup_hash_ring(self):
|
||||
"""Setup the hash ring.
|
||||
|
||||
The first worker to acquire the lock is responsible for cleaning
|
||||
the hash ring from previous runs as well as start the probing
|
||||
thread for this host. Subsequently workers just need to register
|
||||
themselves to the hash ring.
|
||||
The first worker to execute this method will remove the hash ring from
|
||||
previous runs as well as start the probing thread for this host.
|
||||
Subsequently workers just need to register themselves to the hash ring.
|
||||
"""
|
||||
# Attempt to remove the node from the ring when the worker stops
|
||||
sh = oslo_service.SignalHandler()
|
||||
atexit.register(self._remove_node_from_hash_ring)
|
||||
sh.add_handler("SIGTERM", self._remove_node_from_hash_ring)
|
||||
|
||||
if self._start_time:
|
||||
self._setup_hash_ring_start_time()
|
||||
else:
|
||||
self._setup_hash_ring_event()
|
||||
|
||||
def _register_hash_ring_maintenance(self):
|
||||
self._hash_ring_thread = maintenance.MaintenanceThread()
|
||||
self._hash_ring_thread.add_periodics(
|
||||
maintenance.HashRingHealthCheckPeriodics(
|
||||
self.hash_ring_group))
|
||||
self._hash_ring_thread.start()
|
||||
LOG.info('Hash Ring probing thread has started')
|
||||
|
||||
def _setup_hash_ring_event(self):
|
||||
LOG.debug('Hash Ring setup using multiprocess event lock')
|
||||
admin_context = n_context.get_admin_context()
|
||||
if not self._hash_ring_probe_event.is_set():
|
||||
# Clear existing entries
|
||||
# Clear existing entries. This code section should be executed
|
||||
# only once per node (chassis); the multiprocess event should be
|
||||
# set just after the ``is_set`` check.
|
||||
self._hash_ring_probe_event.set()
|
||||
ovn_hash_ring_db.remove_nodes_from_host(admin_context,
|
||||
self.hash_ring_group)
|
||||
self.node_uuid = ovn_hash_ring_db.add_node(admin_context,
|
||||
self.hash_ring_group)
|
||||
self._hash_ring_thread = maintenance.MaintenanceThread()
|
||||
self._hash_ring_thread.add_periodics(
|
||||
maintenance.HashRingHealthCheckPeriodics(
|
||||
self.hash_ring_group))
|
||||
self._hash_ring_thread.start()
|
||||
LOG.info("Hash Ring probing thread has started")
|
||||
self._hash_ring_probe_event.set()
|
||||
else:
|
||||
self.node_uuid = ovn_hash_ring_db.add_node(admin_context,
|
||||
self.hash_ring_group)
|
||||
self._register_hash_ring_maintenance()
|
||||
self.node_uuid = ovn_hash_ring_db.add_node(admin_context,
|
||||
self.hash_ring_group)
|
||||
|
||||
def _setup_hash_ring_start_time(self):
|
||||
LOG.debug('Hash Ring setup using WSGI start time')
|
||||
admin_context = n_context.get_admin_context()
|
||||
with db_api.CONTEXT_WRITER.using(admin_context):
|
||||
# Delete all node registers without created_at=self._start_time
|
||||
created_at = n_utils.ts_to_datetime(self._start_time)
|
||||
ovn_hash_ring_db.remove_nodes_from_host(
|
||||
admin_context, self.hash_ring_group, created_at=created_at)
|
||||
self.node_uuid = ovn_hash_ring_db.add_node(
|
||||
admin_context, self.hash_ring_group, created_at=created_at)
|
||||
newer_nodes = ovn_hash_ring_db.get_nodes(
|
||||
admin_context, self.hash_ring_group, created_at=created_at)
|
||||
LOG.debug('Hash Ring setup, this worker has detected %s OVN hash'
|
||||
'ring registers in the database', len(newer_nodes))
|
||||
|
||||
if len(newer_nodes) == 1:
|
||||
# If only one register per host is present, that means this worker
|
||||
# is the first one to register itself.
|
||||
self._register_hash_ring_maintenance()
|
||||
|
||||
def post_fork_initialize(self, resource, event, trigger, payload=None):
|
||||
# Initialize API/Maintenance workers with OVN IDL connections
|
||||
|
@ -23,8 +23,10 @@ import netaddr
|
||||
|
||||
from neutron_lib.api.definitions import portbindings
|
||||
from neutron_lib import constants
|
||||
from neutron_lib.db import api as db_api
|
||||
from neutron_lib.exceptions import agent as agent_exc
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import timeutils
|
||||
from oslo_utils import uuidutils
|
||||
from ovsdbapp.backend.ovs_idl import event
|
||||
|
||||
@ -32,6 +34,7 @@ from neutron.common.ovn import constants as ovn_const
|
||||
from neutron.common.ovn import utils
|
||||
from neutron.common import utils as n_utils
|
||||
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
|
||||
from neutron.db import ovn_hash_ring_db
|
||||
from neutron.db import ovn_revision_numbers_db as db_rev
|
||||
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovsdb_monitor
|
||||
from neutron.tests import base as tests_base
|
||||
@ -57,6 +60,40 @@ VHOSTUSER_VIF_DETAILS = {
|
||||
}
|
||||
|
||||
|
||||
class TestOVNMechanismDriver(base.TestOVNFunctionalBase):
|
||||
|
||||
def test__setup_hash_ring_start_time(self):
|
||||
# Create a differentiated OVN hash ring name.
|
||||
ring_group = uuidutils.generate_uuid()
|
||||
self.mech_driver.hash_ring_group = ring_group
|
||||
|
||||
# Create several OVN hash registers left by a previous execution.
|
||||
created_at = timeutils.utcnow() - datetime.timedelta(1)
|
||||
with db_api.CONTEXT_WRITER.using(self.context):
|
||||
for _ in range(3):
|
||||
self.node_uuid = ovn_hash_ring_db.add_node(
|
||||
self.context, ring_group, created_at=created_at)
|
||||
|
||||
# Check the existing OVN hash ring registers.
|
||||
ovn_hrs = ovn_hash_ring_db.get_nodes(self.context, ring_group)
|
||||
self.assertEqual(3, len(ovn_hrs))
|
||||
|
||||
start_time = timeutils.utcnow()
|
||||
self.mech_driver._start_time = int(start_time.timestamp())
|
||||
with mock.patch.object(self.mech_driver,
|
||||
'_register_hash_ring_maintenance') as \
|
||||
mock_register_maintenance:
|
||||
for _ in range(3):
|
||||
self.mech_driver._setup_hash_ring_start_time()
|
||||
|
||||
ovn_hrs = ovn_hash_ring_db.get_nodes(self.context, ring_group)
|
||||
self.assertEqual(3, len(ovn_hrs))
|
||||
for ovn_hr in ovn_hrs:
|
||||
self.assertEqual(int(start_time.timestamp()),
|
||||
ovn_hr.created_at.timestamp())
|
||||
mock_register_maintenance.assert_called_once()
|
||||
|
||||
|
||||
class TestPortBinding(base.TestOVNFunctionalBase):
|
||||
|
||||
def setUp(self, **kwargs):
|
||||
|
8
releasenotes/notes/wsgi_start-time-101ce9c9a36b8a4f.yaml
Normal file
8
releasenotes/notes/wsgi_start-time-101ce9c9a36b8a4f.yaml
Normal file
@ -0,0 +1,8 @@
|
||||
---
|
||||
other:
|
||||
- |
|
||||
The Neutron API using the WSGI module requires a new configuration
|
||||
parameter: ``[uwsgi]start-time=%t``. The uWSGI process will populate this
|
||||
value when executed, defining the start time of the Neutron API. This value
|
||||
will be used by Neutron ML2/OVN to create the OVN hash ring registers per
|
||||
worker.
|
Loading…
Reference in New Issue
Block a user