Coalesce heartbeats
The IPA sends heartbeats to the conductor periodically and when requested, e.g. at the end of asynchronous commands. In order to avoid to send such notifications in too quick succession, e.g. when two asynchronous commands finish at the same time or when the periodic heartbeat was just sent right before a command ended, this patch proposes to coalesce heartbeats which are close together timewise and send only one for all of them in a time interval of 5 seconds. Co-Authored-By: Arne Wiebalck <arne.wiebalck@cern.ch> Story: #2008983 Task: 42633 Change-Id: Idfbce44065e1e5a8b730b94741b2604c51f0ab14
This commit is contained in:
parent
8afc176c28
commit
b605943796
@ -88,6 +88,8 @@ class IronicPythonAgentHeartbeater(threading.Thread):
|
|||||||
self.stop_event = threading.Event()
|
self.stop_event = threading.Event()
|
||||||
self.api = agent.api_client
|
self.api = agent.api_client
|
||||||
self.interval = 0
|
self.interval = 0
|
||||||
|
self.heartbeat_forced = False
|
||||||
|
self.previous_heartbeat = 0
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""Start the heartbeat thread."""
|
"""Start the heartbeat thread."""
|
||||||
@ -95,10 +97,21 @@ class IronicPythonAgentHeartbeater(threading.Thread):
|
|||||||
LOG.info('Starting heartbeater')
|
LOG.info('Starting heartbeater')
|
||||||
self.agent.set_agent_advertise_addr()
|
self.agent.set_agent_advertise_addr()
|
||||||
|
|
||||||
while not self.stop_event.wait(self.interval):
|
while not self.stop_event.wait(min(self.interval, 5)):
|
||||||
|
if self._heartbeat_expected():
|
||||||
self.do_heartbeat()
|
self.do_heartbeat()
|
||||||
eventlet.sleep(0)
|
eventlet.sleep(0)
|
||||||
|
|
||||||
|
def _heartbeat_expected(self):
|
||||||
|
# Normal heartbeating
|
||||||
|
if _time() > self.previous_heartbeat + self.interval:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Forced heartbeating, but once in 5 seconds
|
||||||
|
if (self.heartbeat_forced
|
||||||
|
and _time() > self.previous_heartbeat + 5):
|
||||||
|
return True
|
||||||
|
|
||||||
def do_heartbeat(self):
|
def do_heartbeat(self):
|
||||||
"""Send a heartbeat to Ironic."""
|
"""Send a heartbeat to Ironic."""
|
||||||
try:
|
try:
|
||||||
@ -109,6 +122,8 @@ class IronicPythonAgentHeartbeater(threading.Thread):
|
|||||||
generated_cert=self.agent.generated_cert,
|
generated_cert=self.agent.generated_cert,
|
||||||
)
|
)
|
||||||
LOG.info('heartbeat successful')
|
LOG.info('heartbeat successful')
|
||||||
|
self.heartbeat_forced = False
|
||||||
|
self.previous_heartbeat = _time()
|
||||||
except errors.HeartbeatConflictError:
|
except errors.HeartbeatConflictError:
|
||||||
LOG.warning('conflict error sending heartbeat to {}'.format(
|
LOG.warning('conflict error sending heartbeat to {}'.format(
|
||||||
self.agent.api_url))
|
self.agent.api_url))
|
||||||
@ -123,7 +138,7 @@ class IronicPythonAgentHeartbeater(threading.Thread):
|
|||||||
LOG.info(log_msg.format(self.interval))
|
LOG.info(log_msg.format(self.interval))
|
||||||
|
|
||||||
def force_heartbeat(self):
|
def force_heartbeat(self):
|
||||||
self.do_heartbeat()
|
self.heartbeat_forced = True
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
"""Stop the heartbeat thread."""
|
"""Stop the heartbeat thread."""
|
||||||
|
@ -83,8 +83,7 @@ class TestHeartbeater(ironic_agent_base.IronicAgentTest):
|
|||||||
time_responses.append(50)
|
time_responses.append(50)
|
||||||
|
|
||||||
# SECOND RUN:
|
# SECOND RUN:
|
||||||
# 50 * .5 = 25
|
expected_stop_calls.append(mock.call(5))
|
||||||
expected_stop_calls.append(mock.call(25.0))
|
|
||||||
wait_responses.append(False)
|
wait_responses.append(False)
|
||||||
# next heartbeat due at t=180
|
# next heartbeat due at t=180
|
||||||
heartbeat_responses.append(180)
|
heartbeat_responses.append(180)
|
||||||
@ -92,10 +91,11 @@ class TestHeartbeater(ironic_agent_base.IronicAgentTest):
|
|||||||
uniform_responses.append(0.4)
|
uniform_responses.append(0.4)
|
||||||
# time is now 80
|
# time is now 80
|
||||||
time_responses.append(80)
|
time_responses.append(80)
|
||||||
|
# add one response for _time in _heartbeat_expected
|
||||||
|
time_responses.append(80)
|
||||||
|
|
||||||
# THIRD RUN:
|
# THIRD RUN:
|
||||||
# 50 * .4 = 20
|
expected_stop_calls.append(mock.call(5))
|
||||||
expected_stop_calls.append(mock.call(20.0))
|
|
||||||
wait_responses.append(False)
|
wait_responses.append(False)
|
||||||
# this heartbeat attempt fails
|
# this heartbeat attempt fails
|
||||||
heartbeat_responses.append(Exception('uh oh!'))
|
heartbeat_responses.append(Exception('uh oh!'))
|
||||||
@ -107,8 +107,7 @@ class TestHeartbeater(ironic_agent_base.IronicAgentTest):
|
|||||||
time_responses.append(125.5)
|
time_responses.append(125.5)
|
||||||
|
|
||||||
# FOURTH RUN:
|
# FOURTH RUN:
|
||||||
# 50 * .5 = 20
|
expected_stop_calls.append(mock.call(5))
|
||||||
expected_stop_calls.append(mock.call(25.0))
|
|
||||||
# Stop now
|
# Stop now
|
||||||
wait_responses.append(True)
|
wait_responses.append(True)
|
||||||
|
|
||||||
@ -123,6 +122,39 @@ class TestHeartbeater(ironic_agent_base.IronicAgentTest):
|
|||||||
# Validate expectations
|
# Validate expectations
|
||||||
self.assertEqual(expected_stop_calls,
|
self.assertEqual(expected_stop_calls,
|
||||||
self.heartbeater.stop_event.wait.call_args_list)
|
self.heartbeater.stop_event.wait.call_args_list)
|
||||||
|
self.assertEqual(self.heartbeater.api.heartbeat.call_count, 2)
|
||||||
|
self.assertEqual(mock_time.call_count, 5)
|
||||||
|
|
||||||
|
@mock.patch('ironic_python_agent.agent._time', autospec=True)
|
||||||
|
def test__heartbeat_expected(self, mock_time):
|
||||||
|
|
||||||
|
# initial setting
|
||||||
|
self.heartbeater.previous_heartbeat = 0
|
||||||
|
self.heartbeater.interval = 0
|
||||||
|
self.heartbeater.heartbeat_forced = False
|
||||||
|
mock_time.return_value = 0
|
||||||
|
self.assertFalse(self.heartbeater._heartbeat_expected())
|
||||||
|
|
||||||
|
# 1st cadence
|
||||||
|
self.heartbeater.previous_heartbeat = 0
|
||||||
|
self.heartbeater.interval = 100
|
||||||
|
self.heartbeater.heartbeat_forced = False
|
||||||
|
mock_time.return_value = 5
|
||||||
|
self.assertFalse(self.heartbeater._heartbeat_expected())
|
||||||
|
|
||||||
|
# 2nd cadence with a forced heartbeat
|
||||||
|
self.heartbeater.previous_heartbeat = 0
|
||||||
|
self.heartbeater.interval = 100
|
||||||
|
self.heartbeater.heartbeat_forced = True
|
||||||
|
mock_time.return_value = 10
|
||||||
|
self.assertTrue(self.heartbeater._heartbeat_expected())
|
||||||
|
|
||||||
|
# 11th cadence with a scheduled heartbeat
|
||||||
|
self.heartbeater.previous_heartbeat = 0
|
||||||
|
self.heartbeater.interval = 100
|
||||||
|
self.heartbeater.heartbeat_forced = False
|
||||||
|
mock_time.return_value = 110
|
||||||
|
self.assertTrue(self.heartbeater._heartbeat_expected())
|
||||||
|
|
||||||
|
|
||||||
@mock.patch.object(hardware, '_md_scan_and_assemble', lambda: None)
|
@mock.patch.object(hardware, '_md_scan_and_assemble', lambda: None)
|
||||||
|
10
releasenotes/notes/coalesce_heartbeats-fb8899a5f9fe4709.yaml
Normal file
10
releasenotes/notes/coalesce_heartbeats-fb8899a5f9fe4709.yaml
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
Heartbeats to the conductor are grouped when they are scheduled or
|
||||||
|
requested within a time interval of five seconds to avoid sending
|
||||||
|
them in quick succession.
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
Fixes an issue where a quick succession of heartbeats exposes a race
|
||||||
|
condition in the conductor's RPC handling.
|
Loading…
Reference in New Issue
Block a user