taskflow/taskflow/examples/wbe_mandelbrot.py

252 lines
8.6 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import math
import os
import sys
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir,
os.pardir))
sys.path.insert(0, top_dir)
from taskflow import engines
from taskflow.engines.worker_based import worker
from taskflow.patterns import unordered_flow as uf
from taskflow import task
from taskflow.utils import threading_utils
# INTRO: This example walks through a workflow that will in parallel compute
# a mandelbrot result set (using X 'remote' workers) and then combine their
# results together to form a final mandelbrot fractal image. It shows a usage
# of taskflow to perform a well-known embarrassingly parallel problem that has
# the added benefit of also being an elegant visualization.
#
# NOTE(harlowja): this example simulates the expected larger number of workers
# by using a set of threads (which in this example simulate the remote workers
# that would typically be running on other external machines).
#
# NOTE(harlowja): to have it produce an image run (after installing pillow):
#
# $ python taskflow/examples/wbe_mandelbrot.py output.png
BASE_SHARED_CONF = {
'exchange': 'taskflow',
}
WORKERS = 2
WORKER_CONF = {
# These are the tasks the worker can execute, they *must* be importable,
# typically this list is used to restrict what workers may execute to
# a smaller set of *allowed* tasks that are known to be safe (one would
# not want to allow all python code to be executed).
'tasks': [
'%s:MandelCalculator' % (__name__),
],
}
ENGINE_CONF = {
'engine': 'worker-based',
}
# Mandelbrot & image settings...
IMAGE_SIZE = (512, 512)
CHUNK_COUNT = 8
MAX_ITERATIONS = 25
class MandelCalculator(task.Task):
def execute(self, image_config, mandelbrot_config, chunk):
"""Returns the number of iterations before the computation "escapes".
Given the real and imaginary parts of a complex number, determine if it
is a candidate for membership in the mandelbrot set given a fixed
number of iterations.
"""
# Parts borrowed from (credit to mark harris and benoît mandelbrot).
#
# http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
def mandelbrot(x, y, max_iters):
c = complex(x, y)
z = 0.0j
for i in range(max_iters):
z = z * z + c
if (z.real * z.real + z.imag * z.imag) >= 4:
return i
return max_iters
min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
height, width = image_config['size']
pixel_size_x = (max_x - min_x) / width
pixel_size_y = (max_y - min_y) / height
block = []
for y in range(chunk[0], chunk[1]):
row = []
imag = min_y + y * pixel_size_y
for x in range(0, width):
real = min_x + x * pixel_size_x
row.append(mandelbrot(real, imag, max_iters))
block.append(row)
return block
def calculate(engine_conf):
# Subdivide the work into X pieces, then request each worker to calculate
# one of those chunks and then later we will write these chunks out to
# an image bitmap file.
# And unordered flow is used here since the mandelbrot calculation is an
# example of an embarrassingly parallel computation that we can scatter
# across as many workers as possible.
flow = uf.Flow("mandelbrot")
# These symbols will be automatically given to tasks as input to their
# execute method, in this case these are constants used in the mandelbrot
# calculation.
store = {
'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
'image_config': {
'size': IMAGE_SIZE,
}
}
# We need the task names to be in the right order so that we can extract
# the final results in the right order (we don't care about the order when
# executing).
task_names = []
# Compose our workflow.
height, _width = IMAGE_SIZE
chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
for i in range(0, CHUNK_COUNT):
chunk_name = 'chunk_%s' % i
task_name = "calculation_%s" % i
# Break the calculation up into chunk size pieces.
rows = [i * chunk_size, i * chunk_size + chunk_size]
flow.add(
MandelCalculator(task_name,
# This ensures the storage symbol with name
# 'chunk_name' is sent into the tasks local
# symbol 'chunk'. This is how we give each
# calculator its own correct sequence of rows
# to work on.
rebind={'chunk': chunk_name}))
store[chunk_name] = rows
task_names.append(task_name)
# Now execute it.
eng = engines.load(flow, store=store, engine_conf=engine_conf)
eng.run()
# Gather all the results and order them for further processing.
gather = []
for name in task_names:
gather.extend(eng.storage.get(name))
points = []
for y, row in enumerate(gather):
for x, color in enumerate(row):
points.append(((x, y), color))
return points
def write_image(results, output_filename=None):
print("Gathered %s results that represents a mandelbrot"
" image (using %s chunks that are computed jointly"
" by %s workers)." % (len(results), CHUNK_COUNT, WORKERS))
if not output_filename:
return
# Pillow (the PIL fork) saves us from writing our own image writer...
try:
from PIL import Image
except ImportError as e:
# To currently get this (may change in the future),
# $ pip install Pillow
raise RuntimeError("Pillow is required to write image files: %s" % e)
# Limit to 255, find the max and normalize to that...
color_max = 0
for _point, color in results:
color_max = max(color, color_max)
# Use gray scale since we don't really have other colors.
img = Image.new('L', IMAGE_SIZE, "black")
pixels = img.load()
for (x, y), color in results:
if color_max == 0:
color = 0
else:
color = int((float(color) / color_max) * 255.0)
pixels[x, y] = color
img.save(output_filename)
def create_fractal():
logging.basicConfig(level=logging.ERROR)
# Setup our transport configuration and merge it into the worker and
# engine configuration so that both of those use it correctly.
shared_conf = dict(BASE_SHARED_CONF)
shared_conf.update({
'transport': 'memory',
'transport_options': {
'polling_interval': 0.1,
},
})
if len(sys.argv) >= 2:
output_filename = sys.argv[1]
else:
output_filename = None
worker_conf = dict(WORKER_CONF)
worker_conf.update(shared_conf)
engine_conf = dict(ENGINE_CONF)
engine_conf.update(shared_conf)
workers = []
worker_topics = []
print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
try:
# Create a set of workers to simulate actual remote workers.
print('Running %s workers.' % (WORKERS))
for i in range(0, WORKERS):
worker_conf['topic'] = 'calculator_%s' % (i + 1)
worker_topics.append(worker_conf['topic'])
w = worker.Worker(**worker_conf)
runner = threading_utils.daemon_thread(w.run)
runner.start()
w.wait()
workers.append((runner, w.stop))
# Now use those workers to do something.
engine_conf['topics'] = worker_topics
results = calculate(engine_conf)
print('Execution finished.')
finally:
# And cleanup.
print('Stopping workers.')
while workers:
r, stopper = workers.pop()
stopper()
r.join()
print("Writing image...")
write_image(results, output_filename=output_filename)
if __name__ == "__main__":
create_fractal()