5544d71bc8
Some things that popped out while reading the comments/documentation. Change-Id: I0ccecae3381447ede44bb855d91f997349be1562
98 lines
3.8 KiB
Python
98 lines
3.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import logging
|
|
import os
|
|
import sys
|
|
|
|
logging.basicConfig(level=logging.ERROR)
|
|
|
|
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
|
|
os.pardir,
|
|
os.pardir))
|
|
sys.path.insert(0, top_dir)
|
|
|
|
import taskflow.engines
|
|
from taskflow.patterns import linear_flow as lf
|
|
from taskflow.patterns import unordered_flow as uf
|
|
from taskflow import task
|
|
|
|
# INTRO: These examples show how a linear flow and an unordered flow can be
|
|
# used together to execute calculations in parallel and then use the
|
|
# result for the next task/s. The adder task is used for all calculations
|
|
# and argument bindings are used to set correct parameters for each task.
|
|
|
|
|
|
# This task provides some values from as a result of execution, this can be
|
|
# useful when you want to provide values from a static set to other tasks that
|
|
# depend on those values existing before those tasks can run.
|
|
#
|
|
# NOTE(harlowja): this usage is *depreciated* in favor of a simpler mechanism
|
|
# that provides those values on engine running by prepopulating the storage
|
|
# backend before your tasks are ran (which accomplishes a similar goal in a
|
|
# more uniform manner).
|
|
class Provider(task.Task):
|
|
def __init__(self, name, *args, **kwargs):
|
|
super(Provider, self).__init__(name=name, **kwargs)
|
|
self._provide = args
|
|
|
|
def execute(self):
|
|
return self._provide
|
|
|
|
|
|
# This task adds two input variables and returns the result of that addition.
|
|
#
|
|
# Note that since this task does not have a revert() function (since addition
|
|
# is a stateless operation) there are no side-effects that this function needs
|
|
# to undo if some later operation fails.
|
|
class Adder(task.Task):
|
|
def execute(self, x, y):
|
|
return x + y
|
|
|
|
|
|
flow = lf.Flow('root').add(
|
|
# Provide the initial values for other tasks to depend on.
|
|
#
|
|
# x1 = 2, y1 = 3, x2 = 5, x3 = 8
|
|
Provider("provide-adder", 2, 3, 5, 8,
|
|
provides=('x1', 'y1', 'x2', 'y2')),
|
|
# Note here that we define the flow that contains the 2 adders to be an
|
|
# unordered flow since the order in which these execute does not matter,
|
|
# another way to solve this would be to use a graph_flow pattern, which
|
|
# also can run in parallel (since they have no ordering dependencies).
|
|
uf.Flow('adders').add(
|
|
# Calculate 'z1 = x1+y1 = 5'
|
|
#
|
|
# Rebind here means that the execute() function x argument will be
|
|
# satisfied from a previous output named 'x1', and the y argument
|
|
# of execute() will be populated from the previous output named 'y1'
|
|
#
|
|
# The output (result of adding) will be mapped into a variable named
|
|
# 'z1' which can then be refereed to and depended on by other tasks.
|
|
Adder(name="add", provides='z1', rebind=['x1', 'y1']),
|
|
# z2 = x2+y2 = 13
|
|
Adder(name="add-2", provides='z2', rebind=['x2', 'y2']),
|
|
),
|
|
# r = z1+z2 = 18
|
|
Adder(name="sum-1", provides='r', rebind=['z1', 'z2']))
|
|
|
|
|
|
# The result here will be all results (from all tasks) which is stored in an
|
|
# in-memory storage location that backs this engine since it is not configured
|
|
# with persistence storage.
|
|
result = taskflow.engines.run(flow, engine='parallel')
|
|
print(result)
|