Registration-based subprocess cleanup on service shutdown
Introduce a helper class, SubprocessCleanup, in dccommon which allows a
worker to register a subprocess that must be cleaned up (killed) upon
service exit. There are two parts to this mechanism:

1. Registration:
   - The subprocess is registered for cleanup when spawned (see
     utils.run_playbook_with_timeout)
   - The subprocess is also spawned using setsid in order to start a new
     process group + session

2. The service calls subprocess_cleanup upon stopping:
   - All registered subprocesses are terminated using os.killpg(), which
     terminates the entire subprocess process group.

Caveat: This mechanism only handles clean process exit cases. If the
process crashes or is killed non-gracefully via SIGKILL, the cleanup
will not happen.

Closes-Bug: 1972013

Test Plan:

PASS: Orchestrated prestaging:
* Perform system host-swact while prestaging packages is in progress
  - ansible-playbook is terminated
  - prestaging task is marked as prestaging-failed
* Perform system host-swact while prestaging images is in progress
  - ansible-playbook is terminated
  - prestaging task is marked as prestaging-failed
* Restart the dcmanager-orchestrator service for the same two cases as above
  - behaviour is the same as for swact
* Kill the dcmanager-orchestrator service while prestaging is in progress

Non-Orchestrated prestaging:
* Perform host-swact and service restart for non-orchestrated prestaging
  - ansible-playbook is terminated
  - subcloud deploy status is marked as prestaging-failed

Swact during large-scale subcloud add:
* Initiate a large number of subcloud add operations
* Swact during the 'installing' state
* Swact during the 'bootstrapping' state
* Verify that the ansible playbooks are killed
* Verify that the deploy status is updated with a -failed state

Not covered:
* Tested a sudo 'pkill -9 dcmanager-manager' (ungraceful SIGKILL)
  - in this case the ansible subprocess tree is not cleaned up
  - this is expected; we aren't handling a non-clean shutdown

Signed-off-by: Kyle MacLeod <kyle.macleod@windriver.com>
Change-Id: I714398017b71c99edeeaa828933edd8163fb67cd
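The mechanism is easiest to see in isolation. The sketch below is illustrative only; the names _SUBPROCESSES, spawn_registered and cleanup_on_stop are invented for this example and are not the dccommon API introduced by this change. The point it demonstrates: spawning the child with os.setsid gives the child, and anything it later forks (e.g. tasks forked by ansible-playbook), its own process group, so one os.killpg() call can terminate the whole tree.

# Illustrative sketch only: the setsid + killpg pattern this change relies on.
# Names (_SUBPROCESSES, spawn_registered, cleanup_on_stop) are hypothetical.
import os
import signal
import subprocess

_SUBPROCESSES = {}  # pid -> Popen, analogous to a cleanup registry


def spawn_registered(cmd):
    # os.setsid gives the child its own session/process group, so any
    # grandchildren it forks share its pgid.
    proc = subprocess.Popen(cmd, preexec_fn=os.setsid)
    _SUBPROCESSES[proc.pid] = proc
    return proc


def cleanup_on_stop():
    # Best-effort: SIGTERM each registered process group and return quickly.
    for pid, proc in list(_SUBPROCESSES.items()):
        if proc.poll() is None:  # still running
            os.killpg(pid, signal.SIGTERM)  # pgid == pid because of setsid


if __name__ == "__main__":
    # The shell and its backgrounded sleep are both killed by one killpg().
    child = spawn_registered(["sh", "-c", "sleep 60 & wait"])
    cleanup_on_stop()
    child.wait()

Because the cleanup is best-effort, the sketch only sends SIGTERM and returns immediately, mirroring the fast-shutdown requirement described above.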
@@ -448,6 +448,8 @@ class SubcloudInstall(object):
        log_file = os.path.join(log_file_dir, self.name) + '_playbook_output.log'

        try:
+           # Since this is a long-running task we want to register
+           # for cleanup on process restart/SWACT.
            run_playbook(log_file, install_command)
        except exceptions.PlaybookExecutionFailed:
            msg = ("Failed to install the subcloud %s, check individual "
distributedcloud/dccommon/subprocess_cleanup.py (new file, 73 lines)
@@ -0,0 +1,73 @@
#
# Copyright (c) 2022 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#

import os
import signal
import time

from oslo_concurrency import lockutils
from oslo_log import log as logging

LOG = logging.getLogger(__name__)


class SubprocessCleanup(object):
    """Lifecycle manager for subprocesses spawned via python subprocess.

    Notes:
    - This is a best-effort cleanup. We need to preserve fast shutdown
      times in case of a SWACT.
    - There could potentially be multiple hundreds of subprocesses needing
      to be cleaned up here.
    """

    LOCK_NAME = 'subprocess-cleanup'
    SUBPROCESS_GROUPS = {}

    @staticmethod
    def register_subprocess_group(subprocess_p):
        SubprocessCleanup.SUBPROCESS_GROUPS[subprocess_p.pid] = subprocess_p

    @staticmethod
    def unregister_subprocess_group(subprocess_p):
        SubprocessCleanup.SUBPROCESS_GROUPS.pop(subprocess_p.pid, None)

    @staticmethod
    @lockutils.synchronized(LOCK_NAME)
    def shutdown_cleanup(origin='service'):
        SubprocessCleanup._shutdown_subprocess_groups(origin)

    @staticmethod
    def _shutdown_subprocess_groups(origin):
        num_process_groups = len(SubprocessCleanup.SUBPROCESS_GROUPS)
        if num_process_groups > 0:
            LOG.warn("Shutting down %d process groups via %s",
                     num_process_groups, origin)
            start_time = time.time()
            for _, subp in SubprocessCleanup.SUBPROCESS_GROUPS.items():
                kill_subprocess_group(subp)
            LOG.info("Time for %s child processes to exit: %s",
                     num_process_groups,
                     time.time() - start_time)


def kill_subprocess_group(subp, logmsg=None):
    """Kill the subprocess and any children."""
    exitcode = subp.poll()
    if exitcode:
        LOG.info("kill_subprocess_tree: subprocess has already "
                 "terminated, pid: %s, exitcode=%s", subp.pid, exitcode)
        return

    if logmsg:
        LOG.warn(logmsg)
    else:
        LOG.warn("Killing subprocess group for pid: %s, args: %s",
                 subp.pid, subp.args)
    # Send a SIGTERM (normal kill). We do not verify if the processes
    # are shutdown (best-effort), since we don't want to wait around before
    # issuing a SIGKILL (fast shutdown)
    os.killpg(subp.pid, signal.SIGTERM)
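For orientation, the snippet below condenses how the helper above is meant to be wired in by callers. It is a simplified sketch: the function names run_worker_task and service_stop are illustrative, and the log-file handling and /usr/bin/timeout wrapper of the real run_playbook are omitted. The actual integration appears in the dccommon/utils.py and service hunks that follow.

# Rough usage sketch of SubprocessCleanup; details such as log files and the
# /usr/bin/timeout wrapper used by run_playbook are omitted here.
import os
import subprocess

from dccommon.subprocess_cleanup import SubprocessCleanup


def run_worker_task(command):
    # Spawn in a new session so the whole tree can be killed as one group.
    subp = subprocess.Popen(command, preexec_fn=os.setsid)
    SubprocessCleanup.register_subprocess_group(subp)
    try:
        subp.communicate()  # wait for the child to exit
    finally:
        SubprocessCleanup.unregister_subprocess_group(subp)


def service_stop():
    # Called from the service's stop() handler (see the service diffs below).
    SubprocessCleanup.shutdown_cleanup(origin="service")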
@@ -25,7 +25,7 @@ class TestUtils(base.DCCommonTestCase):
    def test_run_playbook_timeout(self):
        testscript = ['dccommon/tests/unit/test_utils_script.sh', '30']
        self.assertRaises(PlaybookExecutionTimeout,
-                         utils.run_playbook_with_timeout,
+                         utils.run_playbook,
                          '/dev/null',
                          testscript,
                          timeout=2)
@@ -36,7 +36,7 @@ class TestUtils(base.DCCommonTestCase):
        # a hung process
        script = ['dccommon/tests/unit/test_utils_script.sh', '30', 'TERM']
        self.assertRaises(PlaybookExecutionTimeout,
-                         utils.run_playbook_with_timeout,
+                         utils.run_playbook,
                          '/dev/null',
                          script,
                          timeout=2)
@@ -23,6 +23,7 @@ from oslo_utils import timeutils

from dccommon.exceptions import PlaybookExecutionFailed
from dccommon.exceptions import PlaybookExecutionTimeout
+from dccommon.subprocess_cleanup import SubprocessCleanup

LOG = logging.getLogger(__name__)
ANSIBLE_PASSWD_PARMS = ['ansible_ssh_pass', 'ansible_become_pass']
@@ -59,40 +60,14 @@ def _strip_password_from_command(script_command):
    return logged_command


-def run_playbook(log_file, playbook_command):
-    """Run ansible playbook via subprocess"""
-    exec_env = os.environ.copy()
-    exec_env["ANSIBLE_LOG_PATH"] = "/dev/null"
-
-    with open(log_file, "a+") as f_out_log:
-        try:
-            logged_playbook_command = \
-                _strip_password_from_command(playbook_command)
-            txt = "%s Executing playbook command: %s\n" \
-                % (datetime.today().strftime('%Y-%m-%d-%H:%M:%S'),
-                   logged_playbook_command)
-            f_out_log.write(txt)
-            f_out_log.flush()
-
-            subprocess.check_call(  # pylint: disable=E1102
-                playbook_command,
-                stdout=f_out_log,
-                stderr=f_out_log,
-                env=exec_env)
-        except subprocess.CalledProcessError:
-            raise PlaybookExecutionFailed(playbook_cmd=playbook_command)
-        except Exception as ex:
-            LOG.error(str(ex))
-            raise
-
-
-def run_playbook_with_timeout(log_file,
-                              playbook_command,
-                              timeout=None):
+def run_playbook(log_file, playbook_command,
+                 timeout=None, register_cleanup=True):
    """Run ansible playbook via subprocess.

    log_file: Logs output to file
    timeout: Timeout in seconds. Raises PlaybookExecutionTimeout on timeout
+   register_cleanup: Register the subprocess group for cleanup on shutdown,
+       if the underlying service supports cleanup.
    """
    exec_env = os.environ.copy()
    exec_env["ANSIBLE_LOG_PATH"] = "/dev/null"
@@ -100,7 +75,7 @@ def run_playbook_with_timeout(log_file,
    if timeout:
        # Invoke ansible-playbook via the 'timeout' command.
        # Using --kill-after=5s which will force a kill -9 if the process
-       # hasn't terminated within 10s:
+       # hasn't terminated within 5s:
        timeout_log_str = " (timeout: %ss)" % timeout
        playbook_command = ["/usr/bin/timeout", "--kill-after=5s",
                            "%ss" % timeout] + playbook_command
@@ -118,6 +93,14 @@ def run_playbook_with_timeout(log_file,
            f_out_log.write(txt)
            f_out_log.flush()

+           if register_cleanup:
+               # Use the same process group / session for all children
+               # This makes it easier to kill the entire process group
+               # on cleanup
+               preexec_fn = os.setsid
+           else:
+               preexec_fn = None
+
            # TODO(kmacleod) future considerations:
            # - In python3, this code can be simplified to use the new
            #   subprocess.run(timeout=val) method or Popen with
@@ -128,8 +111,12 @@ def run_playbook_with_timeout(log_file,
            subp = subprocess.Popen(playbook_command,
                                    stdout=f_out_log,
                                    stderr=f_out_log,
-                                   env=exec_env)
+                                   env=exec_env,
+                                   preexec_fn=preexec_fn)
            try:
+               if register_cleanup:
+                   SubprocessCleanup.register_subprocess_group(subp)
+
                subp.communicate()  # wait for process to exit

                if timeout and subp.returncode == TIMEOUT_EXITCODE:
@@ -143,6 +130,8 @@
                    raise PlaybookExecutionFailed(playbook_cmd=playbook_command)
            finally:
                f_out_log.flush()
+               if register_cleanup:
+                   SubprocessCleanup.unregister_subprocess_group(subp)

        except PlaybookExecutionFailed:
            raise
@@ -34,7 +34,7 @@ from dccommon.drivers.openstack.sdk_platform import OpenStackDriver
from dccommon.drivers.openstack.sysinv_v1 import SysinvClient
from dccommon.exceptions import PlaybookExecutionFailed
from dccommon.exceptions import PlaybookExecutionTimeout
-from dccommon.utils import run_playbook_with_timeout
+from dccommon.utils import run_playbook

from dcmanager.common import consts
from dcmanager.common import exceptions
@@ -399,9 +399,8 @@ def _run_ansible(context, prestage_command, phase,
        oam_floating_ip,
        ansible_pass=base64.b64decode(sysadmin_password).decode('utf-8'))
    try:
-       run_playbook_with_timeout(log_file,
-                                 prestage_command,
-                                 timeout=timeout_seconds)
+       run_playbook(log_file, prestage_command,
+                    timeout=timeout_seconds, register_cleanup=True)
    except PlaybookExecutionFailed as ex:
        timeout_msg = ''
        if isinstance(ex, PlaybookExecutionTimeout):
@@ -22,6 +22,7 @@ import oslo_messaging
from oslo_service import service
from oslo_utils import uuidutils

+from dccommon.subprocess_cleanup import SubprocessCleanup
from dcmanager.audit import rpcapi as dcmanager_audit_rpc_client
from dcmanager.common import consts
from dcmanager.common import context
@@ -161,16 +162,17 @@ class DCManagerService(service.Service):

    def _stop_rpc_server(self):
        # Stop RPC connection to prevent new requests
-       LOG.debug(_("Attempting to stop engine service..."))
+       LOG.debug(_("Attempting to stop RPC service..."))
        try:
            self._rpc_server.stop()
            self._rpc_server.wait()
-           LOG.info('Engine service stopped successfully')
+           LOG.info('RPC service stopped successfully')
        except Exception as ex:
-           LOG.error('Failed to stop engine service: %s',
+           LOG.error('Failed to stop RPC service: %s',
                      six.text_type(ex))

    def stop(self):
+       SubprocessCleanup.shutdown_cleanup(origin="service")
        self._stop_rpc_server()
        # Terminate the engine process
        LOG.info("All threads were gone, terminating engine")
@@ -92,10 +92,11 @@ class OrchThread(threading.Thread):
        self._stop.set()

    def run(self):
        LOG.info("(%s) OrchThread Starting" % self.update_type)
        self.run_orch()
        # Stop any greenthreads that are still running
        LOG.info("(%s) OrchThread Stopping" % self.update_type)
        self.thread_group_manager.stop()
        LOG.info("(%s) OrchThread Stopped" % self.update_type)

    @staticmethod
    def get_ks_client(region_name=consts.DEFAULT_REGION_NAME):
@@ -21,6 +21,7 @@ from oslo_log import log as logging
import oslo_messaging
from oslo_service import service

+from dccommon.subprocess_cleanup import SubprocessCleanup
from dcmanager.common import consts
from dcmanager.common import context
from dcmanager.common import exceptions
@@ -80,19 +81,20 @@ class DCManagerOrchestratorService(service.Service):

    def _stop_rpc_server(self):
        # Stop RPC connection to prevent new requests
-       LOG.debug("Attempting to stop engine service...")
+       LOG.debug("Attempting to stop RPC service...")
        if self._rpc_server is not None:
            try:
                self._rpc_server.stop()
                self._rpc_server.wait()
                self._rpc_server = None
-               LOG.info('Engine service stopped successfully')
+               LOG.info('RPC service stopped successfully')
            except Exception as ex:
                LOG.error('Failed to stop engine service: %s',
                          six.text_type(ex))

    def stop(self):
        """Stop anything initiated by start"""
+       SubprocessCleanup.shutdown_cleanup(origin="service")
        self._stop_rpc_server()
        if self.TG is not None:
            self.TG.stop()