diff --git a/charmhelpers/contrib/openstack/context.py b/charmhelpers/contrib/openstack/context.py index 42abccf7..54aed7ff 100644 --- a/charmhelpers/contrib/openstack/context.py +++ b/charmhelpers/contrib/openstack/context.py @@ -29,6 +29,8 @@ from subprocess import check_call, CalledProcessError import six +import charmhelpers.contrib.storage.linux.ceph as ch_ceph + from charmhelpers.contrib.openstack.audits.openstack_security_guide import ( _config_ini as config_ini ) @@ -56,6 +58,7 @@ from charmhelpers.core.hookenv import ( status_set, network_get_primary_address, WARNING, + service_name, ) from charmhelpers.core.sysctl import create as sysctl_create @@ -808,6 +811,12 @@ class CephContext(OSContextGenerator): ctxt['mon_hosts'] = ' '.join(sorted(mon_hosts)) + if config('pool-type') and config('pool-type') == 'erasure-coded': + base_pool_name = config('rbd-pool') or config('rbd-pool-name') + if not base_pool_name: + base_pool_name = service_name() + ctxt['rbd_default_data_pool'] = base_pool_name + if not os.path.isdir('/etc/ceph'): os.mkdir('/etc/ceph') @@ -3175,3 +3184,90 @@ class SRIOVContext(OSContextGenerator): :rtype: Dict[str,int] """ return self._map + + +class CephBlueStoreCompressionContext(OSContextGenerator): + """Ceph BlueStore compression options.""" + + # Tuple with Tuples that map configuration option name to CephBrokerRq op + # property name + options = ( + ('bluestore-compression-algorithm', + 'compression-algorithm'), + ('bluestore-compression-mode', + 'compression-mode'), + ('bluestore-compression-required-ratio', + 'compression-required-ratio'), + ('bluestore-compression-min-blob-size', + 'compression-min-blob-size'), + ('bluestore-compression-min-blob-size-hdd', + 'compression-min-blob-size-hdd'), + ('bluestore-compression-min-blob-size-ssd', + 'compression-min-blob-size-ssd'), + ('bluestore-compression-max-blob-size', + 'compression-max-blob-size'), + ('bluestore-compression-max-blob-size-hdd', + 'compression-max-blob-size-hdd'), + ('bluestore-compression-max-blob-size-ssd', + 'compression-max-blob-size-ssd'), + ) + + def __init__(self): + """Initialize context by loading values from charm config. + + We keep two maps, one suitable for use with CephBrokerRq's and one + suitable for template generation. + """ + charm_config = config() + + # CephBrokerRq op map + self.op = {} + # Context exposed for template generation + self.ctxt = {} + for config_key, op_key in self.options: + value = charm_config.get(config_key) + self.ctxt.update({config_key.replace('-', '_'): value}) + self.op.update({op_key: value}) + + def __call__(self): + """Get context. + + :returns: Context + :rtype: Dict[str,any] + """ + return self.ctxt + + def get_op(self): + """Get values for use in CephBrokerRq op. + + :returns: Context values with CephBrokerRq op property name as key. + :rtype: Dict[str,any] + """ + return self.op + + def get_kwargs(self): + """Get values for use as keyword arguments. + + :returns: Context values with key suitable for use as kwargs to + CephBrokerRq add_op_create_*_pool methods. + :rtype: Dict[str,any] + """ + return { + k.replace('-', '_'): v + for k, v in self.op.items() + } + + def validate(self): + """Validate options. + + :raises: AssertionError + """ + # We slip in a dummy name on class instantiation to allow validation of + # the other options. It will not affect further use. + # + # NOTE: once we retire Python 3.5 we can fold this into a in-line + # dictionary comprehension in the call to the initializer. + dummy_op = {'name': 'dummy-name'} + dummy_op.update(self.op) + pool = ch_ceph.BasePool('dummy-service', op=dummy_op) + pool.validate() diff --git a/charmhelpers/contrib/openstack/templates/ceph.conf b/charmhelpers/contrib/openstack/templates/ceph.conf index a11ce8ab..c0f22360 100644 --- a/charmhelpers/contrib/openstack/templates/ceph.conf +++ b/charmhelpers/contrib/openstack/templates/ceph.conf @@ -22,3 +22,7 @@ rbd default features = {{ rbd_features }} {{ key }} = {{ value }} {% endfor -%} {%- endif %} + +{% if rbd_default_data_pool -%} +rbd default data pool = {{ rbd_default_data_pool }} +{% endif %} diff --git a/charmhelpers/contrib/openstack/templates/section-ceph-bluestore-compression b/charmhelpers/contrib/openstack/templates/section-ceph-bluestore-compression new file mode 100644 index 00000000..a6430100 --- /dev/null +++ b/charmhelpers/contrib/openstack/templates/section-ceph-bluestore-compression @@ -0,0 +1,28 @@ +{# section header omitted as options can belong to multiple sections #} +{% if bluestore_compression_algorithm -%} +bluestore compression algorithm = {{ bluestore_compression_algorithm }} +{% endif -%} +{% if bluestore_compression_mode -%} +bluestore compression mode = {{ bluestore_compression_mode }} +{% endif -%} +{% if bluestore_compression_required_ratio -%} +bluestore compression required ratio = {{ bluestore_compression_required_ratio }} +{% endif -%} +{% if bluestore_compression_min_blob_size -%} +bluestore compression min blob size = {{ bluestore_compression_min_blob_size }} +{% endif -%} +{% if bluestore_compression_min_blob_size_hdd -%} +bluestore compression min blob size hdd = {{ bluestore_compression_min_blob_size_hdd }} +{% endif -%} +{% if bluestore_compression_min_blob_size_ssd -%} +bluestore compression min blob size ssd = {{ bluestore_compression_min_blob_size_ssd }} +{% endif -%} +{% if bluestore_compression_max_blob_size -%} +bluestore compression max blob size = {{ bluestore_compression_max_blob_size }} +{% endif -%} +{% if bluestore_compression_max_blob_size_hdd -%} +bluestore compression max blob size hdd = {{ bluestore_compression_max_blob_size_hdd }} +{% endif -%} +{% if bluestore_compression_max_blob_size_ssd -%} +bluestore compression max blob size ssd = {{ bluestore_compression_max_blob_size_ssd }} +{% endif -%} diff --git a/charmhelpers/contrib/storage/linux/ceph.py b/charmhelpers/contrib/storage/linux/ceph.py index 814d5c72..526b95ad 100644 --- a/charmhelpers/contrib/storage/linux/ceph.py +++ b/charmhelpers/contrib/storage/linux/ceph.py @@ -39,6 +39,7 @@ from subprocess import ( check_output, CalledProcessError, ) +from charmhelpers import deprecate from charmhelpers.core.hookenv import ( config, service_name, @@ -178,94 +179,293 @@ def send_osd_settings(): def validator(value, valid_type, valid_range=None): - """ - Used to validate these: http://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values + """Helper function for type validation. + + Used to validate these: + https://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values + https://docs.ceph.com/docs/master/rados/configuration/bluestore-config-ref/#inline-compression + Example input: validator(value=1, valid_type=int, valid_range=[0, 2]) + This says I'm testing value=1. It must be an int inclusive in [0,2] - :param value: The value to validate + :param value: The value to validate. + :type value: any :param valid_type: The type that value should be. + :type valid_type: any :param valid_range: A range of values that value can assume. - :return: + :type valid_range: Optional[Union[List,Tuple]] + :raises: AssertionError, ValueError """ - assert isinstance(value, valid_type), "{} is not a {}".format( - value, - valid_type) + assert isinstance(value, valid_type), ( + "{} is not a {}".format(value, valid_type)) if valid_range is not None: - assert isinstance(valid_range, list), \ - "valid_range must be a list, was given {}".format(valid_range) + assert isinstance( + valid_range, list) or isinstance(valid_range, tuple), ( + "valid_range must be of type List or Tuple, " + "was given {} of type {}" + .format(valid_range, type(valid_range))) # If we're dealing with strings if isinstance(value, six.string_types): - assert value in valid_range, \ - "{} is not in the list {}".format(value, valid_range) + assert value in valid_range, ( + "{} is not in the list {}".format(value, valid_range)) # Integer, float should have a min and max else: if len(valid_range) != 2: raise ValueError( - "Invalid valid_range list of {} for {}. " + "Invalid valid_range list of {} for {}. " "List must be [min,max]".format(valid_range, value)) - assert value >= valid_range[0], \ - "{} is less than minimum allowed value of {}".format( - value, valid_range[0]) - assert value <= valid_range[1], \ - "{} is greater than maximum allowed value of {}".format( - value, valid_range[1]) + assert value >= valid_range[0], ( + "{} is less than minimum allowed value of {}" + .format(value, valid_range[0])) + assert value <= valid_range[1], ( + "{} is greater than maximum allowed value of {}" + .format(value, valid_range[1])) class PoolCreationError(Exception): - """ - A custom error to inform the caller that a pool creation failed. Provides an error message + """A custom exception to inform the caller that a pool creation failed. + + Provides an error message """ def __init__(self, message): super(PoolCreationError, self).__init__(message) -class Pool(object): - """ - An object oriented approach to Ceph pool creation. This base class is inherited by ReplicatedPool and ErasurePool. - Do not call create() on this base class as it will not do anything. Instantiate a child class and call create(). - """ +class BasePool(object): + """An object oriented approach to Ceph pool creation. - def __init__(self, service, name): + This base class is inherited by ReplicatedPool and ErasurePool. Do not call + create() on this base class as it will raise an exception. + + Instantiate a child class and call create(). + """ + # Dictionary that maps pool operation properties to Tuples with valid type + # and valid range + op_validation_map = { + 'compression-algorithm': (str, ('lz4', 'snappy', 'zlib', 'zstd')), + 'compression-mode': (str, ('none', 'passive', 'aggressive', 'force')), + 'compression-required-ratio': (float, None), + 'compression-min-blob-size': (int, None), + 'compression-min-blob-size-hdd': (int, None), + 'compression-min-blob-size-ssd': (int, None), + 'compression-max-blob-size': (int, None), + 'compression-max-blob-size-hdd': (int, None), + 'compression-max-blob-size-ssd': (int, None), + } + + def __init__(self, service, name=None, percent_data=None, app_name=None, + op=None): + """Initialize BasePool object. + + Pool information is either initialized from individual keyword + arguments or from a individual CephBrokerRq operation Dict. + + :param service: The Ceph user name to run commands under. + :type service: str + :param name: Name of pool to operate on. + :type name: str + :param percent_data: The expected pool size in relation to all + available resources in the Ceph cluster. Will be + used to set the ``target_size_ratio`` pool + property. (default: 10.0) + :type percent_data: Optional[float] + :param app_name: Ceph application name, usually one of: + ('cephfs', 'rbd', 'rgw') (default: 'unknown') + :type app_name: Optional[str] + :param op: Broker request Op to compile pool data from. + :type op: Optional[Dict[str,any]] + :raises: KeyError + """ + # NOTE: Do not perform initialization steps that require live data from + # a running cluster here. The *Pool classes may be used for validation. self.service = service - self.name = name + self.nautilus_or_later = cmp_pkgrevno('ceph-common', '14.2.0') >= 0 + self.op = op or {} + + if op: + # When initializing from op the `name` attribute is required and we + # will fail with KeyError if it is not provided. + self.name = op['name'] + self.percent_data = op.get('weight') + self.app_name = op.get('app-name') + else: + self.name = name + self.percent_data = percent_data + self.app_name = app_name + + # Set defaults for these if they are not provided + self.percent_data = self.percent_data or 10.0 + self.app_name = self.app_name or 'unknown' + + def validate(self): + """Check that value of supplied operation parameters are valid. + + :raises: ValueError + """ + for op_key, op_value in self.op.items(): + if op_key in self.op_validation_map and op_value is not None: + valid_type, valid_range = self.op_validation_map[op_key] + try: + validator(op_value, valid_type, valid_range) + except (AssertionError, ValueError) as e: + # Normalize on ValueError, also add information about which + # variable we had an issue with. + raise ValueError("'{}': {}".format(op_key, str(e))) + + def _create(self): + """Perform the pool creation, method MUST be overridden by child class. + """ + raise NotImplementedError + + def _post_create(self): + """Perform common post pool creation tasks. + + Note that pool properties subject to change during the lifetime of a + pool / deployment should go into the ``update`` method. + + Do not add calls for a specific pool type here, those should go into + one of the pool specific classes. + """ + if self.nautilus_or_later: + # Ensure we set the expected pool ratio + update_pool( + client=self.service, + pool=self.name, + settings={ + 'target_size_ratio': str( + self.percent_data / 100.0), + }) + try: + set_app_name_for_pool(client=self.service, + pool=self.name, + name=self.app_name) + except CalledProcessError: + log('Could not set app name for pool {}' + .format(self.name), + level=WARNING) + if 'pg_autoscaler' in enabled_manager_modules(): + try: + enable_pg_autoscale(self.service, self.name) + except CalledProcessError as e: + log('Could not configure auto scaling for pool {}: {}' + .format(self.name, e), + level=WARNING) - # Create the pool if it doesn't exist already - # To be implemented by subclasses def create(self): - pass + """Create pool and perform any post pool creation tasks. + + To allow for sharing of common code among pool specific classes the + processing has been broken out into the private methods ``_create`` + and ``_post_create``. + + Do not add any pool type specific handling here, that should go into + one of the pool specific classes. + """ + if not pool_exists(self.service, self.name): + self.validate() + self._create() + self._post_create() + self.update() + + def set_quota(self): + """Set a quota if requested. + + :raises: CalledProcessError + """ + max_bytes = self.op.get('max-bytes') + max_objects = self.op.get('max-objects') + if max_bytes or max_objects: + set_pool_quota(service=self.service, pool_name=self.name, + max_bytes=max_bytes, max_objects=max_objects) + + def set_compression(self): + """Set compression properties if requested. + + :raises: CalledProcessError + """ + compression_properties = { + key.replace('-', '_'): value + for key, value in self.op.items() + if key in ( + 'compression-algorithm', + 'compression-mode', + 'compression-required-ratio', + 'compression-min-blob-size', + 'compression-min-blob-size-hdd', + 'compression-min-blob-size-ssd', + 'compression-max-blob-size', + 'compression-max-blob-size-hdd', + 'compression-max-blob-size-ssd') and value} + if compression_properties: + update_pool(self.service, self.name, compression_properties) + + def update(self): + """Update properties for an already existing pool. + + Do not add calls for a specific pool type here, those should go into + one of the pool specific classes. + """ + self.validate() + self.set_quota() + self.set_compression() def add_cache_tier(self, cache_pool, mode): - """ - Adds a new cache tier to an existing pool. - :param cache_pool: six.string_types. The cache tier pool name to add. - :param mode: six.string_types. The caching mode to use for this pool. valid range = ["readonly", "writeback"] - :return: None + """Adds a new cache tier to an existing pool. + + :param cache_pool: The cache tier pool name to add. + :type cache_pool: str + :param mode: The caching mode to use for this pool. + valid range = ["readonly", "writeback"] + :type mode: str """ # Check the input types and values validator(value=cache_pool, valid_type=six.string_types) - validator(value=mode, valid_type=six.string_types, valid_range=["readonly", "writeback"]) + validator( + value=mode, valid_type=six.string_types, + valid_range=["readonly", "writeback"]) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'add', self.name, cache_pool]) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, mode]) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'set-overlay', self.name, cache_pool]) - check_call(['ceph', '--id', self.service, 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom']) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'add', self.name, cache_pool, + ]) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'cache-mode', cache_pool, mode, + ]) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'set-overlay', self.name, cache_pool, + ]) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom', + ]) def remove_cache_tier(self, cache_pool): - """ - Removes a cache tier from Ceph. Flushes all dirty objects from writeback pools and waits for that to complete. - :param cache_pool: six.string_types. The cache tier pool name to remove. - :return: None + """Removes a cache tier from Ceph. + + Flushes all dirty objects from writeback pools and waits for that to + complete. + + :param cache_pool: The cache tier pool name to remove. + :type cache_pool: str """ # read-only is easy, writeback is much harder mode = get_cache_mode(self.service, cache_pool) if mode == 'readonly': - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none']) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool]) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'cache-mode', cache_pool, 'none' + ]) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'remove', self.name, cache_pool, + ]) elif mode == 'writeback': pool_forward_cmd = ['ceph', '--id', self.service, 'osd', 'tier', @@ -276,9 +476,15 @@ class Pool(object): check_call(pool_forward_cmd) # Flush the cache and wait for it to return - check_call(['rados', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all']) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name]) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool]) + check_call([ + 'rados', '--id', self.service, + '-p', cache_pool, 'cache-flush-evict-all']) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'remove-overlay', self.name]) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'remove', self.name, cache_pool]) def get_pgs(self, pool_size, percent_data=DEFAULT_POOL_WEIGHT, device_class=None): @@ -305,19 +511,23 @@ class Pool(object): selected for the specific rule, rather it is left to the user to tune in the form of 'expected-osd-count' config option. - :param pool_size: int. pool_size is either the number of replicas for + :param pool_size: pool_size is either the number of replicas for replicated pools or the K+M sum for erasure coded pools - :param percent_data: float. the percentage of data that is expected to + :type pool_size: int + :param percent_data: the percentage of data that is expected to be contained in the pool for the specific OSD set. Default value is to assume 10% of the data is for this pool, which is a relatively low % of the data but allows for the pg_num to be increased. NOTE: the default is primarily to handle the scenario where related charms requiring pools has not been upgraded to include an update to indicate their relative usage of the pools. - :param device_class: str. class of storage to use for basis of pgs + :type percent_data: float + :param device_class: class of storage to use for basis of pgs calculation; ceph supports nvme, ssd and hdd by default based on presence of devices of each type in the deployment. - :return: int. The number of pgs to use. + :type device_class: str + :returns: The number of pgs to use. + :rtype: int """ # Note: This calculation follows the approach that is provided @@ -357,7 +567,8 @@ class Pool(object): return LEGACY_PG_COUNT percent_data /= 100.0 - target_pgs_per_osd = config('pgs-per-osd') or DEFAULT_PGS_PER_OSD_TARGET + target_pgs_per_osd = config( + 'pgs-per-osd') or DEFAULT_PGS_PER_OSD_TARGET num_pg = (target_pgs_per_osd * osd_count * percent_data) // pool_size # NOTE: ensure a sane minimum number of PGS otherwise we don't get any @@ -380,147 +591,174 @@ class Pool(object): return int(nearest) -class ReplicatedPool(Pool): - def __init__(self, service, name, pg_num=None, replicas=2, - percent_data=10.0, app_name=None): - super(ReplicatedPool, self).__init__(service=service, name=name) - self.replicas = replicas - self.percent_data = percent_data - if pg_num: +class Pool(BasePool): + """Compability shim for any descendents external to this library.""" + + @deprecate( + 'The ``Pool`` baseclass has been replaced by ``BasePool`` class.') + def __init__(self, service, name): + super(Pool, self).__init__(service, name=name) + + def create(self): + pass + + +class ReplicatedPool(BasePool): + def __init__(self, service, name=None, pg_num=None, replicas=None, + percent_data=None, app_name=None, op=None): + """Initialize ReplicatedPool object. + + Pool information is either initialized from individual keyword + arguments or from a individual CephBrokerRq operation Dict. + + Please refer to the docstring of the ``BasePool`` class for + documentation of the common parameters. + + :param pg_num: Express wish for number of Placement Groups (this value + is subject to validation against a running cluster prior + to use to avoid creating a pool with too many PGs) + :type pg_num: int + :param replicas: Number of copies there should be of each object added + to this replicated pool. + :type replicas: int + :raises: KeyError + """ + # NOTE: Do not perform initialization steps that require live data from + # a running cluster here. The *Pool classes may be used for validation. + + # The common parameters are handled in our parents initializer + super(ReplicatedPool, self).__init__( + service=service, name=name, percent_data=percent_data, + app_name=app_name, op=op) + + if op: + # When initializing from op `replicas` is a required attribute, and + # we will fail with KeyError if it is not provided. + self.replicas = op['replicas'] + self.pg_num = op.get('pg_num') + else: + self.replicas = replicas or 2 + self.pg_num = pg_num + + def _create(self): + # Do extra validation on pg_num with data from live cluster + if self.pg_num: # Since the number of placement groups were specified, ensure # that there aren't too many created. max_pgs = self.get_pgs(self.replicas, 100.0) - self.pg_num = min(pg_num, max_pgs) + self.pg_num = min(self.pg_num, max_pgs) else: - self.pg_num = self.get_pgs(self.replicas, percent_data) - if app_name: - self.app_name = app_name + self.pg_num = self.get_pgs(self.replicas, self.percent_data) + + # Create it + if self.nautilus_or_later: + cmd = [ + 'ceph', '--id', self.service, 'osd', 'pool', 'create', + '--pg-num-min={}'.format( + min(AUTOSCALER_DEFAULT_PGS, self.pg_num) + ), + self.name, str(self.pg_num) + ] else: - self.app_name = 'unknown' + cmd = [ + 'ceph', '--id', self.service, 'osd', 'pool', 'create', + self.name, str(self.pg_num) + ] + check_call(cmd) - def create(self): - if not pool_exists(self.service, self.name): - nautilus_or_later = cmp_pkgrevno('ceph-common', '14.2.0') >= 0 - # Create it - if nautilus_or_later: - cmd = [ - 'ceph', '--id', self.service, 'osd', 'pool', 'create', - '--pg-num-min={}'.format( - min(AUTOSCALER_DEFAULT_PGS, self.pg_num) - ), - self.name, str(self.pg_num) - ] - else: - cmd = [ - 'ceph', '--id', self.service, 'osd', 'pool', 'create', - self.name, str(self.pg_num) - ] - - try: - check_call(cmd) - # Set the pool replica size - update_pool(client=self.service, - pool=self.name, - settings={'size': str(self.replicas)}) - if nautilus_or_later: - # Ensure we set the expected pool ratio - update_pool(client=self.service, - pool=self.name, - settings={'target_size_ratio': str(self.percent_data / 100.0)}) - try: - set_app_name_for_pool(client=self.service, - pool=self.name, - name=self.app_name) - except CalledProcessError: - log('Could not set app name for pool {}'.format(self.name), level=WARNING) - if 'pg_autoscaler' in enabled_manager_modules(): - try: - enable_pg_autoscale(self.service, self.name) - except CalledProcessError as e: - log('Could not configure auto scaling for pool {}: {}'.format( - self.name, e), level=WARNING) - except CalledProcessError: - raise + def _post_create(self): + # Set the pool replica size + update_pool(client=self.service, + pool=self.name, + settings={'size': str(self.replicas)}) + # Perform other common post pool creation tasks + super(ReplicatedPool, self)._post_create() -# Default jerasure erasure coded pool -class ErasurePool(Pool): - def __init__(self, service, name, erasure_code_profile="default", - percent_data=10.0, app_name=None): - super(ErasurePool, self).__init__(service=service, name=name) - self.erasure_code_profile = erasure_code_profile - self.percent_data = percent_data - if app_name: - self.app_name = app_name +class ErasurePool(BasePool): + """Default jerasure erasure coded pool.""" + + def __init__(self, service, name=None, erasure_code_profile=None, + percent_data=None, app_name=None, op=None, + allow_ec_overwrites=False): + """Initialize ReplicatedPool object. + + Pool information is either initialized from individual keyword + arguments or from a individual CephBrokerRq operation Dict. + + Please refer to the docstring of the ``BasePool`` class for + documentation of the common parameters. + + :param erasure_code_profile: EC Profile to use (default: 'default') + :type erasure_code_profile: Optional[str] + """ + # NOTE: Do not perform initialization steps that require live data from + # a running cluster here. The *Pool classes may be used for validation. + + # The common parameters are handled in our parents initializer + super(ErasurePool, self).__init__( + service=service, name=name, percent_data=percent_data, + app_name=app_name, op=op) + + if op: + # Note that the different default when initializing from op stems + # from different handling of this in the `charms.ceph` library. + self.erasure_code_profile = op.get('erasure-profile', + 'default-canonical') + self.allow_ec_overwrites = op.get('allow-ec-overwrites') else: - self.app_name = 'unknown' + # We keep the class default when initialized from keyword arguments + # to not break the API for any other consumers. + self.erasure_code_profile = erasure_code_profile or 'default' + self.allow_ec_overwrites = allow_ec_overwrites - def create(self): - if not pool_exists(self.service, self.name): - # Try to find the erasure profile information in order to properly - # size the number of placement groups. The size of an erasure - # coded placement group is calculated as k+m. - erasure_profile = get_erasure_profile(self.service, - self.erasure_code_profile) + def _create(self): + # Try to find the erasure profile information in order to properly + # size the number of placement groups. The size of an erasure + # coded placement group is calculated as k+m. + erasure_profile = get_erasure_profile(self.service, + self.erasure_code_profile) - # Check for errors - if erasure_profile is None: - msg = ("Failed to discover erasure profile named " - "{}".format(self.erasure_code_profile)) - log(msg, level=ERROR) - raise PoolCreationError(msg) - if 'k' not in erasure_profile or 'm' not in erasure_profile: - # Error - msg = ("Unable to find k (data chunks) or m (coding chunks) " - "in erasure profile {}".format(erasure_profile)) - log(msg, level=ERROR) - raise PoolCreationError(msg) + # Check for errors + if erasure_profile is None: + msg = ("Failed to discover erasure profile named " + "{}".format(self.erasure_code_profile)) + log(msg, level=ERROR) + raise PoolCreationError(msg) + if 'k' not in erasure_profile or 'm' not in erasure_profile: + # Error + msg = ("Unable to find k (data chunks) or m (coding chunks) " + "in erasure profile {}".format(erasure_profile)) + log(msg, level=ERROR) + raise PoolCreationError(msg) - k = int(erasure_profile['k']) - m = int(erasure_profile['m']) - pgs = self.get_pgs(k + m, self.percent_data) - nautilus_or_later = cmp_pkgrevno('ceph-common', '14.2.0') >= 0 - # Create it - if nautilus_or_later: - cmd = [ - 'ceph', '--id', self.service, 'osd', 'pool', 'create', - '--pg-num-min={}'.format( - min(AUTOSCALER_DEFAULT_PGS, pgs) - ), - self.name, str(pgs), str(pgs), - 'erasure', self.erasure_code_profile - ] - else: - cmd = [ - 'ceph', '--id', self.service, 'osd', 'pool', 'create', - self.name, str(pgs), str(pgs), - 'erasure', self.erasure_code_profile - ] + k = int(erasure_profile['k']) + m = int(erasure_profile['m']) + pgs = self.get_pgs(k + m, self.percent_data) + self.nautilus_or_later = cmp_pkgrevno('ceph-common', '14.2.0') >= 0 + # Create it + if self.nautilus_or_later: + cmd = [ + 'ceph', '--id', self.service, 'osd', 'pool', 'create', + '--pg-num-min={}'.format( + min(AUTOSCALER_DEFAULT_PGS, pgs) + ), + self.name, str(pgs), str(pgs), + 'erasure', self.erasure_code_profile + ] + else: + cmd = [ + 'ceph', '--id', self.service, 'osd', 'pool', 'create', + self.name, str(pgs), str(pgs), + 'erasure', self.erasure_code_profile + ] + check_call(cmd) - try: - check_call(cmd) - try: - set_app_name_for_pool(client=self.service, - pool=self.name, - name=self.app_name) - except CalledProcessError: - log('Could not set app name for pool {}'.format(self.name), level=WARNING) - if nautilus_or_later: - # Ensure we set the expected pool ratio - update_pool(client=self.service, - pool=self.name, - settings={'target_size_ratio': str(self.percent_data / 100.0)}) - if 'pg_autoscaler' in enabled_manager_modules(): - try: - enable_pg_autoscale(self.service, self.name) - except CalledProcessError as e: - log('Could not configure auto scaling for pool {}: {}'.format( - self.name, e), level=WARNING) - except CalledProcessError: - raise - - """Get an existing erasure code profile if it already exists. - Returns json formatted output""" + def _post_create(self): + super(ErasurePool, self)._post_create() + if self.allow_ec_overwrites: + update_pool(self.service, self.name, + {'allow_ec_overwrites': 'true'}) def enabled_manager_modules(): @@ -541,22 +779,28 @@ def enabled_manager_modules(): def enable_pg_autoscale(service, pool_name): - """ - Enable Ceph's PG autoscaler for the specified pool. + """Enable Ceph's PG autoscaler for the specified pool. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types. The name of the pool to enable sutoscaling on - :raise: CalledProcessError if the command fails + :param service: The Ceph user name to run the command under + :type service: str + :param pool_name: The name of the pool to enable sutoscaling on + :type pool_name: str + :raises: CalledProcessError if the command fails """ - check_call(['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, 'pg_autoscale_mode', 'on']) + check_call([ + 'ceph', '--id', service, + 'osd', 'pool', 'set', pool_name, 'pg_autoscale_mode', 'on']) def get_mon_map(service): - """ - Returns the current monitor map. - :param service: six.string_types. The Ceph user name to run the command under - :return: json string. :raise: ValueError if the monmap fails to parse. - Also raises CalledProcessError if our ceph command fails + """Return the current monitor map. + + :param service: The Ceph user name to run the command under + :type service: str + :returns: Dictionary with monitor map data + :rtype: Dict[str,any] + :raises: ValueError if the monmap fails to parse, CalledProcessError if our + ceph command fails. """ try: mon_status = check_output(['ceph', '--id', service, @@ -576,17 +820,16 @@ def get_mon_map(service): def hash_monitor_names(service): - """ + """Get a sorted list of monitor hashes in ascending order. + Uses the get_mon_map() function to get information about the monitor - cluster. - Hash the name of each monitor. Return a sorted list of monitor hashes - in an ascending order. - :param service: six.string_types. The Ceph user name to run the command under - :rtype : dict. json dict of monitor name, ip address and rank - example: { - 'name': 'ip-172-31-13-165', - 'rank': 0, - 'addr': '172.31.13.165:6789/0'} + cluster. Hash the name of each monitor. + + :param service: The Ceph user name to run the command under. + :type service: str + :returns: a sorted list of monitor hashes in an ascending order. + :rtype : List[str] + :raises: CalledProcessError, ValueError """ try: hash_list = [] @@ -603,46 +846,56 @@ def hash_monitor_names(service): def monitor_key_delete(service, key): - """ - Delete a key and value pair from the monitor cluster - :param service: six.string_types. The Ceph user name to run the command under + """Delete a key and value pair from the monitor cluster. + Deletes a key value pair on the monitor cluster. - :param key: six.string_types. The key to delete. + + :param service: The Ceph user name to run the command under + :type service: str + :param key: The key to delete. + :type key: str + :raises: CalledProcessError """ try: check_output( ['ceph', '--id', service, 'config-key', 'del', str(key)]) except CalledProcessError as e: - log("Monitor config-key put failed with message: {}".format( - e.output)) + log("Monitor config-key put failed with message: {}" + .format(e.output)) raise def monitor_key_set(service, key, value): - """ - Sets a key value pair on the monitor cluster. - :param service: six.string_types. The Ceph user name to run the command under - :param key: six.string_types. The key to set. - :param value: The value to set. This will be converted to a string - before setting + """Set a key value pair on the monitor cluster. + + :param service: The Ceph user name to run the command under. + :type service str + :param key: The key to set. + :type key: str + :param value: The value to set. This will be coerced into a string. + :type value: str + :raises: CalledProcessError """ try: check_output( ['ceph', '--id', service, 'config-key', 'put', str(key), str(value)]) except CalledProcessError as e: - log("Monitor config-key put failed with message: {}".format( - e.output)) + log("Monitor config-key put failed with message: {}" + .format(e.output)) raise def monitor_key_get(service, key): - """ - Gets the value of an existing key in the monitor cluster. - :param service: six.string_types. The Ceph user name to run the command under - :param key: six.string_types. The key to search for. + """Get the value of an existing key in the monitor cluster. + + :param service: The Ceph user name to run the command under + :type service: str + :param key: The key to search for. + :type key: str :return: Returns the value of that key or None if not found. + :rtype: Optional[str] """ try: output = check_output( @@ -650,19 +903,21 @@ def monitor_key_get(service, key): 'config-key', 'get', str(key)]).decode('UTF-8') return output except CalledProcessError as e: - log("Monitor config-key get failed with message: {}".format( - e.output)) + log("Monitor config-key get failed with message: {}" + .format(e.output)) return None def monitor_key_exists(service, key): - """ - Searches for the existence of a key in the monitor cluster. - :param service: six.string_types. The Ceph user name to run the command under - :param key: six.string_types. The key to search for - :return: Returns True if the key exists, False if not and raises an - exception if an unknown error occurs. :raise: CalledProcessError if - an unknown error occurs + """Search for existence of key in the monitor cluster. + + :param service: The Ceph user name to run the command under. + :type service: str + :param key: The key to search for. + :type key: str + :return: Returns True if the key exists, False if not. + :rtype: bool + :raises: CalledProcessError if an unknown error occurs. """ try: check_call( @@ -675,16 +930,20 @@ def monitor_key_exists(service, key): if e.returncode == errno.ENOENT: return False else: - log("Unknown error from ceph config-get exists: {} {}".format( - e.returncode, e.output)) + log("Unknown error from ceph config-get exists: {} {}" + .format(e.returncode, e.output)) raise def get_erasure_profile(service, name): - """ - :param service: six.string_types. The Ceph user name to run the command under - :param name: - :return: + """Get an existing erasure code profile if it exists. + + :param service: The Ceph user name to run the command under. + :type service: str + :param name: Name of profile. + :type name: str + :returns: Dictionary with profile data. + :rtype: Optional[Dict[str]] """ try: out = check_output(['ceph', '--id', service, @@ -698,54 +957,61 @@ def get_erasure_profile(service, name): def pool_set(service, pool_name, key, value): + """Sets a value for a RADOS pool in ceph. + + :param service: The Ceph user name to run the command under. + :type service: str + :param pool_name: Name of pool to set property on. + :type pool_name: str + :param key: Property key. + :type key: str + :param value: Value, will be coerced into str and shifted to lowercase. + :type value: str + :raises: CalledProcessError """ - Sets a value for a RADOS pool in ceph. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :param key: six.string_types - :param value: - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, key, - str(value).lower()] - try: - check_call(cmd) - except CalledProcessError: - raise + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'set', pool_name, key, str(value).lower()] + check_call(cmd) def snapshot_pool(service, pool_name, snapshot_name): + """Snapshots a RADOS pool in Ceph. + + :param service: The Ceph user name to run the command under. + :type service: str + :param pool_name: Name of pool to snapshot. + :type pool_name: str + :param snapshot_name: Name of snapshot to create. + :type snapshot_name: str + :raises: CalledProcessError """ - Snapshots a RADOS pool in ceph. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :param snapshot_name: six.string_types - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'mksnap', pool_name, snapshot_name] - try: - check_call(cmd) - except CalledProcessError: - raise + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'mksnap', pool_name, snapshot_name] + check_call(cmd) def remove_pool_snapshot(service, pool_name, snapshot_name): + """Remove a snapshot from a RADOS pool in Ceph. + + :param service: The Ceph user name to run the command under. + :type service: str + :param pool_name: Name of pool to remove snapshot from. + :type pool_name: str + :param snapshot_name: Name of snapshot to remove. + :type snapshot_name: str + :raises: CalledProcessError """ - Remove a snapshot from a RADOS pool in ceph. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :param snapshot_name: six.string_types - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name] - try: - check_call(cmd) - except CalledProcessError: - raise + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'rmsnap', pool_name, snapshot_name] + check_call(cmd) def set_pool_quota(service, pool_name, max_bytes=None, max_objects=None): - """ + """Set byte quota on a RADOS pool in Ceph. + :param service: The Ceph user name to run the command under :type service: str :param pool_name: Name of pool @@ -756,7 +1022,9 @@ def set_pool_quota(service, pool_name, max_bytes=None, max_objects=None): :type max_objects: int :raises: subprocess.CalledProcessError """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name] + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'set-quota', pool_name] if max_bytes: cmd = cmd + ['max_bytes', str(max_bytes)] if max_objects: @@ -765,119 +1033,216 @@ def set_pool_quota(service, pool_name, max_bytes=None, max_objects=None): def remove_pool_quota(service, pool_name): + """Remove byte quota on a RADOS pool in Ceph. + + :param service: The Ceph user name to run the command under. + :type service: str + :param pool_name: Name of pool to remove quota from. + :type pool_name: str + :raises: CalledProcessError """ - Set a byte quota on a RADOS pool in ceph. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0'] - try: - check_call(cmd) - except CalledProcessError: - raise + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0'] + check_call(cmd) def remove_erasure_profile(service, profile_name): + """Remove erasure code profile. + + :param service: The Ceph user name to run the command under + :type service: str + :param profile_name: Name of profile to remove. + :type profile_name: str + :raises: CalledProcessError """ - Create a new erasure code profile if one does not already exist for it. Updates - the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/ - for more details - :param service: six.string_types. The Ceph user name to run the command under - :param profile_name: six.string_types - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'rm', - profile_name] - try: - check_call(cmd) - except CalledProcessError: - raise + cmd = [ + 'ceph', '--id', service, + 'osd', 'erasure-code-profile', 'rm', profile_name] + check_call(cmd) -def create_erasure_profile(service, profile_name, erasure_plugin_name='jerasure', - failure_domain='host', +def create_erasure_profile(service, profile_name, + erasure_plugin_name='jerasure', + failure_domain=None, data_chunks=2, coding_chunks=1, locality=None, durability_estimator=None, - device_class=None): - """ - Create a new erasure code profile if one does not already exist for it. Updates - the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/ - for more details - :param service: six.string_types. The Ceph user name to run the command under - :param profile_name: six.string_types - :param erasure_plugin_name: six.string_types - :param failure_domain: six.string_types. One of ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', - 'room', 'root', 'row']) - :param data_chunks: int - :param coding_chunks: int - :param locality: int - :param durability_estimator: int - :param device_class: six.string_types - :return: None. Can raise CalledProcessError - """ - # Ensure this failure_domain is allowed by Ceph - validator(failure_domain, six.string_types, - ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row']) + helper_chunks=None, + scalar_mds=None, + crush_locality=None, + device_class=None, + erasure_plugin_technique=None): + """Create a new erasure code profile if one does not already exist for it. - cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'set', profile_name, - 'plugin=' + erasure_plugin_name, 'k=' + str(data_chunks), 'm=' + str(coding_chunks) - ] - if locality is not None and durability_estimator is not None: - raise ValueError("create_erasure_profile should be called with k, m and one of l or c but not both.") + Updates the profile if it exists. Please refer to [0] for more details. + + 0: http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/ + + :param service: The Ceph user name to run the command under. + :type service: str + :param profile_name: Name of profile. + :type profile_name: str + :param erasure_plugin_name: Erasure code plugin. + :type erasure_plugin_name: str + :param failure_domain: Failure domain, one of: + ('chassis', 'datacenter', 'host', 'osd', 'pdu', + 'pod', 'rack', 'region', 'room', 'root', 'row'). + :type failure_domain: str + :param data_chunks: Number of data chunks. + :type data_chunks: int + :param coding_chunks: Number of coding chunks. + :type coding_chunks: int + :param locality: Locality. + :type locality: int + :param durability_estimator: Durability estimator. + :type durability_estimator: int + :param helper_chunks: int + :type helper_chunks: int + :param device_class: Restrict placement to devices of specific class. + :type device_class: str + :param scalar_mds: one of ['isa', 'jerasure', 'shec'] + :type scalar_mds: str + :param crush_locality: LRC locality faulure domain, one of: + ('chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', + 'rack', 'region', 'room', 'root', 'row') or unset. + :type crush_locaity: str + :param erasure_plugin_technique: Coding technique for EC plugin + :type erasure_plugin_technique: str + :return: None. Can raise CalledProcessError, ValueError or AssertionError + """ + plugin_techniques = { + 'jerasure': [ + 'reed_sol_van', + 'reed_sol_r6_op', + 'cauchy_orig', + 'cauchy_good', + 'liberation', + 'blaum_roth', + 'liber8tion' + ], + 'lrc': [], + 'isa': [ + 'reed_sol_van', + 'cauchy', + ], + 'shec': [ + 'single', + 'multiple' + ], + 'clay': [], + } + failure_domains = [ + 'chassis', 'datacenter', + 'host', 'osd', + 'pdu', 'pod', + 'rack', 'region', + 'room', 'root', + 'row', + ] + device_classes = [ + 'ssd', + 'hdd', + 'nvme' + ] + + validator(erasure_plugin_name, six.string_types, + list(plugin_techniques.keys())) + + cmd = [ + 'ceph', '--id', service, + 'osd', 'erasure-code-profile', 'set', profile_name, + 'plugin={}'.format(erasure_plugin_name), + 'k={}'.format(str(data_chunks)), + 'm={}'.format(str(coding_chunks)), + ] + + if erasure_plugin_technique: + validator(erasure_plugin_technique, six.string_types, + plugin_techniques[erasure_plugin_name]) + cmd.append('technique={}'.format(erasure_plugin_technique)) luminous_or_later = cmp_pkgrevno('ceph-common', '12.0.0') >= 0 - # failure_domain changed in luminous - if luminous_or_later: - cmd.append('crush-failure-domain=' + failure_domain) - else: - cmd.append('ruleset-failure-domain=' + failure_domain) + + # Set failure domain from options if not provided in args + if not failure_domain and config('customize-failure-domain'): + # Defaults to 'host' so just need to deal with + # setting 'rack' if feature is enabled + failure_domain = 'rack' + + if failure_domain: + validator(failure_domain, six.string_types, failure_domains) + # failure_domain changed in luminous + if luminous_or_later: + cmd.append('crush-failure-domain={}'.format(failure_domain)) + else: + cmd.append('ruleset-failure-domain={}'.format(failure_domain)) # device class new in luminous if luminous_or_later and device_class: + validator(device_class, six.string_types, device_classes) cmd.append('crush-device-class={}'.format(device_class)) else: log('Skipping device class configuration (ceph < 12.0.0)', level=DEBUG) # Add plugin specific information - if locality is not None: - # For local erasure codes - cmd.append('l=' + str(locality)) - if durability_estimator is not None: - # For Shec erasure codes - cmd.append('c=' + str(durability_estimator)) + if erasure_plugin_name == 'lrc': + # LRC mandatory configuration + if locality: + cmd.append('l={}'.format(str(locality))) + else: + raise ValueError("locality must be provided for lrc plugin") + # LRC optional configuration + if crush_locality: + validator(crush_locality, six.string_types, failure_domains) + cmd.append('crush-locality={}'.format(crush_locality)) + + if erasure_plugin_name == 'shec': + # SHEC optional configuration + if durability_estimator: + cmd.append('c={}'.format((durability_estimator))) + + if erasure_plugin_name == 'clay': + # CLAY optional configuration + if helper_chunks: + cmd.append('d={}'.format(str(helper_chunks))) + if scalar_mds: + cmd.append('scalar-mds={}'.format(scalar_mds)) if erasure_profile_exists(service, profile_name): cmd.append('--force') - try: - check_call(cmd) - except CalledProcessError: - raise + check_call(cmd) def rename_pool(service, old_name, new_name): - """ - Rename a Ceph pool from old_name to new_name - :param service: six.string_types. The Ceph user name to run the command under - :param old_name: six.string_types - :param new_name: six.string_types - :return: None + """Rename a Ceph pool from old_name to new_name. + + :param service: The Ceph user name to run the command under. + :type service: str + :param old_name: Name of pool subject to rename. + :type old_name: str + :param new_name: Name to rename pool to. + :type new_name: str """ validator(value=old_name, valid_type=six.string_types) validator(value=new_name, valid_type=six.string_types) - cmd = ['ceph', '--id', service, 'osd', 'pool', 'rename', old_name, new_name] + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'rename', old_name, new_name] check_call(cmd) def erasure_profile_exists(service, name): - """ - Check to see if an Erasure code profile already exists. - :param service: six.string_types. The Ceph user name to run the command under - :param name: six.string_types - :return: int or None + """Check to see if an Erasure code profile already exists. + + :param service: The Ceph user name to run the command under + :type service: str + :param name: Name of profile to look for. + :type name: str + :returns: True if it exists, False otherwise. + :rtype: bool """ validator(value=name, valid_type=six.string_types) try: @@ -890,11 +1255,14 @@ def erasure_profile_exists(service, name): def get_cache_mode(service, pool_name): - """ - Find the current caching mode of the pool_name given. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :return: int or None + """Find the current caching mode of the pool_name given. + + :param service: The Ceph user name to run the command under + :type service: str + :param pool_name: Name of pool. + :type pool_name: str + :returns: Current cache mode. + :rtype: Optional[int] """ validator(value=service, valid_type=six.string_types) validator(value=pool_name, valid_type=six.string_types) @@ -976,17 +1344,23 @@ def create_rbd_image(service, pool, image, sizemb): def update_pool(client, pool, settings): + """Update pool properties. + + :param client: Client/User-name to authenticate with. + :type client: str + :param pool: Name of pool to operate on + :type pool: str + :param settings: Dictionary with key/value pairs to set. + :type settings: Dict[str, str] + :raises: CalledProcessError + """ cmd = ['ceph', '--id', client, 'osd', 'pool', 'set', pool] for k, v in six.iteritems(settings): - cmd.append(k) - cmd.append(v) - - check_call(cmd) + check_call(cmd + [k, v]) def set_app_name_for_pool(client, pool, name): - """ - Calls `osd pool application enable` for the specified pool name + """Calls `osd pool application enable` for the specified pool name :param client: Name of the ceph client to use :type client: str @@ -1043,8 +1417,7 @@ def _keyring_path(service): def add_key(service, key): - """ - Add a key to a keyring. + """Add a key to a keyring. Creates the keyring if it doesn't already exist. @@ -1288,13 +1661,33 @@ class CephBrokerRq(object): The API is versioned and defaults to version 1. """ - def __init__(self, api_version=1, request_id=None): - self.api_version = api_version - if request_id: - self.request_id = request_id + def __init__(self, api_version=1, request_id=None, raw_request_data=None): + """Initialize CephBrokerRq object. + + Builds a new empty request or rebuilds a request from on-wire JSON + data. + + :param api_version: API version for request (default: 1). + :type api_version: Optional[int] + :param request_id: Unique identifier for request. + (default: string representation of generated UUID) + :type request_id: Optional[str] + :param raw_request_data: JSON-encoded string to build request from. + :type raw_request_data: Optional[str] + :raises: KeyError + """ + if raw_request_data: + request_data = json.loads(raw_request_data) + self.api_version = request_data['api-version'] + self.request_id = request_data['request-id'] + self.set_ops(request_data['ops']) else: - self.request_id = str(uuid.uuid1()) - self.ops = [] + self.api_version = api_version + if request_id: + self.request_id = request_id + else: + self.request_id = str(uuid.uuid1()) + self.ops = [] def add_op(self, op): """Add an op if it is not already in the list. @@ -1336,12 +1729,119 @@ class CephBrokerRq(object): group=group, namespace=namespace, app_name=app_name, max_bytes=max_bytes, max_objects=max_objects) + # Use function parameters and docstring to define types in a compatible + # manner. + # + # NOTE: Our caller should always use a kwarg Dict when calling us so + # no need to maintain fixed order/position for parameters. Please keep them + # sorted by name when adding new ones. + def _partial_build_common_op_create(self, + app_name=None, + compression_algorithm=None, + compression_mode=None, + compression_required_ratio=None, + compression_min_blob_size=None, + compression_min_blob_size_hdd=None, + compression_min_blob_size_ssd=None, + compression_max_blob_size=None, + compression_max_blob_size_hdd=None, + compression_max_blob_size_ssd=None, + group=None, + max_bytes=None, + max_objects=None, + namespace=None, + weight=None): + """Build common part of a create pool operation. + + :param app_name: Tag pool with application name. Note that there is + certain protocols emerging upstream with regard to + meaningful application names to use. + Examples are 'rbd' and 'rgw'. + :type app_name: Optional[str] + :param compression_algorithm: Compressor to use, one of: + ('lz4', 'snappy', 'zlib', 'zstd') + :type compression_algorithm: Optional[str] + :param compression_mode: When to compress data, one of: + ('none', 'passive', 'aggressive', 'force') + :type compression_mode: Optional[str] + :param compression_required_ratio: Minimum compression ratio for data + chunk, if the requested ratio is not + achieved the compressed version will + be thrown away and the original + stored. + :type compression_required_ratio: Optional[float] + :param compression_min_blob_size: Chunks smaller than this are never + compressed (unit: bytes). + :type compression_min_blob_size: Optional[int] + :param compression_min_blob_size_hdd: Chunks smaller than this are not + compressed when destined to + rotational media (unit: bytes). + :type compression_min_blob_size_hdd: Optional[int] + :param compression_min_blob_size_ssd: Chunks smaller than this are not + compressed when destined to flash + media (unit: bytes). + :type compression_min_blob_size_ssd: Optional[int] + :param compression_max_blob_size: Chunks larger than this are broken + into N * compression_max_blob_size + chunks before being compressed + (unit: bytes). + :type compression_max_blob_size: Optional[int] + :param compression_max_blob_size_hdd: Chunks larger than this are + broken into + N * compression_max_blob_size_hdd + chunks before being compressed + when destined for rotational + media (unit: bytes) + :type compression_max_blob_size_hdd: Optional[int] + :param compression_max_blob_size_ssd: Chunks larger than this are + broken into + N * compression_max_blob_size_ssd + chunks before being compressed + when destined for flash media + (unit: bytes). + :type compression_max_blob_size_ssd: Optional[int] + :param group: Group to add pool to + :type group: Optional[str] + :param max_bytes: Maximum bytes quota to apply + :type max_bytes: Optional[int] + :param max_objects: Maximum objects quota to apply + :type max_objects: Optional[int] + :param namespace: Group namespace + :type namespace: Optional[str] + :param weight: The percentage of data that is expected to be contained + in the pool from the total available space on the OSDs. + Used to calculate number of Placement Groups to create + for pool. + :type weight: Optional[float] + :returns: Dictionary with kwarg name as key. + :rtype: Dict[str,any] + :raises: AssertionError + """ + return { + 'app-name': app_name, + 'compression-algorithm': compression_algorithm, + 'compression-mode': compression_mode, + 'compression-required-ratio': compression_required_ratio, + 'compression-min-blob-size': compression_min_blob_size, + 'compression-min-blob-size-hdd': compression_min_blob_size_hdd, + 'compression-min-blob-size-ssd': compression_min_blob_size_ssd, + 'compression-max-blob-size': compression_max_blob_size, + 'compression-max-blob-size-hdd': compression_max_blob_size_hdd, + 'compression-max-blob-size-ssd': compression_max_blob_size_ssd, + 'group': group, + 'max-bytes': max_bytes, + 'max-objects': max_objects, + 'group-namespace': namespace, + 'weight': weight, + } + def add_op_create_replicated_pool(self, name, replica_count=3, pg_num=None, - weight=None, group=None, namespace=None, - app_name=None, max_bytes=None, - max_objects=None): + **kwargs): """Adds an operation to create a replicated pool. + Refer to docstring for ``_partial_build_common_op_create`` for + documentation of keyword arguments. + :param name: Name of pool to create :type name: str :param replica_count: Number of copies Ceph should keep of your data. @@ -1349,66 +1849,114 @@ class CephBrokerRq(object): :param pg_num: Request specific number of Placement Groups to create for pool. :type pg_num: int - :param weight: The percentage of data that is expected to be contained - in the pool from the total available space on the OSDs. - Used to calculate number of Placement Groups to create - for pool. - :type weight: float - :param group: Group to add pool to - :type group: str - :param namespace: Group namespace - :type namespace: str - :param app_name: (Optional) Tag pool with application name. Note that - there is certain protocols emerging upstream with - regard to meaningful application names to use. - Examples are ``rbd`` and ``rgw``. - :type app_name: str - :param max_bytes: Maximum bytes quota to apply - :type max_bytes: int - :param max_objects: Maximum objects quota to apply - :type max_objects: int + :raises: AssertionError if provided data is of invalid type/range """ - if pg_num and weight: + if pg_num and kwargs.get('weight'): raise ValueError('pg_num and weight are mutually exclusive') - self.add_op({'op': 'create-pool', 'name': name, - 'replicas': replica_count, 'pg_num': pg_num, - 'weight': weight, 'group': group, - 'group-namespace': namespace, 'app-name': app_name, - 'max-bytes': max_bytes, 'max-objects': max_objects}) + op = { + 'op': 'create-pool', + 'name': name, + 'replicas': replica_count, + 'pg_num': pg_num, + } + op.update(self._partial_build_common_op_create(**kwargs)) + + # Initialize Pool-object to validate type and range of ops. + pool = ReplicatedPool('dummy-service', op=op) + pool.validate() + + self.add_op(op) def add_op_create_erasure_pool(self, name, erasure_profile=None, - weight=None, group=None, app_name=None, - max_bytes=None, max_objects=None): + allow_ec_overwrites=False, **kwargs): """Adds an operation to create a erasure coded pool. + Refer to docstring for ``_partial_build_common_op_create`` for + documentation of keyword arguments. + :param name: Name of pool to create :type name: str :param erasure_profile: Name of erasure code profile to use. If not set the ceph-mon unit handling the broker request will set its default value. :type erasure_profile: str - :param weight: The percentage of data that is expected to be contained - in the pool from the total available space on the OSDs. - :type weight: float - :param group: Group to add pool to - :type group: str - :param app_name: (Optional) Tag pool with application name. Note that - there is certain protocols emerging upstream with - regard to meaningful application names to use. - Examples are ``rbd`` and ``rgw``. - :type app_name: str - :param max_bytes: Maximum bytes quota to apply - :type max_bytes: int - :param max_objects: Maximum objects quota to apply - :type max_objects: int + :param allow_ec_overwrites: allow EC pools to be overriden + :type allow_ec_overwrites: bool + :raises: AssertionError if provided data is of invalid type/range """ - self.add_op({'op': 'create-pool', 'name': name, - 'pool-type': 'erasure', - 'erasure-profile': erasure_profile, - 'weight': weight, - 'group': group, 'app-name': app_name, - 'max-bytes': max_bytes, 'max-objects': max_objects}) + op = { + 'op': 'create-pool', + 'name': name, + 'pool-type': 'erasure', + 'erasure-profile': erasure_profile, + 'allow-ec-overwrites': allow_ec_overwrites, + } + op.update(self._partial_build_common_op_create(**kwargs)) + + # Initialize Pool-object to validate type and range of ops. + pool = ErasurePool('dummy-service', op) + pool.validate() + + self.add_op(op) + + def add_op_create_erasure_profile(self, name, + erasure_type='jerasure', + erasure_technique=None, + k=None, m=None, + failure_domain=None, + lrc_locality=None, + shec_durability_estimator=None, + clay_helper_chunks=None, + device_class=None, + clay_scalar_mds=None, + lrc_crush_locality=None): + """Adds an operation to create a erasure coding profile. + + :param name: Name of profile to create + :type name: str + :param erasure_type: Which of the erasure coding plugins should be used + :type erasure_type: string + :param erasure_technique: EC plugin technique to use + :type erasure_technique: string + :param k: Number of data chunks + :type k: int + :param m: Number of coding chunks + :type m: int + :param lrc_locality: Group the coding and data chunks into sets of size locality + (lrc plugin) + :type lrc_locality: int + :param durability_estimator: The number of parity chuncks each of which includes + a data chunk in its calculation range (shec plugin) + :type durability_estimator: int + :param helper_chunks: The number of helper chunks to use for recovery operations + (clay plugin) + :type: helper_chunks: int + :param failure_domain: Type of failure domain from Ceph bucket types + to be used + :type failure_domain: string + :param device_class: Device class to use for profile (ssd, hdd) + :type device_class: string + :param clay_scalar_mds: Plugin to use for CLAY layered construction + (jerasure|isa|shec) + :type clay_scaler_mds: string + :param lrc_crush_locality: Type of crush bucket in which set of chunks + defined by lrc_locality will be stored. + :type lrc_crush_locality: string + """ + self.add_op({'op': 'create-erasure-profile', + 'name': name, + 'k': k, + 'm': m, + 'l': lrc_locality, + 'c': shec_durability_estimator, + 'd': clay_helper_chunks, + 'erasure-type': erasure_type, + 'erasure-technique': erasure_technique, + 'failure-domain': failure_domain, + 'device-class': device_class, + 'scalar-mds': clay_scalar_mds, + 'crush-locality': lrc_crush_locality}) def set_ops(self, ops): """Set request ops to provided value. @@ -1424,12 +1972,14 @@ class CephBrokerRq(object): 'request-id': self.request_id}) def _ops_equal(self, other): + keys_to_compare = [ + 'replicas', 'name', 'op', 'pg_num', 'group-permission', + 'object-prefix-permissions', + ] + keys_to_compare += list(self._partial_build_common_op_create().keys()) if len(self.ops) == len(other.ops): for req_no in range(0, len(self.ops)): - for key in [ - 'replicas', 'name', 'op', 'pg_num', 'weight', - 'group', 'group-namespace', 'group-permission', - 'object-prefix-permissions']: + for key in keys_to_compare: if self.ops[req_no].get(key) != other.ops[req_no].get(key): return False else: @@ -1522,18 +2072,15 @@ class CephBrokerRsp(object): def get_previous_request(rid): """Return the last ceph broker request sent on a given relation - @param rid: Relation id to query for request + :param rid: Relation id to query for request + :type rid: str + :returns: CephBrokerRq object or None if relation data not found. + :rtype: Optional[CephBrokerRq] """ - request = None broker_req = relation_get(attribute='broker_req', rid=rid, unit=local_unit()) if broker_req: - request_data = json.loads(broker_req) - request = CephBrokerRq(api_version=request_data['api-version'], - request_id=request_data['request-id']) - request.set_ops(request_data['ops']) - - return request + return CephBrokerRq(raw_request_data=broker_req) def get_request_states(request, relation='ceph'): diff --git a/charmhelpers/contrib/unison/__init__.py b/charmhelpers/contrib/unison/__init__.py index 61409b14..0fd71b88 100644 --- a/charmhelpers/contrib/unison/__init__.py +++ b/charmhelpers/contrib/unison/__init__.py @@ -113,8 +113,8 @@ def create_private_key(user, priv_key_path, key_type='rsa'): check_call(cmd) else: log('SSH key already exists at %s.' % priv_key_path) - check_call(['chown', user, priv_key_path]) - check_call(['chmod', '0600', priv_key_path]) + os.chown(priv_key_path, pwd.getpwnam(user).pw_uid, -1) + os.chmod(priv_key_path, 0o600) def create_public_key(user, priv_key_path, pub_key_path): @@ -124,7 +124,7 @@ def create_public_key(user, priv_key_path, pub_key_path): p = check_output(cmd).strip() with open(pub_key_path, 'wb') as out: out.write(p) - check_call(['chown', user, pub_key_path]) + os.chown(pub_key_path, pwd.getpwnam(user).pw_uid, -1) def get_keypair(user): @@ -157,6 +157,7 @@ def write_authorized_keys(user, keys): with open(auth_keys, 'w') as out: for k in keys: out.write('%s\n' % k) + os.chown(auth_keys, pwd.getpwnam(user).pw_uid, -1) def write_known_hosts(user, hosts): @@ -172,6 +173,7 @@ def write_known_hosts(user, hosts): with open(known_hosts, 'w') as out: for host in khosts: out.write('%s\n' % host) + os.chown(known_hosts, pwd.getpwnam(user).pw_uid, -1) def ensure_user(user, group=None): diff --git a/test-requirements.txt b/test-requirements.txt index 98fa7bd3..56fbf922 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -15,6 +15,5 @@ flake8>=2.2.4 stestr>=2.2.0 coverage>=4.5.2 pyudev # for ceph-* charm unit tests (need to fix the ceph-* charm unit tests/mocking) -juju!=2.8.3 # this version causes spurious JujuAPIError's git+https://github.com/openstack-charmers/zaza.git#egg=zaza;python_version>='3.0' git+https://github.com/openstack-charmers/zaza-openstack-tests.git#egg=zaza.openstack