Task decorator was removed and examples updated

Change-Id: Ie49fe6c2f48a18130d1fd2a3aa5485cd8cee4ed4
This commit is contained in:
Anastasia Karpinska
2013-09-20 12:54:15 +03:00
committed by Ivan A. Melnikov
parent eea2b91030
commit a333c48523
11 changed files with 120 additions and 341 deletions

View File

@@ -18,8 +18,6 @@
import functools
from taskflow import task as base
from taskflow import utils
from taskflow.utils import threading_utils
@@ -60,43 +58,3 @@ def locked(*args, **kwargs):
return decorator(args[0])
else:
return decorator
def _original_function(fun):
"""Get original function from static or class method"""
if isinstance(fun, staticmethod):
return fun.__get__(object())
elif isinstance(fun, classmethod):
return fun.__get__(object()).im_func
return fun
def task(*args, **kwargs):
"""Decorates a given function so that it can be used as a task"""
def decorator(f):
def task_factory(execute, **factory_kwargs):
merged = kwargs.copy()
merged.update(factory_kwargs)
# NOTE(imelnikov): we can't capture f here because for
# bound methods and bound class methods the object it
# is bound to is yet unknown at the moment
#
# See: http://bit.ly/15Cfbjh
return base.FunctorTask(execute, **merged)
w_f = _original_function(f)
setattr(w_f, utils.TASK_FACTORY_ATTRIBUTE, task_factory)
return f
# This is needed to handle when the decorator has args or the decorator
# doesn't have args, python is rather weird here...
if kwargs:
if args:
raise TypeError('task decorator takes 0 positional arguments,'
'%s given' % len(args))
return decorator
else:
if len(args) == 1:
return decorator(args[0])
else:
return decorator

View File

