Initial commit

Much of this comes from devstack-gate.

Change-Id: I7af197743cdf9523318605b6e85d2cc747a356c7
changes/05/41805/7
James E. Blair 10 years ago
parent a3db12fae9
commit 5866f10601

15
.gitignore vendored

@ -0,0 +1,15 @@
*.egg
*.egg-info
*.pyc
.test
.testrepository
.tox
AUTHORS
build/*
ChangeLog
config
doc/build/*
zuul/versioninfo
dist/
venv/
nodepool.yaml

@ -0,0 +1,104 @@
#!/usr/bin/env python
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import daemon
import extras
# as of python-daemon 1.6 it doesn't bundle pidlockfile anymore
# instead it depends on lockfile-0.9.1 which uses pidfile.
pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
import logging.config
import os
import sys
import signal
import nodepool.nodepool
class NodePoolDaemon(object):
    """Command-line front end for the node pool daemon."""

    def __init__(self):
        # Populated by parse_arguments().
        self.args = None

    def parse_arguments(self):
        """Parse command-line options into self.args."""
        ap = argparse.ArgumentParser(description='Node pool.')
        ap.add_argument('-c', dest='config',
                        help='path to config file')
        ap.add_argument('-d', dest='nodaemon', action='store_true',
                        help='do not run as a daemon')
        ap.add_argument('-l', dest='logconfig',
                        help='path to log config file')
        ap.add_argument('-p', dest='pidfile',
                        help='path to pid file',
                        default='/var/run/nodepool/nodepool.pid')
        ap.add_argument('--version', dest='version', action='store_true',
                        help='show version')
        self.args = ap.parse_args()

    def setup_logging(self):
        """Configure logging from the -l config file, or DEBUG to stderr."""
        if not self.args.logconfig:
            logging.basicConfig(level=logging.DEBUG)
            return
        fp = os.path.expanduser(self.args.logconfig)
        if not os.path.exists(fp):
            raise Exception("Unable to read logging config file at %s" %
                            fp)
        logging.config.fileConfig(fp)

    def exit_handler(self, signum, frame):
        # SIGUSR1 (and Ctrl-C): ask the pool to shut down cleanly.
        self.pool.stop()

    def term_handler(self, signum, frame):
        # SIGTERM: exit immediately without cleanup.
        os._exit(0)

    def main(self):
        """Run the pool until interrupted."""
        self.setup_logging()
        self.pool = nodepool.nodepool.NodePool(self.args.config)
        signal.signal(signal.SIGUSR1, self.exit_handler)
        signal.signal(signal.SIGTERM, self.term_handler)
        self.pool.start()
        while True:
            try:
                signal.pause()
            except KeyboardInterrupt:
                return self.exit_handler(signal.SIGINT, None)
def main():
    """Entry point: parse arguments, then run in the foreground or daemonize.

    Returns 0 after printing the version; otherwise runs until the
    daemon exits.
    """
    npd = NodePoolDaemon()
    npd.parse_arguments()
    if npd.args.version:
        from nodepool.version import version_info as npd_version_info
        # Parenthesized single-argument form works under both
        # Python 2 and Python 3 (the bare print statement does not).
        print("Nodepool version: %s" % npd_version_info.version_string())
        return 0
    # PID lockfile guards against concurrent daemons (10 second timeout
    # acquiring the lock).
    pid = pid_file_module.TimeoutPIDLockFile(npd.args.pidfile, 10)
    if npd.args.nodaemon:
        npd.main()
    else:
        with daemon.DaemonContext(pidfile=pid):
            npd.main()
if __name__ == "__main__":
    # Allow running straight from a source checkout without installing.
    sys.path.insert(0, '.')
    sys.exit(main())

@ -0,0 +1,133 @@
#!/usr/bin/env python
# Copyright 2011-2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import jenkins
import json
import urllib
import urllib2
from jenkins import JenkinsException, NODE_TYPE, CREATE_NODE
# Jenkins REST endpoints; filled in with %-formatting on a name/msg mapping.
TOGGLE_OFFLINE = '/computer/%(name)s/toggleOffline?offlineMessage=%(msg)s'
CONFIG_NODE = '/computer/%(name)s/config.xml'
class Jenkins(jenkins.Jenkins):
    """python-jenkins client extended with node administration calls.

    Adds enable/disable, node config read/write, and node creation with
    a configurable launcher on top of the base ``jenkins.Jenkins``.
    """

    def disable_node(self, name, msg=''):
        '''
        Disable a node

        @param name: Jenkins node name
        @type  name: str
        @param msg: Offline message
        @type  msg: str
        '''
        info = self.get_node_info(name)
        if info['offline']:
            # Already offline; toggling again would bring it back online.
            return
        # Explicit mapping instead of the fragile '% locals()' idiom.
        self.jenkins_open(urllib2.Request(
            self.server + TOGGLE_OFFLINE % {'name': name, 'msg': msg}))

    def enable_node(self, name):
        '''
        Enable a node

        @param name: Jenkins node name
        @type  name: str
        '''
        info = self.get_node_info(name)
        if not info['offline']:
            return
        # Same toggle endpoint; an empty message clears the offline cause.
        self.jenkins_open(urllib2.Request(
            self.server + TOGGLE_OFFLINE % {'name': name, 'msg': ''}))

    def get_node_config(self, name):
        '''
        Get the configuration for a node.

        :param name: Jenkins node name, ``str``
        '''
        get_config_url = self.server + CONFIG_NODE % {'name': name}
        return self.jenkins_open(urllib2.Request(get_config_url))

    def reconfig_node(self, name, config_xml):
        '''
        Change the configuration for an existing node.

        :param name: Jenkins node name, ``str``
        :param config_xml: New XML configuration, ``str``
        '''
        headers = {'Content-Type': 'text/xml'}
        reconfig_url = self.server + CONFIG_NODE % {'name': name}
        self.jenkins_open(urllib2.Request(reconfig_url, config_xml, headers))

    def create_node(self, name, numExecutors=2, nodeDescription=None,
                    remoteFS='/var/lib/jenkins', labels=None, exclusive=False,
                    launcher='hudson.slaves.JNLPLauncher',
                    launcher_params=None):
        '''
        @param name: name of node to create
        @type  name: str
        @param numExecutors: number of executors for node
        @type  numExecutors: int
        @param nodeDescription: Description of node
        @type  nodeDescription: str
        @param remoteFS: Remote filesystem location to use
        @type  remoteFS: str
        @param labels: Labels to associate with node
        @type  labels: str
        @param exclusive: Use this node for tied jobs only
        @type  exclusive: boolean
        @param launcher: The launch method for the slave
        @type  launcher: str
        @param launcher_params: Additional parameters for the launcher
        @type  launcher_params: dict
        '''
        if self.node_exists(name):
            raise JenkinsException('node[%s] already exists' % (name))
        mode = 'EXCLUSIVE' if exclusive else 'NORMAL'
        # Copy the dict: the old '={}' default was a shared mutable that
        # accumulated 'stapler-class' across calls, and a caller-supplied
        # dict would have been mutated in place.
        launcher_params = dict(launcher_params or {})
        # Known launcher classes include:
        #   hudson.plugins.sshslaves.SSHLauncher
        #   hudson.slaves.CommandLauncher
        #   hudson.os.windows.ManagedWindowsServiceLauncher
        launcher_params['stapler-class'] = launcher
        inner_params = {
            'name': name,
            'nodeDescription': nodeDescription,
            'numExecutors': numExecutors,
            'remoteFS': remoteFS,
            'labelString': labels,
            'mode': mode,
            'type': NODE_TYPE,
            'retentionStrategy': {
                'stapler-class': 'hudson.slaves.RetentionStrategy$Always'},
            'nodeProperties': {'stapler-class-bag': 'true'},
            'launcher': launcher_params
        }
        params = {
            'name': name,
            'type': NODE_TYPE,
            'json': json.dumps(inner_params)
        }
        self.jenkins_open(urllib2.Request(
            self.server + CREATE_NODE % urllib.urlencode(params)))
        if not self.node_exists(name):
            raise JenkinsException('create[%s] failed' % (name))

@ -0,0 +1,272 @@
#!/usr/bin/env python
# Copyright (C) 2011-2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
# States shared by snapshot images and nodes:

# The cloud provider is building this machine.  We have an ID, but it's
# not ready for use.
BUILDING = 1
# The machine is ready for use.
READY = 2
# This can mean in-use, or used but complete.
USED = 3
# Delete this machine immediately.
DELETE = 4
# Keep this machine indefinitely.
HOLD = 5

# Human-readable names for the state values above.
STATE_NAMES = {
    BUILDING: 'building',
    READY: 'ready',
    USED: 'used',
    DELETE: 'delete',
    HOLD: 'hold',
}
from sqlalchemy import Table, Column, Integer, String, \
MetaData, create_engine
from sqlalchemy.orm import mapper
from sqlalchemy.orm.session import Session, sessionmaker
metadata = MetaData()

# One snapshot image built for a particular provider from a base image.
snapshot_image_table = Table(
    'snapshot_image', metadata,
    Column('id', Integer, primary_key=True),
    Column('provider_name', String(255), index=True, nullable=False),
    Column('image_name', String(255), index=True, nullable=False),
    # Image hostname
    Column('hostname', String(255)),
    # Version indicator (timestamp)
    Column('version', Integer),
    # Provider assigned id for this image
    Column('external_id', String(255)),
    # Provider assigned id of the server used to create the snapshot
    Column('server_external_id', String(255)),
    # One of the state constants above
    Column('state', Integer),
    # Time of last state change
    Column('state_time', Integer),
)

# One machine launched from a snapshot image.
node_table = Table(
    'node', metadata,
    Column('id', Integer, primary_key=True),
    Column('provider_name', String(255), index=True, nullable=False),
    Column('image_name', String(255), index=True, nullable=False),
    Column('target_name', String(255)),
    # Machine name
    Column('hostname', String(255), index=True),
    # Eg, jenkins node name
    Column('nodename', String(255), index=True),
    # Provider assigned id for this machine
    Column('external_id', String(255)),
    # Primary IP address
    Column('ip', String(255)),
    # One of the state constants above
    Column('state', Integer),
    # Time of last state change
    Column('state_time', Integer),
)
class SnapshotImage(object):
    """ORM object for one row of snapshot_image_table.

    The table's 'state' column is mapped onto '_state' (see the mapper()
    call below) so the 'state' property here can stamp state_time on
    every transition.
    """
    def __init__(self, provider_name, image_name, hostname=None, version=None,
                 external_id=None, server_external_id=None, state=BUILDING):
        self.provider_name = provider_name
        self.image_name = image_name
        self.hostname = hostname
        self.version = version
        self.external_id = external_id
        self.server_external_id = server_external_id
        self.state = state

    def delete(self):
        """Delete this row and commit immediately."""
        session = Session.object_session(self)
        session.delete(self)
        session.commit()

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, state):
        # Record when the state changed and persist right away — but only
        # if the object is already attached to a session (it is not while
        # __init__ runs, before session.add()).
        self._state = state
        self.state_time = int(time.time())
        session = Session.object_session(self)
        if session:
            session.commit()
class Node(object):
    """ORM object for one row of node_table.

    As with SnapshotImage, 'state' is a property over the mapped '_state'
    column so every state change also updates state_time.
    """
    def __init__(self, provider_name, image_name, hostname=None,
                 external_id=None, ip=None, state=BUILDING):
        self.provider_name = provider_name
        self.image_name = image_name
        self.external_id = external_id
        self.ip = ip
        self.hostname = hostname
        self.state = state

    def delete(self):
        """Delete this row and commit immediately."""
        session = Session.object_session(self)
        session.delete(self)
        session.commit()

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, state):
        # Stamp the transition time; commit only once the object is
        # attached to a session (not during __init__).
        self._state = state
        self.state_time = int(time.time())
        session = Session.object_session(self)
        if session:
            session.commit()
# Classical mappings.  The 'state' column is mapped to the '_state'
# attribute so the 'state' property on each class stays in control of
# state_time bookkeeping.
mapper(Node, node_table,
       properties=dict(_state=node_table.c.state))
mapper(SnapshotImage, snapshot_image_table,
       properties=dict(_state=snapshot_image_table.c.state))
class NodeDatabase(object):
    """SQLite-backed store of snapshot images and nodes."""

    def __init__(self, path=None):
        """Open (creating if necessary) the database.

        :param path: path to the SQLite file; defaults to ~/nodes.db,
            expanded at call time rather than at import time (the old
            default argument was evaluated once, on module import).
        """
        if path is None:
            path = os.path.expanduser("~/nodes.db")
        engine = create_engine('sqlite:///%s' % path, echo=False)
        metadata.create_all(engine)
        # Distinct local name: the old code bound this to 'Session',
        # shadowing the imported Session class.
        session_factory = sessionmaker(bind=engine, autoflush=True,
                                       autocommit=False)
        self.session = session_factory()

    def print_state(self):
        """Print a human-readable dump of providers, images and nodes.

        Uses the parenthesized single-argument print form, which behaves
        identically under Python 2 and Python 3 (the bare print
        statement is a syntax error on Python 3).
        """
        for provider_name in self.getProviders():
            print('Provider: %s' % provider_name)
            for image_name in self.getImages(provider_name):
                print('  Base image: %s' % image_name)
                current = self.getCurrentSnapshotImage(
                    provider_name, image_name)
                for snapshot_image in self.getSnapshotImages():
                    if (snapshot_image.provider_name != provider_name or
                        snapshot_image.image_name != image_name):
                        continue
                    is_current = ('[current]' if current == snapshot_image
                                  else '')
                    print('    Snapshot: %s %s %s' % (
                        snapshot_image.hostname,
                        STATE_NAMES[snapshot_image.state],
                        is_current))
        for node in self.getNodes():
            print('  Node: %s %s %s %s %s' % (
                node.id, node.hostname, STATE_NAMES[node.state],
                node.state_time, node.ip))

    def abort(self):
        """Roll back the current transaction."""
        self.session.rollback()

    def commit(self):
        """Commit the current transaction."""
        self.session.commit()

    def delete(self, obj):
        """Mark obj deleted; takes effect on the next commit."""
        self.session.delete(obj)

    def getProviders(self):
        """Return distinct provider names that have snapshot images."""
        return [
            x.provider_name for x in
            self.session.query(SnapshotImage).distinct(
                snapshot_image_table.c.provider_name).all()]

    def getImages(self, provider_name):
        """Return distinct image names known for *provider_name*."""
        return [
            x.image_name for x in
            self.session.query(SnapshotImage).filter(
                snapshot_image_table.c.provider_name == provider_name
            ).distinct(snapshot_image_table.c.image_name).all()]

    def getSnapshotImages(self):
        """Return all snapshot images ordered by provider then image."""
        return self.session.query(SnapshotImage).order_by(
            snapshot_image_table.c.provider_name,
            snapshot_image_table.c.image_name).all()

    def getSnapshotImage(self, id):
        """Return the snapshot image with this id, or None.

        The parameter shadows the builtin 'id' but is kept for keyword
        compatibility with existing callers.
        """
        return self.session.query(SnapshotImage).filter_by(id=id).first()

    def getCurrentSnapshotImage(self, provider_name, image_name):
        """Return the newest READY snapshot for provider/image, or None."""
        return self.session.query(SnapshotImage).filter(
            snapshot_image_table.c.provider_name == provider_name,
            snapshot_image_table.c.image_name == image_name,
            snapshot_image_table.c.state == READY).order_by(
                snapshot_image_table.c.version.desc()).first()

    def createSnapshotImage(self, *args, **kwargs):
        """Insert and commit a new SnapshotImage; return the new row."""
        new = SnapshotImage(*args, **kwargs)
        self.session.add(new)
        self.session.commit()
        return new

    def getNodes(self, provider_name=None, image_name=None, target_name=None,
                 state=None):
        """Return nodes, optionally filtered by any of the arguments."""
        exp = self.session.query(Node).order_by(
            node_table.c.provider_name,
            node_table.c.image_name)
        if provider_name:
            exp = exp.filter_by(provider_name=provider_name)
        if image_name:
            exp = exp.filter_by(image_name=image_name)
        if target_name:
            exp = exp.filter_by(target_name=target_name)
        if state:
            exp = exp.filter(node_table.c.state == state)
        return exp.all()

    def createNode(self, *args, **kwargs):
        """Insert and commit a new Node; return the new row."""
        new = Node(*args, **kwargs)
        self.session.add(new)
        self.session.commit()
        return new

    def getNode(self, id):
        """Return the node with this id, or None."""
        return self.session.query(Node).filter_by(id=id).first()

    def getNodeByHostname(self, hostname):
        """Return the node with this hostname, or None."""
        return self.session.query(Node).filter_by(hostname=hostname).first()

    def getNodeByNodename(self, nodename):
        """Return the node with this Jenkins node name, or None."""
        return self.session.query(Node).filter_by(nodename=nodename).first()
if __name__ == '__main__':
    # Ad-hoc debugging aid: dump the current database state to stdout.
    db = NodeDatabase()
    db.print_state()

@ -0,0 +1,746 @@
#!/usr/bin/env python
# Copyright (C) 2011-2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import json
import threading
import yaml
import apscheduler.scheduler
import os
import myjenkins
from statsd import statsd
import zmq
import nodedb
import nodeutils as utils
# Time unit helpers, in seconds.
MINS = 60
HOURS = 60 * MINS

WATERMARK_SLEEP = 5          # Interval between checking if new servers needed
IMAGE_TIMEOUT = 6 * HOURS    # How long to wait for an image save
CONNECT_TIMEOUT = 10 * MINS  # How long to try to connect after a server
                             # is ACTIVE
NODE_CLEANUP = 8 * HOURS     # When to start deleting a node that is not
                             # READY or HOLD
KEEP_OLD_IMAGE = 24 * HOURS  # How long to keep an old (good) image
class NodeCompleteThread(threading.Thread):
    """One-shot thread: look up a finished node by name and delete it."""

    log = logging.getLogger("nodepool.NodeCompleteThread")

    def __init__(self, nodepool, nodename):
        threading.Thread.__init__(self)
        self.nodepool = nodepool
        self.nodename = nodename
        # Fresh database session; this runs on its own thread.
        self.db = nodedb.NodeDatabase()

    def run(self):
        try:
            self.handleEvent()
        except Exception:
            self.log.exception("Exception handling event for %s:" %
                               self.nodename)

    def handleEvent(self):
        node = self.db.getNodeByNodename(self.nodename)
        if node is None:
            raise Exception("Unable to find node with nodename: %s" %
                            self.nodename)
        self.nodepool.deleteNode(node)
class NodeUpdateListener(threading.Thread):
    """Subscribes to a ZMQ publisher and reacts to Jenkins build events."""

    log = logging.getLogger("nodepool.NodeUpdateListener")

    def __init__(self, nodepool, addr):
        threading.Thread.__init__(self)
        self.nodepool = nodepool
        self._stopped = False
        self.db = nodedb.NodeDatabase()
        # Subscribe to every topic published at addr.
        self.socket = self.nodepool.zmq_context.socket(zmq.SUB)
        self.socket.setsockopt(zmq.SUBSCRIBE, b"")
        self.socket.connect(addr)

    def run(self):
        while not self._stopped:
            raw = self.socket.recv().decode('utf-8')
            try:
                # Messages look like: "<topic> <json payload>"
                topic, payload = raw.split(None, 1)
                self.handleEvent(topic, payload)
            except Exception:
                self.log.exception("Exception handling job:")

    def handleEvent(self, topic, data):
        self.log.debug("Received: %s %s" % (topic, data))
        args = json.loads(data)
        nodename = args['build']['node_name']
        if topic == 'onStarted':
            self.handleStartPhase(nodename)
        elif topic == 'onFinalized':
            self.handleCompletePhase(nodename)
        else:
            raise Exception("Received job for unhandled phase: %s" %
                            args['phase'])

    def handleStartPhase(self, nodename):
        node = self.db.getNodeByNodename(nodename)
        if node is None:
            raise Exception("Unable to find node with nodename: %s" %
                            nodename)
        # A build has started on the node, so it is now in use.
        node.state = nodedb.USED
        self.nodepool.updateStats(node.provider_name)

    def handleCompletePhase(self, nodename):
        # Deletion can be slow; hand it off to a worker thread.
        NodeCompleteThread(self.nodepool, nodename).start()
class NodeLauncher(threading.Thread):
    """Background thread that boots one node and registers it with Jenkins.

    Looks up its Node row by id, boots a server from the provider's
    current snapshot image, waits for ssh, marks the node READY, then
    (optionally) adds it to the target's Jenkins master.
    """

    log = logging.getLogger("nodepool.NodeLauncher")

    def __init__(self, nodepool, provider, image, target, node_id):
        threading.Thread.__init__(self)
        self.provider = provider
        self.image = image
        self.target = target
        self.node_id = node_id
        self.nodepool = nodepool

    def run(self):
        self.log.debug("Launching node id: %s" % self.node_id)
        try:
            # Fresh session/client; this runs on its own thread.
            self.db = nodedb.NodeDatabase()
            self.node = self.db.getNode(self.node_id)
            self.client = utils.get_client(self.provider)
        except Exception:
            self.log.exception("Exception preparing to launch node id: %s:" %
                               self.node_id)
            return
        try:
            self.launchNode()
        except Exception:
            self.log.exception("Exception launching node id: %s:" %
                               self.node_id)
            # Best-effort teardown of the half-launched server.
            try:
                utils.delete_node(self.client, self.node)
            except Exception:
                self.log.exception("Exception deleting node id: %s:" %
                                   self.node_id)
            return

    def launchNode(self):
        """Boot the server, wait for it, and mark the node READY.

        Raises on any failure; run() handles cleanup.
        """
        start_time = time.time()
        hostname = '%s-%s-%s.slave.openstack.org' % (
            self.image.name, self.provider.name, self.node.id)
        self.node.hostname = hostname
        self.node.nodename = hostname.split('.')[0]
        self.node.target_name = self.target.name
        flavor = utils.get_flavor(self.client, self.image.min_ram)
        snap_image = self.db.getCurrentSnapshotImage(
            self.provider.name, self.image.name)
        if not snap_image:
            raise Exception("Unable to find current snapshot image %s in %s" %
                            (self.image.name, self.provider.name))
        remote_snap_image = self.client.images.get(snap_image.external_id)
        self.log.info("Creating server with hostname %s in %s from image %s "
                      "for node id: %s" % (hostname, self.provider.name,
                                           self.image.name, self.node_id))
        # (The old dead 'server = None' pre-assignment was removed; it
        # was immediately overwritten here.)
        server, key = utils.create_server(self.client, hostname,
                                          remote_snap_image, flavor)
        self.node.external_id = server.id
        self.db.commit()
        self.log.debug("Waiting for server %s for node id: %s" %
                       (server.id, self.node.id))
        server = utils.wait_for_resource(server)
        if server.status != 'ACTIVE':
            raise Exception("Server %s for node id: %s status: %s" %
                            (server.id, self.node.id, server.status))
        ip = utils.get_public_ip(server)
        if not ip and 'os-floating-ips' in utils.get_extensions(self.client):
            utils.add_public_ip(server)
            ip = utils.get_public_ip(server)
        if not ip:
            raise Exception("Unable to find public ip of server")
        self.node.ip = ip
        self.log.debug("Node id: %s is running, testing ssh" % self.node.id)
        if not utils.ssh_connect(ip, 'jenkins'):
            raise Exception("Unable to connect via ssh")
        if statsd:
            dt = int((time.time() - start_time) * 1000)
            # Distinct name: 'key' above holds the ssh key from
            # create_server.
            stat_key = 'nodepool.launch.%s.%s.%s' % (self.image.name,
                                                     self.provider.name,
                                                     self.target.name)
            statsd.timing(stat_key, dt)
            statsd.incr(stat_key)
        # Do this before adding to jenkins to avoid a race where
        # Jenkins might immediately use the node before we've updated
        # the state:
        self.node.state = nodedb.READY
        self.nodepool.updateStats(self.provider.name)
        self.log.info("Node id: %s is ready" % self.node.id)
        if self.target.jenkins_url:
            self.log.debug("Adding node id: %s to jenkins" % self.node.id)
            self.createJenkinsNode()
            self.log.info("Node id: %s added to jenkins" % self.node.id)

    def createJenkinsNode(self):
        """Register the booted node as a single-use Jenkins slave."""
        jenkins = myjenkins.Jenkins(self.target.jenkins_url,
                                    self.target.jenkins_user,
                                    self.target.jenkins_apikey)
        node_desc = 'Dynamic single use %s node' % self.image.name
        labels = self.image.name
        priv_key = '/var/lib/jenkins/.ssh/id_rsa'
        if self.target.jenkins_credentials_id:
            launcher_params = {'port': 22,
                               'credentialsId':
                               self.target.jenkins_credentials_id,  # noqa
                               'host': self.node.ip}
        else:
            launcher_params = {'port': 22,
                               'username': 'jenkins',
                               'privatekey': priv_key,
                               'host': self.node.ip}
        try:
            jenkins.create_node(
                self.node.nodename,
                numExecutors=1,
                nodeDescription=node_desc,
                remoteFS='/home/jenkins',
                labels=labels,
                exclusive=True,
                launcher='hudson.plugins.sshslaves.SSHLauncher',
                launcher_params=launcher_params)
        except myjenkins.JenkinsException as e:
            # A concurrent registration can beat us; that's fine.
            if 'already exists' in str(e):
                pass
            else:
                raise
class ImageUpdater(threading.Thread):
    """Background thread that builds one new snapshot image.

    Boots a server from the base image, runs the setup scripts over
    ssh, snapshots it, marks the snapshot READY, then best-effort
    deletes the build server.
    """

    log = logging.getLogger("nodepool.ImageUpdater")

    def __init__(self, provider, image, snap_image_id):
        threading.Thread.__init__(self)
        self.provider = provider
        self.image = image
        self.snap_image_id = snap_image_id

    def run(self):
        self.log.debug("Updating image %s in %s " % (self.image.name,
                                                     self.provider.name))
        try:
            # Fresh session/client; this runs on its own thread.
            self.db = nodedb.NodeDatabase()
            self.snap_image = self.db.getSnapshotImage(self.snap_image_id)
            self.client = utils.get_client(self.provider)
        except Exception:
            self.log.exception("Exception preparing to update image %s in %s:"
                               % (self.image.name, self.provider.name))
            return
        try:
            self.updateImage()
        except Exception:
            self.log.exception("Exception updating image %s in %s:" %
                               (self.image.name, self.provider.name))
            # Best-effort cleanup of the partially built image.
            try:
                if self.snap_image:
                    utils.delete_image(self.client, self.snap_image)
            except Exception:
                self.log.exception("Exception deleting image id: %s:" %
                                   self.snap_image.id)
            return

    def updateImage(self):
        """Build the snapshot image; raises on any failure."""
        start_time = time.time()
        timestamp = int(start_time)
        flavor = utils.get_flavor(self.client, self.image.min_ram)
        base_image = self.client.images.find(name=self.image.base_image)
        hostname = ('%s-%s.template.openstack.org' %
                    (self.image.name, str(timestamp)))
        self.log.info("Creating image id: %s for %s in %s" %
                      (self.snap_image.id, self.image.name,
                       self.provider.name))
        server, key = utils.create_server(
            self.client, hostname, base_image, flavor, add_key=True)
        self.snap_image.hostname = hostname
        self.snap_image.version = timestamp
        self.snap_image.server_external_id = server.id
        self.db.commit()
        self.log.debug("Image id: %s waiting for server %s" %
                       (self.snap_image.id, server.id))
        server = utils.wait_for_resource(server)
        if not server:
            # Use the id saved above: on timeout 'server' is falsy, so
            # the old 'server.id' here raised AttributeError instead of
            # this exception.
            raise Exception("Timeout waiting for server %s" %
                            self.snap_image.server_external_id)
        admin_pass = None  # server.adminPass
        self.bootstrapServer(server, admin_pass, key)
        image = utils.create_image(self.client, server, hostname)
        self.snap_image.external_id = image.id
        self.db.commit()
        self.log.debug("Image id: %s building image %s" %
                       (self.snap_image.id, image.id))
        # It can take a _very_ long time for Rackspace 1.0 to save an image
        image = utils.wait_for_resource(image, IMAGE_TIMEOUT)
        if not image:
            raise Exception("Timeout waiting for image %s" %
                            self.snap_image.id)
        if statsd:
            dt = int((time.time() - start_time) * 1000)
            key = 'nodepool.image_update.%s.%s' % (self.image.name,
                                                   self.provider.name)
            statsd.timing(key, dt)
            statsd.incr(key)
        self.snap_image.state = nodedb.READY
        self.log.info("Image %s in %s is ready" % (hostname,
                                                   self.provider.name))
        try:
            # We made the snapshot, try deleting the server, but it's okay
            # if we fail.  The reap script will find it and try again.
            utils.delete_server(self.client, server)
        except Exception:
            # Was a bare 'except:'; don't swallow SystemExit or
            # KeyboardInterrupt.
            self.log.exception("Exception encountered deleting server"
                               " %s for image id: %s" %
                               (server.id, self.snap_image.id))

    def bootstrapServer(self, server, admin_pass, key):
        """Copy the setup scripts to the server over ssh and run them."""
        ip = utils.get_public_ip(server)
        if not ip and 'os-floating-ips' in utils.get_extensions(self.client):
            utils.add_public_ip(server)
            ip = utils.get_public_ip(server)
        if not ip:
            raise Exception("Unable to find public ip of server")
        ssh_kwargs = {}
        if key:
            ssh_kwargs['pkey'] = key
        else:
            ssh_kwargs['password'] = admin_pass
        # Try the usernames used by the common base images in turn.
        host = None
        for username in ['root', 'ubuntu']:
            host = utils.ssh_connect(ip, username, ssh_kwargs,
                                     timeout=CONNECT_TIMEOUT)
            if host:
                break
        if not host:
            raise Exception("Unable to log in via SSH")
        host.ssh("make scripts dir", "mkdir -p scripts")
        for fname in os.listdir('scripts'):
            host.scp(os.path.join('scripts', fname), 'scripts/%s' % fname)
        host.ssh("make scripts executable", "chmod a+x scripts/*")
        if self.image.setup:
            # Forward NODEPOOL_* environment variables into the setup
            # script's environment.
            env_vars = ''
            for k, v in os.environ.items():
                if k.startswith('NODEPOOL_'):
                    env_vars += ' %s="%s"' % (k, v)
            r = host.ssh("run setup script", "cd scripts && %s ./%s" %
                         (env_vars, self.image.setup))
            if not r:
                raise Exception("Unable to run setup scripts")
class ConfigValue(object):
    """Base marker class for parsed configuration objects."""
class Config(ConfigValue):
    """Top-level parsed configuration: providers and targets."""
class Provider(ConfigValue):
    """Configuration for one cloud provider."""
class ProviderImage(ConfigValue):
    """Configuration for one image within a provider."""
class Target(ConfigValue):
    """Configuration for one target (e.g. a Jenkins master)."""
class TargetImage(ConfigValue):
    """Configuration for one image a target wants nodes of."""
class TargetImageProvider(ConfigValue):
    """Per-provider settings for a target's image (e.g. min-ready)."""
class NodePool(threading.Thread):
log = logging.getLogger("nodepool.NodePool")
def __init__(self, configfile):
    """Create the pool thread, start the cron scheduler, load config."""
    threading.Thread.__init__(self)
    self.configfile = configfile
    self.db = nodedb.NodeDatabase()
    self.zmq_context = None
    # Map of publisher address -> NodeUpdateListener thread.
    self.zmq_listeners = {}
    self.apsched = apscheduler.scheduler.Scheduler()
    self.apsched.start()
    # Cron specs currently scheduled; loadConfig() compares against
    # these to decide whether to reschedule.
    self.update_cron = ''
    self.update_job = None
    self.cleanup_cron = ''
    self.cleanup_job = None
    self._stopped = False
    self.loadConfig()
def stop(self):
    """Signal run() to exit; tear down ZMQ and the cron scheduler."""
    self._stopped = True
    # Destroying the context also unblocks the listener threads'
    # sockets.
    self.zmq_context.destroy()
    self.apsched.shutdown()
def loadConfig(self):
    """Re-read the YAML config file and apply it.

    Called at startup and at the top of every run() iteration.  Cron
    jobs are rescheduled only when their spec changed, and ZMQ
    listeners are restarted only when the publisher list changed.
    """
    self.log.debug("Loading configuration")
    # NOTE(review): yaml.load on an arbitrary file can construct
    # arbitrary Python objects; yaml.safe_load would be safer here.
    config = yaml.load(open(self.configfile))
    update_cron = config.get('cron', {}).get('image-update', '14 2 * * *')
    cleanup_cron = config.get('cron', {}).get('cleanup', '27 */6 * * *')
    if (update_cron != self.update_cron):
        if self.update_job:
            self.apsched.unschedule_job(self.update_job)
        parts = update_cron.split()
        # Standard 5-field cron spec; 'month' is parsed but not used.
        minute, hour, dom, month, dow = parts[:5]
        # NOTE(review): the job returned by add_cron_job is never
        # assigned to self.update_job, so the old job is never
        # unscheduled when the spec changes again — looks like a bug;
        # confirm.
        self.apsched.add_cron_job(self.updateImages,
                                  day=dom,
                                  day_of_week=dow,
                                  hour=hour,
                                  minute=minute)
        self.update_cron = update_cron
    if (cleanup_cron != self.cleanup_cron):
        if self.cleanup_job:
            self.apsched.unschedule_job(self.cleanup_job)
        parts = cleanup_cron.split()
        minute, hour, dom, month, dow = parts[:5]
        # NOTE(review): same unassigned-job issue as update_job above.
        self.apsched.add_cron_job(self.periodicCleanup,
                                  day=dom,
                                  day_of_week=dow,
                                  hour=hour,
                                  minute=minute)
        self.cleanup_cron = cleanup_cron
    # Build a fresh config object graph from the parsed YAML.
    newconfig = Config()
    newconfig.providers = {}
    newconfig.targets = {}
    for provider in config['providers']:
        p = Provider()
        p.name = provider['name']
        newconfig.providers[p.name] = p
        p.username = provider['username']
        p.password = provider['password']
        p.project_id = provider['project-id']
        p.auth_url = provider['auth-url']
        p.service_type = provider.get('service-type')
        p.service_name = provider.get('service-name')
        p.region_name = provider.get('region-name')
        p.max_servers = provider['max-servers']
        p.images = {}
        for image in provider['images']:
            i = ProviderImage()
            i.name = image['name']
            p.images[i.name] = i
            i.base_image = image['base-image']
            i.min_ram = image['min-ram']
            i.setup = image.get('setup')
            i.reset = image.get('reset')
    for target in config['targets']:
        t = Target()
        t.name = target['name']
        newconfig.targets[t.name] = t
        jenkins = target.get('jenkins')
        if jenkins:
            t.jenkins_url = jenkins['url']
            t.jenkins_user = jenkins['user']
            t.jenkins_apikey = jenkins['apikey']
            t.jenkins_credentials_id = jenkins.get('credentials_id')
        else:
            t.jenkins_url = None
            t.jenkins_user = None
            t.jenkins_apikey = None
            t.jenkins_credentials_id = None
        t.images = {}
        for image in target['images']:
            i = TargetImage()
            i.name = image['name']
            t.images[i.name] = i
            i.providers = {}
            for provider in image['providers']:
                p = TargetImageProvider()
                p.name = provider['name']
                i.providers[p.name] = p
                p.min_ready = provider['min-ready']
    self.config = newconfig
    self.startUpdateListeners(config['zmq-publishers'])
