diff --git a/hooks/ceph_hooks.py b/hooks/ceph_hooks.py
index 24c2b4e5..3d04ee81 100755
--- a/hooks/ceph_hooks.py
+++ b/hooks/ceph_hooks.py
@@ -15,12 +15,9 @@
 # limitations under the License.
 import os
 import shutil
-import random
-import subprocess
 import sys
 import tempfile
 import socket
-import time
 import netifaces
 
 sys.path.append('lib')
@@ -45,10 +42,7 @@ from charmhelpers.core.hookenv import (
 from charmhelpers.core.host import (
     umount,
     mkdir,
-    cmp_pkgrevno,
-    service_stop,
-    service_start,
-    chownr)
+    cmp_pkgrevno)
 from charmhelpers.fetch import (
     add_source,
     apt_install,
@@ -75,10 +69,7 @@ from charmhelpers.contrib.network.ip import (
     format_ipv6_addr,
 )
 from charmhelpers.contrib.storage.linux.ceph import (
-    CephConfContext,
-    monitor_key_set,
-    monitor_key_exists,
-    monitor_key_get)
+    CephConfContext)
 from charmhelpers.contrib.charmsupport import nrpe
 from charmhelpers.contrib.hardening.harden import harden
 
@@ -120,7 +111,8 @@ def check_for_upgrade():
         if new_version == upgrade_paths[old_version]:
             log("{} to {} is a valid upgrade path. Proceeding.".format(
                 old_version, new_version))
-            roll_osd_cluster(new_version)
+            ceph.roll_osd_cluster(new_version=new_version,
+                                  upgrade_key='osd-upgrade')
         else:
             # Log a helpful error message
             log("Invalid upgrade path from {} to {}. "
                 "Valid paths are: {}".format(old_version, new_version,
                                              pretty_print_upgrade_paths()))
@@ -129,164 +121,6 @@ def check_for_upgrade():
 
 
-def lock_and_roll(my_name, version):
-    start_timestamp = time.time()
-
-    log('monitor_key_set {}_start {}'.format(
-        my_name,
-        version,
-        start_timestamp))
-    monitor_key_set('osd-upgrade', "{}_{}_start".format(my_name, version),
-                    start_timestamp)
-    log("Rolling")
-    # This should be quick
-    upgrade_osd()
-    log("Done")
-
-    stop_timestamp = time.time()
-    # Set a key to inform others I am finished
-    log('monitor_key_set {}_{}_done {}'.format(my_name,
-                                               version,
-                                               stop_timestamp))
-    monitor_key_set('osd-upgrade', "{}_{}_done".format(my_name, version),
-                    stop_timestamp)
-
-
-def wait_on_previous_node(previous_node, version):
-    log("Previous node is: {}".format(previous_node))
-
-    previous_node_finished = monitor_key_exists(
-        'osd-upgrade',
-        "{}_{}_done".format(previous_node, version))
-
-    while previous_node_finished is False:
-        log("{} is not finished. Waiting".format(previous_node))
-        # Has this node been trying to upgrade for longer than
-        # 10 minutes?
-        # If so then move on and consider that node dead.
-
-        # NOTE: This assumes the clusters clocks are somewhat accurate
-        # If the hosts clock is really far off it may cause it to skip
-        # the previous node even though it shouldn't.
-        current_timestamp = time.time()
-        previous_node_start_time = monitor_key_get(
-            'osd-upgrade',
-            "{}_{}_start".format(previous_node, version))
-        if (current_timestamp - (10 * 60)) > previous_node_start_time:
-            # Previous node is probably dead. Lets move on
-            if previous_node_start_time is not None:
-                log(
-                    "Waited 10 mins on node {}. current time: {} > "
-                    "previous node start time: {} Moving on".format(
-                        previous_node,
-                        (current_timestamp - (10 * 60)),
-                        previous_node_start_time))
-                return
-        else:
-            # I have to wait. Sleep a random amount of time and then
-            # check if I can lock,upgrade and roll.
-            wait_time = random.randrange(5, 30)
-            log('waiting for {} seconds'.format(wait_time))
-            time.sleep(wait_time)
-            previous_node_finished = monitor_key_exists(
-                'osd-upgrade',
-                "{}_{}_done".format(previous_node, version))
-
-
-def get_upgrade_position(osd_sorted_list, match_name):
-    for index, item in enumerate(osd_sorted_list):
-        if item.name == match_name:
-            return index
-    return None
-
-
-# Edge cases:
-# 1. Previous node dies on upgrade, can we retry?
-# 2. This assumes that the osd failure domain is not set to osd.
-#    It rolls an entire server at a time.
-def roll_osd_cluster(new_version):
-    """
-    This is tricky to get right so here's what we're going to do.
-    There's 2 possible cases: Either I'm first in line or not.
-    If I'm not first in line I'll wait a random time between 5-30 seconds
-    and test to see if the previous osd is upgraded yet.
-
-    TODO: If you're not in the same failure domain it's safe to upgrade
-     1. Examine all pools and adopt the most strict failure domain policy
-        Example: Pool 1: Failure domain = rack
-        Pool 2: Failure domain = host
-        Pool 3: Failure domain = row
-
-        outcome: Failure domain = host
-    """
-    log('roll_osd_cluster called with {}'.format(new_version))
-    my_name = socket.gethostname()
-    osd_tree = ceph.get_osd_tree(service='osd-upgrade')
-    # A sorted list of osd unit names
-    osd_sorted_list = sorted(osd_tree)
-    log("osd_sorted_list: {}".format(osd_sorted_list))
-
-    try:
-        position = get_upgrade_position(osd_sorted_list, my_name)
-        log("upgrade position: {}".format(position))
-        if position == 0:
-            # I'm first!  Roll
-            # First set a key to inform others I'm about to roll
-            lock_and_roll(my_name=my_name, version=new_version)
-        else:
-            # Check if the previous node has finished
-            status_set('blocked',
-                       'Waiting on {} to finish upgrading'.format(
-                           osd_sorted_list[position - 1].name))
-            wait_on_previous_node(
-                previous_node=osd_sorted_list[position - 1].name,
-                version=new_version)
-            lock_and_roll(my_name=my_name, version=new_version)
-    except ValueError:
-        log("Failed to find name {} in list {}".format(
-            my_name, osd_sorted_list))
-        status_set('blocked', 'failed to upgrade osd')
-
-
-def upgrade_osd():
-    current_version = ceph.get_version()
-    status_set("maintenance", "Upgrading osd")
-    log("Current ceph version is {}".format(current_version))
-    new_version = config('release-version')
-    log("Upgrading to: {}".format(new_version))
-
-    try:
-        add_source(config('source'), config('key'))
-        apt_update(fatal=True)
-    except subprocess.CalledProcessError as err:
-        log("Adding the ceph source failed with message: {}".format(
-            err.message))
-        status_set("blocked", "Upgrade to {} failed".format(new_version))
-        sys.exit(1)
-    try:
-        if ceph.systemd():
-            for osd_id in ceph.get_local_osd_ids():
-                service_stop('ceph-osd@{}'.format(osd_id))
-        else:
-            service_stop('ceph-osd-all')
-        apt_install(packages=ceph.PACKAGES, fatal=True)
-
-        # Ensure the ownership of Ceph's directories is correct
-        chownr(path=os.path.join(os.sep, "var", "lib", "ceph"),
-               owner=ceph.ceph_user(),
-               group=ceph.ceph_user())
-        if ceph.systemd():
-            for osd_id in ceph.get_local_osd_ids():
-                service_start('ceph-osd@{}'.format(osd_id))
-        else:
-            service_start('ceph-osd-all')
-    except subprocess.CalledProcessError as err:
-        log("Stopping ceph and upgrading packages failed "
-            "with message: {}".format(err.message))
-        status_set("blocked", "Upgrade to {} failed".format(new_version))
-        sys.exit(1)
-
-
 def tune_network_adapters():
     interfaces = netifaces.interfaces()
     for interface in interfaces:
diff --git a/lib/ceph/__init__.py b/lib/ceph/__init__.py
index 4b68e039..522e0876 100644
--- a/lib/ceph/__init__.py
+++ b/lib/ceph/__init__.py
@@ -12,7 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import ctypes
+import collections
 import json
+import random
+import socket
 import subprocess
 import time
 import os
@@ -22,38 +25,38 @@
 import errno
 import shutil
 
 from charmhelpers.core import hookenv
-
 from charmhelpers.core.host import (
     mkdir,
     chownr,
     service_restart,
     lsb_release,
-    cmp_pkgrevno, service_stop, mounts)
+    cmp_pkgrevno, service_stop, mounts, service_start)
 from charmhelpers.core.hookenv import (
     log,
     ERROR,
     cached,
     status_set,
-    WARNING, DEBUG)
+    WARNING, DEBUG, config)
 from charmhelpers.core.services import render_template
 from charmhelpers.fetch import (
-    apt_cache
-)
-
+    apt_cache,
+    add_source, apt_install, apt_update)
+from charmhelpers.contrib.storage.linux.ceph import (
+    monitor_key_set,
+    monitor_key_exists,
+    monitor_key_get,
+    get_mon_map)
 from charmhelpers.contrib.storage.linux.utils import (
     is_block_device,
     zap_disk,
     is_device_mounted)
 
-from utils import (
-    get_unit_hostname,
-)
-
 LEADER = 'leader'
 PEON = 'peon'
 QUORUM = [LEADER, PEON]
 
-PACKAGES = ['ceph', 'gdisk', 'ntp', 'btrfs-tools', 'python-ceph', 'xfsprogs']
+PACKAGES = ['ceph', 'gdisk', 'ntp', 'btrfs-tools', 'python-ceph',
+            'radosgw', 'xfsprogs']
 
 LinkSpeed = {
     "BASE_10": 10,
@@ -567,7 +570,7 @@ def error_out(msg):
 
 
 def is_quorum():
-    asok = "/var/run/ceph/ceph-mon.{}.asok".format(get_unit_hostname())
+    asok = "/var/run/ceph/ceph-mon.{}.asok".format(socket.gethostname())
     cmd = [
         "sudo",
         "-u",
@@ -594,7 +597,7 @@ def is_quorum():
 
 
 def is_leader():
-    asok = "/var/run/ceph/ceph-mon.{}.asok".format(get_unit_hostname())
+    asok = "/var/run/ceph/ceph-mon.{}.asok".format(socket.gethostname())
    cmd = [
         "sudo",
         "-u",
@@ -627,7 +630,7 @@ def wait_for_quorum():
 
 
 def add_bootstrap_hint(peer):
-    asok = "/var/run/ceph/ceph-mon.{}.asok".format(get_unit_hostname())
+    asok = "/var/run/ceph/ceph-mon.{}.asok".format(socket.gethostname())
     cmd = [
         "sudo",
         "-u",
@@ -921,41 +924,51 @@ _upgrade_caps = {
 }
 
 
-def get_radosgw_key():
-    return get_named_key('radosgw.gateway', _radosgw_caps)
+def get_radosgw_key(pool_list):
+    return get_named_key(name='radosgw.gateway',
+                         caps=_radosgw_caps,
+                         pool_list=pool_list)
 
 
-_default_caps = {
-    'mon': ['allow rw'],
-    'osd': ['allow rwx']
-}
-
-admin_caps = {
-    'mds': ['allow'],
-    'mon': ['allow *'],
-    'osd': ['allow *']
-}
-
-osd_upgrade_caps = {
-    'mon': ['allow command "config-key"',
-            'allow command "osd tree"',
-            'allow command "config-key list"',
-            'allow command "config-key put"',
-            'allow command "config-key get"',
-            'allow command "config-key exists"',
-            'allow command "osd out"',
-            'allow command "osd in"',
-            'allow command "osd rm"',
-            'allow command "auth del"',
-            ]
-}
 
+def get_mds_key(name):
+    return create_named_keyring(entity='mds',
+                                name=name,
+                                caps=mds_caps)
 
-def get_upgrade_key():
-    return get_named_key('upgrade-osd', _upgrade_caps)
 
+_default_caps = collections.OrderedDict([
+    ('mon', ['allow rw']),
+    ('osd', ['allow rwx']),
+])
+
+admin_caps = collections.OrderedDict([
+    ('mds', ['allow *']),
+    ('mon', ['allow *']),
+    ('osd', ['allow *'])
+])
+
+mds_caps = collections.OrderedDict([
+    ('osd', ['allow *']),
+    ('mds', ['allow']),
+    ('mon', ['allow rwx']),
+])
+
+osd_upgrade_caps = collections.OrderedDict([
+    ('mon', ['allow command "config-key"',
+             'allow command "osd tree"',
+             'allow command "config-key list"',
+             'allow command "config-key put"',
+             'allow command "config-key get"',
+             'allow command "config-key exists"',
+             'allow command "osd out"',
+             'allow command "osd in"',
+             'allow command "osd rm"',
+             'allow command "auth del"',
+             ])
+])
 
 
-def get_named_key(name, caps=None):
+def create_named_keyring(entity, name, caps=None):
     caps = caps or _default_caps
     cmd = [
         "sudo",
@@ -965,16 +978,52 @@
         '--name', 'mon.',
         '--keyring',
         '/var/lib/ceph/mon/ceph-{}/keyring'.format(
-            get_unit_hostname()
+            socket.gethostname()
         ),
+        'auth', 'get-or-create', '{entity}.{name}'.format(entity=entity,
+                                                          name=name),
+    ]
+    for subsystem, subcaps in caps.items():
+        cmd.extend([subsystem, '; '.join(subcaps)])
+    log("Calling subprocess.check_output: {}".format(cmd), level=DEBUG)
+    return parse_key(subprocess.check_output(cmd).strip())  # IGNORE:E1103
+
+
+def get_upgrade_key():
+    return get_named_key('upgrade-osd', _upgrade_caps)
+
+
+def get_named_key(name, caps=None, pool_list=None):
+    """
+    Retrieve a specific named cephx key
+    :param name: String Name of key to get.
+    :param pool_list: The list of pools to give access to
+    :param caps: dict of cephx capabilities
+    :return: Returns a cephx key
+    """
+    caps = caps or _default_caps
+    cmd = [
+        "sudo",
+        "-u",
+        ceph_user(),
+        'ceph',
+        '--name', 'mon.',
+        '--keyring',
+        '/var/lib/ceph/mon/ceph-{}/keyring'.format(
+            socket.gethostname()
+        ),
         'auth', 'get-or-create', 'client.{}'.format(name),
     ]
     # Add capabilities
-    for subsystem, subcaps in caps.iteritems():
-        cmd.extend([
-            subsystem,
-            '; '.join(subcaps),
-        ])
+    for subsystem, subcaps in caps.items():
+        if subsystem == 'osd':
+            if pool_list:
+                # This will output a string similar to:
+                # "pool=rgw pool=rbd pool=something"
+                pools = " ".join(['pool={0}'.format(i) for i in pool_list])
+                subcaps[0] = subcaps[0] + " " + pools
+        cmd.extend([subsystem, '; '.join(subcaps)])
+    log("Calling subprocess.check_output: {}".format(cmd), level=DEBUG)
     return parse_key(subprocess.check_output(cmd).strip())  # IGNORE:E1103
 
 
@@ -986,7 +1035,7 @@ def upgrade_key_caps(key, caps):
     cmd = [
         "sudo", "-u", ceph_user(), 'ceph', 'auth', 'caps', key
     ]
-    for subsystem, subcaps in caps.iteritems():
+    for subsystem, subcaps in caps.items():
         cmd.extend([subsystem, '; '.join(subcaps)])
     subprocess.check_call(cmd)
 
@@ -997,7 +1046,7 @@ def systemd():
 
 
 def bootstrap_monitor_cluster(secret):
-    hostname = get_unit_hostname()
+    hostname = socket.gethostname()
     path = '/var/lib/ceph/mon/ceph-{}'.format(hostname)
     done = '{}/done'.format(path)
     if systemd():
@@ -1042,7 +1091,7 @@ def bootstrap_monitor_cluster(secret):
 
 
 def update_monfs():
-    hostname = get_unit_hostname()
+    hostname = socket.gethostname()
     monfs = '/var/lib/ceph/mon/ceph-{}'.format(hostname)
     if systemd():
         init_marker = '{}/systemd'.format(monfs)
@@ -1181,3 +1230,308 @@ def get_running_osds():
         return result.split()
     except subprocess.CalledProcessError:
         return []
+
+
+def wait_for_all_monitors_to_upgrade(new_version, upgrade_key):
+    """
+    Fairly self explanatory name. This function will wait
+    for all monitors in the cluster to upgrade or it will
+    return after a timeout period has expired.
+    :param new_version: str of the version to watch
+    :param upgrade_key: the cephx key name to use
+    """
+    done = False
+    start_time = time.time()
+    monitor_list = []
+
+    mon_map = get_mon_map('admin')
+    if mon_map['monmap']['mons']:
+        for mon in mon_map['monmap']['mons']:
+            monitor_list.append(mon['name'])
+    while not done:
+        try:
+            done = all(monitor_key_exists(upgrade_key, "{}_{}_{}_done".format(
+                "mon", mon, new_version
+            )) for mon in monitor_list)
+            current_time = time.time()
+            if current_time > (start_time + 10 * 60):
+                raise Exception
+            else:
+                # Wait 30 seconds and test again if all monitors are upgraded
+                time.sleep(30)
+        except subprocess.CalledProcessError:
+            raise
+
+
+# Edge cases:
+# 1. Previous node dies on upgrade, can we retry?
+def roll_monitor_cluster(new_version, upgrade_key):
+    """
+    This is tricky to get right so here's what we're going to do.
+    :param new_version: str of the version to upgrade to
+    :param upgrade_key: the cephx key name to use when upgrading
+    There's 2 possible cases: Either I'm first in line or not.
+    If I'm not first in line I'll wait a random time between 5-30 seconds
+    and test to see if the previous monitor is upgraded yet.
+    """
+    log('roll_monitor_cluster called with {}'.format(new_version))
+    my_name = socket.gethostname()
+    monitor_list = []
+    mon_map = get_mon_map('admin')
+    if mon_map['monmap']['mons']:
+        for mon in mon_map['monmap']['mons']:
+            monitor_list.append(mon['name'])
+    else:
+        status_set('blocked', 'Unable to get monitor cluster information')
+        sys.exit(1)
+    log('monitor_list: {}'.format(monitor_list))
+
+    # A sorted list of osd unit names
+    mon_sorted_list = sorted(monitor_list)
+
+    try:
+        position = mon_sorted_list.index(my_name)
+        log("upgrade position: {}".format(position))
+        if position == 0:
+            # I'm first!  Roll
+            # First set a key to inform others I'm about to roll
+            lock_and_roll(upgrade_key=upgrade_key,
+                          service='mon',
+                          my_name=my_name,
+                          version=new_version)
+        else:
+            # Check if the previous node has finished
+            status_set('blocked',
+                       'Waiting on {} to finish upgrading'.format(
+                           mon_sorted_list[position - 1]))
+            wait_on_previous_node(upgrade_key=upgrade_key,
+                                  service='mon',
+                                  previous_node=mon_sorted_list[position - 1],
+                                  version=new_version)
+            lock_and_roll(upgrade_key=upgrade_key,
+                          service='mon',
+                          my_name=my_name,
+                          version=new_version)
+    except ValueError:
+        log("Failed to find {} in list {}.".format(
+            my_name, mon_sorted_list))
+        status_set('blocked', 'failed to upgrade monitor')
+
+
+def upgrade_monitor():
+    current_version = get_version()
+    status_set("maintenance", "Upgrading monitor")
+    log("Current ceph version is {}".format(current_version))
+    new_version = config('release-version')
+    log("Upgrading to: {}".format(new_version))
+
+    try:
+        add_source(config('source'), config('key'))
+        apt_update(fatal=True)
+    except subprocess.CalledProcessError as err:
+        log("Adding the ceph source failed with message: {}".format(
+            err.message))
+        status_set("blocked", "Upgrade to {} failed".format(new_version))
+        sys.exit(1)
+    try:
+        if systemd():
+            for mon_id in get_local_mon_ids():
+                service_stop('ceph-mon@{}'.format(mon_id))
+        else:
+            service_stop('ceph-mon-all')
+        apt_install(packages=PACKAGES, fatal=True)
+
+        # Ensure the ownership of Ceph's directories is correct
+        chownr(path=os.path.join(os.sep, "var", "lib", "ceph"),
+               owner=ceph_user(),
+               group=ceph_user())
+        if systemd():
+            for mon_id in get_local_mon_ids():
+                service_start('ceph-mon@{}'.format(mon_id))
+        else:
+            service_start('ceph-mon-all')
+        status_set("active", "")
+    except subprocess.CalledProcessError as err:
+        log("Stopping ceph and upgrading packages failed "
+            "with message: {}".format(err.message))
+        status_set("blocked", "Upgrade to {} failed".format(new_version))
+        sys.exit(1)
+
+
+def lock_and_roll(upgrade_key, service, my_name, version):
+    start_timestamp = time.time()
+
+    log('monitor_key_set {}_{}_{}_start {}'.format(
+        service,
+        my_name,
+        version,
+        start_timestamp))
+    monitor_key_set(upgrade_key, "{}_{}_{}_start".format(
+        service, my_name, version), start_timestamp)
+    log("Rolling")
+
+    # This should be quick
+    if service == 'osd':
+        upgrade_osd()
+    elif service == 'mon':
+        upgrade_monitor()
+    else:
+        log("Unknown service {}. Unable to upgrade".format(service),
+            level=ERROR)
+    log("Done")
+
+    stop_timestamp = time.time()
+    # Set a key to inform others I am finished
+    log('monitor_key_set {}_{}_{}_done {}'.format(service,
+                                                  my_name,
+                                                  version,
+                                                  stop_timestamp))
+    monitor_key_set(upgrade_key, "{}_{}_{}_done".format(service,
+                                                        my_name,
+                                                        version),
+                    stop_timestamp)
+
+
+def wait_on_previous_node(upgrade_key, service, previous_node, version):
+    log("Previous node is: {}".format(previous_node))
+
+    previous_node_finished = monitor_key_exists(
+        upgrade_key,
+        "{}_{}_{}_done".format(service, previous_node, version))
+
+    while previous_node_finished is False:
+        log("{} is not finished. Waiting".format(previous_node))
+        # Has this node been trying to upgrade for longer than
+        # 10 minutes?
+        # If so then move on and consider that node dead.
+
+        # NOTE: This assumes the clusters clocks are somewhat accurate
+        # If the hosts clock is really far off it may cause it to skip
+        # the previous node even though it shouldn't.
+        current_timestamp = time.time()
+        previous_node_start_time = monitor_key_get(
+            upgrade_key,
+            "{}_{}_{}_start".format(service, previous_node, version))
+        if (current_timestamp - (10 * 60)) > previous_node_start_time:
+            # Previous node is probably dead. Lets move on
+            if previous_node_start_time is not None:
+                log(
+                    "Waited 10 mins on node {}. current time: {} > "
+                    "previous node start time: {} Moving on".format(
+                        previous_node,
+                        (current_timestamp - (10 * 60)),
+                        previous_node_start_time))
+                return
+        else:
+            # I have to wait. Sleep a random amount of time and then
+            # check if I can lock,upgrade and roll.
+            wait_time = random.randrange(5, 30)
+            log('waiting for {} seconds'.format(wait_time))
+            time.sleep(wait_time)
+            previous_node_finished = monitor_key_exists(
+                upgrade_key,
+                "{}_{}_{}_done".format(service, previous_node, version))
+
+
+def get_upgrade_position(osd_sorted_list, match_name):
+    for index, item in enumerate(osd_sorted_list):
+        if item.name == match_name:
+            return index
+    return None
+
+
+# Edge cases:
+# 1. Previous node dies on upgrade, can we retry?
+# 2. This assumes that the osd failure domain is not set to osd.
+#    It rolls an entire server at a time.
+def roll_osd_cluster(new_version, upgrade_key):
+    """
+    This is tricky to get right so here's what we're going to do.
+    :param new_version: str of the version to upgrade to
+    :param upgrade_key: the cephx key name to use when upgrading
+    There's 2 possible cases: Either I'm first in line or not.
+    If I'm not first in line I'll wait a random time between 5-30 seconds
+    and test to see if the previous osd is upgraded yet.
+
+    TODO: If you're not in the same failure domain it's safe to upgrade
+     1. Examine all pools and adopt the most strict failure domain policy
+        Example: Pool 1: Failure domain = rack
+        Pool 2: Failure domain = host
+        Pool 3: Failure domain = row
+
+        outcome: Failure domain = host
+    """
+    log('roll_osd_cluster called with {}'.format(new_version))
+    my_name = socket.gethostname()
+    osd_tree = get_osd_tree(service=upgrade_key)
+    # A sorted list of osd unit names
+    osd_sorted_list = sorted(osd_tree)
+    log("osd_sorted_list: {}".format(osd_sorted_list))
+
+    try:
+        position = get_upgrade_position(osd_sorted_list, my_name)
+        log("upgrade position: {}".format(position))
+        if position == 0:
+            # I'm first!  Roll
+            # First set a key to inform others I'm about to roll
+            lock_and_roll(upgrade_key=upgrade_key,
+                          service='osd',
+                          my_name=my_name,
+                          version=new_version)
+        else:
+            # Check if the previous node has finished
+            status_set('blocked',
+                       'Waiting on {} to finish upgrading'.format(
+                           osd_sorted_list[position - 1].name))
+            wait_on_previous_node(
+                upgrade_key=upgrade_key,
+                service='osd',
+                previous_node=osd_sorted_list[position - 1].name,
+                version=new_version)
+            lock_and_roll(upgrade_key=upgrade_key,
+                          service='osd',
+                          my_name=my_name,
+                          version=new_version)
+    except ValueError:
+        log("Failed to find name {} in list {}".format(
+            my_name, osd_sorted_list))
+        status_set('blocked', 'failed to upgrade osd')
+
+
+def upgrade_osd():
+    current_version = get_version()
+    status_set("maintenance", "Upgrading osd")
+    log("Current ceph version is {}".format(current_version))
+    new_version = config('release-version')
+    log("Upgrading to: {}".format(new_version))
+
+    try:
+        add_source(config('source'), config('key'))
+        apt_update(fatal=True)
+    except subprocess.CalledProcessError as err:
+        log("Adding the ceph source failed with message: {}".format(
+            err.message))
+        status_set("blocked", "Upgrade to {} failed".format(new_version))
+        sys.exit(1)
+    try:
+        if systemd():
+            for osd_id in get_local_osd_ids():
+                service_stop('ceph-osd@{}'.format(osd_id))
+        else:
+            service_stop('ceph-osd-all')
+        apt_install(packages=PACKAGES, fatal=True)
+
+        # Ensure the ownership of Ceph's directories is correct
+        chownr(path=os.path.join(os.sep, "var", "lib", "ceph"),
+               owner=ceph_user(),
+               group=ceph_user())
+        if systemd():
+            for osd_id in get_local_osd_ids():
+                service_start('ceph-osd@{}'.format(osd_id))
+        else:
+            service_start('ceph-osd-all')
+    except subprocess.CalledProcessError as err:
+        log("Stopping ceph and upgrading packages failed "
+            "with message: {}".format(err.message))
+        status_set("blocked", "Upgrade to {} failed".format(new_version))
+        sys.exit(1)
diff --git a/lib/ceph/ceph_broker.py b/lib/ceph/ceph_broker.py
index d55e570b..0ed9833e 100644
--- a/lib/ceph/ceph_broker.py
+++ b/lib/ceph/ceph_broker.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 
 import json
+import os
+from tempfile import NamedTemporaryFile
 
 from charmhelpers.core.hookenv import (
     log,
@@ -42,6 +44,8 @@ from charmhelpers.contrib.storage.linux.ceph import (
 # This comes from http://docs.ceph.com/docs/master/rados/operations/pools/
 # This should do a decent job of preventing people from passing in bad values.
 # It will give a useful error message
+from subprocess import check_output, CalledProcessError
+
 POOL_KEYS = {
     # "Ceph Key Name": [Python type, [Valid Range]]
     "size": [int],
@@ -56,8 +60,8 @@ POOL_KEYS = {
     "write_fadvise_dontneed": [bool],
     "noscrub": [bool],
     "nodeep-scrub": [bool],
-    "hit_set_type": [basestring, ["bloom", "explicit_hash",
-                                  "explicit_object"]],
+    "hit_set_type": [str, ["bloom", "explicit_hash",
+                           "explicit_object"]],
     "hit_set_count": [int, [1, 1]],
     "hit_set_period": [int],
     "hit_set_fpp": [float, [0.0, 1.0]],
@@ -289,6 +293,136 @@ def handle_set_pool_value(request, service):
                           value=params['value'])
 
 
+def handle_rgw_regionmap_update(request, service):
+    name = request.get('client-name')
+    if not name:
+        msg = "Missing rgw-region or client-name params"
+        log(msg, level=ERROR)
+        return {'exit-code': 1, 'stderr': msg}
+    try:
+        check_output(['radosgw-admin',
+                      '--id', service,
+                      'regionmap', 'update', '--name', name])
+    except CalledProcessError as err:
+        log(err.output, level=ERROR)
+        return {'exit-code': 1, 'stderr': err.output}
+
+
+def handle_rgw_regionmap_default(request, service):
+    region = request.get('rgw-region')
+    name = request.get('client-name')
+    if not region or not name:
+        msg = "Missing rgw-region or client-name params"
+        log(msg, level=ERROR)
+        return {'exit-code': 1, 'stderr': msg}
+    try:
+        check_output(
+            [
+                'radosgw-admin',
+                '--id', service,
+                'regionmap',
+                'default',
+                '--rgw-region', region,
+                '--name', name])
+    except CalledProcessError as err:
+        log(err.output, level=ERROR)
+        return {'exit-code': 1, 'stderr': err.output}
+
+
+def handle_rgw_zone_set(request, service):
+    json_file = request.get('zone-json')
+    name = request.get('client-name')
+    region_name = request.get('region-name')
+    zone_name = request.get('zone-name')
+    if not json_file or not name or not region_name or not zone_name:
+        msg = "Missing json-file or client-name params"
+        log(msg, level=ERROR)
+        return {'exit-code': 1, 'stderr': msg}
+    infile = NamedTemporaryFile(delete=False)
+    with open(infile.name, 'w') as infile_handle:
+        infile_handle.write(json_file)
+    try:
+        check_output(
+            [
+                'radosgw-admin',
+                '--id', service,
+                'zone',
+                'set',
+                '--rgw-zone', zone_name,
+                '--infile', infile.name,
+                '--name', name,
+            ]
+        )
+    except CalledProcessError as err:
+        log(err.output, level=ERROR)
+        return {'exit-code': 1, 'stderr': err.output}
+    os.unlink(infile.name)
+
+
+def handle_rgw_create_user(request, service):
+    user_id = request.get('rgw-uid')
+    display_name = request.get('display-name')
+    name = request.get('client-name')
+    if not name or not display_name or not user_id:
+        msg = "Missing client-name, display-name or rgw-uid"
+        log(msg, level=ERROR)
+        return {'exit-code': 1, 'stderr': msg}
+    try:
+        create_output = check_output(
+            [
+                'radosgw-admin',
+                '--id', service,
+                'user',
+                'create',
+                '--uid', user_id,
+                '--display-name', display_name,
+                '--name', name,
+                '--system'
+            ]
+        )
+        try:
+            user_json = json.loads(create_output)
+            return {'exit-code': 0, 'user': user_json}
+        except ValueError as err:
+            log(err, level=ERROR)
+            return {'exit-code': 1, 'stderr': err}
+
+    except CalledProcessError as err:
+        log(err.output, level=ERROR)
+        return {'exit-code': 1, 'stderr': err.output}
+
+
+def handle_rgw_region_set(request, service):
+    # radosgw-admin region set --infile us.json --name client.radosgw.us-east-1
+    json_file = request.get('region-json')
+    name = request.get('client-name')
+    region_name = request.get('region-name')
+    zone_name = request.get('zone-name')
+    if not json_file or not name or not region_name or not zone_name:
+        msg = "Missing json-file or client-name params"
+        log(msg, level=ERROR)
+        return {'exit-code': 1, 'stderr': msg}
+    infile = NamedTemporaryFile(delete=False)
+    with open(infile.name, 'w') as infile_handle:
+        infile_handle.write(json_file)
+    try:
+        check_output(
+            [
+                'radosgw-admin',
+                '--id', service,
+                'region',
+                'set',
+                '--rgw-zone', zone_name,
+                '--infile', infile.name,
+                '--name', name,
+            ]
+        )
+    except CalledProcessError as err:
+        log(err.output, level=ERROR)
+        return {'exit-code': 1, 'stderr': err.output}
+    os.unlink(infile.name)
+
+
 def process_requests_v1(reqs):
     """Process v1 requests.
 
@@ -341,6 +475,16 @@ def process_requests_v1(reqs):
                                        snapshot_name=snapshot_name)
         elif op == "set-pool-value":
             ret = handle_set_pool_value(request=req, service=svc)
+        elif op == "rgw-region-set":
+            ret = handle_rgw_region_set(request=req, service=svc)
+        elif op == "rgw-zone-set":
+            ret = handle_rgw_zone_set(request=req, service=svc)
+        elif op == "rgw-regionmap-update":
+            ret = handle_rgw_regionmap_update(request=req, service=svc)
+        elif op == "rgw-regionmap-default":
+            ret = handle_rgw_regionmap_default(request=req, service=svc)
+        elif op == "rgw-create-user":
+            ret = handle_rgw_create_user(request=req, service=svc)
         else:
             msg = "Unknown operation '%s'" % op
             log(msg, level=ERROR)
diff --git a/unit_tests/test_upgrade.py b/unit_tests/test_upgrade.py
new file mode 100644
index 00000000..7b213ca4
--- /dev/null
+++ b/unit_tests/test_upgrade.py
@@ -0,0 +1,35 @@
+import unittest
+
+__author__ = 'Chris Holcombe '
+
+from mock import patch, MagicMock
+
+from ceph_hooks import check_for_upgrade
+
+
+def config_side_effect(*args):
+    if args[0] == 'source':
+        return 'cloud:trusty-kilo'
+    elif args[0] == 'key':
+        return 'key'
+    elif args[0] == 'release-version':
+        return 'cloud:trusty-kilo'
+
+
+class UpgradeRollingTestCase(unittest.TestCase):
+    @patch('ceph_hooks.hookenv')
+    @patch('ceph_hooks.host')
+    @patch('ceph_hooks.ceph.roll_osd_cluster')
+    def test_check_for_upgrade(self, roll_osd_cluster, host, hookenv):
+        host.lsb_release.return_value = {
+            'DISTRIB_CODENAME': 'trusty',
+        }
+        previous_mock = MagicMock().return_value
+        previous_mock.previous.return_value = "cloud:trusty-juno"
+        hookenv.config.side_effect = [previous_mock,
+                                      config_side_effect('source')]
+        check_for_upgrade()
+
+        roll_osd_cluster.assert_called_with(
+            new_version='cloud:trusty-kilo',
+            upgrade_key='osd-upgrade')
diff --git a/unit_tests/test_upgrade_roll.py b/unit_tests/test_upgrade_roll.py
deleted file mode 100644
index c88ea6e1..00000000
--- a/unit_tests/test_upgrade_roll.py
+++ /dev/null
@@ -1,179 +0,0 @@
-# Copyright 2016 Canonical Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#  http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import time
-
-from mock import patch, call, MagicMock
-
-from ceph import CrushLocation
-
-import test_utils
-import ceph_hooks
-
-TO_PATCH = [
-    'apt_install',
-    'apt_update',
-    'add_source',
-    'config',
-    'ceph',
-    'get_conf',
-    'hookenv',
-    'host',
-    'log',
-    'service_start',
-    'service_stop',
-    'socket',
-    'status_set',
-    'chownr',
-]
-
-
-def config_side_effect(*args):
-    if args[0] == 'source':
-        return 'cloud:trusty-kilo'
-    elif args[0] == 'key':
-        return 'key'
-    elif args[0] == 'release-version':
-        return 'cloud:trusty-kilo'
-
-
-previous_node_start_time = time.time() - (9 * 60)
-
-
-def monitor_key_side_effect(*args):
-    if args[1] == \
-            'ip-192-168-1-2_0.94.1_done':
-        return False
-    elif args[1] == \
-            'ip-192-168-1-2_0.94.1_start':
-        # Return that the previous node started 9 minutes ago
-        return previous_node_start_time
-
-
-class UpgradeRollingTestCase(test_utils.CharmTestCase):
-    def setUp(self):
-        super(UpgradeRollingTestCase, self).setUp(ceph_hooks, TO_PATCH)
-
-    @patch('ceph_hooks.roll_osd_cluster')
-    def test_check_for_upgrade(self, roll_osd_cluster):
-        self.host.lsb_release.return_value = {
-            'DISTRIB_CODENAME': 'trusty',
-        }
-        previous_mock = MagicMock().return_value
-        previous_mock.previous.return_value = "cloud:trusty-juno"
-        self.hookenv.config.side_effect = [previous_mock,
-                                           config_side_effect('source')]
-        ceph_hooks.check_for_upgrade()
-
-        roll_osd_cluster.assert_called_with('cloud:trusty-kilo')
-
-    @patch('ceph_hooks.upgrade_osd')
-    @patch('ceph_hooks.monitor_key_set')
-    def test_lock_and_roll(self, monitor_key_set, upgrade_osd):
-        monitor_key_set.monitor_key_set.return_value = None
-        ceph_hooks.lock_and_roll(my_name='ip-192-168-1-2',
-                                 version='0.94.1')
-        upgrade_osd.assert_called_once_with()
-
-    def test_upgrade_osd(self):
-        self.config.side_effect = config_side_effect
-        self.ceph.get_version.return_value = "0.80"
-        self.ceph.ceph_user.return_value = "ceph"
-        self.ceph.systemd.return_value = False
-        ceph_hooks.upgrade_osd()
-        self.service_stop.assert_called_with('ceph-osd-all')
-        self.service_start.assert_called_with('ceph-osd-all')
-        self.status_set.assert_has_calls([
-            call('maintenance', 'Upgrading osd'),
-        ])
-        self.chownr.assert_has_calls(
-            [
-                call(group='ceph', owner='ceph', path='/var/lib/ceph')
-            ]
-        )
-
-    @patch('ceph_hooks.lock_and_roll')
-    @patch('ceph_hooks.get_upgrade_position')
-    def test_roll_osd_cluster_first(self,
-                                    get_upgrade_position,
-                                    lock_and_roll):
-        self.socket.gethostname.return_value = "ip-192-168-1-2"
-        self.ceph.get_osd_tree.return_value = ""
-        get_upgrade_position.return_value = 0
-        ceph_hooks.roll_osd_cluster('0.94.1')
-        lock_and_roll.assert_called_with(my_name="ip-192-168-1-2",
-                                         version="0.94.1")
-
-    @patch('ceph_hooks.lock_and_roll')
-    @patch('ceph_hooks.get_upgrade_position')
-    @patch('ceph_hooks.wait_on_previous_node')
-    def test_roll_osd_cluster_second(self,
                                     wait_on_previous_node,
-                                     get_upgrade_position,
-                                     lock_and_roll):
-        wait_on_previous_node.return_value = None
-        self.socket.gethostname.return_value = "ip-192-168-1-3"
-        self.ceph.get_osd_tree.return_value = [
-            CrushLocation(
-                name="ip-192-168-1-2",
-                identifier='a',
-                host='host-a',
-                rack='rack-a',
-                row='row-a',
-                datacenter='dc-1',
-                chassis='chassis-a',
-                root='ceph'),
-            CrushLocation(
-                name="ip-192-168-1-3",
-                identifier='a',
-                host='host-b',
-                rack='rack-a',
-                row='row-a',
-                datacenter='dc-1',
-                chassis='chassis-a',
-                root='ceph')
-        ]
-        get_upgrade_position.return_value = 1
-        ceph_hooks.roll_osd_cluster('0.94.1')
-        self.status_set.assert_called_with(
-            'blocked',
-            'Waiting on ip-192-168-1-2 to finish upgrading')
-        lock_and_roll.assert_called_with(my_name="ip-192-168-1-3",
-                                         version="0.94.1")
-
-    @patch('time.time', lambda *args: previous_node_start_time + 10 * 60 + 1)
-    @patch('ceph_hooks.monitor_key_get')
-    @patch('ceph_hooks.monitor_key_exists')
-    def test_wait_on_previous_node(self,
-                                   monitor_key_exists,
-                                   monitor_key_get):
-        monitor_key_get.side_effect = monitor_key_side_effect
-        monitor_key_exists.return_value = False
-
-        ceph_hooks.wait_on_previous_node(previous_node="ip-192-168-1-2",
-                                         version='0.94.1')
-
-        # Make sure we checked to see if the previous node started
-        monitor_key_get.assert_has_calls(
-            [call('osd-upgrade', 'ip-192-168-1-2_0.94.1_start')]
-        )
-        # Make sure we checked to see if the previous node was finished
-        monitor_key_exists.assert_has_calls(
-            [call('osd-upgrade', 'ip-192-168-1-2_0.94.1_done')]
-        )
-        # Make sure we waited at last once before proceeding
-        self.log.assert_has_calls(
-            [call('Previous node is: ip-192-168-1-2')],
-            [call('ip-192-168-1-2 is not finished. Waiting')],
-        )
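
Note: with test_upgrade_roll.py removed, the rolling-upgrade helpers that moved into lib/ceph/__init__.py (lock_and_roll, wait_on_previous_node, roll_osd_cluster, roll_monitor_cluster) are only exercised indirectly through the new test_upgrade.py. A minimal sketch of a direct unit test for lock_and_roll is shown below. It assumes the test runner puts lib/ on sys.path so the module imports as "ceph" (the deleted test already did "from ceph import CrushLocation"); the file, class and test names are illustrative and not part of this change, and "ceph.log" is patched only to keep hookenv logging out of the test run.

import unittest

from mock import patch

import ceph


class LockAndRollTestCase(unittest.TestCase):
    @patch('ceph.log')
    @patch('ceph.upgrade_osd')
    @patch('ceph.monitor_key_set')
    def test_lock_and_roll_osd(self, monitor_key_set, upgrade_osd, log):
        # lock_and_roll should record a start key, run the service
        # upgrade, then record a done key against the same upgrade key.
        ceph.lock_and_roll(upgrade_key='osd-upgrade',
                           service='osd',
                           my_name='ip-192-168-1-2',
                           version='0.94.1')
        upgrade_osd.assert_called_once_with()
        # The key format is "{service}_{hostname}_{version}_{start|done}".
        keys = [args[1] for args, _ in monitor_key_set.call_args_list]
        self.assertEqual(keys, ['osd_ip-192-168-1-2_0.94.1_start',
                                'osd_ip-192-168-1-2_0.94.1_done'])

A companion test for service='mon' would patch ceph.upgrade_monitor instead and assert the "mon_..." keys, mirroring the osd case.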