Make promote work for any pipeline manager

This alters the behavior of the promote event handler so that it does
something useful in independent pipelines as well as dependent ones.
It not only re-orders changes within a pipeline's shared queue (the old
behavior); it also re-orders the shared queues within the pipeline.

When used in an independent pipeline, this will give the item an
advantage when requesting nodes or semaphores, or starting jobs.

This behavior applies to dependent pipelines as well -- the behavior is
the same for every pipeline.

Restarting jobs for changes in independent pipelines would be
counter-productive (and even in dependent pipelines we may have restarted
more jobs than necessary when the change at the head wasn't being altered),
so the handler has been altered to only dequeue/enqueue items when
necessary to achieve the requested order.

The event argument to addChange within the promote method has been
changed from the promote event to the original item enqueue event.
Methods within the pipeline manager assume that the event is a
TriggerEvent rather than a ManagementEvent and could otherwise throw
some (non-fatal) errors when reporting.
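The dequeue/enqueue-only-when-necessary idea can be sketched with plain
lists standing in for Zuul's queue items (the function name and shapes
here are hypothetical, not part of the commit):

```python
def plan_promote(queue, promoted):
    """Split a queue into items that may stay in place and items that
    must be dequeued/re-enqueued to reach the requested order.

    `promoted` lists the items that should end up at the head, in
    order; everything else keeps its original relative order.
    """
    target = promoted + [item for item in queue if item not in promoted]
    kept = []
    # An item may keep its jobs only while everything ahead of it is
    # unchanged; the first difference forces the rest to re-enqueue.
    for old, new in zip(queue, target):
        if old != new:
            break
        kept.append(old)
    return kept, target[len(kept):]
```

Promoting changes that are already in queue order then requires no
re-enqueues at all, which is the job-level no-op behavior the new
test_promote_no_change test asserts.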

Change-Id: Ib4ab855cff27bf8e96aa852333fb4ace656235b4
James E. Blair 2022-03-09 14:28:24 -08:00
parent c41139d904
commit bc1618ed5b
6 changed files with 262 additions and 55 deletions


@@ -146,18 +146,24 @@ Example::
 
    Note that the format of changes id is <number>,<patchset>.
 
-The promote action is used to reorder the change queue in a pipeline, by putting
-the provided changes at the top of the queue; therefore this action makes the
-most sense when performed against a dependent pipeline.
+The promote action is used to reorder the changes in a pipeline, by
+putting the provided changes at the top of the queue.
 
-The most common use case for the promote action is the need to merge an urgent
-fix when the gate pipeline has already several patches queued ahead. This is
-especially needed if there is concern that one or more changes ahead in the queue
-may fail, thus increasing the time to land for the fix; or concern that the fix
-may not pass validation if applied on top of the current patch queue in the gate.
+The most common use case for the promote action is the need to merge
+an urgent fix when the gate pipeline has several patches queued
+ahead. This is especially needed if there is concern that one or more
+changes ahead in the queue may fail, thus increasing the time to land
+for the fix; or concern that the fix may not pass validation if
+applied on top of the current patch queue in the gate.
 
-If the queue of a dependent pipeline is targeted by the promote, all the ongoing
-jobs in that queue will be canceled and restarted on top of the promoted changes.
+Any items in a dependent pipeline which have had items ahead of them
+changed will have their jobs canceled and restarted based on the new
+ordering.
+
+If items in independent pipelines are promoted, no jobs will be
+restarted, but their change queues within the pipeline will be
+re-ordered so that they will be processed first and their node request
+priorities will increase.
 
 tenant-conf-check
 ^^^^^^^^^^^^^^^^^
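The promote action described above is driven by a POST to the tenant's
promote endpoint with a pipeline name and a list of change ids. A small
stdlib-only helper (hypothetical; it only mirrors the request path and
payload exercised by this commit's tests) to build that request:

```python
import json

def promote_request(tenant, pipeline, changes):
    """Build the (path, JSON body) for a promote call.

    Each entry in `changes` is a '<number>,<patchset>' string.
    """
    path = "api/tenant/%s/promote" % tenant
    body = json.dumps({"pipeline": pipeline, "changes": list(changes)})
    return path, body
```

The request must be sent with an `Authorization: Bearer` token carrying
tenant admin authorization, as the tests below demonstrate.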


@@ -0,0 +1,11 @@
+---
+features:
+  - |
+    The promote administrative action now functions with all pipeline
+    managers. Previously it would only have an impact on dependent
+    pipelines, but it will now re-order change queues as well as
+    changes within any type of pipeline.
+upgrade:
+  - |
+    The promote administrative action will no longer restart jobs for
+    changes which have not been re-ordered within their change queue.


@@ -2205,6 +2205,172 @@ class TestTenantScopedWebApi(BaseTestWeb):
         self.assertEqual(B.reported, 2)
         self.assertEqual(C.data['status'], 'MERGED')
         self.assertEqual(C.reported, 2)
         self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 1)
         self.assertEqual(len(self.history), 10)
+
+    def test_promote_no_change(self):
+        """Test that jobs are not unecessarily restarted when promoting"""
+        self.executor_server.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+        A.addApproval('Code-Review', 2)
+        B.addApproval('Code-Review', 2)
+        C.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
+        self.fake_gerrit.addEvent(C.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
+        items = tenant.layout.pipelines['gate'].getAllItems()
+        enqueue_times = {}
+        for item in items:
+            enqueue_times[str(item.change)] = item.enqueue_time
+
+        # REST API
+        args = {'pipeline': 'gate',
+                'changes': ['1,1', '2,1', '3,1']}
+        authz = {'iss': 'zuul_operator',
+                 'aud': 'zuul.example.com',
+                 'sub': 'testuser',
+                 'zuul': {
+                     'admin': ['tenant-one', ],
+                 },
+                 'exp': time.time() + 3600,
+                 'iat': time.time()}
+        token = jwt.encode(authz, key='NoDanaOnlyZuul',
+                           algorithm='HS256')
+        req = self.post_url(
+            'api/tenant/tenant-one/promote',
+            headers={'Authorization': 'Bearer %s' % token},
+            json=args)
+        self.assertEqual(200, req.status_code, req.text)
+        data = req.json()
+        self.assertEqual(True, data)
+
+        # ensure that enqueue times are durable
+        items = tenant.layout.pipelines['gate'].getAllItems()
+        for item in items:
+            self.assertEqual(
+                enqueue_times[str(item.change)], item.enqueue_time)
+
+        self.waitUntilSettled()
+        self.executor_server.release('.*-merge')
+        self.waitUntilSettled()
+        self.executor_server.release('.*-merge')
+        self.waitUntilSettled()
+        self.executor_server.release('.*-merge')
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 6)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test2')
+        self.assertEqual(self.builds[2].name, 'project-test1')
+        self.assertEqual(self.builds[3].name, 'project-test2')
+        self.assertEqual(self.builds[4].name, 'project-test1')
+        self.assertEqual(self.builds[5].name, 'project-test2')
+
+        self.assertTrue(self.builds[0].hasChanges(A))
+        self.assertFalse(self.builds[0].hasChanges(B))
+        self.assertFalse(self.builds[0].hasChanges(C))
+
+        self.assertTrue(self.builds[2].hasChanges(A))
+        self.assertTrue(self.builds[2].hasChanges(B))
+        self.assertFalse(self.builds[2].hasChanges(C))
+
+        self.assertTrue(self.builds[4].hasChanges(A))
+        self.assertTrue(self.builds[4].hasChanges(B))
+        self.assertTrue(self.builds[4].hasChanges(C))
+
+        self.executor_server.release()
+        self.waitUntilSettled()
+
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+        self.assertEqual(B.data['status'], 'MERGED')
+        self.assertEqual(B.reported, 2)
+        self.assertEqual(C.data['status'], 'MERGED')
+        self.assertEqual(C.reported, 2)
+        # The promote should be a noop, so no canceled jobs
+        self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 0)
+        self.assertEqual(len(self.history), 9)
+
+    def test_promote_check(self):
+        """Test that a change can be promoted via the admin web interface"""
+        self.executor_server.hold_jobs_in_build = True
+        # Make a patch series so that we have some non-live items in
+        # the pipeline and we can make sure they are not promoted.
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        B.setDependsOn(A, 1)
+        C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
+        C.setDependsOn(B, 1)
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+        self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
+        items = [i for i in tenant.layout.pipelines['check'].getAllItems()
+                 if i.live]
+        enqueue_times = {}
+        for item in items:
+            enqueue_times[str(item.change)] = item.enqueue_time
+
+        # REST API
+        args = {'pipeline': 'check',
+                'changes': ['2,1', '3,1']}
+        authz = {'iss': 'zuul_operator',
+                 'aud': 'zuul.example.com',
+                 'sub': 'testuser',
+                 'zuul': {
+                     'admin': ['tenant-one', ],
+                 },
+                 'exp': time.time() + 3600,
+                 'iat': time.time()}
+        token = jwt.encode(authz, key='NoDanaOnlyZuul',
+                           algorithm='HS256')
+        req = self.post_url(
+            'api/tenant/tenant-one/promote',
+            headers={'Authorization': 'Bearer %s' % token},
+            json=args)
+        self.assertEqual(200, req.status_code, req.text)
+        data = req.json()
+        self.assertEqual(True, data)
+        self.waitUntilSettled()
+
+        # ensure that enqueue times are durable
+        items = [i for i in tenant.layout.pipelines['check'].getAllItems()
+                 if i.live]
+        for item in items:
+            self.assertEqual(
+                enqueue_times[str(item.change)], item.enqueue_time)
+
+        # We can't reliably test for side effects in the check
+        # pipeline since the change queues are independent, so we
+        # directly examine the queues.
+        queue_items = [(item.change.number, item.live) for item in
+                       tenant.layout.pipelines['check'].getAllItems()]
+        expected = [('1', False),
+                    ('2', True),
+                    ('1', False),
+                    ('2', False),
+                    ('3', True),
+                    ('1', True)]
+        self.assertEqual(expected, queue_items)
+
+        self.executor_server.release('.*-merge')
+        self.waitUntilSettled()
+        self.executor_server.release()
+        self.waitUntilSettled()
+
+        # No jobs should be canceled
+        self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 0)
+        self.assertEqual(len(self.history), 9)
+
     def test_tenant_authorizations_override(self):
         """Test that user gets overriden tenant authz if allowed"""


@@ -166,7 +166,7 @@ class Change extends React.Component {
   renderAdminCommands(idx) {
     const { showAdminActions } = this.state
-    const { queue, pipeline } = this.props
+    const { queue } = this.props
     const dropdownCommands = [
       <DropdownItem
         key="dequeue"
@@ -179,22 +179,18 @@
           this.setState(() => ({ showDequeueModal: true }))
         }}
       >Dequeue</DropdownItem>,
+      <DropdownItem
+        key="promote"
+        icon={<AngleDoubleUpIcon style={{
+          color: 'var(--pf-global--default-color--200)',
+        }} />}
+        description="Promote this change to the top of the queue"
+        onClick={(event) => {
+          event.preventDefault()
+          this.setState(() => ({ showPromoteModal: true }))
+        }}
+      >Promote</DropdownItem>
     ]
-    if (pipeline.manager === 'dependent') {
-      dropdownCommands.push(
-        <DropdownItem
-          key="promote"
-          icon={<AngleDoubleUpIcon style={{
-            color: 'var(--pf-global--default-color--200)',
-          }} />}
-          description="Promote this change to the top of the queue"
-          onClick={(event) => {
-            event.preventDefault()
-            this.setState(() => ({ showPromoteModal: true }))
-          }}
-        >Promote</DropdownItem>
-      )
-    }
     return (
       <Dropdown
         title='Actions'


@@ -527,6 +527,13 @@ class Pipeline(object):
             self.queues.remove(queue)
             queue.delete(self.manager.current_context)
 
+    def promoteQueue(self, queue):
+        if queue not in self.queues:
+            return
+        with self.state.activeContext(self.manager.current_context):
+            self.queues.remove(queue)
+            self.queues.insert(0, queue)
+
     def getChangesInQueue(self):
         changes = []
         for shared_queue in self.queues:

@@ -25,7 +25,7 @@ import traceback
 import uuid
 from contextlib import suppress
 from zuul.vendor.contextlib import nullcontext
-from collections import defaultdict
+from collections import defaultdict, OrderedDict
 
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.triggers.interval import IntervalTrigger
@@ -1659,39 +1659,60 @@ class Scheduler(threading.Thread):
         change_ids = [c.split(',') for c in event.change_ids]
-        items_to_enqueue = []
-        change_queue = None
-        for shared_queue in pipeline.queues:
-            if change_queue:
-                break
-            for item in shared_queue.queue:
-                if (item.change.number == change_ids[0][0] and
-                        item.change.patchset == change_ids[0][1]):
-                    change_queue = shared_queue
-                    break
-        if not change_queue:
-            raise Exception("Unable to find shared change queue for %s" %
-                            event.change_ids[0])
+        # A list of (queue, items); the queue should be promoted
+        # within the pipeline, and the items should be promoted within
+        # the queue.
+        promote_operations = OrderedDict()
         for number, patchset in change_ids:
             found = False
-            for item in change_queue.queue:
-                if (item.change.number == number and
-                        item.change.patchset == patchset):
-                    found = True
-                    items_to_enqueue.append(item)
-                    break
+            for shared_queue in pipeline.queues:
+                for item in shared_queue.queue:
+                    if not item.live:
+                        continue
+                    if (item.change.number == number and
+                            item.change.patchset == patchset):
+                        promote_operations.setdefault(
+                            shared_queue, []).append(item)
+                        found = True
+                        break
+                if found:
+                    break
             if not found:
-                raise Exception("Unable to find %s,%s in queue %s" %
-                                (number, patchset, change_queue))
-        for item in change_queue.queue[:]:
-            if item not in items_to_enqueue:
-                items_to_enqueue.append(item)
-            pipeline.manager.cancelJobs(item)
-            pipeline.manager.dequeueItem(item)
-        for item in items_to_enqueue:
-            pipeline.manager.addChange(
-                item.change, event,
-                enqueue_time=item.enqueue_time,
-                quiet=True,
-                ignore_requirements=True)
+                raise Exception("Unable to find %s,%s" % (number, patchset))
+
+        # Reverse the promote operations so that we promote the first
+        # change queue last (which will put it at the top).
+        for change_queue, items_to_enqueue in\
+                reversed(promote_operations.items()):
+            # We can leave changes in the pipeline as long as the head
+            # remains identical; as soon as we make a change, we have
+            # to re-enqueue everything behind it in order to ensure
+            # dependencies are correct.
+            head_same = True
+            for item in change_queue.queue[:]:
+                if item.live and item == items_to_enqueue[0] and head_same:
+                    # We can skip this one and leave it in place
+                    items_to_enqueue.pop(0)
+                    continue
+                elif not item.live:
+                    # Ignore this; if the actual item behind it is
+                    # dequeued, it will get cleaned up eventually.
+                    continue
+                if item not in items_to_enqueue:
+                    items_to_enqueue.append(item)
+                head_same = False
+                pipeline.manager.cancelJobs(item)
+                pipeline.manager.dequeueItem(item)
+            for item in items_to_enqueue:
+                pipeline.manager.addChange(
+                    item.change, item.event,
+                    enqueue_time=item.enqueue_time,
+                    quiet=True,
+                    ignore_requirements=True)
+
+            # Regardless, move this shared change queue to the head.
+            pipeline.promoteQueue(change_queue)
 
     def _doDequeueEvent(self, event):
         tenant = self.abide.tenants.get(event.tenant_name)
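The reversed() iteration over promote_operations deserves a note: moving
each requested queue to the front, in reverse request order, leaves the
first-requested queue on top while preserving the requested order among
all promoted queues. A minimal sketch of that effect (function name and
list shapes hypothetical):

```python
from collections import OrderedDict

def promote_queues(queues, requested):
    """Move each queue named in `requested` to the front of `queues`,
    processing requests in reverse so the first-requested queue ends
    up first (mirrors the scheduler's reversed(promote_operations))."""
    # OrderedDict deduplicates while remembering request order.
    ops = OrderedDict((q, None) for q in requested if q in queues)
    for q in reversed(ops):
        queues.remove(q)
        queues.insert(0, q)
    return queues
```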