From 414701455f7931df1f1cc8e4e28fa3e6523d671c Mon Sep 17 00:00:00 2001 From: James Page Date: Tue, 28 Jul 2020 14:45:39 +0100 Subject: [PATCH] Add support for erasure coding Add support for use of Erasure Coded pools with the Ceph RADOS Gateway. Only the data pool is actually Erasure Coded - all other pools continue to be replicated but have much smaller data footprints. Depends-On: Iec4de19f7b39f0b08158d96c5cc1561b40aefa10 Change-Id: I661639e67853ff471a7d7ddea0e3fc2fcb30fed1 --- .gitignore | 1 + config.yaml | 98 ++ hooks/ceph_rgw.py | 56 +- hooks/charmhelpers/__init__.py | 10 +- .../charmhelpers/contrib/charmsupport/nrpe.py | 21 +- .../charmhelpers/contrib/openstack/context.py | 84 + .../contrib/openstack/templates/ceph.conf | 4 + .../templates/openstack_https_frontend | 10 +- .../templates/openstack_https_frontend.conf | 10 +- .../section-ceph-bluestore-compression | 28 + hooks/charmhelpers/contrib/openstack/utils.py | 17 +- .../contrib/storage/linux/ceph.py | 1395 ++++++++++++----- hooks/charmhelpers/core/host.py | 18 +- lib/charms_ceph/broker.py | 41 +- lib/charms_ceph/utils.py | 298 +++- unit_tests/test_ceph.py | 78 + 16 files changed, 1660 insertions(+), 509 deletions(-) create mode 100644 hooks/charmhelpers/contrib/openstack/templates/section-ceph-bluestore-compression diff --git a/.gitignore b/.gitignore index 0e21f066..4030da5b 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ tags .unit-state.db func-results.json .stestr/ +**/__pycache__ diff --git a/config.yaml b/config.yaml index 2123d97d..3dceefc1 100644 --- a/config.yaml +++ b/config.yaml @@ -121,6 +121,104 @@ options: that once a pool has been created, changes to this setting will be ignored. Setting this value to -1, enables the number of placement groups to be calculated based on the Ceph placement group calculator. + pool-type: + type: string + default: replicated + description: | + Ceph pool type to use for storage - valid values include ‘replicated’ + and ‘erasure-coded’. + ec-profile-name: + type: string + default: + description: | + Name for the EC profile to be created for the EC pools. If not defined + a profile name will be generated based on the name of the pool used by + the application. + ec-rbd-metadata-pool: + type: string + default: + description: | + Name of the metadata pool to be created (for RBD use-cases). If not + defined a metadata pool name will be generated based on the name of + the data pool used by the application. The metadata pool is always + replicated, not erasure coded. + ec-profile-k: + type: int + default: 1 + description: | + Number of data chunks that will be used for EC data pool. K+M factors + should never be greater than the number of available zones (or hosts) + for balancing. + ec-profile-m: + type: int + default: 2 + description: | + Number of coding chunks that will be used for EC data pool. K+M factors + should never be greater than the number of available zones (or hosts) + for balancing. + ec-profile-locality: + type: int + default: + description: | + (lrc plugin - l) Group the coding and data chunks into sets of size l. + For instance, for k=4 and m=2, when l=3 two groups of three are created. + Each set can be recovered without reading chunks from another set. Note + that using the lrc plugin does incur more raw storage usage than isa or + jerasure in order to reduce the cost of recovery operations. 
+ ec-profile-crush-locality: + type: string + default: + description: | + (lrc plugin) The type of the crush bucket in which each set of chunks + defined by l will be stored. For instance, if it is set to rack, each + group of l chunks will be placed in a different rack. It is used to + create a CRUSH rule step such as step choose rack. If it is not set, + no such grouping is done. + ec-profile-durability-estimator: + type: int + default: + description: | + (shec plugin - c) The number of parity chunks each of which includes + each data chunk in its calculation range. The number is used as a + durability estimator. For instance, if c=2, 2 OSDs can be down + without losing data. + ec-profile-helper-chunks: + type: int + default: + description: | + (clay plugin - d) Number of OSDs requested to send data during + recovery of a single chunk. d needs to be chosen such that + k+1 <= d <= k+m-1. Larger the d, the better the savings. + ec-profile-scalar-mds: + type: string + default: + description: | + (clay plugin) specifies the plugin that is used as a building + block in the layered construction. It can be one of jerasure, + isa, shec (defaults to jerasure). + ec-profile-plugin: + type: string + default: jerasure + description: | + EC plugin to use for this applications pool. The following list of + plugins acceptable - jerasure, lrc, isa, shec, clay. + ec-profile-technique: + type: string + default: + description: | + EC profile technique used for this applications pool - will be + validated based on the plugin configured via ec-profile-plugin. + Supported techniques are ‘reed_sol_van’, ‘reed_sol_r6_op’, + ‘cauchy_orig’, ‘cauchy_good’, ‘liber8tion’ for jerasure, + ‘reed_sol_van’, ‘cauchy’ for isa and ‘single’, ‘multiple’ + for shec. + ec-profile-device-class: + type: string + default: + description: | + Device class from CRUSH map to use for placement groups for + erasure profile - valid values: ssd, hdd or nvme (or leave + unset to not use a device class). 
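(Illustration, not part of the patch.) The ec-profile-k and ec-profile-m options above jointly determine the raw-space overhead and the number of simultaneous failures an erasure coded data pool can survive, which is why K+M must not exceed the number of available hosts or zones. A minimal Python sketch of that arithmetic, using the charm defaults (k=1, m=2) and a common alternative layout (k=4, m=2); the numbers are standard Ceph erasure-coding behaviour, not values taken from this patch:

    def ec_profile_cost(k, m):
        """Return (raw space multiplier, failures tolerated) for a k+m profile."""
        # Each object is split into k data chunks plus m coding chunks, so the
        # raw footprint is (k + m) / k times the logical data, and up to m
        # chunks (that is, m hosts or zones) can be lost without losing data.
        return (k + m) / float(k), m

    print(ec_profile_cost(1, 2))  # charm defaults: 3.0x raw usage, tolerates 2 failures
    print(ec_profile_cost(4, 2))  # 1.5x raw usage, tolerates 2 failures, but needs
                                  # at least k + m = 6 hosts/zones for balanced placement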
# Keystone integration operator-roles: type: string diff --git a/hooks/ceph_rgw.py b/hooks/ceph_rgw.py index 7d48ab8a..3aced5bd 100644 --- a/hooks/ceph_rgw.py +++ b/hooks/ceph_rgw.py @@ -18,6 +18,7 @@ import subprocess from charmhelpers.core.hookenv import ( config, + service_name, ) from charmhelpers.core.host import ( @@ -111,18 +112,61 @@ def get_create_rgw_pools_rq(prefix=None): replicas = config('ceph-osd-replication-count') prefix = prefix or 'default' - # Buckets likely to contain the most data and therefore # requiring the most PGs heavy = [ '.rgw.buckets.data' ] bucket_weight = config('rgw-buckets-pool-weight') - for pool in heavy: - pool = "{prefix}{pool}".format(prefix=prefix, pool=pool) - rq.add_op_create_pool(name=pool, replica_count=replicas, - weight=bucket_weight, group='objects', - app_name=CEPH_POOL_APP_NAME) + + if config('pool-type') == 'erasure-coded': + # General EC plugin config + plugin = config('ec-profile-plugin') + technique = config('ec-profile-technique') + device_class = config('ec-profile-device-class') + bdm_k = config('ec-profile-k') + bdm_m = config('ec-profile-m') + # LRC plugin config + bdm_l = config('ec-profile-locality') + crush_locality = config('ec-profile-crush-locality') + # SHEC plugin config + bdm_c = config('ec-profile-durability-estimator') + # CLAY plugin config + bdm_d = config('ec-profile-helper-chunks') + scalar_mds = config('ec-profile-scalar-mds') + # Profile name + service = service_name() + profile_name = ( + config('ec-profile-name') or "{}-profile".format(service) + ) + rq.add_op_create_erasure_profile( + name=profile_name, + k=bdm_k, m=bdm_m, + lrc_locality=bdm_l, + lrc_crush_locality=crush_locality, + shec_durability_estimator=bdm_c, + clay_helper_chunks=bdm_d, + clay_scalar_mds=scalar_mds, + device_class=device_class, + erasure_type=plugin, + erasure_technique=technique + ) + + for pool in heavy: + pool = "{prefix}{pool}".format(prefix=prefix, pool=pool) + rq.add_op_create_erasure_pool( + name=pool, + erasure_profile=profile_name, + weight=bucket_weight, + group="objects", + app_name=CEPH_POOL_APP_NAME + ) + else: + for pool in heavy: + pool = "{prefix}{pool}".format(prefix=prefix, pool=pool) + rq.add_op_create_pool(name=pool, replica_count=replicas, + weight=bucket_weight, group='objects', + app_name=CEPH_POOL_APP_NAME) # NOTE: we want these pools to have a smaller pg_num/pgp_num than the # others since they are not expected to contain as much data diff --git a/hooks/charmhelpers/__init__.py b/hooks/charmhelpers/__init__.py index 61ef9071..1f57ed2a 100644 --- a/hooks/charmhelpers/__init__.py +++ b/hooks/charmhelpers/__init__.py @@ -49,7 +49,8 @@ __deprecated_functions = {} def deprecate(warning, date=None, log=None): """Add a deprecation warning the first time the function is used. - The date, which is a string in semi-ISO8660 format indicate the year-month + + The date which is a string in semi-ISO8660 format indicates the year-month that the function is officially going to be removed. usage: @@ -62,10 +63,11 @@ def deprecate(warning, date=None, log=None): The reason for passing the logging function (log) is so that hookenv.log can be used for a charm if needed. - :param warning: String to indicat where it has moved ot. - :param date: optional sting, in YYYY-MM format to indicate when the + :param warning: String to indicate what is to be used instead. + :param date: Optional string in YYYY-MM format to indicate when the function will definitely (probably) be removed. - :param log: The log function to call to log. 
If not, logs to stdout + :param log: The log function to call in order to log. If None, logs to + stdout """ def wrap(f): diff --git a/hooks/charmhelpers/contrib/charmsupport/nrpe.py b/hooks/charmhelpers/contrib/charmsupport/nrpe.py index d775861b..14b80d96 100644 --- a/hooks/charmhelpers/contrib/charmsupport/nrpe.py +++ b/hooks/charmhelpers/contrib/charmsupport/nrpe.py @@ -18,14 +18,14 @@ # Authors: # Matthew Wedgwood -import subprocess -import pwd +import glob import grp import os -import glob -import shutil +import pwd import re import shlex +import shutil +import subprocess import yaml from charmhelpers.core.hookenv import ( @@ -265,6 +265,11 @@ class NRPE(object): relation_set(relation_id=rid, relation_settings={'primary': self.primary}) self.remove_check_queue = set() + @classmethod + def does_nrpe_conf_dir_exist(cls): + """Return True if th nrpe_confdif directory exists.""" + return os.path.isdir(cls.nrpe_confdir) + def add_check(self, *args, **kwargs): shortname = None if kwargs.get('shortname') is None: @@ -310,6 +315,12 @@ class NRPE(object): nrpe_monitors = {} monitors = {"monitors": {"remote": {"nrpe": nrpe_monitors}}} + + # check that the charm can write to the conf dir. If not, then nagios + # probably isn't installed, and we can defer. + if not self.does_nrpe_conf_dir_exist(): + return + for nrpecheck in self.checks: nrpecheck.write(self.nagios_context, self.hostname, self.nagios_servicegroups) @@ -400,7 +411,7 @@ def add_init_service_checks(nrpe, services, unit_name, immediate_check=True): upstart_init = '/etc/init/%s.conf' % svc sysv_init = '/etc/init.d/%s' % svc - if host.init_is_systemd(): + if host.init_is_systemd(service_name=svc): nrpe.add_check( shortname=svc, description='process check {%s}' % unit_name, diff --git a/hooks/charmhelpers/contrib/openstack/context.py b/hooks/charmhelpers/contrib/openstack/context.py index 42abccf7..0e41a9f3 100644 --- a/hooks/charmhelpers/contrib/openstack/context.py +++ b/hooks/charmhelpers/contrib/openstack/context.py @@ -29,6 +29,8 @@ from subprocess import check_call, CalledProcessError import six +import charmhelpers.contrib.storage.linux.ceph as ch_ceph + from charmhelpers.contrib.openstack.audits.openstack_security_guide import ( _config_ini as config_ini ) @@ -56,6 +58,7 @@ from charmhelpers.core.hookenv import ( status_set, network_get_primary_address, WARNING, + service_name, ) from charmhelpers.core.sysctl import create as sysctl_create @@ -808,6 +811,12 @@ class CephContext(OSContextGenerator): ctxt['mon_hosts'] = ' '.join(sorted(mon_hosts)) + if config('pool-type') and config('pool-type') == 'erasure-coded': + base_pool_name = config('rbd-pool') or config('rbd-pool-name') + if not base_pool_name: + base_pool_name = service_name() + ctxt['rbd_default_data_pool'] = base_pool_name + if not os.path.isdir('/etc/ceph'): os.mkdir('/etc/ceph') @@ -3175,3 +3184,78 @@ class SRIOVContext(OSContextGenerator): :rtype: Dict[str,int] """ return self._map + + +class CephBlueStoreCompressionContext(OSContextGenerator): + """Ceph BlueStore compression options.""" + + # Tuple with Tuples that map configuration option name to CephBrokerRq op + # property name + options = ( + ('bluestore-compression-algorithm', + 'compression-algorithm'), + ('bluestore-compression-mode', + 'compression-mode'), + ('bluestore-compression-required-ratio', + 'compression-required-ratio'), + ('bluestore-compression-min-blob-size', + 'compression-min-blob-size'), + ('bluestore-compression-min-blob-size-hdd', + 'compression-min-blob-size-hdd'), + 
('bluestore-compression-min-blob-size-ssd', + 'compression-min-blob-size-ssd'), + ('bluestore-compression-max-blob-size', + 'compression-max-blob-size'), + ('bluestore-compression-max-blob-size-hdd', + 'compression-max-blob-size-hdd'), + ('bluestore-compression-max-blob-size-ssd', + 'compression-max-blob-size-ssd'), + ) + + def __init__(self): + """Initialize context by loading values from charm config. + + We keep two maps, one suitable for use with CephBrokerRq's and one + suitable for template generation. + """ + charm_config = config() + + # CephBrokerRq op map + self.op = {} + # Context exposed for template generation + self.ctxt = {} + for config_key, op_key in self.options: + value = charm_config.get(config_key) + self.ctxt.update({config_key.replace('-', '_'): value}) + self.op.update({op_key: value}) + + def __call__(self): + """Get context. + + :returns: Context + :rtype: Dict[str,any] + """ + return self.ctxt + + def get_op(self): + """Get values for use in CephBrokerRq op. + + :returns: Context values with CephBrokerRq op property name as key. + :rtype: Dict[str,any] + """ + return self.op + + def validate(self): + """Validate options. + + :raises: AssertionError + """ + # We slip in a dummy name on class instantiation to allow validation of + # the other options. It will not affect further use. + # + # NOTE: once we retire Python 3.5 we can fold this into a in-line + # dictionary comprehension in the call to the initializer. + dummy_op = {'name': 'dummy-name'} + dummy_op.update(self.op) + pool = ch_ceph.BasePool('dummy-service', op=dummy_op) + pool.validate() diff --git a/hooks/charmhelpers/contrib/openstack/templates/ceph.conf b/hooks/charmhelpers/contrib/openstack/templates/ceph.conf index a11ce8ab..c0f22360 100644 --- a/hooks/charmhelpers/contrib/openstack/templates/ceph.conf +++ b/hooks/charmhelpers/contrib/openstack/templates/ceph.conf @@ -22,3 +22,7 @@ rbd default features = {{ rbd_features }} {{ key }} = {{ value }} {% endfor -%} {%- endif %} + +{% if rbd_default_data_pool -%} +rbd default data pool = {{ rbd_default_data_pool }} +{% endif %} diff --git a/hooks/charmhelpers/contrib/openstack/templates/openstack_https_frontend b/hooks/charmhelpers/contrib/openstack/templates/openstack_https_frontend index f614b3fa..530719e9 100644 --- a/hooks/charmhelpers/contrib/openstack/templates/openstack_https_frontend +++ b/hooks/charmhelpers/contrib/openstack/templates/openstack_https_frontend @@ -6,8 +6,14 @@ Listen {{ ext_port }} ServerName {{ endpoint }} SSLEngine on - SSLProtocol +TLSv1 +TLSv1.1 +TLSv1.2 - SSLCipherSuite HIGH:!RC4:!MD5:!aNULL:!eNULL:!EXP:!LOW:!MEDIUM + + # This section is based on Mozilla's recommendation + # as the "intermediate" profile as of July 7th, 2020. 
+ # https://wiki.mozilla.org/Security/Server_Side_TLS + SSLProtocol all -SSLv3 -TLSv1 -TLSv1.1 + SSLCipherSuite ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384 + SSLHonorCipherOrder off + SSLCertificateFile /etc/apache2/ssl/{{ namespace }}/cert_{{ endpoint }} # See LP 1484489 - this is to support <= 2.4.7 and >= 2.4.8 SSLCertificateChainFile /etc/apache2/ssl/{{ namespace }}/cert_{{ endpoint }} diff --git a/hooks/charmhelpers/contrib/openstack/templates/openstack_https_frontend.conf b/hooks/charmhelpers/contrib/openstack/templates/openstack_https_frontend.conf index f614b3fa..530719e9 100644 --- a/hooks/charmhelpers/contrib/openstack/templates/openstack_https_frontend.conf +++ b/hooks/charmhelpers/contrib/openstack/templates/openstack_https_frontend.conf @@ -6,8 +6,14 @@ Listen {{ ext_port }} ServerName {{ endpoint }} SSLEngine on - SSLProtocol +TLSv1 +TLSv1.1 +TLSv1.2 - SSLCipherSuite HIGH:!RC4:!MD5:!aNULL:!eNULL:!EXP:!LOW:!MEDIUM + + # This section is based on Mozilla's recommendation + # as the "intermediate" profile as of July 7th, 2020. + # https://wiki.mozilla.org/Security/Server_Side_TLS + SSLProtocol all -SSLv3 -TLSv1 -TLSv1.1 + SSLCipherSuite ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384 + SSLHonorCipherOrder off + SSLCertificateFile /etc/apache2/ssl/{{ namespace }}/cert_{{ endpoint }} # See LP 1484489 - this is to support <= 2.4.7 and >= 2.4.8 SSLCertificateChainFile /etc/apache2/ssl/{{ namespace }}/cert_{{ endpoint }} diff --git a/hooks/charmhelpers/contrib/openstack/templates/section-ceph-bluestore-compression b/hooks/charmhelpers/contrib/openstack/templates/section-ceph-bluestore-compression new file mode 100644 index 00000000..a6430100 --- /dev/null +++ b/hooks/charmhelpers/contrib/openstack/templates/section-ceph-bluestore-compression @@ -0,0 +1,28 @@ +{# section header omitted as options can belong to multiple sections #} +{% if bluestore_compression_algorithm -%} +bluestore compression algorithm = {{ bluestore_compression_algorithm }} +{% endif -%} +{% if bluestore_compression_mode -%} +bluestore compression mode = {{ bluestore_compression_mode }} +{% endif -%} +{% if bluestore_compression_required_ratio -%} +bluestore compression required ratio = {{ bluestore_compression_required_ratio }} +{% endif -%} +{% if bluestore_compression_min_blob_size -%} +bluestore compression min blob size = {{ bluestore_compression_min_blob_size }} +{% endif -%} +{% if bluestore_compression_min_blob_size_hdd -%} +bluestore compression min blob size hdd = {{ bluestore_compression_min_blob_size_hdd }} +{% endif -%} +{% if bluestore_compression_min_blob_size_ssd -%} +bluestore compression min blob size ssd = {{ bluestore_compression_min_blob_size_ssd }} +{% endif -%} +{% if bluestore_compression_max_blob_size -%} +bluestore compression max blob size = {{ bluestore_compression_max_blob_size }} +{% endif -%} +{% if bluestore_compression_max_blob_size_hdd -%} +bluestore compression max blob size hdd = {{ bluestore_compression_max_blob_size_hdd }} +{% endif -%} +{% if bluestore_compression_max_blob_size_ssd -%} +bluestore compression max blob size ssd = {{ bluestore_compression_max_blob_size_ssd }} +{% endif -%} diff --git 
a/hooks/charmhelpers/contrib/openstack/utils.py b/hooks/charmhelpers/contrib/openstack/utils.py index f21625d3..0aa797c4 100644 --- a/hooks/charmhelpers/contrib/openstack/utils.py +++ b/hooks/charmhelpers/contrib/openstack/utils.py @@ -2241,10 +2241,13 @@ def inform_peers_unit_state(state, relation_name='cluster'): if state not in UNIT_STATES: raise ValueError( "Setting invalid state {} for unit".format(state)) + this_unit = local_unit() for r_id in relation_ids(relation_name): + juju_log('Telling peer behind relation {} that {} is {}'.format( + r_id, this_unit, state), 'DEBUG') relation_set(relation_id=r_id, relation_settings={ - get_peer_key(local_unit()): state}) + get_peer_key(this_unit): state}) def get_peers_unit_state(relation_name='cluster'): @@ -2276,8 +2279,10 @@ def are_peers_ready(relation_name='cluster'): :returns: Whether all units are ready. :rtype: bool """ - unit_states = get_peers_unit_state(relation_name) - return all(v == UNIT_READY for v in unit_states.values()) + unit_states = get_peers_unit_state(relation_name).values() + juju_log('{} peers are in the following states: {}'.format( + relation_name, unit_states), 'DEBUG') + return all(state == UNIT_READY for state in unit_states) def inform_peers_if_ready(check_unit_ready_func, relation_name='cluster'): @@ -2360,7 +2365,9 @@ def get_api_application_status(): app_state, msg = get_api_unit_status() if app_state == WORKLOAD_STATES.ACTIVE: if are_peers_ready(): - return WORKLOAD_STATES.ACTIVE, 'Application Ready' + msg = 'Application Ready' else: - return WORKLOAD_STATES.WAITING, 'Some units are not ready' + app_state = WORKLOAD_STATES.WAITING + msg = 'Some units are not ready' + juju_log(msg, 'DEBUG') return app_state, msg diff --git a/hooks/charmhelpers/contrib/storage/linux/ceph.py b/hooks/charmhelpers/contrib/storage/linux/ceph.py index 814d5c72..d9d43578 100644 --- a/hooks/charmhelpers/contrib/storage/linux/ceph.py +++ b/hooks/charmhelpers/contrib/storage/linux/ceph.py @@ -39,6 +39,7 @@ from subprocess import ( check_output, CalledProcessError, ) +from charmhelpers import deprecate from charmhelpers.core.hookenv import ( config, service_name, @@ -178,94 +179,293 @@ def send_osd_settings(): def validator(value, valid_type, valid_range=None): - """ - Used to validate these: http://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values + """Helper function for type validation. + + Used to validate these: + https://docs.ceph.com/docs/master/rados/operations/pools/#set-pool-values + https://docs.ceph.com/docs/master/rados/configuration/bluestore-config-ref/#inline-compression + Example input: validator(value=1, valid_type=int, valid_range=[0, 2]) + This says I'm testing value=1. It must be an int inclusive in [0,2] - :param value: The value to validate + :param value: The value to validate. + :type value: any :param valid_type: The type that value should be. + :type valid_type: any :param valid_range: A range of values that value can assume. 
- :return: + :type valid_range: Optional[Union[List,Tuple]] + :raises: AssertionError, ValueError """ - assert isinstance(value, valid_type), "{} is not a {}".format( - value, - valid_type) + assert isinstance(value, valid_type), ( + "{} is not a {}".format(value, valid_type)) if valid_range is not None: - assert isinstance(valid_range, list), \ - "valid_range must be a list, was given {}".format(valid_range) + assert isinstance( + valid_range, list) or isinstance(valid_range, tuple), ( + "valid_range must be of type List or Tuple, " + "was given {} of type {}" + .format(valid_range, type(valid_range))) # If we're dealing with strings if isinstance(value, six.string_types): - assert value in valid_range, \ - "{} is not in the list {}".format(value, valid_range) + assert value in valid_range, ( + "{} is not in the list {}".format(value, valid_range)) # Integer, float should have a min and max else: if len(valid_range) != 2: raise ValueError( - "Invalid valid_range list of {} for {}. " + "Invalid valid_range list of {} for {}. " "List must be [min,max]".format(valid_range, value)) - assert value >= valid_range[0], \ - "{} is less than minimum allowed value of {}".format( - value, valid_range[0]) - assert value <= valid_range[1], \ - "{} is greater than maximum allowed value of {}".format( - value, valid_range[1]) + assert value >= valid_range[0], ( + "{} is less than minimum allowed value of {}" + .format(value, valid_range[0])) + assert value <= valid_range[1], ( + "{} is greater than maximum allowed value of {}" + .format(value, valid_range[1])) class PoolCreationError(Exception): - """ - A custom error to inform the caller that a pool creation failed. Provides an error message + """A custom exception to inform the caller that a pool creation failed. + + Provides an error message """ def __init__(self, message): super(PoolCreationError, self).__init__(message) -class Pool(object): - """ - An object oriented approach to Ceph pool creation. This base class is inherited by ReplicatedPool and ErasurePool. - Do not call create() on this base class as it will not do anything. Instantiate a child class and call create(). - """ +class BasePool(object): + """An object oriented approach to Ceph pool creation. - def __init__(self, service, name): + This base class is inherited by ReplicatedPool and ErasurePool. Do not call + create() on this base class as it will raise an exception. + + Instantiate a child class and call create(). + """ + # Dictionary that maps pool operation properties to Tuples with valid type + # and valid range + op_validation_map = { + 'compression-algorithm': (str, ('lz4', 'snappy', 'zlib', 'zstd')), + 'compression-mode': (str, ('none', 'passive', 'aggressive', 'force')), + 'compression-required-ratio': (float, None), + 'compression-min-blob-size': (int, None), + 'compression-min-blob-size-hdd': (int, None), + 'compression-min-blob-size-ssd': (int, None), + 'compression-max-blob-size': (int, None), + 'compression-max-blob-size-hdd': (int, None), + 'compression-max-blob-size-ssd': (int, None), + } + + def __init__(self, service, name=None, percent_data=None, app_name=None, + op=None): + """Initialize BasePool object. + + Pool information is either initialized from individual keyword + arguments or from a individual CephBrokerRq operation Dict. + + :param service: The Ceph user name to run commands under. + :type service: str + :param name: Name of pool to operate on. 
+ :type name: str + :param percent_data: The expected pool size in relation to all + available resources in the Ceph cluster. Will be + used to set the ``target_size_ratio`` pool + property. (default: 10.0) + :type percent_data: Optional[float] + :param app_name: Ceph application name, usually one of: + ('cephfs', 'rbd', 'rgw') (default: 'unknown') + :type app_name: Optional[str] + :param op: Broker request Op to compile pool data from. + :type op: Optional[Dict[str,any]] + :raises: KeyError + """ + # NOTE: Do not perform initialization steps that require live data from + # a running cluster here. The *Pool classes may be used for validation. self.service = service - self.name = name + self.nautilus_or_later = cmp_pkgrevno('ceph-common', '14.2.0') >= 0 + self.op = op or {} + + if op: + # When initializing from op the `name` attribute is required and we + # will fail with KeyError if it is not provided. + self.name = op['name'] + self.percent_data = op.get('weight') + self.app_name = op.get('app-name') + else: + self.name = name + self.percent_data = percent_data + self.app_name = app_name + + # Set defaults for these if they are not provided + self.percent_data = self.percent_data or 10.0 + self.app_name = self.app_name or 'unknown' + + def validate(self): + """Check that value of supplied operation parameters are valid. + + :raises: ValueError + """ + for op_key, op_value in self.op.items(): + if op_key in self.op_validation_map and op_value is not None: + valid_type, valid_range = self.op_validation_map[op_key] + try: + validator(op_value, valid_type, valid_range) + except (AssertionError, ValueError) as e: + # Normalize on ValueError, also add information about which + # variable we had an issue with. + raise ValueError("'{}': {}".format(op_key, str(e))) + + def _create(self): + """Perform the pool creation, method MUST be overridden by child class. + """ + raise NotImplementedError + + def _post_create(self): + """Perform common post pool creation tasks. + + Note that pool properties subject to change during the lifetime of a + pool / deployment should go into the ``update`` method. + + Do not add calls for a specific pool type here, those should go into + one of the pool specific classes. + """ + if self.nautilus_or_later: + # Ensure we set the expected pool ratio + update_pool( + client=self.service, + pool=self.name, + settings={ + 'target_size_ratio': str( + self.percent_data / 100.0), + }) + try: + set_app_name_for_pool(client=self.service, + pool=self.name, + name=self.app_name) + except CalledProcessError: + log('Could not set app name for pool {}' + .format(self.name), + level=WARNING) + if 'pg_autoscaler' in enabled_manager_modules(): + try: + enable_pg_autoscale(self.service, self.name) + except CalledProcessError as e: + log('Could not configure auto scaling for pool {}: {}' + .format(self.name, e), + level=WARNING) - # Create the pool if it doesn't exist already - # To be implemented by subclasses def create(self): - pass + """Create pool and perform any post pool creation tasks. + + To allow for sharing of common code among pool specific classes the + processing has been broken out into the private methods ``_create`` + and ``_post_create``. + + Do not add any pool type specific handling here, that should go into + one of the pool specific classes. + """ + if not pool_exists(self.service, self.name): + self.validate() + self._create() + self._post_create() + self.update() + + def set_quota(self): + """Set a quota if requested. 
+ + :raises: CalledProcessError + """ + max_bytes = self.op.get('max-bytes') + max_objects = self.op.get('max-objects') + if max_bytes or max_objects: + set_pool_quota(service=self.service, pool_name=self.name, + max_bytes=max_bytes, max_objects=max_objects) + + def set_compression(self): + """Set compression properties if requested. + + :raises: CalledProcessError + """ + compression_properties = { + key.replace('-', '_'): value + for key, value in self.op.items() + if key in ( + 'compression-algorithm', + 'compression-mode', + 'compression-required-ratio', + 'compression-min-blob-size', + 'compression-min-blob-size-hdd', + 'compression-min-blob-size-ssd', + 'compression-max-blob-size', + 'compression-max-blob-size-hdd', + 'compression-max-blob-size-ssd') and value} + if compression_properties: + update_pool(self.service, self.name, compression_properties) + + def update(self): + """Update properties for an already existing pool. + + Do not add calls for a specific pool type here, those should go into + one of the pool specific classes. + """ + self.validate() + self.set_quota() + self.set_compression() def add_cache_tier(self, cache_pool, mode): - """ - Adds a new cache tier to an existing pool. - :param cache_pool: six.string_types. The cache tier pool name to add. - :param mode: six.string_types. The caching mode to use for this pool. valid range = ["readonly", "writeback"] - :return: None + """Adds a new cache tier to an existing pool. + + :param cache_pool: The cache tier pool name to add. + :type cache_pool: str + :param mode: The caching mode to use for this pool. + valid range = ["readonly", "writeback"] + :type mode: str """ # Check the input types and values validator(value=cache_pool, valid_type=six.string_types) - validator(value=mode, valid_type=six.string_types, valid_range=["readonly", "writeback"]) + validator( + value=mode, valid_type=six.string_types, + valid_range=["readonly", "writeback"]) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'add', self.name, cache_pool]) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, mode]) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'set-overlay', self.name, cache_pool]) - check_call(['ceph', '--id', self.service, 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom']) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'add', self.name, cache_pool, + ]) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'cache-mode', cache_pool, mode, + ]) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'set-overlay', self.name, cache_pool, + ]) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'pool', 'set', cache_pool, 'hit_set_type', 'bloom', + ]) def remove_cache_tier(self, cache_pool): - """ - Removes a cache tier from Ceph. Flushes all dirty objects from writeback pools and waits for that to complete. - :param cache_pool: six.string_types. The cache tier pool name to remove. - :return: None + """Removes a cache tier from Ceph. + + Flushes all dirty objects from writeback pools and waits for that to + complete. + + :param cache_pool: The cache tier pool name to remove. 
+ :type cache_pool: str """ # read-only is easy, writeback is much harder mode = get_cache_mode(self.service, cache_pool) if mode == 'readonly': - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none']) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool]) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'cache-mode', cache_pool, 'none' + ]) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'remove', self.name, cache_pool, + ]) elif mode == 'writeback': pool_forward_cmd = ['ceph', '--id', self.service, 'osd', 'tier', @@ -276,9 +476,15 @@ class Pool(object): check_call(pool_forward_cmd) # Flush the cache and wait for it to return - check_call(['rados', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all']) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name]) - check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool]) + check_call([ + 'rados', '--id', self.service, + '-p', cache_pool, 'cache-flush-evict-all']) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'remove-overlay', self.name]) + check_call([ + 'ceph', '--id', self.service, + 'osd', 'tier', 'remove', self.name, cache_pool]) def get_pgs(self, pool_size, percent_data=DEFAULT_POOL_WEIGHT, device_class=None): @@ -305,19 +511,23 @@ class Pool(object): selected for the specific rule, rather it is left to the user to tune in the form of 'expected-osd-count' config option. - :param pool_size: int. pool_size is either the number of replicas for + :param pool_size: pool_size is either the number of replicas for replicated pools or the K+M sum for erasure coded pools - :param percent_data: float. the percentage of data that is expected to + :type pool_size: int + :param percent_data: the percentage of data that is expected to be contained in the pool for the specific OSD set. Default value is to assume 10% of the data is for this pool, which is a relatively low % of the data but allows for the pg_num to be increased. NOTE: the default is primarily to handle the scenario where related charms requiring pools has not been upgraded to include an update to indicate their relative usage of the pools. - :param device_class: str. class of storage to use for basis of pgs + :type percent_data: float + :param device_class: class of storage to use for basis of pgs calculation; ceph supports nvme, ssd and hdd by default based on presence of devices of each type in the deployment. - :return: int. The number of pgs to use. + :type device_class: str + :returns: The number of pgs to use. 
+ :rtype: int """ # Note: This calculation follows the approach that is provided @@ -357,7 +567,8 @@ class Pool(object): return LEGACY_PG_COUNT percent_data /= 100.0 - target_pgs_per_osd = config('pgs-per-osd') or DEFAULT_PGS_PER_OSD_TARGET + target_pgs_per_osd = config( + 'pgs-per-osd') or DEFAULT_PGS_PER_OSD_TARGET num_pg = (target_pgs_per_osd * osd_count * percent_data) // pool_size # NOTE: ensure a sane minimum number of PGS otherwise we don't get any @@ -380,147 +591,174 @@ class Pool(object): return int(nearest) -class ReplicatedPool(Pool): - def __init__(self, service, name, pg_num=None, replicas=2, - percent_data=10.0, app_name=None): - super(ReplicatedPool, self).__init__(service=service, name=name) - self.replicas = replicas - self.percent_data = percent_data - if pg_num: +class Pool(BasePool): + """Compability shim for any descendents external to this library.""" + + @deprecate( + 'The ``Pool`` baseclass has been replaced by ``BasePool`` class.') + def __init__(self, service, name): + super(Pool, self).__init__(service, name=name) + + def create(self): + pass + + +class ReplicatedPool(BasePool): + def __init__(self, service, name=None, pg_num=None, replicas=None, + percent_data=None, app_name=None, op=None): + """Initialize ReplicatedPool object. + + Pool information is either initialized from individual keyword + arguments or from a individual CephBrokerRq operation Dict. + + Please refer to the docstring of the ``BasePool`` class for + documentation of the common parameters. + + :param pg_num: Express wish for number of Placement Groups (this value + is subject to validation against a running cluster prior + to use to avoid creating a pool with too many PGs) + :type pg_num: int + :param replicas: Number of copies there should be of each object added + to this replicated pool. + :type replicas: int + :raises: KeyError + """ + # NOTE: Do not perform initialization steps that require live data from + # a running cluster here. The *Pool classes may be used for validation. + + # The common parameters are handled in our parents initializer + super(ReplicatedPool, self).__init__( + service=service, name=name, percent_data=percent_data, + app_name=app_name, op=op) + + if op: + # When initializing from op `replicas` is a required attribute, and + # we will fail with KeyError if it is not provided. + self.replicas = op['replicas'] + self.pg_num = op.get('pg_num') + else: + self.replicas = replicas or 2 + self.pg_num = pg_num + + def _create(self): + # Do extra validation on pg_num with data from live cluster + if self.pg_num: # Since the number of placement groups were specified, ensure # that there aren't too many created. 
max_pgs = self.get_pgs(self.replicas, 100.0) - self.pg_num = min(pg_num, max_pgs) + self.pg_num = min(self.pg_num, max_pgs) else: - self.pg_num = self.get_pgs(self.replicas, percent_data) - if app_name: - self.app_name = app_name + self.pg_num = self.get_pgs(self.replicas, self.percent_data) + + # Create it + if self.nautilus_or_later: + cmd = [ + 'ceph', '--id', self.service, 'osd', 'pool', 'create', + '--pg-num-min={}'.format( + min(AUTOSCALER_DEFAULT_PGS, self.pg_num) + ), + self.name, str(self.pg_num) + ] else: - self.app_name = 'unknown' + cmd = [ + 'ceph', '--id', self.service, 'osd', 'pool', 'create', + self.name, str(self.pg_num) + ] + check_call(cmd) - def create(self): - if not pool_exists(self.service, self.name): - nautilus_or_later = cmp_pkgrevno('ceph-common', '14.2.0') >= 0 - # Create it - if nautilus_or_later: - cmd = [ - 'ceph', '--id', self.service, 'osd', 'pool', 'create', - '--pg-num-min={}'.format( - min(AUTOSCALER_DEFAULT_PGS, self.pg_num) - ), - self.name, str(self.pg_num) - ] - else: - cmd = [ - 'ceph', '--id', self.service, 'osd', 'pool', 'create', - self.name, str(self.pg_num) - ] - - try: - check_call(cmd) - # Set the pool replica size - update_pool(client=self.service, - pool=self.name, - settings={'size': str(self.replicas)}) - if nautilus_or_later: - # Ensure we set the expected pool ratio - update_pool(client=self.service, - pool=self.name, - settings={'target_size_ratio': str(self.percent_data / 100.0)}) - try: - set_app_name_for_pool(client=self.service, - pool=self.name, - name=self.app_name) - except CalledProcessError: - log('Could not set app name for pool {}'.format(self.name), level=WARNING) - if 'pg_autoscaler' in enabled_manager_modules(): - try: - enable_pg_autoscale(self.service, self.name) - except CalledProcessError as e: - log('Could not configure auto scaling for pool {}: {}'.format( - self.name, e), level=WARNING) - except CalledProcessError: - raise + def _post_create(self): + # Set the pool replica size + update_pool(client=self.service, + pool=self.name, + settings={'size': str(self.replicas)}) + # Perform other common post pool creation tasks + super(ReplicatedPool, self)._post_create() -# Default jerasure erasure coded pool -class ErasurePool(Pool): - def __init__(self, service, name, erasure_code_profile="default", - percent_data=10.0, app_name=None): - super(ErasurePool, self).__init__(service=service, name=name) - self.erasure_code_profile = erasure_code_profile - self.percent_data = percent_data - if app_name: - self.app_name = app_name +class ErasurePool(BasePool): + """Default jerasure erasure coded pool.""" + + def __init__(self, service, name=None, erasure_code_profile=None, + percent_data=None, app_name=None, op=None, + allow_ec_overwrites=False): + """Initialize ReplicatedPool object. + + Pool information is either initialized from individual keyword + arguments or from a individual CephBrokerRq operation Dict. + + Please refer to the docstring of the ``BasePool`` class for + documentation of the common parameters. + + :param erasure_code_profile: EC Profile to use (default: 'default') + :type erasure_code_profile: Optional[str] + """ + # NOTE: Do not perform initialization steps that require live data from + # a running cluster here. The *Pool classes may be used for validation. 
+ + # The common parameters are handled in our parents initializer + super(ErasurePool, self).__init__( + service=service, name=name, percent_data=percent_data, + app_name=app_name, op=op) + + if op: + # Note that the different default when initializing from op stems + # from different handling of this in the `charms.ceph` library. + self.erasure_code_profile = op.get('erasure-profile', + 'default-canonical') else: - self.app_name = 'unknown' + # We keep the class default when initialized from keyword arguments + # to not break the API for any other consumers. + self.erasure_code_profile = erasure_code_profile or 'default' - def create(self): - if not pool_exists(self.service, self.name): - # Try to find the erasure profile information in order to properly - # size the number of placement groups. The size of an erasure - # coded placement group is calculated as k+m. - erasure_profile = get_erasure_profile(self.service, - self.erasure_code_profile) + self.allow_ec_overwrites = allow_ec_overwrites - # Check for errors - if erasure_profile is None: - msg = ("Failed to discover erasure profile named " - "{}".format(self.erasure_code_profile)) - log(msg, level=ERROR) - raise PoolCreationError(msg) - if 'k' not in erasure_profile or 'm' not in erasure_profile: - # Error - msg = ("Unable to find k (data chunks) or m (coding chunks) " - "in erasure profile {}".format(erasure_profile)) - log(msg, level=ERROR) - raise PoolCreationError(msg) + def _create(self): + # Try to find the erasure profile information in order to properly + # size the number of placement groups. The size of an erasure + # coded placement group is calculated as k+m. + erasure_profile = get_erasure_profile(self.service, + self.erasure_code_profile) - k = int(erasure_profile['k']) - m = int(erasure_profile['m']) - pgs = self.get_pgs(k + m, self.percent_data) - nautilus_or_later = cmp_pkgrevno('ceph-common', '14.2.0') >= 0 - # Create it - if nautilus_or_later: - cmd = [ - 'ceph', '--id', self.service, 'osd', 'pool', 'create', - '--pg-num-min={}'.format( - min(AUTOSCALER_DEFAULT_PGS, pgs) - ), - self.name, str(pgs), str(pgs), - 'erasure', self.erasure_code_profile - ] - else: - cmd = [ - 'ceph', '--id', self.service, 'osd', 'pool', 'create', - self.name, str(pgs), str(pgs), - 'erasure', self.erasure_code_profile - ] + # Check for errors + if erasure_profile is None: + msg = ("Failed to discover erasure profile named " + "{}".format(self.erasure_code_profile)) + log(msg, level=ERROR) + raise PoolCreationError(msg) + if 'k' not in erasure_profile or 'm' not in erasure_profile: + # Error + msg = ("Unable to find k (data chunks) or m (coding chunks) " + "in erasure profile {}".format(erasure_profile)) + log(msg, level=ERROR) + raise PoolCreationError(msg) - try: - check_call(cmd) - try: - set_app_name_for_pool(client=self.service, - pool=self.name, - name=self.app_name) - except CalledProcessError: - log('Could not set app name for pool {}'.format(self.name), level=WARNING) - if nautilus_or_later: - # Ensure we set the expected pool ratio - update_pool(client=self.service, - pool=self.name, - settings={'target_size_ratio': str(self.percent_data / 100.0)}) - if 'pg_autoscaler' in enabled_manager_modules(): - try: - enable_pg_autoscale(self.service, self.name) - except CalledProcessError as e: - log('Could not configure auto scaling for pool {}: {}'.format( - self.name, e), level=WARNING) - except CalledProcessError: - raise + k = int(erasure_profile['k']) + m = int(erasure_profile['m']) + pgs = self.get_pgs(k + m, self.percent_data) + 
self.nautilus_or_later = cmp_pkgrevno('ceph-common', '14.2.0') >= 0 + # Create it + if self.nautilus_or_later: + cmd = [ + 'ceph', '--id', self.service, 'osd', 'pool', 'create', + '--pg-num-min={}'.format( + min(AUTOSCALER_DEFAULT_PGS, pgs) + ), + self.name, str(pgs), str(pgs), + 'erasure', self.erasure_code_profile + ] + else: + cmd = [ + 'ceph', '--id', self.service, 'osd', 'pool', 'create', + self.name, str(pgs), str(pgs), + 'erasure', self.erasure_code_profile + ] + check_call(cmd) - """Get an existing erasure code profile if it already exists. - Returns json formatted output""" + def _post_create(self): + super(ErasurePool, self)._post_create() + if self.allow_ec_overwrites: + update_pool(self.service, self.name, + {'allow_ec_overwrites': 'true'}) def enabled_manager_modules(): @@ -541,22 +779,28 @@ def enabled_manager_modules(): def enable_pg_autoscale(service, pool_name): - """ - Enable Ceph's PG autoscaler for the specified pool. + """Enable Ceph's PG autoscaler for the specified pool. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types. The name of the pool to enable sutoscaling on - :raise: CalledProcessError if the command fails + :param service: The Ceph user name to run the command under + :type service: str + :param pool_name: The name of the pool to enable sutoscaling on + :type pool_name: str + :raises: CalledProcessError if the command fails """ - check_call(['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, 'pg_autoscale_mode', 'on']) + check_call([ + 'ceph', '--id', service, + 'osd', 'pool', 'set', pool_name, 'pg_autoscale_mode', 'on']) def get_mon_map(service): - """ - Returns the current monitor map. - :param service: six.string_types. The Ceph user name to run the command under - :return: json string. :raise: ValueError if the monmap fails to parse. - Also raises CalledProcessError if our ceph command fails + """Return the current monitor map. + + :param service: The Ceph user name to run the command under + :type service: str + :returns: Dictionary with monitor map data + :rtype: Dict[str,any] + :raises: ValueError if the monmap fails to parse, CalledProcessError if our + ceph command fails. """ try: mon_status = check_output(['ceph', '--id', service, @@ -576,17 +820,16 @@ def get_mon_map(service): def hash_monitor_names(service): - """ + """Get a sorted list of monitor hashes in ascending order. + Uses the get_mon_map() function to get information about the monitor - cluster. - Hash the name of each monitor. Return a sorted list of monitor hashes - in an ascending order. - :param service: six.string_types. The Ceph user name to run the command under - :rtype : dict. json dict of monitor name, ip address and rank - example: { - 'name': 'ip-172-31-13-165', - 'rank': 0, - 'addr': '172.31.13.165:6789/0'} + cluster. Hash the name of each monitor. + + :param service: The Ceph user name to run the command under. + :type service: str + :returns: a sorted list of monitor hashes in an ascending order. + :rtype : List[str] + :raises: CalledProcessError, ValueError """ try: hash_list = [] @@ -603,46 +846,56 @@ def hash_monitor_names(service): def monitor_key_delete(service, key): - """ - Delete a key and value pair from the monitor cluster - :param service: six.string_types. The Ceph user name to run the command under + """Delete a key and value pair from the monitor cluster. + Deletes a key value pair on the monitor cluster. - :param key: six.string_types. The key to delete. 
+ + :param service: The Ceph user name to run the command under + :type service: str + :param key: The key to delete. + :type key: str + :raises: CalledProcessError """ try: check_output( ['ceph', '--id', service, 'config-key', 'del', str(key)]) except CalledProcessError as e: - log("Monitor config-key put failed with message: {}".format( - e.output)) + log("Monitor config-key put failed with message: {}" + .format(e.output)) raise def monitor_key_set(service, key, value): - """ - Sets a key value pair on the monitor cluster. - :param service: six.string_types. The Ceph user name to run the command under - :param key: six.string_types. The key to set. - :param value: The value to set. This will be converted to a string - before setting + """Set a key value pair on the monitor cluster. + + :param service: The Ceph user name to run the command under. + :type service str + :param key: The key to set. + :type key: str + :param value: The value to set. This will be coerced into a string. + :type value: str + :raises: CalledProcessError """ try: check_output( ['ceph', '--id', service, 'config-key', 'put', str(key), str(value)]) except CalledProcessError as e: - log("Monitor config-key put failed with message: {}".format( - e.output)) + log("Monitor config-key put failed with message: {}" + .format(e.output)) raise def monitor_key_get(service, key): - """ - Gets the value of an existing key in the monitor cluster. - :param service: six.string_types. The Ceph user name to run the command under - :param key: six.string_types. The key to search for. + """Get the value of an existing key in the monitor cluster. + + :param service: The Ceph user name to run the command under + :type service: str + :param key: The key to search for. + :type key: str :return: Returns the value of that key or None if not found. + :rtype: Optional[str] """ try: output = check_output( @@ -650,19 +903,21 @@ def monitor_key_get(service, key): 'config-key', 'get', str(key)]).decode('UTF-8') return output except CalledProcessError as e: - log("Monitor config-key get failed with message: {}".format( - e.output)) + log("Monitor config-key get failed with message: {}" + .format(e.output)) return None def monitor_key_exists(service, key): - """ - Searches for the existence of a key in the monitor cluster. - :param service: six.string_types. The Ceph user name to run the command under - :param key: six.string_types. The key to search for - :return: Returns True if the key exists, False if not and raises an - exception if an unknown error occurs. :raise: CalledProcessError if - an unknown error occurs + """Search for existence of key in the monitor cluster. + + :param service: The Ceph user name to run the command under. + :type service: str + :param key: The key to search for. + :type key: str + :return: Returns True if the key exists, False if not. + :rtype: bool + :raises: CalledProcessError if an unknown error occurs. """ try: check_call( @@ -675,16 +930,20 @@ def monitor_key_exists(service, key): if e.returncode == errno.ENOENT: return False else: - log("Unknown error from ceph config-get exists: {} {}".format( - e.returncode, e.output)) + log("Unknown error from ceph config-get exists: {} {}" + .format(e.returncode, e.output)) raise def get_erasure_profile(service, name): - """ - :param service: six.string_types. The Ceph user name to run the command under - :param name: - :return: + """Get an existing erasure code profile if it exists. + + :param service: The Ceph user name to run the command under. 
+ :type service: str + :param name: Name of profile. + :type name: str + :returns: Dictionary with profile data. + :rtype: Optional[Dict[str]] """ try: out = check_output(['ceph', '--id', service, @@ -698,54 +957,61 @@ def get_erasure_profile(service, name): def pool_set(service, pool_name, key, value): + """Sets a value for a RADOS pool in ceph. + + :param service: The Ceph user name to run the command under. + :type service: str + :param pool_name: Name of pool to set property on. + :type pool_name: str + :param key: Property key. + :type key: str + :param value: Value, will be coerced into str and shifted to lowercase. + :type value: str + :raises: CalledProcessError """ - Sets a value for a RADOS pool in ceph. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :param key: six.string_types - :param value: - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, key, - str(value).lower()] - try: - check_call(cmd) - except CalledProcessError: - raise + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'set', pool_name, key, str(value).lower()] + check_call(cmd) def snapshot_pool(service, pool_name, snapshot_name): + """Snapshots a RADOS pool in Ceph. + + :param service: The Ceph user name to run the command under. + :type service: str + :param pool_name: Name of pool to snapshot. + :type pool_name: str + :param snapshot_name: Name of snapshot to create. + :type snapshot_name: str + :raises: CalledProcessError """ - Snapshots a RADOS pool in ceph. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :param snapshot_name: six.string_types - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'mksnap', pool_name, snapshot_name] - try: - check_call(cmd) - except CalledProcessError: - raise + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'mksnap', pool_name, snapshot_name] + check_call(cmd) def remove_pool_snapshot(service, pool_name, snapshot_name): + """Remove a snapshot from a RADOS pool in Ceph. + + :param service: The Ceph user name to run the command under. + :type service: str + :param pool_name: Name of pool to remove snapshot from. + :type pool_name: str + :param snapshot_name: Name of snapshot to remove. + :type snapshot_name: str + :raises: CalledProcessError """ - Remove a snapshot from a RADOS pool in ceph. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :param snapshot_name: six.string_types - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name, snapshot_name] - try: - check_call(cmd) - except CalledProcessError: - raise + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'rmsnap', pool_name, snapshot_name] + check_call(cmd) def set_pool_quota(service, pool_name, max_bytes=None, max_objects=None): - """ + """Set byte quota on a RADOS pool in Ceph. 
+ :param service: The Ceph user name to run the command under :type service: str :param pool_name: Name of pool @@ -756,7 +1022,9 @@ def set_pool_quota(service, pool_name, max_bytes=None, max_objects=None): :type max_objects: int :raises: subprocess.CalledProcessError """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name] + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'set-quota', pool_name] if max_bytes: cmd = cmd + ['max_bytes', str(max_bytes)] if max_objects: @@ -765,119 +1033,216 @@ def set_pool_quota(service, pool_name, max_bytes=None, max_objects=None): def remove_pool_quota(service, pool_name): + """Remove byte quota on a RADOS pool in Ceph. + + :param service: The Ceph user name to run the command under. + :type service: str + :param pool_name: Name of pool to remove quota from. + :type pool_name: str + :raises: CalledProcessError """ - Set a byte quota on a RADOS pool in ceph. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0'] - try: - check_call(cmd) - except CalledProcessError: - raise + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'set-quota', pool_name, 'max_bytes', '0'] + check_call(cmd) def remove_erasure_profile(service, profile_name): + """Remove erasure code profile. + + :param service: The Ceph user name to run the command under + :type service: str + :param profile_name: Name of profile to remove. + :type profile_name: str + :raises: CalledProcessError """ - Create a new erasure code profile if one does not already exist for it. Updates - the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/ - for more details - :param service: six.string_types. The Ceph user name to run the command under - :param profile_name: six.string_types - :return: None. Can raise CalledProcessError - """ - cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'rm', - profile_name] - try: - check_call(cmd) - except CalledProcessError: - raise + cmd = [ + 'ceph', '--id', service, + 'osd', 'erasure-code-profile', 'rm', profile_name] + check_call(cmd) -def create_erasure_profile(service, profile_name, erasure_plugin_name='jerasure', - failure_domain='host', +def create_erasure_profile(service, profile_name, + erasure_plugin_name='jerasure', + failure_domain=None, data_chunks=2, coding_chunks=1, locality=None, durability_estimator=None, - device_class=None): - """ - Create a new erasure code profile if one does not already exist for it. Updates - the profile if it exists. Please see http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/ - for more details - :param service: six.string_types. The Ceph user name to run the command under - :param profile_name: six.string_types - :param erasure_plugin_name: six.string_types - :param failure_domain: six.string_types. One of ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', - 'room', 'root', 'row']) - :param data_chunks: int - :param coding_chunks: int - :param locality: int - :param durability_estimator: int - :param device_class: six.string_types - :return: None. 
Can raise CalledProcessError
-    """
-    # Ensure this failure_domain is allowed by Ceph
-    validator(failure_domain, six.string_types,
-              ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row'])
+                           helper_chunks=None,
+                           scalar_mds=None,
+                           crush_locality=None,
+                           device_class=None,
+                           erasure_plugin_technique=None):
+    """Create a new erasure code profile if one does not already exist for it.
 
-    cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'set', profile_name,
-           'plugin=' + erasure_plugin_name, 'k=' + str(data_chunks), 'm=' + str(coding_chunks)
-           ]
-    if locality is not None and durability_estimator is not None:
-        raise ValueError("create_erasure_profile should be called with k, m and one of l or c but not both.")
+    Updates the profile if it exists. Please refer to [0] for more details.
+
+    0: http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
+
+    :param service: The Ceph user name to run the command under.
+    :type service: str
+    :param profile_name: Name of profile.
+    :type profile_name: str
+    :param erasure_plugin_name: Erasure code plugin.
+    :type erasure_plugin_name: str
+    :param failure_domain: Failure domain, one of:
+                           ('chassis', 'datacenter', 'host', 'osd', 'pdu',
+                            'pod', 'rack', 'region', 'room', 'root', 'row').
+    :type failure_domain: str
+    :param data_chunks: Number of data chunks.
+    :type data_chunks: int
+    :param coding_chunks: Number of coding chunks.
+    :type coding_chunks: int
+    :param locality: Group coding and data chunks into sets of this size
+                     (lrc plugin).
+    :type locality: int
+    :param durability_estimator: Number of parity chunks used as a durability
+                                 estimator (shec plugin).
+    :type durability_estimator: int
+    :param helper_chunks: Number of OSDs requested to send data during
+                          recovery of a single chunk (clay plugin).
+    :type helper_chunks: int
+    :param device_class: Restrict placement to devices of specific class.
+    :type device_class: str
+    :param scalar_mds: One of ['isa', 'jerasure', 'shec'].
+    :type scalar_mds: str
+    :param crush_locality: LRC locality failure domain, one of:
+                           ('chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod',
+                            'rack', 'region', 'room', 'root', 'row') or unset.
+    :type crush_locality: str
+    :param erasure_plugin_technique: Coding technique for EC plugin.
+    :type erasure_plugin_technique: str
+    :return: None.
Can raise CalledProcessError, ValueError or AssertionError + """ + plugin_techniques = { + 'jerasure': [ + 'reed_sol_van', + 'reed_sol_r6_op', + 'cauchy_orig', + 'cauchy_good', + 'liberation', + 'blaum_roth', + 'liber8tion' + ], + 'lrc': [], + 'isa': [ + 'reed_sol_van', + 'cauchy', + ], + 'shec': [ + 'single', + 'multiple' + ], + 'clay': [], + } + failure_domains = [ + 'chassis', 'datacenter', + 'host', 'osd', + 'pdu', 'pod', + 'rack', 'region', + 'room', 'root', + 'row', + ] + device_classes = [ + 'ssd', + 'hdd', + 'nvme' + ] + + validator(erasure_plugin_name, six.string_types, + list(plugin_techniques.keys())) + + cmd = [ + 'ceph', '--id', service, + 'osd', 'erasure-code-profile', 'set', profile_name, + 'plugin={}'.format(erasure_plugin_name), + 'k={}'.format(str(data_chunks)), + 'm={}'.format(str(coding_chunks)), + ] + + if erasure_plugin_technique: + validator(erasure_plugin_technique, six.string_types, + plugin_techniques[erasure_plugin_name]) + cmd.append('technique={}'.format(erasure_plugin_technique)) luminous_or_later = cmp_pkgrevno('ceph-common', '12.0.0') >= 0 - # failure_domain changed in luminous - if luminous_or_later: - cmd.append('crush-failure-domain=' + failure_domain) - else: - cmd.append('ruleset-failure-domain=' + failure_domain) + + # Set failure domain from options if not provided in args + if not failure_domain and config('customize-failure-domain'): + # Defaults to 'host' so just need to deal with + # setting 'rack' if feature is enabled + failure_domain = 'rack' + + if failure_domain: + validator(failure_domain, six.string_types, failure_domains) + # failure_domain changed in luminous + if luminous_or_later: + cmd.append('crush-failure-domain={}'.format(failure_domain)) + else: + cmd.append('ruleset-failure-domain={}'.format(failure_domain)) # device class new in luminous if luminous_or_later and device_class: + validator(device_class, six.string_types, device_classes) cmd.append('crush-device-class={}'.format(device_class)) else: log('Skipping device class configuration (ceph < 12.0.0)', level=DEBUG) # Add plugin specific information - if locality is not None: - # For local erasure codes - cmd.append('l=' + str(locality)) - if durability_estimator is not None: - # For Shec erasure codes - cmd.append('c=' + str(durability_estimator)) + if erasure_plugin_name == 'lrc': + # LRC mandatory configuration + if locality: + cmd.append('l={}'.format(str(locality))) + else: + raise ValueError("locality must be provided for lrc plugin") + # LRC optional configuration + if crush_locality: + validator(crush_locality, six.string_types, failure_domains) + cmd.append('crush-locality={}'.format(crush_locality)) + + if erasure_plugin_name == 'shec': + # SHEC optional configuration + if durability_estimator: + cmd.append('c={}'.format((durability_estimator))) + + if erasure_plugin_name == 'clay': + # CLAY optional configuration + if helper_chunks: + cmd.append('d={}'.format(str(helper_chunks))) + if scalar_mds: + cmd.append('scalar-mds={}'.format(scalar_mds)) if erasure_profile_exists(service, profile_name): cmd.append('--force') - try: - check_call(cmd) - except CalledProcessError: - raise + check_call(cmd) def rename_pool(service, old_name, new_name): - """ - Rename a Ceph pool from old_name to new_name - :param service: six.string_types. The Ceph user name to run the command under - :param old_name: six.string_types - :param new_name: six.string_types - :return: None + """Rename a Ceph pool from old_name to new_name. 
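To make the reworked create_erasure_profile() above concrete, here is a rough sketch of a call and the command it should assemble on a Luminous or later cluster; the profile name, k/m values and device class are illustrative assumptions, not taken from this patch:

    create_erasure_profile(
        service='admin',
        profile_name='rgw-ec',                    # hypothetical profile name
        erasure_plugin_name='jerasure',
        erasure_plugin_technique='reed_sol_van',  # validated against the plugin
        data_chunks=3,
        coding_chunks=2,
        device_class='hdd')
    # Roughly equivalent to:
    #   ceph --id admin osd erasure-code-profile set rgw-ec \
    #       plugin=jerasure k=3 m=2 technique=reed_sol_van crush-device-class=hdd
    # with crush-failure-domain=rack appended when the charm's
    # customize-failure-domain option is set, and --force appended when the
    # profile already exists.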
+ + :param service: The Ceph user name to run the command under. + :type service: str + :param old_name: Name of pool subject to rename. + :type old_name: str + :param new_name: Name to rename pool to. + :type new_name: str """ validator(value=old_name, valid_type=six.string_types) validator(value=new_name, valid_type=six.string_types) - cmd = ['ceph', '--id', service, 'osd', 'pool', 'rename', old_name, new_name] + cmd = [ + 'ceph', '--id', service, + 'osd', 'pool', 'rename', old_name, new_name] check_call(cmd) def erasure_profile_exists(service, name): - """ - Check to see if an Erasure code profile already exists. - :param service: six.string_types. The Ceph user name to run the command under - :param name: six.string_types - :return: int or None + """Check to see if an Erasure code profile already exists. + + :param service: The Ceph user name to run the command under + :type service: str + :param name: Name of profile to look for. + :type name: str + :returns: True if it exists, False otherwise. + :rtype: bool """ validator(value=name, valid_type=six.string_types) try: @@ -890,11 +1255,14 @@ def erasure_profile_exists(service, name): def get_cache_mode(service, pool_name): - """ - Find the current caching mode of the pool_name given. - :param service: six.string_types. The Ceph user name to run the command under - :param pool_name: six.string_types - :return: int or None + """Find the current caching mode of the pool_name given. + + :param service: The Ceph user name to run the command under + :type service: str + :param pool_name: Name of pool. + :type pool_name: str + :returns: Current cache mode. + :rtype: Optional[int] """ validator(value=service, valid_type=six.string_types) validator(value=pool_name, valid_type=six.string_types) @@ -976,17 +1344,23 @@ def create_rbd_image(service, pool, image, sizemb): def update_pool(client, pool, settings): + """Update pool properties. + + :param client: Client/User-name to authenticate with. + :type client: str + :param pool: Name of pool to operate on + :type pool: str + :param settings: Dictionary with key/value pairs to set. + :type settings: Dict[str, str] + :raises: CalledProcessError + """ cmd = ['ceph', '--id', client, 'osd', 'pool', 'set', pool] for k, v in six.iteritems(settings): - cmd.append(k) - cmd.append(v) - - check_call(cmd) + check_call(cmd + [k, v]) def set_app_name_for_pool(client, pool, name): - """ - Calls `osd pool application enable` for the specified pool name + """Calls `osd pool application enable` for the specified pool name :param client: Name of the ceph client to use :type client: str @@ -1043,8 +1417,7 @@ def _keyring_path(service): def add_key(service, key): - """ - Add a key to a keyring. + """Add a key to a keyring. Creates the keyring if it doesn't already exist. @@ -1288,13 +1661,33 @@ class CephBrokerRq(object): The API is versioned and defaults to version 1. """ - def __init__(self, api_version=1, request_id=None): - self.api_version = api_version - if request_id: - self.request_id = request_id + def __init__(self, api_version=1, request_id=None, raw_request_data=None): + """Initialize CephBrokerRq object. + + Builds a new empty request or rebuilds a request from on-wire JSON + data. + + :param api_version: API version for request (default: 1). + :type api_version: Optional[int] + :param request_id: Unique identifier for request. + (default: string representation of generated UUID) + :type request_id: Optional[str] + :param raw_request_data: JSON-encoded string to build request from. 
+ :type raw_request_data: Optional[str] + :raises: KeyError + """ + if raw_request_data: + request_data = json.loads(raw_request_data) + self.api_version = request_data['api-version'] + self.request_id = request_data['request-id'] + self.set_ops(request_data['ops']) else: - self.request_id = str(uuid.uuid1()) - self.ops = [] + self.api_version = api_version + if request_id: + self.request_id = request_id + else: + self.request_id = str(uuid.uuid1()) + self.ops = [] def add_op(self, op): """Add an op if it is not already in the list. @@ -1336,12 +1729,119 @@ class CephBrokerRq(object): group=group, namespace=namespace, app_name=app_name, max_bytes=max_bytes, max_objects=max_objects) + # Use function parameters and docstring to define types in a compatible + # manner. + # + # NOTE: Our caller should always use a kwarg Dict when calling us so + # no need to maintain fixed order/position for parameters. Please keep them + # sorted by name when adding new ones. + def _partial_build_common_op_create(self, + app_name=None, + compression_algorithm=None, + compression_mode=None, + compression_required_ratio=None, + compression_min_blob_size=None, + compression_min_blob_size_hdd=None, + compression_min_blob_size_ssd=None, + compression_max_blob_size=None, + compression_max_blob_size_hdd=None, + compression_max_blob_size_ssd=None, + group=None, + max_bytes=None, + max_objects=None, + namespace=None, + weight=None): + """Build common part of a create pool operation. + + :param app_name: Tag pool with application name. Note that there is + certain protocols emerging upstream with regard to + meaningful application names to use. + Examples are 'rbd' and 'rgw'. + :type app_name: Optional[str] + :param compression_algorithm: Compressor to use, one of: + ('lz4', 'snappy', 'zlib', 'zstd') + :type compression_algorithm: Optional[str] + :param compression_mode: When to compress data, one of: + ('none', 'passive', 'aggressive', 'force') + :type compression_mode: Optional[str] + :param compression_required_ratio: Minimum compression ratio for data + chunk, if the requested ratio is not + achieved the compressed version will + be thrown away and the original + stored. + :type compression_required_ratio: Optional[float] + :param compression_min_blob_size: Chunks smaller than this are never + compressed (unit: bytes). + :type compression_min_blob_size: Optional[int] + :param compression_min_blob_size_hdd: Chunks smaller than this are not + compressed when destined to + rotational media (unit: bytes). + :type compression_min_blob_size_hdd: Optional[int] + :param compression_min_blob_size_ssd: Chunks smaller than this are not + compressed when destined to flash + media (unit: bytes). + :type compression_min_blob_size_ssd: Optional[int] + :param compression_max_blob_size: Chunks larger than this are broken + into N * compression_max_blob_size + chunks before being compressed + (unit: bytes). + :type compression_max_blob_size: Optional[int] + :param compression_max_blob_size_hdd: Chunks larger than this are + broken into + N * compression_max_blob_size_hdd + chunks before being compressed + when destined for rotational + media (unit: bytes) + :type compression_max_blob_size_hdd: Optional[int] + :param compression_max_blob_size_ssd: Chunks larger than this are + broken into + N * compression_max_blob_size_ssd + chunks before being compressed + when destined for flash media + (unit: bytes). 
+ :type compression_max_blob_size_ssd: Optional[int] + :param group: Group to add pool to + :type group: Optional[str] + :param max_bytes: Maximum bytes quota to apply + :type max_bytes: Optional[int] + :param max_objects: Maximum objects quota to apply + :type max_objects: Optional[int] + :param namespace: Group namespace + :type namespace: Optional[str] + :param weight: The percentage of data that is expected to be contained + in the pool from the total available space on the OSDs. + Used to calculate number of Placement Groups to create + for pool. + :type weight: Optional[float] + :returns: Dictionary with kwarg name as key. + :rtype: Dict[str,any] + :raises: AssertionError + """ + return { + 'app-name': app_name, + 'compression-algorithm': compression_algorithm, + 'compression-mode': compression_mode, + 'compression-required-ratio': compression_required_ratio, + 'compression-min-blob-size': compression_min_blob_size, + 'compression-min-blob-size-hdd': compression_min_blob_size_hdd, + 'compression-min-blob-size-ssd': compression_min_blob_size_ssd, + 'compression-max-blob-size': compression_max_blob_size, + 'compression-max-blob-size-hdd': compression_max_blob_size_hdd, + 'compression-max-blob-size-ssd': compression_max_blob_size_ssd, + 'group': group, + 'max-bytes': max_bytes, + 'max-objects': max_objects, + 'group-namespace': namespace, + 'weight': weight, + } + def add_op_create_replicated_pool(self, name, replica_count=3, pg_num=None, - weight=None, group=None, namespace=None, - app_name=None, max_bytes=None, - max_objects=None): + **kwargs): """Adds an operation to create a replicated pool. + Refer to docstring for ``_partial_build_common_op_create`` for + documentation of keyword arguments. + :param name: Name of pool to create :type name: str :param replica_count: Number of copies Ceph should keep of your data. @@ -1349,66 +1849,114 @@ class CephBrokerRq(object): :param pg_num: Request specific number of Placement Groups to create for pool. :type pg_num: int - :param weight: The percentage of data that is expected to be contained - in the pool from the total available space on the OSDs. - Used to calculate number of Placement Groups to create - for pool. - :type weight: float - :param group: Group to add pool to - :type group: str - :param namespace: Group namespace - :type namespace: str - :param app_name: (Optional) Tag pool with application name. Note that - there is certain protocols emerging upstream with - regard to meaningful application names to use. - Examples are ``rbd`` and ``rgw``. - :type app_name: str - :param max_bytes: Maximum bytes quota to apply - :type max_bytes: int - :param max_objects: Maximum objects quota to apply - :type max_objects: int + :raises: AssertionError if provided data is of invalid type/range """ - if pg_num and weight: + if pg_num and kwargs.get('weight'): raise ValueError('pg_num and weight are mutually exclusive') - self.add_op({'op': 'create-pool', 'name': name, - 'replicas': replica_count, 'pg_num': pg_num, - 'weight': weight, 'group': group, - 'group-namespace': namespace, 'app-name': app_name, - 'max-bytes': max_bytes, 'max-objects': max_objects}) + op = { + 'op': 'create-pool', + 'name': name, + 'replicas': replica_count, + 'pg_num': pg_num, + } + op.update(self._partial_build_common_op_create(**kwargs)) + + # Initialize Pool-object to validate type and range of ops. 
+ pool = ReplicatedPool('dummy-service', op=op) + pool.validate() + + self.add_op(op) def add_op_create_erasure_pool(self, name, erasure_profile=None, - weight=None, group=None, app_name=None, - max_bytes=None, max_objects=None): + allow_ec_overwrites=False, **kwargs): """Adds an operation to create a erasure coded pool. + Refer to docstring for ``_partial_build_common_op_create`` for + documentation of keyword arguments. + :param name: Name of pool to create :type name: str :param erasure_profile: Name of erasure code profile to use. If not set the ceph-mon unit handling the broker request will set its default value. :type erasure_profile: str - :param weight: The percentage of data that is expected to be contained - in the pool from the total available space on the OSDs. - :type weight: float - :param group: Group to add pool to - :type group: str - :param app_name: (Optional) Tag pool with application name. Note that - there is certain protocols emerging upstream with - regard to meaningful application names to use. - Examples are ``rbd`` and ``rgw``. - :type app_name: str - :param max_bytes: Maximum bytes quota to apply - :type max_bytes: int - :param max_objects: Maximum objects quota to apply - :type max_objects: int + :param allow_ec_overwrites: allow EC pools to be overriden + :type allow_ec_overwrites: bool + :raises: AssertionError if provided data is of invalid type/range """ - self.add_op({'op': 'create-pool', 'name': name, - 'pool-type': 'erasure', - 'erasure-profile': erasure_profile, - 'weight': weight, - 'group': group, 'app-name': app_name, - 'max-bytes': max_bytes, 'max-objects': max_objects}) + op = { + 'op': 'create-pool', + 'name': name, + 'pool-type': 'erasure', + 'erasure-profile': erasure_profile, + 'allow-ec-overwrites': allow_ec_overwrites, + } + op.update(self._partial_build_common_op_create(**kwargs)) + + # Initialize Pool-object to validate type and range of ops. + pool = ErasurePool('dummy-service', op) + pool.validate() + + self.add_op(op) + + def add_op_create_erasure_profile(self, name, + erasure_type='jerasure', + erasure_technique=None, + k=None, m=None, + failure_domain=None, + lrc_locality=None, + shec_durability_estimator=None, + clay_helper_chunks=None, + device_class=None, + clay_scalar_mds=None, + lrc_crush_locality=None): + """Adds an operation to create a erasure coding profile. 
+
+        :param name: Name of profile to create
+        :type name: str
+        :param erasure_type: Which of the erasure coding plugins should be used
+        :type erasure_type: string
+        :param erasure_technique: EC plugin technique to use
+        :type erasure_technique: string
+        :param k: Number of data chunks
+        :type k: int
+        :param m: Number of coding chunks
+        :type m: int
+        :param lrc_locality: Group the coding and data chunks into sets of
+                             size locality (lrc plugin)
+        :type lrc_locality: int
+        :param shec_durability_estimator: The number of parity chunks each of
+                                          which includes a data chunk in its
+                                          calculation range (shec plugin)
+        :type shec_durability_estimator: int
+        :param clay_helper_chunks: The number of helper chunks to use for
+                                   recovery operations (clay plugin)
+        :type clay_helper_chunks: int
+        :param failure_domain: Type of failure domain from Ceph bucket types
+                               to be used
+        :type failure_domain: string
+        :param device_class: Device class to use for profile (ssd, hdd)
+        :type device_class: string
+        :param clay_scalar_mds: Plugin to use for CLAY layered construction
+                                (jerasure|isa|shec)
+        :type clay_scalar_mds: string
+        :param lrc_crush_locality: Type of crush bucket in which set of chunks
+                                   defined by lrc_locality will be stored.
+        :type lrc_crush_locality: string
+        """
+        self.add_op({'op': 'create-erasure-profile',
+                     'name': name,
+                     'k': k,
+                     'm': m,
+                     'l': lrc_locality,
+                     'c': shec_durability_estimator,
+                     'd': clay_helper_chunks,
+                     'erasure-type': erasure_type,
+                     'erasure-technique': erasure_technique,
+                     'failure-domain': failure_domain,
+                     'device-class': device_class,
+                     'scalar-mds': clay_scalar_mds,
+                     'crush-locality': lrc_crush_locality})
 
     def set_ops(self, ops):
         """Set request ops to provided value.
@@ -1522,18 +2070,15 @@ class CephBrokerRsp(object):
 
 def get_previous_request(rid):
     """Return the last ceph broker request sent on a given relation
 
-    @param rid: Relation id to query for request
+    :param rid: Relation id to query for request
+    :type rid: str
+    :returns: CephBrokerRq object or None if relation data not found.
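Taken together, the new helpers above let a client charm request an erasure profile and its pools in a single broker request. A rough sketch; the pool/profile names, k/m values and weights are illustrative assumptions, and the last line assumes the class's existing request property that serialises the ops to JSON:

    rq = CephBrokerRq()
    rq.add_op_create_erasure_profile(
        name='myapp-profile', erasure_type='jerasure',
        erasure_technique='reed_sol_van', k=3, m=2)
    rq.add_op_create_erasure_pool(
        name='myapp.data', erasure_profile='myapp-profile',
        allow_ec_overwrites=True, weight=80, app_name='rgw')
    rq.add_op_create_replicated_pool(
        name='myapp.metadata', replica_count=3, weight=1, app_name='rgw',
        compression_mode='passive')
    # Requests can also be rebuilt from their on-wire JSON form, which is how
    # get_previous_request() below now reconstructs them:
    rebuilt = CephBrokerRq(raw_request_data=rq.request)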
+ :rtype: Optional[CephBrokerRq] """ - request = None broker_req = relation_get(attribute='broker_req', rid=rid, unit=local_unit()) if broker_req: - request_data = json.loads(broker_req) - request = CephBrokerRq(api_version=request_data['api-version'], - request_id=request_data['request-id']) - request.set_ops(request_data['ops']) - - return request + return CephBrokerRq(raw_request_data=broker_req) def get_request_states(request, relation='ceph'): diff --git a/hooks/charmhelpers/core/host.py b/hooks/charmhelpers/core/host.py index b33ac906..a785efdf 100644 --- a/hooks/charmhelpers/core/host.py +++ b/hooks/charmhelpers/core/host.py @@ -193,7 +193,7 @@ def service_pause(service_name, init_dir="/etc/init", initd_dir="/etc/init.d", stopped = service_stop(service_name, **kwargs) upstart_file = os.path.join(init_dir, "{}.conf".format(service_name)) sysv_file = os.path.join(initd_dir, service_name) - if init_is_systemd(): + if init_is_systemd(service_name=service_name): service('disable', service_name) service('mask', service_name) elif os.path.exists(upstart_file): @@ -227,7 +227,7 @@ def service_resume(service_name, init_dir="/etc/init", """ upstart_file = os.path.join(init_dir, "{}.conf".format(service_name)) sysv_file = os.path.join(initd_dir, service_name) - if init_is_systemd(): + if init_is_systemd(service_name=service_name): service('unmask', service_name) service('enable', service_name) elif os.path.exists(upstart_file): @@ -257,7 +257,7 @@ def service(action, service_name, **kwargs): :param **kwargs: additional params to be passed to the service command in the form of key=value. """ - if init_is_systemd(): + if init_is_systemd(service_name=service_name): cmd = ['systemctl', action, service_name] else: cmd = ['service', service_name, action] @@ -281,7 +281,7 @@ def service_running(service_name, **kwargs): units (e.g. service ceph-osd status id=2). The kwargs are ignored in systemd services. """ - if init_is_systemd(): + if init_is_systemd(service_name=service_name): return service('is-active', service_name) else: if os.path.exists(_UPSTART_CONF.format(service_name)): @@ -311,8 +311,14 @@ def service_running(service_name, **kwargs): SYSTEMD_SYSTEM = '/run/systemd/system' -def init_is_systemd(): - """Return True if the host system uses systemd, False otherwise.""" +def init_is_systemd(service_name=None): + """ + Returns whether the host uses systemd for the specified service. + + @param Optional[str] service_name: specific name of service + """ + if str(service_name).startswith("snap."): + return True if lsb_release()['DISTRIB_CODENAME'] == 'trusty': return False return os.path.isdir(SYSTEMD_SYSTEM) diff --git a/lib/charms_ceph/broker.py b/lib/charms_ceph/broker.py index 15552cd8..d5c83891 100644 --- a/lib/charms_ceph/broker.py +++ b/lib/charms_ceph/broker.py @@ -155,25 +155,47 @@ def handle_create_erasure_profile(request, service): :param service: The ceph client to run the command under. :returns: dict. exit-code and reason if not 0 """ - # "local" | "shec" or it defaults to "jerasure" + # "isa" | "lrc" | "shec" | "clay" or it defaults to "jerasure" erasure_type = request.get('erasure-type') - # "host" | "rack" or it defaults to "host" # Any valid Ceph bucket + # dependent on erasure coding type + erasure_technique = request.get('erasure-technique') + # "host" | "rack" | ... 
failure_domain = request.get('failure-domain')
     name = request.get('name')
     # Binary Distribution Matrix (BDM) parameters
     bdm_k = request.get('k')
     bdm_m = request.get('m')
+    # LRC parameters
     bdm_l = request.get('l')
+    crush_locality = request.get('crush-locality')
+    # SHEC parameters
+    bdm_c = request.get('c')
+    # CLAY parameters
+    bdm_d = request.get('d')
+    scalar_mds = request.get('scalar-mds')
+    # Device Class
+    device_class = request.get('device-class')
 
-    if failure_domain not in CEPH_BUCKET_TYPES:
+    if failure_domain and failure_domain not in CEPH_BUCKET_TYPES:
         msg = "failure-domain must be one of {}".format(CEPH_BUCKET_TYPES)
         log(msg, level=ERROR)
         return {'exit-code': 1, 'stderr': msg}
 
-    create_erasure_profile(service=service, erasure_plugin_name=erasure_type,
-                           profile_name=name, failure_domain=failure_domain,
-                           data_chunks=bdm_k, coding_chunks=bdm_m,
-                           locality=bdm_l)
+    create_erasure_profile(service=service,
+                           erasure_plugin_name=erasure_type,
+                           profile_name=name,
+                           failure_domain=failure_domain,
+                           data_chunks=bdm_k,
+                           coding_chunks=bdm_m,
+                           locality=bdm_l,
+                           durability_estimator=bdm_c,
+                           helper_chunks=bdm_d,
+                           scalar_mds=scalar_mds,
+                           crush_locality=crush_locality,
+                           device_class=device_class,
+                           erasure_plugin_technique=erasure_technique)
+
+    return {'exit-code': 0}
 
 
 def handle_add_permissions_to_key(request, service):
@@ -387,6 +409,7 @@ def handle_erasure_pool(request, service):
     max_objects = request.get('max-objects')
     weight = request.get('weight')
     group_name = request.get('group')
+    allow_ec_overwrites = request.get('allow-ec-overwrites')
 
     if erasure_profile is None:
         erasure_profile = "default-canonical"
@@ -416,7 +439,9 @@ def handle_erasure_pool(request, service):
     pool = ErasurePool(service=service, name=pool_name,
                        erasure_code_profile=erasure_profile,
-                       percent_data=weight, app_name=app_name)
+                       percent_data=weight,
+                       app_name=app_name,
+                       allow_ec_overwrites=allow_ec_overwrites)
     # Ok make the erasure pool
     if not pool_exists(service=service, name=pool_name):
         log("Creating pool '{}' (erasure_profile={})"
diff --git a/lib/charms_ceph/utils.py b/lib/charms_ceph/utils.py
index 1a51e7c5..72e6b921 100644
--- a/lib/charms_ceph/utils.py
+++ b/lib/charms_ceph/utils.py
@@ -2169,15 +2169,18 @@ def roll_monitor_cluster(new_version, upgrade_key):
             status_set('blocked', 'failed to upgrade monitor')
 
 
-# TODO(jamespage):
-# Mimic support will need to ensure that ceph-mgr daemons are also
-# restarted during upgrades - probably through use of one of the
-# high level systemd targets shipped by the packaging.
-def upgrade_monitor(new_version):
+# For E731 we can't assign a lambda; pass this no-op function instead.
+def noop():
+    pass
+
+
+def upgrade_monitor(new_version, kick_function=None):
     """Upgrade the current ceph monitor to the new version
 
     :param new_version: String version to upgrade to.
""" + if kick_function is None: + kick_function = noop current_version = get_version() status_set("maintenance", "Upgrading monitor") log("Current ceph version is {}".format(current_version)) @@ -2186,6 +2189,7 @@ def upgrade_monitor(new_version): # Needed to determine if whether to stop/start ceph-mgr luminous_or_later = cmp_pkgrevno('ceph-common', '12.2.0') >= 0 + kick_function() try: add_source(config('source'), config('key')) apt_update(fatal=True) @@ -2194,6 +2198,7 @@ def upgrade_monitor(new_version): err)) status_set("blocked", "Upgrade to {} failed".format(new_version)) sys.exit(1) + kick_function() try: if systemd(): service_stop('ceph-mon') @@ -2204,6 +2209,7 @@ def upgrade_monitor(new_version): else: service_stop('ceph-mon-all') apt_install(packages=determine_packages(), fatal=True) + kick_function() owner = ceph_user() @@ -2217,6 +2223,8 @@ def upgrade_monitor(new_version): group=owner, follow_links=True) + kick_function() + # Ensure that mon directory is user writable hostname = socket.gethostname() path = '/var/lib/ceph/mon/ceph-{}'.format(hostname) @@ -2257,13 +2265,22 @@ def lock_and_roll(upgrade_key, service, my_name, version): start_timestamp)) monitor_key_set(upgrade_key, "{}_{}_{}_start".format( service, my_name, version), start_timestamp) + + # alive indication: + alive_function = ( + lambda: monitor_key_set( + upgrade_key, "{}_{}_{}_alive" + .format(service, my_name, version), time.time())) + dog = WatchDog(kick_interval=3 * 60, + kick_function=alive_function) + log("Rolling") # This should be quick if service == 'osd': - upgrade_osd(version) + upgrade_osd(version, kick_function=dog.kick_the_dog) elif service == 'mon': - upgrade_monitor(version) + upgrade_monitor(version, kick_function=dog.kick_the_dog) else: log("Unknown service {}. Unable to upgrade".format(service), level=ERROR) @@ -2294,45 +2311,225 @@ def wait_on_previous_node(upgrade_key, service, previous_node, version): """ log("Previous node is: {}".format(previous_node)) - previous_node_finished = monitor_key_exists( - upgrade_key, - "{}_{}_{}_done".format(service, previous_node, version)) - - while previous_node_finished is False: - log("{} is not finished. Waiting".format(previous_node)) - # Has this node been trying to upgrade for longer than - # 10 minutes? - # If so then move on and consider that node dead. - - # NOTE: This assumes the clusters clocks are somewhat accurate - # If the hosts clock is really far off it may cause it to skip - # the previous node even though it shouldn't. - current_timestamp = time.time() - previous_node_start_time = monitor_key_get( + previous_node_started_f = ( + lambda: monitor_key_exists( upgrade_key, - "{}_{}_{}_start".format(service, previous_node, version)) - if (previous_node_start_time is not None and - ((current_timestamp - (10 * 60)) > - float(previous_node_start_time))): - # NOTE(jamespage): - # Previous node is probably dead as we've been waiting - # for 10 minutes - lets move on and upgrade - log("Waited 10 mins on node {}. current time: {} > " - "previous node start time: {} Moving on".format( - previous_node, - (current_timestamp - (10 * 60)), - previous_node_start_time)) - return - # NOTE(jamespage) - # Previous node has not started, or started less than - # 10 minutes ago - sleep a random amount of time and - # then check again. 
- wait_time = random.randrange(5, 30) - log('waiting for {} seconds'.format(wait_time)) - time.sleep(wait_time) - previous_node_finished = monitor_key_exists( + "{}_{}_{}_start".format(service, previous_node, version))) + previous_node_finished_f = ( + lambda: monitor_key_exists( upgrade_key, - "{}_{}_{}_done".format(service, previous_node, version)) + "{}_{}_{}_done".format(service, previous_node, version))) + previous_node_alive_time_f = ( + lambda: monitor_key_get( + upgrade_key, + "{}_{}_{}_alive".format(service, previous_node, version))) + + # wait for 30 minutes until the previous node starts. We don't proceed + # unless we get a start condition. + try: + WatchDog.wait_until(previous_node_started_f, timeout=30 * 60) + except WatchDog.WatchDogTimeoutException: + log("Waited for previous node to start for 30 minutes. " + "It didn't start, so may have a serious issue. Continuing with " + "upgrade of this node.", + level=WARNING) + return + + # keep the time it started from this nodes' perspective. + previous_node_started_at = time.time() + log("Detected that previous node {} has started. Time now: {}" + .format(previous_node, previous_node_started_at)) + + # Now wait for the node to complete. The node may optionally be kicking + # with the *_alive key, which allows this node to wait longer as it 'knows' + # the other node is proceeding. + try: + WatchDog.timed_wait(kicked_at_function=previous_node_alive_time_f, + complete_function=previous_node_finished_f, + wait_time=30 * 60, + compatibility_wait_time=10 * 60, + max_kick_interval=5 * 60) + except WatchDog.WatchDogDeadException: + # previous node was kicking, but timed out; log this condition and move + # on. + now = time.time() + waited = int((now - previous_node_started_at) / 60) + log("Previous node started, but has now not ticked for 5 minutes. " + "Waited total of {} mins on node {}. current time: {} > " + "previous node start time: {}. " + "Continuing with upgrade of this node." + .format(waited, previous_node, now, previous_node_started_at), + level=WARNING) + except WatchDog.WatchDogTimeoutException: + # previous node never kicked, or simply took too long; log this + # condition and move on. + now = time.time() + waited = int((now - previous_node_started_at) / 60) + log("Previous node is taking too long; assuming it has died." + "Waited {} mins on node {}. current time: {} > " + "previous node start time: {}. " + "Continuing with upgrade of this node." + .format(waited, previous_node, now, previous_node_started_at), + level=WARNING) + + +class WatchDog(object): + """Watch a dog; basically a kickable timer with a timeout between two async + units. + + The idea is that you have an overall timeout and then can kick that timeout + with intermediary hits, with a max time between those kicks allowed. + + Note that this watchdog doesn't rely on the clock of the other side; just + roughly when it detects when the other side started. All timings are based + on the local clock. + + The kicker will not 'kick' more often than a set interval, regardless of + how often the kick_the_dog() function is called. The kicker provides a + function (lambda: -> None) that is called when the kick interval is + reached. + + The waiter calls the static method with a check function + (lambda: -> Boolean) that indicates when the wait should be over and the + maximum interval to wait. e.g. 30 minutes with a 5 minute kick interval. 
+
+    So the waiter calls wait(f, 30, 3) and the kicker sets up a 3 minute kick
+    interval, or however long it is expected for the key to propagate and to
+    allow for other delays.
+
+    There is a compatibility mode where if the other side never kicks, then
+    it simply waits for the compatibility timer.
+    """
+
+    class WatchDogDeadException(Exception):
+        pass
+
+    class WatchDogTimeoutException(Exception):
+        pass
+
+    def __init__(self, kick_interval=3 * 60, kick_function=None):
+        """Initialise a new WatchDog
+
+        :param kick_interval: the interval when this side kicks the other in
+            seconds.
+        :type kick_interval: Int
+        :param kick_function: The function to call that does the kick.
+        :type kick_function: Callable[]
+        """
+        self.start_time = time.time()
+        self.last_run_func = None
+        self.last_kick_at = None
+        self.kick_interval = kick_interval
+        self.kick_f = kick_function
+
+    def kick_the_dog(self):
+        """Might call the kick_function if it's time.
+
+        This function can be called as frequently as needed, but will run the
+        self.kick_function after kick_interval seconds have passed.
+        """
+        now = time.time()
+        if (self.last_run_func is None or
+                (now - self.last_run_func > self.kick_interval)):
+            if self.kick_f is not None:
+                self.kick_f()
+            self.last_run_func = now
+        self.last_kick_at = now
+
+    @staticmethod
+    def wait_until(wait_f, timeout=10 * 60):
+        """Wait for timeout seconds until the passed function returns True.
+
+        :param wait_f: The function to call that will end the wait.
+        :type wait_f: Callable[[], Boolean]
+        :param timeout: The time to wait in seconds.
+        :type timeout: int
+        """
+        start_time = time.time()
+        while(not wait_f()):
+            now = time.time()
+            if now > start_time + timeout:
+                raise WatchDog.WatchDogTimeoutException()
+            wait_time = random.randrange(5, 30)
+            log('wait_until: waiting for {} seconds'.format(wait_time))
+            time.sleep(wait_time)
+
+    @staticmethod
+    def timed_wait(kicked_at_function,
+                   complete_function,
+                   wait_time=30 * 60,
+                   compatibility_wait_time=10 * 60,
+                   max_kick_interval=5 * 60):
+        """Wait a maximum time with an intermediate 'kick' time.
+
+        This function will wait for max_kick_interval seconds unless the
+        kicked_at_function() call returns a time that is not older than
+        max_kick_interval (in seconds). i.e. the other side can signal that it
+        is still doing things during the max_kick_interval as long as it kicks
+        at least every max_kick_interval seconds.
+
+        The maximum wait is "wait_time", but the other side must keep kicking
+        during this period.
+
+        The "compatibility_wait_time" is used if the other side never kicks
+        (i.e. the kicked_at_function() always returns None). In this case the
+        function waits up to "compatibility_wait_time".
+
+        Note that the type of the return from the kicked_at_function is an
+        Optional[str], not a Float. The function will coerce this to a float
+        for the comparison. This represents the return value of
+        time.time() at the "other side". It's a string to simplify the
+        function obtaining the time value from the other side.
+
+        The function raises WatchDogTimeoutException if either the
+        compatibility_wait_time or the wait_time are exceeded.
+
+        The function raises WatchDogDeadException if the max_kick_interval is
+        exceeded.
+
+        Note that it is possible that the first kick interval is extended to
+        compatibility_wait_time if the "other side" doesn't kick immediately.
+        The best solution is for the other side to kick early and often.
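A rough sketch of how the kicker and waiter sides of WatchDog pair up in practice; the upgrade key, unit/version names and upgrade_steps/do_upgrade_step() below are illustrative assumptions, not taken from this patch:

    # Upgrading node: publish an 'alive' timestamp at most every 3 minutes.
    dog = WatchDog(kick_interval=3 * 60,
                   kick_function=lambda: monitor_key_set(
                       'osd-upgrade', 'osd_unit-1_15.2.3_alive', time.time()))
    for step in upgrade_steps:      # hypothetical iterable of work items
        do_upgrade_step(step)       # hypothetical unit of work
        dog.kick_the_dog()          # cheap to call; throttled internally

    # Waiting node: allow 30 minutes overall, but give up early if the
    # 'alive' key goes stale for more than 5 minutes.
    WatchDog.timed_wait(
        kicked_at_function=lambda: monitor_key_get(
            'osd-upgrade', 'osd_unit-1_15.2.3_alive'),
        complete_function=lambda: monitor_key_exists(
            'osd-upgrade', 'osd_unit-1_15.2.3_done'),
        wait_time=30 * 60,
        compatibility_wait_time=10 * 60,
        max_kick_interval=5 * 60)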
+ + :param kicked_at_function: The function to call to retrieve the time + that the other side 'kicked' at. None if the other side hasn't + kicked. + :type kicked_at_function: Callable[[], Optional[str]] + :param complete_function: The callable that returns True when done. + :type complete_function: Callable[[], Boolean] + :param wait_time: the maximum time to wait, even with kicks, in + seconds. + :type wait_time: int + :param compatibility_wait_time: The time to wait if no kicks are + received, in seconds. + :type compatibility_wait_time: int + :param max_kick_interval: The maximum time allowed between kicks before + the wait is over, in seconds: + :type max_kick_interval: int + :raises: WatchDog.WatchDogTimeoutException, + WatchDog.WatchDogDeadException + """ + start_time = time.time() + while True: + if complete_function(): + break + # the time when the waiting for unit last kicked. + kicked_at = kicked_at_function() + now = time.time() + if kicked_at is None: + # assume other end doesn't do alive kicks + if (now - start_time > compatibility_wait_time): + raise WatchDog.WatchDogTimeoutException() + else: + # other side is participating in kicks; must kick at least + # every 'max_kick_interval' to stay alive. + if (now - float(kicked_at) > max_kick_interval): + raise WatchDog.WatchDogDeadException() + if (now - start_time > wait_time): + raise WatchDog.WatchDogTimeoutException() + delay_time = random.randrange(5, 30) + log('waiting for {} seconds'.format(delay_time)) + time.sleep(delay_time) def get_upgrade_position(osd_sorted_list, match_name): @@ -2412,11 +2609,14 @@ def roll_osd_cluster(new_version, upgrade_key): status_set('blocked', 'failed to upgrade osd') -def upgrade_osd(new_version): +def upgrade_osd(new_version, kick_function=None): """Upgrades the current osd :param new_version: str. The new version to upgrade to """ + if kick_function is None: + kick_function = noop + current_version = get_version() status_set("maintenance", "Upgrading osd") log("Current ceph version is {}".format(current_version)) @@ -2431,10 +2631,13 @@ def upgrade_osd(new_version): status_set("blocked", "Upgrade to {} failed".format(new_version)) sys.exit(1) + kick_function() + try: # Upgrade the packages before restarting the daemons. status_set('maintenance', 'Upgrading packages to %s' % new_version) apt_install(packages=determine_packages(), fatal=True) + kick_function() # If the upgrade does not need an ownership update of any of the # directories in the osd service directory, then simply restart @@ -2458,13 +2661,16 @@ def upgrade_osd(new_version): os.listdir(CEPH_BASE_DIR)) non_osd_dirs = map(lambda x: os.path.join(CEPH_BASE_DIR, x), non_osd_dirs) - for path in non_osd_dirs: + for i, path in enumerate(non_osd_dirs): + if i % 100 == 0: + kick_function() update_owner(path) # Fast service restart wasn't an option because each of the OSD # directories need the ownership updated for all the files on # the OSD. Walk through the OSDs one-by-one upgrading the OSD. 
for osd_dir in _get_child_dirs(OSD_BASE_DIR): + kick_function() try: osd_num = _get_osd_num_from_dirname(osd_dir) _upgrade_single_osd(osd_num, osd_dir) diff --git a/unit_tests/test_ceph.py b/unit_tests/test_ceph.py index 6ef85dec..71318a8b 100644 --- a/unit_tests/test_ceph.py +++ b/unit_tests/test_ceph.py @@ -24,6 +24,7 @@ TO_PATCH = [ 'os', 'subprocess', 'mkdir', + 'service_name', ] @@ -31,6 +32,7 @@ class CephRadosGWCephTests(CharmTestCase): def setUp(self): super(CephRadosGWCephTests, self).setUp(ceph, TO_PATCH) self.config.side_effect = self.test_config.get + self.service_name.return_value = 'ceph-radosgw' def test_import_radosgw_key(self): self.os.path.exists.return_value = False @@ -138,6 +140,82 @@ class CephRadosGWCephTests(CharmTestCase): name='objects', permission='rwx') + @patch('charmhelpers.contrib.storage.linux.ceph.CephBrokerRq' + '.add_op_create_erasure_profile') + @patch('charmhelpers.contrib.storage.linux.ceph.CephBrokerRq' + '.add_op_create_erasure_pool') + @patch('charmhelpers.contrib.storage.linux.ceph.CephBrokerRq' + '.add_op_request_access_to_group') + @patch('charmhelpers.contrib.storage.linux.ceph.CephBrokerRq' + '.add_op_create_pool') + def test_create_rgw_pools_rq_no_prefix_ec(self, mock_broker, + mock_request_access, + mock_request_create_ec_pool, + mock_request_create_ec_profile): + self.test_config.set('rgw-lightweight-pool-pg-num', -1) + self.test_config.set('ceph-osd-replication-count', 3) + self.test_config.set('rgw-buckets-pool-weight', 19) + self.test_config.set('restrict-ceph-pools', True) + self.test_config.set('pool-type', 'erasure-coded') + self.test_config.set('ec-profile-k', 3) + self.test_config.set('ec-profile-m', 9) + self.test_config.set('ec-profile-technique', 'cauchy_good') + ceph.get_create_rgw_pools_rq(prefix=None) + mock_request_create_ec_profile.assert_called_once_with( + name='ceph-radosgw-profile', + k=3, m=9, + lrc_locality=None, + lrc_crush_locality=None, + shec_durability_estimator=None, + clay_helper_chunks=None, + clay_scalar_mds=None, + device_class=None, + erasure_type='jerasure', + erasure_technique='cauchy_good' + ) + mock_request_create_ec_pool.assert_has_calls([ + call(name='default.rgw.buckets.data', + erasure_profile='ceph-radosgw-profile', + weight=19, + group="objects", + app_name='rgw') + ]) + mock_broker.assert_has_calls([ + call(weight=0.10, replica_count=3, name='default.rgw.control', + group='objects', app_name='rgw'), + call(weight=0.10, replica_count=3, name='default.rgw.data.root', + group='objects', app_name='rgw'), + call(weight=0.10, replica_count=3, name='default.rgw.gc', + group='objects', app_name='rgw'), + call(weight=0.10, replica_count=3, name='default.rgw.log', + group='objects', app_name='rgw'), + call(weight=0.10, replica_count=3, name='default.rgw.intent-log', + group='objects', app_name='rgw'), + call(weight=0.10, replica_count=3, name='default.rgw.meta', + group='objects', app_name='rgw'), + call(weight=0.10, replica_count=3, name='default.rgw.usage', + group='objects', app_name='rgw'), + call(weight=0.10, replica_count=3, name='default.rgw.users.keys', + group='objects', app_name='rgw'), + call(weight=0.10, replica_count=3, name='default.rgw.users.email', + group='objects', app_name='rgw'), + call(weight=0.10, replica_count=3, name='default.rgw.users.swift', + group='objects', app_name='rgw'), + call(weight=0.10, replica_count=3, name='default.rgw.users.uid', + group='objects', app_name='rgw'), + call(weight=1.00, replica_count=3, + name='default.rgw.buckets.extra', + group='objects', 
app_name='rgw'), + call(weight=3.00, replica_count=3, + name='default.rgw.buckets.index', + group='objects', app_name='rgw'), + call(weight=0.10, replica_count=3, name='.rgw.root', + group='objects', app_name='rgw')], + ) + mock_request_access.assert_called_with(key_name='radosgw.gateway', + name='objects', + permission='rwx') + @patch.object(utils.apt_pkg, 'version_compare', lambda *args: -1) @patch.object(utils, 'lsb_release', lambda: {'DISTRIB_CODENAME': 'trusty'})