# -*- coding: utf-8 -*-

#    Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import logging
import math
import os
import sys

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.engines.worker_based import worker
from taskflow.patterns import unordered_flow as uf
from taskflow import task
from taskflow.utils import threading_utils

# INTRO: This example walks through a workflow that will in parallel compute
# a mandelbrot result set (using X 'remote' workers) and then combine their
# results together to form a final mandelbrot fractal image. It shows a usage
# of taskflow to perform a well-known embarrassingly parallel problem that has
# the added benefit of also being an elegant visualization.
#
# NOTE(harlowja): this example simulates the expected larger number of workers
# by using a set of threads (which in this example simulate the remote workers
# that would typically be running on other external machines).
#
# NOTE(harlowja): to have it produce an image run (after installing pillow):
#
# $ python taskflow/examples/wbe_mandelbrot.py output.png

BASE_SHARED_CONF = {
    'exchange': 'taskflow',
}
WORKERS = 2
WORKER_CONF = {
    # These are the tasks the worker can execute, they *must* be importable,
    # typically this list is used to restrict what workers may execute to
    # a smaller set of *allowed* tasks that are known to be safe (one would
    # not want to allow all python code to be executed).
    'tasks': [
        '%s:MandelCalculator' % (__name__),
    ],
}
ENGINE_CONF = {
    'engine': 'worker-based',
}

# Mandelbrot & image settings...
#
# NOTE: the size is (width, height); this is the order in which it is handed
# to Pillow's ``Image.new`` in write_image(), so every consumer in this file
# unpacks it in that same order.
IMAGE_SIZE = (512, 512)
CHUNK_COUNT = 8
MAX_ITERATIONS = 25


class MandelCalculator(task.Task):
    """Task that computes the escape counts for one band of image rows."""

    def execute(self, image_config, mandelbrot_config, chunk):
        """Computes the mandelbrot iteration counts for a chunk of rows.

        :param image_config: dictionary whose 'size' entry is the
                             (width, height) of the full image in pixels
        :param mandelbrot_config: sequence of (min_x, max_x, min_y, max_y,
                                  max_iters) describing the region of the
                                  complex plane to sample and the iteration
                                  limit
        :param chunk: two item sequence [first_row, end_row) selecting the
                      rows this task is responsible for
        :returns: a list of rows (one per row in the chunk), each row being
                  a list with one iteration count per image column
        """

        # Parts borrowed from (credit to mark harris and benoît mandelbrot).
        #
        # http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
        def mandelbrot(x, y, max_iters):
            """Returns the number of iterations before the point "escapes".

            Given the real and imaginary parts of a complex number,
            determine if it is a candidate for membership in the mandelbrot
            set given a fixed number of iterations.
            """
            c = complex(x, y)
            z = 0.0j
            for i in range(max_iters):
                z = z * z + c
                if (z.real * z.real + z.imag * z.imag) >= 4:
                    return i
            return max_iters

        min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
        # The size is (width, height), matching what Pillow is given later;
        # unpacking it the other way around only works for square images.
        width, height = image_config['size']
        pixel_size_x = (max_x - min_x) / width
        pixel_size_y = (max_y - min_y) / height
        block = []
        for y in range(chunk[0], chunk[1]):
            row = []
            imag = min_y + y * pixel_size_y
            for x in range(0, width):
                real = min_x + x * pixel_size_x
                row.append(mandelbrot(real, imag, max_iters))
            block.append(row)
        return block


