Add a new simple calculator example.

Add a simple linear calculator example which will
be given a few sets of tasks to do and some initial provided
data and then combine the correct ordering of that tasks
to produce an output result that then can be extracted.

Change-Id: Ica84dff67ce0a6b7699cd98effebc85c7dbd4cb2
This commit is contained in:
Joshua Harlow
2013-06-27 15:47:06 -07:00
parent 9be6c5d18a
commit 574d0da91f

View File

@@ -0,0 +1,100 @@
import logging
import os
import sys
logging.basicConfig(level=logging.ERROR)
my_dir_path = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir),
os.pardir))
from taskflow.patterns import linear_flow as lf
from taskflow import task
def flow_notify(state, details):
print("'%s' entered state: %s" % (details['flow'], state))
def task_notify(state, details):
print("'%s' entered state: %s" % (details['runner'], state))
# This class is used to populate requirements that further tasks need to run
# by populating those tasks from a dictionary and returning said dictionary
# when the flow runs (so that further tasks can use those values). You can
# think of this a needed bootstrapping of a flow in a way.
class Provider(task.Task):
def __init__(self, name, **kwargs):
super(Provider, self).__init__(name)
self.provides.update(kwargs.keys())
self._provide = kwargs
def __call__(self, context):
return self._provide
class Adder(task.Task):
def __init__(self, name, x_name, y_name, provides_name):
super(Adder, self).__init__(name)
self.requires.update([x_name, y_name])
self.provides.update([provides_name])
self._provides_name = provides_name
def __call__(self, context, **kwargs):
return {
self._provides_name: sum(kwargs.values()),
}
class Multiplier(task.Task):
def __init__(self, name, z_name, by_how_much):
super(Multiplier, self).__init__(name)
self.requires.update([z_name])
self._by_how_much = by_how_much
self._z_name = z_name
def __call__(self, context, **kwargs):
return kwargs.pop(self._z_name) * self._by_how_much
flow = lf.Flow("calc-them")
flow.add(Provider("provide-adder", x=2, y=3, d=5))
# Add x + y to produce z (5)
flow.add(Adder('add', 'x', 'y', 'z'))
# Add z + d to produce a (5 + 5)
flow.add(Adder('add', 'z', 'd', 'a'))
# Multiple a by 3 (30)
multi_uuid = flow.add(Multiplier('multi', 'a', 3))
# Get notified of the state changes the flow is going through.
flow.notifier.register('*', flow_notify)
# Get notified of the state changes the flows tasks/runners are going through.
flow.task_notifier.register('*', task_notify)
# Context is typically passed in openstack, it is not needed here.
print '-' * 7
print 'Running'
print '-' * 7
context = {}
flow.run(context)
# This will have the last results and the task that produced that result,
# but we don't care about the task that produced it and just want the result
# itself.
print '-' * 11
print 'All results'
print '-' * 11
for (uuid, v) in flow.results.items():
print '%s => %s' % (uuid, v)
multi_results = flow.results[multi_uuid]
print '-' * 15
print "Multiply result"
print '-' * 15
print(multi_results)
assert multi_results == 30, "Example is broken"