Fixed (hopefully) db transaction errors
Previous implementation were not respecting DB transactions (sessions), so data consistency is not guaranteed. This patch attempts to fix that.
This commit is contained in:
@@ -835,16 +835,17 @@ def _action_dependency_add(context, action_id, field, adds):
|
||||
being completed.')
|
||||
|
||||
|
||||
def _action_dependency_del(context, action_id, field, dels):
|
||||
def _action_dependency_del(query, action_id, field, dels):
|
||||
if not isinstance(dels, list):
|
||||
del_list = [dels]
|
||||
else:
|
||||
del_list = dels
|
||||
|
||||
action = model_query(context, models.Action).get(action_id)
|
||||
action = query.get(action_id)
|
||||
if not action:
|
||||
msg = _('Action with id "%s" not found') % action_id
|
||||
raise exception.NotFound(msg)
|
||||
LOG.warning(msg)
|
||||
return
|
||||
|
||||
if action[field] is not None:
|
||||
action[field] = list(set(action[field]) - set(del_list))
|
||||
@@ -860,13 +861,16 @@ def action_add_dependency(context, depended, dependent):
|
||||
raise exception.NotSupport(
|
||||
_('Multiple dependencies between lists not support'))
|
||||
|
||||
if isinstance(depended, list): # e.g. D depends on A,B,C
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
for d in depended:
|
||||
_action_dependency_add(context, d, "depended_by", dependent)
|
||||
query = model_query(context, models.Action)
|
||||
session = query.session
|
||||
|
||||
_action_dependency_add(context, dependent, "depends_on", depended)
|
||||
if isinstance(depended, list): # e.g. D depends on A,B,C
|
||||
session.begin()
|
||||
for d in depended:
|
||||
_action_dependency_add(context, d, "depended_by", dependent)
|
||||
|
||||
_action_dependency_add(context, dependent, "depends_on", depended)
|
||||
session.commit()
|
||||
return
|
||||
|
||||
# Only dependent can be a list now, convert it to a list if it
|
||||
@@ -876,13 +880,12 @@ def action_add_dependency(context, depended, dependent):
|
||||
else:
|
||||
dependents = dependent
|
||||
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
_action_dependency_add(context, depended, "depended_by", dependent)
|
||||
session.begin()
|
||||
_action_dependency_add(context, depended, "depended_by", dependent)
|
||||
|
||||
for d in dependents:
|
||||
_action_dependency_add(context, d, "depends_on", depended)
|
||||
return
|
||||
for d in dependents:
|
||||
_action_dependency_add(context, d, "depends_on", depended)
|
||||
session.commit()
|
||||
|
||||
|
||||
def action_del_dependency(context, depended, dependent):
|
||||
@@ -890,13 +893,16 @@ def action_del_dependency(context, depended, dependent):
|
||||
raise exception.NotSupport(
|
||||
_('Multiple dependencies between lists not support'))
|
||||
|
||||
if isinstance(depended, list): # e.g. D depends on A,B,C
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
for d in depended:
|
||||
_action_dependency_del(context, d, "depended_by", dependent)
|
||||
query = model_query(context, models.Action)
|
||||
session = query.session
|
||||
|
||||
_action_dependency_del(context, dependent, "depends_on", depended)
|
||||
if isinstance(depended, list): # e.g. D depends on A,B,C
|
||||
session.begin()
|
||||
for d in depended:
|
||||
_action_dependency_del(query, d, "depended_by", dependent)
|
||||
|
||||
_action_dependency_del(query, dependent, "depends_on", depended)
|
||||
session.commit()
|
||||
return
|
||||
|
||||
# Only dependent can be a list now, convert it to a list if it
|
||||
@@ -906,13 +912,12 @@ def action_del_dependency(context, depended, dependent):
|
||||
else:
|
||||
dependents = dependent
|
||||
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
_action_dependency_del(context, depended, "depended_by", dependent)
|
||||
session.begin()
|
||||
_action_dependency_del(query, depended, "depended_by", dependent)
|
||||
|
||||
for d in dependents:
|
||||
_action_dependency_del(context, d, "depends_on", depended)
|
||||
return
|
||||
for d in dependents:
|
||||
_action_dependency_del(query, d, "depends_on", depended)
|
||||
session.commit()
|
||||
|
||||
|
||||
def action_mark_succeeded(context, action_id):
|
||||
@@ -922,31 +927,29 @@ def action_mark_succeeded(context, action_id):
|
||||
raise exception.NotFound(
|
||||
_('Action with id "%s" not found') % action_id)
|
||||
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
action.status = ACTION_SUCCEEDED
|
||||
session = query.session
|
||||
session.begin()
|
||||
|
||||
for a in action.depended_by:
|
||||
_action_dependency_del(context, a, 'depends_on', action_id)
|
||||
action.depended_by = []
|
||||
action.status = ACTION_SUCCEEDED
|
||||
|
||||
for a in action.depended_by:
|
||||
_action_dependency_del(query, a, 'depends_on', action_id)
|
||||
action.depended_by = []
|
||||
|
||||
session.commit()
|
||||
return action
|
||||
|
||||
|
||||
def _mark_failed_action(context, action_id):
|
||||
query = model_query(context, models.Action)
|
||||
def _mark_failed_action(query, action_id):
|
||||
action = query.get(action_id)
|
||||
if not action:
|
||||
raise exception.NotFound(
|
||||
_('Action with id "%s" not found') % action_id)
|
||||
LOG.warning(_('Action with id "%s" not found') % action_id)
|
||||
return
|
||||
|
||||
action.status = ACTION_FAILED
|
||||
|
||||
if action.depended_by is not None:
|
||||
for a in action.depended_by:
|
||||
_mark_failed_action(context, a)
|
||||
|
||||
return True
|
||||
_mark_failed_action(query, a)
|
||||
|
||||
|
||||
def action_mark_failed(context, action_id):
|
||||
@@ -956,32 +959,27 @@ def action_mark_failed(context, action_id):
|
||||
raise exception.NotFound(
|
||||
_('Action with id "%s" not found') % action_id)
|
||||
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
action.status = ACTION_FAILED
|
||||
session = query.session
|
||||
session.begin()
|
||||
action.status = ACTION_FAILED
|
||||
|
||||
if action.depended_by is not None:
|
||||
for a in action.depended_by:
|
||||
#'''recursion'''
|
||||
_mark_failed_action(context, a)
|
||||
for a in action.depended_by:
|
||||
_mark_failed_action(query, a)
|
||||
|
||||
session.commit()
|
||||
return action
|
||||
|
||||
|
||||
def _mark_cancelled_action(context, action_id):
|
||||
query = model_query(context, models.Action)
|
||||
def _mark_cancelled_action(query, action_id):
|
||||
action = query.get(action_id)
|
||||
if not action:
|
||||
raise exception.NotFound(
|
||||
_('Action with id "%s" not found') % action_id)
|
||||
LOG.warning(_('Action with id "%s" not found') % action_id)
|
||||
return
|
||||
|
||||
action.status = ACTION_CANCELED
|
||||
|
||||
if action.depended_by is not None:
|
||||
for a in action.depended_by:
|
||||
_mark_cancelled_action(context, a)
|
||||
|
||||
return True
|
||||
_mark_cancelled_action(query, a)
|
||||
|
||||
|
||||
def action_mark_cancelled(context, action_id):
|
||||
@@ -991,20 +989,20 @@ def action_mark_cancelled(context, action_id):
|
||||
raise exception.NotFound(
|
||||
_('Action with id "%s" not found') % action_id)
|
||||
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
action.status = ACTION_CANCELED
|
||||
session = query.session
|
||||
session.begin()
|
||||
action.status = ACTION_CANCELED
|
||||
|
||||
if action.depended_by is not None:
|
||||
for a in action.depended_by:
|
||||
#'''recursion'''
|
||||
_mark_cancelled_action(context, a)
|
||||
for a in action.depended_by:
|
||||
_mark_cancelled_action(query, a)
|
||||
session.commit()
|
||||
|
||||
return action
|
||||
|
||||
|
||||
def action_start_work_on(context, action_id, owner):
|
||||
action = model_query(context, models.Action).get(action_id)
|
||||
query = model_query(context, models.Action)
|
||||
action = query.get(action_id)
|
||||
if not action:
|
||||
raise exception.NotFound(
|
||||
_('Action with id "%s" not found') % action_id)
|
||||
@@ -1012,13 +1010,11 @@ def action_start_work_on(context, action_id, owner):
|
||||
if action.owner and action.owner != owner:
|
||||
raise exception.ActionBeingWorked(owner=action.owner)
|
||||
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
action.owner = owner
|
||||
action.status = ACTION_RUNNING
|
||||
action.status_reason = _('The action is being processed.')
|
||||
action.save(_session(context))
|
||||
return action
|
||||
action.owner = owner
|
||||
action.status = ACTION_RUNNING
|
||||
action.status_reason = _('The action is being processed.')
|
||||
action.save(query.session)
|
||||
return action
|
||||
|
||||
|
||||
def action_lock_check(context, action_id, owner=None):
|
||||
@@ -1028,25 +1024,20 @@ def action_lock_check(context, action_id, owner=None):
|
||||
_('Action with id "%s" not found') % action_id)
|
||||
|
||||
if owner:
|
||||
if action.owner == owner:
|
||||
return owner
|
||||
else:
|
||||
return action.owner
|
||||
return owner if owner == action.owner else action.owner
|
||||
else:
|
||||
if action.owner:
|
||||
return action.owner
|
||||
else:
|
||||
return False
|
||||
return action.owner if action.owner else None
|
||||
|
||||
|
||||
def action_control(context, action_id, value):
|
||||
action = model_query(context, models.Action).get(action_id)
|
||||
query = model_query(context, models.Action)
|
||||
action = query.get(action_id)
|
||||
if not action:
|
||||
raise exception.NotFound(
|
||||
_('Action with id "%s" not found') % action_id)
|
||||
|
||||
action.control = value
|
||||
action.save(_session(context))
|
||||
action.save(query.session)
|
||||
|
||||
|
||||
def action_control_check(context, action_id):
|
||||
@@ -1055,12 +1046,12 @@ def action_control_check(context, action_id):
|
||||
raise exception.NotFound(
|
||||
_('Action with id "%s" not found') % action_id)
|
||||
|
||||
value = action.control
|
||||
return value
|
||||
return action.control
|
||||
|
||||
|
||||
def action_delete(context, action_id, force=False):
|
||||
action = action_get(context, action_id)
|
||||
query = model_query(context, models.Action)
|
||||
action = query.get(action_id)
|
||||
|
||||
if not action:
|
||||
msg = _('Attempt to delete a action with id "%s" that does not'
|
||||
@@ -1069,6 +1060,7 @@ def action_delete(context, action_id, force=False):
|
||||
|
||||
# TODO(liuh): Need check if and how an action can be safety deleted
|
||||
action.delete()
|
||||
query.session.flush()
|
||||
|
||||
|
||||
# Utils
|
||||
|
||||
Reference in New Issue
Block a user