Remove Gearman and ZMQ

Sets up a shim for the new nodepool.NodePool.run() method that does
not require any service except ZooKeeper and removes all references
to Gearman/ZMQ.

Change-Id: I452c24d631592f47eb3f4cbffb56f3252f36c298
This commit is contained in:
David Shrewsbury 2017-01-05 13:12:04 -05:00
parent eac6ca73f3
commit 8ce719b626
36 changed files with 54 additions and 909 deletions

View File

@ -184,15 +184,10 @@ zookeeper-servers:
- host: localhost
port: 2181
gearman-servers:
- host: localhost
port: 8991
zmq-publishers: []
# Need to have at least one target for node allocations, but
# this does not need to be a jenkins target.
targets:
- name: dummy
assign-via-gearman: True
cron:
cleanup: '*/1 * * * *'
@ -419,9 +414,6 @@ function start_nodepool {
export PATH=$NODEPOOL_INSTALL/bin:$PATH
# start gearman server
run_process geard "$NODEPOOL_INSTALL/bin/geard -p 8991 -d"
# run a fake statsd so we test stats sending paths
export STATSD_HOST=localhost
export STATSD_PORT=8125

View File

@ -112,29 +112,6 @@ and also indicates their default values::
cleanup: '27 */6 * * *'
check: '*/15 * * * *'
zmq-publishers
--------------
Lists the ZeroMQ endpoints for the Jenkins masters. Nodepool uses
this to receive real-time notification that jobs are running on nodes
or are complete and nodes may be deleted. Example::
zmq-publishers:
- tcp://jenkins1.example.com:8888
- tcp://jenkins2.example.com:8888
gearman-servers
---------------
Lists the Zuul Gearman servers that should be consulted for real-time
demand. Nodepool will use information from these servers to determine
if additional nodes should be created to satisfy current demand.
Example::
gearman-servers:
- host: zuul.example.com
port: 4730
The ``port`` key is optional (default: 4730).
zookeeper-servers
-----------------
Lists the ZooKeeper servers uses for coordinating information between

View File

@ -3,51 +3,12 @@
Installation
============
Nodepool consists of a set of long-running daemons which use an SQL
database, a ZooKeeper cluster, and communicates with Jenkins using
ZeroMQ.
Nodepool consists of a long-running daemon which uses ZooKeeper
for coordination with Zuul.
External Requirements
---------------------
Jenkins
~~~~~~~
You should have a Jenkins server running with the `ZMQ Event Publisher
<http://git.openstack.org/cgit/openstack-infra/zmq-event-publisher/tree/README>`_
plugin installed (it is available in the Jenkins Update Center). Be
sure that the machine where you plan to run Nodepool can connect to
the ZMQ port specified by the plugin on your Jenkins master(s).
Zuul
~~~~
If you plan to use Nodepool with Zuul (it is optional), you should
ensure that Nodepool can connect to the gearman port on your Zuul
server (TCP 4730 by default). This will allow Nodepool to respond to
current Zuul demand. If you elect not to connect Nodepool to Zuul, it
will still operate in a node-replacement mode.
Database
~~~~~~~~
Nodepool requires an SQL server. MySQL with the InnoDB storage engine
is tested and recommended. PostgreSQL should work fine. Due to the
high number of concurrent connections from Nodepool, SQLite is not
recommended. When adding or deleting nodes, Nodepool will hold open a
database connection for each node. Be sure to configure the database
server to support at least a number of connections equal to twice the
number of nodes you expect to be in use at once.
All that is necessary is that the database is created. Nodepool will
handle the schema by itself when it is run.
MySQL Example::
CREATE USER 'nodepool'@'localhost' IDENTIFIED BY '<password>';
CREATE DATABASE nodepooldb;
GRANT ALL ON nodepooldb.* TO 'nodepool'@'localhost';
ZooKeeper
~~~~~~~~~
@ -88,11 +49,6 @@ Or install directly from a git checkout with::
pip install .
Note that some distributions provide a libzmq1 which does not support
RCVTIMEO. Removing this libzmq1 from the system libraries will ensure
pip compiles a libzmq1 with appropriate options for the version of
pyzmq used by nodepool.
Configuration
-------------

View File

@ -94,7 +94,6 @@ class ConfigValidator:
'name': str,
'hostname': str,
'subnode-hostname': str,
'assign-via-gearman': bool,
'jenkins': {
'url': str,
'user': str,
@ -117,11 +116,6 @@ class ConfigValidator:
'elements-dir': str,
'images-dir': str,
'dburi': str,
'zmq-publishers': [str],
'gearman-servers': [{
'host': str,
'port': int,
}],
'zookeeper-servers': [{
'host': str,
'port': int,

View File

@ -101,16 +101,6 @@ class Cron(ConfigValue):
return "<Cron %s>" % self.name
class ZMQPublisher(ConfigValue):
def __repr__(self):
return "<ZMQPublisher %s>" % self.name
class GearmanServer(ConfigValue):
def __repr__(self):
return "<GearmanServer %s>" % self.name
class DiskImage(ConfigValue):
def __repr__(self):
return "<DiskImage %s>" % self.name
@ -154,8 +144,6 @@ def loadConfig(config_path):
newconfig.dburi = None
newconfig.provider_managers = {}
newconfig.jenkins_managers = {}
newconfig.zmq_publishers = {}
newconfig.gearman_servers = {}
newconfig.zookeeper_servers = {}
newconfig.diskimages = {}
newconfig.crons = {}
@ -170,19 +158,6 @@ def loadConfig(config_path):
c.job = None
c.timespec = config.get('cron', {}).get(name, default)
for addr in config.get('zmq-publishers', []):
z = ZMQPublisher()
z.name = addr
z.listener = None
newconfig.zmq_publishers[z.name] = z
for server in config.get('gearman-servers', []):
g = GearmanServer()
g.host = server['host']
g.port = server.get('port', 4730)
g.name = g.host + '_' + str(g.port)
newconfig.gearman_servers[g.name] = g
for server in config.get('zookeeper-servers', []):
z = zk.ZooKeeperConnectionConfig(server['host'],
server.get('port', 2181),
@ -312,8 +287,6 @@ def loadConfig(config_path):
t.jenkins_apikey = None
t.jenkins_credentials_id = None
t.assign_via_gearman = target.get('assign-via-gearman', False)
t.hostname = target.get(
'hostname',
'{label.name}-{provider.name}-{node_id}'

View File

@ -1,78 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import uuid
import threading
import gear
class WatchableJob(gear.Job):
def __init__(self, *args, **kwargs):
super(WatchableJob, self).__init__(*args, **kwargs)
self._completion_handlers = []
self._event = threading.Event()
def _handleCompletion(self, mode=None):
self._event.set()
for handler in self._completion_handlers:
handler(self)
def addCompletionHandler(self, handler):
self._completion_handlers.append(handler)
def onCompleted(self):
self._handleCompletion()
def onFailed(self):
self._handleCompletion()
def onDisconnect(self):
self._handleCompletion()
def onWorkStatus(self):
pass
def waitForCompletion(self, timeout=None):
return self._event.wait(timeout)
class NodepoolJob(WatchableJob):
def __init__(self, job_name, job_data_obj, nodepool):
job_uuid = str(uuid.uuid4().hex)
job_data = json.dumps(job_data_obj)
super(NodepoolJob, self).__init__(job_name, job_data, job_uuid)
self.nodepool = nodepool
def getDbSession(self):
return self.nodepool.getDB().getSession()
class NodeAssignmentJob(NodepoolJob):
log = logging.getLogger("jobs.NodeAssignmentJob")
def __init__(self, node_id, target_name, data, nodepool):
self.node_id = node_id
job_name = 'node_assign:%s' % target_name
super(NodeAssignmentJob, self).__init__(job_name, data, nodepool)
class NodeRevokeJob(NodepoolJob):
log = logging.getLogger("jobs.NodeRevokeJob")
def __init__(self, node_id, manager_name, data, nodepool):
self.node_id = node_id
job_name = 'node_revoke:%s' % manager_name
super(NodeRevokeJob, self).__init__(job_name, data, nodepool)

View File

@ -18,7 +18,6 @@
import apscheduler.schedulers.background
import apscheduler.triggers.cron
import gear
import json
import logging
import os
@ -29,7 +28,6 @@ import random
import socket
import threading
import time
import zmq
import allocation
import jenkins_manager
@ -40,7 +38,6 @@ import provider_manager
import stats
import config as nodepool_config
import jobs
import zk
MINS = 60
@ -57,6 +54,8 @@ IMAGE_CLEANUP = 8 * HOURS # When to start deleting an image that is not
# READY or is not the current or previous image
DELETE_DELAY = 1 * MINS # Delay before deleting a node that has completed
# its job.
SUSPEND_WAIT_TIME = 30 # How long to wait between checks for ZooKeeper
# connectivity if it disappears.
class LaunchNodepoolException(Exception):
@ -172,174 +171,6 @@ class NodeCompleteThread(threading.Thread):
self.nodepool.deleteNode(node.id)
class NodeUpdateListener(threading.Thread):
log = logging.getLogger("nodepool.NodeUpdateListener")
def __init__(self, nodepool, addr):
threading.Thread.__init__(self, name='NodeUpdateListener')
self.nodepool = nodepool
self.socket = self.nodepool.zmq_context.socket(zmq.SUB)
self.socket.RCVTIMEO = 1000
event_filter = b""
self.socket.setsockopt(zmq.SUBSCRIBE, event_filter)
self.socket.connect(addr)
self._stopped = False
def run(self):
while not self._stopped:
try:
m = self.socket.recv().decode('utf-8')
except zmq.error.Again:
continue
try:
topic, data = m.split(None, 1)
self.handleEvent(topic, data)
except Exception:
self.log.exception("Exception handling job:")
def stop(self):
self._stopped = True
def handleEvent(self, topic, data):
self.log.debug("Received: %s %s" % (topic, data))
args = json.loads(data)
build = args['build']
if 'node_name' not in build:
return
jobname = args['name']
nodename = args['build']['node_name']
if topic == 'onStarted':
self.handleStartPhase(nodename, jobname)
elif topic == 'onCompleted':
pass
elif topic == 'onFinalized':
result = args['build'].get('status')
params = args['build'].get('parameters')
if params:
branch = params.get('ZUUL_BRANCH', 'unknown_branch')
else:
branch = 'unknown_branch'
self.handleCompletePhase(nodename, jobname, result, branch)
else:
raise Exception("Received job for unhandled phase: %s" %
topic)
def handleStartPhase(self, nodename, jobname):
with self.nodepool.getDB().getSession() as session:
node = session.getNodeByNodename(nodename)
if not node:
self.log.debug("Unable to find node with nodename: %s" %
nodename)
return
target = self.nodepool.config.targets[node.target_name]
if jobname == target.jenkins_test_job:
self.log.debug("Test job for node id: %s started" % node.id)
return
# Preserve the HOLD state even if a job starts on the node.
if node.state != nodedb.HOLD:
self.log.info("Setting node id: %s to USED" % node.id)
node.state = nodedb.USED
self.nodepool.updateStats(session, node.provider_name)
def handleCompletePhase(self, nodename, jobname, result, branch):
t = NodeCompleteThread(self.nodepool, nodename, jobname, result,
branch)
t.start()
class GearmanClient(gear.Client):
def __init__(self):
super(GearmanClient, self).__init__(client_id='nodepool')
self.__log = logging.getLogger("nodepool.GearmanClient")
def getNeededWorkers(self):
needed_workers = {}
job_worker_map = {}
unspecified_jobs = {}
for connection in self.active_connections:
try:
req = gear.StatusAdminRequest()
connection.sendAdminRequest(req, timeout=300)
except Exception:
self.__log.exception("Exception while listing functions")
self._lostConnection(connection)
continue
for line in req.response.split('\n'):
parts = [x.strip() for x in line.split('\t')]
# parts[0] - function name
# parts[1] - total jobs queued (including building)
# parts[2] - jobs building
# parts[3] - workers registered
if not parts or parts[0] == '.':
continue
if not parts[0].startswith('build:'):
continue
function = parts[0][len('build:'):]
# total jobs in queue (including building jobs)
# NOTE(jhesketh): Jobs that are being built are accounted for
# in the demand algorithm by subtracting the running nodes.
# If there are foreign (to nodepool) workers accepting jobs
# the demand will be higher than actually required. However
# better to have too many than too few and if you have a
# foreign worker this may be desired.
try:
queued = int(parts[1])
except ValueError as e:
self.__log.warn(
'Server returned non-integer value in status. (%s)' %
str(e))
queued = 0
if queued > 0:
self.__log.debug("Function: %s queued: %s" % (function,
queued))
if ':' in function:
fparts = function.split(':')
# fparts[0] - function name
# fparts[1] - target node [type]
job = fparts[-2]
worker = fparts[-1]
workers = job_worker_map.get(job, [])
workers.append(worker)
job_worker_map[job] = workers
if queued > 0:
needed_workers[worker] = (
needed_workers.get(worker, 0) + queued)
elif queued > 0:
job = function
unspecified_jobs[job] = (unspecified_jobs.get(job, 0) +
queued)
for job, queued in unspecified_jobs.items():
workers = job_worker_map.get(job)
if not workers:
continue
worker = workers[0]
needed_workers[worker] = (needed_workers.get(worker, 0) +
queued)
return needed_workers
def handleWorkComplete(self, packet):
job = super(GearmanClient, self).handleWorkComplete(packet)
job.onCompleted()
def handleWorkFail(self, packet):
job = super(GearmanClient, self).handleWorkFail(packet)
job.onFailed()
def handleWorkException(self, packet):
job = super(GearmanClient, self).handleWorkException(packet)
job.onFailed()
def handleDisconnect(self, job):
super(GearmanClient, self).handleDisconnect(job)
job.onDisconnect()
def handleWorkStatus(self, packet):
job = super(GearmanClient, self).handleWorkStatus(packet)
job.onWorkStatus()
class InstanceDeleter(threading.Thread):
log = logging.getLogger("nodepool.InstanceDeleter")
@ -569,10 +400,6 @@ class NodeLauncher(threading.Thread):
self.createJenkinsNode()
self.log.info("Node id: %s added to jenkins" % self.node.id)
if self.target.assign_via_gearman:
self.log.info("Node id: %s assigning via gearman" % self.node.id)
self.assignViaGearman()
return dt
def createJenkinsNode(self):
@ -597,24 +424,6 @@ class NodeLauncher(threading.Thread):
params = dict(NODE=self.node.nodename)
jenkins.startBuild(self.target.jenkins_test_job, params)
def assignViaGearman(self):
args = dict(name=self.node.nodename,
host=self.node.ip,
description='Dynamic single use %s node' % self.label.name,
labels=self.label.name,
root=self.image.user_home)
job = jobs.NodeAssignmentJob(self.node.id, self.node.target_name,
args, self.nodepool)
self.nodepool.gearman_client.submitJob(job, timeout=300)
job.waitForCompletion()
self.log.info("Node id: %s received %s from assignment" % (
self.node.id, job.data))
if job.failure:
raise Exception("Node id: %s received job failure on assignment" %
self.node.id)
data = json.loads(job.data[-1])
self.node.manager_name = data['manager']
def writeNodepoolInfo(self, nodelist):
key = paramiko.RSAKey.generate(2048)
public_key = key.get_name() + ' ' + key.get_base64()
@ -862,6 +671,20 @@ class SubNodeLauncher(threading.Thread):
return dt
class RequestWorker(threading.Thread):
log = logging.getLogger("nodepool.RequestWorker")
def __init__(self, request, zk):
threading.Thread.__init__(
self, name='RequestWorker for %s' % request.id
)
self.request = request
self.zk = zk
def run(self):
self.log.info("Handling node request %s" % self.request.id)
class NodePool(threading.Thread):
log = logging.getLogger("nodepool.NodePool")
@ -875,8 +698,6 @@ class NodePool(threading.Thread):
self.watermark_sleep = watermark_sleep
self._stopped = False
self.config = None
self.zmq_context = None
self.gearman_client = None
self.apsched = None
self.zk = None
self.statsd = stats.get_client()
@ -895,16 +716,9 @@ class NodePool(threading.Thread):
self._wake_condition.notify()
self._wake_condition.release()
if self.config:
for z in self.config.zmq_publishers.values():
z.listener.stop()
z.listener.join()
provider_manager.ProviderManager.stopProviders(self.config)
if self.zmq_context:
self.zmq_context.destroy()
if self.apsched and self.apsched.running:
self.apsched.shutdown()
if self.gearman_client:
self.gearman_client.shutdown()
self.log.debug("finished stopping")
def loadConfig(self):
@ -913,12 +727,6 @@ class NodePool(threading.Thread):
nodepool_config.loadSecureConfig(config, self.securefile)
return config
def reconfigureDatabase(self, config):
if (not self.config) or config.dburi != self.config.dburi:
config.db = nodedb.NodeDatabase(config.dburi)
else:
config.db = self.config.db
def reconfigureManagers(self, config, check_targets=True):
provider_manager.ProviderManager.reconfigure(self.config, config)
@ -989,54 +797,6 @@ class NodePool(threading.Thread):
else:
c.job = self.config.crons[c.name].job
def reconfigureUpdateListeners(self, config):
if self.no_deletes:
return
if self.config:
running = set(self.config.zmq_publishers.keys())
else:
running = set()
configured = set(config.zmq_publishers.keys())
if running == configured:
self.log.debug("ZMQ Listeners do not need to be updated")
if self.config:
config.zmq_publishers = self.config.zmq_publishers
return
if self.zmq_context:
self.log.debug("Stopping listeners")
self.zmq_context.destroy()
self.zmq_context = zmq.Context()
for z in config.zmq_publishers.values():
self.log.debug("Starting listener for %s" % z.name)
z.listener = NodeUpdateListener(self, z.name)
z.listener.start()
def reconfigureGearmanClient(self, config):
if self.config:
running = set(self.config.gearman_servers.keys())
else:
running = set()
configured = set(config.gearman_servers.keys())
if running == configured:
self.log.debug("Gearman client does not need to be updated")
if self.config:
config.gearman_servers = self.config.gearman_servers
return
if self.gearman_client:
self.log.debug("Stopping gearman client")
self.gearman_client.shutdown()
self.gearman_client = None
if configured:
self.gearman_client = GearmanClient()
for g in config.gearman_servers.values():
self.log.debug("Adding gearman server %s" % g.name)
self.gearman_client.addServer(g.host, g.port)
self.gearman_client.waitForServer()
def reconfigureZooKeeper(self, config):
if self.config:
running = self.config.zookeeper_servers.values()
@ -1078,10 +838,7 @@ class NodePool(threading.Thread):
def getNeededNodes(self, session, allocation_history):
self.log.debug("Beginning node launch calculation")
# Get the current demand for nodes.
if self.gearman_client:
label_demand = self.gearman_client.getNeededWorkers()
else:
label_demand = {}
label_demand = {}
for name, demand in label_demand.items():
self.log.debug(" Demand from gearman: %s: %s" % (name, demand))
@ -1245,45 +1002,49 @@ class NodePool(threading.Thread):
def updateConfig(self):
config = self.loadConfig()
self.reconfigureDatabase(config)
self.reconfigureZooKeeper(config)
self.reconfigureManagers(config)
self.reconfigureUpdateListeners(config)
self.reconfigureGearmanClient(config)
self.reconfigureCrons(config)
self.setConfig(config)
def startup(self):
self.updateConfig()
self.zk.registerLauncher(self.launcher_id)
# Currently nodepool can not resume building a node or image
# after a restart. To clean up, mark all building node and
# images for deletion when the daemon starts.
with self.getDB().getSession() as session:
for node in session.getNodes(state=nodedb.BUILDING):
self.log.info("Setting building node id: %s to delete "
"on startup" % node.id)
node.state = nodedb.DELETE
def run(self):
try:
self.startup()
except Exception:
self.log.exception("Exception in startup:")
'''
Start point for the NodePool thread.
'''
allocation_history = allocation.AllocationHistory()
while not self._stopped:
try:
self.updateConfig()
with self.getDB().getSession() as session:
self._run(session, allocation_history)
# Don't do work if we've lost communication with the ZK cluster
while self.zk and (self.zk.suspended or self.zk.lost):
self.log.info("ZooKeeper suspended. Waiting")
time.sleep(SUSPEND_WAIT_TIME)
# Make sure we're always registered with ZK
self.zk.registerLauncher(self.launcher_id)
self._run(allocation_history)
except Exception:
self.log.exception("Exception in main loop:")
self._wake_condition.acquire()
self._wake_condition.wait(self.watermark_sleep)
self._wake_condition.release()
def _run(self, session, allocation_history):
def _run(self, allocation_history):
if self.no_launches:
return
for req_id in self.zk.getNodeRequests():
request = self.zk.getNodeRequest(req_id)
if request.state != zk.REQUESTED:
continue
worker = RequestWorker(request, self.zk)
worker.start()
def _run_OLD(self, session, allocation_history):
if self.no_launches:
return
# Make up the subnode deficit first to make sure that an
@ -1380,13 +1141,6 @@ class NodePool(threading.Thread):
finally:
self._delete_threads_lock.release()
def revokeAssignedNode(self, node):
args = dict(name=node.nodename)
job = jobs.NodeRevokeJob(node.id, node.manager_name,
args, self)
self.gearman_client.submitJob(job, timeout=300)
# Do not wait for completion in case the manager is offline
def _deleteNode(self, session, node):
self.log.debug("Deleting node id: %s which has been in %s "
"state for %s hours" %

View File

@ -15,13 +15,11 @@
"""Common utilities used in testing"""
import errno
import glob
import logging
import os
import pymysql
import random
import re
import string
import subprocess
import threading
@ -30,7 +28,6 @@ import time
import uuid
import fixtures
import gear
import lockfile
import kazoo.client
import testtools
@ -46,74 +43,6 @@ class LoggingPopen(subprocess.Popen):
pass
class FakeGearmanServer(gear.Server):
def __init__(self, port=0):
self.hold_jobs_in_queue = False
super(FakeGearmanServer, self).__init__(port)
def getJobForConnection(self, connection, peek=False):
for queue in [self.high_queue, self.normal_queue, self.low_queue]:
for job in queue:
if not hasattr(job, 'waiting'):
if job.name.startswith('build:'):
job.waiting = self.hold_jobs_in_queue
else:
job.waiting = False
if job.waiting:
continue
if job.name in connection.functions:
if not peek:
queue.remove(job)
connection.related_jobs[job.handle] = job
job.worker_connection = connection
job.running = True
return job
return None
def release(self, regex=None):
released = False
qlen = (len(self.high_queue) + len(self.normal_queue) +
len(self.low_queue))
self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
for job in self.getQueue():
cmd, name = job.name.split(':')
if cmd != 'build':
continue
if not regex or re.match(regex, name):
self.log.debug("releasing queued job %s" %
job.unique)
job.waiting = False
released = True
else:
self.log.debug("not releasing queued job %s" %
job.unique)
if released:
self.wakeConnections()
qlen = (len(self.high_queue) + len(self.normal_queue) +
len(self.low_queue))
self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
class GearmanServerFixture(fixtures.Fixture):
def __init__(self, port=0):
self._port = port
def setUp(self):
super(GearmanServerFixture, self).setUp()
self.gearman_server = FakeGearmanServer(self._port)
self.addCleanup(self.shutdownGearman)
def shutdownGearman(self):
#TODO:greghaynes remove try once gear client protects against this
try:
self.gearman_server.shutdown()
except OSError as e:
if e.errno == errno.EBADF:
pass
else:
raise
class ZookeeperServerFixture(fixtures.Fixture):
def _setUp(self):
zk_host = os.environ.get('NODEPOOL_ZK_HOST', 'localhost')
@ -171,37 +100,6 @@ class ChrootedKazooFixture(fixtures.Fixture):
_tmp_client.close()
class GearmanClient(gear.Client):
def __init__(self):
super(GearmanClient, self).__init__(client_id='test_client')
self.__log = logging.getLogger("tests.GearmanClient")
def get_queued_image_jobs(self):
'Count the number of image-build and upload jobs queued.'
queued = 0
for connection in self.active_connections:
try:
req = gear.StatusAdminRequest()
connection.sendAdminRequest(req)
except Exception:
self.__log.exception("Exception while listing functions")
self._lostConnection(connection)
continue
for line in req.response.split('\n'):
parts = [x.strip() for x in line.split('\t')]
# parts[0] - function name
# parts[1] - total jobs queued (including building)
# parts[2] - jobs building
# parts[3] - workers registered
if not parts or parts[0] == '.':
continue
if (not parts[0].startswith('image-build:') and
not parts[0].startswith('image-upload:')):
continue
queued += int(parts[1])
return queued
class BaseTestCase(testtools.TestCase):
def setUp(self):
super(BaseTestCase, self).setUp()
@ -265,8 +163,6 @@ class BaseTestCase(testtools.TestCase):
'NodePool',
'NodePool Builder',
'NodeUpdateListener',
'Gearman client connect',
'Gearman client poll',
'fake-provider',
'fake-provider1',
'fake-provider2',
@ -397,11 +293,6 @@ class DBTestCase(BaseTestCase):
self.useFixture(f)
self.dburi = f.dburi
self.secure_conf = self._setup_secure()
gearman_fixture = GearmanServerFixture()
self.useFixture(gearman_fixture)
self.gearman_server = gearman_fixture.gearman_server
self.setupZK()
def setup_config(self, filename, images_dir=None):
@ -414,7 +305,6 @@ class DBTestCase(BaseTestCase):
with open(configfile) as conf_fd:
config = conf_fd.read()
os.write(fd, config.format(images_dir=images_dir.path,
gearman_port=self.gearman_server.port,
zookeeper_host=self.zookeeper_host,
zookeeper_port=self.zookeeper_port,
zookeeper_chroot=self.zookeeper_chroot))
@ -540,18 +430,6 @@ class DBTestCase(BaseTestCase):
time.sleep(1)
self.wait_for_threads()
def waitForJobs(self):
# XXX:greghaynes - There is a very narrow race here where nodepool
# is who actually updates the database so this may return before the
# image rows are updated.
client = GearmanClient()
client.addServer('localhost', self.gearman_server.port)
client.waitForServer()
while client.get_queued_image_jobs() > 0:
time.sleep(.2)
client.shutdown()
def useNodepool(self, *args, **kwargs):
args = (self.secure_conf,) + args
pool = nodepool.NodePool(*args, **kwargs)

View File

@ -5,18 +5,6 @@ cron:
cleanup: '*/1 * * * *'
check: '*/15 * * * *'
zmq-publishers:
- tcp://jenkins01.openstack.org:8888
- tcp://jenkins02.openstack.org:8888
- tcp://jenkins03.openstack.org:8888
- tcp://jenkins04.openstack.org:8888
- tcp://jenkins05.openstack.org:8888
- tcp://jenkins06.openstack.org:8888
- tcp://jenkins07.openstack.org:8888
gearman-servers:
- host: zuul.openstack.org
zookeeper-servers:
- host: zk1.openstack.org
port: 2181

View File

@ -5,18 +5,6 @@ cron:
cleanup: '*/1 * * * *'
check: '*/15 * * * *'
zmq-publishers:
- tcp://jenkins01.openstack.org:8888
- tcp://jenkins02.openstack.org:8888
- tcp://jenkins03.openstack.org:8888
- tcp://jenkins04.openstack.org:8888
- tcp://jenkins05.openstack.org:8888
- tcp://jenkins06.openstack.org:8888
- tcp://jenkins07.openstack.org:8888
gearman-servers:
- host: zuul.openstack.org
zookeeper-servers:
- host: zk1.openstack.org
port: 2181

View File

@ -4,13 +4,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: localhost

View File

@ -4,13 +4,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: localhost

View File

@ -5,13 +5,6 @@ cron:
check: '*/15 * * * *'
cleanup: '* * * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}

View File

@ -5,13 +5,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}

View File

@ -5,13 +5,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}

View File

@ -4,13 +4,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}

View File

@ -5,13 +5,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}

View File

@ -5,13 +5,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}

View File

@ -5,13 +5,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}

View File

@ -5,13 +5,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}

