diff --git a/taskflow/job.py b/taskflow/job.py index 91873409..cb1b6aee 100644 --- a/taskflow/job.py +++ b/taskflow/job.py @@ -26,14 +26,28 @@ from taskflow.openstack.common import uuidutils def task_and_state(task, state): + name_pieces = [] try: - name = task.name + name_pieces.append(task.name) + if isinstance(task.version, (list, tuple)): + name_pieces.append(utils.join(task.version, ".")) + else: + name_pieces.append(task.version) except AttributeError: - try: - name = task.__name__ - except AttributeError: - name = str(task) - return "%s:%s" % (name, state) + pass + if not name_pieces: + # Likely a function and not a task object so let us search for these + # attributes to get a good name for this task. + name_pieces = [a for a in utils.get_many_attr(task, + '__module__', + '__name__', + '__version__') + if a is not None] + if not name_pieces: + # Ok, unsure what this task is, just use whatever its string + # representation is. + name_pieces.append(task) + return "%s;%s" % (utils.join(name_pieces, ':'), state) class Claimer(object): diff --git a/taskflow/task.py b/taskflow/task.py index 51601101..32d99231 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -18,6 +18,8 @@ import abc +from taskflow import utils + class Task(object): """An abstraction that defines a potential piece of work that can be @@ -33,9 +35,14 @@ class Task(object): # An *immutable* output 'resource' name set this task # produces that other tasks may depend on this task providing. self.provides = set() + # This identifies the version of the task to be ran which + # can be useful in resuming older versions of tasks. Standard + # major, minor version semantics apply. + self.version = (1, 0) def __str__(self): - return "Task: %s" % (self.name) + return "Task: %s v%s" % (self.name, utils.join(self.version, + with_what=".")) @abc.abstractmethod def __call__(self, context, *args, **kwargs): diff --git a/taskflow/utils.py b/taskflow/utils.py index 730bc817..6bc2c5e0 100644 --- a/taskflow/utils.py +++ b/taskflow/utils.py @@ -24,6 +24,18 @@ import time LOG = logging.getLogger(__name__) +def join(itr, with_what=","): + pieces = [str(i) for i in itr] + return with_what.join(pieces) + + +def get_many_attr(obj, *attrs): + many = [] + for a in attrs: + many.append(getattr(obj, a, None)) + return many + + def await(check_functor, timeout=None): if timeout is not None: end_time = time.time() + max(0, timeout)