2737967430
The change [1] introduced a bug that made it impossible to use the variable ceph_mgr_lifecycle_days: the code referenced it through 'self', but ceph_mgr_lifecycle_days is an attribute of the Config class, not of ServiceMonitor. The fix is simply to replace the use of 'self' with CONFIG. Test Plan: PASS: AIO-SX fresh install PASS: Verified that there are no errors in the log /var/log/mgr-restful-plugin.log Closes-Bug: 2023553 [1]: https://review.opendev.org/c/starlingx/integ/+/885881 Change-Id: Icb46f1589057607e24123b69e9ab44994580585a Signed-off-by: Erickson Silva de Oliveira <Erickson.SilvadeOliveira@windriver.com>
1148 lines
43 KiB
Python
1148 lines
43 KiB
Python
#!/usr/bin/python
|
|
#
|
|
# Copyright (c) 2019-2023 Wind River Systems, Inc.
|
|
#
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
#
|
|
|
|
|
|
### BEGIN INIT INFO
|
|
# Provides: ceph/mgr RESTful API plugin
|
|
# Required-Start: $ceph
|
|
# Required-Stop: $ceph
|
|
# Default-Start: 2 3 4 5
|
|
# Default-Stop: 0 1 6
|
|
# Short-Description: Ceph MGR RESTful API plugin
|
|
# Description: Ceph MGR RESTful API plugin
|
|
### END INIT INFO
|
|
|
|
import argparse
|
|
import contextlib
|
|
import errno
|
|
import fcntl
|
|
import inspect
|
|
import json
|
|
import logging
|
|
import multiprocessing
|
|
import os
|
|
import shutil
|
|
import signal
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from datetime import datetime
|
|
|
|
import daemon
|
|
import psutil
|
|
import requests
|
|
|
|
# GNU coreutils 'timeout' command returns exit status 124
# if the wrapped command times out (see timeout(1) man page)
GNU_TIMEOUT_EXPIRED_RETCODE = 124
|
|
|
|
|
def psutil_terminate_kill(target, timeout):

    """Extend psutil functionality to stop a process.

    SIGTERM (psutil terminate()) is sent to each target then, after a
    grace period of `timeout` seconds, SIGKILL is sent to the ones
    that are still running.

    `target` may be a single psutil.Process or a list of them.
    """

    if not isinstance(target, list):
        target = [target]
    # drop entries that already exited so we don't signal dead pids
    _, target = psutil.wait_procs(target, timeout=0)
    for action in [lambda p: p.terminate(), lambda p: p.kill()]:
        for proc in target:
            action(proc)
        # wait for up to `timeout` seconds; survivors receive the
        # next (harsher) action on the following pass
        _, target = psutil.wait_procs(
            target, timeout=timeout)
|
|
|
|
|
|
class Config(object):

    """ceph-mgr service wrapper configuration options.

    All knobs are hard-coded here for now; in the future they may be
    loaded from a configuration file such as
    /etc/ceph/mgr-restful-plugin.conf
    """

    def __init__(self):
        # --- logging ---
        self.log_level = logging.INFO
        self.log_dir = '/var/log'

        # --- ceph-mgr daemon location and identity ---
        self.ceph_mgr_service = '/usr/bin/ceph-mgr'
        self.ceph_mgr_config = '/etc/ceph/ceph.conf'
        self.ceph_mgr_cluster = 'ceph'
        self.ceph_mgr_rundir = '/var/run/ceph/mgr'
        self.ceph_mgr_confdir = '/var/lib/ceph/mgr'
        self.ceph_mgr_identity = socket.gethostname()

        # --- service wrapper files (socket, lock, pid) ---
        self.service_name = 'mgr-restful-plugin'
        self.service_socket = os.path.join(
            self.ceph_mgr_rundir, self.service_name + '.socket')
        self.service_lock = os.path.join(
            self.ceph_mgr_rundir, self.service_name + '.lock')
        self.service_pid_file = os.path.join(
            '/var/run/ceph', self.service_name + '.pid')

        # TCP port the restful plugin listens on
        self.restful_plugin_port = 7999

        # maximum size of a message received/sent via
        # service monitor control socket
        self.service_socket_bufsize = 1024

        # maximum time to wait for ceph cli to exit
        self.ceph_cli_timeout_sec = 30

        # how much time to wait after ceph cli commands fail with
        # timeout before running any other commands
        self.cluster_grace_period_sec = 30

        # ceph-mgr goes through an internal initialization phase right
        # after start; how much time to wait before querying it
        self.ceph_mgr_grace_period_sec = 15

        # after sending SIGTERM to ceph-mgr how much time to wait
        # before sending SIGKILL (maximum time allowed for cleanup)
        self.ceph_mgr_kill_delay_sec = 5

        # while the service monitor runs a recovery procedure it
        # reports status OK even if ceph-mgr is currently down. This
        # is the maximum number of consecutive ceph-mgr failures
        # before reporting status error
        self.ceph_mgr_fail_count_report_error = 3

        # maximum number of consecutive ceph-mgr failures before
        # stopping mgr-restful-plugin service
        self.ceph_mgr_fail_count_exit = 5

        # maximum time allowed for ceph-mgr to answer a REST API request
        self.rest_api_timeout_sec = 15

        # interval between consecutive REST API ping requests. A
        # smaller value means more requests to the restful plugin; a
        # higher value slows down recovery when services go away
        self.restful_plugin_ping_delay_sec = 3

        # where to save the self-signed certificate generated by ceph-mgr
        self.restful_plugin_cert_path = os.path.join(
            self.ceph_mgr_rundir, 'restful.crt')

        # time to wait after enabling restful plugin
        self.restful_plugin_grace_period_sec = 3

        # number of REST API ping failures after which ceph-mgr is restarted
        self.ping_fail_count_restart_mgr = 3

        # number of REST API ping failures after which status error is
        # reported. Until then the service monitor reports status OK
        # just in case the restful plugin recovers
        self.ping_fail_count_report_error = 5

        # number of days after which ceph-mgr is restarted to avoid
        # possible memory overflow due to memory growth (-1 disables)
        self.ceph_mgr_lifecycle_days = 7

    @staticmethod
    def load():
        """Factory hook: build the (currently hard-coded) configuration."""
        return Config()
