Merge "Add a promote client command"

This commit is contained in:
Jenkins 2013-12-11 17:15:15 +00:00 committed by Gerrit Code Review
commit 4561fa6e72
5 changed files with 291 additions and 40 deletions

View File

@ -2976,8 +2976,7 @@ class TestScheduler(testtools.TestCase):
r = client.enqueue(pipeline='gate',
project='org/project',
trigger='gerrit',
change='1',
patchset='1')
change='1,1')
self.waitUntilSettled()
self.assertEqual(self.getJobFromHistory('project-merge').result,
'SUCCESS')
@ -2998,8 +2997,7 @@ class TestScheduler(testtools.TestCase):
r = client.enqueue(pipeline='gate',
project='project-does-not-exist',
trigger='gerrit',
change='1',
patchset='1')
change='1,1')
client.shutdown()
self.assertEqual(r, False)
@ -3008,8 +3006,7 @@ class TestScheduler(testtools.TestCase):
r = client.enqueue(pipeline='pipeline-does-not-exist',
project='org/project',
trigger='gerrit',
change='1',
patchset='1')
change='1,1')
client.shutdown()
self.assertEqual(r, False)
@ -3018,8 +3015,7 @@ class TestScheduler(testtools.TestCase):
r = client.enqueue(pipeline='gate',
project='org/project',
trigger='trigger-does-not-exist',
change='1',
patchset='1')
change='1,1')
client.shutdown()
self.assertEqual(r, False)
@ -3028,11 +3024,164 @@ class TestScheduler(testtools.TestCase):
r = client.enqueue(pipeline='gate',
project='org/project',
trigger='gerrit',
change='1',
patchset='1')
change='1,1')
client.shutdown()
self.assertEqual(r, False)
self.waitUntilSettled()
self.assertEqual(len(self.history), 0)
self.assertEqual(len(self.builds), 0)
def test_client_promote(self):
"Test that the RPC client can promote a change"
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
A.addApproval('CRVW', 2)
B.addApproval('CRVW', 2)
C.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
self.waitUntilSettled()
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
r = client.promote(pipeline='gate',
change_ids=['2,1', '3,1'])
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 6)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test2')
self.assertEqual(self.builds[2].name, 'project-test1')
self.assertEqual(self.builds[3].name, 'project-test2')
self.assertEqual(self.builds[4].name, 'project-test1')
self.assertEqual(self.builds[5].name, 'project-test2')
self.assertTrue(self.job_has_changes(self.builds[0], B))
self.assertFalse(self.job_has_changes(self.builds[0], A))
self.assertFalse(self.job_has_changes(self.builds[0], C))
self.assertTrue(self.job_has_changes(self.builds[2], B))
self.assertTrue(self.job_has_changes(self.builds[2], C))
self.assertFalse(self.job_has_changes(self.builds[2], A))
self.assertTrue(self.job_has_changes(self.builds[4], B))
self.assertTrue(self.job_has_changes(self.builds[4], C))
self.assertTrue(self.job_has_changes(self.builds[4], A))
self.worker.release()
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(A.reported, 2)
self.assertEqual(B.data['status'], 'MERGED')
self.assertEqual(B.reported, 2)
self.assertEqual(C.data['status'], 'MERGED')
self.assertEqual(C.reported, 2)
client.shutdown()
self.assertEqual(r, True)
def test_client_promote_dependent(self):
"Test that the RPC client can promote a dependent change"
# C (depends on B) -> B -> A ; then promote C to get:
# A -> C (depends on B) -> B
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
C.setDependsOn(B, 1)
A.addApproval('CRVW', 2)
B.addApproval('CRVW', 2)
C.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
self.waitUntilSettled()
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
r = client.promote(pipeline='gate',
change_ids=['3,1'])
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 6)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test2')
self.assertEqual(self.builds[2].name, 'project-test1')
self.assertEqual(self.builds[3].name, 'project-test2')
self.assertEqual(self.builds[4].name, 'project-test1')
self.assertEqual(self.builds[5].name, 'project-test2')
self.assertTrue(self.job_has_changes(self.builds[0], B))
self.assertFalse(self.job_has_changes(self.builds[0], A))
self.assertFalse(self.job_has_changes(self.builds[0], C))
self.assertTrue(self.job_has_changes(self.builds[2], B))
self.assertTrue(self.job_has_changes(self.builds[2], C))
self.assertFalse(self.job_has_changes(self.builds[2], A))
self.assertTrue(self.job_has_changes(self.builds[4], B))
self.assertTrue(self.job_has_changes(self.builds[4], C))
self.assertTrue(self.job_has_changes(self.builds[4], A))
self.worker.release()
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(A.reported, 2)
self.assertEqual(B.data['status'], 'MERGED')
self.assertEqual(B.reported, 2)
self.assertEqual(C.data['status'], 'MERGED')
self.assertEqual(C.reported, 2)
client.shutdown()
self.assertEqual(r, True)
def test_client_promote_negative(self):
"Test that the RPC client returns errors for promotion"
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.waitUntilSettled()
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure):
r = client.promote(pipeline='nonexistent',
change_ids=['2,1', '3,1'])
client.shutdown()
self.assertEqual(r, False)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure):
r = client.promote(pipeline='gate',
change_ids=['4,1'])
client.shutdown()
self.assertEqual(r, False)
self.worker.hold_jobs_in_build = False
self.worker.release()
self.waitUntilSettled()

