Convergence: Make SyncPoint.update_input_data actually atomic.
Co-Authored-By: Sirushti Murugesan <sirushti.murugesan@hp.com> Co-Authored-By: Anant Patil <anant.patil@hp.com> Change-Id: I3ed7f50d9d48c3c8713c167d2864464c0fefdb70
This commit is contained in:
parent
7e7aad34a8
commit
abb69bb554
|
@ -68,10 +68,12 @@ def delete_all(context, stack_id, traversal_id):
|
||||||
|
|
||||||
def update_input_data(context, entity_id, current_traversal,
|
def update_input_data(context, entity_id, current_traversal,
|
||||||
is_update, atomic_key, input_data):
|
is_update, atomic_key, input_data):
|
||||||
sync_point_object.SyncPoint.update_input_data(
|
rows_updated = sync_point_object.SyncPoint.update_input_data(
|
||||||
context, entity_id, current_traversal, is_update, atomic_key,
|
context, entity_id, current_traversal, is_update, atomic_key,
|
||||||
input_data)
|
input_data)
|
||||||
|
|
||||||
|
return rows_updated
|
||||||
|
|
||||||
|
|
||||||
def deserialize_input_data(db_input_data):
|
def deserialize_input_data(db_input_data):
|
||||||
db_input_data = db_input_data.get('input_data')
|
db_input_data = db_input_data.get('input_data')
|
||||||
|
@ -87,17 +89,19 @@ def serialize_input_data(input_data):
|
||||||
|
|
||||||
def sync(cnxt, entity_id, current_traversal, is_update, propagate,
|
def sync(cnxt, entity_id, current_traversal, is_update, propagate,
|
||||||
predecessors, new_data):
|
predecessors, new_data):
|
||||||
sync_point = get(cnxt, entity_id, current_traversal,
|
rows_updated = None
|
||||||
is_update)
|
sync_point = None
|
||||||
input_data = dict(deserialize_input_data(sync_point.input_data))
|
input_data = None
|
||||||
input_data.update(new_data)
|
while not rows_updated:
|
||||||
|
# TODO(sirushtim): Add a conf option to add no. of retries
|
||||||
|
sync_point = get(cnxt, entity_id, current_traversal, is_update)
|
||||||
|
input_data = dict(deserialize_input_data(sync_point.input_data))
|
||||||
|
input_data.update(new_data)
|
||||||
|
rows_updated = update_input_data(
|
||||||
|
cnxt, entity_id, current_traversal, is_update,
|
||||||
|
sync_point.atomic_key, serialize_input_data(input_data))
|
||||||
|
|
||||||
waiting = predecessors - set(input_data)
|
waiting = predecessors - set(input_data)
|
||||||
|
|
||||||
# Note: update must be atomic
|
|
||||||
update_input_data(cnxt, entity_id, current_traversal,
|
|
||||||
is_update, sync_point.atomic_key,
|
|
||||||
serialize_input_data(input_data))
|
|
||||||
|
|
||||||
key = make_key(entity_id, current_traversal, is_update)
|
key = make_key(entity_id, current_traversal, is_update)
|
||||||
if waiting:
|
if waiting:
|
||||||
LOG.debug('[%s] Waiting %s: Got %s; still need %s',
|
LOG.debug('[%s] Waiting %s: Got %s; still need %s',
|
||||||
|
|
Loading…
Reference in New Issue