Decouple nodepool db and config from builders

Currently we use the nodepool config object and nodedb database
to determine information about the diskimages we build. Start parsing
the config on our own and stop using nodedb.

This changes images to use a uuid for ID's since the primary key
ID which was previously used is no longer readable by the builders.

Co-Authored-By: Joshua Hesketh <josh@nitrotech.org>

Change-Id: Id1bb431839cde2bd7b42cf1f57460febd56849b3
This commit is contained in:
Greory Haynes 2015-09-21 15:53:26 -05:00 committed by Clark Boylan
parent 9c6ae68a56
commit 8f3db89306
6 changed files with 305 additions and 160 deletions

View File

@ -26,7 +26,7 @@ import gear
import shlex
from stats import statsd
import nodedb
from config import loadConfig
MINS = 60
HOURS = 60 * MINS
@ -37,30 +37,66 @@ IMAGE_TIMEOUT = 6 * HOURS # How long to wait for an image save
DEFAULT_QEMU_IMAGE_COMPAT_OPTIONS = "--qemu-img-options 'compat=0.10'"
class DibImageFile(object):
def __init__(self, image_id, extension=None):
self.image_id = image_id
self.extension = extension
@staticmethod
def from_path(path):
image_file = os.path.basename(path)
image_id, extension = image_file.rsplit('.', 1)
return DibImageFile(image_id, extension)
@staticmethod
def from_image_id(images_dir, image_id):
images = []
for image_filename in os.listdir(images_dir):
image = DibImageFile.from_path(image_filename)
if image.image_id == image_id:
images.append(image)
return images
@staticmethod
def from_images_dir(images_dir):
return [DibImageFile.from_path(x) for x in os.listdir(images_dir)]
def to_path(self, images_dir, with_extension=True):
my_path = os.path.join(images_dir, self.image_id)
if with_extension:
if self.extension is None:
raise ValueError('Cannot specify image extension of None')
my_path += '.' + self.extension
return my_path
class NodePoolBuilder(object):
log = logging.getLogger("nodepool.builder")
def __init__(self, nodepool):
def __init__(self, config_path, nodepool):
self._config_path = config_path
self.nodepool = nodepool
self._running = False
self._built_image_ids = set()
self._start_lock = threading.Lock()
self._gearman_worker = None
self._config = None
@property
def running(self):
return self._running
def start(self):
self.load_config(self._config_path)
with self._start_lock:
if self._running:
raise RuntimeError('Cannot start, already running.')
self._running = True
self._gearman_worker = self._initializeGearmanWorker(
self.nodepool.config.gearman_servers.values())
images = self.nodepool.config.diskimages.keys()
self._registerGearmanFunctions(images)
self._config.gearman_servers.values())
self._registerGearmanFunctions(self._config.diskimages.values())
self._registerExistingImageUploads()
self.thread = threading.Thread(target=self._run,
name='NodePool Builder')
@ -91,6 +127,9 @@ class NodePoolBuilder(object):
else:
raise
def load_config(self, config_path):
self._config = loadConfig(config_path)
def _run(self):
self.log.debug('Starting listener for build jobs')
while self._running:
@ -114,23 +153,24 @@ class NodePoolBuilder(object):
def _registerGearmanFunctions(self, images):
self.log.debug('Registering gearman functions')
for image in images:
self._gearman_worker.registerFunction('image-build:%s' % image)
self._gearman_worker.registerFunction(
'image-build:%s' % image.name)
def _registerExistingImageUploads(self):
with self.nodepool.getDB().getSession() as session:
for image in session.getDibImages():
self.registerImageId(image.id)
images = DibImageFile.from_images_dir(self._config.imagesdir)
for image in images:
self._registerImageId(image.image_id)
def _registerImageId(self, image_id):
self.log.debug('registering image %d', image_id)
self.log.debug('registering image %s', image_id)
self._gearman_worker.registerFunction('image-upload:%s' % image_id)
self._gearman_worker.registerFunction('image-delete:%s' % image_id)
self._built_image_ids.add(image_id)
def _unregisterImageId(self, worker, image_id):
if image_id in self._built_image_ids:
self.log.debug('unregistering image %d', image_id)
worker.unRegisterFunction('image-upload:%d' % image_id)
self.log.debug('unregistering image %s', image_id)
worker.unRegisterFunction('image-upload:%s' % image_id)
self._built_image_ids.remove(image_id)
else:
self.log.warning('Attempting to remove image %d but image not '
@ -138,7 +178,7 @@ class NodePoolBuilder(object):
def _canHandleImageidJob(self, job, image_op):
return (job.name.startswith(image_op + ':') and
int(job.name.split(':')[1]) in self._built_image_ids)
job.name.split(':')[1]) in self._built_image_ids
def _handleJob(self, job):
try:
@ -146,21 +186,32 @@ class NodePoolBuilder(object):
job.name, job.arguments)
if job.name.startswith('image-build:'):
args = json.loads(job.arguments)
self.buildImage(args['image_id'])
image_id = args['image-id']
if '/' in image_id:
raise RuntimeError('Invalid image-id specified')
# We can now upload this image
self._registerImageId(args['image_id'])
job.sendWorkComplete()
image_name = job.name.split(':', 1)[1]
try:
self._buildImage(image_name, image_id)
except RuntimeError:
self.log.exception('Error building image')
job.sendWorkFail()
else:
# We can now upload this image
self._registerImageId(image_id)
job.sendWorkComplete(json.dumps({'image-id': image_id}))
elif self._canHandleImageidJob(job, 'image-upload'):
args = json.loads(job.arguments)
image_name = args['image-name']
image_id = job.name.split(':')[1]
external_id = self._uploadImage(int(image_id),
args['provider'])
external_id = self._uploadImage(image_id,
args['provider'],
image_name)
job.sendWorkComplete(json.dumps({'external-id': external_id}))
elif self._canHandleImageidJob(job, 'image-delete'):
image_id = job.name.split(':')[1]
self._deleteImage(int(image_id))
self._unregisterImageId(self._gearman_worker, int(image_id))
self._deleteImage(image_id)
self._unregisterImageId(self._gearman_worker, image_id)
job.sendWorkComplete()
else:
self.log.error('Unable to handle job %s', job.name)
@ -170,87 +221,87 @@ class NodePoolBuilder(object):
job.sendWorkException(traceback.format_exc())
def _deleteImage(self, image_id):
with self.nodepool.getDB().getSession() as session:
dib_image = session.getDibImage(image_id)
image_files = DibImageFile.from_image_id(self._config.imagesdir,
image_id)
# Remove image from the nodedb
dib_image.state = nodedb.DELETE
dib_image.delete()
# Delete a dib image and its associated file
for image_file in image_files:
img_path = image_file.to_path(self._config.imagesdir)
if os.path.exists(img_path):
self.log.debug('Removing filename %s', img_path)
os.remove(img_path)
else:
self.log.debug('No filename %s found to remove', img_path)
self.log.info("Deleted dib image id: %s" % image_id)
config = self.nodepool.config
image_config = config.diskimages.get(dib_image.image_name)
if not image_config:
self.log.error("Deleting image %d but configuration not found."
"Cannot delete image from disk without a "
"configuration.", image_id)
return
# Delete a dib image and its associated file
for image_type in image_config.image_types:
if os.path.exists(dib_image.filename + '.' + image_type):
os.remove(dib_image.filename + '.' + image_type)
def _uploadImage(self, image_id, provider_name, image_name):
start_time = time.time()
timestamp = int(start_time)
self.log.info("Deleted dib image id: %s" % dib_image.id)
provider = self._config.providers[provider_name]
image_type = provider.image_type
def _uploadImage(self, image_id, provider_name):
with self.nodepool.getDB().getSession() as session:
start_time = time.time()
timestamp = int(start_time)
image_files = DibImageFile.from_image_id(self._config.imagesdir,
image_id)
image_files = filter(lambda x: x.extension == image_type, image_files)
if len(image_files) == 0:
self.log.error("Unable to find image file for id %s to upload",
image_id)
return
if len(image_files) > 1:
self.log.error("Found more than one image for id %s. This should "
"never happen.", image_id)
dib_image = session.getDibImage(image_id)
provider = self.nodepool.config.providers[provider_name]
image_file = image_files[0]
filename = image_file.to_path(self._config.imagesdir,
with_extension=False)
filename = dib_image.filename
dummy_image = type('obj', (object,),
{'name': image_name})
ext_image_name = provider.template_hostname.format(
provider=provider, image=dummy_image, timestamp=str(timestamp))
self.log.info("Uploading dib image id: %s from %s in %s" %
(image_id, filename, provider.name))
dummy_image = type('obj', (object,),
{'name': dib_image.image_name})
image_name = provider.template_hostname.format(
provider=provider, image=dummy_image, timestamp=str(timestamp))
self.log.info("Uploading dib image id: %s from %s for %s in %s" %
(dib_image.id, filename, image_name, provider.name))
manager = self.nodepool.getProviderManager(provider)
try:
provider_image = provider.images[image_name]
except KeyError:
self.log.error("Could not find matching provider image for %s",
image_name)
return
image_meta = provider_image.meta
external_id = manager.uploadImage(ext_image_name, filename,
image_file.extension, 'bare',
image_meta)
self.log.debug("Saving image id: %s", external_id)
# It can take a _very_ long time for Rackspace 1.0 to save an image
manager.waitForImage(external_id, IMAGE_TIMEOUT)
manager = self.nodepool.getProviderManager(provider)
provider_image = filter(
lambda x: x.diskimage == dib_image.image_name,
provider.images.values())
if len(provider_image) != 1:
self.log.error("Could not find a matching provider image for "
"%s", dib_image.image_name)
return
provider_image = provider_image[0]
if statsd:
dt = int((time.time() - start_time) * 1000)
key = 'nodepool.image_update.%s.%s' % (image_name,
provider.name)
statsd.timing(key, dt)
statsd.incr(key)
image_meta = provider_image.meta
provider_image_id = manager.uploadImage(image_name, filename,
provider.image_type,
'bare', image_meta)
self.log.debug("Image id: %s saving image %s" %
(dib_image.id, provider_image_id))
# It can take a _very_ long time for Rackspace 1.0 to save an image
manager.waitForImage(provider_image_id, IMAGE_TIMEOUT)
self.log.info("Image %s in %s is ready" % (image_id,
provider.name))
return external_id
if statsd:
dt = int((time.time() - start_time) * 1000)
key = 'nodepool.image_update.%s.%s' % (image_name,
provider.name)
statsd.timing(key, dt)
statsd.incr(key)
self.log.info("Image %s in %s is ready" % (dib_image.image_name,
provider.name))
return provider_image_id
def _buildImage(self, image, image_name, filename):
def _runDibForImage(self, image, filename):
env = os.environ.copy()
env['DIB_RELEASE'] = image.release
env['DIB_IMAGE_NAME'] = image_name
env['DIB_IMAGE_NAME'] = image.name
env['DIB_IMAGE_FILENAME'] = filename
# Note we use a reference to the nodepool config here so
# that whenever the config is updated we get up to date
# values in this thread.
if self.nodepool.config.elementsdir:
env['ELEMENTS_PATH'] = self.nodepool.config.elementsdir
if self.nodepool.config.scriptdir:
env['NODEPOOL_SCRIPTDIR'] = self.nodepool.config.scriptdir
if self._config.elementsdir:
env['ELEMENTS_PATH'] = self._config.elementsdir
if self._config.scriptdir:
env['NODEPOOL_SCRIPTDIR'] = self._config.scriptdir
# send additional env vars if needed
for k, v in image.env_vars.items():
@ -263,7 +314,7 @@ class NodePoolBuilder(object):
if 'qcow2' in img_types:
qemu_img_options = DEFAULT_QEMU_IMAGE_COMPAT_OPTIONS
if 'fake-' in filename:
if 'fake-' in image.name:
dib_cmd = 'nodepool/tests/fake-image-create'
else:
dib_cmd = 'disk-image-create'
@ -272,7 +323,7 @@ class NodePoolBuilder(object):
(dib_cmd, img_types, qemu_img_options, filename, img_elements))
log = logging.getLogger("nodepool.image.build.%s" %
(image_name,))
(image.name,))
self.log.info('Running %s' % cmd)
@ -297,40 +348,31 @@ class NodePoolBuilder(object):
if ret:
raise Exception("DIB failed creating %s" % (filename,))
def buildImage(self, image_id):
with self.nodepool.getDB().getSession() as session:
image = session.getDibImage(image_id)
self.log.info("Creating image: %s with filename %s" %
(image.image_name, image.filename))
def _getDiskimageByName(self, name):
for image in self._config.diskimages.values():
if image.name == name:
return image
return None
start_time = time.time()
timestamp = int(start_time)
image.version = timestamp
session.commit()
def _buildImage(self, image_name, image_id):
diskimage = self._getDiskimageByName(image_name)
if diskimage is None:
raise RuntimeError('Could not find matching image in config for '
'%s', image_name)
# retrieve image details
image_details = \
self.nodepool.config.diskimages[image.image_name]
try:
self._buildImage(
image_details,
image.image_name,
image.filename)
except Exception:
self.log.exception("Exception building DIB image %s:" %
(image_id,))
# DIB should've cleaned up after itself, just remove this
# image from the DB.
image.delete()
return
start_time = time.time()
image_file = DibImageFile(image_id)
filename = image_file.to_path(self._config.imagesdir, False)
image.state = nodedb.READY
session.commit()
self.log.info("DIB image %s with file %s is built" % (
image_id, image.filename))
self.log.info("Creating image: %s with filename %s" %
(diskimage.name, filename))
self._runDibForImage(diskimage, filename)
if statsd:
dt = int((time.time() - start_time) * 1000)
key = 'nodepool.dib_image_build.%s' % image.image_name
statsd.timing(key, dt)
statsd.incr(key)
self.log.info("DIB image %s with file %s is built" % (
image_name, filename))
if statsd:
dt = int((time.time() - start_time) * 1000)
key = 'nodepool.dib_image_build.%s' % diskimage.name
statsd.timing(key, dt)
statsd.incr(key)

