diff --git a/nova/compute/disk.py b/nova/compute/disk.py index d3eeb951fe89..f6a716e658e1 100644 --- a/nova/compute/disk.py +++ b/nova/compute/disk.py @@ -1,3 +1,4 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright [2010] [Anso Labs, LLC] # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -21,14 +22,21 @@ import logging import os import tempfile -from nova.exception import Error -from nova.utils import execute +from nova import vendor +from twisted.internet import defer -def partition(infile, outfile, local_bytes=0, local_type='ext2'): - """Takes a single partition represented by infile and writes a bootable drive image into outfile. +from nova import exception + +@defer.inlineCallbacks +def partition(infile, outfile, local_bytes=0, local_type='ext2', execute=None): + """Takes a single partition represented by infile and writes a bootable + drive image into outfile. + The first 63 sectors (0-62) of the resulting image is a master boot record. Infile becomes the first primary partition. - If local bytes is specified, a second primary partition is created and formatted as ext2. + If local bytes is specified, a second primary partition is created and + formatted as ext2. + In the diagram below, dashes represent drive sectors. 0 a b c d e +-----+------. . .-------+------. . .------+ @@ -38,10 +46,12 @@ def partition(infile, outfile, local_bytes=0, local_type='ext2'): sector_size = 512 file_size = os.path.getsize(infile) if file_size % sector_size != 0: - logging.warn("Input partition size not evenly divisible by sector size: %d / %d" (file_size, sector_size)) + logging.warn("Input partition size not evenly divisible by" + " sector size: %d / %d", file_size, sector_size) primary_sectors = file_size / sector_size if local_bytes % sector_size != 0: - logging.warn("Bytes for local storage not evenly divisible by sector size: %d / %d" (local_bytes, sector_size)) + logging.warn("Bytes for local storage not evenly divisible" + " by sector size: %d / %d", local_bytes, sector_size) local_sectors = local_bytes / sector_size mbr_last = 62 # a @@ -52,71 +62,80 @@ def partition(infile, outfile, local_bytes=0, local_type='ext2'): last_sector = local_last # e # create an empty file - execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d' % (outfile, last_sector, sector_size)) + execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d' + % (outfile, last_sector, sector_size)) # make mbr partition - execute('parted --script %s mklabel msdos' % outfile) + yield execute('parted --script %s mklabel msdos' % outfile) # make primary partition - execute('parted --script %s mkpart primary %ds %ds' % (outfile, primary_first, primary_last)) + yield execute('parted --script %s mkpart primary %ds %ds' + % (outfile, primary_first, primary_last)) # make local partition if local_bytes > 0: - execute('parted --script %s mkpartfs primary %s %ds %ds' % (outfile, local_type, local_first, local_last)) + yield execute('parted --script %s mkpartfs primary %s %ds %ds' + % (outfile, local_type, local_first, local_last)) # copy file into partition - execute('dd if=%s of=%s bs=%d seek=%d conv=notrunc,fsync' % (infile, outfile, sector_size, primary_first)) + yield execute('dd if=%s of=%s bs=%d seek=%d conv=notrunc,fsync' + % (infile, outfile, sector_size, primary_first)) - -def inject_key(key, image, partition=None): +@defer.inlineCallbacks +def inject_key(key, image, partition=None, execute=None): """Injects a ssh key into a disk image. It adds the specified key to /root/.ssh/authorized_keys - it will mount the image as a fully partitioned disk and attempt to inject into the specified partition number. + it will mount the image as a fully partitioned disk and attempt to inject + into the specified partition number. If partition is not specified it mounts the image as a single partition. """ - out, err = execute('sudo losetup -f --show %s' % image) + out, err = yield execute('sudo losetup -f --show %s' % image) if err: - raise Error('Could not attach image to loopback: %s' % err) + raise exception.Error('Could not attach image to loopback: %s' % err) device = out.strip() try: if not partition is None: # create partition - out, err = execute('sudo kpartx -a %s' % device) + out, err = yield execute('sudo kpartx -a %s' % device) if err: - raise Error('Failed to load partition: %s' % err) - mapped_device = '/dev/mapper/%sp%s' % ( device.split('/')[-1] , partition ) + raise exception.Error('Failed to load partition: %s' % err) + mapped_device = '/dev/mapper/%sp%s' % (device.split('/')[-1], + partition) else: mapped_device = device - out, err = execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device) + out, err = yield execute('sudo tune2fs -c 0 -i 0 %s' % mapped_device) tmpdir = tempfile.mkdtemp() try: # mount loopback to dir - out, err = execute('sudo mount %s %s' % (mapped_device, tmpdir)) + out, err = yield execute( + 'sudo mount %s %s' % (mapped_device, tmpdir)) if err: - raise Error('Failed to mount filesystem: %s' % err) + raise exception.Error('Failed to mount filesystem: %s' % err) try: # inject key file - _inject_into_fs(key, tmpdir) + yield _inject_into_fs(key, tmpdir, execute=execute) finally: # unmount device - execute('sudo umount %s' % mapped_device) + yield execute('sudo umount %s' % mapped_device) finally: # remove temporary directory - os.rmdir(tmpdir) + # TODO(termie): scary, is there any thing we can check here? + yield execute('rm -rf %s' % tmpdir) if not partition is None: # remove partitions - execute('sudo kpartx -d %s' % device) + yield execute('sudo kpartx -d %s' % device) finally: # remove loopback - execute('sudo losetup -d %s' % device) + yield execute('sudo losetup -d %s' % device) -def _inject_into_fs(key, fs): +@defer.inlineCallbacks +def _inject_into_fs(key, fs, execute=None): sshdir = os.path.join(os.path.join(fs, 'root'), '.ssh') - execute('sudo mkdir %s' % sshdir) #error on existing dir doesn't matter - execute('sudo chown root %s' % sshdir) - execute('sudo chmod 700 %s' % sshdir) + yield execute('sudo mkdir -p %s' % sshdir) # existing dir doesn't matter + yield execute('sudo chown root %s' % sshdir) + yield execute('sudo chmod 700 %s' % sshdir) keyfile = os.path.join(sshdir, 'authorized_keys') - execute('sudo bash -c "cat >> %s"' % keyfile, '\n' + key + '\n') + yield execute('sudo bash -c "cat >> %s"' % keyfile, '\n' + key + '\n') diff --git a/nova/compute/node.py b/nova/compute/node.py index 7c1636f34617..72c2f2b70b3b 100644 --- a/nova/compute/node.py +++ b/nova/compute/node.py @@ -26,7 +26,6 @@ import json import logging import os import random -import shutil import sys from nova import vendor @@ -66,10 +65,14 @@ INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10} INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10} INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10} -# The number of processes to start in our process pool -# TODO(termie): this should probably be a flag and the pool should probably -# be a singleton -PROCESS_POOL_SIZE = 4 + +def _image_path(path=''): + return os.path.join(FLAGS.images_path, path) + + +def _image_url(path): + return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path) + class Node(object, service.Service): """ @@ -80,7 +83,7 @@ class Node(object, service.Service): super(Node, self).__init__() self._instances = {} self._conn = self._get_connection() - self._pool = process.Pool(PROCESS_POOL_SIZE) + self._pool = process.ProcessPool() self.instdir = model.InstanceDirectory() # TODO(joshua): This needs to ensure system state, specifically: modprobe aoe @@ -231,63 +234,6 @@ class ProductCode(object): self.product_code = product_code -def _create_image(data, libvirt_xml): - """ create libvirt.xml and copy files into instance path """ - def basepath(path=''): - return os.path.abspath(os.path.join(data['basepath'], path)) - - def imagepath(path=''): - return os.path.join(FLAGS.images_path, path) - - def image_url(path): - return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path) - logging.info(basepath('disk')) - try: - os.makedirs(data['basepath']) - os.chmod(data['basepath'], 0777) - except OSError: - # TODO: there is already an instance with this name, do something - pass - try: - logging.info('Creating image for: %s', data['instance_id']) - f = open(basepath('libvirt.xml'), 'w') - f.write(libvirt_xml) - f.close() - if not FLAGS.fake_libvirt: - if FLAGS.use_s3: - if not os.path.exists(basepath('disk')): - utils.fetchfile(image_url("%s/image" % data['image_id']), - basepath('disk-raw')) - if not os.path.exists(basepath('kernel')): - utils.fetchfile(image_url("%s/image" % data['kernel_id']), - basepath('kernel')) - if not os.path.exists(basepath('ramdisk')): - utils.fetchfile(image_url("%s/image" % data['ramdisk_id']), - basepath('ramdisk')) - else: - if not os.path.exists(basepath('disk')): - shutil.copyfile(imagepath("%s/image" % data['image_id']), - basepath('disk-raw')) - if not os.path.exists(basepath('kernel')): - shutil.copyfile(imagepath("%s/image" % data['kernel_id']), - basepath('kernel')) - if not os.path.exists(basepath('ramdisk')): - shutil.copyfile(imagepath("%s/image" % - data['ramdisk_id']), - basepath('ramdisk')) - if data['key_data']: - logging.info('Injecting key data into image %s' % - data['image_id']) - disk.inject_key(data['key_data'], basepath('disk-raw')) - if os.path.exists(basepath('disk')): - os.remove(basepath('disk')) - bytes = INSTANCE_TYPES[data['instance_type']]['local_gb'] * 1024 * 1024 * 1024 - disk.partition(basepath('disk-raw'), basepath('disk'), bytes) - logging.info('Done create image for: %s', data['instance_id']) - except Exception as ex: - return {'exception': ex} - - class Instance(object): NOSTATE = 0x00 @@ -298,16 +244,6 @@ class Instance(object): SHUTOFF = 0x05 CRASHED = 0x06 - def is_pending(self): - return (self.state == Instance.NOSTATE or self.state == 'pending') - - def is_destroyed(self): - return self.state == Instance.SHUTOFF - - def is_running(self): - logging.debug("Instance state is: %s" % self.state) - return (self.state == Instance.RUNNING or self.state == 'running') - def __init__(self, conn, pool, name, data): """ spawn an instance with a given name """ # TODO(termie): pool should probably be a singleton instead of being passed @@ -401,6 +337,16 @@ class Instance(object): def name(self): return self._s['name'] + def is_pending(self): + return (self.state == Instance.NOSTATE or self.state == 'pending') + + def is_destroyed(self): + return self.state == Instance.SHUTOFF + + def is_running(self): + logging.debug("Instance state is: %s" % self.state) + return (self.state == Instance.RUNNING or self.state == 'running') + def describe(self): return self._s @@ -414,6 +360,9 @@ class Instance(object): 'num_cpu': num_cpu, 'cpu_time': cpu_time} + def basepath(self, path=''): + return os.path.abspath(os.path.join(self._s['basepath'], path)) + def update_state(self): info = self.info() self.datamodel['state'] = info['state'] @@ -479,35 +428,89 @@ class Instance(object): logging.debug('rebooted instance %s' % self.name) defer.returnValue(None) - # @exception.wrap_exception + def _fetch_s3_image(self, image, path): + url = _image_url('%s/image' % image) + d = self._pool.simpleExecute('curl --silent %s -o %s' % (url, path)) + return d + + def _fetch_local_image(self, image, path): + source = _image_path('%s/image' % image) + d = self._pool.simpleExecute('cp %s %s' % (source, path)) + return d + + @defer.inlineCallbacks + def _create_image(self, libvirt_xml): + # syntactic nicety + data = self._s + basepath = self.basepath + + # ensure directories exist and are writable + yield self._pool.simpleExecute('mkdir -p %s' % basepath()) + yield self._pool.simpleExecute('chmod 0777 %s' % basepath()) + + + # TODO(termie): these are blocking calls, it would be great + # if they weren't. + logging.info('Creating image for: %s', data['instance_id']) + f = open(basepath('libvirt.xml'), 'w') + f.write(libvirt_xml) + f.close() + + if FLAGS.fake_libvirt: + logging.info('fake_libvirt, nothing to do for create_image') + raise defer.returnValue(None); + + if FLAGS.use_s3: + _fetch_file = self._fetch_s3_image + else: + _fetch_file = self._fetch_local_image + + if not os.path.exists(basepath('disk')): + yield _fetch_file(data['image_id'], basepath('disk-raw')) + if not os.path.exists(basepath('kernel')): + yield _fetch_file(data['kernel_id'], basepath('kernel')) + if not os.path.exists(basepath('ramdisk')): + yield _fetch_file(data['ramdisk_id'], basepath('ramdisk')) + + execute = lambda x: self._pool.simpleExecute(x, error_ok=1) + if data['key_data']: + logging.info('Injecting key data into image %s', data['image_id']) + yield disk.inject_key( + data['key_data'], basepath('disk-raw'), execute=execute) + + if os.path.exists(basepath('disk')): + yield self._pool.simpleExecute('rm -f %s' % basepath('disk')) + + bytes = (INSTANCE_TYPES[data['instance_type']]['local_gb'] + * 1024 * 1024 * 1024) + yield disk.partition( + basepath('disk-raw'), basepath('disk'), bytes, execute=execute) + + @defer.inlineCallbacks + @exception.wrap_exception def spawn(self): self.datamodel['state'] = "spawning" self.datamodel.save() logging.debug("Starting spawn in Instance") + xml = self.toXml() - def _launch(retvals): + logging.info('self %s', self) + try: + yield self._create_image(xml) self.datamodel['state'] = 'launching' self.datamodel.save() - try: - logging.debug("Arrived in _launch") - if retvals and 'exception' in retvals: - raise retvals['exception'] - self._conn.createXML(self.toXml(), 0) - # TODO(termie): this should actually register - # a callback to check for successful boot - self._s['state'] = Instance.RUNNING - self.datamodel['state'] = 'running' - self.datamodel.save() - logging.debug("Instance is running") - except Exception as ex: - logging.debug(ex) - self.datamodel['state'] = 'shutdown' - self.datamodel.save() - #return self - - d = self._pool.apply(_create_image, self._s, xml) - d.addCallback(_launch) - return d + self._conn.createXML(xml, 0) + # TODO(termie): this should actually register + # a callback to check for successful boot + self._s['state'] = Instance.RUNNING + self.datamodel['state'] = 'running' + self.datamodel.save() + logging.debug("Instance is running") + except Exception: + #logging.exception('while spawning instance: %s', self.name) + self.datamodel['state'] = 'shutdown' + self.datamodel.save() + raise @exception.wrap_exception def console_output(self):