Merge "Support tenacity exponential backoff retry on resource sync"
This commit is contained in:
commit
98636290c5
@ -13,9 +13,8 @@
|
||||
# limitations under the License.
|
||||
|
||||
import ast
|
||||
import eventlet
|
||||
import random
|
||||
import six
|
||||
import tenacity
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
@ -116,25 +115,55 @@ def serialize_input_data(input_data):
|
||||
return {'input_data': _serialize(input_data)}
|
||||
|
||||
|
||||
class wait_random_exponential(tenacity.wait_exponential):
|
||||
"""Random wait strategy with a geometrically increasing amount of jitter.
|
||||
|
||||
Implements the truncated binary exponential backoff algorithm as used in
|
||||
e.g. CSMA media access control. The retry occurs at a random time in a
|
||||
(geometrically) expanding interval constrained by minimum and maximum
|
||||
limits.
|
||||
"""
|
||||
def __init__(self, min=0, multiplier=1, max=tenacity._utils.MAX_WAIT,
|
||||
exp_base=2):
|
||||
super(wait_random_exponential, self).__init__(multiplier=multiplier,
|
||||
max=(max-min),
|
||||
exp_base=exp_base)
|
||||
self._random = tenacity.wait_random(min=min, max=(min + multiplier))
|
||||
|
||||
def __call__(self, previous_attempt_number, delay_since_first_attempt):
|
||||
jitter = super(wait_random_exponential,
|
||||
self).__call__(previous_attempt_number,
|
||||
delay_since_first_attempt)
|
||||
self._random.wait_random_max = self._random.wait_random_min + jitter
|
||||
return self._random(previous_attempt_number, delay_since_first_attempt)
|
||||
|
||||
|
||||
def sync(cnxt, entity_id, current_traversal, is_update, propagate,
|
||||
predecessors, new_data):
|
||||
rows_updated = None
|
||||
sync_point = None
|
||||
input_data = None
|
||||
nconflicts = max(0, len(predecessors) - 2)
|
||||
# limit to 10 seconds
|
||||
max_wt = min(nconflicts * 0.01, 10)
|
||||
while not rows_updated:
|
||||
# Retry waits up to 60 seconds at most, with exponentially increasing
|
||||
# amounts of jitter per resource still outstanding
|
||||
wait_strategy = wait_random_exponential(max=60)
|
||||
|
||||
def init_jitter(existing_input_data):
|
||||
nconflicts = max(0, len(predecessors) - len(existing_input_data) - 1)
|
||||
# 10ms per potential conflict, up to a max of 10s in total
|
||||
return min(nconflicts, 1000) * 0.01
|
||||
|
||||
@tenacity.retry(
|
||||
retry=tenacity.retry_if_result(lambda r: r is None),
|
||||
wait=wait_strategy
|
||||
)
|
||||
def _sync():
|
||||
sync_point = get(cnxt, entity_id, current_traversal, is_update)
|
||||
input_data = deserialize_input_data(sync_point.input_data)
|
||||
wait_strategy.multiplier = init_jitter(input_data)
|
||||
input_data.update(new_data)
|
||||
rows_updated = update_input_data(
|
||||
cnxt, entity_id, current_traversal, is_update,
|
||||
sync_point.atomic_key, serialize_input_data(input_data))
|
||||
# don't aggressively spin; induce some sleep
|
||||
if not rows_updated:
|
||||
eventlet.sleep(random.uniform(0, max_wt))
|
||||
return input_data if rows_updated else None
|
||||
|
||||
input_data = _sync()
|
||||
waiting = predecessors - set(input_data)
|
||||
key = make_key(entity_id, current_traversal, is_update)
|
||||
if waiting:
|
||||
|
@ -74,15 +74,16 @@ class SyncPointTestCase(common.HeatTestCase):
|
||||
self.assertEqual({'input_data': {u'tuple:(3, 8)': None}}, res)
|
||||
|
||||
@mock.patch('heat.engine.sync_point.update_input_data', return_value=None)
|
||||
@mock.patch('eventlet.sleep', side_effect=exception.DBError)
|
||||
@mock.patch('time.sleep', side_effect=exception.DBError)
|
||||
def sync_with_sleep(self, ctx, stack, mock_sleep_time, mock_uid):
|
||||
resource = stack['C']
|
||||
graph = stack.convergence_dependencies.graph()
|
||||
|
||||
mock_callback = mock.Mock()
|
||||
sender = (3, True)
|
||||
self.assertRaises(exception.DBError, sync_point.sync, ctx, resource.id,
|
||||
stack.current_traversal, True, mock_callback,
|
||||
set(graph[(resource.id, True)]), {})
|
||||
set(graph[(resource.id, True)]), {sender: None})
|
||||
return mock_sleep_time
|
||||
|
||||
def test_sync_with_time_throttle(self):
|
||||
@ -92,4 +93,4 @@ class SyncPointTestCase(common.HeatTestCase):
|
||||
convergence=True)
|
||||
stack.converge_stack(stack.t, action=stack.CREATE)
|
||||
mock_sleep_time = self.sync_with_sleep(ctx, stack)
|
||||
mock_sleep_time.assert_called_once_with(mock.ANY)
|
||||
self.assertTrue(mock_sleep_time.called)
|
||||
|
Loading…
Reference in New Issue
Block a user