View File

@ -1,3 +1,4 @@
import os_client_config
from six.moves import configparser as ConfigParser
import yaml
@ -54,8 +55,9 @@ class Network(ConfigValue):
pass
def loadConfig(config_path, cloud_config):
def loadConfig(config_path):
config = yaml.load(open(config_path))
cloud_config = os_client_config.OpenStackConfig()
newconfig = Config()
newconfig.db = None

View File

@ -20,7 +20,6 @@ import apscheduler.scheduler
import gear
import json
import logging
import os_client_config
import os.path
import paramiko
import pprint
@ -337,7 +336,6 @@ class GearmanClient(gear.Client):
job = super(GearmanClient, self).handleWorkComplete(packet)
job.onCompleted()
class InstanceDeleter(threading.Thread):
log = logging.getLogger("nodepool.InstanceDeleter")
@ -1108,8 +1106,7 @@ class NodePool(threading.Thread):
def loadConfig(self):
self.log.debug("Loading configuration")
config = nodepool_config.loadConfig(self.configfile,
os_client_config.OpenStackConfig())
config = nodepool_config.loadConfig(self.configfile)
nodepool_config.loadSecureConfig(config, self.securefile)
return config
@ -1290,7 +1287,8 @@ class NodePool(threading.Thread):
def reconfigureImageBuilder(self):
# start disk image builder thread
if not self._image_builder_thread and self.run_builder:
self._image_builder_thread = builder.NodePoolBuilder(self)
self._image_builder_thread = builder.NodePoolBuilder(
self.configfile, self)
self._image_builder_thread.start()
def setConfig(self, config):
@ -1567,25 +1565,7 @@ class NodePool(threading.Thread):
# This is either building or in an error state
# that will be handled by periodic cleanup
return
types_found = True
diskimage = self.config.diskimages[image.diskimage]
for image_type in diskimage.image_types:
if (not os.path.exists(
dib_image.filename + '.' + image_type) and
not 'fake-dib-image' in dib_image.filename):
# if image is in ready state, check if image
# file exists in directory, otherwise we need
# to rebuild and delete this buggy image
types_found = False
self.log.warning("Image filename %s does not "
"exist. Removing image" %
dib_image.filename)
self.deleteDibImage(dib_image)
break
if types_found:
# Found a matching image that is READY and has a file
found = True
break
found = True
if not found:
# only build the image, we'll recheck again
self.log.warning("Missing disk image %s" % image.name)
@ -1714,22 +1694,27 @@ class NodePool(threading.Thread):
filename = os.path.join(self.config.imagesdir,
'%s-%s' %
(image.name, str(timestamp)))
job_uuid = str(uuid4().hex)
self.log.debug("Queued image building task for %s" %
image.name)
dib_image = session.createDibImage(image_name=image.name,
filename=filename)
session.commit()
filename=filename,
version=timestamp)
self.log.debug("Created DibImage record %s with state %s",
dib_image.image_name, dib_image.state)
# Submit image-build job
job_uuid = str(uuid4().hex)
job_data = json.dumps({
'image-id': str(dib_image.id)
})
gearman_job = WatchableJob(
'image-build:%s' % image.name,
json.dumps({'image_id': dib_image.id}),
job_uuid)
'image-build:%s' % image.name, job_data, job_uuid)
self._image_build_jobs.addJob(gearman_job)
gearman_job.addCompletionHandler(
self.handleImageBuildComplete, image_id=dib_image.id)
self.gearman_client.submitJob(gearman_job)
self.log.debug("Queued image building task for %s" %
image.name)
except Exception:
self.log.exception(
"Could not build image %s", image.name)
@ -1754,14 +1739,17 @@ class NodePool(threading.Thread):
# Submit image-upload job
gearman_job = WatchableJob(
'image-upload:%d' % image_id,
json.dumps({'provider': provider}),
'image-upload:%s' % image_id,
json.dumps({
'provider': provider,
'image-name': image_name
}),
job_uuid)
self._image_upload_jobs.addJob(gearman_job)
gearman_job.addCompletionHandler(self.handleImageUploadComplete,
snap_image_id=snap_image.id)
self.log.debug('Submitting image-upload job for %d', image_id)
self.log.debug('Submitting image-upload job for image id %s',
image_id,)
self.gearman_client.submitJob(gearman_job)
return gearman_job
@ -1788,6 +1776,27 @@ class NodePool(threading.Thread):
self.log.debug('Image %s is ready with external_id %s',
snap_image_id, external_id)
def handleImageBuildComplete(self, job, image_id):
with self.getDB().getSession() as session:
dib_image = session.getDibImage(image_id)
if dib_image is None:
self.log.error(
'Unable to find matching dib_image for image_id %s',
image_id)
return
dib_image.state = nodedb.READY
session.commit()
self.log.debug('DIB Image %s (id %d) is ready',
job.name.split(':', 1)[0], image_id)
def handleImageDeleteComplete(self, job, image_id):
with self.getDB().getSession() as session:
dib_image = session.getDibImage(image_id)
# Remove image from the nodedb
dib_image.state = nodedb.DELETE
dib_image.delete()
def launchNode(self, session, provider, label, target):
try:
self._launchNode(session, provider, label, target)
@ -1973,8 +1982,12 @@ class NodePool(threading.Thread):
try:
# Submit image-delete job
job_uuid = str(uuid4().hex)
gearman_job = WatchableJob('image-delete:%s' % dib_image.id,
'', job_uuid)
gearman_job = WatchableJob(
'image-delete:%s' % dib_image.id,
'', job_uuid
)
gearman_job.addCompletionHandler(self.handleImageDeleteComplete,
image_id=dib_image.id)
self.gearman_client.submitJob(gearman_job)
return gearman_job
except Exception:

