Add JenkinsManager

Same idea as a ProviderManager: serialize changes to each jenkins
server (with a rate limit).

Change-Id: I631d50dcfd13c29d2802c192d6e1ac7889256a90
This commit is contained in:
James E. Blair 2013-08-20 15:29:16 -07:00
parent 8dc6c870f2
commit 0ec2246514
5 changed files with 242 additions and 120 deletions

View File

@ -0,0 +1,94 @@
#!/usr/bin/env python
# Copyright (C) 2011-2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import myjenkins
import fakeprovider
from task_manager import Task, TaskManager
class CreateNodeTask(Task):
def main(self, jenkins):
if 'credentials_id' in self.args:
launcher_params = {'port': 22,
'credentialsId': self.args['credentials_id'],
'host': self.args['host']}
else:
launcher_params = {'port': 22,
'username': self.args['username'],
'privatekey': self.args['private_key'],
'host': self.args['host']}
try:
jenkins.create_node(
name=self.args['name'],
numExecutors=self.args['executors'],
nodeDescription=self.args['description'],
remoteFS=self.args['root'],
labels=self.args['labels'],
exclusive=True,
launcher='hudson.plugins.sshslaves.SSHLauncher',
launcher_params=launcher_params)
except myjenkins.JenkinsException as e:
if 'already exists' in str(e):
pass
else:
raise
class NodeExistsTask(Task):
def main(self, jenkins):
return jenkins.node_exists(self.args['name'])
class DeleteNodeTask(Task):
def main(self, jenkins):
return jenkins.delete_node(self.args['name'])
class JenkinsManager(TaskManager):
log = logging.getLogger("nodepool.JenkinsManager")
def __init__(self, target):
super(JenkinsManager, self).__init__(None, target.name, target.rate)
self.target = target
self._client = self._getClient()
def _getClient(self):
if self.target.jenkins_apikey == 'fake':
return fakeprovider.FakeJenkins()
return myjenkins.Jenkins(self.target.jenkins_url,
self.target.jenkins_user,
self.target.jenkins_apikey)
def createNode(self, name, host, description, labels, executors, root,
credentials_id=None, username=None, private_key=None):
args = dict(name=name, host=host, description=description,
labels=labels, executors=executors, root=root)
if credentials_id:
args['credentials_id'] = credentials_id
else:
args['username'] = username
args['private_key'] = private_key
return self.submitTask(CreateNodeTask(**args))
def nodeExists(self, name):
return self.submitTask(NodeExistsTask(name=name))
def deleteNode(self, name):
return self.submitTask(DeleteNodeTask(name=name))

View File

