Add a nodepool command

Moves the daemon command to nodepoold.

Refactor config handling a bit in NodePool to make the config
objects just contain information by default (though things
such as database handles and managers may get added to them
later as needed).

Start with the list and image-list commands.

Change-Id: If2ba7bca7ab4ef922787176af87ad5de31ae4b3e
This commit is contained in:
James E. Blair 2013-08-30 14:31:07 -07:00
parent b1b8a569ef
commit 7a1fe1891f
4 changed files with 261 additions and 125 deletions

103
nodepool/cmd/nodepoolcmd.py Normal file
View File

@ -0,0 +1,103 @@
#!/usr/bin/env python
#
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import logging.config
import sys
import time
from nodepool import nodedb
from nodepool import nodepool
from prettytable import PrettyTable
class NodePoolCmd(object):
    """Command-line client that prints node pool state as tables.

    Two sub-commands are registered: ``list`` (nodes) and ``image-list``
    (snapshot images).  Both read from the database handle exposed by the
    ``NodePool`` object created in :meth:`main`.
    """

    def __init__(self):
        # Namespace produced by parse_arguments().
        self.args = None
        # NodePool instance; created in main() once the arguments are known.
        self.pool = None

    @staticmethod
    def _age_hours(now, state_time):
        """Format the span between *state_time* and *now* as hours."""
        return '%.02f' % ((now - state_time) / 3600)

    def parse_arguments(self):
        """Parse ``sys.argv`` and store the result in ``self.args``.

        The selected sub-command's handler is stored as ``self.args.func``.
        Exits with a usage error when neither ``--version`` nor a
        sub-command was supplied.
        """
        parser = argparse.ArgumentParser(description='Node pool.')
        parser.add_argument('-c', dest='config',
                            help='path to config file')
        parser.add_argument('--version', dest='version', action='store_true',
                            help='show version')

        subparsers = parser.add_subparsers(title='commands',
                                           description='valid commands',
                                           help='additional help')

        cmd_list = subparsers.add_parser('list', help='list nodes')
        cmd_list.set_defaults(func=self.list)
        cmd_image_list = subparsers.add_parser('image-list',
                                               help='list images')
        cmd_image_list.set_defaults(func=self.image_list)

        self.args = parser.parse_args()

        # Some argparse versions treat sub-commands as optional, which
        # leaves ``func`` unset; report that as a usage error here rather
        # than failing later with an AttributeError in main().
        if not self.args.version and not hasattr(self.args, 'func'):
            parser.error('too few arguments')

    def setup_logging(self):
        """Configure logging via ``basicConfig`` at INFO level."""
        logging.basicConfig(level=logging.INFO)

    def list(self):
        """Print a table with one row per node returned by the database."""
        t = PrettyTable(["Provider", "Image", "Target", "Hostname", "NodeName",
                         "Server ID", "IP", "State", "Age (hours)"])
        t.align = 'l'
        # Sample the clock once so every row's age uses the same reference.
        now = time.time()
        with self.pool.getDB().getSession() as session:
            for node in session.getNodes():
                t.add_row([node.provider_name, node.image_name,
                           node.target_name, node.hostname, node.nodename,
                           node.external_id, node.ip,
                           nodedb.STATE_NAMES[node.state],
                           self._age_hours(now, node.state_time)])
        # Parenthesized single-argument print is valid on Python 2 and 3.
        print(t)

    def image_list(self):
        """Print a table with one row per snapshot image in the database."""
        t = PrettyTable(["Provider", "Image", "Hostname", "Version",
                         "Image ID", "Server ID", "State", "Age (hours)"])
        t.align = 'l'
        # Sample the clock once so every row's age uses the same reference.
        now = time.time()
        with self.pool.getDB().getSession() as session:
            for image in session.getSnapshotImages():
                t.add_row([image.provider_name, image.image_name,
                           image.hostname, image.version,
                           image.external_id, image.server_external_id,
                           nodedb.STATE_NAMES[image.state],
                           self._age_hours(now, image.state_time)])
        print(t)

    def main(self):
        """Parse arguments and run the selected sub-command.

        Returns 0 after printing the version when ``--version`` was given;
        otherwise loads the configuration, attaches the database to it and
        invokes the sub-command handler (returning ``None``).
        """
        self.parse_arguments()

        if self.args.version:
            # Imported lazily so the version module is only needed here.
            from nodepool.version import version_info as npc_version_info
            print("Nodepool version: %s" % npc_version_info.version_string())
            return 0

        self.pool = nodepool.NodePool(self.args.config)
        config = self.pool.loadConfig()
        # Only the database is set up; the listing commands call nothing
        # on the pool beyond getDB().
        self.pool.reconfigureDatabase(config)
        self.pool.setConfig(config)
        self.args.func()
