Add tenant support to zuul client

Here we are adding tenant support and re-enabling unit tests for
enqueue and promote.

Change-Id: I384128b9b14be1dc3c4a0c914dcaf13d30f1792f
Signed-off-by: Paul Belanger <pabelanger@redhat.com>
This commit is contained in:
Paul Belanger 2016-11-04 12:49:54 -04:00
parent c914f67554
commit baca313dd5
8 changed files with 131 additions and 66 deletions

View File

@ -28,7 +28,7 @@ Enqueue
Example::
zuul enqueue --trigger gerrit --pipeline check --project example_project --change 12345,1
zuul enqueue --tenant openstack --trigger gerrit --pipeline check --project example_project --change 12345,1
Note that the format of change id is <number>,<patchset>.
@ -38,7 +38,7 @@ Promote
Example::
zuul promote --pipeline check --changes 12345,1 13336,3
zuul promote --tenant openstack --pipeline check --changes 12345,1 13336,3
Note that the format of changes id is <number>,<patchset>.

View File

@ -36,6 +36,16 @@
verified: 0
precedence: high
- pipeline:
name: post
manager: independent
source:
gerrit
trigger:
gerrit:
- event: ref-updated
ref: ^(?!refs/).*$
- pipeline:
name: experimental
manager: independent
@ -66,6 +76,12 @@
- name: controller
image: image2
- job:
name: project-post
nodes:
- name: static
image: ubuntu-xenial
- job:
name: project-test2
@ -86,6 +102,9 @@
jobs:
- project-test1
- project-test2
post:
jobs:
- project-post
- project:
name: org/project1

View File

