Add a jitter to heartbeat retries
Currently, if a heartbeat fails, we reschedule it after 5 seconds. This is fine for the first retry, but it can cause a thundering herd problem when a lot of nodes fail to heartbeat at once, because they all retry at the same moment.

This change adds jitter to the minimum wait of 5 seconds: after a failed heartbeat, the retry interval is the 5-second minimum multiplied by a random factor between 1.0 and 2.0, i.e. between 5 and 10 seconds. The jitter is not applied to forced heartbeats: they still have a minimum wait of exactly 5 seconds from the last heartbeat.

The code is reordered so that the interval calculation happens in one place. As a bonus, the next interval is now logged correctly.

The unit tests have been rewritten to test the heartbeat process step by step rather than rely on the exact sequence of calls.

Closes-Bug: #2038438
Change-Id: I4c4207b15fb3d48b55e340b7b3b54af833f92cb5
This commit is contained in:
parent
62041d6d9e
commit
2ab8364649
@ -68,14 +68,24 @@ class IronicPythonAgentStatus(encoding.Serializable):
|
|||||||
self.version = version
|
self.version = version
|
||||||
|
|
||||||
|
|
||||||
|
def _with_jitter(value, min_multiplier, max_multiplier):
|
||||||
|
interval_multiplier = random.uniform(min_multiplier, max_multiplier)
|
||||||
|
return value * interval_multiplier
|
||||||
|
|
||||||
|
|
||||||
class IronicPythonAgentHeartbeater(threading.Thread):
|
class IronicPythonAgentHeartbeater(threading.Thread):
|
||||||
"""Thread that periodically heartbeats to Ironic."""
|
"""Thread that periodically heartbeats to Ironic."""
|
||||||
|
|
||||||
# If we could wait at most N seconds between heartbeats (or in case of an
|
# If we could wait at most N seconds between heartbeats, we will instead
|
||||||
# error) we will instead wait r x N seconds, where r is a random value
|
# wait r x N seconds, where r is a random value between these multipliers.
|
||||||
# between these multipliers.
|
|
||||||
min_jitter_multiplier = 0.3
|
min_jitter_multiplier = 0.3
|
||||||
max_jitter_multiplier = 0.6
|
max_jitter_multiplier = 0.6
|
||||||
|
# Error retry between 5 and 10 seconds, at least 12 retries with
|
||||||
|
# the default ramdisk_heartbeat_timeout of 300 and the worst case interval
|
||||||
|
# jitter of 0.6.
|
||||||
|
min_heartbeat_interval = 5
|
||||||
|
min_error_jitter_multiplier = 1.0
|
||||||
|
max_error_jitter_multiplier = 2.0
|
||||||
|
|
||||||
def __init__(self, agent):
|
def __init__(self, agent):
|
||||||
"""Initialize the heartbeat thread.
|
"""Initialize the heartbeat thread.
|
||||||
@ -97,19 +107,39 @@ class IronicPythonAgentHeartbeater(threading.Thread):
|
|||||||
LOG.info('Starting heartbeater')
|
LOG.info('Starting heartbeater')
|
||||||
self.agent.set_agent_advertise_addr()
|
self.agent.set_agent_advertise_addr()
|
||||||
|
|
||||||
while not self.stop_event.wait(min(self.interval, 5)):
|
while self._run_next():
|
||||||
if self._heartbeat_expected():
|
|
||||||
self.do_heartbeat()
|
|
||||||
eventlet.sleep(0)
|
eventlet.sleep(0)
|
||||||
|
|
||||||
|
def _run_next(self):
|
||||||
|
# The logic here makes sure we don't wait exactly 5 seconds more or
|
||||||
|
# less regardless of the current interval since it may cause a
|
||||||
|
# thundering herd problem when a lot of agents are heartbeating.
|
||||||
|
# Essentially, if the next heartbeat is due in 2 seconds, don't wait 5.
|
||||||
|
# But if the next one is scheduled in 2 minutes, do wait 5 to account
|
||||||
|
# for forced heartbeats.
|
||||||
|
wait = min(
|
||||||
|
self.min_heartbeat_interval,
|
||||||
|
# This operation checks how much of the initially planned interval
|
||||||
|
# we have still left. Compare with 0 in case we overshoot the goal.
|
||||||
|
max(0, self.interval - (_time() - self.previous_heartbeat)),
|
||||||
|
)
|
||||||
|
if self.stop_event.wait(wait):
|
||||||
|
return False # done
|
||||||
|
|
||||||
|
if self._heartbeat_expected():
|
||||||
|
self.do_heartbeat()
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
def _heartbeat_expected(self):
|
def _heartbeat_expected(self):
|
||||||
|
elapsed = _time() - self.previous_heartbeat
|
||||||
|
|
||||||
# Normal heartbeating
|
# Normal heartbeating
|
||||||
if _time() > self.previous_heartbeat + self.interval:
|
if elapsed >= self.interval:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# Forced heartbeating, but once in 5 seconds
|
# Forced heartbeating, but once in 5 seconds
|
||||||
if (self.heartbeat_forced
|
if self.heartbeat_forced and elapsed > self.min_heartbeat_interval:
|
||||||
and _time() > self.previous_heartbeat + 5):
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def do_heartbeat(self):
|
def do_heartbeat(self):
|
||||||
@ -121,20 +151,24 @@ class IronicPythonAgentHeartbeater(threading.Thread):
|
|||||||
advertise_protocol=self.agent.advertise_protocol,
|
advertise_protocol=self.agent.advertise_protocol,
|
||||||
generated_cert=self.agent.generated_cert,
|
generated_cert=self.agent.generated_cert,
|
||||||
)
|
)
|
||||||
LOG.info('heartbeat successful')
|
except Exception as exc:
|
||||||
|
if isinstance(exc, errors.HeartbeatConflictError):
|
||||||
|
LOG.warning('conflict error sending heartbeat to %s',
|
||||||
|
self.agent.api_url)
|
||||||
|
else:
|
||||||
|
LOG.exception('error sending heartbeat to %s',
|
||||||
|
self.agent.api_url)
|
||||||
|
self.interval = _with_jitter(self.min_heartbeat_interval,
|
||||||
|
self.min_error_jitter_multiplier,
|
||||||
|
self.max_error_jitter_multiplier)
|
||||||
|
else:
|
||||||
|
LOG.debug('heartbeat successful')
|
||||||
self.heartbeat_forced = False
|
self.heartbeat_forced = False
|
||||||
self.previous_heartbeat = _time()
|
self.interval = _with_jitter(self.agent.heartbeat_timeout,
|
||||||
except errors.HeartbeatConflictError:
|
self.min_jitter_multiplier,
|
||||||
LOG.warning('conflict error sending heartbeat to %s',
|
self.max_jitter_multiplier)
|
||||||
self.agent.api_url)
|
self.previous_heartbeat = _time()
|
||||||
except Exception:
|
LOG.info('sleeping before next heartbeat, interval: %s', self.interval)
|
||||||
LOG.exception('error sending heartbeat to %s', self.agent.api_url)
|
|
||||||
finally:
|
|
||||||
interval_multiplier = random.uniform(self.min_jitter_multiplier,
|
|
||||||
self.max_jitter_multiplier)
|
|
||||||
self.interval = self.agent.heartbeat_timeout * interval_multiplier
|
|
||||||
LOG.info('sleeping before next heartbeat, interval: %s',
|
|
||||||
self.interval)
|
|
||||||
|
|
||||||
def force_heartbeat(self):
|
def force_heartbeat(self):
|
||||||
self.heartbeat_forced = True
|
self.heartbeat_forced = True
|
||||||
|
@ -50,6 +50,20 @@ class FakeExtension(base.BaseAgentExtension):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class FakeClock:
|
||||||
|
current = 0
|
||||||
|
last_wait = None
|
||||||
|
wait_result = False
|
||||||
|
|
||||||
|
def get(self):
|
||||||
|
return self.current
|
||||||
|
|
||||||
|
def wait(self, interval):
|
||||||
|
self.last_wait = interval
|
||||||
|
self.current += interval
|
||||||
|
return self.wait_result
|
||||||
|
|
||||||
|
|
||||||
class TestHeartbeater(ironic_agent_base.IronicAgentTest):
|
class TestHeartbeater(ironic_agent_base.IronicAgentTest):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestHeartbeater, self).setUp()
|
super(TestHeartbeater, self).setUp()
|
||||||
@ -64,65 +78,79 @@ class TestHeartbeater(ironic_agent_base.IronicAgentTest):
|
|||||||
@mock.patch('ironic_python_agent.agent._time', autospec=True)
|
@mock.patch('ironic_python_agent.agent._time', autospec=True)
|
||||||
@mock.patch('random.uniform', autospec=True)
|
@mock.patch('random.uniform', autospec=True)
|
||||||
def test_heartbeat(self, mock_uniform, mock_time):
|
def test_heartbeat(self, mock_uniform, mock_time):
|
||||||
time_responses = []
|
clock = FakeClock()
|
||||||
uniform_responses = []
|
mock_time.side_effect = clock.get
|
||||||
heartbeat_responses = []
|
self.heartbeater.stop_event.wait.side_effect = clock.wait
|
||||||
wait_responses = []
|
|
||||||
expected_stop_calls = []
|
|
||||||
|
|
||||||
# FIRST RUN:
|
heartbeat_mock = self.heartbeater.api.heartbeat
|
||||||
# initial delay is 0
|
self.mock_agent.heartbeat_timeout = 20
|
||||||
expected_stop_calls.append(mock.call(0))
|
|
||||||
wait_responses.append(False)
|
|
||||||
# next heartbeat due at t=100
|
|
||||||
heartbeat_responses.append(100)
|
|
||||||
# random interval multiplier is 0.5
|
|
||||||
uniform_responses.append(0.5)
|
|
||||||
# time is now 50
|
|
||||||
time_responses.append(50)
|
|
||||||
|
|
||||||
# SECOND RUN:
|
# First run right after start
|
||||||
expected_stop_calls.append(mock.call(5))
|
mock_uniform.return_value = 0.6
|
||||||
wait_responses.append(False)
|
self.assertTrue(self.heartbeater._run_next())
|
||||||
# next heartbeat due at t=180
|
self.assertEqual(0, clock.last_wait)
|
||||||
heartbeat_responses.append(180)
|
heartbeat_mock.assert_called_once_with(
|
||||||
# random interval multiplier is 0.4
|
uuid=self.mock_agent.get_node_uuid.return_value,
|
||||||
uniform_responses.append(0.4)
|
advertise_address=self.mock_agent.advertise_address,
|
||||||
# time is now 80
|
advertise_protocol=self.mock_agent.advertise_protocol,
|
||||||
time_responses.append(80)
|
generated_cert=self.mock_agent.generated_cert)
|
||||||
# add one response for _time in _heartbeat_expected
|
heartbeat_mock.reset_mock()
|
||||||
time_responses.append(80)
|
self.assertEqual(12, self.heartbeater.interval) # 20*0.6
|
||||||
|
self.assertEqual(0, self.heartbeater.previous_heartbeat)
|
||||||
|
|
||||||
# THIRD RUN:
|
# A few empty runs before reaching the next heartbeat
|
||||||
expected_stop_calls.append(mock.call(5))
|
for ts in [5, 10]:
|
||||||
wait_responses.append(False)
|
self.assertTrue(self.heartbeater._run_next())
|
||||||
# this heartbeat attempt fails
|
self.assertEqual(5, clock.last_wait)
|
||||||
heartbeat_responses.append(Exception('uh oh!'))
|
self.assertEqual(ts, clock.current)
|
||||||
# random interval multiplier is 0.5
|
heartbeat_mock.assert_not_called()
|
||||||
uniform_responses.append(0.5)
|
self.assertEqual(0, self.heartbeater.previous_heartbeat)
|
||||||
# we check the time to generate a fake deadline, now t=125
|
|
||||||
time_responses.append(125)
|
|
||||||
# time is now 125.5
|
|
||||||
time_responses.append(125.5)
|
|
||||||
|
|
||||||
# FOURTH RUN:
|
# Second run when the heartbeat is due
|
||||||
expected_stop_calls.append(mock.call(5))
|
mock_uniform.return_value = 0.4
|
||||||
# Stop now
|
self.assertTrue(self.heartbeater._run_next())
|
||||||
wait_responses.append(True)
|
self.assertEqual(2, clock.last_wait) # 12-2*5
|
||||||
|
self.assertTrue(heartbeat_mock.called)
|
||||||
|
heartbeat_mock.reset_mock()
|
||||||
|
self.assertEqual(8, self.heartbeater.interval) # 20*0.4
|
||||||
|
self.assertEqual(12, self.heartbeater.previous_heartbeat)
|
||||||
|
|
||||||
# Hook it up and run it
|
# One empty run before reaching the next heartbeat
|
||||||
mock_time.side_effect = time_responses
|
self.assertTrue(self.heartbeater._run_next())
|
||||||
mock_uniform.side_effect = uniform_responses
|
self.assertEqual(5, clock.last_wait)
|
||||||
self.mock_agent.heartbeat_timeout = 50
|
heartbeat_mock.assert_not_called()
|
||||||
self.heartbeater.api.heartbeat.side_effect = heartbeat_responses
|
self.assertEqual(12, self.heartbeater.previous_heartbeat)
|
||||||
self.heartbeater.stop_event.wait.side_effect = wait_responses
|
|
||||||
self.heartbeater.run()
|
|
||||||
|
|
||||||
# Validate expectations
|
# Failed run resulting in a fast retry
|
||||||
self.assertEqual(expected_stop_calls,
|
mock_uniform.return_value = 1.2
|
||||||
self.heartbeater.stop_event.wait.call_args_list)
|
heartbeat_mock.side_effect = Exception('uh oh!')
|
||||||
self.assertEqual(self.heartbeater.api.heartbeat.call_count, 2)
|
self.assertTrue(self.heartbeater._run_next())
|
||||||
self.assertEqual(mock_time.call_count, 5)
|
self.assertEqual(3, clock.last_wait) # 8-5
|
||||||
|
self.assertTrue(heartbeat_mock.called)
|
||||||
|
heartbeat_mock.reset_mock(side_effect=True)
|
||||||
|
self.assertEqual(6, self.heartbeater.interval) # 5*1.2
|
||||||
|
self.assertEqual(20, self.heartbeater.previous_heartbeat)
|
||||||
|
|
||||||
|
# One empty run because 6>5
|
||||||
|
self.assertTrue(self.heartbeater._run_next())
|
||||||
|
self.assertEqual(5, clock.last_wait)
|
||||||
|
heartbeat_mock.assert_not_called()
|
||||||
|
self.assertEqual(20, self.heartbeater.previous_heartbeat)
|
||||||
|
|
||||||
|
# Retry after the remaining 1 second
|
||||||
|
mock_uniform.return_value = 0.5
|
||||||
|
self.assertTrue(self.heartbeater._run_next())
|
||||||
|
self.assertEqual(1, clock.last_wait)
|
||||||
|
self.assertTrue(heartbeat_mock.called)
|
||||||
|
heartbeat_mock.reset_mock()
|
||||||
|
self.assertEqual(10, self.heartbeater.interval) # 20*0.5
|
||||||
|
self.assertEqual(26, self.heartbeater.previous_heartbeat)
|
||||||
|
|
||||||
|
# Stop on the next empty run
|
||||||
|
clock.wait_result = True
|
||||||
|
self.assertFalse(self.heartbeater._run_next())
|
||||||
|
heartbeat_mock.assert_not_called()
|
||||||
|
self.assertEqual(26, self.heartbeater.previous_heartbeat)
|
||||||
|
|
||||||
@mock.patch('ironic_python_agent.agent._time', autospec=True)
|
@mock.patch('ironic_python_agent.agent._time', autospec=True)
|
||||||
def test__heartbeat_expected(self, mock_time):
|
def test__heartbeat_expected(self, mock_time):
|
||||||
@ -132,7 +160,7 @@ class TestHeartbeater(ironic_agent_base.IronicAgentTest):
|
|||||||
self.heartbeater.interval = 0
|
self.heartbeater.interval = 0
|
||||||
self.heartbeater.heartbeat_forced = False
|
self.heartbeater.heartbeat_forced = False
|
||||||
mock_time.return_value = 0
|
mock_time.return_value = 0
|
||||||
self.assertFalse(self.heartbeater._heartbeat_expected())
|
self.assertTrue(self.heartbeater._heartbeat_expected())
|
||||||
|
|
||||||
# 1st cadence
|
# 1st cadence
|
||||||
self.heartbeater.previous_heartbeat = 0
|
self.heartbeater.previous_heartbeat = 0
|
||||||
|
@ -0,0 +1,7 @@
|
|||||||
|
---
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
Adds random jitter to retried heartbeats after Ironic returns an error.
|
||||||
|
Previously, heartbeats would be retried after 5 seconds, potentially
|
||||||
|
causing a thundering herd problem if many nodes fail to heartbeat at
|
||||||
|
the same time.
|
Loading…
Reference in New Issue
Block a user