@@ -5,149 +5,131 @@ import os
import sys
print('GraphFlow is under refactoring now, so this example '
'is temporarily broken')
sys.exit(0)
logging.basicConfig(level=logging.ERROR)
my_dir_path = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir),
os.pardir))
from taskflow import decorators
from taskflow.engines.action_engine import engine as eng
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
def flow_notify(state, details):
print("'%s' entered state: %s" % (details['flow'], state))
def build_frame():
return 'steel'
def task_notify(state, details):
print("'%s' entered state: %s" % (details['runner'], state))
def build_engine():
return 'honda'
@decorators.task(provides=['spec'])
def build_spec(context):
params = context['inputs']
verified = {}
for k, v in params.items():
verified[k] = int(v)
return {
'spec': verified,
}
def build_doors():
return '2'
@decorators.task(provides=['frame'])
def build_frame(context, spec):
return {
'frame': 'steel',
}
def build_wheels():
return '4'
@decorators.task(provides=['engine'])
def build_engine(context, spec):
return {
'engine': 'honda',
}
def install_engine(frame, engine):
return True
@decorators.task(provides=['doors'])
def build_doors(context, spec):
return {
'doors': '2',
}
def install_doors(frame, windows_installed, doors):
return True
@decorators.task(provides=['wheels'])
def build_wheels(context, spec):
return {
'wheels': '4',
}
def install_windows(frame, doors):
return True
@decorators.task(provides=['wheels'])
def build_windows(context, spec):
return {
'windows': '4',
}
def install_wheels(frame, engine, engine_installed, wheels):
return True
@decorators.task(provides=['engine_installed'])
def install_engine(context, frame, engine):
return {
'engine_installed': True,
}
@decorators.task
def install_doors(context, frame, windows_installed, doors):
pass
@decorators.task(provides=['windows_installed'])
def install_windows(context, frame, doors):
return {
'windows_installed': True,
}
@decorators.task
def install_wheels(context, frame, engine, engine_installed, wheels):
pass
def trash(context, result, cause):
def trash(**kwargs):
print("Throwing away pieces of car!")
@decorators.task(revert=trash)
def startup(context, **kwargs):
def startup(**kwargs):
pass
# TODO(harlowja): try triggering reversion here!
# raise ValueError("Car not verified")
return {
'ran': True,
}
return True
flow = gf.Flow("make-auto")
flow.notifier.register('*', flow_notify)
flow.task_notifier.register('*', task_notify)
def verify(spec, **kwargs):
for key, value in kwargs.items():
if spec[key] != value:
raise Exception("Car doesn't match spec!")
return True
# Lets build a car!!
flow.add(build_spec)
flow.add(build_frame)
flow.add(build_engine)
flow.add(build_doors)
flow.add(build_wheels)
i_uuid1 = flow.add(install_engine)
i_uuid2 = flow.add(install_doors)
i_uuid3 = flow.add(install_windows)
i_uuid4 = flow.add(install_wheels)
install_uuids = [i_uuid1, i_uuid2, i_uuid3, i_uuid4]
def flow_watch(state, details):
print('Flow => %s' % state)
# Lets add a manual dependency that startup needs all the installation to
# complete, this could be done automatically but lets now instead ;)
startup_uuid = flow.add(startup)
for i_uuid in install_uuids:
flow.add_dependency(i_uuid, startup_uuid)
# Now begin the build!
context = {
"inputs": {
'engine': 123,
'tire': '234',
}
}
print '-' * 7
print 'Running'
print '-' * 7
flow.run(context)
def task_watch(state, details):
print('Task %s => %s' % (details.get('task_name'), state))
print '-' * 11
print 'All results'
print '-' * 11
for (uuid, v) in flow.results.items():
print '%s => %s' % (uuid, v)
flow = lf.Flow("make-auto").add(
task.FunctorTask(startup, revert=trash, provides='ran'),
gf.Flow("install-parts").add(
task.FunctorTask(build_frame, provides='frame'),
task.FunctorTask(build_engine, provides='engine'),
task.FunctorTask(build_doors, provides='doors'),
task.FunctorTask(build_wheels, provides='wheels'),
task.FunctorTask(install_engine, provides='engine_installed'),
task.FunctorTask(install_doors, provides='doors_installed'),
task.FunctorTask(install_windows, provides='windows_installed'),
task.FunctorTask(install_wheels, provides='wheels_installed')),
task.FunctorTask(verify, requires=['frame',
'engine',
'doors',
'wheels',
'engine_installed',
'doors_installed',
'windows_installed',
'wheels_installed']))
engine = eng.SingleThreadedActionEngine(flow)
engine.notifier.register('*', flow_watch)
engine.task_notifier.register('*', task_watch)
engine.storage.inject({'spec': {
"frame": 'steel',
"engine": 'honda',
"doors": '2',
"wheels": '4',
"engine_installed": True,
"doors_installed": True,
"windows_installed": True,
"wheels_installed": True,
}})
print "Build a car"
engine.run()
engine = eng.SingleThreadedActionEngine(flow)
engine.notifier.register('*', flow_watch)
engine.task_notifier.register('*', task_watch)
engine.storage.inject({'spec': {
"frame": 'steel',
"engine": 'honda',
"doors": '5',
"wheels": '4',
"engine_installed": True,
"doors_installed": True,
"windows_installed": True,
"wheels_installed": True,
}})
try:
print "Build a wrong car that doesn't match specification"
engine.run()
except Exception as e:
print e

View File

@@ -8,36 +8,32 @@ my_dir_path = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir),
os.pardir))
from taskflow import decorators
from taskflow.engines.action_engine import engine as eng
from taskflow.patterns import linear_flow as lf
from taskflow import task
@decorators.task
def call_jim(context):
print("Calling jim.")
print("Context = %s" % (context))
@decorators.task
def call_joe(context):
print("Calling joe.")
print("Context = %s" % (context))
@decorators.task
def flow_watch(state, details):
print('Flow => %s' % state)
@decorators.task
def task_watch(state, details):
print('Task %s => %s' % (details.get('task_name'), state))
flow = lf.Flow("Call-them")
flow.add(call_jim)
flow.add(call_joe)
flow.add(task.FunctorTask(execute=call_jim))
flow.add(task.FunctorTask(execute=call_joe))
engine = eng.SingleThreadedActionEngine(flow)
engine.notifier.register('*', flow_watch)

View File

