In order to support task notifications and progress updates we need to establish a channel & proxy by which those events can be sent from the process executing and producing those events to the originating process that requested that task be executed. This review adds such a proxy and adjusts a cloned task's notification callbacks to place messages on a queue that will be picked up by a thread in the originating process for dispatch to the original callbacks that were registered with the non-cloned task (therefore making the original callbacks appear to be called as they are supposed to be). Part of blueprint process-executor Change-Id: I01c83f13186e4be9fa28c32e34e907bb133e8fb3
102 lines
3.5 KiB
Python
102 lines
3.5 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import fractions
|
|
import functools
|
|
import logging
|
|
import os
|
|
import string
|
|
import sys
|
|
import time
|
|
|
|
from concurrent import futures
|
|
|
|
# Keep library chatter down; only show errors.
logging.basicConfig(level=logging.ERROR)

# Make both this example's directory and the project root importable
# (the example lives two levels below the repository top), so the
# `taskflow` imports below resolve when running straight from a checkout.
self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(self_dir, os.pardir, os.pardir))
for _extra_path in (top_dir, self_dir):
    sys.path.insert(0, _extra_path)
|
|
|
|
from taskflow import engines
|
|
from taskflow import exceptions
|
|
from taskflow.patterns import linear_flow
|
|
from taskflow import task
|
|
|
|
|
|
# In this example we show how a simple linear set of tasks can be executed
|
|
# using local processes (and not threads or remote workers) with minimial (if
|
|
# any) modification to those tasks to make them safe to run in this mode.
|
|
#
|
|
# This is useful since it allows further scaling up your workflows when thread
|
|
# execution starts to become a bottleneck (which it can start to be due to the
|
|
# GIL in python). It also offers a intermediary scalable runner that can be
|
|
# used when the scale and or setup of remote workers is not desirable.
|
|
|
|
# How many local processes to potentially use when executing... (one is fine
|
|
# for this example, but more can be used to show play around with what happens
|
|
# with many...)
|
|
WORKERS = 1
|
|
|
|
|
|
def progress_printer(task, event_type, details):
    """Print a task's progress update as a percentage.

    :param task: the task object the event belongs to (only its ``name``
                 attribute is read)
    :param event_type: the kind of event being delivered (unused here)
    :param details: event payload; must contain a ``progress`` value in
                    the range [0.0, 1.0]
    :raises KeyError: if ``details`` has no ``'progress'`` entry
    """
    # This callback, attached to each task, will be called in the local
    # process (not the child processes)...
    #
    # NOTE: read (not pop) the progress value so that the shared event
    # payload is left intact for any other listeners registered on the
    # same notifier.
    progress = details['progress']
    progress = int(progress * 100.0)
    print("Task '%s' reached %d%% completion" % (task.name, progress))
|
|
|
|
|
|
class AlphabetTask(task.Task):
    """A task that slowly works through several progress stages."""

    # Seconds to pause between emitting each progress fraction.
    _DELAY = 0.1

    # This task runs in a handful of stages, each reporting a different
    # progress fraction back to the originating process. The engine itself
    # emits the automatic 0% and 100% events when a task starts and
    # finishes, which is why only the intermediate fractions (1/5 .. 4/5)
    # are produced here.
    _PROGRESS_PARTS = [fractions.Fraction(x, 5) for x in range(1, 5)]

    def execute(self):
        for part in self._PROGRESS_PARTS:
            self.update_progress(part)
            time.sleep(self._DELAY)
|
|
|
|
|
|
print("Constructing...")
soup = linear_flow.Flow("alphabet-soup")
for letter in string.ascii_lowercase:
    letter_task = AlphabetTask(letter)
    # Route this task's progress events to the printer above; the progress
    # callbacks are registered here, in the originating process, and are
    # expected to fire here even when the task body runs in a child
    # process.
    letter_task.notifier.register(task.EVENT_UPDATE_PROGRESS,
                                  functools.partial(progress_printer,
                                                    letter_task))
    soup.add(letter_task)
try:
    # NOTE(review): running a ProcessPoolExecutor from module level (no
    # ``if __name__ == '__main__'`` guard) may misbehave on platforms that
    # spawn rather than fork — confirm target platforms.
    with futures.ProcessPoolExecutor(WORKERS) as executor:
        print("Loading...")
        engine = engines.load(soup, engine='parallel', executor=executor)
        print("Compiling...")
        engine.compile()
        print("Preparing...")
        engine.prepare()
        print("Running...")
        engine.run()
        print("Done...")
except exceptions.NotImplementedError as exc:
    # Process-based execution may be unsupported on this setup; report
    # why instead of crashing the example.
    print(exc)
|