diff --git a/taskflow/backends/memory.py b/taskflow/backends/memory.py index 6023a81b..6dd39853 100644 --- a/taskflow/backends/memory.py +++ b/taskflow/backends/memory.py @@ -91,10 +91,11 @@ class MemoryCatalog(catalog.Catalog): self._catalogs = [(j, b) for (j, b) in self._catalogs if j != job] -class MemoryWorkflowDetail(logbook.WorkflowDetail): - def __init__(self, book, name): - super(MemoryWorkflowDetail, self).__init__(book, name) +class MemoryFlowDetail(logbook.FlowDetail): + def __init__(self, book, name, task_cls=logbook.TaskDetail): + super(MemoryFlowDetail, self).__init__(book, name) self._tasks = [] + self._task_cls = task_cls def __iter__(self): for t in self._tasks: @@ -106,64 +107,68 @@ class MemoryWorkflowDetail(logbook.WorkflowDetail): return True return False - def fetch_tasks(self, task_name): + def __getitem__(self, task_name): return [t for t in self if t.name == task_name] def __len__(self): return len(self._tasks) - def add_task(self, task_details): + def add_task(self, task_name): + task_details = self._task_cls(task_name) self._tasks.append(task_details) + return task_details - def delete_tasks(self, task_name): + def __delitem__(self, task_name): self._tasks = [t for t in self if t.name != task_name] class MemoryLogBook(logbook.LogBook): def __init__(self): super(MemoryLogBook, self).__init__() - self._workflows = [] - self._workflow_names = set() + self._flows = [] + self._flow_names = set() self._closed = False @check_not_closed - def add_workflow(self, workflow_name): - if workflow_name in self._workflow_names: + def add_flow(self, flow_name): + if flow_name in self._flow_names: raise exc.AlreadyExists() - self._workflows.append(MemoryWorkflowDetail(self, workflow_name)) - self._workflow_names.add(workflow_name) + f = MemoryFlowDetail(self, flow_name) + self._flows.append(f) + self._flow_names.add(flow_name) + return f @check_not_closed - def fetch_workflow(self, workflow_name): - if workflow_name not in self._workflow_names: + def __getitem__(self, flow_name): + if flow_name not in self._flow_names: raise exc.NotFound() - for w in self._workflows: - if w.name == workflow_name: + for w in self._flows: + if w.name == flow_name: return w @check_not_closed def __iter__(self): - for w in self._workflows: + for w in self._flows: yield w def close(self): self._closed = True @check_not_closed - def __contains__(self, workflow_name): + def __contains__(self, flow_name): try: - self.fetch_workflow(workflow_name) + self[flow_name] return True except exc.NotFound: return False - def delete_workflow(self, workflow_name): - w = self.fetch_workflow(workflow_name) - self._workflow_names.remove(workflow_name) - self._workflows.remove(w) + def __delitem__(self, flow_name): + w = self[flow_name] + self._flow_names.remove(flow_name) + self._flows.remove(w) def __len__(self): - return len(self._workflows) + return len(self._flows) class MemoryJobBoard(jobboard.JobBoard): diff --git a/taskflow/job.py b/taskflow/job.py index 4d9ef475..adccb9c2 100644 --- a/taskflow/job.py +++ b/taskflow/job.py @@ -97,10 +97,10 @@ class Job(object): def wf_state_change_listener(context, wf, old_state): if wf.name in self.logbook: return - self.logbook.add_workflow(wf.name) + self.logbook.add_flow(wf.name) def task_result_fetcher(context, wf, task): - wf_details = self.logbook.fetch_workflow(wf.name) + wf_details = self.logbook[wf.name] # See if it completed before so that we can use its results instead # of having to recompute them. td_name = task_state_name_functor(task, states.SUCCESS) @@ -108,21 +108,22 @@ class Job(object): # TODO(harlowja): should we be a little more cautious about # duplicate task results? Maybe we shouldn't allow them to # have the same name in the first place? - task_details = wf_details.fetch_tasks(td_name)[0] + task_details = wf_details[td_name][0] if task_details.metadata and 'result' in task_details.metadata: return (True, task_details.metadata['result']) return (False, None) def task_state_change_listener(context, state, wf, task, result=None): metadata = None - wf_details = self.logbook.fetch_workflow(wf.name) + wf_details = self.logbook[wf.name] if state == states.SUCCESS: metadata = { 'result': result, } td_name = task_state_name_functor(task, state) if td_name not in wf_details: - wf_details.add_task(logbook.TaskDetail(td_name, metadata)) + td_details = wf_details.add_task(td_name) + td_details.metadata = metadata wf.task_listeners.append(task_state_change_listener) wf.listeners.append(wf_state_change_listener) diff --git a/taskflow/logbook.py b/taskflow/logbook.py index b9b28246..e2194d9d 100644 --- a/taskflow/logbook.py +++ b/taskflow/logbook.py @@ -29,14 +29,15 @@ class TaskDetail(object): self.date_created = datetime.utcnow() self.name = name self.metadata = metadata + self.date_updated = None def __str__(self): return "TaskDetail (%s, %s): %s" % (self.name, self.date_created, self.metadata) -class WorkflowDetail(object): - """Workflow details have the bare minimum of these fields/methods.""" +class FlowDetail(object): + """Flow details have the bare minimum of these fields/methods.""" __metaclass__ = abc.ABCMeta @@ -54,31 +55,32 @@ class WorkflowDetail(object): @abc.abstractmethod def __contains__(self, task_name): """Determines if any task details with the given name exists in this - workflow details.""" + flow details.""" raise NotImplementedError() @abc.abstractmethod - def fetch_tasks(self, task_name): + def __getitem__(self, task_name): """Fetch any task details that match the given task name.""" raise NotImplementedError() @abc.abstractmethod - def add_task(self, task_details): - """Adds a task detail entry to this workflow details.""" + def add_task(self, task_name): + """Atomically creates a new task detail entry to this flows details and + returns it for further use.""" raise NotImplementedError() @abc.abstractmethod - def delete_tasks(self, task_name): + def __delitem__(self, task_name): """Deletes any task details that match the given task name.""" raise NotImplementedError() @abc.abstractmethod def __len__(self): - """Returns how many task details objects the workflow contains.""" + """Returns how many task details objects the flow contains.""" raise NotImplementedError() def __str__(self): - return "WorkflowDetail (%s): %s entries" % (self.name, len(self)) + return "FlowDetail (%s): %s entries" % (self.name, len(self)) class LogBook(object): @@ -87,42 +89,42 @@ class LogBook(object): __metaclass__ = abc.ABCMeta @abc.abstractmethod - def add_workflow(self, workflow_name): - """Atomically adds a new workflow details object to the given logbook - or raises an exception if that workflow (or a workflow with - that name) already exists. + def add_flow(self, flow_name): + """Atomically adds and returns a new flow details object to the given + logbook or raises an exception if that flow (or a flow with that name) + already exists. """ raise NotImplementedError() @abc.abstractmethod - def fetch_workflow(self, workflow_name): - """Fetches the given workflow details object for the given workflow - name or raises an exception if that workflow name does not exist.""" + def __getitem__(self, flow_name): + """Fetches the given flow details object for the given flow + name or raises an exception if that flow name does not exist.""" raise NotImplementedError() @abc.abstractmethod - def __contains__(self, workflow_name): - """Determines if a workflow details object with the given workflow name + def __contains__(self, flow_name): + """Determines if a flow details object with the given flow name exists in this logbook.""" raise NotImplementedError() @abc.abstractmethod - def delete_workflow(self, workflow_name): - """Removes the given workflow details object that matches the provided - workflow name or raises an exception if that workflow name does not + def __delitem__(self, flow_name): + """Removes the given flow details object that matches the provided + flow name or raises an exception if that flow name does not exist.""" raise NotImplementedError() @abc.abstractmethod def __iter__(self): - """Iterates over all the contained workflow details. + """Iterates over all the contained flow details. The order will be in the same order that they were added.""" raise NotImplementedError() @abc.abstractmethod def __len__(self): - """Returns how many workflow details the logbook contains.""" + """Returns how many flow details the logbook contains.""" raise NotImplementedError() def close(self): diff --git a/taskflow/patterns/graph_workflow.py b/taskflow/patterns/graph_flow.py similarity index 88% rename from taskflow/patterns/graph_workflow.py rename to taskflow/patterns/graph_flow.py index 8f8f05ed..ac2aeb51 100644 --- a/taskflow/patterns/graph_workflow.py +++ b/taskflow/patterns/graph_flow.py @@ -25,18 +25,18 @@ from networkx.algorithms import dag from networkx.classes import digraph from taskflow import exceptions as exc -from taskflow.patterns import ordered_workflow +from taskflow.patterns import ordered_flow LOG = logging.getLogger(__name__) -class Workflow(ordered_workflow.Workflow): - """A workflow which will analyze the attached tasks input requirements and +class Flow(ordered_flow.Flow): + """A flow which will analyze the attached tasks input requirements and determine who provides said input and order the task so that said providing task will be ran before.""" def __init__(self, name, tolerant=False, parents=None): - super(Workflow, self).__init__(name, tolerant, parents) + super(Flow, self).__init__(name, tolerant, parents) self._graph = digraph.DiGraph() self._connected = False @@ -51,7 +51,7 @@ class Workflow(ordered_workflow.Workflow): def run(self, context, *args, **kwargs): self.connect() - return super(Workflow, self).run(context, *args, **kwargs) + return super(Flow, self).run(context, *args, **kwargs) def order(self): self.connect() @@ -60,7 +60,7 @@ class Workflow(ordered_workflow.Workflow): except g_exc.NetworkXUnfeasible: raise exc.InvalidStateException("Unable to correctly determine " "the path through the provided " - "workflow which will satisfy the " + "flow which will satisfy the " "tasks needed inputs and outputs.") def connect(self): diff --git a/taskflow/patterns/linear_workflow.py b/taskflow/patterns/linear_flow.py similarity index 86% rename from taskflow/patterns/linear_workflow.py rename to taskflow/patterns/linear_flow.py index 5844797d..9283c05c 100644 --- a/taskflow/patterns/linear_workflow.py +++ b/taskflow/patterns/linear_flow.py @@ -16,15 +16,15 @@ # License for the specific language governing permissions and limitations # under the License. -from taskflow.patterns import ordered_workflow +from taskflow.patterns import ordered_flow -class Workflow(ordered_workflow.Workflow): +class Flow(ordered_flow.Flow): """A linear chain of *independent* tasks that can be applied as one unit or rolled back as one unit.""" def __init__(self, name, tolerant=False, parents=None): - super(Workflow, self).__init__(name, tolerant, parents) + super(Flow, self).__init__(name, tolerant, parents) self._tasks = [] def add(self, task): diff --git a/taskflow/patterns/ordered_workflow.py b/taskflow/patterns/ordered_flow.py similarity index 91% rename from taskflow/patterns/ordered_workflow.py rename to taskflow/patterns/ordered_flow.py index 24813d8b..e6deebed 100644 --- a/taskflow/patterns/ordered_workflow.py +++ b/taskflow/patterns/ordered_flow.py @@ -28,10 +28,10 @@ from taskflow import states LOG = logging.getLogger(__name__) -class Workflow(object): +class Flow(object): """A set tasks that can be applied as one unit or rolled back as one - unit using an ordered arrangements of said tasks where reversion can be - handled by reversing through the tasks applied.""" + unit using an ordered arrangements of said tasks where reversion is by + default handled by reversing through the tasks applied.""" __metaclass__ = abc.ABCMeta @@ -43,8 +43,8 @@ class Workflow(object): # If this chain can ignore individual task reversion failure then this # should be set to true, instead of the default value of false. self.tolerant = tolerant - # If this workflow has a parent workflow/s which need to be reverted if - # this workflow fails then please include them here to allow this child + # If this flow has a parent flow/s which need to be reverted if + # this flow fails then please include them here to allow this child # to call the parents... self.parents = parents # This should be a functor that returns whether a given task has @@ -73,11 +73,11 @@ class Workflow(object): @abc.abstractmethod def add(self, task): - """Adds a given task to this workflow.""" + """Adds a given task to this flow.""" raise NotImplementedError() def __str__(self): - return "Workflow: %s" % (self.name) + return "Flow: %s" % (self.name) @abc.abstractmethod def order(self): @@ -93,7 +93,7 @@ class Workflow(object): def _perform_reconcilation(self, context, task, excp): # Attempt to reconcile the given exception that occured while applying # the given task and either reconcile said task and its associated - # failure, so that the workflow can continue or abort and perform + # failure, so that the flow can continue or abort and perform # some type of undo of the tasks already completed. try: self._change_state(context, states.REVERTING) @@ -111,7 +111,7 @@ class Workflow(object): " exception.") # The default strategy will be to rollback all the contained # tasks by calling there reverting methods, and then calling - # any parent workflows rollbacks (and so-on). + # any parent flows rollbacks (and so-on). try: self.rollback(context, cause) finally: @@ -124,7 +124,7 @@ class Workflow(object): def run(self, context, *args, **kwargs): if self.state != states.PENDING: - raise exc.InvalidStateException("Unable to run workflow when " + raise exc.InvalidStateException("Unable to run flow when " "in state %s" % (self.state)) if self.result_fetcher: @@ -233,10 +233,10 @@ class Workflow(object): def rollback(self, context, cause): # Performs basic task by task rollback by going through the reverse # order that tasks have finished and asking said task to undo whatever - # it has done. If this workflow has any parent workflows then they will + # it has done. If this flow has any parent flows then they will # also be called to rollback any tasks said parents contain. # - # Note(harlowja): if a workflow can more simply revert a whole set of + # Note(harlowja): if a flow can more simply revert a whole set of # tasks via a simpler command then it can override this method to # accomplish that. # @@ -253,14 +253,14 @@ class Workflow(object): if not self.tolerant: log_f = LOG.exception msg = ("Failed rolling back stage %(index)s (%(task)s)" - " of workflow %(workflow)s, due to inner exception.") - log_f(msg % {'index': (i + 1), 'task': task, 'workflow': self}) + " of flow %(flow)s, due to inner exception.") + log_f(msg % {'index': (i + 1), 'task': task, 'flow': self}) if not self.tolerant: # NOTE(harlowja): LOG a msg AND re-raise the exception if # the chain does not tolerate exceptions happening in the # rollback method. raise if self.parents: - # Rollback any parents workflows if they exist... + # Rollback any parents flows if they exist... for p in self.parents: p.rollback(context, cause) diff --git a/taskflow/tests/unit/test_linear_workflow.py b/taskflow/tests/unit/test_linear_flow.py similarity index 92% rename from taskflow/tests/unit/test_linear_workflow.py rename to taskflow/tests/unit/test_linear_flow.py index 4ad37d72..0a58727b 100644 --- a/taskflow/tests/unit/test_linear_workflow.py +++ b/taskflow/tests/unit/test_linear_flow.py @@ -23,14 +23,14 @@ from taskflow import states from taskflow import task from taskflow import wrappers -from taskflow.patterns import linear_workflow as lw +from taskflow.patterns import linear_flow as lw def null_functor(*args, **kwargs): return None -class LinearWorkflowTest(unittest.TestCase): +class LinearFlowTest(unittest.TestCase): def makeRevertingTask(self, token, blowup=False): def do_apply(token, context, *args, **kwargs): @@ -61,7 +61,7 @@ class LinearWorkflowTest(unittest.TestCase): null_functor) def testHappyPath(self): - wf = lw.Workflow("the-test-action") + wf = lw.Flow("the-test-action") for i in range(0, 10): wf.add(self.makeRevertingTask(i)) @@ -74,7 +74,7 @@ class LinearWorkflowTest(unittest.TestCase): self.assertEquals('passed', v) def testRevertingPath(self): - wf = lw.Workflow("the-test-action") + wf = lw.Flow("the-test-action") wf.add(self.makeRevertingTask(1)) wf.add(self.makeRevertingTask(2, True)) @@ -84,7 +84,7 @@ class LinearWorkflowTest(unittest.TestCase): self.assertEquals(1, len(run_context)) def testInterruptPath(self): - wf = lw.Workflow("the-int-action") + wf = lw.Flow("the-int-action") result_storage = {} @@ -124,7 +124,7 @@ class LinearWorkflowTest(unittest.TestCase): self.assertEquals(2, len(context)) def testParentRevertingPath(self): - happy_wf = lw.Workflow("the-happy-action") + happy_wf = lw.Flow("the-happy-action") for i in range(0, 10): happy_wf.add(self.makeRevertingTask(i)) context = {} @@ -133,7 +133,7 @@ class LinearWorkflowTest(unittest.TestCase): for (_k, v) in context.items(): self.assertEquals('passed', v) - baddy_wf = lw.Workflow("the-bad-action", parents=[happy_wf]) + baddy_wf = lw.Flow("the-bad-action", parents=[happy_wf]) baddy_wf.add(self.makeRevertingTask(i + 1)) baddy_wf.add(self.makeRevertingTask(i + 2, True)) self.assertRaises(Exception, baddy_wf.run, context) diff --git a/taskflow/tests/unit/test_memory.py b/taskflow/tests/unit/test_memory.py index b936cb02..a7b88d73 100644 --- a/taskflow/tests/unit/test_memory.py +++ b/taskflow/tests/unit/test_memory.py @@ -32,7 +32,7 @@ from taskflow import task from taskflow import wrappers as wrap from taskflow.backends import memory -from taskflow.patterns import linear_workflow as lw +from taskflow.patterns import linear_flow as lw def null_functor(*args, **kwargs): @@ -94,8 +94,8 @@ class MemoryBackendTest(unittest.TestCase): for j in my_jobs: j.state = states.PENDING for j in my_jobs: - # Create some dummy workflow for the job - wf = lw.Workflow('dummy') + # Create some dummy flow for the job + wf = lw.Flow('dummy') for i in range(0, 5): t = wrap.FunctorTask(None, null_functor, null_functor) @@ -142,7 +142,7 @@ class MemoryBackendTest(unittest.TestCase): self.assertEquals(states.CLAIMED, j.state) self.assertEquals('me', j.owner) - wf = lw.Workflow("the-int-action") + wf = lw.Flow("the-int-action") j.associate(wf) self.assertEquals(states.PENDING, wf.state) @@ -168,7 +168,7 @@ class MemoryBackendTest(unittest.TestCase): wf.run(j.context) self.assertEquals(1, len(j.logbook)) - self.assertEquals(4, len(j.logbook.fetch_workflow("the-int-action"))) + self.assertEquals(4, len(j.logbook["the-int-action"])) self.assertEquals(1, len(call_log)) wf.reset() @@ -176,7 +176,7 @@ class MemoryBackendTest(unittest.TestCase): wf.run(j.context) self.assertEquals(1, len(j.logbook)) - self.assertEquals(6, len(j.logbook.fetch_workflow("the-int-action"))) + self.assertEquals(6, len(j.logbook["the-int-action"])) self.assertEquals(2, len(call_log)) self.assertEquals(states.SUCCESS, wf.state) @@ -190,7 +190,7 @@ class MemoryBackendTest(unittest.TestCase): self.assertEquals(states.CLAIMED, j.state) self.assertEquals('me', j.owner) - wf = lw.Workflow('the-line-action') + wf = lw.Flow('the-line-action') self.assertEquals(states.PENDING, wf.state) j.associate(wf) @@ -207,7 +207,7 @@ class MemoryBackendTest(unittest.TestCase): wf.run(j.context) self.assertEquals(1, len(j.logbook)) - self.assertEquals(4, len(j.logbook.fetch_workflow("the-line-action"))) + self.assertEquals(4, len(j.logbook["the-line-action"])) self.assertEquals(2, len(call_log)) self.assertEquals(states.SUCCESS, wf.state)