@ -23,13 +23,13 @@ import threading
import yaml
import apscheduler.scheduler
import os
import myjenkins
from statsd import statsd
import zmq
import nodedb
import nodeutils as utils
import provider_manager
import jenkins_manager
MINS = 60
HOURS = 60 * MINS
@ -147,7 +147,7 @@ class NodeLauncher(threading.Thread):
self.log.debug("Launching node id: %s" % self.node_id)
try:
self.node = session.getNode(self.node_id)
self.manager = self.nodepool.getManager(self.provider)
self.manager = self.nodepool.getProviderManager(self.provider)
except Exception:
self.log.exception("Exception preparing to launch node id: %s:"
% self.node_id)
@ -228,37 +228,21 @@ class NodeLauncher(threading.Thread):
self.log.info("Node id: %s added to jenkins" % self.node.id)
def createJenkinsNode(self):
jenkins = utils.get_jenkins(self.target.jenkins_url,
self.target.jenkins_user,
self.target.jenkins_apikey)
node_desc = 'Dynamic single use %s node' % self.image.name
labels = self.image.name
priv_key = '/var/lib/jenkins/.ssh/id_rsa'
jenkins = self.nodepool.getJenkinsManager(self.target)
args = dict(name=self.node.nodename,
host=self.node.ip,
description='Dynamic single use %s node' % self.image.name,
labels=self.image.name,
executors=1,
root='/home/jenkins')
if self.target.jenkins_credentials_id:
launcher_params = {'port': 22,
'credentialsId':
self.target.jenkins_credentials_id, # noqa
'host': self.node.ip}
args['credentials_id'] = self.target.jenkins_credentials_id
else:
launcher_params = {'port': 22,
'username': 'jenkins',
'privatekey': priv_key,
'host': self.node.ip}
try:
jenkins.create_node(
self.node.nodename,
numExecutors=1,
nodeDescription=node_desc,
remoteFS='/home/jenkins',
labels=labels,
exclusive=True,
launcher='hudson.plugins.sshslaves.SSHLauncher',
launcher_params=launcher_params)
except myjenkins.JenkinsException as e:
if 'already exists' in str(e):
pass
else:
raise
args['username'] = 'jenkins'
args['private_key'] = '/var/lib/jenkins/.ssh/id_rsa'
jenkins.createNode(**args)
class ImageUpdater(threading.Thread):
@ -285,7 +269,7 @@ class ImageUpdater(threading.Thread):
try:
self.snap_image = session.getSnapshotImage(
self.snap_image_id)
self.manager = self.nodepool.getManager(self.provider)
self.manager = self.nodepool.getProviderManager(self.provider)
except Exception:
self.log.exception("Exception preparing to update image %s "
"in %s:" % (self.image.name,
@ -495,7 +479,8 @@ class NodePool(threading.Thread):
newconfig.targets = {}
newconfig.scriptdir = config.get('script-dir')
newconfig.dburi = config.get('dburi')
newconfig.managers = {}
newconfig.provider_managers = {}
newconfig.jenkins_managers = {}
stop_managers = []
for provider in config['providers']:
@ -513,7 +498,7 @@ class NodePool(threading.Thread):
p.rate = provider.get('rate', 1.0)
oldmanager = None
if self.config:
oldmanager = self.config.managers.get(p.name)
oldmanager = self.config.provider_managers.get(p.name)
if oldmanager:
if (p.username != oldmanager.provider.username or
p.password != oldmanager.provider.password or
@ -525,13 +510,13 @@ class NodePool(threading.Thread):
stop_managers.append(oldmanager)
oldmanager = None
if oldmanager:
newconfig.managers[p.name] = oldmanager
newconfig.provider_managers[p.name] = oldmanager
else:
self.log.debug("Creating new ProviderManager object for %s" %
p.name)
newconfig.managers[p.name] = \
newconfig.provider_managers[p.name] = \
provider_manager.ProviderManager(p)
newconfig.managers[p.name].start()
newconfig.provider_managers[p.name].start()
p.images = {}
for image in provider['images']:
i = ProviderImage()
@ -556,6 +541,24 @@ class NodePool(threading.Thread):
t.jenkins_user = None
t.jenkins_apikey = None
t.jenkins_credentials_id = None
t.rate = target.get('rate', 1.0)
oldmanager = None
if self.config:
oldmanager = self.config.jenkins_managers.get(t.name)
if oldmanager:
if (t.jenkins_url != oldmanager.target.jenkins_url or
t.jenkins_user != oldmanager.target.jenkins_user or
t.jenkins_apikey != oldmanager.target.jenkins_apikey):
stop_managers.append(oldmanager)
oldmanager = None
if oldmanager:
newconfig.jenkins_managers[t.name] = oldmanager
else:
self.log.debug("Creating new JenkinsManager object for %s" %
t.name)
newconfig.jenkins_managers[t.name] = \
jenkins_manager.JenkinsManager(t)
newconfig.jenkins_managers[t.name].start()
t.images = {}
for image in target['images']:
i = TargetImage()
@ -575,8 +578,11 @@ class NodePool(threading.Thread):
self.db = nodedb.NodeDatabase(self.config.dburi)
self.startUpdateListeners(config['zmq-publishers'])
def getManager(self, provider):
return self.config.managers[provider.name]
def getProviderManager(self, provider):
return self.config.provider_managers[provider.name]
def getJenkinsManager(self, target):
return self.config.jenkins_managers[target.name]
def startUpdateListeners(self, publishers):
running = set(self.zmq_listeners.keys())
@ -704,23 +710,21 @@ class NodePool(threading.Thread):
self.updateStats(session, node.provider_name)
provider = self.config.providers[node.provider_name]
target = self.config.targets[node.target_name]
manager = self.getManager(provider)
manager = self.getProviderManager(provider)
if target.jenkins_url:
jenkins = utils.get_jenkins(target.jenkins_url,
target.jenkins_user,
target.jenkins_apikey)
jenkins = self.getJenkinsManager(target)
jenkins_name = node.nodename
if jenkins.node_exists(jenkins_name):
jenkins.delete_node(jenkins_name)
if jenkins.nodeExists(jenkins_name):
jenkins.deleteNode(jenkins_name)
self.log.info("Deleted jenkins node id: %s" % node.id)
if node.external_id:
try:
server = manager.getServer(node.external_id)
self.log.debug('Deleting server %s for node id: %s' %
node.external_id,
node.id)
(node.external_id,
node.id))
manager.cleanupServer(server['id'])
except provider_manager.NotFound:
pass
@ -741,14 +745,14 @@ class NodePool(threading.Thread):
# Delete an image (and its associated server)
snap_image.state = nodedb.DELETE
provider = self.config.providers[snap_image.provider_name]
manager = self.getManager(provider)
manager = self.getProviderManager(provider)
if snap_image.server_external_id:
try:
server = manager.getServer(snap_image.server_external_id)
self.log.debug('Deleting server %s for image id: %s' %
snap_image.server_external_id,
snap_image.id)
(snap_image.server_external_id,
snap_image.id))
manager.cleanupServer(server['id'])
except provider_manager.NotFound:
self.log.warning('Image server id %s not found' %

View File

@ -19,7 +19,6 @@
import time
import socket
import logging
import myjenkins
from sshclient import SSHClient
import fakeprovider
@ -37,15 +36,6 @@ def iterate_timeout(max_seconds, purpose):
raise Exception("Timeout waiting for %s" % purpose)
def get_jenkins(url, user, apikey):
if apikey == 'fake':
return fakeprovider.FakeJenkins()
return myjenkins.Jenkins(url, user, apikey)
extension_cache = {}
def ssh_connect(ip, username, connect_kwargs={}, timeout=60):
if ip == 'fake':
return fakeprovider.FakeSSHClient()

View File

@ -16,9 +16,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import threading
import Queue
import logging
import paramiko
import novaclient
@ -26,6 +23,7 @@ import novaclient.client
import time
import fakeprovider
from task_manager import Task, TaskManager
def iterate_timeout(max_seconds, purpose):
@ -42,36 +40,6 @@ class NotFound(Exception):
pass
class Task(object):
def __init__(self, **kw):
self._wait_event = threading.Event()
self._exception = None
self._traceback = None
self._result = None
self.args = kw
def done(self, result):
self._result = result
self._wait_event.set()
def exception(self, e, tb):
self._exception = e
self._traceback = tb
self._wait_event.set()
def wait(self):
self._wait_event.wait()
if self._exception:
raise self._exception, None, self._traceback
return self._result
def run(self, client):
try:
self.done(self.main(client))
except Exception as e:
self.exception(e, sys.exc_info()[2])
class CreateServerTask(Task):
def main(self, client):
server = client.servers.create(**self.args)
@ -201,20 +169,17 @@ class DeleteImageTask(Task):
client.images.delete(**self.args)
class ProviderManager(threading.Thread):
class ProviderManager(TaskManager):
log = logging.getLogger("nodepool.ProviderManager")
def __init__(self, provider):
threading.Thread.__init__(self)
self.daemon = True
self.queue = Queue.Queue()
self._running = True
super(ProviderManager, self).__init__(None, provider.name,
provider.rate)
self.provider = provider
self._client = self._getClient()
self._flavors = self._getFlavors()
self._images = {}
self._extensions = self._getExtensions()
self._rate = float(provider.rate)
def _getClient(self):
args = ['1.1', self.provider.username, self.provider.password,
@ -247,27 +212,6 @@ class ProviderManager(threading.Thread):
return True
return False
def stop(self):
self._running = False
self.queue.put(None)
def run(self):
last_ts = 0
while self._running:
task = self.queue.get()
if not task:
continue
while True:
delta = time.time() - last_ts
if delta >= self._rate:
break
time.sleep(self._rate - delta)
self.log.debug("Provider %s running task %s" % (self.provider.name,
task))
task.run(self._client)
last_ts = time.time()
self.queue.task_done()
def submitTask(self, task):
self.queue.put(task)
return task.wait()

90
nodepool/task_manager.py Normal file
View File

@ -0,0 +1,90 @@
#!/usr/bin/env python
# Copyright (C) 2011-2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import threading
import Queue
import logging
import time
class Task(object):
def __init__(self, **kw):
self._wait_event = threading.Event()
self._exception = None
self._traceback = None
self._result = None
self.args = kw
def done(self, result):
self._result = result
self._wait_event.set()
def exception(self, e, tb):
self._exception = e
self._traceback = tb
self._wait_event.set()
def wait(self):
self._wait_event.wait()
if self._exception:
raise self._exception, None, self._traceback
return self._result
def run(self, client):
try:
self.done(self.main(client))
except Exception as e:
self.exception(e, sys.exc_info()[2])
class TaskManager(threading.Thread):
log = logging.getLogger("nodepool.ProviderManager")
def __init__(self, client, name, rate):
super(TaskManager, self).__init__()
self.daemon = True
self.queue = Queue.Queue()
self._running = True
self.name = name
self.rate = float(rate)
self._client = None
def stop(self):
self._running = False
self.queue.put(None)
def run(self):
last_ts = 0
while self._running:
task = self.queue.get()
if not task:
continue
while True:
delta = time.time() - last_ts
if delta >= self.rate:
break
time.sleep(self.rate - delta)
self.log.debug("Manager %s running task %s" % (self.name, task))
task.run(self._client)
last_ts = time.time()
self.queue.task_done()
def submitTask(self, task):
self.queue.put(task)
return task.wait()