Merge "Convergence: Make SyncPoint.update_input_data actually atomic."
This commit is contained in:
commit
4eb3d3e688
|
@ -68,10 +68,12 @@ def delete_all(context, stack_id, traversal_id):
|
|||
|
||||
def update_input_data(context, entity_id, current_traversal,
|
||||
is_update, atomic_key, input_data):
|
||||
sync_point_object.SyncPoint.update_input_data(
|
||||
rows_updated = sync_point_object.SyncPoint.update_input_data(
|
||||
context, entity_id, current_traversal, is_update, atomic_key,
|
||||
input_data)
|
||||
|
||||
return rows_updated
|
||||
|
||||
|
||||
def deserialize_input_data(db_input_data):
|
||||
db_input_data = db_input_data.get('input_data')
|
||||
|
@ -87,17 +89,19 @@ def serialize_input_data(input_data):
|
|||
|
||||
def sync(cnxt, entity_id, current_traversal, is_update, propagate,
|
||||
predecessors, new_data):
|
||||
sync_point = get(cnxt, entity_id, current_traversal,
|
||||
is_update)
|
||||
input_data = dict(deserialize_input_data(sync_point.input_data))
|
||||
input_data.update(new_data)
|
||||
rows_updated = None
|
||||
sync_point = None
|
||||
input_data = None
|
||||
while not rows_updated:
|
||||
# TODO(sirushtim): Add a conf option to add no. of retries
|
||||
sync_point = get(cnxt, entity_id, current_traversal, is_update)
|
||||
input_data = dict(deserialize_input_data(sync_point.input_data))
|
||||
input_data.update(new_data)
|
||||
rows_updated = update_input_data(
|
||||
cnxt, entity_id, current_traversal, is_update,
|
||||
sync_point.atomic_key, serialize_input_data(input_data))
|
||||
|
||||
waiting = predecessors - set(input_data)
|
||||
|
||||
# Note: update must be atomic
|
||||
update_input_data(cnxt, entity_id, current_traversal,
|
||||
is_update, sync_point.atomic_key,
|
||||
serialize_input_data(input_data))
|
||||
|
||||
key = make_key(entity_id, current_traversal, is_update)
|
||||
if waiting:
|
||||
LOG.debug('[%s] Waiting %s: Got %s; still need %s',
|
||||
|
|
Loading…
Reference in New Issue