View File

@ -5,13 +5,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}

View File

@ -5,13 +5,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}

View File

@ -5,13 +5,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}

View File

@ -5,13 +5,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}

View File

@ -5,13 +5,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}

View File

@ -5,13 +5,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}

View File

@ -5,13 +5,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}

View File

@ -5,13 +5,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}

View File

@ -5,13 +5,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}

View File

@ -5,13 +5,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}

View File

@ -35,8 +35,15 @@ READY = 'ready'
DELETING = 'deleting'
# The build failed.
FAILED = 'failed'
# Node request is submitted/unhandled.
REQUESTED = 'requested'
# Node request has been processed successfully.
FULFILLED = 'fulfilled'
# Node request is being worked.
PENDING = 'pending'
STATES = set([BUILDING, UPLOADING, READY, DELETING, FAILED])
STATES = set([BUILDING, UPLOADING, READY, DELETING, FAILED,
REQUESTED, FULFILLED, PENDING])
class ZooKeeperConnectionConfig(object):
'''

View File

@ -1,6 +1,5 @@
pbr>=1.3
gear
PyYAML
python-jenkins
paramiko>1.11.6,<2.0.0
@ -9,7 +8,6 @@ extras
statsd>=3.0
apscheduler>=3.0
sqlalchemy>=0.8.2,<1.1.0
pyzmq>=13.1.0
PyMySQL
PrettyTable>=0.6,<0.8
# shade has a looser requirement on six than nodepool, so install six first

View File

@ -5,12 +5,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
diskimages:
- name: fake-dib-image
elements:

View File

@ -1,93 +0,0 @@
#!/usr/bin/env python
# Copyright 2013 Hewlett-Packard Development Company, L.P.
# Copyright 2011-2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
# A test script to stand in for a zeromq enabled jenkins. It sends zmq
# events that simulate the jenkins node lifecycle.
#
# Usage:
# zmq-server.py start HOSTNAME
# zmq-server.py complete HOSTNAME
import gear
import json
import logging
import select
import socket
import threading
import zmq
class MyGearmanServer(gear.Server):
def handleStatus(self, request):
request.connection.conn.send(("build:fake_job\t%s\t0\t0\n" %
self._count).encode('utf8'))
request.connection.conn.send(("build:fake_job:devstack-precise\t%s\t0\t0\n" %
0).encode('utf8'))
request.connection.conn.send(b'.\n')
class FakeStatsd(object):
def __init__(self):
self.thread = threading.Thread(target=self.run)
self.thread.daemon = True
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind(('', 8125))
self.stats = []
self.thread.start()
def run(self):
while True:
poll = select.poll()
poll.register(self.sock, select.POLLIN)
ret = poll.poll()
for (fd, event) in ret:
if fd == self.sock.fileno():
data = self.sock.recvfrom(1024)
if not data:
return
print data[0]
self.stats.append(data[0])
def main():
logging.basicConfig(level=logging.DEBUG)
context = zmq.Context()
zsocket = context.socket(zmq.PUB)
zsocket.bind("tcp://*:8881")
geard = MyGearmanServer(statsd_host='localhost', statsd_port=8125,
statsd_prefix='zuul.geard')
geard._count = 0
statsd = FakeStatsd()
print('ready')
while True:
line = raw_input()
command, arg = line.split()
if command == 'queue':
geard._count = int(arg)
elif command == 'start':
topic = 'onStarted'
data = {"name":"test","url":"job/test/","build":{"full_url":"http://localhost:8080/job/test/1/","number":1,"phase":"STARTED","url":"job/test/1/","node_name":arg}}
zsocket.send("%s %s" % (topic, json.dumps(data)))
elif command == 'complete':
topic = 'onFinalized'
data = {"name":"test","url":"job/test/","build":{"full_url":"http://localhost:8080/job/test/1/","number":1,"phase":"FINISHED","status":"SUCCESS","url":"job/test/1/","node_name":arg, "parameters":{"BASE_LOG_PATH":"05/60105/3/gate","LOG_PATH":"05/60105/3/gate/gate-tempest-dsvm-postgres-full/bf0f215","OFFLINE_NODE_WHEN_COMPLETE":"1","ZUUL_BRANCH":"master","ZUUL_CHANGE":"60105","ZUUL_CHANGE_IDS":"60105,3","ZUUL_CHANGES":"openstack/cinder:master:refs/changes/05/60105/3","ZUUL_COMMIT":"ccd02fce4148d5ac2b3e1e68532b55eb5c1c356d","ZUUL_PATCHSET":"3","ZUUL_PIPELINE":"gate","ZUUL_PROJECT":"openstack/cinder","ZUUL_REF":"refs/zuul/master/Z6726d84e57a04ec79585b895ace08f7e","ZUUL_URL":"http://zuul.openstack.org/p","ZUUL_UUID":"bf0f21577026492a985ca98a9ea14cc1"}}}
zsocket.send("%s %s" % (topic, json.dumps(data)))
if __name__ == '__main__':
main()

View File

@ -4,12 +4,6 @@ cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
zookeeper-servers:
- host: localhost
@ -56,4 +50,3 @@ providers:
targets:
- name: zuul
assign-via-gearman: True

View File

@ -1,36 +0,0 @@
#!/usr/bin/env python
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
# A test script to watch a zmq stream
#
# Usage:
# zmq-stream.py
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
event_filter = b""
socket.setsockopt(zmq.SUBSCRIBE, event_filter)
socket.connect("tcp://localhost:8888")
print('ready')
while True:
m = socket.recv().decode('utf-8')
print(m)