|
|
|
|
|
|
def setup_logging(name=None, cleanup_handlers=False):
    """Return a logger writing to <CONFIG.log_dir>/<service_name>.log.

    When `cleanup_handlers` is set, existing handlers are flushed,
    closed and discarded before a fresh file handler is attached;
    otherwise an already-configured logger is returned unchanged.
    """
    log = logging.getLogger(name or CONFIG.service_name)
    log.setLevel(CONFIG.log_level)
    if cleanup_handlers:
        # best-effort teardown of previous handlers (e.g. after fork)
        try:
            for old in log.handlers:
                if isinstance(old, logging.StreamHandler):
                    old.flush()
                if isinstance(old, logging.FileHandler):
                    old.close()
            log.handlers = []
        except Exception:
            pass
    elif log.handlers:
        # already configured; don't attach a duplicate handler
        return log
    log_path = os.path.join(CONFIG.log_dir,
                            '{}.log'.format(CONFIG.service_name))
    file_handler = logging.FileHandler(log_path)
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s %(process)s %(levelname)s %(name)s %(message)s'))
    log.addHandler(file_handler)
    return log
|
|
|
|
|
|
# Module-level singletons used throughout this script: the service
# configuration and the init-wrapper logger.
CONFIG = Config.load()
LOG = setup_logging(name='init-wrapper')
|
|
|
|
|
|
class ServiceException(Exception):

    """Generic mgr-restful-plugin service exception.

    The exception text is built from the per-class `message` template
    formatted with the constructor's args/kwargs; an explicit
    `message` keyword overrides the template entirely.
    """

    # per-class format template, overridden by subclasses
    message = ""

    def __init__(self, *args, **kwargs):
        if "message" in kwargs:
            text = kwargs["message"]
        else:
            try:
                text = self.message.format(*args, **kwargs)
            except Exception:  # noqa
                # formatting failed: fall back to a raw dump of the
                # template and the arguments
                text = '{}, args:{}, kwargs: {}'.format(
                    self.message, args, kwargs)
        super(ServiceException, self).__init__(text)
|
|
|
|
|
|
class ServiceAlreadyStarted(ServiceException):
    """Raised when another service monitor instance already holds the lock."""
    message = ('Service monitor already started')
|
|
|
|
|
|
class ServiceLockFailed(ServiceException):
    """Raised when taking the service flock fails for any reason other
    than the lock being held."""
    message = ('Unable to lock service monitor: '
               'reason={reason}')
|
|
|
|
|
|
class ServiceNoSocket(ServiceException):
    """Raised when the UNIX control socket cannot be created."""
    message = ('Unable to create service monitor socket: '
               'reason={reason}')
|
|
|
|
|
|
class ServiceSocketBindFailed(ServiceException):
    """Raised when binding the control socket to its filesystem path fails."""
    message = ('Failed to bind service monitor socket: '
               'path={path}, reason={reason}')
|
|
|
|
|
|
class ServiceNoPidFile(ServiceException):
    """Raised when the service pid file cannot be written."""
    message = ('Failed to update pid file: '
               'path={path}, reason={reason}')
|
|
|
|
|
|
class CommandFailed(ServiceException):
    """Raised when an external command exits with a non-zero status
    (other than the GNU timeout sentinel)."""
    message = ('Command failed: command={command}, '
               'reason={reason}, out={out}')
|
|
|
|
|
|
class CommandTimeout(ServiceException):
    """Raised when an external command is killed by GNU timeout."""
    message = ('Command timeout: command={command}, '
               'timeout={timeout}')
|
|
|
|
|
|
class CephMgrStartFailed(ServiceException):
    """Raised when the ceph-mgr daemon cannot be spawned."""
    message = ('Failed to start ceph_mgr: '
               'reason={reason}')
|
|
|
|
|
|
class CephRestfulPluginFailed(ServiceException):
    """Raised when the restful plugin cannot be brought up or queried."""
    message = ('Failed to start restful plugin: '
               'reason={reason}')
|
|
|
|
|
|
class RestApiPingFailed(ServiceException):
    """Raised when the REST API health-check request fails."""
    message = ('REST API ping failed: '
               'reason={reason}')
|
|
|
|
|
|
class ServiceMonitor(object):

    """Configure and monitor ceph-mgr and restful plugin (Ceph REST API)

    1. process init script service requests: status, stop. Requests are
       received via a control socket. Stop has priority over whatever
       the monitor is doing currently. Any ceph command that may be running
       is terminated/killed. Note that while ceph-mgr and restful plugin
       configuration is in progress ServiceMonitor reports status OK to
       avoid being restarted by SM.

    2. configure ceph-mgr and mgr restful plugin: authentication, REST API
       service port, self signed certificate. This runs as a separate
       process so it can be stopped when init script requests it.

    3. periodically check (ping) REST API responds to HTTPS requests.
       Recovery actions are taken if REST API fails to respond: restart
       ceph-mgr, wait for cluster to become available again.
    """
|
|
|
|
    def __init__(self):
        # process running configuration & REST API ping loop
        self.monitor = None

        # command socket used by init script
        self.command = None

        # ceph-mgr process (psutil.Popen handle)
        self.ceph_mgr = None

        # date the ceph-mgr process was started (None until first start)
        self.ceph_mgr_start_date = None

        # consecutive ceph-mgr/restful-plugin start failures. Service
        # monitor reports failure after
        # CONFIG.ceph_mgr_fail_count_report_error and exits after
        # CONFIG.ceph_mgr_fail_count_exit
        self.ceph_mgr_failure_count = 0

        # consecutive REST API ping failures. ceph-mgr service is restarted
        # after CONFIG.ping_fail_count_restart_mgr threshold is exceeded
        self.ping_failure_count = 0

        # REST API url reported by ceph-mgr after enabling restful plugin
        self.restful_plugin_url = ''

        # path to the REST API self signed certificate generated by
        # the restful plugin
        self.certificate = ''
|
|
|
|
    def run(self):
        """Entry point for the daemonized service.

        Acquires the exclusive service lock, control socket and pid
        file, spawns the monitor subprocess, then serves init-script
        commands until a stop request arrives.
        """
        self.disable_certificate_check()
        with self.service_lock(), self.service_socket(), \
                self.service_pid_file():
            self.start_monitor()
            self.server_loop()