def calculate(engine_conf):
    """Runs the mandelbrot calculation and returns the computed points.

    :param engine_conf: engine configuration passed to ``engines.load``
    :returns: a list of ((x, y), color) tuples, one per image pixel, where
              color is the iteration count computed for that pixel
    """
    # Subdivide the work into X pieces, then request each worker to calculate
    # one of those chunks and then later we will write these chunks out to
    # an image bitmap file.

    # An unordered flow is used here since the mandelbrot calculation is an
    # example of an embarrassingly parallel computation that we can scatter
    # across as many workers as possible.
    flow = uf.Flow("mandelbrot")

    # These symbols will be automatically given to tasks as input to their
    # execute method, in this case these are constants used in the mandelbrot
    # calculation.
    store = {
        'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
        'image_config': {
            'size': IMAGE_SIZE,
        }
    }

    # We need the task names to be in the right order so that we can extract
    # the final results in the right order (we don't care about the order when
    # executing).
    task_names = []

    # Compose our workflow.
    _width, height = IMAGE_SIZE
    chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
    for i in range(0, CHUNK_COUNT):
        chunk_name = 'chunk_%s' % i
        task_name = "calculation_%s" % i
        # Break the calculation up into chunk size pieces; both ends are
        # clamped to the image height so that a height which is not evenly
        # divisible by the chunk count never produces rows that lie outside
        # of the image (trailing chunks may then simply be empty).
        first_row = min(i * chunk_size, height)
        end_row = min(first_row + chunk_size, height)
        rows = [first_row, end_row]
        flow.add(
            MandelCalculator(task_name,
                             # This ensures the storage symbol with name
                             # 'chunk_name' is sent into the tasks local
                             # symbol 'chunk'. This is how we give each
                             # calculator its own correct sequence of rows
                             # to work on.
                             rebind={'chunk': chunk_name}))
        store[chunk_name] = rows
        task_names.append(task_name)

    # Now execute it.
    eng = engines.load(flow, store=store, engine_conf=engine_conf)
    eng.run()

    # Gather all the results and order them for further processing.
    gather = []
    for name in task_names:
        gather.extend(eng.storage.get(name))
    points = []
    for y, row in enumerate(gather):
        for x, color in enumerate(row):
            points.append(((x, y), color))
    return points


def write_image(results, output_filename=None):
    """Reports on the gathered results and optionally writes them as an image.

    :param results: iterable of ((x, y), color) tuples (as produced by
                    calculate())
    :param output_filename: file to save the gray scale image to; when it
                            is not provided only the summary is printed
    :raises RuntimeError: if an output file was requested but Pillow can
                          not be imported
    """
    print("Gathered %s results that represents a mandelbrot"
          " image (using %s chunks that are computed jointly"
          " by %s workers)." % (len(results), CHUNK_COUNT, WORKERS))
    if not output_filename:
        return

    # Pillow (the PIL fork) saves us from writing our own image writer...
    try:
        from PIL import Image
    except ImportError as e:
        # To currently get this (may change in the future),
        # $ pip install Pillow
        raise RuntimeError("Pillow is required to write image files: %s" % e)

    # Limit to 255, find the max and normalize to that...
    color_max = 0
    for _point, color in results:
        color_max = max(color, color_max)

    # Use gray scale since we don't really have other colors.
    img = Image.new('L', IMAGE_SIZE, "black")
    pixels = img.load()
    for (x, y), color in results:
        if color_max:
            shade = int((float(color) / color_max) * 255.0)
        else:
            # Nothing ever iterated; avoid dividing by zero.
            shade = 0
        pixels[x, y] = shade
    img.save(output_filename)


def create_fractal():
    """Starts the simulated workers, computes the fractal and outputs it.

    The first command line argument (if any) is used as the filename that
    the resulting image is written to.
    """
    logging.basicConfig(level=logging.ERROR)

    # Setup our transport configuration and merge it into the worker and
    # engine configuration so that both of those use it correctly.
    shared_conf = dict(BASE_SHARED_CONF)
    shared_conf.update({
        'transport': 'memory',
        'transport_options': {
            'polling_interval': 0.1,
        },
    })

    if len(sys.argv) >= 2:
        output_filename = sys.argv[1]
    else:
        output_filename = None

    worker_conf = dict(WORKER_CONF)
    worker_conf.update(shared_conf)
    engine_conf = dict(ENGINE_CONF)
    engine_conf.update(shared_conf)
    workers = []
    worker_topics = []

    print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
    try:
        # Create a set of workers to simulate actual remote workers.
        print('Running %s workers.' % (WORKERS))
        for i in range(0, WORKERS):
            worker_conf['topic'] = 'calculator_%s' % (i + 1)
            worker_topics.append(worker_conf['topic'])
            w = worker.Worker(**worker_conf)
            runner = threading_utils.daemon_thread(w.run)
            runner.start()
            w.wait()
            workers.append((runner, w.stop))

        # Now use those workers to do something.
        engine_conf['topics'] = worker_topics
        results = calculate(engine_conf)
        print('Execution finished.')
    finally:
        # And cleanup.
        print('Stopping workers.')
        while workers:
            r, stopper = workers.pop()
            stopper()
            r.join()
    print("Writing image...")
    write_image(results, output_filename=output_filename)


if __name__ == "__main__":
    create_fractal()