Add a dequeue command to zuul client

Add the ability for an operator to dequeue a change from a pipeline.

Change-Id: I4524291807c8b97b62cfaa31fb5d46dc48adbac9
Signed-off-by: Paul Belanger <pabelanger@redhat.com>
This commit is contained in:
Matthieu Huin 2018-07-11 07:13:26 -07:00 committed by Tobias Henkel
parent 03219737de
commit c2c5ce26bf
7 changed files with 237 additions and 0 deletions

View File

@ -30,6 +30,15 @@ Example::
zuul autohold --tenant openstack --project example_project --job example_job --reason "reason text" --count 1
Dequeue
^^^^^^^
.. program-output:: zuul dequeue --help
Examples::
zuul dequeue --tenant openstack --pipeline check --project example_project --change 5,1
zuul dequeue --tenant openstack --pipeline periodic --project example_project --ref refs/heads/master
Enqueue
^^^^^^^
.. program-output:: zuul enqueue --help

View File

@ -0,0 +1,5 @@
---
features:
- |
The `dequeue` command has been added to the Zuul CLI.
It allows operators to stop a given buildset at will.

View File

@ -3709,6 +3709,119 @@ class TestScheduler(ZuulTestCase):
self.assertIn('project-post', job_names)
self.assertEqual(r, True)
def test_client_dequeue_dependent_change(self):
"Test that the RPC client can dequeue a change"
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
self.addCleanup(client.shutdown)
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
C.setDependsOn(B, 1)
B.setDependsOn(A, 1)
A.addApproval('Code-Review', 2)
B.addApproval('Code-Review', 2)
C.addApproval('Code-Review', 2)
# Promote to 'gate' pipeline
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
self.fake_gerrit.addEvent(C.addApproval('Approved', 1))
self.waitUntilSettled()
client.dequeue(
tenant='tenant-one',
pipeline='gate',
project='org/project',
change='1,1',
ref=None)
self.waitUntilSettled()
tenant = self.sched.abide.tenants.get('tenant-one')
gate_pipeline = tenant.layout.pipelines['gate']
self.assertEqual(gate_pipeline.getAllItems(), [])
self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 1)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
def test_client_dequeue_independent_change(self):
"Test that the RPC client can dequeue a change"
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
self.addCleanup(client.shutdown)
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
A.addApproval('Code-Review', 2)
B.addApproval('Code-Review', 2)
C.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
client.dequeue(
tenant='tenant-one',
pipeline='check',
project='org/project',
change='1,1',
ref=None)
self.waitUntilSettled()
tenant = self.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
self.assertEqual(len(check_pipeline.getAllItems()), 2)
self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 1)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
def test_client_dequeue_change_by_ref(self):
"Test that the RPC client can dequeue a change by ref"
# Test this on the periodic pipeline, where it makes most sense to
# use ref
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
self.addCleanup(client.shutdown)
self.create_branch('org/project', 'stable')
self.executor_server.hold_jobs_in_build = True
self.commitConfigUpdate('common-config', 'layouts/timer.yaml')
self.sched.reconfigure(self.config)
self.waitUntilSettled()
time.sleep(5)
client.dequeue(
tenant='tenant-one',
pipeline='periodic',
project='org/project',
change=None,
ref='refs/heads/stable')
self.waitUntilSettled()
self.commitConfigUpdate('common-config',
'layouts/no-timer.yaml')
self.sched.reconfigure(self.config)
self.waitUntilSettled()
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 1)
def test_client_enqueue_negative(self):
"Test that the RPC client returns errors"
client = zuul.rpcclient.RPCClient('127.0.0.1',

View File

@ -111,6 +111,21 @@ class Client(zuul.cmd.ZuulApp):
'--newrev', help='new revision', default=None)
cmd_enqueue.set_defaults(func=self.enqueue_ref)
cmd_dequeue = subparsers.add_parser('dequeue',
help='dequeue a buildset by its '
'change or ref')
cmd_dequeue.add_argument('--tenant', help='tenant name',
required=True)
cmd_dequeue.add_argument('--pipeline', help='pipeline name',
required=True)
cmd_dequeue.add_argument('--project', help='project name',
required=True)
cmd_dequeue.add_argument('--change', help='change id',
default=None)
cmd_dequeue.add_argument('--ref', help='ref name',
default=None)
cmd_dequeue.set_defaults(func=self.dequeue)
cmd_promote = subparsers.add_parser('promote',
help='promote one or more changes')
cmd_promote.add_argument('--tenant', help='tenant name',
@ -164,6 +179,12 @@ class Client(zuul.cmd.ZuulApp):
self.args.oldrev = '0000000000000000000000000000000000000000'
if self.args.newrev is None:
self.args.newrev = '0000000000000000000000000000000000000000'
if self.args.func == self.dequeue:
if self.args.change is None and self.args.ref is None:
parser.error("Change or ref needed.")
if self.args.change is not None and self.args.ref is not None:
parser.error(
"The 'change' and 'ref' arguments are mutually exclusive.")
def setup_logging(self):
"""Client logging does not rely on conf file"""
@ -255,6 +276,16 @@ class Client(zuul.cmd.ZuulApp):
newrev=self.args.newrev)
return r
def dequeue(self):
client = zuul.rpcclient.RPCClient(
self.server, self.port, self.ssl_key, self.ssl_cert, self.ssl_ca)
r = client.dequeue(tenant=self.args.tenant,
pipeline=self.args.pipeline,
project=self.args.project,
change=self.args.change,
ref=self.args.ref)
return r
def promote(self):
client = zuul.rpcclient.RPCClient(
self.server, self.port, self.ssl_key, self.ssl_cert, self.ssl_ca)

