From 38cf1981b239a2759a571f27f15df8e5605ea57b Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 14 Oct 2013 16:47:33 -0700 Subject: [PATCH] Add a resuming booting vm example Change-Id: I903d55df4c574856e24242def8753f87c2094e9a --- taskflow/examples/fake_boot_vm.py | 196 -------------------- taskflow/examples/fake_boot_vm2.py | 91 --------- taskflow/examples/resume_vm_boot.py | 274 ++++++++++++++++++++++++++++ 3 files changed, 274 insertions(+), 287 deletions(-) delete mode 100644 taskflow/examples/fake_boot_vm.py delete mode 100644 taskflow/examples/fake_boot_vm2.py create mode 100644 taskflow/examples/resume_vm_boot.py diff --git a/taskflow/examples/fake_boot_vm.py b/taskflow/examples/fake_boot_vm.py deleted file mode 100644 index 9e6cbd12..00000000 --- a/taskflow/examples/fake_boot_vm.py +++ /dev/null @@ -1,196 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import os -import random -import sys -import time -import uuid - - -logging.basicConfig(level=logging.ERROR) - -top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), - os.pardir, - os.pardir)) -sys.path.insert(0, top_dir) - - -import taskflow.engines -from taskflow.patterns import graph_flow as gf -from taskflow import task - -# Simulates some type of persistance. -MY_DB = { - 'space': { - 'cpus': 2, - 'memory': 8192, - 'disk': 1024, - }, - 'vms': {}, - 'places': ['127.0.0.1', '127.0.0.2', '127.0.0.3'], -} - - -# This prints out the transitions a flow is going through. -def flow_notify(state, details): - print("'%s' entered state: %s" % (details.get('flow_name'), state)) - - -# This prints out the transitions a flows tasks are going through. -def task_notify(state, details): - print("'%s' entered state: %s" % (details.get('task_name'), state)) - - -# Simulates what nova/glance/keystone... calls a context -class Context(object): - def __init__(self, **items): - self.__dict__.update(items) - - def __str__(self): - return "Context: %s" % (self.__dict__) - - -# Simulates translating an api request into a validated format (aka a vm-spec) -# that will later be acted upon. -class ValidateAPIInputs(task.Task): - def __init__(self): - super(ValidateAPIInputs, self).__init__('validates-api-inputs', - provides='vm_spec') - - def execute(self, context): - print("Validating api inputs for %s" % (context)) - return {'cpus': 1, - 'memory': 512, - 'disk': 100, - } - - -# Simulates reserving the space for a vm and associating the vm to be with -# a unique identifier. -class PerformReservation(task.Task): - def __init__(self): - super(PerformReservation, self).__init__('reserve-vm', - provides='vm_reservation_' - 'spec') - - def revert(self, context, vm_spec, result): - reserved_spec = result - print("Undoing reservation of %s" % (reserved_spec['uuid'])) - vm_spec = MY_DB['vms'].pop(reserved_spec['uuid']) - print('Space before: %s' % (MY_DB['space'])) - # Unreserve 'atomically' - for (k, v) in vm_spec.items(): - if k in ['scheduled']: - continue - MY_DB['space'][k] += v - print('Space after: %s' % (MY_DB['space'])) - - def execute(self, context, vm_spec): - print('Reserving %s for %s' % (vm_spec, context)) - # Reserve 'atomically' - print('Space before: %s' % (MY_DB['space'])) - for (k, v) in vm_spec.items(): - if MY_DB['space'][k] < v: - raise RuntimeError("Not enough %s available" % (k)) - MY_DB['space'][k] -= vm_spec[k] - print('Space after: %s' % (MY_DB['space'])) - # Create a fake 'db' entry for the vm - vm_uuid = str(uuid.uuid4()) - MY_DB['vms'][vm_uuid] = vm_spec - MY_DB['vms'][vm_uuid]['scheduled'] = False - return {'uuid': vm_uuid, - 'reserved_on': time.time(), - 'vm_spec': vm_spec, - } - - -# Simulates scheudling a vm to some location -class ScheduleVM(task.Task): - def __init__(self): - super(ScheduleVM, self).__init__('find-hole-for-vm', - provides=['vm_hole', 'vm_uuid']) - - def revert(self, context, vm_reservation_spec, result): - vm_uuid = result[1] - vm_place = result[0] - print("Marking %s as not having a home at %s anymore" % (vm_uuid, - vm_place)) - MY_DB['vms'][vm_uuid]['scheduled'] = False - MY_DB['places'].append(vm_place) - - def execute(self, context, vm_reservation_spec): - print("Finding a place to put %s" % (vm_reservation_spec)) - vm_uuid = vm_reservation_spec['uuid'] - MY_DB['vms'][vm_uuid]['scheduled'] = True - # Reserve the place 'atomically' - vm_place = random.choice(MY_DB['places']) - print('Placing %s at %s' % (vm_uuid, vm_place)) - MY_DB['places'].remove(vm_place) - return vm_place, vm_uuid - - -# Fail booting a vm to see what happens. -class BootVM(task.Task): - def __init__(self): - super(BootVM, self).__init__('boot-vm', provides='vm_booted') - - def execute(self, context, vm_reservation_spec, vm_hole): - raise RuntimeError("Failed booting") - - -# Lets try booting a vm (not really) and see how the reversions work. -flow = gf.Flow("Boot-Fake-Vm").add( - ValidateAPIInputs(), - PerformReservation(), - ScheduleVM(), - BootVM()) - -# Simulates what nova/glance/keystone... calls a context -context = { - 'user_id': 'xyz', - 'project_id': 'abc', - 'is_admin': True, -} -context = Context(**context) - -# Load the flow -engine = taskflow.engines.load(flow, store={'context': context}) - -# Get notified of the state changes the flow is going through. -engine.notifier.register('*', flow_notify) - -# Get notified of the state changes the flows tasks/runners are going through. -engine.task_notifier.register('*', task_notify) - - -print('-' * 7) -print('Running') -print('-' * 7) -try: - engine.run() -except Exception as e: - print('Flow failed: %r' % e) - -print('-' * 11) -print('All results') -print('-' * 11) -result = engine.storage.fetch_all() -for tid in sorted(result): - print('%s => %s' % (tid, result[tid])) diff --git a/taskflow/examples/fake_boot_vm2.py b/taskflow/examples/fake_boot_vm2.py deleted file mode 100644 index 63d3e47a..00000000 --- a/taskflow/examples/fake_boot_vm2.py +++ /dev/null @@ -1,91 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import os -import random -import sys - -logging.basicConfig(level=logging.ERROR) - -top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), - os.pardir, - os.pardir)) -sys.path.insert(0, top_dir) - -from taskflow.patterns import graph_flow as gf -from taskflow.patterns import linear_flow as lf - -from taskflow import engines -from taskflow import task - - -class PrintText(task.Task): - def __init__(self, print_what): - super(PrintText, self).__init__(name="Print: %s" % print_what) - self._text = print_what - - def execute(self): - print("-" * (len(self._text))) - print(self._text) - print("-" * (len(self._text))) - - -class AllocateIP(task.Task): - def execute(self): - return "192.168.0.%s" % (random.randint(1, 254)) - - -class AllocateVolumes(task.Task): - def execute(self): - volumes = [] - for i in range(0, random.randint(0, 10)): - volumes.append("/dev/vda%s" % (i + 1)) - return volumes - - -class CreateVM(task.Task): - def execute(self, net_ip, volumes): - print("Making vm %s using ip %s" % (self.name, net_ip)) - if volumes: - print("With volumes:") - for v in volumes: - print(" - %s" % (v)) - - -flow = lf.Flow("root").add( - PrintText("Starting"), - gf.Flow('maker').add( - # First vm creation - AllocateVolumes("volumes-1", provides='volumes_1'), - AllocateIP("ip-1", provides='net_ip1'), - CreateVM("vm-1", rebind=['net_ip1', 'volumes_1']), - # Second vm creation - AllocateVolumes("volumes-2", provides='volumes_2'), - AllocateIP("ip-2", provides='net_ip2'), - CreateVM("vm-2", rebind=['net_ip2', 'volumes_2']) - ), - PrintText("Finished")) - - -# The above vms will be created in parallel, dependencies will be ran in order -# so that means the ip and volume creation for each vm will run before the -# final vm creation is done, but both vm creates will run at the same time. -# -# Pretty cool! -engines.run(flow, engine_conf='parallel') diff --git a/taskflow/examples/resume_vm_boot.py b/taskflow/examples/resume_vm_boot.py new file mode 100644 index 00000000..56535d83 --- /dev/null +++ b/taskflow/examples/resume_vm_boot.py @@ -0,0 +1,274 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib +import hashlib +import logging +import os +import random +import sys +import time + +logging.basicConfig(level=logging.ERROR) + +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +from taskflow.patterns import graph_flow as gf +from taskflow.patterns import linear_flow as lf + +from taskflow import engines +from taskflow import exceptions as exc +from taskflow import task + +from taskflow.persistence import backends +from taskflow.utils import eventlet_utils as e_utils +from taskflow.utils import persistence_utils as p_utils + + +@contextlib.contextmanager +def slow_down(how_long=0.5): + try: + yield how_long + finally: + if len(sys.argv) > 1: + # Only both to do this if user input provided. + print("** Ctrl-c me please!!! **") + time.sleep(how_long) + + +def print_wrapped(text): + print("-" * (len(text))) + print(text) + print("-" * (len(text))) + + +def get_backend(): + try: + backend_uri = sys.argv[1] + except Exception: + backend_uri = 'sqlite://' + + backend = backends.fetch({'connection': backend_uri}) + backend.get_connection().upgrade() + return backend + + +class PrintText(task.Task): + def __init__(self, print_what, no_slow=False): + content_hash = hashlib.md5(print_what).hexdigest()[0:8] + super(PrintText, self).__init__(name="Print: %s" % (content_hash)) + self._text = print_what + self._no_slow = no_slow + + def execute(self): + if self._no_slow: + print_wrapped(self._text) + else: + with slow_down(): + print_wrapped(self._text) + + +class DefineVMSpec(task.Task): + def __init__(self, name): + super(DefineVMSpec, self).__init__(provides='vm_spec', + name=name) + + def execute(self): + return { + 'type': 'kvm', + 'disks': 2, + 'vcpu': 1, + 'ips': 1, + 'volumes': 3, + } + + +class LocateImages(task.Task): + def __init__(self, name): + super(LocateImages, self).__init__(provides='image_locations', + name=name) + + def execute(self, vm_spec): + image_locations = {} + for i in range(0, vm_spec['disks']): + url = "http://www.yahoo.com/images/%s" % (i) + image_locations[url] = "/tmp/%s.img" % (i) + return image_locations + + +class DownloadImages(task.Task): + def __init__(self, name): + super(DownloadImages, self).__init__(provides='download_paths', + name=name) + + def execute(self, image_locations): + for src, loc in image_locations.items(): + with slow_down(1): + print("Downloading from %s => %s" % (src, loc)) + return sorted(image_locations.values()) + + +class FetchNetworkSettings(task.Task): + SYSCONFIG_CONTENTS = """DEVICE=eth%s +BOOTPROTO=static +IPADDR=%s +ONBOOT=yes""" + + def __init__(self, name): + super(FetchNetworkSettings, self).__init__(provides='network_settings', + name=name) + + def execute(self, ips): + settings = [] + for i, ip in enumerate(ips): + settings.append(self.SYSCONFIG_CONTENTS % (i, ip)) + return settings + + +class AllocateIP(task.Task): + def __init__(self, name): + super(AllocateIP, self).__init__(provides='ips', name=name) + + def execute(self, vm_spec): + ips = [] + for i in range(0, vm_spec.get('ips', 0)): + ips.append("192.168.0.%s" % (random.randint(1, 254))) + return ips + + +class WriteNetworkSettings(task.Task): + def execute(self, download_paths, network_settings): + for j, path in enumerate(download_paths): + with slow_down(1): + print("Mounting %s to /tmp/%s" % (path, j)) + for i, setting in enumerate(network_settings): + filename = ("/tmp/etc/sysconfig/network-scripts/" + "ifcfg-eth%s" % (i)) + with slow_down(1): + print("Writing to %s" % (filename)) + print(setting) + + +class BootVM(task.Task): + def execute(self, vm_spec): + print("Starting vm!") + with slow_down(1): + print("Created: %s" % (vm_spec)) + + +class AllocateVolumes(task.Task): + def execute(self, vm_spec): + volumes = [] + for i in range(0, vm_spec['volumes']): + with slow_down(1): + volumes.append("/dev/vda%s" % (i + 1)) + print("Allocated volume %s" % volumes[-1]) + return volumes + + +class FormatVolumes(task.Task): + def execute(self, volumes): + for v in volumes: + print("Formatting volume %s" % v) + with slow_down(1): + pass + print("Formatted volume %s" % v) + + +def create_flow(): + # Setup the set of things to do (mini-nova). + flow = lf.Flow("root").add( + PrintText("Starting vm creation.", no_slow=True), + lf.Flow('vm-maker').add( + # First create a specification for the final vm to-be. + DefineVMSpec("define_spec"), + # This does all the image stuff. + gf.Flow("img-maker").add( + LocateImages("locate_images"), + DownloadImages("download_images"), + ), + # This does all the network stuff. + gf.Flow("net-maker").add( + AllocateIP("get_my_ips"), + FetchNetworkSettings("fetch_net_settings"), + WriteNetworkSettings("write_net_settings"), + ), + # This does all the volume stuff. + gf.Flow("volume-maker").add( + AllocateVolumes("allocate_my_volumes", provides='volumes'), + FormatVolumes("volume_formatter"), + ), + # Finally boot it all. + BootVM("boot-it"), + ), + PrintText("Finished vm create.", no_slow=True), + PrintText("Instance is running!", no_slow=True)) + return flow + +print_wrapped("Initializing") + +# Setup the persistence & resumption layer. +backend = get_backend() +try: + book_id, flow_id = sys.argv[2].split("+", 1) +except (IndexError, ValueError): + book_id = None + flow_id = None + +# Set up how we want our engine to run, serial, parallel... +engine_conf = { + 'engine': 'parallel', + 'executor': e_utils.GreenExecutor(5), +} + +# Create/fetch a logbook that will track the workflows work. +book = None +flow_detail = None +if all([book_id, flow_id]): + with contextlib.closing(backend.get_connection()) as conn: + try: + book = conn.get_logbook(book_id) + flow_detail = book.find(flow_id) + except exc.NotFound: + pass +if book is None and flow_detail is None: + book = p_utils.temporary_log_book(backend) + engine = engines.load_from_factory(create_flow, + backend=backend, book=book, + engine_conf=engine_conf) + print("!! Your tracking id is: '%s+%s'" % (book.uuid, + engine.storage.flow_uuid)) + print("!! Please submit this on later runs for tracking purposes") +else: + engine = engines.load_from_detail(flow_detail, + backend=backend, engine_conf=engine_conf) + +print_wrapped('Running') +engine.run() + +# How to use. +# +# 1. $ python me.py "sqlite:////tmp/nova.db" +# 2. ctrl-c before this finishes +# 3. Find the tracking id (search for 'Your tracking id is') +# 4. $ python me.py "sqlite:////tmp/cinder.db" "$tracking_id" +# 5. Watch it pick up where it left off. +# 6. Profit!