Merge "Add the task to the accumulator before running."
This commit is contained in:
@@ -145,6 +145,11 @@ class Flow(object):
|
||||
def run_task(task, failed=False, result=None, simulate_run=False):
|
||||
try:
|
||||
self._on_task_start(context, task)
|
||||
# Add the task to be rolled back *immediately* so that even if
|
||||
# the task fails while producing results it will be given a
|
||||
# chance to rollback.
|
||||
rb = RollbackTask(context, task, result=None)
|
||||
self._accumulator.add(rb)
|
||||
if not simulate_run:
|
||||
inputs = self._fetch_task_inputs(task)
|
||||
if not inputs:
|
||||
@@ -165,21 +170,25 @@ class Flow(object):
|
||||
" object: %s", result)
|
||||
result = exc.InvalidStateException()
|
||||
raise result
|
||||
# Alter the index we have ran at.
|
||||
self._left_off_at += 1
|
||||
# Keep a pristine copy of the result
|
||||
# so that if said result is altered by other further
|
||||
# states the one here will not be. This ensures that
|
||||
# if rollback occurs that the task gets exactly the
|
||||
# result it returned and not a modified one.
|
||||
self.results.append((task, result))
|
||||
# Add the task result to the accumulator before
|
||||
# Adjust the task result in the accumulator before
|
||||
# notifying others that the task has finished to
|
||||
# avoid the case where a listener might throw an
|
||||
# exception.
|
||||
self._accumulator.add(RollbackTask(context, task,
|
||||
copy.deepcopy(result)))
|
||||
self._on_task_finish(context, task, result)
|
||||
#
|
||||
# Note(harlowja): Keep the original result in the
|
||||
# accumulator only and give a duplicated copy to
|
||||
# avoid the original result being altered by other
|
||||
# tasks.
|
||||
#
|
||||
# This is due to python being by reference (which means
|
||||
# some task could alter this result intentionally or not
|
||||
# intentionally).
|
||||
rb.result = result
|
||||
# Alter the index we have ran at.
|
||||
self._left_off_at += 1
|
||||
result_copy = copy.deepcopy(result)
|
||||
self.results.append((task, result_copy))
|
||||
self._on_task_finish(context, task, result_copy)
|
||||
except Exception as e:
|
||||
cause = FlowFailure(task, self, e)
|
||||
with excutils.save_and_reraise_exception():
|
||||
|
||||
Reference in New Issue
Block a user