Split conductor-specific RPCService

The current implementation in common has a lot of assumptions that the
manager is a conductor manager. To be able to reuse the same base RPC
service for the PXE filter, split the conductor part away from the
common one.

Change-Id: I4d24cf82d62cb034840435ef15b5373748b65f09
This commit is contained in:
Dmitry Tantsur 2024-02-26 18:16:17 +01:00
parent 0cfe290d15
commit a9397f49d5
No known key found for this signature in database
GPG Key ID: 315B2AF9FD216C60
6 changed files with 138 additions and 103 deletions

@ -25,9 +25,9 @@ from oslo_config import cfg
from oslo_log import log from oslo_log import log
from oslo_service import service from oslo_service import service
from ironic.common import rpc_service
from ironic.common import service as ironic_service from ironic.common import service as ironic_service
from ironic.common import utils from ironic.common import utils
from ironic.conductor import rpc_service
CONF = cfg.CONF CONF = cfg.CONF

@ -17,9 +17,9 @@ from oslo_log import log
from oslo_service import service from oslo_service import service
from ironic.cmd import conductor as conductor_cmd from ironic.cmd import conductor as conductor_cmd
from ironic.common import rpc_service
from ironic.common import service as ironic_service from ironic.common import service as ironic_service
from ironic.common import wsgi_service from ironic.common import wsgi_service
from ironic.conductor import rpc_service
CONF = cfg.CONF CONF = cfg.CONF

@ -14,8 +14,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import datetime
import signal
import sys import sys
import time import time
@ -25,7 +23,6 @@ from oslo_log import log
import oslo_messaging as messaging import oslo_messaging as messaging
from oslo_service import service from oslo_service import service
from oslo_utils import importutils from oslo_utils import importutils
from oslo_utils import timeutils
from ironic.common import context from ironic.common import context
from ironic.common import rpc from ironic.common import rpc
@ -35,20 +32,18 @@ LOG = log.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
class RPCService(service.Service): class BaseRPCService(service.Service):
def __init__(self, host, manager_module, manager_class): def __init__(self, host, manager_module, manager_class):
super(RPCService, self).__init__() super().__init__()
self.host = host self.host = host
manager_module = importutils.try_import(manager_module) manager_module = importutils.try_import(manager_module)
manager_class = getattr(manager_module, manager_class) manager_class = getattr(manager_module, manager_class)
self.manager = manager_class(host, rpc.MANAGER_TOPIC) self.manager = manager_class(host)
self.topic = self.manager.topic self.topic = self.manager.topic
self.rpcserver = None self.rpcserver = None
self.deregister = True
self._failure = None
self._started = False self._started = False
self.draining = False self._failure = None
def wait_for_start(self): def wait_for_start(self):
while not self._started and not self._failure: while not self._started and not self._failure:
@ -60,7 +55,7 @@ class RPCService(service.Service):
def start(self): def start(self):
self._failure = None self._failure = None
self._started = False self._started = False
super(RPCService, self).start() super().start()
try: try:
self._real_start() self._real_start()
except Exception as exc: except Exception as exc:
@ -69,6 +64,9 @@ class RPCService(service.Service):
else: else:
self._started = True self._started = True
def handle_signal(self):
pass
def _real_start(self): def _real_start(self):
admin_context = context.get_admin_context() admin_context = context.get_admin_context()
@ -88,97 +86,8 @@ class RPCService(service.Service):
self.handle_signal() self.handle_signal()
self.manager.init_host(admin_context) self.manager.init_host(admin_context)
rpc.set_global_manager(self.manager)
LOG.info('Created RPC server with %(transport)s transport for service ' LOG.info('Created RPC server with %(transport)s transport for service '
'%(service)s on host %(host)s.', '%(service)s on host %(host)s.',
{'service': self.topic, 'host': self.host, {'service': self.topic, 'host': self.host,
'transport': CONF.rpc_transport}) 'transport': CONF.rpc_transport})
def stop(self):
initial_time = timeutils.utcnow()
extend_time = initial_time + datetime.timedelta(
seconds=CONF.hash_ring_reset_interval)
try:
self.manager.del_host(deregister=self.deregister,
clear_node_reservations=False)
except Exception as e:
LOG.exception('Service error occurred when cleaning up '
'the RPC manager. Error: %s', e)
if self.manager.get_online_conductor_count() > 1:
# Delay stopping the server until the hash ring has been
# reset on the cluster
stop_time = timeutils.utcnow()
if stop_time < extend_time:
stop_wait = max(0, (extend_time - stop_time).seconds)
LOG.info('Waiting %(stop_wait)s seconds for hash ring reset.',
{'stop_wait': stop_wait})
time.sleep(stop_wait)
try:
if self.rpcserver is not None:
self.rpcserver.stop()
self.rpcserver.wait()
except Exception as e:
LOG.exception('Service error occurred when stopping the '
'RPC server. Error: %s', e)
super(RPCService, self).stop(graceful=True)
LOG.info('Stopped RPC server for service %(service)s on host '
'%(host)s.',
{'service': self.topic, 'host': self.host})
# Wait for reservation locks held by this conductor.
# The conductor process will end when one of the following occurs:
# - All reservations for this conductor are released
# - shutdown_timeout has elapsed
# - The process manager (systemd, kubernetes) sends SIGKILL after the
# configured timeout period
while (self.manager.has_reserved()
and not self._shutdown_timeout_reached(initial_time)):
LOG.info('Waiting for reserved nodes to clear on host %(host)s',
{'host': self.host})
time.sleep(1)
# Stop the keepalive heartbeat greenthread sending touch(online=False)
self.manager.keepalive_halt()
rpc.set_global_manager(None)
def _shutdown_timeout_reached(self, initial_time):
if self.draining:
shutdown_timeout = CONF.drain_shutdown_timeout
else:
shutdown_timeout = CONF.graceful_shutdown_timeout
if shutdown_timeout == 0:
# No timeout, run until no nodes are reserved
return False
shutdown_time = initial_time + datetime.timedelta(
seconds=shutdown_timeout)
return shutdown_time < timeutils.utcnow()
def _handle_no_deregister(self, signo, frame):
LOG.info('Got signal SIGUSR1. Not deregistering on next shutdown '
'of service %(service)s on host %(host)s.',
{'service': self.topic, 'host': self.host})
self.deregister = False
def _handle_drain(self, signo, frame):
LOG.info('Got signal SIGUSR2. Starting drain shutdown'
'of service %(service)s on host %(host)s.',
{'service': self.topic, 'host': self.host})
self.draining = True
self.stop()
def handle_signal(self):
"""Add a signal handler for SIGUSR1, SIGUSR2.
The SIGUSR1 handler ensures that the manager is not deregistered when
it is shutdown.
The SIGUSR2 handler starts a drain shutdown.
"""
signal.signal(signal.SIGUSR1, self._handle_no_deregister)
signal.signal(signal.SIGUSR2, self._handle_drain)

@ -59,6 +59,7 @@ from ironic.common import faults
from ironic.common.i18n import _ from ironic.common.i18n import _
from ironic.common import network from ironic.common import network
from ironic.common import nova from ironic.common import nova
from ironic.common import rpc
from ironic.common import states from ironic.common import states
from ironic.conductor import allocations from ironic.conductor import allocations
from ironic.conductor import base_manager from ironic.conductor import base_manager
@ -100,7 +101,7 @@ class ConductorManager(base_manager.BaseConductorManager):
target = messaging.Target(version=RPC_API_VERSION) target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, host, topic): def __init__(self, host, topic=rpc.MANAGER_TOPIC):
super(ConductorManager, self).__init__(host, topic) super(ConductorManager, self).__init__(host, topic)
# NOTE(TheJulia): This is less a metric-able count, but a means to # NOTE(TheJulia): This is less a metric-able count, but a means to
# sort out nodes and prioritise a subset (of non-responding nodes). # sort out nodes and prioritise a subset (of non-responding nodes).