@@ -19,8 +19,6 @@
import abc
from taskflow.openstack.common import uuidutils
from taskflow import task
from taskflow import utils
def _class_name(obj):
@@ -67,16 +65,6 @@ class Flow(object):
lines.append("%s" % (len(self)))
return "; ".join(lines)
def _extract_item(self, item):
if isinstance(item, (task.BaseTask, Flow)):
return item
if issubclass(item, task.BaseTask):
return item()
task_factory = getattr(item, utils.TASK_FACTORY_ATTRIBUTE, None)
if task_factory:
return self._extract_item(task_factory(item))
raise TypeError("Invalid item %r: it's not task and not flow" % item)
@abc.abstractmethod
def add(self, *items):
"""Adds a given item/items to this flow."""

View File

@@ -37,8 +37,6 @@ class Flow(flow.Flow):
def add(self, *items):
"""Adds a given task/tasks/flow/flows to this flow."""
items = [self._extract_item(item) for item in items]
# NOTE(imelnikov): we add item to the end of flow, so it should
# not provide anything previous items of the flow require
requires = self.requires

View File

@@ -39,8 +39,6 @@ class Flow(flow.Flow):
def add(self, *items):
"""Adds a given task/tasks/flow/flows to this flow."""
items = [self._extract_item(item) for item in items]
# check that items are actually independent
provides = self.provides
old_requires = self.requires

View File

@@ -1,136 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import decorators
from taskflow.patterns import linear_flow
from taskflow import test
from taskflow.engines.action_engine import engine as eng
class WrapableObjectsTest(test.TestCase):
def _make_engine(self, flow):
e = eng.SingleThreadedActionEngine(flow)
e.compile()
return e
def test_simple_function(self):
values = []
def revert_one(*args, **kwargs):
values.append('revert one')
@decorators.task(revert=revert_one)
def run_one(*args, **kwargs):
values.append('one')
@decorators.task
def run_fail(*args, **kwargs):
values.append('fail')
raise RuntimeError('Woot!')
flow = linear_flow.Flow('test')
flow.add(
run_one,
run_fail
)
with self.assertRaisesRegexp(RuntimeError, '^Woot'):
e = self._make_engine(flow)
e.run()
self.assertEquals(values, ['one', 'fail', 'revert one'])
def test_simple_method(self):
class MyTasks(object):
def __init__(self):
# NOTE(imelnikov): that's really *bad thing* to pass
# data between task like this; though, its good enough
# for our testing here
self.values = []
@decorators.task
def run_one(self, *args, **kwargs):
self.values.append('one')
@decorators.task
def run_fail(self, *args, **kwargs):
self.values.append('fail')
raise RuntimeError('Woot!')
tasks = MyTasks()
flow = linear_flow.Flow('test')
flow.add(
tasks.run_one,
tasks.run_fail
)
with self.assertRaisesRegexp(RuntimeError, '^Woot'):
e = self._make_engine(flow)
e.run()
self.assertEquals(tasks.values, ['one', 'fail'])
def test_static_method(self):
values = []
class MyTasks(object):
@decorators.task
@staticmethod
def run_one(*args, **kwargs):
values.append('one')
# NOTE(imelnikov): decorators should work in any order:
@staticmethod
@decorators.task
def run_fail(*args, **kwargs):
values.append('fail')
raise RuntimeError('Woot!')
flow = linear_flow.Flow('test')
flow.add(
MyTasks.run_one,
MyTasks.run_fail
)
with self.assertRaisesRegexp(RuntimeError, '^Woot'):
e = self._make_engine(flow)
e.run()
self.assertEquals(values, ['one', 'fail'])
def test_class_method(self):
class MyTasks(object):
values = []
@decorators.task
@classmethod
def run_one(cls, *args, **kwargs):
cls.values.append('one')
# NOTE(imelnikov): decorators should work in any order:
@classmethod
@decorators.task
def run_fail(cls, *args, **kwargs):
cls.values.append('fail')
raise RuntimeError('Woot!')
flow = linear_flow.Flow('test')
flow.add(
MyTasks.run_one,
MyTasks.run_fail
)
with self.assertRaisesRegexp(RuntimeError, '^Woot'):
e = self._make_engine(flow)
e.run()
self.assertEquals(MyTasks.values, ['one', 'fail'])

View File

