From 574d0da91fd322fdae071605e83821afa3fbc6cd Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Thu, 27 Jun 2013 15:47:06 -0700 Subject: [PATCH] Add a new simple calculator example. Add a simple linear calculator example which will be given a few sets of tasks to do and some initial provided data and then combine the correct ordering of that tasks to produce an output result that then can be extracted. Change-Id: Ica84dff67ce0a6b7699cd98effebc85c7dbd4cb2 --- taskflow/examples/calculate_linear.py | 100 ++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 taskflow/examples/calculate_linear.py diff --git a/taskflow/examples/calculate_linear.py b/taskflow/examples/calculate_linear.py new file mode 100644 index 00000000..171c5884 --- /dev/null +++ b/taskflow/examples/calculate_linear.py @@ -0,0 +1,100 @@ +import logging +import os +import sys + +logging.basicConfig(level=logging.ERROR) + +my_dir_path = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), + os.pardir)) + +from taskflow.patterns import linear_flow as lf +from taskflow import task + + +def flow_notify(state, details): + print("'%s' entered state: %s" % (details['flow'], state)) + + +def task_notify(state, details): + print("'%s' entered state: %s" % (details['runner'], state)) + + +# This class is used to populate requirements that further tasks need to run +# by populating those tasks from a dictionary and returning said dictionary +# when the flow runs (so that further tasks can use those values). You can +# think of this a needed bootstrapping of a flow in a way. +class Provider(task.Task): + def __init__(self, name, **kwargs): + super(Provider, self).__init__(name) + self.provides.update(kwargs.keys()) + self._provide = kwargs + + def __call__(self, context): + return self._provide + + +class Adder(task.Task): + def __init__(self, name, x_name, y_name, provides_name): + super(Adder, self).__init__(name) + self.requires.update([x_name, y_name]) + self.provides.update([provides_name]) + self._provides_name = provides_name + + def __call__(self, context, **kwargs): + return { + self._provides_name: sum(kwargs.values()), + } + + +class Multiplier(task.Task): + def __init__(self, name, z_name, by_how_much): + super(Multiplier, self).__init__(name) + self.requires.update([z_name]) + self._by_how_much = by_how_much + self._z_name = z_name + + def __call__(self, context, **kwargs): + return kwargs.pop(self._z_name) * self._by_how_much + + +flow = lf.Flow("calc-them") +flow.add(Provider("provide-adder", x=2, y=3, d=5)) + +# Add x + y to produce z (5) +flow.add(Adder('add', 'x', 'y', 'z')) + +# Add z + d to produce a (5 + 5) +flow.add(Adder('add', 'z', 'd', 'a')) + +# Multiple a by 3 (30) +multi_uuid = flow.add(Multiplier('multi', 'a', 3)) + +# Get notified of the state changes the flow is going through. +flow.notifier.register('*', flow_notify) + +# Get notified of the state changes the flows tasks/runners are going through. +flow.task_notifier.register('*', task_notify) + +# Context is typically passed in openstack, it is not needed here. +print '-' * 7 +print 'Running' +print '-' * 7 +context = {} +flow.run(context) + +# This will have the last results and the task that produced that result, +# but we don't care about the task that produced it and just want the result +# itself. +print '-' * 11 +print 'All results' +print '-' * 11 +for (uuid, v) in flow.results.items(): + print '%s => %s' % (uuid, v) + +multi_results = flow.results[multi_uuid] +print '-' * 15 +print "Multiply result" +print '-' * 15 +print(multi_results) +assert multi_results == 30, "Example is broken"