@ -0,0 +1,125 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import signal
import time
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
from ironic.common import rpc
from ironic.common import rpc_service
LOG = log.getLogger(__name__)
CONF = cfg.CONF
class RPCService(rpc_service.BaseRPCService):
def __init__(self, host, manager_module, manager_class):
super().__init__(host, manager_module, manager_class)
self.deregister = True
self.draining = False
def _real_start(self):
super()._real_start()
rpc.set_global_manager(self.manager)
def stop(self):
initial_time = timeutils.utcnow()
extend_time = initial_time + datetime.timedelta(
seconds=CONF.hash_ring_reset_interval)
try:
self.manager.del_host(deregister=self.deregister,
clear_node_reservations=False)
except Exception as e:
LOG.exception('Service error occurred when cleaning up '
'the RPC manager. Error: %s', e)
if self.manager.get_online_conductor_count() > 1:
# Delay stopping the server until the hash ring has been
# reset on the cluster
stop_time = timeutils.utcnow()
if stop_time < extend_time:
stop_wait = max(0, (extend_time - stop_time).seconds)
LOG.info('Waiting %(stop_wait)s seconds for hash ring reset.',
{'stop_wait': stop_wait})
time.sleep(stop_wait)
try:
if self.rpcserver is not None:
self.rpcserver.stop()
self.rpcserver.wait()
except Exception as e:
LOG.exception('Service error occurred when stopping the '
'RPC server. Error: %s', e)
super().stop(graceful=True)
LOG.info('Stopped RPC server for service %(service)s on host '
'%(host)s.',
{'service': self.topic, 'host': self.host})
# Wait for reservation locks held by this conductor.
# The conductor process will end when one of the following occurs:
# - All reservations for this conductor are released
# - shutdown_timeout has elapsed
# - The process manager (systemd, kubernetes) sends SIGKILL after the
# configured timeout period
while (self.manager.has_reserved()
and not self._shutdown_timeout_reached(initial_time)):
LOG.info('Waiting for reserved nodes to clear on host %(host)s',
{'host': self.host})
time.sleep(1)
# Stop the keepalive heartbeat greenthread sending touch(online=False)
self.manager.keepalive_halt()
rpc.set_global_manager(None)
def _shutdown_timeout_reached(self, initial_time):
if self.draining:
shutdown_timeout = CONF.drain_shutdown_timeout
else:
shutdown_timeout = CONF.graceful_shutdown_timeout
if shutdown_timeout == 0:
# No timeout, run until no nodes are reserved
return False
shutdown_time = initial_time + datetime.timedelta(
seconds=shutdown_timeout)
return shutdown_time < timeutils.utcnow()
def _handle_no_deregister(self, signo, frame):
LOG.info('Got signal SIGUSR1. Not deregistering on next shutdown '
'of service %(service)s on host %(host)s.',
{'service': self.topic, 'host': self.host})
self.deregister = False
def _handle_drain(self, signo, frame):
LOG.info('Got signal SIGUSR2. Starting drain shutdown'
'of service %(service)s on host %(host)s.',
{'service': self.topic, 'host': self.host})
self.draining = True
self.stop()
def handle_signal(self):
"""Add a signal handler for SIGUSR1, SIGUSR2.
The SIGUSR1 handler ensures that the manager is not deregistered when
it is shutdown.
The SIGUSR2 handler starts a drain shutdown.
"""
signal.signal(signal.SIGUSR1, self._handle_no_deregister)
signal.signal(signal.SIGUSR2, self._handle_drain)

@ -21,9 +21,9 @@ from oslo_utils import timeutils
from ironic.common import context from ironic.common import context
from ironic.common import rpc from ironic.common import rpc
from ironic.common import rpc_service
from ironic.common import service as ironic_service from ironic.common import service as ironic_service
from ironic.conductor import manager from ironic.conductor import manager
from ironic.conductor import rpc_service
from ironic.objects import base as objects_base from ironic.objects import base as objects_base
from ironic.tests.unit.db import base as db_base from ironic.tests.unit.db import base as db_base
from ironic.tests.unit.db import utils as db_utils from ironic.tests.unit.db import utils as db_utils