View File

@ -89,6 +89,15 @@ class RPCClient(object):
}
return not self.submitJob('zuul:enqueue_ref', data).failure
def dequeue(self, tenant, pipeline, project, change, ref):
data = {'tenant': tenant,
'pipeline': pipeline,
'project': project,
'change': change,
'ref': ref,
}
return not self.submitJob('zuul:dequeue', data).failure
def promote(self, tenant, pipeline, change_ids):
data = {'tenant': tenant,
'pipeline': pipeline,

View File

@ -52,6 +52,7 @@ class RPCListener(object):
def register(self):
self.worker.registerFunction("zuul:autohold")
self.worker.registerFunction("zuul:autohold_list")
self.worker.registerFunction("zuul:dequeue")
self.worker.registerFunction("zuul:enqueue")
self.worker.registerFunction("zuul:enqueue_ref")
self.worker.registerFunction("zuul:promote")
@ -121,6 +122,21 @@ class RPCListener(object):
except Exception:
self.log.exception("Exception while getting job")
def handle_dequeue(self, job):
args = json.loads(job.arguments)
tenant_name = args['tenant']
pipeline_name = args['pipeline']
project_name = args['project']
change = args['change']
ref = args['ref']
try:
self.sched.dequeue(
tenant_name, pipeline_name, project_name, change, ref)
except Exception as e:
job.sendWorkException(str(e).encode('utf8'))
return
job.sendWorkComplete()
def handle_autohold_list(self, job):
req = {}

View File

@ -119,6 +119,32 @@ class PromoteEvent(ManagementEvent):
self.change_ids = change_ids
class DequeueEvent(ManagementEvent):
"""Dequeue a change from a pipeline
:arg str tenant_name: the name of the tenant
:arg str pipeline_name: the name of the pipeline
:arg str project_name: the name of the project
:arg str change: optional, the change to dequeue
:arg str ref: optional, the ref to look for
"""
def __init__(self, tenant_name, pipeline_name, project_name, change, ref):
super(DequeueEvent, self).__init__()
self.tenant_name = tenant_name
self.pipeline_name = pipeline_name
self.project_name = project_name
self.change = change
if change is not None:
self.change_number, self.patch_number = change.split(',')
else:
self.change_number, self.patch_number = (None, None)
self.ref = ref
# set to mock values
self.oldrev = '0000000000000000000000000000000000000000'
self.newrev = '0000000000000000000000000000000000000000'
class EnqueueEvent(ManagementEvent):
"""Enqueue a change into a pipeline
@ -465,6 +491,15 @@ class Scheduler(threading.Thread):
event.wait()
self.log.debug("Promotion complete")
def dequeue(self, tenant_name, pipeline_name, project_name, change, ref):
event = DequeueEvent(
tenant_name, pipeline_name, project_name, change, ref)
self.management_event_queue.put(event)
self.wake_event.set()
self.log.debug("Waiting for dequeue")
event.wait()
self.log.debug("Dequeue complete")
def enqueue(self, trigger_event):
event = EnqueueEvent(trigger_event)
self.management_event_queue.put(event)
@ -830,6 +865,23 @@ class Scheduler(threading.Thread):
quiet=True,
ignore_requirements=True)
def _doDequeueEvent(self, event):
tenant = self.abide.tenants.get(event.tenant_name)
pipeline = tenant.layout.pipelines[event.pipeline_name]
(trusted, project) = tenant.getProject(event.project_name)
change = project.source.getChange(event, project)
for shared_queue in pipeline.queues:
for item in shared_queue.queue:
if (isinstance(item.change, model.Change) and
item.change.number == change.number and
item.change.patchset == change.patchset) or\
(item.change.ref == change.ref):
pipeline.manager.removeItem(item)
return
raise Exception("Unable to find shared change queue for %s:%s" %
(event.project_name,
event.change or event.ref))
def _doEnqueueEvent(self, event):
tenant = self.abide.tenants.get(event.tenant_name)
full_project_name = ('/'.join([event.project_hostname,
@ -987,6 +1039,8 @@ class Scheduler(threading.Thread):
self._doTenantReconfigureEvent(event)
elif isinstance(event, PromoteEvent):
self._doPromoteEvent(event)
elif isinstance(event, DequeueEvent):
self._doDequeueEvent(event)
elif isinstance(event, EnqueueEvent):
self._doEnqueueEvent(event.trigger_event)
else: