diff --git a/taskflow/decorators.py b/taskflow/decorators.py index fb0b7903..2739edc2 100644 --- a/taskflow/decorators.py +++ b/taskflow/decorators.py @@ -47,16 +47,78 @@ def wraps(fn): return wrapper +def task(*args, **kwargs): + """Decorates a given function and ensures that all needed attributes of + that function are set so that the function can be used as a task.""" + + def decorator(f): + + def noop(*args, **kwargs): + pass + + f.revert = kwargs.pop('revert_with', noop) + + # Sets the version of the task. + version = kwargs.pop('version', (1, 0)) + f = versionize(*version)(f) + + # Attach any requirements this function needs for running. + requires_what = kwargs.pop('requires', []) + f = requires(*requires_what, **kwargs)(f) + + # Attach any items this function provides as output + provides_what = kwargs.pop('provides', []) + f = provides(*provides_what, **kwargs)(f) + + # Associate a name of this task that is the module + function name. + f.name = "%s.%s" % (f.__module__, f.__name__) + + @wraps(f) + def wrapper(*args, **kwargs): + return f(*args, **kwargs) + + return wrapper + + # This is needed to handle when the decorator has args or the decorator + # doesn't have args, python is rather weird here... + if kwargs or not args: + return decorator + else: + if isinstance(args[0], collections.Callable): + return decorator(args[0]) + else: + return decorator + + +def versionize(major, minor=None): + """A decorator that marks the wrapped function with a major & minor version + number.""" + + if minor is None: + minor = 0 + + def decorator(f): + f.__version__ = (major, minor) + + @wraps(f) + def wrapper(*args, **kwargs): + return f(*args, **kwargs) + + return wrapper + + return decorator + + def requires(*args, **kwargs): + """Attaches a set of items that the decorated function requires as input + to the functions underlying dictionary.""" def decorator(f): if not hasattr(f, 'requires'): f.requires = set() if kwargs.pop('auto_extract', True): - inspect_what = f - if hasattr(f, '__wrapped__'): - inspect_what = f.__wrapped__ + inspect_what = getattr(f, '__wrapped__', f) f_args = inspect.getargspec(inspect_what).args f.requires.update([a for a in f_args if _take_arg(a)]) @@ -80,6 +142,8 @@ def requires(*args, **kwargs): def provides(*args, **kwargs): + """Attaches a set of items that the decorated function provides as output + to the functions underlying dictionary.""" def decorator(f): if not hasattr(f, 'provides'): diff --git a/taskflow/job.py b/taskflow/job.py index 467b0db5..5703fd3b 100644 --- a/taskflow/job.py +++ b/taskflow/job.py @@ -17,6 +17,7 @@ # under the License. import abc +import types from taskflow import exceptions as exc from taskflow import states @@ -26,28 +27,29 @@ from taskflow.openstack.common import uuidutils def task_and_state(task, state): - name_pieces = [] - try: - name_pieces.append(task.name) - if isinstance(task.version, (list, tuple)): - name_pieces.append(utils.join(task.version, ".")) + """Combines a task objects string representation with a state to + create a uniquely identifying task+state name.""" + + task_name = "" + if isinstance(task, types.FunctionType): + # If its a function look for the attributes that should have been + # set using the task() decorator provided in the decorators file. If + # those have not been set, then we should at least have enough basic + # information (not a version) to form a useful task name. + if hasattr(task, 'name'): + task_name = str(task.name) else: - name_pieces.append(task.version) - except AttributeError: - pass - if not name_pieces: - # Likely a function and not a task object so let us search for these - # attributes to get a good name for this task. - name_pieces = [a for a in utils.get_many_attr(task, - '__module__', - '__name__', - '__version__') - if a is not None] - if not name_pieces: - # Ok, unsure what this task is, just use whatever its string - # representation is. - name_pieces.append(task) - return "%s;%s" % (utils.join(name_pieces, ':'), state) + name_pieces = [a for a in utils.get_many_attr(task, + '__module__', + '__name__') + if a is not None] + task_name = utils.join(name_pieces, ".") + version_pieces = utils.get_many_attr(task, '__version__') + if version_pieces and version_pieces[0]: + task_name += "==" + utils.join(version_pieces[0], with_what=".") + else: + task_name = str(task) + return "%s;%s" % (task_name, state) class Claimer(object): diff --git a/taskflow/task.py b/taskflow/task.py index 32d99231..2ad11735 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -41,8 +41,7 @@ class Task(object): self.version = (1, 0) def __str__(self): - return "Task: %s v%s" % (self.name, utils.join(self.version, - with_what=".")) + return "%s==%s" % (self.name, utils.join(self.version, with_what=".")) @abc.abstractmethod def __call__(self, context, *args, **kwargs): diff --git a/taskflow/tests/unit/test_graph_flow.py b/taskflow/tests/unit/test_graph_flow.py index 13cc6265..24be03af 100644 --- a/taskflow/tests/unit/test_graph_flow.py +++ b/taskflow/tests/unit/test_graph_flow.py @@ -19,9 +19,9 @@ import collections import unittest +from taskflow import decorators from taskflow import exceptions as excp from taskflow import states -from taskflow import wrappers from taskflow.patterns import graph_flow as gw from taskflow.tests import utils @@ -32,25 +32,23 @@ class GraphFlowTest(unittest.TestCase): flo = gw.Flow("test-flow") reverted = [] - def run1(context): # pylint: disable=W0613 - return { - 'a': 1, - } - def run1_revert(context, result, cause): # pylint: disable=W0613 reverted.append('run1') self.assertEquals(states.REVERTING, cause.flow.state) self.assertEquals(result, {'a': 1}) + @decorators.task(revert_with=run1_revert, provides=['a']) + def run1(context): # pylint: disable=W0613 + return { + 'a': 1, + } + + @decorators.task(provides=['c']) def run2(context, a): # pylint: disable=W0613,C0103 raise Exception('Dead') - flo.add(wrappers.FunctorTask(None, run1, run1_revert, - provides_what=['a'], - extract_requires=True)) - flo.add(wrappers.FunctorTask(None, run2, utils.null_functor, - provides_what=['c'], - extract_requires=True)) + flo.add(run1) + flo.add(run2) self.assertEquals(states.PENDING, flo.state) self.assertRaises(Exception, flo.run, {}) @@ -147,20 +145,19 @@ class GraphFlowTest(unittest.TestCase): def test_connect_requirement_failure(self): + @decorators.task(provides=['a']) def run1(context): # pylint: disable=W0613 return { 'a': 1, } + @decorators.task def run2(context, b, c, d): # pylint: disable=W0613,C0103 return None flo = gw.Flow("test-flow") - flo.add(wrappers.FunctorTask(None, run1, utils.null_functor, - provides_what=['a'], - extract_requires=True)) - flo.add(wrappers.FunctorTask(None, run2, utils.null_functor, - extract_requires=True)) + flo.add(run1) + flo.add(run2) self.assertRaises(excp.InvalidStateException, flo.connect) self.assertRaises(excp.InvalidStateException, flo.run, {}) @@ -172,40 +169,37 @@ class GraphFlowTest(unittest.TestCase): run_order = [] f_args = {} + @decorators.task(provides=['a']) def run1(context): # pylint: disable=W0613,C0103 run_order.append('ran1') return { 'a': 1, } + @decorators.task(provides=['c']) def run2(context, a): # pylint: disable=W0613,C0103 run_order.append('ran2') return { 'c': 3, } + @decorators.task(provides=['b']) def run3(context, a): # pylint: disable=W0613,C0103 run_order.append('ran3') return { 'b': 2, } + @decorators.task def run4(context, b, c): # pylint: disable=W0613,C0103 run_order.append('ran4') f_args['b'] = b f_args['c'] = c - flo.add(wrappers.FunctorTask(None, run1, utils.null_functor, - provides_what=['a'], - extract_requires=True)) - flo.add(wrappers.FunctorTask(None, run2, utils.null_functor, - provides_what=['c'], - extract_requires=True)) - flo.add(wrappers.FunctorTask(None, run3, utils.null_functor, - provides_what=['b'], - extract_requires=True)) - flo.add(wrappers.FunctorTask(None, run4, utils.null_functor, - extract_requires=True)) + flo.add(run1) + flo.add(run2) + flo.add(run3) + flo.add(run4) flo.run({}) self.assertEquals(['ran1', 'ran2', 'ran3', 'ran4'], sorted(run_order)) diff --git a/taskflow/tests/unit/test_linear_flow.py b/taskflow/tests/unit/test_linear_flow.py index 115ad5c5..2646448c 100644 --- a/taskflow/tests/unit/test_linear_flow.py +++ b/taskflow/tests/unit/test_linear_flow.py @@ -22,7 +22,6 @@ import unittest from taskflow import decorators from taskflow import exceptions as exc from taskflow import states -from taskflow import wrappers from taskflow.patterns import linear_flow as lw from taskflow.tests import utils @@ -31,32 +30,33 @@ from taskflow.tests import utils class LinearFlowTest(unittest.TestCase): def make_reverting_task(self, token, blowup=False): - def do_apply(token, context, *_args, **_kwargs): - context[token] = 'passed' - - def do_revert(token, context, *_args, **_kwargs): + def do_revert(context, *args, **kwargs): context[token] = 'reverted' - def blow_up(_context, *_args, **_kwargs): + @decorators.task(revert_with=do_revert) + def do_apply(context, *args, **kwargs): + context[token] = 'passed' + + @decorators.task + def blow_up(context, *args, **kwargs): raise Exception("I blew up") if blowup: - return wrappers.FunctorTask('task-%s' % (token), - functools.partial(blow_up, token), - utils.null_functor) + # Alter the task name so that its unique by including the token. + blow_up.name += str(token) + return blow_up else: - return wrappers.FunctorTask('task-%s' % (token), - functools.partial(do_apply, token), - functools.partial(do_revert, token)) + # Alter the task name so that its unique by including the token. + do_apply.name += str(token) + return do_apply def make_interrupt_task(self, token, wf): - def do_interrupt(_context, *_args, **_kwargs): + @decorators.task + def do_interrupt(context, *args, **kwargs): wf.interrupt() - return wrappers.FunctorTask('task-%s' % (token), - do_interrupt, - utils.null_functor) + return do_interrupt def test_functor_flow(self): wf = lw.Flow("the-test-action") @@ -148,31 +148,27 @@ class LinearFlowTest(unittest.TestCase): def test_not_satisfied_inputs_previous(self): wf = lw.Flow("the-test-action") + @decorators.task def task_a(context, *args, **kwargs): pass + @decorators.task def task_b(context, c, *args, **kwargs): pass - wf.add(wrappers.FunctorTask(None, task_a, utils.null_functor, - extract_requires=True)) + wf.add(task_a) self.assertRaises(exc.InvalidStateException, - wf.add, - wrappers.FunctorTask(None, task_b, - utils.null_functor, - extract_requires=True)) + wf.add, task_b) def test_not_satisfied_inputs_no_previous(self): wf = lw.Flow("the-test-action") + @decorators.task def task_a(context, c, *args, **kwargs): pass self.assertRaises(exc.InvalidStateException, - wf.add, - wrappers.FunctorTask(None, task_a, - utils.null_functor, - extract_requires=True)) + wf.add, task_a) def test_flow_add_order(self): wf = lw.Flow("the-test-action") diff --git a/taskflow/tests/unit/test_memory.py b/taskflow/tests/unit/test_memory.py index 00162c20..f9e17e84 100644 --- a/taskflow/tests/unit/test_memory.py +++ b/taskflow/tests/unit/test_memory.py @@ -22,10 +22,10 @@ import functools import threading import unittest +from taskflow import decorators from taskflow import exceptions as exc from taskflow import job from taskflow import states -from taskflow import wrappers as wrap from taskflow.backends import memory from taskflow.patterns import linear_flow as lw @@ -79,10 +79,7 @@ class MemoryBackendTest(unittest.TestCase): # Create some dummy flow for the job wf = lw.Flow('dummy') for _i in range(0, 5): - t = wrap.FunctorTask(None, - utils.null_functor, - utils.null_functor) - wf.add(t) + wf.add(utils.null_functor) j.associate(wf) j.state = states.RUNNING wf.run(j.context) @@ -131,18 +128,21 @@ class MemoryBackendTest(unittest.TestCase): call_log = [] - def do_1(_context, *_args, **_kwargs): + @decorators.task + def do_1(context, *args, **kwargs): call_log.append(1) - def do_2(_context, *_args, **_kwargs): + @decorators.task + def do_2(context, *args, **kwargs): call_log.append(2) - def do_interrupt(_context, *_args, **_kwargs): + @decorators.task + def do_interrupt(context, *args, **kwargs): wf.interrupt() - task_1 = wrap.FunctorTask(None, do_1, utils.null_functor) - task_1_5 = wrap.FunctorTask(None, do_interrupt, utils.null_functor) - task_2 = wrap.FunctorTask(None, do_2, utils.null_functor) + task_1 = do_1 + task_1_5 = do_interrupt + task_2 = do_2 wf.add(task_1) wf.add(task_1_5) # Interrupt it after task_1 finishes @@ -180,14 +180,16 @@ class MemoryBackendTest(unittest.TestCase): call_log = [] - def do_1(_context, *_args, **_kwargs): + @decorators.task + def do_1(context, *args, **kwargs): call_log.append(1) - def do_2(_context, *_args, **_kwargs): + @decorators.task + def do_2(context, *args, **kwargs): call_log.append(2) - wf.add(wrap.FunctorTask(None, do_1, utils.null_functor)) - wf.add(wrap.FunctorTask(None, do_2, utils.null_functor)) + wf.add(do_1) + wf.add(do_2) wf.run(j.context) self.assertEquals(1, len(j.logbook)) diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index 00e3e198..2b133ea0 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -37,8 +37,8 @@ def null_functor(*args, **kwargs): # pylint: disable=W0613 class ProvidesRequiresTask(task.Task): def __init__(self, name, provides, requires): super(ProvidesRequiresTask, self).__init__(name) - self.provides = provides - self.requires = requires + self.provides.update(provides) + self.requires.update(requires) def __call__(self, context, *args, **kwargs): outs = { diff --git a/taskflow/utils.py b/taskflow/utils.py index 6bc2c5e0..dfec1f6e 100644 --- a/taskflow/utils.py +++ b/taskflow/utils.py @@ -195,4 +195,3 @@ class LazyPluggable(object): def __getattr__(self, key): backend = self.__get_backend() return getattr(backend, key) - diff --git a/taskflow/wrappers.py b/taskflow/wrappers.py deleted file mode 100644 index 01046227..00000000 --- a/taskflow/wrappers.py +++ /dev/null @@ -1,48 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from taskflow import decorators -from taskflow import task - - -class FunctorTask(task.Task): - """A simple task that can wrap two given functions and allow them to be - in combination used in apply and reverting a given task. Useful for - situations where existing functions already are in place and you just want - to wrap them up.""" - - def __init__(self, name, apply_functor, revert_functor=None, - provides_what=None, extract_requires=False): - if not name: - name = "_".join([apply_functor.__name__, revert_functor.__name__]) - super(FunctorTask, self).__init__(name) - if extract_requires: - self._apply_functor = decorators.requires(apply_functor) - self.requires.update(self._apply_functor.requires) - else: - self._apply_functor = apply_functor - self._revert_functor = revert_functor - if provides_what: - self.provides.update(provides_what) - - def __call__(self, context, *args, **kwargs): - return self._apply_functor(context, *args, **kwargs) - - def revert(self, context, result, cause): - if self._revert_functor: - self._revert_functor(context, result, cause)