From 6c7e7576dc8e24d73c5bb3cf63e0541ad0aa213b Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 7 May 2014 17:35:28 -0700 Subject: [PATCH] Add an example which uses the run iteration functionality Create an example which can run many engines at once using many engines, each with its own iterator that can be used in a single loop to cause that engine to progress to its next state. Change-Id: I8c6ca19a752f4ced77fd86727f17ecad8c8e72c8 --- taskflow/examples/run_by_iter.out.txt | 106 ++++++++++++++++++++++++++ taskflow/examples/run_by_iter.py | 94 +++++++++++++++++++++++ 2 files changed, 200 insertions(+) create mode 100644 taskflow/examples/run_by_iter.out.txt create mode 100644 taskflow/examples/run_by_iter.py diff --git a/taskflow/examples/run_by_iter.out.txt b/taskflow/examples/run_by_iter.out.txt new file mode 100644 index 00000000..6e12eb40 --- /dev/null +++ b/taskflow/examples/run_by_iter.out.txt @@ -0,0 +1,106 @@ +RESUMING +SCHEDULING +A +WAITING +ANALYZING +SCHEDULING +B +WAITING +ANALYZING +SCHEDULING +C +WAITING +ANALYZING +SCHEDULING +D +WAITING +ANALYZING +SCHEDULING +E +WAITING +ANALYZING +SCHEDULING +F +WAITING +ANALYZING +SCHEDULING +G +WAITING +ANALYZING +SCHEDULING +H +WAITING +ANALYZING +SCHEDULING +I +WAITING +ANALYZING +SCHEDULING +J +WAITING +ANALYZING +SCHEDULING +K +WAITING +ANALYZING +SCHEDULING +L +WAITING +ANALYZING +SCHEDULING +M +WAITING +ANALYZING +SCHEDULING +N +WAITING +ANALYZING +SCHEDULING +O +WAITING +ANALYZING +SCHEDULING +P +WAITING +ANALYZING +SCHEDULING +Q +WAITING +ANALYZING +SCHEDULING +R +WAITING +ANALYZING +SCHEDULING +S +WAITING +ANALYZING +SCHEDULING +T +WAITING +ANALYZING +SCHEDULING +U +WAITING +ANALYZING +SCHEDULING +V +WAITING +ANALYZING +SCHEDULING +W +WAITING +ANALYZING +SCHEDULING +X +WAITING +ANALYZING +SCHEDULING +Y +WAITING +ANALYZING +SCHEDULING +Z +WAITING +ANALYZING +SUCCESS diff --git a/taskflow/examples/run_by_iter.py b/taskflow/examples/run_by_iter.py new file mode 100644 index 00000000..0a7761b7 --- /dev/null +++ b/taskflow/examples/run_by_iter.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import sys + +import six + +logging.basicConfig(level=logging.ERROR) + +self_dir = os.path.abspath(os.path.dirname(__file__)) +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) +sys.path.insert(0, self_dir) + + +from taskflow.engines.action_engine import engine +from taskflow.patterns import linear_flow as lf +from taskflow.persistence.backends import impl_memory +from taskflow import task +from taskflow.utils import persistence_utils + + +# INTRO: This examples shows how to run a set of engines at the same time, each +# running in different engines using a single thread of control to iterate over +# each engine (which causes that engine to advanced to its next state during +# each iteration). + + +class EchoTask(task.Task): + def execute(self, value): + print(value) + return chr(ord(value) + 1) + + +def make_alphabet_flow(i): + f = lf.Flow("alphabet_%s" % (i)) + start_value = 'A' + end_value = 'Z' + curr_value = start_value + while ord(curr_value) <= ord(end_value): + next_value = chr(ord(curr_value) + 1) + if curr_value != end_value: + f.add(EchoTask(name="echoer_%s" % curr_value, + rebind={'value': curr_value}, + provides=next_value)) + else: + f.add(EchoTask(name="echoer_%s" % curr_value, + rebind={'value': curr_value})) + curr_value = next_value + return f + + +# Adjust this number to change how many engines/flows run at once. +flow_count = 1 +flows = [] +for i in range(0, flow_count): + f = make_alphabet_flow(i + 1) + flows.append(make_alphabet_flow(i + 1)) +be = impl_memory.MemoryBackend({}) +book = persistence_utils.temporary_log_book(be) +engines = [] +for f in flows: + fd = persistence_utils.create_flow_detail(f, book, be) + e = engine.SingleThreadedActionEngine(f, fd, be, {}) + e.compile() + e.storage.inject({'A': 'A'}) + e.prepare() + engines.append(e) +engine_iters = [] +for e in engines: + engine_iters.append(e.run_iter()) +while engine_iters: + for it in list(engine_iters): + try: + print(six.next(it)) + except StopIteration: + engine_iters.remove(it)