# Copyright 2016 Canonical Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import re import socket import subprocess import errno from charmhelpers.core.hookenv import ( DEBUG, cached, config, goal_state, log, network_get_primary_address, related_units, relation_ids, relation_get, status_set, unit_get, ) from charmhelpers.fetch import ( apt_install, filter_installed_packages ) from charmhelpers.core.host import ( lsb_release, CompareHostReleases, cmp_pkgrevno, ) from charmhelpers.contrib.network.ip import ( get_address_in_network, get_ipv6_addr ) from charmhelpers.contrib.storage.linux import ceph try: import dns.resolver except ImportError: apt_install(filter_installed_packages(['python-dnspython']), fatal=True) import dns.resolver class OsdPostUpgradeError(Exception): """Error class for OSD post-upgrade operations.""" pass def enable_pocket(pocket): apt_sources = "/etc/apt/sources.list" with open(apt_sources, "r") as sources: lines = sources.readlines() with open(apt_sources, "w") as sources: for line in lines: if pocket in line: sources.write(re.sub('^# deb', 'deb', line)) else: sources.write(line) def is_mgr_module_enabled(module): """Is a given manager module enabled. :param module: :type module: str :returns: Whether the named module is enabled :rtype: bool """ return module in ceph.enabled_manager_modules() def mgr_enable_module(module): """Enable a Ceph Manager Module. :param module: The module name to enable :type module: str :raises: subprocess.CalledProcessError """ if not is_mgr_module_enabled(module): subprocess.check_call(['ceph', 'mgr', 'module', 'enable', module]) return True return False def mgr_disable_module(module): """Enable a Ceph Manager Module. :param module: The module name to enable :type module: str :raises: subprocess.CalledProcessError """ if is_mgr_module_enabled(module): subprocess.check_call(['ceph', 'mgr', 'module', 'disable', module]) return True return False def set_balancer_mode(mode): '''Set the balancer mode used by the Ceph manager.''' if not mode: return elif cmp_pkgrevno('ceph-common', '12.0.0') < 0: log('Luminous or later is required to set the balancer mode') return elif not is_mgr_module_enabled('balancer'): log("Balancer module is disabled") return try: subprocess.check_call(['ceph', 'balancer', 'mode', mode], shell=True) except subprocess.CalledProcessError: log('Failed to set balancer mode:', level='ERROR') @cached def get_unit_hostname(): return socket.gethostname() @cached def get_host_ip(hostname=None): if config('prefer-ipv6'): return get_ipv6_addr()[0] hostname = hostname or unit_get('private-address') try: # Test to see if already an IPv4 address socket.inet_aton(hostname) return hostname except socket.error: # This may throw an NXDOMAIN exception; in which case # things are badly broken so just let it kill the hook answers = dns.resolver.query(hostname, 'A') if answers: return answers[0].address @cached def get_public_addr(): if config('ceph-public-network'): return get_network_addrs('ceph-public-network')[0] try: return network_get_primary_address('public') except NotImplementedError: log("network-get not supported", DEBUG) return get_host_ip() @cached def get_cluster_addr(): if config('ceph-cluster-network'): return get_network_addrs('ceph-cluster-network')[0] try: return network_get_primary_address('cluster') except NotImplementedError: log("network-get not supported", DEBUG) return get_host_ip() def get_networks(config_opt='ceph-public-network'): """Get all configured networks from provided config option. If public network(s) are provided, go through them and return those for which we have an address configured. """ networks = config(config_opt) if networks: networks = networks.split() return [n for n in networks if get_address_in_network(n)] return [] def get_network_addrs(config_opt): """Get all configured public networks addresses. If public network(s) are provided, go through them and return the addresses we have configured on any of those networks. """ addrs = [] networks = config(config_opt) if networks: networks = networks.split() addrs = [get_address_in_network(n) for n in networks] addrs = [a for a in addrs if a] if not addrs: if networks: msg = ("Could not find an address on any of '%s' - resolve this " "error to retry" % (networks)) status_set('blocked', msg) raise Exception(msg) else: return [get_host_ip()] return addrs def assert_charm_supports_ipv6(): """Check whether we are able to support charms ipv6.""" _release = lsb_release()['DISTRIB_CODENAME'].lower() if CompareHostReleases(_release) < "trusty": raise Exception("IPv6 is not supported in the charms for Ubuntu " "versions less than Trusty 14.04") def has_rbd_mirrors(): """Determine if we have or will have ``rbd-mirror`` charms related. :returns: True or False :rtype: bool """ try: # NOTE(fnordahl): This optimization will not be useful until we get a # resolution on LP: #1818245 raise NotImplementedError gs = goal_state() return 'rbd-mirror' in gs.get('relations', {}) except NotImplementedError: for relid in relation_ids('rbd-mirror'): if related_units(relid): return True def get_default_rbd_features(): """Get default value for ``rbd_default_features``. This is retrieved by asking the installed Ceph binary to show its runtime config when using a empty configuration file. :returns: Installed Ceph's Default vaule for ``rbd_default_features`` :rtype: int :raises: IndexError, json.JSONDecodeError, subprocess.CalledProcessError """ ceph_conf = json.loads(subprocess.check_output( ['ceph-conf', '-c', '/dev/null', '-D', '--format', 'json'], universal_newlines=True)) return int(ceph_conf['rbd_default_features']) def add_rbd_mirror_features(rbd_features): """Take a RBD Features bitmap and add the features required for Mirroring. :param rbd_features: Input bitmap :type rbd_features: int :returns: Bitmap bitwise OR'ed with the features required for Mirroring. :rtype: int """ RBD_FEATURE_EXCLUSIVE_LOCK = 4 RBD_FEATURE_JOURNALING = 64 return rbd_features | RBD_FEATURE_EXCLUSIVE_LOCK | RBD_FEATURE_JOURNALING def get_rbd_features(): """Determine if we should set, and what the rbd default features should be. :returns: None or the apropriate value to use :rtype: Option[int, None] """ rbd_feature_config = config('default-rbd-features') if rbd_feature_config: return int(rbd_feature_config) elif has_rbd_mirrors(): return add_rbd_mirror_features(get_default_rbd_features()) def get_ceph_osd_releases(): ceph_osd_releases = set() for r_id in relation_ids('osd'): for unit in related_units(r_id): ceph_osd_release = relation_get( attribute='ceph_release', unit=unit, rid=r_id ) if ceph_osd_release is not None: ceph_osd_releases.add(ceph_osd_release) return list(ceph_osd_releases) def execute_post_osd_upgrade_steps(ceph_osd_release): """Executes post-upgrade steps. Allows execution of any steps that need to be taken after osd upgrades have finished (often specified in ceph upgrade docs). :param str ceph_osd_release: the new ceph-osd release. """ log('Executing post-ceph-osd upgrade commands.') try: if (_all_ceph_versions_same() and not _is_required_osd_release(ceph_osd_release)): log('Setting require_osd_release to {}.'.format(ceph_osd_release)) _set_require_osd_release(ceph_osd_release) except OsdPostUpgradeError as upgrade_error: msg = 'OSD post-upgrade steps failed: {}'.format( upgrade_error) log(message=msg, level='ERROR') def _all_ceph_versions_same(): """Checks that ceph-mon and ceph-osd have converged to the same version. :return boolean: True if all same, false if not or command failed. """ try: versions_command = 'ceph versions' versions_str = subprocess.check_output( versions_command.split()).decode('UTF-8') except subprocess.CalledProcessError as call_error: if call_error.returncode == errno.EINVAL: log('Calling "ceph versions" failed. Command requires ' 'luminous and above.', level='WARNING') return False else: log('Calling "ceph versions" failed.', level='ERROR') raise OsdPostUpgradeError(call_error) versions_dict = json.loads(versions_str) if len(versions_dict['overall']) > 1: log('All upgrades of mon and osd have not completed.') return False if len(versions_dict['osd']) < 1: log('Monitors have converged but no osd versions found.', level='WARNING') return False return True def _is_required_osd_release(release): """Checks to see if require_osd_release is set to input release. Runs and parses the ceph osd dump command to determine if require_osd_release is set to the input release. If so, return True. Else, return False. :param str release: the release to check against :return bool: True if releases match, else False. :raises: OsdPostUpgradeError """ try: dump_command = 'ceph osd dump -f json' osd_dump_str = subprocess.check_output( dump_command.split()).decode('UTF-8') osd_dump_dict = json.loads(osd_dump_str) except subprocess.CalledProcessError as cmd_error: log(message='Command {} failed.'.format(cmd_error.cmd), level='ERROR') raise OsdPostUpgradeError(cmd_error) except json.JSONDecodeError as decode_error: log(message='Failed to decode JSON.', level='ERROR') raise OsdPostUpgradeError(decode_error) return osd_dump_dict.get('require_osd_release') == release def _set_require_osd_release(release): """Attempts to set the required_osd_release osd config option. :param str release: The release to set option to :raises: OsdPostUpgradeError """ try: command = 'ceph osd require-osd-release {} ' \ '--yes-i-really-mean-it'.format(release) subprocess.check_call(command.split()) except subprocess.CalledProcessError as call_error: msg = 'Unable to execute command <{}>'.format(call_error.cmd) log(message=msg, level='ERROR') raise OsdPostUpgradeError(call_error)