Merge "Add a zuul client"

This commit is contained in:
Jenkins 2013-12-04 20:37:08 +00:00 committed by Gerrit Code Review
commit 2634d0f0a4
9 changed files with 401 additions and 4 deletions

View File

@ -22,6 +22,7 @@ warnerrors = True
[entry_points]
console_scripts =
zuul-server = zuul.cmd.server:main
zuul = zuul.cmd.client:main
[build_sphinx]
source-dir = doc/source

View File

@ -44,6 +44,8 @@ import testtools
import zuul.scheduler
import zuul.webapp
import zuul.rpclistener
import zuul.rpcclient
import zuul.launcher.gearman
import zuul.reporter.gerrit
import zuul.reporter.smtp
@ -351,8 +353,10 @@ class FakeGerrit(object):
change.setReported()
def query(self, number):
change = self.changes[int(number)]
change = self.changes.get(int(number))
if change:
return change.query()
return {}
def startWatching(self, *args, **kw):
pass
@ -806,6 +810,7 @@ class TestScheduler(testtools.TestCase):
self.fake_gerrit.upstream_root = self.upstream_root
self.webapp = zuul.webapp.WebApp(self.sched, port=0)
self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
self.sched.setLauncher(self.launcher)
self.sched.registerTrigger(self.gerrit)
@ -824,6 +829,7 @@ class TestScheduler(testtools.TestCase):
self.sched.reconfigure(self.config)
self.sched.resume()
self.webapp.start()
self.rpc.start()
self.launcher.gearman.waitForServer()
self.registerJobs()
self.builds = self.worker.running_builds
@ -857,6 +863,8 @@ class TestScheduler(testtools.TestCase):
self.statsd.join()
self.webapp.stop()
self.webapp.join()
self.rpc.stop()
self.rpc.join()
threads = threading.enumerate()
if len(threads) > 1:
self.log.error("More than one thread is running: %s" % threads)
@ -956,12 +964,14 @@ class TestScheduler(testtools.TestCase):
while True:
done = True
for connection in self.gearman_server.active_connections:
if connection.functions:
if (connection.functions and
connection.client_id != 'Zuul RPC Listener'):
done = False
if done:
break
time.sleep(0)
self.gearman_server.functions = set()
self.rpc.register()
def haveAllBuildsReported(self):
# See if Zuul is waiting on a meta job to complete
@ -2954,3 +2964,75 @@ class TestScheduler(testtools.TestCase):
FakeSMTP.messages[1]['to_email'])
self.assertEqual(A.messages[0],
FakeSMTP.messages[1]['body'])
def test_client_enqueue(self):
"Test that the RPC client can enqueue a change"
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('CRVW', 2)
A.addApproval('APRV', 1)
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
r = client.enqueue(pipeline='gate',
project='org/project',
trigger='gerrit',
change='1',
patchset='1')
self.waitUntilSettled()
self.assertEqual(self.getJobFromHistory('project-merge').result,
'SUCCESS')
self.assertEqual(self.getJobFromHistory('project-test1').result,
'SUCCESS')
self.assertEqual(self.getJobFromHistory('project-test2').result,
'SUCCESS')
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(A.reported, 2)
self.assertEqual(r, True)
def test_client_enqueue_negative(self):
"Test that the RPC client returns errors"
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
"Invalid project"):
r = client.enqueue(pipeline='gate',
project='project-does-not-exist',
trigger='gerrit',
change='1',
patchset='1')
client.shutdown()
self.assertEqual(r, False)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
"Invalid pipeline"):
r = client.enqueue(pipeline='pipeline-does-not-exist',
project='org/project',
trigger='gerrit',
change='1',
patchset='1')
client.shutdown()
self.assertEqual(r, False)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
"Invalid trigger"):
r = client.enqueue(pipeline='gate',
project='org/project',
trigger='trigger-does-not-exist',
change='1',
patchset='1')
client.shutdown()
self.assertEqual(r, False)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
"Invalid change"):
r = client.enqueue(pipeline='gate',
project='org/project',
trigger='gerrit',
change='1',
patchset='1')
client.shutdown()
self.assertEqual(r, False)
self.waitUntilSettled()
self.assertEqual(len(self.history), 0)
self.assertEqual(len(self.builds), 0)

119
zuul/cmd/client.py Normal file
View File

@ -0,0 +1,119 @@
#!/usr/bin/env python
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import ConfigParser
import logging
import logging.config
import os
import sys
import zuul.rpcclient
class Client(object):
log = logging.getLogger("zuul.Client")
def __init__(self):
self.args = None
self.config = None
self.gear_server_pid = None
def parse_arguments(self):
parser = argparse.ArgumentParser(
description='Zuul Project Gating System Client.')
parser.add_argument('-c', dest='config',
help='specify the config file')
parser.add_argument('-v', dest='verbose', action='store_true',
help='verbose output')
parser.add_argument('--version', dest='version', action='store_true',
help='show zuul version')
subparsers = parser.add_subparsers(title='commands',
description='valid commands',
help='additional help')
cmd_enqueue = subparsers.add_parser('enqueue', help='enqueue a change')
cmd_enqueue.add_argument('--trigger', help='trigger name',
required=True)
cmd_enqueue.add_argument('--pipeline', help='pipeline name',
required=True)
cmd_enqueue.add_argument('--project', help='project name',
required=True)
cmd_enqueue.add_argument('--change', help='change id',
required=True)
cmd_enqueue.add_argument('--patchset', help='patchset number',
required=True)
cmd_enqueue.set_defaults(func=self.enqueue)
self.args = parser.parse_args()
def read_config(self):
self.config = ConfigParser.ConfigParser()
if self.args.config:
locations = [self.args.config]
else:
locations = ['/etc/zuul/zuul.conf',
'~/zuul.conf']
for fp in locations:
if os.path.exists(os.path.expanduser(fp)):
self.config.read(os.path.expanduser(fp))
return
raise Exception("Unable to locate config file in %s" % locations)
def setup_logging(self):
if self.args.verbose:
logging.basicConfig(level=logging.DEBUG)
def main(self):
self.parse_arguments()
self.read_config()
self.setup_logging()
if self.args.version:
from zuul.version import version_info as zuul_version_info
print "Zuul version: %s" % zuul_version_info.version_string()
sys.exit(0)
self.server = self.config.get('gearman', 'server')
if self.config.has_option('gearman', 'port'):
self.port = self.config.get('gearman', 'port')
else:
self.port = 4730
if self.args.func():
sys.exit(0)
else:
sys.exit(1)
def enqueue(self):
client = zuul.rpcclient.RPCClient(self.server, self.port)
r = client.enqueue(pipeline=self.args.pipeline,
project=self.args.project,
trigger=self.args.trigger,
change=self.args.change,
patchset=self.args.patchset)
return r
def main():
client = Client()
client.main()
if __name__ == "__main__":
sys.path.insert(0, '.')
main()

View File

