diff --git a/nodepool/fakeprovider.py b/nodepool/fakeprovider.py index b9d051608..8fc590043 100644 --- a/nodepool/fakeprovider.py +++ b/nodepool/fakeprovider.py @@ -41,7 +41,9 @@ class FakeList(object): if x.name == name: return x - def get(self, id): + def get(self, image=None): + if image: + id = image for x in self._list: if x.id == id: return x @@ -52,12 +54,16 @@ class FakeList(object): obj.status = status def delete(self, obj): - self._list.remove(obj) + if hasattr(obj, 'id'): + self._list.remove(obj) + else: + self._list.remove(self.get(obj)) def create(self, **kw): s = Dummy(id=uuid.uuid4().hex, name=kw['name'], status='BUILD', + adminPass='fake', addresses=dict(public=[dict(version=4, addr='fake')]), manager=self) self._list.append(s) @@ -65,8 +71,8 @@ class FakeList(object): t.start() return s - def create_image(self, server, name): - x = self.api.images.create(name=name) + def create_image(self, server, image_name): + x = self.api.images.create(name=image_name) return x.id diff --git a/nodepool/nodepool.py b/nodepool/nodepool.py index dc7e1361c..b07a92b17 100644 --- a/nodepool/nodepool.py +++ b/nodepool/nodepool.py @@ -29,6 +29,7 @@ import zmq import nodedb import nodeutils as utils +import provider_manager MINS = 60 HOURS = 60 * MINS @@ -146,7 +147,7 @@ class NodeLauncher(threading.Thread): self.log.debug("Launching node id: %s" % self.node_id) try: self.node = session.getNode(self.node_id) - self.client = self.nodepool.getClient(self.provider) + self.manager = self.nodepool.getManager(self.provider) except Exception: self.log.exception("Exception preparing to launch node id: %s:" % self.node_id) @@ -158,7 +159,7 @@ class NodeLauncher(threading.Thread): self.log.exception("Exception launching node id: %s:" % self.node_id) try: - utils.delete_node(self.client, self.node) + self.nodepool.deleteNode(session, self.node) except Exception: self.log.exception("Exception deleting node id: %s:" % self.node_id) @@ -173,37 +174,33 @@ class NodeLauncher(threading.Thread): self.node.nodename = hostname.split('.')[0] self.node.target_name = self.target.name - flavor = utils.get_flavor(self.client, self.image.min_ram) snap_image = session.getCurrentSnapshotImage( self.provider.name, self.image.name) if not snap_image: raise Exception("Unable to find current snapshot image %s in %s" % (self.image.name, self.provider.name)) - remote_snap_image = self.client.images.get(snap_image.external_id) self.log.info("Creating server with hostname %s in %s from image %s " "for node id: %s" % (hostname, self.provider.name, self.image.name, self.node_id)) - server = None - server, key = utils.create_server(self.client, hostname, - remote_snap_image, flavor) - self.node.external_id = server.id + server_id = self.manager.createServer(hostname, + self.image.min_ram, + snap_image.external_id) + self.node.external_id = server_id session.commit() self.log.debug("Waiting for server %s for node id: %s" % - (server.id, self.node.id)) - - server = utils.wait_for_resource(server) - if server.status != 'ACTIVE': + (server_id, self.node.id)) + server = self.manager.waitForServer(server_id) + if server['status'] != 'ACTIVE': raise Exception("Server %s for node id: %s status: %s" % - (server.id, self.node.id, server.status)) + (server_id, self.node.id, server['status'])) - ip = utils.get_public_ip(server) - if not ip and 'os-floating-ips' in utils.get_extensions(self.client): - utils.add_public_ip(server) - ip = utils.get_public_ip(server) + ip = server.get('public_v4') + if not ip and self.manager.hasExtension('os-floating-ips'): + ip = self.manager.addPublicIP(server['server_id']) if not ip: - raise Exception("Unable to find public ip of server") + raise Exception("Unable to find public IP of server") self.node.ip = ip self.log.debug("Node id: %s is running, testing ssh" % self.node.id) @@ -288,7 +285,7 @@ class ImageUpdater(threading.Thread): try: self.snap_image = session.getSnapshotImage( self.snap_image_id) - self.client = self.nodepool.getClient(self.provider) + self.manager = self.nodepool.getManager(self.provider) except Exception: self.log.exception("Exception preparing to update image %s " "in %s:" % (self.image.name, @@ -302,7 +299,7 @@ class ImageUpdater(threading.Thread): (self.image.name, self.provider.name)) try: if self.snap_image: - utils.delete_image(self.client, self.snap_image) + self.nodepool.deleteImage(self.snap_image) except Exception: self.log.exception("Exception deleting image id: %s:" % self.snap_image.id) @@ -312,41 +309,50 @@ class ImageUpdater(threading.Thread): start_time = time.time() timestamp = int(start_time) - flavor = utils.get_flavor(self.client, self.image.min_ram) - base_image = self.client.images.find(name=self.image.base_image) hostname = ('%s-%s.template.openstack.org' % (self.image.name, str(timestamp))) self.log.info("Creating image id: %s for %s in %s" % (self.snap_image.id, self.image.name, self.provider.name)) - server, key = utils.create_server( - self.client, hostname, base_image, flavor, add_key=True) + if self.manager.hasExtension('os-keypairs'): + key_name = hostname.split('.')[0] + key = self.manager.addKeypair(key_name) + else: + key_name = None + key = None + server_id = self.manager.createServer(hostname, + self.image.min_ram, + image_name=self.image.base_image, + key_name=key_name) self.snap_image.hostname = hostname self.snap_image.version = timestamp - self.snap_image.server_external_id = server.id + self.snap_image.server_external_id = server_id session.commit() self.log.debug("Image id: %s waiting for server %s" % - (self.snap_image.id, server.id)) - server = utils.wait_for_resource(server) - if not server: - raise Exception("Timeout waiting for server %s" % - server.id) + (self.snap_image.id, server_id)) + server = self.manager.waitForServer(server_id) + if server['status'] != 'ACTIVE': + raise Exception("Server %s for image id: %s status: %s" % + (server_id, self.snap_image.id, server['status'])) - admin_pass = None # server.adminPass - self.bootstrapServer(server, admin_pass, key) + ip = server.get('public_v4') + if not ip and self.manager.hasExtension('os-floating-ips'): + ip = self.manager.addPublicIP(server['server_id']) + if not ip: + raise Exception("Unable to find public IP of server") + server['public_v4'] = ip - image = utils.create_image(self.client, server, hostname) - self.snap_image.external_id = image.id + self.bootstrapServer(server, key) + + image_id = self.manager.createImage(server_id, hostname) + self.snap_image.external_id = image_id session.commit() self.log.debug("Image id: %s building image %s" % - (self.snap_image.id, image.id)) + (self.snap_image.id, image_id)) # It can take a _very_ long time for Rackspace 1.0 to save an image - image = utils.wait_for_resource(image, IMAGE_TIMEOUT) - if not image: - raise Exception("Timeout waiting for image %s" % - self.snap_image.id) + self.manager.waitForImage(image_id, IMAGE_TIMEOUT) if statsd: dt = int((time.time() - start_time) * 1000) @@ -363,28 +369,22 @@ class ImageUpdater(threading.Thread): try: # We made the snapshot, try deleting the server, but it's okay # if we fail. The reap script will find it and try again. - utils.delete_server(self.client, server) + self.manager.cleanupServer(server_id) except: self.log.exception("Exception encountered deleting server" " %s for image id: %s" % - (server.id, self.snap_image.id)) - - def bootstrapServer(self, server, admin_pass, key): - ip = utils.get_public_ip(server) - if not ip and 'os-floating-ips' in utils.get_extensions(self.client): - utils.add_public_ip(server) - ip = utils.get_public_ip(server) - if not ip: - raise Exception("Unable to find public ip of server") + (server_id, self.snap_image.id)) + def bootstrapServer(self, server, key): ssh_kwargs = {} if key: ssh_kwargs['pkey'] = key else: - ssh_kwargs['password'] = admin_pass + ssh_kwargs['password'] = server['admin_pass'] for username in ['root', 'ubuntu']: - host = utils.ssh_connect(ip, username, ssh_kwargs, + host = utils.ssh_connect(server['public_v4'], username, + ssh_kwargs, timeout=CONNECT_TIMEOUT) if host: break @@ -495,7 +495,8 @@ class NodePool(threading.Thread): newconfig.targets = {} newconfig.scriptdir = config.get('script-dir') newconfig.dburi = config.get('dburi') - newconfig.clients = {} + newconfig.managers = {} + stop_managers = [] for provider in config['providers']: p = Provider() @@ -509,23 +510,28 @@ class NodePool(threading.Thread): p.service_name = provider.get('service-name') p.region_name = provider.get('region-name') p.max_servers = provider['max-servers'] - oldclient = None + p.rate = provider.get('rate', 1.0) + oldmanager = None if self.config: - oldclient = self.config.clients.get(p.name) - if oldclient: - if (p.username != oldclient.client.user or - p.password != oldclient.client.password or - p.project_id != oldclient.client.projectid or - p.service_type != oldclient.client.service_type or - p.service_name != oldclient.client.service_name or - p.region_name != oldclient.client.region_name): - oldclient = None - if oldclient: - newconfig.clients[p.name] = oldclient + oldmanager = self.config.managers.get(p.name) + if oldmanager: + if (p.username != oldmanager.provider.username or + p.password != oldmanager.provider.password or + p.project_id != oldmanager.provider.project_id or + p.auth_url != oldmanager.provider.auth_url or + p.service_type != oldmanager.provider.service_type or + p.service_name != oldmanager.provider.service_name or + p.region_name != oldmanager.provider.region_name): + stop_managers.append(oldmanager) + oldmanager = None + if oldmanager: + newconfig.managers[p.name] = oldmanager else: - self.log.debug("Creating new client object for %s" % + self.log.debug("Creating new ProviderManager object for %s" % p.name) - newconfig.clients[p.name] = utils.get_client(p) + newconfig.managers[p.name] = \ + provider_manager.ProviderManager(p) + newconfig.managers[p.name].start() p.images = {} for image in provider['images']: i = ProviderImage() @@ -562,13 +568,15 @@ class NodePool(threading.Thread): i.providers[p.name] = p p.min_ready = provider['min-ready'] self.config = newconfig + for oldmanager in stop_managers: + oldmanager.stop() if self.config.dburi != self.dburi: self.dburi = self.config.dburi self.db = nodedb.NodeDatabase(self.config.dburi) self.startUpdateListeners(config['zmq-publishers']) - def getClient(self, provider): - return self.config.clients[provider.name] + def getManager(self, provider): + return self.config.managers[provider.name] def startUpdateListeners(self, publishers): running = set(self.zmq_listeners.keys()) @@ -696,7 +704,7 @@ class NodePool(threading.Thread): self.updateStats(session, node.provider_name) provider = self.config.providers[node.provider_name] target = self.config.targets[node.target_name] - client = self.getClient(provider) + manager = self.getManager(provider) if target.jenkins_url: jenkins = utils.get_jenkins(target.jenkins_url, @@ -707,7 +715,17 @@ class NodePool(threading.Thread): jenkins.delete_node(jenkins_name) self.log.info("Deleted jenkins node id: %s" % node.id) - utils.delete_node(client, node) + if node.external_id: + try: + server = manager.getServer(node.external_id) + self.log.debug('Deleting server %s for node id: %s' % + node.external_id, + node.id) + manager.cleanupServer(server['id']) + except provider_manager.NotFound: + pass + + node.delete() self.log.info("Deleted node id: %s" % node.id) if statsd: @@ -720,12 +738,32 @@ class NodePool(threading.Thread): self.updateStats(session, node.provider_name) def deleteImage(self, snap_image): - # Delete a node + # Delete an image (and its associated server) snap_image.state = nodedb.DELETE provider = self.config.providers[snap_image.provider_name] - client = self.getClient(provider) + manager = self.getManager(provider) - utils.delete_image(client, snap_image) + if snap_image.server_external_id: + try: + server = manager.getServer(snap_image.server_external_id) + self.log.debug('Deleting server %s for image id: %s' % + snap_image.server_external_id, + snap_image.id) + manager.cleanupServer(server['id']) + except provider_manager.NotFound: + self.log.warning('Image server id %s not found' % + snap_image.server_external_id) + + if snap_image.external_id: + try: + remote_image = manager.getImage(snap_image.external_id) + self.log.debug('Deleting image %s' % remote_image.id) + manager.deleteImage(remote_image['id']) + except provider_manager.NotFound: + self.log.warning('Image id %s not found' % + snap_image.external_id) + + snap_image.delete() self.log.info("Deleted image id: %s" % snap_image.id) def _doPeriodicCleanup(self): diff --git a/nodepool/nodeutils.py b/nodepool/nodeutils.py index 3bf01d4f5..35206e985 100644 --- a/nodepool/nodeutils.py +++ b/nodepool/nodeutils.py @@ -16,15 +16,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import novaclient.client import time -import paramiko import socket import logging import myjenkins from sshclient import SSHClient -import nodedb import fakeprovider log = logging.getLogger("nodepool.utils") @@ -40,22 +37,6 @@ def iterate_timeout(max_seconds, purpose): raise Exception("Timeout waiting for %s" % purpose) -def get_client(provider): - args = ['1.1', provider.username, provider.password, - provider.project_id, provider.auth_url] - kwargs = {} - if provider.service_type: - kwargs['service_type'] = provider.service_type - if provider.service_name: - kwargs['service_name'] = provider.service_name - if provider.region_name: - kwargs['region_name'] = provider.region_name - if provider.auth_url == 'fake': - return fakeprovider.FAKE_CLIENT - client = novaclient.client.Client(*args, **kwargs) - return client - - def get_jenkins(url, user, apikey): if apikey == 'fake': return fakeprovider.FakeJenkins() @@ -65,101 +46,6 @@ def get_jenkins(url, user, apikey): extension_cache = {} -def get_extensions(client): - global extension_cache - cache = extension_cache.get(client) - if cache: - return cache - try: - resp, body = client.client.get('/extensions') - extensions = [x['alias'] for x in body['extensions']] - except novaclient.exceptions.NotFound: - extensions = [] - extension_cache[client] = extensions - return extensions - - -def get_flavor(client, min_ram): - flavors = [f for f in client.flavors.list() if f.ram >= min_ram] - flavors.sort(lambda a, b: cmp(a.ram, b.ram)) - return flavors[0] - - -def get_public_ip(server, version=4): - if 'os-floating-ips' in get_extensions(server.manager.api): - log.debug('Server %s using floating ips' % server.id) - for addr in server.manager.api.floating_ips.list(): - if addr.instance_id == server.id: - return addr.ip - log.debug('Server %s not using floating ips, addresses: %s' % - (server.id, server.addresses)) - for addr in server.addresses.get('public', []): - if type(addr) == type(u''): # Rackspace/openstack 1.0 - return addr - if addr['version'] == version: # Rackspace/openstack 1.1 - return addr['addr'] - for addr in server.addresses.get('private', []): - # HPcloud - if (addr['version'] == version and version == 4): - quad = map(int, addr['addr'].split('.')) - if quad[0] == 10: - continue - if quad[0] == 192 and quad[1] == 168: - continue - if quad[0] == 172 and (16 <= quad[1] <= 31): - continue - return addr['addr'] - return None - - -def add_public_ip(server): - ip = server.manager.api.floating_ips.create() - log.debug('Created floading IP %s for server %s' % (ip, server.id)) - server.add_floating_ip(ip) - for count in iterate_timeout(600, "ip to be added"): - try: - newip = ip.manager.get(ip.id) - except Exception: - log.exception('Unable to get IP details for server %s, ' - 'will retry' % (server.id)) - continue - - if newip.instance_id == server.id: - return - - -def add_keypair(client, name): - key = paramiko.RSAKey.generate(2048) - public_key = key.get_name() + ' ' + key.get_base64() - kp = client.keypairs.create(name, public_key) - return key, kp - - -def wait_for_resource(wait_resource, timeout=3600): - last_progress = None - last_status = None - for count in iterate_timeout(timeout, "waiting for %s" % wait_resource): - try: - resource = wait_resource.manager.get(wait_resource.id) - except: - log.exception('Unable to list resources, while waiting for %s ' - 'will retry' % wait_resource) - continue - - # In Rackspace v1.0, there is no progress attribute while queued - if hasattr(resource, 'progress'): - if (last_progress != resource.progress - or last_status != resource.status): - log.debug('Status of %s: %s %s' % (resource, resource.status, - resource.progress)) - last_progress = resource.progress - elif last_status != resource.status: - log.debug('Status of %s: %s' % (resource, resource.status)) - last_status = resource.status - if resource.status == 'ACTIVE': - return resource - - def ssh_connect(ip, username, connect_kwargs={}, timeout=60): if ip == 'fake': return fakeprovider.FakeSSHClient() @@ -176,93 +62,3 @@ def ssh_connect(ip, username, connect_kwargs={}, timeout=60): if "access okay" in out: return client return None - - -def create_server(client, hostname, base_image, flavor, add_key=False): - create_kwargs = dict(image=base_image, flavor=flavor, name=hostname) - - key = None - # hpcloud can't handle dots in keypair names - if add_key: - key_name = hostname.split('.')[0] - if 'os-keypairs' in get_extensions(client): - key, kp = add_keypair(client, key_name) - create_kwargs['key_name'] = key_name - - server = client.servers.create(**create_kwargs) - return server, key - - -def create_image(client, server, image_name): - # TODO: fix novaclient so it returns an image here - # image = server.create_image(name) - uuid = server.manager.create_image(server, image_name) - image = client.images.get(uuid) - return image - - -def delete_server(client, server): - try: - if 'os-floating-ips' in get_extensions(server.manager.api): - for addr in server.manager.api.floating_ips.list(): - if addr.instance_id == server.id: - server.remove_floating_ip(addr) - addr.delete() - except: - log.exception('Unable to remove floating IP from server %s' % - server.id) - - try: - if 'os-keypairs' in get_extensions(server.manager.api): - for kp in server.manager.api.keypairs.list(): - if kp.name == server.key_name: - kp.delete() - except: - log.exception('Unable to delete keypair from server %s' % server.id) - - log.debug('Deleting server %s' % server.id) - server.delete() - for count in iterate_timeout(3600, "waiting for server %s deletion" % - server.id): - try: - client.servers.get(server.id) - except novaclient.exceptions.NotFound: - return - - -def delete_node(client, node): - node.state = nodedb.DELETE - if node.external_id: - try: - server = client.servers.get(node.external_id) - delete_server(client, server) - except novaclient.exceptions.NotFound: - pass - - node.delete() - - -def delete_image(client, image): - server = None - if image.server_external_id: - try: - server = client.servers.get(image.server_external_id) - except novaclient.exceptions.NotFound: - log.warning('Image server id %s not found' % - image.server_external_id) - - if server: - delete_server(client, server) - - remote_image = None - if image.external_id: - try: - remote_image = client.images.get(image.external_id) - except novaclient.exceptions.NotFound: - log.warning('Image id %s not found' % image.external_id) - - if remote_image: - log.debug('Deleting image %s' % remote_image.id) - remote_image.delete() - - image.delete() diff --git a/nodepool/provider_manager.py b/nodepool/provider_manager.py new file mode 100644 index 000000000..afc0da97c --- /dev/null +++ b/nodepool/provider_manager.py @@ -0,0 +1,424 @@ +#!/usr/bin/env python + +# Copyright (C) 2011-2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import threading +import Queue +import logging +import paramiko +import novaclient +import novaclient.client +import time + +import fakeprovider + + +def iterate_timeout(max_seconds, purpose): + start = time.time() + count = 0 + while (time.time() < start + max_seconds): + count += 1 + yield count + time.sleep(2) + raise Exception("Timeout waiting for %s" % purpose) + + +class NotFound(Exception): + pass + + +class Task(object): + def __init__(self, **kw): + self._wait_event = threading.Event() + self._exception = None + self._traceback = None + self._result = None + self.args = kw + + def done(self, result): + self._result = result + self._wait_event.set() + + def exception(self, e, tb): + self._exception = e + self._traceback = tb + self._wait_event.set() + + def wait(self): + self._wait_event.wait() + if self._exception: + raise self._exception, None, self._traceback + return self._result + + def run(self, client): + try: + self.done(self.main(client)) + except Exception as e: + self.exception(e, sys.exc_info()[2]) + + +class CreateServerTask(Task): + def main(self, client): + server = client.servers.create(**self.args) + return server.id + + +class GetServerTask(Task): + def getPublicIP(self, server, version=4): + for addr in server.addresses.get('public', []): + if type(addr) == type(u''): # Rackspace/openstack 1.0 + return addr + if addr['version'] == version: # Rackspace/openstack 1.1 + return addr['addr'] + for addr in server.addresses.get('private', []): + # HPcloud + if (addr['version'] == version and version == 4): + quad = map(int, addr['addr'].split('.')) + if quad[0] == 10: + continue + if quad[0] == 192 and quad[1] == 168: + continue + if quad[0] == 172 and (16 <= quad[1] <= 31): + continue + return addr['addr'] + return None + + def main(self, client): + try: + server = client.servers.get(self.args['server_id']) + except novaclient.exceptions.NotFound: + raise NotFound() + d = dict(id=server.id, + status=server.status, + addresses=server.addresses) + if hasattr(server, 'adminPass'): + d['admin_pass'] = server.adminPass + if hasattr(server, 'key_name'): + d['key_name'] = server.key_name + if hasattr(server, 'progress'): + d['progress'] = server.progress + d['public_v4'] = self.getPublicIP(server) + return d + + +class DeleteServerTask(Task): + def main(self, client): + client.servers.delete(self.args['server_id']) + + +class AddKeypairTask(Task): + def main(self, client): + client.keypairs.create(**self.args) + + +class ListKeypairsTask(Task): + def main(self, client): + keys = client.keypairs.list() + return [dict(id=key.id, name=key.name) for + key in keys] + + +class DeleteKeypairTask(Task): + def main(self, client): + client.keypairs.delete(self.args['name']) + + +class CreateFloatingIPTask(Task): + def main(self, client): + ip = client.floating_ips.create() + return dict(id=ip.id, ip=ip.ip) + + +class AddFloatingIPTask(Task): + def main(self, client): + client.servers.add_floating_ip(**self.args) + + +class GetFloatingIPTask(Task): + def main(self, client): + ip = client.floating_ips.get(self.args['ip_id']) + return dict(id=ip.id, ip=ip.ip, instance_id=ip.instance_id) + + +class ListFloatingIPsTask(Task): + def main(self, client): + ips = client.floating_ips.list() + return [dict(id=ip.id, ip=ip.ip, instance_id=ip.instance_id) for + ip in ips] + + +class RemoveFloatingIPTask(Task): + def main(self, client): + client.servers.remove_floating_ip(**self.args) + + +class DeleteFloatingIPTask(Task): + def main(self, client): + client.floating_ips.delete(self.args['ip_id']) + + +class CreateImageTask(Task): + def main(self, client): + # This returns an id + return client.servers.create_image(**self.args) + + +class GetImageTask(Task): + def main(self, client): + try: + image = client.images.get(**self.args) + except novaclient.exceptions.NotFound: + raise NotFound() + d = dict(id=image.id, status=image.status) + if hasattr(image, 'progress'): + d['progress'] = image.progress + return d + + +class FindImageTask(Task): + def main(self, client): + image = client.images.find(**self.args) + return dict(id=image.id) + + +class DeleteImageTask(Task): + def main(self, client): + client.images.delete(**self.args) + + +class ProviderManager(threading.Thread): + log = logging.getLogger("nodepool.ProviderManager") + + def __init__(self, provider): + threading.Thread.__init__(self) + self.daemon = True + self.queue = Queue.Queue() + self._running = True + self.provider = provider + self._client = self._getClient() + self._flavors = self._getFlavors() + self._images = {} + self._extensions = self._getExtensions() + self._rate = float(provider.rate) + + def _getClient(self): + args = ['1.1', self.provider.username, self.provider.password, + self.provider.project_id, self.provider.auth_url] + kwargs = {} + if self.provider.service_type: + kwargs['service_type'] = self.provider.service_type + if self.provider.service_name: + kwargs['service_name'] = self.provider.service_name + if self.provider.region_name: + kwargs['region_name'] = self.provider.region_name + if self.provider.auth_url == 'fake': + return fakeprovider.FAKE_CLIENT + return novaclient.client.Client(*args, **kwargs) + + def _getFlavors(self): + l = [dict(id=f.id, ram=f.ram) for f in self._client.flavors.list()] + l.sort(lambda a, b: cmp(a['ram'], b['ram'])) + return l + + def _getExtensions(self): + try: + resp, body = self._client.client.get('/extensions') + return [x['alias'] for x in body['extensions']] + except novaclient.exceptions.NotFound: + return [] + + def hasExtension(self, extension): + if extension in self._extensions: + return True + return False + + def stop(self): + self._running = False + self.queue.put(None) + + def run(self): + last_ts = 0 + while self._running: + task = self.queue.get() + if not task: + continue + while True: + delta = time.time() - last_ts + if delta >= self._rate: + break + time.sleep(self._rate - delta) + self.log.debug("Provider %s running task %s" % (self.provider.name, + task)) + task.run(self._client) + last_ts = time.time() + self.queue.task_done() + + def submitTask(self, task): + self.queue.put(task) + return task.wait() + + def findFlavor(self, min_ram): + for f in self._flavors: + if f['ram'] >= min_ram: + return f + raise Exception("Unable to find flavor with min ram: %s" % min_ram) + + def findImage(self, name): + if name in self._images: + return self._images[name] + image = self.submitTask(FindImageTask(name=name)) + self._images[name] = image + return image + + def deleteImage(self, name): + if name in self._images: + del self._images[name] + return self.submitTask(DeleteImageTask(image=name)) + + def addKeypair(self, name): + key = paramiko.RSAKey.generate(2048) + public_key = key.get_name() + ' ' + key.get_base64() + self.submitTask(AddKeypairTask(name=name, public_key=public_key)) + return key + + def listKeypairs(self): + return self.submitTask(ListKeypairsTask()) + + def deleteKeypair(self, name): + return self.submitTask(DeleteKeypairTask(name=name)) + + def createServer(self, name, min_ram, image_id=None, + image_name=None, key_name=None): + if image_name: + image_id = self.findImage(image_name)['id'] + flavor = self.findFlavor(min_ram) + create_args = dict(name=name, image=image_id, flavor=flavor['id']) + if key_name: + create_args['key_name'] = key_name + + return self.submitTask(CreateServerTask(**create_args)) + + def getServer(self, server_id): + return self.submitTask(GetServerTask(server_id=server_id)) + + def getFloatingIP(self, ip_id): + return self.submitTask(GetFloatingIPTask(ip_id=ip_id)) + + def _waitForResource(self, resource_type, resource_id, timeout): + last_progress = None + last_status = None + for count in iterate_timeout(timeout, + "waiting for %s %s" % (resource_type, + resource_id)): + try: + if resource_type == 'server': + resource = self.getServer(resource_id) + elif resource_type == 'image': + resource = self.getImage(resource_id) + except: + self.log.exception('Unable to list %ss while waiting for ' + '%s will retry' % (resource_type, + resource_id)) + continue + + # In Rackspace v1.0, there is no progress attribute while queued + progress = resource.get('progress') + status = resource.get('status') + if (last_progress != progress or + last_status != status): + self.log.debug('Status of %s %s: %s %s' % + (resource_type, resource_id, + status, progress)) + last_status = status + last_progress = progress + if status == 'ACTIVE': + return resource + + def waitForServer(self, server_id, timeout=3600): + return self._waitForResource('server', server_id, timeout) + + def waitForImage(self, image_id, timeout=3600): + return self._waitForResource('image', image_id, timeout) + + def createFloatingIP(self): + return self.submitTask(CreateFloatingIPTask()) + + def addFloatingIP(self, server_id, address): + self.submitTask(AddFloatingIPTask(server=server_id, + address=address)) + + def addPublicIP(self, server_id): + ip = self.createFloatingIP() + self.addFloatingIP(server_id, ip['ip']) + for count in iterate_timeout(600, "ip to be added"): + try: + newip = self.getFloatingIP(ip['id']) + except Exception: + self.log.exception('Unable to get IP details for server %s, ' + 'will retry' % (server_id)) + continue + if newip['instance_id'] == server_id: + return newip['ip'] + + def createImage(self, server_id, image_name): + return self.submitTask(CreateImageTask(server=server_id, + image_name=image_name)) + + def getImage(self, image_id): + return self.submitTask(GetImageTask(image=image_id)) + + def listFloatingIPs(self): + return self.submitTask(ListFloatingIPsTask()) + + def removeFloatingIP(self, server_id, address): + return self.submitTask(RemoveFloatingIPTask(server=server_id, + address=address)) + + def deleteFloatingIP(self, ip_id): + return self.submitTask(DeleteFloatingIPTask(floating_ip=ip_id)) + + def deleteServer(self, server_id): + return self.submitTask(DeleteServerTask(server_id=server_id)) + + def cleanupServer(self, server_id): + server = self.getServer(server_id) + + if self.hasExtension('os-floating-ips'): + for ip in self.listFloatingIPs(): + if ip['instance_id'] == server_id: + self.log.debug('Deleting floating ip for server %s' % + server_id) + self.removeFloatingIP(server_id, ip['ip']) + self.deleteFloatingIP(ip['id']) + + if self.hasExtension('os-keypairs'): + for kp in self.listKeypairs(): + if kp['name'] == server['key_name']: + self.log.debug('Deleting keypair for server %s' % + server_id) + self.deleteKeypair(kp['name']) + + self.log.debug('Deleting server %s' % server_id) + self.deleteServer(server_id) + + for count in iterate_timeout(3600, "waiting for server %s deletion" % + server_id): + try: + self.getServer(server_id) + except NotFound: + return