diff --git a/nodepool/jenkins_manager.py b/nodepool/jenkins_manager.py new file mode 100644 index 000000000..faafb5846 --- /dev/null +++ b/nodepool/jenkins_manager.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python + +# Copyright (C) 2011-2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +import myjenkins +import fakeprovider +from task_manager import Task, TaskManager + + +class CreateNodeTask(Task): + def main(self, jenkins): + if 'credentials_id' in self.args: + launcher_params = {'port': 22, + 'credentialsId': self.args['credentials_id'], + 'host': self.args['host']} + else: + launcher_params = {'port': 22, + 'username': self.args['username'], + 'privatekey': self.args['private_key'], + 'host': self.args['host']} + try: + jenkins.create_node( + name=self.args['name'], + numExecutors=self.args['executors'], + nodeDescription=self.args['description'], + remoteFS=self.args['root'], + labels=self.args['labels'], + exclusive=True, + launcher='hudson.plugins.sshslaves.SSHLauncher', + launcher_params=launcher_params) + except myjenkins.JenkinsException as e: + if 'already exists' in str(e): + pass + else: + raise + + +class NodeExistsTask(Task): + def main(self, jenkins): + return jenkins.node_exists(self.args['name']) + + +class DeleteNodeTask(Task): + def main(self, jenkins): + return jenkins.delete_node(self.args['name']) + + +class JenkinsManager(TaskManager): + log = logging.getLogger("nodepool.JenkinsManager") + + def __init__(self, target): + super(JenkinsManager, self).__init__(None, target.name, target.rate) + self.target = target + self._client = self._getClient() + + def _getClient(self): + if self.target.jenkins_apikey == 'fake': + return fakeprovider.FakeJenkins() + return myjenkins.Jenkins(self.target.jenkins_url, + self.target.jenkins_user, + self.target.jenkins_apikey) + + def createNode(self, name, host, description, labels, executors, root, + credentials_id=None, username=None, private_key=None): + args = dict(name=name, host=host, description=description, + labels=labels, executors=executors, root=root) + if credentials_id: + args['credentials_id'] = credentials_id + else: + args['username'] = username + args['private_key'] = private_key + return self.submitTask(CreateNodeTask(**args)) + + def nodeExists(self, name): + return self.submitTask(NodeExistsTask(name=name)) + + def deleteNode(self, name): + return self.submitTask(DeleteNodeTask(name=name)) diff --git a/nodepool/nodepool.py b/nodepool/nodepool.py index b07a92b17..f90627299 100644 --- a/nodepool/nodepool.py +++ b/nodepool/nodepool.py @@ -23,13 +23,13 @@ import threading import yaml import apscheduler.scheduler import os -import myjenkins from statsd import statsd import zmq import nodedb import nodeutils as utils import provider_manager +import jenkins_manager MINS = 60 HOURS = 60 * MINS @@ -147,7 +147,7 @@ class NodeLauncher(threading.Thread): self.log.debug("Launching node id: %s" % self.node_id) try: self.node = session.getNode(self.node_id) - self.manager = self.nodepool.getManager(self.provider) + self.manager = self.nodepool.getProviderManager(self.provider) except Exception: self.log.exception("Exception preparing to launch node id: %s:" % self.node_id) @@ -228,37 +228,21 @@ class NodeLauncher(threading.Thread): self.log.info("Node id: %s added to jenkins" % self.node.id) def createJenkinsNode(self): - jenkins = utils.get_jenkins(self.target.jenkins_url, - self.target.jenkins_user, - self.target.jenkins_apikey) - node_desc = 'Dynamic single use %s node' % self.image.name - labels = self.image.name - priv_key = '/var/lib/jenkins/.ssh/id_rsa' + jenkins = self.nodepool.getJenkinsManager(self.target) + + args = dict(name=self.node.nodename, + host=self.node.ip, + description='Dynamic single use %s node' % self.image.name, + labels=self.image.name, + executors=1, + root='/home/jenkins') if self.target.jenkins_credentials_id: - launcher_params = {'port': 22, - 'credentialsId': - self.target.jenkins_credentials_id, # noqa - 'host': self.node.ip} + args['credentials_id'] = self.target.jenkins_credentials_id else: - launcher_params = {'port': 22, - 'username': 'jenkins', - 'privatekey': priv_key, - 'host': self.node.ip} - try: - jenkins.create_node( - self.node.nodename, - numExecutors=1, - nodeDescription=node_desc, - remoteFS='/home/jenkins', - labels=labels, - exclusive=True, - launcher='hudson.plugins.sshslaves.SSHLauncher', - launcher_params=launcher_params) - except myjenkins.JenkinsException as e: - if 'already exists' in str(e): - pass - else: - raise + args['username'] = 'jenkins' + args['private_key'] = '/var/lib/jenkins/.ssh/id_rsa' + + jenkins.createNode(**args) class ImageUpdater(threading.Thread): @@ -285,7 +269,7 @@ class ImageUpdater(threading.Thread): try: self.snap_image = session.getSnapshotImage( self.snap_image_id) - self.manager = self.nodepool.getManager(self.provider) + self.manager = self.nodepool.getProviderManager(self.provider) except Exception: self.log.exception("Exception preparing to update image %s " "in %s:" % (self.image.name, @@ -495,7 +479,8 @@ class NodePool(threading.Thread): newconfig.targets = {} newconfig.scriptdir = config.get('script-dir') newconfig.dburi = config.get('dburi') - newconfig.managers = {} + newconfig.provider_managers = {} + newconfig.jenkins_managers = {} stop_managers = [] for provider in config['providers']: @@ -513,7 +498,7 @@ class NodePool(threading.Thread): p.rate = provider.get('rate', 1.0) oldmanager = None if self.config: - oldmanager = self.config.managers.get(p.name) + oldmanager = self.config.provider_managers.get(p.name) if oldmanager: if (p.username != oldmanager.provider.username or p.password != oldmanager.provider.password or @@ -525,13 +510,13 @@ class NodePool(threading.Thread): stop_managers.append(oldmanager) oldmanager = None if oldmanager: - newconfig.managers[p.name] = oldmanager + newconfig.provider_managers[p.name] = oldmanager else: self.log.debug("Creating new ProviderManager object for %s" % p.name) - newconfig.managers[p.name] = \ + newconfig.provider_managers[p.name] = \ provider_manager.ProviderManager(p) - newconfig.managers[p.name].start() + newconfig.provider_managers[p.name].start() p.images = {} for image in provider['images']: i = ProviderImage() @@ -556,6 +541,24 @@ class NodePool(threading.Thread): t.jenkins_user = None t.jenkins_apikey = None t.jenkins_credentials_id = None + t.rate = target.get('rate', 1.0) + oldmanager = None + if self.config: + oldmanager = self.config.jenkins_managers.get(t.name) + if oldmanager: + if (t.jenkins_url != oldmanager.target.jenkins_url or + t.jenkins_user != oldmanager.target.jenkins_user or + t.jenkins_apikey != oldmanager.target.jenkins_apikey): + stop_managers.append(oldmanager) + oldmanager = None + if oldmanager: + newconfig.jenkins_managers[t.name] = oldmanager + else: + self.log.debug("Creating new JenkinsManager object for %s" % + t.name) + newconfig.jenkins_managers[t.name] = \ + jenkins_manager.JenkinsManager(t) + newconfig.jenkins_managers[t.name].start() t.images = {} for image in target['images']: i = TargetImage() @@ -575,8 +578,11 @@ class NodePool(threading.Thread): self.db = nodedb.NodeDatabase(self.config.dburi) self.startUpdateListeners(config['zmq-publishers']) - def getManager(self, provider): - return self.config.managers[provider.name] + def getProviderManager(self, provider): + return self.config.provider_managers[provider.name] + + def getJenkinsManager(self, target): + return self.config.jenkins_managers[target.name] def startUpdateListeners(self, publishers): running = set(self.zmq_listeners.keys()) @@ -704,23 +710,21 @@ class NodePool(threading.Thread): self.updateStats(session, node.provider_name) provider = self.config.providers[node.provider_name] target = self.config.targets[node.target_name] - manager = self.getManager(provider) + manager = self.getProviderManager(provider) if target.jenkins_url: - jenkins = utils.get_jenkins(target.jenkins_url, - target.jenkins_user, - target.jenkins_apikey) + jenkins = self.getJenkinsManager(target) jenkins_name = node.nodename - if jenkins.node_exists(jenkins_name): - jenkins.delete_node(jenkins_name) + if jenkins.nodeExists(jenkins_name): + jenkins.deleteNode(jenkins_name) self.log.info("Deleted jenkins node id: %s" % node.id) if node.external_id: try: server = manager.getServer(node.external_id) self.log.debug('Deleting server %s for node id: %s' % - node.external_id, - node.id) + (node.external_id, + node.id)) manager.cleanupServer(server['id']) except provider_manager.NotFound: pass @@ -741,14 +745,14 @@ class NodePool(threading.Thread): # Delete an image (and its associated server) snap_image.state = nodedb.DELETE provider = self.config.providers[snap_image.provider_name] - manager = self.getManager(provider) + manager = self.getProviderManager(provider) if snap_image.server_external_id: try: server = manager.getServer(snap_image.server_external_id) self.log.debug('Deleting server %s for image id: %s' % - snap_image.server_external_id, - snap_image.id) + (snap_image.server_external_id, + snap_image.id)) manager.cleanupServer(server['id']) except provider_manager.NotFound: self.log.warning('Image server id %s not found' % diff --git a/nodepool/nodeutils.py b/nodepool/nodeutils.py index 35206e985..589fa094f 100644 --- a/nodepool/nodeutils.py +++ b/nodepool/nodeutils.py @@ -19,7 +19,6 @@ import time import socket import logging -import myjenkins from sshclient import SSHClient import fakeprovider @@ -37,15 +36,6 @@ def iterate_timeout(max_seconds, purpose): raise Exception("Timeout waiting for %s" % purpose) -def get_jenkins(url, user, apikey): - if apikey == 'fake': - return fakeprovider.FakeJenkins() - return myjenkins.Jenkins(url, user, apikey) - - -extension_cache = {} - - def ssh_connect(ip, username, connect_kwargs={}, timeout=60): if ip == 'fake': return fakeprovider.FakeSSHClient() diff --git a/nodepool/provider_manager.py b/nodepool/provider_manager.py index afc0da97c..289a6a7fa 100644 --- a/nodepool/provider_manager.py +++ b/nodepool/provider_manager.py @@ -16,9 +16,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import sys -import threading -import Queue import logging import paramiko import novaclient @@ -26,6 +23,7 @@ import novaclient.client import time import fakeprovider +from task_manager import Task, TaskManager def iterate_timeout(max_seconds, purpose): @@ -42,36 +40,6 @@ class NotFound(Exception): pass -class Task(object): - def __init__(self, **kw): - self._wait_event = threading.Event() - self._exception = None - self._traceback = None - self._result = None - self.args = kw - - def done(self, result): - self._result = result - self._wait_event.set() - - def exception(self, e, tb): - self._exception = e - self._traceback = tb - self._wait_event.set() - - def wait(self): - self._wait_event.wait() - if self._exception: - raise self._exception, None, self._traceback - return self._result - - def run(self, client): - try: - self.done(self.main(client)) - except Exception as e: - self.exception(e, sys.exc_info()[2]) - - class CreateServerTask(Task): def main(self, client): server = client.servers.create(**self.args) @@ -201,20 +169,17 @@ class DeleteImageTask(Task): client.images.delete(**self.args) -class ProviderManager(threading.Thread): +class ProviderManager(TaskManager): log = logging.getLogger("nodepool.ProviderManager") def __init__(self, provider): - threading.Thread.__init__(self) - self.daemon = True - self.queue = Queue.Queue() - self._running = True + super(ProviderManager, self).__init__(None, provider.name, + provider.rate) self.provider = provider self._client = self._getClient() self._flavors = self._getFlavors() self._images = {} self._extensions = self._getExtensions() - self._rate = float(provider.rate) def _getClient(self): args = ['1.1', self.provider.username, self.provider.password, @@ -247,27 +212,6 @@ class ProviderManager(threading.Thread): return True return False - def stop(self): - self._running = False - self.queue.put(None) - - def run(self): - last_ts = 0 - while self._running: - task = self.queue.get() - if not task: - continue - while True: - delta = time.time() - last_ts - if delta >= self._rate: - break - time.sleep(self._rate - delta) - self.log.debug("Provider %s running task %s" % (self.provider.name, - task)) - task.run(self._client) - last_ts = time.time() - self.queue.task_done() - def submitTask(self, task): self.queue.put(task) return task.wait() diff --git a/nodepool/task_manager.py b/nodepool/task_manager.py new file mode 100644 index 000000000..dc6d7c26e --- /dev/null +++ b/nodepool/task_manager.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python + +# Copyright (C) 2011-2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import threading +import Queue +import logging +import time + + +class Task(object): + def __init__(self, **kw): + self._wait_event = threading.Event() + self._exception = None + self._traceback = None + self._result = None + self.args = kw + + def done(self, result): + self._result = result + self._wait_event.set() + + def exception(self, e, tb): + self._exception = e + self._traceback = tb + self._wait_event.set() + + def wait(self): + self._wait_event.wait() + if self._exception: + raise self._exception, None, self._traceback + return self._result + + def run(self, client): + try: + self.done(self.main(client)) + except Exception as e: + self.exception(e, sys.exc_info()[2]) + + +class TaskManager(threading.Thread): + log = logging.getLogger("nodepool.ProviderManager") + + def __init__(self, client, name, rate): + super(TaskManager, self).__init__() + self.daemon = True + self.queue = Queue.Queue() + self._running = True + self.name = name + self.rate = float(rate) + self._client = None + + def stop(self): + self._running = False + self.queue.put(None) + + def run(self): + last_ts = 0 + while self._running: + task = self.queue.get() + if not task: + continue + while True: + delta = time.time() - last_ts + if delta >= self.rate: + break + time.sleep(self.rate - delta) + self.log.debug("Manager %s running task %s" % (self.name, task)) + task.run(self._client) + last_ts = time.time() + self.queue.task_done() + + def submitTask(self, task): + self.queue.put(task) + return task.wait()