@ -172,6 +172,7 @@ class Server(object):
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.webapp
import zuul.rpclistener
if (self.config.has_option('gearman_server', 'start') and
self.config.getboolean('gearman_server', 'start')):
@ -185,6 +186,7 @@ class Server(object):
gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched)
timer = zuul.trigger.timer.Timer(self.config, self.sched)
webapp = zuul.webapp.WebApp(self.sched)
rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
gerrit_reporter = zuul.reporter.gerrit.Reporter(gerrit)
smtp_reporter = zuul.reporter.smtp.Reporter(
self.config.get('smtp', 'default_from')
@ -207,6 +209,7 @@ class Server(object):
self.sched.reconfigure(self.config)
self.sched.resume()
webapp.start()
rpc.start()
signal.signal(signal.SIGHUP, self.reconfigure_handler)
signal.signal(signal.SIGUSR1, self.exit_handler)

View File

@ -802,6 +802,9 @@ class TriggerEvent(object):
self.newrev = None
# timer
self.timespec = None
# For events that arrive with a destination pipeline (eg, from
# an admin command, etc):
self.forced_pipeline = None
def __repr__(self):
ret = '<TriggerEvent %s %s' % (self.type, self.project_name)

61
zuul/rpcclient.py Normal file
View File

@ -0,0 +1,61 @@
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import time
import gear
class RPCFailure(Exception):
pass
class RPCClient(object):
log = logging.getLogger("zuul.RPCClient")
def __init__(self, server, port):
self.log.debug("Connecting to gearman at %s:%s" % (server, port))
self.gearman = gear.Client()
self.gearman.addServer(server, port)
self.log.debug("Waiting for gearman")
self.gearman.waitForServer()
def submitJob(self, name, data):
self.log.debug("Submitting job %s with data %s" % (name, data))
job = gear.Job(name,
json.dumps(data),
unique=str(time.time()))
self.gearman.submitJob(job)
self.log.debug("Waiting for job completion")
while not job.complete:
time.sleep(0.1)
if job.exception:
raise RPCFailure(job.exception)
self.log.debug("Job complete, success: %s" % (not job.failure))
return (not job.failure)
def enqueue(self, pipeline, project, trigger, change, patchset):
data = {'pipeline': pipeline,
'project': project,
'trigger': trigger,
'change': change,
'patchset': patchset,
}
return self.submitJob('zuul:enqueue', data)
def shutdown(self):
self.gearman.shutdown()

116
zuul/rpclistener.py Normal file
View File

@ -0,0 +1,116 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import threading
import traceback
import gear
import model
class RPCListener(object):
log = logging.getLogger("zuul.RPCListener")
def __init__(self, config, sched):
self.config = config
self.sched = sched
def start(self):
self._running = True
server = self.config.get('gearman', 'server')
if self.config.has_option('gearman', 'port'):
port = self.config.get('gearman', 'port')
else:
port = 4730
self.worker = gear.Worker('Zuul RPC Listener')
self.worker.addServer(server, port)
self.register()
self.thread = threading.Thread(target=self.run)
self.thread.daemon = True
self.thread.start()
def register(self):
self.worker.registerFunction("zuul:enqueue")
def stop(self):
self.log.debug("Stopping")
self._running = False
self.worker.shutdown()
self.log.debug("Stopped")
def join(self):
self.thread.join()
def run(self):
while self._running:
try:
job = self.worker.getJob()
z, jobname = job.name.split(':')
attrname = 'handle_' + jobname
if hasattr(self, attrname):
f = getattr(self, attrname)
if callable(f):
try:
f(job)
except Exception:
self.log.exception("Exception while running job")
job.sendWorkException(traceback.format_exc())
else:
job.sendWorkFail()
else:
job.sendWorkFail()
except Exception:
self.log.exception("Exception while getting job")
def handle_enqueue(self, job):
args = json.loads(job.arguments)
event = model.TriggerEvent()
errors = ''
trigger = self.sched.triggers.get(args['trigger'])
if trigger:
event.trigger_name = args['trigger']
else:
errors += 'Invalid trigger: %s\n' % args['trigger']
project = self.sched.layout.projects.get(args['project'])
if project:
event.project_name = args['project']
else:
errors += 'Invalid project: %s\n' % args['project']
pipeline = self.sched.layout.pipelines.get(args['pipeline'])
if pipeline:
event.forced_pipeline = args['pipeline']
else:
errors += 'Invalid pipeline: %s\n' % args['pipeline']
if not errors:
event.change_number = args['change']
event.patch_number = args['patchset']
try:
event.getChange(project, trigger)
except Exception:
errors += 'Invalid change: %s,%s\n' % (
args['change'], args['patchset'])
if errors:
job.sendWorkException(errors.encode('utf8'))
else:
self.sched.addEvent(event)
job.sendWorkComplete()

View File

@ -731,6 +731,11 @@ class BasePipelineManager(object):
return allow_needs
def eventMatches(self, event):
if event.forced_pipeline:
if event.forced_pipeline == self.pipeline.name:
return True
else:
return False
for ef in self.event_filters:
if ef.matches(event):
return True

View File

@ -302,7 +302,11 @@ class Gerrit(object):
change.patchset = patchset
key = '%s,%s' % (change.number, change.patchset)
self._change_cache[key] = change
try:
self.updateChange(change)
except Exception:
del self._change_cache[key]
raise
return change
def updateChange(self, change):
@ -314,6 +318,9 @@ class Gerrit(object):
if change.patchset is None:
change.patchset = data['currentPatchSet']['number']
if 'project' not in data:
raise Exception("Change %s,%s not found" % (change.number,
change.patchset))
change.project = self.sched.getProject(data['project'])
change.branch = data['branch']
change.url = data['url']