def startUpdateListeners(self, publishers):
    """(Re)start ZMQ listener threads when the publisher list changes.

    Destroying the shared context invalidates the old listeners'
    sockets; a new listener is then created for every address.
    """
    running = set(self.zmq_listeners.keys())
    configured = set(publishers)
    if running == configured:
        self.log.debug("Listeners do not need to be updated")
        return
    if self.zmq_context:
        self.log.debug("Stopping listeners")
        self.zmq_context.destroy()
        self.zmq_listeners = {}
    self.zmq_context = zmq.Context()
    for addr in publishers:
        self.log.debug("Starting listener for %s" % addr)
        listener = NodeUpdateListener(self, addr)
        self.zmq_listeners[addr] = listener
        listener.start()
def getNumNeededNodes(self, target, provider, image):
    """Return how many nodes to launch for this target/provider/image.

    min-ready is measured against READY plus BUILDING nodes, and the
    result is clamped to the provider's max-servers and to zero.
    """
    # Count machines that are ready and machines that are building,
    # so that if the provider is very slow, we aren't queueing up tons
    # of machines to be built.
    n_ready = len(self.db.getNodes(provider.name, image.name, target.name,
                                   nodedb.READY))
    n_building = len(self.db.getNodes(provider.name, image.name,
                                      target.name, nodedb.BUILDING))
    n_provider = len(self.db.getNodes(provider.name))
    num_to_launch = provider.min_ready - (n_ready + n_building)
    # Don't launch more than our provider max
    max_servers = self.config.providers[provider.name].max_servers
    num_to_launch = min(max_servers - n_provider, num_to_launch)
    # Don't launch less than 0
    num_to_launch = max(0, num_to_launch)
    return num_to_launch
