diff --git a/distributedcloud/dccommon/subcloud_install.py b/distributedcloud/dccommon/subcloud_install.py
index d05fb6906..85613f08f 100644
--- a/distributedcloud/dccommon/subcloud_install.py
+++ b/distributedcloud/dccommon/subcloud_install.py
@@ -448,6 +448,8 @@ class SubcloudInstall(object):
         log_file = os.path.join(log_file_dir, self.name) + '_playbook_output.log'
         try:
+            # Since this is a long-running task, we want to register it
+            # for cleanup on process restart/SWACT.
             run_playbook(log_file, install_command)
         except exceptions.PlaybookExecutionFailed:
             msg = ("Failed to install the subcloud %s, check individual "
diff --git a/distributedcloud/dccommon/subprocess_cleanup.py b/distributedcloud/dccommon/subprocess_cleanup.py
new file mode 100644
index 000000000..d0f5f9c80
--- /dev/null
+++ b/distributedcloud/dccommon/subprocess_cleanup.py
@@ -0,0 +1,73 @@
+#
+# Copyright (c) 2022 Wind River Systems, Inc.
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+
+import os
+import signal
+import time
+
+from oslo_concurrency import lockutils
+from oslo_log import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+class SubprocessCleanup(object):
+    """Lifecycle manager for subprocesses spawned via Python subprocess.
+
+    Notes:
+    - This is a best-effort cleanup. We need to preserve fast shutdown
+      times in case of a SWACT.
+    - There could potentially be several hundred subprocesses needing
+      to be cleaned up here.
+    """
+
+    LOCK_NAME = 'subprocess-cleanup'
+    SUBPROCESS_GROUPS = {}
+
+    @staticmethod
+    def register_subprocess_group(subprocess_p):
+        SubprocessCleanup.SUBPROCESS_GROUPS[subprocess_p.pid] = subprocess_p
+
+    @staticmethod
+    def unregister_subprocess_group(subprocess_p):
+        SubprocessCleanup.SUBPROCESS_GROUPS.pop(subprocess_p.pid, None)
+
+    @staticmethod
+    @lockutils.synchronized(LOCK_NAME)
+    def shutdown_cleanup(origin='service'):
+        SubprocessCleanup._shutdown_subprocess_groups(origin)
+
+    @staticmethod
+    def _shutdown_subprocess_groups(origin):
+        num_process_groups = len(SubprocessCleanup.SUBPROCESS_GROUPS)
+        if num_process_groups > 0:
+            LOG.warn("Shutting down %d process groups via %s",
+                     num_process_groups, origin)
+            start_time = time.time()
+            for _, subp in SubprocessCleanup.SUBPROCESS_GROUPS.items():
+                kill_subprocess_group(subp)
+            LOG.info("Time for %s process groups to exit: %s",
+                     num_process_groups,
+                     time.time() - start_time)
+
+
+def kill_subprocess_group(subp, logmsg=None):
+    """Kill the subprocess and any children."""
+    exitcode = subp.poll()
+    if exitcode is not None:
+        LOG.info("kill_subprocess_group: subprocess has already "
+                 "terminated, pid: %s, exitcode=%s", subp.pid, exitcode)
+        return
+
+    if logmsg:
+        LOG.warn(logmsg)
+    else:
+        LOG.warn("Killing subprocess group for pid: %s, args: %s",
+                 subp.pid, subp.args)
+    # Send a SIGTERM (normal kill). We do not verify whether the processes
+    # have shut down (best-effort), since we don't want to wait around
+    # before issuing a SIGKILL (fast shutdown).
+    os.killpg(subp.pid, signal.SIGTERM)
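The registry above is keyed by the child's pid, which only matches the process-group id if the child was started as a group leader. A minimal sketch of the intended pairing, not part of the change itself (assumes a POSIX host; 'sleep 60' stands in for a long-running ansible-playbook):

    import os
    import subprocess

    from dccommon.subprocess_cleanup import SubprocessCleanup

    # preexec_fn=os.setsid makes the child a session (and process-group)
    # leader, so subp.pid doubles as the pgid that os.killpg() targets.
    subp = subprocess.Popen(['sleep', '60'], preexec_fn=os.setsid)
    SubprocessCleanup.register_subprocess_group(subp)
    try:
        subp.communicate()          # block until the child exits
    finally:
        SubprocessCleanup.unregister_subprocess_group(subp)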
diff --git a/distributedcloud/dccommon/tests/unit/test_utils.py b/distributedcloud/dccommon/tests/unit/test_utils.py
index b30656b59..78e6ebccc 100644
--- a/distributedcloud/dccommon/tests/unit/test_utils.py
+++ b/distributedcloud/dccommon/tests/unit/test_utils.py
@@ -25,7 +25,7 @@ class TestUtils(base.DCCommonTestCase):
     def test_run_playbook_timeout(self):
         testscript = ['dccommon/tests/unit/test_utils_script.sh', '30']
         self.assertRaises(PlaybookExecutionTimeout,
-                          utils.run_playbook_with_timeout,
+                          utils.run_playbook,
                           '/dev/null',
                           testscript,
                           timeout=2)
@@ -36,7 +36,7 @@
         # a hung process
         script = ['dccommon/tests/unit/test_utils_script.sh', '30', 'TERM']
         self.assertRaises(PlaybookExecutionTimeout,
-                          utils.run_playbook_with_timeout,
+                          utils.run_playbook,
                           '/dev/null',
                           script,
                           timeout=2)
diff --git a/distributedcloud/dccommon/utils.py b/distributedcloud/dccommon/utils.py
index 5e04b71eb..679c8afa4 100644
--- a/distributedcloud/dccommon/utils.py
+++ b/distributedcloud/dccommon/utils.py
@@ -23,6 +23,7 @@ from oslo_utils import timeutils
 
 from dccommon.exceptions import PlaybookExecutionFailed
 from dccommon.exceptions import PlaybookExecutionTimeout
+from dccommon.subprocess_cleanup import SubprocessCleanup
 
 LOG = logging.getLogger(__name__)
 ANSIBLE_PASSWD_PARMS = ['ansible_ssh_pass', 'ansible_become_pass']
@@ -59,40 +60,14 @@ def _strip_password_from_command(script_command):
     return logged_command
 
 
-def run_playbook(log_file, playbook_command):
-    """Run ansible playbook via subprocess"""
-    exec_env = os.environ.copy()
-    exec_env["ANSIBLE_LOG_PATH"] = "/dev/null"
-
-    with open(log_file, "a+") as f_out_log:
-        try:
-            logged_playbook_command = \
-                _strip_password_from_command(playbook_command)
-            txt = "%s Executing playbook command: %s\n" \
-                % (datetime.today().strftime('%Y-%m-%d-%H:%M:%S'),
-                   logged_playbook_command)
-            f_out_log.write(txt)
-            f_out_log.flush()
-
-            subprocess.check_call(  # pylint: disable=E1102
-                playbook_command,
-                stdout=f_out_log,
-                stderr=f_out_log,
-                env=exec_env)
-        except subprocess.CalledProcessError:
-            raise PlaybookExecutionFailed(playbook_cmd=playbook_command)
-        except Exception as ex:
-            LOG.error(str(ex))
-            raise
-
-
-def run_playbook_with_timeout(log_file,
-                              playbook_command,
-                              timeout=None):
+def run_playbook(log_file, playbook_command,
+                 timeout=None, register_cleanup=True):
     """Run ansible playbook via subprocess.
 
     log_file: Logs output to file
     timeout: Timeout in seconds. Raises PlaybookExecutionTimeout on timeout
+    register_cleanup: Register the subprocess group for cleanup on shutdown,
+        if the underlying service supports cleanup.
     """
     exec_env = os.environ.copy()
     exec_env["ANSIBLE_LOG_PATH"] = "/dev/null"
@@ -100,7 +75,7 @@ def run_playbook_with_timeout(log_file,
     if timeout:
         # Invoke ansible-playbook via the 'timeout' command.
         # Using --kill-after=5s which will force a kill -9 if the process
-        # hasn't terminated within 10s:
+        # hasn't terminated within 5s:
         timeout_log_str = " (timeout: %ss)" % timeout
         playbook_command = ["/usr/bin/timeout", "--kill-after=5s",
                             "%ss" % timeout] + playbook_command
@@ -118,6 +93,14 @@ def run_playbook_with_timeout(log_file,
             f_out_log.write(txt)
             f_out_log.flush()
 
+            if register_cleanup:
+                # Use the same process group / session for all children.
+                # This makes it easier to kill the entire process group
+                # on cleanup.
+                preexec_fn = os.setsid
+            else:
+                preexec_fn = None
+
             # TODO(kmacleod) future considerations:
             # - In python3, this code can be simplified to use the new
             #   subprocess.run(timeout=val) method or Popen with
@@ -128,8 +111,12 @@
             subp = subprocess.Popen(playbook_command,
                                     stdout=f_out_log,
                                     stderr=f_out_log,
-                                    env=exec_env)
+                                    env=exec_env,
+                                    preexec_fn=preexec_fn)
             try:
+                if register_cleanup:
+                    SubprocessCleanup.register_subprocess_group(subp)
+
                 subp.communicate()  # wait for process to exit
 
                 if timeout and subp.returncode == TIMEOUT_EXITCODE:
@@ -143,6 +130,8 @@
                 raise PlaybookExecutionFailed(playbook_cmd=playbook_command)
             finally:
                 f_out_log.flush()
+                if register_cleanup:
+                    SubprocessCleanup.unregister_subprocess_group(subp)
 
         except PlaybookExecutionFailed:
             raise
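For context on the timeout path: GNU timeout(1) exits with status 124 when the command runs out of time, which run_playbook compares against the module's TIMEOUT_EXITCODE, and --kill-after=5s escalates to SIGKILL if the playbook ignores the initial SIGTERM. A quick illustration of the composed command line (values hypothetical):

    # A 2-second timeout wraps the playbook invocation like so:
    playbook_command = ['ansible-playbook', 'install.yml']
    timeout = 2
    wrapped = ['/usr/bin/timeout', '--kill-after=5s',
               '%ss' % timeout] + playbook_command
    assert wrapped == ['/usr/bin/timeout', '--kill-after=5s', '2s',
                       'ansible-playbook', 'install.yml']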
diff --git a/distributedcloud/dcmanager/common/prestage.py b/distributedcloud/dcmanager/common/prestage.py
index 3b99ee3f4..741f19964 100644
--- a/distributedcloud/dcmanager/common/prestage.py
+++ b/distributedcloud/dcmanager/common/prestage.py
@@ -34,7 +34,7 @@ from dccommon.drivers.openstack.sdk_platform import OpenStackDriver
 from dccommon.drivers.openstack.sysinv_v1 import SysinvClient
 from dccommon.exceptions import PlaybookExecutionFailed
 from dccommon.exceptions import PlaybookExecutionTimeout
-from dccommon.utils import run_playbook_with_timeout
+from dccommon.utils import run_playbook
 
 from dcmanager.common import consts
 from dcmanager.common import exceptions
@@ -399,9 +399,8 @@ def _run_ansible(context, prestage_command, phase, oam_floating_ip,
         ansible_pass=base64.b64decode(sysadmin_password).decode('utf-8'))
     try:
-        run_playbook_with_timeout(log_file,
-                                  prestage_command,
-                                  timeout=timeout_seconds)
+        run_playbook(log_file, prestage_command,
+                     timeout=timeout_seconds, register_cleanup=True)
     except PlaybookExecutionFailed as ex:
         timeout_msg = ''
         if isinstance(ex, PlaybookExecutionTimeout):
diff --git a/distributedcloud/dcmanager/manager/service.py b/distributedcloud/dcmanager/manager/service.py
index cd92a79f5..703f6fddf 100644
--- a/distributedcloud/dcmanager/manager/service.py
+++ b/distributedcloud/dcmanager/manager/service.py
@@ -22,6 +22,7 @@ import oslo_messaging
 from oslo_service import service
 from oslo_utils import uuidutils
 
+from dccommon.subprocess_cleanup import SubprocessCleanup
 from dcmanager.audit import rpcapi as dcmanager_audit_rpc_client
 from dcmanager.common import consts
 from dcmanager.common import context
@@ -161,16 +162,17 @@ class DCManagerService(service.Service):
 
     def _stop_rpc_server(self):
         # Stop RPC connection to prevent new requests
-        LOG.debug(_("Attempting to stop engine service..."))
+        LOG.debug(_("Attempting to stop RPC service..."))
         try:
             self._rpc_server.stop()
             self._rpc_server.wait()
-            LOG.info('Engine service stopped successfully')
+            LOG.info('RPC service stopped successfully')
         except Exception as ex:
-            LOG.error('Failed to stop engine service: %s',
+            LOG.error('Failed to stop RPC service: %s',
                       six.text_type(ex))
 
     def stop(self):
+        SubprocessCleanup.shutdown_cleanup(origin="service")
        self._stop_rpc_server()
         # Terminate the engine process
         LOG.info("All threads were gone, terminating engine")
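The ordering inside stop() is deliberate: registered process groups are signalled before the RPC server is torn down, which keeps shutdown fast on a SWACT and avoids orphaning ansible-playbook children across a service restart. A condensed sketch of the hook, assuming the oslo.service semantics used above (ExampleService is illustrative):

    from oslo_service import service

    from dccommon.subprocess_cleanup import SubprocessCleanup


    class ExampleService(service.Service):
        def stop(self):
            # Best-effort: SIGTERM every registered process group first,
            # then continue with the normal teardown.
            SubprocessCleanup.shutdown_cleanup(origin='service')
            super(ExampleService, self).stop()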
diff --git a/distributedcloud/dcmanager/orchestrator/orch_thread.py b/distributedcloud/dcmanager/orchestrator/orch_thread.py
index 480247147..f802dec88 100644
--- a/distributedcloud/dcmanager/orchestrator/orch_thread.py
+++ b/distributedcloud/dcmanager/orchestrator/orch_thread.py
@@ -92,10 +92,11 @@ class OrchThread(threading.Thread):
         self._stop.set()
 
     def run(self):
+        LOG.info("(%s) OrchThread Starting" % self.update_type)
         self.run_orch()
         # Stop any greenthreads that are still running
+        LOG.info("(%s) OrchThread Stopping" % self.update_type)
         self.thread_group_manager.stop()
-        LOG.info("(%s) OrchThread Stopped" % self.update_type)
 
     @staticmethod
     def get_ks_client(region_name=consts.DEFAULT_REGION_NAME):
diff --git a/distributedcloud/dcmanager/orchestrator/service.py b/distributedcloud/dcmanager/orchestrator/service.py
index 4c903f57f..abda5757c 100644
--- a/distributedcloud/dcmanager/orchestrator/service.py
+++ b/distributedcloud/dcmanager/orchestrator/service.py
@@ -21,6 +21,7 @@ from oslo_log import log as logging
 import oslo_messaging
 from oslo_service import service
 
+from dccommon.subprocess_cleanup import SubprocessCleanup
 from dcmanager.common import consts
 from dcmanager.common import context
 from dcmanager.common import exceptions
@@ -80,19 +81,20 @@ class DCManagerOrchestratorService(service.Service):
 
     def _stop_rpc_server(self):
         # Stop RPC connection to prevent new requests
-        LOG.debug("Attempting to stop engine service...")
+        LOG.debug("Attempting to stop RPC service...")
         if self._rpc_server is not None:
             try:
                 self._rpc_server.stop()
                 self._rpc_server.wait()
                 self._rpc_server = None
-                LOG.info('Engine service stopped successfully')
+                LOG.info('RPC service stopped successfully')
             except Exception as ex:
                 LOG.error('Failed to stop engine service: %s',
                           six.text_type(ex))
 
     def stop(self):
         """Stop anything initiated by start"""
+        SubprocessCleanup.shutdown_cleanup(origin="service")
         self._stop_rpc_server()
         if self.TG is not None:
             self.TG.stop()
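As a sanity check of the process-group behaviour both services now rely on, this standalone snippet (hypothetical, POSIX-only) shows a single os.killpg() on a setsid-started leader taking down its children as well:

    import os
    import signal
    import subprocess
    import time

    # The shell and its backgrounded sleep share one process group
    # because the shell was started as a group leader via os.setsid.
    subp = subprocess.Popen(['/bin/sh', '-c', 'sleep 60 & wait'],
                            preexec_fn=os.setsid)
    time.sleep(0.5)                        # let the grandchild start
    os.killpg(subp.pid, signal.SIGTERM)    # one signal, whole group
    print(subp.wait())                     # -15: terminated by SIGTERM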