def main():
    """Create the command object, set up logging and run it.

    Returns whatever ``NodePoolCmd.main()`` returns so the caller can pass
    it to ``sys.exit``.
    """
    command = NodePoolCmd()
    command.setup_logging()
    exit_status = command.main()
    return exit_status


if __name__ == "__main__":
    sys.exit(main())

View File

@ -58,7 +58,7 @@ class NodeCompleteThread(threading.Thread):
def run(self): def run(self):
try: try:
with self.nodepool.db.getSession() as session: with self.nodepool.getDB().getSession() as session:
self.handleEvent(session) self.handleEvent(session)
except Exception: except Exception:
self.log.exception("Exception handling event for %s:" % self.log.exception("Exception handling event for %s:" %
@ -134,7 +134,7 @@ class NodeUpdateListener(threading.Thread):
topic) topic)
def handleStartPhase(self, nodename, jobname): def handleStartPhase(self, nodename, jobname):
with self.nodepool.db.getSession() as session: with self.nodepool.getDB().getSession() as session:
node = session.getNodeByNodename(nodename) node = session.getNodeByNodename(nodename)
if not node: if not node:
self.log.debug("Unable to find node with nodename: %s" % self.log.debug("Unable to find node with nodename: %s" %
@ -173,7 +173,7 @@ class NodeLauncher(threading.Thread):
self.log.exception("Exception in run method:") self.log.exception("Exception in run method:")
def _run(self): def _run(self):
with self.nodepool.db.getSession() as session: with self.nodepool.getDB().getSession() as session:
self.log.debug("Launching node id: %s" % self.node_id) self.log.debug("Launching node id: %s" % self.node_id)
try: try:
self.node = session.getNode(self.node_id) self.node = session.getNode(self.node_id)
@ -304,7 +304,7 @@ class ImageUpdater(threading.Thread):
self.log.exception("Exception in run method:") self.log.exception("Exception in run method:")
def _run(self): def _run(self):
with self.nodepool.db.getSession() as session: with self.nodepool.getDB().getSession() as session:
self.log.debug("Updating image %s in %s " % (self.image.name, self.log.debug("Updating image %s in %s " % (self.image.name,
self.provider.name)) self.provider.name))
try: try:
@ -465,83 +465,64 @@ class TargetImageProvider(ConfigValue):
pass pass
class Cron(ConfigValue):
pass
class ZMQPublisher(ConfigValue):
pass
class NodePool(threading.Thread): class NodePool(threading.Thread):
log = logging.getLogger("nodepool.NodePool") log = logging.getLogger("nodepool.NodePool")
def __init__(self, configfile): def __init__(self, configfile):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.configfile = configfile self.configfile = configfile
self.zmq_context = None
self.zmq_listeners = {}
self.db = None
self.dburi = None
self.apsched = apscheduler.scheduler.Scheduler()
self.apsched.start()
self.update_cron = ''
self.update_job = None
self.cleanup_cron = ''
self.cleanup_job = None
self.check_cron = ''
self.check_job = None
self._stopped = False self._stopped = False
self.config = None self.config = None
self.loadConfig() self.zmq_context = None
self.apsched = None
def stop(self): def stop(self):
self._stopped = True self._stopped = True
if self.zmq_context:
self.zmq_context.destroy() self.zmq_context.destroy()
if self.apsched:
self.apsched.shutdown() self.apsched.shutdown()
def loadConfig(self): def loadConfig(self):
self.log.debug("Loading configuration") self.log.debug("Loading configuration")
config = yaml.load(open(self.configfile)) config = yaml.load(open(self.configfile))
update_cron = config.get('cron', {}).get('image-update', '14 2 * * *')
cleanup_cron = config.get('cron', {}).get('cleanup', '27 */6 * * *')
check_cron = config.get('cron', {}).get('check', '*/15 * * * *')
if (update_cron != self.update_cron):
if self.update_job:
self.apsched.unschedule_job(self.update_job)
parts = update_cron.split()
minute, hour, dom, month, dow = parts[:5]
self.apsched.add_cron_job(self._doUpdateImages,
day=dom,
day_of_week=dow,
hour=hour,
minute=minute)
self.update_cron = update_cron
if (cleanup_cron != self.cleanup_cron):
if self.cleanup_job:
self.apsched.unschedule_job(self.cleanup_job)
parts = cleanup_cron.split()
minute, hour, dom, month, dow = parts[:5]
self.apsched.add_cron_job(self._doPeriodicCleanup,
day=dom,
day_of_week=dow,
hour=hour,
minute=minute)
self.cleanup_cron = cleanup_cron
if (check_cron != self.check_cron):
if self.check_job:
self.apsched.unschedule_job(self.check_job)
parts = check_cron.split()
minute, hour, dom, month, dow = parts[:5]
self.apsched.add_cron_job(self._doPeriodicCheck,
day=dom,
day_of_week=dow,
hour=hour,
minute=minute)
self.check_cron = check_cron
newconfig = Config() newconfig = Config()
newconfig.db = None
newconfig.dburi = None
newconfig.providers = {} newconfig.providers = {}
newconfig.targets = {} newconfig.targets = {}
newconfig.scriptdir = config.get('script-dir') newconfig.scriptdir = config.get('script-dir')
newconfig.dburi = config.get('dburi') newconfig.dburi = config.get('dburi')
newconfig.provider_managers = {} newconfig.provider_managers = {}
newconfig.jenkins_managers = {} newconfig.jenkins_managers = {}
stop_managers = [] newconfig.zmq_publishers = {}
newconfig.crons = {}
for name, default in [
('image-update', '14 2 * * *'),
('cleanup', '27 */6 * * *'),
('check', '*/15 * * * *'),
]:
c = Cron()
c.name = name
newconfig.crons[c.name] = c
c.job = None
c.timespec = config.get('cron', {}).get(name, default)
for addr in config['zmq-publishers']:
z = ZMQPublisher()
z.name = addr
z.listener = None
newconfig.zmq_publishers[z.name] = z
for provider in config['providers']: for provider in config['providers']:
p = Provider() p = Provider()
@ -556,27 +537,6 @@ class NodePool(threading.Thread):
p.region_name = provider.get('region-name') p.region_name = provider.get('region-name')
p.max_servers = provider['max-servers'] p.max_servers = provider['max-servers']
p.rate = provider.get('rate', 1.0) p.rate = provider.get('rate', 1.0)
oldmanager = None
if self.config:
oldmanager = self.config.provider_managers.get(p.name)
if oldmanager:
if (p.username != oldmanager.provider.username or
p.password != oldmanager.provider.password or
p.project_id != oldmanager.provider.project_id or
p.auth_url != oldmanager.provider.auth_url or
p.service_type != oldmanager.provider.service_type or
p.service_name != oldmanager.provider.service_name or
p.region_name != oldmanager.provider.region_name):
stop_managers.append(oldmanager)
oldmanager = None
if oldmanager:
newconfig.provider_managers[p.name] = oldmanager
else:
self.log.debug("Creating new ProviderManager object for %s" %
p.name)
newconfig.provider_managers[p.name] = \
provider_manager.ProviderManager(p)
newconfig.provider_managers[p.name].start()
p.images = {} p.images = {}
for image in provider['images']: for image in provider['images']:
i = ProviderImage() i = ProviderImage()
@ -607,23 +567,6 @@ class NodePool(threading.Thread):
t.jenkins_credentials_id = None t.jenkins_credentials_id = None
t.jenkins_test_job = None t.jenkins_test_job = None
t.rate = target.get('rate', 1.0) t.rate = target.get('rate', 1.0)
oldmanager = None
if self.config:
oldmanager = self.config.jenkins_managers.get(t.name)
if oldmanager:
if (t.jenkins_url != oldmanager.target.jenkins_url or
t.jenkins_user != oldmanager.target.jenkins_user or
t.jenkins_apikey != oldmanager.target.jenkins_apikey):
stop_managers.append(oldmanager)
oldmanager = None
if oldmanager:
newconfig.jenkins_managers[t.name] = oldmanager
else:
self.log.debug("Creating new JenkinsManager object for %s" %
t.name)
newconfig.jenkins_managers[t.name] = \
jenkins_manager.JenkinsManager(t)
newconfig.jenkins_managers[t.name].start()
t.images = {} t.images = {}
for image in target['images']: for image in target['images']:
i = TargetImage() i = TargetImage()
@ -635,13 +578,113 @@ class NodePool(threading.Thread):
p.name = provider['name'] p.name = provider['name']
i.providers[p.name] = p i.providers[p.name] = p
p.min_ready = provider['min-ready'] p.min_ready = provider['min-ready']
self.config = newconfig return newconfig
def reconfigureDatabase(self, config):
    """Attach a database handle to *config*.

    The handle from the active configuration is reused when one exists and
    its ``dburi`` matches; otherwise a new ``nodedb.NodeDatabase`` is
    created for ``config.dburi``.
    """
    active = self.config
    if active and config.dburi == active.dburi:
        # Same URI as before: keep using the existing handle.
        config.db = active.db
        return
    config.db = nodedb.NodeDatabase(config.dburi)
def reconfigureManagers(self, config):
stop_managers = []
for p in config.providers.values():
oldmanager = None
if self.config:
oldmanager = self.config.provider_managers.get(p.name)
if oldmanager:
if (p.username != oldmanager.provider.username or
p.password != oldmanager.provider.password or
p.project_id != oldmanager.provider.project_id or
p.auth_url != oldmanager.provider.auth_url or
p.service_type != oldmanager.provider.service_type or
p.service_name != oldmanager.provider.service_name or
p.region_name != oldmanager.provider.region_name):
stop_managers.append(oldmanager)
oldmanager = None
if oldmanager:
config.provider_managers[p.name] = oldmanager
else:
self.log.debug("Creating new ProviderManager object for %s" %
p.name)
config.provider_managers[p.name] = \
provider_manager.ProviderManager(p)
config.provider_managers[p.name].start()
for t in config.targets.values():
oldmanager = None
if self.config:
oldmanager = self.config.jenkins_managers.get(t.name)
if oldmanager:
if (t.jenkins_url != oldmanager.target.jenkins_url or
t.jenkins_user != oldmanager.target.jenkins_user or
t.jenkins_apikey != oldmanager.target.jenkins_apikey):
stop_managers.append(oldmanager)
oldmanager = None
if oldmanager:
config.jenkins_managers[t.name] = oldmanager
else:
self.log.debug("Creating new JenkinsManager object for %s" %
t.name)
config.jenkins_managers[t.name] = \
jenkins_manager.JenkinsManager(t)
config.jenkins_managers[t.name].start()
for oldmanager in stop_managers: for oldmanager in stop_managers:
oldmanager.stop() oldmanager.stop()
if self.config.dburi != self.dburi:
self.dburi = self.config.dburi def reconfigureCrons(self, config):
self.db = nodedb.NodeDatabase(self.config.dburi) cron_map = {
self.startUpdateListeners(config['zmq-publishers']) 'image-update': self._doUpdateImages,
'cleanup': self._doPeriodicCleanup,
'check': self._doPeriodicCheck,
}
if not self.apsched:
self.apsched = apscheduler.scheduler.Scheduler()
self.apsched.start()
for c in config.crons.values():
if ((not self.config) or
c.timespec != self.config.crons[c.name].timespec):
if self.config and self.config.crons[c.name].job:
self.apsched.unschedule_job(self.config.crons[c.name].job)
parts = c.timespec.split()
minute, hour, dom, month, dow = parts[:5]
c.job = self.apsched.add_cron_job(
cron_map[c.name],
day=dom,
day_of_week=dow,
hour=hour,
minute=minute)
else:
c.job = self.config.crons[c.name].job
def reconfigureUpdateListeners(self, config):
    """Bring the ZMQ update listeners in line with *config*.

    When the publisher addresses in *config* match those of the active
    configuration, the existing publisher entries (and their listeners)
    are carried over unchanged.  Otherwise any existing ZMQ context is
    destroyed, a new one is created, and a ``NodeUpdateListener`` is
    started for every configured publisher.
    """
    if self.config:
        running = set(self.config.zmq_publishers.keys())
    else:
        running = set()
    configured = set(config.zmq_publishers.keys())
    if running == configured:
        self.log.debug("ZMQ Listeners do not need to be updated")
        # On the first pass there is no active configuration; if no
        # publishers are configured either, both sets are empty and there
        # is nothing to carry over.
        if self.config:
            config.zmq_publishers = self.config.zmq_publishers
        return

    if self.zmq_context:
        self.log.debug("Stopping listeners")
        self.zmq_context.destroy()
    self.zmq_context = zmq.Context()
    for z in config.zmq_publishers.values():
        self.log.debug("Starting listener for %s" % z.name)
        z.listener = NodeUpdateListener(self, z.name)
        z.listener.start()
def setConfig(self, config):
    """Install *config* as the active configuration."""
    self.config = config
def getDB(self):
    """Return the ``db`` attribute of the active configuration."""
    active = self.config
    return active.db
def getProviderManager(self, provider): def getProviderManager(self, provider):
return self.config.provider_managers[provider.name] return self.config.provider_managers[provider.name]
@ -649,24 +692,6 @@ class NodePool(threading.Thread):
def getJenkinsManager(self, target): def getJenkinsManager(self, target):
return self.config.jenkins_managers[target.name] return self.config.jenkins_managers[target.name]
def startUpdateListeners(self, publishers):
running = set(self.zmq_listeners.keys())
configured = set(publishers)
if running == configured:
self.log.debug("Listeners do not need to be updated")
return
if self.zmq_context:
self.log.debug("Stopping listeners")
self.zmq_context.destroy()
self.zmq_listeners = {}
self.zmq_context = zmq.Context()
for addr in publishers:
self.log.debug("Starting listener for %s" % addr)
listener = NodeUpdateListener(self, addr)
self.zmq_listeners[addr] = listener
listener.start()
def getNumNeededNodes(self, session, target, provider, image): def getNumNeededNodes(self, session, target, provider, image):
# Count machines that are ready and machines that are building, # Count machines that are ready and machines that are building,
# so that if the provider is very slow, we aren't queueing up tons # so that if the provider is very slow, we aren't queueing up tons
@ -692,8 +717,14 @@ class NodePool(threading.Thread):
def run(self): def run(self):
while not self._stopped: while not self._stopped:
try: try:
self.loadConfig() config = self.loadConfig()
with self.db.getSession() as session: self.reconfigureDatabase(config)
self.reconfigureManagers(config)
self.reconfigureCrons(config)
self.reconfigureUpdateListeners(config)
self.setConfig(config)
with self.getDB().getSession() as session:
self._run(session) self._run(session)
except Exception: except Exception:
self.log.exception("Exception in main loop:") self.log.exception("Exception in main loop:")
@ -744,7 +775,7 @@ class NodePool(threading.Thread):
def _doUpdateImages(self): def _doUpdateImages(self):
try: try:
with self.db.getSession() as session: with self.getDB().getSession() as session:
self.updateImages(session) self.updateImages(session)
except Exception: except Exception:
self.log.exception("Exception in periodic image update:") self.log.exception("Exception in periodic image update:")
@ -839,7 +870,7 @@ class NodePool(threading.Thread):
def _doPeriodicCleanup(self): def _doPeriodicCleanup(self):
try: try:
with self.db.getSession() as session: with self.getDB().getSession() as session:
self.periodicCleanup(session) self.periodicCleanup(session)
except Exception: except Exception:
self.log.exception("Exception in periodic cleanup:") self.log.exception("Exception in periodic cleanup:")
@ -913,7 +944,7 @@ class NodePool(threading.Thread):
def _doPeriodicCheck(self): def _doPeriodicCheck(self):
try: try:
with self.db.getSession() as session: with self.getDB().getSession() as session:
self.periodicCheck(session) self.periodicCheck(session)
except Exception: except Exception:
self.log.exception("Exception in periodic chack:") self.log.exception("Exception in periodic chack:")

View File

@ -13,3 +13,4 @@ sqlalchemy>=0.8.2,<0.9.0
pyzmq>=13.1.0,<14.0.0 pyzmq>=13.1.0,<14.0.0
python-novaclient python-novaclient
MySQL-python MySQL-python
PrettyTable>=0.6,<0.8

View File

@ -21,7 +21,8 @@ warnerrors = True
[entry_points] [entry_points]
console_scripts = console_scripts =
nodepool = nodepool.cmd.nodepoold:main nodepool = nodepool.cmd.nodepoolcmd:main
nodepoold = nodepool.cmd.nodepoold:main
[build_sphinx] [build_sphinx]
source-dir = doc/source source-dir = doc/source