View File

@ -55,10 +55,16 @@ class Client(object):
required=True)
cmd_enqueue.add_argument('--change', help='change id',
required=True)
cmd_enqueue.add_argument('--patchset', help='patchset number',
required=True)
cmd_enqueue.set_defaults(func=self.enqueue)
cmd_promote = subparsers.add_parser('promote',
help='promote one or more changes')
cmd_promote.add_argument('--pipeline', help='pipeline name',
required=True)
cmd_promote.add_argument('--changes', help='change ids',
required=True, nargs='+')
cmd_promote.set_defaults(func=self.promote)
self.args = parser.parse_args()
def read_config(self):
@ -104,8 +110,13 @@ class Client(object):
r = client.enqueue(pipeline=self.args.pipeline,
project=self.args.project,
trigger=self.args.trigger,
change=self.args.change,
patchset=self.args.patchset)
change=self.args.change)
return r
def promote(self):
client = zuul.rpcclient.RPCClient(self.server, self.port)
r = client.promote(pipeline=self.args.pipeline,
change_ids=self.args.changes)
return r

View File

@ -48,14 +48,19 @@ class RPCClient(object):
self.log.debug("Job complete, success: %s" % (not job.failure))
return (not job.failure)
def enqueue(self, pipeline, project, trigger, change, patchset):
def enqueue(self, pipeline, project, trigger, change):
data = {'pipeline': pipeline,
'project': project,
'trigger': trigger,
'change': change,
'patchset': patchset,
}
return self.submitJob('zuul:enqueue', data)
def promote(self, pipeline, change_ids):
data = {'pipeline': pipeline,
'change_ids': change_ids,
}
return self.submitJob('zuul:promote', data)
def shutdown(self):
self.gearman.shutdown()

View File