@@ -18,11 +18,11 @@
import collections
from taskflow import decorators
from taskflow.engines.action_engine import engine as eng
from taskflow import exceptions as exc
from taskflow.patterns import linear_flow as lw
from taskflow import states
from taskflow import task
from taskflow import test
from taskflow.tests import utils
@@ -37,12 +37,11 @@ class LinearFlowTest(test.TestCase):
def test_result_access(self):
@decorators.task(provides=['a', 'b'])
def do_apply1(context):
return [1, 2]
wf = lw.Flow("the-test-action")
wf.add(do_apply1)
wf.add(task.FunctorTask(do_apply1, provides=['a', 'b']))
e = self._make_engine(wf)
e.run()
@@ -55,19 +54,17 @@ class LinearFlowTest(test.TestCase):
def test_functor_flow(self):
wf = lw.Flow("the-test-action")
@decorators.task(provides=['a', 'b', 'c'])
def do_apply1(context):
context['1'] = True
return ['a', 'b', 'c']
@decorators.task(requires=set(['c']))
def do_apply2(context, a, **kwargs):
self.assertTrue('c' in kwargs)
self.assertEquals('a', a)
context['2'] = True
wf.add(do_apply1)
wf.add(do_apply2)
wf.add(task.FunctorTask(do_apply1, provides=['a', 'b', 'c']))
wf.add(task.FunctorTask(do_apply2, requires=set(['c'])))
e = self._make_engine(wf)
e.run()
@@ -191,17 +188,15 @@ class LinearFlowTest(test.TestCase):
def test_not_satisfied_inputs(self):
@decorators.task
def task_a(context, *args, **kwargs):
pass
@decorators.task
def task_b(context, c, *args, **kwargs):
pass
wf = lw.Flow("the-test-action")
wf.add(task_a)
wf.add(task_b)
wf.add(task.FunctorTask(task_a))
wf.add(task.FunctorTask(task_b))
e = self._make_engine(wf)
self.assertRaises(exc.MissingDependencies, e.run)

View File

@@ -16,9 +16,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import decorators
from taskflow.engines.action_engine import engine as eng
from taskflow.patterns import unordered_flow as uf
from taskflow import task
from taskflow import test
from taskflow.tests import utils
@@ -33,12 +33,14 @@ class UnorderedFlowTest(test.TestCase):
def test_result_access(self):
@decorators.task(provides=['a', 'b'])
def do_apply1(context):
return [1, 2]
class DoApply(task.Task):
default_provides = ('a', 'b')
def execute(self):
return [1, 2]
wf = uf.Flow("the-test-action")
wf.add(do_apply1)
wf.add(DoApply())
e = self._make_engine(wf)
e.run()
@@ -57,18 +59,20 @@ class UnorderedFlowTest(test.TestCase):
def test_functor_flow(self):
@decorators.task(provides=['a', 'b', 'c'])
def do_apply1(context):
context['1'] = True
return ['a', 'b', 'c']
class DoApply1(task.Task):
default_provides = ('a', 'b', 'c')
@decorators.task
def do_apply2(context, **kwargs):
context['2'] = True
def execute(self, context):
context['1'] = True
return ['a', 'b', 'c']
class DoApply2(task.Task):
def execute(self, context):
context['2'] = True
wf = uf.Flow("the-test-action")
wf.add(do_apply1)
wf.add(do_apply2)
wf.add(DoApply1())
wf.add(DoApply2())
e = self._make_engine(wf)
e.run()
self.assertEquals(2, len(e.storage.fetch('context')))

View File

@@ -16,7 +16,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import decorators
from taskflow import task
ARGS_KEY = '__args__'
@@ -47,18 +46,17 @@ def make_reverting_task(token, blowup=False):
if blowup:
@decorators.task(name='blowup_%s' % token)
def blow_up(context, *args, **kwargs):
raise Exception("I blew up")
return blow_up
return task.FunctorTask(blow_up, name='blowup_%s' % token)
else:
@decorators.task(revert=do_revert, name='do_apply_%s' % token)
def do_apply(context, *args, **kwargs):
context[token] = 'passed'
return do_apply
return task.FunctorTask(do_apply, revert=do_revert,
name='do_apply_%s' % token)
class ProvidesRequiresTask(task.Task):

View File

@@ -15,5 +15,3 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
TASK_FACTORY_ATTRIBUTE = '_TaskFlow_task_factory'