diff --git a/taskflow/examples/resume_from_backend.out.txt b/taskflow/examples/resume_from_backend.out.txt index 6d1116d0..9d80a0d1 100644 --- a/taskflow/examples/resume_from_backend.out.txt +++ b/taskflow/examples/resume_from_backend.out.txt @@ -1,21 +1,26 @@ - -At the beginning, there is no state: -Flow state: None - -Running: +----------------------------------- +At the beginning, there is no state +----------------------------------- +Flow 'resume from backend example' state: None +------- +Running +------- executing first==1.0 - -After running: -Flow state: SUSPENDED -boom==1.0: SUCCESS, result=None -first==1.0: SUCCESS, result=ok -second==1.0: PENDING, result=None - -Resuming and running again: +------------- +After running +------------- +Flow 'resume from backend example' state: SUSPENDED + boom==1.0: SUCCESS, result=None + first==1.0: SUCCESS, result=ok + second==1.0: PENDING, result=None +-------------------------- +Resuming and running again +-------------------------- executing second==1.0 - -At the end: -Flow state: SUCCESS -boom==1.0: SUCCESS, result=None -first==1.0: SUCCESS, result=ok -second==1.0: SUCCESS, result=ok +---------- +At the end +---------- +Flow 'resume from backend example' state: SUCCESS + boom==1.0: SUCCESS, result=None + first==1.0: SUCCESS, result=ok + second==1.0: SUCCESS, result=ok diff --git a/taskflow/examples/resume_from_backend.py b/taskflow/examples/resume_from_backend.py index 519966ed..4ab1f70a 100644 --- a/taskflow/examples/resume_from_backend.py +++ b/taskflow/examples/resume_from_backend.py @@ -33,17 +33,36 @@ from taskflow.persistence import backends from taskflow import task from taskflow.utils import persistence_utils as p_utils +# INTRO: In this example linear_flow is used to group three tasks, one which +# will suspend the future work the engine may do. This suspend engine is then +# discarded and the workflow is reloaded from the persisted data and then the +# workflow is resumed from where it was suspended. This allows you to see how +# to start an engine, have a task stop the engine from doing future work (if +# a multi-threaded engine is being used, then the currently active work is not +# preempted) and then resume the work later. ### UTILITY FUNCTIONS ######################################### +def print_wrapped(text): + print("-" * (len(text))) + print(text) + print("-" * (len(text))) + + def print_task_states(flowdetail, msg): - print(msg) - print('Flow state: %s' % flowdetail.state) + print_wrapped(msg) + # NOTE(harlowja): for this example, just avoid printing uuids so that + # we can correctly compare this output with the expected output, which + # is used by our example test verification (uuids would be different each + # time if we did not do this). + print("Flow '%s' state: %s" % (flowdetail.name, flowdetail.state)) + # Sort by these so that our test validation doesn't get confused by the + # order in which the items in the flow detail can be in. items = sorted((td.name, td.version, td.state, td.results) for td in flowdetail) for item in items: - print("%s==%s: %s, result=%s" % item) + print(" %s==%s: %s, result=%s" % item) def get_backend(): @@ -98,20 +117,28 @@ flowdetail = p_utils.create_flow_detail(flow, logbook, backend) engine = taskflow.engines.load(flow, flow_detail=flowdetail, backend=backend) -print_task_states(flowdetail, "\nAt the beginning, there is no state:") -print("\nRunning:") +print_task_states(flowdetail, "At the beginning, there is no state") +print_wrapped("Running") engine.run() -print_task_states(flowdetail, "\nAfter running:") +print_task_states(flowdetail, "After running") ### RE-CREATE, RESUME, RUN #################################### -print("\nResuming and running again:") -# reload flowdetail from backend +print_wrapped("Resuming and running again") + +# NOTE(harlowja): reload the flow detail from backend, this will allow us to +# resume the flow from its suspended state, but first we need to search for +# the right flow details in the correct logbook where things are stored. +# +# We could avoid re-loading the engine and just do engine.run() again, but this +# example shows how another process may unsuspend a given flow and start it +# again for situations where this is useful to-do (say the process running +# the above flow crashes). flowdetail2 = find_flow_detail(backend, logbook.uuid, flowdetail.uuid) engine2 = taskflow.engines.load(flow_factory(), flow_detail=flowdetail, backend=backend) engine2.run() -print_task_states(flowdetail, "\nAt the end:") +print_task_states(flowdetail, "At the end")