Unify creation/usage of uuids.
Instead of using the name it is likely better to give the flows a uuid and use that for tracking purposes rather than using the flow name (which maybe duplicated). Also unify the usage of the job tracking id -> uuid as well as adjust the visibility of said uuids (to be read-only). Change-Id: I592800bd9e08e3a7bde33ff250a454588324f052
This commit is contained in:
@@ -56,7 +56,7 @@ class Job(object):
|
|||||||
|
|
||||||
__metaclass__ = abc.ABCMeta
|
__metaclass__ = abc.ABCMeta
|
||||||
|
|
||||||
def __init__(self, name, context, catalog, claimer, jid=None):
|
def __init__(self, name, context, catalog, claimer, uuid=None):
|
||||||
self.name = name
|
self.name = name
|
||||||
self.context = context
|
self.context = context
|
||||||
self.owner = None
|
self.owner = None
|
||||||
@@ -64,14 +64,17 @@ class Job(object):
|
|||||||
self._catalog = catalog
|
self._catalog = catalog
|
||||||
self._claimer = claimer
|
self._claimer = claimer
|
||||||
self._logbook = None
|
self._logbook = None
|
||||||
if not jid:
|
if not uuid:
|
||||||
self._id = uuidutils.generate_uuid()
|
self._id = uuidutils.generate_uuid()
|
||||||
else:
|
else:
|
||||||
self._id = str(jid)
|
self._id = str(uuid)
|
||||||
self._state = states.UNCLAIMED
|
self._state = states.UNCLAIMED
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return "Job (%s, %s): %s" % (self.name, self.tracking_id, self.state)
|
lines = ['Job: %s' % (self.name)]
|
||||||
|
lines.append("%s" % (self.uuid))
|
||||||
|
lines.append("%s" % (self.state))
|
||||||
|
return "; ".join(lines)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def state(self):
|
def state(self):
|
||||||
@@ -143,7 +146,7 @@ class Job(object):
|
|||||||
return utils.await(check_functor, timeout)
|
return utils.await(check_functor, timeout)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def tracking_id(self):
|
def uuid(self):
|
||||||
"""Returns a tracking *unique* identifier that can be used to identify
|
"""Returns a tracking *unique* identifier that can be used to identify
|
||||||
this job among other jobs."""
|
this job among other jobs."""
|
||||||
return "j-%s-%s" % (self.name, self._id)
|
return "j-%s" % (self._id)
|
||||||
|
|||||||
@@ -19,6 +19,8 @@
|
|||||||
import abc
|
import abc
|
||||||
import threading
|
import threading
|
||||||
|
|
||||||
|
from taskflow.openstack.common import uuidutils
|
||||||
|
|
||||||
from taskflow import decorators
|
from taskflow import decorators
|
||||||
from taskflow import exceptions as exc
|
from taskflow import exceptions as exc
|
||||||
from taskflow import states
|
from taskflow import states
|
||||||
@@ -47,14 +49,17 @@ class Flow(object):
|
|||||||
states.PENDING,
|
states.PENDING,
|
||||||
])
|
])
|
||||||
|
|
||||||
def __init__(self, name, parents=None):
|
def __init__(self, name, parents=None, uuid=None):
|
||||||
self.name = name
|
self._name = str(name)
|
||||||
# The state of this flow.
|
# The state of this flow.
|
||||||
self._state = states.PENDING
|
self._state = states.PENDING
|
||||||
# If this flow has a parent flow/s which need to be reverted if
|
# If this flow has a parent flow/s which need to be reverted if
|
||||||
# this flow fails then please include them here to allow this child
|
# this flow fails then please include them here to allow this child
|
||||||
# to call the parents...
|
# to call the parents...
|
||||||
self.parents = parents
|
if parents:
|
||||||
|
self.parents = list(parents)
|
||||||
|
else:
|
||||||
|
self.parents = []
|
||||||
# Any objects that want to listen when a wf/task starts/stops/completes
|
# Any objects that want to listen when a wf/task starts/stops/completes
|
||||||
# or errors should be registered here. This can be used to monitor
|
# or errors should be registered here. This can be used to monitor
|
||||||
# progress and record tasks finishing (so that it becomes possible to
|
# progress and record tasks finishing (so that it becomes possible to
|
||||||
@@ -65,6 +70,19 @@ class Flow(object):
|
|||||||
# Ensure that modifications and/or multiple runs aren't happening
|
# Ensure that modifications and/or multiple runs aren't happening
|
||||||
# at the same time in the same flow at the same time.
|
# at the same time in the same flow at the same time.
|
||||||
self._lock = threading.RLock()
|
self._lock = threading.RLock()
|
||||||
|
# Assign this flow a unique identifer.
|
||||||
|
if uuid:
|
||||||
|
self._id = str(uuid)
|
||||||
|
else:
|
||||||
|
self._id = uuidutils.generate_uuid()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self):
|
||||||
|
return self._name
|
||||||
|
|
||||||
|
@property
|
||||||
|
def uuid(self):
|
||||||
|
return "f-%s" % (self._id)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def state(self):
|
def state(self):
|
||||||
@@ -89,6 +107,8 @@ class Flow(object):
|
|||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
lines = ["Flow: %s" % (self.name)]
|
lines = ["Flow: %s" % (self.name)]
|
||||||
|
lines.append("%s" % (self.uuid))
|
||||||
|
lines.append("%s" % (len(self.parents)))
|
||||||
lines.append("%s" % (self.state))
|
lines.append("%s" % (self.state))
|
||||||
return "; ".join(lines)
|
return "; ".join(lines)
|
||||||
|
|
||||||
|
|||||||
@@ -36,8 +36,8 @@ class Flow(linear_flow.Flow):
|
|||||||
a linear topological ordering (and reverse using the same linear
|
a linear topological ordering (and reverse using the same linear
|
||||||
topological order)"""
|
topological order)"""
|
||||||
|
|
||||||
def __init__(self, name, parents=None):
|
def __init__(self, name, parents=None, uuid=None):
|
||||||
super(Flow, self).__init__(name, parents)
|
super(Flow, self).__init__(name, parents, uuid)
|
||||||
self._graph = digraph.DiGraph()
|
self._graph = digraph.DiGraph()
|
||||||
|
|
||||||
@decorators.locked
|
@decorators.locked
|
||||||
@@ -59,8 +59,10 @@ class Flow(linear_flow.Flow):
|
|||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
lines = ["GraphFlow: %s" % (self.name)]
|
lines = ["GraphFlow: %s" % (self.name)]
|
||||||
|
lines.append("%s" % (self.uuid))
|
||||||
lines.append("%s" % (self._graph.number_of_nodes()))
|
lines.append("%s" % (self._graph.number_of_nodes()))
|
||||||
lines.append("%s" % (self._graph.number_of_edges()))
|
lines.append("%s" % (self._graph.number_of_edges()))
|
||||||
|
lines.append("%s" % (len(self.parents)))
|
||||||
lines.append("%s" % (self.state))
|
lines.append("%s" % (self.state))
|
||||||
return "; ".join(lines)
|
return "; ".join(lines)
|
||||||
|
|
||||||
|
|||||||
@@ -40,8 +40,8 @@ class Flow(base.Flow):
|
|||||||
Note(harlowja): Each task in the chain must have requirements
|
Note(harlowja): Each task in the chain must have requirements
|
||||||
which are satisfied by the previous task/s in the chain."""
|
which are satisfied by the previous task/s in the chain."""
|
||||||
|
|
||||||
def __init__(self, name, parents=None):
|
def __init__(self, name, parents=None, uuid=None):
|
||||||
super(Flow, self).__init__(name, parents)
|
super(Flow, self).__init__(name, parents, uuid)
|
||||||
# The tasks which have been applied will be collected here so that they
|
# The tasks which have been applied will be collected here so that they
|
||||||
# can be reverted in the correct order on failure.
|
# can be reverted in the correct order on failure.
|
||||||
self._accumulator = utils.RollbackAccumulator()
|
self._accumulator = utils.RollbackAccumulator()
|
||||||
@@ -98,7 +98,9 @@ class Flow(base.Flow):
|
|||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
lines = ["LinearFlow: %s" % (self.name)]
|
lines = ["LinearFlow: %s" % (self.name)]
|
||||||
|
lines.append("%s" % (self.uuid))
|
||||||
lines.append("%s" % (len(self._runners)))
|
lines.append("%s" % (len(self._runners)))
|
||||||
|
lines.append("%s" % (len(self.parents)))
|
||||||
lines.append("%s" % (self.state))
|
lines.append("%s" % (self.state))
|
||||||
return "; ".join(lines)
|
return "; ".join(lines)
|
||||||
|
|
||||||
@@ -279,7 +281,6 @@ class Flow(base.Flow):
|
|||||||
self._accumulator.rollback(cause)
|
self._accumulator.rollback(cause)
|
||||||
finally:
|
finally:
|
||||||
self._change_state(context, states.FAILURE)
|
self._change_state(context, states.FAILURE)
|
||||||
if self.parents:
|
# Rollback any parents flows if they exist...
|
||||||
# Rollback any parents flows if they exist...
|
for p in self.parents:
|
||||||
for p in self.parents:
|
p.rollback(context, cause)
|
||||||
p.rollback(context, cause)
|
|
||||||
|
|||||||
@@ -40,10 +40,8 @@ class Resumption(object):
|
|||||||
flow = details['flow']
|
flow = details['flow']
|
||||||
LOG.debug("Recording %s of %s has finished state %s",
|
LOG.debug("Recording %s of %s has finished state %s",
|
||||||
runner, flow, state)
|
runner, flow, state)
|
||||||
# TODO(harlowja): switch to using uuids
|
|
||||||
flow_id = flow.name
|
|
||||||
metadata = {}
|
metadata = {}
|
||||||
flow_details = self._logbook[flow_id]
|
flow_details = self._logbook[flow.uuid]
|
||||||
if state in (states.SUCCESS, states.FAILURE):
|
if state in (states.SUCCESS, states.FAILURE):
|
||||||
metadata['result'] = runner.result
|
metadata['result'] = runner.result
|
||||||
if runner.uuid not in flow_details:
|
if runner.uuid not in flow_details:
|
||||||
@@ -74,11 +72,9 @@ class Resumption(object):
|
|||||||
old_state = details['old_state']
|
old_state = details['old_state']
|
||||||
LOG.debug("%s has transitioned from %s to %s", flow, old_state,
|
LOG.debug("%s has transitioned from %s to %s", flow, old_state,
|
||||||
state)
|
state)
|
||||||
# TODO(harlowja): switch to using uuids
|
if flow.uuid in self._logbook:
|
||||||
flow_id = flow.name
|
|
||||||
if flow_id in self._logbook:
|
|
||||||
return
|
return
|
||||||
self._logbook.add_flow(flow_id)
|
self._logbook.add_flow(flow.uuid)
|
||||||
|
|
||||||
flow.task_notifier.register('*', _task_listener)
|
flow.task_notifier.register('*', _task_listener)
|
||||||
flow.notifier.register('*', _workflow_listener)
|
flow.notifier.register('*', _workflow_listener)
|
||||||
@@ -119,8 +115,7 @@ class Resumption(object):
|
|||||||
has already completed (or errored) and the second which has not
|
has already completed (or errored) and the second which has not
|
||||||
completed or errored."""
|
completed or errored."""
|
||||||
|
|
||||||
# TODO(harlowja): switch to using uuids
|
flow_id = flow.uuid
|
||||||
flow_id = flow.name
|
|
||||||
if flow_id not in self._logbook:
|
if flow_id not in self._logbook:
|
||||||
LOG.debug("No record of %s", flow)
|
LOG.debug("No record of %s", flow)
|
||||||
return ([], ordering)
|
return ([], ordering)
|
||||||
|
|||||||
@@ -151,7 +151,7 @@ class MemoryBackendTest(unittest2.TestCase):
|
|||||||
wf.run(j.context)
|
wf.run(j.context)
|
||||||
|
|
||||||
self.assertEquals(1, len(j.logbook))
|
self.assertEquals(1, len(j.logbook))
|
||||||
self.assertEquals(2, len(j.logbook["the-int-action"]))
|
self.assertEquals(2, len(j.logbook[wf.uuid]))
|
||||||
self.assertEquals(1, len(call_log))
|
self.assertEquals(1, len(call_log))
|
||||||
|
|
||||||
wf.reset()
|
wf.reset()
|
||||||
@@ -161,7 +161,7 @@ class MemoryBackendTest(unittest2.TestCase):
|
|||||||
wf.run(j.context)
|
wf.run(j.context)
|
||||||
|
|
||||||
self.assertEquals(1, len(j.logbook))
|
self.assertEquals(1, len(j.logbook))
|
||||||
self.assertEquals(3, len(j.logbook["the-int-action"]))
|
self.assertEquals(3, len(j.logbook[wf.uuid]))
|
||||||
self.assertEquals(2, len(call_log))
|
self.assertEquals(2, len(call_log))
|
||||||
self.assertEquals(states.SUCCESS, wf.state)
|
self.assertEquals(states.SUCCESS, wf.state)
|
||||||
|
|
||||||
@@ -196,7 +196,7 @@ class MemoryBackendTest(unittest2.TestCase):
|
|||||||
wf.run(j.context)
|
wf.run(j.context)
|
||||||
|
|
||||||
self.assertEquals(1, len(j.logbook))
|
self.assertEquals(1, len(j.logbook))
|
||||||
self.assertEquals(2, len(j.logbook["the-line-action"]))
|
self.assertEquals(2, len(j.logbook[wf.uuid]))
|
||||||
self.assertEquals(2, len(call_log))
|
self.assertEquals(2, len(call_log))
|
||||||
self.assertEquals(states.SUCCESS, wf.state)
|
self.assertEquals(states.SUCCESS, wf.state)
|
||||||
|
|
||||||
|
|||||||
@@ -192,13 +192,20 @@ class Runner(object):
|
|||||||
that???
|
that???
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, task):
|
def __init__(self, task, uuid=None):
|
||||||
assert isinstance(task, collections.Callable)
|
assert isinstance(task, collections.Callable)
|
||||||
self.task = task
|
self.task = task
|
||||||
self.providers = {}
|
self.providers = {}
|
||||||
self.uuid = uuidutils.generate_uuid()
|
|
||||||
self.runs_before = []
|
self.runs_before = []
|
||||||
self.result = None
|
self.result = None
|
||||||
|
if not uuid:
|
||||||
|
self._id = uuidutils.generate_uuid()
|
||||||
|
else:
|
||||||
|
self._id = str(uuid)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def uuid(self):
|
||||||
|
return "r-%s" % (self._id)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def version(self):
|
def version(self):
|
||||||
@@ -212,7 +219,10 @@ class Runner(object):
|
|||||||
self.result = None
|
self.result = None
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return "Runner %s: %s; %s" % (self.name, self.uuid, self.version)
|
lines = ["Runner: %s" % (self.name)]
|
||||||
|
lines.append("%s" % (self.uuid))
|
||||||
|
lines.append("%s" % (self.version))
|
||||||
|
return "; ".join(lines)
|
||||||
|
|
||||||
def __call__(self, *args, **kwargs):
|
def __call__(self, *args, **kwargs):
|
||||||
# Find all of our inputs first.
|
# Find all of our inputs first.
|
||||||
|
|||||||
Reference in New Issue
Block a user