46d78550bd
ceph-disk is used by stx puppet manifest but it's replaced by ceph-volume in ceph 14.1+, and there might be other incompatible issues, so add version 13.2.2 to align with stx 3.0, the version 14.1 is kept so it may be used by future stx release. This includes the following changes: - Add recipe and patches for version 13.2.2 - Set preferred version to 13.2.2 - Rename ceph_%.bbappend to ceph_14.1.0.bbappend - Update the conf file and scripts from stx 2.0 to stx 3.0 - Remove the useless files in stx-integ fix #483 fix #497 Signed-off-by: Jackie Huang <jackie.huang@windriver.com> Signed-off-by: Babak Sarashki <Babak.SarAshki@windriver.com>
1122 lines
42 KiB
Python
1122 lines
42 KiB
Python
#!/usr/bin/python
|
|
#
|
|
# Copyright (c) 2019 Wind River Systems, Inc.
|
|
#
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
#
|
|
|
|
|
|
### BEGIN INIT INFO
|
|
# Provides: ceph/mgr RESTful API plugin
|
|
# Required-Start: $ceph
|
|
# Required-Stop: $ceph
|
|
# Default-Start: 2 3 4 5
|
|
# Default-Stop: 0 1 6
|
|
# Short-Description: Ceph MGR RESTful API plugin
|
|
# Description: Ceph MGR RESTful API plugin
|
|
### END INIT INFO
|
|
|
|
import argparse
|
|
import contextlib
|
|
import errno
|
|
import fcntl
|
|
import inspect
|
|
import json
|
|
import logging
|
|
import multiprocessing
|
|
import os
|
|
import shutil
|
|
import signal
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
|
|
import daemon
|
|
import psutil
|
|
import requests
|
|
|
|
# GNU coreutils 'timeout' command returns exit status 124 when the
# wrapped command times out (see timeout(1) man page). Used by
# run_with_timeout() to distinguish "timed out" from "failed".
GNU_TIMEOUT_EXPIRED_RETCODE = 124
|
|
|
|
|
|
def psutil_terminate_kill(target, timeout):

    """Extend psutil functionality to stop a process.

    SIGTERM is sent to each target (via psutil.Process.terminate());
    after a grace period of `timeout` seconds SIGKILL is sent to the
    ones that are still running.

    :param target: a psutil.Process or a list of psutil.Process
    :param timeout: seconds to wait after each signal round
    """

    if not isinstance(target, list):
        target = [target]
    # drop processes that have already exited
    _, target = psutil.wait_procs(target, timeout=0)
    # first round: graceful terminate (SIGTERM); second round: kill (SIGKILL)
    for action in [lambda p: p.terminate(), lambda p: p.kill()]:
        for proc in target:
            action(proc)
        _, target = psutil.wait_procs(
            target, timeout=timeout)
|
|
|
|
|
|
class Config(object):

    """Configuration knobs for the ceph-mgr service wrapper.

    All values are currently hard-coded; a later version may read them
    from a configuration file (for example
    /etc/ceph/mgr-restful-plugin.conf).
    """

    def __init__(self):
        # logging destination and verbosity
        self.log_level = logging.INFO
        self.log_dir = '/var/log'

        # ceph-mgr daemon binary, config and runtime locations
        self.ceph_mgr_service = '/usr/bin/ceph-mgr'
        self.ceph_mgr_config = '/etc/ceph/ceph.conf'
        self.ceph_mgr_cluster = 'ceph'
        self.ceph_mgr_rundir = '/var/run/ceph/mgr'
        self.ceph_mgr_confdir = '/var/lib/ceph/mgr'
        self.ceph_mgr_identity = socket.gethostname()

        # wrapper service identity and its runtime files
        self.service_name = 'mgr-restful-plugin'
        self.service_socket = os.path.join(
            self.ceph_mgr_rundir, self.service_name + '.socket')
        self.service_lock = os.path.join(
            self.ceph_mgr_rundir, self.service_name + '.lock')
        self.service_pid_file = os.path.join(
            '/var/run/ceph', self.service_name + '.pid')

        # TCP port the restful plugin listens on
        self.restful_plugin_port = 5001

        # maximum size of a message received/sent via
        # service monitor control socket
        self.service_socket_bufsize = 1024

        # maximum time to wait for ceph cli to exit
        self.ceph_cli_timeout_sec = 30

        # how much time to wait after ceph cli commands fail with timeout
        # before running any other commands
        self.cluster_grace_period_sec = 30

        # after ceph-mgr is started it goes through an internal
        # initialization phase; how much time to wait before querying it
        self.ceph_mgr_grace_period_sec = 15

        # after sending SIGTERM to ceph-mgr how much time to wait before
        # sending SIGKILL (maximum time allowed for ceph-mgr cleanup)
        self.ceph_mgr_kill_delay_sec = 5

        # while a recovery procedure is running the monitor reports
        # status OK even if ceph-mgr is currently down; this caps the
        # number of consecutive ceph-mgr failures before reporting error
        self.ceph_mgr_fail_count_report_error = 3

        # maximum number of consecutive ceph-mgr failures before
        # stopping the mgr-restful-plugin service altogether
        self.ceph_mgr_fail_count_exit = 5

        # maximum time allowed for ceph-mgr to answer a REST API request
        self.rest_api_timeout_sec = 15

        # interval between consecutive REST API ping requests. Smaller
        # values poll the restful plugin more aggressively; larger
        # values slow down recovery when services become unavailable
        self.restful_plugin_ping_delay_sec = 3

        # where to save the self-signed certificate generated by ceph-mgr
        self.restful_plugin_cert_path = os.path.join(
            self.ceph_mgr_rundir, 'restful.crt')

        # time to wait after enabling the restful plugin
        self.restful_plugin_grace_period_sec = 3

        # number of REST API ping failures before restarting ceph-mgr
        self.ping_fail_count_restart_mgr = 3

        # number of REST API ping failures before reporting status
        # error. Until then the monitor reports OK in case the restful
        # plugin recovers on its own
        self.ping_fail_count_report_error = 5

    @staticmethod
    def load():
        """Build a Config instance (future: parse a config file here)."""
        return Config()
