44f17d005f
This library no longer supports Python 2, thus usage of six can be removed. This also removes workaround about pickle library used in Python 2 only. Change-Id: I19d298cf0f402d65f0b142dea0bf35cf992332a9
252 lines
8.6 KiB
Python
252 lines
8.6 KiB
Python
# -*- coding: utf-8 -*-

# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
|
|
|
|
import logging
|
|
import math
|
|
import os
|
|
import sys
|
|
|
|
# Put the directory two levels above this file at the front of the import
# path (presumably so the ``taskflow`` imports that follow resolve against
# the checkout this example lives in -- confirm against the repo layout).
top_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
sys.path.insert(0, top_dir)
|
|
|
|
from taskflow import engines
|
|
from taskflow.engines.worker_based import worker
|
|
from taskflow.patterns import unordered_flow as uf
|
|
from taskflow import task
|
|
from taskflow.utils import threading_utils
|
|
|
|
# INTRO: This example walks through a workflow that computes a mandelbrot
# result set in parallel (using X 'remote' workers) and then combines the
# partial results to form a final mandelbrot fractal image. It shows how
# taskflow can be used to solve a well-known embarrassingly parallel problem
# that has the added benefit of also being an elegant visualization.
#
# NOTE(harlowja): this example simulates the expected larger number of workers
# by using a set of threads (which in this example simulate the remote workers
# that would typically be running on other external machines).
#
# NOTE(harlowja): to have it produce an image run (after installing pillow):
#
# $ python taskflow/examples/wbe_mandelbrot.py output.png
|
|
|
|
# Settings that get merged into both the worker and the engine configuration.
BASE_SHARED_CONF = dict(exchange='taskflow')

# How many in-process workers are started.
WORKERS = 2
WORKER_CONF = dict(
    # The tasks a worker is allowed to execute; each one *must* be
    # importable. Restricting this list keeps workers limited to a small set
    # of *allowed* tasks that are known to be safe (nobody wants a worker
    # that will run arbitrary python code).
    tasks=['{}:MandelCalculator'.format(__name__)],
)
ENGINE_CONF = dict(engine='worker-based')

# Mandelbrot & image settings...
IMAGE_SIZE = (512, 512)
CHUNK_COUNT = 8
MAX_ITERATIONS = 25
|
|
|
|
|
|
class MandelCalculator(task.Task):
    """Task that computes mandelbrot iteration counts for a band of rows."""

    def execute(self, image_config, mandelbrot_config, chunk):
        """Compute the iteration counts for every pixel of the given rows.

        :param image_config: mapping whose ``'size'`` entry unpacks as
                             ``(height, width)``.
        :param mandelbrot_config: sequence that unpacks as
                                  ``(min_x, max_x, min_y, max_y, max_iters)``.
        :param chunk: indexable pair; ``chunk[0]`` is the first row to
                      compute and ``chunk[1]`` is one past the last row.
        :returns: a list with one entry per row, each entry being a list of
                  ``width`` iteration counts.
        """

        # Parts borrowed from (credit to mark harris and benoît mandelbrot).
        #
        # http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
        def escape_count(real, imag, limit):
            """Return the number of iterations before the point "escapes".

            Given the real and imaginary parts of a complex number, iterate
            ``z = z * z + c`` at most ``limit`` times and report the
            iteration at which ``|z|^2`` first reaches 4; ``limit`` is
            returned when that never happens (such a point is a candidate
            for membership in the mandelbrot set).
            """
            point = complex(real, imag)
            orbit = 0.0j
            for step in range(limit):
                orbit = orbit * orbit + point
                if (orbit.real * orbit.real + orbit.imag * orbit.imag) >= 4:
                    return step
            return limit

        min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
        height, width = image_config['size']
        # Size of one pixel expressed in complex-plane units.
        step_x = (max_x - min_x) / width
        step_y = (max_y - min_y) / height

        def render_row(row_index):
            # Every pixel of a row shares the same imaginary coordinate.
            imag = min_y + row_index * step_y
            return [escape_count(min_x + col * step_x, imag, max_iters)
                    for col in range(0, width)]

        return [render_row(row_index)
                for row_index in range(chunk[0], chunk[1])]