@ -39,13 +39,15 @@ class RPCListener(object):
port = 4730
self.worker = gear.Worker('Zuul RPC Listener')
self.worker.addServer(server, port)
self.register()
self.thread = threading.Thread(target=self.run)
self.thread.daemon = True
self.thread.start()
self.worker.waitForServer()
self.register()
def register(self):
self.worker.registerFunction("zuul:enqueue")
self.worker.registerFunction("zuul:promote")
def stop(self):
self.log.debug("Stopping")
@ -57,10 +59,12 @@ class RPCListener(object):
self.thread.join()
def run(self):
self.log.debug("Starting RPC listener")
while self._running:
try:
job = self.worker.getJob()
z, jobname = job.name.split(':')
self.log.debug("Received job %s" % job.name)
attrname = 'handle_' + jobname
if hasattr(self, attrname):
f = getattr(self, attrname)
@ -86,31 +90,36 @@ class RPCListener(object):
if trigger:
event.trigger_name = args['trigger']
else:
errors += 'Invalid trigger: %s\n' % args['trigger']
errors += 'Invalid trigger: %s\n' % (args['trigger'],)
project = self.sched.layout.projects.get(args['project'])
if project:
event.project_name = args['project']
else:
errors += 'Invalid project: %s\n' % args['project']
errors += 'Invalid project: %s\n' % (args['project'],)
pipeline = self.sched.layout.pipelines.get(args['pipeline'])
if pipeline:
event.forced_pipeline = args['pipeline']
else:
errors += 'Invalid pipeline: %s\n' % args['pipeline']
errors += 'Invalid pipeline: %s\n' % (args['pipeline'],)
if not errors:
event.change_number = args['change']
event.patch_number = args['patchset']
event.change_number, event.patch_number = args['change'].split(',')
try:
event.getChange(project, trigger)
except Exception:
errors += 'Invalid change: %s,%s\n' % (
args['change'], args['patchset'])
errors += 'Invalid change: %s\n' % (args['change'],)
if errors:
job.sendWorkException(errors.encode('utf8'))
else:
self.sched.addEvent(event)
job.sendWorkComplete()
def handle_promote(self, job):
args = json.loads(job.arguments)
pipeline_name = args['pipeline']
change_ids = args['change_ids']
self.sched.promote(pipeline_name, change_ids)
job.sendWorkComplete()

View File