|
|
|
|
    def disable_certificate_check(self):
        """Silence urllib3 warnings caused by the self-signed certificate.

        ceph-mgr restful plugin is configured with a self-signed
        certificate. Certificate host is hard-coded to "ceph-restful"
        which causes HTTPS requests to fail because they don't
        match current host name ("controller-..."). Disable HTTPS
        certificates check in urllib3.
        """
        LOG.warning('Disable urllib3 certifcates check')
        requests.packages.urllib3.disable_warnings()
|
|
|
|
    def server_loop(self):
        """Serve init-script requests from the control socket.

        Recognized commands (byte strings, space separated args):
        status, stop, restful-url <url>, certificate [<path>],
        ceph-mgr-failures <n>, ping-failures <n>. Runs until 'stop'
        arrives or the reported ceph-mgr failure count reaches
        CONFIG.ceph_mgr_fail_count_exit.
        """
        self.command.listen(2)
        while True:
            try:
                client, _ = self.command.accept()
                request = client.recv(CONFIG.service_socket_bufsize)
                LOG.debug('Monitor command socket: request=%s', str(request))
                # request is bytes; cmd/args stay bytes after split
                cmd = request.split(b' ')
                cmd, args = cmd[0], cmd[1:]
                if cmd == b'status':
                    self.send_response(client, request, self.status())
                elif cmd == b'stop':
                    self.stop()
                    self.send_response(client, request, 'OK')
                    break
                elif cmd == b'restful-url':
                    try:
                        self.restful_plugin_url = args[0]
                        self.send_response(client, request, 'OK')
                    except IndexError:
                        LOG.warning('Failed to update restful plugin url: '
                                    'args=%s', str(args))
                        self.send_response(client, request, 'ERR')
                elif cmd == b'certificate':
                    try:
                        # NOTE(review): the 'if args' guard makes this
                        # IndexError handler unreachable; kept for
                        # symmetry with the other branches
                        self.certificate = args[0] if args else ''
                        self.send_response(client, request, 'OK')
                    except IndexError:
                        LOG.warning('Failed to update certificate path: '
                                    'args=%s', str(args))
                        self.send_response(client, request, 'ERR')
                elif cmd == b'ceph-mgr-failures':
                    try:
                        self.ceph_mgr_failure_count = int(args[0])
                        self.send_response(client, request, 'OK')
                        # too many consecutive failures: shut the
                        # whole service down
                        if self.ceph_mgr_failure_count >= CONFIG.ceph_mgr_fail_count_exit:
                            self.stop()
                            break
                    except (IndexError, ValueError):
                        LOG.warning('Failed to update ceph-mgr failures: '
                                    'args=%s', str(args))
                        self.send_response(client, request, 'ERR')
                elif cmd == b'ping-failures':
                    try:
                        self.ping_failure_count = int(args[0])
                        self.send_response(client, request, 'OK')
                    except (IndexError, ValueError):
                        LOG.warning('Failed to update ping failures: '
                                    'args=%s', str(args))
                        self.send_response(client, request, 'ERR')
            except Exception as err:
                # keep serving: a malformed request must not kill the loop
                LOG.exception(err)
|
|
|
|
    @staticmethod
    def send_response(client, request, response):
        """Best-effort reply on the command socket; log and continue
        on socket errors (the client may have gone away)."""
        try:
            client.send(response.encode('utf-8'))
        except socket.error as err:
            LOG.warning('Failed to send response back. '
                        'request=%s, response=%s, reason=%s',
                        request, response, err)
|
|
|
|
    def status(self):
        """Compute the service status string for the init script.

        Returns 'OK' while services are healthy or still within the
        allowed failure thresholds, 'ERR.down' when startup failed too
        many times, 'ERR.ping_failed' when the REST API keeps failing.
        """
        if not self.restful_plugin_url:
            # startup/recovery still in progress: report OK while the
            # failure counters are below the reporting thresholds so
            # SM does not restart us prematurely
            if self.ceph_mgr_failure_count < CONFIG.ceph_mgr_fail_count_report_error \
                    and self.ping_failure_count < CONFIG.ping_fail_count_report_error:
                LOG.debug('Monitor is starting services. Report status OK')
                return 'OK'
            LOG.debug('Too many failures: '
                      'ceph_mgr=%d < %d, ping=%d < %d. '
                      'Report status ERR',
                      self.ceph_mgr_failure_count,
                      CONFIG.ceph_mgr_fail_count_report_error,
                      self.ping_failure_count,
                      CONFIG.ping_fail_count_report_error)
            return 'ERR.down'
        try:
            self.restful_plugin_ping()
            LOG.debug('Restful plugin ping successful. Report status OK')
            return 'OK'
        except (CommandFailed, RestApiPingFailed):
            if self.ceph_mgr_failure_count < CONFIG.ceph_mgr_fail_count_report_error \
                    and self.ping_failure_count < CONFIG.ping_fail_count_report_error:
                LOG.info('Restful plugin does not respond but failure '
                         'count is within acceptable limits: '
                         ' ceph_mgr=%d < %d, ping=%d < %d. '
                         'Report status OK',
                         self.ceph_mgr_failure_count,
                         CONFIG.ceph_mgr_fail_count_report_error,
                         self.ping_failure_count,
                         CONFIG.ping_fail_count_report_error)
                return 'OK'
            LOG.debug('Restful does not respond (ping failure count %d). '
                      'Report status ERR', self.ping_failure_count)
            return 'ERR.ping_failed'
|
|
|
|
    def stop(self):
        """Stop the monitor subprocess and everything it spawned.

        Sends SIGTERM to the monitor's process group, waits the kill
        delay, then sends SIGKILL and reaps the monitor. No-op when
        the monitor was never started.
        """
        if not self.monitor:
            return
        LOG.info('Stop monitor with SIGTERM to process group %d',
                 self.monitor.pid)
        try:
            os.killpg(self.monitor.pid, signal.SIGTERM)
        except OSError as err:
            LOG.info('Stop monitor failed: reason=%s', str(err))
            return
        time.sleep(CONFIG.ceph_mgr_kill_delay_sec)
        LOG.info('Stop monitor with SIGKILL to process group %d',
                 self.monitor.pid)
        try:
            os.killpg(self.monitor.pid, signal.SIGKILL)
            os.waitpid(self.monitor.pid, 0)
        except OSError as err:
            LOG.info('Stop monitor failed: reason=%s', str(err))
            return
        LOG.info('Monitor stopped: pid=%d', self.monitor.pid)
