diff --git a/doc/source/examples.rst b/doc/source/examples.rst index 9199bc11b..a6f0dd9fd 100644 --- a/doc/source/examples.rst +++ b/doc/source/examples.rst @@ -163,3 +163,26 @@ Distributed execution (simple) :language: python :linenos: :lines: 16- + +Distributed mandelbrot (complex) +================================ + +.. note:: + + Full source located at :example:`wbe_mandelbrot` + +Output +------ + +.. image:: img/mandelbrot.png + :height: 128px + :align: right + :alt: Generated mandelbrot fractal + +Code +---- + +.. literalinclude:: ../../taskflow/examples/wbe_mandelbrot.py + :language: python + :linenos: + :lines: 16- diff --git a/doc/source/img/mandelbrot.png b/doc/source/img/mandelbrot.png new file mode 100644 index 000000000..6dc26ee5e Binary files /dev/null and b/doc/source/img/mandelbrot.png differ diff --git a/taskflow/examples/wbe_mandelbrot.out.txt b/taskflow/examples/wbe_mandelbrot.out.txt new file mode 100644 index 000000000..3b5264143 --- /dev/null +++ b/taskflow/examples/wbe_mandelbrot.out.txt @@ -0,0 +1,6 @@ +Calculating your mandelbrot fractal of size 512x512. +Running 2 workers. +Execution finished. +Stopping workers. +Writing image... +Gathered 262144 results that represents a mandelbrot image (using 8 chunks that are computed jointly by 2 workers). diff --git a/taskflow/examples/wbe_mandelbrot.py b/taskflow/examples/wbe_mandelbrot.py new file mode 100644 index 000000000..55ca6e1a4 --- /dev/null +++ b/taskflow/examples/wbe_mandelbrot.py @@ -0,0 +1,254 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import math +import os +import sys +import threading + +import six + +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +from taskflow import engines +from taskflow.engines.worker_based import worker +from taskflow.patterns import unordered_flow as uf +from taskflow import task + +# INTRO: This example walks through a workflow that will in parallel compute +# a mandelbrot result set (using X 'remote' workers) and then combine their +# results together to form a final mandelbrot fractal image. It shows a usage +# of taskflow to perform a well-known embarrassingly parallel problem that has +# the added benefit of also being an elegant visualization. +# +# NOTE(harlowja): this example simulates the expected larger number of workers +# by using a set of threads (which in this example simulate the remote workers +# that would typically be running on other external machines). +# +# NOTE(harlowja): to have it produce an image run (after installing pillow): +# +# $ python taskflow/examples/wbe_mandelbrot.py output.png + +BASE_SHARED_CONF = { + 'exchange': 'taskflow', +} +WORKERS = 2 +WORKER_CONF = { + # These are the tasks the worker can execute, they *must* be importable, + # typically this list is used to restrict what workers may execute to + # a smaller set of *allowed* tasks that are known to be safe (one would + # not want to allow all python code to be executed). + 'tasks': [ + '%s:MandelCalculator' % (__name__), + ], +} +ENGINE_CONF = { + 'engine': 'worker-based', +} + +# Mandelbrot & image settings... +IMAGE_SIZE = (512, 512) +CHUNK_COUNT = 8 +MAX_ITERATIONS = 25 + + +class MandelCalculator(task.Task): + def execute(self, image_config, mandelbrot_config, chunk): + """Returns the number of iterations before the computation "escapes". + + Given the real and imaginary parts of a complex number, determine if it + is a candidate for membership in the mandelbrot set given a fixed + number of iterations. + """ + + # Parts borrowed from (credit to mark harris and benoƮt mandelbrot). + # + # http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43 + def mandelbrot(x, y, max_iters): + c = complex(x, y) + z = 0.0j + for i in six.moves.xrange(max_iters): + z = z * z + c + if (z.real * z.real + z.imag * z.imag) >= 4: + return i + return max_iters + + min_x, max_x, min_y, max_y, max_iters = mandelbrot_config + height, width = image_config['size'] + pixel_size_x = (max_x - min_x) / width + pixel_size_y = (max_y - min_y) / height + block = [] + for y in six.moves.xrange(chunk[0], chunk[1]): + row = [] + imag = min_y + y * pixel_size_y + for x in six.moves.xrange(0, width): + real = min_x + x * pixel_size_x + row.append(mandelbrot(real, imag, max_iters)) + block.append(row) + return block + + +def calculate(engine_conf): + # Subdivide the work into X pieces, then request each worker to calculate + # one of those chunks and then later we will write these chunks out to + # an image bitmap file. + + # And unordered flow is used here since the mandelbrot calculation is an + # example of a embarrassingly parallel computation that we can scatter + # across as many workers as possible. + flow = uf.Flow("mandelbrot") + + # These symbols will be automatically given to tasks as input to there + # execute method, in this case these are constants used in the mandelbrot + # calculation. + store = { + 'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS], + 'image_config': { + 'size': IMAGE_SIZE, + } + } + + # We need the task names to be in the right order so that we can extract + # the final results in the right order (we don't care about the order when + # executing). + task_names = [] + + # Compose our workflow. + height, width = IMAGE_SIZE + chunk_size = int(math.ceil(height / float(CHUNK_COUNT))) + for i in six.moves.xrange(0, CHUNK_COUNT): + chunk_name = 'chunk_%s' % i + task_name = "calculation_%s" % i + # Break the calculation up into chunk size pieces. + rows = [i * chunk_size, i * chunk_size + chunk_size] + flow.add( + MandelCalculator(task_name, + # This ensures the storage symbol with name + # 'chunk_name' is sent into the tasks local + # symbol 'chunk'. This is how we give each + # calculator its own correct sequence of rows + # to work on. + rebind={'chunk': chunk_name})) + store[chunk_name] = rows + task_names.append(task_name) + + # Now execute it. + eng = engines.load(flow, store=store, engine_conf=engine_conf) + eng.run() + + # Gather all the results and order them for further processing. + gather = [] + for name in task_names: + gather.extend(eng.storage.get(name)) + points = [] + for y, row in enumerate(gather): + for x, color in enumerate(row): + points.append(((x, y), color)) + return points + + +def write_image(results, output_filename=None): + print("Gathered %s results that represents a mandelbrot" + " image (using %s chunks that are computed jointly" + " by %s workers)." % (len(results), CHUNK_COUNT, WORKERS)) + if not output_filename: + return + + # Pillow (the PIL fork) saves us from writing our own image writer... + try: + from PIL import Image + except ImportError as e: + # To currently get this (may change in the future), + # $ pip install Pillow + raise RuntimeError("Pillow is required to write image files: %s" % e) + + # Limit to 255, find the max and normalize to that... + color_max = 0 + for _point, color in results: + color_max = max(color, color_max) + + # Use gray scale since we don't really have other colors. + img = Image.new('L', IMAGE_SIZE, "black") + pixels = img.load() + for (x, y), color in results: + if color_max == 0: + color = 0 + else: + color = int((float(color) / color_max) * 255.0) + pixels[x, y] = color + img.save(output_filename) + + +def create_fractal(): + logging.basicConfig(level=logging.ERROR) + + # Setup our transport configuration and merge it into the worker and + # engine configuration so that both of those use it correctly. + shared_conf = dict(BASE_SHARED_CONF) + shared_conf.update({ + 'transport': 'memory', + 'transport_options': { + 'polling_interval': 0.1, + }, + }) + + if len(sys.argv) >= 2: + output_filename = sys.argv[1] + else: + output_filename = None + + worker_conf = dict(WORKER_CONF) + worker_conf.update(shared_conf) + engine_conf = dict(ENGINE_CONF) + engine_conf.update(shared_conf) + workers = [] + worker_topics = [] + + print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE) + try: + # Create a set of workers to simulate actual remote workers. + print('Running %s workers.' % (WORKERS)) + for i in range(0, WORKERS): + worker_conf['topic'] = 'calculator_%s' % (i + 1) + worker_topics.append(worker_conf['topic']) + w = worker.Worker(**worker_conf) + runner = threading.Thread(target=w.run) + runner.daemon = True + runner.start() + w.wait() + workers.append((runner, w.stop)) + + # Now use those workers to do something. + engine_conf['topics'] = worker_topics + results = calculate(engine_conf) + print('Execution finished.') + finally: + # And cleanup. + print('Stopping workers.') + while workers: + r, stopper = workers.pop() + stopper() + r.join() + print("Writing image...") + write_image(results, output_filename=output_filename) + + +if __name__ == "__main__": + create_fractal()