From a638c9864f8d5f050fc59be1a84e728e30f981aa Mon Sep 17 00:00:00 2001 From: Anastasia Karpinska Date: Wed, 4 Sep 2013 11:27:39 +0400 Subject: [PATCH] Converted some examples to use patterns/engines Change-Id: If7154019f1cb5e723069ff35f6301fce048323b5 --- taskflow/examples/calculate_in_parallel.py | 55 +++++++++++ taskflow/examples/calculate_linear.py | 103 +++++++-------------- taskflow/examples/reverting_linear.out.txt | 6 +- taskflow/examples/reverting_linear.py | 56 +++++------ taskflow/examples/simple_linear.out.txt | 6 +- taskflow/examples/simple_linear.py | 39 ++++---- 6 files changed, 146 insertions(+), 119 deletions(-) create mode 100644 taskflow/examples/calculate_in_parallel.py diff --git a/taskflow/examples/calculate_in_parallel.py b/taskflow/examples/calculate_in_parallel.py new file mode 100644 index 000000000..3172c4d6a --- /dev/null +++ b/taskflow/examples/calculate_in_parallel.py @@ -0,0 +1,55 @@ +import logging +import os +import sys + +logging.basicConfig(level=logging.ERROR) + +my_dir_path = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), + os.pardir)) + +from taskflow import blocks +from taskflow.engines.action_engine import engine as eng +from taskflow import task + +# This examples shows how LinearFlow and ParallelFlow can be used +# together to execute calculations in parallel and then use the +# result for the next task. Adder task is used for all calculations +# and arguments' bindings are used to set correct parameters to the task. + + +class Provider(task.Task): + + def __init__(self, name, *args): + super(Provider, self).__init__(name) + self._provide = args + + def execute(self): + return self._provide + + +class Adder(task.Task): + + def __init__(self, name): + super(Adder, self).__init__(name) + + def execute(self, x, y): + return x + y + + +flow = blocks.LinearFlow().add( + # x1 = 2, y1 = 3, x2 = 5, x3 = 8 + blocks.Task(Provider("provide-adder", 2, 3, 5, 8), + save_as=('x1', 'y1', 'x2', 'y2')), + blocks.ParallelFlow().add( + # z1 = x1+y1 = 5 + blocks.Task(Adder("add"), save_as='z1', rebind_args=['x1', 'y1']), + # z2 = x2+y2 = 13 + blocks.Task(Adder("add"), save_as='z2', rebind_args=['x2', 'y2'])), + # r = z1+z2 = 18 + blocks.Task(Adder("add"), save_as='r', rebind_args=['z1', 'z2'])) + +engine = eng.MultiThreadedActionEngine(flow) +engine.run() + +print engine.storage.fetch_all() diff --git a/taskflow/examples/calculate_linear.py b/taskflow/examples/calculate_linear.py index 58b6a7e70..ed45b0909 100644 --- a/taskflow/examples/calculate_linear.py +++ b/taskflow/examples/calculate_linear.py @@ -8,93 +8,58 @@ my_dir_path = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), os.pardir)) -from taskflow.patterns import linear_flow as lf +from taskflow import blocks +from taskflow.engines.action_engine import engine as eng from taskflow import task -def flow_notify(state, details): - print("'%s' entered state: %s" % (details['flow'], state)) +# In this example LinearFlow is used to group four tasks to +# calculate value. Added task is used twice. In the first case +# it uses default parameters ('x' and 'y') and in the second one +# arguments are binding with 'z' and 'd' keys from engine storage. +# Multiplier task uses binding too, but explicitly shows that 'z' +# parameter is binded with 'a' key from engine storage. -def task_notify(state, details): - print("'%s' entered state: %s" % (details['runner'], state)) - - -# This class is used to populate requirements that further tasks need to run -# by populating those tasks from a dictionary and returning said dictionary -# when the flow runs (so that further tasks can use those values). You can -# think of this a needed bootstrapping of a flow in a way. class Provider(task.Task): - def __init__(self, name, **kwargs): - super(Provider, self).__init__(name) - self.provides.update(kwargs.keys()) - self._provide = kwargs - def execute(self, context): + def __init__(self, name, *args): + super(Provider, self).__init__(name) + self._provide = args + + def execute(self): return self._provide class Adder(task.Task): - def __init__(self, name, x_name, y_name, provides_name): - super(Adder, self).__init__(name) - self.requires.update([x_name, y_name]) - self.provides.update([provides_name]) - self._provides_name = provides_name - def execute(self, context, **kwargs): - return { - self._provides_name: sum(kwargs.values()), - } + def __init__(self, name): + super(Adder, self).__init__(name) + + def execute(self, x, y): + return x + y class Multiplier(task.Task): - def __init__(self, name, z_name, by_how_much): + def __init__(self, name, multiplier): super(Multiplier, self).__init__(name) - self.requires.update([z_name]) - self._by_how_much = by_how_much - self._z_name = z_name + self._multiplier = multiplier - def execute(self, context, **kwargs): - return kwargs.pop(self._z_name) * self._by_how_much + def execute(self, z): + return z * self._multiplier -flow = lf.Flow("calc-them") -flow.add(Provider("provide-adder", x=2, y=3, d=5)) +flow = blocks.LinearFlow().add( + # x = 2, y = 3, d = 5 + blocks.Task(Provider("provide-adder", 2, 3, 5), save_as=('x', 'y', 'd')), + # z = x+y = 5 + blocks.Task(Adder("add"), save_as='z'), + # a = z+d = 10 + blocks.Task(Adder("add"), save_as='a', rebind_args=['z', 'd']), + # r = a*3 = 30 + blocks.Task(Multiplier("multi", 3), save_as='r', rebind_args={'z': 'a'})) -# Add x + y to produce z (5) -flow.add(Adder('add', 'x', 'y', 'z')) +engine = eng.SingleThreadedActionEngine(flow) +engine.run() -# Add z + d to produce a (5 + 5) -flow.add(Adder('add', 'z', 'd', 'a')) - -# Multiple a by 3 (30) -multi_uuid = flow.add(Multiplier('multi', 'a', 3)) - -# Get notified of the state changes the flow is going through. -flow.notifier.register('*', flow_notify) - -# Get notified of the state changes the flows tasks/runners are going through. -flow.task_notifier.register('*', task_notify) - -# Context is typically passed in openstack, it is not needed here. -print '-' * 7 -print 'Running' -print '-' * 7 -context = {} -flow.run(context) - -# This will have the last results and the task that produced that result, -# but we don't care about the task that produced it and just want the result -# itself. -print '-' * 11 -print 'All results' -print '-' * 11 -for (uuid, v) in flow.results.items(): - print '%s => %s' % (uuid, v) - -multi_results = flow.results[multi_uuid] -print '-' * 15 -print "Multiply result" -print '-' * 15 -print(multi_results) -assert multi_results == 30, "Example is broken" +print engine.storage.fetch_all() diff --git a/taskflow/examples/reverting_linear.out.txt b/taskflow/examples/reverting_linear.out.txt index 06bf7aae5..37c4bd9cb 100644 --- a/taskflow/examples/reverting_linear.out.txt +++ b/taskflow/examples/reverting_linear.out.txt @@ -1,7 +1,5 @@ -Calling jim. -Context = {'joe_number': 444, 'jim_number': 555} -Calling joe. -Context = {'joe_number': 444, 'jim_number': 555} +Calling jim 555. +Calling joe 444. Calling 444 and apologizing. Calling 555 and apologizing. Flow failed: IOError('Suzzie not home right now.',) diff --git a/taskflow/examples/reverting_linear.py b/taskflow/examples/reverting_linear.py index 36164b5df..0a34a1bff 100644 --- a/taskflow/examples/reverting_linear.py +++ b/taskflow/examples/reverting_linear.py @@ -8,44 +8,48 @@ my_dir_path = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), os.pardir)) -from taskflow import decorators -from taskflow.patterns import linear_flow as lf +from taskflow import blocks +from taskflow.engines.action_engine import engine as eng +from taskflow import task -def undo_call(context, result, cause): - print("Calling %s and apologizing." % result) +class CallJim(task.Task): + def execute(self, jim_number, *args, **kwargs): + print("Calling jim %s." % jim_number) + + def revert(self, jim_number, *args, **kwargs): + print("Calling %s and apologizing." % jim_number) -@decorators.task(revert=undo_call) -def call_jim(context): - print("Calling jim.") - print("Context = %s" % (context)) - return context['jim_number'] +class CallJoe(task.Task): + def execute(self, joe_number, *args, **kwargs): + print("Calling joe %s." % joe_number) + + def revert(self, joe_number, *args, **kwargs): + print("Calling %s and apologizing." % joe_number) -@decorators.task(revert=undo_call) -def call_joe(context): - print("Calling joe.") - print("Context = %s" % (context)) - return context['joe_number'] +class CallSuzzie(task.Task): + def execute(self, suzzie_number, *args, **kwargs): + raise IOError("Suzzie not home right now.") + + def revert(self, suzzie_number, *args, **kwargs): + # TODO(imelnikov): this method should not be requred + pass -@decorators.task -def call_suzzie(context): - raise IOError("Suzzie not home right now.") +flow = blocks.LinearFlow().add(blocks.Task(CallJim), + blocks.Task(CallJoe), + blocks.Task(CallSuzzie)) +engine = eng.SingleThreadedActionEngine(flow) - -flow = lf.Flow("call-them") -flow.add(call_jim) -flow.add(call_joe) -flow.add(call_suzzie) - -context = { +engine.storage.inject({ "joe_number": 444, "jim_number": 555, -} + "suzzie_number": 666 +}) try: - flow.run(context) + engine.run() except Exception as e: print "Flow failed: %r" % e diff --git a/taskflow/examples/simple_linear.out.txt b/taskflow/examples/simple_linear.out.txt index 4ff650196..2850f26d5 100644 --- a/taskflow/examples/simple_linear.out.txt +++ b/taskflow/examples/simple_linear.out.txt @@ -1,4 +1,2 @@ -Calling jim. -Context = {'joe_number': 444, 'jim_number': 555} -Calling joe. -Context = {'joe_number': 444, 'jim_number': 555} +Calling jim 555. +Calling joe 444. diff --git a/taskflow/examples/simple_linear.py b/taskflow/examples/simple_linear.py index cc190d882..3ec0cb5fa 100644 --- a/taskflow/examples/simple_linear.py +++ b/taskflow/examples/simple_linear.py @@ -8,28 +8,35 @@ my_dir_path = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), os.pardir)) -from taskflow import decorators -from taskflow.patterns import linear_flow as lf +from taskflow import blocks +from taskflow.engines.action_engine import engine as eng +from taskflow import task -@decorators.task -def call_jim(context): - print("Calling jim.") - print("Context = %s" % (context)) +class CallJim(task.Task): + + def __init__(self): + super(CallJim, self).__init__() + + def execute(self, jim_number, *args, **kwargs): + print("Calling jim %s." % jim_number) -@decorators.task -def call_joe(context): - print("Calling joe.") - print("Context = %s" % (context)) +class CallJoe(task.Task): + def __init__(self): + super(CallJoe, self).__init__() -flow = lf.Flow("call-them") -flow.add(call_jim) -flow.add(call_joe) + def execute(self, joe_number, *args, **kwargs): + print("Calling joe %s." % joe_number) -context = { +flow = blocks.LinearFlow().add(blocks.Task(CallJim), + blocks.Task(CallJoe)) +engine = eng.SingleThreadedActionEngine(flow) + +engine.storage.inject({ "joe_number": 444, "jim_number": 555, -} -flow.run(context) +}) + +engine.run()