Ceph-Manager Improvements
This commit transitions the Ceph manager from the SYSINV Upgrade API to
the USM API to align with updated system management interfaces and
introduces a new configuration file,
/opt/platform/ceph/ceph-manager-config.yaml, which supports
health_filters_for_upgrade for filtering health checks during upgrade
scenarios and health_filters_for_ignore for non-upgrade scenarios,
providing flexible health check management.
If the file /opt/platform/ceph/ceph-manager-config.yaml is not found,
it will be automatically created with a default configuration
containing no filters. Below is an example of the initial configuration:
health_filters_for_ignore: []
health_filters_for_upgrade: []
It implements a generic exception handling mechanism for CephClient API
calls to uniformly handle various error types, improving error
management robustness. The _get_current_alarms method is renamed to
_refresh_current_alarms for better clarity in its functionality.
Deprecated code supporting the Ceph Jewel version is removed to
streamline the codebase and eliminate outdated logic. CephClient API
calls are updated from text to JSON format to enhance data handling
consistency, and the _parse_reason method is modified to process
JSON-formatted responses, ensuring compatibility with the updated API.
Test Plan:
PASS: Generate alarms by starting/stopping ceph-osd/mgr/mon/msd
services and verify alarm triggers for both Bare-Metal and
Rook-Ceph.
PASS: Filter out mds/mgr/mon/ceph-osd service health checks and
confirm no alarms are raised using fm alarm-list for both
Bare-Metal and Rook-Ceph.
PASS: Perform controller swact and verify the configuration flag
persists for both Bare-Metal and Rook-Ceph.
PASS: Upgrade the platform with and without health filters applied and
validate behavior for both Bare-Metal and Rook-Ceph.
PASS: Generate an ISO and perform a fresh installation, ensuring
proper functionality.
PASS: Validate the /opt/platform/ceph/ceph-manager-config.yaml file
by intentionally introducing invalid health_filters_for_upgrade
and health_filters_for_ignore entries and verify error handling
and logging for both Bare-Metal and Rook-Ceph.
All tests were conducted on both Simplex and Duplex configurations.
Closes-bug: 2119003
Change-Id: Ie96ffa4da5e3db1bbf3da70cd0a751f1c7dd4c23
Signed-off-by: Hediberto C Silva <hediberto.cavalcantedasilva@windriver.com>
This commit is contained in:
committed by
Gabriel Przybysz Gonçalves Júnior
parent
4d705afc3b
commit
e0b3b0d4cf
@@ -1,14 +1,19 @@
|
||||
#
|
||||
# Copyright (c) 2016-2018 Wind River Systems, Inc.
|
||||
# Copyright (c) 2016-2018, 2025 Wind River Systems, Inc.
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
#
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
from os import path
|
||||
|
||||
from sysinv.common import constants as sysinv_constants
|
||||
|
||||
from ceph_manager.i18n import _
|
||||
|
||||
CEPH_MANAGER_CONFIG_FILE = 'ceph-manager-config.yaml'
|
||||
CEPH_MANAGER_CONFIG_PATH = path.join(sysinv_constants.PLATFORM_PATH, 'ceph',
|
||||
CEPH_MANAGER_CONFIG_FILE)
|
||||
|
||||
CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL = \
|
||||
sysinv_constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL
|
||||
@@ -56,6 +61,8 @@ ALARM_REASON_OSDS_OUT = _('OSDs are out')
|
||||
ALARM_REASON_OSDS_DOWN_OUT = _('OSDs are down/out')
|
||||
ALARM_REASON_PEER_HOST_DOWN = _('peer host down')
|
||||
|
||||
OSD_DOWN_FILTER = "OSD_DOWN"
|
||||
|
||||
REPAIR_ACTION_MAJOR_CRITICAL_ALARM = _(
|
||||
'Ensure storage hosts from replication group are unlocked and available.'
|
||||
'Check if OSDs of each storage host are up and running.'
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
#
|
||||
|
||||
from collections import namedtuple
|
||||
import os
|
||||
import time
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
@@ -13,6 +15,7 @@ from fm_api import fm_api
|
||||
# noinspection PyUnresolvedReferences
|
||||
from oslo_log import log as logging
|
||||
from tsconfig import tsconfig
|
||||
import yaml
|
||||
|
||||
from ceph_manager import constants
|
||||
from ceph_manager import exception
|
||||
@@ -21,60 +24,96 @@ from ceph_manager.i18n import _
|
||||
from ceph_manager.i18n import _LE
|
||||
from ceph_manager.i18n import _LI
|
||||
from ceph_manager.i18n import _LW
|
||||
from ceph_manager.sysinv_api import upgrade
|
||||
from ceph_manager.usm_api import upgrade
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# In 18.03 R5, ceph cache tiering was disabled and prevented from being
|
||||
# re-enabled. When upgrading from 18.03 (R5) to R6 we need to remove the
|
||||
# cache-tier from the crushmap ceph-cache-tiering
|
||||
#
|
||||
# This class is needed only when upgrading from R5 to R6
|
||||
# TODO: remove it after 1st R6 release
|
||||
#
|
||||
# This class is used to manage and filter Ceph alarms during upgrades
|
||||
class HandleUpgradesMixin(object):
|
||||
|
||||
def __init__(self, service, conf):
|
||||
self.service = service
|
||||
self.sysinv_upgrade_api = upgrade.SysinvUpgradeApi(conf)
|
||||
self.wait_for_upgrade_complete = False
|
||||
self.usm_api = upgrade.UsmApi(conf)
|
||||
self.patch_upgrade_in_progress = False
|
||||
self.release_upgrade_in_progress = False
|
||||
self.health_filters_for_upgrade = []
|
||||
self.health_filters_for_ignore = []
|
||||
|
||||
def setup(self, config):
|
||||
self._set_upgrade(
|
||||
self.sysinv_upgrade_api.retry_get_software_upgrade_status())
|
||||
def setup(self):
|
||||
self._load_config()
|
||||
self._refresh_upgrade_status()
|
||||
|
||||
def _set_upgrade(self, upgrade):
|
||||
state = upgrade.get('state')
|
||||
from_version = upgrade.get('from_version')
|
||||
if (state
|
||||
and state != constants.UPGRADE_COMPLETED
|
||||
and from_version == constants.TITANIUM_SERVER_VERSION_18_03):
|
||||
|
||||
LOG.info(_LI("Wait for ceph upgrade to complete "
|
||||
"before monitoring cluster."))
|
||||
self.wait_for_upgrade_complete = True
|
||||
|
||||
def set_flag_require_jewel_osds(self):
|
||||
def _load_config(self):
|
||||
# Loads the Ceph manager configuration from a YAML file.
|
||||
# If the file doesn't exist, creates a default configuration.
|
||||
# Handles potential errors during file reading and parsing.
|
||||
try:
|
||||
response, body = self.service.ceph_api.osd_set_key(
|
||||
constants.CEPH_FLAG_REQUIRE_JEWEL_OSDS,
|
||||
body='json', timeout=30)
|
||||
LOG.info(_LI("Set require_jewel_osds flag"))
|
||||
except IOError as e:
|
||||
raise exception.CephApiFailure(
|
||||
call="osd_set_key",
|
||||
reason=str(e))
|
||||
else:
|
||||
if not response.ok:
|
||||
raise exception.CephSetKeyFailure(
|
||||
flag=constants.CEPH_FLAG_REQUIRE_JEWEL_OSDS,
|
||||
extra=_("needed to complete upgrade to Jewel"),
|
||||
response_status_code=response.status_code,
|
||||
response_reason=response.reason,
|
||||
status=body.get('status'),
|
||||
output=body.get('output'))
|
||||
with open(constants.CEPH_MANAGER_CONFIG_PATH, 'r') as file:
|
||||
data = yaml.safe_load(file)
|
||||
except FileNotFoundError:
|
||||
LOG.info(_LI("Ceph manager configuration file not found. Creating default."))
|
||||
default_config = {'health_filters_for_upgrade': [], 'health_filters_for_ignore': []}
|
||||
os.makedirs(os.path.dirname(constants.CEPH_MANAGER_CONFIG_PATH), exist_ok=True)
|
||||
try:
|
||||
with open(constants.CEPH_MANAGER_CONFIG_PATH, 'w') as file:
|
||||
yaml.safe_dump(default_config, file)
|
||||
data = default_config
|
||||
except OSError as e:
|
||||
LOG.warning(_LW("Failed to create configuration: %s" % str(e)))
|
||||
|
||||
except yaml.YAMLError as e:
|
||||
LOG.warning(_LW("Ceph manager configuration file parsing error: %s" % str(e)))
|
||||
return
|
||||
|
||||
if data.get('health_filters_for_upgrade'):
|
||||
self.health_filters_for_upgrade = data['health_filters_for_upgrade']
|
||||
LOG.info(_LI("Health filters for upgrade loaded: %s" % self.health_filters_for_upgrade))
|
||||
|
||||
if data.get('health_filters_for_ignore'):
|
||||
self.health_filters_for_ignore = data['health_filters_for_ignore']
|
||||
LOG.info(_LI("Health filters for upgrade loaded: %s" % self.health_filters_for_ignore))
|
||||
|
||||
def _refresh_upgrade_status(self):
|
||||
try:
|
||||
upgrade = self.usm_api.retry_get_software_upgrade_status()
|
||||
except Exception as ex:
|
||||
LOG.warn(_LW(
|
||||
"Getting software upgrade status failed "
|
||||
"with: %s. Skip health filter attempt "
|
||||
"(will retry on next ceph status poll).") % str(ex))
|
||||
return
|
||||
|
||||
state = upgrade.get('state')
|
||||
from_release = upgrade.get('from_release')
|
||||
to_release = upgrade.get('to_release')
|
||||
|
||||
if (state and state != constants.UPGRADE_COMPLETED):
|
||||
if from_release == to_release:
|
||||
self.patch_upgrade_in_progress = True
|
||||
LOG.info(_LI("Patch upgrade in progress."))
|
||||
else:
|
||||
self.release_upgrade_in_progress = True
|
||||
LOG.info(_LI("Release upgrade in progress."))
|
||||
|
||||
def filter_health_status(self, health):
|
||||
if health.get('checks'):
|
||||
if self.patch_upgrade_in_progress or self.release_upgrade_in_progress:
|
||||
for health_filter in self.health_filters_for_upgrade:
|
||||
if health_filter in health['checks']:
|
||||
LOG.info(_LI("Filtering Ceph health check '%s' during upgrade") % health_filter)
|
||||
del health['checks'][health_filter]
|
||||
else:
|
||||
for health_filter in self.health_filters_for_ignore:
|
||||
if health_filter in health['checks']:
|
||||
LOG.info(_LI("Filtering Ceph health check '%s'") % health_filter)
|
||||
del health['checks'][health_filter]
|
||||
|
||||
# If there is no health detail, change the status to HEALTH_OK
|
||||
if not health['checks']:
|
||||
health['health'] = constants.CEPH_HEALTH_OK
|
||||
return health
|
||||
|
||||
|
||||
class Monitor(HandleUpgradesMixin):
|
||||
@@ -86,11 +125,10 @@ class Monitor(HandleUpgradesMixin):
|
||||
self.known_object_pool_name = None
|
||||
self.primary_tier_name = constants.SB_TIER_DEFAULT_NAMES[
|
||||
constants.SB_TIER_TYPE_CEPH] + constants.CEPH_CRUSH_TIER_SUFFIX
|
||||
self.cluster_is_up = False
|
||||
super(Monitor, self).__init__(service, conf)
|
||||
|
||||
def setup(self, config):
|
||||
super(Monitor, self).setup(config)
|
||||
def setup(self):
|
||||
super(Monitor, self).setup()
|
||||
|
||||
def run(self):
|
||||
# Wait until Ceph cluster is up and we can get the fsid
|
||||
@@ -108,6 +146,7 @@ class Monitor(HandleUpgradesMixin):
|
||||
# Start monitoring ceph status
|
||||
while True:
|
||||
try:
|
||||
self.setup()
|
||||
self.ceph_poll_status()
|
||||
except Exception:
|
||||
LOG.exception(
|
||||
@@ -118,91 +157,95 @@ class Monitor(HandleUpgradesMixin):
|
||||
|
||||
def ceph_get_fsid(self):
|
||||
# Check whether an alarm has already been raised
|
||||
self._get_current_alarms()
|
||||
self._refresh_current_alarms()
|
||||
if self.current_health_alarm:
|
||||
LOG.info(_LI("Current alarm: %s") %
|
||||
str(self.current_health_alarm.__dict__))
|
||||
|
||||
fsid = self._get_fsid()
|
||||
if not fsid:
|
||||
# Raise alarm - it will not have an entity_instance_id
|
||||
self._report_fault({'health': constants.CEPH_HEALTH_DOWN,
|
||||
'detail': 'Ceph cluster is down.'},
|
||||
fm_constants.FM_ALARM_ID_STORAGE_CEPH)
|
||||
else:
|
||||
|
||||
if fsid:
|
||||
# Clear alarm with no entity_instance_id
|
||||
self._clear_fault(fm_constants.FM_ALARM_ID_STORAGE_CEPH)
|
||||
self.service.entity_instance_id = 'cluster=%s' % fsid
|
||||
else:
|
||||
health_info = {
|
||||
'health': constants.CEPH_HEALTH_DOWN,
|
||||
'checks': {},
|
||||
'mutes': []
|
||||
}
|
||||
# Raise alarm - it will not have an entity_instance_id
|
||||
self._report_fault(health_info, fm_constants.FM_ALARM_ID_STORAGE_CEPH)
|
||||
|
||||
def ceph_poll_status(self):
|
||||
# get previous data every time in case:
|
||||
# * daemon restarted
|
||||
# * alarm was cleared manually but stored as raised in daemon
|
||||
self._get_current_alarms()
|
||||
self._refresh_current_alarms()
|
||||
if self.current_health_alarm:
|
||||
LOG.info(_LI("Current alarm: %s") %
|
||||
str(self.current_health_alarm.__dict__))
|
||||
|
||||
# get ceph health
|
||||
health = self._get_health()
|
||||
LOG.info(_LI("Current Ceph health: "
|
||||
"%(health)s detail: %(detail)s") % health)
|
||||
health = self._get_health_detail()
|
||||
|
||||
if health['health'] != constants.CEPH_HEALTH_OK:
|
||||
self._report_fault(health, fm_constants.FM_ALARM_ID_STORAGE_CEPH)
|
||||
else:
|
||||
self._clear_fault(fm_constants.FM_ALARM_ID_STORAGE_CEPH)
|
||||
if health:
|
||||
health_info = self.filter_health_status(health)
|
||||
if health_info['health'] != constants.CEPH_HEALTH_OK:
|
||||
self._report_fault(health_info, fm_constants.FM_ALARM_ID_STORAGE_CEPH)
|
||||
else:
|
||||
self._clear_fault(fm_constants.FM_ALARM_ID_STORAGE_CEPH)
|
||||
|
||||
# Report OSD down/out even if ceph health is OK
|
||||
self._report_alarm_osds_health()
|
||||
|
||||
def filter_health_status(self, health):
|
||||
return super(Monitor, self).filter_health_status(health)
|
||||
|
||||
# CEPH HELPERS
|
||||
|
||||
def _get_fsid(self):
|
||||
response = namedtuple('Response', ['ok', 'reason'])
|
||||
|
||||
try:
|
||||
response, fsid = self.service.ceph_api.fsid(
|
||||
body='text', timeout=30)
|
||||
except IOError as e:
|
||||
LOG.warning(_LW("ceph_api.fsid failed: %s") % str(e))
|
||||
self.cluster_is_up = False
|
||||
return None
|
||||
except Exception as e:
|
||||
response.ok = False
|
||||
response.reason = str(e)
|
||||
LOG.error(_LE("ceph_api.fsid error: %s" % response.reason))
|
||||
|
||||
if not response.ok:
|
||||
if response.ok:
|
||||
return fsid.strip()
|
||||
else:
|
||||
LOG.warning(_LW("Get fsid failed: %s") % response.reason)
|
||||
self.cluster_is_up = False
|
||||
return None
|
||||
|
||||
self.cluster_is_up = True
|
||||
return fsid.strip()
|
||||
def _get_health_detail(self):
|
||||
response = namedtuple('Response', ['ok', 'reason'])
|
||||
|
||||
def _get_health(self):
|
||||
try:
|
||||
# we use text since it has all info
|
||||
response, body = self.service.ceph_api.health(
|
||||
body='text', timeout=30)
|
||||
except IOError as e:
|
||||
LOG.warning(_LW("ceph_api.health failed: %s") % str(e))
|
||||
self.cluster_is_up = False
|
||||
return {'health': constants.CEPH_HEALTH_DOWN,
|
||||
'detail': 'Ceph cluster is down.'}
|
||||
body='json', detail='detail', timeout=30)
|
||||
except Exception as e:
|
||||
response.reason = str(e)
|
||||
|
||||
if not response.ok:
|
||||
LOG.warning(_LW("CEPH health check failed: %s") % response.reason)
|
||||
health_info = [constants.CEPH_HEALTH_DOWN, response.reason]
|
||||
self.cluster_is_up = False
|
||||
else:
|
||||
health_info = body.split(' ', 1)
|
||||
self.cluster_is_up = True
|
||||
LOG.error(_LE("ceph_api.health error: %s" % response.reason))
|
||||
|
||||
health = health_info[0]
|
||||
health_info = {
|
||||
'health': constants.CEPH_HEALTH_DOWN,
|
||||
'checks': {},
|
||||
'mutes': []
|
||||
}
|
||||
|
||||
if len(health_info) > 1:
|
||||
detail = health_info[1]
|
||||
else:
|
||||
detail = health_info[0]
|
||||
return health_info
|
||||
|
||||
return {'health': health.strip(),
|
||||
'detail': detail.strip()}
|
||||
if response.ok:
|
||||
health_info = body.get("output", {})
|
||||
return {
|
||||
'health': health_info.get('status', ''),
|
||||
'checks': health_info.get('checks', {}),
|
||||
'mutes': health_info.get('mutes', [])
|
||||
}
|
||||
|
||||
# we have two root nodes 'cache-tier' and 'storage-tier'
|
||||
# to calculate the space that is used by the pools, we must only
|
||||
@@ -321,6 +364,12 @@ class Monitor(HandleUpgradesMixin):
|
||||
return True
|
||||
|
||||
def _report_alarm_osds_health(self):
|
||||
if (self.patch_upgrade_in_progress or self.release_upgrade_in_progress) or \
|
||||
constants.OSD_DOWN_FILTER in self.health_filters_for_ignore:
|
||||
self._clear_fault(fm_constants.FM_ALARM_ID_STORAGE_CEPH_MAJOR)
|
||||
self._clear_fault(fm_constants.FM_ALARM_ID_STORAGE_CEPH_CRITICAL)
|
||||
return
|
||||
|
||||
response, osd_tree = self.service.ceph_api.osd_tree(body='json', timeout=30)
|
||||
if not response.ok:
|
||||
LOG.error(_LE("Failed to retrieve Ceph OSD tree: "
|
||||
@@ -421,23 +470,28 @@ class Monitor(HandleUpgradesMixin):
|
||||
"""Parse reason strings received from Ceph"""
|
||||
if health['health'] in constants.CEPH_STATUS_CUSTOM:
|
||||
# Don't parse reason messages that we added
|
||||
return "Storage Alarm Condition: %(health)s. %(detail)s" % health
|
||||
|
||||
reasons_lst = health['detail'].split(';')
|
||||
return "Storage Alarm Condition: %(health)s. Ceph cluster is down." % health
|
||||
|
||||
parsed_reasons_text = ""
|
||||
|
||||
# Check if PGs have issues - we can't safely store the entire message
|
||||
# as it tends to be long
|
||||
for reason in reasons_lst:
|
||||
if "pgs" in reason:
|
||||
for reason in health['checks'].values():
|
||||
summary = reason.get("summary", {})
|
||||
details = reason.get("detail", [])
|
||||
|
||||
pgs_in_summary = summary and "pgs" in summary.get("message", "")
|
||||
pgs_in_details = any(["pgs" in detail.get('message', '') for detail in details])
|
||||
|
||||
recovery_in_summary = summary and "recovery" in summary.get("message", "")
|
||||
recovery_in_details = any(["recovery" in detail.get('message', '') for detail in details])
|
||||
|
||||
if pgs_in_summary or pgs_in_details:
|
||||
parsed_reasons_text += "PGs are degraded/stuck or undersized"
|
||||
break
|
||||
|
||||
# Extract recovery status
|
||||
parsed_reasons = [r.strip() for r in reasons_lst if 'recovery' in r]
|
||||
if parsed_reasons:
|
||||
parsed_reasons_text += ";" + ";".join(parsed_reasons)
|
||||
elif recovery_in_summary or recovery_in_details:
|
||||
parsed_reasons_text += "PGs are recovering or rebalancing"
|
||||
break
|
||||
|
||||
# We need to keep the most important parts of the messages when storing
|
||||
# them to fm alarms, therefore text between [] brackets is truncated if
|
||||
@@ -511,11 +565,11 @@ class Monitor(HandleUpgradesMixin):
|
||||
|
||||
# Log detailed reason for later analysis
|
||||
if (self.current_ceph_health != health['health'] or
|
||||
self.detailed_health_reason != health['detail']):
|
||||
self.detailed_health_reason != health['checks']):
|
||||
LOG.info(_LI("Ceph status changed: %(health)s "
|
||||
"detailed reason: %(detail)s") % health)
|
||||
"detailed reason: %(checks)s") % health)
|
||||
self.current_ceph_health = health['health']
|
||||
self.detailed_health_reason = health['detail']
|
||||
self.detailed_health_reason = health['checks']
|
||||
|
||||
def _clear_fault(self, alarm_id, entity_instance_id=None):
|
||||
# Only clear alarm if there is one already raised
|
||||
@@ -540,7 +594,7 @@ class Monitor(HandleUpgradesMixin):
|
||||
fm_constants.FM_ALARM_ID_STORAGE_CEPH_CRITICAL,
|
||||
alarm_list[alarm].entity_instance_id)
|
||||
|
||||
def _get_current_alarms(self):
|
||||
def _refresh_current_alarms(self):
|
||||
"""Retrieve currently raised alarm"""
|
||||
self.current_health_alarm = self.service.fm_api.get_fault(
|
||||
fm_constants.FM_ALARM_ID_STORAGE_CEPH,
|
||||
|
||||
@@ -1,93 +0,0 @@
|
||||
# Copyright (c) 2022 Wind River Systems, Inc.
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
from keystoneauth1 import session as ksession
|
||||
from keystoneauth1.identity import v3
|
||||
from oslo_log import log
|
||||
from retrying import retry
|
||||
from urllib3 import util
|
||||
|
||||
from ceph_manager.i18n import _LI
|
||||
from ceph_manager.i18n import _LW
|
||||
from ceph_manager.utils import ipv6_bracketed
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
MAX_RETRY = 5
|
||||
|
||||
|
||||
class SysinvUpgradeApi(object):
|
||||
def __init__(self, conf):
|
||||
# pylint: disable=protected-access
|
||||
sysinv_conf = conf._namespace._normalized[0]['DEFAULT']
|
||||
sysinv_api_bind_ip = sysinv_conf['sysinv_api_bind_ip'][0]
|
||||
sysinv_api_port = sysinv_conf['sysinv_api_port'][0]
|
||||
|
||||
self.base_url = util.Url(
|
||||
scheme='http',
|
||||
host=ipv6_bracketed(sysinv_api_bind_ip),
|
||||
port=sysinv_api_port,
|
||||
path='/v1').url
|
||||
|
||||
# pylint: disable=protected-access
|
||||
auth_conf = conf._namespace._normalized[0]['keystone_authtoken']
|
||||
self.auth_url = auth_conf['auth_url'][0]
|
||||
self.auth_username = auth_conf['username'][0]
|
||||
self.auth_password = auth_conf['password'][0]
|
||||
self.auth_user_domain_name = auth_conf['user_domain_name'][0]
|
||||
self.auth_project_name = auth_conf['project_name'][0]
|
||||
self.auth_project_domain_name = auth_conf['project_domain_name'][0]
|
||||
|
||||
def _rest_api_request(self, method, api_cmd, api_cmd_headers=None,
|
||||
api_cmd_payload=None):
|
||||
headers = {}
|
||||
headers['Accept'] = "application/json"
|
||||
|
||||
if api_cmd_headers is not None:
|
||||
headers.update(api_cmd_headers)
|
||||
|
||||
session = self._get_session()
|
||||
response = session.request(
|
||||
api_cmd, method, headers=headers, json=api_cmd_payload)
|
||||
|
||||
return response.json()
|
||||
|
||||
def _get_session(self):
|
||||
auth = v3.Password(auth_url=self.auth_url + "/v3",
|
||||
username=self.auth_username,
|
||||
password=self.auth_password,
|
||||
project_name=self.auth_project_name,
|
||||
user_domain_name=self.auth_user_domain_name,
|
||||
project_domain_name=self.auth_project_domain_name)
|
||||
session = ksession.Session(auth=auth)
|
||||
|
||||
return session
|
||||
|
||||
def _get_upgrades(self):
|
||||
url = self.base_url + '/upgrade'
|
||||
response = self._rest_api_request('GET', url)
|
||||
return response.get('upgrades', [])
|
||||
|
||||
def get_software_upgrade_status(self):
|
||||
LOG.info(_LI("Getting software upgrade status from sysinv"))
|
||||
upgrade = {
|
||||
'from_version': None,
|
||||
'to_version': None,
|
||||
'state': None
|
||||
}
|
||||
|
||||
upgrades = self._get_upgrades()
|
||||
if upgrades:
|
||||
upgrade = upgrades[0]
|
||||
|
||||
LOG.info(_LI("Software upgrade status: %s") % str(upgrade))
|
||||
return upgrade
|
||||
|
||||
@retry(stop_max_attempt_number=MAX_RETRY,
|
||||
wait_fixed=1000,
|
||||
retry_on_exception=lambda e:
|
||||
LOG.warn(_LW(
|
||||
"Getting software upgrade status failed "
|
||||
"with: %s. Retrying... ") % str(e)) or True)
|
||||
def retry_get_software_upgrade_status(self):
|
||||
return self.get_software_upgrade_status()
|
||||
@@ -1,84 +0,0 @@
|
||||
import unittest
|
||||
|
||||
from keystoneauth1.exceptions import base
|
||||
import mock
|
||||
|
||||
from ceph_manager.sysinv_api import upgrade
|
||||
|
||||
SYSINV_CONF = {
|
||||
'sysinv_api_bind_ip': '192.168.1.1',
|
||||
'sysinv_api_port': 12345
|
||||
}
|
||||
KEYSTONE_CONF = {
|
||||
'auth_url': 'http://example.com',
|
||||
'username': 'sysadmin',
|
||||
'password': 'hunter2',
|
||||
'user_domain_name': 'Default',
|
||||
'project_name': 'sysinv',
|
||||
'project_domain_name': 'Default'
|
||||
}
|
||||
|
||||
UPGRADE_DICT = {
|
||||
'from_version': '123',
|
||||
'to_version': '456',
|
||||
'state': 'done'
|
||||
}
|
||||
|
||||
|
||||
class SysinvUpgradeApiTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
conf = mock.MagicMock()
|
||||
conf._namespace._normalized.return_value = [{'DEFAULT': SYSINV_CONF}]
|
||||
conf._namespace._normalized.return_value = [
|
||||
{'keystone_authtoken': KEYSTONE_CONF}]
|
||||
|
||||
self.api = upgrade.SysinvUpgradeApi(conf)
|
||||
|
||||
self.session_mock = mock.MagicMock()
|
||||
self.response_mock = mock.MagicMock()
|
||||
|
||||
self.session_mock.request.return_value = self.response_mock
|
||||
|
||||
self.api._get_session = mock.MagicMock(return_value=self.session_mock)
|
||||
|
||||
def test_get_software_upgrade_status_has_upgrade(self):
|
||||
self.response_mock.json.return_value = {'upgrades': [UPGRADE_DICT]}
|
||||
|
||||
status = self.api.get_software_upgrade_status()
|
||||
|
||||
self.session_mock.request.assert_called_once()
|
||||
assert status == UPGRADE_DICT
|
||||
|
||||
def test_get_software_upgrade_status_no_upgrade(self):
|
||||
expected = {
|
||||
'from_version': None,
|
||||
'to_version': None,
|
||||
'state': None
|
||||
}
|
||||
self.response_mock.json.return_value = {'upgrades': []}
|
||||
|
||||
status = self.api.get_software_upgrade_status()
|
||||
|
||||
self.session_mock.request.assert_called_once()
|
||||
assert status == expected
|
||||
|
||||
def test_retry_get_software_upgrade_status_should_retry(self):
|
||||
self.response_mock.json.return_value = {'upgrades': [UPGRADE_DICT]}
|
||||
self.session_mock.request.side_effect = [
|
||||
base.ClientException('Boom!'), self.response_mock]
|
||||
|
||||
status = self.api.retry_get_software_upgrade_status()
|
||||
|
||||
assert self.session_mock.request.call_count == 2
|
||||
assert status == UPGRADE_DICT
|
||||
|
||||
def test_retry_get_software_upgrade_status_retry_limit(self):
|
||||
ex = base.ClientException('Boom!')
|
||||
self.session_mock.request.side_effect = [
|
||||
ex for _ in range(upgrade.MAX_RETRY+1)]
|
||||
|
||||
with self.assertRaises(base.ClientException) as context:
|
||||
self.api.retry_get_software_upgrade_status()
|
||||
|
||||
assert context.exception == ex
|
||||
assert self.session_mock.request.call_count == upgrade.MAX_RETRY
|
||||
@@ -0,0 +1,68 @@
|
||||
# Copyright (c) 2022, 2025 Wind River Systems, Inc.
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
from oslo_log import log
|
||||
from retrying import retry
|
||||
from sysinv.common.rest_api import get_token
|
||||
from sysinv.common.rest_api import rest_api_request_raise
|
||||
|
||||
from ceph_manager.i18n import _LI
|
||||
from ceph_manager.i18n import _LW
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
MAX_RETRY = 5
|
||||
|
||||
class UsmApi(object):
|
||||
def __init__(self, conf):
|
||||
# pylint: disable=protected-access
|
||||
auth_conf = conf._namespace._normalized[0]['openstack_keystone_authtoken']
|
||||
self.region = auth_conf['region_name']
|
||||
self.token = None
|
||||
|
||||
def _update_token(self):
|
||||
if not self.token or self.token.is_expired():
|
||||
LOG.debug("Requesting a new token")
|
||||
self.token = get_token(self.region)
|
||||
else:
|
||||
LOG.debug("Token is still valid. Reusing.")
|
||||
|
||||
def _get_usm_endpoint(self):
|
||||
if self.token:
|
||||
service_type = "usm"
|
||||
service_name = "usm"
|
||||
region_name = self.region[0] if self.region else None
|
||||
return self.token.get_service_internal_url(service_type,
|
||||
service_name,
|
||||
region_name)
|
||||
return "http://127.0.0.1:5493"
|
||||
|
||||
def _get_upgrades(self):
|
||||
self._update_token()
|
||||
endpoint = self._get_usm_endpoint() + '/v1/deploy/software_upgrade'
|
||||
return rest_api_request_raise(self.token, "GET", endpoint, timeout=10)
|
||||
|
||||
def get_software_upgrade_status(self):
|
||||
LOG.info(_LI("Getting software upgrade status from usm"))
|
||||
upgrade = {
|
||||
'from_release': None,
|
||||
'to_release': None,
|
||||
'state': None
|
||||
}
|
||||
|
||||
response = self._get_upgrades()
|
||||
if response:
|
||||
upgrade = response
|
||||
|
||||
LOG.info(_LI("Software upgrade status: %s") % str(upgrade))
|
||||
return upgrade
|
||||
|
||||
@retry(stop_max_attempt_number=MAX_RETRY,
|
||||
wait_fixed=1000,
|
||||
retry_on_exception=lambda e:
|
||||
LOG.warn(_LW(
|
||||
"Getting software upgrade status failed "
|
||||
"with: %s. Retrying... ") % str(e)) or True)
|
||||
def retry_get_software_upgrade_status(self):
|
||||
return self.get_software_upgrade_status()
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) 2013-2014, 2016 Wind River Systems, Inc.
|
||||
# Copyright (c) 2013-2014, 2016, 2025 Wind River Systems, Inc.
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
#
|
||||
@@ -10,10 +10,10 @@ import setuptools
|
||||
|
||||
setuptools.setup(
|
||||
name='ceph_manager',
|
||||
version='1.0.0',
|
||||
version='1.0.1',
|
||||
description='CEPH manager',
|
||||
license='Apache-2.0',
|
||||
packages=['ceph_manager', 'ceph_manager.sysinv_api'],
|
||||
packages=['ceph_manager', 'ceph_manager.usm_api'],
|
||||
entry_points={
|
||||
}
|
||||
)
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
etc/logrotate.d
|
||||
usr/lib/python3/dist-packages/ceph_manager
|
||||
usr/lib/python3/dist-packages/ceph_manager-1.0.0.egg-info
|
||||
usr/lib/python3/dist-packages/ceph_manager-1.0.1.egg-info
|
||||
|
||||
@@ -3,4 +3,4 @@ etc/init.d/*
|
||||
usr/lib/systemd/system/ceph-manager.service
|
||||
etc/logrotate.d/*
|
||||
usr/lib/python3/dist-packages/ceph_manager/*
|
||||
usr/lib/python3/dist-packages/ceph_manager-1.0.0.egg-info/*
|
||||
usr/lib/python3/dist-packages/ceph_manager-1.0.1.egg-info/*
|
||||
|
||||
Reference in New Issue
Block a user