def run(self):
    """Main loop: reload config, backfill missing images, top up nodes."""
    while not self._stopped:
        self.loadConfig()
        self.checkForMissingImages()
        for target in self.config.targets.values():
            self.log.debug("Examining target: %s" % target.name)
            for image in target.images.values():
                for provider in image.providers.values():
                    num_to_launch = self.getNumNeededNodes(
                        target, provider, image)
                    if num_to_launch:
                        self.log.info("Need to launch %s %s nodes for "
                                      "%s on %s" %
                                      (num_to_launch, image.name,
                                       target.name, provider.name))
                        for i in range(num_to_launch):
                            self.launchNode(provider, image, target)
        time.sleep(WATERMARK_SLEEP)
def checkForMissingImages(self):
    """Trigger an off-schedule image update if any image is absent.

    An image counts as present when a READY or BUILDING snapshot of
    it exists for the provider.
    """
    # If we are missing an image, run the image update function
    # outside of its schedule.
    missing = False
    for target in self.config.targets.values():
        for image in target.images.values():
            for provider in image.providers.values():
                found = False
                for snap_image in self.db.getSnapshotImages():
                    if (snap_image.provider_name == provider.name and
                        snap_image.image_name == image.name and
                        snap_image.state in [nodedb.READY,
                                             nodedb.BUILDING]):
                        found = True
                if not found:
                    self.log.warning("Missing image %s on %s" %
                                     (image.name, provider.name))
                    missing = True
    if missing:
        self.updateImages()
