Add a promote client command

Takes one or more changes and promotes them to the head of the queue.

Also, change the command line syntax for the enqueue command to accept
change IDs in the form 'change,patchset' in order to match the syntax
of promote, as well as be potentially more compatible with future
triggers.

Change-Id: Ic7ded9587c68217c060328bf4c3518e32fe659e3
This commit is contained in:
James E. Blair 2013-12-06 17:53:48 -08:00
parent 468c85126e
commit 36658cf7bd
5 changed files with 291 additions and 40 deletions

View File

@ -2976,8 +2976,7 @@ class TestScheduler(testtools.TestCase):
r = client.enqueue(pipeline='gate',
project='org/project',
trigger='gerrit',
change='1',
patchset='1')
change='1,1')
self.waitUntilSettled()
self.assertEqual(self.getJobFromHistory('project-merge').result,
'SUCCESS')
@ -2998,8 +2997,7 @@ class TestScheduler(testtools.TestCase):
r = client.enqueue(pipeline='gate',
project='project-does-not-exist',
trigger='gerrit',
change='1',
patchset='1')
change='1,1')
client.shutdown()
self.assertEqual(r, False)
@ -3008,8 +3006,7 @@ class TestScheduler(testtools.TestCase):
r = client.enqueue(pipeline='pipeline-does-not-exist',
project='org/project',
trigger='gerrit',
change='1',
patchset='1')
change='1,1')
client.shutdown()
self.assertEqual(r, False)
@ -3018,8 +3015,7 @@ class TestScheduler(testtools.TestCase):
r = client.enqueue(pipeline='gate',
project='org/project',
trigger='trigger-does-not-exist',
change='1',
patchset='1')
change='1,1')
client.shutdown()
self.assertEqual(r, False)
@ -3028,11 +3024,164 @@ class TestScheduler(testtools.TestCase):
r = client.enqueue(pipeline='gate',
project='org/project',
trigger='gerrit',
change='1',
patchset='1')
change='1,1')
client.shutdown()
self.assertEqual(r, False)
self.waitUntilSettled()
self.assertEqual(len(self.history), 0)
self.assertEqual(len(self.builds), 0)
def test_client_promote(self):
"Test that the RPC client can promote a change"
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
A.addApproval('CRVW', 2)
B.addApproval('CRVW', 2)
C.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
self.waitUntilSettled()
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
r = client.promote(pipeline='gate',
change_ids=['2,1', '3,1'])
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 6)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test2')
self.assertEqual(self.builds[2].name, 'project-test1')
self.assertEqual(self.builds[3].name, 'project-test2')
self.assertEqual(self.builds[4].name, 'project-test1')
self.assertEqual(self.builds[5].name, 'project-test2')
self.assertTrue(self.job_has_changes(self.builds[0], B))
self.assertFalse(self.job_has_changes(self.builds[0], A))
self.assertFalse(self.job_has_changes(self.builds[0], C))
self.assertTrue(self.job_has_changes(self.builds[2], B))
self.assertTrue(self.job_has_changes(self.builds[2], C))
self.assertFalse(self.job_has_changes(self.builds[2], A))
self.assertTrue(self.job_has_changes(self.builds[4], B))
self.assertTrue(self.job_has_changes(self.builds[4], C))
self.assertTrue(self.job_has_changes(self.builds[4], A))
self.worker.release()
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(A.reported, 2)
self.assertEqual(B.data['status'], 'MERGED')
self.assertEqual(B.reported, 2)
self.assertEqual(C.data['status'], 'MERGED')
self.assertEqual(C.reported, 2)
client.shutdown()
self.assertEqual(r, True)
def test_client_promote_dependent(self):
"Test that the RPC client can promote a dependent change"
# C (depends on B) -> B -> A ; then promote C to get:
# A -> C (depends on B) -> B
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
C.setDependsOn(B, 1)
A.addApproval('CRVW', 2)
B.addApproval('CRVW', 2)
C.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
self.fake_gerrit.addEvent(C.addApproval('APRV', 1))
self.waitUntilSettled()
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
r = client.promote(pipeline='gate',
change_ids=['3,1'])
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 6)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test2')
self.assertEqual(self.builds[2].name, 'project-test1')
self.assertEqual(self.builds[3].name, 'project-test2')
self.assertEqual(self.builds[4].name, 'project-test1')
self.assertEqual(self.builds[5].name, 'project-test2')
self.assertTrue(self.job_has_changes(self.builds[0], B))
self.assertFalse(self.job_has_changes(self.builds[0], A))
self.assertFalse(self.job_has_changes(self.builds[0], C))
self.assertTrue(self.job_has_changes(self.builds[2], B))
self.assertTrue(self.job_has_changes(self.builds[2], C))
self.assertFalse(self.job_has_changes(self.builds[2], A))
self.assertTrue(self.job_has_changes(self.builds[4], B))
self.assertTrue(self.job_has_changes(self.builds[4], C))
self.assertTrue(self.job_has_changes(self.builds[4], A))
self.worker.release()
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(A.reported, 2)
self.assertEqual(B.data['status'], 'MERGED')
self.assertEqual(B.reported, 2)
self.assertEqual(C.data['status'], 'MERGED')
self.assertEqual(C.reported, 2)
client.shutdown()
self.assertEqual(r, True)
def test_client_promote_negative(self):
"Test that the RPC client returns errors for promotion"
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.waitUntilSettled()
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure):
r = client.promote(pipeline='nonexistent',
change_ids=['2,1', '3,1'])
client.shutdown()
self.assertEqual(r, False)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure):
r = client.promote(pipeline='gate',
change_ids=['4,1'])
client.shutdown()
self.assertEqual(r, False)
self.worker.hold_jobs_in_build = False
self.worker.release()
self.waitUntilSettled()