@ -22,6 +22,7 @@ import os
import pickle
import Queue
import re
import sys
import threading
import time
import yaml
@ -64,12 +65,21 @@ class ManagementEvent(object):
"""An event that should be processed within the main queue run loop"""
def __init__(self):
self._wait_event = threading.Event()
self._exception = None
self._traceback = None
def setComplete(self):
def exception(self, e, tb):
self._exception = e
self._traceback = tb
self._wait_event.set()
def done(self):
self._wait_event.set()
def wait(self, timeout=None):
self._wait_event.wait(timeout)
if self._exception:
raise self._exception, None, self._traceback
return self._wait_event.is_set()
@ -84,6 +94,20 @@ class ReconfigureEvent(ManagementEvent):
self.config = config
class PromoteEvent(ManagementEvent):
"""Promote one or more changes to the head of the queue.
:arg str pipeline_name: the name of the pipeline
:arg list change_ids: a list of strings of change ids in the form
1234,1
"""
def __init__(self, pipeline_name, change_ids):
super(PromoteEvent, self).__init__()
self.pipeline_name = pipeline_name
self.change_ids = change_ids
class Scheduler(threading.Thread):
log = logging.getLogger("zuul.Scheduler")
@ -396,6 +420,14 @@ class Scheduler(threading.Thread):
event.wait()
self.log.debug("Reconfiguration complete")
def promote(self, pipeline_name, change_ids):
event = PromoteEvent(pipeline_name, change_ids)
self.management_event_queue.put(event)
self.wake_event.set()
self.log.debug("Waiting for promotion")
event.wait()
self.log.debug("Promotion complete")
def exit(self):
self.log.debug("Prepare to exit")
self._pause = True
@ -520,6 +552,43 @@ class Scheduler(threading.Thread):
finally:
self.layout_lock.release()
def _doPromoteEvent(self, event):
pipeline = self.layout.pipelines[event.pipeline_name]
change_ids = [c.split(',') for c in event.change_ids]
items_to_enqueue = []
change_queue = None
for shared_queue in pipeline.queues:
if change_queue:
break
for item in shared_queue.queue:
if (item.change.number == change_ids[0][0] and
item.change.patchset == change_ids[0][1]):
change_queue = shared_queue
break
if not change_queue:
raise Exception("Unable to find shared change queue for %s" %
event.change_ids[0])
for number, patchset in change_ids:
found = False
for item in change_queue.queue:
if (item.change.number == number and
item.change.patchset == patchset):
found = True
items_to_enqueue.append(item)
break
if not found:
raise Exception("Unable to find %s,%s in queue %s" %
(number, patchset, change_queue))
for item in change_queue.queue[:]:
if item not in items_to_enqueue:
items_to_enqueue.append(item)
pipeline.manager.cancelJobs(item)
pipeline.manager.dequeueItem(item)
for item in items_to_enqueue:
pipeline.manager.addChange(item.change, quiet=True)
while pipeline.manager.processQueue():
pass
def _areAllBuildsComplete(self):
self.log.debug("Checking if all builds are complete")
waiting = False
@ -625,9 +694,16 @@ class Scheduler(threading.Thread):
self.log.debug("Fetching management event")
event = self.management_event_queue.get()
self.log.debug("Processing management event %s" % event)
if isinstance(event, ReconfigureEvent):
self._doReconfigureEvent(event)
event.setComplete()
try:
if isinstance(event, ReconfigureEvent):
self._doReconfigureEvent(event)
elif isinstance(event, PromoteEvent):
self._doPromoteEvent(event)
else:
self.log.error("Unable to handle event %s" % event)
event.done()
except Exception as e:
event.exception(e, sys.exc_info()[2])
self.management_event_queue.task_done()
def process_result_queue(self):
@ -815,10 +891,10 @@ class BasePipelineManager(object):
def isChangeReadyToBeEnqueued(self, change):
return True
def enqueueChangesAhead(self, change):
def enqueueChangesAhead(self, change, quiet):
return True
def enqueueChangesBehind(self, change):
def enqueueChangesBehind(self, change, quiet):
return True
def checkForChangesNeededBy(self, change):
@ -872,7 +948,7 @@ class BasePipelineManager(object):
item.change.project)
return False
def addChange(self, change):
def addChange(self, change, quiet=False):
self.log.debug("Considering adding change %s" % change)
if self.isChangeAlreadyInQueue(change):
self.log.debug("Change %s is already in queue, ignoring" % change)
@ -883,7 +959,7 @@ class BasePipelineManager(object):
change)
return False
if not self.enqueueChangesAhead(change):
if not self.enqueueChangesAhead(change, quiet):
self.log.debug("Failed to enqueue changes ahead of %s" % change)
return False
@ -895,11 +971,12 @@ class BasePipelineManager(object):
if change_queue:
self.log.debug("Adding change %s to queue %s" %
(change, change_queue))
if len(self.pipeline.start_actions) > 0:
self.reportStart(change)
if not quiet:
if len(self.pipeline.start_actions) > 0:
self.reportStart(change)
item = change_queue.enqueueChange(change)
self.reportStats(item)
self.enqueueChangesBehind(change)
self.enqueueChangesBehind(change, quiet)
else:
self.log.error("Unable to find change queue for project %s" %
change.project)
@ -970,7 +1047,7 @@ class BasePipelineManager(object):
self.log.debug("Cancel jobs for change %s" % item.change)
canceled = False
to_remove = []
if prime and item.current_build_set.builds:
if prime and item.current_build_set.ref:
item.resetAllBuilds()
for build, build_item in self.building_jobs.items():
if build_item == item:
@ -1440,7 +1517,7 @@ class DependentPipelineManager(BasePipelineManager):
return False
return True
def enqueueChangesBehind(self, change):
def enqueueChangesBehind(self, change, quiet):
to_enqueue = []
self.log.debug("Checking for changes needing %s:" % change)
if not hasattr(change, 'needed_by_changes'):
@ -1456,15 +1533,15 @@ class DependentPipelineManager(BasePipelineManager):
self.log.debug(" No changes need %s" % change)
for other_change in to_enqueue:
self.addChange(other_change)
self.addChange(other_change, quiet)
def enqueueChangesAhead(self, change):
def enqueueChangesAhead(self, change, quiet):
ret = self.checkForChangesNeededBy(change)
if ret in [True, False]:
return ret
self.log.debug(" Change %s must be merged ahead of %s" %
(ret, change))
return self.addChange(ret)
return self.addChange(ret, quiet)
def checkForChangesNeededBy(self, change):
self.log.debug("Checking for changes needed by %s:" % change)