Merge "Unify creation/usage of uuids."
This commit is contained in:
@@ -56,7 +56,7 @@ class Job(object):
|
||||
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
def __init__(self, name, context, catalog, claimer, jid=None):
|
||||
def __init__(self, name, context, catalog, claimer, uuid=None):
|
||||
self.name = name
|
||||
self.context = context
|
||||
self.owner = None
|
||||
@@ -64,14 +64,17 @@ class Job(object):
|
||||
self._catalog = catalog
|
||||
self._claimer = claimer
|
||||
self._logbook = None
|
||||
if not jid:
|
||||
if not uuid:
|
||||
self._id = uuidutils.generate_uuid()
|
||||
else:
|
||||
self._id = str(jid)
|
||||
self._id = str(uuid)
|
||||
self._state = states.UNCLAIMED
|
||||
|
||||
def __str__(self):
|
||||
return "Job (%s, %s): %s" % (self.name, self.tracking_id, self.state)
|
||||
lines = ['Job: %s' % (self.name)]
|
||||
lines.append("%s" % (self.uuid))
|
||||
lines.append("%s" % (self.state))
|
||||
return "; ".join(lines)
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
@@ -143,7 +146,7 @@ class Job(object):
|
||||
return utils.await(check_functor, timeout)
|
||||
|
||||
@property
|
||||
def tracking_id(self):
|
||||
def uuid(self):
|
||||
"""Returns a tracking *unique* identifier that can be used to identify
|
||||
this job among other jobs."""
|
||||
return "j-%s-%s" % (self.name, self._id)
|
||||
return "j-%s" % (self._id)
|
||||
|
||||
@@ -19,6 +19,8 @@
|
||||
import abc
|
||||
import threading
|
||||
|
||||
from taskflow.openstack.common import uuidutils
|
||||
|
||||
from taskflow import decorators
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow import states
|
||||
@@ -47,14 +49,17 @@ class Flow(object):
|
||||
states.PENDING,
|
||||
])
|
||||
|
||||
def __init__(self, name, parents=None):
|
||||
self.name = name
|
||||
def __init__(self, name, parents=None, uuid=None):
|
||||
self._name = str(name)
|
||||
# The state of this flow.
|
||||
self._state = states.PENDING
|
||||
# If this flow has a parent flow/s which need to be reverted if
|
||||
# this flow fails then please include them here to allow this child
|
||||
# to call the parents...
|
||||
self.parents = parents
|
||||
if parents:
|
||||
self.parents = list(parents)
|
||||
else:
|
||||
self.parents = []
|
||||
# Any objects that want to listen when a wf/task starts/stops/completes
|
||||
# or errors should be registered here. This can be used to monitor
|
||||
# progress and record tasks finishing (so that it becomes possible to
|
||||
@@ -65,6 +70,19 @@ class Flow(object):
|
||||
# Ensure that modifications and/or multiple runs aren't happening
|
||||
# at the same time in the same flow at the same time.
|
||||
self._lock = threading.RLock()
|
||||
# Assign this flow a unique identifer.
|
||||
if uuid:
|
||||
self._id = str(uuid)
|
||||
else:
|
||||
self._id = uuidutils.generate_uuid()
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def uuid(self):
|
||||
return "f-%s" % (self._id)
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
@@ -89,6 +107,8 @@ class Flow(object):
|
||||
|
||||
def __str__(self):
|
||||
lines = ["Flow: %s" % (self.name)]
|
||||
lines.append("%s" % (self.uuid))
|
||||
lines.append("%s" % (len(self.parents)))
|
||||
lines.append("%s" % (self.state))
|
||||
return "; ".join(lines)
|
||||
|
||||
|
||||
@@ -36,8 +36,8 @@ class Flow(linear_flow.Flow):
|
||||
a linear topological ordering (and reverse using the same linear
|
||||
topological order)"""
|
||||
|
||||
def __init__(self, name, parents=None):
|
||||
super(Flow, self).__init__(name, parents)
|
||||
def __init__(self, name, parents=None, uuid=None):
|
||||
super(Flow, self).__init__(name, parents, uuid)
|
||||
self._graph = digraph.DiGraph()
|
||||
|
||||
@decorators.locked
|
||||
@@ -58,8 +58,10 @@ class Flow(linear_flow.Flow):
|
||||
|
||||
def __str__(self):
|
||||
lines = ["GraphFlow: %s" % (self.name)]
|
||||
lines.append("%s" % (self.uuid))
|
||||
lines.append("%s" % (self._graph.number_of_nodes()))
|
||||
lines.append("%s" % (self._graph.number_of_edges()))
|
||||
lines.append("%s" % (len(self.parents)))
|
||||
lines.append("%s" % (self.state))
|
||||
return "; ".join(lines)
|
||||
|
||||
|
||||
@@ -40,8 +40,8 @@ class Flow(base.Flow):
|
||||
Note(harlowja): Each task in the chain must have requirements
|
||||
which are satisfied by the previous task/s in the chain."""
|
||||
|
||||
def __init__(self, name, parents=None):
|
||||
super(Flow, self).__init__(name, parents)
|
||||
def __init__(self, name, parents=None, uuid=None):
|
||||
super(Flow, self).__init__(name, parents, uuid)
|
||||
# The tasks which have been applied will be collected here so that they
|
||||
# can be reverted in the correct order on failure.
|
||||
self._accumulator = utils.RollbackAccumulator()
|
||||
@@ -101,7 +101,9 @@ class Flow(base.Flow):
|
||||
|
||||
def __str__(self):
|
||||
lines = ["LinearFlow: %s" % (self.name)]
|
||||
lines.append("%s" % (self.uuid))
|
||||
lines.append("%s" % (len(self._runners)))
|
||||
lines.append("%s" % (len(self.parents)))
|
||||
lines.append("%s" % (self.state))
|
||||
return "; ".join(lines)
|
||||
|
||||
@@ -279,7 +281,6 @@ class Flow(base.Flow):
|
||||
self._accumulator.rollback(cause)
|
||||
finally:
|
||||
self._change_state(context, states.FAILURE)
|
||||
if self.parents:
|
||||
# Rollback any parents flows if they exist...
|
||||
for p in self.parents:
|
||||
p.rollback(context, cause)
|
||||
# Rollback any parents flows if they exist...
|
||||
for p in self.parents:
|
||||
p.rollback(context, cause)
|
||||
|
||||
@@ -40,10 +40,8 @@ class Resumption(object):
|
||||
flow = details['flow']
|
||||
LOG.debug("Recording %s of %s has finished state %s",
|
||||
runner, flow, state)
|
||||
# TODO(harlowja): switch to using uuids
|
||||
flow_id = flow.name
|
||||
metadata = {}
|
||||
flow_details = self._logbook[flow_id]
|
||||
flow_details = self._logbook[flow.uuid]
|
||||
if state in (states.SUCCESS, states.FAILURE):
|
||||
metadata['result'] = runner.result
|
||||
if runner.uuid not in flow_details:
|
||||
@@ -74,11 +72,9 @@ class Resumption(object):
|
||||
old_state = details['old_state']
|
||||
LOG.debug("%s has transitioned from %s to %s", flow, old_state,
|
||||
state)
|
||||
# TODO(harlowja): switch to using uuids
|
||||
flow_id = flow.name
|
||||
if flow_id in self._logbook:
|
||||
if flow.uuid in self._logbook:
|
||||
return
|
||||
self._logbook.add_flow(flow_id)
|
||||
self._logbook.add_flow(flow.uuid)
|
||||
|
||||
flow.task_notifier.register('*', _task_listener)
|
||||
flow.notifier.register('*', _workflow_listener)
|
||||
@@ -119,8 +115,7 @@ class Resumption(object):
|
||||
has already completed (or errored) and the second which has not
|
||||
completed or errored."""
|
||||
|
||||
# TODO(harlowja): switch to using uuids
|
||||
flow_id = flow.name
|
||||
flow_id = flow.uuid
|
||||
if flow_id not in self._logbook:
|
||||
LOG.debug("No record of %s", flow)
|
||||
return ([], ordering)
|
||||
|
||||
@@ -151,7 +151,7 @@ class MemoryBackendTest(unittest2.TestCase):
|
||||
wf.run(j.context)
|
||||
|
||||
self.assertEquals(1, len(j.logbook))
|
||||
self.assertEquals(2, len(j.logbook["the-int-action"]))
|
||||
self.assertEquals(2, len(j.logbook[wf.uuid]))
|
||||
self.assertEquals(1, len(call_log))
|
||||
|
||||
wf.reset()
|
||||
@@ -161,7 +161,7 @@ class MemoryBackendTest(unittest2.TestCase):
|
||||
wf.run(j.context)
|
||||
|
||||
self.assertEquals(1, len(j.logbook))
|
||||
self.assertEquals(3, len(j.logbook["the-int-action"]))
|
||||
self.assertEquals(3, len(j.logbook[wf.uuid]))
|
||||
self.assertEquals(2, len(call_log))
|
||||
self.assertEquals(states.SUCCESS, wf.state)
|
||||
|
||||
@@ -196,7 +196,7 @@ class MemoryBackendTest(unittest2.TestCase):
|
||||
wf.run(j.context)
|
||||
|
||||
self.assertEquals(1, len(j.logbook))
|
||||
self.assertEquals(2, len(j.logbook["the-line-action"]))
|
||||
self.assertEquals(2, len(j.logbook[wf.uuid]))
|
||||
self.assertEquals(2, len(call_log))
|
||||
self.assertEquals(states.SUCCESS, wf.state)
|
||||
|
||||
|
||||
@@ -192,13 +192,20 @@ class Runner(object):
|
||||
that???
|
||||
"""
|
||||
|
||||
def __init__(self, task):
|
||||
def __init__(self, task, uuid=None):
|
||||
assert isinstance(task, collections.Callable)
|
||||
self.task = task
|
||||
self.providers = {}
|
||||
self.uuid = uuidutils.generate_uuid()
|
||||
self.runs_before = []
|
||||
self.result = None
|
||||
if not uuid:
|
||||
self._id = uuidutils.generate_uuid()
|
||||
else:
|
||||
self._id = str(uuid)
|
||||
|
||||
@property
|
||||
def uuid(self):
|
||||
return "r-%s" % (self._id)
|
||||
|
||||
@property
|
||||
def version(self):
|
||||
@@ -212,7 +219,10 @@ class Runner(object):
|
||||
self.result = None
|
||||
|
||||
def __str__(self):
|
||||
return "Runner %s: %s; %s" % (self.name, self.uuid, self.version)
|
||||
lines = ["Runner: %s" % (self.name)]
|
||||
lines.append("%s" % (self.uuid))
|
||||
lines.append("%s" % (self.version))
|
||||
return "; ".join(lines)
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
# Find all of our inputs first.
|
||||
|
||||
Reference in New Issue
Block a user