|
|
|
|
|
|
def setup_logging(name=None, cleanup_handlers=False):
    """Return a logger that writes to CONFIG.log_dir/<service>.log.

    :param name: logger name; defaults to CONFIG.service_name
    :param cleanup_handlers: when True, flush/close and drop any
        handlers already attached before installing a fresh one
    """
    logger = logging.getLogger(name or CONFIG.service_name)
    logger.setLevel(CONFIG.log_level)
    if cleanup_handlers:
        try:
            for existing in logger.handlers:
                if isinstance(existing, logging.StreamHandler):
                    existing.flush()
                if isinstance(existing, logging.FileHandler):
                    existing.close()
            logger.handlers = []
        except Exception:
            # best-effort cleanup; never fail logger setup over it
            pass
    elif logger.handlers:
        # already configured: reuse as-is
        return logger
    file_handler = logging.FileHandler(
        os.path.join(CONFIG.log_dir, CONFIG.service_name + '.log'))
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s %(process)s %(levelname)s %(name)s %(message)s'))
    logger.addHandler(file_handler)
    return logger
|
|
|
|
|
|
# Module-wide configuration and logger shared by everything below.
# NOTE: setup_logging() reads CONFIG, so CONFIG must be created first.
CONFIG = Config.load()
LOG = setup_logging(name='init-wrapper')
|
|
|
|
|
|
class ServiceException(Exception):

    """Generic mgr-restful-plugin service exception.

    The final text is produced from the per-class `message` template
    formatted with the constructor's args/kwargs, unless an explicit
    `message` keyword argument overrides it.
    """

    # per-subclass format template
    message = ""

    def __init__(self, *args, **kwargs):
        if "message" in kwargs:
            text = kwargs["message"]
        else:
            try:
                text = self.message.format(*args, **kwargs)
            except Exception:  # noqa
                # fall back to a raw dump when the template
                # placeholders don't match the given arguments
                text = '{}, args:{}, kwargs: {}'.format(
                    self.message, args, kwargs)
        super(ServiceException, self).__init__(text)
|
|
|
|
|
|
class ServiceAlreadyStarted(ServiceException):
    """Raised when another monitor instance already holds the lock."""
    message = 'Service monitor already started'
|
|
|
|
|
|
class ServiceLockFailed(ServiceException):
    """Raised when taking the service lock fails (other than EAGAIN)."""
    message = 'Unable to lock service monitor: reason={reason}'
|
|
|
|
|
|
class ServiceNoSocket(ServiceException):
    """Raised when the control socket cannot be created."""
    message = 'Unable to create service monitor socket: reason={reason}'
|
|
|
|
|
|
class ServiceSocketBindFailed(ServiceException):
    """Raised when binding the control socket path fails."""
    message = 'Failed to bind service monitor socket: path={path}, reason={reason}'
|
|
|
|
|
|
class ServiceNoPidFile(ServiceException):
    """Raised when the service pid file cannot be written."""
    message = 'Failed to update pid file: path={path}, reason={reason}'
|
|
|
|
|
|
class CommandFailed(ServiceException):
    """Raised when an external command exits with a non-zero status."""
    message = 'Command failed: command={command}, reason={reason}, out={out}'
|
|
|
|
|
|
class CommandTimeout(ServiceException):
    """Raised when an external command exceeds its time budget."""
    message = 'Command timeout: command={command}, timeout={timeout}'
|
|
|
|
|
|
class CephMgrStartFailed(ServiceException):
    """Raised when spawning the ceph-mgr daemon fails."""
    message = 'Failed to start ceph_mgr: reason={reason}'
|
|
|
|
|
|
class CephRestfulPluginFailed(ServiceException):
    """Raised when enabling/querying the restful plugin fails."""
    message = 'Failed to start restful plugin: reason={reason}'
|
|
|
|
|
|
class RestApiPingFailed(ServiceException):
    """Raised when the periodic REST API health check fails."""
    message = 'REST API ping failed: reason={reason}'
