Merge "Add a mandelbrot parallel calculation WBE example"
This commit is contained in:
@@ -175,3 +175,26 @@ Distributed execution (simple)
|
||||
:language: python
|
||||
:linenos:
|
||||
:lines: 16-
|
||||
|
||||
Distributed mandelbrot (complex)
|
||||
================================
|
||||
|
||||
.. note::
|
||||
|
||||
Full source located at :example:`wbe_mandelbrot`
|
||||
|
||||
Output
|
||||
------
|
||||
|
||||
.. image:: img/mandelbrot.png
|
||||
:height: 128px
|
||||
:align: right
|
||||
:alt: Generated mandelbrot fractal
|
||||
|
||||
Code
|
||||
----
|
||||
|
||||
.. literalinclude:: ../../taskflow/examples/wbe_mandelbrot.py
|
||||
:language: python
|
||||
:linenos:
|
||||
:lines: 16-
|
||||
|
||||
BIN
doc/source/img/mandelbrot.png
Normal file
BIN
doc/source/img/mandelbrot.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 22 KiB |
6
taskflow/examples/wbe_mandelbrot.out.txt
Normal file
6
taskflow/examples/wbe_mandelbrot.out.txt
Normal file
@@ -0,0 +1,6 @@
|
||||
Calculating your mandelbrot fractal of size 512x512.
|
||||
Running 2 workers.
|
||||
Execution finished.
|
||||
Stopping workers.
|
||||
Writing image...
|
||||
Gathered 262144 results that represents a mandelbrot image (using 8 chunks that are computed jointly by 2 workers).
|
||||
254
taskflow/examples/wbe_mandelbrot.py
Normal file
254
taskflow/examples/wbe_mandelbrot.py
Normal file
@@ -0,0 +1,254 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import math
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
|
||||
import six
|
||||
|
||||
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
|
||||
os.pardir,
|
||||
os.pardir))
|
||||
sys.path.insert(0, top_dir)
|
||||
|
||||
from taskflow import engines
|
||||
from taskflow.engines.worker_based import worker
|
||||
from taskflow.patterns import unordered_flow as uf
|
||||
from taskflow import task
|
||||
|
||||
# INTRO: This example walks through a workflow that will in parallel compute
|
||||
# a mandelbrot result set (using X 'remote' workers) and then combine their
|
||||
# results together to form a final mandelbrot fractal image. It shows a usage
|
||||
# of taskflow to perform a well-known embarrassingly parallel problem that has
|
||||
# the added benefit of also being an elegant visualization.
|
||||
#
|
||||
# NOTE(harlowja): this example simulates the expected larger number of workers
|
||||
# by using a set of threads (which in this example simulate the remote workers
|
||||
# that would typically be running on other external machines).
|
||||
#
|
||||
# NOTE(harlowja): to have it produce an image run (after installing pillow):
|
||||
#
|
||||
# $ python taskflow/examples/wbe_mandelbrot.py output.png
|
||||
|
||||
BASE_SHARED_CONF = {
|
||||
'exchange': 'taskflow',
|
||||
}
|
||||
WORKERS = 2
|
||||
WORKER_CONF = {
|
||||
# These are the tasks the worker can execute, they *must* be importable,
|
||||
# typically this list is used to restrict what workers may execute to
|
||||
# a smaller set of *allowed* tasks that are known to be safe (one would
|
||||
# not want to allow all python code to be executed).
|
||||
'tasks': [
|
||||
'%s:MandelCalculator' % (__name__),
|
||||
],
|
||||
}
|
||||
ENGINE_CONF = {
|
||||
'engine': 'worker-based',
|
||||
}
|
||||
|
||||
# Mandelbrot & image settings...
|
||||
IMAGE_SIZE = (512, 512)
|
||||
CHUNK_COUNT = 8
|
||||
MAX_ITERATIONS = 25
|
||||
|
||||
|
||||
class MandelCalculator(task.Task):
|
||||
def execute(self, image_config, mandelbrot_config, chunk):
|
||||
"""Returns the number of iterations before the computation "escapes".
|
||||
|
||||
Given the real and imaginary parts of a complex number, determine if it
|
||||
is a candidate for membership in the mandelbrot set given a fixed
|
||||
number of iterations.
|
||||
"""
|
||||
|
||||
# Parts borrowed from (credit to mark harris and benoît mandelbrot).
|
||||
#
|
||||
# http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
|
||||
def mandelbrot(x, y, max_iters):
|
||||
c = complex(x, y)
|
||||
z = 0.0j
|
||||
for i in six.moves.xrange(max_iters):
|
||||
z = z * z + c
|
||||
if (z.real * z.real + z.imag * z.imag) >= 4:
|
||||
return i
|
||||
return max_iters
|
||||
|
||||
min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
|
||||
height, width = image_config['size']
|
||||
pixel_size_x = (max_x - min_x) / width
|
||||
pixel_size_y = (max_y - min_y) / height
|
||||
block = []
|
||||
for y in six.moves.xrange(chunk[0], chunk[1]):
|
||||
row = []
|
||||
imag = min_y + y * pixel_size_y
|
||||
for x in six.moves.xrange(0, width):
|
||||
real = min_x + x * pixel_size_x
|
||||
row.append(mandelbrot(real, imag, max_iters))
|
||||
block.append(row)
|
||||
return block
|
||||
|
||||
|
||||
def calculate(engine_conf):
|
||||
# Subdivide the work into X pieces, then request each worker to calculate
|
||||
# one of those chunks and then later we will write these chunks out to
|
||||
# an image bitmap file.
|
||||
|
||||
# And unordered flow is used here since the mandelbrot calculation is an
|
||||
# example of a embarrassingly parallel computation that we can scatter
|
||||
# across as many workers as possible.
|
||||
flow = uf.Flow("mandelbrot")
|
||||
|
||||
# These symbols will be automatically given to tasks as input to there
|
||||
# execute method, in this case these are constants used in the mandelbrot
|
||||
# calculation.
|
||||
store = {
|
||||
'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
|
||||
'image_config': {
|
||||
'size': IMAGE_SIZE,
|
||||
}
|
||||
}
|
||||
|
||||
# We need the task names to be in the right order so that we can extract
|
||||
# the final results in the right order (we don't care about the order when
|
||||
# executing).
|
||||
task_names = []
|
||||
|
||||
# Compose our workflow.
|
||||
height, width = IMAGE_SIZE
|
||||
chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
|
||||
for i in six.moves.xrange(0, CHUNK_COUNT):
|
||||
chunk_name = 'chunk_%s' % i
|
||||
task_name = "calculation_%s" % i
|
||||
# Break the calculation up into chunk size pieces.
|
||||
rows = [i * chunk_size, i * chunk_size + chunk_size]
|
||||
flow.add(
|
||||
MandelCalculator(task_name,
|
||||
# This ensures the storage symbol with name
|
||||
# 'chunk_name' is sent into the tasks local
|
||||
# symbol 'chunk'. This is how we give each
|
||||
# calculator its own correct sequence of rows
|
||||
# to work on.
|
||||
rebind={'chunk': chunk_name}))
|
||||
store[chunk_name] = rows
|
||||
task_names.append(task_name)
|
||||
|
||||
# Now execute it.
|
||||
eng = engines.load(flow, store=store, engine_conf=engine_conf)
|
||||
eng.run()
|
||||
|
||||
# Gather all the results and order them for further processing.
|
||||
gather = []
|
||||
for name in task_names:
|
||||
gather.extend(eng.storage.get(name))
|
||||
points = []
|
||||
for y, row in enumerate(gather):
|
||||
for x, color in enumerate(row):
|
||||
points.append(((x, y), color))
|
||||
return points
|
||||
|
||||
|
||||
def write_image(results, output_filename=None):
|
||||
print("Gathered %s results that represents a mandelbrot"
|
||||
" image (using %s chunks that are computed jointly"
|
||||
" by %s workers)." % (len(results), CHUNK_COUNT, WORKERS))
|
||||
if not output_filename:
|
||||
return
|
||||
|
||||
# Pillow (the PIL fork) saves us from writing our own image writer...
|
||||
try:
|
||||
from PIL import Image
|
||||
except ImportError as e:
|
||||
# To currently get this (may change in the future),
|
||||
# $ pip install Pillow
|
||||
raise RuntimeError("Pillow is required to write image files: %s" % e)
|
||||
|
||||
# Limit to 255, find the max and normalize to that...
|
||||
color_max = 0
|
||||
for _point, color in results:
|
||||
color_max = max(color, color_max)
|
||||
|
||||
# Use gray scale since we don't really have other colors.
|
||||
img = Image.new('L', IMAGE_SIZE, "black")
|
||||
pixels = img.load()
|
||||
for (x, y), color in results:
|
||||
if color_max == 0:
|
||||
color = 0
|
||||
else:
|
||||
color = int((float(color) / color_max) * 255.0)
|
||||
pixels[x, y] = color
|
||||
img.save(output_filename)
|
||||
|
||||
|
||||
def create_fractal():
|
||||
logging.basicConfig(level=logging.ERROR)
|
||||
|
||||
# Setup our transport configuration and merge it into the worker and
|
||||
# engine configuration so that both of those use it correctly.
|
||||
shared_conf = dict(BASE_SHARED_CONF)
|
||||
shared_conf.update({
|
||||
'transport': 'memory',
|
||||
'transport_options': {
|
||||
'polling_interval': 0.1,
|
||||
},
|
||||
})
|
||||
|
||||
if len(sys.argv) >= 2:
|
||||
output_filename = sys.argv[1]
|
||||
else:
|
||||
output_filename = None
|
||||
|
||||
worker_conf = dict(WORKER_CONF)
|
||||
worker_conf.update(shared_conf)
|
||||
engine_conf = dict(ENGINE_CONF)
|
||||
engine_conf.update(shared_conf)
|
||||
workers = []
|
||||
worker_topics = []
|
||||
|
||||
print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
|
||||
try:
|
||||
# Create a set of workers to simulate actual remote workers.
|
||||
print('Running %s workers.' % (WORKERS))
|
||||
for i in range(0, WORKERS):
|
||||
worker_conf['topic'] = 'calculator_%s' % (i + 1)
|
||||
worker_topics.append(worker_conf['topic'])
|
||||
w = worker.Worker(**worker_conf)
|
||||
runner = threading.Thread(target=w.run)
|
||||
runner.daemon = True
|
||||
runner.start()
|
||||
w.wait()
|
||||
workers.append((runner, w.stop))
|
||||
|
||||
# Now use those workers to do something.
|
||||
engine_conf['topics'] = worker_topics
|
||||
results = calculate(engine_conf)
|
||||
print('Execution finished.')
|
||||
finally:
|
||||
# And cleanup.
|
||||
print('Stopping workers.')
|
||||
while workers:
|
||||
r, stopper = workers.pop()
|
||||
stopper()
|
||||
r.join()
|
||||
print("Writing image...")
|
||||
write_image(results, output_filename=output_filename)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
create_fractal()
|
||||
Reference in New Issue
Block a user