diff --git a/taskflow/examples/delayed_return.py b/taskflow/examples/delayed_return.py new file mode 100644 index 00000000..e77b961c --- /dev/null +++ b/taskflow/examples/delayed_return.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import sys + +from concurrent import futures + +logging.basicConfig(level=logging.ERROR) + +self_dir = os.path.abspath(os.path.dirname(__file__)) +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) +sys.path.insert(0, self_dir) + +# INTRO: in this example linear_flow we will attach a listener to an engine +# and delay the return from a function until after the result of a task has +# occured in that engine. The engine will continue running (in the background) +# while the function will have returned. + +import taskflow.engines + +from taskflow.listeners import base +from taskflow.patterns import linear_flow as lf +from taskflow import states +from taskflow import task +from taskflow.utils import misc + + +class PokeFutureListener(base.ListenerBase): + def __init__(self, engine, future, task_name): + super(PokeFutureListener, self).__init__( + engine, + task_listen_for=(misc.Notifier.ANY,), + flow_listen_for=[]) + self._future = future + self._task_name = task_name + + def _task_receiver(self, state, details): + if state in (states.SUCCESS, states.FAILURE): + if details.get('task_name') == self._task_name: + if state == states.SUCCESS: + self._future.set_result(details['result']) + else: + failure = details['result'] + self._future.set_exception(failure.exception) + + +class Hi(task.Task): + def execute(self): + # raise IOError("I broken") + return 'hi' + + +class Bye(task.Task): + def execute(self): + return 'bye' + + +def return_from_flow(pool): + wf = lf.Flow("root").add(Hi("hi"), Bye("bye")) + eng = taskflow.engines.load(wf, engine_conf='serial') + f = futures.Future() + watcher = PokeFutureListener(eng, f, 'hi') + watcher.register() + pool.submit(eng.run) + return (eng, f.result()) + + +with futures.ThreadPoolExecutor(1) as pool: + engine, hi_result = return_from_flow(pool) + print(hi_result) + +print(engine.storage.get_flow_state())