def updateImages(self):
    """Build a new snapshot image for every provider image (cron job).

    Each build runs on its own ImageUpdater thread.
    """
    # This function should be run periodically to create new snapshot
    # images.
    for provider in self.config.providers.values():
        for image in provider.images.values():
            snap_image = self.db.createSnapshotImage(
                provider_name=provider.name,
                image_name=image.name)
            t = ImageUpdater(provider, image, snap_image.id)
            t.start()
            # Enough time to give them different timestamps (versions)
            # Just to keep things clearer.
            time.sleep(2)
def launchNode(self, provider, image, target):
    """Create a node record and launch it on a NodeLauncher thread."""
    # Resolve to the provider-side config objects; the arguments come
    # from the target's image/provider list.
    provider = self.config.providers[provider.name]
    image = provider.images[image.name]
    node = self.db.createNode(provider.name, image.name)
    t = NodeLauncher(self, provider, image, target, node.id)
    t.start()
def deleteNode(self, node):
    """Remove a node from Jenkins, the provider, and the database."""
    # Delete a node
    start_time = time.time()
    # Mark DELETE first so the node can't be handed out meanwhile.
    node.state = nodedb.DELETE
    self.updateStats(node.provider_name)
    provider = self.config.providers[node.provider_name]
    target = self.config.targets[node.target_name]
    client = utils.get_client(provider)
    if target.jenkins_url:
        jenkins = myjenkins.Jenkins(target.jenkins_url,
                                    target.jenkins_user,
                                    target.jenkins_apikey)
        jenkins_name = node.nodename
        if jenkins.node_exists(jenkins_name):
            jenkins.delete_node(jenkins_name)
            self.log.info("Deleted jenkins node ID: %s" % node.id)
    utils.delete_node(client, node)
    self.log.info("Deleted node ID: %s" % node.id)
    if statsd:
        dt = int((time.time() - start_time) * 1000)
        key = 'nodepool.delete.%s.%s.%s' % (node.image_name,
                                            node.provider_name,
                                            node.target_name)
        statsd.timing(key, dt)
        statsd.incr(key)
    self.updateStats(node.provider_name)