@ -3084,7 +3084,6 @@ jobs:
self.launch_server.release('.*')
self.waitUntilSettled()
@skip("Disabled for early v3 development")
def test_client_enqueue_change(self):
"Test that the RPC client can enqueue a change"
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@ -3093,7 +3092,8 @@ jobs:
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
r = client.enqueue(pipeline='gate',
r = client.enqueue(tenant='tenant-one',
pipeline='gate',
project='org/project',
trigger='gerrit',
change='1,1')
@ -3115,6 +3115,7 @@ jobs:
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
r = client.enqueue_ref(
tenant='tenant-one',
pipeline='post',
project='org/project',
trigger='gerrit',
@ -3127,14 +3128,24 @@ jobs:
self.assertIn('project-post', job_names)
self.assertEqual(r, True)
@skip("Disabled for early v3 development")
def test_client_enqueue_negative(self):
"Test that the RPC client returns errors"
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
"Invalid tenant"):
r = client.enqueue(tenant='tenant-foo',
pipeline='gate',
project='org/project',
trigger='gerrit',
change='1,1')
client.shutdown()
self.assertEqual(r, False)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
"Invalid project"):
r = client.enqueue(pipeline='gate',
r = client.enqueue(tenant='tenant-one',
pipeline='gate',
project='project-does-not-exist',
trigger='gerrit',
change='1,1')
@ -3143,7 +3154,8 @@ jobs:
with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
"Invalid pipeline"):
r = client.enqueue(pipeline='pipeline-does-not-exist',
r = client.enqueue(tenant='tenant-one',
pipeline='pipeline-does-not-exist',
project='org/project',
trigger='gerrit',
change='1,1')
@ -3152,7 +3164,8 @@ jobs:
with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
"Invalid trigger"):
r = client.enqueue(pipeline='gate',
r = client.enqueue(tenant='tenant-one',
pipeline='gate',
project='org/project',
trigger='trigger-does-not-exist',
change='1,1')
@ -3161,7 +3174,8 @@ jobs:
with testtools.ExpectedException(zuul.rpcclient.RPCFailure,
"Invalid change"):
r = client.enqueue(pipeline='gate',
r = client.enqueue(tenant='tenant-one',
pipeline='gate',
project='org/project',
trigger='gerrit',
change='1,1')
@ -3172,7 +3186,6 @@ jobs:
self.assertEqual(len(self.history), 0)
self.assertEqual(len(self.builds), 0)
@skip("Disabled for early v3 development")
def test_client_promote(self):
"Test that the RPC client can promote a change"
self.launch_server.hold_jobs_in_build = True
@ -3189,18 +3202,20 @@ jobs:
self.waitUntilSettled()
items = self.sched.layout.pipelines['gate'].getAllItems()
tenant = self.sched.abide.tenants.get('tenant-one')
items = tenant.layout.pipelines['gate'].getAllItems()
enqueue_times = {}
for item in items:
enqueue_times[str(item.change)] = item.enqueue_time
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
r = client.promote(pipeline='gate',
r = client.promote(tenant='tenant-one',
pipeline='gate',
change_ids=['2,1', '3,1'])
# ensure that enqueue times are durable
items = self.sched.layout.pipelines['gate'].getAllItems()
items = tenant.layout.pipelines['gate'].getAllItems()
for item in items:
self.assertEqual(
enqueue_times[str(item.change)], item.enqueue_time)
@ -3221,17 +3236,17 @@ jobs:
self.assertEqual(self.builds[4].name, 'project-test1')
self.assertEqual(self.builds[5].name, 'project-test2')
self.assertTrue(self.job_has_changes(self.builds[0], B))
self.assertFalse(self.job_has_changes(self.builds[0], A))
self.assertFalse(self.job_has_changes(self.builds[0], C))
self.assertTrue(self.builds[0].hasChanges(B))
self.assertFalse(self.builds[0].hasChanges(A))
self.assertFalse(self.builds[0].hasChanges(C))
self.assertTrue(self.job_has_changes(self.builds[2], B))
self.assertTrue(self.job_has_changes(self.builds[2], C))
self.assertFalse(self.job_has_changes(self.builds[2], A))
self.assertTrue(self.builds[2].hasChanges(B))
self.assertTrue(self.builds[2].hasChanges(C))
self.assertFalse(self.builds[2].hasChanges(A))
self.assertTrue(self.job_has_changes(self.builds[4], B))
self.assertTrue(self.job_has_changes(self.builds[4], C))
self.assertTrue(self.job_has_changes(self.builds[4], A))
self.assertTrue(self.builds[4].hasChanges(B))
self.assertTrue(self.builds[4].hasChanges(C))
self.assertTrue(self.builds[4].hasChanges(A))
self.launch_server.release()
self.waitUntilSettled()
@ -3246,7 +3261,6 @@ jobs:
client.shutdown()
self.assertEqual(r, True)
@skip("Disabled for early v3 development")
def test_client_promote_dependent(self):
"Test that the RPC client can promote a dependent change"
# C (depends on B) -> B -> A ; then promote C to get:
@ -3270,7 +3284,8 @@ jobs:
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
r = client.promote(pipeline='gate',
r = client.promote(tenant='tenant-one',
pipeline='gate',
change_ids=['3,1'])
self.waitUntilSettled()
@ -3289,17 +3304,17 @@ jobs:
self.assertEqual(self.builds[4].name, 'project-test1')
self.assertEqual(self.builds[5].name, 'project-test2')
self.assertTrue(self.job_has_changes(self.builds[0], B))
self.assertFalse(self.job_has_changes(self.builds[0], A))
self.assertFalse(self.job_has_changes(self.builds[0], C))
self.assertTrue(self.builds[0].hasChanges(B))
self.assertFalse(self.builds[0].hasChanges(A))
self.assertFalse(self.builds[0].hasChanges(C))
self.assertTrue(self.job_has_changes(self.builds[2], B))
self.assertTrue(self.job_has_changes(self.builds[2], C))
self.assertFalse(self.job_has_changes(self.builds[2], A))
self.assertTrue(self.builds[2].hasChanges(B))
self.assertTrue(self.builds[2].hasChanges(C))
self.assertFalse(self.builds[2].hasChanges(A))
self.assertTrue(self.job_has_changes(self.builds[4], B))
self.assertTrue(self.job_has_changes(self.builds[4], C))
self.assertTrue(self.job_has_changes(self.builds[4], A))
self.assertTrue(self.builds[4].hasChanges(B))
self.assertTrue(self.builds[4].hasChanges(C))
self.assertTrue(self.builds[4].hasChanges(A))
self.launch_server.release()
self.waitUntilSettled()
@ -3314,7 +3329,6 @@ jobs:
client.shutdown()
self.assertEqual(r, True)
@skip("Disabled for early v3 development")
def test_client_promote_negative(self):
"Test that the RPC client returns errors for promotion"
self.launch_server.hold_jobs_in_build = True
@ -3327,13 +3341,15 @@ jobs:
self.gearman_server.port)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure):
r = client.promote(pipeline='nonexistent',
r = client.promote(tenant='tenant-one',
pipeline='nonexistent',
change_ids=['2,1', '3,1'])
client.shutdown()
self.assertEqual(r, False)
with testtools.ExpectedException(zuul.rpcclient.RPCFailure):
r = client.promote(pipeline='gate',
r = client.promote(tenant='tenant-one',
pipeline='gate',
change_ids=['4,1'])
client.shutdown()
self.assertEqual(r, False)

View File

@ -46,6 +46,8 @@ class Client(zuul.cmd.ZuulApp):
help='additional help')
cmd_enqueue = subparsers.add_parser('enqueue', help='enqueue a change')
cmd_enqueue.add_argument('--tenant', help='tenant name',
required=True)
cmd_enqueue.add_argument('--trigger', help='trigger name',
required=True)
cmd_enqueue.add_argument('--pipeline', help='pipeline name',
@ -58,6 +60,8 @@ class Client(zuul.cmd.ZuulApp):
cmd_enqueue = subparsers.add_parser('enqueue-ref',
help='enqueue a ref')
cmd_enqueue.add_argument('--tenant', help='tenant name',
required=True)
cmd_enqueue.add_argument('--trigger', help='trigger name',
required=True)
cmd_enqueue.add_argument('--pipeline', help='pipeline name',
@ -76,6 +80,8 @@ class Client(zuul.cmd.ZuulApp):
cmd_promote = subparsers.add_parser('promote',
help='promote one or more changes')
cmd_promote.add_argument('--tenant', help='tenant name',
required=True)
cmd_promote.add_argument('--pipeline', help='pipeline name',
required=True)
cmd_promote.add_argument('--changes', help='change ids',
@ -127,7 +133,8 @@ class Client(zuul.cmd.ZuulApp):
def enqueue(self):
client = zuul.rpcclient.RPCClient(self.server, self.port)
r = client.enqueue(pipeline=self.args.pipeline,
r = client.enqueue(tenant=self.args.tenant,
pipeline=self.args.pipeline,
project=self.args.project,
trigger=self.args.trigger,
change=self.args.change)
@ -135,7 +142,8 @@ class Client(zuul.cmd.ZuulApp):
def enqueue_ref(self):
client = zuul.rpcclient.RPCClient(self.server, self.port)
r = client.enqueue_ref(pipeline=self.args.pipeline,
r = client.enqueue_ref(tenant=self.args.tenant,
pipeline=self.args.pipeline,
project=self.args.project,
trigger=self.args.trigger,
ref=self.args.ref,
@ -145,7 +153,8 @@ class Client(zuul.cmd.ZuulApp):
def promote(self):
client = zuul.rpcclient.RPCClient(self.server, self.port)
r = client.promote(pipeline=self.args.pipeline,
r = client.promote(tenant=self.args.tenant,
pipeline=self.args.pipeline,
change_ids=self.args.changes)
return r

View File

@ -1231,6 +1231,8 @@ class TriggerEvent(object):
self.data = None
# common
self.type = None
# For management events (eg: enqueue / promote)
self.tenant_name = None
self.project_name = None
self.trigger_name = None
# Representation of the user account that performed the event.

View File

@ -48,16 +48,19 @@ class RPCClient(object):
self.log.debug("Job complete, success: %s" % (not job.failure))
return job
def enqueue(self, pipeline, project, trigger, change):
data = {'pipeline': pipeline,
def enqueue(self, tenant, pipeline, project, trigger, change):
data = {'tenant': tenant,
'pipeline': pipeline,
'project': project,
'trigger': trigger,
'change': change,
}
return not self.submitJob('zuul:enqueue', data).failure
def enqueue_ref(self, pipeline, project, trigger, ref, oldrev, newrev):
data = {'pipeline': pipeline,
def enqueue_ref(
self, tenant, pipeline, project, trigger, ref, oldrev, newrev):
data = {'tenant': tenant,
'pipeline': pipeline,
'project': project,
'trigger': trigger,
'ref': ref,
@ -66,8 +69,9 @@ class RPCClient(object):
}
return not self.submitJob('zuul:enqueue_ref', data).failure
def promote(self, pipeline, change_ids):
data = {'pipeline': pipeline,
def promote(self, tenant, pipeline, change_ids):
data = {'tenant': tenant,
'pipeline': pipeline,
'change_ids': change_ids,
}
return not self.submitJob('zuul:promote', data).failure

View File

@ -88,24 +88,34 @@ class RPCListener(object):
args = json.loads(job.arguments)
event = model.TriggerEvent()
errors = ''
tenant = None
project = None
pipeline = None
trigger = self.sched.triggers.get(args['trigger'])
if trigger:
event.trigger_name = args['trigger']
else:
errors += 'Invalid trigger: %s\n' % (args['trigger'],)
tenant = self.sched.abide.tenants.get(args['tenant'])
if tenant:
event.tenant_name = args['tenant']
project = self.sched.layout.projects.get(args['project'])
if project:
event.project_name = args['project']
else:
errors += 'Invalid project: %s\n' % (args['project'],)
project = tenant.layout.project_configs.get(args['project'])
if project:
event.project_name = args['project']
else:
errors += 'Invalid project: %s\n' % (args['project'],)
pipeline = self.sched.layout.pipelines.get(args['pipeline'])
if pipeline:
event.forced_pipeline = args['pipeline']
pipeline = tenant.layout.pipelines.get(args['pipeline'])
if pipeline:
event.forced_pipeline = args['pipeline']
for trigger in pipeline.triggers:
if trigger.name == args['trigger']:
event.trigger_name = args['trigger']
continue
if not event.trigger_name:
errors += 'Invalid trigger: %s\n' % (args['trigger'],)
else:
errors += 'Invalid pipeline: %s\n' % (args['pipeline'],)
else:
errors += 'Invalid pipeline: %s\n' % (args['pipeline'],)
errors += 'Invalid tenant: %s\n' % (args['tenant'],)
return (args, event, errors, pipeline, project)
@ -141,9 +151,10 @@ class RPCListener(object):
def handle_promote(self, job):
args = json.loads(job.arguments)
tenant_name = args['tenant']
pipeline_name = args['pipeline']
change_ids = args['change_ids']
self.sched.promote(pipeline_name, change_ids)
self.sched.promote(tenant_name, pipeline_name, change_ids)
job.sendWorkComplete()
def handle_get_running_jobs(self, job):

View File

@ -128,13 +128,15 @@ class ReconfigureEvent(ManagementEvent):
class PromoteEvent(ManagementEvent):
"""Promote one or more changes to the head of the queue.
:arg str tenant_name: the name of the tenant
:arg str pipeline_name: the name of the pipeline
:arg list change_ids: a list of strings of change ids in the form
1234,1
"""
def __init__(self, pipeline_name, change_ids):
def __init__(self, tenant_name, pipeline_name, change_ids):
super(PromoteEvent, self).__init__()
self.tenant_name = tenant_name
self.pipeline_name = pipeline_name
self.change_ids = change_ids
@ -370,8 +372,8 @@ class Scheduler(threading.Thread):
self.log.debug("Reconfiguration complete")
self.last_reconfigured = int(time.time())
def promote(self, pipeline_name, change_ids):
event = PromoteEvent(pipeline_name, change_ids)
def promote(self, tenant_name, pipeline_name, change_ids):
event = PromoteEvent(tenant_name, pipeline_name, change_ids)
self.management_event_queue.put(event)
self.wake_event.set()
self.log.debug("Waiting for promotion")
@ -547,7 +549,8 @@ class Scheduler(threading.Thread):
"pipeline stats:")
def _doPromoteEvent(self, event):
pipeline = self.layout.pipelines[event.pipeline_name]
tenant = self.abide.tenants.get(event.tenant_name)
pipeline = tenant.layout.pipelines[event.pipeline_name]
change_ids = [c.split(',') for c in event.change_ids]
items_to_enqueue = []
change_queue = None
@ -586,8 +589,9 @@ class Scheduler(threading.Thread):
ignore_requirements=True)
def _doEnqueueEvent(self, event):
project = self.layout.projects.get(event.project_name)
pipeline = self.layout.pipelines[event.forced_pipeline]
tenant = self.abide.tenants.get(event.tenant_name)
project = tenant.layout.project_configs.get(event.project_name)
pipeline = tenant.layout.pipelines[event.forced_pipeline]
change = pipeline.source.getChange(event, project)
self.log.debug("Event %s for change %s was directly assigned "
"to pipeline %s" % (event, change, self))