|
|
|
|
    @contextlib.contextmanager
    def service_lock(self):
        """Hold an exclusive flock for the lifetime of the service.

        Raises ServiceAlreadyStarted when another instance holds the
        lock (EAGAIN), ServiceLockFailed for any other locking error.
        The lock file is removed and closed on exit.
        """
        LOG.info('Take service lock: path=%s', CONFIG.service_lock)
        try:
            os.makedirs(os.path.dirname(CONFIG.service_lock))
        except OSError:
            # run directory already exists
            pass
        lock_file = open(CONFIG.service_lock, 'w')
        try:
            fcntl.flock(lock_file.fileno(),
                        fcntl.LOCK_EX | fcntl.LOCK_NB)
        except (IOError, OSError) as err:
            if err.errno == errno.EAGAIN:
                raise ServiceAlreadyStarted()
            else:
                raise ServiceLockFailed(reason=str(err))
        # even if we have the lock here there might be another service manager
        # running whose CONFIG.ceph_mgr_rundir was removed before starting
        # this instance. Make sure there is only one service manager running
        self.stop_other_service_managers()
        try:
            yield
        finally:
            os.unlink(CONFIG.service_lock)
            lock_file.close()
            LOG.info('Release service lock: path=%s', CONFIG.service_lock)
|
|
|
|
def stop_other_service_managers(self):
|
|
service = os.path.join('/etc/init.d', CONFIG.service_name)
|
|
for p in psutil.process_iter():
|
|
if p.cmdline()[:2] not in [[service], ['/usr/bin/python', service]]:
|
|
continue
|
|
if p.pid == os.getpid():
|
|
continue
|
|
p.kill()
|
|
|
|
    @contextlib.contextmanager
    def service_socket(self):
        """Create, bind and clean up the UNIX control socket.

        Raises ServiceNoSocket when the socket cannot be created and
        ServiceSocketBindFailed when binding to the filesystem path
        fails. The socket file is removed on exit.
        """
        LOG.info('Create service socket')
        try:
            self.command = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
        except socket.error as err:
            raise ServiceNoSocket(reason=str(err))
        LOG.info('Remove existing socket files')
        try:
            os.unlink(CONFIG.service_socket)
        except OSError:
            # stale socket file not present
            pass
        LOG.info('Bind service socket: path=%s', CONFIG.service_socket)
        try:
            self.command.bind(CONFIG.service_socket)
        except socket.error as err:
            raise ServiceSocketBindFailed(
                path=CONFIG.service_socket, reason=str(err))
        try:
            yield
        finally:
            LOG.info('Close service socket and remove file: path=%s',
                     CONFIG.service_socket)
            self.command.close()
            os.unlink(CONFIG.service_socket)
|
|
|
|
@contextlib.contextmanager
|
|
def service_pid_file(self):
|
|
LOG.info('Update service pid file: path=%s', CONFIG.service_pid_file)
|
|
try:
|
|
pid_file = open(CONFIG.service_pid_file, 'w')
|
|
pid_file.write(str(os.getpid()))
|
|
pid_file.flush()
|
|
except OSError as err:
|
|
raise ServiceNoPidFile(
|
|
path=CONFIG.service_pid_file, reason=str(err))
|
|
try:
|
|
yield
|
|
finally:
|
|
LOG.info('Remove service pid file: path=%s',
|
|
CONFIG.service_pid_file)
|
|
try:
|
|
os.unlink(CONFIG.service_pid_file)
|
|
except OSError:
|
|
pass
|
|
|
|
    def start_monitor(self):
        """Spawn monitor_loop() in a separate process so it can be
        stopped independently of the command server."""
        LOG.info('Start monitor loop')
        self.monitor = multiprocessing.Process(target=self.monitor_loop)
        self.monitor.start()
