2241 lines
91 KiB
Python
2241 lines
91 KiB
Python
#!/usr/bin/env python
|
|
|
|
# Copyright (C) 2011-2014 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
#
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import apscheduler.schedulers.background
|
|
import apscheduler.triggers.cron
|
|
import gear
|
|
import json
|
|
import logging
|
|
import os.path
|
|
import paramiko
|
|
import pprint
|
|
import random
|
|
import re
|
|
import threading
|
|
import time
|
|
from uuid import uuid4
|
|
import zmq
|
|
|
|
import allocation
|
|
import jenkins_manager
|
|
import nodedb
|
|
import exceptions
|
|
import nodeutils as utils
|
|
import provider_manager
|
|
import stats
|
|
import config as nodepool_config
|
|
|
|
import jobs
|
|
|
|
MINS = 60
|
|
HOURS = 60 * MINS
|
|
|
|
WATERMARK_SLEEP = 10 # Interval between checking if new servers needed
|
|
IMAGE_TIMEOUT = 6 * HOURS # How long to wait for an image save
|
|
CONNECT_TIMEOUT = 10 * MINS # How long to try to connect after a server
|
|
# is ACTIVE
|
|
NODE_CLEANUP = 8 * HOURS # When to start deleting a node that is not
|
|
# READY or HOLD
|
|
TEST_CLEANUP = 5 * MINS # When to start deleting a node that is in TEST
|
|
IMAGE_CLEANUP = 8 * HOURS # When to start deleting an image that is not
|
|
# READY or is not the current or previous image
|
|
DELETE_DELAY = 1 * MINS # Delay before deleting a node that has completed
|
|
# its job.
|
|
|
|
|
|
class LaunchNodepoolException(Exception):
|
|
statsd_key = 'error.nodepool'
|
|
|
|
|
|
class LaunchStatusException(Exception):
|
|
statsd_key = 'error.status'
|
|
|
|
|
|
class LaunchNetworkException(Exception):
|
|
statsd_key = 'error.network'
|
|
|
|
|
|
class LaunchAuthException(Exception):
|
|
statsd_key = 'error.auth'
|
|
|
|
|
|
class NodeCompleteThread(threading.Thread):
|
|
log = logging.getLogger("nodepool.NodeCompleteThread")
|
|
|
|
def __init__(self, nodepool, nodename, jobname, result, branch):
|
|
threading.Thread.__init__(self, name='NodeCompleteThread for %s' %
|
|
nodename)
|
|
self.nodename = nodename
|
|
self.nodepool = nodepool
|
|
self.jobname = jobname
|
|
self.result = result
|
|
self.branch = branch
|
|
self.statsd = stats.get_client()
|
|
|
|
def run(self):
|
|
try:
|
|
with self.nodepool.getDB().getSession() as session:
|
|
self.handleEvent(session)
|
|
except Exception:
|
|
self.log.exception("Exception handling event for %s:" %
|
|
self.nodename)
|
|
|
|
def handleEvent(self, session):
|
|
node = session.getNodeByNodename(self.nodename)
|
|
if not node:
|
|
self.log.debug("Unable to find node with nodename: %s" %
|
|
self.nodename)
|
|
return
|
|
|
|
if node.state == nodedb.HOLD:
|
|
self.log.info("Node id: %s is complete but in HOLD state" %
|
|
node.id)
|
|
return
|
|
|
|
target = self.nodepool.config.targets[node.target_name]
|
|
if self.jobname == target.jenkins_test_job:
|
|
self.log.debug("Test job for node id: %s complete, result: %s" %
|
|
(node.id, self.result))
|
|
if self.result == 'SUCCESS':
|
|
jenkins = self.nodepool.getJenkinsManager(target)
|
|
old = jenkins.relabelNode(node.nodename, [node.image_name])
|
|
if not old:
|
|
old = '[unlabeled]'
|
|
self.log.info("Relabeled jenkins node id: %s from %s to %s" %
|
|
(node.id, old, node.image_name))
|
|
node.state = nodedb.READY
|
|
self.log.info("Node id: %s is ready" % node.id)
|
|
self.nodepool.updateStats(session, node.provider_name)
|
|
return
|
|
self.log.info("Node id: %s failed acceptance test, deleting" %
|
|
node.id)
|
|
|
|
if self.statsd and self.result == 'SUCCESS':
|
|
start = node.state_time
|
|
dt = int((time.time() - start) * 1000)
|
|
|
|
# nodepool.job.tempest
|
|
key = 'nodepool.job.%s' % self.jobname
|
|
self.statsd.timing(key + '.runtime', dt)
|
|
self.statsd.incr(key + '.builds')
|
|
|
|
# nodepool.job.tempest.master
|
|
key += '.%s' % self.branch
|
|
self.statsd.timing(key + '.runtime', dt)
|
|
self.statsd.incr(key + '.builds')
|
|
|
|
# nodepool.job.tempest.master.devstack-precise
|
|
key += '.%s' % node.label_name
|
|
self.statsd.timing(key + '.runtime', dt)
|
|
self.statsd.incr(key + '.builds')
|
|
|
|
# nodepool.job.tempest.master.devstack-precise.rax-ord
|
|
key += '.%s' % node.provider_name
|
|
self.statsd.timing(key + '.runtime', dt)
|
|
self.statsd.incr(key + '.builds')
|
|
|
|
time.sleep(DELETE_DELAY)
|
|
self.nodepool.deleteNode(node.id)
|
|
|
|
|
|
class NodeUpdateListener(threading.Thread):
|
|
log = logging.getLogger("nodepool.NodeUpdateListener")
|
|
|
|
def __init__(self, nodepool, addr):
|
|
threading.Thread.__init__(self, name='NodeUpdateListener')
|
|
self.nodepool = nodepool
|
|
self.socket = self.nodepool.zmq_context.socket(zmq.SUB)
|
|
self.socket.RCVTIMEO = 1000
|
|
event_filter = b""
|
|
self.socket.setsockopt(zmq.SUBSCRIBE, event_filter)
|
|
self.socket.connect(addr)
|
|
self._stopped = False
|
|
|
|
def run(self):
|
|
while not self._stopped:
|
|
try:
|
|
m = self.socket.recv().decode('utf-8')
|
|
except zmq.error.Again:
|
|
continue
|
|
try:
|
|
topic, data = m.split(None, 1)
|
|
self.handleEvent(topic, data)
|
|
except Exception:
|
|
self.log.exception("Exception handling job:")
|
|
|
|
def stop(self):
|
|
self._stopped = True
|
|
|
|
def handleEvent(self, topic, data):
|
|
self.log.debug("Received: %s %s" % (topic, data))
|
|
args = json.loads(data)
|
|
build = args['build']
|
|
if 'node_name' not in build:
|
|
return
|
|
jobname = args['name']
|
|
nodename = args['build']['node_name']
|
|
if topic == 'onStarted':
|
|
self.handleStartPhase(nodename, jobname)
|
|
elif topic == 'onCompleted':
|
|
pass
|
|
elif topic == 'onFinalized':
|
|
result = args['build'].get('status')
|
|
params = args['build'].get('parameters')
|
|
if params:
|
|
branch = params.get('ZUUL_BRANCH', 'unknown_branch')
|
|
else:
|
|
branch = 'unknown_branch'
|
|
self.handleCompletePhase(nodename, jobname, result, branch)
|
|
else:
|
|
raise Exception("Received job for unhandled phase: %s" %
|
|
topic)
|
|
|
|
def handleStartPhase(self, nodename, jobname):
|
|
with self.nodepool.getDB().getSession() as session:
|
|
node = session.getNodeByNodename(nodename)
|
|
if not node:
|
|
self.log.debug("Unable to find node with nodename: %s" %
|
|
nodename)
|
|
return
|
|
|
|
target = self.nodepool.config.targets[node.target_name]
|
|
if jobname == target.jenkins_test_job:
|
|
self.log.debug("Test job for node id: %s started" % node.id)
|
|
return
|
|
|
|
# Preserve the HOLD state even if a job starts on the node.
|
|
if node.state != nodedb.HOLD:
|
|
self.log.info("Setting node id: %s to USED" % node.id)
|
|
node.state = nodedb.USED
|
|
self.nodepool.updateStats(session, node.provider_name)
|
|
|
|
def handleCompletePhase(self, nodename, jobname, result, branch):
|
|
t = NodeCompleteThread(self.nodepool, nodename, jobname, result,
|
|
branch)
|
|
t.start()
|
|
|
|
|
|
class JobTracker(object):
|
|
def __init__(self):
|
|
self._running_jobs = set()
|
|
|
|
@property
|
|
def running_jobs(self):
|
|
return list(self._running_jobs)
|
|
|
|
def addJob(self, job):
|
|
self._running_jobs.add(job)
|
|
job.addCompletionHandler(self._running_jobs.remove)
|
|
|
|
|
|
class GearmanClient(gear.Client):
|
|
def __init__(self):
|
|
super(GearmanClient, self).__init__(client_id='nodepool')
|
|
self.__log = logging.getLogger("nodepool.GearmanClient")
|
|
|
|
def getNeededWorkers(self):
|
|
needed_workers = {}
|
|
job_worker_map = {}
|
|
unspecified_jobs = {}
|
|
for connection in self.active_connections:
|
|
try:
|
|
req = gear.StatusAdminRequest()
|
|
connection.sendAdminRequest(req, timeout=300)
|
|
except Exception:
|
|
self.__log.exception("Exception while listing functions")
|
|
self._lostConnection(connection)
|
|
continue
|
|
for line in req.response.split('\n'):
|
|
parts = [x.strip() for x in line.split('\t')]
|
|
# parts[0] - function name
|
|
# parts[1] - total jobs queued (including building)
|
|
# parts[2] - jobs building
|
|
# parts[3] - workers registered
|
|
if not parts or parts[0] == '.':
|
|
continue
|
|
if not parts[0].startswith('build:'):
|
|
continue
|
|
function = parts[0][len('build:'):]
|
|
# total jobs in queue (including building jobs)
|
|
# NOTE(jhesketh): Jobs that are being built are accounted for
|
|
# in the demand algorithm by subtracting the running nodes.
|
|
# If there are foreign (to nodepool) workers accepting jobs
|
|
# the demand will be higher than actually required. However
|
|
# better to have too many than too few and if you have a
|
|
# foreign worker this may be desired.
|
|
try:
|
|
queued = int(parts[1])
|
|
except ValueError as e:
|
|
self.__log.warn(
|
|
'Server returned non-integer value in status. (%s)' %
|
|
str(e))
|
|
queued = 0
|
|
if queued > 0:
|
|
self.__log.debug("Function: %s queued: %s" % (function,
|
|
queued))
|
|
if ':' in function:
|
|
fparts = function.split(':')
|
|
# fparts[0] - function name
|
|
# fparts[1] - target node [type]
|
|
job = fparts[-2]
|
|
worker = fparts[-1]
|
|
workers = job_worker_map.get(job, [])
|
|
workers.append(worker)
|
|
job_worker_map[job] = workers
|
|
if queued > 0:
|
|
needed_workers[worker] = (
|
|
needed_workers.get(worker, 0) + queued)
|
|
elif queued > 0:
|
|
job = function
|
|
unspecified_jobs[job] = (unspecified_jobs.get(job, 0) +
|
|
queued)
|
|
for job, queued in unspecified_jobs.items():
|
|
workers = job_worker_map.get(job)
|
|
if not workers:
|
|
continue
|
|
worker = workers[0]
|
|
needed_workers[worker] = (needed_workers.get(worker, 0) +
|
|
queued)
|
|
return needed_workers
|
|
|
|
def handleWorkComplete(self, packet):
|
|
job = super(GearmanClient, self).handleWorkComplete(packet)
|
|
job.onCompleted()
|
|
|
|
def handleWorkFail(self, packet):
|
|
job = super(GearmanClient, self).handleWorkFail(packet)
|
|
job.onFailed()
|
|
|
|
def handleWorkException(self, packet):
|
|
job = super(GearmanClient, self).handleWorkException(packet)
|
|
job.onFailed()
|
|
|
|
def handleDisconnect(self, job):
|
|
super(GearmanClient, self).handleDisconnect(job)
|
|
job.onDisconnect()
|
|
|
|
def handleWorkStatus(self, packet):
|
|
job = super(GearmanClient, self).handleWorkStatus(packet)
|
|
job.onWorkStatus()
|
|
|
|
|
|
class InstanceDeleter(threading.Thread):
|
|
log = logging.getLogger("nodepool.InstanceDeleter")
|
|
|
|
def __init__(self, nodepool, provider_name, external_id):
|
|
threading.Thread.__init__(self, name='InstanceDeleter for %s %s' %
|
|
(provider_name, external_id))
|
|
self.nodepool = nodepool
|
|
self.provider_name = provider_name
|
|
self.external_id = external_id
|
|
|
|
def run(self):
|
|
try:
|
|
self.nodepool._deleteInstance(self.provider_name,
|
|
self.external_id)
|
|
except Exception:
|
|
self.log.exception("Exception deleting instance %s from %s:" %
|
|
(self.external_id, self.provider_name))
|
|
|
|
|
|
class NodeDeleter(threading.Thread):
|
|
log = logging.getLogger("nodepool.NodeDeleter")
|
|
|
|
def __init__(self, nodepool, node_id):
|
|
threading.Thread.__init__(self, name='NodeDeleter for %s' % node_id)
|
|
self.node_id = node_id
|
|
self.nodepool = nodepool
|
|
|
|
def run(self):
|
|
try:
|
|
with self.nodepool.getDB().getSession() as session:
|
|
node = session.getNode(self.node_id)
|
|
self.nodepool._deleteNode(session, node)
|
|
except Exception:
|
|
self.log.exception("Exception deleting node %s:" %
|
|
self.node_id)
|
|
|
|
|
|
class NodeLauncher(threading.Thread):
|
|
log = logging.getLogger("nodepool.NodeLauncher")
|
|
|
|
def __init__(self, nodepool, provider, label, target, node_id, timeout,
|
|
launch_timeout):
|
|
threading.Thread.__init__(self, name='NodeLauncher for %s' % node_id)
|
|
self.provider = provider
|
|
self.label = label
|
|
self.image = provider.images[label.image]
|
|
self.target = target
|
|
self.node_id = node_id
|
|
self.timeout = timeout
|
|
self.nodepool = nodepool
|
|
self.launch_timeout = launch_timeout
|
|
|
|
def run(self):
|
|
try:
|
|
self._run()
|
|
except Exception:
|
|
self.log.exception("Exception in run method:")
|
|
|
|
def _run(self):
|
|
with self.nodepool.getDB().getSession() as session:
|
|
self.log.debug("Launching node id: %s" % self.node_id)
|
|
try:
|
|
self.node = session.getNode(self.node_id)
|
|
self.manager = self.nodepool.getProviderManager(self.provider)
|
|
except Exception:
|
|
self.log.exception("Exception preparing to launch node id: %s:"
|
|
% self.node_id)
|
|
return
|
|
|
|
try:
|
|
start_time = time.time()
|
|
dt = self.launchNode(session)
|
|
failed = False
|
|
statsd_key = 'ready'
|
|
self.log.debug('Node %s ready in provider: %s' %
|
|
(self.node_id, self.provider.name))
|
|
except exceptions.TimeoutException as e:
|
|
# Don't log exception for timeouts. Each one has
|
|
# a specific Exception, and we know it's a timeout, so
|
|
# the traceback in the log is just noise
|
|
self.log.error("Timeout launching node id: %s "
|
|
"in provider: %s error: %s" %
|
|
(self.node_id, self.provider.name,
|
|
str(e)))
|
|
dt = int((time.time() - start_time) * 1000)
|
|
failed = True
|
|
statsd_key = e.statsd_key
|
|
except Exception as e:
|
|
self.log.exception("%s launching node id: %s "
|
|
"in provider: %s error:" %
|
|
(e.__class__.__name__,
|
|
self.node_id, self.provider.name))
|
|
dt = int((time.time() - start_time) * 1000)
|
|
failed = True
|
|
if hasattr(e, 'statsd_key'):
|
|
statsd_key = e.statsd_key
|
|
else:
|
|
statsd_key = 'error.unknown'
|
|
|
|
try:
|
|
self.nodepool.launchStats(statsd_key, dt, self.image.name,
|
|
self.provider.name,
|
|
self.target.name,
|
|
self.node.az)
|
|
except Exception:
|
|
self.log.exception("Exception reporting launch stats:")
|
|
|
|
if failed:
|
|
try:
|
|
self.nodepool.deleteNode(self.node_id)
|
|
except Exception:
|
|
self.log.exception("Exception deleting node id: %s:" %
|
|
self.node_id)
|
|
|
|
def launchNode(self, session):
|
|
start_time = time.time()
|
|
timestamp = int(start_time)
|
|
|
|
hostname = self.target.hostname.format(
|
|
label=self.label, provider=self.provider, node_id=self.node.id,
|
|
timestamp=str(timestamp))
|
|
self.node.hostname = hostname
|
|
self.node.nodename = hostname.split('.')[0]
|
|
self.node.target_name = self.target.name
|
|
|
|
snap_image = session.getCurrentSnapshotImage(
|
|
self.provider.name, self.image.name)
|
|
if not snap_image:
|
|
raise LaunchNodepoolException("Unable to find current snapshot "
|
|
"image %s in %s" %
|
|
(self.image.name,
|
|
self.provider.name))
|
|
|
|
self.log.info("Creating server with hostname %s in %s from image %s "
|
|
"for node id: %s" % (hostname, self.provider.name,
|
|
self.image.name, self.node_id))
|
|
server_id = self.manager.createServer(
|
|
hostname, self.image.min_ram, snap_image.external_id,
|
|
name_filter=self.image.name_filter, az=self.node.az,
|
|
config_drive=self.image.config_drive,
|
|
nodepool_node_id=self.node_id,
|
|
nodepool_image_name=self.image.name)
|
|
self.node.external_id = server_id
|
|
session.commit()
|
|
|
|
self.log.debug("Waiting for server %s for node id: %s" %
|
|
(server_id, self.node.id))
|
|
server = self.manager.waitForServer(server_id, self.launch_timeout)
|
|
if server['status'] != 'ACTIVE':
|
|
raise LaunchStatusException("Server %s for node id: %s "
|
|
"status: %s" %
|
|
(server_id, self.node.id,
|
|
server['status']))
|
|
|
|
ip = server.get('public_v4')
|
|
ip_v6 = server.get('public_v6')
|
|
if self.provider.ipv6_preferred:
|
|
if ip_v6:
|
|
ip = ip_v6
|
|
else:
|
|
self.log.warning('Preferred ipv6 not available, '
|
|
'falling back to ipv4.')
|
|
if not ip and self.manager.hasExtension('os-floating-ips'):
|
|
ip = self.manager.addPublicIP(server_id,
|
|
pool=self.provider.pool)
|
|
if not ip:
|
|
self.log.debug(
|
|
"Server data for failed IP: %s" % pprint.pformat(
|
|
server))
|
|
raise LaunchNetworkException("Unable to find public IP of server")
|
|
|
|
self.node.ip_private = server.get('private_v4')
|
|
self.node.ip = ip
|
|
self.log.debug("Node id: %s is running, ipv4: %s, ipv6: %s" %
|
|
(self.node.id, server.get('public_v4'),
|
|
server.get('public_v6')))
|
|
|
|
self.log.debug("Node id: %s testing ssh at ip: %s" %
|
|
(self.node.id, ip))
|
|
connect_kwargs = dict(key_filename=self.image.private_key)
|
|
if not utils.ssh_connect(ip, self.image.username,
|
|
connect_kwargs=connect_kwargs,
|
|
timeout=self.timeout):
|
|
raise LaunchAuthException("Unable to connect via ssh")
|
|
|
|
# Save the elapsed time for statsd
|
|
dt = int((time.time() - start_time) * 1000)
|
|
|
|
if self.label.subnodes:
|
|
self.log.info("Node id: %s is waiting on subnodes" % self.node.id)
|
|
|
|
while ((time.time() - start_time) < (NODE_CLEANUP - 60)):
|
|
session.commit()
|
|
ready_subnodes = [n for n in self.node.subnodes
|
|
if n.state == nodedb.READY]
|
|
if len(ready_subnodes) == self.label.subnodes:
|
|
break
|
|
time.sleep(5)
|
|
|
|
nodelist = []
|
|
for subnode in self.node.subnodes:
|
|
nodelist.append(('sub', subnode))
|
|
nodelist.append(('primary', self.node))
|
|
|
|
self.writeNodepoolInfo(nodelist)
|
|
if self.label.ready_script:
|
|
self.runReadyScript(nodelist)
|
|
|
|
# Do this before adding to jenkins to avoid a race where
|
|
# Jenkins might immediately use the node before we've updated
|
|
# the state:
|
|
if self.target.jenkins_test_job:
|
|
self.node.state = nodedb.TEST
|
|
self.log.info("Node id: %s is in testing" % self.node.id)
|
|
else:
|
|
self.node.state = nodedb.READY
|
|
self.log.info("Node id: %s is ready" % self.node.id)
|
|
self.nodepool.updateStats(session, self.provider.name)
|
|
|
|
if self.target.jenkins_url:
|
|
self.log.debug("Adding node id: %s to jenkins" % self.node.id)
|
|
self.createJenkinsNode()
|
|
self.log.info("Node id: %s added to jenkins" % self.node.id)
|
|
|
|
return dt
|
|
|
|
def createJenkinsNode(self):
|
|
jenkins = self.nodepool.getJenkinsManager(self.target)
|
|
|
|
args = dict(name=self.node.nodename,
|
|
host=self.node.ip,
|
|
description='Dynamic single use %s node' % self.label.name,
|
|
executors=1,
|
|
root=self.image.user_home)
|
|
if not self.target.jenkins_test_job:
|
|
args['labels'] = self.label.name
|
|
if self.target.jenkins_credentials_id:
|
|
args['credentials_id'] = self.target.jenkins_credentials_id
|
|
else:
|
|
args['username'] = self.image.username
|
|
args['private_key'] = self.image.private_key
|
|
|
|
jenkins.createNode(**args)
|
|
|
|
if self.target.jenkins_test_job:
|
|
params = dict(NODE=self.node.nodename)
|
|
jenkins.startBuild(self.target.jenkins_test_job, params)
|
|
|
|
def writeNodepoolInfo(self, nodelist):
|
|
key = paramiko.RSAKey.generate(2048)
|
|
public_key = key.get_name() + ' ' + key.get_base64()
|
|
|
|
for role, n in nodelist:
|
|
connect_kwargs = dict(key_filename=self.image.private_key)
|
|
host = utils.ssh_connect(n.ip, self.image.username,
|
|
connect_kwargs=connect_kwargs,
|
|
timeout=self.timeout)
|
|
if not host:
|
|
raise Exception("Unable to log in via SSH")
|
|
|
|
host.ssh("test for config dir", "ls /etc/nodepool")
|
|
|
|
ftp = host.client.open_sftp()
|
|
|
|
# The Role of this node
|
|
f = ftp.open('/etc/nodepool/role', 'w')
|
|
f.write(role + '\n')
|
|
f.close()
|
|
# The IP of this node
|
|
f = ftp.open('/etc/nodepool/node', 'w')
|
|
f.write(n.ip + '\n')
|
|
f.close()
|
|
# The private IP of this node
|
|
f = ftp.open('/etc/nodepool/node_private', 'w')
|
|
f.write(n.ip_private + '\n')
|
|
f.close()
|
|
# The IP of the primary node of this node set
|
|
f = ftp.open('/etc/nodepool/primary_node', 'w')
|
|
f.write(self.node.ip + '\n')
|
|
f.close()
|
|
# The private IP of the primary node of this node set
|
|
f = ftp.open('/etc/nodepool/primary_node_private', 'w')
|
|
f.write(self.node.ip_private + '\n')
|
|
f.close()
|
|
# The IPs of all sub nodes in this node set
|
|
f = ftp.open('/etc/nodepool/sub_nodes', 'w')
|
|
for subnode in self.node.subnodes:
|
|
f.write(subnode.ip + '\n')
|
|
f.close()
|
|
# The private IPs of all sub nodes in this node set
|
|
f = ftp.open('/etc/nodepool/sub_nodes_private', 'w')
|
|
for subnode in self.node.subnodes:
|
|
f.write(subnode.ip_private + '\n')
|
|
f.close()
|
|
# The SSH key for this node set
|
|
f = ftp.open('/etc/nodepool/id_rsa', 'w')
|
|
key.write_private_key(f)
|
|
f.close()
|
|
f = ftp.open('/etc/nodepool/id_rsa.pub', 'w')
|
|
f.write(public_key)
|
|
f.close()
|
|
# Provider information for this node set
|
|
f = ftp.open('/etc/nodepool/provider', 'w')
|
|
f.write('NODEPOOL_PROVIDER=%s\n' % self.provider.name)
|
|
f.write('NODEPOOL_CLOUD=%s\n' % self.provider.cloud_config.name)
|
|
f.write('NODEPOOL_REGION=%s\n' % (
|
|
self.provider.region_name or '',))
|
|
f.write('NODEPOOL_AZ=%s\n' % (self.node.az or '',))
|
|
f.close()
|
|
# The instance UUID for this node
|
|
f = ftp.open('/etc/nodepool/uuid', 'w')
|
|
f.write(n.external_id + '\n')
|
|
f.close()
|
|
|
|
ftp.close()
|
|
|
|
def runReadyScript(self, nodelist):
|
|
for role, n in nodelist:
|
|
connect_kwargs = dict(key_filename=self.image.private_key)
|
|
host = utils.ssh_connect(n.ip, self.image.username,
|
|
connect_kwargs=connect_kwargs,
|
|
timeout=self.timeout)
|
|
if not host:
|
|
raise Exception("Unable to log in via SSH")
|
|
|
|
env_vars = ''
|
|
for k, v in os.environ.items():
|
|
if k.startswith('NODEPOOL_'):
|
|
env_vars += ' %s="%s"' % (k, v)
|
|
host.ssh("run ready script",
|
|
"cd /opt/nodepool-scripts && %s ./%s %s" %
|
|
(env_vars, self.label.ready_script, n.hostname),
|
|
output=True)
|
|
|
|
|
|
class SubNodeLauncher(threading.Thread):
|
|
log = logging.getLogger("nodepool.SubNodeLauncher")
|
|
|
|
def __init__(self, nodepool, provider, label, subnode_id,
|
|
node_id, node_target_name, timeout, launch_timeout, node_az):
|
|
threading.Thread.__init__(self, name='SubNodeLauncher for %s'
|
|
% subnode_id)
|
|
self.provider = provider
|
|
self.label = label
|
|
self.image = provider.images[label.image]
|
|
self.node_target_name = node_target_name
|
|
self.subnode_id = subnode_id
|
|
self.node_id = node_id
|
|
self.timeout = timeout
|
|
self.nodepool = nodepool
|
|
self.launch_timeout = launch_timeout
|
|
self.node_az = node_az
|
|
|
|
def run(self):
|
|
try:
|
|
self._run()
|
|
except Exception:
|
|
self.log.exception("Exception in run method:")
|
|
|
|
def _run(self):
|
|
with self.nodepool.getDB().getSession() as session:
|
|
self.log.debug("Launching subnode id: %s for node id: %s" %
|
|
(self.subnode_id, self.node_id))
|
|
try:
|
|
self.subnode = session.getSubNode(self.subnode_id)
|
|
self.manager = self.nodepool.getProviderManager(self.provider)
|
|
except Exception:
|
|
self.log.exception("Exception preparing to launch subnode "
|
|
"id: %s for node id: %s:"
|
|
% (self.subnode_id, self.node_id))
|
|
return
|
|
|
|
try:
|
|
start_time = time.time()
|
|
dt = self.launchSubNode(session)
|
|
failed = False
|
|
statsd_key = 'ready'
|
|
except Exception as e:
|
|
self.log.exception("%s launching subnode id: %s "
|
|
"for node id: %s in provider: %s error:" %
|
|
(e.__class__.__name__, self.subnode_id,
|
|
self.node_id, self.provider.name))
|
|
dt = int((time.time() - start_time) * 1000)
|
|
failed = True
|
|
if hasattr(e, 'statsd_key'):
|
|
statsd_key = e.statsd_key
|
|
else:
|
|
statsd_key = 'error.unknown'
|
|
|
|
try:
|
|
self.nodepool.launchStats(statsd_key, dt, self.image.name,
|
|
self.provider.name,
|
|
self.node_target_name,
|
|
self.node_az)
|
|
except Exception:
|
|
self.log.exception("Exception reporting launch stats:")
|
|
|
|
if failed:
|
|
try:
|
|
self.nodepool.deleteSubNode(self.subnode, self.manager)
|
|
except Exception:
|
|
self.log.exception("Exception deleting subnode id: %s: "
|
|
"for node id: %s:" %
|
|
(self.subnode_id, self.node_id))
|
|
return
|
|
|
|
def launchSubNode(self, session):
|
|
start_time = time.time()
|
|
timestamp = int(start_time)
|
|
|
|
target = self.nodepool.config.targets[self.node_target_name]
|
|
hostname = target.subnode_hostname.format(
|
|
label=self.label, provider=self.provider, node_id=self.node_id,
|
|
subnode_id=self.subnode_id, timestamp=str(timestamp))
|
|
self.subnode.hostname = hostname
|
|
self.subnode.nodename = hostname.split('.')[0]
|
|
|
|
snap_image = session.getCurrentSnapshotImage(
|
|
self.provider.name, self.image.name)
|
|
if not snap_image:
|
|
raise LaunchNodepoolException("Unable to find current snapshot "
|
|
"image %s in %s" %
|
|
(self.image.name,
|
|
self.provider.name))
|
|
|
|
self.log.info("Creating server with hostname %s in %s from image %s "
|
|
"for subnode id: %s for node id: %s"
|
|
% (hostname, self.provider.name,
|
|
self.image.name, self.subnode_id, self.node_id))
|
|
server_id = self.manager.createServer(
|
|
hostname, self.image.min_ram, snap_image.external_id,
|
|
name_filter=self.image.name_filter, az=self.node_az,
|
|
config_drive=self.image.config_drive,
|
|
nodepool_node_id=self.node_id,
|
|
nodepool_image_name=self.image.name)
|
|
self.subnode.external_id = server_id
|
|
session.commit()
|
|
|
|
self.log.debug("Waiting for server %s for subnode id: %s for "
|
|
"node id: %s" %
|
|
(server_id, self.subnode_id, self.node_id))
|
|
server = self.manager.waitForServer(server_id, self.launch_timeout)
|
|
if server['status'] != 'ACTIVE':
|
|
raise LaunchStatusException("Server %s for subnode id: "
|
|
"%s for node id: %s "
|
|
"status: %s" %
|
|
(server_id, self.subnode_id,
|
|
self.node_id, server['status']))
|
|
|
|
ip = server.get('public_v4')
|
|
ip_v6 = server.get('public_v6')
|
|
if self.provider.ipv6_preferred:
|
|
if ip_v6:
|
|
ip = ip_v6
|
|
else:
|
|
self.log.warning('Preferred ipv6 not available, '
|
|
'falling back to ipv4.')
|
|
if not ip and self.manager.hasExtension('os-floating-ips'):
|
|
ip = self.manager.addPublicIP(server_id,
|
|
pool=self.provider.pool)
|
|
if not ip:
|
|
raise LaunchNetworkException("Unable to find public IP of server")
|
|
|
|
self.subnode.ip_private = server.get('private_v4')
|
|
self.subnode.ip = ip
|
|
self.log.debug("Subnode id: %s for node id: %s is running, "
|
|
"ipv4: %s, ipv6: %s" %
|
|
(self.subnode_id, self.node_id, server.get('public_v4'),
|
|
server.get('public_v6')))
|
|
|
|
self.log.debug("Subnode id: %s for node id: %s testing ssh at ip: %s" %
|
|
(self.subnode_id, self.node_id, ip))
|
|
connect_kwargs = dict(key_filename=self.image.private_key)
|
|
if not utils.ssh_connect(ip, self.image.username,
|
|
connect_kwargs=connect_kwargs,
|
|
timeout=self.timeout):
|
|
raise LaunchAuthException("Unable to connect via ssh")
|
|
|
|
# Save the elapsed time for statsd
|
|
dt = int((time.time() - start_time) * 1000)
|
|
|
|
self.subnode.state = nodedb.READY
|
|
self.log.info("Subnode id: %s for node id: %s is ready"
|
|
% (self.subnode_id, self.node_id))
|
|
self.nodepool.updateStats(session, self.provider.name)
|
|
|
|
return dt
|
|
|
|
|
|
class ImageDeleter(threading.Thread):
|
|
log = logging.getLogger("nodepool.ImageDeleter")
|
|
|
|
def __init__(self, nodepool, snap_image_id):
|
|
threading.Thread.__init__(self,
|
|
name='ImageDeleter for %s' % snap_image_id)
|
|
self.snap_image_id = snap_image_id
|
|
self.nodepool = nodepool
|
|
|
|
def run(self):
|
|
try:
|
|
with self.nodepool.getDB().getSession() as session:
|
|
snap_image = session.getSnapshotImage(self.snap_image_id)
|
|
self.nodepool._deleteImage(session, snap_image)
|
|
except Exception:
|
|
self.log.exception("Exception deleting image %s:" %
|
|
self.snap_image_id)
|
|
|
|
|
|
class ImageUpdater(threading.Thread):
|
|
log = logging.getLogger("nodepool.ImageUpdater")
|
|
|
|
def __init__(self, nodepool, provider, image, snap_image_id):
|
|
threading.Thread.__init__(self, name='ImageUpdater for %s' %
|
|
snap_image_id)
|
|
self.provider = provider
|
|
self.image = image
|
|
self.snap_image_id = snap_image_id
|
|
self.nodepool = nodepool
|
|
self.scriptdir = self.nodepool.config.scriptdir
|
|
self.elementsdir = self.nodepool.config.elementsdir
|
|
self.imagesdir = self.nodepool.config.imagesdir
|
|
self.statsd = stats.get_client()
|
|
|
|
def run(self):
|
|
try:
|
|
self._run()
|
|
except Exception:
|
|
self.log.exception("Exception in run method:")
|
|
|
|
def _run(self):
|
|
with self.nodepool.getDB().getSession() as session:
|
|
self.log.debug("Updating image %s in %s " % (self.image.name,
|
|
self.provider.name))
|
|
try:
|
|
self.snap_image = session.getSnapshotImage(
|
|
self.snap_image_id)
|
|
self.manager = self.nodepool.getProviderManager(self.provider)
|
|
except Exception:
|
|
self.log.exception("Exception preparing to update image %s "
|
|
"in %s:" % (self.image.name,
|
|
self.provider.name))
|
|
return
|
|
|
|
try:
|
|
self.updateImage(session)
|
|
except Exception:
|
|
self.log.exception("Exception updating image %s in %s:" %
|
|
(self.image.name, self.provider.name))
|
|
try:
|
|
if self.snap_image:
|
|
self.nodepool.deleteImage(self.snap_image.id)
|
|
except Exception:
|
|
self.log.exception("Exception deleting image id: %s:" %
|
|
self.snap_image.id)
|
|
return
|
|
|
|
|
|
class SnapshotImageUpdater(ImageUpdater):
|
|
|
|
log = logging.getLogger("nodepool.SnapshotImageUpdater")
|
|
|
|
def updateImage(self, session):
|
|
start_time = time.time()
|
|
timestamp = int(start_time)
|
|
|
|
hostname = self.provider.template_hostname.format(
|
|
provider=self.provider, image=self.image, timestamp=str(timestamp))
|
|
self.log.info("Creating image id: %s with hostname %s for %s in %s" %
|
|
(self.snap_image.id, hostname, self.image.name,
|
|
self.provider.name))
|
|
if self.provider.keypair:
|
|
key_name = self.provider.keypair
|
|
key = None
|
|
use_password = False
|
|
elif self.manager.hasExtension('os-keypairs'):
|
|
key_name = hostname.split('.')[0]
|
|
key = self.manager.addKeypair(key_name)
|
|
use_password = False
|
|
else:
|
|
key_name = None
|
|
key = None
|
|
use_password = True
|
|
|
|
uuid_pattern = 'hex{8}-(hex{4}-){3}hex{12}'.replace('hex',
|
|
'[0-9a-fA-F]')
|
|
if re.match(uuid_pattern, self.image.base_image):
|
|
image_name = None
|
|
image_id = self.image.base_image
|
|
else:
|
|
image_name = self.image.base_image
|
|
image_id = None
|
|
try:
|
|
server_id = self.manager.createServer(
|
|
hostname, self.image.min_ram, image_name=image_name,
|
|
key_name=key_name, name_filter=self.image.name_filter,
|
|
image_id=image_id, config_drive=self.image.config_drive,
|
|
nodepool_snapshot_image_id=self.snap_image.id)
|
|
except Exception:
|
|
if (self.manager.hasExtension('os-keypairs') and
|
|
not self.provider.keypair):
|
|
for kp in self.manager.listKeypairs():
|
|
if kp['name'] == key_name:
|
|
self.log.debug(
|
|
'Deleting keypair for failed image build %s' %
|
|
self.snap_image.id)
|
|
self.manager.deleteKeypair(kp['name'])
|
|
raise
|
|
|
|
self.snap_image.hostname = hostname
|
|
self.snap_image.version = timestamp
|
|
self.snap_image.server_external_id = server_id
|
|
session.commit()
|
|
|
|
self.log.debug("Image id: %s waiting for server %s" %
|
|
(self.snap_image.id, server_id))
|
|
server = self.manager.waitForServer(server_id)
|
|
if server['status'] != 'ACTIVE':
|
|
raise Exception("Server %s for image id: %s status: %s" %
|
|
(server_id, self.snap_image.id, server['status']))
|
|
|
|
ip = server.get('public_v4')
|
|
ip_v6 = server.get('public_v6')
|
|
if self.provider.ipv6_preferred:
|
|
if ip_v6:
|
|
ip = ip_v6
|
|
else:
|
|
self.log.warning('Preferred ipv6 not available, '
|
|
'falling back to ipv4.')
|
|
if not ip and self.manager.hasExtension('os-floating-ips'):
|
|
ip = self.manager.addPublicIP(server_id,
|
|
pool=self.provider.pool)
|
|
if not ip:
|
|
raise Exception("Unable to find public IP of server")
|
|
server['public_ip'] = ip
|
|
|
|
self.bootstrapServer(server, key, use_password=use_password)
|
|
|
|
image_id = self.manager.createImage(server_id, hostname,
|
|
self.image.meta)
|
|
self.snap_image.external_id = image_id
|
|
session.commit()
|
|
self.log.debug("Image id: %s building image %s" %
|
|
(self.snap_image.id, image_id))
|
|
# It can take a _very_ long time for Rackspace 1.0 to save an image
|
|
image = self.manager.waitForImage(image_id, IMAGE_TIMEOUT)
|
|
if image['status'] != 'ACTIVE':
|
|
raise Exception("Image %s for image id: %s status: %s" %
|
|
(image_id, self.snap_image.id, image['status']))
|
|
|
|
if self.statsd:
|
|
dt = int((time.time() - start_time) * 1000)
|
|
key = 'nodepool.image_update.%s.%s' % (self.image.name,
|
|
self.provider.name)
|
|
self.statsd.timing(key, dt)
|
|
self.statsd.incr(key)
|
|
|
|
self.snap_image.state = nodedb.READY
|
|
session.commit()
|
|
self.log.info("Image %s in %s is ready" % (hostname,
|
|
self.provider.name))
|
|
|
|
try:
|
|
# We made the snapshot, try deleting the server, but it's okay
|
|
# if we fail. The reap script will find it and try again.
|
|
self.manager.cleanupServer(server_id)
|
|
self.manager.waitForServerDeletion(server_id)
|
|
except:
|
|
self.log.exception("Exception encountered deleting server"
|
|
" %s for image id: %s" %
|
|
(server_id, self.snap_image.id))
|
|
|
|
def bootstrapServer(self, server, key, use_password=False):
|
|
log = logging.getLogger("nodepool.image.build.%s.%s" %
|
|
(self.provider.name, self.image.name))
|
|
|
|
ssh_kwargs = dict(log=log)
|
|
if not use_password:
|
|
ssh_kwargs['pkey'] = key
|
|
else:
|
|
ssh_kwargs['password'] = server['admin_pass']
|
|
|
|
host = utils.ssh_connect(server['public_ip'], 'root', ssh_kwargs,
|
|
timeout=CONNECT_TIMEOUT)
|
|
|
|
if not host:
|
|
# We have connected to the node but couldn't do anything as root
|
|
# try distro specific users, since we know ssh is up (a timeout
|
|
# didn't occur), we can connect with a very sort timeout.
|
|
for username in ['ubuntu', 'fedora', 'cloud-user', 'centos',
|
|
'debian']:
|
|
try:
|
|
host = utils.ssh_connect(server['public_ip'], username,
|
|
ssh_kwargs,
|
|
timeout=10)
|
|
if host:
|
|
break
|
|
except:
|
|
continue
|
|
|
|
if not host:
|
|
raise Exception("Unable to log in via SSH")
|
|
|
|
# /etc/nodepool is world writable because by the time we write
|
|
# the contents after the node is launched, we may not have
|
|
# sudo access any more.
|
|
host.ssh("make config dir", "sudo mkdir -p /etc/nodepool")
|
|
host.ssh("chmod config dir", "sudo chmod 0777 /etc/nodepool")
|
|
if self.scriptdir:
|
|
host.ssh("make scripts dir", "mkdir -p scripts")
|
|
for fname in os.listdir(self.scriptdir):
|
|
path = os.path.join(self.scriptdir, fname)
|
|
if not os.path.isfile(path):
|
|
continue
|
|
host.scp(path, 'scripts/%s' % fname)
|
|
host.ssh("move scripts to opt",
|
|
"sudo mv scripts /opt/nodepool-scripts")
|
|
host.ssh("set scripts permissions",
|
|
"sudo chmod -R a+rx /opt/nodepool-scripts")
|
|
|
|
if self.image.setup:
|
|
env_vars = ''
|
|
for k, v in os.environ.items():
|
|
if k.startswith('NODEPOOL_'):
|
|
env_vars += ' %s="%s"' % (k, v)
|
|
|
|
# non-interactive "cloud-user" type logins can have very
|
|
# restrictive paths of just /bin:/usr/bin. Because on
|
|
# some hosts we log-in as root and others as a user, we
|
|
# standarise the path here
|
|
set_path = "export PATH=" \
|
|
"/usr/local/sbin:/sbin:/usr/sbin:" \
|
|
"/usr/local/bin:/bin:/usr/bin"
|
|
host.ssh("run setup script",
|
|
"%s; cd /opt/nodepool-scripts "
|
|
"&& %s ./%s %s && sync && sleep 5" %
|
|
(set_path, env_vars, self.image.setup, server['name']))
|
|
|
|
|
|
class NodePool(threading.Thread):
|
|
log = logging.getLogger("nodepool.NodePool")
|
|
|
|
def __init__(self, securefile, configfile,
|
|
watermark_sleep=WATERMARK_SLEEP):
|
|
threading.Thread.__init__(self, name='NodePool')
|
|
self.securefile = securefile
|
|
self.configfile = configfile
|
|
self.watermark_sleep = watermark_sleep
|
|
self._stopped = False
|
|
self.config = None
|
|
self.zmq_context = None
|
|
self.gearman_client = None
|
|
self.apsched = None
|
|
self.statsd = stats.get_client()
|
|
self._delete_threads = {}
|
|
self._delete_threads_lock = threading.Lock()
|
|
self._image_delete_threads = {}
|
|
self._image_delete_threads_lock = threading.Lock()
|
|
self._instance_delete_threads = {}
|
|
self._instance_delete_threads_lock = threading.Lock()
|
|
self._image_build_jobs = JobTracker()
|
|
|
|
def stop(self):
|
|
self._stopped = True
|
|
if self.config:
|
|
for z in self.config.zmq_publishers.values():
|
|
z.listener.stop()
|
|
z.listener.join()
|
|
if self.zmq_context:
|
|
self.zmq_context.destroy()
|
|
if self.apsched and self.apsched.running:
|
|
self.apsched.shutdown()
|
|
|
|
def waitForBuiltImages(self):
|
|
self.log.debug("Waiting for images to complete building.")
|
|
for job in self._image_build_jobs.running_jobs:
|
|
job.waitForCompletion()
|
|
self.log.debug("Done waiting for images to complete building.")
|
|
|
|
def loadConfig(self):
|
|
self.log.debug("Loading configuration")
|
|
config = nodepool_config.loadConfig(self.configfile)
|
|
nodepool_config.loadSecureConfig(config, self.securefile)
|
|
return config
|
|
|
|
def reconfigureDatabase(self, config):
|
|
if (not self.config) or config.dburi != self.config.dburi:
|
|
config.db = nodedb.NodeDatabase(config.dburi)
|
|
else:
|
|
config.db = self.config.db
|
|
|
|
def reconfigureManagers(self, config, check_targets=True):
|
|
provider_manager.ProviderManager.reconfigure(self.config, config)
|
|
|
|
stop_managers = []
|
|
for t in config.targets.values():
|
|
oldmanager = None
|
|
if self.config:
|
|
oldmanager = self.config.jenkins_managers.get(t.name)
|
|
if oldmanager:
|
|
if (t.jenkins_url != oldmanager.target.jenkins_url or
|
|
t.jenkins_user != oldmanager.target.jenkins_user or
|
|
t.jenkins_apikey != oldmanager.target.jenkins_apikey):
|
|
stop_managers.append(oldmanager)
|
|
oldmanager = None
|
|
if oldmanager:
|
|
config.jenkins_managers[t.name] = oldmanager
|
|
elif t.jenkins_url:
|
|
self.log.debug("Creating new JenkinsManager object "
|
|
"for %s" % t.name)
|
|
config.jenkins_managers[t.name] = \
|
|
jenkins_manager.JenkinsManager(t)
|
|
config.jenkins_managers[t.name].start()
|
|
for oldmanager in stop_managers:
|
|
oldmanager.stop()
|
|
|
|
# only do it if we need to check for targets
|
|
if check_targets:
|
|
for t in config.targets.values():
|
|
if t.jenkins_url:
|
|
try:
|
|
info = config.jenkins_managers[t.name].getInfo()
|
|
if info['quietingDown']:
|
|
self.log.info("Target %s is offline" % t.name)
|
|
t.online = False
|
|
else:
|
|
t.online = True
|
|
except Exception:
|
|
self.log.exception("Unable to check status of %s" %
|
|
t.name)
|
|
t.online = False
|
|
|
|
def reconfigureCrons(self, config):
|
|
cron_map = {
|
|
'image-update': self._doUpdateImages,
|
|
'cleanup': self._doPeriodicCleanup,
|
|
'check': self._doPeriodicCheck,
|
|
}
|
|
|
|
if not self.apsched:
|
|
self.apsched = apscheduler.schedulers.background.BackgroundScheduler()
|
|
self.apsched.start()
|
|
|
|
for c in config.crons.values():
|
|
if ((not self.config) or
|
|
c.timespec != self.config.crons[c.name].timespec):
|
|
if self.config and self.config.crons[c.name].job:
|
|
self.config.crons[c.name].job.remove()
|
|
parts = c.timespec.split()
|
|
if len(parts) > 5:
|
|
second = parts[5]
|
|
else:
|
|
second = None
|
|
minute, hour, dom, month, dow = parts[:5]
|
|
trigger = apscheduler.triggers.cron.CronTrigger(
|
|
day=dom, day_of_week=dow, hour=hour, minute=minute,
|
|
second=second)
|
|
c.job = self.apsched.add_job(
|
|
cron_map[c.name], trigger=trigger)
|
|
else:
|
|
c.job = self.config.crons[c.name].job
|
|
|
|
def reconfigureUpdateListeners(self, config):
|
|
if self.config:
|
|
running = set(self.config.zmq_publishers.keys())
|
|
else:
|
|
running = set()
|
|
|
|
configured = set(config.zmq_publishers.keys())
|
|
if running == configured:
|
|
self.log.debug("ZMQ Listeners do not need to be updated")
|
|
if self.config:
|
|
config.zmq_publishers = self.config.zmq_publishers
|
|
return
|
|
|
|
if self.zmq_context:
|
|
self.log.debug("Stopping listeners")
|
|
self.zmq_context.destroy()
|
|
self.zmq_context = zmq.Context()
|
|
for z in config.zmq_publishers.values():
|
|
self.log.debug("Starting listener for %s" % z.name)
|
|
z.listener = NodeUpdateListener(self, z.name)
|
|
z.listener.start()
|
|
|
|
def reconfigureGearmanClient(self, config):
|
|
if self.config:
|
|
running = set(self.config.gearman_servers.keys())
|
|
else:
|
|
running = set()
|
|
|
|
configured = set(config.gearman_servers.keys())
|
|
if running == configured:
|
|
self.log.debug("Gearman client does not need to be updated")
|
|
if self.config:
|
|
config.gearman_servers = self.config.gearman_servers
|
|
return
|
|
|
|
if self.gearman_client:
|
|
self.log.debug("Stopping gearman client")
|
|
self.gearman_client.shutdown()
|
|
self.gearman_client = None
|
|
if configured:
|
|
self.gearman_client = GearmanClient()
|
|
for g in config.gearman_servers.values():
|
|
self.log.debug("Adding gearman server %s" % g.name)
|
|
self.gearman_client.addServer(g.host, g.port)
|
|
self.gearman_client.waitForServer()
|
|
|
|
def setConfig(self, config):
|
|
self.config = config
|
|
|
|
def getDB(self):
|
|
return self.config.db
|
|
|
|
def getProviderManager(self, provider):
|
|
return self.config.provider_managers[provider.name]
|
|
|
|
def getJenkinsManager(self, target):
|
|
if target.name in self.config.jenkins_managers:
|
|
return self.config.jenkins_managers[target.name]
|
|
else:
|
|
raise KeyError("{0} not in {1}".format(target.name,
|
|
self.config.jenkins_managers.keys()))
|
|
|
|
def getNeededNodes(self, session, allocation_history):
|
|
self.log.debug("Beginning node launch calculation")
|
|
# Get the current demand for nodes.
|
|
if self.gearman_client:
|
|
label_demand = self.gearman_client.getNeededWorkers()
|
|
else:
|
|
label_demand = {}
|
|
|
|
for name, demand in label_demand.items():
|
|
self.log.debug(" Demand from gearman: %s: %s" % (name, demand))
|
|
|
|
online_targets = set()
|
|
for target in self.config.targets.values():
|
|
if not target.online:
|
|
continue
|
|
online_targets.add(target.name)
|
|
|
|
nodes = session.getNodes()
|
|
|
|
def count_nodes(label_name, state):
|
|
return len([n for n in nodes
|
|
if (n.target_name in online_targets and
|
|
n.label_name == label_name and
|
|
n.state == state)])
|
|
|
|
def count_nodes_and_subnodes(provider_name):
|
|
count = 0
|
|
for n in nodes:
|
|
if n.provider_name != provider_name:
|
|
continue
|
|
count += 1 + len(n.subnodes)
|
|
return count
|
|
|
|
# Add a provider for each node provider, along with current
|
|
# capacity
|
|
allocation_providers = {}
|
|
for provider in self.config.providers.values():
|
|
provider_max = provider.max_servers
|
|
n_provider = count_nodes_and_subnodes(provider.name)
|
|
available = provider_max - n_provider
|
|
if available < 0:
|
|
self.log.warning("Provider %s over-allocated: "
|
|
"max-servers %d but counted %d " %
|
|
(provider.name, provider_max, n_provider))
|
|
available = 0
|
|
ap = allocation.AllocationProvider(provider.name, available)
|
|
allocation_providers[provider.name] = ap
|
|
|
|
# calculate demand for labels
|
|
# Actual need is: demand - (ready + building + used)
|
|
# NOTE(jhesketh): This assumes that the nodes in use are actually being
|
|
# used for a job in demand.
|
|
for label in self.config.labels.values():
|
|
start_demand = label_demand.get(label.name, 0)
|
|
n_ready = count_nodes(label.name, nodedb.READY)
|
|
n_building = count_nodes(label.name, nodedb.BUILDING)
|
|
n_used = count_nodes(label.name, nodedb.USED)
|
|
n_test = count_nodes(label.name, nodedb.TEST)
|
|
ready = n_ready + n_building + n_used + n_test
|
|
|
|
capacity = 0
|
|
for provider in label.providers.values():
|
|
capacity += allocation_providers[provider.name].available
|
|
|
|
# Note actual_demand and extra_demand are written this way
|
|
# because max(0, x - y + z) != max(0, x - y) + z.
|
|
# The requested number of nodes minus those already available
|
|
actual_demand = max(0, start_demand - ready)
|
|
# Demand that accomodates extra demand from min-ready value
|
|
extra_demand = max(0, start_demand + label.min_ready - ready)
|
|
# We only request extras for the min ready value if there is
|
|
# clearly capacity for them. This is to avoid the allocator
|
|
# making bad choices spinning up nodes to satisfy min-ready when
|
|
# there is "real" work to do with other nodes.
|
|
if extra_demand <= capacity:
|
|
demand = extra_demand
|
|
else:
|
|
demand = actual_demand
|
|
|
|
label_demand[label.name] = demand
|
|
self.log.debug(" Deficit: %s: %s "
|
|
"(start: %s min-ready: %s ready: %s capacity: %s)" %
|
|
(label.name, demand,
|
|
start_demand, label.min_ready, ready, capacity))
|
|
|
|
# "Target-Label-Provider" -- the triplet of info that identifies
|
|
# the source and location of each node. The mapping is
|
|
# AllocationGrantTarget -> TargetLabelProvider, because
|
|
# the allocation system produces AGTs as the final product.
|
|
tlps = {}
|
|
# label_name -> AllocationRequest
|
|
allocation_requests = {}
|
|
# Set up the request values in the allocation system
|
|
for target in self.config.targets.values():
|
|
if not target.online:
|
|
continue
|
|
at = allocation.AllocationTarget(target.name)
|
|
for label in self.config.labels.values():
|
|
ar = allocation_requests.get(label.name)
|
|
if not ar:
|
|
# A request for a certain number of nodes of this
|
|
# label type. We may have already started a
|
|
# request from a previous target-label in this
|
|
# loop.
|
|
ar = allocation.AllocationRequest(label.name,
|
|
label_demand[label.name],
|
|
allocation_history)
|
|
|
|
nodes = session.getNodes(label_name=label.name,
|
|
target_name=target.name)
|
|
allocation_requests[label.name] = ar
|
|
ar.addTarget(at, len(nodes))
|
|
for provider in label.providers.values():
|
|
image = session.getCurrentSnapshotImage(
|
|
provider.name, label.image)
|
|
if image:
|
|
# This request may be supplied by this provider
|
|
# (and nodes from this provider supplying this
|
|
# request should be distributed to this target).
|
|
sr, agt = ar.addProvider(
|
|
allocation_providers[provider.name],
|
|
at, label.subnodes)
|
|
tlps[agt] = (target, label,
|
|
self.config.providers[provider.name])
|
|
else:
|
|
self.log.debug(" %s does not have image %s "
|
|
"for label %s." % (provider.name,
|
|
label.image,
|
|
label.name))
|
|
|
|
self.log.debug(" Allocation requests:")
|
|
for ar in allocation_requests.values():
|
|
self.log.debug(' %s' % ar)
|
|
for sr in ar.sub_requests.values():
|
|
self.log.debug(' %s' % sr)
|
|
|
|
nodes_to_launch = []
|
|
|
|
# Let the allocation system do it's thing, and then examine
|
|
# the AGT objects that it produces.
|
|
self.log.debug(" Grants:")
|
|
for ap in allocation_providers.values():
|
|
ap.makeGrants()
|
|
for g in ap.grants:
|
|
self.log.debug(' %s' % g)
|
|
for agt in g.targets:
|
|
self.log.debug(' %s' % agt)
|
|
tlp = tlps[agt]
|
|
nodes_to_launch.append((tlp, agt.amount))
|
|
|
|
allocation_history.grantsDone()
|
|
|
|
self.log.debug("Finished node launch calculation")
|
|
return nodes_to_launch
|
|
|
|
def getNeededSubNodes(self, session):
|
|
nodes_to_launch = []
|
|
for node in session.getNodes():
|
|
if node.label_name in self.config.labels:
|
|
expected_subnodes = \
|
|
self.config.labels[node.label_name].subnodes
|
|
active_subnodes = len([n for n in node.subnodes
|
|
if n.state != nodedb.DELETE])
|
|
deficit = max(expected_subnodes - active_subnodes, 0)
|
|
if deficit:
|
|
nodes_to_launch.append((node, deficit))
|
|
return nodes_to_launch
|
|
|
|
def updateConfig(self):
|
|
config = self.loadConfig()
|
|
self.reconfigureDatabase(config)
|
|
self.reconfigureManagers(config)
|
|
self.reconfigureUpdateListeners(config)
|
|
self.reconfigureGearmanClient(config)
|
|
self.reconfigureCrons(config)
|
|
self.setConfig(config)
|
|
|
|
def startup(self):
|
|
self.updateConfig()
|
|
|
|
# Currently nodepool can not resume building a node or image
|
|
# after a restart. To clean up, mark all building node and
|
|
# images for deletion when the daemon starts.
|
|
with self.getDB().getSession() as session:
|
|
for node in session.getNodes(state=nodedb.BUILDING):
|
|
self.log.info("Setting building node id: %s to delete "
|
|
"on startup" % node.id)
|
|
node.state = nodedb.DELETE
|
|
|
|
for image in session.getSnapshotImages(state=nodedb.BUILDING):
|
|
self.log.info("Setting building image id: %s to delete "
|
|
"on startup" % image.id)
|
|
image.state = nodedb.DELETE
|
|
|
|
def run(self):
|
|
try:
|
|
self.startup()
|
|
except Exception:
|
|
self.log.exception("Exception in startup:")
|
|
allocation_history = allocation.AllocationHistory()
|
|
while not self._stopped:
|
|
try:
|
|
self.updateConfig()
|
|
with self.getDB().getSession() as session:
|
|
self._run(session, allocation_history)
|
|
except Exception:
|
|
self.log.exception("Exception in main loop:")
|
|
time.sleep(self.watermark_sleep)
|
|
|
|
def _run(self, session, allocation_history):
|
|
self.checkForMissingImages(session)
|
|
|
|
# Make up the subnode deficit first to make sure that an
|
|
# already allocated node has priority in filling its subnodes
|
|
# ahead of new nodes.
|
|
subnodes_to_launch = self.getNeededSubNodes(session)
|
|
for (node, num_to_launch) in subnodes_to_launch:
|
|
self.log.info("Need to launch %s subnodes for node id: %s" %
|
|
(num_to_launch, node.id))
|
|
for i in range(num_to_launch):
|
|
self.launchSubNode(session, node)
|
|
|
|
nodes_to_launch = self.getNeededNodes(session, allocation_history)
|
|
|
|
for (tlp, num_to_launch) in nodes_to_launch:
|
|
(target, label, provider) = tlp
|
|
if (not target.online) or (not num_to_launch):
|
|
continue
|
|
self.log.info("Need to launch %s %s nodes for %s on %s" %
|
|
(num_to_launch, label.name,
|
|
target.name, provider.name))
|
|
for i in range(num_to_launch):
|
|
snap_image = session.getCurrentSnapshotImage(
|
|
provider.name, label.image)
|
|
if not snap_image:
|
|
self.log.debug("No current image for %s on %s"
|
|
% (label.image, provider.name))
|
|
else:
|
|
self.launchNode(session, provider, label, target)
|
|
|
|
def checkForMissingSnapshotImage(self, session, provider, image):
|
|
found = False
|
|
for snap_image in session.getSnapshotImages():
|
|
if (snap_image.provider_name == provider.name and
|
|
snap_image.image_name == image.name and
|
|
snap_image.state in [nodedb.READY,
|
|
nodedb.BUILDING]):
|
|
found = True
|
|
if not found:
|
|
self.log.warning("Missing image %s on %s" %
|
|
(image.name, provider.name))
|
|
self.updateImage(session, provider.name, image.name)
|
|
|
|
def checkForMissingDiskImage(self, session, provider, image):
|
|
found = False
|
|
for dib_image in session.getDibImages():
|
|
if dib_image.image_name != image.diskimage:
|
|
continue
|
|
if dib_image.state != nodedb.READY:
|
|
# This is either building or in an error state
|
|
# that will be handled by periodic cleanup
|
|
return
|
|
found = True
|
|
if not found:
|
|
# only build the image, we'll recheck again
|
|
self.log.warning("Missing disk image %s" % image.name)
|
|
self.buildImage(self.config.diskimages[image.diskimage])
|
|
else:
|
|
found = False
|
|
for snap_image in session.getSnapshotImages():
|
|
if (snap_image.provider_name == provider.name and
|
|
snap_image.image_name == image.name and
|
|
snap_image.state in [nodedb.READY,
|
|
nodedb.BUILDING]):
|
|
found = True
|
|
break
|
|
if not found:
|
|
self.log.warning("Missing image %s on %s" %
|
|
(image.name, provider.name))
|
|
self.uploadImage(session, provider.name,
|
|
image.name)
|
|
|
|
def checkForMissingImage(self, session, provider, image):
|
|
if image.name in self.config.images_in_use:
|
|
if not image.diskimage:
|
|
self.checkForMissingSnapshotImage(session, provider, image)
|
|
else:
|
|
self.checkForMissingDiskImage(session, provider, image)
|
|
|
|
def checkForMissingImages(self, session):
|
|
# If we are missing an image, run the image update function
|
|
# outside of its schedule.
|
|
self.log.debug("Checking missing images.")
|
|
|
|
# this is sorted so we can have a deterministic pass on providers
|
|
# (very useful for testing/debugging)
|
|
providers = sorted(self.config.providers.values(),
|
|
key=lambda x: x.name)
|
|
for provider in providers:
|
|
for image in provider.images.values():
|
|
try:
|
|
self.checkForMissingImage(session, provider, image)
|
|
except Exception:
|
|
self.log.exception("Exception in missing image check:")
|
|
|
|
def _doUpdateImages(self):
|
|
try:
|
|
with self.getDB().getSession() as session:
|
|
self.updateImages(session)
|
|
except Exception:
|
|
self.log.exception("Exception in periodic image update:")
|
|
|
|
def updateImages(self, session):
|
|
self.log.debug("Updating all images.")
|
|
|
|
# first run the snapshot image updates
|
|
for provider in self.config.providers.values():
|
|
for image in provider.images.values():
|
|
if image.name not in self.config.images_in_use:
|
|
continue
|
|
if image.diskimage:
|
|
continue
|
|
self.updateImage(session, provider.name, image.name)
|
|
|
|
needs_build = False
|
|
for diskimage in self.config.diskimages.values():
|
|
if diskimage.name not in self.config.images_in_use:
|
|
continue
|
|
self.buildImage(diskimage)
|
|
needs_build = True
|
|
if needs_build:
|
|
# wait for all builds to finish, to have updated images to upload
|
|
self.waitForBuiltImages()
|
|
|
|
for provider in self.config.providers.values():
|
|
for image in provider.images.values():
|
|
if image.name not in self.config.images_in_use:
|
|
continue
|
|
if not image.diskimage:
|
|
continue
|
|
self.uploadImage(session, provider.name, image.name)
|
|
|
|
def updateImage(self, session, provider_name, image_name):
|
|
try:
|
|
return self._updateImage(session, provider_name, image_name)
|
|
except Exception:
|
|
self.log.exception(
|
|
"Could not update image %s on %s", image_name, provider_name)
|
|
|
|
def _updateImage(self, session, provider_name, image_name):
|
|
provider = self.config.providers[provider_name]
|
|
image = provider.images[image_name]
|
|
# check type of image depending on diskimage flag
|
|
if image.diskimage:
|
|
raise Exception(
|
|
"Cannot update disk image images. "
|
|
"Please build and upload images")
|
|
|
|
if not image.setup:
|
|
raise Exception(
|
|
"Invalid image config. Must specify either "
|
|
"a setup script, or a diskimage to use.")
|
|
snap_image = session.createSnapshotImage(
|
|
provider_name=provider.name,
|
|
image_name=image_name)
|
|
|
|
t = SnapshotImageUpdater(self, provider, image, snap_image.id)
|
|
t.start()
|
|
# Enough time to give them different timestamps (versions)
|
|
# Just to keep things clearer.
|
|
time.sleep(2)
|
|
return t
|
|
|
|
def buildImage(self, image):
|
|
# check if we already have this item in the queue
|
|
with self.getDB().getSession() as session:
|
|
queued_images = session.getBuildingDibImagesByName(image.name)
|
|
if queued_images:
|
|
self.log.error('Image %s is already being built' %
|
|
image.name)
|
|
return
|
|
else:
|
|
try:
|
|
start_time = time.time()
|
|
timestamp = int(start_time)
|
|
|
|
filename = os.path.join(self.config.imagesdir,
|
|
'%s-%s' %
|
|
(image.name, str(timestamp)))
|
|
dib_image = session.createDibImage(image_name=image.name,
|
|
filename=filename,
|
|
version=timestamp)
|
|
self.log.debug("Created DibImage record %s with state %s",
|
|
dib_image.image_name, dib_image.state)
|
|
|
|
# Submit image-build job
|
|
gearman_job = jobs.ImageBuildJob(image.name, dib_image.id,
|
|
self)
|
|
self._image_build_jobs.addJob(gearman_job)
|
|
|
|
try:
|
|
self.gearman_client.submitJob(gearman_job, timeout=300)
|
|
except Exception:
|
|
self.log.warning("Failed to submit build job for "
|
|
"DibImage record %s with id %d",
|
|
dib_image.image_name, dib_image.id)
|
|
# Remove job from _image_build_jobs
|
|
gearman_job.onFailed()
|
|
raise
|
|
else:
|
|
self.log.debug("Queued image building task for %s" %
|
|
image.name)
|
|
except Exception:
|
|
self.log.exception(
|
|
"Could not build image %s", image.name)
|
|
|
|
def uploadImage(self, session, provider, image_name):
|
|
try:
|
|
provider_entity = self.config.providers[provider]
|
|
provider_image = provider_entity.images[image_name]
|
|
images = session.getOrderedReadyDibImages(provider_image.diskimage)
|
|
image_id = images[0].id
|
|
timestamp = int(time.time())
|
|
job_uuid = str(uuid4().hex)
|
|
|
|
snap_image = session.createSnapshotImage(
|
|
provider_name=provider, image_name=provider_image.name)
|
|
self.log.debug('Created snapshot image id: %s for upload of '
|
|
'DIB image id: %s with job uuid: %s ' %
|
|
(snap_image.id, image_id, job_uuid))
|
|
|
|
# TODO(mordred) abusing the hostname field
|
|
snap_image.hostname = image_name
|
|
snap_image.version = timestamp
|
|
session.commit()
|
|
|
|
# Submit image-upload job
|
|
gearman_job = jobs.ImageUploadJob(image_id, provider, image_name,
|
|
snap_image.id, self)
|
|
self.log.debug('Submitting image-upload job uuid: %s' %
|
|
(gearman_job.unique,))
|
|
self.gearman_client.submitJob(gearman_job, timeout=300)
|
|
|
|
return gearman_job
|
|
except Exception:
|
|
self.log.exception(
|
|
"Could not upload image %s on %s", image_name, provider)
|
|
|
|
def launchNode(self, session, provider, label, target):
|
|
try:
|
|
self._launchNode(session, provider, label, target)
|
|
except Exception:
|
|
self.log.exception(
|
|
"Could not launch node %s on %s", label.name, provider.name)
|
|
|
|
def _launchNode(self, session, provider, label, target):
|
|
provider = self.config.providers[provider.name]
|
|
timeout = provider.boot_timeout
|
|
launch_timeout = provider.launch_timeout
|
|
if provider.azs:
|
|
az = random.choice(provider.azs)
|
|
else:
|
|
az = None
|
|
node = session.createNode(provider.name, label.name, target.name, az)
|
|
t = NodeLauncher(self, provider, label, target, node.id, timeout,
|
|
launch_timeout)
|
|
t.start()
|
|
|
|
def launchSubNode(self, session, node):
|
|
try:
|
|
self._launchSubNode(session, node)
|
|
except Exception:
|
|
self.log.exception(
|
|
"Could not launch subnode for node id: %s", node.id)
|
|
|
|
def _launchSubNode(self, session, node):
|
|
provider = self.config.providers[node.provider_name]
|
|
label = self.config.labels[node.label_name]
|
|
timeout = provider.boot_timeout
|
|
launch_timeout = provider.launch_timeout
|
|
subnode = session.createSubNode(node)
|
|
t = SubNodeLauncher(self, provider, label, subnode.id,
|
|
node.id, node.target_name, timeout, launch_timeout,
|
|
node_az=node.az)
|
|
t.start()
|
|
|
|
def deleteSubNode(self, subnode, manager):
|
|
# Don't try too hard here, the actual node deletion will make
|
|
# sure this is cleaned up.
|
|
if subnode.external_id:
|
|
try:
|
|
self.log.debug('Deleting server %s for subnode id: '
|
|
'%s of node id: %s' %
|
|
(subnode.external_id, subnode.id,
|
|
subnode.node.id))
|
|
manager.cleanupServer(subnode.external_id)
|
|
manager.waitForServerDeletion(subnode.external_id)
|
|
except provider_manager.NotFound:
|
|
pass
|
|
subnode.delete()
|
|
|
|
def deleteNode(self, node_id):
|
|
try:
|
|
self._delete_threads_lock.acquire()
|
|
if node_id in self._delete_threads:
|
|
return
|
|
t = NodeDeleter(self, node_id)
|
|
self._delete_threads[node_id] = t
|
|
t.start()
|
|
except Exception:
|
|
self.log.exception("Could not delete node %s", node_id)
|
|
finally:
|
|
self._delete_threads_lock.release()
|
|
|
|
def _deleteNode(self, session, node):
|
|
self.log.debug("Deleting node id: %s which has been in %s "
|
|
"state for %s hours" %
|
|
(node.id, nodedb.STATE_NAMES[node.state],
|
|
(time.time() - node.state_time) / (60 * 60)))
|
|
# Delete a node
|
|
if node.state != nodedb.DELETE:
|
|
# Don't write to the session if not needed.
|
|
node.state = nodedb.DELETE
|
|
self.updateStats(session, node.provider_name)
|
|
provider = self.config.providers[node.provider_name]
|
|
target = self.config.targets[node.target_name]
|
|
label = self.config.labels.get(node.label_name, None)
|
|
if label and label.image in provider.images:
|
|
image_name = provider.images[label.image].name
|
|
else:
|
|
image_name = None
|
|
manager = self.getProviderManager(provider)
|
|
|
|
if target.jenkins_url and (node.nodename is not None):
|
|
jenkins = self.getJenkinsManager(target)
|
|
jenkins_name = node.nodename
|
|
if jenkins.nodeExists(jenkins_name):
|
|
jenkins.deleteNode(jenkins_name)
|
|
self.log.info("Deleted jenkins node id: %s" % node.id)
|
|
|
|
for subnode in node.subnodes:
|
|
if subnode.external_id:
|
|
try:
|
|
self.log.debug('Deleting server %s for subnode id: '
|
|
'%s of node id: %s' %
|
|
(subnode.external_id, subnode.id, node.id))
|
|
manager.cleanupServer(subnode.external_id)
|
|
except provider_manager.NotFound:
|
|
pass
|
|
|
|
if node.external_id:
|
|
try:
|
|
self.log.debug('Deleting server %s for node id: %s' %
|
|
(node.external_id, node.id))
|
|
manager.cleanupServer(node.external_id)
|
|
manager.waitForServerDeletion(node.external_id)
|
|
except provider_manager.NotFound:
|
|
pass
|
|
node.external_id = None
|
|
|
|
for subnode in node.subnodes:
|
|
if subnode.external_id:
|
|
manager.waitForServerDeletion(subnode.external_id)
|
|
subnode.delete()
|
|
|
|
node.delete()
|
|
self.log.info("Deleted node id: %s" % node.id)
|
|
|
|
if self.statsd:
|
|
dt = int((time.time() - node.state_time) * 1000)
|
|
key = 'nodepool.delete.%s.%s.%s' % (image_name,
|
|
node.provider_name,
|
|
node.target_name)
|
|
self.statsd.timing(key, dt)
|
|
self.statsd.incr(key)
|
|
self.updateStats(session, node.provider_name)
|
|
|
|
def deleteImage(self, snap_image_id):
|
|
try:
|
|
self._image_delete_threads_lock.acquire()
|
|
|
|
snap_image = None
|
|
with self.getDB().getSession() as session:
|
|
snap_image = session.getSnapshotImage(snap_image_id)
|
|
if snap_image is None:
|
|
self.log.error("No image '%s' found.", snap_image_id)
|
|
return
|
|
|
|
if snap_image_id in self._image_delete_threads:
|
|
return
|
|
t = ImageDeleter(self, snap_image_id)
|
|
self._image_delete_threads[snap_image_id] = t
|
|
t.start()
|
|
return t
|
|
except Exception:
|
|
self.log.exception("Could not delete image %s", snap_image_id)
|
|
finally:
|
|
self._image_delete_threads_lock.release()
|
|
|
|
def _deleteImage(self, session, snap_image):
|
|
# Delete an image (and its associated server)
|
|
snap_image.state = nodedb.DELETE
|
|
provider = self.config.providers[snap_image.provider_name]
|
|
manager = self.getProviderManager(provider)
|
|
|
|
if snap_image.server_external_id:
|
|
try:
|
|
server = manager.getServer(snap_image.server_external_id)
|
|
self.log.debug('Deleting server %s for image id: %s' %
|
|
(snap_image.server_external_id,
|
|
snap_image.id))
|
|
manager.cleanupServer(server['id'])
|
|
manager.waitForServerDeletion(server['id'])
|
|
except provider_manager.NotFound:
|
|
self.log.warning('Image server id %s not found' %
|
|
snap_image.server_external_id)
|
|
|
|
if snap_image.external_id:
|
|
try:
|
|
remote_image = manager.getImage(snap_image.external_id)
|
|
self.log.debug('Deleting image %s' % remote_image['id'])
|
|
manager.deleteImage(remote_image['id'])
|
|
except provider_manager.NotFound:
|
|
self.log.warning('Image id %s not found' %
|
|
snap_image.external_id)
|
|
|
|
snap_image.delete()
|
|
self.log.info("Deleted image id: %s" % snap_image.id)
|
|
|
|
def deleteDibImage(self, dib_image):
|
|
try:
|
|
# Submit image-delete job
|
|
gearman_job = jobs.ImageDeleteJob(dib_image.id, self)
|
|
self.gearman_client.submitJob(gearman_job, timeout=300)
|
|
dib_image.delete()
|
|
return gearman_job
|
|
except Exception:
|
|
self.log.exception('Could not submit delete job for image id %s',
|
|
dib_image.id)
|
|
|
|
def deleteInstance(self, provider_name, external_id):
|
|
key = (provider_name, external_id)
|
|
try:
|
|
self._instance_delete_threads_lock.acquire()
|
|
if key in self._instance_delete_threads:
|
|
return
|
|
t = InstanceDeleter(self, provider_name, external_id)
|
|
self._instance_delete_threads[key] = t
|
|
t.start()
|
|
except Exception:
|
|
self.log.exception("Could not delete instance %s on provider %s",
|
|
provider_name, external_id)
|
|
finally:
|
|
self._instance_delete_threads_lock.release()
|
|
|
|
def _deleteInstance(self, provider_name, external_id):
|
|
provider = self.config.providers[provider_name]
|
|
manager = self.getProviderManager(provider)
|
|
manager.cleanupServer(external_id)
|
|
|
|
def _doPeriodicCleanup(self):
|
|
try:
|
|
self.periodicCleanup()
|
|
except Exception:
|
|
self.log.exception("Exception in periodic cleanup:")
|
|
|
|
def periodicCleanup(self):
|
|
# This function should be run periodically to clean up any hosts
|
|
# that may have slipped through the cracks, as well as to remove
|
|
# old images.
|
|
|
|
self.log.debug("Starting periodic cleanup")
|
|
|
|
for k, t in self._delete_threads.items()[:]:
|
|
if not t.isAlive():
|
|
del self._delete_threads[k]
|
|
|
|
for k, t in self._image_delete_threads.items()[:]:
|
|
if not t.isAlive():
|
|
del self._image_delete_threads[k]
|
|
|
|
for k, t in self._instance_delete_threads.items()[:]:
|
|
if not t.isAlive():
|
|
del self._instance_delete_threads[k]
|
|
|
|
node_ids = []
|
|
image_ids = []
|
|
dib_image_ids = []
|
|
with self.getDB().getSession() as session:
|
|
for node in session.getNodes():
|
|
node_ids.append(node.id)
|
|
for image in session.getSnapshotImages():
|
|
image_ids.append(image.id)
|
|
for dib_image in session.getDibImages():
|
|
dib_image_ids.append(dib_image.id)
|
|
|
|
for node_id in node_ids:
|
|
try:
|
|
with self.getDB().getSession() as session:
|
|
node = session.getNode(node_id)
|
|
if node:
|
|
self.cleanupOneNode(session, node)
|
|
except Exception:
|
|
self.log.exception("Exception cleaning up node id %s:" %
|
|
node_id)
|
|
|
|
for image_id in image_ids:
|
|
try:
|
|
with self.getDB().getSession() as session:
|
|
image = session.getSnapshotImage(image_id)
|
|
self.cleanupOneImage(session, image)
|
|
except Exception:
|
|
self.log.exception("Exception cleaning up image id %s:" %
|
|
image_id)
|
|
|
|
for dib_image_id in dib_image_ids:
|
|
try:
|
|
with self.getDB().getSession() as session:
|
|
dib_image = session.getDibImage(dib_image_id)
|
|
if dib_image:
|
|
self.cleanupOneDibImage(session, dib_image)
|
|
except Exception:
|
|
self.log.exception("Exception cleaning up image id %s:" %
|
|
dib_image_id)
|
|
|
|
try:
|
|
self.cleanupLeakedInstances()
|
|
pass
|
|
except Exception:
|
|
self.log.exception("Exception cleaning up leaked nodes")
|
|
|
|
self.log.debug("Finished periodic cleanup")
|
|
|
|
def cleanupLeakedInstances(self):
|
|
known_providers = self.config.providers.keys()
|
|
for provider in self.config.providers.values():
|
|
manager = self.getProviderManager(provider)
|
|
servers = manager.listServers()
|
|
with self.getDB().getSession() as session:
|
|
for server in servers:
|
|
meta = server.get('metadata', {}).get('nodepool')
|
|
if not meta:
|
|
self.log.debug("Instance %s (%s) in %s has no "
|
|
"nodepool metadata" % (
|
|
server['name'], server['id'],
|
|
provider.name))
|
|
continue
|
|
meta = json.loads(meta)
|
|
if meta['provider_name'] not in known_providers:
|
|
self.log.debug("Instance %s (%s) in %s "
|
|
"lists unknown provider %s" % (
|
|
server['name'], server['id'],
|
|
provider.name,
|
|
meta['provider_name']))
|
|
continue
|
|
snap_image_id = meta.get('snapshot_image_id')
|
|
node_id = meta.get('node_id')
|
|
if snap_image_id:
|
|
if session.getSnapshotImage(snap_image_id):
|
|
continue
|
|
self.log.warning("Deleting leaked instance %s (%s) "
|
|
"in %s for snapshot image id: %s" % (
|
|
server['name'], server['id'],
|
|
provider.name,
|
|
snap_image_id))
|
|
self.deleteInstance(provider.name, server['id'])
|
|
elif node_id:
|
|
if session.getNode(node_id):
|
|
continue
|
|
self.log.warning("Deleting leaked instance %s (%s) "
|
|
"in %s for node id: %s" % (
|
|
server['name'], server['id'],
|
|
provider.name, node_id))
|
|
self.deleteInstance(provider.name, server['id'])
|
|
else:
|
|
self.log.warning("Instance %s (%s) in %s has no "
|
|
"database id" % (
|
|
server['name'], server['id'],
|
|
provider.name))
|
|
continue
|
|
|
|
def cleanupOneNode(self, session, node):
|
|
now = time.time()
|
|
time_in_state = now - node.state_time
|
|
if (node.state in [nodedb.READY, nodedb.HOLD]):
|
|
return
|
|
delete = False
|
|
if (node.state == nodedb.DELETE):
|
|
delete = True
|
|
elif (node.state == nodedb.TEST and
|
|
time_in_state > TEST_CLEANUP):
|
|
delete = True
|
|
elif time_in_state > NODE_CLEANUP:
|
|
delete = True
|
|
if delete:
|
|
try:
|
|
self.deleteNode(node.id)
|
|
except Exception:
|
|
self.log.exception("Exception deleting node id: "
|
|
"%s" % node.id)
|
|
|
|
def cleanupOneImage(self, session, image):
|
|
# Normally, reap images that have sat in their current state
|
|
# for 8 hours, unless the image is the current or previous
|
|
# snapshot.
|
|
delete = False
|
|
now = time.time()
|
|
if image.provider_name not in self.config.providers:
|
|
delete = True
|
|
self.log.info("Deleting image id: %s which has no current "
|
|
"provider" % image.id)
|
|
elif (image.image_name not in
|
|
self.config.providers[image.provider_name].images):
|
|
delete = True
|
|
self.log.info("Deleting image id: %s which has no current "
|
|
"base image" % image.id)
|
|
elif (image.state == nodedb.DELETE):
|
|
delete = True
|
|
self.log.info("Deleting image id: %s which is in delete state "
|
|
% image.id)
|
|
else:
|
|
images = session.getOrderedReadySnapshotImages(
|
|
image.provider_name, image.image_name)
|
|
current = previous = None
|
|
if len(images) > 0:
|
|
current = images[0]
|
|
if len(images) > 1:
|
|
previous = images[1]
|
|
if (image != current and image != previous and
|
|
(now - image.state_time) > IMAGE_CLEANUP):
|
|
self.log.info("Deleting image id: %s which is "
|
|
"%s hours old" %
|
|
(image.id,
|
|
(now - image.state_time) / (60 * 60)))
|
|
delete = True
|
|
if delete:
|
|
try:
|
|
self.deleteImage(image.id)
|
|
except Exception:
|
|
self.log.exception("Exception deleting image id: %s:" %
|
|
image.id)
|
|
|
|
def cleanupOneDibImage(self, session, image):
|
|
delete = False
|
|
now = time.time()
|
|
if (image.image_name not in self.config.diskimages):
|
|
delete = True
|
|
self.log.info("Deleting image id: %s which has no current "
|
|
"base image" % image.id)
|
|
else:
|
|
images = session.getOrderedReadyDibImages(
|
|
image.image_name)
|
|
current = previous = None
|
|
if len(images) > 0:
|
|
current = images[0]
|
|
if len(images) > 1:
|
|
previous = images[1]
|
|
if (image != current and image != previous and
|
|
(now - image.state_time) > IMAGE_CLEANUP):
|
|
self.log.info("Deleting image id: %s which is "
|
|
"%s hours old" %
|
|
(image.id,
|
|
(now - image.state_time) / (60 * 60)))
|
|
delete = True
|
|
if image.state == nodedb.DELETE:
|
|
delete = True
|
|
if delete:
|
|
try:
|
|
self.deleteDibImage(image)
|
|
except Exception:
|
|
self.log.exception("Exception deleting image id: %s:" %
|
|
image.id)
|
|
|
|
def _doPeriodicCheck(self):
|
|
try:
|
|
with self.getDB().getSession() as session:
|
|
self.periodicCheck(session)
|
|
except Exception:
|
|
self.log.exception("Exception in periodic check:")
|
|
|
|
def periodicCheck(self, session):
|
|
# This function should be run periodically to make sure we can
|
|
# still access hosts via ssh.
|
|
|
|
self.log.debug("Starting periodic check")
|
|
for node in session.getNodes():
|
|
if node.state != nodedb.READY:
|
|
continue
|
|
provider = self.config.providers[node.provider_name]
|
|
if node.label_name in self.config.labels:
|
|
label = self.config.labels[node.label_name]
|
|
image = provider.images[label.image]
|
|
connect_kwargs = dict(key_filename=image.private_key)
|
|
try:
|
|
if utils.ssh_connect(node.ip, image.username,
|
|
connect_kwargs=connect_kwargs):
|
|
continue
|
|
except Exception:
|
|
self.log.exception("SSH Check failed for node id: %s" %
|
|
node.id)
|
|
else:
|
|
self.log.exception("Node with non-existing label %s" %
|
|
node.label_name)
|
|
self.deleteNode(node.id)
|
|
self.log.debug("Finished periodic check")
|
|
|
|
def updateStats(self, session, provider_name):
|
|
if not self.statsd:
|
|
return
|
|
# This may be called outside of the main thread.
|
|
|
|
states = {}
|
|
|
|
#nodepool.nodes.STATE
|
|
#nodepool.target.TARGET.nodes.STATE
|
|
#nodepool.label.LABEL.nodes.STATE
|
|
#nodepool.provider.PROVIDER.nodes.STATE
|
|
for state in nodedb.STATE_NAMES.values():
|
|
key = 'nodepool.nodes.%s' % state
|
|
states[key] = 0
|
|
for target in self.config.targets.values():
|
|
key = 'nodepool.target.%s.nodes.%s' % (
|
|
target.name, state)
|
|
states[key] = 0
|
|
for label in self.config.labels.values():
|
|
key = 'nodepool.label.%s.nodes.%s' % (
|
|
label.name, state)
|
|
states[key] = 0
|
|
for provider in self.config.providers.values():
|
|
key = 'nodepool.provider.%s.nodes.%s' % (
|
|
provider.name, state)
|
|
states[key] = 0
|
|
|
|
for node in session.getNodes():
|
|
if node.state not in nodedb.STATE_NAMES:
|
|
continue
|
|
state = nodedb.STATE_NAMES[node.state]
|
|
key = 'nodepool.nodes.%s' % state
|
|
states[key] += 1
|
|
|
|
key = 'nodepool.target.%s.nodes.%s' % (
|
|
node.target_name, state)
|
|
states[key] += 1
|
|
|
|
key = 'nodepool.label.%s.nodes.%s' % (
|
|
node.label_name, state)
|
|
states[key] += 1
|
|
|
|
key = 'nodepool.provider.%s.nodes.%s' % (
|
|
node.provider_name, state)
|
|
states[key] += 1
|
|
|
|
for key, count in states.items():
|
|
self.statsd.gauge(key, count)
|
|
|
|
#nodepool.provider.PROVIDER.max_servers
|
|
for provider in self.config.providers.values():
|
|
key = 'nodepool.provider.%s.max_servers' % provider.name
|
|
self.statsd.gauge(key, provider.max_servers)
|
|
|
|
def launchStats(self, subkey, dt, image_name,
|
|
provider_name, target_name, node_az):
|
|
if not self.statsd:
|
|
return
|
|
#nodepool.launch.provider.PROVIDER.subkey
|
|
#nodepool.launch.image.IMAGE.subkey
|
|
#nodepool.launch.target.TARGET.subkey
|
|
#nodepool.launch.subkey
|
|
keys = [
|
|
'nodepool.launch.provider.%s.%s' % (provider_name, subkey),
|
|
'nodepool.launch.image.%s.%s' % (image_name, subkey),
|
|
'nodepool.launch.target.%s.%s' % (target_name, subkey),
|
|
'nodepool.launch.%s' % (subkey,),
|
|
]
|
|
if node_az:
|
|
#nodepool.launch.provider.PROVIDER.AZ.subkey
|
|
keys.append('nodepool.launch.provider.%s.%s.%s' %
|
|
(provider_name, node_az, subkey))
|
|
for key in keys:
|
|
self.statsd.timing(key, dt)
|
|
self.statsd.incr(key)
|