From 9e2aa03289bbf07ebbb167944cd04a7a11aa1f8f Mon Sep 17 00:00:00 2001
From: Ryan Beisner
Date: Wed, 23 Nov 2016 15:14:58 -0600
Subject: [PATCH] Update Amulet defs, series metadata and c-h sync

- Sync charm helpers if applicable.
- Fix test executable hashbangs for virtualenv prep.
- Add Yakkety-Newton Amulet test definitions.
- Prep Xenial-Ocata Amulet test definitions (not yet enabled).
- Prep Zesty-Ocata Amulet test definitions (not yet enabled).
- Remove Precise charm series metadata if present.
- Remove Precise Amulet test definitions if present.

Change-Id: Ic32eba6b28007101965bd2caec9ca6e5369ab379
---
 hooks/charmhelpers/contrib/openstack/utils.py | 22 +-
 hooks/charmhelpers/core/hookenv.py | 2 +
 hooks/charmhelpers/core/host.py | 17 +
 .../core/kernel_factory/ubuntu.py | 2 +-
 hooks/charmhelpers/fetch/ubuntu.py | 8 +
 lib/ceph/__init__.py | 90 +-
 lib/ceph/ceph_helpers.py | 1514 +++++++++++++++++
 metadata.yaml | 1 -
 tests/basic_deployment.py | 4 +-
 tests/charmhelpers/contrib/amulet/utils.py | 2 +-
 .../contrib/openstack/amulet/deployment.py | 35 +-
 .../contrib/openstack/amulet/utils.py | 11 +
 tests/gate-basic-trusty-icehouse | 2 +-
 tests/gate-basic-trusty-kilo | 2 +-
 tests/gate-basic-trusty-liberty | 2 +-
 tests/gate-basic-trusty-mitaka | 2 +-
 tests/gate-basic-xenial-mitaka | 2 +-
 tests/gate-basic-xenial-newton | 2 +-
 tests/gate-basic-xenial-ocata | 25 +
 tests/gate-basic-yakkety-newton | 2 +-
 tests/gate-basic-zesty-ocata | 23 +
 21 files changed, 1721 insertions(+), 49 deletions(-)
 create mode 100644 lib/ceph/ceph_helpers.py
 create mode 100644 tests/gate-basic-xenial-ocata
 mode change 100644 => 100755 tests/gate-basic-yakkety-newton
 create mode 100644 tests/gate-basic-zesty-ocata

diff --git a/hooks/charmhelpers/contrib/openstack/utils.py b/hooks/charmhelpers/contrib/openstack/utils.py
index 8c89c3a..6d544e7 100644
--- a/hooks/charmhelpers/contrib/openstack/utils.py
+++ b/hooks/charmhelpers/contrib/openstack/utils.py
@@ -109,7 +109,7 @@ UBUNTU_OPENSTACK_RELEASE = OrderedDict([
     ('wily', 'liberty'),
     ('xenial', 'mitaka'),
     ('yakkety', 'newton'),
-    ('zebra', 'ocata'),  # TODO: upload with real Z name
+    ('zesty', 'ocata'),
 ])


@@ -152,6 +152,8 @@ SWIFT_CODENAMES = OrderedDict([
         ['2.5.0', '2.6.0', '2.7.0']),
     ('newton',
         ['2.8.0', '2.9.0', '2.10.0']),
+    ('ocata',
+        ['2.11.0']),
 ])

 # >= Liberty version->codename mapping
@@ -410,14 +412,26 @@ def get_os_version_package(pkg, fatal=True):
 os_rel = None


-def os_release(package, base='essex'):
+def reset_os_release():
+    '''Unset the cached os_release version'''
+    global os_rel
+    os_rel = None
+
+
+def os_release(package, base='essex', reset_cache=False):
     '''
     Returns OpenStack release codename from a cached global.
+
+    If reset_cache is True, unset the cached os_release version and return
+    the freshly determined version.
+
     If the codename cannot be determined from either an installed package or
     the installation source, the earliest release supported by the charm
     should be returned.
     '''
     global os_rel
+    if reset_cache:
+        reset_os_release()
     if os_rel:
         return os_rel
     os_rel = (git_os_codename_install_source(config('openstack-origin-git')) or
@@ -535,6 +549,9 @@ def configure_installation_source(rel):
         'newton': 'xenial-updates/newton',
         'newton/updates': 'xenial-updates/newton',
         'newton/proposed': 'xenial-proposed/newton',
+        'zesty': 'zesty-updates/ocata',
+        'zesty/updates': 'xenial-updates/ocata',
+        'zesty/proposed': 'xenial-proposed/ocata',
     }

     try:
@@ -668,6 +685,7 @@ def clean_storage(block_device):
     else:
         zap_disk(block_device)

+
 is_ip = ip.is_ip
 ns_query = ip.ns_query
 get_host_ip = ip.get_host_ip
diff --git a/hooks/charmhelpers/core/hookenv.py b/hooks/charmhelpers/core/hookenv.py
index 996e81c..94fc996 100644
--- a/hooks/charmhelpers/core/hookenv.py
+++ b/hooks/charmhelpers/core/hookenv.py
@@ -332,6 +332,8 @@ def config(scope=None):
     config_cmd_line = ['config-get']
     if scope is not None:
         config_cmd_line.append(scope)
+    else:
+        config_cmd_line.append('--all')
     config_cmd_line.append('--format=json')
     try:
         config_data = json.loads(
diff --git a/hooks/charmhelpers/core/host.py b/hooks/charmhelpers/core/host.py
index 0f1b2f3..04cadb3 100644
--- a/hooks/charmhelpers/core/host.py
+++ b/hooks/charmhelpers/core/host.py
@@ -732,3 +732,20 @@ def get_total_ram():
             assert unit == 'kB', 'Unknown unit'
             return int(value) * 1024  # Classic, not KiB.
     raise NotImplementedError()
+
+
+UPSTART_CONTAINER_TYPE = '/run/container_type'
+
+
+def is_container():
+    """Determine whether unit is running in a container
+
+    @return: boolean indicating if unit is in a container
+    """
+    if init_is_systemd():
+        # Detect using systemd-detect-virt
+        return subprocess.call(['systemd-detect-virt',
+                                '--container']) == 0
+    else:
+        # Detect using upstart container file marker
+        return os.path.exists(UPSTART_CONTAINER_TYPE)
diff --git a/hooks/charmhelpers/core/kernel_factory/ubuntu.py b/hooks/charmhelpers/core/kernel_factory/ubuntu.py
index 2155964..3de372f 100644
--- a/hooks/charmhelpers/core/kernel_factory/ubuntu.py
+++ b/hooks/charmhelpers/core/kernel_factory/ubuntu.py
@@ -5,7 +5,7 @@ def persistent_modprobe(module):
     """Load a kernel module and configure for auto-load on reboot."""
     with open('/etc/modules', 'r+') as modules:
         if module not in modules.read():
-            modules.write(module)
+            modules.write(module + "\n")


 def update_initramfs(version='all'):
diff --git a/hooks/charmhelpers/fetch/ubuntu.py b/hooks/charmhelpers/fetch/ubuntu.py
index fce496b..39b9b80 100644
--- a/hooks/charmhelpers/fetch/ubuntu.py
+++ b/hooks/charmhelpers/fetch/ubuntu.py
@@ -105,6 +105,14 @@ CLOUD_ARCHIVE_POCKETS = {
     'newton/proposed': 'xenial-proposed/newton',
     'xenial-newton/proposed': 'xenial-proposed/newton',
     'xenial-proposed/newton': 'xenial-proposed/newton',
+    # Ocata
+    'ocata': 'xenial-updates/ocata',
+    'xenial-ocata': 'xenial-updates/ocata',
+    'xenial-ocata/updates': 'xenial-updates/ocata',
+    'xenial-updates/ocata': 'xenial-updates/ocata',
+    'ocata/proposed': 'xenial-proposed/ocata',
+    'xenial-ocata/proposed': 'xenial-proposed/ocata',
+    'xenial-proposed/ocata': 'xenial-proposed/ocata',
 }

 APT_NO_LOCK = 100  # The return code for "couldn't acquire lock" in APT. 
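
The reset_cache argument added to os_release() above exists because the
codename is memoized in the module-level os_rel global: once a charm changes
its installation source (for example to one of the new Ocata pockets), the
cached value is stale. A minimal sketch of the intended call pattern in an
upgrade path (the upgrade function and package name here are illustrative,
not part of this patch):

    from charmhelpers.contrib.openstack.utils import (
        configure_installation_source,
        os_release,
    )

    def do_openstack_upgrade(new_src='cloud:xenial-ocata'):
        # Repoint apt at the new UCA pocket, resolved through the
        # pocket maps extended above ...
        configure_installation_source(new_src)
        # ... then drop the cached codename and re-detect, so callers
        # see 'ocata' rather than the pre-upgrade release.
        return os_release('ceph-common', reset_cache=True)
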
diff --git a/lib/ceph/__init__.py b/lib/ceph/__init__.py index b95a826..db3772b 100644 --- a/lib/ceph/__init__.py +++ b/lib/ceph/__init__.py @@ -23,6 +23,7 @@ import re import sys import errno import shutil +import pyudev from charmhelpers.core import hookenv from charmhelpers.core.host import ( @@ -50,13 +51,15 @@ from charmhelpers.contrib.storage.linux.utils import ( is_block_device, zap_disk, is_device_mounted) +from charmhelpers.contrib.openstack.utils import ( + get_os_codename_install_source) LEADER = 'leader' PEON = 'peon' QUORUM = [LEADER, PEON] PACKAGES = ['ceph', 'gdisk', 'ntp', 'btrfs-tools', 'python-ceph', - 'radosgw', 'xfsprogs'] + 'radosgw', 'xfsprogs', 'python-pyudev'] LinkSpeed = { "BASE_10": 10, @@ -100,6 +103,23 @@ NETWORK_ADAPTER_SYSCTLS = { } +def unmounted_disks(): + """List of unmounted block devices on the current host.""" + disks = [] + context = pyudev.Context() + for device in context.list_devices(DEVTYPE='disk'): + if device['SUBSYSTEM'] == 'block': + matched = False + for block_type in [u'dm', u'loop', u'ram', u'nbd']: + if block_type in device.device_node: + matched = True + if matched: + continue + disks.append(device.device_node) + log("Found disks: {}".format(disks)) + return [disk for disk in disks if not is_device_mounted(disk)] + + def save_sysctls(sysctl_dict, save_location): """ Persist the sysctls to the hard drive. @@ -1344,7 +1364,7 @@ def roll_monitor_cluster(new_version, upgrade_key): version=new_version) else: # Check if the previous node has finished - status_set('blocked', + status_set('waiting', 'Waiting on {} to finish upgrading'.format( mon_sorted_list[position - 1])) wait_on_previous_node(upgrade_key=upgrade_key, @@ -1361,11 +1381,10 @@ def roll_monitor_cluster(new_version, upgrade_key): status_set('blocked', 'failed to upgrade monitor') -def upgrade_monitor(): +def upgrade_monitor(new_version): current_version = get_version() status_set("maintenance", "Upgrading monitor") log("Current ceph version is {}".format(current_version)) - new_version = config('release-version') log("Upgrading to: {}".format(new_version)) try: @@ -1393,7 +1412,6 @@ def upgrade_monitor(): service_start('ceph-mon@{}'.format(mon_id)) else: service_start('ceph-mon-all') - status_set("active", "") except subprocess.CalledProcessError as err: log("Stopping ceph and upgrading packages failed " "with message: {}".format(err.message)) @@ -1415,9 +1433,9 @@ def lock_and_roll(upgrade_key, service, my_name, version): # This should be quick if service == 'osd': - upgrade_osd() + upgrade_osd(version) elif service == 'mon': - upgrade_monitor() + upgrade_monitor(version) else: log("Unknown service {}. Unable to upgrade".format(service), level=ERROR) @@ -1541,11 +1559,10 @@ def roll_osd_cluster(new_version, upgrade_key): status_set('blocked', 'failed to upgrade osd') -def upgrade_osd(): +def upgrade_osd(new_version): current_version = get_version() status_set("maintenance", "Upgrading osd") log("Current ceph version is {}".format(current_version)) - new_version = config('release-version') log("Upgrading to: {}".format(new_version)) try: @@ -1578,3 +1595,58 @@ def upgrade_osd(): "with message: {}".format(err.message)) status_set("blocked", "Upgrade to {} failed".format(new_version)) sys.exit(1) + + +def list_pools(service): + """ + This will list the current pools that Ceph has + + :param service: String service id to run under + :return: list. Returns a list of the ceph pools. Raises CalledProcessError + if the subprocess fails to run. 
+ """ + try: + pool_list = [] + pools = subprocess.check_output(['rados', '--id', service, 'lspools']) + for pool in pools.splitlines(): + pool_list.append(pool) + return pool_list + except subprocess.CalledProcessError as err: + log("rados lspools failed with error: {}".format(err.output)) + raise + + +# A dict of valid ceph upgrade paths. Mapping is old -> new +UPGRADE_PATHS = { + 'firefly': 'hammer', + 'hammer': 'jewel', +} + +# Map UCA codenames to ceph codenames +UCA_CODENAME_MAP = { + 'icehouse': 'firefly', + 'juno': 'firefly', + 'kilo': 'hammer', + 'liberty': 'hammer', + 'mitaka': 'jewel', +} + + +def pretty_print_upgrade_paths(): + '''Pretty print supported upgrade paths for ceph''' + lines = [] + for key, value in UPGRADE_PATHS.iteritems(): + lines.append("{} -> {}".format(key, value)) + return lines + + +def resolve_ceph_version(source): + ''' + Resolves a version of ceph based on source configuration + based on Ubuntu Cloud Archive pockets. + + @param: source: source configuration option of charm + @returns: ceph release codename or None if not resolvable + ''' + os_release = get_os_codename_install_source(source) + return UCA_CODENAME_MAP.get(os_release) diff --git a/lib/ceph/ceph_helpers.py b/lib/ceph/ceph_helpers.py new file mode 100644 index 0000000..31b1956 --- /dev/null +++ b/lib/ceph/ceph_helpers.py @@ -0,0 +1,1514 @@ +# Copyright 2014-2015 Canonical Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# +# Copyright 2012 Canonical Ltd. 
+# +# This file is sourced from lp:openstack-charm-helpers +# +# Authors: +# James Page +# Adam Gandelman +# + +import errno +import hashlib +import math +import six + +import os +import shutil +import json +import time +import uuid +import re + +import subprocess +from subprocess import (check_call, check_output, CalledProcessError, ) +from charmhelpers.core.hookenv import (config, + local_unit, + relation_get, + relation_ids, + relation_set, + related_units, + log, + DEBUG, + INFO, + WARNING, + ERROR, ) +from charmhelpers.core.host import (mount, + mounts, + service_start, + service_stop, + service_running, + umount, ) +from charmhelpers.fetch import (apt_install, ) + +from charmhelpers.core.kernel import modprobe +from charmhelpers.contrib.openstack.utils import config_flags_parser + +KEYRING = '/etc/ceph/ceph.client.{}.keyring' +KEYFILE = '/etc/ceph/ceph.client.{}.key' + +CEPH_CONF = """[global] +auth supported = {auth} +keyring = {keyring} +mon host = {mon_hosts} +log to syslog = {use_syslog} +err to syslog = {use_syslog} +clog to syslog = {use_syslog} +""" + +CRUSH_BUCKET = """root {name} {{ + id {id} # do not change unnecessarily + # weight 0.000 + alg straw + hash 0 # rjenkins1 +}} + +rule {name} {{ + ruleset 0 + type replicated + min_size 1 + max_size 10 + step take {name} + step chooseleaf firstn 0 type host + step emit +}}""" + +# This regular expression looks for a string like: +# root NAME { +# id NUMBER +# so that we can extract NAME and ID from the crushmap +CRUSHMAP_BUCKETS_RE = re.compile(r"root\s+(.+)\s+\{\s*id\s+(-?\d+)") + +# This regular expression looks for ID strings in the crushmap like: +# id NUMBER +# so that we can extract the IDs from a crushmap +CRUSHMAP_ID_RE = re.compile(r"id\s+(-?\d+)") + +# The number of placement groups per OSD to target for placement group +# calculations. This number is chosen as 100 due to the ceph PG Calc +# documentation recommending to choose 100 for clusters which are not +# expected to increase in the foreseeable future. Since the majority of the +# calculations are done on deployment, target the case of non-expanding +# clusters as the default. +DEFAULT_PGS_PER_OSD_TARGET = 100 +DEFAULT_POOL_WEIGHT = 10.0 +LEGACY_PG_COUNT = 200 + + +def validator(value, valid_type, valid_range=None): + """ + Used to validate these: http://docs.ceph.com/docs/master/rados/operations/ + pools/#set-pool-values + Example input: + validator(value=1, + valid_type=int, + valid_range=[0, 2]) + This says I'm testing value=1. It must be an int inclusive in [0,2] + + :param value: The value to validate + :param valid_type: The type that value should be. + :param valid_range: A range of values that value can assume. + :return: + """ + assert isinstance(value, valid_type), "{} is not a {}".format(value, + valid_type) + if valid_range is not None: + assert isinstance(valid_range, list), \ + "valid_range must be a list, was given {}".format(valid_range) + # If we're dealing with strings + if valid_type is six.string_types: + assert value in valid_range, \ + "{} is not in the list {}".format(value, valid_range) + # Integer, float should have a min and max + else: + if len(valid_range) != 2: + raise ValueError("Invalid valid_range list of {} for {}. 
" + "List must be [min,max]".format(valid_range, + value)) + assert value >= valid_range[0], \ + "{} is less than minimum allowed value of {}".format( + value, valid_range[0]) + assert value <= valid_range[1], \ + "{} is greater than maximum allowed value of {}".format( + value, valid_range[1]) + + +class PoolCreationError(Exception): + """ + A custom error to inform the caller that a pool creation failed. Provides + an error message + """ + + def __init__(self, message): + super(PoolCreationError, self).__init__(message) + + +class Crushmap(object): + """An object oriented approach to Ceph crushmap management.""" + + def __init__(self): + """Iiitialize the Crushmap from Ceph""" + self._crushmap = self.load_crushmap() + roots = re.findall(CRUSHMAP_BUCKETS_RE, self._crushmap) + buckets = [] + ids = list(map( + lambda x: int(x), + re.findall(CRUSHMAP_ID_RE, self._crushmap))) + ids.sort() + if roots != []: + for root in roots: + buckets.append(Crushmap.Bucket(root[0], root[1], True)) + + self._buckets = buckets + if ids != []: + self._ids = ids + else: + self._ids = [0] + + def load_crushmap(self): + try: + crush = subprocess.Popen( + ('ceph', 'osd', 'getcrushmap'), + stdout=subprocess.PIPE) + return subprocess.check_output( + ('crushtool', '-d', '-'), + stdin=crush.stdout).decode('utf-8') + except Exception as e: + log("load_crushmap error: {}".format(e)) + raise "Failed to read Crushmap" + + def buckets(self): + """Return a list of buckets that are in the Crushmap.""" + return self._buckets + + def add_bucket(self, bucket_name): + """Add a named bucket to Ceph""" + new_id = min(self._ids) - 1 + self._ids.append(new_id) + self._buckets.append(Crushmap.Bucket(bucket_name, new_id)) + + def save(self): + """Persist Crushmap to Ceph""" + try: + crushmap = self.build_crushmap() + compiled = subprocess.Popen( + ('crushtool', '-c', '/dev/stdin', '-o', '/dev/stdout'), + stdin=subprocess.PIPE, + stdout=subprocess.PIPE) + output = compiled.communicate(crushmap)[0] + ceph = subprocess.Popen( + ('ceph', 'osd', 'setcrushmap', '-i', '/dev/stdin'), + stdin=subprocess.PIPE) + ceph_output = ceph.communicate(input=output) + return ceph_output + except Exception as e: + log("save error: {}".format(e)) + raise "Failed to save crushmap" + + def build_crushmap(self): + """Modifies the curent crushmap to include the new buckets""" + tmp_crushmap = self._crushmap + for bucket in self._buckets: + if not bucket.default: + tmp_crushmap = "{}\n\n{}".format( + tmp_crushmap, + Crushmap.bucket_string(bucket.name, bucket.id)) + return tmp_crushmap + + @staticmethod + def bucket_string(name, id): + return CRUSH_BUCKET.format(name=name, id=id) + + class Bucket(object): + """An object that describes a Crush bucket.""" + + def __init__(self, name, id, default=False): + self.name = name + self.id = int(id) + self.default = default + + def __repr__(self): + return "Bucket {{Name: {name}, ID: {id}}}".format( + name=self.name, id=self.id) + + def __eq__(self, other): + """Override the default Equals behavior""" + if isinstance(other, self.__class__): + return self.__dict__ == other.__dict__ + return NotImplemented + + def __ne__(self, other): + """Define a non-equality test""" + if isinstance(other, self.__class__): + return not self.__eq__(other) + return NotImplemented + + +class Pool(object): + """ + An object oriented approach to Ceph pool creation. This base class is + inherited by ReplicatedPool and ErasurePool. + Do not call create() on this base class as it will not do anything. 
+ Instantiate a child class and call create(). + """ + + def __init__(self, service, name): + self.service = service + self.name = name + + # Create the pool if it doesn't exist already + # To be implemented by subclasses + def create(self): + pass + + def add_cache_tier(self, cache_pool, mode): + """ + Adds a new cache tier to an existing pool. + :param cache_pool: six.string_types. The cache tier pool name to add. + :param mode: six.string_types. The caching mode to use for this pool. + valid range = ["readonly", "writeback"] + :return: None + """ + # Check the input types and values + validator(value=cache_pool, valid_type=six.string_types) + validator(value=mode, + valid_type=six.string_types, + valid_range=["readonly", "writeback"]) + + check_call(['ceph', '--id', self.service, 'osd', 'tier', 'add', + self.name, cache_pool]) + check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', + cache_pool, mode]) + check_call(['ceph', '--id', self.service, 'osd', 'tier', 'set-overlay', + self.name, cache_pool]) + check_call(['ceph', '--id', self.service, 'osd', 'pool', 'set', + cache_pool, 'hit_set_type', 'bloom']) + + def remove_cache_tier(self, cache_pool): + """ + Removes a cache tier from Ceph. Flushes all dirty objects from + writeback pools and waits for that to complete. + :param cache_pool: six.string_types. The cache tier pool name to + remove. + :return: None + """ + # read-only is easy, writeback is much harder + mode = get_cache_mode(self.service, cache_pool) + version = ceph_version() + if mode == 'readonly': + check_call(['ceph', '--id', self.service, 'osd', 'tier', + 'cache-mode', cache_pool, 'none']) + check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', + self.name, cache_pool]) + + elif mode == 'writeback': + pool_forward_cmd = ['ceph', '--id', self.service, 'osd', 'tier', + 'cache-mode', cache_pool, 'forward'] + if version >= '10.1': + # Jewel added a mandatory flag + pool_forward_cmd.append('--yes-i-really-mean-it') + + check_call(pool_forward_cmd) + # Flush the cache and wait for it to return + check_call(['rados', '--id', self.service, '-p', cache_pool, + 'cache-flush-evict-all']) + check_call(['ceph', '--id', self.service, 'osd', 'tier', + 'remove-overlay', self.name]) + check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', + self.name, cache_pool]) + + def get_pgs(self, pool_size, percent_data=DEFAULT_POOL_WEIGHT): + """Return the number of placement groups to use when creating the pool. + + Returns the number of placement groups which should be specified when + creating the pool. This is based upon the calculation guidelines + provided by the Ceph Placement Group Calculator (located online at + http://ceph.com/pgcalc/). + + The number of placement groups are calculated using the following: + + (Target PGs per OSD) * (OSD #) * (%Data) + ---------------------------------------- + (Pool size) + + Per the upstream guidelines, the OSD # should really be considered + based on the number of OSDs which are eligible to be selected by the + pool. Since the pool creation doesn't specify any of CRUSH set rules, + the default rule will be dependent upon the type of pool being + created (replicated or erasure). + + This code makes no attempt to determine the number of OSDs which can be + selected for the specific rule, rather it is left to the user to tune + in the form of 'expected-osd-count' config option. + + :param pool_size: int. 
pool_size is either the number of replicas for + replicated pools or the K+M sum for erasure coded pools + :param percent_data: float. the percentage of data that is expected to + be contained in the pool for the specific OSD set. Default value + is to assume 10% of the data is for this pool, which is a + relatively low % of the data but allows for the pg_num to be + increased. NOTE: the default is primarily to handle the scenario + where related charms requiring pools has not been upgraded to + include an update to indicate their relative usage of the pools. + :return: int. The number of pgs to use. + """ + + # Note: This calculation follows the approach that is provided + # by the Ceph PG Calculator located at http://ceph.com/pgcalc/. + validator(value=pool_size, valid_type=int) + + # Ensure that percent data is set to something - even with a default + # it can be set to None, which would wreak havoc below. + if percent_data is None: + percent_data = DEFAULT_POOL_WEIGHT + + # If the expected-osd-count is specified, then use the max between + # the expected-osd-count and the actual osd_count + osd_list = get_osds(self.service) + expected = config('expected-osd-count') or 0 + + if osd_list: + osd_count = max(expected, len(osd_list)) + + # Log a message to provide some insight if the calculations claim + # to be off because someone is setting the expected count and + # there are more OSDs in reality. Try to make a proper guess + # based upon the cluster itself. + if expected and osd_count != expected: + log("Found more OSDs than provided expected count. " + "Using the actual count instead", INFO) + elif expected: + # Use the expected-osd-count in older ceph versions to allow for + # a more accurate pg calculations + osd_count = expected + else: + # NOTE(james-page): Default to 200 for older ceph versions + # which don't support OSD query from cli + return LEGACY_PG_COUNT + + percent_data /= 100.0 + target_pgs_per_osd = config( + 'pgs-per-osd') or DEFAULT_PGS_PER_OSD_TARGET + num_pg = (target_pgs_per_osd * osd_count * percent_data) // pool_size + + # The CRUSH algorithm has a slight optimization for placement groups + # with powers of 2 so find the nearest power of 2. If the nearest + # power of 2 is more than 25% below the original value, the next + # highest value is used. To do this, find the nearest power of 2 such + # that 2^n <= num_pg, check to see if its within the 25% tolerance. + exponent = math.floor(math.log(num_pg, 2)) + nearest = 2**exponent + if (num_pg - nearest) > (num_pg * 0.25): + # Choose the next highest power of 2 since the nearest is more + # than 25% below the original value. + return int(nearest * 2) + else: + return int(nearest) + + +class ReplicatedPool(Pool): + + def __init__(self, + service, + name, + pg_num=None, + replicas=2, + percent_data=10.0): + super(ReplicatedPool, self).__init__(service=service, name=name) + self.replicas = replicas + if pg_num: + # Since the number of placement groups were specified, ensure + # that there aren't too many created. 
+ max_pgs = self.get_pgs(self.replicas, 100.0) + self.pg_num = min(pg_num, max_pgs) + else: + self.pg_num = self.get_pgs(self.replicas, percent_data) + + def create(self): + if not pool_exists(self.service, self.name): + # Create it + cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create', + self.name, str(self.pg_num)] + try: + check_call(cmd) + # Set the pool replica size + update_pool(client=self.service, + pool=self.name, + settings={'size': str(self.replicas)}) + except CalledProcessError: + raise + + +# Default jerasure erasure coded pool +class ErasurePool(Pool): + + def __init__(self, + service, + name, + erasure_code_profile="default", + percent_data=10.0): + super(ErasurePool, self).__init__(service=service, name=name) + self.erasure_code_profile = erasure_code_profile + self.percent_data = percent_data + + def create(self): + if not pool_exists(self.service, self.name): + # Try to find the erasure profile information in order to properly + # size the number of placement groups. The size of an erasure + # coded placement group is calculated as k+m. + erasure_profile = get_erasure_profile(self.service, + self.erasure_code_profile) + + # Check for errors + if erasure_profile is None: + msg = ("Failed to discover erasure profile named " + "{}".format(self.erasure_code_profile)) + log(msg, level=ERROR) + raise PoolCreationError(msg) + if 'k' not in erasure_profile or 'm' not in erasure_profile: + # Error + msg = ("Unable to find k (data chunks) or m (coding chunks) " + "in erasure profile {}".format(erasure_profile)) + log(msg, level=ERROR) + raise PoolCreationError(msg) + + k = int(erasure_profile['k']) + m = int(erasure_profile['m']) + pgs = self.get_pgs(k + m, self.percent_data) + # Create it + cmd = ['ceph', '--id', self.service, 'osd', 'pool', 'create', + self.name, str(pgs), str(pgs), 'erasure', + self.erasure_code_profile] + try: + check_call(cmd) + except CalledProcessError: + raise + + """Get an existing erasure code profile if it already exists. + Returns json formatted output""" + + +def get_mon_map(service): + """ + Returns the current monitor map. + :param service: six.string_types. The Ceph user name to run the command + under + :return: json string. :raise: ValueError if the monmap fails to parse. + Also raises CalledProcessError if our ceph command fails + """ + try: + mon_status = check_output(['ceph', '--id', service, 'mon_status', + '--format=json']) + try: + return json.loads(mon_status) + except ValueError as v: + log("Unable to parse mon_status json: {}. Error: {}".format( + mon_status, v.message)) + raise + except CalledProcessError as e: + log("mon_status command failed with message: {}".format(e.message)) + raise + + +def hash_monitor_names(service): + """ + Uses the get_mon_map() function to get information about the monitor + cluster. + Hash the name of each monitor. Return a sorted list of monitor hashes + in an ascending order. + :param service: six.string_types. The Ceph user name to run the command + under + :rtype : dict. 
json dict of monitor name, ip address and rank + example: { + 'name': 'ip-172-31-13-165', + 'rank': 0, + 'addr': '172.31.13.165:6789/0'} + """ + try: + hash_list = [] + monitor_list = get_mon_map(service=service) + if monitor_list['monmap']['mons']: + for mon in monitor_list['monmap']['mons']: + hash_list.append(hashlib.sha224(mon['name'].encode( + 'utf-8')).hexdigest()) + return sorted(hash_list) + else: + return None + except (ValueError, CalledProcessError): + raise + + +def monitor_key_delete(service, key): + """ + Delete a key and value pair from the monitor cluster + :param service: six.string_types. The Ceph user name to run the command + under + :param key: six.string_types. The key to delete. + """ + try: + check_output(['ceph', '--id', service, 'config-key', 'del', str(key)]) + except CalledProcessError as e: + log("Monitor config-key put failed with message: {}".format(e.output)) + raise + + +def monitor_key_set(service, key, value): + """ + Sets a key value pair on the monitor cluster. + :param service: six.string_types. The Ceph user name to run the command + under + :param key: six.string_types. The key to set. + :param value: The value to set. This will be converted to a string + before setting + """ + try: + check_output(['ceph', '--id', service, 'config-key', 'put', str(key), + str(value)]) + except CalledProcessError as e: + log("Monitor config-key put failed with message: {}".format(e.output)) + raise + + +def monitor_key_get(service, key): + """ + Gets the value of an existing key in the monitor cluster. + :param service: six.string_types. The Ceph user name to run the command + under + :param key: six.string_types. The key to search for. + :return: Returns the value of that key or None if not found. + """ + try: + output = check_output(['ceph', '--id', service, 'config-key', 'get', + str(key)]) + return output + except CalledProcessError as e: + log("Monitor config-key get failed with message: {}".format(e.output)) + return None + + +def monitor_key_exists(service, key): + """ + Searches for the existence of a key in the monitor cluster. + :param service: six.string_types. The Ceph user name to run the command + under + :param key: six.string_types. The key to search for + :return: Returns True if the key exists, False if not and raises an + exception if an unknown error occurs. :raise: CalledProcessError if + an unknown error occurs + """ + try: + check_call(['ceph', '--id', service, 'config-key', 'exists', str(key)]) + # I can return true here regardless because Ceph returns + # ENOENT if the key wasn't found + return True + except CalledProcessError as e: + if e.returncode == errno.ENOENT: + return False + else: + log("Unknown error from ceph config-get exists: {} {}".format( + e.returncode, e.output)) + raise + + +def get_erasure_profile(service, name): + """ + :param service: six.string_types. The Ceph user name to run the command + under + :param name: + :return: + """ + try: + out = check_output( + ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'get', + name, '--format=json']) + return json.loads(out) + except (CalledProcessError, OSError, ValueError): + return None + + +def pool_set(service, pool_name, key, value): + """ + Sets a value for a RADOS pool in ceph. + :param service: six.string_types. The Ceph user name to run the command + under + :param pool_name: six.string_types + :param key: six.string_types + :param value: + :return: None. 
Can raise CalledProcessError
+    """
+    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', pool_name, key, value
+           ]
+    try:
+        check_call(cmd)
+    except CalledProcessError:
+        raise
+
+
+def snapshot_pool(service, pool_name, snapshot_name):
+    """
+    Snapshots a RADOS pool in ceph.
+    :param service: six.string_types. The Ceph user name to run the command
+        under
+    :param pool_name: six.string_types
+    :param snapshot_name: six.string_types
+    :return: None. Can raise CalledProcessError
+    """
+    cmd = ['ceph', '--id', service, 'osd', 'pool', 'mksnap', pool_name,
+           snapshot_name]
+    try:
+        check_call(cmd)
+    except CalledProcessError:
+        raise
+
+
+def remove_pool_snapshot(service, pool_name, snapshot_name):
+    """
+    Remove a snapshot from a RADOS pool in ceph.
+    :param service: six.string_types. The Ceph user name to run the command
+        under
+    :param pool_name: six.string_types
+    :param snapshot_name: six.string_types
+    :return: None. Can raise CalledProcessError
+    """
+    cmd = ['ceph', '--id', service, 'osd', 'pool', 'rmsnap', pool_name,
+           snapshot_name]
+    try:
+        check_call(cmd)
+    except CalledProcessError:
+        raise
+
+
+# max_bytes should be an int or long
+def set_pool_quota(service, pool_name, max_bytes):
+    """
+    :param service: six.string_types. The Ceph user name to run the command
+        under
+    :param pool_name: six.string_types
+    :param max_bytes: int or long
+    :return: None. Can raise CalledProcessError
+    """
+    # Set a byte quota on a RADOS pool in ceph.
+    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name,
+           'max_bytes', str(max_bytes)]
+    try:
+        check_call(cmd)
+    except CalledProcessError:
+        raise
+
+
+def remove_pool_quota(service, pool_name):
+    """
+    Remove the byte quota from a RADOS pool in ceph.
+    :param service: six.string_types. The Ceph user name to run the command
+        under
+    :param pool_name: six.string_types
+    :return: None. Can raise CalledProcessError
+    """
+    cmd = ['ceph', '--id', service, 'osd', 'pool', 'set-quota', pool_name,
+           'max_bytes', '0']
+    try:
+        check_call(cmd)
+    except CalledProcessError:
+        raise
+
+
+def remove_erasure_profile(service, profile_name):
+    """
+    Remove an erasure code profile.
+    :param service: six.string_types. The Ceph user name to run the command
+        under
+    :param profile_name: six.string_types
+    :return: None. Can raise CalledProcessError
+    """
+    cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'rm',
+           profile_name]
+    try:
+        check_call(cmd)
+    except CalledProcessError:
+        raise
+
+
+def create_erasure_profile(service,
+                           profile_name,
+                           erasure_plugin_name='jerasure',
+                           failure_domain='host',
+                           data_chunks=2,
+                           coding_chunks=1,
+                           locality=None,
+                           durability_estimator=None):
+    """
+    Create a new erasure code profile if one does not already exist for it.
+    Updates the profile if it exists. Please see
+    http://docs.ceph.com/docs/master/rados/operations/erasure-code-profile/
+    for more details
+    :param service: six.string_types. The Ceph user name to run the command
+        under
+    :param profile_name: six.string_types
+    :param erasure_plugin_name: six.string_types
+    :param failure_domain: six.string_types. One of ['chassis', 'datacenter',
+        'host', 'osd', 'pdu', 'pod', 'rack', 'region', 'room', 'root', 'row']
+    :param data_chunks: int
+    :param coding_chunks: int
+    :param locality: int
+    :param durability_estimator: int
+    :return: None. 
Can raise CalledProcessError + """ + # Ensure this failure_domain is allowed by Ceph + validator(failure_domain, six.string_types, + ['chassis', 'datacenter', 'host', 'osd', 'pdu', 'pod', 'rack', + 'region', 'room', 'root', 'row']) + + cmd = ['ceph', '--id', service, 'osd', 'erasure-code-profile', 'set', + profile_name, 'plugin=' + erasure_plugin_name, + 'k=' + str(data_chunks), 'm=' + str(coding_chunks), + 'ruleset_failure_domain=' + failure_domain] + if locality is not None and durability_estimator is not None: + raise ValueError( + "create_erasure_profile should be called with k, m and one of l " + "or c but not both.") + + # Add plugin specific information + if locality is not None: + # For local erasure codes + cmd.append('l=' + str(locality)) + if durability_estimator is not None: + # For Shec erasure codes + cmd.append('c=' + str(durability_estimator)) + + if erasure_profile_exists(service, profile_name): + cmd.append('--force') + + try: + check_call(cmd) + except CalledProcessError: + raise + + +def rename_pool(service, old_name, new_name): + """ + Rename a Ceph pool from old_name to new_name + :param service: six.string_types. The Ceph user name to run the command + under + :param old_name: six.string_types + :param new_name: six.string_types + :return: None + """ + validator(value=old_name, valid_type=six.string_types) + validator(value=new_name, valid_type=six.string_types) + + cmd = ['ceph', '--id', service, 'osd', 'pool', 'rename', old_name, new_name + ] + check_call(cmd) + + +def erasure_profile_exists(service, name): + """ + Check to see if an Erasure code profile already exists. + :param service: six.string_types. The Ceph user name to run the command + under + :param name: six.string_types + :return: int or None + """ + validator(value=name, valid_type=six.string_types) + try: + check_call(['ceph', '--id', service, 'osd', 'erasure-code-profile', + 'get', name]) + return True + except CalledProcessError: + return False + + +def get_cache_mode(service, pool_name): + """ + Find the current caching mode of the pool_name given. + :param service: six.string_types. The Ceph user name to run the command + under + :param pool_name: six.string_types + :return: int or None + """ + validator(value=service, valid_type=six.string_types) + validator(value=pool_name, valid_type=six.string_types) + out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json' + ]) + try: + osd_json = json.loads(out) + for pool in osd_json['pools']: + if pool['pool_name'] == pool_name: + return pool['cache_mode'] + return None + except ValueError: + raise + + +def pool_exists(service, name): + """Check to see if a RADOS pool already exists.""" + try: + out = check_output(['rados', '--id', service, 'lspools']).decode( + 'UTF-8') + except CalledProcessError: + return False + + return name in out.split() + + +def get_osds(service): + """Return a list of all Ceph Object Storage Daemons currently in the + cluster. 
+ """ + version = ceph_version() + if version and version >= '0.56': + return json.loads(check_output(['ceph', '--id', service, 'osd', 'ls', + '--format=json']).decode('UTF-8')) + + return None + + +def install(): + """Basic Ceph client installation.""" + ceph_dir = "/etc/ceph" + if not os.path.exists(ceph_dir): + os.mkdir(ceph_dir) + + apt_install('ceph-common', fatal=True) + + +def rbd_exists(service, pool, rbd_img): + """Check to see if a RADOS block device exists.""" + try: + out = check_output(['rbd', 'list', '--id', service, '--pool', pool + ]).decode('UTF-8') + except CalledProcessError: + return False + + return rbd_img in out + + +def create_rbd_image(service, pool, image, sizemb): + """Create a new RADOS block device.""" + cmd = ['rbd', 'create', image, '--size', str(sizemb), '--id', service, + '--pool', pool] + check_call(cmd) + + +def update_pool(client, pool, settings): + cmd = ['ceph', '--id', client, 'osd', 'pool', 'set', pool] + for k, v in six.iteritems(settings): + cmd.append(k) + cmd.append(v) + + check_call(cmd) + + +def create_pool(service, name, replicas=3, pg_num=None): + """Create a new RADOS pool.""" + if pool_exists(service, name): + log("Ceph pool {} already exists, skipping creation".format(name), + level=WARNING) + return + + if not pg_num: + # Calculate the number of placement groups based + # on upstream recommended best practices. + osds = get_osds(service) + if osds: + pg_num = (len(osds) * 100 // replicas) + else: + # NOTE(james-page): Default to 200 for older ceph versions + # which don't support OSD query from cli + pg_num = 200 + + cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pg_num)] + check_call(cmd) + + update_pool(service, name, settings={'size': str(replicas)}) + + +def delete_pool(service, name): + """Delete a RADOS pool from ceph.""" + cmd = ['ceph', '--id', service, 'osd', 'pool', 'delete', name, + '--yes-i-really-really-mean-it'] + check_call(cmd) + + +def _keyfile_path(service): + return KEYFILE.format(service) + + +def _keyring_path(service): + return KEYRING.format(service) + + +def create_keyring(service, key): + """Create a new Ceph keyring containing key.""" + keyring = _keyring_path(service) + if os.path.exists(keyring): + log('Ceph keyring exists at %s.' % keyring, level=WARNING) + return + + cmd = ['ceph-authtool', keyring, '--create-keyring', + '--name=client.{}'.format(service), '--add-key={}'.format(key)] + check_call(cmd) + log('Created new ceph keyring at %s.' % keyring, level=DEBUG) + + +def delete_keyring(service): + """Delete an existing Ceph keyring.""" + keyring = _keyring_path(service) + if not os.path.exists(keyring): + log('Keyring does not exist at %s' % keyring, level=WARNING) + return + + os.remove(keyring) + log('Deleted ring at %s.' % keyring, level=INFO) + + +def create_key_file(service, key): + """Create a file containing key.""" + keyfile = _keyfile_path(service) + if os.path.exists(keyfile): + log('Keyfile exists at %s.' % keyfile, level=WARNING) + return + + with open(keyfile, 'w') as fd: + fd.write(key) + + log('Created new keyfile at %s.' 
% keyfile, level=INFO)
+
+
+def get_ceph_nodes(relation='ceph'):
+    """Query named relation to determine current nodes."""
+    hosts = []
+    for r_id in relation_ids(relation):
+        for unit in related_units(r_id):
+            hosts.append(relation_get('private-address', unit=unit, rid=r_id))
+
+    return hosts
+
+
+def configure(service, key, auth, use_syslog):
+    """Perform basic configuration of Ceph."""
+    create_keyring(service, key)
+    create_key_file(service, key)
+    hosts = get_ceph_nodes()
+    with open('/etc/ceph/ceph.conf', 'w') as ceph_conf:
+        ceph_conf.write(CEPH_CONF.format(auth=auth,
+                                         keyring=_keyring_path(service),
+                                         mon_hosts=",".join(map(str, hosts)),
+                                         use_syslog=use_syslog))
+    modprobe('rbd')
+
+
+def image_mapped(name):
+    """Determine whether a RADOS block device is mapped locally."""
+    try:
+        out = check_output(['rbd', 'showmapped']).decode('UTF-8')
+    except CalledProcessError:
+        return False
+
+    return name in out
+
+
+def map_block_storage(service, pool, image):
+    """Map a RADOS block device for local use."""
+    cmd = [
+        'rbd',
+        'map',
+        '{}/{}'.format(pool, image),
+        '--user',
+        service,
+        '--secret',
+        _keyfile_path(service),
+    ]
+    check_call(cmd)
+
+
+def filesystem_mounted(fs):
+    """Determine whether a filesystem is already mounted."""
+    return fs in [f for f, m in mounts()]
+
+
+def make_filesystem(blk_device, fstype='ext4', timeout=10):
+    """Make a new filesystem on the specified block device."""
+    count = 0
+    e_noent = errno.ENOENT
+    while not os.path.exists(blk_device):
+        if count >= timeout:
+            log('Gave up waiting on block device %s' % blk_device, level=ERROR)
+            raise IOError(e_noent, os.strerror(e_noent), blk_device)
+
+        log('Waiting for block device %s to appear' % blk_device, level=DEBUG)
+        count += 1
+        time.sleep(1)
+    else:
+        log('Formatting block device %s as filesystem %s.' %
+            (blk_device, fstype),
+            level=INFO)
+        check_call(['mkfs', '-t', fstype, blk_device])
+
+
+def place_data_on_block_device(blk_device, data_src_dst):
+    """Migrate data in data_src_dst to blk_device and then remount."""
+    # mount block device into /mnt
+    mount(blk_device, '/mnt')
+    # copy data to /mnt
+    copy_files(data_src_dst, '/mnt')
+    # umount block device
+    umount('/mnt')
+    # Grab user/group ID's from original source
+    _dir = os.stat(data_src_dst)
+    uid = _dir.st_uid
+    gid = _dir.st_gid
+    # re-mount where the data should originally be
+    # TODO: persist is currently a NO-OP in core.host
+    mount(blk_device, data_src_dst, persist=True)
+    # ensure original ownership of new mount.
+    os.chown(data_src_dst, uid, gid)
+
+
+def copy_files(src, dst, symlinks=False, ignore=None):
+    """Copy files from src to dst."""
+    for item in os.listdir(src):
+        s = os.path.join(src, item)
+        d = os.path.join(dst, item)
+        if os.path.isdir(s):
+            shutil.copytree(s, d, symlinks, ignore)
+        else:
+            shutil.copy2(s, d)
+
+
+def ensure_ceph_storage(service,
+                        pool,
+                        rbd_img,
+                        sizemb,
+                        mount_point,
+                        blk_device,
+                        fstype,
+                        system_services=[],
+                        replicas=3):
+    """NOTE: This function must only be called from a single service unit for
+    the same rbd_img otherwise data loss will occur.
+
+    Ensures given pool and RBD image exists, is mapped to a block device,
+    and the device is formatted and mounted at the given mount_point.
+
+    If formatting a device for the first time, data existing at mount_point
+    will be migrated to the RBD device before being re-mounted.
+
+    All services listed in system_services will be stopped prior to data
+    migration and restarted when complete.
+    """
+    # Ensure pool, RBD image, RBD mappings are in place. 
+ if not pool_exists(service, pool): + log('Creating new pool {}.'.format(pool), level=INFO) + create_pool(service, pool, replicas=replicas) + + if not rbd_exists(service, pool, rbd_img): + log('Creating RBD image ({}).'.format(rbd_img), level=INFO) + create_rbd_image(service, pool, rbd_img, sizemb) + + if not image_mapped(rbd_img): + log('Mapping RBD Image {} as a Block Device.'.format(rbd_img), + level=INFO) + map_block_storage(service, pool, rbd_img) + + # make file system + # TODO: What happens if for whatever reason this is run again and + # the data is already in the rbd device and/or is mounted?? + # When it is mounted already, it will fail to make the fs + # XXX: This is really sketchy! Need to at least add an fstab entry + # otherwise this hook will blow away existing data if its executed + # after a reboot. + if not filesystem_mounted(mount_point): + make_filesystem(blk_device, fstype) + + for svc in system_services: + if service_running(svc): + log('Stopping services {} prior to migrating data.' + .format(svc), + level=DEBUG) + service_stop(svc) + + place_data_on_block_device(blk_device, mount_point) + + for svc in system_services: + log('Starting service {} after migrating data.'.format(svc), + level=DEBUG) + service_start(svc) + + +def ensure_ceph_keyring(service, user=None, group=None, relation='ceph'): + """Ensures a ceph keyring is created for a named service and optionally + ensures user and group ownership. + + Returns False if no ceph key is available in relation state. + """ + key = None + for rid in relation_ids(relation): + for unit in related_units(rid): + key = relation_get('key', rid=rid, unit=unit) + if key: + break + + if not key: + return False + + create_keyring(service=service, key=key) + keyring = _keyring_path(service) + if user and group: + check_call(['chown', '%s.%s' % (user, group), keyring]) + + return True + + +def ceph_version(): + """Retrieve the local version of ceph.""" + if os.path.exists('/usr/bin/ceph'): + cmd = ['ceph', '-v'] + output = check_output(cmd).decode('US-ASCII') + output = output.split() + if len(output) > 3: + return output[2] + else: + return None + else: + return None + + +class CephBrokerRq(object): + """Ceph broker request. + + Multiple operations can be added to a request and sent to the Ceph broker + to be executed. + + Request is json-encoded for sending over the wire. + + The API is versioned and defaults to version 1. + """ + + def __init__(self, api_version=1, request_id=None): + self.api_version = api_version + if request_id: + self.request_id = request_id + else: + self.request_id = str(uuid.uuid1()) + self.ops = [] + + def add_op_create_pool( + self, name, replica_count=3, + pg_num=None, weight=None): + """Adds an operation to create a pool. + + @param pg_num setting: optional setting. If not provided, this value + will be calculated by the broker based on how many OSDs are in the + cluster at the time of creation. Note that, if provided, this value + will be capped at the current available maximum. + @param weight: the percentage of data the pool makes up + """ + if pg_num and weight: + raise ValueError('pg_num and weight are mutually exclusive') + + self.ops.append({'op': 'create-pool', + 'name': name, + 'replicas': replica_count, + 'pg_num': pg_num, + 'weight': weight}) + + def set_ops(self, ops): + """Set request ops to provided value. + + Useful for injecting ops that come from a previous request + to allow comparisons to ensure validity. 
+ """ + self.ops = ops + + @property + def request(self): + return json.dumps({'api-version': self.api_version, + 'ops': self.ops, + 'request-id': self.request_id}) + + def _ops_equal(self, other): + if len(self.ops) == len(other.ops): + for req_no in range(0, len(self.ops)): + for key in ['replicas', 'name', 'op', 'pg_num', 'weight']: + if self.ops[req_no].get(key) != other.ops[req_no].get(key): + return False + else: + return False + return True + + def __eq__(self, other): + if not isinstance(other, self.__class__): + return False + if self.api_version == other.api_version and \ + self._ops_equal(other): + return True + else: + return False + + def __ne__(self, other): + return not self.__eq__(other) + + +class CephBrokerRsp(object): + """Ceph broker response. + + Response is json-decoded and contents provided as methods/properties. + + The API is versioned and defaults to version 1. + """ + + def __init__(self, encoded_rsp): + self.api_version = None + self.rsp = json.loads(encoded_rsp) + + @property + def request_id(self): + return self.rsp.get('request-id') + + @property + def exit_code(self): + return self.rsp.get('exit-code') + + @property + def exit_msg(self): + return self.rsp.get('stderr') + +# Ceph Broker Conversation: +# If a charm needs an action to be taken by ceph it can create a CephBrokerRq +# and send that request to ceph via the ceph relation. The CephBrokerRq has a +# unique id so that the client can identity which CephBrokerRsp is associated +# with the request. Ceph will also respond to each client unit individually +# creating a response key per client unit eg glance/0 will get a CephBrokerRsp +# via key broker-rsp-glance-0 +# +# To use this the charm can just do something like: +# +# from charmhelpers.contrib.storage.linux.ceph import ( +# send_request_if_needed, +# is_request_complete, +# CephBrokerRq, +# ) +# +# @hooks.hook('ceph-relation-changed') +# def ceph_changed(): +# rq = CephBrokerRq() +# rq.add_op_create_pool(name='poolname', replica_count=3) +# +# if is_request_complete(rq): +# +# else: +# send_request_if_needed(get_ceph_request()) +# +# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example +# of glance having sent a request to ceph which ceph has successfully processed +# 'ceph:8': { +# 'ceph/0': { +# 'auth': 'cephx', +# 'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}', +# 'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}', +# 'ceph-public-address': '10.5.44.103', +# 'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==', +# 'private-address': '10.5.44.103', +# }, +# 'glance/0': { +# 'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", ' +# '"ops": [{"replicas": 3, "name": "glance", ' +# '"op": "create-pool"}]}'), +# 'private-address': '10.5.44.109', +# }, +# } + + +def get_previous_request(rid): + """Return the last ceph broker request sent on a given relation + + @param rid: Relation id to query for request + """ + request = None + broker_req = relation_get(attribute='broker_req', + rid=rid, + unit=local_unit()) + if broker_req: + request_data = json.loads(broker_req) + request = CephBrokerRq(api_version=request_data['api-version'], + request_id=request_data['request-id']) + request.set_ops(request_data['ops']) + + return request + + +def get_request_states(request, relation='ceph'): + """Return a dict of requests per relation id with their corresponding + completion state. 
+
+    This allows a charm, which has a request for ceph, to see whether there is
+    an equivalent request already being processed and if so what state that
+    request is in.
+
+    @param request: A CephBrokerRq object
+    """
+    complete = []
+    requests = {}
+    for rid in relation_ids(relation):
+        complete = False
+        previous_request = get_previous_request(rid)
+        if request == previous_request:
+            sent = True
+            complete = is_request_complete_for_rid(previous_request, rid)
+        else:
+            sent = False
+            complete = False
+
+        requests[rid] = {'sent': sent, 'complete': complete, }
+
+    return requests
+
+
+def is_request_sent(request, relation='ceph'):
+    """Check to see if a functionally equivalent request has already been sent
+
+    Returns True if a similar request has been sent
+
+    @param request: A CephBrokerRq object
+    """
+    states = get_request_states(request, relation=relation)
+    for rid in states.keys():
+        if not states[rid]['sent']:
+            return False
+
+    return True
+
+
+def is_request_complete(request, relation='ceph'):
+    """Check to see if a functionally equivalent request has already been
+    completed
+
+    Returns True if a similar request has been completed
+
+    @param request: A CephBrokerRq object
+    """
+    states = get_request_states(request, relation=relation)
+    for rid in states.keys():
+        if not states[rid]['complete']:
+            return False
+
+    return True
+
+
+def is_request_complete_for_rid(request, rid):
+    """Check if a given request has been completed on the given relation
+
+    @param request: A CephBrokerRq object
+    @param rid: Relation ID
+    """
+    broker_key = get_broker_rsp_key()
+    for unit in related_units(rid):
+        rdata = relation_get(rid=rid, unit=unit)
+        if rdata.get(broker_key):
+            rsp = CephBrokerRsp(rdata.get(broker_key))
+            if rsp.request_id == request.request_id:
+                if not rsp.exit_code:
+                    return True
+        else:
+            # The remote unit sent no reply targeted at this unit so either the
+            # remote ceph cluster does not support unit targeted replies or it
+            # has not processed our request yet.
+            if rdata.get('broker_rsp'):
+                request_data = json.loads(rdata['broker_rsp'])
+                if request_data.get('request-id'):
+                    log('Ignoring legacy broker_rsp without unit key as remote'
+                        ' service supports unit specific replies', level=DEBUG)
+                else:
+                    log('Using legacy broker_rsp as remote service does not '
+                        'support unit specific replies', level=DEBUG)
+                    rsp = CephBrokerRsp(rdata['broker_rsp'])
+                    if not rsp.exit_code:
+                        return True
+
+    return False
+
+
+def get_broker_rsp_key():
+    """Return broker response key for this unit
+
+    This is the key that ceph is going to use to pass request status
+    information back to this unit
+    """
+    return 'broker-rsp-' + local_unit().replace('/', '-')
+
+
+def send_request_if_needed(request, relation='ceph'):
+    """Send broker request if an equivalent request has not already been sent
+
+    @param request: A CephBrokerRq object
+    """
+    if is_request_sent(request, relation=relation):
+        log('Request already sent but not complete, not sending new request',
+            level=DEBUG)
+    else:
+        for rid in relation_ids(relation):
+            log('Sending request {}'.format(request.request_id), level=DEBUG)
+            relation_set(relation_id=rid, broker_req=request.request)
+
+
+class CephConfContext(object):
+    """Ceph config (ceph.conf) context.
+
+    Supports user-provided Ceph configuration settings. Users can provide a
+    dictionary as the value for the config-flags charm option containing
+    Ceph configuration settings keyed by their section in ceph.conf. 
+ """ + + def __init__(self, permitted_sections=None): + self.permitted_sections = permitted_sections or [] + + def __call__(self): + conf = config('config-flags') + if not conf: + return {} + + conf = config_flags_parser(conf) + if type(conf) != dict: + log("Provided config-flags is not a dictionary - ignoring", + level=WARNING) + return {} + + permitted = self.permitted_sections + if permitted: + diff = set(conf.keys()).difference(set(permitted)) + if diff: + log("Config-flags contains invalid keys '%s' - they will be " + "ignored" % (', '.join(diff)), + level=WARNING) + + ceph_conf = {} + for key in conf: + if permitted and key not in permitted: + log("Ignoring key '%s'" % key, level=WARNING) + continue + + ceph_conf[key] = conf[key] + + return ceph_conf diff --git a/metadata.yaml b/metadata.yaml index bd19c80..c8f404c 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -12,7 +12,6 @@ tags: series: - xenial - trusty - - precise - yakkety peers: mon: diff --git a/tests/basic_deployment.py b/tests/basic_deployment.py index 43df82f..d40c158 100644 --- a/tests/basic_deployment.py +++ b/tests/basic_deployment.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env python # # Copyright 2016 Canonical Ltd # @@ -228,7 +228,7 @@ class CephBasicDeployment(OpenStackAmuletDeployment): 'cinder-volume'], } - if self._get_openstack_release() < self.vivid_kilo: + if self._get_openstack_release() < self.xenial_mitaka: # For upstart systems only. Ceph services under systemd # are checked by process name instead. ceph_services = [ diff --git a/tests/charmhelpers/contrib/amulet/utils.py b/tests/charmhelpers/contrib/amulet/utils.py index a39ed4c..8e13ab1 100644 --- a/tests/charmhelpers/contrib/amulet/utils.py +++ b/tests/charmhelpers/contrib/amulet/utils.py @@ -546,7 +546,7 @@ class AmuletUtils(object): raise if it is present. :returns: List of process IDs """ - cmd = 'pidof -x {}'.format(process_name) + cmd = 'pidof -x "{}"'.format(process_name) if not expect_success: cmd += " || exit 0 && exit 1" output, code = sentry_unit.run(cmd) diff --git a/tests/charmhelpers/contrib/openstack/amulet/deployment.py b/tests/charmhelpers/contrib/openstack/amulet/deployment.py index 9e0b07f..5c1ce45 100644 --- a/tests/charmhelpers/contrib/openstack/amulet/deployment.py +++ b/tests/charmhelpers/contrib/openstack/amulet/deployment.py @@ -69,9 +69,9 @@ class OpenStackAmuletDeployment(AmuletDeployment): # Charms outside the ~openstack-charmers base_charms = { - 'mysql': ['precise', 'trusty'], - 'mongodb': ['precise', 'trusty'], - 'nrpe': ['precise', 'trusty', 'wily', 'xenial'], + 'mysql': ['trusty'], + 'mongodb': ['trusty'], + 'nrpe': ['trusty', 'xenial'], } for svc in other_services: @@ -260,31 +260,20 @@ class OpenStackAmuletDeployment(AmuletDeployment): release. 
""" # Must be ordered by OpenStack release (not by Ubuntu release): - (self.precise_essex, self.precise_folsom, self.precise_grizzly, - self.precise_havana, self.precise_icehouse, - self.trusty_icehouse, self.trusty_juno, self.utopic_juno, - self.trusty_kilo, self.vivid_kilo, self.trusty_liberty, - self.wily_liberty, self.trusty_mitaka, - self.xenial_mitaka, self.xenial_newton, - self.yakkety_newton) = range(16) + (self.trusty_icehouse, self.trusty_kilo, self.trusty_liberty, + self.trusty_mitaka, self.xenial_mitaka, self.xenial_newton, + self.yakkety_newton, self.xenial_ocata, self.zesty_ocata) = range(9) releases = { - ('precise', None): self.precise_essex, - ('precise', 'cloud:precise-folsom'): self.precise_folsom, - ('precise', 'cloud:precise-grizzly'): self.precise_grizzly, - ('precise', 'cloud:precise-havana'): self.precise_havana, - ('precise', 'cloud:precise-icehouse'): self.precise_icehouse, ('trusty', None): self.trusty_icehouse, - ('trusty', 'cloud:trusty-juno'): self.trusty_juno, ('trusty', 'cloud:trusty-kilo'): self.trusty_kilo, ('trusty', 'cloud:trusty-liberty'): self.trusty_liberty, ('trusty', 'cloud:trusty-mitaka'): self.trusty_mitaka, - ('utopic', None): self.utopic_juno, - ('vivid', None): self.vivid_kilo, - ('wily', None): self.wily_liberty, ('xenial', None): self.xenial_mitaka, ('xenial', 'cloud:xenial-newton'): self.xenial_newton, + ('xenial', 'cloud:xenial-ocata'): self.xenial_ocata, ('yakkety', None): self.yakkety_newton, + ('zesty', None): self.zesty_ocata, } return releases[(self.series, self.openstack)] @@ -294,16 +283,10 @@ class OpenStackAmuletDeployment(AmuletDeployment): Return a string representing the openstack release. """ releases = OrderedDict([ - ('precise', 'essex'), - ('quantal', 'folsom'), - ('raring', 'grizzly'), - ('saucy', 'havana'), ('trusty', 'icehouse'), - ('utopic', 'juno'), - ('vivid', 'kilo'), - ('wily', 'liberty'), ('xenial', 'mitaka'), ('yakkety', 'newton'), + ('zesty', 'ocata'), ]) if self.openstack: os_origin = self.openstack.split(':')[1] diff --git a/tests/charmhelpers/contrib/openstack/amulet/utils.py b/tests/charmhelpers/contrib/openstack/amulet/utils.py index e4546c8..6a0ba83 100644 --- a/tests/charmhelpers/contrib/openstack/amulet/utils.py +++ b/tests/charmhelpers/contrib/openstack/amulet/utils.py @@ -28,6 +28,7 @@ import keystoneclient.v2_0 as keystone_client from keystoneclient.auth.identity import v3 as keystone_id_v3 from keystoneclient import session as keystone_session from keystoneclient.v3 import client as keystone_client_v3 +from novaclient import exceptions import novaclient.client as nova_client import pika @@ -377,6 +378,16 @@ class OpenStackAmuletUtils(AmuletUtils): tenant_name=tenant, auth_version='2.0') + def create_flavor(self, nova, name, ram, vcpus, disk, flavorid="auto", + ephemeral=0, swap=0, rxtx_factor=1.0, is_public=True): + """Create the specified flavor.""" + try: + nova.flavors.find(name=name) + except (exceptions.NotFound, exceptions.NoUniqueMatch): + self.log.debug('Creating flavor ({})'.format(name)) + nova.flavors.create(name, ram, vcpus, disk, flavorid, + ephemeral, swap, rxtx_factor, is_public) + def create_cirros_image(self, glance, image_name): """Download the latest cirros image and upload it to glance, validate and return a resource pointer. 
diff --git a/tests/gate-basic-trusty-icehouse b/tests/gate-basic-trusty-icehouse
index a8639fe..8a98793 100755
--- a/tests/gate-basic-trusty-icehouse
+++ b/tests/gate-basic-trusty-icehouse
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python
 #
 # Copyright 2016 Canonical Ltd
 #
diff --git a/tests/gate-basic-trusty-kilo b/tests/gate-basic-trusty-kilo
index c331559..86e772a 100755
--- a/tests/gate-basic-trusty-kilo
+++ b/tests/gate-basic-trusty-kilo
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python
 #
 # Copyright 2016 Canonical Ltd
 #
diff --git a/tests/gate-basic-trusty-liberty b/tests/gate-basic-trusty-liberty
index d654265..3dfa8b6 100755
--- a/tests/gate-basic-trusty-liberty
+++ b/tests/gate-basic-trusty-liberty
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python
 #
 # Copyright 2016 Canonical Ltd
 #
diff --git a/tests/gate-basic-trusty-mitaka b/tests/gate-basic-trusty-mitaka
index e18197b..52b688f 100755
--- a/tests/gate-basic-trusty-mitaka
+++ b/tests/gate-basic-trusty-mitaka
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python
 #
 # Copyright 2016 Canonical Ltd
 #
diff --git a/tests/gate-basic-xenial-mitaka b/tests/gate-basic-xenial-mitaka
index 8d93f7b..f897700 100755
--- a/tests/gate-basic-xenial-mitaka
+++ b/tests/gate-basic-xenial-mitaka
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python
 #
 # Copyright 2016 Canonical Ltd
 #
diff --git a/tests/gate-basic-xenial-newton b/tests/gate-basic-xenial-newton
index 40fc35e..69bf0a5 100755
--- a/tests/gate-basic-xenial-newton
+++ b/tests/gate-basic-xenial-newton
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python
 #
 # Copyright 2016 Canonical Ltd
 #
diff --git a/tests/gate-basic-xenial-ocata b/tests/gate-basic-xenial-ocata
new file mode 100644
index 0000000..ec2713c
--- /dev/null
+++ b/tests/gate-basic-xenial-ocata
@@ -0,0 +1,25 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Canonical Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Amulet tests on a basic ceph deployment on xenial-ocata."""
+
+from basic_deployment import CephBasicDeployment
+
+if __name__ == '__main__':
+    deployment = CephBasicDeployment(series='xenial',
+                                     openstack='cloud:xenial-ocata',
+                                     source='cloud:xenial-updates/ocata')
+    deployment.run_tests()
diff --git a/tests/gate-basic-yakkety-newton b/tests/gate-basic-yakkety-newton
old mode 100644
new mode 100755
index f293986..9ae4400
--- a/tests/gate-basic-yakkety-newton
+++ b/tests/gate-basic-yakkety-newton
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python
 #
 # Copyright 2016 Canonical Ltd
 #
diff --git a/tests/gate-basic-zesty-ocata b/tests/gate-basic-zesty-ocata
new file mode 100644
index 0000000..a642193
--- /dev/null
+++ b/tests/gate-basic-zesty-ocata
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Canonical Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Amulet tests on a basic ceph deployment on zesty-ocata."""
+
+from basic_deployment import CephBasicDeployment
+
+if __name__ == '__main__':
+    deployment = CephBasicDeployment(series='zesty')
+    deployment.run_tests()
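
The two new Ocata gate definitions differ only in where Ocata comes from: zesty
carries Ocata in the distro archive, so its gate passes the series alone, while
the xenial gate must point at the Ocata cloud archive. Side by side, with all
values copied from the files added above:

    CephBasicDeployment(series='zesty')                        # distro Ocata
    CephBasicDeployment(series='xenial',
                        openstack='cloud:xenial-ocata',
                        source='cloud:xenial-updates/ocata')   # UCA Ocata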