def deleteImage(self, snap_image):
    """Remove a snapshot image from the provider and the database."""
    # Delete an image (the old comment said "node").
    snap_image.state = nodedb.DELETE
    provider = self.config.providers[snap_image.provider_name]
    client = utils.get_client(provider)
    utils.delete_image(client, snap_image)
    self.log.info("Deleted image ID: %s" % snap_image.id)
def periodicCleanup(self):
# This function should be run periodically to clean up any hosts
# that may have slipped through the cracks, as well as to remove
# old images.
self.log.debug("Starting periodic cleanup")
db = nodedb.NodeDatabase()
for node in db.getNodes():
if node.state in [nodedb.READY, nodedb.HOLD]:
continue
delete = False
if (node.state == nodedb.DELETE):
self.log.warning("Deleting node id: %s which is in delete "
"state" % node.id)
delete = True
elif time.time() - node.state_time > NODE_CLEANUP:
self.log.warning("Deleting node id: %s which has been in %s "
"state for %s hours" %
(node.id, node.state,
node.state_time / (60 * 60)))
delete = True
if delete:
try:
self.deleteNode(node)
except Exception:
self.log.exception("Exception deleting node ID: "
"%s" % node.id)
for image in db.getSnapshotImages():
# Normally, reap images that have sat in their current state
# for 24 hours, unless the image is the current snapshot
delete = False
if image.provider_name not in self.config.providers:
delete = True
self.log.info("Deleting image id: %s which has no current "
"provider" % image.id)
elif (image.image_name not in
self.config.providers[image.provider_name].images):
delete = True
self.log.info("Deleting image id: %s which has no current "
"base image" % image.id)
else:
current = db.getCurrentSnapshotImage(image.provider_name,
image.image_name)
if (current and image != current and
(time.time() - current.state_time) > KEEP_OLD_IMAGE):
self.log.info("Deleting image id: %s because the current "
"image is %s hours old" %
(image.id, current.state_time / (60 * 60)))
delete = True
if delete:
try:
self.deleteImage(image)
except Exception:
self.log.exception("Exception deleting image id: %s:" %
image.id)
self.log.debug("Finished periodic cleanup")
def updateStats(self, provider_name):
if not statsd:
return
# This may be called outside of the main thread.
db = nodedb.NodeDatabase()
provider = self.config.providers[provider_name]
states = {}
for target in self.config.targets.values():
for image in target.images.values():
for provider in image.providers.values():
base_key = 'nodepool.target.%s.%s.%s' % (
target.name, image.name,
provider.name)
key = '%s.min_ready' % base_key
statsd.gauge(key, provider.min_ready)
for state in nodedb.STATE_NAMES.values():
key = '%s.%s' % (base_key, state)
states[key] = 0
for node in db.getNodes():
if node.state not in nodedb.STATE_NAMES:
continue
key = 'nodepool.target.%s.%s.%s.%s' % (
node.target_name, node.image_name,
node.provider_name, nodedb.STATE_NAMES[node.state])
states[key] += 1
for key, count in states.items():
statsd.gauge(key, count)
for provider in self.config.providers.values():
key = 'nodepool.provider.%s.max_servers' % provider.name
statsd.gauge(key, provider.max_servers)