View File

@ -55,10 +55,16 @@ class Client(object):
required=True)
cmd_enqueue.add_argument('--change', help='change id',
required=True)
cmd_enqueue.add_argument('--patchset', help='patchset number',
required=True)
cmd_enqueue.set_defaults(func=self.enqueue)
cmd_promote = subparsers.add_parser('promote',
help='promote one or more changes')
cmd_promote.add_argument('--pipeline', help='pipeline name',
required=True)
cmd_promote.add_argument('--changes', help='change ids',
required=True, nargs='+')
cmd_promote.set_defaults(func=self.promote)
self.args = parser.parse_args()
def read_config(self):
@ -104,8 +110,13 @@ class Client(object):
r = client.enqueue(pipeline=self.args.pipeline,
project=self.args.project,
trigger=self.args.trigger,
change=self.args.change,
patchset=self.args.patchset)
change=self.args.change)
return r
def promote(self):
client = zuul.rpcclient.RPCClient(self.server, self.port)
r = client.promote(pipeline=self.args.pipeline,
change_ids=self.args.changes)
return r

View File

@ -48,14 +48,19 @@ class RPCClient(object):
self.log.debug("Job complete, success: %s" % (not job.failure))
return (not job.failure)
def enqueue(self, pipeline, project, trigger, change, patchset):
def enqueue(self, pipeline, project, trigger, change):
data = {'pipeline': pipeline,
'project': project,
'trigger': trigger,
'change': change,
'patchset': patchset,
}
return self.submitJob('zuul:enqueue', data)
def promote(self, pipeline, change_ids):
data = {'pipeline': pipeline,
'change_ids': change_ids,
}
return self.submitJob('zuul:promote', data)
def shutdown(self):
self.gearman.shutdown()

View File

