diff --git a/taskflow/job.py b/taskflow/job.py index 0857b451..1c0ffce0 100644 --- a/taskflow/job.py +++ b/taskflow/job.py @@ -56,7 +56,7 @@ class Job(object): __metaclass__ = abc.ABCMeta - def __init__(self, name, context, catalog, claimer, jid=None): + def __init__(self, name, context, catalog, claimer, uuid=None): self.name = name self.context = context self.owner = None @@ -64,14 +64,17 @@ class Job(object): self._catalog = catalog self._claimer = claimer self._logbook = None - if not jid: + if not uuid: self._id = uuidutils.generate_uuid() else: - self._id = str(jid) + self._id = str(uuid) self._state = states.UNCLAIMED def __str__(self): - return "Job (%s, %s): %s" % (self.name, self.tracking_id, self.state) + lines = ['Job: %s' % (self.name)] + lines.append("%s" % (self.uuid)) + lines.append("%s" % (self.state)) + return "; ".join(lines) @property def state(self): @@ -143,7 +146,7 @@ class Job(object): return utils.await(check_functor, timeout) @property - def tracking_id(self): + def uuid(self): """Returns a tracking *unique* identifier that can be used to identify this job among other jobs.""" - return "j-%s-%s" % (self.name, self._id) + return "j-%s" % (self._id) diff --git a/taskflow/patterns/base.py b/taskflow/patterns/base.py index a20a7096..4060cf77 100644 --- a/taskflow/patterns/base.py +++ b/taskflow/patterns/base.py @@ -19,6 +19,8 @@ import abc import threading +from taskflow.openstack.common import uuidutils + from taskflow import decorators from taskflow import exceptions as exc from taskflow import states @@ -47,14 +49,17 @@ class Flow(object): states.PENDING, ]) - def __init__(self, name, parents=None): - self.name = name + def __init__(self, name, parents=None, uuid=None): + self._name = str(name) # The state of this flow. self._state = states.PENDING # If this flow has a parent flow/s which need to be reverted if # this flow fails then please include them here to allow this child # to call the parents... - self.parents = parents + if parents: + self.parents = list(parents) + else: + self.parents = [] # Any objects that want to listen when a wf/task starts/stops/completes # or errors should be registered here. This can be used to monitor # progress and record tasks finishing (so that it becomes possible to @@ -65,6 +70,19 @@ class Flow(object): # Ensure that modifications and/or multiple runs aren't happening # at the same time in the same flow at the same time. self._lock = threading.RLock() + # Assign this flow a unique identifer. + if uuid: + self._id = str(uuid) + else: + self._id = uuidutils.generate_uuid() + + @property + def name(self): + return self._name + + @property + def uuid(self): + return "f-%s" % (self._id) @property def state(self): @@ -89,6 +107,8 @@ class Flow(object): def __str__(self): lines = ["Flow: %s" % (self.name)] + lines.append("%s" % (self.uuid)) + lines.append("%s" % (len(self.parents))) lines.append("%s" % (self.state)) return "; ".join(lines) diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 824545ab..1dbf037c 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -36,8 +36,8 @@ class Flow(linear_flow.Flow): a linear topological ordering (and reverse using the same linear topological order)""" - def __init__(self, name, parents=None): - super(Flow, self).__init__(name, parents) + def __init__(self, name, parents=None, uuid=None): + super(Flow, self).__init__(name, parents, uuid) self._graph = digraph.DiGraph() @decorators.locked @@ -58,8 +58,10 @@ class Flow(linear_flow.Flow): def __str__(self): lines = ["GraphFlow: %s" % (self.name)] + lines.append("%s" % (self.uuid)) lines.append("%s" % (self._graph.number_of_nodes())) lines.append("%s" % (self._graph.number_of_edges())) + lines.append("%s" % (len(self.parents))) lines.append("%s" % (self.state)) return "; ".join(lines) diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py index 149842fb..b5aa3fd0 100644 --- a/taskflow/patterns/linear_flow.py +++ b/taskflow/patterns/linear_flow.py @@ -40,8 +40,8 @@ class Flow(base.Flow): Note(harlowja): Each task in the chain must have requirements which are satisfied by the previous task/s in the chain.""" - def __init__(self, name, parents=None): - super(Flow, self).__init__(name, parents) + def __init__(self, name, parents=None, uuid=None): + super(Flow, self).__init__(name, parents, uuid) # The tasks which have been applied will be collected here so that they # can be reverted in the correct order on failure. self._accumulator = utils.RollbackAccumulator() @@ -101,7 +101,9 @@ class Flow(base.Flow): def __str__(self): lines = ["LinearFlow: %s" % (self.name)] + lines.append("%s" % (self.uuid)) lines.append("%s" % (len(self._runners))) + lines.append("%s" % (len(self.parents))) lines.append("%s" % (self.state)) return "; ".join(lines) @@ -279,7 +281,6 @@ class Flow(base.Flow): self._accumulator.rollback(cause) finally: self._change_state(context, states.FAILURE) - if self.parents: - # Rollback any parents flows if they exist... - for p in self.parents: - p.rollback(context, cause) + # Rollback any parents flows if they exist... + for p in self.parents: + p.rollback(context, cause) diff --git a/taskflow/patterns/resumption/logbook.py b/taskflow/patterns/resumption/logbook.py index 604522d2..7d1c5977 100644 --- a/taskflow/patterns/resumption/logbook.py +++ b/taskflow/patterns/resumption/logbook.py @@ -40,10 +40,8 @@ class Resumption(object): flow = details['flow'] LOG.debug("Recording %s of %s has finished state %s", runner, flow, state) - # TODO(harlowja): switch to using uuids - flow_id = flow.name metadata = {} - flow_details = self._logbook[flow_id] + flow_details = self._logbook[flow.uuid] if state in (states.SUCCESS, states.FAILURE): metadata['result'] = runner.result if runner.uuid not in flow_details: @@ -74,11 +72,9 @@ class Resumption(object): old_state = details['old_state'] LOG.debug("%s has transitioned from %s to %s", flow, old_state, state) - # TODO(harlowja): switch to using uuids - flow_id = flow.name - if flow_id in self._logbook: + if flow.uuid in self._logbook: return - self._logbook.add_flow(flow_id) + self._logbook.add_flow(flow.uuid) flow.task_notifier.register('*', _task_listener) flow.notifier.register('*', _workflow_listener) @@ -119,8 +115,7 @@ class Resumption(object): has already completed (or errored) and the second which has not completed or errored.""" - # TODO(harlowja): switch to using uuids - flow_id = flow.name + flow_id = flow.uuid if flow_id not in self._logbook: LOG.debug("No record of %s", flow) return ([], ordering) diff --git a/taskflow/tests/unit/test_memory.py b/taskflow/tests/unit/test_memory.py index 6a1bcd5f..a1a38430 100644 --- a/taskflow/tests/unit/test_memory.py +++ b/taskflow/tests/unit/test_memory.py @@ -151,7 +151,7 @@ class MemoryBackendTest(unittest2.TestCase): wf.run(j.context) self.assertEquals(1, len(j.logbook)) - self.assertEquals(2, len(j.logbook["the-int-action"])) + self.assertEquals(2, len(j.logbook[wf.uuid])) self.assertEquals(1, len(call_log)) wf.reset() @@ -161,7 +161,7 @@ class MemoryBackendTest(unittest2.TestCase): wf.run(j.context) self.assertEquals(1, len(j.logbook)) - self.assertEquals(3, len(j.logbook["the-int-action"])) + self.assertEquals(3, len(j.logbook[wf.uuid])) self.assertEquals(2, len(call_log)) self.assertEquals(states.SUCCESS, wf.state) @@ -196,7 +196,7 @@ class MemoryBackendTest(unittest2.TestCase): wf.run(j.context) self.assertEquals(1, len(j.logbook)) - self.assertEquals(2, len(j.logbook["the-line-action"])) + self.assertEquals(2, len(j.logbook[wf.uuid])) self.assertEquals(2, len(call_log)) self.assertEquals(states.SUCCESS, wf.state) diff --git a/taskflow/utils.py b/taskflow/utils.py index 114c01ec..9b6fd5dd 100644 --- a/taskflow/utils.py +++ b/taskflow/utils.py @@ -192,13 +192,20 @@ class Runner(object): that??? """ - def __init__(self, task): + def __init__(self, task, uuid=None): assert isinstance(task, collections.Callable) self.task = task self.providers = {} - self.uuid = uuidutils.generate_uuid() self.runs_before = [] self.result = None + if not uuid: + self._id = uuidutils.generate_uuid() + else: + self._id = str(uuid) + + @property + def uuid(self): + return "r-%s" % (self._id) @property def version(self): @@ -212,7 +219,10 @@ class Runner(object): self.result = None def __str__(self): - return "Runner %s: %s; %s" % (self.name, self.uuid, self.version) + lines = ["Runner: %s" % (self.name)] + lines.append("%s" % (self.uuid)) + lines.append("%s" % (self.version)) + return "; ".join(lines) def __call__(self, *args, **kwargs): # Find all of our inputs first.