@ -0,0 +1,255 @@
#!/usr/bin/env python
# Copyright (C) 2011-2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import novaclient.client
import time
import paramiko
import socket
import logging
from sshclient import SSHClient
import nodedb
log = logging.getLogger("nodepool.utils")


def iterate_timeout(max_seconds, purpose):
    """Yield 1, 2, 3, ... roughly every two seconds until the deadline.

    Raises Exception (mentioning *purpose*) once *max_seconds* have
    elapsed without the caller breaking out of the loop.
    """
    deadline = time.time() + max_seconds
    attempt = 0
    while time.time() < deadline:
        attempt += 1
        yield attempt
        time.sleep(2)
    raise Exception("Timeout waiting for %s" % purpose)
def get_client(provider):
    """Build a novaclient Client from a provider configuration entry.

    Optional provider attributes (service_type, service_name,
    region_name) are forwarded as keyword arguments only when set.
    """
    positional = ['1.1', provider.username, provider.password,
                  provider.project_id, provider.auth_url]
    extra = {}
    for attr in ('service_type', 'service_name', 'region_name'):
        value = getattr(provider, attr)
        if value:
            extra[attr] = value
    return novaclient.client.Client(*positional, **extra)
# Per-client memo of extension aliases; avoids re-querying the API.
extension_cache = {}


