Merge "Rolling upgrades for Ceph Monitor Cluster"
.gitignore (vendored, 1 change)
@@ -3,5 +3,6 @@ bin
.testrepository
.tox
*.sw[nop]
.idea
*.pyc
.idea
@@ -1,4 +1,3 @@

#
# Copyright 2012 Canonical Ltd.
#
@@ -6,35 +5,32 @@
# James Page <james.page@canonical.com>
# Paul Collins <paul.collins@canonical.com>
#

import json
import subprocess
import time
import os
import re
import sys

from charmhelpers.contrib.storage.linux.utils import (
    is_block_device,
    zap_disk,
    is_device_mounted)
from charmhelpers.core.host import (
    mkdir,
    chownr,
    service_restart,
    cmp_pkgrevno,
    lsb_release
)
    lsb_release,
    cmp_pkgrevno)
from charmhelpers.core.hookenv import (
    log,
    ERROR,
    WARNING,
    cached,
    status_set,
)
    WARNING)
from charmhelpers.fetch import (
    apt_cache
)
from charmhelpers.contrib.storage.linux.utils import (
    zap_disk,
    is_block_device,
    is_device_mounted,
)
from utils import (
    get_unit_hostname,
)
@@ -53,8 +49,32 @@ def ceph_user():
    return "root"


def get_local_mon_ids():
    """
    This will list the /var/lib/ceph/mon/* directories and try
    to split the ID off of the directory name and return it in
    a list

    :return: list. A list of monitor identifiers
    :raise: OSError if something goes wrong with listing the directory.
    """
    mon_ids = []
    mon_path = os.path.join(os.sep, 'var', 'lib', 'ceph', 'mon')
    if os.path.exists(mon_path):
        try:
            dirs = os.listdir(mon_path)
            for mon_dir in dirs:
                # Basically this takes everything after ceph- as the monitor ID
                match = re.search('ceph-(?P<mon_id>.*)', mon_dir)
                if match:
                    mon_ids.append(match.group('mon_id'))
        except OSError:
            raise
    return mon_ids
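The ID extraction above reduces to a regex split on the directory name; a minimal standalone sketch (the directory entries here are hypothetical):

    import re

    # Hypothetical contents of /var/lib/ceph/mon on one host
    mon_dirs = ['ceph-ip-172-31-13-165', 'tmp']
    mon_ids = []
    for mon_dir in mon_dirs:
        match = re.search('ceph-(?P<mon_id>.*)', mon_dir)
        if match:
            mon_ids.append(match.group('mon_id'))
    print(mon_ids)  # ['ip-172-31-13-165']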


def get_version():
    '''Derive Ceph release from an installed package.'''
    """Derive Ceph release from an installed package."""
    import apt_pkg as apt

    cache = apt_cache()

@@ -63,7 +83,7 @@ def get_version():
        pkg = cache[package]
    except:
        # the package is unknown to the current apt cache.
        e = 'Could not determine version of package with no installation '\
        e = 'Could not determine version of package with no installation ' \
            'candidate: %s' % package
        error_out(e)

@@ -165,6 +185,7 @@ def add_bootstrap_hint(peer):
        # Ignore any errors for this call
        subprocess.call(cmd)


DISK_FORMATS = [
    'xfs',
    'ext4',

@@ -178,7 +199,7 @@ def is_osd_disk(dev):
        info = info.split("\n")  # IGNORE:E1103
        for line in info:
            if line.startswith(
                'Partition GUID code: 4FBD7E29-9D25-41B8-AFD0-062C0CEFF05D'
                    'Partition GUID code: 4FBD7E29-9D25-41B8-AFD0-062C0CEFF05D'
            ):
                return True
    except subprocess.CalledProcessError:
@@ -213,7 +234,7 @@ def is_bootstrapped():


def wait_for_bootstrap():
    while (not is_bootstrapped()):
    while not is_bootstrapped():
        time.sleep(3)


@@ -243,7 +264,6 @@ def generate_monitor_secret():

    return "{}==".format(res.split('=')[1].strip())


# OSD caps taken from ceph-create-keys
_osd_bootstrap_caps = {
    'mon': [
@@ -310,6 +330,9 @@ _radosgw_caps = {
    'mon': ['allow rw'],
    'osd': ['allow rwx']
}
_upgrade_caps = {
    'mon': ['allow rwx']
}


def get_radosgw_key():

@@ -321,6 +344,26 @@ _default_caps = {
    'osd': ['allow rwx']
}

admin_caps = {
    'mds': ['allow'],
    'mon': ['allow *'],
    'osd': ['allow *']
}

osd_upgrade_caps = {
    'mon': ['allow command "config-key"',
            'allow command "osd tree"',
            'allow command "config-key list"',
            'allow command "config-key put"',
            'allow command "config-key get"',
            'allow command "config-key exists"',
            ]
}


def get_upgrade_key():
    return get_named_key('upgrade-osd', _upgrade_caps)


def get_named_key(name, caps=None):
    caps = caps or _default_caps
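For context, caps dicts of this shape end up as `ceph auth get-or-create` capability arguments; a hedged sketch of that flattening (the exact command construction lives in get_named_key, so this is illustrative only):

    caps = {'mon': ['allow rwx']}  # same shape as _upgrade_caps above
    cmd = ['ceph', 'auth', 'get-or-create', 'client.upgrade-osd']
    for subsystem, rules in sorted(caps.items()):
        cmd.extend([subsystem, '; '.join(rules)])
    print(' '.join(cmd))
    # ceph auth get-or-create client.upgrade-osd mon allow rwx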
@@ -346,7 +389,7 @@ def get_named_key(name, caps=None):


def upgrade_key_caps(key, caps):
    ''' Upgrade key to have capabilities caps '''
    """ Upgrade key to have capabilities caps """
    if not is_leader():
        # Not the MON leader OR not clustered
        return

@@ -440,7 +483,7 @@ def osdize_dev(dev, osd_format, osd_journal, reformat_osd=False,
        log('Path {} is not a block device - bailing'.format(dev))
        return

    if (is_osd_disk(dev) and not reformat_osd):
    if is_osd_disk(dev) and not reformat_osd:
        log('Looks like {} is already an OSD, skipping.'.format(dev))
        return

@@ -10,10 +10,17 @@

import glob
import os
import random
import shutil
import socket
import subprocess
import sys
import uuid
import time

import ceph
from charmhelpers.core import host
from charmhelpers.core import hookenv
from charmhelpers.core.hookenv import (
    log,
    DEBUG,

@@ -29,15 +36,14 @@ from charmhelpers.core.hookenv import (
    service_name,
    relations_of_type,
    status_set,
    local_unit
)
    local_unit)
from charmhelpers.core.host import (
    service_restart,
    mkdir,
    write_file,
    rsync,
    cmp_pkgrevno
)
    cmp_pkgrevno,
    service_stop, service_start)
from charmhelpers.fetch import (
    apt_install,
    apt_update,

@@ -52,7 +58,11 @@ from charmhelpers.contrib.network.ip import (
)
from charmhelpers.core.sysctl import create as create_sysctl
from charmhelpers.core.templating import render

from charmhelpers.contrib.storage.linux.ceph import (
    monitor_key_set,
    monitor_key_exists,
    monitor_key_get,
    get_mon_map)
from utils import (
    get_networks,
    get_public_addr,

@@ -61,7 +71,6 @@ from utils import (
from ceph_broker import (
    process_requests
)

from charmhelpers.contrib.charmsupport import nrpe

hooks = Hooks()
@@ -71,6 +80,186 @@ SCRIPTS_DIR = '/usr/local/bin'
STATUS_FILE = '/var/lib/nagios/cat-ceph-status.txt'
STATUS_CRONFILE = '/etc/cron.d/cat-ceph-health'

# A dict of valid ceph upgrade paths. Mapping is old -> new
upgrade_paths = {
    'cloud:trusty-juno': 'cloud:trusty-kilo',
    'cloud:trusty-kilo': 'cloud:trusty-liberty',
    'cloud:trusty-liberty': 'cloud:trusty-mitaka',
}


def pretty_print_upgrade_paths():
    lines = []
    for key, value in upgrade_paths.iteritems():
        lines.append("{} -> {}".format(key, value))
    return lines
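The gate on whether an upgrade may proceed is a single-hop lookup in upgrade_paths; a minimal sketch using the mapping above (Python 3 spelling):

    upgrade_paths = {
        'cloud:trusty-juno': 'cloud:trusty-kilo',
        'cloud:trusty-kilo': 'cloud:trusty-liberty',
        'cloud:trusty-liberty': 'cloud:trusty-mitaka',
    }

    def is_valid_upgrade(old, new):
        # Only one hop at a time: juno->kilo is fine, juno->mitaka is not
        return upgrade_paths.get(old) == new

    assert is_valid_upgrade('cloud:trusty-juno', 'cloud:trusty-kilo')
    assert not is_valid_upgrade('cloud:trusty-juno', 'cloud:trusty-mitaka')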

def check_for_upgrade():
    release_info = host.lsb_release()
    if not release_info['DISTRIB_CODENAME'] == 'trusty':
        log("Invalid upgrade path from {}. Only trusty is currently "
            "supported".format(release_info['DISTRIB_CODENAME']))
        return

    c = hookenv.config()
    old_version = c.previous('source')
    log('old_version: {}'.format(old_version))
    # Strip all whitespace
    new_version = hookenv.config('source')
    if new_version:
        # replace all whitespace
        new_version = new_version.replace(' ', '')
    log('new_version: {}'.format(new_version))

    if old_version in upgrade_paths:
        if new_version == upgrade_paths[old_version]:
            log("{} to {} is a valid upgrade path. Proceeding.".format(
                old_version, new_version))
            roll_monitor_cluster(new_version)
        else:
            # Log a helpful error message
            log("Invalid upgrade path from {} to {}. "
                "Valid paths are: {}".format(old_version,
                                             new_version,
                                             pretty_print_upgrade_paths()))


def lock_and_roll(my_name):
    start_timestamp = time.time()

    log('monitor_key_set {}_start {}'.format(my_name, start_timestamp))
    monitor_key_set('admin', "{}_start".format(my_name), start_timestamp)
    log("Rolling")
    # This should be quick
    upgrade_monitor()
    log("Done")

    stop_timestamp = time.time()
    # Set a key to inform others I am finished
    log('monitor_key_set {}_done {}'.format(my_name, stop_timestamp))
    monitor_key_set('admin', "{}_done".format(my_name), stop_timestamp)
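lock_and_roll() leaves two timestamped keys per monitor in the cluster's config-key store, which is what the other units poll; schematically (hostname and timestamps hypothetical):

    # Keys written for host 'ip-192-168-1-2' (values are time.time() floats):
    #   'ip-192-168-1-2_start' -> 1458158883.0  # written before upgrade_monitor()
    #   'ip-192-168-1-2_done'  -> 1458158912.0  # written after it returns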
def wait_on_previous_node(previous_node):
    log("Previous node is: {}".format(previous_node))

    previous_node_finished = monitor_key_exists(
        'admin',
        "{}_done".format(previous_node))

    while previous_node_finished is False:
        log("{} is not finished. Waiting".format(previous_node))
        # Has this node been trying to upgrade for longer than
        # 10 minutes?
        # If so then move on and consider that node dead.

        # NOTE: This assumes the clusters' clocks are somewhat accurate
        # If the host's clock is really far off it may cause it to skip
        # the previous node even though it shouldn't.
        current_timestamp = time.time()
        previous_node_start_time = monitor_key_get(
            'admin',
            "{}_start".format(previous_node))
        if (current_timestamp - (10 * 60)) > previous_node_start_time:
            # Previous node is probably dead. Let's move on
            if previous_node_start_time is not None:
                log(
                    "Waited 10 mins on node {}. current time: {} > "
                    "previous node start time: {} Moving on".format(
                        previous_node,
                        (current_timestamp - (10 * 60)),
                        previous_node_start_time))
                return
        else:
            # I have to wait. Sleep a random amount of time and then
            # check if I can lock, upgrade and roll.
            wait_time = random.randrange(5, 30)
            log('waiting for {} seconds'.format(wait_time))
            time.sleep(wait_time)
            previous_node_finished = monitor_key_exists(
                'admin',
                "{}_done".format(previous_node))


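Stripped of logging, the 10-minute deadman check above compares the previous node's _start timestamp against the local clock; a hedged sketch of just that predicate:

    import time

    TIMEOUT = 10 * 60  # seconds, matching the hardcoded value above

    def previous_node_timed_out(previous_start, now=None):
        # Treat the previous monitor as dead if it wrote its _start key
        # more than TIMEOUT seconds ago and never wrote a _done key.
        now = now if now is not None else time.time()
        return previous_start is not None and (now - TIMEOUT) > previous_start

    assert previous_node_timed_out(time.time() - 11 * 60)     # give up waiting
    assert not previous_node_timed_out(time.time() - 9 * 60)  # keep waiting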
# Edge cases:
# 1. Previous node dies on upgrade, can we retry?
def roll_monitor_cluster(new_version):
    """
    This is tricky to get right so here's what we're going to do.
    There are 2 possible cases: Either I'm first in line or not.
    If I'm not first in line I'll wait a random time between 5-30 seconds
    and test to see if the previous monitor is upgraded yet.
    """
    log('roll_monitor_cluster called with {}'.format(new_version))
    my_name = socket.gethostname()
    monitor_list = []
    mon_map = get_mon_map('admin')
    if mon_map['monmap']['mons']:
        for mon in mon_map['monmap']['mons']:
            monitor_list.append(mon['name'])
    else:
        status_set('blocked', 'Unable to get monitor cluster information')
        sys.exit(1)
    log('monitor_list: {}'.format(monitor_list))

    # A sorted list of monitor unit names
    mon_sorted_list = sorted(monitor_list)

    try:
        position = mon_sorted_list.index(my_name)
        log("upgrade position: {}".format(position))
        if position == 0:
            # I'm first! Roll
            # First set a key to inform others I'm about to roll
            lock_and_roll(my_name=my_name)
        else:
            # Check if the previous node has finished
            status_set('blocked',
                       'Waiting on {} to finish upgrading'.format(
                           mon_sorted_list[position - 1]))
            wait_on_previous_node(previous_node=mon_sorted_list[position - 1])
            lock_and_roll(my_name=my_name)
    except ValueError:
        log("Failed to find {} in list {}.".format(
            my_name, mon_sorted_list))
        status_set('blocked', 'failed to upgrade monitor')


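The upgrade ordering itself is nothing more than the sort order of the monitor hostnames from the monmap; a minimal sketch with hypothetical names:

    monitors = ['ip-192-168-1-3', 'ip-192-168-1-2']  # from the monmap
    mon_sorted_list = sorted(monitors)
    position = mon_sorted_list.index('ip-192-168-1-3')
    # position 0 rolls first; everyone else waits on the unit before them
    assert position == 1 and mon_sorted_list[position - 1] == 'ip-192-168-1-2'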
def upgrade_monitor():
    current_version = ceph.get_version()
    status_set("maintenance", "Upgrading monitor")
    log("Current ceph version is {}".format(current_version))
    new_version = config('release-version')
    log("Upgrading to: {}".format(new_version))

    try:
        add_source(config('source'), config('key'))
        apt_update(fatal=True)
    except subprocess.CalledProcessError as err:
        log("Adding the ceph source failed with message: {}".format(
            err.message))
        status_set("blocked", "Upgrade to {} failed".format(new_version))
        sys.exit(1)
    try:
        if ceph.systemd():
            for mon_id in ceph.get_local_mon_ids():
                service_stop('ceph-mon@{}'.format(mon_id))
        else:
            service_stop('ceph-mon-all')
        apt_install(packages=ceph.PACKAGES, fatal=True)
        if ceph.systemd():
            for mon_id in ceph.get_local_mon_ids():
                service_start('ceph-mon@{}'.format(mon_id))
        else:
            service_start('ceph-mon-all')
        status_set("active", "")
    except subprocess.CalledProcessError as err:
        log("Stopping ceph and upgrading packages failed "
            "with message: {}".format(err.message))
        status_set("blocked", "Upgrade to {} failed".format(new_version))
        sys.exit(1)


def install_upstart_scripts():
    # Only install upstart configurations for older versions
@@ -123,6 +312,7 @@ def emit_cephconf():
    install_alternative('ceph.conf', '/etc/ceph/ceph.conf',
                        charm_ceph_conf, 100)


JOURNAL_ZAPPED = '/var/lib/ceph/journal_zapped'


@@ -131,6 +321,9 @@ def config_changed():
    if config('prefer-ipv6'):
        assert_charm_supports_ipv6()

    # Check if an upgrade was requested
    check_for_upgrade()

    log('Monitor hosts are ' + repr(get_mon_hosts()))

    sysctl_dict = config('sysctl')

@@ -165,7 +358,7 @@ def config_changed():
    emit_cephconf()

    # Support use of single node ceph
    if (not ceph.is_bootstrapped() and int(config('monitor-count')) == 1):
    if not ceph.is_bootstrapped() and int(config('monitor-count')) == 1:
        status_set('maintenance', 'Bootstrapping single Ceph MON')
        ceph.bootstrap_monitor_cluster(config('monitor-secret'))
        ceph.wait_for_bootstrap()
@@ -188,10 +381,10 @@ def get_mon_hosts():


def get_peer_units():
    '''
    """
    Returns a dictionary of unit names from the mon peer relation with
    a flag indicating whether the unit has presented its address
    '''
    """
    units = {}
    units[local_unit()] = True
    for relid in relation_ids('mon'):

@@ -206,8 +399,7 @@ def mon_relation_joined():
    public_addr = get_public_addr()
    for relid in relation_ids('mon'):
        relation_set(relation_id=relid,
                     relation_settings={'ceph-public-address':
                                        public_addr})
                     relation_settings={'ceph-public-address': public_addr})


@hooks.hook('mon-relation-departed',

@@ -250,7 +442,7 @@ def notify_client():


def upgrade_keys():
    ''' Ceph now requires mon allow rw for pool creation '''
    """ Ceph now requires mon allow rw for pool creation """
    if len(relation_ids('radosgw')) > 0:
        ceph.upgrade_key_caps('client.radosgw.gateway',
                              ceph._radosgw_caps)

@@ -272,6 +464,8 @@ def osd_relation(relid=None):
        'osd_bootstrap_key': ceph.get_osd_bootstrap_key(),
        'auth': config('auth-supported'),
        'ceph-public-address': public_addr,
        'osd_upgrade_key': ceph.get_named_key('osd-upgrade',
                                              caps=ceph.osd_upgrade_caps),
    }
    relation_set(relation_id=relid,
                 relation_settings=data)

@@ -430,6 +624,9 @@ def assess_status():
        # Unit should be running and clustered, but no quorum
        # TODO: should this be blocked or waiting?
        status_set('blocked', 'Unit not clustered (no quorum)')
    # If there's a pending lock for this unit,
    # can I get the lock?
    # reboot the ceph-mon process


if __name__ == '__main__':

@@ -24,6 +24,8 @@
# Adam Gandelman <adamg@ubuntu.com>
#
import bisect
import errno
import hashlib
import six

import os
@@ -259,6 +261,133 @@ class ErasurePool(Pool):
    Returns json formatted output"""


def get_mon_map(service):
    """
    Returns the current monitor map.
    :param service: six.string_types. The Ceph user name to run the command under
    :return: json string.
    :raise: ValueError if the monmap fails to parse.
        Also raises CalledProcessError if our ceph command fails
    """
    try:
        mon_status = check_output(
            ['ceph', '--id', service,
             'mon_status', '--format=json'])
        try:
            return json.loads(mon_status)
        except ValueError as v:
            log("Unable to parse mon_status json: {}. Error: {}".format(
                mon_status, v.message))
            raise
    except CalledProcessError as e:
        log("mon_status command failed with message: {}".format(
            e.message))
        raise

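get_mon_map() is a thin wrapper over the ceph CLI; a minimal standalone sketch of the same call (assumes a reachable cluster and a keyring for the 'admin' user used elsewhere in this change):

    # Shell equivalent: ceph --id admin mon_status --format=json
    import json
    from subprocess import check_output

    mon_map = json.loads(check_output(
        ['ceph', '--id', 'admin', 'mon_status', '--format=json']))
    mon_names = [mon['name'] for mon in mon_map['monmap']['mons']]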
def hash_monitor_names(service):
    """
    Uses the get_mon_map() function to get information about the monitor
    cluster.
    Hash the name of each monitor. Return a sorted list of monitor hashes
    in an ascending order.
    :param service: six.string_types. The Ceph user name to run the command under
    :rtype : dict. json dict of monitor name, ip address and rank
    example: {
        'name': 'ip-172-31-13-165',
        'rank': 0,
        'addr': '172.31.13.165:6789/0'}
    """
    try:
        hash_list = []
        monitor_list = get_mon_map(service=service)
        if monitor_list['monmap']['mons']:
            for mon in monitor_list['monmap']['mons']:
                hash_list.append(
                    hashlib.sha224(mon['name'].encode('utf-8')).hexdigest())
            return sorted(hash_list)
        else:
            return None
    except (ValueError, CalledProcessError):
        raise


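The hashing step is plain sha224 over each monitor name, sorted so every unit derives the same list regardless of monmap order; a minimal sketch with hypothetical names:

    import hashlib

    names = ['ip-172-31-13-165', 'ip-172-31-13-166']  # hypothetical monitors
    hash_list = sorted(hashlib.sha224(n.encode('utf-8')).hexdigest()
                       for n in names)
    # Every unit computes the identical hash_list from the same monmap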
def monitor_key_delete(service, key):
    """
    Delete a key and value pair from the monitor cluster
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types. The key to delete.
    """
    try:
        check_output(
            ['ceph', '--id', service,
             'config-key', 'del', str(key)])
    except CalledProcessError as e:
        log("Monitor config-key del failed with message: {}".format(
            e.output))
        raise


def monitor_key_set(service, key, value):
    """
    Sets a key value pair on the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types. The key to set.
    :param value: The value to set. This will be converted to a string
        before setting
    """
    try:
        check_output(
            ['ceph', '--id', service,
             'config-key', 'put', str(key), str(value)])
    except CalledProcessError as e:
        log("Monitor config-key put failed with message: {}".format(
            e.output))
        raise


def monitor_key_get(service, key):
    """
    Gets the value of an existing key in the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types. The key to search for.
    :return: Returns the value of that key or None if not found.
    """
    try:
        output = check_output(
            ['ceph', '--id', service,
             'config-key', 'get', str(key)])
        return output
    except CalledProcessError as e:
        log("Monitor config-key get failed with message: {}".format(
            e.output))
        return None


def monitor_key_exists(service, key):
    """
    Searches for the existence of a key in the monitor cluster.
    :param service: six.string_types. The Ceph user name to run the command under
    :param key: six.string_types. The key to search for
    :return: Returns True if the key exists, False if not and raises an
        exception if an unknown error occurs.
    :raise: CalledProcessError if an unknown error occurs
    """
    try:
        check_call(
            ['ceph', '--id', service,
             'config-key', 'exists', str(key)])
        # I can return true here regardless because Ceph returns
        # ENOENT if the key wasn't found
        return True
    except CalledProcessError as e:
        if e.returncode == errno.ENOENT:
            return False
        else:
            log("Unknown error from ceph config-key exists: {} {}".format(
                e.returncode, e.output))
            raise


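Taken together, these helpers wrap `ceph config-key` into a small key-value API; a hedged round-trip sketch (key name hypothetical, assuming the helpers above are importable from charmhelpers.contrib.storage.linux.ceph):

    from charmhelpers.contrib.storage.linux.ceph import (
        monitor_key_set, monitor_key_get,
        monitor_key_exists, monitor_key_delete)

    monitor_key_set('admin', 'mykey', 42)        # ceph config-key put mykey 42
    assert monitor_key_exists('admin', 'mykey')  # ceph config-key exists mykey
    value = monitor_key_get('admin', 'mykey')    # ceph config-key get mykey
    monitor_key_delete('admin', 'mykey')         # ceph config-key del mykey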
def get_erasure_profile(service, name):
    """
    :param service: six.string_types. The Ceph user name to run the command under

unit_tests/test_upgrade_roll.py (new file, 136 lines)

@@ -0,0 +1,136 @@
__author__ = 'chris'
import time

from mock import patch, call, MagicMock
import sys

sys.path.append('/home/chris/repos/ceph-mon/hooks')

import test_utils
import ceph_hooks

TO_PATCH = [
    'hookenv',
    'status_set',
    'config',
    'ceph',
    'log',
    'add_source',
    'apt_update',
    'apt_install',
    'service_stop',
    'service_start',
    'host',
]


def config_side_effect(*args):
    if args[0] == 'source':
        return 'cloud:trusty-kilo'
    elif args[0] == 'key':
        return 'key'
    elif args[0] == 'release-version':
        return 'cloud:trusty-kilo'


previous_node_start_time = time.time() - (9 * 60)


def monitor_key_side_effect(*args):
    if args[1] == \
            'ip-192-168-1-2_done':
        return False
    elif args[1] == \
            'ip-192-168-1-2_start':
        # Return that the previous node started 9 minutes ago
        return previous_node_start_time


class UpgradeRollingTestCase(test_utils.CharmTestCase):
    def setUp(self):
        super(UpgradeRollingTestCase, self).setUp(ceph_hooks, TO_PATCH)

    @patch('ceph_hooks.roll_monitor_cluster')
    def test_check_for_upgrade(self, roll_monitor_cluster):
        self.host.lsb_release.return_value = {
            'DISTRIB_CODENAME': 'trusty',
        }
        previous_mock = MagicMock().return_value
        previous_mock.previous.return_value = "cloud:trusty-juno"
        self.hookenv.config.side_effect = [previous_mock,
                                           config_side_effect('source')]
        ceph_hooks.check_for_upgrade()

        roll_monitor_cluster.assert_called_with('cloud:trusty-kilo')

    @patch('ceph_hooks.upgrade_monitor')
    @patch('ceph_hooks.monitor_key_set')
    def test_lock_and_roll(self, monitor_key_set, upgrade_monitor):
        monitor_key_set.monitor_key_set.return_value = None
        ceph_hooks.lock_and_roll(my_name='ip-192-168-1-2')
        upgrade_monitor.assert_called_once_with()

    def test_upgrade_monitor(self):
        self.config.side_effect = config_side_effect
        self.ceph.get_version.return_value = "0.80"
        self.ceph.systemd.return_value = False
        ceph_hooks.upgrade_monitor()
        self.service_stop.assert_called_with('ceph-mon-all')
        self.service_start.assert_called_with('ceph-mon-all')
        self.status_set.assert_has_calls([
            call('maintenance', 'Upgrading monitor'),
            call('active', '')
        ])

    @patch('ceph_hooks.lock_and_roll')
    @patch('ceph_hooks.wait_on_previous_node')
    @patch('ceph_hooks.get_mon_map')
    @patch('ceph_hooks.socket')
    def test_roll_monitor_cluster_second(self,
                                         socket,
                                         get_mon_map,
                                         wait_on_previous_node,
                                         lock_and_roll):
        wait_on_previous_node.return_value = None
        socket.gethostname.return_value = "ip-192-168-1-3"
        get_mon_map.return_value = {
            'monmap': {
                'mons': [
                    {
                        'name': 'ip-192-168-1-2',
                    },
                    {
                        'name': 'ip-192-168-1-3',
                    },
                ]
            }
        }
        ceph_hooks.roll_monitor_cluster('0.94.1')
        self.status_set.assert_called_with(
            'blocked',
            'Waiting on ip-192-168-1-2 to finish upgrading')
        lock_and_roll.assert_called_with(my_name="ip-192-168-1-3")

    @patch('ceph_hooks.monitor_key_get')
    @patch('ceph_hooks.monitor_key_exists')
    def test_wait_on_previous_node(self,
                                   monitor_key_exists,
                                   monitor_key_get):
        monitor_key_get.side_effect = monitor_key_side_effect
        monitor_key_exists.return_value = False

        ceph_hooks.wait_on_previous_node("ip-192-168-1-2")

        # Make sure we checked to see if the previous node started
        monitor_key_get.assert_has_calls(
            [call('admin', 'ip-192-168-1-2_start')]
        )
        # Make sure we checked to see if the previous node was finished
        monitor_key_exists.assert_has_calls(
            [call('admin', 'ip-192-168-1-2_done')]
        )
        # Make sure we waited at least once before proceeding
        self.log.assert_has_calls(
            [call('Previous node is: ip-192-168-1-2')],
            [call('ip-192-168-1-2 is not finished. Waiting')],
        )