Simplify heartbeating by removing use of select()
Heartbeating in IPA has used select.poll() for years to workaround a bug where changing the time in the ramdisk could cause heartbeats to stop and never resume. Now that IPA syncs time at start and exit, this workaround is no longer needed. So instead, we'll revert to using threading.Event() in order to make the code simpler and easier to understand. Since we need this to be an eventlet-event, and not a standard-thread event, also monkey_patch threading. Additionally, there were a few completely unused backoff interval values set, that were never applied. In respect of maintaining the 5+ years old behavior of not doing error backoffs, that code was removed instead of being made to work. Change-Id: Ibcde99de64bb7e95d5df63a42a4ca4999f0c4c9b
This commit is contained in:
parent
9b75453339
commit
a01646f56b
@ -14,9 +14,7 @@
|
|||||||
|
|
||||||
import collections
|
import collections
|
||||||
import ipaddress
|
import ipaddress
|
||||||
import os
|
|
||||||
import random
|
import random
|
||||||
import select
|
|
||||||
import socket
|
import socket
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
@ -79,13 +77,6 @@ class IronicPythonAgentHeartbeater(threading.Thread):
|
|||||||
min_jitter_multiplier = 0.3
|
min_jitter_multiplier = 0.3
|
||||||
max_jitter_multiplier = 0.6
|
max_jitter_multiplier = 0.6
|
||||||
|
|
||||||
# Exponential backoff values used in case of an error. In reality we will
|
|
||||||
# only wait a portion of either of these delays based on the jitter
|
|
||||||
# multipliers.
|
|
||||||
initial_delay = 1.0
|
|
||||||
max_delay = 300.0
|
|
||||||
backoff_factor = 2.7
|
|
||||||
|
|
||||||
def __init__(self, agent):
|
def __init__(self, agent):
|
||||||
"""Initialize the heartbeat thread.
|
"""Initialize the heartbeat thread.
|
||||||
|
|
||||||
@ -94,39 +85,19 @@ class IronicPythonAgentHeartbeater(threading.Thread):
|
|||||||
"""
|
"""
|
||||||
super(IronicPythonAgentHeartbeater, self).__init__()
|
super(IronicPythonAgentHeartbeater, self).__init__()
|
||||||
self.agent = agent
|
self.agent = agent
|
||||||
|
self.stop_event = threading.Event()
|
||||||
self.api = agent.api_client
|
self.api = agent.api_client
|
||||||
self.error_delay = self.initial_delay
|
self.interval = 0
|
||||||
self.reader = None
|
|
||||||
self.writer = None
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""Start the heartbeat thread."""
|
"""Start the heartbeat thread."""
|
||||||
# The first heartbeat happens immediately
|
# The first heartbeat happens immediately
|
||||||
LOG.info('starting heartbeater')
|
LOG.info('Starting heartbeater')
|
||||||
interval = 0
|
|
||||||
self.agent.set_agent_advertise_addr()
|
self.agent.set_agent_advertise_addr()
|
||||||
|
|
||||||
self.reader, self.writer = os.pipe()
|
while not self.stop_event.wait(self.interval):
|
||||||
p = select.poll()
|
self.do_heartbeat()
|
||||||
p.register(self.reader, select.POLLIN)
|
eventlet.sleep(0)
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
if p.poll(interval * 1000):
|
|
||||||
if os.read(self.reader, 1).decode() == 'a':
|
|
||||||
break
|
|
||||||
|
|
||||||
self.do_heartbeat()
|
|
||||||
interval_multiplier = random.uniform(
|
|
||||||
self.min_jitter_multiplier,
|
|
||||||
self.max_jitter_multiplier)
|
|
||||||
interval = self.agent.heartbeat_timeout * interval_multiplier
|
|
||||||
log_msg = 'sleeping before next heartbeat, interval: {}'
|
|
||||||
LOG.info(log_msg.format(interval))
|
|
||||||
finally:
|
|
||||||
os.close(self.reader)
|
|
||||||
os.close(self.writer)
|
|
||||||
self.reader = None
|
|
||||||
self.writer = None
|
|
||||||
|
|
||||||
def do_heartbeat(self):
|
def do_heartbeat(self):
|
||||||
"""Send a heartbeat to Ironic."""
|
"""Send a heartbeat to Ironic."""
|
||||||
@ -136,28 +107,28 @@ class IronicPythonAgentHeartbeater(threading.Thread):
|
|||||||
advertise_address=self.agent.advertise_address,
|
advertise_address=self.agent.advertise_address,
|
||||||
advertise_protocol=self.agent.advertise_protocol,
|
advertise_protocol=self.agent.advertise_protocol,
|
||||||
)
|
)
|
||||||
self.error_delay = self.initial_delay
|
|
||||||
LOG.info('heartbeat successful')
|
LOG.info('heartbeat successful')
|
||||||
except errors.HeartbeatConflictError:
|
except errors.HeartbeatConflictError:
|
||||||
LOG.warning('conflict error sending heartbeat to {}'.format(
|
LOG.warning('conflict error sending heartbeat to {}'.format(
|
||||||
self.agent.api_url))
|
self.agent.api_url))
|
||||||
self.error_delay = min(self.error_delay * self.backoff_factor,
|
|
||||||
self.max_delay)
|
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception('error sending heartbeat to {}'.format(
|
LOG.exception('error sending heartbeat to {}'.format(
|
||||||
self.agent.api_url))
|
self.agent.api_url))
|
||||||
self.error_delay = min(self.error_delay * self.backoff_factor,
|
finally:
|
||||||
self.max_delay)
|
interval_multiplier = random.uniform(self.min_jitter_multiplier,
|
||||||
|
self.max_jitter_multiplier)
|
||||||
|
self.interval = self.agent.heartbeat_timeout * interval_multiplier
|
||||||
|
log_msg = 'sleeping before next heartbeat, interval: {0}'
|
||||||
|
LOG.info(log_msg.format(self.interval))
|
||||||
|
|
||||||
def force_heartbeat(self):
|
def force_heartbeat(self):
|
||||||
os.write(self.writer, b'b')
|
self.do_heartbeat()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
"""Stop the heartbeat thread."""
|
"""Stop the heartbeat thread."""
|
||||||
if self.writer is not None:
|
LOG.info('stopping heartbeater')
|
||||||
LOG.info('stopping heartbeater')
|
self.stop_event.set()
|
||||||
os.write(self.writer, b'a')
|
return self.join()
|
||||||
return self.join()
|
|
||||||
|
|
||||||
|
|
||||||
class IronicPythonAgent(base.ExecuteCommandMixin):
|
class IronicPythonAgent(base.ExecuteCommandMixin):
|
||||||
|
@ -60,21 +60,19 @@ class TestHeartbeater(ironic_agent_base.IronicAgentTest):
|
|||||||
hardware.HardwareManager)
|
hardware.HardwareManager)
|
||||||
self.heartbeater.stop_event = mock.Mock()
|
self.heartbeater.stop_event = mock.Mock()
|
||||||
|
|
||||||
@mock.patch('os.read', autospec=True)
|
|
||||||
@mock.patch('select.poll', autospec=True)
|
|
||||||
@mock.patch('ironic_python_agent.agent._time', autospec=True)
|
@mock.patch('ironic_python_agent.agent._time', autospec=True)
|
||||||
@mock.patch('random.uniform', autospec=True)
|
@mock.patch('random.uniform', autospec=True)
|
||||||
def test_heartbeat(self, mock_uniform, mock_time, mock_poll, mock_read):
|
def test_heartbeat(self, mock_uniform, mock_time):
|
||||||
time_responses = []
|
time_responses = []
|
||||||
uniform_responses = []
|
uniform_responses = []
|
||||||
heartbeat_responses = []
|
heartbeat_responses = []
|
||||||
poll_responses = []
|
wait_responses = []
|
||||||
expected_poll_calls = []
|
expected_stop_calls = []
|
||||||
|
|
||||||
# FIRST RUN:
|
# FIRST RUN:
|
||||||
# initial delay is 0
|
# initial delay is 0
|
||||||
expected_poll_calls.append(mock.call(0))
|
expected_stop_calls.append(mock.call(0))
|
||||||
poll_responses.append(False)
|
wait_responses.append(False)
|
||||||
# next heartbeat due at t=100
|
# next heartbeat due at t=100
|
||||||
heartbeat_responses.append(100)
|
heartbeat_responses.append(100)
|
||||||
# random interval multiplier is 0.5
|
# random interval multiplier is 0.5
|
||||||
@ -84,8 +82,8 @@ class TestHeartbeater(ironic_agent_base.IronicAgentTest):
|
|||||||
|
|
||||||
# SECOND RUN:
|
# SECOND RUN:
|
||||||
# 50 * .5 = 25
|
# 50 * .5 = 25
|
||||||
expected_poll_calls.append(mock.call(1000 * 25.0))
|
expected_stop_calls.append(mock.call(25.0))
|
||||||
poll_responses.append(False)
|
wait_responses.append(False)
|
||||||
# next heartbeat due at t=180
|
# next heartbeat due at t=180
|
||||||
heartbeat_responses.append(180)
|
heartbeat_responses.append(180)
|
||||||
# random interval multiplier is 0.4
|
# random interval multiplier is 0.4
|
||||||
@ -95,36 +93,34 @@ class TestHeartbeater(ironic_agent_base.IronicAgentTest):
|
|||||||
|
|
||||||
# THIRD RUN:
|
# THIRD RUN:
|
||||||
# 50 * .4 = 20
|
# 50 * .4 = 20
|
||||||
expected_poll_calls.append(mock.call(1000 * 20.0))
|
expected_stop_calls.append(mock.call(20.0))
|
||||||
poll_responses.append(False)
|
wait_responses.append(False)
|
||||||
# this heartbeat attempt fails
|
# this heartbeat attempt fails
|
||||||
heartbeat_responses.append(Exception('uh oh!'))
|
heartbeat_responses.append(Exception('uh oh!'))
|
||||||
# we check the time to generate a fake deadline, now t=125
|
|
||||||
time_responses.append(125)
|
|
||||||
# random interval multiplier is 0.5
|
# random interval multiplier is 0.5
|
||||||
uniform_responses.append(0.5)
|
uniform_responses.append(0.5)
|
||||||
|
# we check the time to generate a fake deadline, now t=125
|
||||||
|
time_responses.append(125)
|
||||||
# time is now 125.5
|
# time is now 125.5
|
||||||
time_responses.append(125.5)
|
time_responses.append(125.5)
|
||||||
|
|
||||||
# FOURTH RUN:
|
# FOURTH RUN:
|
||||||
# 50 * .5 = 25
|
# 50 * .5 = 20
|
||||||
expected_poll_calls.append(mock.call(1000 * 25.0))
|
expected_stop_calls.append(mock.call(25.0))
|
||||||
# Stop now
|
# Stop now
|
||||||
poll_responses.append(True)
|
wait_responses.append(True)
|
||||||
mock_read.return_value = b'a'
|
|
||||||
|
|
||||||
# Hook it up and run it
|
# Hook it up and run it
|
||||||
mock_time.side_effect = time_responses
|
mock_time.side_effect = time_responses
|
||||||
mock_uniform.side_effect = uniform_responses
|
mock_uniform.side_effect = uniform_responses
|
||||||
self.mock_agent.heartbeat_timeout = 50
|
self.mock_agent.heartbeat_timeout = 50
|
||||||
self.heartbeater.api.heartbeat.side_effect = heartbeat_responses
|
self.heartbeater.api.heartbeat.side_effect = heartbeat_responses
|
||||||
mock_poll.return_value.poll.side_effect = poll_responses
|
self.heartbeater.stop_event.wait.side_effect = wait_responses
|
||||||
self.heartbeater.run()
|
self.heartbeater.run()
|
||||||
|
|
||||||
# Validate expectations
|
# Validate expectations
|
||||||
self.assertEqual(expected_poll_calls,
|
self.assertEqual(expected_stop_calls,
|
||||||
mock_poll.return_value.poll.call_args_list)
|
self.heartbeater.stop_event.wait.call_args_list)
|
||||||
self.assertEqual(2.7, self.heartbeater.error_delay)
|
|
||||||
|
|
||||||
|
|
||||||
@mock.patch.object(hardware, '_md_scan_and_assemble', lambda: None)
|
@mock.patch.object(hardware, '_md_scan_and_assemble', lambda: None)
|
||||||
|
@ -0,0 +1,7 @@
|
|||||||
|
---
|
||||||
|
upgrade:
|
||||||
|
- |
|
||||||
|
IPA heartbeat intervals now rely on accurate clock time. Any clean or
|
||||||
|
deploy steps which attempt to sync the clock may cause heartbeats to not
|
||||||
|
be emitted. IPA syncs time at startup and shutdown, so these steps should
|
||||||
|
not be required.
|
Loading…
Reference in New Issue
Block a user