def get_extensions(client):
    """Return (and memoize) the list of extension aliases for *client*.

    An API 404 on /extensions yields an empty list.
    """
    global extension_cache
    cached = extension_cache.get(client)
    if cached:
        return cached
    try:
        resp, body = client.client.get('/extensions')
        aliases = [ext['alias'] for ext in body['extensions']]
    except novaclient.exceptions.NotFound:
        aliases = []
    extension_cache[client] = aliases
    return aliases
def get_flavor(client, min_ram):
    """Return the smallest flavor with at least *min_ram* MB of RAM.

    Raises IndexError if no flavor is large enough.
    """
    flavors = [f for f in client.flavors.list() if f.ram >= min_ram]
    # Sort with key= rather than a cmp-style comparator: cmp() and the
    # comparator form of sort() are Python-2-only; key= works everywhere.
    flavors.sort(key=lambda f: f.ram)
    return flavors[0]
def get_public_ip(server, version=4):
    """Return a public IP address for *server*, or None.

    Checks floating IPs first (when the provider supports the
    os-floating-ips extension), then the 'public' address list, then
    non-RFC1918 addresses in 'private' (HP Cloud lists public addresses
    there).
    """
    if 'os-floating-ips' in get_extensions(server.manager.api):
        log.debug('Server %s using floating ips' % server.id)
        for addr in server.manager.api.floating_ips.list():
            if addr.instance_id == server.id:
                return addr.ip
    log.debug('Server %s not using floating ips, addresses: %s' %
              (server.id, server.addresses))
    for addr in server.addresses.get('public', []):
        if type(addr) == type(u''):  # Rackspace/openstack 1.0
            return addr
        if addr['version'] == version:  # Rackspace/openstack 1.1
            return addr['addr']
    for addr in server.addresses.get('private', []):
        # HPcloud
        if (addr['version'] == version and version == 4):
            # Build a real list: map() is lazy on Python 3 and cannot
            # be indexed with quad[0].
            quad = [int(octet) for octet in addr['addr'].split('.')]
            if quad[0] == 10:
                continue
            if quad[0] == 192 and quad[1] == 168:
                continue
            if quad[0] == 172 and (16 <= quad[1] <= 31):
                continue
            return addr['addr']
    return None