|
|
|
|
|
|
def calculate(engine_conf):
    """Run the mandelbrot calculation through an engine and gather the points.

    The image rows are split into ``CHUNK_COUNT`` bands, one
    ``MandelCalculator`` task is created per band and all of them are placed
    in an unordered flow that is then loaded and run with ``engine_conf``.

    :param engine_conf: configuration handed to ``engines.load`` as its
                        ``engine_conf`` argument.
    :returns: a list of ``((x, y), color)`` tuples, ordered row by row, where
              ``color`` is the iteration count computed for that pixel.
    """
    # Subdivide the work into X pieces, then request each worker to calculate
    # one of those chunks and then later we will write these chunks out to
    # an image bitmap file.

    # An unordered flow is used here since the mandelbrot calculation is an
    # example of an embarrassingly parallel computation that we can scatter
    # across as many workers as possible.
    flow = uf.Flow("mandelbrot")

    # These symbols will be automatically given to tasks as input to their
    # execute method, in this case these are constants used in the mandelbrot
    # calculation.
    store = {
        'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
        'image_config': {
            'size': IMAGE_SIZE,
        }
    }

    # We need the task names to be in the right order so that we can extract
    # the final results in the right order (we don't care about the order when
    # executing).
    task_names = []

    # Compose our workflow.
    height, _width = IMAGE_SIZE
    chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
    for i in range(0, CHUNK_COUNT):
        chunk_name = 'chunk_%s' % i
        task_name = "calculation_%s" % i
        # Break the calculation up into chunk size pieces. Because the chunk
        # size is rounded up, the bounds are clamped to the image height so
        # that trailing chunks never ask for rows past the bottom of the
        # image when the height is not a multiple of CHUNK_COUNT (such a
        # chunk may end up shorter than the others, or even empty).
        first_row = min(i * chunk_size, height)
        last_row = min(first_row + chunk_size, height)
        rows = [first_row, last_row]
        flow.add(
            MandelCalculator(task_name,
                             # This ensures the storage symbol with name
                             # 'chunk_name' is sent into the tasks local
                             # symbol 'chunk'. This is how we give each
                             # calculator its own correct sequence of rows
                             # to work on.
                             rebind={'chunk': chunk_name}))
        store[chunk_name] = rows
        task_names.append(task_name)

    # Now execute it.
    eng = engines.load(flow, store=store, engine_conf=engine_conf)
    eng.run()

    # Gather all the results and order them for further processing; the
    # tasks were named in row order so walking the names in that same order
    # stitches the bands back together top to bottom.
    gather = []
    for name in task_names:
        gather.extend(eng.storage.get(name))
    return [((x, y), color)
            for y, row in enumerate(gather)
            for x, color in enumerate(row)]
|
|
|
|
|
|
def write_image(results, output_filename=None):
    """Print a summary of the results and optionally save them as an image.

    :param results: sized, re-iterable collection of ``((x, y), color)``
                    pairs (``len()`` is taken and it is walked twice).
    :param output_filename: where to save the gray scale image; when this is
                            falsy only the summary is printed and nothing is
                            written.
    :raises RuntimeError: if an image was requested but Pillow could not be
                          imported.
    """
    print("Gathered %s results that represents a mandelbrot"
          " image (using %s chunks that are computed jointly"
          " by %s workers)." % (len(results), CHUNK_COUNT, WORKERS))
    if not output_filename:
        return

    # Pillow (the PIL fork) saves us from writing our own image writer...
    try:
        from PIL import Image
    except ImportError as e:
        # To currently get this (may change in the future),
        # $ pip install Pillow
        raise RuntimeError(
            "Pillow is required to write image files: %s" % e) from e

    # Limit to 255, find the max and normalize to that...
    color_max = 0
    for _point, color in results:
        if color > color_max:
            color_max = color

    # The calculation code in this file unpacks IMAGE_SIZE as
    # (height, width) while Pillow expects (width, height), so swap the two
    # here; otherwise a non-square image would be created transposed and the
    # pixel writes below would fall outside of it.
    height, width = IMAGE_SIZE

    # Use gray scale since we don't really have other colors.
    img = Image.new('L', (width, height), "black")
    pixels = img.load()
    for (x, y), color in results:
        if color_max == 0:
            # Nothing to normalize against, leave everything black.
            color = 0
        else:
            color = int((float(color) / color_max) * 255.0)
        pixels[x, y] = color
    img.save(output_filename)
|
|
|
|
|
|
def create_fractal():
    """Start the workers, calculate the fractal and hand it to the writer.

    The first command line argument, when present, is used as the name of
    the image file to write. ``WORKERS`` workers are each run in their own
    daemon thread, the calculation is executed against them and they are
    always stopped and joined again afterwards (even when the calculation
    fails).
    """
    logging.basicConfig(level=logging.ERROR)

    # Setup our transport configuration and merge it into the worker and
    # engine configuration so that both of those use it correctly.
    shared_conf = dict(
        BASE_SHARED_CONF,
        transport='memory',
        transport_options={'polling_interval': 0.1},
    )
    worker_conf = dict(WORKER_CONF, **shared_conf)
    engine_conf = dict(ENGINE_CONF, **shared_conf)

    output_filename = sys.argv[1] if len(sys.argv) >= 2 else None

    # (thread, stop function) pairs of everything that needs to be cleaned up.
    started = []
    topics = []

    print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
    try:
        # Create a set of workers to simulate actual remote workers.
        print('Running %s workers.' % (WORKERS))
        for number in range(1, WORKERS + 1):
            topic = 'calculator_%s' % number
            worker_conf['topic'] = topic
            topics.append(topic)
            w = worker.Worker(**worker_conf)
            thread = threading_utils.daemon_thread(w.run)
            thread.start()
            # Do not continue until the worker's wait() call returns.
            w.wait()
            started.append((thread, w.stop))

        # Now use those workers to do something.
        engine_conf['topics'] = topics
        results = calculate(engine_conf)
        print('Execution finished.')
    finally:
        # And cleanup (most recently started worker first).
        print('Stopping workers.')
        while started:
            thread, stop = started.pop()
            stop()
            thread.join()
    print("Writing image...")
    write_image(results, output_filename=output_filename)
|
|
|
|
|
|
if __name__ == '__main__':
    # Only build the fractal when run as a script, not when imported.
    create_fractal()
|