Rework due to code comments.

This commit is contained in:
Joshua Harlow
2013-05-14 15:24:12 -07:00
parent 7999de8de7
commit 31b91e758e
2 changed files with 59 additions and 25 deletions

View File

@@ -27,6 +27,7 @@ from taskflow import exceptions as exc
from taskflow import job from taskflow import job
from taskflow import jobboard from taskflow import jobboard
from taskflow import logbook from taskflow import logbook
from taskflow import states
from taskflow import utils from taskflow import utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@@ -165,6 +166,9 @@ class MemoryJobBoard(jobboard.JobBoard):
break break
if not exists: if not exists:
raise exc.JobNotFound() raise exc.JobNotFound()
if j.state not in (states.SUCCESS, states.FAILURE):
raise exc.InvalidStateException("Can not delete a job in "
"state %s" % (j.state))
self._board = [(d, j) for (d, j) in self._board if j != job] self._board = [(d, j) for (d, j) in self._board if j != job]
@check_not_closed @check_not_closed

View File

@@ -77,21 +77,50 @@ class Workflow(object):
else: else:
result_fetcher = None result_fetcher = None
self.state = states.STARTED self._change_state(context, states.STARTED)
for task in self.tasks:
# TODO(harlowja): we can likely add in custom reconcilation strategies
# here or around here...
def do_rollback_for(task, ex):
self._change_state(context, states.REVERTING)
with excutils.save_and_reraise_exception():
try:
self._on_task_error(context, task)
except Exception:
LOG.exception("Dropping exception catched when"
" notifying about existing task"
" exception.")
self.rollback(context, exc.TaskException(task, self, ex))
self._change_state(context, states.FAILURE)
self._change_state(context, states.RESUMING)
last_task = 0
if result_fetcher:
for (i, task) in enumerate(self.tasks):
(has_result, result) = result_fetcher(context, self, task)
if not has_result:
break
# Fake running the task so that we trigger the same
# notifications and state changes (and rollback that would
# have happened in a normal flow).
last_task = i + 1
try:
self._on_task_start(context, task)
# Keep a pristine copy of the result
# so that if said result is altered by other further
# states the one here will not be. This ensures that
# if rollback occurs that the task gets exactly the
# result it returned and not a modified one.
self.results.append((task, copy.deepcopy(result)))
self._on_task_finish(context, task, result)
except Exception as ex:
do_rollback_for(task, ex)
self._change_state(context, states.RUNNING)
for task in self.tasks[last_task:]:
try: try:
# See if we have already ran this.
result = None
has_result = False
if result_fetcher:
(has_result, result) = result_fetcher(context, self, task)
if not has_result:
self.state = state.RUNNING
else:
self.state = state.RESUMING
self._on_task_start(context, task) self._on_task_start(context, task)
if not has_result: result = task.apply(context, *args, **kwargs)
result = task.apply(context, *args, **kwargs)
# Keep a pristine copy of the result # Keep a pristine copy of the result
# so that if said result is altered by other further states # so that if said result is altered by other further states
# the one here will not be. This ensures that if rollback # the one here will not be. This ensures that if rollback
@@ -100,19 +129,20 @@ class Workflow(object):
self.results.append((task, copy.deepcopy(result))) self.results.append((task, copy.deepcopy(result)))
self._on_task_finish(context, task, result) self._on_task_finish(context, task, result)
except Exception as ex: except Exception as ex:
self.state = states.FAILURE do_rollback_for(task, ex)
with excutils.save_and_reraise_exception():
try:
self._on_task_error(context, task)
except Exception:
LOG.exception("Dropping exception catched when"
" notifying about existing task"
" exception.")
self.state = states.REVERTING
self.rollback(context, exc.TaskException(task, self, ex))
self.state = states.FAILURE
# Only gets here if everything went successfully. # Only gets here if everything went successfully.
self.state = states.SUCCESS self._change_state(context, states.SUCCESS)
def _change_state(self, context, new_state):
if self.state != new_state:
self.state = new_state
self._on_flow_state_change(context)
def _on_flow_state_change(self, context):
# Notify any listeners that the internal state has changed.
for i in self.listeners:
i.notify(context, self)
def _on_task_error(self, context, task): def _on_task_error(self, context, task):
# Notify any listeners that the task has errored. # Notify any listeners that the task has errored.