|
|
|
|
|
|
class ServiceMonitor(object):
|
|
|
|
"""Configure and monitor ceph-mgr and restful plugin (Ceph REST API)
|
|
|
|
1. process init script service requests: status, stop. Requests are
|
|
received via a control socket. Stop has priority over whatever
|
|
the monitor is doing currently. Any ceph command that may be running
|
|
is terminated/killed. Note that while ceph-mgr and restful plugin
|
|
configuration is in progress ServiceMonitor reports status OK to
|
|
avoid being restarted by SM.
|
|
|
|
2. configure ceph-mgr and mgr restful plugin: authentication, REST API
|
|
service port, self signed certificate. This runs as a separate
|
|
process so it can be stopped when init script requests it.
|
|
|
|
3. periodically check (ping) REST API responds to HTTPS requests.
|
|
Recovery actions are taken if REST API fails to respond: restart
|
|
ceph-mgr, wait for cluster to become available again.
|
|
"""
|
|
|
|
def __init__(self):
|
|
# process running configuration & REST API ping loop
|
|
self.monitor = None
|
|
|
|
# command socket used by init script
|
|
self.command = None
|
|
|
|
# ceph-mgr process
|
|
self.ceph_mgr = None
|
|
|
|
# consecutive ceph-mgr/restful-plugin start failures. Service monitor
|
|
# reports failure after CONFIG.ceph_mgr_max_failure_count
|
|
self.ceph_mgr_failure_count = 0
|
|
|
|
# consecutive REST API ping failures. ceph-mgr service is restarted
|
|
# after CONFIG.ping_fail_count_restart_mgr threshold is exceeded
|
|
self.ping_failure_count = 0
|
|
|
|
# REST API url reported by ceph-mgr after enabling restful plugin
|
|
self.restful_plugin_url = ''
|
|
|
|
# REST API self signed certificate generated by restful plugin
|
|
self.certificate = ''
|
|
|
|
def run(self):
|
|
self.disable_certificate_check()
|
|
with self.service_lock(), self.service_socket(), \
|
|
self.service_pid_file():
|
|
self.start_monitor()
|
|
self.server_loop()
|
|
|
|
def disable_certificate_check(self):
|
|
# ceph-mgr restful plugin is configured with a self-signed
|
|
# certificate. Certificate host is hard-coded to "ceph-restful"
|
|
# which causes HTTPS requests to fail because they don't
|
|
# match current host name ("controller-..."). Disable HTTPS
|
|
# certificates check in urllib3
|
|
LOG.warning('Disable urllib3 certifcates check')
|
|
requests.packages.urllib3.disable_warnings()
|
|
|
|
def server_loop(self):
|
|
self.command.listen(2)
|
|
while True:
|
|
try:
|
|
client, _ = self.command.accept()
|
|
request = client.recv(CONFIG.service_socket_bufsize)
|
|
LOG.debug('Monitor command socket: request=%s', str(request))
|
|
cmd = request.split(' ')
|
|
cmd, args = cmd[0], cmd[1:]
|
|
if cmd == 'status':
|
|
self.send_response(client, request, self.status())
|
|
elif cmd == 'stop':
|
|
self.stop()
|
|
self.send_response(client, request, 'OK')
|
|
break
|
|
elif cmd == 'restful-url':
|
|
try:
|
|
self.restful_plugin_url = args[0]
|
|
self.send_response(client, request, 'OK')
|
|
except IndexError:
|
|
LOG.warning('Failed to update restful plugin url: '
|
|
'args=%s', str(args))
|
|
self.send_response(client, request, 'ERR')
|
|
elif cmd == 'certificate':
|
|
try:
|
|
self.certificate = args[0] if args else ''
|
|
self.send_response(client, request, 'OK')
|
|
except IndexError:
|
|
LOG.warning('Failed to update certificate path: '
|
|
'args=%s', str(args))
|
|
self.send_response(client, request, 'ERR')
|
|
elif cmd == 'ceph-mgr-failures':
|
|
try:
|
|
self.ceph_mgr_failure_count = int(args[0])
|
|
self.send_response(client, request, 'OK')
|
|
if self.ceph_mgr_failure_count >= CONFIG.ceph_mgr_fail_count_exit:
|
|
self.stop()
|
|
break
|
|
except (IndexError, ValueError):
|
|
LOG.warning('Failed to update ceph-mgr failures: '
|
|
'args=%s', str(args))
|
|
self.send_response(client, request, 'ERR')
|
|
elif cmd == 'ping-failures':
|
|
try:
|
|
self.ping_failure_count = int(args[0])
|
|
self.send_response(client, request, 'OK')
|
|
except (IndexError, ValueError):
|
|
LOG.warning('Failed to update ping failures: '
|
|
'args=%s', str(args))
|
|
self.send_response(client, request, 'ERR')
|
|
except Exception as err:
|
|
LOG.exception(err)
|
|
|
|
@staticmethod
|
|
def send_response(client, request, response):
|
|
try:
|
|
client.send(response)
|
|
except socket.error as err:
|
|
LOG.warning('Failed to send response back. '
|
|
'request=%s, response=%s, reason=%s',
|
|
request, response, err)
|
|
|
|
def status(self):
|
|
if not self.restful_plugin_url:
|
|
if self.ceph_mgr_failure_count < CONFIG.ceph_mgr_fail_count_report_error \
|
|
and self.ping_failure_count < CONFIG.ping_fail_count_report_error:
|
|
LOG.debug('Monitor is starting services. Report status OK')
|
|
return 'OK'
|
|
LOG.debug('Too many failures: '
|
|
'ceph_mgr=%d < %d, ping=%d < %d. '
|
|
'Report status ERR',
|
|
self.ceph_mgr_failure_count,
|
|
CONFIG.ceph_mgr_fail_count_report_error,
|
|
self.ping_failure_count,
|
|
CONFIG.ping_fail_count_report_error)
|
|
return 'ERR.down'
|
|
try:
|
|
self.restful_plugin_ping()
|
|
LOG.debug('Restful plugin ping successful. Report status OK')
|
|
return 'OK'
|
|
except (CommandFailed, RestApiPingFailed):
|
|
if self.ceph_mgr_failure_count < CONFIG.ceph_mgr_fail_count_report_error \
|
|
and self.ping_failure_count < CONFIG.ping_fail_count_report_error:
|
|
LOG.info('Restful plugin does not respond but failure '
|
|
'count is within acceptable limits: '
|
|
' ceph_mgr=%d < %d, ping=%d < %d. '
|
|
'Report status OK',
|
|
self.ceph_mgr_failure_count,
|
|
CONFIG.ceph_mgr_fail_count_report_error,
|
|
self.ping_failure_count,
|
|
CONFIG.ping_fail_count_report_error)
|
|
return 'OK'
|
|
LOG.debug('Restful does not respond (ping failure count %d). '
|
|
'Report status ERR', self.ping_failure_count)
|
|
return 'ERR.ping_failed'
|
|
|
|
def stop(self):
|
|
if not self.monitor:
|
|
return
|
|
LOG.info('Stop monitor with SIGTERM to process group %d',
|
|
self.monitor.pid)
|
|
try:
|
|
os.killpg(self.monitor.pid, signal.SIGTERM)
|
|
except OSError as err:
|
|
LOG.info('Stop monitor failed: reason=%s', str(err))
|
|
return
|
|
time.sleep(CONFIG.ceph_mgr_kill_delay_sec)
|
|
LOG.info('Stop monitor with SIGKILL to process group %d',
|
|
self.monitor.pid)
|
|
try:
|
|
os.killpg(self.monitor.pid, signal.SIGKILL)
|
|
os.waitpid(self.monitor.pid, 0)
|
|
except OSError as err:
|
|
LOG.info('Stop monitor failed: reason=%s', str(err))
|
|
return
|
|
LOG.info('Monitor stopped: pid=%d', self.monitor.pid)
|
|
|
|
@contextlib.contextmanager
|
|
def service_lock(self):
|
|
LOG.info('Take service lock: path=%s', CONFIG.service_lock)
|
|
try:
|
|
os.makedirs(os.path.dirname(CONFIG.service_lock))
|
|
except OSError:
|
|
pass
|
|
lock_file = open(CONFIG.service_lock, 'w')
|
|
try:
|
|
fcntl.flock(lock_file.fileno(),
|
|
fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
except (IOError, OSError) as err:
|
|
if err.errno == errno.EAGAIN:
|
|
raise ServiceAlreadyStarted()
|
|
else:
|
|
raise ServiceLockFailed(reason=str(err))
|
|
# even if we have the lock here there might be another service manager
|
|
# running whose CONFIG.ceph_mgr_rundir was removed before starting
|
|
# this instance. Make sure there is only one service manager running
|
|
self.stop_other_service_managers()
|
|
try:
|
|
yield
|
|
finally:
|
|
os.unlink(CONFIG.service_lock)
|
|
lock_file.close()
|
|
LOG.info('Release service lock: path=%s', CONFIG.service_lock)
|
|
|
|
def stop_other_service_managers(self):
|
|
service = os.path.join('/etc/init.d', CONFIG.service_name)
|
|
for p in psutil.process_iter():
|
|
if p.cmdline()[:2] not in [[service], ['/usr/bin/python', service]]:
|
|
continue
|
|
if p.pid == os.getpid():
|
|
continue
|
|
p.kill()
|
|
|
|
@contextlib.contextmanager
|
|
def service_socket(self):
|
|
LOG.info('Create service socket')
|
|
try:
|
|
self.command = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
|
|
except socket.error as err:
|
|
raise ServiceNoSocket(reason=str(err))
|
|
LOG.info('Remove existing socket files')
|
|
try:
|
|
os.unlink(CONFIG.service_socket)
|
|
except OSError:
|
|
pass
|
|
LOG.info('Bind service socket: path=%s', CONFIG.service_socket)
|
|
try:
|
|
self.command.bind(CONFIG.service_socket)
|
|
except socket.error as err:
|
|
raise ServiceSocketBindFailed(
|
|
path=CONFIG.service_socket, reason=str(err))
|
|
try:
|
|
yield
|
|
finally:
|
|
LOG.info('Close service socket and remove file: path=%s',
|
|
CONFIG.service_socket)
|
|
self.command.close()
|
|
os.unlink(CONFIG.service_socket)
|
|
|
|
@contextlib.contextmanager
|
|
def service_pid_file(self):
|
|
LOG.info('Update service pid file: path=%s', CONFIG.service_pid_file)
|
|
try:
|
|
pid_file = open(CONFIG.service_pid_file, 'w')
|
|
pid_file.write(str(os.getpid()))
|
|
pid_file.flush()
|
|
except OSError as err:
|
|
raise ServiceNoPidFile(
|
|
path=CONFIG.service_pid_file, reason=str(err))
|
|
try:
|
|
yield
|
|
finally:
|
|
LOG.info('Remove service pid file: path=%s',
|
|
CONFIG.service_pid_file)
|
|
try:
|
|
os.unlink(CONFIG.service_pid_file)
|
|
except OSError:
|
|
pass
|
|
|
|
def start_monitor(self):
|
|
LOG.info('Start monitor loop')
|
|
self.monitor = multiprocessing.Process(target=self.monitor_loop)
|
|
self.monitor.start()
|
|
|
|
def stop_unmanaged_ceph_mgr(self):
|
|
LOG.info('Stop unmanaged running ceph-mgr processes')
|
|
service_name = os.path.basename(CONFIG.ceph_mgr_service)
|
|
if self.ceph_mgr:
|
|
psutil_terminate_kill(
|
|
[proc for proc in psutil.process_iter()
|
|
if (proc.name() == service_name
|
|
and proc.pid != self.ceph_mgr.pid)],
|
|
CONFIG.ceph_mgr_kill_delay_sec)
|
|
else:
|
|
psutil_terminate_kill(
|
|
[proc for proc in psutil.process_iter()
|
|
if proc.name() == service_name],
|
|
CONFIG.ceph_mgr_kill_delay_sec)
|
|
|
|
def monitor_loop(self):
|
|
|
|
"""Bring up and monitor ceph-mgr restful plugin.
|
|
|
|
Steps:
|
|
- wait for Ceph cluster to become available
|
|
- configure and start ceph-mgr
|
|
- configure and enable restful plugin
|
|
- send periodic requests to REST API
|
|
- recover from failures
|
|
|
|
Note: because this runs as a separate process it
|
|
must send status updates to service monitor
|
|
via control socket for: ping_failure_count,
|
|
restful_plugin_url and certificate.
|
|
"""
|
|
|
|
# Promote to process group leader so parent (service monitor)
|
|
# can kill the monitor plus processes spawned by it. Otherwise
|
|
# children of monitor_loop() will keep running in background and
|
|
# will be reaped by init when they finish but by then they might
|
|
# interfere with any new service instance.
|
|
os.setpgrp()
|
|
|
|
# Ignoring SIGTERM here ensures process group is not reused by
|
|
# the time parent (service monitor) issues the final SIGKILL.
|
|
signal.signal(signal.SIGTERM, signal.SIG_IGN)
|
|
|
|
while True:
|
|
try:
|
|
# steps to configure/start ceph-mgr and restful plugin
|
|
self.ceph_fsid_get()
|
|
self.ceph_mgr_auth_create()
|
|
self.restful_plugin_set_server_port()
|
|
self.restful_plugin_create_certificate()
|
|
self.ceph_mgr_start()
|
|
self.restful_plugin_enable()
|
|
self.restful_plugin_create_admin_key()
|
|
self.restful_plugin_get_url()
|
|
self.restful_plugin_get_certificate()
|
|
|
|
# REST API should be available now
|
|
# start making periodic requests (ping)
|
|
while True:
|
|
try:
|
|
self.restful_plugin_ping()
|
|
self.ping_failure_count = 0
|
|
self.request_update_ping_failures(
|
|
self.ping_failure_count)
|
|
self.ceph_mgr_failure_count = 0
|
|
self.request_update_ceph_mgr_failures(
|
|
self.ceph_mgr_failure_count)
|
|
time.sleep(CONFIG.restful_plugin_ping_delay_sec)
|
|
continue
|
|
except RestApiPingFailed as err:
|
|
LOG.warning(str(err))
|
|
|
|
LOG.info('REST API ping failure count=%d',
|
|
self.ping_failure_count)
|
|
self.ping_failure_count += 1
|
|
self.request_update_ping_failures(
|
|
self.ping_failure_count)
|
|
|
|
# maybe request failed because ceph-mgr is not running
|
|
if not self.ceph_mgr_is_running():
|
|
self.ceph_mgr_failure_count += 1
|
|
self.request_update_ceph_mgr_failures(
|
|
self.ceph_mgr_failure_count)
|
|
self.ceph_mgr_start()
|
|
time.sleep(CONFIG.ceph_mgr_grace_period_sec)
|
|
continue
|
|
|
|
# maybe request failed because cluster health is not ok
|
|
if not self.ceph_fsid_get():
|
|
LOG.info('Unable to get cluster fsid. '
|
|
'Sleep for a while')
|
|
time.sleep(CONFIG.cluster_grace_period_sec)
|
|
break
|
|
|
|
# too many failures? Restart ceph-mgr and go again
|
|
# through configuration steps
|
|
if (self.ping_failure_count
|
|
% CONFIG.ping_fail_count_restart_mgr == 0):
|
|
LOG.info('Too many consecutive REST API failures. '
|
|
'Restart ceph-mgr. Update service '
|
|
'url and certificate')
|
|
self.ceph_mgr_stop()
|
|
self.restful_plugin_url = ''
|
|
self.request_update_plugin_url(self.restful_plugin_url)
|
|
self.certificate = ''
|
|
self.request_update_certificate(self.certificate)
|
|
break
|
|
|
|
time.sleep(CONFIG.restful_plugin_ping_delay_sec)
|
|
|
|
except CommandFailed as err:
|
|
LOG.warning(str(err))
|
|
time.sleep(CONFIG.cluster_grace_period_sec)
|
|
except CommandTimeout as err:
|
|
LOG.warning(str(err))
|
|
except (CephMgrStartFailed, CephRestfulPluginFailed) as err:
|
|
LOG.warning(str(err))
|
|
self.ceph_mgr_failure_count += 1
|
|
self.request_update_ceph_mgr_failures(
|
|
self.ceph_mgr_failure_count)
|
|
time.sleep(CONFIG.ceph_mgr_grace_period_sec)
|
|
except Exception as err:
|
|
LOG.exception(err)
|
|
time.sleep(CONFIG.cluster_grace_period_sec)
|
|
|
|
@staticmethod
|
|
def run_with_timeout(command, timeout, stderr=subprocess.STDOUT):
|
|
try:
|
|
LOG.info('Run command: %s', ' '.join(command))
|
|
return subprocess.check_output(
|
|
['/usr/bin/timeout', str(timeout)] + command,
|
|
stderr=stderr, shell=False).strip()
|
|
except subprocess.CalledProcessError as err:
|
|
if err.returncode == GNU_TIMEOUT_EXPIRED_RETCODE:
|
|
raise CommandTimeout(command=err.cmd, timeout=timeout)
|
|
raise CommandFailed(command=err.cmd, reason=str(err),
|
|
out=err.output)
|
|
|
|
def ceph_fsid_get(self):
|
|
return self.run_with_timeout(['/usr/bin/ceph', 'fsid'],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
|
|
def ceph_mgr_has_auth(self):
|
|
path = '{}/ceph-{}'.format(
|
|
CONFIG.ceph_mgr_confdir, CONFIG.ceph_mgr_identity)
|
|
try:
|
|
os.makedirs(path)
|
|
except OSError as err:
|
|
pass
|
|
try:
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'auth', 'get',
|
|
'mgr.{}'.format(CONFIG.ceph_mgr_identity),
|
|
'-o', '{}/keyring'.format(path)],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
return True
|
|
except CommandFailed as err:
|
|
if 'ENOENT' in str(err):
|
|
return False
|
|
raise
|
|
|
|
def ceph_mgr_auth_create(self):
|
|
if self.ceph_mgr_has_auth():
|
|
return
|
|
LOG.info('Create ceph-mgr authentication')
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'auth', 'get-or-create',
|
|
'mgr.{}'.format(CONFIG.ceph_mgr_identity),
|
|
'mon', 'allow *', 'osd', 'allow *'],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
|
|
def ceph_mgr_is_running(self):
|
|
if not self.ceph_mgr:
|
|
return None
|
|
try:
|
|
self.ceph_mgr.wait(timeout=0)
|
|
except psutil.TimeoutExpired:
|
|
return True
|
|
return False
|
|
|
|
def ceph_mgr_start(self):
|
|
if self.ceph_mgr_is_running():
|
|
return
|
|
self.stop_unmanaged_ceph_mgr()
|
|
LOG.info('Start ceph-mgr daemon')
|
|
try:
|
|
with open(os.devnull, 'wb') as null:
|
|
self.ceph_mgr = psutil.Popen(
|
|
[CONFIG.ceph_mgr_service,
|
|
'--cluster', CONFIG.ceph_mgr_cluster,
|
|
'--conf', CONFIG.ceph_mgr_config,
|
|
'--id', CONFIG.ceph_mgr_identity,
|
|
'-f'],
|
|
close_fds=True,
|
|
stdout=null,
|
|
stderr=null,
|
|
shell=False)
|
|
except (OSError, ValueError) as err:
|
|
raise CephMgrStartFailed(reason=str(err))
|
|
time.sleep(CONFIG.ceph_mgr_grace_period_sec)
|
|
|
|
def ceph_mgr_stop(self):
|
|
if not self.ceph_mgr:
|
|
return
|
|
LOG.info('Stop ceph-mgr')
|
|
psutil_terminate_kill(self.ceph_mgr, CONFIG.ceph_mgr_kill_delay_sec)
|
|
|
|
def restful_plugin_has_server_port(self):
|
|
try:
|
|
with open(os.devnull, 'wb') as null:
|
|
out = self.run_with_timeout(
|
|
['/usr/bin/ceph', 'config-key', 'get',
|
|
'mgr/restful/server_port'],
|
|
CONFIG.ceph_cli_timeout_sec, stderr=null)
|
|
if out == str(CONFIG.restful_plugin_port):
|
|
return True
|
|
LOG.warning('Restful plugin port mismatch: '
|
|
'current=%d, expected=%d', out,
|
|
CONFIG.restful_plugin_port)
|
|
except CommandFailed as err:
|
|
LOG.warning('Failed to get restful plugin port: '
|
|
'reason=%s', str(err))
|
|
return False
|
|
|
|
def restful_plugin_set_server_port(self):
|
|
if self.restful_plugin_has_server_port():
|
|
return
|
|
LOG.info('Set restful plugin port=%d', CONFIG.restful_plugin_port)
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'config-key', 'set',
|
|
'mgr/restful/server_port', str(CONFIG.restful_plugin_port)],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
|
|
def restful_plugin_has_admin_key(self):
|
|
try:
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'config-key', 'get',
|
|
'mgr/restful/keys/admin'],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
return True
|
|
except CommandFailed:
|
|
pass
|
|
return False
|
|
|
|
def restful_plugin_create_admin_key(self):
|
|
if self.restful_plugin_has_admin_key():
|
|
return
|
|
LOG.info('Create restful plugin admin key')
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'restful',
|
|
'create-key', 'admin'],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
|
|
def restful_plugin_has_certificate(self):
|
|
try:
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'config-key', 'get',
|
|
'config/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'config-key', 'get',
|
|
'mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'config-key', 'get',
|
|
'config/mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity)],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'config-key', 'get',
|
|
'/mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity)],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
return True
|
|
except CommandFailed:
|
|
pass
|
|
return False
|
|
|
|
def restful_plugin_create_certificate(self):
|
|
if self.restful_plugin_has_certificate():
|
|
return
|
|
LOG.info('Create restful plugin self signed certificate')
|
|
path = tempfile.mkdtemp()
|
|
try:
|
|
try:
|
|
with tempfile.NamedTemporaryFile() as restful_cnf:
|
|
restful_cnf.write((
|
|
'[req]\n'
|
|
'req_extensions = v3_ca\n'
|
|
'distinguished_name = req_distinguished_name\n'
|
|
'[v3_ca]\n'
|
|
'subjectAltName=DNS:{}\n'
|
|
'basicConstraints = CA:true\n'
|
|
'[ req_distinguished_name ]\n'
|
|
'0.organizationName = IT\n'
|
|
'commonName = ceph-restful\n').format(
|
|
CONFIG.ceph_mgr_identity))
|
|
restful_cnf.flush()
|
|
subprocess.check_call([
|
|
'/usr/bin/openssl', 'req', '-new', '-nodes', '-x509',
|
|
'-subj', '/O=IT/CN=' + CONFIG.ceph_mgr_identity,
|
|
'-days', '3650',
|
|
'-config', restful_cnf.name,
|
|
'-out', os.path.join(path, 'crt'),
|
|
'-keyout', os.path.join(path, 'key'),
|
|
'-extensions', 'v3_ca'])
|
|
except subprocess.CalledProcessError as err:
|
|
raise CommandFailed(
|
|
command=' '.join(err.cmd),
|
|
reason='failed to generate self-signed certificate: {}'.format(str(err)),
|
|
out=err.output)
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'config-key', 'set',
|
|
'config/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity),
|
|
'-i', os.path.join(path, 'crt')],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'config-key', 'set',
|
|
'mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity),
|
|
'-i', os.path.join(path, 'crt')],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'config-key', 'set',
|
|
'config/mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity),
|
|
'-i', os.path.join(path, 'key')],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'config-key', 'set',
|
|
'mgr/restful/{}/key'.format(CONFIG.ceph_mgr_identity),
|
|
'-i', os.path.join(path, 'key')],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
finally:
|
|
shutil.rmtree(path)
|
|
|
|
def restful_plugin_is_enabled(self):
|
|
command = ['/usr/bin/ceph', 'mgr', 'module', 'ls',
|
|
'--format', 'json']
|
|
with open(os.devnull, 'wb') as null:
|
|
out = self.run_with_timeout(
|
|
command, CONFIG.ceph_cli_timeout_sec, stderr=null)
|
|
try:
|
|
if 'restful' in json.loads(out)['enabled_modules']:
|
|
return True
|
|
except ValueError as err:
|
|
raise CommandFailed(
|
|
command=' '.join(command),
|
|
reason='unable to decode json: {}'.format(err), out=out)
|
|
except KeyError as err:
|
|
raise CommandFailed(
|
|
command=' '.join(command),
|
|
reason='missing expected key: {}'.format(err), out=out)
|
|
return False
|
|
|
|
def restful_plugin_enable(self):
|
|
if not self.restful_plugin_is_enabled():
|
|
LOG.info('Enable restful plugin')
|
|
self.run_with_timeout(
|
|
['/usr/bin/ceph', 'mgr',
|
|
'module', 'enable', 'restful'],
|
|
CONFIG.ceph_cli_timeout_sec)
|
|
time.sleep(CONFIG.restful_plugin_grace_period_sec)
|
|
|
|
def restful_plugin_get_url(self):
|
|
command = ['/usr/bin/ceph', 'mgr', 'services',
|
|
'--format', 'json']
|
|
with open(os.devnull, 'wb') as null:
|
|
out = self.run_with_timeout(
|
|
command, CONFIG.ceph_cli_timeout_sec, stderr=null)
|
|
try:
|
|
self.restful_plugin_url = json.loads(out)['restful']
|
|
except ValueError as err:
|
|
raise CephRestfulPluginFailed(
|
|
reason='unable to decode json: {} output={}'.format(err, out))
|
|
except KeyError as err:
|
|
raise CephRestfulPluginFailed(
|
|
reason='missing expected key: {} in ouput={}'.format(err, out))
|
|
self.request_update_plugin_url(self.restful_plugin_url)
|
|
|
|
def restful_plugin_get_certificate(self):
|
|
command = ['/usr/bin/ceph', 'config-key', 'get',
|
|
'config/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)]
|
|
with open(os.devnull, 'wb') as null:
|
|
certificate = self.run_with_timeout(
|
|
command, CONFIG.ceph_cli_timeout_sec, stderr=null)
|
|
with open(CONFIG.restful_plugin_cert_path, 'wb') as cert_file:
|
|
cert_file.write(certificate)
|
|
self.certificate = CONFIG.restful_plugin_cert_path
|
|
self.request_update_certificate(
|
|
self.certificate)
|
|
|
|
def restful_plugin_ping(self):
|
|
if not self.restful_plugin_url:
|
|
raise RestApiPingFailed(reason='missing service url')
|
|
if not self.certificate:
|
|
raise RestApiPingFailed(reason='missing certificate')
|
|
LOG.debug('Ping restful plugin: url=%d', self.restful_plugin_url)
|
|
try:
|
|
response = requests.request(
|
|
'GET', self.restful_plugin_url, verify=False,
|
|
timeout=CONFIG.rest_api_timeout_sec)
|
|
if not response.ok:
|
|
raise RestApiPingFailed(
|
|
reason='response not ok ({})'.format(response))
|
|
LOG.debug('Ping restful plugin OK')
|
|
except (requests.ConnectionError,
|
|
requests.Timeout,
|
|
requests.HTTPError) as err:
|
|
raise RestApiPingFailed(reason=str(err))
|
|
|
|
@staticmethod
|
|
def _make_client_socket():
|
|
sock = socket.socket(
|
|
socket.AF_UNIX, socket.SOCK_SEQPACKET)
|
|
sock.settimeout(2 * CONFIG.rest_api_timeout_sec)
|
|
sock.connect(CONFIG.service_socket)
|
|
return sock
|
|
|
|
@staticmethod
|
|
def request_status():
|
|
try:
|
|
with contextlib.closing(
|
|
ServiceMonitor._make_client_socket()) as sock:
|
|
sock.send('status')
|
|
status = sock.recv(CONFIG.service_socket_bufsize)
|
|
LOG.debug('Status %s', status)
|
|
return status.startswith('OK')
|
|
except socket.error as err:
|
|
LOG.error('Status error: reason=%s', err)
|
|
return False
|
|
|
|
@staticmethod
|
|
def request_stop():
|
|
try:
|
|
with contextlib.closing(
|
|
ServiceMonitor._make_client_socket()) as sock:
|
|
sock.send('stop')
|
|
response = sock.recv(CONFIG.service_socket_bufsize)
|
|
LOG.debug('Stop response: %s', response)
|
|
return True
|
|
except socket.error as err:
|
|
LOG.error('Stop error: reason=%s', err)
|
|
return False
|
|
|
|
@staticmethod
def request_update_ceph_mgr_failures(count):
    """Report the ceph-mgr failure count to the monitor daemon.

    :param count: number of ceph-mgr failures observed
    :returns: True if the request was delivered, False on socket error
    """
    try:
        with contextlib.closing(
                ServiceMonitor._make_client_socket()) as sock:
            sock.send('ceph-mgr-failures {}'.format(count))
            sock.recv(CONFIG.service_socket_bufsize)
            return True
    except socket.error as err:
        # Fix copy/paste log text: this is not the 'stop' request
        LOG.error('Update ceph-mgr failures error: reason=%s', err)
        return False
|
@staticmethod
def request_update_ping_failures(count):
    """Report the REST API ping failure count to the monitor daemon.

    :param count: number of ping failures observed
    :returns: True if the request was delivered, False on socket error
    """
    try:
        with contextlib.closing(
                ServiceMonitor._make_client_socket()) as sock:
            sock.send('ping-failures {}'.format(count))
            sock.recv(CONFIG.service_socket_bufsize)
            return True
    except socket.error as err:
        # Fix copy/paste log text: this is not the 'stop' request
        LOG.error('Update ping failures error: reason=%s', err)
        return False
|
@staticmethod
def request_update_plugin_url(url):
    """Send the restful plugin URL to the monitor daemon.

    :param url: the restful plugin service URL reported by ceph-mgr
    :returns: True if the request was delivered, False on socket error
    """
    try:
        with contextlib.closing(
                ServiceMonitor._make_client_socket()) as sock:
            sock.send('restful-url {}'.format(url))
            sock.recv(CONFIG.service_socket_bufsize)
            return True
    except socket.error as err:
        # Fix copy/paste log text: this is not the 'stop' request
        LOG.error('Update restful url error: reason=%s', err)
        return False
|
@staticmethod
def request_update_certificate(path):
    """Send the restful plugin certificate path to the monitor daemon.

    :param path: filesystem path of the restful plugin certificate
    :returns: True if the request was delivered, False on socket error
    """
    try:
        with contextlib.closing(
                ServiceMonitor._make_client_socket()) as sock:
            sock.send('certificate {}'.format(path))
            sock.recv(CONFIG.service_socket_bufsize)
            return True
    except socket.error as err:
        # Fix copy/paste log text: this is not the 'stop' request
        LOG.error('Update certificate error: reason=%s', err)
        return False
|
|
|
class InitWrapper(object):

    """Handle System V init script actions: start, stop, restart, etc. """

    def __init__(self):
        """Dispatch command line action to the corresponding function.

        Candidate action functions are all class methods except ones
        that start with an underscore.
        """
        parser = argparse.ArgumentParser()
        actions = [m[0]
                   for m in inspect.getmembers(self)
                   if (inspect.ismethod(m[1])
                       and not m[0].startswith('_'))]
        parser.add_argument(
            'action',
            choices=actions)
        self.args = parser.parse_args()
        getattr(self, self.args.action)()

    def start(self):
        """Start ServiceMonitor as a daemon unless one is already running.

        Use a pipe to report monitor status back to this process.
        """
        pipe = os.pipe()
        child = os.fork()
        if child == 0:
            # Child: daemonize and run the monitor; report startup
            # status to the parent through the write end of the pipe.
            os.close(pipe[0])
            with daemon.DaemonContext(files_preserve=[pipe[1]]):
                # prevent duplication of messages in log
                global LOG
                LOG = setup_logging(cleanup_handlers=True)
                try:
                    monitor = ServiceMonitor()
                    status = 'OK'
                except ServiceAlreadyStarted:
                    # another monitor already owns the service socket;
                    # report success and exit quietly
                    os.write(pipe[1], 'OK')
                    os.close(pipe[1])
                    return
                except Exception as err:
                    status = str(err)
                os.write(pipe[1], status)
                os.close(pipe[1])
                if status == 'OK':
                    try:
                        monitor.run()
                    except ServiceException as err:
                        LOG.warning(str(err))
                    except Exception as err:
                        LOG.exception('Service monitor error: reason=%s', err)
        else:
            # Parent: wait for the child's startup report and translate
            # it into the init script's exit code.
            os.close(pipe[1])
            try:
                status = os.read(pipe[0], CONFIG.service_socket_bufsize)
                if status == 'OK':
                    sys.exit(0)
                else:
                    LOG.warning('Service monitor failed to start: '
                                'status=%s', status)
            except IOError as err:
                LOG.warning('Failed to read monitor status: reason=%s', err)
            os.close(pipe[0])
            os.waitpid(child, 0)
            sys.exit(1)

    def stop(self):
        """Tell ServiceMonitor daemon to stop running.

        In case request fails stop ServiceMonitor and ceph_mgr processes
        using SIGTERM followed by SIGKILL.
        """
        result = ServiceMonitor.request_stop()
        if not result:
            # Monitor is unresponsive: find and terminate the monitor
            # and ceph-mgr processes directly.
            ceph_mgr = os.path.basename(CONFIG.ceph_mgr_service)
            procs = []
            for proc in psutil.process_iter():
                name = proc.name()
                if name == CONFIG.service_name:
                    procs.append(proc)
                if name == ceph_mgr:
                    procs.append(proc)
            psutil_terminate_kill(procs, CONFIG.ceph_mgr_kill_delay_sec)

    def restart(self):
        """Stop then start the service monitor."""
        self.stop()
        self.start()

    def force_reload(self):
        """Alias for restart; this service has no separate reload path."""
        self.restart()

    def reload(self):
        """Alias for restart; this service has no separate reload path."""
        self.restart()

    def status(self):
        """Report status from ServiceMonitor.

        We don't just try to access REST API here because ServiceMonitor may
        be in the process of starting/configuring ceph-mgr and restful
        plugin in which case we report OK to avoid being restarted by SM.
        """
        status = ServiceMonitor.request_status()
        sys.exit(0 if status is True else 1)
|
|
|
|
# Script entry point: parse the init action from the command line and
# dispatch it via InitWrapper.
if __name__ == '__main__':
    InitWrapper()
|