Add a progress watchdog for OSD upgrades
This patch add the concept of a watchdog to the upgrade_monitor so that the charm can achieve two objectives of 1. Waiting for much longer, but 2. detecting whether the previous node has died / gone away. This is needed for 'large' OSDs where the time to upgrade a node may exceed the current limit of 10 minutes, but also not to wait for 30 minutes on a dead previous node. The watchdog implements two timeouts and an addition 'alive' key from the previous node to indicate that it is still running. Otherwise, functionality is identical. Change-Id: Ia450e936c2096f092af3be5a369b7abaf5023b16 Related-Bug: #1762852
This commit is contained in:
parent
5c06f74ab3
commit
b688f04889
|
@ -2169,15 +2169,18 @@ def roll_monitor_cluster(new_version, upgrade_key):
|
|||
status_set('blocked', 'failed to upgrade monitor')
|
||||
|
||||
|
||||
# TODO(jamespage):
|
||||
# Mimic support will need to ensure that ceph-mgr daemons are also
|
||||
# restarted during upgrades - probably through use of one of the
|
||||
# high level systemd targets shipped by the packaging.
|
||||
def upgrade_monitor(new_version):
|
||||
# For E731 we can't assign a lambda, therefore, instead pass this.
|
||||
def noop():
|
||||
pass
|
||||
|
||||
|
||||
def upgrade_monitor(new_version, kick_function=None):
|
||||
"""Upgrade the current ceph monitor to the new version
|
||||
|
||||
:param new_version: String version to upgrade to.
|
||||
"""
|
||||
if kick_function is None:
|
||||
kick_function = noop
|
||||
current_version = get_version()
|
||||
status_set("maintenance", "Upgrading monitor")
|
||||
log("Current ceph version is {}".format(current_version))
|
||||
|
@ -2186,6 +2189,7 @@ def upgrade_monitor(new_version):
|
|||
# Needed to determine if whether to stop/start ceph-mgr
|
||||
luminous_or_later = cmp_pkgrevno('ceph-common', '12.2.0') >= 0
|
||||
|
||||
kick_function()
|
||||
try:
|
||||
add_source(config('source'), config('key'))
|
||||
apt_update(fatal=True)
|
||||
|
@ -2194,6 +2198,7 @@ def upgrade_monitor(new_version):
|
|||
err))
|
||||
status_set("blocked", "Upgrade to {} failed".format(new_version))
|
||||
sys.exit(1)
|
||||
kick_function()
|
||||
try:
|
||||
if systemd():
|
||||
service_stop('ceph-mon')
|
||||
|
@ -2204,6 +2209,7 @@ def upgrade_monitor(new_version):
|
|||
else:
|
||||
service_stop('ceph-mon-all')
|
||||
apt_install(packages=determine_packages(), fatal=True)
|
||||
kick_function()
|
||||
|
||||
owner = ceph_user()
|
||||
|
||||
|
@ -2217,6 +2223,8 @@ def upgrade_monitor(new_version):
|
|||
group=owner,
|
||||
follow_links=True)
|
||||
|
||||
kick_function()
|
||||
|
||||
# Ensure that mon directory is user writable
|
||||
hostname = socket.gethostname()
|
||||
path = '/var/lib/ceph/mon/ceph-{}'.format(hostname)
|
||||
|
@ -2257,13 +2265,22 @@ def lock_and_roll(upgrade_key, service, my_name, version):
|
|||
start_timestamp))
|
||||
monitor_key_set(upgrade_key, "{}_{}_{}_start".format(
|
||||
service, my_name, version), start_timestamp)
|
||||
|
||||
# alive indication:
|
||||
alive_function = (
|
||||
lambda: monitor_key_set(
|
||||
upgrade_key, "{}_{}_{}_alive"
|
||||
.format(service, my_name, version), time.time()))
|
||||
dog = WatchDog(kick_interval=3 * 60,
|
||||
kick_function=alive_function)
|
||||
|
||||
log("Rolling")
|
||||
|
||||
# This should be quick
|
||||
if service == 'osd':
|
||||
upgrade_osd(version)
|
||||
upgrade_osd(version, kick_function=dog.kick_the_dog)
|
||||
elif service == 'mon':
|
||||
upgrade_monitor(version)
|
||||
upgrade_monitor(version, kick_function=dog.kick_the_dog)
|
||||
else:
|
||||
log("Unknown service {}. Unable to upgrade".format(service),
|
||||
level=ERROR)
|
||||
|
@ -2294,45 +2311,225 @@ def wait_on_previous_node(upgrade_key, service, previous_node, version):
|
|||
"""
|
||||
log("Previous node is: {}".format(previous_node))
|
||||
|
||||
previous_node_finished = monitor_key_exists(
|
||||
upgrade_key,
|
||||
"{}_{}_{}_done".format(service, previous_node, version))
|
||||
|
||||
while previous_node_finished is False:
|
||||
log("{} is not finished. Waiting".format(previous_node))
|
||||
# Has this node been trying to upgrade for longer than
|
||||
# 10 minutes?
|
||||
# If so then move on and consider that node dead.
|
||||
|
||||
# NOTE: This assumes the clusters clocks are somewhat accurate
|
||||
# If the hosts clock is really far off it may cause it to skip
|
||||
# the previous node even though it shouldn't.
|
||||
current_timestamp = time.time()
|
||||
previous_node_start_time = monitor_key_get(
|
||||
previous_node_started_f = (
|
||||
lambda: monitor_key_exists(
|
||||
upgrade_key,
|
||||
"{}_{}_{}_start".format(service, previous_node, version))
|
||||
if (previous_node_start_time is not None and
|
||||
((current_timestamp - (10 * 60)) >
|
||||
float(previous_node_start_time))):
|
||||
# NOTE(jamespage):
|
||||
# Previous node is probably dead as we've been waiting
|
||||
# for 10 minutes - lets move on and upgrade
|
||||
log("Waited 10 mins on node {}. current time: {} > "
|
||||
"previous node start time: {} Moving on".format(
|
||||
previous_node,
|
||||
(current_timestamp - (10 * 60)),
|
||||
previous_node_start_time))
|
||||
return
|
||||
# NOTE(jamespage)
|
||||
# Previous node has not started, or started less than
|
||||
# 10 minutes ago - sleep a random amount of time and
|
||||
# then check again.
|
||||
wait_time = random.randrange(5, 30)
|
||||
log('waiting for {} seconds'.format(wait_time))
|
||||
time.sleep(wait_time)
|
||||
previous_node_finished = monitor_key_exists(
|
||||
"{}_{}_{}_start".format(service, previous_node, version)))
|
||||
previous_node_finished_f = (
|
||||
lambda: monitor_key_exists(
|
||||
upgrade_key,
|
||||
"{}_{}_{}_done".format(service, previous_node, version))
|
||||
"{}_{}_{}_done".format(service, previous_node, version)))
|
||||
previous_node_alive_time_f = (
|
||||
lambda: monitor_key_get(
|
||||
upgrade_key,
|
||||
"{}_{}_{}_alive".format(service, previous_node, version)))
|
||||
|
||||
# wait for 30 minutes until the previous node starts. We don't proceed
|
||||
# unless we get a start condition.
|
||||
try:
|
||||
WatchDog.wait_until(previous_node_started_f, timeout=30 * 60)
|
||||
except WatchDog.WatchDogTimeoutException:
|
||||
log("Waited for previous node to start for 30 minutes. "
|
||||
"It didn't start, so may have a serious issue. Continuing with "
|
||||
"upgrade of this node.",
|
||||
level=WARNING)
|
||||
return
|
||||
|
||||
# keep the time it started from this nodes' perspective.
|
||||
previous_node_started_at = time.time()
|
||||
log("Detected that previous node {} has started. Time now: {}"
|
||||
.format(previous_node, previous_node_started_at))
|
||||
|
||||
# Now wait for the node to complete. The node may optionally be kicking
|
||||
# with the *_alive key, which allows this node to wait longer as it 'knows'
|
||||
# the other node is proceeding.
|
||||
try:
|
||||
WatchDog.timed_wait(kicked_at_function=previous_node_alive_time_f,
|
||||
complete_function=previous_node_finished_f,
|
||||
wait_time=30 * 60,
|
||||
compatibility_wait_time=10 * 60,
|
||||
max_kick_interval=5 * 60)
|
||||
except WatchDog.WatchDogDeadException:
|
||||
# previous node was kicking, but timed out; log this condition and move
|
||||
# on.
|
||||
now = time.time()
|
||||
waited = int((now - previous_node_started_at) / 60)
|
||||
log("Previous node started, but has now not ticked for 5 minutes. "
|
||||
"Waited total of {} mins on node {}. current time: {} > "
|
||||
"previous node start time: {}. "
|
||||
"Continuing with upgrade of this node."
|
||||
.format(waited, previous_node, now, previous_node_started_at),
|
||||
level=WARNING)
|
||||
except WatchDog.WatchDogTimeoutException:
|
||||
# previous node never kicked, or simply took too long; log this
|
||||
# condition and move on.
|
||||
now = time.time()
|
||||
waited = int((now - previous_node_started_at) / 60)
|
||||
log("Previous node is taking too long; assuming it has died."
|
||||
"Waited {} mins on node {}. current time: {} > "
|
||||
"previous node start time: {}. "
|
||||
"Continuing with upgrade of this node."
|
||||
.format(waited, previous_node, now, previous_node_started_at),
|
||||
level=WARNING)
|
||||
|
||||
|
||||
class WatchDog(object):
|
||||
"""Watch a dog; basically a kickable timer with a timeout between two async
|
||||
units.
|
||||
|
||||
The idea is that you have an overall timeout and then can kick that timeout
|
||||
with intermediary hits, with a max time between those kicks allowed.
|
||||
|
||||
Note that this watchdog doesn't rely on the clock of the other side; just
|
||||
roughly when it detects when the other side started. All timings are based
|
||||
on the local clock.
|
||||
|
||||
The kicker will not 'kick' more often than a set interval, regardless of
|
||||
how often the kick_the_dog() function is called. The kicker provides a
|
||||
function (lambda: -> None) that is called when the kick interval is
|
||||
reached.
|
||||
|
||||
The waiter calls the static method with a check function
|
||||
(lambda: -> Boolean) that indicates when the wait should be over and the
|
||||
maximum interval to wait. e.g. 30 minutes with a 5 minute kick interval.
|
||||
|
||||
So the waiter calls wait(f, 30, 3) and the kicker sets up a 3 minute kick
|
||||
interval, or however long it is expected for the key to propagate and to
|
||||
allow for other delays.
|
||||
|
||||
There is a compatibility mode where if the otherside never kicks, then it
|
||||
simply waits for the compatability timer.
|
||||
"""
|
||||
|
||||
class WatchDogDeadException(Exception):
|
||||
pass
|
||||
|
||||
class WatchDogTimeoutException(Exception):
|
||||
pass
|
||||
|
||||
def __init__(self, kick_interval=3 * 60, kick_function=None):
|
||||
"""Initialise a new WatchDog
|
||||
|
||||
:param kick_interval: the interval when this side kicks the other in
|
||||
seconds.
|
||||
:type kick_interval: Int
|
||||
:param kick_function: The function to call that does the kick.
|
||||
:type kick_function: Callable[]
|
||||
"""
|
||||
self.start_time = time.time()
|
||||
self.last_run_func = None
|
||||
self.last_kick_at = None
|
||||
self.kick_interval = kick_interval
|
||||
self.kick_f = kick_function
|
||||
|
||||
def kick_the_dog(self):
|
||||
"""Might call the kick_function if it's time.
|
||||
|
||||
This function can be called as frequently as needed, but will run the
|
||||
self.kick_function after kick_interval seconds have passed.
|
||||
"""
|
||||
now = time.time()
|
||||
if (self.last_run_func is None or
|
||||
(now - self.last_run_func > self.kick_interval)):
|
||||
if self.kick_f is not None:
|
||||
self.kick_f()
|
||||
self.last_run_func = now
|
||||
self.last_kick_at = now
|
||||
|
||||
@staticmethod
|
||||
def wait_until(wait_f, timeout=10 * 60):
|
||||
"""Wait for timeout seconds until the passed function return True.
|
||||
|
||||
:param wait_f: The function to call that will end the wait.
|
||||
:type wait_f: Callable[[], Boolean]
|
||||
:param timeout: The time to wait in seconds.
|
||||
:type timeout: int
|
||||
"""
|
||||
start_time = time.time()
|
||||
while(not wait_f()):
|
||||
now = time.time()
|
||||
if now > start_time + timeout:
|
||||
raise WatchDog.WatchDogTimeoutException()
|
||||
wait_time = random.randrange(5, 30)
|
||||
log('wait_until: waiting for {} seconds'.format(wait_time))
|
||||
time.sleep(wait_time)
|
||||
|
||||
@staticmethod
|
||||
def timed_wait(kicked_at_function,
|
||||
complete_function,
|
||||
wait_time=30 * 60,
|
||||
compatibility_wait_time=10 * 60,
|
||||
max_kick_interval=5 * 60):
|
||||
"""Wait a maximum time with an intermediate 'kick' time.
|
||||
|
||||
This function will wait for max_kick_interval seconds unless the
|
||||
kicked_at_function() call returns a time that is not older that
|
||||
max_kick_interval (in seconds). i.e. the other side can signal that it
|
||||
is still doing things during the max_kick_interval as long as it kicks
|
||||
at least every max_kick_interval seconds.
|
||||
|
||||
The maximum wait is "wait_time", but the otherside must keep kicking
|
||||
during this period.
|
||||
|
||||
The "compatibility_wait_time" is used if the other side never kicks
|
||||
(i.e. the kicked_at_function() always returns None. In this case the
|
||||
function wait up to "compatibility_wait_time".
|
||||
|
||||
Note that the type of the return from the kicked_at_function is an
|
||||
Optional[str], not a Float. The function will coerce this to a float
|
||||
for the comparison. This represents the return value of
|
||||
time.time() at the "other side". It's a string to simplify the
|
||||
function obtaining the time value from the other side.
|
||||
|
||||
The function raises WatchDogTimeoutException if either the
|
||||
compatibility_wait_time or the wait_time are exceeded.
|
||||
|
||||
The function raises WatchDogDeadException if the max_kick_interval is
|
||||
exceeded.
|
||||
|
||||
Note that it is possible that the first kick interval is extended to
|
||||
compatibility_wait_time if the "other side" doesn't kick immediately.
|
||||
The best solution is for the other side to kick early and often.
|
||||
|
||||
:param kicked_at_function: The function to call to retrieve the time
|
||||
that the other side 'kicked' at. None if the other side hasn't
|
||||
kicked.
|
||||
:type kicked_at_function: Callable[[], Optional[str]]
|
||||
:param complete_function: The callable that returns True when done.
|
||||
:type complete_function: Callable[[], Boolean]
|
||||
:param wait_time: the maximum time to wait, even with kicks, in
|
||||
seconds.
|
||||
:type wait_time: int
|
||||
:param compatibility_wait_time: The time to wait if no kicks are
|
||||
received, in seconds.
|
||||
:type compatibility_wait_time: int
|
||||
:param max_kick_interval: The maximum time allowed between kicks before
|
||||
the wait is over, in seconds:
|
||||
:type max_kick_interval: int
|
||||
:raises: WatchDog.WatchDogTimeoutException,
|
||||
WatchDog.WatchDogDeadException
|
||||
"""
|
||||
start_time = time.time()
|
||||
while True:
|
||||
if complete_function():
|
||||
break
|
||||
# the time when the waiting for unit last kicked.
|
||||
kicked_at = kicked_at_function()
|
||||
now = time.time()
|
||||
if kicked_at is None:
|
||||
# assume other end doesn't do alive kicks
|
||||
if (now - start_time > compatibility_wait_time):
|
||||
raise WatchDog.WatchDogTimeoutException()
|
||||
else:
|
||||
# other side is participating in kicks; must kick at least
|
||||
# every 'max_kick_interval' to stay alive.
|
||||
if (now - float(kicked_at) > max_kick_interval):
|
||||
raise WatchDog.WatchDogDeadException()
|
||||
if (now - start_time > wait_time):
|
||||
raise WatchDog.WatchDogTimeoutException()
|
||||
delay_time = random.randrange(5, 30)
|
||||
log('waiting for {} seconds'.format(delay_time))
|
||||
time.sleep(delay_time)
|
||||
|
||||
|
||||
def get_upgrade_position(osd_sorted_list, match_name):
|
||||
|
@ -2412,11 +2609,14 @@ def roll_osd_cluster(new_version, upgrade_key):
|
|||
status_set('blocked', 'failed to upgrade osd')
|
||||
|
||||
|
||||
def upgrade_osd(new_version):
|
||||
def upgrade_osd(new_version, kick_function=None):
|
||||
"""Upgrades the current osd
|
||||
|
||||
:param new_version: str. The new version to upgrade to
|
||||
"""
|
||||
if kick_function is None:
|
||||
kick_function = noop
|
||||
|
||||
current_version = get_version()
|
||||
status_set("maintenance", "Upgrading osd")
|
||||
log("Current ceph version is {}".format(current_version))
|
||||
|
@ -2431,10 +2631,13 @@ def upgrade_osd(new_version):
|
|||
status_set("blocked", "Upgrade to {} failed".format(new_version))
|
||||
sys.exit(1)
|
||||
|
||||
kick_function()
|
||||
|
||||
try:
|
||||
# Upgrade the packages before restarting the daemons.
|
||||
status_set('maintenance', 'Upgrading packages to %s' % new_version)
|
||||
apt_install(packages=determine_packages(), fatal=True)
|
||||
kick_function()
|
||||
|
||||
# If the upgrade does not need an ownership update of any of the
|
||||
# directories in the osd service directory, then simply restart
|
||||
|
@ -2458,13 +2661,16 @@ def upgrade_osd(new_version):
|
|||
os.listdir(CEPH_BASE_DIR))
|
||||
non_osd_dirs = map(lambda x: os.path.join(CEPH_BASE_DIR, x),
|
||||
non_osd_dirs)
|
||||
for path in non_osd_dirs:
|
||||
for i, path in enumerate(non_osd_dirs):
|
||||
if i % 100 == 0:
|
||||
kick_function()
|
||||
update_owner(path)
|
||||
|
||||
# Fast service restart wasn't an option because each of the OSD
|
||||
# directories need the ownership updated for all the files on
|
||||
# the OSD. Walk through the OSDs one-by-one upgrading the OSD.
|
||||
for osd_dir in _get_child_dirs(OSD_BASE_DIR):
|
||||
kick_function()
|
||||
try:
|
||||
osd_num = _get_osd_num_from_dirname(osd_dir)
|
||||
_upgrade_single_osd(osd_num, osd_dir)
|
||||
|
|
|
@ -16,7 +16,7 @@ import sys
|
|||
import time
|
||||
import unittest
|
||||
|
||||
from unittest.mock import patch, call, MagicMock
|
||||
from unittest.mock import patch, call, MagicMock, ANY
|
||||
|
||||
import charms_ceph.utils
|
||||
|
||||
|
@ -52,6 +52,14 @@ def monitor_key_side_effect(*args):
|
|||
return str(previous_node_start_time)
|
||||
|
||||
|
||||
def monitor_key_exists_side_effect(*args):
|
||||
if args[1] == 'mon_ip-192-168-1-2_0.94.1_start':
|
||||
return True
|
||||
if args[1] == 'mon_ip-192-168-1-2_0.94.1_done':
|
||||
return False
|
||||
raise Exception("Unexpected test argument")
|
||||
|
||||
|
||||
class UpgradeRollingTestCase(unittest.TestCase):
|
||||
|
||||
@patch('time.time')
|
||||
|
@ -65,7 +73,8 @@ class UpgradeRollingTestCase(unittest.TestCase):
|
|||
version='hammer',
|
||||
service='mon',
|
||||
upgrade_key='admin')
|
||||
upgrade_monitor.assert_called_once_with('hammer')
|
||||
upgrade_monitor.assert_called_once_with('hammer',
|
||||
kick_function=ANY)
|
||||
log.assert_has_calls(
|
||||
[
|
||||
call('monitor_key_set '
|
||||
|
@ -108,10 +117,14 @@ class UpgradeRollingTestCase(unittest.TestCase):
|
|||
local_mons.return_value = ['a']
|
||||
mock_cmp_pkgrevno.return_value = -1
|
||||
|
||||
charms_ceph.utils.upgrade_monitor('hammer')
|
||||
mock_kick_function = MagicMock()
|
||||
|
||||
charms_ceph.utils.upgrade_monitor('hammer',
|
||||
kick_function=mock_kick_function)
|
||||
service_stop.assert_called_with('ceph-mon-all')
|
||||
service_start.assert_called_with('ceph-mon-all')
|
||||
add_source.assert_called_with('cloud:trusty-kilo', 'key')
|
||||
mock_kick_function.assert_called()
|
||||
|
||||
log.assert_has_calls(
|
||||
[
|
||||
|
@ -317,7 +330,7 @@ class UpgradeRollingTestCase(unittest.TestCase):
|
|||
|
||||
mock_time.time.side_effect = fake_time
|
||||
monitor_key_get.side_effect = monitor_key_side_effect
|
||||
monitor_key_exists.return_value = False
|
||||
monitor_key_exists.side_effect = monitor_key_exists_side_effect
|
||||
|
||||
charms_ceph.utils.wait_on_previous_node(
|
||||
previous_node="ip-192-168-1-2",
|
||||
|
@ -325,18 +338,22 @@ class UpgradeRollingTestCase(unittest.TestCase):
|
|||
service='mon',
|
||||
upgrade_key='admin'
|
||||
)
|
||||
# Make sure the function tested that start exists.
|
||||
monitor_key_exists.assert_any_call('admin',
|
||||
'mon_ip-192-168-1-2_0.94.1_start')
|
||||
# Make sure the Watchdog checked at least once for alive.
|
||||
monitor_key_get.assert_any_call('admin',
|
||||
'mon_ip-192-168-1-2_0.94.1_alive')
|
||||
|
||||
# Make sure we checked to see if the previous node started
|
||||
monitor_key_get.assert_has_calls(
|
||||
[call('admin', 'mon_ip-192-168-1-2_0.94.1_start')]
|
||||
)
|
||||
# Make sure we checked to see if the previous node was finished
|
||||
monitor_key_exists.assert_has_calls(
|
||||
[call('admin', 'mon_ip-192-168-1-2_0.94.1_done')]
|
||||
)
|
||||
|
||||
# Make sure we waited at last once before proceeding
|
||||
log.assert_has_calls(
|
||||
[call('Previous node is: ip-192-168-1-2')],
|
||||
[call('ip-192-168-1-2 is not finished. Waiting')],
|
||||
)
|
||||
|
||||
self.assertGreaterEqual(tval[0], previous_node_start_time + 600)
|
||||
|
|
Loading…
Reference in New Issue