From 801f8538fc8d61d3f85e73a8847d351fe84e7655 Mon Sep 17 00:00:00 2001 From: Chris Holcombe Date: Tue, 6 Sep 2016 15:31:04 -0700 Subject: [PATCH] Move upgrade code to shared lib Moving the ceph mon upgrade code over to the ceph shared library. This will make it easier to make patches and have them be applied to all 3 charms at once. Change-Id: I541269d05e6ff8883233a21c78ebe9df89b9e797 --- hooks/ceph_hooks.py | 174 +----------- lib/ceph/__init__.py | 458 ++++++++++++++++++++++++++++---- lib/ceph/ceph_broker.py | 148 ++++++++++- unit_tests/test_upgrade.py | 35 +++ unit_tests/test_upgrade_roll.py | 179 ------------- 5 files changed, 591 insertions(+), 403 deletions(-) create mode 100644 unit_tests/test_upgrade.py delete mode 100644 unit_tests/test_upgrade_roll.py diff --git a/hooks/ceph_hooks.py b/hooks/ceph_hooks.py index 24c2b4e5..3d04ee81 100755 --- a/hooks/ceph_hooks.py +++ b/hooks/ceph_hooks.py @@ -15,12 +15,9 @@ # limitations under the License. import os import shutil -import random -import subprocess import sys import tempfile import socket -import time import netifaces sys.path.append('lib') @@ -45,10 +42,7 @@ from charmhelpers.core.hookenv import ( from charmhelpers.core.host import ( umount, mkdir, - cmp_pkgrevno, - service_stop, - service_start, - chownr) + cmp_pkgrevno) from charmhelpers.fetch import ( add_source, apt_install, @@ -75,10 +69,7 @@ from charmhelpers.contrib.network.ip import ( format_ipv6_addr, ) from charmhelpers.contrib.storage.linux.ceph import ( - CephConfContext, - monitor_key_set, - monitor_key_exists, - monitor_key_get) + CephConfContext) from charmhelpers.contrib.charmsupport import nrpe from charmhelpers.contrib.hardening.harden import harden @@ -120,7 +111,8 @@ def check_for_upgrade(): if new_version == upgrade_paths[old_version]: log("{} to {} is a valid upgrade path. Proceeding.".format( old_version, new_version)) - roll_osd_cluster(new_version) + ceph.roll_osd_cluster(new_version=new_version, + upgrade_key='osd-upgrade') else: # Log a helpful error message log("Invalid upgrade path from {} to {}. " @@ -129,164 +121,6 @@ def check_for_upgrade(): pretty_print_upgrade_paths())) -def lock_and_roll(my_name, version): - start_timestamp = time.time() - - log('monitor_key_set {}_start {}'.format( - my_name, - version, - start_timestamp)) - monitor_key_set('osd-upgrade', "{}_{}_start".format(my_name, version), - start_timestamp) - log("Rolling") - # This should be quick - upgrade_osd() - log("Done") - - stop_timestamp = time.time() - # Set a key to inform others I am finished - log('monitor_key_set {}_{}_done {}'.format(my_name, - version, - stop_timestamp)) - monitor_key_set('osd-upgrade', "{}_{}_done".format(my_name, version), - stop_timestamp) - - -def wait_on_previous_node(previous_node, version): - log("Previous node is: {}".format(previous_node)) - - previous_node_finished = monitor_key_exists( - 'osd-upgrade', - "{}_{}_done".format(previous_node, version)) - - while previous_node_finished is False: - log("{} is not finished. Waiting".format(previous_node)) - # Has this node been trying to upgrade for longer than - # 10 minutes? - # If so then move on and consider that node dead. - - # NOTE: This assumes the clusters clocks are somewhat accurate - # If the hosts clock is really far off it may cause it to skip - # the previous node even though it shouldn't. 
- current_timestamp = time.time() - previous_node_start_time = monitor_key_get( - 'osd-upgrade', - "{}_{}_start".format(previous_node, version)) - if (current_timestamp - (10 * 60)) > previous_node_start_time: - # Previous node is probably dead. Lets move on - if previous_node_start_time is not None: - log( - "Waited 10 mins on node {}. current time: {} > " - "previous node start time: {} Moving on".format( - previous_node, - (current_timestamp - (10 * 60)), - previous_node_start_time)) - return - else: - # I have to wait. Sleep a random amount of time and then - # check if I can lock,upgrade and roll. - wait_time = random.randrange(5, 30) - log('waiting for {} seconds'.format(wait_time)) - time.sleep(wait_time) - previous_node_finished = monitor_key_exists( - 'osd-upgrade', - "{}_{}_done".format(previous_node, version)) - - -def get_upgrade_position(osd_sorted_list, match_name): - for index, item in enumerate(osd_sorted_list): - if item.name == match_name: - return index - return None - - -# Edge cases: -# 1. Previous node dies on upgrade, can we retry? -# 2. This assumes that the osd failure domain is not set to osd. -# It rolls an entire server at a time. -def roll_osd_cluster(new_version): - """ - This is tricky to get right so here's what we're going to do. - There's 2 possible cases: Either I'm first in line or not. - If I'm not first in line I'll wait a random time between 5-30 seconds - and test to see if the previous osd is upgraded yet. - - TODO: If you're not in the same failure domain it's safe to upgrade - 1. Examine all pools and adopt the most strict failure domain policy - Example: Pool 1: Failure domain = rack - Pool 2: Failure domain = host - Pool 3: Failure domain = row - - outcome: Failure domain = host - """ - log('roll_osd_cluster called with {}'.format(new_version)) - my_name = socket.gethostname() - osd_tree = ceph.get_osd_tree(service='osd-upgrade') - # A sorted list of osd unit names - osd_sorted_list = sorted(osd_tree) - log("osd_sorted_list: {}".format(osd_sorted_list)) - - try: - position = get_upgrade_position(osd_sorted_list, my_name) - log("upgrade position: {}".format(position)) - if position == 0: - # I'm first! 
Roll - # First set a key to inform others I'm about to roll - lock_and_roll(my_name=my_name, version=new_version) - else: - # Check if the previous node has finished - status_set('blocked', - 'Waiting on {} to finish upgrading'.format( - osd_sorted_list[position - 1].name)) - wait_on_previous_node( - previous_node=osd_sorted_list[position - 1].name, - version=new_version) - lock_and_roll(my_name=my_name, version=new_version) - except ValueError: - log("Failed to find name {} in list {}".format( - my_name, osd_sorted_list)) - status_set('blocked', 'failed to upgrade osd') - - -def upgrade_osd(): - current_version = ceph.get_version() - status_set("maintenance", "Upgrading osd") - log("Current ceph version is {}".format(current_version)) - new_version = config('release-version') - log("Upgrading to: {}".format(new_version)) - - try: - add_source(config('source'), config('key')) - apt_update(fatal=True) - except subprocess.CalledProcessError as err: - log("Adding the ceph source failed with message: {}".format( - err.message)) - status_set("blocked", "Upgrade to {} failed".format(new_version)) - sys.exit(1) - try: - if ceph.systemd(): - for osd_id in ceph.get_local_osd_ids(): - service_stop('ceph-osd@{}'.format(osd_id)) - else: - service_stop('ceph-osd-all') - apt_install(packages=ceph.PACKAGES, fatal=True) - - # Ensure the ownership of Ceph's directories is correct - chownr(path=os.path.join(os.sep, "var", "lib", "ceph"), - owner=ceph.ceph_user(), - group=ceph.ceph_user()) - if ceph.systemd(): - for osd_id in ceph.get_local_osd_ids(): - service_start('ceph-osd@{}'.format(osd_id)) - else: - service_start('ceph-osd-all') - except subprocess.CalledProcessError as err: - log("Stopping ceph and upgrading packages failed " - "with message: {}".format(err.message)) - status_set("blocked", "Upgrade to {} failed".format(new_version)) - sys.exit(1) - - def tune_network_adapters(): interfaces = netifaces.interfaces() for interface in interfaces: diff --git a/lib/ceph/__init__.py b/lib/ceph/__init__.py index 4b68e039..522e0876 100644 --- a/lib/ceph/__init__.py +++ b/lib/ceph/__init__.py @@ -12,7 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import ctypes +import collections import json +import random +import socket import subprocess import time import os @@ -22,38 +25,38 @@ import errno import shutil from charmhelpers.core import hookenv - from charmhelpers.core.host import ( mkdir, chownr, service_restart, lsb_release, - cmp_pkgrevno, service_stop, mounts) + cmp_pkgrevno, service_stop, mounts, service_start) from charmhelpers.core.hookenv import ( log, ERROR, cached, status_set, - WARNING, DEBUG) + WARNING, DEBUG, config) from charmhelpers.core.services import render_template from charmhelpers.fetch import ( - apt_cache -) - + apt_cache, + add_source, apt_install, apt_update) +from charmhelpers.contrib.storage.linux.ceph import ( + monitor_key_set, + monitor_key_exists, + monitor_key_get, + get_mon_map) from charmhelpers.contrib.storage.linux.utils import ( is_block_device, zap_disk, is_device_mounted) -from utils import ( - get_unit_hostname, -) - LEADER = 'leader' PEON = 'peon' QUORUM = [LEADER, PEON] -PACKAGES = ['ceph', 'gdisk', 'ntp', 'btrfs-tools', 'python-ceph', 'xfsprogs'] +PACKAGES = ['ceph', 'gdisk', 'ntp', 'btrfs-tools', 'python-ceph', + 'radosgw', 'xfsprogs'] LinkSpeed = { "BASE_10": 10, @@ -567,7 +570,7 @@ def error_out(msg): def is_quorum(): - asok = "/var/run/ceph/ceph-mon.{}.asok".format(get_unit_hostname()) + asok = "/var/run/ceph/ceph-mon.{}.asok".format(socket.gethostname()) cmd = [ "sudo", "-u", @@ -594,7 +597,7 @@ def is_quorum(): def is_leader(): - asok = "/var/run/ceph/ceph-mon.{}.asok".format(get_unit_hostname()) + asok = "/var/run/ceph/ceph-mon.{}.asok".format(socket.gethostname()) cmd = [ "sudo", "-u", @@ -627,7 +630,7 @@ def wait_for_quorum(): def add_bootstrap_hint(peer): - asok = "/var/run/ceph/ceph-mon.{}.asok".format(get_unit_hostname()) + asok = "/var/run/ceph/ceph-mon.{}.asok".format(socket.gethostname()) cmd = [ "sudo", "-u", @@ -921,41 +924,51 @@ _upgrade_caps = { } -def get_radosgw_key(): - return get_named_key('radosgw.gateway', _radosgw_caps) +def get_radosgw_key(pool_list): + return get_named_key(name='radosgw.gateway', + caps=_radosgw_caps, + pool_list=pool_list) -_default_caps = { - 'mon': ['allow rw'], - 'osd': ['allow rwx'] -} - -admin_caps = { - 'mds': ['allow'], - 'mon': ['allow *'], - 'osd': ['allow *'] -} - -osd_upgrade_caps = { - 'mon': ['allow command "config-key"', - 'allow command "osd tree"', - 'allow command "config-key list"', - 'allow command "config-key put"', - 'allow command "config-key get"', - 'allow command "config-key exists"', - 'allow command "osd out"', - 'allow command "osd in"', - 'allow command "osd rm"', - 'allow command "auth del"', - ] -} +def get_mds_key(name): + return create_named_keyring(entity='mds', + name=name, + caps=mds_caps) -def get_upgrade_key(): - return get_named_key('upgrade-osd', _upgrade_caps) +_default_caps = collections.OrderedDict([ + ('mon', ['allow rw']), + ('osd', ['allow rwx']), +]) + +admin_caps = collections.OrderedDict([ + ('mds', ['allow *']), + ('mon', ['allow *']), + ('osd', ['allow *']) +]) + +mds_caps = collections.OrderedDict([ + ('osd', ['allow *']), + ('mds', ['allow']), + ('mon', ['allow rwx']), +]) + +osd_upgrade_caps = collections.OrderedDict([ + ('mon', ['allow command "config-key"', + 'allow command "osd tree"', + 'allow command "config-key list"', + 'allow command "config-key put"', + 'allow command "config-key get"', + 'allow command "config-key exists"', + 'allow command "osd out"', + 'allow command "osd in"', + 'allow command "osd rm"', + 'allow command "auth del"', + ]) +]) -def get_named_key(name, caps=None): 
+def create_named_keyring(entity, name, caps=None): caps = caps or _default_caps cmd = [ "sudo", @@ -965,16 +978,52 @@ def get_named_key(name, caps=None): '--name', 'mon.', '--keyring', '/var/lib/ceph/mon/ceph-{}/keyring'.format( - get_unit_hostname() + socket.gethostname() + ), + 'auth', 'get-or-create', '{entity}.{name}'.format(entity=entity, + name=name), + ] + for subsystem, subcaps in caps.items(): + cmd.extend([subsystem, '; '.join(subcaps)]) + log("Calling subprocess.check_output: {}".format(cmd), level=DEBUG) + return parse_key(subprocess.check_output(cmd).strip()) # IGNORE:E1103 + + +def get_upgrade_key(): + return get_named_key('upgrade-osd', _upgrade_caps) + + +def get_named_key(name, caps=None, pool_list=None): + """ + Retrieve a specific named cephx key + :param name: String Name of key to get. + :param pool_list: The list of pools to give access to + :param caps: dict of cephx capabilities + :return: Returns a cephx key + """ + caps = caps or _default_caps + cmd = [ + "sudo", + "-u", + ceph_user(), + 'ceph', + '--name', 'mon.', + '--keyring', + '/var/lib/ceph/mon/ceph-{}/keyring'.format( + socket.gethostname() ), 'auth', 'get-or-create', 'client.{}'.format(name), ] # Add capabilities - for subsystem, subcaps in caps.iteritems(): - cmd.extend([ - subsystem, - '; '.join(subcaps), - ]) + for subsystem, subcaps in caps.items(): + if subsystem == 'osd': + if pool_list: + # This will output a string similar to: + # "pool=rgw pool=rbd pool=something" + pools = " ".join(['pool={0}'.format(i) for i in pool_list]) + subcaps[0] = subcaps[0] + " " + pools + cmd.extend([subsystem, '; '.join(subcaps)]) + log("Calling subprocess.check_output: {}".format(cmd), level=DEBUG) return parse_key(subprocess.check_output(cmd).strip()) # IGNORE:E1103 @@ -986,7 +1035,7 @@ def upgrade_key_caps(key, caps): cmd = [ "sudo", "-u", ceph_user(), 'ceph', 'auth', 'caps', key ] - for subsystem, subcaps in caps.iteritems(): + for subsystem, subcaps in caps.items(): cmd.extend([subsystem, '; '.join(subcaps)]) subprocess.check_call(cmd) @@ -997,7 +1046,7 @@ def systemd(): def bootstrap_monitor_cluster(secret): - hostname = get_unit_hostname() + hostname = socket.gethostname() path = '/var/lib/ceph/mon/ceph-{}'.format(hostname) done = '{}/done'.format(path) if systemd(): @@ -1042,7 +1091,7 @@ def bootstrap_monitor_cluster(secret): def update_monfs(): - hostname = get_unit_hostname() + hostname = socket.gethostname() monfs = '/var/lib/ceph/mon/ceph-{}'.format(hostname) if systemd(): init_marker = '{}/systemd'.format(monfs) @@ -1181,3 +1230,308 @@ def get_running_osds(): return result.split() except subprocess.CalledProcessError: return [] + + +def wait_for_all_monitors_to_upgrade(new_version, upgrade_key): + """ + Fairly self explanatory name. This function will wait + for all monitors in the cluster to upgrade or it will + return after a timeout period has expired. 
+ :param new_version: str of the version to watch + :param upgrade_key: the cephx key name to use + """ + done = False + start_time = time.time() + monitor_list = [] + + mon_map = get_mon_map('admin') + if mon_map['monmap']['mons']: + for mon in mon_map['monmap']['mons']: + monitor_list.append(mon['name']) + while not done: + try: + done = all(monitor_key_exists(upgrade_key, "{}_{}_{}_done".format( + "mon", mon, new_version + )) for mon in monitor_list) + current_time = time.time() + if current_time > (start_time + 10 * 60): + raise Exception + else: + # Wait 30 seconds and test again if all monitors are upgraded + time.sleep(30) + except subprocess.CalledProcessError: + raise + + +# Edge cases: +# 1. Previous node dies on upgrade, can we retry? +def roll_monitor_cluster(new_version, upgrade_key): + """ + This is tricky to get right so here's what we're going to do. + :param new_version: str of the version to upgrade to + :param upgrade_key: the cephx key name to use when upgrading + There's 2 possible cases: Either I'm first in line or not. + If I'm not first in line I'll wait a random time between 5-30 seconds + and test to see if the previous monitor is upgraded yet. + """ + log('roll_monitor_cluster called with {}'.format(new_version)) + my_name = socket.gethostname() + monitor_list = [] + mon_map = get_mon_map('admin') + if mon_map['monmap']['mons']: + for mon in mon_map['monmap']['mons']: + monitor_list.append(mon['name']) + else: + status_set('blocked', 'Unable to get monitor cluster information') + sys.exit(1) + log('monitor_list: {}'.format(monitor_list)) + + # A sorted list of osd unit names + mon_sorted_list = sorted(monitor_list) + + try: + position = mon_sorted_list.index(my_name) + log("upgrade position: {}".format(position)) + if position == 0: + # I'm first! 
Roll + # First set a key to inform others I'm about to roll + lock_and_roll(upgrade_key=upgrade_key, + service='mon', + my_name=my_name, + version=new_version) + else: + # Check if the previous node has finished + status_set('blocked', + 'Waiting on {} to finish upgrading'.format( + mon_sorted_list[position - 1])) + wait_on_previous_node(upgrade_key=upgrade_key, + service='mon', + previous_node=mon_sorted_list[position - 1], + version=new_version) + lock_and_roll(upgrade_key=upgrade_key, + service='mon', + my_name=my_name, + version=new_version) + except ValueError: + log("Failed to find {} in list {}.".format( + my_name, mon_sorted_list)) + status_set('blocked', 'failed to upgrade monitor') + + +def upgrade_monitor(): + current_version = get_version() + status_set("maintenance", "Upgrading monitor") + log("Current ceph version is {}".format(current_version)) + new_version = config('release-version') + log("Upgrading to: {}".format(new_version)) + + try: + add_source(config('source'), config('key')) + apt_update(fatal=True) + except subprocess.CalledProcessError as err: + log("Adding the ceph source failed with message: {}".format( + err.message)) + status_set("blocked", "Upgrade to {} failed".format(new_version)) + sys.exit(1) + try: + if systemd(): + for mon_id in get_local_mon_ids(): + service_stop('ceph-mon@{}'.format(mon_id)) + else: + service_stop('ceph-mon-all') + apt_install(packages=PACKAGES, fatal=True) + + # Ensure the ownership of Ceph's directories is correct + chownr(path=os.path.join(os.sep, "var", "lib", "ceph"), + owner=ceph_user(), + group=ceph_user()) + if systemd(): + for mon_id in get_local_mon_ids(): + service_start('ceph-mon@{}'.format(mon_id)) + else: + service_start('ceph-mon-all') + status_set("active", "") + except subprocess.CalledProcessError as err: + log("Stopping ceph and upgrading packages failed " + "with message: {}".format(err.message)) + status_set("blocked", "Upgrade to {} failed".format(new_version)) + sys.exit(1) + + +def lock_and_roll(upgrade_key, service, my_name, version): + start_timestamp = time.time() + + log('monitor_key_set {}_{}_{}_start {}'.format( + service, + my_name, + version, + start_timestamp)) + monitor_key_set(upgrade_key, "{}_{}_{}_start".format( + service, my_name, version), start_timestamp) + log("Rolling") + + # This should be quick + if service == 'osd': + upgrade_osd() + elif service == 'mon': + upgrade_monitor() + else: + log("Unknown service {}. Unable to upgrade".format(service), + level=ERROR) + log("Done") + + stop_timestamp = time.time() + # Set a key to inform others I am finished + log('monitor_key_set {}_{}_{}_done {}'.format(service, + my_name, + version, + stop_timestamp)) + monitor_key_set(upgrade_key, "{}_{}_{}_done".format(service, + my_name, + version), + stop_timestamp) + + +def wait_on_previous_node(upgrade_key, service, previous_node, version): + log("Previous node is: {}".format(previous_node)) + + previous_node_finished = monitor_key_exists( + upgrade_key, + "{}_{}_{}_done".format(service, previous_node, version)) + + while previous_node_finished is False: + log("{} is not finished. Waiting".format(previous_node)) + # Has this node been trying to upgrade for longer than + # 10 minutes? + # If so then move on and consider that node dead. + + # NOTE: This assumes the clusters clocks are somewhat accurate + # If the hosts clock is really far off it may cause it to skip + # the previous node even though it shouldn't. 
+ current_timestamp = time.time() + previous_node_start_time = monitor_key_get( + upgrade_key, + "{}_{}_{}_start".format(service, previous_node, version)) + if (current_timestamp - (10 * 60)) > previous_node_start_time: + # Previous node is probably dead. Lets move on + if previous_node_start_time is not None: + log( + "Waited 10 mins on node {}. current time: {} > " + "previous node start time: {} Moving on".format( + previous_node, + (current_timestamp - (10 * 60)), + previous_node_start_time)) + return + else: + # I have to wait. Sleep a random amount of time and then + # check if I can lock,upgrade and roll. + wait_time = random.randrange(5, 30) + log('waiting for {} seconds'.format(wait_time)) + time.sleep(wait_time) + previous_node_finished = monitor_key_exists( + upgrade_key, + "{}_{}_{}_done".format(service, previous_node, version)) + + +def get_upgrade_position(osd_sorted_list, match_name): + for index, item in enumerate(osd_sorted_list): + if item.name == match_name: + return index + return None + + +# Edge cases: +# 1. Previous node dies on upgrade, can we retry? +# 2. This assumes that the osd failure domain is not set to osd. +# It rolls an entire server at a time. +def roll_osd_cluster(new_version, upgrade_key): + """ + This is tricky to get right so here's what we're going to do. + :param new_version: str of the version to upgrade to + :param upgrade_key: the cephx key name to use when upgrading + There's 2 possible cases: Either I'm first in line or not. + If I'm not first in line I'll wait a random time between 5-30 seconds + and test to see if the previous osd is upgraded yet. + + TODO: If you're not in the same failure domain it's safe to upgrade + 1. Examine all pools and adopt the most strict failure domain policy + Example: Pool 1: Failure domain = rack + Pool 2: Failure domain = host + Pool 3: Failure domain = row + + outcome: Failure domain = host + """ + log('roll_osd_cluster called with {}'.format(new_version)) + my_name = socket.gethostname() + osd_tree = get_osd_tree(service=upgrade_key) + # A sorted list of osd unit names + osd_sorted_list = sorted(osd_tree) + log("osd_sorted_list: {}".format(osd_sorted_list)) + + try: + position = get_upgrade_position(osd_sorted_list, my_name) + log("upgrade position: {}".format(position)) + if position == 0: + # I'm first! 
Roll + # First set a key to inform others I'm about to roll + lock_and_roll(upgrade_key=upgrade_key, + service='osd', + my_name=my_name, + version=new_version) + else: + # Check if the previous node has finished + status_set('blocked', + 'Waiting on {} to finish upgrading'.format( + osd_sorted_list[position - 1].name)) + wait_on_previous_node( + upgrade_key=upgrade_key, + service='osd', + previous_node=osd_sorted_list[position - 1].name, + version=new_version) + lock_and_roll(upgrade_key=upgrade_key, + service='osd', + my_name=my_name, + version=new_version) + except ValueError: + log("Failed to find name {} in list {}".format( + my_name, osd_sorted_list)) + status_set('blocked', 'failed to upgrade osd') + + +def upgrade_osd(): + current_version = get_version() + status_set("maintenance", "Upgrading osd") + log("Current ceph version is {}".format(current_version)) + new_version = config('release-version') + log("Upgrading to: {}".format(new_version)) + + try: + add_source(config('source'), config('key')) + apt_update(fatal=True) + except subprocess.CalledProcessError as err: + log("Adding the ceph source failed with message: {}".format( + err.message)) + status_set("blocked", "Upgrade to {} failed".format(new_version)) + sys.exit(1) + try: + if systemd(): + for osd_id in get_local_osd_ids(): + service_stop('ceph-osd@{}'.format(osd_id)) + else: + service_stop('ceph-osd-all') + apt_install(packages=PACKAGES, fatal=True) + + # Ensure the ownership of Ceph's directories is correct + chownr(path=os.path.join(os.sep, "var", "lib", "ceph"), + owner=ceph_user(), + group=ceph_user()) + if systemd(): + for osd_id in get_local_osd_ids(): + service_start('ceph-osd@{}'.format(osd_id)) + else: + service_start('ceph-osd-all') + except subprocess.CalledProcessError as err: + log("Stopping ceph and upgrading packages failed " + "with message: {}".format(err.message)) + status_set("blocked", "Upgrade to {} failed".format(new_version)) + sys.exit(1) diff --git a/lib/ceph/ceph_broker.py b/lib/ceph/ceph_broker.py index d55e570b..0ed9833e 100644 --- a/lib/ceph/ceph_broker.py +++ b/lib/ceph/ceph_broker.py @@ -15,6 +15,8 @@ # limitations under the License. import json +import os +from tempfile import NamedTemporaryFile from charmhelpers.core.hookenv import ( log, @@ -42,6 +44,8 @@ from charmhelpers.contrib.storage.linux.ceph import ( # This comes from http://docs.ceph.com/docs/master/rados/operations/pools/ # This should do a decent job of preventing people from passing in bad values. 
# It will give a useful error message +from subprocess import check_output, CalledProcessError + POOL_KEYS = { # "Ceph Key Name": [Python type, [Valid Range]] "size": [int], @@ -56,8 +60,8 @@ POOL_KEYS = { "write_fadvise_dontneed": [bool], "noscrub": [bool], "nodeep-scrub": [bool], - "hit_set_type": [basestring, ["bloom", "explicit_hash", - "explicit_object"]], + "hit_set_type": [str, ["bloom", "explicit_hash", + "explicit_object"]], "hit_set_count": [int, [1, 1]], "hit_set_period": [int], "hit_set_fpp": [float, [0.0, 1.0]], @@ -289,6 +293,136 @@ def handle_set_pool_value(request, service): value=params['value']) +def handle_rgw_regionmap_update(request, service): + name = request.get('client-name') + if not name: + msg = "Missing rgw-region or client-name params" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + try: + check_output(['radosgw-admin', + '--id', service, + 'regionmap', 'update', '--name', name]) + except CalledProcessError as err: + log(err.output, level=ERROR) + return {'exit-code': 1, 'stderr': err.output} + + +def handle_rgw_regionmap_default(request, service): + region = request.get('rgw-region') + name = request.get('client-name') + if not region or not name: + msg = "Missing rgw-region or client-name params" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + try: + check_output( + [ + 'radosgw-admin', + '--id', service, + 'regionmap', + 'default', + '--rgw-region', region, + '--name', name]) + except CalledProcessError as err: + log(err.output, level=ERROR) + return {'exit-code': 1, 'stderr': err.output} + + +def handle_rgw_zone_set(request, service): + json_file = request.get('zone-json') + name = request.get('client-name') + region_name = request.get('region-name') + zone_name = request.get('zone-name') + if not json_file or not name or not region_name or not zone_name: + msg = "Missing json-file or client-name params" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + infile = NamedTemporaryFile(delete=False) + with open(infile.name, 'w') as infile_handle: + infile_handle.write(json_file) + try: + check_output( + [ + 'radosgw-admin', + '--id', service, + 'zone', + 'set', + '--rgw-zone', zone_name, + '--infile', infile.name, + '--name', name, + ] + ) + except CalledProcessError as err: + log(err.output, level=ERROR) + return {'exit-code': 1, 'stderr': err.output} + os.unlink(infile.name) + + +def handle_rgw_create_user(request, service): + user_id = request.get('rgw-uid') + display_name = request.get('display-name') + name = request.get('client-name') + if not name or not display_name or not user_id: + msg = "Missing client-name, display-name or rgw-uid" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + try: + create_output = check_output( + [ + 'radosgw-admin', + '--id', service, + 'user', + 'create', + '--uid', user_id, + '--display-name', display_name, + '--name', name, + '--system' + ] + ) + try: + user_json = json.loads(create_output) + return {'exit-code': 0, 'user': user_json} + except ValueError as err: + log(err, level=ERROR) + return {'exit-code': 1, 'stderr': err} + + except CalledProcessError as err: + log(err.output, level=ERROR) + return {'exit-code': 1, 'stderr': err.output} + + +def handle_rgw_region_set(request, service): + # radosgw-admin region set --infile us.json --name client.radosgw.us-east-1 + json_file = request.get('region-json') + name = request.get('client-name') + region_name = request.get('region-name') + zone_name = request.get('zone-name') + if not json_file or not name 
or not region_name or not zone_name: + msg = "Missing json-file or client-name params" + log(msg, level=ERROR) + return {'exit-code': 1, 'stderr': msg} + infile = NamedTemporaryFile(delete=False) + with open(infile.name, 'w') as infile_handle: + infile_handle.write(json_file) + try: + check_output( + [ + 'radosgw-admin', + '--id', service, + 'region', + 'set', + '--rgw-zone', zone_name, + '--infile', infile.name, + '--name', name, + ] + ) + except CalledProcessError as err: + log(err.output, level=ERROR) + return {'exit-code': 1, 'stderr': err.output} + os.unlink(infile.name) + + def process_requests_v1(reqs): """Process v1 requests. @@ -341,6 +475,16 @@ def process_requests_v1(reqs): snapshot_name=snapshot_name) elif op == "set-pool-value": ret = handle_set_pool_value(request=req, service=svc) + elif op == "rgw-region-set": + ret = handle_rgw_region_set(request=req, service=svc) + elif op == "rgw-zone-set": + ret = handle_rgw_zone_set(request=req, service=svc) + elif op == "rgw-regionmap-update": + ret = handle_rgw_regionmap_update(request=req, service=svc) + elif op == "rgw-regionmap-default": + ret = handle_rgw_regionmap_default(request=req, service=svc) + elif op == "rgw-create-user": + ret = handle_rgw_create_user(request=req, service=svc) else: msg = "Unknown operation '%s'" % op log(msg, level=ERROR) diff --git a/unit_tests/test_upgrade.py b/unit_tests/test_upgrade.py new file mode 100644 index 00000000..7b213ca4 --- /dev/null +++ b/unit_tests/test_upgrade.py @@ -0,0 +1,35 @@ +import unittest + +__author__ = 'Chris Holcombe ' + +from mock import patch, MagicMock + +from ceph_hooks import check_for_upgrade + + +def config_side_effect(*args): + if args[0] == 'source': + return 'cloud:trusty-kilo' + elif args[0] == 'key': + return 'key' + elif args[0] == 'release-version': + return 'cloud:trusty-kilo' + + +class UpgradeRollingTestCase(unittest.TestCase): + @patch('ceph_hooks.hookenv') + @patch('ceph_hooks.host') + @patch('ceph_hooks.ceph.roll_osd_cluster') + def test_check_for_upgrade(self, roll_osd_cluster, host, hookenv): + host.lsb_release.return_value = { + 'DISTRIB_CODENAME': 'trusty', + } + previous_mock = MagicMock().return_value + previous_mock.previous.return_value = "cloud:trusty-juno" + hookenv.config.side_effect = [previous_mock, + config_side_effect('source')] + check_for_upgrade() + + roll_osd_cluster.assert_called_with( + new_version='cloud:trusty-kilo', + upgrade_key='osd-upgrade') diff --git a/unit_tests/test_upgrade_roll.py b/unit_tests/test_upgrade_roll.py deleted file mode 100644 index c88ea6e1..00000000 --- a/unit_tests/test_upgrade_roll.py +++ /dev/null @@ -1,179 +0,0 @@ -# Copyright 2016 Canonical Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import time - -from mock import patch, call, MagicMock - -from ceph import CrushLocation - -import test_utils -import ceph_hooks - -TO_PATCH = [ - 'apt_install', - 'apt_update', - 'add_source', - 'config', - 'ceph', - 'get_conf', - 'hookenv', - 'host', - 'log', - 'service_start', - 'service_stop', - 'socket', - 'status_set', - 'chownr', -] - - -def config_side_effect(*args): - if args[0] == 'source': - return 'cloud:trusty-kilo' - elif args[0] == 'key': - return 'key' - elif args[0] == 'release-version': - return 'cloud:trusty-kilo' - - -previous_node_start_time = time.time() - (9 * 60) - - -def monitor_key_side_effect(*args): - if args[1] == \ - 'ip-192-168-1-2_0.94.1_done': - return False - elif args[1] == \ - 'ip-192-168-1-2_0.94.1_start': - # Return that the previous node started 9 minutes ago - return previous_node_start_time - - -class UpgradeRollingTestCase(test_utils.CharmTestCase): - def setUp(self): - super(UpgradeRollingTestCase, self).setUp(ceph_hooks, TO_PATCH) - - @patch('ceph_hooks.roll_osd_cluster') - def test_check_for_upgrade(self, roll_osd_cluster): - self.host.lsb_release.return_value = { - 'DISTRIB_CODENAME': 'trusty', - } - previous_mock = MagicMock().return_value - previous_mock.previous.return_value = "cloud:trusty-juno" - self.hookenv.config.side_effect = [previous_mock, - config_side_effect('source')] - ceph_hooks.check_for_upgrade() - - roll_osd_cluster.assert_called_with('cloud:trusty-kilo') - - @patch('ceph_hooks.upgrade_osd') - @patch('ceph_hooks.monitor_key_set') - def test_lock_and_roll(self, monitor_key_set, upgrade_osd): - monitor_key_set.monitor_key_set.return_value = None - ceph_hooks.lock_and_roll(my_name='ip-192-168-1-2', - version='0.94.1') - upgrade_osd.assert_called_once_with() - - def test_upgrade_osd(self): - self.config.side_effect = config_side_effect - self.ceph.get_version.return_value = "0.80" - self.ceph.ceph_user.return_value = "ceph" - self.ceph.systemd.return_value = False - ceph_hooks.upgrade_osd() - self.service_stop.assert_called_with('ceph-osd-all') - self.service_start.assert_called_with('ceph-osd-all') - self.status_set.assert_has_calls([ - call('maintenance', 'Upgrading osd'), - ]) - self.chownr.assert_has_calls( - [ - call(group='ceph', owner='ceph', path='/var/lib/ceph') - ] - ) - - @patch('ceph_hooks.lock_and_roll') - @patch('ceph_hooks.get_upgrade_position') - def test_roll_osd_cluster_first(self, - get_upgrade_position, - lock_and_roll): - self.socket.gethostname.return_value = "ip-192-168-1-2" - self.ceph.get_osd_tree.return_value = "" - get_upgrade_position.return_value = 0 - ceph_hooks.roll_osd_cluster('0.94.1') - lock_and_roll.assert_called_with(my_name="ip-192-168-1-2", - version="0.94.1") - - @patch('ceph_hooks.lock_and_roll') - @patch('ceph_hooks.get_upgrade_position') - @patch('ceph_hooks.wait_on_previous_node') - def test_roll_osd_cluster_second(self, - wait_on_previous_node, - get_upgrade_position, - lock_and_roll): - wait_on_previous_node.return_value = None - self.socket.gethostname.return_value = "ip-192-168-1-3" - self.ceph.get_osd_tree.return_value = [ - CrushLocation( - name="ip-192-168-1-2", - identifier='a', - host='host-a', - rack='rack-a', - row='row-a', - datacenter='dc-1', - chassis='chassis-a', - root='ceph'), - CrushLocation( - name="ip-192-168-1-3", - identifier='a', - host='host-b', - rack='rack-a', - row='row-a', - datacenter='dc-1', - chassis='chassis-a', - root='ceph') - ] - get_upgrade_position.return_value = 1 - ceph_hooks.roll_osd_cluster('0.94.1') - self.status_set.assert_called_with( - 
-            'blocked',
-            'Waiting on ip-192-168-1-2 to finish upgrading')
-        lock_and_roll.assert_called_with(my_name="ip-192-168-1-3",
-                                         version="0.94.1")
-
-    @patch('time.time', lambda *args: previous_node_start_time + 10 * 60 + 1)
-    @patch('ceph_hooks.monitor_key_get')
-    @patch('ceph_hooks.monitor_key_exists')
-    def test_wait_on_previous_node(self,
-                                   monitor_key_exists,
-                                   monitor_key_get):
-        monitor_key_get.side_effect = monitor_key_side_effect
-        monitor_key_exists.return_value = False
-
-        ceph_hooks.wait_on_previous_node(previous_node="ip-192-168-1-2",
-                                         version='0.94.1')
-
-        # Make sure we checked to see if the previous node started
-        monitor_key_get.assert_has_calls(
-            [call('osd-upgrade', 'ip-192-168-1-2_0.94.1_start')]
-        )
-        # Make sure we checked to see if the previous node was finished
-        monitor_key_exists.assert_has_calls(
-            [call('osd-upgrade', 'ip-192-168-1-2_0.94.1_done')]
-        )
-        # Make sure we waited at last once before proceeding
-        self.log.assert_has_calls(
-            [call('Previous node is: ip-192-168-1-2')],
-            [call('ip-192-168-1-2 is not finished. Waiting')],
-        )
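
For readers tracing the rolling-upgrade hand-off that this patch centralises in lib/ceph: peers order themselves by sorted hostname and gate on "<service>_<host>_<version>_start" / "_done" monitor keys, skipping a predecessor whose start key is more than ten minutes old. The snippet below is a minimal standalone sketch of that protocol only, assuming an in-memory stand-in for the monitor key store and an invented upgrade_node() helper; the charm's actual entry points are roll_osd_cluster() and roll_monitor_cluster() in the diff above, and the real monitor_key_* helpers from charmhelpers additionally take the cephx key name (e.g. 'osd-upgrade') as their first argument.

import random
import socket
import time

# In-memory stand-in for Ceph's config-key store, used only for this sketch.
_key_store = {}


def monitor_key_set(key, value):
    _key_store[key] = value


def monitor_key_get(key):
    return _key_store.get(key)


def monitor_key_exists(key):
    return key in _key_store


def upgrade_node(service, version, all_hosts, do_upgrade, timeout=10 * 60):
    """Hypothetical helper illustrating the one-node-at-a-time hand-off.

    service     -- 'osd' or 'mon', as in the charm code
    version     -- version string being upgraded to
    all_hosts   -- hostnames of every peer taking part in the roll
    do_upgrade  -- callable performing the actual package upgrade
    """
    me = socket.gethostname()
    ordered = sorted(all_hosts)
    position = ordered.index(me)

    if position > 0:
        previous = ordered[position - 1]
        done_key = "{}_{}_{}_done".format(service, previous, version)
        start_key = "{}_{}_{}_start".format(service, previous, version)
        while not monitor_key_exists(done_key):
            started = monitor_key_get(start_key)
            if started and time.time() - started > timeout:
                # Predecessor started long ago and never finished;
                # presume it dead and move on, as wait_on_previous_node does.
                break
            # Back off a random 5-30 seconds before polling again.
            time.sleep(random.randrange(5, 30))

    # Publish "start", upgrade, then publish "done" so the next host can go.
    monitor_key_set("{}_{}_{}_start".format(service, me, version), time.time())
    do_upgrade()
    monitor_key_set("{}_{}_{}_done".format(service, me, version), time.time())

Each host runs the same routine, so host N simply blocks until host N-1 publishes its done key (or its start key goes stale); when experimenting locally, do_upgrade can be any callable, e.g. lambda: None.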