# Copyright 2014-2021 Canonical Limited. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # This file is sourced from lp:openstack-charm-helpers # # Authors: # James Page # Adam Gandelman # import collections import errno import hashlib import math import six import os import shutil import json import time import uuid from subprocess import ( check_call, check_output, CalledProcessError, ) from charmhelpers import deprecate from charmhelpers.core.hookenv import ( application_name, config, service_name, local_unit, relation_get, relation_ids, relation_set, related_units, log, DEBUG, INFO, WARNING, ERROR, ) from charmhelpers.core.host import ( mount, mounts, service_start, service_stop, service_running, umount, cmp_pkgrevno, ) from charmhelpers.fetch import ( apt_install, ) from charmhelpers.core.unitdata import kv from charmhelpers.core.kernel import modprobe from charmhelpers.contrib.openstack.utils import config_flags_parser KEYRING = '/etc/ceph/ceph.client.{}.keyring' KEYFILE = '/etc/ceph/ceph.client.{}.key' CEPH_CONF = """[global] auth supported = {auth} keyring = {keyring} mon host = {mon_hosts} log to syslog = {use_syslog} err to syslog = {use_syslog} clog to syslog = {use_syslog} """ # The number of placement groups per OSD to target for placement group # calculations. This number is chosen as 100 due to the ceph PG Calc # documentation recommending to choose 100 for clusters which are not # expected to increase in the foreseeable future. Since the majority of the # calculations are done on deployment, target the case of non-expanding # clusters as the default. DEFAULT_PGS_PER_OSD_TARGET = 100 DEFAULT_POOL_WEIGHT = 10.0 LEGACY_PG_COUNT = 200 DEFAULT_MINIMUM_PGS = 2 AUTOSCALER_DEFAULT_PGS = 32 class OsdPostUpgradeError(Exception): """Error class for OSD post-upgrade operations.""" pass class OSDSettingConflict(Exception): """Error class for conflicting osd setting requests.""" pass class OSDSettingNotAllowed(Exception): """Error class for a disallowed setting.""" pass OSD_SETTING_EXCEPTIONS = (OSDSettingConflict, OSDSettingNotAllowed) OSD_SETTING_WHITELIST = [ 'osd heartbeat grace', 'osd heartbeat interval', ] def _order_dict_by_key(rdict): """Convert a dictionary into an OrderedDict sorted by key. :param rdict: Dictionary to be ordered. :type rdict: dict :returns: Ordered Dictionary. :rtype: collections.OrderedDict """ return collections.OrderedDict(sorted(rdict.items(), key=lambda k: k[0])) def get_osd_settings(relation_name): """Consolidate requested osd settings from all clients. Consolidate requested osd settings from all clients. Check that the requested setting is on the whitelist and it does not conflict with any other requested settings. 
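
    For example, if two related client units both requested
    ``{"osd heartbeat grace": 20}`` (hypothetical relation data), the
    consolidated result would be::

        OrderedDict([('osd heartbeat grace', 20)])
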
:returns: Dictionary of settings :rtype: dict :raises: OSDSettingNotAllowed :raises: OSDSettingConflict """ rel_ids = relation_ids(relation_name) osd_settings = {} for relid in rel_ids: for unit in related_units(relid): unit_settings = relation_get('osd-settings', unit, relid) or '{}' unit_settings = json.loads(unit_settings) for key, value in unit_settings.items(): if key not in OSD_SETTING_WHITELIST: msg = 'Illegal settings "{}"'.format(key) raise OSDSettingNotAllowed(msg) if key in osd_settings: if osd_settings[key] != unit_settings[key]: msg = 'Conflicting settings for "{}"'.format(key) raise OSDSettingConflict(msg) else: osd_settings[key] = value return _order_dict_by_key(osd_settings) def send_application_name(relid=None): """Send the application name down the relation. :param relid: Relation id to set application name in. :type relid: str """ relation_set( relation_id=relid, relation_settings={'application-name': application_name()}) def send_osd_settings(): """Pass on requested OSD settings to osd units.""" try: settings = get_osd_settings('client') except OSD_SETTING_EXCEPTIONS as e: # There is a problem with the settings, not passing them on. Update # status will notify the user. log(e, level=ERROR) return data = { 'osd-settings': json.dumps(settings, sort_keys=True)} for relid in relation_ids('osd'): relation_set(relation_id=relid, relation_settings=data) def validator(value, valid_type, valid_range=None): """Helper function for type validation. Used to validate these: https://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values https://docs.ceph.com/docs/master/rados/configuration/bluestore-config-ref/#inline-compression Example input: validator(value=1, valid_type=int, valid_range=[0, 2]) This says I'm testing value=1. It must be an int inclusive in [0,2] :param value: The value to validate. :type value: any :param valid_type: The type that value should be. :type valid_type: any :param valid_range: A range of values that value can assume. :type valid_range: Optional[Union[List,Tuple]] :raises: AssertionError, ValueError """ assert isinstance(value, valid_type), ( "{} is not a {}".format(value, valid_type)) if valid_range is not None: assert isinstance( valid_range, list) or isinstance(valid_range, tuple), ( "valid_range must be of type List or Tuple, " "was given {} of type {}" .format(valid_range, type(valid_range))) # If we're dealing with strings if isinstance(value, six.string_types): assert value in valid_range, ( "{} is not in the list {}".format(value, valid_range)) # Integer, float should have a min and max else: if len(valid_range) != 2: raise ValueError( "Invalid valid_range list of {} for {}. " "List must be [min,max]".format(valid_range, value)) assert value >= valid_range[0], ( "{} is less than minimum allowed value of {}" .format(value, valid_range[0])) assert value <= valid_range[1], ( "{} is greater than maximum allowed value of {}" .format(value, valid_range[1])) class PoolCreationError(Exception): """A custom exception to inform the caller that a pool creation failed. Provides an error message """ def __init__(self, message): super(PoolCreationError, self).__init__(message) class BasePool(object): """An object oriented approach to Ceph pool creation. This base class is inherited by ReplicatedPool and ErasurePool. Do not call create() on this base class as it will raise an exception. Instantiate a child class and call create(). 
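
    A minimal usage sketch (the Ceph user name, pool name and weight below
    are arbitrary examples)::

        pool = ReplicatedPool(service='admin', name='mypool',
                              replicas=3, percent_data=20.0)
        pool.create()
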
""" # Dictionary that maps pool operation properties to Tuples with valid type # and valid range op_validation_map = { 'compression-algorithm': (str, ('lz4', 'snappy', 'zlib', 'zstd')), 'compression-mode': (str, ('none', 'passive', 'aggressive', 'force')), 'compression-required-ratio': (float, None), 'compression-min-blob-size': (int, None), 'compression-min-blob-size-hdd': (int, None), 'compression-min-blob-size-ssd': (int, None), 'compression-max-blob-size': (int, None), 'compression-max-blob-size-hdd': (int, None), 'compression-max-blob-size-ssd': (int, None), 'rbd-mirroring-mode': (str, ('image', 'pool')) } def __init__(self, service, name=None, percent_data=None, app_name=None, op=None): """Initialize BasePool object. Pool information is either initialized from individual keyword arguments or from a individual CephBrokerRq operation Dict. :param service: The Ceph user name to run commands under. :type service: str :param name: Name of pool to operate on. :type name: str :param percent_data: The expected pool size in relation to all available resources in the Ceph cluster. Will be used to set the ``target_size_ratio`` pool property. (default: 10.0) :type percent_data: Optional[float] :param app_name: Ceph application name, usually one of: ('cephfs', 'rbd', 'rgw') (default: 'unknown') :type app_name: Optional[str] :param op: Broker request Op to compile pool data from. :type op: Optional[Dict[str,any]] :raises: KeyError """ # NOTE: Do not perform initialization steps that require live data from # a running cluster here. The *Pool classes may be used for validation. self.service = service self.op = op or {} if op: # When initializing from op the `name` attribute is required and we # will fail with KeyError if it is not provided. self.name = op['name'] self.percent_data = op.get('weight') self.app_name = op.get('app-name') else: self.name = name self.percent_data = percent_data self.app_name = app_name # Set defaults for these if they are not provided self.percent_data = self.percent_data or 10.0 self.app_name = self.app_name or 'unknown' def validate(self): """Check that value of supplied operation parameters are valid. :raises: ValueError """ for op_key, op_value in self.op.items(): if op_key in self.op_validation_map and op_value is not None: valid_type, valid_range = self.op_validation_map[op_key] try: validator(op_value, valid_type, valid_range) except (AssertionError, ValueError) as e: # Normalize on ValueError, also add information about which # variable we had an issue with. raise ValueError("'{}': {}".format(op_key, str(e))) def _create(self): """Perform the pool creation, method MUST be overridden by child class. """ raise NotImplementedError def _post_create(self): """Perform common post pool creation tasks. Note that pool properties subject to change during the lifetime of a pool / deployment should go into the ``update`` method. Do not add calls for a specific pool type here, those should go into one of the pool specific classes. 
""" nautilus_or_later = cmp_pkgrevno('ceph-common', '14.2.0') >= 0 if nautilus_or_later: # Ensure we set the expected pool ratio update_pool( client=self.service, pool=self.name, settings={ 'target_size_ratio': str( self.percent_data / 100.0), }) try: set_app_name_for_pool(client=self.service, pool=self.name, name=self.app_name) except CalledProcessError: log('Could not set app name for pool {}' .format(self.name), level=WARNING) if 'pg_autoscaler' in enabled_manager_modules(): try: enable_pg_autoscale(self.service, self.name) except CalledProcessError as e: log('Could not configure auto scaling for pool {}: {}' .format(self.name, e), level=WARNING) def create(self): """Create pool and perform any post pool creation tasks. To allow for sharing of common code among pool specific classes the processing has been broken out into the private methods ``_create`` and ``_post_create``. Do not add any pool type specific handling here, that should go into one of the pool specific classes. """ if not pool_exists(self.service, self.name): self.validate() self._create() self._post_create() self.update() def set_quota(self): """Set a quota if requested. :raises: CalledProcessError """ max_bytes = self.op.get('max-bytes') max_objects = self.op.get('max-objects') if max_bytes or max_objects: set_pool_quota(service=self.service, pool_name=self.name, max_bytes=max_bytes, max_objects=max_objects) def set_compression(self): """Set compression properties if requested. :raises: CalledProcessError """ compression_properties = { key.replace('-', '_'): value for key, value in self.op.items() if key in ( 'compression-algorithm', 'compression-mode', 'compression-required-ratio', 'compression-min-blob-size', 'compression-min-blob-size-hdd', 'compression-min-blob-size-ssd', 'compression-max-blob-size', 'compression-max-blob-size-hdd', 'compression-max-blob-size-ssd') and value} if compression_properties: update_pool(self.service, self.name, compression_properties) def update(self): """Update properties for an already existing pool. Do not add calls for a specific pool type here, those should go into one of the pool specific classes. """ self.validate() self.set_quota() self.set_compression() def add_cache_tier(self, cache_pool, mode): """Adds a new cache tier to an existing pool. :param cache_pool: The cache tier pool name to add. :type cache_pool: str :param mode: The caching mode to use for this pool. valid range = ["readonly", "writeback"] :type mode: str """ # Check the input types and values validator(value=cache_pool, valid_type=six.string_types) validator( value=mode, valid_type=six.string_types, valid_range=["readonly", "writeback"]) check_call([ 'ceph', '--id', self.service, 'osd', 'tier', 'add', self.name, cache_pool, ]) check_call([ 'ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, mode, ]) check_call([ 'ceph', '--id', self.service, 'osd', 'tier', 'set-overlay', self.name, cache_pool, ]) check_call([ 'ceph', '--id', self.service, 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom', ]) def remove_cache_tier(self, cache_pool): """Removes a cache tier from Ceph. Flushes all dirty objects from writeback pools and waits for that to complete. :param cache_pool: The cache tier pool name to remove. 
:type cache_pool: str """ # read-only is easy, writeback is much harder mode = get_cache_mode(self.service, cache_pool) if mode == 'readonly': check_call([ 'ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none' ]) check_call([ 'ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool, ]) elif mode == 'writeback': pool_forward_cmd = ['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'forward'] if cmp_pkgrevno('ceph-common', '10.1') >= 0: # Jewel added a mandatory flag pool_forward_cmd.append('--yes-i-really-mean-it') check_call(pool_forward_cmd) # Flush the cache and wait for it to return check_call([ 'rados', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all']) check_call([ 'ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name]) check_call([ 'ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool]) def get_pgs(self, pool_size, percent_data=DEFAULT_POOL_WEIGHT, device_class=None): """Return the number of placement groups to use when creating the pool. Returns the number of placement groups which should be specified when creating the pool. This is based upon the calculation guidelines provided by the Ceph Placement Group Calculator (located online at http://ceph.com/pgcalc/). The number of placement groups are calculated using the following: (Target PGs per OSD) * (OSD #) * (%Data) ---------------------------------------- (Pool size) Per the upstream guidelines, the OSD # should really be considered based on the number of OSDs which are eligible to be selected by the pool. Since the pool creation doesn't specify any of CRUSH set rules, the default rule will be dependent upon the type of pool being created (replicated or erasure). This code makes no attempt to determine the number of OSDs which can be selected for the specific rule, rather it is left to the user to tune in the form of 'expected-osd-count' config option. :param pool_size: pool_size is either the number of replicas for replicated pools or the K+M sum for erasure coded pools :type pool_size: int :param percent_data: the percentage of data that is expected to be contained in the pool for the specific OSD set. Default value is to assume 10% of the data is for this pool, which is a relatively low % of the data but allows for the pg_num to be increased. NOTE: the default is primarily to handle the scenario where related charms requiring pools has not been upgraded to include an update to indicate their relative usage of the pools. :type percent_data: float :param device_class: class of storage to use for basis of pgs calculation; ceph supports nvme, ssd and hdd by default based on presence of devices of each type in the deployment. :type device_class: str :returns: The number of pgs to use. :rtype: int """ # Note: This calculation follows the approach that is provided # by the Ceph PG Calculator located at http://ceph.com/pgcalc/. validator(value=pool_size, valid_type=int) # Ensure that percent data is set to something - even with a default # it can be set to None, which would wreak havoc below. 
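        # Worked example (illustrative numbers, not from a live cluster):
        # with 3 eligible OSDs, the default target of 100 PGs per OSD, a
        # 10% weight and a pool size of 3 the raw figure is
        # 100 * 3 * 0.10 / 3 = 10, which the power-of-two rounding further
        # below turns into 8 (8 is within 25% of 10).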
if percent_data is None: percent_data = DEFAULT_POOL_WEIGHT # If the expected-osd-count is specified, then use the max between # the expected-osd-count and the actual osd_count osd_list = get_osds(self.service, device_class) expected = config('expected-osd-count') or 0 if osd_list: if device_class: osd_count = len(osd_list) else: osd_count = max(expected, len(osd_list)) # Log a message to provide some insight if the calculations claim # to be off because someone is setting the expected count and # there are more OSDs in reality. Try to make a proper guess # based upon the cluster itself. if not device_class and expected and osd_count != expected: log("Found more OSDs than provided expected count. " "Using the actual count instead", INFO) elif expected: # Use the expected-osd-count in older ceph versions to allow for # a more accurate pg calculations osd_count = expected else: # NOTE(james-page): Default to 200 for older ceph versions # which don't support OSD query from cli return LEGACY_PG_COUNT percent_data /= 100.0 target_pgs_per_osd = config( 'pgs-per-osd') or DEFAULT_PGS_PER_OSD_TARGET num_pg = (target_pgs_per_osd * osd_count * percent_data) // pool_size # NOTE: ensure a sane minimum number of PGS otherwise we don't get any # reasonable data distribution in minimal OSD configurations if num_pg < DEFAULT_MINIMUM_PGS: num_pg = DEFAULT_MINIMUM_PGS # The CRUSH algorithm has a slight optimization for placement groups # with powers of 2 so find the nearest power of 2. If the nearest # power of 2 is more than 25% below the original value, the next # highest value is used. To do this, find the nearest power of 2 such # that 2^n <= num_pg, check to see if its within the 25% tolerance. exponent = math.floor(math.log(num_pg, 2)) nearest = 2 ** exponent if (num_pg - nearest) > (num_pg * 0.25): # Choose the next highest power of 2 since the nearest is more # than 25% below the original value. return int(nearest * 2) else: return int(nearest) class Pool(BasePool): """Compatibility shim for any descendents external to this library.""" @deprecate( 'The ``Pool`` baseclass has been replaced by ``BasePool`` class.') def __init__(self, service, name): super(Pool, self).__init__(service, name=name) def create(self): pass class ReplicatedPool(BasePool): def __init__(self, service, name=None, pg_num=None, replicas=None, percent_data=None, app_name=None, op=None): """Initialize ReplicatedPool object. Pool information is either initialized from individual keyword arguments or from a individual CephBrokerRq operation Dict. Please refer to the docstring of the ``BasePool`` class for documentation of the common parameters. :param pg_num: Express wish for number of Placement Groups (this value is subject to validation against a running cluster prior to use to avoid creating a pool with too many PGs) :type pg_num: int :param replicas: Number of copies there should be of each object added to this replicated pool. :type replicas: int :raises: KeyError """ # NOTE: Do not perform initialization steps that require live data from # a running cluster here. The *Pool classes may be used for validation. # The common parameters are handled in our parents initializer super(ReplicatedPool, self).__init__( service=service, name=name, percent_data=percent_data, app_name=app_name, op=op) if op: # When initializing from op `replicas` is a required attribute, and # we will fail with KeyError if it is not provided. 
            self.replicas = op['replicas']
            self.pg_num = op.get('pg_num')
        else:
            self.replicas = replicas or 2
            self.pg_num = pg_num

    def _create(self):
        # Do extra validation on pg_num with data from live cluster
        if self.pg_num:
            # Since the number of placement groups was specified, ensure
            # that there aren't too many created.
            max_pgs = self.get_pgs(self.replicas, 100.0)
            self.pg_num = min(self.pg_num, max_pgs)
        else:
            self.pg_num = self.get_pgs(self.replicas, self.percent_data)

        nautilus_or_later = cmp_pkgrevno('ceph-common', '14.2.0') >= 0
        # Create it
        if nautilus_or_later:
            cmd = [
                'ceph', '--id', self.service, 'osd', 'pool', 'create',
                '--pg-num-min={}'.format(
                    min(AUTOSCALER_DEFAULT_PGS, self.pg_num)
                ),
                self.name, str(self.pg_num)
            ]
        else:
            cmd = [
                'ceph', '--id', self.service, 'osd', 'pool', 'create',
                self.name, str(self.pg_num)
            ]
        check_call(cmd)

    def _post_create(self):
        # Set the pool replica size
        update_pool(client=self.service,
                    pool=self.name,
                    settings={'size': str(self.replicas)})
        # Perform other common post pool creation tasks
        super(ReplicatedPool, self)._post_create()


class ErasurePool(BasePool):
    """Default jerasure erasure coded pool."""

    def __init__(self, service, name=None, erasure_code_profile=None,
                 percent_data=None, app_name=None, op=None,
                 allow_ec_overwrites=False):
        """Initialize ErasurePool object.

        Pool information is either initialized from individual keyword
        arguments or from an individual CephBrokerRq operation Dict.

        Please refer to the docstring of the ``BasePool`` class for
        documentation of the common parameters.

        :param erasure_code_profile: EC Profile to use (default: 'default')
        :type erasure_code_profile: Optional[str]
        """
        # NOTE: Do not perform initialization steps that require live data from
        # a running cluster here. The *Pool classes may be used for validation.

        # The common parameters are handled in our parent's initializer
        super(ErasurePool, self).__init__(
            service=service, name=name, percent_data=percent_data,
            app_name=app_name, op=op)
        if op:
            # Note that the different default when initializing from op stems
            # from different handling of this in the `charms.ceph` library.
            self.erasure_code_profile = op.get('erasure-profile',
                                               'default-canonical')
            self.allow_ec_overwrites = op.get('allow-ec-overwrites')
        else:
            # We keep the class default when initialized from keyword arguments
            # to not break the API for any other consumers.
            self.erasure_code_profile = erasure_code_profile or 'default'
            self.allow_ec_overwrites = allow_ec_overwrites

    def _create(self):
        # Try to find the erasure profile information in order to properly
        # size the number of placement groups. The size of an erasure
        # coded placement group is calculated as k+m.
erasure_profile = get_erasure_profile(self.service, self.erasure_code_profile) # Check for errors if erasure_profile is None: msg = ("Failed to discover erasure profile named " "{}".format(self.erasure_code_profile)) log(msg, level=ERROR) raise PoolCreationError(msg) if 'k' not in erasure_profile or 'm' not in erasure_profile: # Error msg = ("Unable to find k (data chunks) or m (coding chunks) " "in erasure profile {}".format(erasure_profile)) log(msg, level=ERROR) raise PoolCreationError(msg) k = int(erasure_profile['k']) m = int(erasure_profile['m']) pgs = self.get_pgs(k + m, self.percent_data) nautilus_or_later = cmp_pkgrevno('ceph-common', '14.2.0') >= 0 # Create it if nautilus_or_later: cmd = [ 'ceph', '--id', self.service, 'osd', 'pool', 'create', '--pg-num-min={}'.format( min(AUTOSCALER_DEFAULT_PGS, pgs) ), self.name, str(pgs), str(pgs), 'erasure', self.erasure_code_profile ] else: cmd = [ 'ceph', '--id', self.service, 'osd', 'pool', 'create', self.name, str(pgs), str(pgs), 'erasure', self.erasure_code_profile ] check_call(cmd) def _post_create(self): super(ErasurePool, self)._post_create() if self.allow_ec_overwrites: update_pool(self.service, self.name, {'allow_ec_overwrites': 'true'}) def enabled_manager_modules(): """Return a list of enabled manager modules. :rtype: List[str] """ cmd = ['ceph', 'mgr', 'module', 'ls'] try: modules = check_output(cmd) if six.PY3: modules = modules.decode('UTF-8') except CalledProcessError as e: log("Failed to list ceph modules: {}".format(e), WARNING) return [] modules = json.loads(modules) return modules['enabled_modules'] def enable_pg_autoscale(service, pool_name): """Enable Ceph's PG autoscaler for the specified pool. :param service: The Ceph user name to run the command under :type service: str :param pool_name: The name of the pool to enable sutoscaling on :type pool_name: str :raises: CalledProcessError if the command fails """ check_call([ 'ceph', '--id', service, 'osd', 'pool', 'set', pool_name, 'pg_autoscale_mode', 'on']) def get_mon_map(service): """Return the current monitor map. :param service: The Ceph user name to run the command under :type service: str :returns: Dictionary with monitor map data :rtype: Dict[str,any] :raises: ValueError if the monmap fails to parse, CalledProcessError if our ceph command fails. """ try: mon_status = check_output(['ceph', '--id', service, 'mon_status', '--format=json']) if six.PY3: mon_status = mon_status.decode('UTF-8') try: return json.loads(mon_status) except ValueError as v: log("Unable to parse mon_status json: {}. Error: {}" .format(mon_status, str(v))) raise except CalledProcessError as e: log("mon_status command failed with message: {}" .format(str(e))) raise def hash_monitor_names(service): """Get a sorted list of monitor hashes in ascending order. Uses the get_mon_map() function to get information about the monitor cluster. Hash the name of each monitor. :param service: The Ceph user name to run the command under. :type service: str :returns: a sorted list of monitor hashes in an ascending order. :rtype : List[str] :raises: CalledProcessError, ValueError """ try: hash_list = [] monitor_list = get_mon_map(service=service) if monitor_list['monmap']['mons']: for mon in monitor_list['monmap']['mons']: hash_list.append( hashlib.sha224(mon['name'].encode('utf-8')).hexdigest()) return sorted(hash_list) else: return None except (ValueError, CalledProcessError): raise def monitor_key_delete(service, key): """Delete a key and value pair from the monitor cluster. 
Deletes a key value pair on the monitor cluster. :param service: The Ceph user name to run the command under :type service: str :param key: The key to delete. :type key: str :raises: CalledProcessError """ try: check_output( ['ceph', '--id', service, 'config-key', 'del', str(key)]) except CalledProcessError as e: log("Monitor config-key put failed with message: {}" .format(e.output)) raise def monitor_key_set(service, key, value): """Set a key value pair on the monitor cluster. :param service: The Ceph user name to run the command under. :type service str :param key: The key to set. :type key: str :param value: The value to set. This will be coerced into a string. :type value: str :raises: CalledProcessError """ try: check_output( ['ceph', '--id', service, 'config-key', 'put', str(key), str(value)]) except CalledProcessError as e: log("Monitor config-key put failed with message: {}" .format(e.output)) raise def monitor_key_get(service, key): """Get the value of an existing key in the monitor cluster. :param service: The Ceph user name to run the command under :type service: str :param key: The key to search for. :type key: str :return: Returns the value of that key or None if not found. :rtype: Optional[str] """ try: output = check_output( ['ceph', '--id', service, 'config-key', 'get', str(key)]).decode('UTF-8') return output except CalledProcessError as e: log("Monitor config-key get failed with message: {}" .format(e.output)) return None def monitor_key_exists(service, key): """Search for existence of key in the monitor cluster. :param service: The Ceph user name to run the command under. :type service: str :param key: The key to search for. :type key: str :return: Returns True if the key exists, False if not. :rtype: bool :raises: CalledProcessError if an unknown error occurs. """ try: check_call( ['ceph', '--id', service, 'config-key', 'exists', str(key)]) # I can return true here regardless because Ceph returns # ENOENT if the key wasn't found return True except CalledProcessError as e: if e.returncode == errno.ENOENT: return False else: log("Unknown error from ceph config-get exists: {} {}" .format(e.returncode, e.output)) raise def get_erasure_profile(service, name): """Get an existing erasure code profile if it exists. :param service: The Ceph user name to run the command under. :type service: str :param name: Name of profile. :type name: str :returns: Dictionary with profile data. :rtype: Optional[Dict[str]] """ try: out = check_output(['ceph', '--id', service, 'osd', 'erasure-code-profile', 'get', name, '--format=json']) if six.PY3: out = out.decode('UTF-8') return json.loads(out) except (CalledProcessError, OSError, ValueError): return None def pool_set(service, pool_name, key, value): """Sets a value for a RADOS pool in ceph. :param service: The Ceph user name to run the command under. :type service: str :param pool_name: Name of pool to set property on. :type pool_name: str :param key: Property key. :type key: str :param value: Value, will be coerced into str and shifted to lowercase. :type value: str :raises: CalledProcessError """ cmd = [ 'ceph', '--id', service, 'osd', 'pool', 'set', pool_name, key, str(value).lower()] check_call(cmd) def snapshot_pool(service, pool_name, snapshot_name): """Snapshots a RADOS pool in Ceph. :param service: The Ceph user name to run the command under. :type service: str :param pool_name: Name of pool to snapshot. :type pool_name: str :param snapshot_name: Name of snapshot to create. 
:type snapshot_name: str :raises: CalledProcessError """ cmd = [ 'ceph', '--id', service, 'osd', 'pool', 'mksnap', pool_name, snapshot_name] check_call(cmd) def remove_pool_snapshot(service, pool_name, snapshot_name): """Remove a snapshot from a RADOS pool in Ceph. :param service: The Ceph user name to run the command under. :type service: str :param pool_name: Name of pool to remove snapshot from. :type pool_name: str :param snapshot_name: Name of snapshot to remove. :type snapshot_name: str :raises: CalledProcessError """ cmd = [ 'ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name] check_call(cmd) def set_pool_quota(service, pool_name, max_bytes=None, max_objects=None): """Set byte quota on a RADOS pool in Ceph. :param service: The Ceph user name to run the command under :type service: str :param pool_name: Name of pool :type pool_name: str :param max_bytes: Maximum bytes quota to apply :type max_bytes: int :param max_objects: Maximum objects quota to apply :type max_objects: int :raises: subprocess.CalledProcessError """ cmd = [ 'ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name] if max_bytes: cmd = cmd + ['max_bytes', str(max_bytes)] if max_objects: cmd = cmd + ['max_objects', str(max_objects)] check_call(cmd) def remove_pool_quota(service, pool_name): """Remove byte quota on a RADOS pool in Ceph. :param service: The Ceph user name to run the command under. :type service: str :param pool_name: Name of pool to remove quota from. :type pool_name: str :raises: CalledProcessError """ cmd = [ 'ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0'] check_call(cmd) def remove_erasure_profile(service, profile_name): """Remove erasure code profile. :param service: The Ceph user name to run the command under :type service: str :param profile_name: Name of profile to remove. :type profile_name: str :raises: CalledProcessError """ cmd = [ 'ceph', '--id', service, 'osd', 'erasure-code-profile', 'rm', profile_name] check_call(cmd) def create_erasure_profile(service, profile_name, erasure_plugin_name='jerasure', failure_domain=None, data_chunks=2, coding_chunks=1, locality=None, durability_estimator=None, helper_chunks=None, scalar_mds=None, crush_locality=None, device_class=None, erasure_plugin_technique=None): """Create a new erasure code profile if one does not already exist for it. Profiles are considered immutable so will not be updated if the named profile already exists. Please refer to [0] for more details. 0: http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/ :param service: The Ceph user name to run the command under. :type service: str :param profile_name: Name of profile. :type profile_name: str :param erasure_plugin_name: Erasure code plugin. :type erasure_plugin_name: str :param failure_domain: Failure domain, one of: ('chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row'). :type failure_domain: str :param data_chunks: Number of data chunks. :type data_chunks: int :param coding_chunks: Number of coding chunks. :type coding_chunks: int :param locality: Locality. :type locality: int :param durability_estimator: Durability estimator. :type durability_estimator: int :param helper_chunks: int :type helper_chunks: int :param device_class: Restrict placement to devices of specific class. 
:type device_class: str :param scalar_mds: one of ['isa', 'jerasure', 'shec'] :type scalar_mds: str :param crush_locality: LRC locality faulure domain, one of: ('chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row') or unset. :type crush_locaity: str :param erasure_plugin_technique: Coding technique for EC plugin :type erasure_plugin_technique: str :return: None. Can raise CalledProcessError, ValueError or AssertionError """ if erasure_profile_exists(service, profile_name): log('EC profile {} exists, skipping update'.format(profile_name), level=WARNING) return plugin_techniques = { 'jerasure': [ 'reed_sol_van', 'reed_sol_r6_op', 'cauchy_orig', 'cauchy_good', 'liberation', 'blaum_roth', 'liber8tion' ], 'lrc': [], 'isa': [ 'reed_sol_van', 'cauchy', ], 'shec': [ 'single', 'multiple' ], 'clay': [], } failure_domains = [ 'chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row', ] device_classes = [ 'ssd', 'hdd', 'nvme' ] validator(erasure_plugin_name, six.string_types, list(plugin_techniques.keys())) cmd = [ 'ceph', '--id', service, 'osd', 'erasure-code-profile', 'set', profile_name, 'plugin={}'.format(erasure_plugin_name), 'k={}'.format(str(data_chunks)), 'm={}'.format(str(coding_chunks)), ] if erasure_plugin_technique: validator(erasure_plugin_technique, six.string_types, plugin_techniques[erasure_plugin_name]) cmd.append('technique={}'.format(erasure_plugin_technique)) luminous_or_later = cmp_pkgrevno('ceph-common', '12.0.0') >= 0 # Set failure domain from options if not provided in args if not failure_domain and config('customize-failure-domain'): # Defaults to 'host' so just need to deal with # setting 'rack' if feature is enabled failure_domain = 'rack' if failure_domain: validator(failure_domain, six.string_types, failure_domains) # failure_domain changed in luminous if luminous_or_later: cmd.append('crush-failure-domain={}'.format(failure_domain)) else: cmd.append('ruleset-failure-domain={}'.format(failure_domain)) # device class new in luminous if luminous_or_later and device_class: validator(device_class, six.string_types, device_classes) cmd.append('crush-device-class={}'.format(device_class)) else: log('Skipping device class configuration (ceph < 12.0.0)', level=DEBUG) # Add plugin specific information if erasure_plugin_name == 'lrc': # LRC mandatory configuration if locality: cmd.append('l={}'.format(str(locality))) else: raise ValueError("locality must be provided for lrc plugin") # LRC optional configuration if crush_locality: validator(crush_locality, six.string_types, failure_domains) cmd.append('crush-locality={}'.format(crush_locality)) if erasure_plugin_name == 'shec': # SHEC optional configuration if durability_estimator: cmd.append('c={}'.format((durability_estimator))) if erasure_plugin_name == 'clay': # CLAY optional configuration if helper_chunks: cmd.append('d={}'.format(str(helper_chunks))) if scalar_mds: cmd.append('scalar-mds={}'.format(scalar_mds)) check_call(cmd) def rename_pool(service, old_name, new_name): """Rename a Ceph pool from old_name to new_name. :param service: The Ceph user name to run the command under. :type service: str :param old_name: Name of pool subject to rename. :type old_name: str :param new_name: Name to rename pool to. 
:type new_name: str """ validator(value=old_name, valid_type=six.string_types) validator(value=new_name, valid_type=six.string_types) cmd = [ 'ceph', '--id', service, 'osd', 'pool', 'rename', old_name, new_name] check_call(cmd) def erasure_profile_exists(service, name): """Check to see if an Erasure code profile already exists. :param service: The Ceph user name to run the command under :type service: str :param name: Name of profile to look for. :type name: str :returns: True if it exists, False otherwise. :rtype: bool """ validator(value=name, valid_type=six.string_types) try: check_call(['ceph', '--id', service, 'osd', 'erasure-code-profile', 'get', name]) return True except CalledProcessError: return False def get_cache_mode(service, pool_name): """Find the current caching mode of the pool_name given. :param service: The Ceph user name to run the command under :type service: str :param pool_name: Name of pool. :type pool_name: str :returns: Current cache mode. :rtype: Optional[int] """ validator(value=service, valid_type=six.string_types) validator(value=pool_name, valid_type=six.string_types) out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json']) if six.PY3: out = out.decode('UTF-8') try: osd_json = json.loads(out) for pool in osd_json['pools']: if pool['pool_name'] == pool_name: return pool['cache_mode'] return None except ValueError: raise def pool_exists(service, name): """Check to see if a RADOS pool already exists.""" try: out = check_output(['rados', '--id', service, 'lspools']) if six.PY3: out = out.decode('UTF-8') except CalledProcessError: return False return name in out.split() def get_osds(service, device_class=None): """Return a list of all Ceph Object Storage Daemons currently in the cluster (optionally filtered by storage device class). :param device_class: Class of storage device for OSD's :type device_class: str """ luminous_or_later = cmp_pkgrevno('ceph-common', '12.0.0') >= 0 if luminous_or_later and device_class: out = check_output(['ceph', '--id', service, 'osd', 'crush', 'class', 'ls-osd', device_class, '--format=json']) else: out = check_output(['ceph', '--id', service, 'osd', 'ls', '--format=json']) if six.PY3: out = out.decode('UTF-8') return json.loads(out) def install(): """Basic Ceph client installation.""" ceph_dir = "/etc/ceph" if not os.path.exists(ceph_dir): os.mkdir(ceph_dir) apt_install('ceph-common', fatal=True) def rbd_exists(service, pool, rbd_img): """Check to see if a RADOS block device exists.""" try: out = check_output(['rbd', 'list', '--id', service, '--pool', pool]) if six.PY3: out = out.decode('UTF-8') except CalledProcessError: return False return rbd_img in out def create_rbd_image(service, pool, image, sizemb): """Create a new RADOS block device.""" cmd = ['rbd', 'create', image, '--size', str(sizemb), '--id', service, '--pool', pool] check_call(cmd) def update_pool(client, pool, settings): """Update pool properties. :param client: Client/User-name to authenticate with. :type client: str :param pool: Name of pool to operate on :type pool: str :param settings: Dictionary with key/value pairs to set. 
:type settings: Dict[str, str] :raises: CalledProcessError """ cmd = ['ceph', '--id', client, 'osd', 'pool', 'set', pool] for k, v in six.iteritems(settings): check_call(cmd + [k, v]) def set_app_name_for_pool(client, pool, name): """Calls `osd pool application enable` for the specified pool name :param client: Name of the ceph client to use :type client: str :param pool: Pool to set app name for :type pool: str :param name: app name for the specified pool :type name: str :raises: CalledProcessError if ceph call fails """ if cmp_pkgrevno('ceph-common', '12.0.0') >= 0: cmd = ['ceph', '--id', client, 'osd', 'pool', 'application', 'enable', pool, name] check_call(cmd) def create_pool(service, name, replicas=3, pg_num=None): """Create a new RADOS pool.""" if pool_exists(service, name): log("Ceph pool {} already exists, skipping creation".format(name), level=WARNING) return if not pg_num: # Calculate the number of placement groups based # on upstream recommended best practices. osds = get_osds(service) if osds: pg_num = (len(osds) * 100 // replicas) else: # NOTE(james-page): Default to 200 for older ceph versions # which don't support OSD query from cli pg_num = 200 cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pg_num)] check_call(cmd) update_pool(service, name, settings={'size': str(replicas)}) def delete_pool(service, name): """Delete a RADOS pool from ceph.""" cmd = ['ceph', '--id', service, 'osd', 'pool', 'delete', name, '--yes-i-really-really-mean-it'] check_call(cmd) def _keyfile_path(service): return KEYFILE.format(service) def _keyring_path(service): return KEYRING.format(service) def add_key(service, key): """Add a key to a keyring. Creates the keyring if it doesn't already exist. Logs and returns if the key is already in the keyring. """ keyring = _keyring_path(service) if os.path.exists(keyring): with open(keyring, 'r') as ring: if key in ring.read(): log('Ceph keyring exists at %s and has not changed.' % keyring, level=DEBUG) return log('Updating existing keyring %s.' % keyring, level=DEBUG) cmd = ['ceph-authtool', keyring, '--create-keyring', '--name=client.{}'.format(service), '--add-key={}'.format(key)] check_call(cmd) log('Created new ceph keyring at %s.' % keyring, level=DEBUG) def create_keyring(service, key): """Deprecated. Please use the more accurately named 'add_key'""" return add_key(service, key) def delete_keyring(service): """Delete an existing Ceph keyring.""" keyring = _keyring_path(service) if not os.path.exists(keyring): log('Keyring does not exist at %s' % keyring, level=WARNING) return os.remove(keyring) log('Deleted ring at %s.' % keyring, level=INFO) def create_key_file(service, key): """Create a file containing key.""" keyfile = _keyfile_path(service) if os.path.exists(keyfile): log('Keyfile exists at %s.' % keyfile, level=WARNING) return with open(keyfile, 'w') as fd: fd.write(key) log('Created new keyfile at %s.' 
% keyfile, level=INFO) def get_ceph_nodes(relation='ceph'): """Query named relation to determine current nodes.""" hosts = [] for r_id in relation_ids(relation): for unit in related_units(r_id): hosts.append(relation_get('private-address', unit=unit, rid=r_id)) return hosts def configure(service, key, auth, use_syslog): """Perform basic configuration of Ceph.""" add_key(service, key) create_key_file(service, key) hosts = get_ceph_nodes() with open('/etc/ceph/ceph.conf', 'w') as ceph_conf: ceph_conf.write(CEPH_CONF.format(auth=auth, keyring=_keyring_path(service), mon_hosts=",".join(map(str, hosts)), use_syslog=use_syslog)) modprobe('rbd') def image_mapped(name): """Determine whether a RADOS block device is mapped locally.""" try: out = check_output(['rbd', 'showmapped']) if six.PY3: out = out.decode('UTF-8') except CalledProcessError: return False return name in out def map_block_storage(service, pool, image): """Map a RADOS block device for local use.""" cmd = [ 'rbd', 'map', '{}/{}'.format(pool, image), '--user', service, '--secret', _keyfile_path(service), ] check_call(cmd) def filesystem_mounted(fs): """Determine whether a filesystem is already mounted.""" return fs in [f for f, m in mounts()] def make_filesystem(blk_device, fstype='ext4', timeout=10): """Make a new filesystem on the specified block device.""" count = 0 e_noent = errno.ENOENT while not os.path.exists(blk_device): if count >= timeout: log('Gave up waiting on block device %s' % blk_device, level=ERROR) raise IOError(e_noent, os.strerror(e_noent), blk_device) log('Waiting for block device %s to appear' % blk_device, level=DEBUG) count += 1 time.sleep(1) else: log('Formatting block device %s as filesystem %s.' % (blk_device, fstype), level=INFO) check_call(['mkfs', '-t', fstype, blk_device]) def place_data_on_block_device(blk_device, data_src_dst): """Migrate data in data_src_dst to blk_device and then remount.""" # mount block device into /mnt mount(blk_device, '/mnt') # copy data to /mnt copy_files(data_src_dst, '/mnt') # umount block device umount('/mnt') # Grab user/group ID's from original source _dir = os.stat(data_src_dst) uid = _dir.st_uid gid = _dir.st_gid # re-mount where the data should originally be # TODO: persist is currently a NO-OP in core.host mount(blk_device, data_src_dst, persist=True) # ensure original ownership of new mount. os.chown(data_src_dst, uid, gid) def copy_files(src, dst, symlinks=False, ignore=None): """Copy files from src to dst.""" for item in os.listdir(src): s = os.path.join(src, item) d = os.path.join(dst, item) if os.path.isdir(s): shutil.copytree(s, d, symlinks, ignore) else: shutil.copy2(s, d) def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point, blk_device, fstype, system_services=[], replicas=3): """NOTE: This function must only be called from a single service unit for the same rbd_img otherwise data loss will occur. Ensures given pool and RBD image exists, is mapped to a block device, and the device is formatted and mounted at the given mount_point. If formatting a device for the first time, data existing at mount_point will be migrated to the RBD device before being re-mounted. All services listed in system_services will be stopped prior to data migration and restarted when complete. """ # Ensure pool, RBD image, RBD mappings are in place. 
if not pool_exists(service, pool): log('Creating new pool {}.'.format(pool), level=INFO) create_pool(service, pool, replicas=replicas) if not rbd_exists(service, pool, rbd_img): log('Creating RBD image ({}).'.format(rbd_img), level=INFO) create_rbd_image(service, pool, rbd_img, sizemb) if not image_mapped(rbd_img): log('Mapping RBD Image {} as a Block Device.'.format(rbd_img), level=INFO) map_block_storage(service, pool, rbd_img) # make file system # TODO: What happens if for whatever reason this is run again and # the data is already in the rbd device and/or is mounted?? # When it is mounted already, it will fail to make the fs # XXX: This is really sketchy! Need to at least add an fstab entry # otherwise this hook will blow away existing data if its executed # after a reboot. if not filesystem_mounted(mount_point): make_filesystem(blk_device, fstype) for svc in system_services: if service_running(svc): log('Stopping services {} prior to migrating data.' .format(svc), level=DEBUG) service_stop(svc) place_data_on_block_device(blk_device, mount_point) for svc in system_services: log('Starting service {} after migrating data.' .format(svc), level=DEBUG) service_start(svc) def ensure_ceph_keyring(service, user=None, group=None, relation='ceph', key=None): """Ensures a ceph keyring is created for a named service and optionally ensures user and group ownership. @returns boolean: Flag to indicate whether a key was successfully written to disk based on either relation data or a supplied key """ if not key: for rid in relation_ids(relation): for unit in related_units(rid): key = relation_get('key', rid=rid, unit=unit) if key: break if not key: return False add_key(service=service, key=key) keyring = _keyring_path(service) if user and group: check_call(['chown', '%s.%s' % (user, group), keyring]) return True class CephBrokerRq(object): """Ceph broker request. Multiple operations can be added to a request and sent to the Ceph broker to be executed. Request is json-encoded for sending over the wire. The API is versioned and defaults to version 1. """ def __init__(self, api_version=1, request_id=None, raw_request_data=None): """Initialize CephBrokerRq object. Builds a new empty request or rebuilds a request from on-wire JSON data. :param api_version: API version for request (default: 1). :type api_version: Optional[int] :param request_id: Unique identifier for request. (default: string representation of generated UUID) :type request_id: Optional[str] :param raw_request_data: JSON-encoded string to build request from. :type raw_request_data: Optional[str] :raises: KeyError """ if raw_request_data: request_data = json.loads(raw_request_data) self.api_version = request_data['api-version'] self.request_id = request_data['request-id'] self.set_ops(request_data['ops']) else: self.api_version = api_version if request_id: self.request_id = request_id else: self.request_id = str(uuid.uuid1()) self.ops = [] def add_op(self, op): """Add an op if it is not already in the list. :param op: Operation to add. :type op: dict """ if op not in self.ops: self.ops.append(op) def add_op_request_access_to_group(self, name, namespace=None, permission=None, key_name=None, object_prefix_permissions=None): """ Adds the requested permissions to the current service's Ceph key, allowing the key to access only the specified pools or object prefixes. object_prefix_permissions should be a dictionary keyed on the permission with the corresponding value being a list of prefixes to apply that permission to. 
{ 'rwx': ['prefix1', 'prefix2'], 'class-read': ['prefix3']} """ self.add_op({ 'op': 'add-permissions-to-key', 'group': name, 'namespace': namespace, 'name': key_name or service_name(), 'group-permission': permission, 'object-prefix-permissions': object_prefix_permissions}) def add_op_create_pool(self, name, replica_count=3, pg_num=None, weight=None, group=None, namespace=None, app_name=None, max_bytes=None, max_objects=None): """DEPRECATED: Use ``add_op_create_replicated_pool()`` or ``add_op_create_erasure_pool()`` instead. """ return self.add_op_create_replicated_pool( name, replica_count=replica_count, pg_num=pg_num, weight=weight, group=group, namespace=namespace, app_name=app_name, max_bytes=max_bytes, max_objects=max_objects) # Use function parameters and docstring to define types in a compatible # manner. # # NOTE: Our caller should always use a kwarg Dict when calling us so # no need to maintain fixed order/position for parameters. Please keep them # sorted by name when adding new ones. def _partial_build_common_op_create(self, app_name=None, compression_algorithm=None, compression_mode=None, compression_required_ratio=None, compression_min_blob_size=None, compression_min_blob_size_hdd=None, compression_min_blob_size_ssd=None, compression_max_blob_size=None, compression_max_blob_size_hdd=None, compression_max_blob_size_ssd=None, group=None, max_bytes=None, max_objects=None, namespace=None, rbd_mirroring_mode='pool', weight=None): """Build common part of a create pool operation. :param app_name: Tag pool with application name. Note that there is certain protocols emerging upstream with regard to meaningful application names to use. Examples are 'rbd' and 'rgw'. :type app_name: Optional[str] :param compression_algorithm: Compressor to use, one of: ('lz4', 'snappy', 'zlib', 'zstd') :type compression_algorithm: Optional[str] :param compression_mode: When to compress data, one of: ('none', 'passive', 'aggressive', 'force') :type compression_mode: Optional[str] :param compression_required_ratio: Minimum compression ratio for data chunk, if the requested ratio is not achieved the compressed version will be thrown away and the original stored. :type compression_required_ratio: Optional[float] :param compression_min_blob_size: Chunks smaller than this are never compressed (unit: bytes). :type compression_min_blob_size: Optional[int] :param compression_min_blob_size_hdd: Chunks smaller than this are not compressed when destined to rotational media (unit: bytes). :type compression_min_blob_size_hdd: Optional[int] :param compression_min_blob_size_ssd: Chunks smaller than this are not compressed when destined to flash media (unit: bytes). :type compression_min_blob_size_ssd: Optional[int] :param compression_max_blob_size: Chunks larger than this are broken into N * compression_max_blob_size chunks before being compressed (unit: bytes). :type compression_max_blob_size: Optional[int] :param compression_max_blob_size_hdd: Chunks larger than this are broken into N * compression_max_blob_size_hdd chunks before being compressed when destined for rotational media (unit: bytes) :type compression_max_blob_size_hdd: Optional[int] :param compression_max_blob_size_ssd: Chunks larger than this are broken into N * compression_max_blob_size_ssd chunks before being compressed when destined for flash media (unit: bytes). 
:type compression_max_blob_size_ssd: Optional[int] :param group: Group to add pool to :type group: Optional[str] :param max_bytes: Maximum bytes quota to apply :type max_bytes: Optional[int] :param max_objects: Maximum objects quota to apply :type max_objects: Optional[int] :param namespace: Group namespace :type namespace: Optional[str] :param rbd_mirroring_mode: Pool mirroring mode used when Ceph RBD mirroring is enabled. :type rbd_mirroring_mode: Optional[str] :param weight: The percentage of data that is expected to be contained in the pool from the total available space on the OSDs. Used to calculate number of Placement Groups to create for pool. :type weight: Optional[float] :returns: Dictionary with kwarg name as key. :rtype: Dict[str,any] :raises: AssertionError """ return { 'app-name': app_name, 'compression-algorithm': compression_algorithm, 'compression-mode': compression_mode, 'compression-required-ratio': compression_required_ratio, 'compression-min-blob-size': compression_min_blob_size, 'compression-min-blob-size-hdd': compression_min_blob_size_hdd, 'compression-min-blob-size-ssd': compression_min_blob_size_ssd, 'compression-max-blob-size': compression_max_blob_size, 'compression-max-blob-size-hdd': compression_max_blob_size_hdd, 'compression-max-blob-size-ssd': compression_max_blob_size_ssd, 'group': group, 'max-bytes': max_bytes, 'max-objects': max_objects, 'group-namespace': namespace, 'rbd-mirroring-mode': rbd_mirroring_mode, 'weight': weight, } def add_op_create_replicated_pool(self, name, replica_count=3, pg_num=None, **kwargs): """Adds an operation to create a replicated pool. Refer to docstring for ``_partial_build_common_op_create`` for documentation of keyword arguments. :param name: Name of pool to create :type name: str :param replica_count: Number of copies Ceph should keep of your data. :type replica_count: int :param pg_num: Request specific number of Placement Groups to create for pool. :type pg_num: int :raises: AssertionError if provided data is of invalid type/range """ if pg_num and kwargs.get('weight'): raise ValueError('pg_num and weight are mutually exclusive') op = { 'op': 'create-pool', 'name': name, 'replicas': replica_count, 'pg_num': pg_num, } op.update(self._partial_build_common_op_create(**kwargs)) # Initialize Pool-object to validate type and range of ops. pool = ReplicatedPool('dummy-service', op=op) pool.validate() self.add_op(op) def add_op_create_erasure_pool(self, name, erasure_profile=None, allow_ec_overwrites=False, **kwargs): """Adds an operation to create a erasure coded pool. Refer to docstring for ``_partial_build_common_op_create`` for documentation of keyword arguments. :param name: Name of pool to create :type name: str :param erasure_profile: Name of erasure code profile to use. If not set the ceph-mon unit handling the broker request will set its default value. :type erasure_profile: str :param allow_ec_overwrites: allow EC pools to be overridden :type allow_ec_overwrites: bool :raises: AssertionError if provided data is of invalid type/range """ op = { 'op': 'create-pool', 'name': name, 'pool-type': 'erasure', 'erasure-profile': erasure_profile, 'allow-ec-overwrites': allow_ec_overwrites, } op.update(self._partial_build_common_op_create(**kwargs)) # Initialize Pool-object to validate type and range of ops. 
pool = ErasurePool('dummy-service', op) pool.validate() self.add_op(op) def add_op_create_erasure_profile(self, name, erasure_type='jerasure', erasure_technique=None, k=None, m=None, failure_domain=None, lrc_locality=None, shec_durability_estimator=None, clay_helper_chunks=None, device_class=None, clay_scalar_mds=None, lrc_crush_locality=None): """Adds an operation to create a erasure coding profile. :param name: Name of profile to create :type name: str :param erasure_type: Which of the erasure coding plugins should be used :type erasure_type: string :param erasure_technique: EC plugin technique to use :type erasure_technique: string :param k: Number of data chunks :type k: int :param m: Number of coding chunks :type m: int :param lrc_locality: Group the coding and data chunks into sets of size locality (lrc plugin) :type lrc_locality: int :param durability_estimator: The number of parity chunks each of which includes a data chunk in its calculation range (shec plugin) :type durability_estimator: int :param helper_chunks: The number of helper chunks to use for recovery operations (clay plugin) :type: helper_chunks: int :param failure_domain: Type of failure domain from Ceph bucket types to be used :type failure_domain: string :param device_class: Device class to use for profile (ssd, hdd) :type device_class: string :param clay_scalar_mds: Plugin to use for CLAY layered construction (jerasure|isa|shec) :type clay_scaler_mds: string :param lrc_crush_locality: Type of crush bucket in which set of chunks defined by lrc_locality will be stored. :type lrc_crush_locality: string """ self.add_op({'op': 'create-erasure-profile', 'name': name, 'k': k, 'm': m, 'l': lrc_locality, 'c': shec_durability_estimator, 'd': clay_helper_chunks, 'erasure-type': erasure_type, 'erasure-technique': erasure_technique, 'failure-domain': failure_domain, 'device-class': device_class, 'scalar-mds': clay_scalar_mds, 'crush-locality': lrc_crush_locality}) def set_ops(self, ops): """Set request ops to provided value. Useful for injecting ops that come from a previous request to allow comparisons to ensure validity. """ self.ops = ops @property def request(self): return json.dumps({'api-version': self.api_version, 'ops': self.ops, 'request-id': self.request_id}) def _ops_equal(self, other): keys_to_compare = [ 'replicas', 'name', 'op', 'pg_num', 'group-permission', 'object-prefix-permissions', ] keys_to_compare += list(self._partial_build_common_op_create().keys()) if len(self.ops) == len(other.ops): for req_no in range(0, len(self.ops)): for key in keys_to_compare: if self.ops[req_no].get(key) != other.ops[req_no].get(key): return False else: return False return True def __eq__(self, other): if not isinstance(other, self.__class__): return False if self.api_version == other.api_version and \ self._ops_equal(other): return True else: return False def __ne__(self, other): return not self.__eq__(other) class CephBrokerRsp(object): """Ceph broker response. Response is json-decoded and contents provided as methods/properties. The API is versioned and defaults to version 1. """ def __init__(self, encoded_rsp): self.api_version = None self.rsp = json.loads(encoded_rsp) @property def request_id(self): return self.rsp.get('request-id') @property def exit_code(self): return self.rsp.get('exit-code') @property def exit_msg(self): return self.rsp.get('stderr') # Ceph Broker Conversation: # If a charm needs an action to be taken by ceph it can create a CephBrokerRq # and send that request to ceph via the ceph relation. 
# Ceph Broker Conversation:
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
# and send that request to ceph via the ceph relation. The CephBrokerRq has a
# unique id so that the client can identify which CephBrokerRsp is associated
# with the request. Ceph will also respond to each client unit individually
# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
# via key broker-rsp-glance-0
#
# To use this the charm can just do something like:
#
# from charmhelpers.contrib.storage.linux.ceph import (
#     send_request_if_needed,
#     is_request_complete,
#     CephBrokerRq,
# )
#
# @hooks.hook('ceph-relation-changed')
# def ceph_changed():
#     rq = CephBrokerRq()
#     rq.add_op_create_pool(name='poolname', replica_count=3)
#
#     if is_request_complete(rq):
#         <Request complete actions>
#     else:
#         send_request_if_needed(get_ceph_request())
#
# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
# of glance having sent a request to ceph which ceph has successfully
# processed:
#
# 'ceph:8': {
#     'ceph/0': {
#         'auth': 'cephx',
#         'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
#         'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
#         'ceph-public-address': '10.5.44.103',
#         'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
#         'private-address': '10.5.44.103',
#     },
#     'glance/0': {
#         'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
#                        '"ops": [{"replicas": 3, "name": "glance", '
#                        '"op": "create-pool"}]}'),
#         'private-address': '10.5.44.109',
#     },
# }


def get_previous_request(rid):
    """Return the last ceph broker request sent on a given relation

    :param rid: Relation id to query for request
    :type rid: str
    :returns: CephBrokerRq object or None if relation data not found.
    :rtype: Optional[CephBrokerRq]
    """
    broker_req = relation_get(attribute='broker_req', rid=rid,
                              unit=local_unit())
    if broker_req:
        return CephBrokerRq(raw_request_data=broker_req)

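# Illustrative note (hypothetical values, not an upstream example): for a
# charm related to ceph-mon on relation 'ceph:8', the helper defined just
# below might return something like:
#
#     {'ceph:8': {'sent': True, 'complete': False}}
#
# i.e. an equivalent request has already been sent on that relation but the
# mon has not yet reported a successful result for it.
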
def get_request_states(request, relation='ceph'):
    """Return a dict of requests per relation id with their corresponding
       completion state.

    This allows a charm, which has a request for ceph, to see whether there is
    an equivalent request already being processed and if so what state that
    request is in.

    @param request: A CephBrokerRq object
    """
    requests = {}
    for rid in relation_ids(relation):
        complete = False
        previous_request = get_previous_request(rid)
        if request == previous_request:
            sent = True
            complete = is_request_complete_for_rid(previous_request, rid)
        else:
            sent = False
            complete = False

        requests[rid] = {
            'sent': sent,
            'complete': complete,
        }

    return requests


def is_request_sent(request, relation='ceph'):
    """Check to see if a functionally equivalent request has already been sent

    Returns True if a similar request has been sent

    @param request: A CephBrokerRq object
    """
    states = get_request_states(request, relation=relation)
    for rid in states.keys():
        if not states[rid]['sent']:
            return False

    return True


def is_request_complete(request, relation='ceph'):
    """Check to see if a functionally equivalent request has already been
    completed

    Returns True if a similar request has been completed

    @param request: A CephBrokerRq object
    """
    states = get_request_states(request, relation=relation)
    for rid in states.keys():
        if not states[rid]['complete']:
            return False

    return True


def is_request_complete_for_rid(request, rid):
    """Check if a given request has been completed on the given relation

    @param request: A CephBrokerRq object
    @param rid: Relation ID
    """
    broker_key = get_broker_rsp_key()
    for unit in related_units(rid):
        rdata = relation_get(rid=rid, unit=unit)
        if rdata.get(broker_key):
            rsp = CephBrokerRsp(rdata.get(broker_key))
            if rsp.request_id == request.request_id:
                if not rsp.exit_code:
                    return True
        else:
            # The remote unit sent no reply targeted at this unit so either
            # the remote ceph cluster does not support unit targeted replies
            # or it has not processed our request yet.
            if rdata.get('broker_rsp'):
                request_data = json.loads(rdata['broker_rsp'])
                if request_data.get('request-id'):
                    log('Ignoring legacy broker_rsp without unit key as '
                        'remote service supports unit specific replies',
                        level=DEBUG)
                else:
                    log('Using legacy broker_rsp as remote service does not '
                        'support unit specific replies', level=DEBUG)
                    rsp = CephBrokerRsp(rdata['broker_rsp'])
                    if not rsp.exit_code:
                        return True

    return False


def get_broker_rsp_key():
    """Return broker response key for this unit

    This is the key that ceph is going to use to pass request status
    information back to this unit
    """
    return 'broker-rsp-' + local_unit().replace('/', '-')


def send_request_if_needed(request, relation='ceph'):
    """Send broker request if an equivalent request has not already been sent

    @param request: A CephBrokerRq object
    """
    if is_request_sent(request, relation=relation):
        log('Request already sent but not complete, not sending new request',
            level=DEBUG)
    else:
        for rid in relation_ids(relation):
            log('Sending request {}'.format(request.request_id), level=DEBUG)
            relation_set(relation_id=rid, broker_req=request.request)
            relation_set(relation_id=rid,
                         relation_settings={'unit-name': local_unit()})

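# Illustrative note (an assumption about typical values, not an upstream
# example): broker replies are targeted at individual client units, so for
# unit glance/0 the mon publishes its result under the key returned by
# get_broker_rsp_key():
#
#     get_broker_rsp_key()   # -> 'broker-rsp-glance-0' when local_unit()
#                            #    is 'glance/0'
#
# send_request_if_needed() also publishes 'unit-name', presumably so the mon
# can associate the request with this unit; replies from older mons that only
# set the shared 'broker_rsp' key are handled in is_request_complete_for_rid()
# above.
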
def has_broker_rsp(rid=None, unit=None):
    """Return True if the broker_rsp key is 'truthy' (i.e. set to something)
    in the relation data.

    :param rid: The relation to check (default of None means current relation)
    :type rid: Union[str, None]
    :param unit: The remote unit to check (default of None means current unit)
    :type unit: Union[str, None]
    :returns: True if broker key exists and is set to something 'truthy'
    :rtype: bool
    """
    rdata = relation_get(rid=rid, unit=unit) or {}
    broker_rsp = rdata.get(get_broker_rsp_key())
    return True if broker_rsp else False


def is_broker_action_done(action, rid=None, unit=None):
    """Check whether broker action has completed yet.

    @param action: name of action to be performed
    @returns True if action complete otherwise False
    """
    rdata = relation_get(rid=rid, unit=unit) or {}
    broker_rsp = rdata.get(get_broker_rsp_key())
    if not broker_rsp:
        return False

    rsp = CephBrokerRsp(broker_rsp)
    unit_name = local_unit().partition('/')[2]
    key = "unit_{}_ceph_broker_action.{}".format(unit_name, action)
    kvstore = kv()
    val = kvstore.get(key=key)
    if val and val == rsp.request_id:
        return True

    return False


def mark_broker_action_done(action, rid=None, unit=None):
    """Mark action as having been completed.

    @param action: name of action to be performed
    @returns None
    """
    rdata = relation_get(rid=rid, unit=unit) or {}
    broker_rsp = rdata.get(get_broker_rsp_key())
    if not broker_rsp:
        return

    rsp = CephBrokerRsp(broker_rsp)
    unit_name = local_unit().partition('/')[2]
    key = "unit_{}_ceph_broker_action.{}".format(unit_name, action)
    kvstore = kv()
    kvstore.set(key=key, value=rsp.request_id)
    kvstore.flush()


class CephConfContext(object):
    """Ceph config (ceph.conf) context.

    Supports user-provided Ceph configuration settings. Users can provide a
    dictionary as the value for the config-flags charm option containing
    Ceph configuration settings keyed by their section in ceph.conf.
    """
    def __init__(self, permitted_sections=None):
        self.permitted_sections = permitted_sections or []

    def __call__(self):
        conf = config('config-flags')
        if not conf:
            return {}

        conf = config_flags_parser(conf)
        if not isinstance(conf, dict):
            log("Provided config-flags is not a dictionary - ignoring",
                level=WARNING)
            return {}

        permitted = self.permitted_sections
        if permitted:
            diff = set(conf.keys()).difference(set(permitted))
            if diff:
                log("Config-flags contains invalid keys '%s' - they will be "
                    "ignored" % (', '.join(diff)), level=WARNING)

        ceph_conf = {}
        for key in conf:
            if permitted and key not in permitted:
                log("Ignoring key '%s'" % key, level=WARNING)
                continue

            ceph_conf[key] = conf[key]
        return ceph_conf

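# Illustrative sketch (values are hypothetical; the accepted config-flags
# string format is whatever config_flags_parser() produces a dict from): if
# config-flags parses to {'global': {'debug osd': '1/5'}, 'foo': {'bar': '1'}}
# then a context restricted to the usual sections keeps only the permitted
# section and logs the rest:
#
#     CephConfContext(permitted_sections=['global', 'osd', 'mon'])()
#     # -> {'global': {'debug osd': '1/5'}}   ('foo' is logged and ignored)
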
class CephOSDConfContext(CephConfContext):
    """Ceph config (ceph.conf) context.

    Consolidates settings from config-flags via CephConfContext with
    settings provided by the mons. The config-flag values are preserved in
    conf['osd'], settings from the mons which do not clash with config-flag
    settings are in conf['osd_from_client'] and finally settings which do
    clash are in conf['osd_from_client_conflict']. Rather than silently drop
    the conflicting settings they are provided in the context so they can be
    rendered commented out to give some visibility to the admin.
    """

    def __init__(self, permitted_sections=None):
        super(CephOSDConfContext, self).__init__(
            permitted_sections=permitted_sections)
        try:
            self.settings_from_mons = get_osd_settings('mon')
        except OSDSettingConflict:
            log(
                "OSD settings from mons are inconsistent, ignoring them",
                level=WARNING)
            self.settings_from_mons = {}

    def filter_osd_from_mon_settings(self):
        """Filter settings from client relation against config-flags.

        :returns: A tuple (
            config-flag values,
            client settings which do not conflict with config-flag values,
            client settings which conflict with config-flag values)
        :rtype: (OrderedDict, OrderedDict, OrderedDict)
        """
        ceph_conf = super(CephOSDConfContext, self).__call__()
        conflicting_entries = {}
        clear_entries = {}
        for key, value in self.settings_from_mons.items():
            if key in ceph_conf.get('osd', {}):
                if ceph_conf['osd'][key] != value:
                    conflicting_entries[key] = value
            else:
                clear_entries[key] = value
        clear_entries = _order_dict_by_key(clear_entries)
        conflicting_entries = _order_dict_by_key(conflicting_entries)
        return ceph_conf, clear_entries, conflicting_entries

    def __call__(self):
        """Construct OSD config context.

        Standard context with two additional special keys.
            osd_from_client_conflict: client settings which conflict with
                                      config-flag values
            osd_from_client: settings which do not conflict with config-flag
                             values

        :returns: OSD config context dict.
        :rtype: dict
        """
        conf, osd_clear, osd_conflict = self.filter_osd_from_mon_settings()
        conf['osd_from_client_conflict'] = osd_conflict
        conf['osd_from_client'] = osd_clear
        return conf

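# Illustrative note (hypothetical values, not an upstream example): given a
# config-flags 'osd' section of {'osd heartbeat grace': '20'} and settings
# requested by the mons of
# {'osd heartbeat grace': '25', 'osd heartbeat interval': '5'},
# CephOSDConfContext()() would yield a context where:
#
#     conf['osd']                      == {'osd heartbeat grace': '20'}
#     conf['osd_from_client']          == {'osd heartbeat interval': '5'}
#     conf['osd_from_client_conflict'] == {'osd heartbeat grace': '25'}
#
# so a template can render the conflicting mon-supplied value commented out
# while the operator's config-flags value stays in effect.
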