Add ProviderManager

This is used to serialize all access to an individual provider
(nova client).  One ProviderManager is created for every provider
defined in the configuration.  Any actions that require interaction
with nova submit a task to the manager which processes them serially
with an appropriate delay to ensure that rate limits are not hit.

This solves not only rate-limit problems, but also ends multi-threaded
access to a single novaclient Client object.

Change-Id: I0cdaa747dac08cdbe4719cb6c9c220678b7a0320
This commit is contained in:
James E. Blair 2013-08-20 11:56:59 -07:00
parent d3386fb24a
commit 8dc6c870f2
4 changed files with 545 additions and 281 deletions

View File

@ -41,7 +41,9 @@ class FakeList(object):
if x.name == name: if x.name == name:
return x return x
def get(self, id): def get(self, image=None):
if image:
id = image
for x in self._list: for x in self._list:
if x.id == id: if x.id == id:
return x return x
@ -52,12 +54,16 @@ class FakeList(object):
obj.status = status obj.status = status
def delete(self, obj): def delete(self, obj):
self._list.remove(obj) if hasattr(obj, 'id'):
self._list.remove(obj)
else:
self._list.remove(self.get(obj))
def create(self, **kw): def create(self, **kw):
s = Dummy(id=uuid.uuid4().hex, s = Dummy(id=uuid.uuid4().hex,
name=kw['name'], name=kw['name'],
status='BUILD', status='BUILD',
adminPass='fake',
addresses=dict(public=[dict(version=4, addr='fake')]), addresses=dict(public=[dict(version=4, addr='fake')]),
manager=self) manager=self)
self._list.append(s) self._list.append(s)
@ -65,8 +71,8 @@ class FakeList(object):
t.start() t.start()
return s return s
def create_image(self, server, name): def create_image(self, server, image_name):
x = self.api.images.create(name=name) x = self.api.images.create(name=image_name)
return x.id return x.id

View File

