Add a parallel table mutation example
A new simple example that is pretty easy to follow that does a embarrassingly parallel computation on some input table to create a new output table (by performing a multiplication on each cell in that source table to create a new table). Part of blueprint more-examples Change-Id: I2684f39b3525ee2d43a03ab353d029fdc0e1b2a1
This commit is contained in:
@@ -58,6 +58,18 @@ Watching execution timing
|
||||
:linenos:
|
||||
:lines: 16-
|
||||
|
||||
Table multiplier (in parallel)
|
||||
==============================
|
||||
|
||||
.. note::
|
||||
|
||||
Full source located at :example:`parallel_table_multiply`
|
||||
|
||||
.. literalinclude:: ../../taskflow/examples/parallel_table_multiply.py
|
||||
:language: python
|
||||
:linenos:
|
||||
:lines: 16-
|
||||
|
||||
Linear equation solver (explicit dependencies)
|
||||
==============================================
|
||||
|
||||
|
||||
129
taskflow/examples/parallel_table_multiply.py
Normal file
129
taskflow/examples/parallel_table_multiply.py
Normal file
@@ -0,0 +1,129 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import csv
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import sys
|
||||
|
||||
logging.basicConfig(level=logging.ERROR)
|
||||
|
||||
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
|
||||
os.pardir,
|
||||
os.pardir))
|
||||
sys.path.insert(0, top_dir)
|
||||
|
||||
from six.moves import range as compat_range
|
||||
|
||||
from taskflow import engines
|
||||
from taskflow.patterns import unordered_flow as uf
|
||||
from taskflow import task
|
||||
from taskflow.types import futures
|
||||
from taskflow.utils import async_utils
|
||||
|
||||
# INTRO: This example walks through a miniature workflow which does a parallel
|
||||
# table modification where each row in the table gets adjusted by a thread, or
|
||||
# green thread (if eventlet is available) in parallel and then the result
|
||||
# is reformed into a new table and some verifications are performed on it
|
||||
# to ensure everything went as expected.
|
||||
|
||||
|
||||
MULTIPLER = 10
|
||||
|
||||
|
||||
class RowMultiplier(task.Task):
|
||||
"""Performs a modification of an input row, creating a output row."""
|
||||
|
||||
def __init__(self, name, index, row, multiplier):
|
||||
super(RowMultiplier, self).__init__(name=name)
|
||||
self.index = index
|
||||
self.multiplier = multiplier
|
||||
self.row = row
|
||||
|
||||
def execute(self):
|
||||
return [r * self.multiplier for r in self.row]
|
||||
|
||||
|
||||
def make_flow(table):
|
||||
# This creation will allow for parallel computation (since the flow here
|
||||
# is specifically unordered; and when things are unordered they have
|
||||
# no dependencies and when things have no dependencies they can just be
|
||||
# ran at the same time, limited in concurrency by the executor or max
|
||||
# workers of that executor...)
|
||||
f = uf.Flow("root")
|
||||
for i, row in enumerate(table):
|
||||
f.add(RowMultiplier("m-%s" % i, i, row, MULTIPLER))
|
||||
# NOTE(harlowja): at this point nothing has ran, the above is just
|
||||
# defining what should be done (but not actually doing it) and associating
|
||||
# an ordering dependencies that should be enforced (the flow pattern used
|
||||
# forces this), the engine in the later main() function will actually
|
||||
# perform this work...
|
||||
return f
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) == 2:
|
||||
tbl = []
|
||||
with open(sys.argv[1], 'rb') as fh:
|
||||
reader = csv.reader(fh)
|
||||
for row in reader:
|
||||
tbl.append([float(r) if r else 0.0 for r in row])
|
||||
else:
|
||||
# Make some random table out of thin air...
|
||||
tbl = []
|
||||
cols = random.randint(1, 100)
|
||||
rows = random.randint(1, 100)
|
||||
for _i in compat_range(0, rows):
|
||||
row = []
|
||||
for _j in compat_range(0, cols):
|
||||
row.append(random.random())
|
||||
tbl.append(row)
|
||||
|
||||
# Generate the work to be done.
|
||||
f = make_flow(tbl)
|
||||
|
||||
# Now run it (using the specified executor)...
|
||||
if async_utils.EVENTLET_AVAILABLE:
|
||||
executor = futures.GreenThreadPoolExecutor(max_workers=5)
|
||||
else:
|
||||
executor = futures.ThreadPoolExecutor(max_workers=5)
|
||||
try:
|
||||
e = engines.load(f, engine='parallel', executor=executor)
|
||||
for st in e.run_iter():
|
||||
print(st)
|
||||
finally:
|
||||
executor.shutdown()
|
||||
|
||||
# Find the old rows and put them into place...
|
||||
#
|
||||
# TODO(harlowja): probably easier just to sort instead of search...
|
||||
computed_tbl = []
|
||||
for i in compat_range(0, len(tbl)):
|
||||
for t in f:
|
||||
if t.index == i:
|
||||
computed_tbl.append(e.storage.get(t.name))
|
||||
|
||||
# Do some basic validation (which causes the return code of this process
|
||||
# to be different if things were not as expected...)
|
||||
if len(computed_tbl) != len(tbl):
|
||||
return 1
|
||||
else:
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user