From a9397f49d5efe7d6b4b6aca121f918fda2383dbd Mon Sep 17 00:00:00 2001 From: Dmitry Tantsur <dtantsur@protonmail.com> Date: Mon, 26 Feb 2024 18:16:17 +0100 Subject: [PATCH] Split conductor-specific RPCService The current implementation in common has a lot of assumptions that the manager is a conductor manager. To be able to reuse the same base RPC service for the PXE filter, split the conductor part away from the common one. Change-Id: I4d24cf82d62cb034840435ef15b5373748b65f09 --- ironic/cmd/conductor.py | 2 +- ironic/cmd/singleprocess.py | 2 +- ironic/common/rpc_service.py | 107 ++------------- ironic/conductor/manager.py | 3 +- ironic/conductor/rpc_service.py | 125 ++++++++++++++++++ .../{common => conductor}/test_rpc_service.py | 2 +- 6 files changed, 138 insertions(+), 103 deletions(-) create mode 100644 ironic/conductor/rpc_service.py rename ironic/tests/unit/{common => conductor}/test_rpc_service.py (99%) diff --git a/ironic/cmd/conductor.py b/ironic/cmd/conductor.py index ec10874b4c..97594e37e5 100644 --- a/ironic/cmd/conductor.py +++ b/ironic/cmd/conductor.py @@ -25,9 +25,9 @@ from oslo_config import cfg from oslo_log import log from oslo_service import service -from ironic.common import rpc_service from ironic.common import service as ironic_service from ironic.common import utils +from ironic.conductor import rpc_service CONF = cfg.CONF diff --git a/ironic/cmd/singleprocess.py b/ironic/cmd/singleprocess.py index 28100efe9b..8171bfde26 100644 --- a/ironic/cmd/singleprocess.py +++ b/ironic/cmd/singleprocess.py @@ -17,9 +17,9 @@ from oslo_log import log from oslo_service import service from ironic.cmd import conductor as conductor_cmd -from ironic.common import rpc_service from ironic.common import service as ironic_service from ironic.common import wsgi_service +from ironic.conductor import rpc_service CONF = cfg.CONF diff --git a/ironic/common/rpc_service.py b/ironic/common/rpc_service.py index 62e61e1cad..60204becfa 100644 --- a/ironic/common/rpc_service.py +++ b/ironic/common/rpc_service.py @@ -14,8 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import datetime -import signal import sys import time @@ -25,7 +23,6 @@ from oslo_log import log import oslo_messaging as messaging from oslo_service import service from oslo_utils import importutils -from oslo_utils import timeutils from ironic.common import context from ironic.common import rpc @@ -35,20 +32,18 @@ LOG = log.getLogger(__name__) CONF = cfg.CONF -class RPCService(service.Service): +class BaseRPCService(service.Service): def __init__(self, host, manager_module, manager_class): - super(RPCService, self).__init__() + super().__init__() self.host = host manager_module = importutils.try_import(manager_module) manager_class = getattr(manager_module, manager_class) - self.manager = manager_class(host, rpc.MANAGER_TOPIC) + self.manager = manager_class(host) self.topic = self.manager.topic self.rpcserver = None - self.deregister = True - self._failure = None self._started = False - self.draining = False + self._failure = None def wait_for_start(self): while not self._started and not self._failure: @@ -60,7 +55,7 @@ class RPCService(service.Service): def start(self): self._failure = None self._started = False - super(RPCService, self).start() + super().start() try: self._real_start() except Exception as exc: @@ -69,6 +64,9 @@ class RPCService(service.Service): else: self._started = True + def handle_signal(self): + pass + def _real_start(self): admin_context = context.get_admin_context() @@ -88,97 +86,8 @@ class RPCService(service.Service): self.handle_signal() self.manager.init_host(admin_context) - rpc.set_global_manager(self.manager) LOG.info('Created RPC server with %(transport)s transport for service ' '%(service)s on host %(host)s.', {'service': self.topic, 'host': self.host, 'transport': CONF.rpc_transport}) - - def stop(self): - initial_time = timeutils.utcnow() - extend_time = initial_time + datetime.timedelta( - seconds=CONF.hash_ring_reset_interval) - - try: - self.manager.del_host(deregister=self.deregister, - clear_node_reservations=False) - except Exception as e: - LOG.exception('Service error occurred when cleaning up ' - 'the RPC manager. Error: %s', e) - - if self.manager.get_online_conductor_count() > 1: - # Delay stopping the server until the hash ring has been - # reset on the cluster - stop_time = timeutils.utcnow() - if stop_time < extend_time: - stop_wait = max(0, (extend_time - stop_time).seconds) - LOG.info('Waiting %(stop_wait)s seconds for hash ring reset.', - {'stop_wait': stop_wait}) - time.sleep(stop_wait) - - try: - if self.rpcserver is not None: - self.rpcserver.stop() - self.rpcserver.wait() - except Exception as e: - LOG.exception('Service error occurred when stopping the ' - 'RPC server. Error: %s', e) - - super(RPCService, self).stop(graceful=True) - LOG.info('Stopped RPC server for service %(service)s on host ' - '%(host)s.', - {'service': self.topic, 'host': self.host}) - - # Wait for reservation locks held by this conductor. - # The conductor process will end when one of the following occurs: - # - All reservations for this conductor are released - # - shutdown_timeout has elapsed - # - The process manager (systemd, kubernetes) sends SIGKILL after the - # configured timeout period - while (self.manager.has_reserved() - and not self._shutdown_timeout_reached(initial_time)): - LOG.info('Waiting for reserved nodes to clear on host %(host)s', - {'host': self.host}) - time.sleep(1) - - # Stop the keepalive heartbeat greenthread sending touch(online=False) - self.manager.keepalive_halt() - - rpc.set_global_manager(None) - - def _shutdown_timeout_reached(self, initial_time): - if self.draining: - shutdown_timeout = CONF.drain_shutdown_timeout - else: - shutdown_timeout = CONF.graceful_shutdown_timeout - if shutdown_timeout == 0: - # No timeout, run until no nodes are reserved - return False - shutdown_time = initial_time + datetime.timedelta( - seconds=shutdown_timeout) - return shutdown_time < timeutils.utcnow() - - def _handle_no_deregister(self, signo, frame): - LOG.info('Got signal SIGUSR1. Not deregistering on next shutdown ' - 'of service %(service)s on host %(host)s.', - {'service': self.topic, 'host': self.host}) - self.deregister = False - - def _handle_drain(self, signo, frame): - LOG.info('Got signal SIGUSR2. Starting drain shutdown' - 'of service %(service)s on host %(host)s.', - {'service': self.topic, 'host': self.host}) - self.draining = True - self.stop() - - def handle_signal(self): - """Add a signal handler for SIGUSR1, SIGUSR2. - - The SIGUSR1 handler ensures that the manager is not deregistered when - it is shutdown. - - The SIGUSR2 handler starts a drain shutdown. - """ - signal.signal(signal.SIGUSR1, self._handle_no_deregister) - signal.signal(signal.SIGUSR2, self._handle_drain) diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py index 5121adaa3e..6e8a519c3b 100644 --- a/ironic/conductor/manager.py +++ b/ironic/conductor/manager.py @@ -59,6 +59,7 @@ from ironic.common import faults from ironic.common.i18n import _ from ironic.common import network from ironic.common import nova +from ironic.common import rpc from ironic.common import states from ironic.conductor import allocations from ironic.conductor import base_manager @@ -100,7 +101,7 @@ class ConductorManager(base_manager.BaseConductorManager): target = messaging.Target(version=RPC_API_VERSION) - def __init__(self, host, topic): + def __init__(self, host, topic=rpc.MANAGER_TOPIC): super(ConductorManager, self).__init__(host, topic) # NOTE(TheJulia): This is less a metric-able count, but a means to # sort out nodes and prioritise a subset (of non-responding nodes). diff --git a/ironic/conductor/rpc_service.py b/ironic/conductor/rpc_service.py new file mode 100644 index 0000000000..d0163c4934 --- /dev/null +++ b/ironic/conductor/rpc_service.py @@ -0,0 +1,125 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import signal +import time + +from oslo_config import cfg +from oslo_log import log +from oslo_utils import timeutils + +from ironic.common import rpc +from ironic.common import rpc_service + +LOG = log.getLogger(__name__) +CONF = cfg.CONF + + +class RPCService(rpc_service.BaseRPCService): + + def __init__(self, host, manager_module, manager_class): + super().__init__(host, manager_module, manager_class) + self.deregister = True + self.draining = False + + def _real_start(self): + super()._real_start() + rpc.set_global_manager(self.manager) + + def stop(self): + initial_time = timeutils.utcnow() + extend_time = initial_time + datetime.timedelta( + seconds=CONF.hash_ring_reset_interval) + + try: + self.manager.del_host(deregister=self.deregister, + clear_node_reservations=False) + except Exception as e: + LOG.exception('Service error occurred when cleaning up ' + 'the RPC manager. Error: %s', e) + + if self.manager.get_online_conductor_count() > 1: + # Delay stopping the server until the hash ring has been + # reset on the cluster + stop_time = timeutils.utcnow() + if stop_time < extend_time: + stop_wait = max(0, (extend_time - stop_time).seconds) + LOG.info('Waiting %(stop_wait)s seconds for hash ring reset.', + {'stop_wait': stop_wait}) + time.sleep(stop_wait) + + try: + if self.rpcserver is not None: + self.rpcserver.stop() + self.rpcserver.wait() + except Exception as e: + LOG.exception('Service error occurred when stopping the ' + 'RPC server. Error: %s', e) + + super().stop(graceful=True) + LOG.info('Stopped RPC server for service %(service)s on host ' + '%(host)s.', + {'service': self.topic, 'host': self.host}) + + # Wait for reservation locks held by this conductor. + # The conductor process will end when one of the following occurs: + # - All reservations for this conductor are released + # - shutdown_timeout has elapsed + # - The process manager (systemd, kubernetes) sends SIGKILL after the + # configured timeout period + while (self.manager.has_reserved() + and not self._shutdown_timeout_reached(initial_time)): + LOG.info('Waiting for reserved nodes to clear on host %(host)s', + {'host': self.host}) + time.sleep(1) + + # Stop the keepalive heartbeat greenthread sending touch(online=False) + self.manager.keepalive_halt() + + rpc.set_global_manager(None) + + def _shutdown_timeout_reached(self, initial_time): + if self.draining: + shutdown_timeout = CONF.drain_shutdown_timeout + else: + shutdown_timeout = CONF.graceful_shutdown_timeout + if shutdown_timeout == 0: + # No timeout, run until no nodes are reserved + return False + shutdown_time = initial_time + datetime.timedelta( + seconds=shutdown_timeout) + return shutdown_time < timeutils.utcnow() + + def _handle_no_deregister(self, signo, frame): + LOG.info('Got signal SIGUSR1. Not deregistering on next shutdown ' + 'of service %(service)s on host %(host)s.', + {'service': self.topic, 'host': self.host}) + self.deregister = False + + def _handle_drain(self, signo, frame): + LOG.info('Got signal SIGUSR2. Starting drain shutdown' + 'of service %(service)s on host %(host)s.', + {'service': self.topic, 'host': self.host}) + self.draining = True + self.stop() + + def handle_signal(self): + """Add a signal handler for SIGUSR1, SIGUSR2. + + The SIGUSR1 handler ensures that the manager is not deregistered when + it is shutdown. + + The SIGUSR2 handler starts a drain shutdown. + """ + signal.signal(signal.SIGUSR1, self._handle_no_deregister) + signal.signal(signal.SIGUSR2, self._handle_drain) diff --git a/ironic/tests/unit/common/test_rpc_service.py b/ironic/tests/unit/conductor/test_rpc_service.py similarity index 99% rename from ironic/tests/unit/common/test_rpc_service.py rename to ironic/tests/unit/conductor/test_rpc_service.py index a2cb0fa77f..d872ba22ab 100644 --- a/ironic/tests/unit/common/test_rpc_service.py +++ b/ironic/tests/unit/conductor/test_rpc_service.py @@ -21,9 +21,9 @@ from oslo_utils import timeutils from ironic.common import context from ironic.common import rpc -from ironic.common import rpc_service from ironic.common import service as ironic_service from ironic.conductor import manager +from ironic.conductor import rpc_service from ironic.objects import base as objects_base from ironic.tests.unit.db import base as db_base from ironic.tests.unit.db import utils as db_utils