@ -29,6 +29,7 @@ import zmq
import nodedb import nodedb
import nodeutils as utils import nodeutils as utils
import provider_manager
MINS = 60 MINS = 60
HOURS = 60 * MINS HOURS = 60 * MINS
@ -146,7 +147,7 @@ class NodeLauncher(threading.Thread):
self.log.debug("Launching node id: %s" % self.node_id) self.log.debug("Launching node id: %s" % self.node_id)
try: try:
self.node = session.getNode(self.node_id) self.node = session.getNode(self.node_id)
self.client = self.nodepool.getClient(self.provider) self.manager = self.nodepool.getManager(self.provider)
except Exception: except Exception:
self.log.exception("Exception preparing to launch node id: %s:" self.log.exception("Exception preparing to launch node id: %s:"
% self.node_id) % self.node_id)
@ -158,7 +159,7 @@ class NodeLauncher(threading.Thread):
self.log.exception("Exception launching node id: %s:" % self.log.exception("Exception launching node id: %s:" %
self.node_id) self.node_id)
try: try:
utils.delete_node(self.client, self.node) self.nodepool.deleteNode(session, self.node)
except Exception: except Exception:
self.log.exception("Exception deleting node id: %s:" % self.log.exception("Exception deleting node id: %s:" %
self.node_id) self.node_id)
@ -173,37 +174,33 @@ class NodeLauncher(threading.Thread):
self.node.nodename = hostname.split('.')[0] self.node.nodename = hostname.split('.')[0]
self.node.target_name = self.target.name self.node.target_name = self.target.name
flavor = utils.get_flavor(self.client, self.image.min_ram)
snap_image = session.getCurrentSnapshotImage( snap_image = session.getCurrentSnapshotImage(
self.provider.name, self.image.name) self.provider.name, self.image.name)
if not snap_image: if not snap_image:
raise Exception("Unable to find current snapshot image %s in %s" % raise Exception("Unable to find current snapshot image %s in %s" %
(self.image.name, self.provider.name)) (self.image.name, self.provider.name))
remote_snap_image = self.client.images.get(snap_image.external_id)
self.log.info("Creating server with hostname %s in %s from image %s " self.log.info("Creating server with hostname %s in %s from image %s "
"for node id: %s" % (hostname, self.provider.name, "for node id: %s" % (hostname, self.provider.name,
self.image.name, self.node_id)) self.image.name, self.node_id))
server = None server_id = self.manager.createServer(hostname,
server, key = utils.create_server(self.client, hostname, self.image.min_ram,
remote_snap_image, flavor) snap_image.external_id)
self.node.external_id = server.id self.node.external_id = server_id
session.commit() session.commit()
self.log.debug("Waiting for server %s for node id: %s" % self.log.debug("Waiting for server %s for node id: %s" %
(server.id, self.node.id)) (server_id, self.node.id))
server = self.manager.waitForServer(server_id)
server = utils.wait_for_resource(server) if server['status'] != 'ACTIVE':
if server.status != 'ACTIVE':
raise Exception("Server %s for node id: %s status: %s" % raise Exception("Server %s for node id: %s status: %s" %
(server.id, self.node.id, server.status)) (server_id, self.node.id, server['status']))
ip = utils.get_public_ip(server) ip = server.get('public_v4')
if not ip and 'os-floating-ips' in utils.get_extensions(self.client): if not ip and self.manager.hasExtension('os-floating-ips'):
utils.add_public_ip(server) ip = self.manager.addPublicIP(server['server_id'])
ip = utils.get_public_ip(server)
if not ip: if not ip:
raise Exception("Unable to find public ip of server") raise Exception("Unable to find public IP of server")
self.node.ip = ip self.node.ip = ip
self.log.debug("Node id: %s is running, testing ssh" % self.node.id) self.log.debug("Node id: %s is running, testing ssh" % self.node.id)
@ -288,7 +285,7 @@ class ImageUpdater(threading.Thread):
try: try:
self.snap_image = session.getSnapshotImage( self.snap_image = session.getSnapshotImage(
self.snap_image_id) self.snap_image_id)
self.client = self.nodepool.getClient(self.provider) self.manager = self.nodepool.getManager(self.provider)
except Exception: except Exception:
self.log.exception("Exception preparing to update image %s " self.log.exception("Exception preparing to update image %s "
"in %s:" % (self.image.name, "in %s:" % (self.image.name,
@ -302,7 +299,7 @@ class ImageUpdater(threading.Thread):
(self.image.name, self.provider.name)) (self.image.name, self.provider.name))
try: try:
if self.snap_image: if self.snap_image:
utils.delete_image(self.client, self.snap_image) self.nodepool.deleteImage(self.snap_image)
except Exception: except Exception:
self.log.exception("Exception deleting image id: %s:" % self.log.exception("Exception deleting image id: %s:" %
self.snap_image.id) self.snap_image.id)
@ -312,41 +309,50 @@ class ImageUpdater(threading.Thread):
start_time = time.time() start_time = time.time()
timestamp = int(start_time) timestamp = int(start_time)
flavor = utils.get_flavor(self.client, self.image.min_ram)
base_image = self.client.images.find(name=self.image.base_image)
hostname = ('%s-%s.template.openstack.org' % hostname = ('%s-%s.template.openstack.org' %
(self.image.name, str(timestamp))) (self.image.name, str(timestamp)))
self.log.info("Creating image id: %s for %s in %s" % self.log.info("Creating image id: %s for %s in %s" %
(self.snap_image.id, self.image.name, (self.snap_image.id, self.image.name,
self.provider.name)) self.provider.name))
server, key = utils.create_server( if self.manager.hasExtension('os-keypairs'):
self.client, hostname, base_image, flavor, add_key=True) key_name = hostname.split('.')[0]
key = self.manager.addKeypair(key_name)
else:
key_name = None
key = None
server_id = self.manager.createServer(hostname,
self.image.min_ram,
image_name=self.image.base_image,
key_name=key_name)
self.snap_image.hostname = hostname self.snap_image.hostname = hostname
self.snap_image.version = timestamp self.snap_image.version = timestamp
self.snap_image.server_external_id = server.id self.snap_image.server_external_id = server_id
session.commit() session.commit()
self.log.debug("Image id: %s waiting for server %s" % self.log.debug("Image id: %s waiting for server %s" %
(self.snap_image.id, server.id)) (self.snap_image.id, server_id))
server = utils.wait_for_resource(server) server = self.manager.waitForServer(server_id)
if not server: if server['status'] != 'ACTIVE':
raise Exception("Timeout waiting for server %s" % raise Exception("Server %s for image id: %s status: %s" %
server.id) (server_id, self.snap_image.id, server['status']))
admin_pass = None # server.adminPass ip = server.get('public_v4')
self.bootstrapServer(server, admin_pass, key) if not ip and self.manager.hasExtension('os-floating-ips'):
ip = self.manager.addPublicIP(server['server_id'])
if not ip:
raise Exception("Unable to find public IP of server")
server['public_v4'] = ip
image = utils.create_image(self.client, server, hostname) self.bootstrapServer(server, key)
self.snap_image.external_id = image.id
image_id = self.manager.createImage(server_id, hostname)
self.snap_image.external_id = image_id
session.commit() session.commit()
self.log.debug("Image id: %s building image %s" % self.log.debug("Image id: %s building image %s" %
(self.snap_image.id, image.id)) (self.snap_image.id, image_id))
# It can take a _very_ long time for Rackspace 1.0 to save an image # It can take a _very_ long time for Rackspace 1.0 to save an image
image = utils.wait_for_resource(image, IMAGE_TIMEOUT) self.manager.waitForImage(image_id, IMAGE_TIMEOUT)
if not image:
raise Exception("Timeout waiting for image %s" %
self.snap_image.id)
if statsd: if statsd:
dt = int((time.time() - start_time) * 1000) dt = int((time.time() - start_time) * 1000)
@ -363,28 +369,22 @@ class ImageUpdater(threading.Thread):
try: try:
# We made the snapshot, try deleting the server, but it's okay # We made the snapshot, try deleting the server, but it's okay
# if we fail. The reap script will find it and try again. # if we fail. The reap script will find it and try again.
utils.delete_server(self.client, server) self.manager.cleanupServer(server_id)
except: except:
self.log.exception("Exception encountered deleting server" self.log.exception("Exception encountered deleting server"
" %s for image id: %s" % " %s for image id: %s" %
(server.id, self.snap_image.id)) (server_id, self.snap_image.id))
def bootstrapServer(self, server, admin_pass, key):
ip = utils.get_public_ip(server)
if not ip and 'os-floating-ips' in utils.get_extensions(self.client):
utils.add_public_ip(server)
ip = utils.get_public_ip(server)
if not ip:
raise Exception("Unable to find public ip of server")
def bootstrapServer(self, server, key):
ssh_kwargs = {} ssh_kwargs = {}
if key: if key:
ssh_kwargs['pkey'] = key ssh_kwargs['pkey'] = key
else: else:
ssh_kwargs['password'] = admin_pass ssh_kwargs['password'] = server['admin_pass']
for username in ['root', 'ubuntu']: for username in ['root', 'ubuntu']:
host = utils.ssh_connect(ip, username, ssh_kwargs, host = utils.ssh_connect(server['public_v4'], username,
ssh_kwargs,
timeout=CONNECT_TIMEOUT) timeout=CONNECT_TIMEOUT)
if host: if host:
break break
@ -495,7 +495,8 @@ class NodePool(threading.Thread):
newconfig.targets = {} newconfig.targets = {}
newconfig.scriptdir = config.get('script-dir') newconfig.scriptdir = config.get('script-dir')
newconfig.dburi = config.get('dburi') newconfig.dburi = config.get('dburi')
newconfig.clients = {} newconfig.managers = {}
stop_managers = []
for provider in config['providers']: for provider in config['providers']:
p = Provider() p = Provider()
@ -509,23 +510,28 @@ class NodePool(threading.Thread):
p.service_name = provider.get('service-name') p.service_name = provider.get('service-name')
p.region_name = provider.get('region-name') p.region_name = provider.get('region-name')
p.max_servers = provider['max-servers'] p.max_servers = provider['max-servers']
oldclient = None p.rate = provider.get('rate', 1.0)
oldmanager = None
if self.config: if self.config:
oldclient = self.config.clients.get(p.name) oldmanager = self.config.managers.get(p.name)
if oldclient: if oldmanager:
if (p.username != oldclient.client.user or if (p.username != oldmanager.provider.username or
p.password != oldclient.client.password or p.password != oldmanager.provider.password or
p.project_id != oldclient.client.projectid or p.project_id != oldmanager.provider.project_id or
p.service_type != oldclient.client.service_type or p.auth_url != oldmanager.provider.auth_url or
p.service_name != oldclient.client.service_name or p.service_type != oldmanager.provider.service_type or
p.region_name != oldclient.client.region_name): p.service_name != oldmanager.provider.service_name or
oldclient = None p.region_name != oldmanager.provider.region_name):
if oldclient: stop_managers.append(oldmanager)
newconfig.clients[p.name] = oldclient oldmanager = None
if oldmanager:
newconfig.managers[p.name] = oldmanager
else: else:
self.log.debug("Creating new client object for %s" % self.log.debug("Creating new ProviderManager object for %s" %
p.name) p.name)
newconfig.clients[p.name] = utils.get_client(p) newconfig.managers[p.name] = \
provider_manager.ProviderManager(p)
newconfig.managers[p.name].start()
p.images = {} p.images = {}
for image in provider['images']: for image in provider['images']:
i = ProviderImage() i = ProviderImage()
@ -562,13 +568,15 @@ class NodePool(threading.Thread):
i.providers[p.name] = p i.providers[p.name] = p
p.min_ready = provider['min-ready'] p.min_ready = provider['min-ready']
self.config = newconfig self.config = newconfig
for oldmanager in stop_managers:
oldmanager.stop()
if self.config.dburi != self.dburi: if self.config.dburi != self.dburi:
self.dburi = self.config.dburi self.dburi = self.config.dburi
self.db = nodedb.NodeDatabase(self.config.dburi) self.db = nodedb.NodeDatabase(self.config.dburi)
self.startUpdateListeners(config['zmq-publishers']) self.startUpdateListeners(config['zmq-publishers'])
def getClient(self, provider): def getManager(self, provider):
return self.config.clients[provider.name] return self.config.managers[provider.name]
def startUpdateListeners(self, publishers): def startUpdateListeners(self, publishers):
running = set(self.zmq_listeners.keys()) running = set(self.zmq_listeners.keys())
@ -696,7 +704,7 @@ class NodePool(threading.Thread):
self.updateStats(session, node.provider_name) self.updateStats(session, node.provider_name)
provider = self.config.providers[node.provider_name] provider = self.config.providers[node.provider_name]
target = self.config.targets[node.target_name] target = self.config.targets[node.target_name]
client = self.getClient(provider) manager = self.getManager(provider)
if target.jenkins_url: if target.jenkins_url:
jenkins = utils.get_jenkins(target.jenkins_url, jenkins = utils.get_jenkins(target.jenkins_url,
@ -707,7 +715,17 @@ class NodePool(threading.Thread):
jenkins.delete_node(jenkins_name) jenkins.delete_node(jenkins_name)
self.log.info("Deleted jenkins node id: %s" % node.id) self.log.info("Deleted jenkins node id: %s" % node.id)
utils.delete_node(client, node) if node.external_id:
try:
server = manager.getServer(node.external_id)
self.log.debug('Deleting server %s for node id: %s' %
node.external_id,
node.id)
manager.cleanupServer(server['id'])
except provider_manager.NotFound:
pass
node.delete()
self.log.info("Deleted node id: %s" % node.id) self.log.info("Deleted node id: %s" % node.id)
if statsd: if statsd:
@ -720,12 +738,32 @@ class NodePool(threading.Thread):
self.updateStats(session, node.provider_name) self.updateStats(session, node.provider_name)
def deleteImage(self, snap_image): def deleteImage(self, snap_image):
# Delete a node # Delete an image (and its associated server)
snap_image.state = nodedb.DELETE snap_image.state = nodedb.DELETE
provider = self.config.providers[snap_image.provider_name] provider = self.config.providers[snap_image.provider_name]
client = self.getClient(provider) manager = self.getManager(provider)
utils.delete_image(client, snap_image) if snap_image.server_external_id:
try:
server = manager.getServer(snap_image.server_external_id)
self.log.debug('Deleting server %s for image id: %s' %
snap_image.server_external_id,
snap_image.id)
manager.cleanupServer(server['id'])
except provider_manager.NotFound:
self.log.warning('Image server id %s not found' %
snap_image.server_external_id)
if snap_image.external_id:
try:
remote_image = manager.getImage(snap_image.external_id)
self.log.debug('Deleting image %s' % remote_image.id)
manager.deleteImage(remote_image['id'])
except provider_manager.NotFound:
self.log.warning('Image id %s not found' %
snap_image.external_id)
snap_image.delete()
self.log.info("Deleted image id: %s" % snap_image.id) self.log.info("Deleted image id: %s" % snap_image.id)
def _doPeriodicCleanup(self): def _doPeriodicCleanup(self):

View File

@ -16,15 +16,12 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import novaclient.client
import time import time
import paramiko
import socket import socket
import logging import logging
import myjenkins import myjenkins
from sshclient import SSHClient from sshclient import SSHClient
import nodedb
import fakeprovider import fakeprovider
log = logging.getLogger("nodepool.utils") log = logging.getLogger("nodepool.utils")
@ -40,22 +37,6 @@ def iterate_timeout(max_seconds, purpose):
raise Exception("Timeout waiting for %s" % purpose) raise Exception("Timeout waiting for %s" % purpose)
def get_client(provider):
args = ['1.1', provider.username, provider.password,
provider.project_id, provider.auth_url]
kwargs = {}
if provider.service_type:
kwargs['service_type'] = provider.service_type
if provider.service_name:
kwargs['service_name'] = provider.service_name
if provider.region_name:
kwargs['region_name'] = provider.region_name
if provider.auth_url == 'fake':
return fakeprovider.FAKE_CLIENT
client = novaclient.client.Client(*args, **kwargs)
return client
def get_jenkins(url, user, apikey): def get_jenkins(url, user, apikey):
if apikey == 'fake': if apikey == 'fake':
return fakeprovider.FakeJenkins() return fakeprovider.FakeJenkins()
@ -65,101 +46,6 @@ def get_jenkins(url, user, apikey):
extension_cache = {} extension_cache = {}
def get_extensions(client):
global extension_cache
cache = extension_cache.get(client)
if cache:
return cache
try:
resp, body = client.client.get('/extensions')
extensions = [x['alias'] for x in body['extensions']]
except novaclient.exceptions.NotFound:
extensions = []
extension_cache[client] = extensions
return extensions
def get_flavor(client, min_ram):
flavors = [f for f in client.flavors.list() if f.ram >= min_ram]
flavors.sort(lambda a, b: cmp(a.ram, b.ram))
return flavors[0]
def get_public_ip(server, version=4):
if 'os-floating-ips' in get_extensions(server.manager.api):
log.debug('Server %s using floating ips' % server.id)
for addr in server.manager.api.floating_ips.list():
if addr.instance_id == server.id:
return addr.ip
log.debug('Server %s not using floating ips, addresses: %s' %
(server.id, server.addresses))
for addr in server.addresses.get('public', []):
if type(addr) == type(u''): # Rackspace/openstack 1.0
return addr
if addr['version'] == version: # Rackspace/openstack 1.1
return addr['addr']
for addr in server.addresses.get('private', []):
# HPcloud
if (addr['version'] == version and version == 4):
quad = map(int, addr['addr'].split('.'))
if quad[0] == 10:
continue
if quad[0] == 192 and quad[1] == 168:
continue
if quad[0] == 172 and (16 <= quad[1] <= 31):
continue
return addr['addr']
return None
def add_public_ip(server):
ip = server.manager.api.floating_ips.create()
log.debug('Created floading IP %s for server %s' % (ip, server.id))
server.add_floating_ip(ip)
for count in iterate_timeout(600, "ip to be added"):
try:
newip = ip.manager.get(ip.id)
except Exception:
log.exception('Unable to get IP details for server %s, '
'will retry' % (server.id))
continue
if newip.instance_id == server.id:
return
def add_keypair(client, name):
key = paramiko.RSAKey.generate(2048)
public_key = key.get_name() + ' ' + key.get_base64()
kp = client.keypairs.create(name, public_key)
return key, kp
def wait_for_resource(wait_resource, timeout=3600):
last_progress = None
last_status = None
for count in iterate_timeout(timeout, "waiting for %s" % wait_resource):
try:
resource = wait_resource.manager.get(wait_resource.id)
except:
log.exception('Unable to list resources, while waiting for %s '
'will retry' % wait_resource)
continue
# In Rackspace v1.0, there is no progress attribute while queued
if hasattr(resource, 'progress'):
if (last_progress != resource.progress
or last_status != resource.status):
log.debug('Status of %s: %s %s' % (resource, resource.status,
resource.progress))
last_progress = resource.progress
elif last_status != resource.status:
log.debug('Status of %s: %s' % (resource, resource.status))
last_status = resource.status
if resource.status == 'ACTIVE':
return resource
def ssh_connect(ip, username, connect_kwargs={}, timeout=60): def ssh_connect(ip, username, connect_kwargs={}, timeout=60):
if ip == 'fake': if ip == 'fake':
return fakeprovider.FakeSSHClient() return fakeprovider.FakeSSHClient()
@ -176,93 +62,3 @@ def ssh_connect(ip, username, connect_kwargs={}, timeout=60):
if "access okay" in out: if "access okay" in out:
return client return client
return None return None
def create_server(client, hostname, base_image, flavor, add_key=False):
create_kwargs = dict(image=base_image, flavor=flavor, name=hostname)
key = None
# hpcloud can't handle dots in keypair names
if add_key:
key_name = hostname.split('.')[0]
if 'os-keypairs' in get_extensions(client):
key, kp = add_keypair(client, key_name)
create_kwargs['key_name'] = key_name
server = client.servers.create(**create_kwargs)
return server, key
def create_image(client, server, image_name):
# TODO: fix novaclient so it returns an image here
# image = server.create_image(name)
uuid = server.manager.create_image(server, image_name)
image = client.images.get(uuid)
return image
def delete_server(client, server):
try:
if 'os-floating-ips' in get_extensions(server.manager.api):
for addr in server.manager.api.floating_ips.list():
if addr.instance_id == server.id:
server.remove_floating_ip(addr)
addr.delete()
except:
log.exception('Unable to remove floating IP from server %s' %
server.id)
try:
if 'os-keypairs' in get_extensions(server.manager.api):
for kp in server.manager.api.keypairs.list():
if kp.name == server.key_name:
kp.delete()
except:
log.exception('Unable to delete keypair from server %s' % server.id)
log.debug('Deleting server %s' % server.id)
server.delete()
for count in iterate_timeout(3600, "waiting for server %s deletion" %
server.id):
try:
client.servers.get(server.id)
except novaclient.exceptions.NotFound:
return
def delete_node(client, node):
node.state = nodedb.DELETE
if node.external_id:
try:
server = client.servers.get(node.external_id)
delete_server(client, server)
except novaclient.exceptions.NotFound:
pass
node.delete()
def delete_image(client, image):
server = None
if image.server_external_id:
try:
server = client.servers.get(image.server_external_id)
except novaclient.exceptions.NotFound:
log.warning('Image server id %s not found' %
image.server_external_id)
if server:
delete_server(client, server)
remote_image = None
if image.external_id:
try:
remote_image = client.images.get(image.external_id)
except novaclient.exceptions.NotFound:
log.warning('Image id %s not found' % image.external_id)
if remote_image:
log.debug('Deleting image %s' % remote_image.id)
remote_image.delete()
image.delete()

View File

@ -0,0 +1,424 @@
#!/usr/bin/env python
# Copyright (C) 2011-2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import threading
import Queue
import logging
import paramiko
import novaclient
import novaclient.client
import time
import fakeprovider
def iterate_timeout(max_seconds, purpose):
start = time.time()
count = 0
while (time.time() < start + max_seconds):
count += 1
yield count
time.sleep(2)
raise Exception("Timeout waiting for %s" % purpose)
class NotFound(Exception):
pass
class Task(object):
def __init__(self, **kw):
self._wait_event = threading.Event()
self._exception = None
self._traceback = None
self._result = None
self.args = kw
def done(self, result):
self._result = result
self._wait_event.set()
def exception(self, e, tb):
self._exception = e
self._traceback = tb
self._wait_event.set()
def wait(self):
self._wait_event.wait()
if self._exception:
raise self._exception, None, self._traceback
return self._result
def run(self, client):
try:
self.done(self.main(client))
except Exception as e:
self.exception(e, sys.exc_info()[2])
class CreateServerTask(Task):
def main(self, client):
server = client.servers.create(**self.args)
return server.id
class GetServerTask(Task):
def getPublicIP(self, server, version=4):
for addr in server.addresses.get('public', []):
if type(addr) == type(u''): # Rackspace/openstack 1.0
return addr
if addr['version'] == version: # Rackspace/openstack 1.1
return addr['addr']
for addr in server.addresses.get('private', []):
# HPcloud
if (addr['version'] == version and version == 4):
quad = map(int, addr['addr'].split('.'))
if quad[0] == 10:
continue
if quad[0] == 192 and quad[1] == 168:
continue
if quad[0] == 172 and (16 <= quad[1] <= 31):
continue
return addr['addr']
return None
def main(self, client):
try:
server = client.servers.get(self.args['server_id'])
except novaclient.exceptions.NotFound:
raise NotFound()
d = dict(id=server.id,
status=server.status,
addresses=server.addresses)
if hasattr(server, 'adminPass'):
d['admin_pass'] = server.adminPass
if hasattr(server, 'key_name'):
d['key_name'] = server.key_name
if hasattr(server, 'progress'):
d['progress'] = server.progress
d['public_v4'] = self.getPublicIP(server)
return d
class DeleteServerTask(Task):
def main(self, client):
client.servers.delete(self.args['server_id'])
class AddKeypairTask(Task):
def main(self, client):
client.keypairs.create(**self.args)
class ListKeypairsTask(Task):
def main(self, client):
keys = client.keypairs.list()
return [dict(id=key.id, name=key.name) for
key in keys]
class DeleteKeypairTask(Task):
def main(self, client):
client.keypairs.delete(self.args['name'])
class CreateFloatingIPTask(Task):
def main(self, client):
ip = client.floating_ips.create()
return dict(id=ip.id, ip=ip.ip)
class AddFloatingIPTask(Task):
def main(self, client):
client.servers.add_floating_ip(**self.args)
class GetFloatingIPTask(Task):
def main(self, client):
ip = client.floating_ips.get(self.args['ip_id'])
return dict(id=ip.id, ip=ip.ip, instance_id=ip.instance_id)
class ListFloatingIPsTask(Task):
def main(self, client):
ips = client.floating_ips.list()
return [dict(id=ip.id, ip=ip.ip, instance_id=ip.instance_id) for
ip in ips]
class RemoveFloatingIPTask(Task):
def main(self, client):
client.servers.remove_floating_ip(**self.args)
class DeleteFloatingIPTask(Task):
def main(self, client):
client.floating_ips.delete(self.args['ip_id'])
class CreateImageTask(Task):
def main(self, client):
# This returns an id
return client.servers.create_image(**self.args)
class GetImageTask(Task):
def main(self, client):
try:
image = client.images.get(**self.args)
except novaclient.exceptions.NotFound:
raise NotFound()
d = dict(id=image.id, status=image.status)
if hasattr(image, 'progress'):
d['progress'] = image.progress
return d
class FindImageTask(Task):
def main(self, client):
image = client.images.find(**self.args)
return dict(id=image.id)
class DeleteImageTask(Task):
def main(self, client):
client.images.delete(**self.args)
class ProviderManager(threading.Thread):
log = logging.getLogger("nodepool.ProviderManager")
def __init__(self, provider):
threading.Thread.__init__(self)
self.daemon = True
self.queue = Queue.Queue()
self._running = True
self.provider = provider
self._client = self._getClient()
self._flavors = self._getFlavors()
self._images = {}
self._extensions = self._getExtensions()
self._rate = float(provider.rate)
def _getClient(self):
args = ['1.1', self.provider.username, self.provider.password,
self.provider.project_id, self.provider.auth_url]
kwargs = {}
if self.provider.service_type:
kwargs['service_type'] = self.provider.service_type
if self.provider.service_name:
kwargs['service_name'] = self.provider.service_name
if self.provider.region_name:
kwargs['region_name'] = self.provider.region_name
if self.provider.auth_url == 'fake':
return fakeprovider.FAKE_CLIENT
return novaclient.client.Client(*args, **kwargs)
def _getFlavors(self):
l = [dict(id=f.id, ram=f.ram) for f in self._client.flavors.list()]
l.sort(lambda a, b: cmp(a['ram'], b['ram']))
return l
def _getExtensions(self):
try:
resp, body = self._client.client.get('/extensions')
return [x['alias'] for x in body['extensions']]
except novaclient.exceptions.NotFound:
return []
def hasExtension(self, extension):
if extension in self._extensions:
return True
return False
def stop(self):
self._running = False
self.queue.put(None)
def run(self):
last_ts = 0
while self._running:
task = self.queue.get()
if not task:
continue
while True:
delta = time.time() - last_ts
if delta >= self._rate:
break
time.sleep(self._rate - delta)
self.log.debug("Provider %s running task %s" % (self.provider.name,
task))
task.run(self._client)
last_ts = time.time()
self.queue.task_done()
def submitTask(self, task):
self.queue.put(task)
return task.wait()
def findFlavor(self, min_ram):
for f in self._flavors:
if f['ram'] >= min_ram:
return f
raise Exception("Unable to find flavor with min ram: %s" % min_ram)
def findImage(self, name):
if name in self._images:
return self._images[name]
image = self.submitTask(FindImageTask(name=name))
self._images[name] = image
return image
def deleteImage(self, name):
if name in self._images:
del self._images[name]
return self.submitTask(DeleteImageTask(image=name))
def addKeypair(self, name):
key = paramiko.RSAKey.generate(2048)
public_key = key.get_name() + ' ' + key.get_base64()
self.submitTask(AddKeypairTask(name=name, public_key=public_key))
return key
def listKeypairs(self):
return self.submitTask(ListKeypairsTask())
def deleteKeypair(self, name):
return self.submitTask(DeleteKeypairTask(name=name))
def createServer(self, name, min_ram, image_id=None,
image_name=None, key_name=None):
if image_name:
image_id = self.findImage(image_name)['id']
flavor = self.findFlavor(min_ram)
create_args = dict(name=name, image=image_id, flavor=flavor['id'])
if key_name:
create_args['key_name'] = key_name
return self.submitTask(CreateServerTask(**create_args))
def getServer(self, server_id):
return self.submitTask(GetServerTask(server_id=server_id))
def getFloatingIP(self, ip_id):
return self.submitTask(GetFloatingIPTask(ip_id=ip_id))
def _waitForResource(self, resource_type, resource_id, timeout):
last_progress = None
last_status = None
for count in iterate_timeout(timeout,
"waiting for %s %s" % (resource_type,
resource_id)):
try:
if resource_type == 'server':
resource = self.getServer(resource_id)
elif resource_type == 'image':
resource = self.getImage(resource_id)
except:
self.log.exception('Unable to list %ss while waiting for '
'%s will retry' % (resource_type,
resource_id))
continue
# In Rackspace v1.0, there is no progress attribute while queued
progress = resource.get('progress')
status = resource.get('status')
if (last_progress != progress or
last_status != status):
self.log.debug('Status of %s %s: %s %s' %
(resource_type, resource_id,
status, progress))
last_status = status
last_progress = progress
if status == 'ACTIVE':
return resource
def waitForServer(self, server_id, timeout=3600):
return self._waitForResource('server', server_id, timeout)
def waitForImage(self, image_id, timeout=3600):
return self._waitForResource('image', image_id, timeout)
def createFloatingIP(self):
return self.submitTask(CreateFloatingIPTask())
def addFloatingIP(self, server_id, address):
self.submitTask(AddFloatingIPTask(server=server_id,
address=address))
def addPublicIP(self, server_id):
ip = self.createFloatingIP()
self.addFloatingIP(server_id, ip['ip'])
for count in iterate_timeout(600, "ip to be added"):
try:
newip = self.getFloatingIP(ip['id'])
except Exception:
self.log.exception('Unable to get IP details for server %s, '
'will retry' % (server_id))
continue
if newip['instance_id'] == server_id:
return newip['ip']
def createImage(self, server_id, image_name):
return self.submitTask(CreateImageTask(server=server_id,
image_name=image_name))
def getImage(self, image_id):
return self.submitTask(GetImageTask(image=image_id))
def listFloatingIPs(self):
return self.submitTask(ListFloatingIPsTask())
def removeFloatingIP(self, server_id, address):
return self.submitTask(RemoveFloatingIPTask(server=server_id,
address=address))
def deleteFloatingIP(self, ip_id):
return self.submitTask(DeleteFloatingIPTask(floating_ip=ip_id))
def deleteServer(self, server_id):
return self.submitTask(DeleteServerTask(server_id=server_id))
def cleanupServer(self, server_id):
server = self.getServer(server_id)
if self.hasExtension('os-floating-ips'):
for ip in self.listFloatingIPs():
if ip['instance_id'] == server_id:
self.log.debug('Deleting floating ip for server %s' %
server_id)
self.removeFloatingIP(server_id, ip['ip'])
self.deleteFloatingIP(ip['id'])
if self.hasExtension('os-keypairs'):
for kp in self.listKeypairs():
if kp['name'] == server['key_name']:
self.log.debug('Deleting keypair for server %s' %
server_id)
self.deleteKeypair(kp['name'])
self.log.debug('Deleting server %s' % server_id)
self.deleteServer(server_id)
for count in iterate_timeout(3600, "waiting for server %s deletion" %
server_id):
try:
self.getServer(server_id)
except NotFound:
return