diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..843a7b207 --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +*.egg +*.egg-info +*.pyc +.test +.testrepository +.tox +AUTHORS +build/* +ChangeLog +config +doc/build/* +zuul/versioninfo +dist/ +venv/ +nodepool.yaml diff --git a/README.rst b/README.rst new file mode 100644 index 000000000..e69de29bb diff --git a/nodepool/__init__.py b/nodepool/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/nodepool/cmd/__init__.py b/nodepool/cmd/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/nodepool/cmd/nodepoold.py b/nodepool/cmd/nodepoold.py new file mode 100644 index 000000000..ea758eb76 --- /dev/null +++ b/nodepool/cmd/nodepoold.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python +# Copyright 2012 Hewlett-Packard Development Company, L.P. +# Copyright 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import argparse +import daemon +import extras + +# as of python-daemon 1.6 it doesn't bundle pidlockfile anymore +# instead it depends on lockfile-0.9.1 which uses pidfile. +pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile']) + +import logging.config +import os +import sys +import signal + +import nodepool.nodepool + + +class NodePoolDaemon(object): + def __init__(self): + self.args = None + + def parse_arguments(self): + parser = argparse.ArgumentParser(description='Node pool.') + parser.add_argument('-c', dest='config', + help='path to config file') + parser.add_argument('-d', dest='nodaemon', action='store_true', + help='do not run as a daemon') + parser.add_argument('-l', dest='logconfig', + help='path to log config file') + parser.add_argument('-p', dest='pidfile', + help='path to pid file', + default='/var/run/nodepool/nodepool.pid') + parser.add_argument('--version', dest='version', action='store_true', + help='show version') + self.args = parser.parse_args() + + def setup_logging(self): + if self.args.logconfig: + fp = os.path.expanduser(self.args.logconfig) + if not os.path.exists(fp): + raise Exception("Unable to read logging config file at %s" % + fp) + logging.config.fileConfig(fp) + else: + logging.basicConfig(level=logging.DEBUG) + + def exit_handler(self, signum, frame): + self.pool.stop() + + def term_handler(self, signum, frame): + os._exit(0) + + def main(self): + self.setup_logging() + self.pool = nodepool.nodepool.NodePool(self.args.config) + + signal.signal(signal.SIGUSR1, self.exit_handler) + signal.signal(signal.SIGTERM, self.term_handler) + + self.pool.start() + + while True: + try: + signal.pause() + except KeyboardInterrupt: + return self.exit_handler(signal.SIGINT, None) + + +def main(): + npd = NodePoolDaemon() + npd.parse_arguments() + + if npd.args.version: + from nodepool.version import version_info as npd_version_info + print "Nodepool version: %s" % npd_version_info.version_string() + return(0) + + pid = pid_file_module.TimeoutPIDLockFile(npd.args.pidfile, 10) + + if npd.args.nodaemon: + npd.main() + else: + with daemon.DaemonContext(pidfile=pid): + npd.main() + + +if __name__ == "__main__": + sys.path.insert(0, '.') + sys.exit(main()) diff --git a/nodepool/myjenkins.py b/nodepool/myjenkins.py new file mode 100644 index 000000000..f3c95cebd --- /dev/null +++ b/nodepool/myjenkins.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python +# Copyright 2011-2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import jenkins +import json +import urllib +import urllib2 +from jenkins import JenkinsException, NODE_TYPE, CREATE_NODE + +TOGGLE_OFFLINE = '/computer/%(name)s/toggleOffline?offlineMessage=%(msg)s' +CONFIG_NODE = '/computer/%(name)s/config.xml' + + +class Jenkins(jenkins.Jenkins): + def disable_node(self, name, msg=''): + ''' + Disable a node + + @param name: Jenkins node name + @type name: str + @param msg: Offline message + @type msg: str + ''' + info = self.get_node_info(name) + if info['offline']: + return + self.jenkins_open( + urllib2.Request(self.server + TOGGLE_OFFLINE % locals())) + + def enable_node(self, name): + ''' + Enable a node + + @param name: Jenkins node name + @type name: str + ''' + info = self.get_node_info(name) + if not info['offline']: + return + msg = '' + self.jenkins_open( + urllib2.Request(self.server + TOGGLE_OFFLINE % locals())) + + def get_node_config(self, name): + ''' + Get the configuration for a node. + + :param name: Jenkins node name, ``str`` + ''' + get_config_url = self.server + CONFIG_NODE % locals() + return self.jenkins_open(urllib2.Request(get_config_url)) + + def reconfig_node(self, name, config_xml): + ''' + Change the configuration for an existing node. + + :param name: Jenkins node name, ``str`` + :param config_xml: New XML configuration, ``str`` + ''' + headers = {'Content-Type': 'text/xml'} + reconfig_url = self.server + CONFIG_NODE % locals() + self.jenkins_open(urllib2.Request(reconfig_url, config_xml, headers)) + + def create_node(self, name, numExecutors=2, nodeDescription=None, + remoteFS='/var/lib/jenkins', labels=None, exclusive=False, + launcher='hudson.slaves.JNLPLauncher', launcher_params={}): + ''' + @param name: name of node to create + @type name: str + @param numExecutors: number of executors for node + @type numExecutors: int + @param nodeDescription: Description of node + @type nodeDescription: str + @param remoteFS: Remote filesystem location to use + @type remoteFS: str + @param labels: Labels to associate with node + @type labels: str + @param exclusive: Use this node for tied jobs only + @type exclusive: boolean + @param launcher: The launch method for the slave + @type launcher: str + @param launcher_params: Additional parameters for the launcher + @type launcher_params: dict + ''' + if self.node_exists(name): + raise JenkinsException('node[%s] already exists' % (name)) + + mode = 'NORMAL' + if exclusive: + mode = 'EXCLUSIVE' + + #hudson.plugins.sshslaves.SSHLauncher + #hudson.slaves.CommandLauncher + #hudson.os.windows.ManagedWindowsServiceLauncher + launcher_params['stapler-class'] = launcher + + inner_params = { + 'name': name, + 'nodeDescription': nodeDescription, + 'numExecutors': numExecutors, + 'remoteFS': remoteFS, + 'labelString': labels, + 'mode': mode, + 'type': NODE_TYPE, + 'retentionStrategy': { + 'stapler-class': 'hudson.slaves.RetentionStrategy$Always'}, + 'nodeProperties': {'stapler-class-bag': 'true'}, + 'launcher': launcher_params + } + + params = { + 'name': name, + 'type': NODE_TYPE, + 'json': json.dumps(inner_params) + } + + self.jenkins_open(urllib2.Request( + self.server + CREATE_NODE % urllib.urlencode(params))) + + if not self.node_exists(name): + raise JenkinsException('create[%s] failed' % (name)) diff --git a/nodepool/nodedb.py b/nodepool/nodedb.py new file mode 100644 index 000000000..2373bec01 --- /dev/null +++ b/nodepool/nodedb.py @@ -0,0 +1,272 @@ +#!/usr/bin/env python + +# Copyright (C) 2011-2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import time + +# States: +# The cloud provider is building this machine. We have an ID, but it's +# not ready for use. +BUILDING = 1 +# The machine is ready for use. +READY = 2 +# This can mean in-use, or used but complete. +USED = 3 +# Delete this machine immediately. +DELETE = 4 +# Keep this machine indefinitely. +HOLD = 5 + +STATE_NAMES = { + BUILDING: 'building', + READY: 'ready', + USED: 'used', + DELETE: 'delete', + HOLD: 'hold', + } + +from sqlalchemy import Table, Column, Integer, String, \ + MetaData, create_engine +from sqlalchemy.orm import mapper +from sqlalchemy.orm.session import Session, sessionmaker + +metadata = MetaData() +snapshot_image_table = Table( + 'snapshot_image', metadata, + Column('id', Integer, primary_key=True), + Column('provider_name', String(255), index=True, nullable=False), + Column('image_name', String(255), index=True, nullable=False), + # Image hostname + Column('hostname', String(255)), + # Version indicator (timestamp) + Column('version', Integer), + # Provider assigned id for this image + Column('external_id', String(255)), + # Provider assigned id of the server used to create the snapshot + Column('server_external_id', String(255)), + # One of the above values + Column('state', Integer), + # Time of last state change + Column('state_time', Integer), + ) +node_table = Table( + 'node', metadata, + Column('id', Integer, primary_key=True), + Column('provider_name', String(255), index=True, nullable=False), + Column('image_name', String(255), index=True, nullable=False), + Column('target_name', String(255)), + # Machine name + Column('hostname', String(255), index=True), + # Eg, jenkins node name + Column('nodename', String(255), index=True), + # Provider assigned id for this machine + Column('external_id', String(255)), + # Primary IP address + Column('ip', String(255)), + # One of the above values + Column('state', Integer), + # Time of last state change + Column('state_time', Integer), + ) + + +class SnapshotImage(object): + def __init__(self, provider_name, image_name, hostname=None, version=None, + external_id=None, server_external_id=None, state=BUILDING): + self.provider_name = provider_name + self.image_name = image_name + self.hostname = hostname + self.version = version + self.external_id = external_id + self.server_external_id = server_external_id + self.state = state + + def delete(self): + session = Session.object_session(self) + session.delete(self) + session.commit() + + @property + def state(self): + return self._state + + @state.setter + def state(self, state): + self._state = state + self.state_time = int(time.time()) + session = Session.object_session(self) + if session: + session.commit() + + +class Node(object): + def __init__(self, provider_name, image_name, hostname=None, + external_id=None, ip=None, state=BUILDING): + self.provider_name = provider_name + self.image_name = image_name + self.external_id = external_id + self.ip = ip + self.hostname = hostname + self.state = state + + def delete(self): + session = Session.object_session(self) + session.delete(self) + session.commit() + + @property + def state(self): + return self._state + + @state.setter + def state(self, state): + self._state = state + self.state_time = int(time.time()) + session = Session.object_session(self) + if session: + session.commit() + + +mapper(Node, node_table, + properties=dict(_state=node_table.c.state)) + + +mapper(SnapshotImage, snapshot_image_table, + properties=dict(_state=snapshot_image_table.c.state)) + + +class NodeDatabase(object): + def __init__(self, path=os.path.expanduser("~/nodes.db")): + engine = create_engine('sqlite:///%s' % path, echo=False) + metadata.create_all(engine) + Session = sessionmaker(bind=engine, autoflush=True, autocommit=False) + self.session = Session() + + def print_state(self): + for provider_name in self.getProviders(): + print 'Provider:', provider_name + for image_name in self.getImages(provider_name): + print ' Base image:', image_name + current = self.getCurrentSnapshotImage( + provider_name, image_name) + for snapshot_image in self.getSnapshotImages(): + if (snapshot_image.provider_name != provider_name or + snapshot_image.image_name != image_name): + continue + is_current = ('[current]' if current == snapshot_image + else '') + print ' Snapshot: %s %s %s' % ( + snapshot_image.hostname, + STATE_NAMES[snapshot_image.state], + is_current) + for node in self.getNodes(): + print ' Node: %s %s %s %s %s' % ( + node.id, node.hostname, STATE_NAMES[node.state], + node.state_time, node.ip) + + def abort(self): + self.session.rollback() + + def commit(self): + self.session.commit() + + def delete(self, obj): + self.session.delete(obj) + + def getProviders(self): + return [ + x.provider_name for x in + self.session.query(SnapshotImage).distinct( + snapshot_image_table.c.provider_name).all()] + + def getImages(self, provider_name): + return [ + x.image_name for x in + self.session.query(SnapshotImage).filter( + snapshot_image_table.c.provider_name == provider_name + ).distinct(snapshot_image_table.c.image_name).all()] + + def getSnapshotImages(self): + return self.session.query(SnapshotImage).order_by( + snapshot_image_table.c.provider_name, + snapshot_image_table.c.image_name).all() + + def getSnapshotImage(self, id): + images = self.session.query(SnapshotImage).filter_by(id=id).all() + if not images: + return None + return images[0] + + def getCurrentSnapshotImage(self, provider_name, image_name): + images = self.session.query(SnapshotImage).filter( + snapshot_image_table.c.provider_name == provider_name, + snapshot_image_table.c.image_name == image_name, + snapshot_image_table.c.state == READY).order_by( + snapshot_image_table.c.version.desc()).all() + if not images: + return None + return images[0] + + def createSnapshotImage(self, *args, **kwargs): + new = SnapshotImage(*args, **kwargs) + self.session.add(new) + self.session.commit() + return new + + def getNodes(self, provider_name=None, image_name=None, target_name=None, + state=None): + exp = self.session.query(Node).order_by( + node_table.c.provider_name, + node_table.c.image_name) + if provider_name: + exp = exp.filter_by(provider_name=provider_name) + if image_name: + exp = exp.filter_by(image_name=image_name) + if target_name: + exp = exp.filter_by(target_name=target_name) + if state: + exp = exp.filter(node_table.c.state == state) + return exp.all() + + def createNode(self, *args, **kwargs): + new = Node(*args, **kwargs) + self.session.add(new) + self.session.commit() + return new + + def getNode(self, id): + nodes = self.session.query(Node).filter_by(id=id).all() + if not nodes: + return None + return nodes[0] + + def getNodeByHostname(self, hostname): + nodes = self.session.query(Node).filter_by(hostname=hostname).all() + if not nodes: + return None + return nodes[0] + + def getNodeByNodename(self, nodename): + nodes = self.session.query(Node).filter_by(nodename=nodename).all() + if not nodes: + return None + return nodes[0] + +if __name__ == '__main__': + db = NodeDatabase() + db.print_state() diff --git a/nodepool/nodepool.py b/nodepool/nodepool.py new file mode 100644 index 000000000..1acfb686a --- /dev/null +++ b/nodepool/nodepool.py @@ -0,0 +1,746 @@ +#!/usr/bin/env python + +# Copyright (C) 2011-2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +import json +import threading +import yaml +import apscheduler.scheduler +import os +import myjenkins +from statsd import statsd +import zmq + +import nodedb +import nodeutils as utils + +MINS = 60 +HOURS = 60 * MINS + +WATERMARK_SLEEP = 5 # Interval between checking if new servers needed +IMAGE_TIMEOUT = 6 * HOURS # How long to wait for an image save +CONNECT_TIMEOUT = 10 * MINS # How long to try to connect after a server + # is ACTIVE +NODE_CLEANUP = 8 * HOURS # When to start deleting a node that is not + # READY or HOLD +KEEP_OLD_IMAGE = 24 * HOURS # How long to keep an old (good) image + + +class NodeCompleteThread(threading.Thread): + log = logging.getLogger("nodepool.NodeCompleteThread") + + def __init__(self, nodepool, nodename): + threading.Thread.__init__(self) + self.nodename = nodename + self.db = nodedb.NodeDatabase() + self.nodepool = nodepool + + def run(self): + try: + self.handleEvent() + except Exception: + self.log.exception("Exception handling event for %s:" % + self.nodename) + + def handleEvent(self): + node = self.db.getNodeByNodename(self.nodename) + if not node: + raise Exception("Unable to find node with nodename: %s" % + self.nodename) + self.nodepool.deleteNode(node) + + +class NodeUpdateListener(threading.Thread): + log = logging.getLogger("nodepool.NodeUpdateListener") + + def __init__(self, nodepool, addr): + threading.Thread.__init__(self) + self.nodepool = nodepool + self.socket = self.nodepool.zmq_context.socket(zmq.SUB) + event_filter = b"" + self.socket.setsockopt(zmq.SUBSCRIBE, event_filter) + self.socket.connect(addr) + self._stopped = False + self.db = nodedb.NodeDatabase() + + def run(self): + while not self._stopped: + m = self.socket.recv().decode('utf-8') + try: + topic, data = m.split(None, 1) + self.handleEvent(topic, data) + except Exception: + self.log.exception("Exception handling job:") + + def handleEvent(self, topic, data): + self.log.debug("Received: %s %s" % (topic, data)) + args = json.loads(data) + nodename = args['build']['node_name'] + if topic == 'onStarted': + self.handleStartPhase(nodename) + elif topic == 'onFinalized': + self.handleCompletePhase(nodename) + else: + raise Exception("Received job for unhandled phase: %s" % + args['phase']) + + def handleStartPhase(self, nodename): + node = self.db.getNodeByNodename(nodename) + if not node: + raise Exception("Unable to find node with nodename: %s" % + nodename) + node.state = nodedb.USED + self.nodepool.updateStats(node.provider_name) + + def handleCompletePhase(self, nodename): + t = NodeCompleteThread(self.nodepool, nodename) + t.start() + + +class NodeLauncher(threading.Thread): + log = logging.getLogger("nodepool.NodeLauncher") + + def __init__(self, nodepool, provider, image, target, node_id): + threading.Thread.__init__(self) + self.provider = provider + self.image = image + self.target = target + self.node_id = node_id + self.nodepool = nodepool + + def run(self): + self.log.debug("Launching node id: %s" % self.node_id) + try: + self.db = nodedb.NodeDatabase() + self.node = self.db.getNode(self.node_id) + self.client = utils.get_client(self.provider) + except Exception: + self.log.exception("Exception preparing to launch node id: %s:" % + self.node_id) + return + + try: + self.launchNode() + except Exception: + self.log.exception("Exception launching node id: %s:" % + self.node_id) + try: + utils.delete_node(self.client, self.node) + except Exception: + self.log.exception("Exception deleting node id: %s:" % + self.node_id) + return + + def launchNode(self): + start_time = time.time() + + hostname = '%s-%s-%s.slave.openstack.org' % ( + self.image.name, self.provider.name, self.node.id) + self.node.hostname = hostname + self.node.nodename = hostname.split('.')[0] + self.node.target_name = self.target.name + + flavor = utils.get_flavor(self.client, self.image.min_ram) + snap_image = self.db.getCurrentSnapshotImage( + self.provider.name, self.image.name) + if not snap_image: + raise Exception("Unable to find current snapshot image %s in %s" % + (self.image.name, self.provider.name)) + remote_snap_image = self.client.images.get(snap_image.external_id) + + self.log.info("Creating server with hostname %s in %s from image %s " + "for node id: %s" % (hostname, self.provider.name, + self.image.name, self.node_id)) + server = None + server, key = utils.create_server(self.client, hostname, + remote_snap_image, flavor) + self.node.external_id = server.id + self.db.commit() + + self.log.debug("Waiting for server %s for node id: %s" % + (server.id, self.node.id)) + + server = utils.wait_for_resource(server) + if server.status != 'ACTIVE': + raise Exception("Server %s for node id: %s status: %s" % + (server.id, self.node.id, server.status)) + + ip = utils.get_public_ip(server) + if not ip and 'os-floating-ips' in utils.get_extensions(self.client): + utils.add_public_ip(server) + ip = utils.get_public_ip(server) + if not ip: + raise Exception("Unable to find public ip of server") + + self.node.ip = ip + self.log.debug("Node id: %s is running, testing ssh" % self.node.id) + if not utils.ssh_connect(ip, 'jenkins'): + raise Exception("Unable to connect via ssh") + + if statsd: + dt = int((time.time() - start_time) * 1000) + key = 'nodepool.launch.%s.%s.%s' % (self.image.name, + self.provider.name, + self.target.name) + statsd.timing(key, dt) + statsd.incr(key) + + # Do this before adding to jenkins to avoid a race where + # Jenkins might immediately use the node before we've updated + # the state: + self.node.state = nodedb.READY + self.nodepool.updateStats(self.provider.name) + self.log.info("Node id: %s is ready" % self.node.id) + + if self.target.jenkins_url: + self.log.debug("Adding node id: %s to jenkins" % self.node.id) + self.createJenkinsNode() + self.log.info("Node id: %s added to jenkins" % self.node.id) + + def createJenkinsNode(self): + jenkins = myjenkins.Jenkins(self.target.jenkins_url, + self.target.jenkins_user, + self.target.jenkins_apikey) + node_desc = 'Dynamic single use %s node' % self.image.name + labels = self.image.name + priv_key = '/var/lib/jenkins/.ssh/id_rsa' + if self.target.jenkins_credentials_id: + launcher_params = {'port': 22, + 'credentialsId': + self.target.jenkins_credentials_id, # noqa + 'host': self.node.ip} + else: + launcher_params = {'port': 22, + 'username': 'jenkins', + 'privatekey': priv_key, + 'host': self.node.ip} + try: + jenkins.create_node( + self.node.nodename, + numExecutors=1, + nodeDescription=node_desc, + remoteFS='/home/jenkins', + labels=labels, + exclusive=True, + launcher='hudson.plugins.sshslaves.SSHLauncher', + launcher_params=launcher_params) + except myjenkins.JenkinsException as e: + if 'already exists' in str(e): + pass + else: + raise + + +class ImageUpdater(threading.Thread): + log = logging.getLogger("nodepool.ImageUpdater") + + def __init__(self, provider, image, snap_image_id): + threading.Thread.__init__(self) + self.provider = provider + self.image = image + self.snap_image_id = snap_image_id + + def run(self): + self.log.debug("Updating image %s in %s " % (self.image.name, + self.provider.name)) + try: + self.db = nodedb.NodeDatabase() + self.snap_image = self.db.getSnapshotImage(self.snap_image_id) + self.client = utils.get_client(self.provider) + except Exception: + self.log.exception("Exception preparing to update image %s in %s:" + % (self.image.name, self.provider.name)) + return + + try: + self.updateImage() + except Exception: + self.log.exception("Exception updating image %s in %s:" % + (self.image.name, self.provider.name)) + try: + if self.snap_image: + utils.delete_image(self.client, self.snap_image) + except Exception: + self.log.exception("Exception deleting image id: %s:" % + self.snap_image.id) + return + + def updateImage(self): + start_time = time.time() + timestamp = int(start_time) + + flavor = utils.get_flavor(self.client, self.image.min_ram) + base_image = self.client.images.find(name=self.image.base_image) + hostname = ('%s-%s.template.openstack.org' % + (self.image.name, str(timestamp))) + self.log.info("Creating image id: %s for %s in %s" % + (self.snap_image.id, self.image.name, + self.provider.name)) + server, key = utils.create_server( + self.client, hostname, base_image, flavor, add_key=True) + + self.snap_image.hostname = hostname + self.snap_image.version = timestamp + self.snap_image.server_external_id = server.id + self.db.commit() + + self.log.debug("Image id: %s waiting for server %s" % + (self.snap_image.id, server.id)) + server = utils.wait_for_resource(server) + if not server: + raise Exception("Timeout waiting for server %s" % + server.id) + + admin_pass = None # server.adminPass + self.bootstrapServer(server, admin_pass, key) + + image = utils.create_image(self.client, server, hostname) + self.snap_image.external_id = image.id + self.db.commit() + self.log.debug("Image id: %s building image %s" % + (self.snap_image.id, image.id)) + # It can take a _very_ long time for Rackspace 1.0 to save an image + image = utils.wait_for_resource(image, IMAGE_TIMEOUT) + if not image: + raise Exception("Timeout waiting for image %s" % + self.snap_image.id) + + if statsd: + dt = int((time.time() - start_time) * 1000) + key = 'nodepool.image_update.%s.%s' % (self.image.name, + self.provider.name) + statsd.timing(key, dt) + statsd.incr(key) + + self.snap_image.state = nodedb.READY + self.log.info("Image %s in %s is ready" % (hostname, + self.provider.name)) + + try: + # We made the snapshot, try deleting the server, but it's okay + # if we fail. The reap script will find it and try again. + utils.delete_server(self.client, server) + except: + self.log.exception("Exception encountered deleting server" + " %s for image id: %s" % + (server.id, self.snap_image.id)) + + def bootstrapServer(self, server, admin_pass, key): + ip = utils.get_public_ip(server) + if not ip and 'os-floating-ips' in utils.get_extensions(self.client): + utils.add_public_ip(server) + ip = utils.get_public_ip(server) + if not ip: + raise Exception("Unable to find public ip of server") + + ssh_kwargs = {} + if key: + ssh_kwargs['pkey'] = key + else: + ssh_kwargs['password'] = admin_pass + + for username in ['root', 'ubuntu']: + host = utils.ssh_connect(ip, username, ssh_kwargs, + timeout=CONNECT_TIMEOUT) + if host: + break + + if not host: + raise Exception("Unable to log in via SSH") + + host.ssh("make scripts dir", "mkdir -p scripts") + for fname in os.listdir('scripts'): + host.scp(os.path.join('scripts', fname), 'scripts/%s' % fname) + host.ssh("make scripts executable", "chmod a+x scripts/*") + if self.image.setup: + env_vars = '' + for k, v in os.environ.items(): + if k.startswith('NODEPOOL_'): + env_vars += ' %s="%s"' % (k, v) + r = host.ssh("run setup script", "cd scripts && %s ./%s" % + (env_vars, self.image.setup)) + if not r: + raise Exception("Unable to run setup scripts") + + +class ConfigValue(object): + pass + + +class Config(ConfigValue): + pass + + +class Provider(ConfigValue): + pass + + +class ProviderImage(ConfigValue): + pass + + +class Target(ConfigValue): + pass + + +class TargetImage(ConfigValue): + pass + + +class TargetImageProvider(ConfigValue): + pass + + +class NodePool(threading.Thread): + log = logging.getLogger("nodepool.NodePool") + + def __init__(self, configfile): + threading.Thread.__init__(self) + self.configfile = configfile + self.db = nodedb.NodeDatabase() + self.zmq_context = None + self.zmq_listeners = {} + self.apsched = apscheduler.scheduler.Scheduler() + self.apsched.start() + + self.update_cron = '' + self.update_job = None + self.cleanup_cron = '' + self.cleanup_job = None + self._stopped = False + self.loadConfig() + + def stop(self): + self._stopped = True + self.zmq_context.destroy() + self.apsched.shutdown() + + def loadConfig(self): + self.log.debug("Loading configuration") + config = yaml.load(open(self.configfile)) + + update_cron = config.get('cron', {}).get('image-update', '14 2 * * *') + cleanup_cron = config.get('cron', {}).get('cleanup', '27 */6 * * *') + if (update_cron != self.update_cron): + if self.update_job: + self.apsched.unschedule_job(self.update_job) + parts = update_cron.split() + minute, hour, dom, month, dow = parts[:5] + self.apsched.add_cron_job(self.updateImages, + day=dom, + day_of_week=dow, + hour=hour, + minute=minute) + self.update_cron = update_cron + if (cleanup_cron != self.cleanup_cron): + if self.cleanup_job: + self.apsched.unschedule_job(self.cleanup_job) + parts = cleanup_cron.split() + minute, hour, dom, month, dow = parts[:5] + self.apsched.add_cron_job(self.periodicCleanup, + day=dom, + day_of_week=dow, + hour=hour, + minute=minute) + self.cleanup_cron = cleanup_cron + + newconfig = Config() + newconfig.providers = {} + newconfig.targets = {} + for provider in config['providers']: + p = Provider() + p.name = provider['name'] + newconfig.providers[p.name] = p + p.username = provider['username'] + p.password = provider['password'] + p.project_id = provider['project-id'] + p.auth_url = provider['auth-url'] + p.service_type = provider.get('service-type') + p.service_name = provider.get('service-name') + p.region_name = provider.get('region-name') + p.max_servers = provider['max-servers'] + p.images = {} + for image in provider['images']: + i = ProviderImage() + i.name = image['name'] + p.images[i.name] = i + i.base_image = image['base-image'] + i.min_ram = image['min-ram'] + i.setup = image.get('setup') + i.reset = image.get('reset') + for target in config['targets']: + t = Target() + t.name = target['name'] + newconfig.targets[t.name] = t + jenkins = target.get('jenkins') + if jenkins: + t.jenkins_url = jenkins['url'] + t.jenkins_user = jenkins['user'] + t.jenkins_apikey = jenkins['apikey'] + t.jenkins_credentials_id = jenkins.get('credentials_id') + else: + t.jenkins_url = None + t.jenkins_user = None + t.jenkins_apikey = None + t.jenkins_credentials_id = None + t.images = {} + for image in target['images']: + i = TargetImage() + i.name = image['name'] + t.images[i.name] = i + i.providers = {} + for provider in image['providers']: + p = TargetImageProvider() + p.name = provider['name'] + i.providers[p.name] = p + p.min_ready = provider['min-ready'] + self.config = newconfig + self.startUpdateListeners(config['zmq-publishers']) + + def startUpdateListeners(self, publishers): + running = set(self.zmq_listeners.keys()) + configured = set(publishers) + if running == configured: + self.log.debug("Listeners do not need to be updated") + return + + if self.zmq_context: + self.log.debug("Stopping listeners") + self.zmq_context.destroy() + self.zmq_listeners = {} + self.zmq_context = zmq.Context() + for addr in publishers: + self.log.debug("Starting listener for %s" % addr) + listener = NodeUpdateListener(self, addr) + self.zmq_listeners[addr] = listener + listener.start() + + def getNumNeededNodes(self, target, provider, image): + # Count machines that are ready and machines that are building, + # so that if the provider is very slow, we aren't queueing up tons + # of machines to be built. + n_ready = len(self.db.getNodes(provider.name, image.name, target.name, + nodedb.READY)) + n_building = len(self.db.getNodes(provider.name, image.name, + target.name, nodedb.BUILDING)) + n_provider = len(self.db.getNodes(provider.name)) + num_to_launch = provider.min_ready - (n_ready + n_building) + + # Don't launch more than our provider max + max_servers = self.config.providers[provider.name].max_servers + num_to_launch = min(max_servers - n_provider, num_to_launch) + + # Don't launch less than 0 + num_to_launch = max(0, num_to_launch) + + return num_to_launch + + def run(self): + while not self._stopped: + self.loadConfig() + self.checkForMissingImages() + for target in self.config.targets.values(): + self.log.debug("Examining target: %s" % target.name) + for image in target.images.values(): + for provider in image.providers.values(): + num_to_launch = self.getNumNeededNodes( + target, provider, image) + if num_to_launch: + self.log.info("Need to launch %s %s nodes for " + "%s on %s" % + (num_to_launch, image.name, + target.name, provider.name)) + for i in range(num_to_launch): + self.launchNode(provider, image, target) + time.sleep(WATERMARK_SLEEP) + + def checkForMissingImages(self): + # If we are missing an image, run the image update function + # outside of its schedule. + missing = False + for target in self.config.targets.values(): + for image in target.images.values(): + for provider in image.providers.values(): + found = False + for snap_image in self.db.getSnapshotImages(): + if (snap_image.provider_name == provider.name and + snap_image.image_name == image.name and + snap_image.state in [nodedb.READY, + nodedb.BUILDING]): + found = True + if not found: + self.log.warning("Missing image %s on %s" % + (image.name, provider.name)) + missing = True + if missing: + self.updateImages() + + def updateImages(self): + # This function should be run periodically to create new snapshot + # images. + for provider in self.config.providers.values(): + for image in provider.images.values(): + snap_image = self.db.createSnapshotImage( + provider_name=provider.name, + image_name=image.name) + t = ImageUpdater(provider, image, snap_image.id) + t.start() + # Enough time to give them different timestamps (versions) + # Just to keep things clearer. + time.sleep(2) + + def launchNode(self, provider, image, target): + provider = self.config.providers[provider.name] + image = provider.images[image.name] + node = self.db.createNode(provider.name, image.name) + t = NodeLauncher(self, provider, image, target, node.id) + t.start() + + def deleteNode(self, node): + # Delete a node + start_time = time.time() + node.state = nodedb.DELETE + self.updateStats(node.provider_name) + provider = self.config.providers[node.provider_name] + target = self.config.targets[node.target_name] + client = utils.get_client(provider) + + if target.jenkins_url: + jenkins = myjenkins.Jenkins(target.jenkins_url, + target.jenkins_user, + target.jenkins_apikey) + jenkins_name = node.nodename + if jenkins.node_exists(jenkins_name): + jenkins.delete_node(jenkins_name) + self.log.info("Deleted jenkins node ID: %s" % node.id) + + utils.delete_node(client, node) + self.log.info("Deleted node ID: %s" % node.id) + + if statsd: + dt = int((time.time() - start_time) * 1000) + key = 'nodepool.delete.%s.%s.%s' % (node.image_name, + node.provider_name, + node.target_name) + statsd.timing(key, dt) + statsd.incr(key) + self.updateStats(node.provider_name) + + def deleteImage(self, snap_image): + # Delete a node + snap_image.state = nodedb.DELETE + provider = self.config.providers[snap_image.provider_name] + client = utils.get_client(provider) + + utils.delete_image(client, snap_image) + self.log.info("Deleted image ID: %s" % snap_image.id) + + def periodicCleanup(self): + # This function should be run periodically to clean up any hosts + # that may have slipped through the cracks, as well as to remove + # old images. + + self.log.debug("Starting periodic cleanup") + db = nodedb.NodeDatabase() + for node in db.getNodes(): + if node.state in [nodedb.READY, nodedb.HOLD]: + continue + delete = False + if (node.state == nodedb.DELETE): + self.log.warning("Deleting node id: %s which is in delete " + "state" % node.id) + delete = True + elif time.time() - node.state_time > NODE_CLEANUP: + self.log.warning("Deleting node id: %s which has been in %s " + "state for %s hours" % + (node.id, node.state, + node.state_time / (60 * 60))) + delete = True + if delete: + try: + self.deleteNode(node) + except Exception: + self.log.exception("Exception deleting node ID: " + "%s" % node.id) + + for image in db.getSnapshotImages(): + # Normally, reap images that have sat in their current state + # for 24 hours, unless the image is the current snapshot + delete = False + if image.provider_name not in self.config.providers: + delete = True + self.log.info("Deleting image id: %s which has no current " + "provider" % image.id) + elif (image.image_name not in + self.config.providers[image.provider_name].images): + delete = True + self.log.info("Deleting image id: %s which has no current " + "base image" % image.id) + else: + current = db.getCurrentSnapshotImage(image.provider_name, + image.image_name) + if (current and image != current and + (time.time() - current.state_time) > KEEP_OLD_IMAGE): + self.log.info("Deleting image id: %s because the current " + "image is %s hours old" % + (image.id, current.state_time / (60 * 60))) + delete = True + if delete: + try: + self.deleteImage(image) + except Exception: + self.log.exception("Exception deleting image id: %s:" % + image.id) + self.log.debug("Finished periodic cleanup") + + def updateStats(self, provider_name): + if not statsd: + return + # This may be called outside of the main thread. + db = nodedb.NodeDatabase() + provider = self.config.providers[provider_name] + + states = {} + + for target in self.config.targets.values(): + for image in target.images.values(): + for provider in image.providers.values(): + base_key = 'nodepool.target.%s.%s.%s' % ( + target.name, image.name, + provider.name) + key = '%s.min_ready' % base_key + statsd.gauge(key, provider.min_ready) + for state in nodedb.STATE_NAMES.values(): + key = '%s.%s' % (base_key, state) + states[key] = 0 + + for node in db.getNodes(): + if node.state not in nodedb.STATE_NAMES: + continue + key = 'nodepool.target.%s.%s.%s.%s' % ( + node.target_name, node.image_name, + node.provider_name, nodedb.STATE_NAMES[node.state]) + states[key] += 1 + + for key, count in states.items(): + statsd.gauge(key, count) + + for provider in self.config.providers.values(): + key = 'nodepool.provider.%s.max_servers' % provider.name + statsd.gauge(key, provider.max_servers) diff --git a/nodepool/nodeutils.py b/nodepool/nodeutils.py new file mode 100644 index 000000000..345e1d80f --- /dev/null +++ b/nodepool/nodeutils.py @@ -0,0 +1,255 @@ +#!/usr/bin/env python + +# Copyright (C) 2011-2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +import novaclient.client +import time +import paramiko +import socket +import logging +from sshclient import SSHClient + +import nodedb + +log = logging.getLogger("nodepool.utils") + + +def iterate_timeout(max_seconds, purpose): + start = time.time() + count = 0 + while (time.time() < start + max_seconds): + count += 1 + yield count + time.sleep(2) + raise Exception("Timeout waiting for %s" % purpose) + + +def get_client(provider): + args = ['1.1', provider.username, provider.password, + provider.project_id, provider.auth_url] + kwargs = {} + if provider.service_type: + kwargs['service_type'] = provider.service_type + if provider.service_name: + kwargs['service_name'] = provider.service_name + if provider.region_name: + kwargs['region_name'] = provider.region_name + client = novaclient.client.Client(*args, **kwargs) + return client + +extension_cache = {} + + +def get_extensions(client): + global extension_cache + cache = extension_cache.get(client) + if cache: + return cache + try: + resp, body = client.client.get('/extensions') + extensions = [x['alias'] for x in body['extensions']] + except novaclient.exceptions.NotFound: + extensions = [] + extension_cache[client] = extensions + return extensions + + +def get_flavor(client, min_ram): + flavors = [f for f in client.flavors.list() if f.ram >= min_ram] + flavors.sort(lambda a, b: cmp(a.ram, b.ram)) + return flavors[0] + + +def get_public_ip(server, version=4): + if 'os-floating-ips' in get_extensions(server.manager.api): + log.debug('Server %s using floating ips' % server.id) + for addr in server.manager.api.floating_ips.list(): + if addr.instance_id == server.id: + return addr.ip + log.debug('Server %s not using floating ips, addresses: %s' % + (server.id, server.addresses)) + for addr in server.addresses.get('public', []): + if type(addr) == type(u''): # Rackspace/openstack 1.0 + return addr + if addr['version'] == version: # Rackspace/openstack 1.1 + return addr['addr'] + for addr in server.addresses.get('private', []): + # HPcloud + if (addr['version'] == version and version == 4): + quad = map(int, addr['addr'].split('.')) + if quad[0] == 10: + continue + if quad[0] == 192 and quad[1] == 168: + continue + if quad[0] == 172 and (16 <= quad[1] <= 31): + continue + return addr['addr'] + return None + + +def add_public_ip(server): + ip = server.manager.api.floating_ips.create() + log.debug('Created floading IP %s for server %s' % (ip, server.id)) + server.add_floating_ip(ip) + for count in iterate_timeout(600, "ip to be added"): + try: + newip = ip.manager.get(ip.id) + except Exception: + log.exception('Unable to get IP details for server %s, ' + 'will retry' % (server.id)) + continue + + if newip.instance_id == server.id: + return + + +def add_keypair(client, name): + key = paramiko.RSAKey.generate(2048) + public_key = key.get_name() + ' ' + key.get_base64() + kp = client.keypairs.create(name, public_key) + return key, kp + + +def wait_for_resource(wait_resource, timeout=3600): + last_progress = None + last_status = None + for count in iterate_timeout(timeout, "waiting for %s" % wait_resource): + try: + resource = wait_resource.manager.get(wait_resource.id) + except: + log.exception('Unable to list resources, while waiting for %s ' + 'will retry' % wait_resource) + continue + + # In Rackspace v1.0, there is no progress attribute while queued + if hasattr(resource, 'progress'): + if (last_progress != resource.progress + or last_status != resource.status): + log.debug('Status of %s: %s %s' % (resource, resource.status, + resource.progress)) + last_progress = resource.progress + elif last_status != resource.status: + log.debug('Status of %s: %s' % (resource, resource.status)) + last_status = resource.status + if resource.status == 'ACTIVE': + return resource + + +def ssh_connect(ip, username, connect_kwargs={}, timeout=60): + # HPcloud may return errno 111 for about 30 seconds after adding the IP + for count in iterate_timeout(timeout, "ssh access"): + try: + client = SSHClient(ip, username, **connect_kwargs) + break + except socket.error, e: + if e[0] != 111: + log.exception('Exception while testing ssh access:') + + out = client.ssh("test ssh access", "echo access okay") + if "access okay" in out: + return client + return None + + +def create_server(client, hostname, base_image, flavor, add_key=False): + create_kwargs = dict(image=base_image, flavor=flavor, name=hostname) + + key = None + # hpcloud can't handle dots in keypair names + if add_key: + key_name = hostname.split('.')[0] + if 'os-keypairs' in get_extensions(client): + key, kp = add_keypair(client, key_name) + create_kwargs['key_name'] = key_name + + server = client.servers.create(**create_kwargs) + return server, key + + +def create_image(client, server, image_name): + # TODO: fix novaclient so it returns an image here + # image = server.create_image(name) + uuid = server.manager.create_image(server, image_name) + image = client.images.get(uuid) + return image + + +def delete_server(client, server): + try: + if 'os-floating-ips' in get_extensions(server.manager.api): + for addr in server.manager.api.floating_ips.list(): + if addr.instance_id == server.id: + server.remove_floating_ip(addr) + addr.delete() + except: + log.exception('Unable to remove floating IP from server %s' % + server.id) + + try: + if 'os-keypairs' in get_extensions(server.manager.api): + for kp in server.manager.api.keypairs.list(): + if kp.name == server.key_name: + kp.delete() + except: + log.exception('Unable to delete keypair from server %s' % server.id) + + log.debug('Deleting server %s' % server.id) + server.delete() + for count in iterate_timeout(3600, "waiting for server %s deletion" % + server.id): + try: + client.servers.get(server.id) + except novaclient.exceptions.NotFound: + return + + +def delete_node(client, node): + node.state = nodedb.DELETE + if node.external_id: + try: + server = client.servers.get(node.external_id) + delete_server(client, server) + except novaclient.exceptions.NotFound: + pass + + node.delete() + + +def delete_image(client, image): + server = None + if image.server_external_id: + try: + server = client.servers.get(image.server_external_id) + except novaclient.exceptions.NotFound: + log.warning('Image server id %s not found' % + image.server_external_id) + + if server: + delete_server(client, server) + + remote_image = None + if image.external_id: + try: + remote_image = client.images.get(image.external_id) + except novaclient.exceptions.NotFound: + log.warning('Image id %s not found' % image.external_id) + + if remote_image: + log.debug('Deleting image %s' % remote_image.id) + remote_image.delete() + + image.delete() diff --git a/nodepool/sshclient.py b/nodepool/sshclient.py new file mode 100644 index 000000000..ed7a20ac6 --- /dev/null +++ b/nodepool/sshclient.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python + +# Update the base image that is used for devstack VMs. + +# Copyright (C) 2011-2012 OpenStack LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +import paramiko +import sys + + +class SSHClient(object): + def __init__(self, ip, username, password=None, pkey=None): + client = paramiko.SSHClient() + client.load_system_host_keys() + client.set_missing_host_key_policy(paramiko.WarningPolicy()) + client.connect(ip, username=username, password=password, pkey=pkey) + self.client = client + + def ssh(self, action, command): + stdin, stdout, stderr = self.client.exec_command(command) + print command + output = '' + for x in stdout: + output += x + sys.stdout.write(x) + ret = stdout.channel.recv_exit_status() + print stderr.read() + if ret: + raise Exception("Unable to %s" % action) + return output + + def scp(self, source, dest): + print 'copy', source, dest + ftp = self.client.open_sftp() + ftp.put(source, dest) + ftp.close() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 000000000..a713f4f85 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,14 @@ +pbr>=0.5.21,<1.0 + +PyYAML +python-jenkins +paramiko +lockfile +python-daemon +extras +statsd>=1.0.0,<3.0 +voluptuous>=0.6,<0.7 +apscheduler>=2.1.1,<3.0 +sqlalchemy>=0.8.2,<0.9.0 +pyzmq>=13.1.0,<14.0.0 +python-novaclient diff --git a/scripts/devstack-cache.py b/scripts/devstack-cache.py new file mode 100755 index 000000000..824139f84 --- /dev/null +++ b/scripts/devstack-cache.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python + +# Copyright (C) 2011-2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import subprocess + +DEVSTACK='~/workspace-cache/devstack' + +def run_local(cmd, status=False, cwd='.', env={}): + print "Running:", cmd + newenv = os.environ + newenv.update(env) + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, cwd=cwd, + stderr=subprocess.STDOUT, env=newenv) + (out, nothing) = p.communicate() + if status: + return (p.returncode, out.strip()) + return out.strip() + + +def git_branches(): + branches = [] + for branch in run_local(['git', 'branch', '-a'], cwd=DEVSTACK).split("\n"): + branch = branch.strip() + if not branch.startswith('remotes/origin'): + continue + branches.append(branch) + return branches + + +def local_prep(distribution): + branches = [] + for branch in git_branches(): + # Ignore branches of the form 'somestring -> someotherstring' + # as this denotes a symbolic reference and the entire string + # as is cannot be checked out. We can do this safely as the + # reference will refer to one of the other branches returned + # by git_branches. + if ' -> ' in branch: + continue + branch_data = {'name': branch} + print 'Branch: ', branch + run_local(['git', 'checkout', branch], cwd=DEVSTACK) + run_local(['git', 'pull', '--ff-only', 'origin'], cwd=DEVSTACK) + + debs = [] + debdir = os.path.join(DEVSTACK, 'files', 'apts') + for fn in os.listdir(debdir): + fn = os.path.join(debdir, fn) + tokenize(fn, debs, distribution, comment='#') + branch_data['debs'] = debs + + images = [] + for line in open(os.path.join(DEVSTACK, 'stackrc')): + line = line.strip() + if line.startswith('IMAGE_URLS'): + if '#' in line: + line = line[:line.rfind('#')] + if line.endswith(';;'): + line = line[:-2] + line = line.split('=', 1)[1].strip() + if line.startswith('${IMAGE_URLS:-'): + line = line[len('${IMAGE_URLS:-'):] + if line.endswith('}'): + line = line[:-1] + if line[0] == line[-1] == '"': + line = line[1:-1] + images += [x.strip() for x in line.split(',')] + branch_data['images'] = images + branches.append(branch_data) + return branches + + +def main(): + distribution = sys.argv[1] + + branches = local_prep(distribution) + image_filenames = {} + for branch_data in branches: + if branch_data['debs']: + run_local(['sudo', 'apt-get', '-y', '-d', 'install'] + + branch_data['debs']) + + for url in branch_data['images']: + fname = url.split('/')[-1] + if fname in image_filenames: + continue + run_local(['wget', '-nv', '-c', url, + '-O', '~/cache/files/%s' % fname]) diff --git a/scripts/prepare_devstack.sh b/scripts/prepare_devstack.sh new file mode 100755 index 000000000..68b958739 --- /dev/null +++ b/scripts/prepare_devstack.sh @@ -0,0 +1,63 @@ +#!/bin/bash -xe + +# Copyright (C) 2011-2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +mkdir -p ~/cache/files +mkdir -p ~/cache/pip +sudo DEBIAN_FRONTEND=noninteractive apt-get \ + --option "Dpkg::Options::=--force-confold" \ + --assume-yes install build-essential python-dev \ + linux-headers-virtual linux-headers-`uname -r` + +rm -rf ~/workspace-cache +mkdir -p ~/workspace-cache + +pushd ~/workspace-cache +git clone https://review.openstack.org/p/openstack-dev/devstack +git clone https://review.openstack.org/p/openstack-dev/grenade +git clone https://review.openstack.org/p/openstack-dev/pbr +git clone https://review.openstack.org/p/openstack-infra/devstack-gate +git clone https://review.openstack.org/p/openstack-infra/jeepyb +git clone https://review.openstack.org/p/openstack/ceilometer +git clone https://review.openstack.org/p/openstack/cinder +git clone https://review.openstack.org/p/openstack/glance +git clone https://review.openstack.org/p/openstack/heat +git clone https://review.openstack.org/p/openstack/horizon +git clone https://review.openstack.org/p/openstack/keystone +git clone https://review.openstack.org/p/openstack/neutron +git clone https://review.openstack.org/p/openstack/nova +git clone https://review.openstack.org/p/openstack/oslo.config +git clone https://review.openstack.org/p/openstack/oslo.messaging +git clone https://review.openstack.org/p/openstack/python-ceilometerclient +git clone https://review.openstack.org/p/openstack/python-cinderclient +git clone https://review.openstack.org/p/openstack/python-glanceclient +git clone https://review.openstack.org/p/openstack/python-heatclient +git clone https://review.openstack.org/p/openstack/python-keystoneclient +git clone https://review.openstack.org/p/openstack/python-neutronclient +git clone https://review.openstack.org/p/openstack/python-novaclient +git clone https://review.openstack.org/p/openstack/python-openstackclient +git clone https://review.openstack.org/p/openstack/python-swiftclient +git clone https://review.openstack.org/p/openstack/requirements +git clone https://review.openstack.org/p/openstack/swift +git clone https://review.openstack.org/p/openstack/tempest +popd + +. /etc/lsb-release +python ./devstack-cache.py $DISTRIB_CODENAME + +sync +sleep 5 diff --git a/scripts/prepare_node.sh b/scripts/prepare_node.sh new file mode 100755 index 000000000..545766308 --- /dev/null +++ b/scripts/prepare_node.sh @@ -0,0 +1,33 @@ +#!/bin/bash -xe + +# Copyright (C) 2011-2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +HOSTNAME=$1 + +sudo hostname $1 +wget https://raw.github.com/openstack-infra/config/master/install_puppet.sh +sudo bash -xe install_puppet.sh +sudo git clone https://review.openstack.org/p/openstack-infra/config.git \ + /root/config +sudo /bin/bash /root/config/install_modules.sh +if [ -z $NODEPOOL_SSH_KEY ] ; then + sudo puppet apply --modulepath=/root/config/modules:/etc/puppet/modules \ + -e "class {'openstack_project::slave_template': }" +else + sudo puppet apply --modulepath=/root/config/modules:/etc/puppet/modules \ + -e "class {'openstack_project::slave_template': install_users => false, ssh_key => '$NODEPOOL_SSH_KEY', }" +fi diff --git a/scripts/prepare_node_devstack.sh b/scripts/prepare_node_devstack.sh new file mode 100755 index 000000000..094df63fc --- /dev/null +++ b/scripts/prepare_node_devstack.sh @@ -0,0 +1,22 @@ +#!/bin/bash -xe + +# Copyright (C) 2011-2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +HOSTNAME=$1 + +./prepare_node.sh $HOSTNAME +./prepare_devstack.sh $HOSTNAME diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 000000000..bb4ab1468 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,29 @@ +[metadata] +name = nodepool +summary = Node pool management for a distributed test infrastructure +description-file = + README.rst +author = OpenStack Infrastructure Team +author-email = openstack-infra@lists.openstack.org +home-page = http://ci.openstack.org/ +classifier = + Intended Audience :: Information Technology + Intended Audience :: System Administrators + License :: OSI Approved :: Apache Software License + Operating System :: POSIX :: Linux + Programming Language :: Python + Programming Language :: Python :: 2 + Programming Language :: Python :: 2.7 + Programming Language :: Python :: 2.6 + +[pbr] +warnerrors = True + +[entry_points] +console_scripts = + nodepool = nodepool.cmd.nodepoold:main + +[build_sphinx] +source-dir = doc/source +build-dir = doc/build +all_files = 1 diff --git a/setup.py b/setup.py new file mode 100644 index 000000000..f429ba9c1 --- /dev/null +++ b/setup.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python +# Copyright (c) 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import setuptools + +setuptools.setup( + setup_requires=['pbr>=0.5.21,<1.0'], + pbr=True) diff --git a/test-requirements.txt b/test-requirements.txt new file mode 100644 index 000000000..84496b491 --- /dev/null +++ b/test-requirements.txt @@ -0,0 +1,9 @@ +flake8==2.0 +coverage +sphinx +docutils==0.9.1 +discover +fixtures>=0.3.12 +python-subunit +testrepository>=0.0.13 +testtools>=0.9.27 diff --git a/tools/fake-statsd.py b/tools/fake-statsd.py new file mode 100644 index 000000000..1f0c0567f --- /dev/null +++ b/tools/fake-statsd.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python + +# Copyright 2011-2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +# A fake statsd daemon; it just prints everything out. + +import threading +import signal +import socket +import os +import select + +class FakeStatsd(threading.Thread): + def __init__(self): + threading.Thread.__init__(self) + self.daemon = True + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.sock.bind(('', 8125)) + self.port = self.sock.getsockname()[1] + self.wake_read, self.wake_write = os.pipe() + self.stats = [] + + def run(self): + while True: + poll = select.poll() + poll.register(self.sock, select.POLLIN) + poll.register(self.wake_read, select.POLLIN) + ret = poll.poll() + for (fd, event) in ret: + if fd == self.sock.fileno(): + data = self.sock.recvfrom(1024) + if not data: + return + self.stats.append(data[0]) + print data[0] + if fd == self.wake_read: + return + + def stop(self): + os.write(self.wake_write, '1\n') + +s = FakeStatsd() +s.start() +signal.pause() diff --git a/tools/zmq-server.py b/tools/zmq-server.py new file mode 100644 index 000000000..2d0f96b0c --- /dev/null +++ b/tools/zmq-server.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python + +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# Copyright 2011-2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +# A test script to stand in for a zeromq enabled jenkins. It sends zmq +# events that simulate the jenkins node lifecycle. +# +# Usage: +# zmq-server.py start HOSTNAME +# zmq-server.py complete HOSTNAME + +import zmq +import json +context = zmq.Context() +socket = context.socket(zmq.PUB) +socket.bind("tcp://*:8888") + +print('ready') +while True: + line = raw_input() + phase, host = line.split() + if phase=='start': + topic = 'onStarted' + data = {"name":"test","url":"job/test/","build":{"full_url":"http://localhost:8080/job/test/1/","number":1,"phase":"STARTED","url":"job/test/1/","node_name":host}} + elif phase=='complete': + topic = 'onFinalized' + data = {"name":"test","url":"job/test/","build":{"full_url":"http://localhost:8080/job/test/1/","number":1,"phase":"FINISHED","status":"SUCCESS","url":"job/test/1/","node_name":host}} + socket.send("%s %s" % (topic, json.dumps(data))) diff --git a/tools/zmq-stream.py b/tools/zmq-stream.py new file mode 100644 index 000000000..aa0227547 --- /dev/null +++ b/tools/zmq-stream.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python + +# Copyright 2012 Hewlett-Packard Development Company, L.P. +# Copyright 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +# A test script to watch a zmq stream +# +# Usage: +# zmq-stream.py + +import zmq + +context = zmq.Context() +socket = context.socket(zmq.SUB) +event_filter = b"" +socket.setsockopt(zmq.SUBSCRIBE, event_filter) +socket.connect("tcp://localhost:8888") + +print('ready') +while True: + m = socket.recv().decode('utf-8') + print(m) diff --git a/tox.ini b/tox.ini new file mode 100644 index 000000000..6282707a6 --- /dev/null +++ b/tox.ini @@ -0,0 +1,30 @@ +[tox] +envlist = pep8 + +[testenv] +# Set STATSD env variables so that statsd code paths are tested. +setenv = STATSD_HOST=localhost + STATSD_PORT=8125 + VIRTUAL_ENV={envdir} +deps = -r{toxinidir}/requirements.txt + -r{toxinidir}/test-requirements.txt +commands = + python setup.py testr --slowest --testr-args='{posargs}' + +[tox:jenkins] +downloadcache = ~/cache/pip + +[testenv:pep8] +commands = flake8 nodepool + +[testenv:cover] +commands = + python setup.py testr --coverage + +[testenv:venv] +commands = {posargs} + +[flake8] +ignore = E123,E125 +show-source = True +exclude = .venv,.tox,dist,doc,build,*.egg