Support tenacity exponential backoff retry on resource sync
Change to use tenacity as the retry library for SyncPoints. Use exponential backoff retry waiting time. The amount of jitter per potential conflict increases 'exponentially' (*cough* geometrically) with each retry. The number of expected conflicts (which drops over time) is updated at each attempt. This allows us to discover the right rate for attempting commits across all resources that are in contention, while actually reducing the delay between retries for any particular resource as the number of outstanding resources drops. Change-Id: I7d5a546a695480df309f22688b239572aa0f897a Co-Authored-By: Zane Bitter <zbitter@redhat.com> Closes-Bug: #1591469
This commit is contained in:
parent
2ced86836e
commit
bc83d86255
|
@ -13,9 +13,8 @@
|
|||
# limitations under the License.
|
||||
|
||||
import ast
|
||||
import eventlet
|
||||
import random
|
||||
import six
|
||||
import tenacity
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
|
@ -116,25 +115,55 @@ def serialize_input_data(input_data):
|
|||
return {'input_data': _serialize(input_data)}
|
||||
|
||||
|
||||
class wait_random_exponential(tenacity.wait_exponential):
|
||||
"""Random wait strategy with a geometrically increasing amount of jitter.
|
||||
|
||||
Implements the truncated binary exponential backoff algorithm as used in
|
||||
e.g. CSMA media access control. The retry occurs at a random time in a
|
||||
(geometrically) expanding interval constrained by minimum and maximum
|
||||
limits.
|
||||
"""
|
||||
def __init__(self, min=0, multiplier=1, max=tenacity._utils.MAX_WAIT,
|
||||
exp_base=2):
|
||||
super(wait_random_exponential, self).__init__(multiplier=multiplier,
|
||||
max=(max-min),
|
||||
exp_base=exp_base)
|
||||
self._random = tenacity.wait_random(min=min, max=(min + multiplier))
|
||||
|
||||
def __call__(self, previous_attempt_number, delay_since_first_attempt):
|
||||
jitter = super(wait_random_exponential,
|
||||
self).__call__(previous_attempt_number,
|
||||
delay_since_first_attempt)
|
||||
self._random.wait_random_max = self._random.wait_random_min + jitter
|
||||
return self._random(previous_attempt_number, delay_since_first_attempt)
|
||||
|
||||
|
||||
def sync(cnxt, entity_id, current_traversal, is_update, propagate,
|
||||
predecessors, new_data):
|
||||
rows_updated = None
|
||||
sync_point = None
|
||||
input_data = None
|
||||
nconflicts = max(0, len(predecessors) - 2)
|
||||
# limit to 10 seconds
|
||||
max_wt = min(nconflicts * 0.01, 10)
|
||||
while not rows_updated:
|
||||
# Retry waits up to 60 seconds at most, with exponentially increasing
|
||||
# amounts of jitter per resource still outstanding
|
||||
wait_strategy = wait_random_exponential(max=60)
|
||||
|
||||
def init_jitter(existing_input_data):
|
||||
nconflicts = max(0, len(predecessors) - len(existing_input_data) - 1)
|
||||
# 10ms per potential conflict, up to a max of 10s in total
|
||||
return min(nconflicts, 1000) * 0.01
|
||||
|
||||
@tenacity.retry(
|
||||
retry=tenacity.retry_if_result(lambda r: r is None),
|
||||
wait=wait_strategy
|
||||
)
|
||||
def _sync():
|
||||
sync_point = get(cnxt, entity_id, current_traversal, is_update)
|
||||
input_data = deserialize_input_data(sync_point.input_data)
|
||||
wait_strategy.multiplier = init_jitter(input_data)
|
||||
input_data.update(new_data)
|
||||
rows_updated = update_input_data(
|
||||
cnxt, entity_id, current_traversal, is_update,
|
||||
sync_point.atomic_key, serialize_input_data(input_data))
|
||||
# don't aggressively spin; induce some sleep
|
||||
if not rows_updated:
|
||||
eventlet.sleep(random.uniform(0, max_wt))
|
||||
return input_data if rows_updated else None
|
||||
|
||||
input_data = _sync()
|
||||
waiting = predecessors - set(input_data)
|
||||
key = make_key(entity_id, current_traversal, is_update)
|
||||
if waiting:
|
||||
|
|
|
@ -74,15 +74,16 @@ class SyncPointTestCase(common.HeatTestCase):
|
|||
self.assertEqual({'input_data': {u'tuple:(3, 8)': None}}, res)
|
||||
|
||||
@mock.patch('heat.engine.sync_point.update_input_data', return_value=None)
|
||||
@mock.patch('eventlet.sleep', side_effect=exception.DBError)
|
||||
@mock.patch('time.sleep', side_effect=exception.DBError)
|
||||
def sync_with_sleep(self, ctx, stack, mock_sleep_time, mock_uid):
|
||||
resource = stack['C']
|
||||
graph = stack.convergence_dependencies.graph()
|
||||
|
||||
mock_callback = mock.Mock()
|
||||
sender = (3, True)
|
||||
self.assertRaises(exception.DBError, sync_point.sync, ctx, resource.id,
|
||||
stack.current_traversal, True, mock_callback,
|
||||
set(graph[(resource.id, True)]), {})
|
||||
set(graph[(resource.id, True)]), {sender: None})
|
||||
return mock_sleep_time
|
||||
|
||||
def test_sync_with_time_throttle(self):
|
||||
|
@ -92,4 +93,4 @@ class SyncPointTestCase(common.HeatTestCase):
|
|||
convergence=True)
|
||||
stack.converge_stack(stack.t, action=stack.CREATE)
|
||||
mock_sleep_time = self.sync_with_sleep(ctx, stack)
|
||||
mock_sleep_time.assert_called_once_with(mock.ANY)
|
||||
self.assertTrue(mock_sleep_time.called)
|
||||
|
|
Loading…
Reference in New Issue