diff --git a/nodepool/builder.py b/nodepool/builder.py index 3e580ad70..9a42d758a 100644 --- a/nodepool/builder.py +++ b/nodepool/builder.py @@ -26,7 +26,7 @@ import gear import shlex from stats import statsd -import nodedb +from config import loadConfig MINS = 60 HOURS = 60 * MINS @@ -37,30 +37,66 @@ IMAGE_TIMEOUT = 6 * HOURS # How long to wait for an image save DEFAULT_QEMU_IMAGE_COMPAT_OPTIONS = "--qemu-img-options 'compat=0.10'" +class DibImageFile(object): + def __init__(self, image_id, extension=None): + self.image_id = image_id + self.extension = extension + + @staticmethod + def from_path(path): + image_file = os.path.basename(path) + image_id, extension = image_file.rsplit('.', 1) + return DibImageFile(image_id, extension) + + @staticmethod + def from_image_id(images_dir, image_id): + images = [] + for image_filename in os.listdir(images_dir): + image = DibImageFile.from_path(image_filename) + if image.image_id == image_id: + images.append(image) + return images + + @staticmethod + def from_images_dir(images_dir): + return [DibImageFile.from_path(x) for x in os.listdir(images_dir)] + + def to_path(self, images_dir, with_extension=True): + my_path = os.path.join(images_dir, self.image_id) + if with_extension: + if self.extension is None: + raise ValueError('Cannot specify image extension of None') + my_path += '.' 
+ self.extension + return my_path + + class NodePoolBuilder(object): log = logging.getLogger("nodepool.builder") - def __init__(self, nodepool): + def __init__(self, config_path, nodepool): + self._config_path = config_path self.nodepool = nodepool self._running = False self._built_image_ids = set() self._start_lock = threading.Lock() self._gearman_worker = None + self._config = None @property def running(self): return self._running def start(self): + self.load_config(self._config_path) + with self._start_lock: if self._running: raise RuntimeError('Cannot start, already running.') self._running = True self._gearman_worker = self._initializeGearmanWorker( - self.nodepool.config.gearman_servers.values()) - images = self.nodepool.config.diskimages.keys() - self._registerGearmanFunctions(images) + self._config.gearman_servers.values()) + self._registerGearmanFunctions(self._config.diskimages.values()) self._registerExistingImageUploads() self.thread = threading.Thread(target=self._run, name='NodePool Builder') @@ -91,6 +127,9 @@ class NodePoolBuilder(object): else: raise + def load_config(self, config_path): + self._config = loadConfig(config_path) + def _run(self): self.log.debug('Starting listener for build jobs') while self._running: @@ -114,23 +153,24 @@ class NodePoolBuilder(object): def _registerGearmanFunctions(self, images): self.log.debug('Registering gearman functions') for image in images: - self._gearman_worker.registerFunction('image-build:%s' % image) + self._gearman_worker.registerFunction( + 'image-build:%s' % image.name) def _registerExistingImageUploads(self): - with self.nodepool.getDB().getSession() as session: - for image in session.getDibImages(): - self.registerImageId(image.id) + images = DibImageFile.from_images_dir(self._config.imagesdir) + for image in images: + self._registerImageId(image.image_id) def _registerImageId(self, image_id): - self.log.debug('registering image %d', image_id) + self.log.debug('registering image %s', image_id) 
self._gearman_worker.registerFunction('image-upload:%s' % image_id) self._gearman_worker.registerFunction('image-delete:%s' % image_id) self._built_image_ids.add(image_id) def _unregisterImageId(self, worker, image_id): if image_id in self._built_image_ids: - self.log.debug('unregistering image %d', image_id) - worker.unRegisterFunction('image-upload:%d' % image_id) + self.log.debug('unregistering image %s', image_id) + worker.unRegisterFunction('image-upload:%s' % image_id) self._built_image_ids.remove(image_id) else: self.log.warning('Attempting to remove image %d but image not ' @@ -138,7 +178,7 @@ class NodePoolBuilder(object): def _canHandleImageidJob(self, job, image_op): return (job.name.startswith(image_op + ':') and - int(job.name.split(':')[1]) in self._built_image_ids) + job.name.split(':')[1]) in self._built_image_ids def _handleJob(self, job): try: @@ -146,21 +186,32 @@ class NodePoolBuilder(object): job.name, job.arguments) if job.name.startswith('image-build:'): args = json.loads(job.arguments) - self.buildImage(args['image_id']) + image_id = args['image-id'] + if '/' in image_id: + raise RuntimeError('Invalid image-id specified') - # We can now upload this image - self._registerImageId(args['image_id']) - job.sendWorkComplete() + image_name = job.name.split(':', 1)[1] + try: + self._buildImage(image_name, image_id) + except RuntimeError: + self.log.exception('Error building image') + job.sendWorkFail() + else: + # We can now upload this image + self._registerImageId(image_id) + job.sendWorkComplete(json.dumps({'image-id': image_id})) elif self._canHandleImageidJob(job, 'image-upload'): args = json.loads(job.arguments) + image_name = args['image-name'] image_id = job.name.split(':')[1] - external_id = self._uploadImage(int(image_id), - args['provider']) + external_id = self._uploadImage(image_id, + args['provider'], + image_name) job.sendWorkComplete(json.dumps({'external-id': external_id})) elif self._canHandleImageidJob(job, 'image-delete'): 
image_id = job.name.split(':')[1] - self._deleteImage(int(image_id)) - self._unregisterImageId(self._gearman_worker, int(image_id)) + self._deleteImage(image_id) + self._unregisterImageId(self._gearman_worker, image_id) job.sendWorkComplete() else: self.log.error('Unable to handle job %s', job.name) @@ -170,87 +221,87 @@ class NodePoolBuilder(object): job.sendWorkException(traceback.format_exc()) def _deleteImage(self, image_id): - with self.nodepool.getDB().getSession() as session: - dib_image = session.getDibImage(image_id) + image_files = DibImageFile.from_image_id(self._config.imagesdir, + image_id) - # Remove image from the nodedb - dib_image.state = nodedb.DELETE - dib_image.delete() + # Delete a dib image and its associated file + for image_file in image_files: + img_path = image_file.to_path(self._config.imagesdir) + if os.path.exists(img_path): + self.log.debug('Removing filename %s', img_path) + os.remove(img_path) + else: + self.log.debug('No filename %s found to remove', img_path) + self.log.info("Deleted dib image id: %s" % image_id) - config = self.nodepool.config - image_config = config.diskimages.get(dib_image.image_name) - if not image_config: - self.log.error("Deleting image %d but configuration not found." - "Cannot delete image from disk without a " - "configuration.", image_id) - return - # Delete a dib image and its associated file - for image_type in image_config.image_types: - if os.path.exists(dib_image.filename + '.' + image_type): - os.remove(dib_image.filename + '.' 
+ image_type) + def _uploadImage(self, image_id, provider_name, image_name): + start_time = time.time() + timestamp = int(start_time) - self.log.info("Deleted dib image id: %s" % dib_image.id) + provider = self._config.providers[provider_name] + image_type = provider.image_type - def _uploadImage(self, image_id, provider_name): - with self.nodepool.getDB().getSession() as session: - start_time = time.time() - timestamp = int(start_time) + image_files = DibImageFile.from_image_id(self._config.imagesdir, + image_id) + image_files = filter(lambda x: x.extension == image_type, image_files) + if len(image_files) == 0: + self.log.error("Unable to find image file for id %s to upload", + image_id) + return + if len(image_files) > 1: + self.log.error("Found more than one image for id %s. This should " + "never happen.", image_id) - dib_image = session.getDibImage(image_id) - provider = self.nodepool.config.providers[provider_name] + image_file = image_files[0] + filename = image_file.to_path(self._config.imagesdir, + with_extension=False) - filename = dib_image.filename + dummy_image = type('obj', (object,), + {'name': image_name}) + ext_image_name = provider.template_hostname.format( + provider=provider, image=dummy_image, timestamp=str(timestamp)) + self.log.info("Uploading dib image id: %s from %s in %s" % + (image_id, filename, provider.name)) - dummy_image = type('obj', (object,), - {'name': dib_image.image_name}) - image_name = provider.template_hostname.format( - provider=provider, image=dummy_image, timestamp=str(timestamp)) - self.log.info("Uploading dib image id: %s from %s for %s in %s" % - (dib_image.id, filename, image_name, provider.name)) + manager = self.nodepool.getProviderManager(provider) + try: + provider_image = provider.images[image_name] + except KeyError: + self.log.error("Could not find matching provider image for %s", + image_name) + return + image_meta = provider_image.meta + external_id = manager.uploadImage(ext_image_name, filename, + 
image_file.extension, 'bare', + image_meta) + self.log.debug("Saving image id: %s", external_id) + # It can take a _very_ long time for Rackspace 1.0 to save an image + manager.waitForImage(external_id, IMAGE_TIMEOUT) - manager = self.nodepool.getProviderManager(provider) - provider_image = filter( - lambda x: x.diskimage == dib_image.image_name, - provider.images.values()) - if len(provider_image) != 1: - self.log.error("Could not find a matching provider image for " - "%s", dib_image.image_name) - return - provider_image = provider_image[0] + if statsd: + dt = int((time.time() - start_time) * 1000) + key = 'nodepool.image_update.%s.%s' % (image_name, + provider.name) + statsd.timing(key, dt) + statsd.incr(key) - image_meta = provider_image.meta - provider_image_id = manager.uploadImage(image_name, filename, - provider.image_type, - 'bare', image_meta) - self.log.debug("Image id: %s saving image %s" % - (dib_image.id, provider_image_id)) - # It can take a _very_ long time for Rackspace 1.0 to save an image - manager.waitForImage(provider_image_id, IMAGE_TIMEOUT) + self.log.info("Image %s in %s is ready" % (image_id, + provider.name)) + return external_id - if statsd: - dt = int((time.time() - start_time) * 1000) - key = 'nodepool.image_update.%s.%s' % (image_name, - provider.name) - statsd.timing(key, dt) - statsd.incr(key) - - self.log.info("Image %s in %s is ready" % (dib_image.image_name, - provider.name)) - return provider_image_id - - def _buildImage(self, image, image_name, filename): + def _runDibForImage(self, image, filename): env = os.environ.copy() env['DIB_RELEASE'] = image.release - env['DIB_IMAGE_NAME'] = image_name + env['DIB_IMAGE_NAME'] = image.name env['DIB_IMAGE_FILENAME'] = filename # Note we use a reference to the nodepool config here so # that whenever the config is updated we get up to date # values in this thread. 
- if self.nodepool.config.elementsdir: - env['ELEMENTS_PATH'] = self.nodepool.config.elementsdir - if self.nodepool.config.scriptdir: - env['NODEPOOL_SCRIPTDIR'] = self.nodepool.config.scriptdir + if self._config.elementsdir: + env['ELEMENTS_PATH'] = self._config.elementsdir + if self._config.scriptdir: + env['NODEPOOL_SCRIPTDIR'] = self._config.scriptdir # send additional env vars if needed for k, v in image.env_vars.items(): @@ -263,7 +314,7 @@ class NodePoolBuilder(object): if 'qcow2' in img_types: qemu_img_options = DEFAULT_QEMU_IMAGE_COMPAT_OPTIONS - if 'fake-' in filename: + if 'fake-' in image.name: dib_cmd = 'nodepool/tests/fake-image-create' else: dib_cmd = 'disk-image-create' @@ -272,7 +323,7 @@ class NodePoolBuilder(object): (dib_cmd, img_types, qemu_img_options, filename, img_elements)) log = logging.getLogger("nodepool.image.build.%s" % - (image_name,)) + (image.name,)) self.log.info('Running %s' % cmd) @@ -297,40 +348,31 @@ class NodePoolBuilder(object): if ret: raise Exception("DIB failed creating %s" % (filename,)) - def buildImage(self, image_id): - with self.nodepool.getDB().getSession() as session: - image = session.getDibImage(image_id) - self.log.info("Creating image: %s with filename %s" % - (image.image_name, image.filename)) + def _getDiskimageByName(self, name): + for image in self._config.diskimages.values(): + if image.name == name: + return image + return None - start_time = time.time() - timestamp = int(start_time) - image.version = timestamp - session.commit() + def _buildImage(self, image_name, image_id): + diskimage = self._getDiskimageByName(image_name) + if diskimage is None: + raise RuntimeError('Could not find matching image in config for ' + '%s', image_name) - # retrieve image details - image_details = \ - self.nodepool.config.diskimages[image.image_name] - try: - self._buildImage( - image_details, - image.image_name, - image.filename) - except Exception: - self.log.exception("Exception building DIB image %s:" % - (image_id,)) 
- # DIB should've cleaned up after itself, just remove this - # image from the DB. - image.delete() - return + start_time = time.time() + image_file = DibImageFile(image_id) + filename = image_file.to_path(self._config.imagesdir, False) - image.state = nodedb.READY - session.commit() - self.log.info("DIB image %s with file %s is built" % ( - image_id, image.filename)) + self.log.info("Creating image: %s with filename %s" % + (diskimage.name, filename)) + self._runDibForImage(diskimage, filename) - if statsd: - dt = int((time.time() - start_time) * 1000) - key = 'nodepool.dib_image_build.%s' % image.image_name - statsd.timing(key, dt) - statsd.incr(key) + self.log.info("DIB image %s with file %s is built" % ( + image_name, filename)) + + if statsd: + dt = int((time.time() - start_time) * 1000) + key = 'nodepool.dib_image_build.%s' % diskimage.name + statsd.timing(key, dt) + statsd.incr(key) diff --git a/nodepool/config.py b/nodepool/config.py index 83e18c393..789689a6a 100644 --- a/nodepool/config.py +++ b/nodepool/config.py @@ -1,3 +1,4 @@ +import os_client_config from six.moves import configparser as ConfigParser import yaml @@ -54,8 +55,9 @@ class Network(ConfigValue): pass -def loadConfig(config_path, cloud_config): +def loadConfig(config_path): config = yaml.load(open(config_path)) + cloud_config = os_client_config.OpenStackConfig() newconfig = Config() newconfig.db = None diff --git a/nodepool/nodepool.py b/nodepool/nodepool.py index 19ba85d79..157455ed5 100644 --- a/nodepool/nodepool.py +++ b/nodepool/nodepool.py @@ -20,7 +20,6 @@ import apscheduler.scheduler import gear import json import logging -import os_client_config import os.path import paramiko import pprint @@ -337,7 +336,6 @@ class GearmanClient(gear.Client): job = super(GearmanClient, self).handleWorkComplete(packet) job.onCompleted() - class InstanceDeleter(threading.Thread): log = logging.getLogger("nodepool.InstanceDeleter") @@ -1108,8 +1106,7 @@ class NodePool(threading.Thread): def 
loadConfig(self): self.log.debug("Loading configuration") - config = nodepool_config.loadConfig(self.configfile, - os_client_config.OpenStackConfig()) + config = nodepool_config.loadConfig(self.configfile) nodepool_config.loadSecureConfig(config, self.securefile) return config @@ -1290,7 +1287,8 @@ class NodePool(threading.Thread): def reconfigureImageBuilder(self): # start disk image builder thread if not self._image_builder_thread and self.run_builder: - self._image_builder_thread = builder.NodePoolBuilder(self) + self._image_builder_thread = builder.NodePoolBuilder( + self.configfile, self) self._image_builder_thread.start() def setConfig(self, config): @@ -1567,25 +1565,7 @@ class NodePool(threading.Thread): # This is either building or in an error state # that will be handled by periodic cleanup return - types_found = True - diskimage = self.config.diskimages[image.diskimage] - for image_type in diskimage.image_types: - if (not os.path.exists( - dib_image.filename + '.' + image_type) and - not 'fake-dib-image' in dib_image.filename): - # if image is in ready state, check if image - # file exists in directory, otherwise we need - # to rebuild and delete this buggy image - types_found = False - self.log.warning("Image filename %s does not " - "exist. 
Removing image" % - dib_image.filename) - self.deleteDibImage(dib_image) - break - if types_found: - # Found a matching image that is READY and has a file - found = True - break + found = True if not found: # only build the image, we'll recheck again self.log.warning("Missing disk image %s" % image.name) @@ -1714,22 +1694,27 @@ class NodePool(threading.Thread): filename = os.path.join(self.config.imagesdir, '%s-%s' % (image.name, str(timestamp))) + job_uuid = str(uuid4().hex) - self.log.debug("Queued image building task for %s" % - image.name) dib_image = session.createDibImage(image_name=image.name, - filename=filename) - session.commit() + filename=filename, + version=timestamp) + self.log.debug("Created DibImage record %s with state %s", + dib_image.image_name, dib_image.state) # Submit image-build job - job_uuid = str(uuid4().hex) + job_data = json.dumps({ + 'image-id': str(dib_image.id) + }) gearman_job = WatchableJob( - 'image-build:%s' % image.name, - json.dumps({'image_id': dib_image.id}), - job_uuid) + 'image-build:%s' % image.name, job_data, job_uuid) self._image_build_jobs.addJob(gearman_job) + gearman_job.addCompletionHandler( + self.handleImageBuildComplete, image_id=dib_image.id) self.gearman_client.submitJob(gearman_job) + self.log.debug("Queued image building task for %s" % + image.name) except Exception: self.log.exception( "Could not build image %s", image.name) @@ -1754,14 +1739,17 @@ class NodePool(threading.Thread): # Submit image-upload job gearman_job = WatchableJob( - 'image-upload:%d' % image_id, - json.dumps({'provider': provider}), + 'image-upload:%s' % image_id, + json.dumps({ + 'provider': provider, + 'image-name': image_name + }), job_uuid) self._image_upload_jobs.addJob(gearman_job) gearman_job.addCompletionHandler(self.handleImageUploadComplete, snap_image_id=snap_image.id) - - self.log.debug('Submitting image-upload job for %d', image_id) + self.log.debug('Submitting image-upload job for image id %s', + image_id,) 
self.gearman_client.submitJob(gearman_job) return gearman_job @@ -1788,6 +1776,27 @@ class NodePool(threading.Thread): self.log.debug('Image %s is ready with external_id %s', snap_image_id, external_id) + def handleImageBuildComplete(self, job, image_id): + with self.getDB().getSession() as session: + dib_image = session.getDibImage(image_id) + if dib_image is None: + self.log.error( + 'Unable to find matching dib_image for image_id %s', + image_id) + return + dib_image.state = nodedb.READY + session.commit() + self.log.debug('DIB Image %s (id %d) is ready', + job.name.split(':', 1)[0], image_id) + + def handleImageDeleteComplete(self, job, image_id): + with self.getDB().getSession() as session: + dib_image = session.getDibImage(image_id) + + # Remove image from the nodedb + dib_image.state = nodedb.DELETE + dib_image.delete() + def launchNode(self, session, provider, label, target): try: self._launchNode(session, provider, label, target) @@ -1973,8 +1982,12 @@ class NodePool(threading.Thread): try: # Submit image-delete job job_uuid = str(uuid4().hex) - gearman_job = WatchableJob('image-delete:%s' % dib_image.id, - '', job_uuid) + gearman_job = WatchableJob( + 'image-delete:%s' % dib_image.id, + '', job_uuid + ) + gearman_job.addCompletionHandler(self.handleImageDeleteComplete, + image_id=dib_image.id) self.gearman_client.submitJob(gearman_job) return gearman_job except Exception: diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py index 34d492853..7b481541d 100644 --- a/nodepool/tests/__init__.py +++ b/nodepool/tests/__init__.py @@ -282,7 +282,8 @@ class BuilderFixture(fixtures.Fixture): def setUp(self): super(BuilderFixture, self).setUp() - self.builder = builder.NodePoolBuilder(self.nodepool) + self.builder = builder.NodePoolBuilder(self.nodepool.configfile, + self.nodepool) self.addCleanup(self.cleanup) self.builder.start() diff --git a/nodepool/tests/fixtures/node_ipv6.yaml b/nodepool/tests/fixtures/node_ipv6.yaml index 9a3fca0bc..eb707c626 
100644 --- a/nodepool/tests/fixtures/node_ipv6.yaml +++ b/nodepool/tests/fixtures/node_ipv6.yaml @@ -1,4 +1,6 @@ script-dir: . +elements-dir: . +images-dir: '{images_dir}' cron: check: '*/15 * * * *' diff --git a/nodepool/tests/test_builder.py b/nodepool/tests/test_builder.py new file mode 100644 index 000000000..8b59b41a3 --- /dev/null +++ b/nodepool/tests/test_builder.py @@ -0,0 +1,85 @@ +# Copyright (C) 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import os

