Remove Ceph Cache Tiering support

The Ceph Cache Tiering feature is no longer supported. This commit removes
all of the code associated with it. As a result:

a. cache tiering can no longer be configured on the system
b. ceph-caching hosts can no longer be added
c. ceph-backing hosts can no longer be added
d. the ceph-caching/ceph-backing personality sub-type no longer appears in
   the output of the 'system host-show' and 'system host-add' commands
e. the ceph-caching/ceph-backing personality sub-type no longer appears when
   a host is added or listed from Horizon

Change-Id: Idd9daf0a258fe4deaf51d174414240cb0a359dde
Robert Church 2018-04-04 13:28:31 -05:00 committed by Jack Ding
parent 2768d2bb1a
commit 686d83b25b
14 changed files with 115 additions and 1533 deletions

View File

@ -33,7 +33,7 @@ HOST_XML_ATTRIBUTES = ['hostname', 'personality', 'subfunctions',
'bm_ip', 'bm_type', 'bm_username',
'bm_password', 'boot_device', 'rootfs_device',
'install_output', 'console', 'vsc_controllers',
'power_on', 'location', 'subtype']
'power_on', 'location']
# Network naming types
DEFAULT_NAMES = 0

Binary file not shown (image size: 594 B before, 479 B after).

View File

@ -1254,52 +1254,6 @@ class HostController(rest.RestController):
HostController._personality_license_check(personality)
def _validate_subtype_cache_tiering(self, operation):
''' Validate cache tiering personality subtype when adding or
when deleting hosts
'''
# TODO(rchurch): Ceph cache tiering is no longer supported. This will be
# refactored out in R6. For R5 we are preventing the service parameter
# from being enabled. This should prevent a caching host from being
# provisioned. To ensure this, just skip all checks and raise an error.
msg = _("Ceph cache tiering is no longer supported. Caching hosts are "
"not allowed to be provisioned")
raise wsme.exc.ClientSideError(msg)
cache_enabled_applied = pecan.request.dbapi.service_parameter_get_one(
service=constants.SERVICE_TYPE_CEPH,
section=constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_APPLIED,
name=constants.SERVICE_PARAM_CEPH_CACHE_TIER_CACHE_ENABLED)
if operation == constants.HOST_ADD:
feature_enabled = pecan.request.dbapi.service_parameter_get_one(
service=constants.SERVICE_TYPE_CEPH,
section=constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_APPLIED,
name=constants.SERVICE_PARAM_CEPH_CACHE_TIER_FEATURE_ENABLED)
if feature_enabled.value.lower() != 'true':
raise wsme.exc.ClientSideError(_("Adding storage hosts with "
"personality subtype {} requires "
"cache tiering feature to be "
"enabled.").format(
constants.PERSONALITY_SUBTYPE_CEPH_CACHING))
if cache_enabled_applied.value.lower() == 'true':
raise wsme.exc.ClientSideError(_("Adding storage hosts with "
"personality subtype {} requires "
"cache tiering to be "
"disabled.").format(
constants.PERSONALITY_SUBTYPE_CEPH_CACHING))
elif operation == constants.HOST_DELETE:
cache_enabled_desired = pecan.request.dbapi.service_parameter_get_one(
service=constants.SERVICE_TYPE_CEPH,
section=constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_DESIRED,
name=constants.SERVICE_PARAM_CEPH_CACHE_TIER_CACHE_ENABLED)
if (cache_enabled_desired.value.lower() == 'true' or
cache_enabled_applied.value.lower() == 'true'):
raise wsme.exc.ClientSideError(_("Delete storage hosts with "
"personality subtype {} requires "
"cache tiering to be "
"disabled.").format(
constants.PERSONALITY_SUBTYPE_CEPH_CACHING))
def _do_post(self, ihost_dict):
"""Create a new ihost based off a dictionary of attributes """
@ -1398,19 +1352,6 @@ class HostController(rest.RestController):
or not ihost_dict['capabilities']:
ihost_dict['capabilities'] = {}
if ihost_dict['personality'] == constants.STORAGE:
if not 'subtype' in ihost_dict:
ihost_dict['capabilities']['pers_subtype'] = constants.PERSONALITY_SUBTYPE_CEPH_BACKING
else:
if ihost_dict['subtype'] == constants.PERSONALITY_SUBTYPE_CEPH_CACHING:
ihost_dict['capabilities']['pers_subtype'] = ihost_dict['subtype']
else:
ihost_dict['capabilities']['pers_subtype'] = constants.PERSONALITY_SUBTYPE_CEPH_BACKING
del ihost_dict['subtype']
if ihost_dict['capabilities']['pers_subtype'] == constants.PERSONALITY_SUBTYPE_CEPH_CACHING:
self._validate_subtype_cache_tiering(constants.HOST_ADD)
# If this is the first controller being set up,
# configure and return
if ihost_dict['personality'] == constants.CONTROLLER:
@ -1770,36 +1711,10 @@ class HostController(rest.RestController):
# Add transient fields that are not stored in the database
ihost_dict['bm_password'] = None
subtype_added = False
for p in patch:
if (p['path'] == '/personality' and p['value'] == 'storage'):
if 'pers_subtype' in ihost_dict['capabilities']:
raise wsme.exc.ClientSideError(_("Subtype personality already assigned."))
else:
subtype_added = True
for p1 in patch:
if p1['path'] == '/subtype':
subtype = p1['value']
allowed_subtypes = [
constants.PERSONALITY_SUBTYPE_CEPH_BACKING,
constants.PERSONALITY_SUBTYPE_CEPH_CACHING]
if subtype not in allowed_subtypes:
raise wsme.exc.ClientSideError(_(
"Only {} subtypes are supported for storage personality").format(
",".join(allowed_subtypes)))
ihost_dict['capabilities']['pers_subtype'] = subtype
patch.remove(p1)
break
else:
ihost_dict['capabilities']['pers_subtype'] = constants.PERSONALITY_SUBTYPE_CEPH_BACKING
break
for p in patch:
if p['value'] != 'storage':
break
if p['path'] == '/subtype':
patch.remove(p)
break
try:
patched_ihost = jsonpatch.apply_patch(ihost_dict,
@ -1808,10 +1723,6 @@ class HostController(rest.RestController):
LOG.exception(e)
raise wsme.exc.ClientSideError(_("Patching Error: %s") % e)
if subtype_added and patched_ihost['personality'] == constants.STORAGE:
if patched_ihost['capabilities'].get('pers_subtype') == constants.PERSONALITY_SUBTYPE_CEPH_CACHING:
self._validate_subtype_cache_tiering(constants.HOST_ADD)
defaults = objects.host.get_defaults()
ihost_dict_orig = dict(ihost_obj.as_dict())
@ -2230,9 +2141,6 @@ class HostController(rest.RestController):
constants.CONTROLLER_0_HOSTNAME,
constants.CONTROLLER_1_HOSTNAME,
constants.STORAGE_0_HOSTNAME))
# We are not allowed to delete caching hosts if cache tiering is enabled
if ihost['capabilities'].get('pers_subtype') == constants.PERSONALITY_SUBTYPE_CEPH_CACHING:
self._validate_subtype_cache_tiering(constants.HOST_DELETE)
# If it is the last storage node to delete, we need to delete
# ceph osd pools and update additional tier status to "defined"
@ -5178,37 +5086,24 @@ class HostController(rest.RestController):
available_peer_count += 1
if available_peer_count < min_replication:
host_subtype = hostupdate.ihost_orig.get('capabilities', {}).get('pers_subtype')
if host_subtype == constants.PERSONALITY_SUBTYPE_CEPH_CACHING:
cache_enabled = pecan.request.dbapi.service_parameter_get_one(
service=constants.SERVICE_TYPE_CEPH,
section=constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_DESIRED,
name=constants.SERVICE_PARAM_CEPH_CACHE_TIER_CACHE_ENABLED)
if cache_enabled.value == 'true':
msg = _("Cannot lock a {} storage node when replication "
"is lost and cache is enabled. Please disable cache first.").format(
constants.PERSONALITY_SUBTYPE_CEPH_CACHING)
pools_usage = \
pecan.request.rpcapi.get_ceph_pools_df_stats(pecan.request.context)
if not pools_usage:
raise wsme.exc.ClientSideError(
_("Cannot lock a storage node when ceph pool usage is undetermined."))
for ceph_pool in pools_usage:
# We only need to check data pools
if ([pool for pool in constants.ALL_CEPH_POOLS
if ceph_pool['name'].startswith(pool)] and
int(ceph_pool['stats']['bytes_used']) > 0):
# Ceph pool is not empty and no other enabled storage
# in set, so locking this storage node is not allowed.
msg = _("Cannot lock a storage node when ceph pools are"
" not empty and replication is lost. This may"
" result in data loss. ")
raise wsme.exc.ClientSideError(msg)
else:
pass
else:
pools_usage = \
pecan.request.rpcapi.get_ceph_pools_df_stats(pecan.request.context)
if not pools_usage:
raise wsme.exc.ClientSideError(
_("Cannot lock a storage node when ceph pool usage is undetermined."))
for ceph_pool in pools_usage:
# We only need to check data pools
if ([pool for pool in constants.ALL_BACKING_POOLS
if ceph_pool['name'].startswith(pool)] and
int(ceph_pool['stats']['bytes_used']) > 0):
# Ceph pool is not empty and no other enabled storage
# in set, so locking this storage node is not allowed.
msg = _("Cannot lock a storage node when ceph pools are not empty "
"and replication is lost. This may result in data loss. ")
raise wsme.exc.ClientSideError(msg)
ceph_pools_empty = True
ceph_pools_empty = True
# Perform checks on storage regardless of operational state
# as a minimum number of monitor is required.
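
Aside (not part of the diff): the lock guard in the hunk above reduces to a single predicate over the Ceph pool usage report. A minimal standalone sketch, with an illustrative function name and the data-pool list passed in as a parameter:

def data_pools_in_use(pools_usage, data_pool_names):
    """True if any Ceph data pool still holds data, i.e. locking would risk data loss."""
    return any(
        int(pool['stats']['bytes_used']) > 0
        for pool in pools_usage
        if any(pool['name'].startswith(name) for name in data_pool_names)
    )

With replication lost, the lock is refused whenever this predicate is true for constants.ALL_CEPH_POOLS; an empty or undetermined usage report is rejected even earlier.
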
@ -5731,10 +5626,7 @@ def _create_node(host, xml_node, personality, is_dynamic_ip):
if personality == constants.COMPUTE:
et.SubElement(host_node, 'hostname').text = host.hostname
et.SubElement(host_node, 'subfunctions').text = host.subfunctions
elif personality == constants.STORAGE:
subtype = host.capabilities.get('pers_subtype')
if subtype == constants.PERSONALITY_SUBTYPE_CEPH_CACHING:
et.SubElement(host_node, 'subtype').text = subtype
et.SubElement(host_node, 'mgmt_mac').text = host.mgmt_mac
if not is_dynamic_ip:
et.SubElement(host_node, 'mgmt_ip').text = host.mgmt_ip

View File

@ -2748,11 +2748,6 @@ def storprofile_applicable(host, profile):
if not len(host.disks) >= len(profile.disks):
return (False, _('profile has more disks than host does'))
if host.capabilities.get('pers_subtype') == constants.PERSONALITY_SUBTYPE_CEPH_CACHING:
for pstor in profile.stors:
if pstor.function == constants.STOR_FUNCTION_JOURNAL:
return (False, _('journal storage functions not allowed on {} host').format(
constants.PERSONALITY_SUBTYPE_CEPH_CACHING))
return (True, None)

View File

@ -162,13 +162,9 @@ class ServiceParameterController(rest.RestController):
parms = pecan.request.dbapi.service_parameter_get_all(**kwargs)
# filter out desired and applied parameters; they are used to keep
# track of updates between two consecutive apply actions;
s_applied = constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_APPLIED
s_desired = constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_DESIRED
parms = [p for p in parms if not (
p.service == constants.SERVICE_TYPE_CEPH and
p.section in [s_applied, s_desired])]
# track of updates between two consecutive apply actions
parms = [p for p in parms if not
p.service == constants.SERVICE_TYPE_CEPH]
# filter out cinder state
parms = [p for p in parms if not (
@ -618,52 +614,6 @@ class ServiceParameterController(rest.RestController):
pass
raise wsme.exc.ClientSideError(str(e.value))
@staticmethod
def _cache_tiering_feature_enabled_semantic_check(service):
if service != constants.SERVICE_TYPE_CEPH:
return
# TODO(rchurch): Ceph cache tiering is no longer supported. This will be
# refactored out in R6. For R5 prevent enabling.
msg = _("Ceph cache tiering is no longer supported.")
raise wsme.exc.ClientSideError(msg)
if not StorageBackendConfig.has_backend_configured(
pecan.request.dbapi,
constants.CINDER_BACKEND_CEPH):
msg = _("Ceph backend is required.")
raise wsme.exc.ClientSideError(msg)
section = 'cache_tiering'
feature_enabled = pecan.request.dbapi.service_parameter_get_one(
service=service, section=section,
name=constants.SERVICE_PARAM_CEPH_CACHE_TIER_FEATURE_ENABLED)
if feature_enabled.value == 'true':
for name in CEPH_CACHE_TIER_PARAMETER_REQUIRED_ON_FEATURE_ENABLED:
try:
pecan.request.dbapi.service_parameter_get_one(
service=service, section=section, name=name)
except exception.NotFound:
msg = _("Unable to apply service parameters. "
"Missing service parameter '%s' for service '%s' "
"in section '%s'." % (name, service, section))
raise wsme.exc.ClientSideError(msg)
else:
storage_nodes = pecan.request.dbapi.ihost_get_by_personality(
constants.STORAGE)
ceph_caching_hosts = []
for node in storage_nodes:
if node.capabilities.get('pers_subtype') == constants.PERSONALITY_SUBTYPE_CEPH_CACHING:
ceph_caching_hosts.append(node['hostname'])
if len(ceph_caching_hosts):
msg = _("Unable to apply service parameters. "
"Trying to disable CEPH cache tiering feature "
"with {} host(s) present: {}. "
"Delete host(s) first.").format(
constants.PERSONALITY_SUBTYPE_CEPH_CACHING,
", ".join(sorted(ceph_caching_hosts)))
raise wsme.exc.ClientSideError(msg)
@staticmethod
def _service_parameter_apply_semantic_check_identity():
""" Perform checks for the Identity Service Type."""
@ -941,15 +891,13 @@ class ServiceParameterController(rest.RestController):
"in section '%s'." % (name, service, section))
raise wsme.exc.ClientSideError(msg)
ServiceParameterController._cache_tiering_feature_enabled_semantic_check(service)
# Apply service specific semantic checks
if service == constants.SERVICE_TYPE_IDENTITY:
self._service_parameter_apply_semantic_check_identity()
if service == constants.SERVICE_TYPE_CINDER:
# Make sure one of the internal cinder configs is enabled so that we
# know cinder is operational in this region
# Make sure one of the internal cinder configs is enabled so that
# we know cinder is operational in this region
if not StorageBackendConfig.is_service_enabled(pecan.request.dbapi,
constants.SB_SVC_CINDER,
filter_shared=True):

View File

@ -795,32 +795,6 @@ def _create(stor, iprofile=None, create_pv=True):
create_attrs['fortierid'] = tier.id
if ihost.capabilities.get('pers_subtype') == constants.PERSONALITY_SUBTYPE_CEPH_CACHING:
idisk = pecan.request.dbapi.idisk_get(idisk_uuid)
if (idisk.device_type != constants.DEVICE_TYPE_SSD and
idisk.device_type != constants.DEVICE_TYPE_NVME):
raise wsme.exc.ClientSideError(_(
"Invalid stor device type: only SSD and NVME devices "
"are supported on {} hosts.").format(
constants.PERSONALITY_SUBTYPE_CEPH_CACHING))
# OSDs should not be created when cache tiering is enabled
cache_enabled_desired = pecan.request.dbapi.service_parameter_get_one(
service=constants.SERVICE_TYPE_CEPH,
section=constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_DESIRED,
name=constants.SERVICE_PARAM_CEPH_CACHE_TIER_CACHE_ENABLED)
cache_enabled_applied = pecan.request.dbapi.service_parameter_get_one(
service=constants.SERVICE_TYPE_CEPH,
section=constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_APPLIED,
name=constants.SERVICE_PARAM_CEPH_CACHE_TIER_CACHE_ENABLED)
if (cache_enabled_desired.value.lower() == 'true' or
cache_enabled_applied.value.lower() == 'true'):
raise wsme.exc.ClientSideError(_("Adding OSDs to {} nodes "
"is not allowed when cache "
"tiering is "
"enabled.").format(
constants.PERSONALITY_SUBTYPE_CEPH_CACHING))
if not iprofile:
try:
journal_location = \
@ -857,12 +831,6 @@ def _create(stor, iprofile=None, create_pv=True):
"Invalid stor device type: only SSD and NVME devices are supported"
" for journal functions."))
if ihost.capabilities.get('pers_subtype') == constants.PERSONALITY_SUBTYPE_CEPH_CACHING:
raise wsme.exc.ClientSideError(_(
"Invalid stor device type: journal function not allowed "
"on {} hosts.").format(
constants.PERSONALITY_SUBTYPE_CEPH_CACHING))
new_stor = pecan.request.dbapi.istor_create(forihostid,
create_attrs)

View File

@ -8,7 +8,6 @@
# coding=utf-8
#
import copy
import os
import tsconfig.tsconfig as tsc
@ -119,8 +118,6 @@ VM_FUNCTION = "VMs"
NO_FUNCTION = "None"
# Host Personality Sub-Types
PERSONALITY_SUBTYPE_CEPH_BACKING = 'ceph-backing'
PERSONALITY_SUBTYPE_CEPH_CACHING = 'ceph-caching'
HOST_ADD = 'host_add' # for personality sub-type validation
HOST_DELETE = 'host_delete' # for personality sub-type validation
@ -169,9 +166,7 @@ STORAGE_2_HOSTNAME = '%s-2' % STORAGE_HOSTNAME
# Other Storage Hostnames are built dynamically.
# Replication Peer groups
PEER_PREFIX_BACKING = 'group-'
PEER_PREFIX_CACHING = 'group-cache-'
PEER_BACKING_RSVD_GROUP = '%s0' % PEER_PREFIX_BACKING
PEER_PREFIX = 'group-'
VIM_DEFAULT_TIMEOUT_IN_SECS = 5
VIM_DELETE_TIMEOUT_IN_SECS = 10
@ -646,33 +641,33 @@ CEPH_POOL_OBJECT_GATEWAY_NAME = {
CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER}
# Main pools for Ceph data backing
BACKING_POOLS = [{'pool_name': CEPH_POOL_VOLUMES_NAME,
'pg_num': CEPH_POOL_VOLUMES_PG_NUM,
'pgp_num': CEPH_POOL_VOLUMES_PGP_NUM,
'quota_gib': None,
'data_pt': 40},
{'pool_name': CEPH_POOL_IMAGES_NAME,
'pg_num': CEPH_POOL_IMAGES_PG_NUM,
'pgp_num': CEPH_POOL_IMAGES_PGP_NUM,
'quota_gib': None,
'data_pt': 20},
{'pool_name': CEPH_POOL_EPHEMERAL_NAME,
'pg_num': CEPH_POOL_EPHEMERAL_PG_NUM,
'pgp_num': CEPH_POOL_EPHEMERAL_PGP_NUM,
'quota_gib': None,
'data_pt': 30},
{'pool_name': CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL,
'pg_num': CEPH_POOL_OBJECT_GATEWAY_PG_NUM,
'pgp_num': CEPH_POOL_OBJECT_GATEWAY_PGP_NUM,
'quota_gib': None,
'data_pt': 10}]
CEPH_POOLS = [{'pool_name': CEPH_POOL_VOLUMES_NAME,
'pg_num': CEPH_POOL_VOLUMES_PG_NUM,
'pgp_num': CEPH_POOL_VOLUMES_PGP_NUM,
'quota_gib': None,
'data_pt': 40},
{'pool_name': CEPH_POOL_IMAGES_NAME,
'pg_num': CEPH_POOL_IMAGES_PG_NUM,
'pgp_num': CEPH_POOL_IMAGES_PGP_NUM,
'quota_gib': None,
'data_pt': 20},
{'pool_name': CEPH_POOL_EPHEMERAL_NAME,
'pg_num': CEPH_POOL_EPHEMERAL_PG_NUM,
'pgp_num': CEPH_POOL_EPHEMERAL_PGP_NUM,
'quota_gib': None,
'data_pt': 30},
{'pool_name': CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL,
'pg_num': CEPH_POOL_OBJECT_GATEWAY_PG_NUM,
'pgp_num': CEPH_POOL_OBJECT_GATEWAY_PGP_NUM,
'quota_gib': None,
'data_pt': 10}]
ALL_BACKING_POOLS = [CEPH_POOL_RBD_NAME,
CEPH_POOL_VOLUMES_NAME,
CEPH_POOL_IMAGES_NAME,
CEPH_POOL_EPHEMERAL_NAME,
CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL,
CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER]
ALL_CEPH_POOLS = [CEPH_POOL_RBD_NAME,
CEPH_POOL_VOLUMES_NAME,
CEPH_POOL_IMAGES_NAME,
CEPH_POOL_EPHEMERAL_NAME,
CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL,
CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER]
# Supported pools for secondary ceph tiers
SB_TIER_CEPH_POOLS = [
@ -683,12 +678,6 @@ SB_TIER_CEPH_POOLS = [
'quota_default': 0,
'data_pt': 100}]
# Pools for Ceph cache tiering
CACHE_POOLS = copy.deepcopy(BACKING_POOLS)
for p in CACHE_POOLS:
# currently all BACKING_POOLS are cached, but this may change in the future
p['pool_name'] = p['pool_name'] + "-cache"
# See http://ceph.com/pgcalc/. We set it to more than 100 because pool usage
# varies greatly in Titanium Cloud and we want to avoid running too low on PGs
CEPH_TARGET_PGS_PER_OSD = 200
@ -818,28 +807,6 @@ SERVICE_PARAM_NAME_IRONIC_CONTROLLER_1_NIC = 'controller_1_if'
SERVICE_PARAM_NAME_IRONIC_NETMASK = 'netmask'
SERVICE_PARAM_NAME_IRONIC_PROVISIONING_NETWORK = 'provisioning_network'
SERVICE_PARAM_SECTION_HORIZON_AUTH = 'auth'
SERVICE_PARAM_SECTION_CEPH_CACHE_TIER = 'cache_tiering'
SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_DESIRED = 'cache_tiering.desired'
SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_APPLIED = 'cache_tiering.applied'
SERVICE_PARAM_CEPH_CACHE_TIER_FEATURE_ENABLED = 'feature_enabled'
SERVICE_PARAM_CEPH_CACHE_TIER_CACHE_ENABLED = 'cache_enabled'
SERVICE_PARAM_CEPH_CACHE_TIER_TARGET_MAX_BYTES = 'target_max_bytes'
SERVICE_PARAM_CEPH_CACHE_HIT_SET_TYPE_BLOOM = 'bloom'
CACHE_TIERING_DEFAULTS = {
'cache_min_evict_age': 0,
'cache_min_flush_age': 0,
# cache_target_dirty_high_ratio - not implemented
'cache_target_dirty_ratio': 0.4,
'cache_target_full_ratio': 0.95,
'hit_set_count': 0,
'hit_set_period': 0,
'hit_set_type': SERVICE_PARAM_CEPH_CACHE_HIT_SET_TYPE_BLOOM,
'min_read_recency_for_promote': 0,
# min_write_recency_for_promote - not implemented
}
SERVICE_PARAM_ASSIGNMENT_DRIVER = 'driver'
SERVICE_PARAM_IDENTITY_DRIVER = 'driver'

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013-2016 Wind River Systems, Inc.
# Copyright (c) 2013-2018 Wind River Systems, Inc.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
@ -236,20 +236,6 @@ class CephPoolGetQuotaFailure(CephFailure):
+ ": %(reason)s"
class CephPoolAddTierFailure(CephFailure):
message = _("Failed to add OSD tier: "
"backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, "
"response=%(response_status_code)s:%(response_reason)s, "
"status=%(status)s, output=%(output)s")
class CephPoolRemoveTierFailure(CephFailure):
message = _("Failed to remove tier: "
"backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, "
"response=%(response_status_code)s:%(response_reason)s, "
"status=%(status)s, output=%(output)s")
class CephGetClusterUsageFailure(CephFailure):
message = _("Getting the cluster usage information failed: %(reason)s")
@ -268,11 +254,6 @@ class CephPoolGetParamFailure(CephFailure):
"Reason: %(reason)s")
class CephPoolApplySetParamFailure(CephFailure):
message = _("Cannot apply/set Ceph OSD pool parameters. "
"Reason: cache tiering operation in progress.")
class CephPoolApplyRestoreInProgress(CephFailure):
message = _("Cannot apply/set Ceph OSD pool parameters. "
"Reason: storage restore in progress (wait until "
@ -285,59 +266,6 @@ class CephPoolSetParamFailure(CephFailure):
"Reason: %(reason)s")
class CephCacheSetModeFailure(CephFailure):
message = _("Failed to set OSD tier cache mode: "
"cache_pool=%(cache_pool)s, mode=%(mode)s, "
"response=%(response_status_code)s:%(response_reason)s, "
"status=%(status)s, output=%(output)s")
class CephCacheCreateOverlayFailure(CephFailure):
message = _("Failed to create overlay: "
"backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, "
"response=%(response_status_code)s:%(response_reason)s, "
"status=%(status)s, output=%(output)s")
class CephCacheDeleteOverlayFailure(CephFailure):
message = _("Failed to delete overlay: "
"backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, "
"response=%(response_status_code)s:%(response_reason)s, "
"status=%(status)s, output=%(output)s")
class CephCacheFlushFailure(CephFailure):
message = _("Failed to flush cache pool: "
"cache_pool=%(cache_pool)s, "
"return_code=%(return_code)s, "
"cmd=%(cmd)s, output=%(output)s")
class CephCacheFeatureEnableFailure(CephFailure):
message = _("Cannot enable Ceph cache tiering feature. "
"Reason: %(reason)s")
class CephCacheFeatureDisableFailure(CephFailure):
message = _("Cannot disable Ceph cache tiering feature. "
"Reason: %(reason)s")
class CephCacheConfigFailure(CephFailure):
message = _("Cannot change Ceph cache tiering. "
"Reason: %(reason)s")
class CephCacheEnableFailure(CephFailure):
message = _("Cannot enable Ceph cache tiering. "
"Reason: %(reason)s")
class CephCacheDisableFailure(CephFailure):
message = _("Cannot enable Ceph cache tiering. "
"Reason: %(reason)s")
class InvalidCPUInfo(Invalid):
message = _("Unacceptable CPU info") + ": %(reason)s"
@ -1168,12 +1096,6 @@ class PeerContainsDuplicates(Conflict):
message = _("Peer with name % already exists")
class StorageSubTypeUnexpected(SysinvException):
message = _("Host %(host)s cannot be assigned subtype %(subtype)s. "
"storage-0 and storage-1 personality sub-type can "
"only be ceph backing.")
class StoragePeerGroupUnexpected(SysinvException):
message = _("Host %(host)s cannot be assigned to group %(peer_name)s. "
"group-0 is reserved for storage-0 and storage-1")

View File

@ -1,4 +1,4 @@
# Copyright (c) 2017 Wind River Systems, Inc.
# Copyright (c) 2017-2018 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -206,24 +206,6 @@ def _validate_value_in_set(name, value, _set):
", ".join(_set))))
def _validate_ceph_cache_tier_feature_enabled(name, value):
_validate_value_in_set(
name, value,
['true', 'false'])
def _validate_ceph_cache_tier_cache_enabled(name, value):
_validate_value_in_set(
name, value,
['true', 'false'])
def _validate_ceph_cache_tier_hit_set_type(name, value):
_validate_value_in_set(
name, value,
[constants.SERVICE_PARAM_CEPH_CACHE_HIT_SET_TYPE_BLOOM])
def _validate_token_expiry_time(name, value):
"""Check if timeout value is valid"""
try:
@ -888,58 +870,6 @@ HORIZON_AUTH_PARAMETER_RESOURCE = {
constants.SERVICE_PARAM_HORIZON_AUTH_LOCKOUT_RETRIES: 'openstack::horizon::params::lockout_retries',
}
CEPH_CACHE_TIER_PARAMETER_MANDATORY = [
constants.SERVICE_PARAM_CEPH_CACHE_TIER_FEATURE_ENABLED,
constants.SERVICE_PARAM_CEPH_CACHE_TIER_CACHE_ENABLED,
]
CEPH_CACHE_TIER_PARAMETER_REQUIRED_ON_FEATURE_ENABLED = [
'hit_set_type',
'hit_set_count',
'hit_set_period',
'cache_target_dirty_ratio',
'cache_target_full_ratio'
]
CEPH_CACHE_TIER_PARAMETER_OPTIONAL = CEPH_CACHE_TIER_PARAMETER_REQUIRED_ON_FEATURE_ENABLED + [
'min_read_recency_for_promote',
'min_write_recency_for_promote',
'cache_target_dirty_high_ratio',
'cache_min_flush_age',
'cache_min_evict_age'
]
CEPH_CACHE_TIER_PARAMETER_VALIDATOR = {
constants.SERVICE_PARAM_CEPH_CACHE_TIER_FEATURE_ENABLED: _validate_ceph_cache_tier_feature_enabled,
constants.SERVICE_PARAM_CEPH_CACHE_TIER_CACHE_ENABLED: _validate_ceph_cache_tier_cache_enabled,
'hit_set_type': _validate_ceph_cache_tier_hit_set_type,
'hit_set_count': _validate_integer,
'hit_set_period': _validate_integer,
'min_read_recency_for_promote': _validate_integer,
# (not implemented) 'min_write_recency_for_promote': _validate_integer,
'cache_target_dirty_ratio': _validate_float,
# (not implemented) 'cache_target_dirty_high_ratio': _validate_integer,
'cache_target_full_ratio': _validate_float,
'cache_min_flush_age': _validate_integer,
'cache_min_evict_age': _validate_integer,
}
CEPH_CACHE_TIER_PARAMETER_RESOURCE = {
constants.SERVICE_PARAM_CEPH_CACHE_TIER_FEATURE_ENABLED: None,
constants.SERVICE_PARAM_CEPH_CACHE_TIER_CACHE_ENABLED: None,
'hit_set_type': None,
'hit_set_count': None,
'hit_set_period': None,
'min_read_recency_for_promote': None,
# (not implemented) 'min_write_recency_for_promote': None,
'cache_target_dirty_ratio': None,
# (not implemented) 'cache_target_dirty_high_ratio': None,
'cache_target_full_ratio': None,
'cache_min_flush_age': None,
'cache_min_evict_age': None,
}
# Neutron Service Parameters (optional)
NEUTRON_ML2_PARAMETER_OPTIONAL = [
constants.SERVICE_PARAM_NAME_ML2_MECHANISM_DRIVERS,
@ -1550,19 +1480,6 @@ SERVICE_PARAMETER_SCHEMA = {
SERVICE_PARAM_RESOURCE: HORIZON_AUTH_PARAMETER_RESOURCE,
},
},
constants.SERVICE_TYPE_CEPH: {
constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER: {
SERVICE_PARAM_MANDATORY: CEPH_CACHE_TIER_PARAMETER_MANDATORY,
SERVICE_PARAM_OPTIONAL: CEPH_CACHE_TIER_PARAMETER_OPTIONAL,
SERVICE_PARAM_VALIDATOR: CEPH_CACHE_TIER_PARAMETER_VALIDATOR,
SERVICE_PARAM_RESOURCE: CEPH_CACHE_TIER_PARAMETER_RESOURCE,
},
constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_APPLIED: {
SERVICE_PARAM_OPTIONAL: CEPH_CACHE_TIER_PARAMETER_MANDATORY + CEPH_CACHE_TIER_PARAMETER_OPTIONAL,
SERVICE_PARAM_VALIDATOR: CEPH_CACHE_TIER_PARAMETER_VALIDATOR,
SERVICE_PARAM_RESOURCE: CEPH_CACHE_TIER_PARAMETER_RESOURCE,
}
},
constants.SERVICE_TYPE_IRONIC: {
constants.SERVICE_PARAM_SECTION_IRONIC_NEUTRON: {
SERVICE_PARAM_OPTIONAL: IRONIC_NEUTRON_PARAMETER_OPTIONAL,

View File

@ -1,57 +0,0 @@
# Copyright (c) 2016-2017 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import copy
from sysinv.common import constants
class ServiceConfig(object):
def __init__(self, db_params=None):
self.feature_enabled = False
self.cache_enabled = False
self.params = {}
self.uuid = {}
if db_params is not None:
for p in db_params:
if p.name == constants.SERVICE_PARAM_CEPH_CACHE_TIER_FEATURE_ENABLED:
self.feature_enabled = (p.value.lower() == 'true')
elif p.name == constants.SERVICE_PARAM_CEPH_CACHE_TIER_CACHE_ENABLED:
self.cache_enabled = (p.value.lower() == 'true')
else:
self.params[p.name] = p.value
self.uuid[p.name] = p.uuid
def __repr__(self):
return ("ServiceConfig({}={}, {}={}, params={}, uuid={})").format(
constants.SERVICE_PARAM_CEPH_CACHE_TIER_FEATURE_ENABLED, self.feature_enabled,
constants.SERVICE_PARAM_CEPH_CACHE_TIER_CACHE_ENABLED, self.cache_enabled,
self.params, self.uuid)
def __eq__(self, other):
return (self.feature_enabled == other.feature_enabled and
self.cache_enabled == other.cache_enabled and
self.params == other.params)
def __ne__(self, other):
return not self.__eq__(other)
def to_dict(self):
return {constants.SERVICE_PARAM_CEPH_CACHE_TIER_FEATURE_ENABLED: self.feature_enabled,
constants.SERVICE_PARAM_CEPH_CACHE_TIER_CACHE_ENABLED: self.cache_enabled,
'params': copy.deepcopy(self.params),
'uuid': copy.deepcopy(self.uuid)}
@classmethod
def from_dict(cls, data):
try:
sp = cls()
sp.feature_enabled = data[constants.SERVICE_PARAM_CEPH_CACHE_TIER_FEATURE_ENABLED]
sp.cache_enabled = data[constants.SERVICE_PARAM_CEPH_CACHE_TIER_CACHE_ENABLED]
sp.params = copy.deepcopy(data['params'])
sp.uuid = copy.deepcopy(data['uuid'])
return sp
except (KeyError, TypeError):
pass
return

View File

@ -15,13 +15,10 @@ from __future__ import absolute_import
import os
import uuid
import copy
import wsme
from requests.exceptions import RequestException, ReadTimeout
from cephclient import wrapper as ceph
from fm_api import constants as fm_constants
from fm_api import fm_api
from sysinv.common import ceph as ceph_utils
from sysinv.common import constants
from sysinv.common import exception
from sysinv.common import utils as cutils
@ -30,363 +27,14 @@ from sysinv.openstack.common import uuidutils
from sysinv.common.storage_backend_conf import StorageBackendConfig
from sysinv.openstack.common.gettextutils import _
from sysinv.openstack.common import excutils
from sysinv.openstack.common import rpc
from sysinv.openstack.common.rpc.common import CommonRpcContext
from sysinv.openstack.common.rpc.common import RemoteError as RpcRemoteError
from sysinv.conductor.cache_tiering_service_config import ServiceConfig
LOG = logging.getLogger(__name__)
BACKING_POOLS = copy.deepcopy(constants.BACKING_POOLS)
CACHE_POOLS = copy.deepcopy(constants.CACHE_POOLS)
CEPH_POOLS = copy.deepcopy(constants.CEPH_POOLS)
SERVICE_TYPE_CEPH = constants.SERVICE_TYPE_CEPH
CACHE_TIER = constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER
CACHE_TIER_DESIRED = constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_DESIRED
CACHE_TIER_APPLIED = constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_APPLIED
CACHE_TIER_SECTIONS = [CACHE_TIER, CACHE_TIER_DESIRED, CACHE_TIER_APPLIED]
CACHE_TIER_CACHE_ENABLED = constants.SERVICE_PARAM_CEPH_CACHE_TIER_CACHE_ENABLED
CACHE_TIER_RESTORE_TASK_DISABLE = "cache_tier_restore_task_disable"
CACHE_TIER_RESTORE_TASK_ENABLE = "cache_tier_restore_task_enable"
class CacheTiering(object):
def __init__(self, operator):
self.operator = operator
# Cache UUIDs of service_parameters for later use to
# reduce DB access
self.config_uuids = {}
self.desired_config_uuids = {}
self.applied_config_uuids = {}
self.restore_task = None
def get_config(self):
ret = {}
if StorageBackendConfig.is_ceph_backend_restore_in_progress(self.operator._db_api):
LOG.info("Restore in progress. Return stub (disabled) Ceph cache tiering configuration")
return ret
for section in CACHE_TIER_SECTIONS:
config = self.operator.service_parameter_get_all(section=section)
if config:
ret[section] = ServiceConfig(config).to_dict()
LOG.info("Ceph cache tiering configuration: %s" % str(ret))
return ret
def is_cache_tiering_enabled(self):
p = self.operator.service_parameter_get_one(SERVICE_TYPE_CEPH,
CACHE_TIER,
CACHE_TIER_CACHE_ENABLED)
return (p.value.lower() == 'true')
def apply_service_config(self, new_config, desired_config, applied_config):
LOG.debug("Applying Ceph service config "
"new_config: %(new)s desired_config: %(desired)s "
"applied_config: %(applied)s" %
{'new': new_config.to_dict(),
'desired': desired_config.to_dict(),
'applied': applied_config.to_dict()})
# See description in ceph.update_service_config for design detail
if new_config.feature_enabled != applied_config.feature_enabled:
if new_config.feature_enabled:
self.enable_feature(new_config, applied_config)
else:
self.disable_feature(new_config, applied_config)
elif new_config.cache_enabled != desired_config.cache_enabled:
if not new_config.feature_enabled:
raise exception.CephCacheEnableFailure(
reason='Cache tiering feature is not enabled')
else:
if not self.operator.ceph_status_ok() and \
not self.restore_task:
raise exception.CephCacheConfigFailure(
reason=_('Ceph Status is not healthy.'))
if new_config.cache_enabled:
# Enable cache only if caching tier nodes are available
caching_hosts = self.operator.get_caching_hosts()
if len(caching_hosts) < 2:
raise exception.CephCacheConfigFailure(
reason=_('At least two caching hosts must be '
'configured and enabled before '
'enabling cache tiering.'))
if len(caching_hosts) % 2:
raise exception.CephCacheConfigFailure(
reason=_('Caching hosts are configured in pairs, '
'both hosts of each pair must be '
'configured and enabled before '
'enabling cache tiering.'))
for h in caching_hosts:
if (h.availability != constants.AVAILABILITY_AVAILABLE and
h.operational != constants.OPERATIONAL_ENABLED):
raise exception.CephCacheConfigFailure(
reason=_('All caching hosts must be '
'available before enabling '
'cache tiering.'))
self.enable_cache(new_config, desired_config)
else:
self.disable_cache(new_config, desired_config)
else:
if new_config.feature_enabled and new_config.cache_enabled:
# To be safe let configure_osd_pools() be the only place that can
# update the object pool name in BACKING_POOLS.
backing_pools_snapshot = copy.deepcopy(BACKING_POOLS)
for pool in backing_pools_snapshot:
# Need to query which Rados object data pool exists
if pool['pool_name'] == constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL:
pool_name = self.operator.get_ceph_object_pool_name()
if pool_name is None:
raise wsme.exc.ClientSideError("Ceph object data pool does not exist.")
else:
pool['pool_name'] = pool_name
self.cache_pool_set_config(pool, new_config, desired_config)
self.db_param_apply(new_config, desired_config, CACHE_TIER_DESIRED)
self.db_param_apply(new_config, desired_config, CACHE_TIER_APPLIED)
def db_param_apply(self, new_config, old_config, section):
""" Update database section with delta between configs
We are comparing 'new_config' with old_config and any difference is
stored in 'section'. If a parameter is missing from new_config then
it is also removed from 'section' otherwise, any difference will be
updated or created in section.
Note that 'section' will not necessarily have the same content as in
'new_config' only the difference between new_config and old_config is
updated in 'section'
"""
# Use cached uuids for current section
if section == CACHE_TIER:
uuids = self.config_uuids
elif section == CACHE_TIER_DESIRED:
uuids = self.desired_config_uuids
elif section == CACHE_TIER_APPLIED:
uuids = self.applied_config_uuids
else:
uuids = old_config.uuid
# Delete service parameters that have been removed
for name in (set(old_config.params) - set(new_config.params)):
try:
self.operator.service_parameter_destroy(name, section)
except exception.NotFound:
pass
# Update feature_enable of old_config with new value
name = constants.SERVICE_PARAM_CEPH_CACHE_TIER_FEATURE_ENABLED
_uuid = uuids.get(name)
value = 'true' if new_config.feature_enabled else 'false'
self.operator.service_parameter_create_or_update(name, value,
section, _uuid)
# Update cache_enable of old_config with new value
name = constants.SERVICE_PARAM_CEPH_CACHE_TIER_CACHE_ENABLED
_uuid = uuids.get(name)
value = 'true' if new_config.cache_enabled else 'false'
self.operator.service_parameter_create_or_update(name, value,
section, _uuid)
# Update all of the other service parameters
for name, value in new_config.params.iteritems():
_uuid = uuids.get(name)
self.operator.service_parameter_create_or_update(name, value,
section, _uuid)
if section == CACHE_TIER_APPLIED:
self.operator.cache_tier_config_out_of_date_alarm_clear()
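
Aside (not part of the diff): the db_param_apply docstring above describes a delta update between two configurations. A simplified sketch of those semantics, using plain dicts in place of the service_parameter table (names are illustrative only):

def apply_delta(new_params, old_params, store):
    """Mirror new_params into 'store': drop what disappeared, upsert everything else."""
    # parameters present in the old config but missing from the new one are deleted
    for name in set(old_params) - set(new_params):
        store.pop(name, None)
    # every parameter of the new config is created or updated in the store
    store.update(new_params)
    return store

store = {'hit_set_count': '1', 'hit_set_period': '300'}
apply_delta({'hit_set_count': '2'}, {'hit_set_count': '1', 'hit_set_period': '300'}, store)
# store == {'hit_set_count': '2'}

The real method additionally always rewrites the feature_enabled and cache_enabled flags and, when the applied section is updated, clears the configuration out-of-date alarm.
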
def cache_pool_set_config(self, pool, new_config, applied_config):
for name in (set(applied_config.params) - set(new_config.params)):
if name in constants.CACHE_TIERING_DEFAULTS:
LOG.debug("Setting default for parameter: %s" % name)
self.operator.cache_pool_set_param(pool, name,
constants.CACHE_TIERING_DEFAULTS[name])
else:
LOG.warn(_("Unable to reset cache pool parameter {} to default value").format(name))
for name, value in new_config.params.iteritems():
if value != applied_config.params.get(name):
LOG.debug("Setting value of parameter: %(name)s"
" to: %(value)s" % {'name': name,
'value': value})
self.operator.cache_pool_set_param(pool, name, value)
def enable_feature(self, new_config, applied_config):
if new_config.cache_enabled:
raise exception.CephCacheFeatureEnableFailure(
reason=_("Cannot enable feature and cache at the same time, "
"please enable feature first then cache"))
else:
ceph_helper = ceph_utils.CephApiOperator()
num_monitors, required_monitors, quorum_names = \
ceph_helper.get_monitors_status(self.operator._db_api)
if num_monitors < required_monitors:
raise exception.CephCacheFeatureEnableFailure(
reason=_("Only %d storage monitor available. At least %s "
"unlocked and enabled hosts with monitors are "
"required. Please ensure hosts with monitors are "
"unlocked and enabled - candidates: controller-0, "
"controller-1, storage-0") % (num_monitors,
required_monitors))
# This is only a flag so we set it to both desired and applied at the
# same time
self.db_param_apply(new_config, applied_config, CACHE_TIER_DESIRED)
self.db_param_apply(new_config, applied_config, CACHE_TIER_APPLIED)
LOG.info(_("Cache tiering feature enabled"))
def disable_feature(self, new_config, desired_config):
if desired_config.cache_enabled:
raise exception.CephCacheFeatureDisableFailure(
reason=_("Please disable cache before disabling feature."))
else:
ceph_caching_hosts = self.operator.get_caching_hosts()
if len(ceph_caching_hosts):
raise exception.CephCacheFeatureDisableFailure(
reason=_("{} hosts present: {}").format(
constants.PERSONALITY_SUBTYPE_CEPH_CACHING,
[h['hostname'] for h in ceph_caching_hosts]))
# This is only a flag so we set it to both desired and applied at the
# same time
self.db_param_apply(new_config, desired_config, CACHE_TIER_DESIRED)
self.db_param_apply(new_config, desired_config, CACHE_TIER_APPLIED)
LOG.info(_("Cache tiering feature disabled"))
def enable_cache(self, new_config, desired_config):
if not new_config.feature_enabled:
raise exception.CephCacheEnableFailure(
reason='Cache tiering feature is not enabled')
if not self.operator.check_all_group_cache_valid():
raise exception.CephCacheEnableFailure(
reason=_("Each cache group should have at least"
" one storage host available"))
self.db_param_apply(new_config, desired_config, CACHE_TIER_DESIRED)
# 'cache_tiering_enable_cache' is called with a 'desired_config'
# before it was stored in the database! self.db_param_apply only
# updates the database.
rpc.call(CommonRpcContext(),
constants.CEPH_MANAGER_RPC_TOPIC,
{'method': 'cache_tiering_enable_cache',
'args': {'new_config': new_config.to_dict(),
'applied_config': desired_config.to_dict()}})
def enable_cache_complete(self, success, _exception, new_config, applied_config):
new_config = ServiceConfig.from_dict(new_config)
applied_config = ServiceConfig.from_dict(applied_config)
if success:
self.db_param_apply(new_config, applied_config, CACHE_TIER_APPLIED)
LOG.info(_("Cache tiering: enable cache complete"))
if self.restore_task == CACHE_TIER_RESTORE_TASK_ENABLE:
self.operator.reset_storage_backend_task()
self.restore_task = None
else:
# Operation failed, so desired config need to be returned
# to the initial value before user executed
# system service-parameter-apply ceph
self.db_param_apply(applied_config, new_config, CACHE_TIER_DESIRED)
LOG.warn(_exception)
def disable_cache(self, new_config, desired_config):
self.db_param_apply(new_config, desired_config, CACHE_TIER_DESIRED)
rpc.call(CommonRpcContext(),
constants.CEPH_MANAGER_RPC_TOPIC,
{'method': 'cache_tiering_disable_cache',
'args': {'new_config': new_config.to_dict(),
'applied_config': desired_config.to_dict()}})
def disable_cache_complete(self, success, _exception,
new_config, applied_config):
new_config = ServiceConfig.from_dict(new_config)
applied_config = ServiceConfig.from_dict(applied_config)
if success:
self.db_param_apply(new_config, applied_config, CACHE_TIER_APPLIED)
LOG.info(_("Cache tiering: disable cache complete"))
if self.restore_task == CACHE_TIER_RESTORE_TASK_DISABLE:
self.restore_task = CACHE_TIER_RESTORE_TASK_ENABLE
self.operator.restore_cache_tiering()
else:
self.db_param_apply(applied_config, new_config, CACHE_TIER_DESIRED)
LOG.warn(_exception)
def operation_in_progress(self):
return rpc.call(CommonRpcContext(),
constants.CEPH_MANAGER_RPC_TOPIC,
{'method': 'cache_tiering_operation_in_progress',
'args': {}})
def restore_ceph_config_after_storage_enabled(self):
LOG.info(_("Restore Ceph config after storage enabled"))
# get cache tiering config.sections
#
current_config = ServiceConfig(
self.operator.service_parameter_get_all(section=CACHE_TIER))
LOG.info(_("Cache tiering: current configuration %s") % str(current_config))
applied_config = ServiceConfig(
self.operator.service_parameter_get_all(section=CACHE_TIER_APPLIED))
LOG.info(_("Cache tiering: applied configuration %s") % str(applied_config))
desired_config = ServiceConfig(
self.operator.service_parameter_get_all(section=CACHE_TIER_DESIRED))
LOG.info(_("Cache tiering: desired configuration %s") % str(desired_config))
# desired config is the union of applied and desired config. prior
# to backup. This should handle the case when backup is executed
# while cache tiering operation is in progress
#
config = current_config.to_dict()
config.update(applied_config.to_dict())
config.update(desired_config.to_dict())
config = ServiceConfig.from_dict(config)
if (len(self.operator.service_parameter_get_all(
section=CACHE_TIER_DESIRED,
name=constants.SERVICE_PARAM_CEPH_CACHE_TIER_FEATURE_ENABLED)) == 0):
# use applied config in case there's no desired config in
# the database - otherwise ServiceConfig() uses the default
# value (False) which may incorrectly override applied config
#
config.feature_enabled = applied_config.feature_enabled
if (len(self.operator.service_parameter_get_all(
section=CACHE_TIER_DESIRED,
name=constants.SERVICE_PARAM_CEPH_CACHE_TIER_CACHE_ENABLED)) == 0):
# use applied config in case there's no desired config in
# the database - otherwise ServiceConfig() uses the default
# value (False) which may incorrectly override applied config
#
config.cache_enabled = applied_config.cache_enabled
LOG.info(_("Cache tiering: set database desired config %s") % str(config))
self.db_param_apply(config, desired_config, CACHE_TIER_DESIRED)
desired_config = config
# cache tier applied section stores system state prior to backup;
# clear it on restore before triggering a ceph-manager apply action
#
config = ServiceConfig()
LOG.info(_("Cache tiering: clear database applied configuration"))
self.db_param_apply(config, applied_config, CACHE_TIER_APPLIED)
applied_config = config
# apply desired configuration in 2 steps: enable feature
# then enable cache
#
if desired_config.feature_enabled:
cache_enabled = desired_config.cache_enabled
if cache_enabled:
LOG.info(_("Cache tiering: disable cache_enabled while enabling feature"))
desired_config.cache_enabled = False
LOG.info(_("Cache tiering: enable feature after restore"))
try:
self.apply_service_config(desired_config, applied_config, applied_config)
applied_config.feature_enabled = True
if cache_enabled:
desired_config.cache_enabled = True
LOG.info(_("Cache tiering: enable cache after restore"))
try:
self.apply_service_config(desired_config, applied_config, applied_config)
except exception.CephFailure as e:
LOG.warn(_("Cache tiering: failed to enable cache after restore. Reason: %s") % str(e))
except exception.CephFailure as e:
LOG.warn(_("Cache tiering: failed to enable feature after restore. Reason: %s") % str(e))
class CephOperator(object):
@ -405,12 +53,6 @@ class CephOperator(object):
self._db_cluster = None
self._db_primary_tier = None
self._cluster_name = 'ceph_cluster'
self._cache_tiering_pools = {
constants.CEPH_POOL_VOLUMES_NAME + '-cache': constants.CEPH_POOL_VOLUMES_NAME,
constants.CEPH_POOL_EPHEMERAL_NAME + '-cache': constants.CEPH_POOL_EPHEMERAL_NAME,
constants.CEPH_POOL_IMAGES_NAME + '-cache': constants.CEPH_POOL_IMAGES_NAME
}
self._cache_tiering = CacheTiering(self)
self._init_db_cluster_and_tier()
# Properties: During config_controller we will not initially have a cluster
@ -534,23 +176,18 @@ class CephOperator(object):
def _get_db_peer_groups(self, replication):
# Process all existing peer records and extract view of the peer groups
host_to_peer = {}
group_stats = {
constants.PERSONALITY_SUBTYPE_CEPH_BACKING: CephOperator.GroupStats(),
constants.PERSONALITY_SUBTYPE_CEPH_CACHING: CephOperator.GroupStats()}
stats = CephOperator.GroupStats()
peers = self._db_api.peers_get_all_by_cluster(self.cluster_id)
for peer in peers:
for host in peer.hosts:
# Update host mapping
host_to_peer[host] = peer
if "cache" in peer.name:
stats = group_stats[constants.PERSONALITY_SUBTYPE_CEPH_CACHING]
else:
stats = group_stats[constants.PERSONALITY_SUBTYPE_CEPH_BACKING]
stats.peer_count += 1
if len(peer.hosts) < replication:
stats.incomplete_peers.append(peer)
return host_to_peer, group_stats
return host_to_peer, stats
def assign_host_to_peer_group(self, host_obj):
# Prevent re-running the peer assignment logic if the host already has a
@ -561,20 +198,10 @@ class CephOperator(object):
return
hostname = host_obj.hostname
subtype = host_obj.capabilities['pers_subtype']
# Get configured ceph replication
replication, min_replication = StorageBackendConfig.get_ceph_pool_replication(self._db_api)
# Sanity check #1: storage-0 and storage-1 subtype is ceph-backing
# TODO: keep this check only for default replication until
# TODO: cache tiering is deprecated
if replication == constants.CEPH_REPLICATION_FACTOR_DEFAULT:
if hostname in [constants.STORAGE_0_HOSTNAME,
constants.STORAGE_1_HOSTNAME] and \
subtype != constants.PERSONALITY_SUBTYPE_CEPH_BACKING:
raise exception.StorageSubTypeUnexpected(host=hostname, subtype=subtype)
host_to_peer, stats = self._get_db_peer_groups(replication)
# Sanity Check #2: Is this host already assigned?
@ -585,33 +212,12 @@ class CephOperator(object):
peer_name=peer.name)
try:
peer_obj = stats[subtype].incomplete_peers[0]
peer_obj = stats.incomplete_peers[0]
peer_name = peer_obj.name
except IndexError:
peer_obj = None
if subtype == constants.PERSONALITY_SUBTYPE_CEPH_CACHING:
peer_name = '%s%s' % (constants.PEER_PREFIX_CACHING,
str(stats[subtype].peer_count))
else:
peer_name = '%s%s' % (constants.PEER_PREFIX_BACKING,
str(stats[subtype].peer_count))
# TODO: keep these checks only for default replication until
# TODO: cache tiering is deprecated
if replication == constants.CEPH_REPLICATION_FACTOR_DEFAULT:
# Sanity check #3: storage-0 and storage-1 are always in group-0
if hostname in [constants.STORAGE_0_HOSTNAME,
constants.STORAGE_1_HOSTNAME] and \
peer_name != constants.PEER_BACKING_RSVD_GROUP:
raise exception.StoragePeerGroupUnexpected(
host=hostname, subtype=subtype, peer_name=peer_name)
# Sanity check #4: group-0 is reserved for storage-0 and storage-1
if peer_name == constants.PEER_BACKING_RSVD_GROUP \
and hostname not in [constants.STORAGE_0_HOSTNAME,
constants.STORAGE_1_HOSTNAME]:
raise exception.StoragePeerGroupUnexpected(
host=hostname, subtype=subtype, peer_name=peer_name)
peer_name = '%s%s' % (constants.PEER_PREFIX,
str(stats.peer_count))
if not peer_obj:
peer_obj = self._db_api.peer_create({
@ -654,81 +260,6 @@ class CephOperator(object):
self.assign_host_to_peer_group(host)
def _calculate_target_pg_num(self, storage_hosts, pool_name):
"""
Calculate target pg_num based upon storage hosts and OSD
storage_hosts: storage host objects
returns target_pg_num calculated target placement group number
osds_raw actual osds
Minimum: <= 2 storage applies minimum. (512, 512, 256, 256)
Assume max 8 OSD for first pair to set baseline.
cinder_volumes: 512 * 2
ephemeral_vms: 512 * 2
glance_images: 256 * 2
.rgw.buckets: 256 * 2
rbd: 64 (this is created by Ceph)
--------------------
Total: 3136
Note: for a single OSD the value has to be less than 2048, formula:
[Total] / [total number of OSD] = [PGs/OSD]
3136 / 2 = 1568 < 2048
See constants.BACKING_POOLS for up to date values
Above 2 Storage hosts: Calculate OSDs based upon pg_calc:
[(Target PGs per OSD) * (# OSD) * (% Data) ]/ Size
Select Target PGs per OSD = 200; to forecast it can double
Determine number of OSD (in multiples of storage-pairs) on the
first host-unlock of storage pair.
"""
target_pg_num = None
osds = 0
stors = None
for i in storage_hosts:
# either cinder or ceph
stors = self._db_api.istor_get_by_ihost(i.uuid)
osds += len(stors)
osds_raw = osds
if len(storage_hosts) % 2 != 0:
osds += len(stors)
LOG.debug("OSD odd number of storage hosts, adjusting osds by %d "
"to osds=%d" % (len(stors), osds))
data_pt = None
for pool in (BACKING_POOLS + CACHE_POOLS):
# Either pool name would be fine here
if pool_name in constants.CEPH_POOL_OBJECT_GATEWAY_NAME:
if pool['pool_name'] in constants.CEPH_POOL_OBJECT_GATEWAY_NAME:
data_pt = int(pool['data_pt'])
break
if pool['pool_name'] == pool_name:
data_pt = int(pool['data_pt'])
break
target_pg_num_raw = None
if data_pt and osds:
# Get configured ceph replication
replication, min_replication = StorageBackendConfig.get_ceph_pool_replication(self._db_api)
# [(Target PGs per OSD) * (# OSD) * (% Data) ]/ Size
target_pg_num_raw = ((osds * constants.CEPH_TARGET_PGS_PER_OSD * data_pt / 100) /
replication)
# find next highest power of 2 via shift bit length
target_pg_num = 1 << (int(target_pg_num_raw) - 1).bit_length()
LOG.info("OSD pool %s target_pg_num_raw=%s target_pg_num=%s "
"osds_raw=%s osds=%s" %
(pool_name, target_pg_num_raw, target_pg_num, osds_raw, osds))
return target_pg_num, osds_raw
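
Aside (not part of the diff): the sizing rule spelled out in the docstring above can be exercised in isolation. A minimal sketch with hypothetical inputs; pgs_per_osd mirrors constants.CEPH_TARGET_PGS_PER_OSD:

def target_pg_num(osds, data_pt, replication, pgs_per_osd=200):
    """[(Target PGs per OSD) * (# OSD) * (% Data)] / Size, rounded up to the next power of 2."""
    raw = (osds * pgs_per_osd * data_pt / 100) / replication
    return 1 << (int(raw) - 1).bit_length()

# e.g. 8 OSDs, a pool carrying 40% of the data, replication factor 2:
# raw = (8 * 200 * 40 / 100) / 2 = 320, rounded up to 512
assert target_pg_num(8, 40, 2) == 512
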
def osd_pool_get(self, pool_name, param):
response, body = self._ceph_api.osd_pool_get(
pool_name, param, body='json')
@ -797,59 +328,6 @@ class CephOperator(object):
LOG.info("osdmap is rebuilt.")
return True
def reset_cache_tiering(self):
"""Restore Cache Tiering service by toggling the cache_enabled field.
The first step here is to disable cache_tiering.
"""
# return if restore is already ongoing
if self._cache_tiering.restore_task:
LOG.info("Cache Tiering restore task %s inprogress"
% self._cache_tiering.restore_task)
return
# No need to restore if Cache Tiering is not enabled
if not self._cache_tiering.is_cache_tiering_enabled():
LOG.info("Cache Tiering service is not enabled. No need to restore")
return True
else:
self._cache_tiering.restore_task = CACHE_TIER_RESTORE_TASK_DISABLE
cache_enabled = self._db_api.service_parameter_get_one(
service=SERVICE_TYPE_CEPH,
section=CACHE_TIER,
name=CACHE_TIER_CACHE_ENABLED)
self.service_parameter_update(
cache_enabled.uuid, CACHE_TIER_CACHE_ENABLED, 'false', CACHE_TIER)
try:
self.update_service_config(do_apply=True)
except RpcRemoteError as e:
raise wsme.exc.ClientSideError(str(e.value))
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.exception(e)
return True
def restore_cache_tiering(self):
"""Restore Cache Tiering service by toggling the cache_enabled field.
The second step here is to re-enable cache_tiering.
"""
cache_enabled = self._db_api.service_parameter_get_one(
service=SERVICE_TYPE_CEPH,
section=CACHE_TIER,
name=CACHE_TIER_CACHE_ENABLED)
self.service_parameter_update(
cache_enabled.uuid, CACHE_TIER_CACHE_ENABLED, 'true', CACHE_TIER)
try:
self.update_service_config(do_apply=True)
except RpcRemoteError as e:
raise wsme.exc.ClientSideError(str(e.value))
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.exception(e)
def restore_ceph_config(self, after_storage_enabled=False):
"""Restore Ceph configuration during Backup and Restore process.
@ -899,7 +377,6 @@ class CephOperator(object):
constants.CINDER_BACKEND_CEPH,
task=constants.SB_TASK_NONE
)
self._cache_tiering.restore_ceph_config_after_storage_enabled()
return True
# check if osdmap is empty as an indication for Backup and Restore
@ -1009,96 +486,14 @@ class CephOperator(object):
:param min_size: minimum number of replicas required for I/O
"""
# Determine the ruleset to use
if pool_name.endswith("-cache"):
# ruleset 1: is the ruleset for the cache tier
# Name: cache_tier_ruleset
ruleset = 1
else:
# ruleset 0: is the default ruleset if no crushmap is loaded or
# the ruleset for the backing tier if loaded:
# Name: storage_tier_ruleset
ruleset = 0
# ruleset 0: is the default ruleset if no crushmap is loaded or
# the ruleset for the backing tier if loaded:
# Name: storage_tier_ruleset
ruleset = 0
# Create the pool if not present
self._pool_create(pool_name, pg_num, pgp_num, ruleset, size, min_size)
def cache_pool_create(self, pool):
backing_pool = pool['pool_name']
cache_pool = backing_pool + '-cache'
# Due to http://tracker.ceph.com/issues/8043 we only audit
# caching pool PGs when the pools are created, for now.
pg_num, _ = self._calculate_target_pg_num(self.get_caching_hosts(), cache_pool)
self.create_or_resize_osd_pool(cache_pool, pg_num, pg_num)
def cache_pool_delete(self, pool):
cache_pool = pool['pool_name'] + '-cache'
self.delete_osd_pool(cache_pool)
def cache_tier_add(self, pool):
backing_pool = pool['pool_name']
cache_pool = backing_pool + '-cache'
response, body = self._ceph_api.osd_tier_add(
backing_pool, cache_pool,
force_nonempty="--force-nonempty",
body='json')
if response.ok:
LOG.info(_("Added OSD tier: "
"backing_pool={}, cache_pool={}").format(backing_pool, cache_pool))
else:
e = exception.CephPoolAddTierFailure(
backing_pool=backing_pool,
cache_pool=cache_pool,
response_status_code=response.status_code,
response_reason=response.reason,
status=body.get('status'),
output=body.get('output'))
LOG.warn(e)
raise e
def cache_tier_remove(self, pool):
backing_pool = pool['pool_name']
cache_pool = backing_pool + '-cache'
response, body = self._ceph_api.osd_tier_remove(
backing_pool, cache_pool, body='json')
if response.ok:
LOG.info(_("Removed OSD tier: "
"backing_pool={}, cache_pool={}").format(backing_pool, cache_pool))
else:
e = exception.CephPoolRemoveTierFailure(
backing_pool=backing_pool,
cache_pool=cache_pool,
response_status_code=response.status_code,
response_reason=response.reason,
status=body.get('status'),
output=body.get('output'))
LOG.warn(e)
raise e
def cache_mode_set(self, pool, mode):
backing_pool = pool['pool_name']
cache_pool = backing_pool + '-cache'
response, body = self._ceph_api.osd_tier_cachemode(
cache_pool, mode, body='json')
if response.ok:
LOG.info(_("Set OSD tier cache mode: "
"cache_pool={}, mode={}").format(cache_pool, mode))
else:
e = exception.CephCacheSetModeFailure(
cache_pool=cache_pool,
response_status_code=response.status_code,
response_reason=response.reason,
status=body.get('status'),
output=body.get('output'))
LOG.warn(e)
raise e
def cache_pool_set_param(self, pool, name, value):
backing_pool = pool['pool_name']
cache_pool = backing_pool + '-cache'
self.osd_set_pool_param(cache_pool, name, value)
def service_parameter_get_all(self, section, name=None):
return self._db_api.service_parameter_get_all(
service=constants.SERVICE_TYPE_CEPH,
@ -1147,23 +542,6 @@ class CephOperator(object):
'name': name,
'value': value})
def get_caching_hosts(self):
storage_nodes = self._db_api.ihost_get_by_personality(constants.STORAGE)
ceph_caching_hosts = []
for node in storage_nodes:
if node.capabilities.get('pers_subtype') == constants.PERSONALITY_SUBTYPE_CEPH_CACHING:
ceph_caching_hosts.append(node)
return ceph_caching_hosts
def get_backing_hosts(self):
storage_nodes = self._db_api.ihost_get_by_personality(constants.STORAGE)
ceph_backing_hosts = []
for node in storage_nodes:
if ('pers_subtype' not in node.capabilities or
node.capabilities.get('pers_subtype') == constants.PERSONALITY_SUBTYPE_CEPH_BACKING):
ceph_backing_hosts.append(node)
return ceph_backing_hosts
def delete_osd_pool(self, pool_name):
"""Delete an osd pool
:param pool_name: pool name
@ -1449,7 +827,7 @@ class CephOperator(object):
pass
# Handle primary tier pools (cinder/glance/swift/ephemeral)
for pool in BACKING_POOLS:
for pool in CEPH_POOLS:
# TODO(rchurch): The following is added for R3->R4 upgrades. Can we
# remove this for R5? Or is there some R3->R4->R5 need to keep this
# around.
@ -1610,137 +988,13 @@ class CephOperator(object):
return False
return True
def check_all_group_cache_valid(self):
peers = self._db_api.peers_get_all_by_cluster(self.cluster_id)
if not len(peers):
return False
for peer in peers:
group_name = peer.name
if group_name.find("cache") != -1:
available_cnt = 0
host_cnt = 0
for host in self._db_api.ihost_get_by_personality(constants.STORAGE):
if peer.id == host['peer_id']:
host_cnt += 1
host_action_locking = False
host_action = host['ihost_action'] or ""
if (host_action.startswith(constants.FORCE_LOCK_ACTION) or
host_action.startswith(constants.LOCK_ACTION)):
host_action_locking = True
if (host['administrative'] == constants.ADMIN_UNLOCKED and
host['operational'] == constants.OPERATIONAL_ENABLED and
not host_action_locking):
available_cnt += 1
if (host_cnt > 0) and (available_cnt == 0):
return False
return True
def cache_tier_config_out_of_date_alarm_set(self):
entity_instance_id = "%s=%s" % (
fm_constants.FM_ENTITY_TYPE_CLUSTER,
self.cluster_ceph_uuid)
LOG.warn(_("Raise Ceph cache tier configuration out of date alarm: %s") % entity_instance_id)
self._fm_api.set_fault(
fm_api.Fault(
alarm_id=fm_constants.FM_ALARM_ID_CEPH_CACHE_TIER_CONFIG_OUT_OF_DATE,
alarm_state=fm_constants.FM_ALARM_STATE_SET,
entity_type_id=fm_constants.FM_ENTITY_TYPE_CLUSTER,
entity_instance_id=entity_instance_id,
severity=fm_constants.FM_ALARM_SEVERITY_MAJOR,
reason_text=_("Ceph Cache Tier: Configuration is out-of-date."),
alarm_type=fm_constants.FM_ALARM_TYPE_7,
probable_cause=fm_constants.ALARM_PROBABLE_CAUSE_75,
proposed_repair_action=_("Run 'system service-parameter-apply ceph' "
"to apply Ceph service configuration"),
service_affecting=True))
def cache_tier_config_out_of_date_alarm_clear(self):
entity_instance_id = "%s=%s" % (
fm_constants.FM_ENTITY_TYPE_CLUSTER,
self.cluster_ceph_uuid)
LOG.warn(_("Clear Ceph cache tier configuration out of date alarm: %s") % entity_instance_id)
self._fm_api.clear_fault(
fm_constants.FM_ALARM_ID_CEPH_CACHE_TIER_CONFIG_OUT_OF_DATE,
entity_instance_id)
def cache_tiering_get_config(self):
return self._cache_tiering.get_config()
def get_pool_pg_num(self, pool_name):
pg_num, _ = self._calculate_target_pg_num(self.get_caching_hosts(),
pool_name)
# Make sure we return the max between the minimum configured value
# and computed target pg_num
for pool in (BACKING_POOLS + CACHE_POOLS):
# either object pool name is fine here
if pool_name in constants.CEPH_POOL_OBJECT_GATEWAY_NAME:
if pool['pool_name'] in constants.CEPH_POOL_OBJECT_GATEWAY_NAME:
break
if pool['pool_name'] == pool_name:
break
return max(pg_num, pool['pg_num'])
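The removed get_pool_pg_num() above returns the larger of the computed target and the pool's configured minimum pg_num. A minimal standalone sketch of that selection rule, using made-up pool entries rather than the real sysinv constants:
# Illustrative sketch only: the pool definitions and the computed targets
# below are placeholders, not the real CEPH_POOLS constants.
EXAMPLE_POOLS = [
    {'pool_name': 'cinder-volumes', 'pg_num': 512},
    {'pool_name': 'images', 'pg_num': 256},
]

def pick_pg_num(pool_name, computed_target):
    # Return the larger of the computed target and the configured minimum.
    for pool in EXAMPLE_POOLS:
        if pool['pool_name'] == pool_name:
            return max(computed_target, pool['pg_num'])
    return computed_target

print(pick_pg_num('images', 128))   # 256 -> configured minimum wins
print(pick_pg_num('images', 1024))  # 1024 -> computed target wins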
def update_service_config(self, do_apply=False):
if StorageBackendConfig.is_ceph_backend_restore_in_progress(self._db_api):
raise exception.CephPoolApplyRestoreInProgress()
if self._cache_tiering.operation_in_progress():
raise exception.CephPoolApplySetParamFailure()
# Each service parameter has three states:
# 1. First, the one that the client sees, stored in section:
# SERVICE_PARAM_SECTION_CEPH_CACHE_TIER
# 2. Second, the one that is stored when the client runs:
# 'system service-parameter-apply ceph' stored in:
# SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_DESIRED
# 3. Third, the one after the config is correctly applied:
# SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_APPLIED
# When a service (e.g. ceph-manager) is restarted and finds that
# DESIRED != APPLIED then it takes corrective action.
# Get service parameters from DB, this should only be needed once
new_config = ServiceConfig(
self.service_parameter_get_all(
section=constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER))
desired_config = ServiceConfig(
self.service_parameter_get_all(
section=constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_DESIRED))
applied_config = ServiceConfig(
self.service_parameter_get_all(
section=constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_APPLIED))
# Cache UUIDs for configs
if new_config:
self.config_uuids = new_config.uuid
if desired_config:
self.desired_config_uuids = desired_config.uuid
if applied_config:
self.applied_config_uuids = applied_config.uuid
if not do_apply:
if new_config != applied_config:
self.cache_tier_config_out_of_date_alarm_set()
else:
self.cache_tier_config_out_of_date_alarm_clear()
else:
self._cache_tiering.apply_service_config(new_config,
desired_config,
applied_config)
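The comment block in update_service_config() describes three copies of each cache-tiering parameter (client-visible, desired, applied) and an out-of-date alarm that is raised whenever the client-visible set differs from the applied set. A minimal sketch of that comparison, assuming plain dicts in place of the ServiceConfig objects and prints in place of the FM alarm calls:
def check_config_sync(new_config, applied_config, do_apply=False):
    # Sketch only: dicts stand in for ServiceConfig; prints stand in for
    # the fm_api set_fault/clear_fault calls.
    if not do_apply:
        if new_config != applied_config:
            print("raise: cache tier configuration out of date")
        else:
            print("clear: cache tier configuration out of date")
    else:
        print("apply:", new_config)

check_config_sync({'cache_enabled': 'true'},   # what the client configured
                  {'cache_enabled': 'false'})  # what is actually applied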
def cache_tiering_enable_cache_complete(self, *args):
self._cache_tiering.enable_cache_complete(*args)
def cache_tiering_disable_cache_complete(self, *args):
self._cache_tiering.disable_cache_complete(*args)
def get_pools_config(self):
for pool in BACKING_POOLS:
for pool in CEPH_POOLS:
# Here it is okay for the object pool name to be either
# constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or
# constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER
pool['quota_gib'] = self.set_quota_gib(pool['pool_name'])
return BACKING_POOLS
return CEPH_POOLS
def get_ceph_primary_tier_size(self):
return rpc.call(CommonRpcContext(),
@ -1819,7 +1073,7 @@ class CephOperator(object):
Note: for a single OSD the value has to be less than 2048, formula:
[Total] / [total number of OSDs] = [PGs/OSD]
3136 / 2 = 1568 < 2048
See constants.BACKING_POOLS for up to date values
See constants.CEPH_POOLS for up to date values
Secondary Tiers:
Minimum: <= 2 storage applies minimum. (512)
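A quick check of the arithmetic in the note above, with the numbers taken straight from the docstring rather than from a live cluster:
total_pgs = 3136   # total placement groups across all pools
osd_count = 2      # single pair of OSDs
pgs_per_osd = total_pgs // osd_count
assert pgs_per_osd < 2048, "per-OSD placement group limit exceeded"
print(pgs_per_osd)  # 1568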
@ -1849,7 +1103,7 @@ class CephOperator(object):
if tiers_obj.uuid == self.primary_tier_uuid:
is_primary_tier = True
pools = (BACKING_POOLS + CACHE_POOLS)
pools = CEPH_POOLS
else:
is_primary_tier = False
pools = constants.SB_TIER_CEPH_POOLS
@ -2170,17 +1424,17 @@ class CephOperator(object):
self.audit_osd_quotas_for_tier(t)
audit = []
backing_hosts = self.get_backing_hosts()
storage_hosts = self._db_api.ihost_get_by_personality(constants.STORAGE)
# osd audit is not required for <= 2 hosts
if backing_hosts and len(backing_hosts) > 2:
if storage_hosts and len(storage_hosts) > 2:
if t.uuid == self.primary_tier_uuid:
# Query ceph to get rgw object pool name.
# To be safe let configure_osd_pools() be the only place that can
# update the object pool name in BACKING_POOLS, so we make a local
# copy of BACKING_POOLS here.
backing_pools_snapshot = copy.deepcopy(BACKING_POOLS)
for pool in backing_pools_snapshot:
# update the object pool name in CEPH_POOLS, so we make a local
# copy of CEPH_POOLS here.
pools_snapshot = copy.deepcopy(CEPH_POOLS)
for pool in pools_snapshot:
if pool['pool_name'] == constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL:
try:
pool_name = self.get_ceph_object_pool_name()
@ -2193,23 +1447,14 @@ class CephOperator(object):
'Reason: %(reason)s') % {'reason': str(e.message)})
break
audit = [(backing_pools_snapshot, backing_hosts)]
audit = [(pools_snapshot, storage_hosts)]
else:
# Adjust the pool name based on the current tier
pools_snapshot = copy.deepcopy(constants.SB_TIER_CEPH_POOLS)
for p in pools_snapshot:
p['pool_name'] += "-%s" % t.name
audit = [(pools_snapshot, backing_hosts)]
# Due to http://tracker.ceph.com/issues/8043 we only audit
# caching pool PGs when the pools are created, for now.
# Uncomment below to enable automatic configuration
# Audit backing and caching pools
# if self._cache_tiering.is_cache_tiering_enabled():
# caching_hosts = self.get_caching_hosts()
# if caching_hosts and len(caching_hosts) > 2:
# audit = audit.extend([(CACHE_POOLS, caching_hosts)])
audit = [(pools_snapshot, storage_hosts)]
if audit is not None:
for pools, storage_hosts in audit:
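As the comment earlier in this hunk notes, configure_osd_pools() is meant to be the only place that rewrites the object pool name, so the audit works on a deep copy and substitutes the rgw pool name locally. A condensed sketch of that copy-and-substitute pattern, with hard-coded names standing in for CEPH_POOLS and get_ceph_object_pool_name():
import copy

# Placeholder data; the real code deep-copies CEPH_POOLS and asks Ceph for
# the actual rgw object pool name.
pool_list = [{'pool_name': 'default.rgw.buckets.data', 'pg_num': 64}]
rgw_pool_name = 'example-rgw-pool'

pools_snapshot = copy.deepcopy(pool_list)
for pool in pools_snapshot:
    if pool['pool_name'] == 'default.rgw.buckets.data':
        pool['pool_name'] = rgw_pool_name  # only the local copy changes

print(pool_list[0]['pool_name'])       # original list untouched
print(pools_snapshot[0]['pool_name'])  # snapshot carries the live name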

View File

@ -497,30 +497,6 @@ class ConductorManager(service.PeriodicService):
'value': None},
]
if tsc.system_type != constants.TIS_AIO_BUILD:
DEFAULT_PARAMETERS.extend([
{'service': constants.SERVICE_TYPE_CEPH,
'section': constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER,
'name': constants.SERVICE_PARAM_CEPH_CACHE_TIER_FEATURE_ENABLED,
'value': False
},
{'service': constants.SERVICE_TYPE_CEPH,
'section': constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_APPLIED,
'name': constants.SERVICE_PARAM_CEPH_CACHE_TIER_FEATURE_ENABLED,
'value': False
},
{'service': constants.SERVICE_TYPE_CEPH,
'section': constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER,
'name': constants.SERVICE_PARAM_CEPH_CACHE_TIER_CACHE_ENABLED,
'value': False
},
{'service': constants.SERVICE_TYPE_CEPH,
'section': constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_APPLIED,
'name': constants.SERVICE_PARAM_CEPH_CACHE_TIER_CACHE_ENABLED,
'value': False
}]
)
def _create_default_service_parameter(self):
""" Populate the default service parameters"""
for p in ConductorManager.DEFAULT_PARAMETERS:
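Each DEFAULT_PARAMETERS entry is a flat dict keyed by service, section, name and value, and _create_default_service_parameter() just walks the list and persists every entry. A toy sketch of that shape, with an in-memory list standing in for the sysinv DB layer:
# Toy example: 'created' stands in for the database; the section and name
# below are made-up values, not real sysinv constants.
DEFAULT_PARAMETERS = [
    {'service': 'ceph', 'section': 'example-section',
     'name': 'example-name', 'value': None},
]

created = []
for p in DEFAULT_PARAMETERS:
    created.append(dict(p))  # the real code writes each dict to the DB

print(created)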
@ -1496,10 +1472,7 @@ class ConductorManager(service.PeriodicService):
# Ensure the OSD pools exist. In the case of a system restore,
# the pools must be re-created when the first storage node is
# unlocked.
if host.capabilities['pers_subtype'] == constants.PERSONALITY_SUBTYPE_CEPH_CACHING:
pass
else:
self._ceph.configure_osd_pools()
self._ceph.configure_osd_pools()
# Generate host configuration files
self._puppet.update_storage_config(host)
@ -1596,12 +1569,6 @@ class ConductorManager(service.PeriodicService):
elif host.personality == constants.COMPUTE:
self._configure_compute_host(context, host)
elif host.personality == constants.STORAGE:
subtype_dict = host.capabilities
if (host.hostname in
[constants.STORAGE_0_HOSTNAME, constants.STORAGE_1_HOSTNAME]):
if subtype_dict.get('pers_subtype') == constants.PERSONALITY_SUBTYPE_CEPH_CACHING:
raise exception.SysinvException(_("storage-0/storage-1 personality sub-type "
"is restricted to cache-backing"))
self._configure_storage_host(context, host)
else:
raise exception.SysinvException(_(
@ -8681,23 +8648,6 @@ class ConductorManager(service.PeriodicService):
def get_ceph_pools_config(self, context):
return self._ceph.get_pools_config()
def get_pool_pg_num(self, context, pool_name):
return self._ceph.get_pool_pg_num(pool_name)
def cache_tiering_get_config(self, context):
# During system startup ceph-manager may ask for this before the ceph
# operator has been instantiated
config = {}
if self._ceph:
config = self._ceph.cache_tiering_get_config()
return config
def cache_tiering_disable_cache_complete(self, context, success, exception, new_config, applied_config):
self._ceph.cache_tiering_disable_cache_complete(success, exception, new_config, applied_config)
def cache_tiering_enable_cache_complete(self, context, success, exception, new_config, applied_config):
self._ceph.cache_tiering_enable_cache_complete(success, exception, new_config, applied_config)
def get_controllerfs_lv_sizes(self, context):
system = self.dbapi.isystem_get_one()
system_dc_role = system.get('distributed_cloud_role', None)

View File

@ -524,7 +524,7 @@ class StorageTierDependentTCs(base.FunctionalTest):
def assertDeleted(self, fullPath):
self.get_json(fullPath, expect_errors=True) # Make sure this line raises an error
def _create_storage_ihost(self, hostname, pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING):
def _create_storage_ihost(self, hostname):
self.host_index += 1
ihost_dict = dbutils.get_test_ihost(
id=self.host_index,
@ -537,10 +537,7 @@ class StorageTierDependentTCs(base.FunctionalTest):
administrative='locked',
operational='disabled',
availability='online',
invprovision='unprovisioned',
capabilities={
'pers_subtype': pers_subtype,
})
invprovision='unprovisioned')
return self.dbapi.ihost_create(ihost_dict)
#
@ -548,7 +545,7 @@ class StorageTierDependentTCs(base.FunctionalTest):
#
def test_cluster_tier_host_osd(self):
storage_0 = self._create_storage_ihost('storage-0', pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING)
storage_0 = self._create_storage_ihost('storage-0')
disk_0 = dbutils.create_test_idisk(device_node='/dev/sda',
device_path='/dev/disk/by-path/pci-0000:00:0d.0-ata-1.0',
forihostid=storage_0.id)

View File

@ -1,7 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# coding=utf-8
# Copyright (c) 2017 Wind River Systems, Inc.
# Copyright (c) 2017-2018 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -19,7 +19,6 @@ from sysinv.openstack.common import context
from sysinv.openstack.common import uuidutils
from sysinv.tests.db import base
from sysinv.tests.db import utils
from sysinv.common import exception
class UpdateCephCluster(base.DbTestCase):
@ -32,8 +31,6 @@ class UpdateCephCluster(base.DbTestCase):
# Tests for initial provisioning
# - test_add_storage_0_no_fsid
# - test_add_storage_0_fsid
# - test_add_storage_0_caching
# - test_add_storage_1_caching
# - test_add_storage_0
# - test_add_storage_1
# - test_add_3_storage_backing
@ -41,8 +38,7 @@ class UpdateCephCluster(base.DbTestCase):
# - test_cgts_7208
# Tests for adding patterns of hosts based on subtype:
# - test_add_valid_mix_tiers
# - test_add_4_mix_bbbc
# - test_add_4_mix_bbcb
# - test_add_4_mix_bbbb
def setUp(self):
super(UpdateCephCluster, self).setUp()
@ -54,7 +50,7 @@ class UpdateCephCluster(base.DbTestCase):
self.load = utils.create_test_load()
self.host_index = -1
def _create_storage_ihost(self, hostname, pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING):
def _create_storage_ihost(self, hostname):
self.host_index += 1
ihost_dict = utils.get_test_ihost(
id=self.host_index,
@ -67,10 +63,7 @@ class UpdateCephCluster(base.DbTestCase):
administrative='unlocked',
operational='enabled',
availability='available',
invprovision='unprovisioned',
capabilities={
'pers_subtype': pers_subtype,
})
invprovision='unprovisioned')
return self.dbapi.ihost_create(ihost_dict)
def test_init_fsid_none(self):
@ -97,7 +90,7 @@ class UpdateCephCluster(base.DbTestCase):
self.service._ceph.cluster_db_uuid)
def test_init_fsid_update_on_unlock(self):
storage_0 = self._create_storage_ihost('storage-0', pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING)
storage_0 = self._create_storage_ihost('storage-0')
# Mock the fsid call so that we don't have to wait for the timeout
with mock.patch.object(ceph.CephWrapper, 'fsid') as mock_fsid:
@ -137,7 +130,7 @@ class UpdateCephCluster(base.DbTestCase):
self.assertIsNone(self.service._ceph.cluster_ceph_uuid)
self.assertNotEquals(self.dbapi.clusters_get_all(type=constants.CINDER_BACKEND_CEPH), [])
storage_0 = self._create_storage_ihost('storage-0', pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING)
storage_0 = self._create_storage_ihost('storage-0')
with mock.patch.object(ceph.CephWrapper, 'fsid') as mock_fsid:
self.assertIsNone(self.service._ceph.cluster_ceph_uuid)
@ -167,9 +160,7 @@ class UpdateCephCluster(base.DbTestCase):
self.assertEqual(len(clusters), 1)
self.assertEqual(clusters[0].cluster_uuid, cluster_uuid)
storage_0 = self._create_storage_ihost(
'storage-0',
pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING)
storage_0 = self._create_storage_ihost('storage-0')
self.service._ceph.update_ceph_cluster(storage_0)
ihost = self.dbapi.ihost_get(storage_0.id)
self.assertEqual(storage_0.id, ihost.id)
@ -182,67 +173,6 @@ class UpdateCephCluster(base.DbTestCase):
set([(p.name, tuple(sorted(p.hosts))) for p in peers]),
{('group-0', ('storage-0',))})
def test_add_storage_0_caching(self):
# Mock fsid with a faux cluster_uuid
cluster_uuid = uuidutils.generate_uuid()
with mock.patch.object(ceph.CephWrapper, 'fsid') as mock_fsid:
mock_fsid.return_value = (mock.MagicMock(ok=True), cluster_uuid)
self.service.start()
mock_fsid.assert_called()
storage_0 = self._create_storage_ihost(
'storage-0',
pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_CACHING)
self.assertRaises(
exception.StorageSubTypeUnexpected,
self.service._ceph.update_ceph_cluster,
storage_0)
clusters = self.dbapi.clusters_get_all(type=constants.CINDER_BACKEND_CEPH)
self.assertEqual(len(clusters), 1)
self.assertEqual(clusters[0].cluster_uuid, cluster_uuid)
# check no (unexpected) peers exist
peers = self.dbapi.peers_get_all_by_cluster(clusters[0].id)
self.assertEqual(
set([(p.name, tuple(sorted(p.hosts))) for p in peers]),
set())
def test_add_storage_1_caching(self):
# Mock fsid with a faux cluster_uuid
cluster_uuid = uuidutils.generate_uuid()
with mock.patch.object(ceph.CephWrapper, 'fsid') as mock_fsid:
mock_fsid.return_value = (mock.MagicMock(ok=True), cluster_uuid)
self.service.start()
mock_fsid.assert_called()
storage_0 = self._create_storage_ihost(
'storage-0',
pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING)
self.service._ceph.update_ceph_cluster(storage_0)
clusters = self.dbapi.clusters_get_all(type=constants.CINDER_BACKEND_CEPH)
self.assertEqual(len(clusters), 1)
self.assertEqual(clusters[0].cluster_uuid, cluster_uuid)
peers = self.dbapi.peers_get_all_by_cluster(clusters[0].id)
self.assertEqual(
set([(p.name, tuple(sorted(p.hosts))) for p in peers]),
{('group-0', ('storage-0',))})
storage_1 = self._create_storage_ihost(
'storage-1',
pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_CACHING)
self.assertRaises(
exception.StorageSubTypeUnexpected,
self.service._ceph.update_ceph_cluster,
storage_1)
peers = self.dbapi.peers_get_all_by_cluster(clusters[0].id)
self.assertEqual(
set([(p.name, tuple(sorted(p.hosts))) for p in peers]),
{('group-0', ('storage-0',))})
def test_add_storage_0(self):
# Mock the fsid call so that we don't have to wait for the timeout
with mock.patch.object(ceph.CephWrapper, 'fsid') as mock_fsid:
@ -253,9 +183,7 @@ class UpdateCephCluster(base.DbTestCase):
self.assertIsNone(self.service._ceph.cluster_ceph_uuid)
self.assertNotEqual(self.dbapi.clusters_get_all(type=constants.CINDER_BACKEND_CEPH), [])
storage_0 = self._create_storage_ihost(
'storage-0',
pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING)
storage_0 = self._create_storage_ihost('storage-0')
cluster_uuid = uuidutils.generate_uuid()
with mock.patch.object(ceph.CephWrapper, 'fsid') as mock_fsid:
@ -293,9 +221,7 @@ class UpdateCephCluster(base.DbTestCase):
self.assertEqual(len(clusters), 1)
self.assertEqual(clusters[0].cluster_uuid, cluster_uuid)
storage_0 = self._create_storage_ihost(
'storage-0',
pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING)
storage_0 = self._create_storage_ihost('storage-0')
self.service._ceph.update_ceph_cluster(storage_0)
peers = self.dbapi.peers_get_all_by_cluster(clusters[0].id)
@ -303,9 +229,7 @@ class UpdateCephCluster(base.DbTestCase):
set([(p.name, tuple(sorted(p.hosts))) for p in peers]),
{('group-0', ('storage-0',))})
storage_1 = self._create_storage_ihost(
'storage-1',
pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING)
storage_1 = self._create_storage_ihost('storage-1')
self.service._ceph.update_ceph_cluster(storage_1)
ihost = self.dbapi.ihost_get(storage_1.id)
self.assertEqual(storage_1.id, ihost.id)
@ -331,9 +255,7 @@ class UpdateCephCluster(base.DbTestCase):
self.assertEqual(len(clusters), 1)
self.assertEqual(clusters[0].cluster_uuid, cluster_uuid)
storage_0 = self._create_storage_ihost(
'storage-0',
pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING)
storage_0 = self._create_storage_ihost('storage-0')
self.service._ceph.update_ceph_cluster(storage_0)
ihost = self.dbapi.ihost_get(storage_0.id)
self.assertEqual(storage_0.id, ihost.id)
@ -346,9 +268,7 @@ class UpdateCephCluster(base.DbTestCase):
set([(p.name, tuple(sorted(p.hosts))) for p in peers]),
{('group-0', ('storage-0',)),})
storage_1 = self._create_storage_ihost(
'storage-1',
pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING)
storage_1 = self._create_storage_ihost('storage-1')
self.service._ceph.update_ceph_cluster(storage_1)
ihost = self.dbapi.ihost_get(storage_1.id)
self.assertEqual(storage_1.id, ihost.id)
@ -361,9 +281,7 @@ class UpdateCephCluster(base.DbTestCase):
set([(p.name, tuple(sorted(p.hosts))) for p in peers]),
{('group-0', ('storage-0', 'storage-1')),})
storage_2 = self._create_storage_ihost(
'storage-2',
pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING)
storage_2 = self._create_storage_ihost('storage-2')
self.service._ceph.update_ceph_cluster(storage_2)
ihost = self.dbapi.ihost_get(storage_2.id)
self.assertEqual(storage_2.id, ihost.id)
@ -378,10 +296,10 @@ class UpdateCephCluster(base.DbTestCase):
('group-1', ('storage-2',))})
def test_cgts_7208(self):
hosts = [self._create_storage_ihost('storage-0', pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING),
self._create_storage_ihost('storage-1', pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING),
self._create_storage_ihost('storage-2', pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING),
self._create_storage_ihost('storage-3', pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING)]
hosts = [self._create_storage_ihost('storage-0'),
self._create_storage_ihost('storage-1'),
self._create_storage_ihost('storage-2'),
self._create_storage_ihost('storage-3')]
expected_groups = {'storage-0': 'group-0', 'storage-1': 'group-0',
'storage-2': 'group-1', 'storage-3': 'group-1'}
@ -429,19 +347,19 @@ class UpdateCephCluster(base.DbTestCase):
self.assertEqual(set(peer.hosts), expected_peer_hosts2[h.hostname])
def test_add_valid_mix_tiers(self):
hosts = [self._create_storage_ihost('storage-0', pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING),
self._create_storage_ihost('storage-1', pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING),
self._create_storage_ihost('storage-2', pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_CACHING),
self._create_storage_ihost('storage-3', pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_CACHING),
self._create_storage_ihost('storage-4', pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING),
self._create_storage_ihost('storage-5', pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING),
self._create_storage_ihost('storage-6', pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_CACHING),
self._create_storage_ihost('storage-7', pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_CACHING)]
hosts = [self._create_storage_ihost('storage-0'),
self._create_storage_ihost('storage-1'),
self._create_storage_ihost('storage-2'),
self._create_storage_ihost('storage-3'),
self._create_storage_ihost('storage-4'),
self._create_storage_ihost('storage-5'),
self._create_storage_ihost('storage-6'),
self._create_storage_ihost('storage-7')]
expected_groups = {'storage-0': 'group-0' , 'storage-1': 'group-0',
'storage-2': 'group-cache-0', 'storage-3': 'group-cache-0',
'storage-4': 'group-1' , 'storage-5': 'group-1',
'storage-6': 'group-cache-1', 'storage-7': 'group-cache-1'}
expected_groups = {'storage-0': 'group-0', 'storage-1': 'group-0',
'storage-2': 'group-1', 'storage-3': 'group-1',
'storage-4': 'group-2', 'storage-5': 'group-2',
'storage-6': 'group-3', 'storage-7': 'group-3'}
expected_peer_hosts = {'storage-0': {'storage-0'}, 'storage-1': {'storage-0', 'storage-1'},
'storage-2': {'storage-2'}, 'storage-3': {'storage-2', 'storage-3'},
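The expected_groups mapping added in this hunk pairs the storage hosts into replication groups of two, so the group index is simply the host index divided by two. A one-liner that reproduces the expected names (an illustration of the test's expectation, not of the provisioning code itself):
hostnames = ['storage-%d' % i for i in range(8)]
expected = {name: 'group-%d' % (i // 2) for i, name in enumerate(hostnames)}
print(expected['storage-5'])  # group-2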
@ -464,7 +382,7 @@ class UpdateCephCluster(base.DbTestCase):
self.assertEqual(peer.name, expected_groups[h.hostname])
self.assertEqual(set(peer.hosts), expected_peer_hosts[h.hostname])
def test_add_4_mix_bbbc(self):
def test_add_4_mix_bbbb(self):
# Mock fsid with a faux cluster_uuid
cluster_uuid = uuidutils.generate_uuid()
with mock.patch.object(ceph.CephWrapper, 'fsid') as mock_fsid:
@ -472,9 +390,7 @@ class UpdateCephCluster(base.DbTestCase):
self.service.start()
mock_fsid.assert_called()
storage_0 = self._create_storage_ihost(
'storage-0',
pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING)
storage_0 = self._create_storage_ihost('storage-0')
self.service._ceph.update_ceph_cluster(storage_0)
ihost = self.dbapi.ihost_get(storage_0.id)
self.assertEqual(storage_0.id, ihost.id)
@ -487,9 +403,7 @@ class UpdateCephCluster(base.DbTestCase):
set([(p.name, tuple(sorted(p.hosts))) for p in peers]),
{('group-0', ('storage-0',)),})
storage_1 = self._create_storage_ihost(
'storage-1',
pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING)
storage_1 = self._create_storage_ihost('storage-1')
self.service._ceph.update_ceph_cluster(storage_1)
ihost = self.dbapi.ihost_get(storage_1.id)
self.assertEqual(storage_1.id, ihost.id)
@ -502,9 +416,7 @@ class UpdateCephCluster(base.DbTestCase):
set([(p.name, tuple(sorted(p.hosts))) for p in peers]),
{('group-0', ('storage-0', 'storage-1')),})
storage_2 = self._create_storage_ihost(
'storage-2',
pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING)
storage_2 = self._create_storage_ihost('storage-2')
self.service._ceph.update_ceph_cluster(storage_2)
ihost = self.dbapi.ihost_get(storage_2.id)
self.assertEqual(storage_2.id, ihost.id)
@ -518,80 +430,7 @@ class UpdateCephCluster(base.DbTestCase):
{('group-0', ('storage-0', 'storage-1')),
('group-1', ('storage-2',))})
storage_3 = self._create_storage_ihost(
'storage-3',
pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_CACHING)
self.service._ceph.update_ceph_cluster(storage_3)
ihost = self.dbapi.ihost_get(storage_3.id)
self.assertEqual(storage_3.id, ihost.id)
peer = self.dbapi.peer_get(ihost.peer_id)
self.assertEqual(peer.name, 'group-cache-0')
self.assertIn(ihost.hostname, peer.hosts)
peers = self.dbapi.peers_get_all_by_cluster(cluster_uuid)
self.assertEqual(
set([(p.name, tuple(sorted(p.hosts))) for p in peers]),
{('group-0', ('storage-0', 'storage-1')),
('group-1', ('storage-2',)),
('group-cache-0', ('storage-3',))})
def test_add_4_mix_bbcb(self):
# Mock fsid with a faux cluster_uuid
cluster_uuid = uuidutils.generate_uuid()
with mock.patch.object(ceph.CephWrapper, 'fsid') as mock_fsid:
mock_fsid.return_value = (mock.MagicMock(ok=True), cluster_uuid)
self.service.start()
mock_fsid.assert_called()
storage_0 = self._create_storage_ihost(
'storage-0',
pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING)
self.service._ceph.update_ceph_cluster(storage_0)
ihost = self.dbapi.ihost_get(storage_0.id)
self.assertEqual(storage_0.id, ihost.id)
peer = self.dbapi.peer_get(ihost.peer_id)
self.assertEqual(peer.name, 'group-0')
self.assertIn(ihost.hostname, peer.hosts)
peers = self.dbapi.peers_get_all_by_cluster(cluster_uuid)
self.assertEqual(
set([(p.name, tuple(sorted(p.hosts))) for p in peers]),
{('group-0', ('storage-0',)),})
storage_1 = self._create_storage_ihost(
'storage-1',
pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING)
self.service._ceph.update_ceph_cluster(storage_1)
ihost = self.dbapi.ihost_get(storage_1.id)
self.assertEqual(storage_1.id, ihost.id)
peer = self.dbapi.peer_get(ihost.peer_id)
self.assertEqual(peer.name, 'group-0')
self.assertIn(ihost.hostname, peer.hosts)
peers = self.dbapi.peers_get_all_by_cluster(cluster_uuid)
self.assertEqual(
set([(p.name, tuple(sorted(p.hosts))) for p in peers]),
{('group-0', ('storage-0', 'storage-1')),})
storage_2 = self._create_storage_ihost(
'storage-2',
pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_CACHING)
self.service._ceph.update_ceph_cluster(storage_2)
ihost = self.dbapi.ihost_get(storage_2.id)
self.assertEqual(storage_2.id, ihost.id)
peer = self.dbapi.peer_get(ihost.peer_id)
self.assertEqual(peer.name, 'group-cache-0')
self.assertIn(ihost.hostname, peer.hosts)
peers = self.dbapi.peers_get_all_by_cluster(cluster_uuid)
self.assertEqual(
set([(p.name, tuple(sorted(p.hosts))) for p in peers]),
{('group-0', ('storage-0', 'storage-1')),
('group-cache-0', ('storage-2',))})
storage_3 = self._create_storage_ihost(
'storage-3',
pers_subtype=constants.PERSONALITY_SUBTYPE_CEPH_BACKING)
storage_3 = self._create_storage_ihost('storage-3')
self.service._ceph.update_ceph_cluster(storage_3)
ihost = self.dbapi.ihost_get(storage_3.id)
self.assertEqual(storage_3.id, ihost.id)
@ -603,5 +442,4 @@ class UpdateCephCluster(base.DbTestCase):
self.assertEqual(
set([(p.name, tuple(sorted(p.hosts))) for p in peers]),
{('group-0', ('storage-0', 'storage-1')),
('group-cache-0', ('storage-2',)),
('group-1', ('storage-3',))})
('group-1', ('storage-2', 'storage-3'))})