|
|
|
|
def stop_unmanaged_ceph_mgr(self):
|
|
LOG.info('Stop unmanaged running ceph-mgr processes')
|
|
service_name = os.path.basename(CONFIG.ceph_mgr_service)
|
|
if self.ceph_mgr:
|
|
psutil_terminate_kill(
|
|
[proc for proc in psutil.process_iter()
|
|
if (proc.name() == service_name
|
|
and proc.pid != self.ceph_mgr.pid)],
|
|
CONFIG.ceph_mgr_kill_delay_sec)
|
|
else:
|
|
psutil_terminate_kill(
|
|
[proc for proc in psutil.process_iter()
|
|
if proc.name() == service_name],
|
|
CONFIG.ceph_mgr_kill_delay_sec)
|
|
|
|
    def monitor_loop(self):

        """Bring up and monitor ceph-mgr restful plugin.

        Steps:
        - wait for Ceph cluster to become available
        - configure and start ceph-mgr
        - configure and enable restful plugin
        - send periodic requests to REST API
        - recover from failures

        Note: because this runs as a separate process it
        must send status updates to service monitor
        via control socket for: ping_failure_count,
        restful_plugin_url and certificate.
        """

        # Promote to process group leader so parent (service monitor)
        # can kill the monitor plus processes spawned by it. Otherwise
        # children of monitor_loop() will keep running in background and
        # will be reaped by init when they finish but by then they might
        # interfere with any new service instance.
        os.setpgrp()

        # Ignoring SIGTERM here ensures process group is not reused by
        # the time parent (service monitor) issues the final SIGKILL.
        signal.signal(signal.SIGTERM, signal.SIG_IGN)

        while True:
            try:
                # steps to configure/start ceph-mgr and restful plugin
                self.ceph_fsid_get()
                self.ceph_mgr_auth_create()
                self.restful_plugin_set_server_port()
                self.restful_plugin_create_certificate()
                self.ceph_mgr_start()
                self.restful_plugin_enable()
                self.restful_plugin_create_admin_key()
                self.restful_plugin_get_url()
                self.restful_plugin_get_certificate()

                # REST API should be available now
                # start making periodic requests (ping)
                while True:
                    # periodic lifecycle restart to limit ceph-mgr RSS
                    # memory growth (disabled when the config value is -1).
                    # Note: lifecycle days live on CONFIG, not on self
                    # (bug 2023553)
                    if CONFIG.ceph_mgr_lifecycle_days != -1 \
                            and self.ceph_mgr_uptime() >= CONFIG.ceph_mgr_lifecycle_days:
                        self.ceph_mgr_start_date = None
                        LOG.info("Restarting ceph-mgr to control RSS memory growth")
                        self.ceph_mgr_restart()

                    try:
                        self.restful_plugin_ping()
                        # healthy: reset both failure counters and
                        # publish them to the service monitor
                        self.ping_failure_count = 0
                        self.request_update_ping_failures(
                            self.ping_failure_count)
                        self.ceph_mgr_failure_count = 0
                        self.request_update_ceph_mgr_failures(
                            self.ceph_mgr_failure_count)
                        time.sleep(CONFIG.restful_plugin_ping_delay_sec)
                        continue
                    except RestApiPingFailed as err:
                        LOG.warning(str(err))

                    LOG.info('REST API ping failure count=%d',
                             self.ping_failure_count)
                    self.ping_failure_count += 1
                    self.request_update_ping_failures(
                        self.ping_failure_count)

                    # maybe request failed because ceph-mgr is not running
                    if not self.ceph_mgr_is_running():
                        self.ceph_mgr_failure_count += 1
                        self.request_update_ceph_mgr_failures(
                            self.ceph_mgr_failure_count)
                        self.ceph_mgr_start()
                        time.sleep(CONFIG.ceph_mgr_grace_period_sec)
                        continue

                    # maybe request failed because cluster health is not ok
                    if not self.ceph_fsid_get():
                        LOG.info('Unable to get cluster fsid. '
                                 'Sleep for a while')
                        time.sleep(CONFIG.cluster_grace_period_sec)
                        break

                    # too many failures? Restart ceph-mgr and go again
                    # through configuration steps
                    if (self.ping_failure_count
                            % CONFIG.ping_fail_count_restart_mgr == 0):
                        LOG.info('Too many consecutive REST API failures. '
                                 'Restart ceph-mgr. Update service '
                                 'url and certificate')
                        self.ceph_mgr_stop()
                        self.restful_plugin_url = ''
                        self.request_update_plugin_url(self.restful_plugin_url)
                        self.certificate = ''
                        self.request_update_certificate(self.certificate)
                        break

                    time.sleep(CONFIG.restful_plugin_ping_delay_sec)

            except CommandFailed as err:
                LOG.warning(str(err))
                time.sleep(CONFIG.cluster_grace_period_sec)
            except CommandTimeout as err:
                LOG.warning(str(err))
            except (CephMgrStartFailed, CephRestfulPluginFailed) as err:
                LOG.warning(str(err))
                self.ceph_mgr_failure_count += 1
                self.request_update_ceph_mgr_failures(
                    self.ceph_mgr_failure_count)
                time.sleep(CONFIG.ceph_mgr_grace_period_sec)
            except Exception as err:
                # catch-all so transient errors never kill the monitor
                LOG.exception(err)
                time.sleep(CONFIG.cluster_grace_period_sec)
|
|
|
|
@staticmethod
|
|
def run_with_timeout(command, timeout, stderr=subprocess.STDOUT):
|
|
try:
|
|
LOG.info('Run command: %s', ' '.join(command))
|
|
return subprocess.check_output(
|
|
['/usr/bin/timeout', str(timeout)] + command,
|
|
stdin=subprocess.PIPE,
|
|
stderr=stderr, shell=False,
|
|
universal_newlines=True).strip()
|
|
except subprocess.CalledProcessError as err:
|
|
if err.returncode == GNU_TIMEOUT_EXPIRED_RETCODE:
|
|
raise CommandTimeout(command=err.cmd, timeout=timeout)
|
|
raise CommandFailed(command=err.cmd, reason=str(err),
|
|
out=err.output)
|
|
|
|
    def ceph_fsid_get(self):
        """Return the cluster fsid (also serves as a cluster
        reachability probe; raises CommandFailed/CommandTimeout)."""
        return self.run_with_timeout(['/usr/bin/ceph', 'fsid'],
                                     CONFIG.ceph_cli_timeout_sec)
|
|
|
|
def ceph_mgr_has_auth(self):
|
|
path = '{}/ceph-{}'.format(
|
|
CONFIG.ceph_mgr_confdir, CONFIG.ceph_mgr_identity)
|
|
try:
|
|
os.makedirs(path)
|
|
except OSError as err:
|
|
pass
|
|
try:
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'auth', 'get',
|
|
'mgr.{}'.format(CONFIG.ceph_mgr_identity),
|
|
'-o', '{}/keyring'.format(path)],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
return True
|
|
except CommandFailed as err:
|
|
if 'ENOENT' in str(err):
|
|
return False
|
|
raise
|
|
|
|
    def ceph_mgr_auth_create(self):
        """Create the 'mgr.<identity>' auth entry when missing
        (mon and osd 'allow *' capabilities)."""
        if self.ceph_mgr_has_auth():
            return
        LOG.info('Create ceph-mgr authentication')
        self.run_with_timeout(
            ['/usr/bin/ceph', 'auth', 'get-or-create',
             'mgr.{}'.format(CONFIG.ceph_mgr_identity),
             'mon', 'allow *', 'osd', 'allow *'],
            CONFIG.ceph_cli_timeout_sec)