View File

@ -282,7 +282,8 @@ class BuilderFixture(fixtures.Fixture):
def setUp(self):
super(BuilderFixture, self).setUp()
self.builder = builder.NodePoolBuilder(self.nodepool)
self.builder = builder.NodePoolBuilder(self.nodepool.configfile,
self.nodepool)
self.addCleanup(self.cleanup)
self.builder.start()

View File

@ -1,4 +1,6 @@
script-dir: .
elements-dir: .
images-dir: '{images_dir}'
cron:
check: '*/15 * * * *'

View File

@ -0,0 +1,85 @@
# Copyright (C) 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import fixtures
from nodepool import builder, tests
class TestNodepoolBuilderDibImage(tests.BaseTestCase):
def test_from_path(self):
image = builder.DibImageFile.from_path(
'/foo/bar/myid1234.qcow2')
self.assertEqual(image.image_id, 'myid1234')
self.assertEqual(image.extension, 'qcow2')
def test_from_image_id(self):
tempdir = fixtures.TempDir()
self.useFixture(tempdir)
image_path = os.path.join(tempdir.path, 'myid1234.qcow2')
open(image_path, 'w')
images = builder.DibImageFile.from_image_id(tempdir.path, 'myid1234')
self.assertEqual(len(images), 1)
image = images[0]
self.assertEqual(image.image_id, 'myid1234')
self.assertEqual(image.extension, 'qcow2')
def test_from_id_multiple(self):
tempdir = fixtures.TempDir()
self.useFixture(tempdir)
image_path_1 = os.path.join(tempdir.path, 'myid1234.qcow2')
image_path_2 = os.path.join(tempdir.path, 'myid1234.raw')
open(image_path_1, 'w')
open(image_path_2, 'w')
images = builder.DibImageFile.from_image_id(tempdir.path, 'myid1234')
images = sorted(images, key=lambda x: x.extension)
self.assertEqual(len(images), 2)
self.assertEqual(images[0].extension, 'qcow2')
self.assertEqual(images[1].extension, 'raw')
def test_from_images_dir(self):
tempdir = fixtures.TempDir()
self.useFixture(tempdir)
image_path_1 = os.path.join(tempdir.path, 'myid1234.qcow2')
image_path_2 = os.path.join(tempdir.path, 'myid1234.raw')
open(image_path_1, 'w')
open(image_path_2, 'w')
images = builder.DibImageFile.from_images_dir(tempdir.path)
images = sorted(images, key=lambda x: x.extension)
self.assertEqual(len(images), 2)
self.assertEqual(images[0].image_id, 'myid1234')
self.assertEqual(images[0].extension, 'qcow2')
self.assertEqual(images[1].image_id, 'myid1234')
self.assertEqual(images[1].extension, 'raw')
def test_to_path(self):
image = builder.DibImageFile('myid1234', 'qcow2')
self.assertEqual(image.to_path('/imagedir'),
'/imagedir/myid1234.qcow2')
self.assertEqual(image.to_path('/imagedir/'),
'/imagedir/myid1234.qcow2')
self.assertEqual(image.to_path('/imagedir/', False),
'/imagedir/myid1234')
image = builder.DibImageFile('myid1234')
self.assertRaises(ValueError, image.to_path, '/imagedir/')