Add BackOffLoopingCall with jitter
Using DynamicLoopingCall involved a few hacks to make it work properly. This new BackOffLoopingCall will start an exponential backoff (with a configurable jitter) when there is a failure. The backoff will continue until the given function returns True or timeout is about to be exceeded. The function will run indefinitely until either an exception is raised or timeout is reached. I plan to merge this into oslo loopingcall and switch the heartbeat to this. Change-Id: I1482348e98c6b68c34b3003645029e08135b1341
This commit is contained in:
128
ironic_python_agent/backoff.py
Normal file
128
ironic_python_agent/backoff.py
Normal file
@@ -0,0 +1,128 @@
|
||||
# Copyright 2014 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import random
|
||||
import sys
|
||||
|
||||
from eventlet import event
|
||||
from eventlet import greenthread
|
||||
|
||||
from ironic_python_agent.openstack.common import log
|
||||
from ironic_python_agent.openstack.common import loopingcall
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
#TODO(JoshNang) move to oslo, i18n
|
||||
class LoopingCallTimeOut(Exception):
|
||||
"""Exception for a timed out LoopingCall.
|
||||
|
||||
The LoopingCall will raise this exception when a timeout is provided
|
||||
and it is exceeded.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class BackOffLoopingCall(loopingcall.LoopingCallBase):
|
||||
"""The passed in function should return True (no error, return to
|
||||
initial_interval),
|
||||
False (error, start backing off), or raise LoopingCallDone(retvalue=None)
|
||||
(quit looping, return retvalue if set).
|
||||
|
||||
When there is an error, the call will backoff on each failure. The
|
||||
backoff will be equal to double the previous base interval times some
|
||||
jitter. If a backoff would put it over the timeout, it halts immediately,
|
||||
so the call will never take more than timeout, but may and likely will
|
||||
take less time.
|
||||
|
||||
When the function return value is True or False, the interval will be
|
||||
multiplied by a random jitter. If min_jitter or max_jitter is None,
|
||||
there will be no jitter (jitter=1). If min_jitter is below 0.5, the code
|
||||
may not backoff and may increase its retry rate.
|
||||
|
||||
If func constantly returns True, this function will not return.
|
||||
|
||||
To run a func and wait for a call to finish (by raising a LoopingCallDone):
|
||||
|
||||
timer = BackOffLoopingCall(func)
|
||||
response = timer.start().wait()
|
||||
|
||||
:param initial_delay: delay before first running of function
|
||||
:param starting_interval: initial interval in seconds between calls to
|
||||
function. When an error occurs and then a success, the interval is
|
||||
returned to starting_interval
|
||||
:param timeout: time in seconds before a LoopingCallTimeout is raised
|
||||
The call will never take longer than timeout, but may quit before timeout
|
||||
:param max_interval: The maximum interval between calls during errors
|
||||
:param jitter: Used to vary when calls are actually run to avoid group of
|
||||
calls all coming at the exact same time. Uses random.gauss(jitter,
|
||||
0.1), with jitter as the mean for the distribution. If set below .5,
|
||||
it can cause the calls to come more rapidly after each failure.
|
||||
:raises: LoopingCallTimeout if time spent doing error retries would exceed
|
||||
timeout.
|
||||
"""
|
||||
|
||||
def start(self, initial_delay=None, starting_interval=1, timeout=300,
|
||||
max_interval=300, jitter=0.75):
|
||||
self._running = True
|
||||
done = event.Event()
|
||||
|
||||
def _inner():
|
||||
interval = starting_interval
|
||||
error_time = 0
|
||||
|
||||
if initial_delay:
|
||||
greenthread.sleep(initial_delay)
|
||||
|
||||
try:
|
||||
while self._running:
|
||||
no_error = self.f(*self.args, **self.kw)
|
||||
if not self._running:
|
||||
break
|
||||
random_jitter = random.gauss(jitter, 0.1)
|
||||
if no_error:
|
||||
# Reset error state
|
||||
error_time = 0
|
||||
interval = starting_interval
|
||||
idle = interval * random_jitter
|
||||
else:
|
||||
# Backoff
|
||||
interval = min(interval * 2 * random_jitter,
|
||||
max_interval)
|
||||
idle = interval
|
||||
|
||||
# Don't go over timeout, end early if necessary. If
|
||||
# timeout is 0, keep going.
|
||||
if timeout > 0 and error_time + idle > timeout:
|
||||
raise LoopingCallTimeOut(
|
||||
'Looping call timed out after %.02f seconds'
|
||||
% error_time)
|
||||
error_time += idle
|
||||
|
||||
LOG.debug('Dynamic looping call sleeping for %.02f '
|
||||
'seconds', idle)
|
||||
greenthread.sleep(idle)
|
||||
except loopingcall.LoopingCallDone as e:
|
||||
self.stop()
|
||||
done.send(e.retvalue)
|
||||
except Exception:
|
||||
LOG.exception('in dynamic looping call')
|
||||
done.send_exception(*sys.exc_info())
|
||||
return
|
||||
else:
|
||||
done.send(True)
|
||||
|
||||
self.done = done
|
||||
greenthread.spawn(_inner)
|
||||
return self.done
|
||||
@@ -15,14 +15,19 @@ limitations under the License.
|
||||
"""
|
||||
|
||||
import json
|
||||
|
||||
import requests
|
||||
|
||||
from ironic_python_agent import backoff
|
||||
from ironic_python_agent import encoding
|
||||
from ironic_python_agent import errors
|
||||
from ironic_python_agent.openstack.common import log
|
||||
from ironic_python_agent.openstack.common import loopingcall
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class APIClient(object):
|
||||
api_version = 'v1'
|
||||
|
||||
@@ -73,43 +78,21 @@ class APIClient(object):
|
||||
raise errors.HeartbeatError('Invalid Heartbeat-Before header')
|
||||
|
||||
def lookup_node(self, hardware_info, timeout, starting_interval):
|
||||
timer = loopingcall.DynamicLoopingCall(
|
||||
timer = backoff.BackOffLoopingCall(
|
||||
self._do_lookup,
|
||||
hardware_info=hardware_info,
|
||||
intervals=[starting_interval],
|
||||
total_time=[0],
|
||||
timeout=timeout)
|
||||
node_content = timer.start().wait()
|
||||
|
||||
# True is returned on timeout
|
||||
if node_content is True:
|
||||
hardware_info=hardware_info)
|
||||
try:
|
||||
node_content = timer.start(starting_interval=starting_interval,
|
||||
timeout=timeout).wait()
|
||||
except backoff.LoopingCallTimeOut:
|
||||
raise errors.LookupNodeError('Could not look up node info. Check '
|
||||
'logs for details.')
|
||||
return node_content
|
||||
|
||||
def _do_lookup(self, hardware_info, timeout, intervals=[1],
|
||||
total_time=[0]):
|
||||
def _do_lookup(self, hardware_info):
|
||||
"""The actual call to lookup a node. Should be called inside
|
||||
loopingcall.DynamicLoopingCall.
|
||||
|
||||
intervals and total_time are mutable so it can be changed by each run
|
||||
in the looping call and accessed/changed on the next run.
|
||||
loopingcall.BackOffLoopingCall.
|
||||
"""
|
||||
def next_interval(timeout, intervals=[], total_time=[]):
|
||||
"""Function to calculate what the next interval should be. Uses
|
||||
exponential backoff and raises an exception (that won't
|
||||
be caught by do_lookup) to kill the looping call if it goes too
|
||||
long
|
||||
"""
|
||||
new_interval = intervals[-1] * 2
|
||||
if total_time[0] + new_interval > timeout:
|
||||
# No retvalue signifies error
|
||||
raise loopingcall.LoopingCallDone()
|
||||
|
||||
total_time[0] += new_interval
|
||||
intervals.append(new_interval)
|
||||
return new_interval
|
||||
|
||||
path = '/{api_version}/drivers/teeth/vendor_passthru/lookup'.format(
|
||||
api_version=self.api_version
|
||||
)
|
||||
@@ -125,30 +108,28 @@ class APIClient(object):
|
||||
response = self._request('POST', path, data=data)
|
||||
except Exception as e:
|
||||
self.log.warning('POST failed: %s' % str(e))
|
||||
return next_interval(timeout, intervals, total_time)
|
||||
return False
|
||||
|
||||
if response.status_code != requests.codes.OK:
|
||||
self.log.warning('Invalid status code: %s' %
|
||||
response.status_code)
|
||||
|
||||
return next_interval(timeout, intervals, total_time)
|
||||
self.log.warning('Invalid status code: %s' % response.status_code)
|
||||
return False
|
||||
|
||||
try:
|
||||
content = json.loads(response.content)
|
||||
except Exception as e:
|
||||
self.log.warning('Error decoding response: %s' % str(e))
|
||||
return next_interval(timeout, intervals, total_time)
|
||||
return False
|
||||
|
||||
# Check for valid response data
|
||||
if 'node' not in content or 'uuid' not in content['node']:
|
||||
self.log.warning('Got invalid node data from the API: %s' %
|
||||
content)
|
||||
return next_interval(timeout, intervals, total_time)
|
||||
return False
|
||||
|
||||
if 'heartbeat_timeout' not in content:
|
||||
self.log.warning('Got invalid heartbeat from the API: %s' %
|
||||
content)
|
||||
return next_interval(timeout, intervals, total_time)
|
||||
return False
|
||||
|
||||
# Got valid content
|
||||
raise loopingcall.LoopingCallDone(retvalue=content)
|
||||
|
||||
100
ironic_python_agent/tests/backoff.py
Normal file
100
ironic_python_agent/tests/backoff.py
Normal file
@@ -0,0 +1,100 @@
|
||||
# Copyright 2014 Rackspace, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
|
||||
from ironic_python_agent import backoff
|
||||
from ironic_python_agent.openstack.common import loopingcall
|
||||
|
||||
|
||||
class TestBackOffLoopingCall(unittest.TestCase):
|
||||
@mock.patch('random.gauss')
|
||||
@mock.patch('eventlet.greenthread.sleep')
|
||||
def test_exponential_backoff(self, sleep_mock, random_mock):
|
||||
def false():
|
||||
return False
|
||||
|
||||
random_mock.return_value = .8
|
||||
|
||||
self.assertRaises(backoff.LoopingCallTimeOut,
|
||||
backoff.BackOffLoopingCall(false).start()
|
||||
.wait)
|
||||
|
||||
expected_times = [mock.call(1.6000000000000001),
|
||||
mock.call(2.5600000000000005),
|
||||
mock.call(4.096000000000001),
|
||||
mock.call(6.5536000000000021),
|
||||
mock.call(10.485760000000004),
|
||||
mock.call(16.777216000000006),
|
||||
mock.call(26.843545600000013),
|
||||
mock.call(42.949672960000022),
|
||||
mock.call(68.719476736000033),
|
||||
mock.call(109.95116277760006)]
|
||||
self.assertEqual(expected_times, sleep_mock.call_args_list)
|
||||
|
||||
@mock.patch('random.gauss')
|
||||
@mock.patch('eventlet.greenthread.sleep')
|
||||
def test_no_backoff(self, sleep_mock, random_mock):
|
||||
random_mock.return_value = 1
|
||||
func = mock.Mock()
|
||||
# func.side_effect
|
||||
func.side_effect = [True, True, True, loopingcall.LoopingCallDone(
|
||||
retvalue='return value')]
|
||||
|
||||
retvalue = backoff.BackOffLoopingCall(func).start().wait()
|
||||
|
||||
expected_times = [mock.call(1), mock.call(1), mock.call(1)]
|
||||
self.assertEqual(expected_times, sleep_mock.call_args_list)
|
||||
self.assertTrue(retvalue, 'return value')
|
||||
|
||||
@mock.patch('random.gauss')
|
||||
@mock.patch('eventlet.greenthread.sleep')
|
||||
def test_no_sleep(self, sleep_mock, random_mock):
|
||||
# Any call that executes properly the first time shouldn't sleep
|
||||
random_mock.return_value = 1
|
||||
func = mock.Mock()
|
||||
# func.side_effect
|
||||
func.side_effect = loopingcall.LoopingCallDone(retvalue='return value')
|
||||
|
||||
retvalue = backoff.BackOffLoopingCall(func).start().wait()
|
||||
self.assertFalse(sleep_mock.called)
|
||||
self.assertTrue(retvalue, 'return value')
|
||||
|
||||
@mock.patch('random.gauss')
|
||||
@mock.patch('eventlet.greenthread.sleep')
|
||||
def test_max_interval(self, sleep_mock, random_mock):
|
||||
def false():
|
||||
return False
|
||||
|
||||
random_mock.return_value = .8
|
||||
|
||||
self.assertRaises(backoff.LoopingCallTimeOut,
|
||||
backoff.BackOffLoopingCall(false).start(
|
||||
max_interval=60)
|
||||
.wait)
|
||||
|
||||
expected_times = [mock.call(1.6000000000000001),
|
||||
mock.call(2.5600000000000005),
|
||||
mock.call(4.096000000000001),
|
||||
mock.call(6.5536000000000021),
|
||||
mock.call(10.485760000000004),
|
||||
mock.call(16.777216000000006),
|
||||
mock.call(26.843545600000013),
|
||||
mock.call(42.949672960000022),
|
||||
mock.call(60),
|
||||
mock.call(60),
|
||||
mock.call(60)]
|
||||
self.assertEqual(expected_times, sleep_mock.call_args_list)
|
||||
@@ -20,6 +20,7 @@ import time
|
||||
import mock
|
||||
from oslotest import base as test_base
|
||||
|
||||
from ironic_python_agent import backoff
|
||||
from ironic_python_agent import errors
|
||||
from ironic_python_agent import hardware
|
||||
from ironic_python_agent import ironic_api_client
|
||||
@@ -111,21 +112,16 @@ class TestBaseIronicPythonAgent(test_base.BaseTestCase):
|
||||
advertise_address=('192.0.2.1', '9999'))
|
||||
|
||||
@mock.patch('eventlet.greenthread.sleep')
|
||||
def test_lookup_node(self, sleep_mock):
|
||||
@mock.patch('ironic_python_agent.ironic_api_client.APIClient._do_lookup')
|
||||
def test_lookup_node(self, lookup_mock, sleep_mock):
|
||||
content = {
|
||||
'node': {
|
||||
'uuid': 'deadbeef-dabb-ad00-b105-f00d00bab10c'
|
||||
},
|
||||
'heartbeat_timeout': 300
|
||||
}
|
||||
response = FakeResponse(status_code=200, content={
|
||||
'node': {
|
||||
'uuid': 'deadbeef-dabb-ad00-b105-f00d00bab10c'
|
||||
},
|
||||
'heartbeat_timeout': 300
|
||||
})
|
||||
self.api_client.session.request = mock.Mock()
|
||||
self.api_client.session.request.return_value = response
|
||||
lookup_mock.side_effect = loopingcall.LoopingCallDone(
|
||||
retvalue=content)
|
||||
returned_content = self.api_client.lookup_node(
|
||||
hardware_info=self.hardware_info,
|
||||
timeout=300,
|
||||
@@ -134,15 +130,9 @@ class TestBaseIronicPythonAgent(test_base.BaseTestCase):
|
||||
self.assertEqual(content, returned_content)
|
||||
|
||||
@mock.patch('eventlet.greenthread.sleep')
|
||||
def test_lookup_node_exception(self, sleep_mock):
|
||||
bad_response = FakeResponse(status_code=500, content={
|
||||
'node': {
|
||||
'uuid': 'deadbeef-dabb-ad00-b105-f00d00bab10c'
|
||||
},
|
||||
'heartbeat_timeout': 300
|
||||
})
|
||||
self.api_client.session.request = mock.Mock()
|
||||
self.api_client.session.request.return_value = bad_response
|
||||
@mock.patch('ironic_python_agent.ironic_api_client.APIClient._do_lookup')
|
||||
def test_lookup_timeout(self, lookup_mock, sleep_mock):
|
||||
lookup_mock.side_effect = backoff.LoopingCallTimeOut()
|
||||
self.assertRaises(errors.LookupNodeError,
|
||||
self.api_client.lookup_node,
|
||||
hardware_info=self.hardware_info,
|
||||
@@ -162,9 +152,7 @@ class TestBaseIronicPythonAgent(test_base.BaseTestCase):
|
||||
|
||||
self.assertRaises(loopingcall.LoopingCallDone,
|
||||
self.api_client._do_lookup,
|
||||
hardware_info=self.hardware_info,
|
||||
timeout=300,
|
||||
intervals=[2])
|
||||
hardware_info=self.hardware_info)
|
||||
|
||||
request_args = self.api_client.session.request.call_args[0]
|
||||
self.assertEqual(request_args[0], 'POST')
|
||||
@@ -194,13 +182,9 @@ class TestBaseIronicPythonAgent(test_base.BaseTestCase):
|
||||
self.api_client.session.request = mock.Mock()
|
||||
self.api_client.session.request.return_value = response
|
||||
|
||||
total_time = [0]
|
||||
interval = self.api_client._do_lookup(self.hardware_info,
|
||||
timeout=300,
|
||||
total_time=total_time,
|
||||
intervals=[2])
|
||||
self.assertEqual(4, interval)
|
||||
self.assertEqual(4, total_time[0])
|
||||
error = self.api_client._do_lookup(self.hardware_info)
|
||||
|
||||
self.assertFalse(error)
|
||||
|
||||
def test_do_lookup_bad_response_data(self):
|
||||
response = FakeResponse(status_code=200, content={
|
||||
@@ -210,13 +194,9 @@ class TestBaseIronicPythonAgent(test_base.BaseTestCase):
|
||||
self.api_client.session.request = mock.Mock()
|
||||
self.api_client.session.request.return_value = response
|
||||
|
||||
total_time = [0]
|
||||
interval = self.api_client._do_lookup(self.hardware_info,
|
||||
timeout=300,
|
||||
total_time=total_time,
|
||||
intervals=[2])
|
||||
self.assertEqual(4, interval)
|
||||
self.assertEqual(4, total_time[0])
|
||||
error = self.api_client._do_lookup(self.hardware_info)
|
||||
|
||||
self.assertFalse(error)
|
||||
|
||||
def test_do_lookup_no_heartbeat_timeout(self):
|
||||
response = FakeResponse(status_code=200, content={
|
||||
@@ -228,13 +208,9 @@ class TestBaseIronicPythonAgent(test_base.BaseTestCase):
|
||||
self.api_client.session.request = mock.Mock()
|
||||
self.api_client.session.request.return_value = response
|
||||
|
||||
total_time = [0]
|
||||
interval = self.api_client._do_lookup(self.hardware_info,
|
||||
timeout=300,
|
||||
total_time=total_time,
|
||||
intervals=[2])
|
||||
self.assertEqual(4, interval)
|
||||
self.assertEqual(4, total_time[0])
|
||||
error = self.api_client._do_lookup(self.hardware_info)
|
||||
|
||||
self.assertFalse(error)
|
||||
|
||||
def test_do_lookup_bad_response_body(self):
|
||||
response = FakeResponse(status_code=200, content={
|
||||
@@ -244,31 +220,6 @@ class TestBaseIronicPythonAgent(test_base.BaseTestCase):
|
||||
self.api_client.session.request = mock.Mock()
|
||||
self.api_client.session.request.return_value = response
|
||||
|
||||
total_time = [0]
|
||||
interval = self.api_client._do_lookup(self.hardware_info,
|
||||
timeout=300,
|
||||
total_time=total_time,
|
||||
intervals=[2])
|
||||
self.assertEqual(4, interval)
|
||||
self.assertEqual(4, total_time[0])
|
||||
error = self.api_client._do_lookup(self.hardware_info)
|
||||
|
||||
@mock.patch('eventlet.greenthread.sleep')
|
||||
def test_lookup_node_exponential_backoff(self, sleep_mock):
|
||||
bad_response = FakeResponse(status_code=500, content={
|
||||
'node': {
|
||||
'uuid': 'deadbeef-dabb-ad00-b105-f00d00bab10c'
|
||||
},
|
||||
'heartbeat_timeout': 300
|
||||
})
|
||||
self.api_client.session.request = mock.Mock()
|
||||
self.api_client.session.request.return_value = bad_response
|
||||
self.assertRaises(errors.LookupNodeError,
|
||||
self.api_client.lookup_node,
|
||||
hardware_info=self.hardware_info,
|
||||
timeout=300,
|
||||
starting_interval=1)
|
||||
# 254 seconds before timeout
|
||||
expected_times = [mock.call(2), mock.call(4), mock.call(8),
|
||||
mock.call(16), mock.call(32), mock.call(64),
|
||||
mock.call(128)]
|
||||
self.assertEqual(expected_times, sleep_mock.call_args_list)
|
||||
self.assertFalse(error)
|
||||
|
||||
Reference in New Issue
Block a user