|
|
|
|
def ceph_mgr_is_running(self):
|
|
if not self.ceph_mgr:
|
|
return None
|
|
try:
|
|
self.ceph_mgr.wait(timeout=0)
|
|
except psutil.TimeoutExpired:
|
|
return True
|
|
return False
|
|
|
|
    def ceph_mgr_start(self):
        """Start the ceph-mgr daemon unless it is already running.

        Kills any unmanaged ceph-mgr first, records the start date
        (used for the periodic lifecycle restart) and waits the grace
        period for internal initialization. Raises CephMgrStartFailed
        when the process cannot be spawned.
        """
        if self.ceph_mgr_is_running():
            return
        self.stop_unmanaged_ceph_mgr()
        LOG.info('Start ceph-mgr daemon')
        try:
            with open(os.devnull, 'wb') as null:
                self.ceph_mgr = psutil.Popen(
                    [CONFIG.ceph_mgr_service,
                     '--cluster', CONFIG.ceph_mgr_cluster,
                     '--conf', CONFIG.ceph_mgr_config,
                     '--id', CONFIG.ceph_mgr_identity,
                     '-f'],
                    close_fds=True,
                    stdout=null,
                    stderr=null,
                    shell=False)
            self.ceph_mgr_start_date = datetime.now()
        except (OSError, ValueError) as err:
            raise CephMgrStartFailed(reason=str(err))
        # let ceph-mgr finish its internal initialization phase
        time.sleep(CONFIG.ceph_mgr_grace_period_sec)
|
|
|
|
    def ceph_mgr_stop(self):
        """Terminate (then kill) the managed ceph-mgr process, if any."""
        if not self.ceph_mgr:
            return
        LOG.info('Stop ceph-mgr')
        psutil_terminate_kill(self.ceph_mgr, CONFIG.ceph_mgr_kill_delay_sec)
|
|
|
|
    def ceph_mgr_restart(self):
        """Stop then start ceph-mgr (lifecycle restart / REST API
        failure recovery)."""
        self.ceph_mgr_stop()
        self.ceph_mgr_start()
|
|
|
|
def ceph_mgr_uptime(self):
|
|
if not self.ceph_mgr_start_date:
|
|
return 0
|
|
return (datetime.now() - self.ceph_mgr_start_date).days
|
|
|
|
def restful_plugin_has_server_port(self):
|
|
try:
|
|
with open(os.devnull, 'wb') as null:
|
|
out = self.run_with_timeout(
|
|
['/usr/bin/ceph', 'config-key', 'get',
|
|
'config/mgr/mgr/restful/server_port'],
|
|
CONFIG.ceph_cli_timeout_sec, stderr=null)
|
|
if out == str(CONFIG.restful_plugin_port):
|
|
return True
|
|
LOG.warning('Restful plugin port mismatch: '
|
|
'current=%d, expected=%d', out,
|
|
CONFIG.restful_plugin_port)
|
|
except CommandFailed as err:
|
|
LOG.warning('Failed to get restful plugin port: '
|
|
'reason=%s', str(err))
|
|
return False
|
|
|
|
    def restful_plugin_set_server_port(self):
        """Persist the restful plugin port in the config-key store
        (no-op when it already matches CONFIG.restful_plugin_port)."""
        if self.restful_plugin_has_server_port():
            return
        LOG.info('Set restful plugin port=%d', CONFIG.restful_plugin_port)
        self.run_with_timeout(
            ['/usr/bin/ceph', 'config-key', 'set',
             'config/mgr/mgr/restful/server_port', str(CONFIG.restful_plugin_port)],
            CONFIG.ceph_cli_timeout_sec)
|
|
|
|
def restful_plugin_has_admin_key(self):
|
|
try:
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'config-key', 'get',
|
|
'mgr/restful/keys/admin'],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
return True
|
|
except CommandFailed:
|
|
pass
|
|
return False
|
|
|
|
    def restful_plugin_create_admin_key(self):
        """Create the restful plugin 'admin' API key when missing."""
        if self.restful_plugin_has_admin_key():
            return
        LOG.info('Create restful plugin admin key')
        self.run_with_timeout(
            ['/usr/bin/ceph', 'restful',
             'create-key', 'admin'],
            CONFIG.ceph_cli_timeout_sec)
|
|
|
|
def restful_plugin_has_certificate(self):
|
|
try:
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'config-key', 'get',
|
|
'config/mgr/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'config-key', 'get',
|
|
'mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'config-key', 'get',
|
|
'config/mgr/mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity)],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'config-key', 'get',
|
|
'/mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity)],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
return True
|
|
except CommandFailed:
|
|
pass
|
|
return False
|
|
|
|
    def restful_plugin_create_certificate(self):
        """Generate and store the restful plugin self-signed certificate.

        Creates a 10-year self-signed cert/key pair with openssl in a
        temporary directory, then uploads both under the
        'config/mgr/...' and the plain 'mgr/...' config-key names
        (presumably for compatibility with different ceph-mgr
        versions - TODO confirm). No-op when the keys already exist.
        """
        if self.restful_plugin_has_certificate():
            return
        LOG.info('Create restful plugin self signed certificate')
        path = tempfile.mkdtemp()
        try:
            try:
                # openssl config: SAN set to this host's identity,
                # CN hard-coded to "ceph-restful"
                with tempfile.NamedTemporaryFile() as restful_cnf:
                    restful_cnf.write((
                        '[req]\n'
                        'req_extensions = v3_ca\n'
                        'distinguished_name = req_distinguished_name\n'
                        '[v3_ca]\n'
                        'subjectAltName=DNS:{}\n'
                        'basicConstraints = CA:true\n'
                        '[ req_distinguished_name ]\n'
                        '0.organizationName = IT\n'
                        'commonName = ceph-restful\n').format(
                            CONFIG.ceph_mgr_identity).encode('utf-8'))
                    restful_cnf.flush()
                    subprocess.check_call([
                        '/usr/bin/openssl', 'req', '-new', '-nodes', '-x509',
                        '-subj', '/O=IT/CN=' + CONFIG.ceph_mgr_identity,
                        '-days', '3650',
                        '-config', restful_cnf.name,
                        '-out', os.path.join(path, 'crt'),
                        '-keyout', os.path.join(path, 'key'),
                        '-extensions', 'v3_ca'])
            except subprocess.CalledProcessError as err:
                raise CommandFailed(
                    command=' '.join(err.cmd),
                    reason='failed to generate self-signed certificate: {}'.format(str(err)),
                    out=err.output)
            self.run_with_timeout(
                ['/usr/bin/ceph', 'config-key', 'set',
                 'config/mgr/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity),
                 '-i', os.path.join(path, 'crt')],
                CONFIG.ceph_cli_timeout_sec)
            self.run_with_timeout(
                ['/usr/bin/ceph', 'config-key', 'set',
                 'mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity),
                 '-i', os.path.join(path, 'crt')],
                CONFIG.ceph_cli_timeout_sec)
            self.run_with_timeout(
                ['/usr/bin/ceph', 'config-key', 'set',
                 'config/mgr/mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity),
                 '-i', os.path.join(path, 'key')],
                CONFIG.ceph_cli_timeout_sec)
            self.run_with_timeout(
                ['/usr/bin/ceph', 'config-key', 'set',
                 'mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity),
                 '-i', os.path.join(path, 'key')],
                CONFIG.ceph_cli_timeout_sec)
        finally:
            shutil.rmtree(path)
