From 04b8f8492737b429a1465e609c72174171d433ac Mon Sep 17 00:00:00 2001
From: David Ames
Date: Tue, 16 May 2017 11:22:55 -0700
Subject: [PATCH] Do not install NTP when installed in a container

Use determine_packages() to avoid installing NTP when in a container.

Sync charms.ceph to get ceph.determine_packages().

Change-Id: Ia00af86964d8f77e615367cbcde35a4d7d10774c
Partial-Bug: #1690513
---
 hooks/ceph_hooks.py      |   5 +-
 lib/ceph/__init__.py     | 609 +++++++++++++++++++++++++++++++--------
 lib/ceph/ceph_broker.py  |   6 +-
 lib/ceph/ceph_helpers.py |  31 +-
 lib/setup.py             |  85 ++++++
 5 files changed, 604 insertions(+), 132 deletions(-)
 create mode 100644 lib/setup.py

diff --git a/hooks/ceph_hooks.py b/hooks/ceph_hooks.py
index 93d58dc..d71210d 100755
--- a/hooks/ceph_hooks.py
+++ b/hooks/ceph_hooks.py
@@ -154,7 +154,7 @@ def install():
     execd_preinstall()
     add_source(config('source'), config('key'))
     apt_update(fatal=True)
-    apt_install(packages=ceph.PACKAGES, fatal=True)
+    apt_install(packages=ceph.determine_packages(), fatal=True)
 
 
 def az_info():
@@ -544,7 +544,8 @@ def client_relation_changed():
 @harden()
 def upgrade_charm():
     emit_cephconf()
-    apt_install(packages=filter_installed_packages(ceph.PACKAGES), fatal=True)
+    apt_install(packages=filter_installed_packages(ceph.determine_packages()),
+                fatal=True)
     ceph.update_monfs()
     mon_relation_joined()
     if is_relation_made("nrpe-external-master"):
diff --git a/lib/ceph/__init__.py b/lib/ceph/__init__.py
index 948e99e..ad67965 100644
--- a/lib/ceph/__init__.py
+++ b/lib/ceph/__init__.py
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from _ctypes import POINTER, byref
 import ctypes
 import collections
 import json
@@ -26,25 +25,32 @@ import errno
 import shutil
 
 import pyudev
 
+from datetime import datetime
+
 from charmhelpers.core import hookenv
+from charmhelpers.core import templating
 from charmhelpers.core.host import (
-    mkdir,
     chownr,
-    service_restart,
-    lsb_release,
     cmp_pkgrevno,
-    service_stop,
+    lsb_release,
+    mkdir,
     mounts,
+    owner,
+    service_restart,
     service_start,
+    service_stop,
     CompareHostReleases,
+    is_container,
 )
 from charmhelpers.core.hookenv import (
-    log,
-    ERROR,
     cached,
+    config,
+    log,
     status_set,
-    WARNING, DEBUG, config)
-from charmhelpers.core.services import render_template
+    DEBUG,
+    ERROR,
+    WARNING,
+)
 from charmhelpers.fetch import (
     apt_cache,
     add_source,
     apt_install,
     apt_update)
@@ -52,13 +58,22 @@ from charmhelpers.contrib.storage.linux.ceph import (
     monitor_key_set,
     monitor_key_exists,
     monitor_key_get,
-    get_mon_map)
+    get_mon_map,
+)
 from charmhelpers.contrib.storage.linux.utils import (
     is_block_device,
     zap_disk,
-    is_device_mounted)
+    is_device_mounted,
+)
 from charmhelpers.contrib.openstack.utils import (
-    get_os_codename_install_source)
+    get_os_codename_install_source,
+)
+
+from ceph.ceph_helpers import check_output
+
+CEPH_BASE_DIR = os.path.join(os.sep, 'var', 'lib', 'ceph')
+OSD_BASE_DIR = os.path.join(CEPH_BASE_DIR, 'osd')
+HDPARM_FILE = os.path.join(os.sep, 'etc', 'hdparm.conf')
 
 LEADER = 'leader'
 PEON = 'peon'
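
The two hook changes above route package selection through ceph.determine_packages(),
which is defined later in this patch. A minimal sketch of the intended behaviour,
assuming an illustrative PACKAGES list (the real list lives in lib/ceph/__init__.py):

    from charmhelpers.core.host import is_container

    PACKAGES = ['ceph', 'gdisk', 'ntp', 'btrfs-tools', 'xfsprogs']  # illustrative

    def determine_packages():
        # Work on a copy so the module-level list is never mutated.
        packages = list(PACKAGES)
        if is_container():
            # A container shares the host's clock, so 'ntp' is dropped.
            packages.remove('ntp')
        return packages
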
@@ -109,6 +124,42 @@ NETWORK_ADAPTER_SYSCTLS = {
 }
 
 
+class Partition(object):
+    def __init__(self, name, number, size, start, end, sectors, uuid):
+        """
+        A block device partition
+        :param name: Name of block device
+        :param number: Partition number
+        :param size: Capacity of the device
+        :param start: Starting block
+        :param end: Ending block
+        :param sectors: Number of blocks
+        :param uuid: UUID of the partition
+        """
+        self.name = name
+        self.number = number
+        self.size = size
+        self.start = start
+        self.end = end
+        self.sectors = sectors
+        self.uuid = uuid
+
+    def __str__(self):
+        return "number: {} start: {} end: {} sectors: {} size: {} " \
+               "name: {} uuid: {}".format(self.number, self.start,
+                                          self.end,
+                                          self.sectors, self.size,
+                                          self.name, self.uuid)
+
+    def __eq__(self, other):
+        if isinstance(other, self.__class__):
+            return self.__dict__ == other.__dict__
+        return False
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+
 def unmounted_disks():
     """List of unmounted block devices on the current host."""
     disks = []
@@ -173,7 +224,7 @@ def tune_nic(network_interface):
         try:
             # Apply the settings
             log("Applying sysctl settings", level=DEBUG)
-            subprocess.check_output(["sysctl", "-p", sysctl_file])
+            check_output(["sysctl", "-p", sysctl_file])
         except subprocess.CalledProcessError as err:
             log('sysctl -p {} failed with error {}'.format(sysctl_file,
                                                            err.output),
@@ -224,14 +275,21 @@ def persist_settings(settings_dict):
     The settings_dict should be in the form of {"uuid": {"key":"value"}}
     :param settings_dict: dict of settings to save
     """
-    hdparm_path = os.path.join(os.sep, 'etc', 'hdparm.conf')
+    if not settings_dict:
+        return
+
     try:
-        with open(hdparm_path, 'w') as hdparm:
-            hdparm.write(render_template('hdparm.conf', settings_dict))
+        templating.render(source='hdparm.conf', target=HDPARM_FILE,
+                          context=settings_dict)
     except IOError as err:
         log("Unable to open {path} because of error: {error}".format(
-            path=hdparm_path,
-            error=err.message), level=ERROR)
+            path=HDPARM_FILE, error=err.message), level=ERROR)
+    except Exception as e:
+        # The templating.render can raise a jinja2 exception if the
+        # template is not found. Rather than polluting the import
+        # space of this charm, simply catch Exception
+        log('Unable to render {path} due to error: {error}'.format(
+            path=HDPARM_FILE, error=e.message), level=ERROR)
 
 
 def set_max_sectors_kb(dev_name, max_sectors_size):
@@ -305,9 +363,9 @@ def set_hdd_read_ahead(dev_name, read_ahead_sectors=256):
         log('Setting read ahead to {} for device {}'.format(
             read_ahead_sectors,
             dev_name))
-        subprocess.check_output(['hdparm',
-                                 '-a{}'.format(read_ahead_sectors),
-                                 dev_name])
+        check_output(['hdparm',
+                      '-a{}'.format(read_ahead_sectors),
+                      dev_name])
     except subprocess.CalledProcessError as e:
         log('hdparm failed with error: {}'.format(e.output),
             level=ERROR)
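
persist_settings() above now renders /etc/hdparm.conf through the charmhelpers
templating layer. A hedged usage sketch following the docstring's
{"uuid": {"key": "value"}} shape (the UUID and setting name are made up):

    persist_settings({
        '26d2e4b2-1111-2222-3333-444455556666': {'read_ahead_sect': 256},
    })
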
""" try: - blkid = ctypes.cdll.LoadLibrary("libblkid.so") - # Header signature - # extern int blkid_probe_lookup_value(blkid_probe pr, const char *name, - # const char **data, size_t *len); - blkid.blkid_new_probe_from_filename.argtypes = [ctypes.c_char_p] - blkid.blkid_probe_lookup_value.argtypes = [ctypes.c_void_p, - ctypes.c_char_p, - POINTER(ctypes.c_char_p), - POINTER(ctypes.c_ulong)] - except OSError as err: - log('get_block_uuid loading libblkid.so failed with error: {}'.format( - os.strerror(err.errno)), - level=ERROR) - raise err - if not os.path.exists(block_dev): + block_info = check_output( + ['blkid', '-o', 'export', block_dev]) + for tag in block_info.split('\n'): + parts = tag.split('=') + if parts[0] == 'UUID': + return parts[1] return None - probe = blkid.blkid_new_probe_from_filename(ctypes.c_char_p(block_dev)) - if probe < 0: - log('get_block_uuid new_probe_from_filename failed: {}'.format( - os.strerror(probe)), + except subprocess.CalledProcessError as err: + log('get_block_uuid failed with error: {}'.format(err.output), level=ERROR) - raise OSError(probe, os.strerror(probe)) - result = blkid.blkid_do_probe(probe) - if result != 0: - log('get_block_uuid do_probe failed with error: {}'.format( - os.strerror(result)), - level=ERROR) - raise OSError(result, os.strerror(result)) - uuid = ctypes.c_char_p() - result = blkid.blkid_probe_lookup_value(probe, - ctypes.c_char_p( - 'UUID'.encode('ascii')), - byref(uuid), None) - if result < 0: - log('get_block_uuid lookup_value failed with error: {}'.format( - os.strerror(result)), - level=ERROR) - raise OSError(result, os.strerror(result)) - blkid.blkid_free_probe(probe) - return ctypes.string_at(uuid).decode('ascii') + return None def check_max_sectors(save_settings_dict, @@ -499,7 +527,7 @@ def get_osd_weight(osd_id): Also raises CalledProcessError if our ceph command fails """ try: - tree = subprocess.check_output( + tree = check_output( ['ceph', 'osd', 'tree', '--format=json']) try: json_tree = json.loads(tree) @@ -526,7 +554,7 @@ def get_osd_tree(service): Also raises CalledProcessError if our ceph command fails """ try: - tree = subprocess.check_output( + tree = check_output( ['ceph', '--id', service, 'osd', 'tree', '--format=json']) try: @@ -561,6 +589,43 @@ def get_osd_tree(service): raise +def _get_child_dirs(path): + """Returns a list of directory names in the specified path. + + :param path: a full path listing of the parent directory to return child + directory names + :return: list. A list of child directories under the parent directory + :raises: ValueError if the specified path does not exist or is not a + directory, + OSError if an error occurs reading the directory listing + """ + if not os.path.exists(path): + raise ValueError('Specfied path "%s" does not exist' % path) + if not os.path.isdir(path): + raise ValueError('Specified path "%s" is not a directory' % path) + + files_in_dir = [os.path.join(path, f) for f in os.listdir(path)] + return list(filter(os.path.isdir, files_in_dir)) + + +def _get_osd_num_from_dirname(dirname): + """Parses the dirname and returns the OSD id. + + Parses a string in the form of 'ceph-{osd#}' and returns the osd number + from the directory name. + + :param dirname: the directory name to return the OSD number from + :return int: the osd number the directory name corresponds to + :raises ValueError: if the osd number cannot be parsed from the provided + directory name. 
+ """ + match = re.search('ceph-(?P\d+)', dirname) + if not match: + raise ValueError("dirname not in correct format: %s" % dirname) + + return match.group('osd_id') + + def get_local_osd_ids(): """ This will list the /var/lib/ceph/osd/* directories and try @@ -666,7 +731,7 @@ def is_quorum(): ] if os.path.exists(asok): try: - result = json.loads(subprocess.check_output(cmd)) + result = json.loads(check_output(cmd)) except subprocess.CalledProcessError: return False except ValueError: @@ -693,7 +758,7 @@ def is_leader(): ] if os.path.exists(asok): try: - result = json.loads(subprocess.check_output(cmd)) + result = json.loads(check_output(cmd)) except subprocess.CalledProcessError: return False except ValueError: @@ -737,9 +802,12 @@ DISK_FORMATS = [ ] CEPH_PARTITIONS = [ + '89C57F98-2FE5-4DC0-89C1-5EC00CEFF2BE', # ceph encrypted disk in creation + '45B0969E-9B03-4F30-B4C6-5EC00CEFF106', # ceph encrypted journal '4FBD7E29-9D25-41B8-AFD0-5EC00CEFF05D', # ceph encrypted osd data '4FBD7E29-9D25-41B8-AFD0-062C0CEFF05D', # ceph osd data '45B0969E-9B03-4F30-B4C6-B4B80CEFF106', # ceph osd journal + '89C57F98-2FE5-4DC0-89C1-F3AD0CEFF2BE', # ceph disk in creation ] @@ -800,7 +868,7 @@ def replace_osd(dead_osd_number, # Drop this osd out of the cluster. This will begin a # rebalance operation status_set('maintenance', 'Removing osd {}'.format(dead_osd_number)) - subprocess.check_output([ + check_output([ 'ceph', '--id', 'osd-upgrade', @@ -811,8 +879,8 @@ def replace_osd(dead_osd_number, if systemd(): service_stop('ceph-osd@{}'.format(dead_osd_number)) else: - subprocess.check_output(['stop', 'ceph-osd', 'id={}'.format( - dead_osd_number)]), + check_output(['stop', 'ceph-osd', 'id={}'.format( + dead_osd_number)]) # umount if still mounted ret = umount(mount_point) if ret < 0: @@ -820,20 +888,20 @@ def replace_osd(dead_osd_number, mount_point, os.strerror(ret))) # Clean up the old mount point shutil.rmtree(mount_point) - subprocess.check_output([ + check_output([ 'ceph', '--id', 'osd-upgrade', 'osd', 'crush', 'remove', 'osd.{}'.format(dead_osd_number)]) # Revoke the OSDs access keys - subprocess.check_output([ + check_output([ 'ceph', '--id', 'osd-upgrade', 'auth', 'del', 'osd.{}'.format(dead_osd_number)]) - subprocess.check_output([ + check_output([ 'ceph', '--id', 'osd-upgrade', @@ -850,17 +918,48 @@ def replace_osd(dead_osd_number, log('replace_osd failed with error: ' + e.output) -def is_osd_disk(dev): +def get_partition_list(dev): + """ + Lists the partitions of a block device + :param dev: Path to a block device. ex: /dev/sda + :return: :raise: Returns a list of Partition objects. 
@@ -850,17 +918,48 @@ def replace_osd(dead_osd_number,
         log('replace_osd failed with error: ' + e.output)
 
 
-def is_osd_disk(dev):
+def get_partition_list(dev):
+    """
+    Lists the partitions of a block device
+    :param dev: Path to a block device. ex: /dev/sda
+    :return: Returns a list of Partition objects.
+    :raise: subprocess.CalledProcessError if the partition listing fails
+    """
+    partitions_list = []
     try:
-        info = subprocess.check_output(['sgdisk', '-i', '1', dev])
-        info = info.split("\n")  # IGNORE:E1103
-        for line in info:
-            for ptype in CEPH_PARTITIONS:
-                sig = 'Partition GUID code: {}'.format(ptype)
-                if line.startswith(sig):
-                    return True
+        partitions = get_partitions(dev)
+        # For each line of output
+        for partition in partitions:
+            parts = partition.split()
+            partitions_list.append(
+                Partition(number=parts[0],
+                          start=parts[1],
+                          end=parts[2],
+                          sectors=parts[3],
+                          size=parts[4],
+                          name=parts[5],
+                          uuid=parts[6])
+            )
+        return partitions_list
     except subprocess.CalledProcessError:
-        pass
+        raise
+
+
+def is_osd_disk(dev):
+    partitions = get_partition_list(dev)
+    for partition in partitions:
+        try:
+            info = check_output(['sgdisk', '-i', partition.number, dev])
+            info = info.split("\n")  # IGNORE:E1103
+            for line in info:
+                for ptype in CEPH_PARTITIONS:
+                    sig = 'Partition GUID code: {}'.format(ptype)
+                    if line.startswith(sig):
+                        return True
+        except subprocess.CalledProcessError as e:
+            log("sgdisk inspection of partition {} on {} failed with "
+                "error: {}. Skipping".format(partition.number, dev, e.message),
+                level=ERROR)
     return False
@@ -933,7 +1032,7 @@ def generate_monitor_secret():
         '--name=mon.',
         '--gen-key'
     ]
-    res = subprocess.check_output(cmd)
+    res = check_output(cmd)
     return "{}==".format(res.split('=')[1].strip())
@@ -1081,8 +1180,8 @@ def create_named_keyring(entity, name, caps=None):
     ]
     for subsystem, subcaps in caps.items():
         cmd.extend([subsystem, '; '.join(subcaps)])
-    log("Calling subprocess.check_output: {}".format(cmd), level=DEBUG)
-    return parse_key(subprocess.check_output(cmd).strip())  # IGNORE:E1103
+    log("Calling check_output: {}".format(cmd), level=DEBUG)
+    return parse_key(check_output(cmd).strip())  # IGNORE:E1103
 
 
 def get_upgrade_key():
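
get_partition_list() above splits each partx line positionally; partx's
default columns are NR START END SECTORS SIZE NAME UUID, so a hypothetical
line maps as follows (a row with an empty NAME column would shift the split,
which is assumed not to occur here):

    line = '1 2048 1050623 1048576 512M ceph-data 4fbd7e29-9d25-41b8-afd0'
    parts = line.split()
    part = Partition(number=parts[0], start=parts[1], end=parts[2],
                     sectors=parts[3], size=parts[4], name=parts[5],
                     uuid=parts[6])
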
@@ -1099,7 +1198,7 @@ def get_named_key(name, caps=None, pool_list=None):
     """
     try:
         # Does the key already exist?
-        output = subprocess.check_output(
+        output = check_output(
             [
                 'sudo',
                 '-u', ceph_user(),
@@ -1139,8 +1238,8 @@ def get_named_key(name, caps=None, pool_list=None):
             pools = " ".join(['pool={0}'.format(i) for i in pool_list])
             subcaps[0] = subcaps[0] + " " + pools
         cmd.extend([subsystem, '; '.join(subcaps)])
-    log("Calling subprocess.check_output: {}".format(cmd), level=DEBUG)
-    return parse_key(subprocess.check_output(cmd).strip())  # IGNORE:E1103
+    log("Calling check_output: {}".format(cmd), level=DEBUG)
+    return parse_key(check_output(cmd).strip())  # IGNORE:E1103
 
 
 def upgrade_key_caps(key, caps):
@@ -1158,7 +1257,7 @@ def upgrade_key_caps(key, caps):
 
 @cached
 def systemd():
-    return (CompareHostReleases(lsb_release()['DISTRIB_CODENAME']) >= 'vivid')
+    return CompareHostReleases(lsb_release()['DISTRIB_CODENAME']) >= 'vivid'
 
 
 def bootstrap_monitor_cluster(secret):
@@ -1232,7 +1331,7 @@ def maybe_zap_journal(journal_dev):
 def get_partitions(dev):
     cmd = ['partx', '--raw', '--noheadings', dev]
     try:
-        out = subprocess.check_output(cmd).splitlines()
+        out = check_output(cmd).splitlines()
         log("get partitions: {}".format(out), level=DEBUG)
         return out
     except subprocess.CalledProcessError as e:
@@ -1316,7 +1415,7 @@ def osdize_dir(path, encrypt=False):
     if cmp_pkgrevno('ceph', "0.56.6") < 0:
         log('Unable to use directories for OSDs with ceph < 0.56.6',
             level=ERROR)
-        raise
+        return
 
     mkdir(path, owner=ceph_user(), group=ceph_user(), perms=0o755)
     chownr('/var/lib/ceph', ceph_user(), ceph_user())
@@ -1342,7 +1441,7 @@ def get_running_osds():
     """Returns a list of the pids of the current running OSD daemons"""
     cmd = ['pgrep', 'ceph-osd']
     try:
-        result = subprocess.check_output(cmd)
+        result = check_output(cmd)
         return result.split()
     except subprocess.CalledProcessError:
         return []
@@ -1358,9 +1457,9 @@ def get_cephfs(service):
         # This command wasn't introduced until 0.86 ceph
         return []
     try:
-        output = subprocess.check_output(["ceph",
-                                          '--id', service,
-                                          "fs", "ls"])
+        output = check_output(["ceph",
+                               '--id', service,
+                               "fs", "ls"])
         if not output:
             return []
         """
@@ -1485,7 +1584,7 @@ def upgrade_monitor(new_version):
             service_stop('ceph-mon@{}'.format(mon_id))
     else:
         service_stop('ceph-mon-all')
-    apt_install(packages=PACKAGES, fatal=True)
+    apt_install(packages=determine_packages(), fatal=True)
 
     # Ensure the files and directories under /var/lib/ceph is chowned
     # properly as part of the move to the Jewel release, which moved the
@@ -1538,6 +1637,7 @@ def lock_and_roll(upgrade_key, service, my_name, version):
                                                       my_name,
                                                       version,
                                                       stop_timestamp))
+    status_set('maintenance', 'Finishing upgrade')
    monitor_key_set(upgrade_key, "{}_{}_{}_done".format(service,
                                                         my_name,
                                                         version),
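
For context on the get_named_key() changes above: the function builds a
'ceph auth get-or-create' invocation from a caps mapping, and pool_list
tightens the osd caps to specific pools. A hedged sketch (cap strings
illustrative):

    caps = {'mon': ['allow r'],
            'osd': ['allow rwx']}
    # With pool_list=['glance'], the osd caps become 'allow rwx pool=glance'.
    key = get_named_key(name='glance', caps=caps, pool_list=['glance'])
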
@@ -1660,42 +1760,198 @@ def upgrade_osd(new_version):
         add_source(config('source'), config('key'))
         apt_update(fatal=True)
     except subprocess.CalledProcessError as err:
-        log("Adding the ceph source failed with message: {}".format(
+        log("Adding the ceph sources failed with message: {}".format(
             err.message))
         status_set("blocked", "Upgrade to {} failed".format(new_version))
         sys.exit(1)
+
     try:
-        if systemd():
-            for osd_id in get_local_osd_ids():
-                service_stop('ceph-osd@{}'.format(osd_id))
-        else:
-            service_stop('ceph-osd-all')
-        apt_install(packages=PACKAGES, fatal=True)
+        # Upgrade the packages before restarting the daemons.
+        status_set('maintenance', 'Upgrading packages to %s' % new_version)
+        apt_install(packages=determine_packages(), fatal=True)
 
-        # Ensure the files and directories under /var/lib/ceph is chowned
-        # properly as part of the move to the Jewel release, which moved the
-        # ceph daemons to running as ceph:ceph instead of root:root. Only do
-        # it when necessary as this is an expensive operation to run.
-        if new_version == 'jewel':
-            owner = ceph_user()
-            status_set('maintenance', 'Updating file ownership for OSDs')
-            chownr(path=os.path.join(os.sep, "var", "lib", "ceph"),
-                   owner=owner,
-                   group=owner,
-                   follow_links=True)
+        # If the upgrade does not need an ownership update of any of the
+        # directories in the osd service directory, then simply restart
+        # all of the OSDs at the same time as this will be the fastest
+        # way to update the code on the node.
+        if not dirs_need_ownership_update('osd'):
+            log('Restarting all OSDs to load new binaries', DEBUG)
+            service_restart('ceph-osd-all')
+            return
 
-        if systemd():
-            for osd_id in get_local_osd_ids():
-                service_start('ceph-osd@{}'.format(osd_id))
-        else:
-            service_start('ceph-osd-all')
-    except subprocess.CalledProcessError as err:
+        # Need to change the ownership of all directories which are not OSD
+        # directories as well.
+        # TODO - this should probably be moved to the general upgrade function
+        # and done before mon/osd.
+        update_owner(CEPH_BASE_DIR, recurse_dirs=False)
+        non_osd_dirs = filter(lambda x: not x == 'osd',
+                              os.listdir(CEPH_BASE_DIR))
+        non_osd_dirs = map(lambda x: os.path.join(CEPH_BASE_DIR, x),
+                           non_osd_dirs)
+        for path in non_osd_dirs:
+            update_owner(path)
+
+        # Fast service restart wasn't an option because each of the OSD
+        # directories need the ownership updated for all the files on
+        # the OSD. Walk through the OSDs one-by-one upgrading the OSD.
+        for osd_dir in _get_child_dirs(OSD_BASE_DIR):
+            try:
+                osd_num = _get_osd_num_from_dirname(osd_dir)
+                _upgrade_single_osd(osd_num, osd_dir)
+            except ValueError as ex:
+                # Directory could not be parsed - junk directory?
+                log('Could not parse osd directory %s: %s' % (osd_dir, ex),
+                    WARNING)
+                continue
+
+    except (subprocess.CalledProcessError, IOError) as err:
         log("Stopping ceph and upgrading packages failed "
             "with message: {}".format(err.message))
         status_set("blocked", "Upgrade to {} failed".format(new_version))
         sys.exit(1)
 
 
+def _upgrade_single_osd(osd_num, osd_dir):
+    """Upgrades the single OSD directory.
+
+    :param osd_num: the num of the OSD
+    :param osd_dir: the directory of the OSD to upgrade
+    :raises CalledProcessError: if an error occurs in a command issued as part
+                                of the upgrade process
+    :raises IOError: if an error occurs reading/writing to a file as part
+                     of the upgrade process
+    """
+    stop_osd(osd_num)
+    disable_osd(osd_num)
+    update_owner(osd_dir)
+    enable_osd(osd_num)
+    start_osd(osd_num)
+
+
+def stop_osd(osd_num):
+    """Stops the specified OSD number.
+
+    :param osd_num: the osd number to stop
+    """
+    if systemd():
+        service_stop('ceph-osd@{}'.format(osd_num))
+    else:
+        service_stop('ceph-osd', id=osd_num)
+
+
+def start_osd(osd_num):
+    """Starts the specified OSD number.
+
+    :param osd_num: the osd number to start.
+    """
+    if systemd():
+        service_start('ceph-osd@{}'.format(osd_num))
+    else:
+        service_start('ceph-osd', id=osd_num)
+
+
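
Taken together, _upgrade_single_osd() and the helpers above reduce, for a
single hypothetical OSD, to this sequence:

    stop_osd('3')                             # stop the daemon
    disable_osd('3')                          # keep it down across reboots
    update_owner('/var/lib/ceph/osd/ceph-3')  # chown root:root -> ceph:ceph
    enable_osd('3')                           # restore autostart
    start_osd('3')                            # restart on the new binaries
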
+def disable_osd(osd_num):
+    """Disables the specified OSD number.
+
+    Ensures that the specified osd will not be automatically started at the
+    next reboot of the system. Due to differences between init systems,
+    this method cannot make any guarantees that the specified osd cannot be
+    started manually.
+
+    :param osd_num: the osd id which should be disabled.
+    :raises CalledProcessError: if an error occurs invoking the systemd cmd
+                                to disable the OSD
+    :raises IOError, OSError: if the attempt to read/remove the ready file in
+                              an upstart enabled system fails
+    """
+    if systemd():
+        # When running under systemd, the individual ceph-osd daemons run as
+        # templated units and can be directly addressed by referring to the
+        # templated service name ceph-osd@<osd_num>. Additionally, systemd
+        # allows one to disable a specific templated unit by running the
+        # 'systemctl disable ceph-osd@<osd_num>' command. When disabled, the
+        # OSD should remain disabled until re-enabled via systemd.
+        # Note: disabling an already disabled service in systemd returns 0, so
+        # no need to check whether it is enabled or not.
+        cmd = ['systemctl', 'disable', 'ceph-osd@{}'.format(osd_num)]
+        subprocess.check_call(cmd)
+    else:
+        # Neither upstart nor the ceph-osd upstart script provides for
+        # disabling the starting of an OSD automatically. The specific OSD
+        # cannot be prevented from running manually, however it can be
+        # prevented from running automatically on reboot by removing the
+        # 'ready' file in the OSD's root directory. This is due to the
+        # ceph-osd-all upstart script checking for the presence of this file
+        # before starting the OSD.
+        ready_file = os.path.join(OSD_BASE_DIR, 'ceph-{}'.format(osd_num),
+                                  'ready')
+        if os.path.exists(ready_file):
+            os.unlink(ready_file)
+
+
+def enable_osd(osd_num):
+    """Enables the specified OSD number.
+
+    Ensures that the specified osd_num will be enabled and ready to start
+    automatically in the event of a reboot.
+
+    :param osd_num: the osd id which should be enabled.
+    :raises CalledProcessError: if the call to the systemd command issued
+                                fails when enabling the service
+    :raises IOError: if the attempt to write the ready file in an upstart
+                     enabled system fails
+    """
+    if systemd():
+        cmd = ['systemctl', 'enable', 'ceph-osd@{}'.format(osd_num)]
+        subprocess.check_call(cmd)
+    else:
+        # When running on upstart, the OSDs are started via the ceph-osd-all
+        # upstart script which will only start the osd if it has a 'ready'
+        # file. Make sure that file exists.
+        ready_file = os.path.join(OSD_BASE_DIR, 'ceph-{}'.format(osd_num),
+                                  'ready')
+        with open(ready_file, 'w') as f:
+            f.write('ready')
+
+        # Make sure the correct user owns the file. It shouldn't be necessary
+        # as the upstart script should run with root privileges, but it's
+        # better to have all the files matching ownership.
+        update_owner(ready_file)
+
+
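
For reference, what disable_osd()/enable_osd() amount to on each init system
(unit and file names as used in the code above):

    # systemd (Ubuntu vivid and later):
    #   systemctl disable ceph-osd@3   /   systemctl enable ceph-osd@3
    # upstart:
    #   remove /var/lib/ceph/osd/ceph-3/ready   /   rewrite it with 'ready'
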
+def update_owner(path, recurse_dirs=True):
+    """Changes the ownership of the specified path.
+
+    Changes the ownership of the specified path to the new ceph daemon user
+    using the system's native chown functionality. This may take a while,
+    so this method will issue a set_status for any changes of ownership which
+    recurses into directory structures.
+
+    :param path: the path to recursively change ownership for
+    :param recurse_dirs: boolean indicating whether to recursively change the
+                         ownership of all the files in a path's subtree or to
+                         simply change the ownership of the path.
+    :raises CalledProcessError: if an error occurs issuing the chown system
+                                command
+    """
+    user = ceph_user()
+    user_group = '{ceph_user}:{ceph_user}'.format(ceph_user=user)
+    cmd = ['chown', user_group, path]
+    if os.path.isdir(path) and recurse_dirs:
+        status_set('maintenance', ('Updating ownership of %s to %s' %
+                                   (path, user)))
+        cmd.insert(1, '-R')
+
+    log('Changing ownership of {path} to {user}'.format(
+        path=path, user=user_group), DEBUG)
+    start = datetime.now()
+    subprocess.check_call(cmd)
+    elapsed_time = (datetime.now() - start)
+
+    log('Took {secs} seconds to change the ownership of path: {path}'.format(
+        secs=elapsed_time.total_seconds(), path=path), DEBUG)
+
+
 def list_pools(service):
     """
     This will list the current pools that Ceph has
@@ -1706,7 +1962,7 @@ def list_pools(service):
     """
     try:
         pool_list = []
-        pools = subprocess.check_output(['rados', '--id', service, 'lspools'])
+        pools = check_output(['rados', '--id', service, 'lspools'])
         for pool in pools.splitlines():
             pool_list.append(pool)
         return pool_list
@@ -1714,6 +1970,36 @@ def list_pools(service):
         log("rados lspools failed with error: {}".format(err.output))
         raise
 
+
+def dirs_need_ownership_update(service):
+    """Determines if directories still need change of ownership.
+
+    Examines the set of directories under the /var/lib/ceph/{service}
+    directory and determines if they have the correct ownership or not. This
+    is necessary due to the upgrade from Hammer to Jewel where the daemon
+    user changes from root: to ceph:.
+
+    :param service: the name of the service folder to check (e.g. osd, mon)
+    :return: boolean. True if the directories need a change of ownership,
+             False otherwise.
+    :raises IOError: if an error occurs reading the file stats from one of
+                     the child directories.
+    :raises OSError: if the specified path does not exist or some other error
+    """
+    expected_owner = expected_group = ceph_user()
+    path = os.path.join(CEPH_BASE_DIR, service)
+    for child in _get_child_dirs(path):
+        curr_owner, curr_group = owner(child)
+
+        if (curr_owner == expected_owner) and (curr_group == expected_group):
+            continue
+
+        log('Directory "%s" needs its ownership updated' % child, DEBUG)
+        return True
+
+    # All child directories had the expected ownership
+    return False
+
 # A dict of valid ceph upgrade paths. Mapping is old -> new
 UPGRADE_PATHS = {
     'firefly': 'hammer',
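
dirs_need_ownership_update() leans on charmhelpers' owner(), which returns a
(user, group) tuple for a path; the per-directory test is effectively
(values hypothetical):

    curr_owner, curr_group = owner('/var/lib/ceph/osd/ceph-0')
    # e.g. ('root', 'root') on a pre-Jewel install, while ceph_user()
    # returns 'ceph' on Jewel - so this directory would be flagged.
    needs_update = (curr_owner, curr_group) != (ceph_user(), ceph_user())
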
Error: {}".format( + tree, v.message)) + raise + except subprocess.CalledProcessError as e: + log("ceph pg stat command failed with message: {}".format( + e.message)) + raise + + +def get_ceph_health(): + """ + Returns the health of the cluster from a 'ceph health' + :return: dict + Also raises CalledProcessError if our ceph command fails + To get the overall status, use get_ceph_health()['overall_status'] + """ + try: + tree = check_output( + ['ceph', 'health', '--format=json']) + try: + json_tree = json.loads(tree) + # Make sure children are present in the json + if not json_tree['overall_status']: + return None + return json_tree + except ValueError as v: + log("Unable to parse ceph tree json: {}. Error: {}".format( + tree, v.message)) + raise + except subprocess.CalledProcessError as e: + log("ceph osd tree command failed with message: {}".format( + e.message)) + raise + + +def reweight_osd(osd_num, new_weight): + """ + Changes the crush weight of an OSD to the value specified. + :param osd_num: the osd id which should be changed + :param new_weight: the new weight for the OSD + :returns: bool. True if output looks right, else false. + :raises CalledProcessError: if an error occurs invoking the systemd cmd + """ + try: + cmd_result = subprocess.check_output( + ['ceph', 'osd', 'crush', 'reweight', "osd.{}".format(osd_num), + new_weight], stderr=subprocess.STDOUT) + expected_result = "reweighted item id {ID} name \'osd.{ID}\'".format( + ID=osd_num) + " to {}".format(new_weight) + log(cmd_result) + if expected_result in cmd_result: + return True + return False + except subprocess.CalledProcessError as e: + log("ceph osd tree command failed with message: {}".format( + e.message)) + raise + + +def determine_packages(): + ''' + Determines packages for installation. + + @returns: list of ceph packages + ''' + if is_container(): + PACKAGES.remove('ntp') + return PACKAGES diff --git a/lib/ceph/ceph_broker.py b/lib/ceph/ceph_broker.py index 8a99805..1f6db8c 100644 --- a/lib/ceph/ceph_broker.py +++ b/lib/ceph/ceph_broker.py @@ -28,7 +28,7 @@ from ceph import ( get_cephfs, get_osd_weight ) -from ceph_helpers import Crushmap +from ceph.ceph_helpers import Crushmap from charmhelpers.contrib.storage.linux.ceph import ( create_erasure_profile, delete_pool, @@ -168,6 +168,8 @@ def handle_add_permissions_to_key(request, service): A group can optionally have a namespace defined that will be used to further restrict pool access. 
""" + resp = {'exit-code': 0} + service_name = request.get('name') group_name = request.get('group') group_namespace = request.get('group-namespace') @@ -190,6 +192,8 @@ def handle_add_permissions_to_key(request, service): group_namespace) update_service_permissions(service_name, service_obj, group_namespace) + return resp + def update_service_permissions(service, service_obj=None, namespace=None): """Update the key permissions for the named client in Ceph""" diff --git a/lib/ceph/ceph_helpers.py b/lib/ceph/ceph_helpers.py index 8e5c807..11f5dd8 100644 --- a/lib/ceph/ceph_helpers.py +++ b/lib/ceph/ceph_helpers.py @@ -36,7 +36,11 @@ import uuid import re import subprocess -from subprocess import (check_call, check_output, CalledProcessError, ) +from subprocess import ( + check_call, + check_output as s_check_output, + CalledProcessError, +) from charmhelpers.core.hookenv import (config, local_unit, relation_get, @@ -111,6 +115,15 @@ DEFAULT_POOL_WEIGHT = 10.0 LEGACY_PG_COUNT = 200 +def check_output(*args, **kwargs): + ''' + Helper wrapper for py2/3 compat with subprocess.check_output + + @returns str: UTF-8 decoded representation of output + ''' + return s_check_output(*args, **kwargs).decode('UTF-8') + + def validator(value, valid_type, valid_range=None): """ Used to validate these: http://docs.ceph.com/docs/master/rados/operations/ @@ -188,7 +201,7 @@ class Crushmap(object): stdout=subprocess.PIPE) return subprocess.check_output( ('crushtool', '-d', '-'), - stdin=crush.stdout).decode('utf-8') + stdin=crush.stdout) except Exception as e: log("load_crushmap error: {}".format(e)) raise "Failed to read Crushmap" @@ -565,7 +578,8 @@ def monitor_key_delete(service, key): :param key: six.string_types. The key to delete. """ try: - check_output(['ceph', '--id', service, 'config-key', 'del', str(key)]) + check_output(['ceph', '--id', service, + 'config-key', 'del', str(key)]) except CalledProcessError as e: log("Monitor config-key put failed with message: {}".format(e.output)) raise @@ -867,8 +881,7 @@ def get_cache_mode(service, pool_name): def pool_exists(service, name): """Check to see if a RADOS pool already exists.""" try: - out = check_output(['rados', '--id', service, 'lspools']).decode( - 'UTF-8') + out = check_output(['rados', '--id', service, 'lspools']) except CalledProcessError: return False @@ -882,7 +895,7 @@ def get_osds(service): version = ceph_version() if version and version >= '0.56': return json.loads(check_output(['ceph', '--id', service, 'osd', 'ls', - '--format=json']).decode('UTF-8')) + '--format=json'])) return None @@ -900,7 +913,7 @@ def rbd_exists(service, pool, rbd_img): """Check to see if a RADOS block device exists.""" try: out = check_output(['rbd', 'list', '--id', service, '--pool', pool - ]).decode('UTF-8') + ]) except CalledProcessError: return False @@ -1025,7 +1038,7 @@ def configure(service, key, auth, use_syslog): def image_mapped(name): """Determine whether a RADOS block device is mapped locally.""" try: - out = check_output(['rbd', 'showmapped']).decode('UTF-8') + out = check_output(['rbd', 'showmapped']) except CalledProcessError: return False @@ -1212,7 +1225,7 @@ def ceph_version(): """Retrieve the local version of ceph.""" if os.path.exists('/usr/bin/ceph'): cmd = ['ceph', '-v'] - output = check_output(cmd).decode('US-ASCII') + output = check_output(cmd) output = output.split() if len(output) > 3: return output[2] diff --git a/lib/setup.py b/lib/setup.py new file mode 100644 index 0000000..139c80d --- /dev/null +++ b/lib/setup.py @@ -0,0 +1,85 @@ +# -*- 
diff --git a/lib/setup.py b/lib/setup.py
new file mode 100644
index 0000000..139c80d
--- /dev/null
+++ b/lib/setup.py
@@ -0,0 +1,85 @@
+# -*- coding: utf-8 -*-
+from __future__ import print_function
+
+import os
+import sys
+from setuptools import setup, find_packages
+from setuptools.command.test import test as TestCommand
+
+version = "0.0.1.dev1"
+install_require = [
+]
+
+tests_require = [
+    'tox >= 2.3.1',
+]
+
+
+class Tox(TestCommand):
+
+    user_options = [('tox-args=', 'a', "Arguments to pass to tox")]
+
+    def initialize_options(self):
+        TestCommand.initialize_options(self)
+        self.tox_args = None
+
+    def finalize_options(self):
+        TestCommand.finalize_options(self)
+        self.test_args = []
+        self.test_suite = True
+
+    def run_tests(self):
+        # import here, because outside the eggs aren't loaded
+        import tox
+        import shlex
+        args = self.tox_args
+        # remove the 'test' arg from argv as tox passes it to ostestr which
+        # breaks it.
+        sys.argv.pop()
+        if args:
+            args = shlex.split(self.tox_args)
+        errno = tox.cmdline(args=args)
+        sys.exit(errno)
+
+
+if sys.argv[-1] == 'publish':
+    os.system("python setup.py sdist upload")
+    os.system("python setup.py bdist_wheel upload")
+    sys.exit()
+
+
+if sys.argv[-1] == 'tag':
+    os.system("git tag -a %s -m 'version %s'" % (version, version))
+    os.system("git push --tags")
+    sys.exit()
+
+
+setup(
+    name='charms.ceph',
+    version=version,
+    description='Provide base module for ceph charms.',
+    classifiers=[
+        "Development Status :: 2 - Pre-Alpha",
+        "Intended Audience :: Developers",
+        "Topic :: System",
+        "Topic :: System :: Installation/Setup",
+        "Topic :: System :: Software Distribution",
+        "Programming Language :: Python :: 2",
+        "Programming Language :: Python :: 2.7",
+        "Programming Language :: Python :: 3",
+        "Programming Language :: Python :: 3.5",
+        "License :: OSI Approved :: Apache Software License",
+    ],
+    url='https://github.com/openstack/charms.ceph',
+    author='OpenStack Charmers',
+    author_email='openstack-dev@lists.openstack.org',
+    license='Apache-2.0: http://www.apache.org/licenses/LICENSE-2.0',
+    packages=find_packages(exclude=["unit_tests"]),
+    zip_safe=False,
+    cmdclass={'test': Tox},
+    install_requires=install_require,
+    extras_require={
+        'testing': tests_require,
+    },
+    tests_require=tests_require,
+)
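
Usage sketch for the new lib/setup.py (assumes tox is installed; the
environment names are examples):

    # python setup.py test --tox-args="-e pep8,py27"
    # python setup.py tag      # git tag + push for the current version
    # python setup.py publish  # sdist/bdist_wheel upload
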