import fixtures

from nodepool import builder, tests


class TestNodepoolBuilderDibImage(tests.BaseTestCase):
    """Exercise the path/id helpers of ``builder.DibImageFile``.

    The tests only need image files to *exist* on disk; their content is
    irrelevant, so empty placeholder files are created in a temporary
    directory managed by ``fixtures.TempDir``.
    """

    def _touch(self, path):
        # Create an empty placeholder file and close it immediately.  The
        # bare ``open(path, 'w')`` used previously never closed the handle,
        # leaking one file descriptor per call until garbage collection.
        with open(path, 'w'):
            pass

    def _make_tempdir(self):
        # Register a TempDir fixture on this test and return its path.
        tempdir = fixtures.TempDir()
        self.useFixture(tempdir)
        return tempdir.path

    def test_from_path(self):
        # A full path is reduced to its basename and split on the last dot.
        image = builder.DibImageFile.from_path(
            '/foo/bar/myid1234.qcow2')
        self.assertEqual(image.image_id, 'myid1234')
        self.assertEqual(image.extension, 'qcow2')

    def test_from_image_id(self):
        # One file on disk for the id -> exactly one DibImageFile returned.
        images_dir = self._make_tempdir()
        self._touch(os.path.join(images_dir, 'myid1234.qcow2'))

        images = builder.DibImageFile.from_image_id(images_dir, 'myid1234')
        self.assertEqual(len(images), 1)

        image = images[0]
        self.assertEqual(image.image_id, 'myid1234')
        self.assertEqual(image.extension, 'qcow2')

    def test_from_id_multiple(self):
        # Two formats of the same id -> both are returned.
        images_dir = self._make_tempdir()
        self._touch(os.path.join(images_dir, 'myid1234.qcow2'))
        self._touch(os.path.join(images_dir, 'myid1234.raw'))

        images = builder.DibImageFile.from_image_id(images_dir, 'myid1234')
        # os.listdir() order is arbitrary, so sort before asserting.
        images = sorted(images, key=lambda x: x.extension)
        self.assertEqual(len(images), 2)

        self.assertEqual(images[0].extension, 'qcow2')
        self.assertEqual(images[1].extension, 'raw')

    def test_from_images_dir(self):
        # Every file in the directory is turned into a DibImageFile.
        images_dir = self._make_tempdir()
        self._touch(os.path.join(images_dir, 'myid1234.qcow2'))
        self._touch(os.path.join(images_dir, 'myid1234.raw'))

        images = builder.DibImageFile.from_images_dir(images_dir)
        # os.listdir() order is arbitrary, so sort before asserting.
        images = sorted(images, key=lambda x: x.extension)
        self.assertEqual(len(images), 2)

        self.assertEqual(images[0].image_id, 'myid1234')
        self.assertEqual(images[0].extension, 'qcow2')
        self.assertEqual(images[1].image_id, 'myid1234')
        self.assertEqual(images[1].extension, 'raw')

    def test_to_path(self):
        # A trailing slash on the directory must not change the result.
        image = builder.DibImageFile('myid1234', 'qcow2')
        self.assertEqual(image.to_path('/imagedir'),
                         '/imagedir/myid1234.qcow2')
        self.assertEqual(image.to_path('/imagedir/'),
                         '/imagedir/myid1234.qcow2')
        self.assertEqual(image.to_path('/imagedir/', False),
                         '/imagedir/myid1234')

        # Without an extension, asking for a path *with* extension is an
        # error rather than silently producing 'myid1234.None'.
        image = builder.DibImageFile('myid1234')
        self.assertRaises(ValueError, image.to_path, '/imagedir/')