|
|
|
|
    def restful_plugin_is_enabled(self):
        """Return True when 'restful' is listed in ceph-mgr's enabled
        modules. Raises CommandFailed when the output is unparseable."""
        command = ['/usr/bin/ceph', 'mgr', 'module', 'ls',
                   '--format', 'json']
        with open(os.devnull, 'wb') as null:
            out = self.run_with_timeout(
                command, CONFIG.ceph_cli_timeout_sec, stderr=null)
        try:
            if 'restful' in json.loads(out)['enabled_modules']:
                return True
        except ValueError as err:
            raise CommandFailed(
                command=' '.join(command),
                reason='unable to decode json: {}'.format(err), out=out)
        except KeyError as err:
            raise CommandFailed(
                command=' '.join(command),
                reason='missing expected key: {}'.format(err), out=out)
        return False
|
|
|
|
    def restful_plugin_enable(self):
        """Enable the 'restful' mgr module when disabled, then always
        wait a short grace period for it to come up."""
        if not self.restful_plugin_is_enabled():
            LOG.info('Enable restful plugin')
            self.run_with_timeout(
                ['/usr/bin/ceph', 'mgr',
                 'module', 'enable', 'restful'],
                CONFIG.ceph_cli_timeout_sec)
        time.sleep(CONFIG.restful_plugin_grace_period_sec)
|
|
|
|
    def restful_plugin_get_url(self):
        """Read the restful service URL from 'ceph mgr services' and
        publish it to the service monitor process.

        Raises CephRestfulPluginFailed when the output cannot be
        parsed or the 'restful' entry is absent.
        """
        command = ['/usr/bin/ceph', 'mgr', 'services',
                   '--format', 'json']
        with open(os.devnull, 'wb') as null:
            out = self.run_with_timeout(
                command, CONFIG.ceph_cli_timeout_sec, stderr=null)
        try:
            self.restful_plugin_url = json.loads(out)['restful']
        except ValueError as err:
            raise CephRestfulPluginFailed(
                reason='unable to decode json: {} output={}'.format(err, out))
        except KeyError as err:
            raise CephRestfulPluginFailed(
                reason='missing expected key: {} in ouput={}'.format(err, out))
        self.request_update_plugin_url(self.restful_plugin_url)
