diff --git a/heat/engine/sync_point.py b/heat/engine/sync_point.py index 0ea8dd2048..09f92365d0 100644 --- a/heat/engine/sync_point.py +++ b/heat/engine/sync_point.py @@ -13,9 +13,8 @@ # limitations under the License. import ast -import eventlet -import random import six +import tenacity from oslo_log import log as logging @@ -116,25 +115,55 @@ def serialize_input_data(input_data): return {'input_data': _serialize(input_data)} +class wait_random_exponential(tenacity.wait_exponential): + """Random wait strategy with a geometrically increasing amount of jitter. + + Implements the truncated binary exponential backoff algorithm as used in + e.g. CSMA media access control. The retry occurs at a random time in a + (geometrically) expanding interval constrained by minimum and maximum + limits. + """ + def __init__(self, min=0, multiplier=1, max=tenacity._utils.MAX_WAIT, + exp_base=2): + super(wait_random_exponential, self).__init__(multiplier=multiplier, + max=(max-min), + exp_base=exp_base) + self._random = tenacity.wait_random(min=min, max=(min + multiplier)) + + def __call__(self, previous_attempt_number, delay_since_first_attempt): + jitter = super(wait_random_exponential, + self).__call__(previous_attempt_number, + delay_since_first_attempt) + self._random.wait_random_max = self._random.wait_random_min + jitter + return self._random(previous_attempt_number, delay_since_first_attempt) + + def sync(cnxt, entity_id, current_traversal, is_update, propagate, predecessors, new_data): - rows_updated = None - sync_point = None - input_data = None - nconflicts = max(0, len(predecessors) - 2) - # limit to 10 seconds - max_wt = min(nconflicts * 0.01, 10) - while not rows_updated: + # Retry waits up to 60 seconds at most, with exponentially increasing + # amounts of jitter per resource still outstanding + wait_strategy = wait_random_exponential(max=60) + + def init_jitter(existing_input_data): + nconflicts = max(0, len(predecessors) - len(existing_input_data) - 1) + # 10ms per potential conflict, up to a max of 10s in total + return min(nconflicts, 1000) * 0.01 + + @tenacity.retry( + retry=tenacity.retry_if_result(lambda r: r is None), + wait=wait_strategy + ) + def _sync(): sync_point = get(cnxt, entity_id, current_traversal, is_update) input_data = deserialize_input_data(sync_point.input_data) + wait_strategy.multiplier = init_jitter(input_data) input_data.update(new_data) rows_updated = update_input_data( cnxt, entity_id, current_traversal, is_update, sync_point.atomic_key, serialize_input_data(input_data)) - # don't aggressively spin; induce some sleep - if not rows_updated: - eventlet.sleep(random.uniform(0, max_wt)) + return input_data if rows_updated else None + input_data = _sync() waiting = predecessors - set(input_data) key = make_key(entity_id, current_traversal, is_update) if waiting: diff --git a/heat/tests/engine/test_sync_point.py b/heat/tests/engine/test_sync_point.py index 615f70cd32..cbe03d1641 100644 --- a/heat/tests/engine/test_sync_point.py +++ b/heat/tests/engine/test_sync_point.py @@ -74,15 +74,16 @@ class SyncPointTestCase(common.HeatTestCase): self.assertEqual({'input_data': {u'tuple:(3, 8)': None}}, res) @mock.patch('heat.engine.sync_point.update_input_data', return_value=None) - @mock.patch('eventlet.sleep', side_effect=exception.DBError) + @mock.patch('time.sleep', side_effect=exception.DBError) def sync_with_sleep(self, ctx, stack, mock_sleep_time, mock_uid): resource = stack['C'] graph = stack.convergence_dependencies.graph() mock_callback = mock.Mock() + sender = (3, True) self.assertRaises(exception.DBError, sync_point.sync, ctx, resource.id, stack.current_traversal, True, mock_callback, - set(graph[(resource.id, True)]), {}) + set(graph[(resource.id, True)]), {sender: None}) return mock_sleep_time def test_sync_with_time_throttle(self): @@ -92,4 +93,4 @@ class SyncPointTestCase(common.HeatTestCase): convergence=True) stack.converge_stack(stack.t, action=stack.CREATE) mock_sleep_time = self.sync_with_sleep(ctx, stack) - mock_sleep_time.assert_called_once_with(mock.ANY) + self.assertTrue(mock_sleep_time.called)