def add_public_ip(server):
    """Allocate a floating IP, attach it to *server*, and wait (up to
    600 seconds) for the attachment to become visible via the API.

    Raises via iterate_timeout if the attachment never shows up.
    """
    ip = server.manager.api.floating_ips.create()
    # Fixed typo in the log message ("floading" -> "floating").
    log.debug('Created floating IP %s for server %s' % (ip, server.id))
    server.add_floating_ip(ip)
    for count in iterate_timeout(600, "ip to be added"):
        try:
            newip = ip.manager.get(ip.id)
        except Exception:
            log.exception('Unable to get IP details for server %s, '
                          'will retry' % (server.id))
            continue
        if newip.instance_id == server.id:
            return
def add_keypair(client, name):
    """Generate a 2048-bit RSA key and register it as keypair *name*.

    Returns a (private_key, keypair) tuple.
    """
    private_key = paramiko.RSAKey.generate(2048)
    public_blob = private_key.get_name() + ' ' + private_key.get_base64()
    keypair = client.keypairs.create(name, public_blob)
    return private_key, keypair
def wait_for_resource(wait_resource, timeout=3600):
    """Poll a nova resource until its status is ACTIVE, then return it.

    Status/progress transitions are logged.  Raises via iterate_timeout
    if the resource does not become ACTIVE within *timeout* seconds.
    """
    last_progress = None
    last_status = None
    for count in iterate_timeout(timeout, "waiting for %s" % wait_resource):
        try:
            resource = wait_resource.manager.get(wait_resource.id)
        except Exception:
            # A bare "except:" would also swallow KeyboardInterrupt and
            # SystemExit; only retry on ordinary errors.
            log.exception('Unable to list resources, while waiting for %s '
                          'will retry' % wait_resource)
            continue
        # In Rackspace v1.0, there is no progress attribute while queued
        if hasattr(resource, 'progress'):
            if (last_progress != resource.progress
                or last_status != resource.status):
                log.debug('Status of %s: %s %s' % (resource, resource.status,
                                                   resource.progress))
            last_progress = resource.progress
        elif last_status != resource.status:
            log.debug('Status of %s: %s' % (resource, resource.status))
        last_status = resource.status
        if resource.status == 'ACTIVE':
            return resource
def ssh_connect(ip, username, connect_kwargs=None, timeout=60):
    """Repeatedly try to open an SSH connection to *ip* and verify it.

    Returns an SSHClient on success, or None if a connection was made
    but the test command produced unexpected output.  Raises via
    iterate_timeout if no connection succeeds within *timeout* seconds.
    """
    # Use None instead of a mutable {} default, which would be shared
    # between calls.
    if connect_kwargs is None:
        connect_kwargs = {}
    # HPcloud may return errno 111 for about 30 seconds after adding the IP
    for count in iterate_timeout(timeout, "ssh access"):
        try:
            client = SSHClient(ip, username, **connect_kwargs)
            break
        except socket.error as e:
            # "except socket.error, e" and e[0] are Python-2-only;
            # "as e" / e.errno work on both 2.6+ and 3.
            if e.errno != 111:
                log.exception('Exception while testing ssh access:')
    out = client.ssh("test ssh access", "echo access okay")
    if "access okay" in out:
        return client
    return None
def create_server(client, hostname, base_image, flavor, add_key=False):
create_kwargs = dict(image=base_image, flavor=flavor, name=hostname)