|
|
|
|
    def restful_plugin_get_certificate(self):
        """Fetch the restful certificate from the config-key store,
        save it to CONFIG.restful_plugin_cert_path and publish the
        path to the service monitor process."""
        command = ['/usr/bin/ceph', 'config-key', 'get',
                   'config/mgr/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)]
        with open(os.devnull, 'wb') as null:
            certificate = self.run_with_timeout(
                command, CONFIG.ceph_cli_timeout_sec, stderr=null)
        with open(CONFIG.restful_plugin_cert_path, 'w') as cert_file:
            cert_file.write(certificate)
        self.certificate = CONFIG.restful_plugin_cert_path
        self.request_update_certificate(
            self.certificate)
|
|
|
|
def restful_plugin_ping(self):
|
|
if not self.restful_plugin_url:
|
|
raise RestApiPingFailed(reason='missing service url')
|
|
if not self.certificate:
|
|
raise RestApiPingFailed(reason='missing certificate')
|
|
LOG.debug('Ping restful plugin: url=%d', self.restful_plugin_url)
|
|
try:
|
|
response = requests.request(
|
|
'GET', self.restful_plugin_url, verify=False,
|
|
timeout=CONFIG.rest_api_timeout_sec)
|
|
if not response.ok:
|
|
raise RestApiPingFailed(
|
|
reason='response not ok ({})'.format(response))
|
|
LOG.debug('Ping restful plugin OK')
|
|
except (requests.ConnectionError,
|
|
requests.Timeout,
|
|
requests.HTTPError) as err:
|
|
raise RestApiPingFailed(reason=str(err))
|
|
|
|
    @staticmethod
    def _make_client_socket():
        """Return a client socket connected to the monitor control
        socket, with a timeout of twice the REST API timeout."""
        sock = socket.socket(
            socket.AF_UNIX, socket.SOCK_SEQPACKET)
        sock.settimeout(2 * CONFIG.rest_api_timeout_sec)
        sock.connect(CONFIG.service_socket)
        return sock
|
|
|
|
@staticmethod
def request_status():
    """Ask the running monitor for its status over the control socket.

    Returns:
        True when the monitor replies with a message starting with
        b'OK'; False on any other reply or on a socket error.
    """
    try:
        client = ServiceMonitor._make_client_socket()
        with contextlib.closing(client):
            client.send(b'status')
            reply = client.recv(CONFIG.service_socket_bufsize)
            LOG.debug('Status %s', reply)
            return reply.startswith(b'OK')
    except socket.error as err:
        LOG.error('Status error: reason=%s', err)
        return False
|
|
|
|
@staticmethod
def request_stop():
    """Tell the running monitor to stop, over the control socket.

    Returns:
        True when the stop request was delivered; False on socket error.
    """
    try:
        client = ServiceMonitor._make_client_socket()
        with contextlib.closing(client):
            client.send(b'stop')
            reply = client.recv(CONFIG.service_socket_bufsize)
            LOG.debug('Stop response: %s', reply)
            return True
    except socket.error as err:
        LOG.error('Stop error: reason=%s', err)
        return False
|
|
|
|
@staticmethod
def request_update_ceph_mgr_failures(count):
    """Report the current ceph-mgr failure count to the monitor.

    :param count: number of consecutive ceph-mgr failures observed
    :return: True when the message was delivered, False on socket error
    """
    try:
        with contextlib.closing(
                ServiceMonitor._make_client_socket()) as sock:
            sock.send('ceph-mgr-failures {}'.format(count).encode('utf-8'))
            sock.recv(CONFIG.service_socket_bufsize)
            return True
    except socket.error as err:
        # fixed copy-pasted log message: this is not the stop request
        LOG.error('Update ceph-mgr failures error: reason=%s', err)
        return False
|
|
|
|
@staticmethod
def request_update_ping_failures(count):
    """Report the current REST API ping failure count to the monitor.

    :param count: number of consecutive ping failures observed
    :return: True when the message was delivered, False on socket error
    """
    try:
        with contextlib.closing(
                ServiceMonitor._make_client_socket()) as sock:
            sock.send('ping-failures {}'.format(count).encode('utf-8'))
            sock.recv(CONFIG.service_socket_bufsize)
            return True
    except socket.error as err:
        # fixed copy-pasted log message: this is not the stop request
        LOG.error('Update ping failures error: reason=%s', err)
        return False
|
|
|
|
@staticmethod
def request_update_plugin_url(url):
    """Send the restful plugin URL to the monitor.

    :param url: restful plugin URL as reported by 'ceph mgr services'
    :return: True when the message was delivered, False on socket error
    """
    try:
        with contextlib.closing(
                ServiceMonitor._make_client_socket()) as sock:
            sock.send('restful-url {}'.format(url).encode('utf-8'))
            sock.recv(CONFIG.service_socket_bufsize)
            return True
    except socket.error as err:
        # fixed copy-pasted log message: this is not the stop request
        LOG.error('Update restful-url error: reason=%s', err)
        return False
|
|
|
|
@staticmethod
def request_update_certificate(path):
    """Send the restful plugin certificate path to the monitor.

    :param path: filesystem path of the restful plugin certificate
    :return: True when the message was delivered, False on socket error
    """
    try:
        with contextlib.closing(
                ServiceMonitor._make_client_socket()) as sock:
            sock.send('certificate {}'.format(path).encode('utf-8'))
            sock.recv(CONFIG.service_socket_bufsize)
            return True
    except socket.error as err:
        # fixed copy-pasted log message: this is not the stop request
        LOG.error('Update certificate error: reason=%s', err)
        return False
|
|
|
|
|
|
class InitWrapper(object):

    """Handle System V init script actions: start, stop, restart, etc. """

    def __init__(self):

        """Dispatch command line action to the corresponding function.

        Candidate action functions are all class methods except ones
        that start with an underscore.
        """

        parser = argparse.ArgumentParser()
        actions = [m[0]
                   for m in inspect.getmembers(self)
                   if (inspect.ismethod(m[1])
                       and not m[0].startswith('_'))]
        parser.add_argument(
            'action',
            choices=actions)
        self.args = parser.parse_args()
        getattr(self, self.args.action)()

    def start(self):

        """Start ServiceMonitor as a daemon unless one is already running.

        Use a pipe to report monitor status back to this process:
        the forked child daemonizes, tries to create the monitor and
        writes b'OK' or the error text to the pipe; the parent reads
        it and exits 0 only on b'OK'.
        """

        pipe = os.pipe()
        child = os.fork()
        if child == 0:
            os.close(pipe[0])
            with daemon.DaemonContext(files_preserve=[pipe[1]]):
                # prevent duplication of messages in log
                global LOG
                LOG = setup_logging(cleanup_handlers=True)
                try:
                    monitor = ServiceMonitor()
                    status = b'OK'
                except ServiceAlreadyStarted:
                    os.write(pipe[1], b'OK')
                    os.close(pipe[1])
                    return
                except Exception as err:
                    # os.write() requires bytes on Python 3; encode the
                    # error text (a bare str here raised TypeError)
                    status = str(err).encode('utf-8')
                os.write(pipe[1], status)
                os.close(pipe[1])
                if status == b'OK':
                    try:
                        monitor.run()
                    except ServiceException as err:
                        LOG.warning(str(err))
                    except Exception as err:
                        LOG.exception('Service monitor error: reason=%s', err)
        else:
            os.close(pipe[1])
            try:
                status = os.read(pipe[0], CONFIG.service_socket_bufsize)
                if status == b'OK':
                    sys.exit(0)
                else:
                    LOG.warning('Service monitor failed to start: '
                                'status=%s', status)
            except IOError as err:
                LOG.warning('Failed to read monitor status: reason=%s', err)
            os.close(pipe[0])
            os.waitpid(child, 0)
            sys.exit(1)

    def stop(self):

        """Tell ServiceMonitor daemon to stop running.

        In case request fails stop ServiceMonitor and ceph_mgr proecsses
        using SIGTERM followed by SIGKILL.
        """

        result = ServiceMonitor.request_stop()
        if not result:
            # control-socket request failed: fall back to killing the
            # monitor and ceph-mgr processes by name
            ceph_mgr = os.path.basename(CONFIG.ceph_mgr_service)
            procs = []
            for proc in psutil.process_iter():
                name = proc.name()
                if name == CONFIG.service_name:
                    procs.append(proc)
                if name == ceph_mgr:
                    procs.append(proc)
            psutil_terminate_kill(procs, CONFIG.ceph_mgr_kill_delay_sec)

    def restart(self):
        # init action 'restart': full stop followed by start
        self.stop()
        self.start()

    def force_reload(self):
        # init action 'force-reload': same as restart for this service
        self.stop()
        self.start()

    def reload(self):
        # init action 'reload': same as restart for this service
        self.stop()
        self.start()

    def status(self):

        """Report status from ServiceMonitor.

        We don't just try to access REST API here because ServiceMonitor may
        be in the process of starting/configuring ceph-mgr and restful
        plugin in which case we report OK to avoid being restarted by SM.
        """

        status = ServiceMonitor.request_status()
        sys.exit(0 if status is True else 1)
|
|
|
|
|
|
# Script entry point: parse the init action (start/stop/restart/...)
# from the command line and dispatch it.
if __name__ == '__main__':
    InitWrapper()
|