Merge "Boot fake vm example fixed"
This commit is contained in:
		@@ -5,9 +5,6 @@ import sys
 | 
			
		||||
import time
 | 
			
		||||
import uuid
 | 
			
		||||
 | 
			
		||||
print('GraphFlow is under refactoring now, so this example '
 | 
			
		||||
      'is temporarily broken')
 | 
			
		||||
sys.exit(0)
 | 
			
		||||
 | 
			
		||||
logging.basicConfig(level=logging.ERROR)
 | 
			
		||||
 | 
			
		||||
@@ -15,6 +12,7 @@ my_dir_path = os.path.dirname(os.path.abspath(__file__))
 | 
			
		||||
sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir),
 | 
			
		||||
                                os.pardir))
 | 
			
		||||
 | 
			
		||||
from taskflow.engines.action_engine import engine as eng
 | 
			
		||||
from taskflow.patterns import graph_flow as gf
 | 
			
		||||
from taskflow import task
 | 
			
		||||
 | 
			
		||||
@@ -32,12 +30,12 @@ MY_DB = {
 | 
			
		||||
 | 
			
		||||
# This prints out the transitions a flow is going through.
 | 
			
		||||
def flow_notify(state, details):
 | 
			
		||||
    print("'%s' entered state: %s" % (details['flow'], state))
 | 
			
		||||
    print("'%s' entered state: %s" % (details.get('flow_name'), state))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# This prints out the transitions a flows tasks are going through.
 | 
			
		||||
def task_notify(state, details):
 | 
			
		||||
    print("'%s' entered state: %s" % (details['runner'], state))
 | 
			
		||||
    print("'%s' entered state: %s" % (details.get('task_name'), state))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Simulates what nova/glance/keystone... calls a context
 | 
			
		||||
@@ -53,30 +51,26 @@ class Context(object):
 | 
			
		||||
# that will later be acted upon.
 | 
			
		||||
class ValidateAPIInputs(task.Task):
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        super(ValidateAPIInputs, self).__init__('validates-api-inputs')
 | 
			
		||||
        self.provides.update(['vm_spec'])
 | 
			
		||||
        super(ValidateAPIInputs, self).__init__('validates-api-inputs',
 | 
			
		||||
                                                provides='vm_spec')
 | 
			
		||||
 | 
			
		||||
    def execute(self, context):
 | 
			
		||||
        print "Validating api inputs for %s" % (context)
 | 
			
		||||
        return {
 | 
			
		||||
            'vm_spec': {
 | 
			
		||||
                'cpus': 1,
 | 
			
		||||
        return {'cpus': 1,
 | 
			
		||||
                'memory': 512,
 | 
			
		||||
                'disk': 100,
 | 
			
		||||
                }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Simulates reserving the space for a vm and associating the vm to be with
 | 
			
		||||
# a unique identifier.
 | 
			
		||||
class PeformReservation(task.Task):
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        super(PeformReservation, self).__init__('reserve-vm')
 | 
			
		||||
        self.provides.update(['vm_reservation_spec'])
 | 
			
		||||
        self.requires.update(['vm_spec'])
 | 
			
		||||
        super(PeformReservation, self).__init__('reserve-vm',
 | 
			
		||||
                                                provides='vm_reservation_spec')
 | 
			
		||||
 | 
			
		||||
    def revert(self, context, result, cause):
 | 
			
		||||
        reserved_spec = result['vm_reservation_spec']
 | 
			
		||||
    def revert(self, context, vm_spec, result):
 | 
			
		||||
        reserved_spec = result
 | 
			
		||||
        print("Undoing reservation of %s" % (reserved_spec['uuid']))
 | 
			
		||||
        vm_spec = MY_DB['vms'].pop(reserved_spec['uuid'])
 | 
			
		||||
        print 'Space before: %s' % (MY_DB['space'])
 | 
			
		||||
@@ -84,7 +78,7 @@ class PeformReservation(task.Task):
 | 
			
		||||
        for (k, v) in vm_spec.items():
 | 
			
		||||
            if k in ['scheduled']:
 | 
			
		||||
                continue
 | 
			
		||||
            MY_DB['space'][k] += vm_spec[k]
 | 
			
		||||
            MY_DB['space'][k] += v
 | 
			
		||||
        print 'Space after: %s' % (MY_DB['space'])
 | 
			
		||||
 | 
			
		||||
    def execute(self, context, vm_spec):
 | 
			
		||||
@@ -92,7 +86,7 @@ class PeformReservation(task.Task):
 | 
			
		||||
        # Reserve 'atomically'
 | 
			
		||||
        print 'Space before: %s' % (MY_DB['space'])
 | 
			
		||||
        for (k, v) in vm_spec.items():
 | 
			
		||||
            if MY_DB['space'][k] < vm_spec[k]:
 | 
			
		||||
            if MY_DB['space'][k] < v:
 | 
			
		||||
                raise RuntimeError("Not enough %s available" % (k))
 | 
			
		||||
            MY_DB['space'][k] -= vm_spec[k]
 | 
			
		||||
        print 'Space after: %s' % (MY_DB['space'])
 | 
			
		||||
@@ -100,25 +94,21 @@ class PeformReservation(task.Task):
 | 
			
		||||
        vm_uuid = str(uuid.uuid4())
 | 
			
		||||
        MY_DB['vms'][vm_uuid] = vm_spec
 | 
			
		||||
        MY_DB['vms'][vm_uuid]['scheduled'] = False
 | 
			
		||||
        return {
 | 
			
		||||
            'vm_reservation_spec': {
 | 
			
		||||
                'uuid': vm_uuid,
 | 
			
		||||
        return {'uuid': vm_uuid,
 | 
			
		||||
                'reserved_on': time.time(),
 | 
			
		||||
                'vm_spec': vm_spec,
 | 
			
		||||
                }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Simulates scheudling a vm to some location
 | 
			
		||||
class ScheduleVM(task.Task):
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        super(ScheduleVM, self).__init__('find-hole-for-vm')
 | 
			
		||||
        self.provides.update(['vm_hole'])
 | 
			
		||||
        self.requires.update(['vm_reservation_spec'])
 | 
			
		||||
        super(ScheduleVM, self).__init__('find-hole-for-vm',
 | 
			
		||||
                                         provides=['vm_hole', 'vm_uuid'])
 | 
			
		||||
 | 
			
		||||
    def revert(self, context, result, cause):
 | 
			
		||||
        vm_uuid = result['vm_uuid']
 | 
			
		||||
        vm_place = result['vm_hole']
 | 
			
		||||
    def revert(self, context, vm_reservation_spec, result):
 | 
			
		||||
        vm_uuid = result[1]
 | 
			
		||||
        vm_place = result[0]
 | 
			
		||||
        print "Marking %s as not having a home at %s anymore" % (vm_uuid,
 | 
			
		||||
                                                                 vm_place)
 | 
			
		||||
        MY_DB['vms'][vm_uuid]['scheduled'] = False
 | 
			
		||||
@@ -132,35 +122,32 @@ class ScheduleVM(task.Task):
 | 
			
		||||
        vm_place = random.choice(MY_DB['places'])
 | 
			
		||||
        print 'Placing %s at %s' % (vm_uuid, vm_place)
 | 
			
		||||
        MY_DB['places'].remove(vm_place)
 | 
			
		||||
        return {
 | 
			
		||||
            'vm_hole': vm_place,
 | 
			
		||||
            'vm_uuid': vm_uuid,
 | 
			
		||||
        }
 | 
			
		||||
        return vm_place, vm_uuid
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Fail booting a vm to see what happens.
 | 
			
		||||
class BootVM(task.Task):
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        super(BootVM, self).__init__('boot-vm')
 | 
			
		||||
        self.provides.update(['vm_booted'])
 | 
			
		||||
        self.requires.update(['vm_reservation_spec', 'vm_hole'])
 | 
			
		||||
        super(BootVM, self).__init__('boot-vm', provides='vm_booted')
 | 
			
		||||
 | 
			
		||||
    def execute(self, context, vm_reservation_spec, vm_hole):
 | 
			
		||||
        raise RuntimeError("Failed booting")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Lets try booting a vm (not really) and see how the reversions work.
 | 
			
		||||
flo = gf.Flow("Boot-Fake-Vm")
 | 
			
		||||
flo.add(ValidateAPIInputs())
 | 
			
		||||
flo.add(PeformReservation())
 | 
			
		||||
flo.add(ScheduleVM())
 | 
			
		||||
flo.add(BootVM())
 | 
			
		||||
flow = gf.Flow("Boot-Fake-Vm").add(
 | 
			
		||||
    ValidateAPIInputs(),
 | 
			
		||||
    PeformReservation(),
 | 
			
		||||
    ScheduleVM(),
 | 
			
		||||
    BootVM())
 | 
			
		||||
 | 
			
		||||
engine = eng.SingleThreadedActionEngine(flow)
 | 
			
		||||
 | 
			
		||||
# Get notified of the state changes the flow is going through.
 | 
			
		||||
flo.notifier.register('*', flow_notify)
 | 
			
		||||
engine.notifier.register('*', flow_notify)
 | 
			
		||||
 | 
			
		||||
# Get notified of the state changes the flows tasks/runners are going through.
 | 
			
		||||
flo.task_notifier.register('*', task_notify)
 | 
			
		||||
engine.task_notifier.register('*', task_notify)
 | 
			
		||||
 | 
			
		||||
# Simulates what nova/glance/keystone... calls a context
 | 
			
		||||
context = {
 | 
			
		||||
@@ -170,18 +157,19 @@ context = {
 | 
			
		||||
}
 | 
			
		||||
context = Context(**context)
 | 
			
		||||
 | 
			
		||||
engine.storage.inject({'context': context})
 | 
			
		||||
 | 
			
		||||
print '-' * 7
 | 
			
		||||
print 'Running'
 | 
			
		||||
print '-' * 7
 | 
			
		||||
try:
 | 
			
		||||
    flo.run(context)
 | 
			
		||||
    engine.run()
 | 
			
		||||
except Exception as e:
 | 
			
		||||
    print 'Flow failed: %r' % e
 | 
			
		||||
 | 
			
		||||
print '-- Flow state %s' % (flo.state)
 | 
			
		||||
 | 
			
		||||
print '-' * 11
 | 
			
		||||
print 'All results'
 | 
			
		||||
print '-' * 11
 | 
			
		||||
for (tid, v) in sorted(flo.results.items()):
 | 
			
		||||
    print '%s => %s' % (tid, v)
 | 
			
		||||
result = engine.storage.fetch_all()
 | 
			
		||||
for tid in sorted(result):
 | 
			
		||||
    print '%s => %s' % (tid, result[tid])
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user