diff --git a/taskflow/examples/fake_boot_vm.py b/taskflow/examples/fake_boot_vm.py index a64e0737..e14e63a6 100644 --- a/taskflow/examples/fake_boot_vm.py +++ b/taskflow/examples/fake_boot_vm.py @@ -5,9 +5,6 @@ import sys import time import uuid -print('GraphFlow is under refactoring now, so this example ' - 'is temporarily broken') -sys.exit(0) logging.basicConfig(level=logging.ERROR) @@ -15,6 +12,7 @@ my_dir_path = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), os.pardir)) +from taskflow.engines.action_engine import engine as eng from taskflow.patterns import graph_flow as gf from taskflow import task @@ -32,12 +30,12 @@ MY_DB = { # This prints out the transitions a flow is going through. def flow_notify(state, details): - print("'%s' entered state: %s" % (details['flow'], state)) + print("'%s' entered state: %s" % (details.get('flow_name'), state)) # This prints out the transitions a flows tasks are going through. def task_notify(state, details): - print("'%s' entered state: %s" % (details['runner'], state)) + print("'%s' entered state: %s" % (details.get('task_name'), state)) # Simulates what nova/glance/keystone... calls a context @@ -53,30 +51,26 @@ class Context(object): # that will later be acted upon. class ValidateAPIInputs(task.Task): def __init__(self): - super(ValidateAPIInputs, self).__init__('validates-api-inputs') - self.provides.update(['vm_spec']) + super(ValidateAPIInputs, self).__init__('validates-api-inputs', + provides='vm_spec') def execute(self, context): print "Validating api inputs for %s" % (context) - return { - 'vm_spec': { - 'cpus': 1, + return {'cpus': 1, 'memory': 512, 'disk': 100, - } - } + } # Simulates reserving the space for a vm and associating the vm to be with # a unique identifier. class PeformReservation(task.Task): def __init__(self): - super(PeformReservation, self).__init__('reserve-vm') - self.provides.update(['vm_reservation_spec']) - self.requires.update(['vm_spec']) + super(PeformReservation, self).__init__('reserve-vm', + provides='vm_reservation_spec') - def revert(self, context, result, cause): - reserved_spec = result['vm_reservation_spec'] + def revert(self, context, vm_spec, result): + reserved_spec = result print("Undoing reservation of %s" % (reserved_spec['uuid'])) vm_spec = MY_DB['vms'].pop(reserved_spec['uuid']) print 'Space before: %s' % (MY_DB['space']) @@ -84,7 +78,7 @@ class PeformReservation(task.Task): for (k, v) in vm_spec.items(): if k in ['scheduled']: continue - MY_DB['space'][k] += vm_spec[k] + MY_DB['space'][k] += v print 'Space after: %s' % (MY_DB['space']) def execute(self, context, vm_spec): @@ -92,7 +86,7 @@ class PeformReservation(task.Task): # Reserve 'atomically' print 'Space before: %s' % (MY_DB['space']) for (k, v) in vm_spec.items(): - if MY_DB['space'][k] < vm_spec[k]: + if MY_DB['space'][k] < v: raise RuntimeError("Not enough %s available" % (k)) MY_DB['space'][k] -= vm_spec[k] print 'Space after: %s' % (MY_DB['space']) @@ -100,25 +94,21 @@ class PeformReservation(task.Task): vm_uuid = str(uuid.uuid4()) MY_DB['vms'][vm_uuid] = vm_spec MY_DB['vms'][vm_uuid]['scheduled'] = False - return { - 'vm_reservation_spec': { - 'uuid': vm_uuid, + return {'uuid': vm_uuid, 'reserved_on': time.time(), 'vm_spec': vm_spec, - } - } + } # Simulates scheudling a vm to some location class ScheduleVM(task.Task): def __init__(self): - super(ScheduleVM, self).__init__('find-hole-for-vm') - self.provides.update(['vm_hole']) - self.requires.update(['vm_reservation_spec']) + super(ScheduleVM, self).__init__('find-hole-for-vm', + provides=['vm_hole', 'vm_uuid']) - def revert(self, context, result, cause): - vm_uuid = result['vm_uuid'] - vm_place = result['vm_hole'] + def revert(self, context, vm_reservation_spec, result): + vm_uuid = result[1] + vm_place = result[0] print "Marking %s as not having a home at %s anymore" % (vm_uuid, vm_place) MY_DB['vms'][vm_uuid]['scheduled'] = False @@ -132,35 +122,32 @@ class ScheduleVM(task.Task): vm_place = random.choice(MY_DB['places']) print 'Placing %s at %s' % (vm_uuid, vm_place) MY_DB['places'].remove(vm_place) - return { - 'vm_hole': vm_place, - 'vm_uuid': vm_uuid, - } + return vm_place, vm_uuid # Fail booting a vm to see what happens. class BootVM(task.Task): def __init__(self): - super(BootVM, self).__init__('boot-vm') - self.provides.update(['vm_booted']) - self.requires.update(['vm_reservation_spec', 'vm_hole']) + super(BootVM, self).__init__('boot-vm', provides='vm_booted') def execute(self, context, vm_reservation_spec, vm_hole): raise RuntimeError("Failed booting") # Lets try booting a vm (not really) and see how the reversions work. -flo = gf.Flow("Boot-Fake-Vm") -flo.add(ValidateAPIInputs()) -flo.add(PeformReservation()) -flo.add(ScheduleVM()) -flo.add(BootVM()) +flow = gf.Flow("Boot-Fake-Vm").add( + ValidateAPIInputs(), + PeformReservation(), + ScheduleVM(), + BootVM()) + +engine = eng.SingleThreadedActionEngine(flow) # Get notified of the state changes the flow is going through. -flo.notifier.register('*', flow_notify) +engine.notifier.register('*', flow_notify) # Get notified of the state changes the flows tasks/runners are going through. -flo.task_notifier.register('*', task_notify) +engine.task_notifier.register('*', task_notify) # Simulates what nova/glance/keystone... calls a context context = { @@ -170,18 +157,19 @@ context = { } context = Context(**context) +engine.storage.inject({'context': context}) + print '-' * 7 print 'Running' print '-' * 7 try: - flo.run(context) + engine.run() except Exception as e: print 'Flow failed: %r' % e -print '-- Flow state %s' % (flo.state) - print '-' * 11 print 'All results' print '-' * 11 -for (tid, v) in sorted(flo.results.items()): - print '%s => %s' % (tid, v) +result = engine.storage.fetch_all() +for tid in sorted(result): + print '%s => %s' % (tid, result[tid])