@ -39,13 +39,15 @@ class RPCListener(object):
port = 4730
self.worker = gear.Worker('Zuul RPC Listener')
self.worker.addServer(server, port)
self.register()
self.thread = threading.Thread(target=self.run)
self.thread.daemon = True
self.thread.start()
self.worker.waitForServer()
self.register()
def register(self):
self.worker.registerFunction("zuul:enqueue")
self.worker.registerFunction("zuul:promote")
def stop(self):
self.log.debug("Stopping")
@ -57,10 +59,12 @@ class RPCListener(object):
self.thread.join()
def run(self):
self.log.debug("Starting RPC listener")
while self._running:
try:
job = self.worker.getJob()
z, jobname = job.name.split(':')
self.log.debug("Received job %s" % job.name)
attrname = 'handle_' + jobname
if hasattr(self, attrname):
f = getattr(self, attrname)
@ -86,31 +90,36 @@ class RPCListener(object):
if trigger:
event.trigger_name = args['trigger']
else:
errors += 'Invalid trigger: %s\n' % args['trigger']
errors += 'Invalid trigger: %s\n' % (args['trigger'],)
project = self.sched.layout.projects.get(args['project'])
if project:
event.project_name = args['project']
else:
errors += 'Invalid project: %s\n' % args['project']
errors += 'Invalid project: %s\n' % (args['project'],)
pipeline = self.sched.layout.pipelines.get(args['pipeline'])
if pipeline:
event.forced_pipeline = args['pipeline']
else:
errors += 'Invalid pipeline: %s\n' % args['pipeline']
errors += 'Invalid pipeline: %s\n' % (args['pipeline'],)
if not errors:
event.change_number = args['change']
event.patch_number = args['patchset']
event.change_number, event.patch_number = args['change'].split(',')
try:
event.getChange(project, trigger)
except Exception:
errors += 'Invalid change: %s,%s\n' % (
args['change'], args['patchset'])
errors += 'Invalid change: %s\n' % (args['change'],)
if errors:
job.sendWorkException(errors.encode('utf8'))
else:
self.sched.addEvent(event)
job.sendWorkComplete()
def handle_promote(self, job):
args = json.loads(job.arguments)
pipeline_name = args['pipeline']
change_ids = args['change_ids']
self.sched.promote(pipeline_name, change_ids)
job.sendWorkComplete()

View File

