From bc83d86255a61b5af7864889cd90960069ade598 Mon Sep 17 00:00:00 2001
From: ricolin
Date: Wed, 3 Aug 2016 16:22:29 +0800
Subject: [PATCH] Support tenacity exponential backoff retry on resource sync

Change to use tenacity as the retry library for SyncPoints. Use
exponential backoff retry waiting time.

The amount of jitter per potential conflict increases 'exponentially'
(*cough* geometrically) with each retry. The number of expected
conflicts (which drops over time) is updated at each attempt. This
allows us to discover the right rate for attempting commits across all
resources that are in contention, while actually reducing the delay
between retries for any particular resource as the number of
outstanding resources drops.

Change-Id: I7d5a546a695480df309f22688b239572aa0f897a
Co-Authored-By: Zane Bitter
Closes-Bug: #1591469
---
 heat/engine/sync_point.py            | 53 +++++++++++++++++++++-------
 heat/tests/engine/test_sync_point.py |  7 ++--
 2 files changed, 45 insertions(+), 15 deletions(-)

diff --git a/heat/engine/sync_point.py b/heat/engine/sync_point.py
index a8feea4af1..73ac23bcf2 100644
--- a/heat/engine/sync_point.py
+++ b/heat/engine/sync_point.py
@@ -13,9 +13,8 @@
 # limitations under the License.
 
 import ast
-import eventlet
-import random
 import six
+import tenacity
 
 from oslo_log import log as logging
 
@@ -116,25 +115,55 @@ def serialize_input_data(input_data):
     return {'input_data': _serialize(input_data)}
 
 
+class wait_random_exponential(tenacity.wait_exponential):
+    """Random wait strategy with a geometrically increasing amount of jitter.
+
+    Implements the truncated binary exponential backoff algorithm as used in
+    e.g. CSMA media access control. The retry occurs at a random time in a
+    (geometrically) expanding interval constrained by minimum and maximum
+    limits.
+    """
+    def __init__(self, min=0, multiplier=1, max=tenacity._utils.MAX_WAIT,
+                 exp_base=2):
+        super(wait_random_exponential, self).__init__(multiplier=multiplier,
+                                                      max=(max-min),
+                                                      exp_base=exp_base)
+        self._random = tenacity.wait_random(min=min, max=(min + multiplier))
+
+    def __call__(self, previous_attempt_number, delay_since_first_attempt):
+        jitter = super(wait_random_exponential,
+                       self).__call__(previous_attempt_number,
+                                      delay_since_first_attempt)
+        self._random.wait_random_max = self._random.wait_random_min + jitter
+        return self._random(previous_attempt_number, delay_since_first_attempt)
+
+
 def sync(cnxt, entity_id, current_traversal, is_update, propagate,
          predecessors, new_data):
-    rows_updated = None
-    sync_point = None
-    input_data = None
-    nconflicts = max(0, len(predecessors) - 2)
-    # limit to 10 seconds
-    max_wt = min(nconflicts * 0.01, 10)
-    while not rows_updated:
+    # Retry waits up to 60 seconds at most, with exponentially increasing
+    # amounts of jitter per resource still outstanding
+    wait_strategy = wait_random_exponential(max=60)
+
+    def init_jitter(existing_input_data):
+        nconflicts = max(0, len(predecessors) - len(existing_input_data) - 1)
+        # 10ms per potential conflict, up to a max of 10s in total
+        return min(nconflicts, 1000) * 0.01
+
+    @tenacity.retry(
+        retry=tenacity.retry_if_result(lambda r: r is None),
+        wait=wait_strategy
+    )
+    def _sync():
         sync_point = get(cnxt, entity_id, current_traversal, is_update)
         input_data = deserialize_input_data(sync_point.input_data)
+        wait_strategy.multiplier = init_jitter(input_data)
         input_data.update(new_data)
         rows_updated = update_input_data(
             cnxt, entity_id, current_traversal, is_update,
             sync_point.atomic_key, serialize_input_data(input_data))
-        # don't aggressively spin; induce some sleep
-        if not rows_updated:
-            eventlet.sleep(random.uniform(0, max_wt))
+        return input_data if rows_updated else None
 
+    input_data = _sync()
     waiting = predecessors - set(input_data)
     key = make_key(entity_id, current_traversal, is_update)
     if waiting:
diff --git a/heat/tests/engine/test_sync_point.py b/heat/tests/engine/test_sync_point.py
index 615f70cd32..cbe03d1641 100644
--- a/heat/tests/engine/test_sync_point.py
+++ b/heat/tests/engine/test_sync_point.py
@@ -74,15 +74,16 @@ class SyncPointTestCase(common.HeatTestCase):
         self.assertEqual({'input_data': {u'tuple:(3, 8)': None}}, res)
 
     @mock.patch('heat.engine.sync_point.update_input_data', return_value=None)
-    @mock.patch('eventlet.sleep', side_effect=exception.DBError)
+    @mock.patch('time.sleep', side_effect=exception.DBError)
     def sync_with_sleep(self, ctx, stack, mock_sleep_time, mock_uid):
         resource = stack['C']
         graph = stack.convergence_dependencies.graph()
         mock_callback = mock.Mock()
+        sender = (3, True)
         self.assertRaises(exception.DBError, sync_point.sync, ctx,
                           resource.id, stack.current_traversal, True,
                           mock_callback,
-                          set(graph[(resource.id, True)]), {})
+                          set(graph[(resource.id, True)]), {sender: None})
         return mock_sleep_time
 
     def test_sync_with_time_throttle(self):
@@ -92,4 +93,4 @@
                                 convergence=True)
         stack.converge_stack(stack.t, action=stack.CREATE)
         mock_sleep_time = self.sync_with_sleep(ctx, stack)
-        mock_sleep_time.assert_called_once_with(mock.ANY)
+        self.assertTrue(mock_sleep_time.called)