@ -22,6 +22,7 @@ import os
import pickle
import Queue
import re
import sys
import threading
import time
import yaml
@ -64,12 +65,21 @@ class ManagementEvent(object):
"""An event that should be processed within the main queue run loop"""
def __init__(self):
self._wait_event = threading.Event()
self._exception = None
self._traceback = None
def setComplete(self):
def exception(self, e, tb):
self._exception = e
self._traceback = tb
self._wait_event.set()
def done(self):
self._wait_event.set()
def wait(self, timeout=None):
self._wait_event.wait(timeout)
if self._exception:
raise self._exception, None, self._traceback
return self._wait_event.is_set()
@ -84,6 +94,20 @@ class ReconfigureEvent(ManagementEvent):
self.config = config
class PromoteEvent(ManagementEvent):
"""Promote one or more changes to the head of the queue.
:arg str pipeline_name: the name of the pipeline
:arg list change_ids: a list of strings of change ids in the form
1234,1
"""
def __init__(self, pipeline_name, change_ids):
super(PromoteEvent, self).__init__()
self.pipeline_name = pipeline_name
self.change_ids = change_ids
class Scheduler(threading.Thread):
log = logging.getLogger("zuul.Scheduler")
@ -396,6 +420,14 @@ class Scheduler(threading.Thread):
event.wait()
self.log.debug("Reconfiguration complete")
def promote(self, pipeline_name, change_ids):
event = PromoteEvent(pipeline_name, change_ids)
self.management_event_queue.put(event)
self.wake_event.set()
self.log.debug("Waiting for promotion")
event.wait()
self.log.debug("Promotion complete")
def exit(self):
self.log.debug("Prepare to exit")
self._pause = True
@ -520,6 +552,43 @@ class Scheduler(threading.Thread):
finally:
self.layout_lock.release()
def _doPromoteEvent(self, event):
pipeline = self.layout.pipelines[event.pipeline_name]
change_ids = [c.split(',') for c in event.change_ids]
items_to_enqueue = []
change_queue = None
for shared_queue in pipeline.queues:
if change_queue:
break
for item in shared_queue.queue:
if (item.change.number == change_ids[0][0] and
item.change.patchset == change_ids[0][1]):
change_queue = shared_queue
break
if not change_queue:
raise Exception("Unable to find shared change queue for %s" %
event.change_ids[0])
for number, patchset in change_ids:
found = False
for item in change_queue.queue:
if (item.change.number == number and
item.change.patchset == patchset):
found = True
items_to_enqueue.append(item)
break
if not found:
raise Exception("Unable to find %s,%s in queue %s" %
(number, patchset, change_queue))
for item in change_queue.queue[:]:
if item not in items_to_enqueue:
items_to_enqueue.append(item)
pipeline.manager.cancelJobs(item)
pipeline.manager.dequeueItem(item)
for item in items_to_enqueue:
pipeline.manager.addChange(item.change, quiet=True)
while pipeline.manager.processQueue():
pass
def _areAllBuildsComplete(self):
self.log.debug("Checking if all builds are complete")
waiting = False
@ -625,9 +694,16 @@ class Scheduler(threading.Thread):
self.log.debug("Fetching management event")
event = self.management_event_queue.get()
self.log.debug("Processing management event %s" % event)
try:
if isinstance(event, ReconfigureEvent):
self._doReconfigureEvent(event)
event.setComplete()
elif isinstance(event, PromoteEvent):
self._doPromoteEvent(event)
else:
self.log.error("Unable to handle event %s" % event)
event.done()
except Exception as e:
event.exception(e, sys.exc_info()[2])
self.management_event_queue.task_done()
def process_result_queue(self):
@ -815,10 +891,10 @@ class BasePipelineManager(object):
def isChangeReadyToBeEnqueued(self, change):
return True
def enqueueChangesAhead(self, change):
def enqueueChangesAhead(self, change, quiet):
return True
def enqueueChangesBehind(self, change):
def enqueueChangesBehind(self, change, quiet):
return True
def checkForChangesNeededBy(self, change):
@ -872,7 +948,7 @@ class BasePipelineManager(object):
item.change.project)
return False
def addChange(self, change):
def addChange(self, change, quiet=False):
self.log.debug("Considering adding change %s" % change)
if self.isChangeAlreadyInQueue(change):
self.log.debug("Change %s is already in queue, ignoring" % change)
@ -883,7 +959,7 @@ class BasePipelineManager(object):
change)
return False
if not self.enqueueChangesAhead(change):
if not self.enqueueChangesAhead(change, quiet):
self.log.debug("Failed to enqueue changes ahead of %s" % change)
return False
@ -895,11 +971,12 @@ class BasePipelineManager(object):
if change_queue:
self.log.debug("Adding change %s to queue %s" %
(change, change_queue))
if not quiet:
if len(self.pipeline.start_actions) > 0:
self.reportStart(change)
item = change_queue.enqueueChange(change)
self.reportStats(item)
self.enqueueChangesBehind(change)
self.enqueueChangesBehind(change, quiet)
else:
self.log.error("Unable to find change queue for project %s" %
change.project)
@ -970,7 +1047,7 @@ class BasePipelineManager(object):
self.log.debug("Cancel jobs for change %s" % item.change)
canceled = False
to_remove = []
if prime and item.current_build_set.builds:
if prime and item.current_build_set.ref:
item.resetAllBuilds()
for build, build_item in self.building_jobs.items():
if build_item == item:
@ -1440,7 +1517,7 @@ class DependentPipelineManager(BasePipelineManager):
return False
return True
def enqueueChangesBehind(self, change):
def enqueueChangesBehind(self, change, quiet):
to_enqueue = []
self.log.debug("Checking for changes needing %s:" % change)
if not hasattr(change, 'needed_by_changes'):
@ -1456,15 +1533,15 @@ class DependentPipelineManager(BasePipelineManager):
self.log.debug(" No changes need %s" % change)
for other_change in to_enqueue:
self.addChange(other_change)
self.addChange(other_change, quiet)
def enqueueChangesAhead(self, change):
def enqueueChangesAhead(self, change, quiet):
ret = self.checkForChangesNeededBy(change)
if ret in [True, False]:
return ret
self.log.debug(" Change %s must be merged ahead of %s" %
(ret, change))
return self.addChange(ret)
return self.addChange(ret, quiet)
def checkForChangesNeededBy(self, change):
self.log.debug("Checking for changes needed by %s:" % change)