Fix issue with reopened PR dependencies
Given two PRs, with B depending on A, that are enqueued in gate: A is closed and then immediately reopened. This sequence of events currently dequeues A and then immediately enqueues it again behind B. Since the check for whether a dependency is already in the queue doesn't care whether it is ahead of or behind the current change, B is not dequeued, and the builds executed for B will not include the content of A.

This change updates the check for whether a change is already in the queue so that it only considers changes ahead of the current item. This causes B to be correctly dequeued in the next pipeline pass.

That behavior is correct, but isn't always intuitive or consistent. If the time between closing and reopening a change is long enough for a pipeline processing run, then both changes will be enqueued by the reopening (because we check for changes that need the enqueued change and enqueue them behind it). But if both events are processed in a single pipeline run, then the removal of B happens after the re-enqueue of A, which means that it won't be re-added.

To correct this, whenever we remove abandoned changes, we also remove any changes behind them that depend on the removed abandoned changes at the same time. This means that in the scenario above, the re-enqueue happens under the same conditions as the original enqueue, and both A and B are re-enqueued.

Co-Authored-By: James E. Blair <jim@acmegating.com>
Change-Id: Ia1d79bccb9ea39e486483283611601aa23903000
parent 2c2a2d61a5
commit 0349628249
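For illustration only: a minimal, self-contained sketch of the "only count dependencies that are ahead of the current item" check described in the commit message. The QueueItem class, is_change_ahead function, and string change IDs below are simplified stand-ins, not Zuul's actual data model.

# Illustrative sketch only -- simplified stand-ins, not Zuul's real classes.
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class QueueItem:
    changes: List[str] = field(default_factory=list)


def is_change_ahead(change: str, queue: List[QueueItem],
                    item: Optional[QueueItem] = None) -> bool:
    """Return True if `change` is already in `queue` ahead of `item`.

    If `item` is None, the whole queue is considered (the old behavior,
    which let a dependency re-enqueued *behind* B satisfy the check).
    """
    for queue_item in queue:
        if queue_item is item:
            # Stop before reaching the item itself: anything at or after
            # this position is not "ahead" of it.
            break
        if change in queue_item.changes:
            return True
    return False


# A is re-enqueued *behind* B after a close/reopen:
item_b = QueueItem(changes=["B"])
item_a = QueueItem(changes=["A"])
queue = [item_b, item_a]

print(is_change_ahead("A", queue))          # True: old check, A is "in the queue"
print(is_change_ahead("A", queue, item_b))  # False: new check, A is not ahead of B

With the scoped check, B's dependency on A is reported as missing and B is dequeued on the next pipeline pass, matching the behavior described above.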
tests/fixtures/layouts/crd-github.yaml
@@ -19,6 +19,8 @@
       github:
         - event: pull_request
           action: edited
+        - event: pull_request
+          action: reopened
     start:
       github: {}
     success:
@@ -215,3 +215,45 @@ class TestGithubCrossRepoDeps(ZuulTestCase):

         # The job should be aborted since B was updated while enqueued.
         self.assertHistory([dict(name='project3-test', result='ABORTED')])
+
+    @simple_layout('layouts/crd-github.yaml', driver='github')
+    def test_crd_dependent_close_reopen(self):
+        """
+        Test that closing and reopening PR correctly re-enqueues the
+        dependent change in the correct order.
+        """
+        self.executor_server.hold_jobs_in_build = True
+
+        A = self.fake_github.openFakePullRequest('org/project3', 'master', 'A')
+        B = self.fake_github.openFakePullRequest(
+            'org/project4', 'master', 'B',
+            body=f"Depends-On: https://github.com/org/project3/pull/{A.number}"
+        )
+
+        self.fake_github.emitEvent(B.getPullRequestEditedEvent())
+        self.waitUntilSettled()
+
+        # Close and reopen PR A
+        self.fake_github.emitEvent(A.getPullRequestClosedEvent())
+        self.fake_github.emitEvent(A.getPullRequestReopenedEvent())
+        self.waitUntilSettled()
+
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+
+        # The changes for the job from project4 should include the project3
+        # PR content
+        self.assertHistory([
+            dict(name='project3-test', result='ABORTED',
+                 changes=f"{A.number},{A.head_sha}"),
+            dict(name='project4-test', result='ABORTED',
+                 changes=f"{A.number},{A.head_sha} {B.number},{B.head_sha}"),
+            dict(name='project3-test', result='SUCCESS',
+                 changes=f"{A.number},{A.head_sha}"),
+            dict(name='project4-test', result='SUCCESS',
+                 changes=f"{A.number},{A.head_sha} {B.number},{B.head_sha}"),
+        ], ordered=False)
+
+        self.assertTrue(A.is_merged)
+        self.assertTrue(B.is_merged)
@@ -312,10 +312,15 @@ class PipelineManager(metaclass=ABCMeta):
                     return True
         return False

-    def isChangeAlreadyInQueue(self, change, change_queue):
+    def isChangeAlreadyInQueue(self, change, change_queue, item=None):
         # Checks any item in the specified change queue
-        for item in change_queue.queue:
-            for c in item.changes:
+        # If item is supplied, only consider items ahead of the
+        # supplied item (ie, is the change already in the queue ahead
+        # of this item?).
+        for queue_item in change_queue.queue:
+            if queue_item is item:
+                break
+            for c in queue_item.changes:
                 if change.equals(c):
                     return True
         return False
@@ -419,13 +424,18 @@
         return True

     def getMissingNeededChanges(self, changes, change_queue, event,
-                                dependency_graph=None):
+                                dependency_graph=None, item=None):
         """Check that all needed changes are ahead in the queue.

         Return a list of any that are missing. If it is not possible
         to correct the missing changes, "abort" will be true.

+        If item is supplied, only consider items ahead of the supplied
+        item when determining whether needed changes are already ahead
+        in the queue.
+
         :returns: (abort, needed_changes)

         """
         return False, []

@@ -472,23 +482,71 @@ class PipelineManager(metaclass=ABCMeta):
     def removeAbandonedChange(self, change, event):
         log = get_annotated_logger(self.log, event)
         log.debug("Change %s abandoned, removing", change)
-        for item in self.pipeline.getAllItems():
-            if not item.live:
+        for queue in self.pipeline.queues:
+            # Below we need to remove dependent changes of abandoned
+            # changes we remove here, but only if both are live.
+            # Therefore, track the changes we remove independently for
+            # each shared queue (since different queues may have
+            # different live/non-live changes).
+            changes_removed = set()
+            for item in queue.queue:
+                if not item.live:
+                    continue
+                self._removeAbandonedChangeExamineItem(
+                    change, item, changes_removed)
+            # Abbreviated version of dependency check; we're not going
+            # to check everything (processOneItem will take care of
+            # that), but we will remove any changes that depend on any
+            # changes we just removed; we can do that without
+            # recreating the full dependency graphs.
+            if not changes_removed:
                 continue
-            for item_change in item.changes:
-                if item_change.equals(change):
-                    if len(item.changes) > 1:
-                        # We need to cancel all jobs here as setting
-                        # dequeued needing change will skip all jobs.
-                        self.cancelJobs(item)
-                        msg = ("Dependency cycle change "
-                               f"{change.url} abandoned.")
+            for item in queue.queue:
+                if not item.live:
+                    continue
+                self._removeAbandonedChangeDependentsExamineItem(
+                    log, item, changes_removed)
+
+    def _removeAbandonedChangeExamineItem(self, change, item, changes_removed):
+        # The inner loop from the first part of removeAbandonedChange.
+        for item_change in item.changes:
+            if item_change.equals(change):
+                if len(item.changes) > 1:
+                    # We need to cancel all jobs here as setting
+                    # dequeued needing change will skip all jobs.
+                    self.cancelJobs(item)
+                    msg = ("Dependency cycle change "
+                           f"{change.url} abandoned.")
+                    item.setDequeuedNeedingChange(msg)
+                    try:
+                        self.reportItem(item)
+                    except exceptions.MergeFailure:
+                        pass
+                self.removeItem(item)
+                changes_removed.update(item.changes)
+                return
+
+    def _removeAbandonedChangeDependentsExamineItem(
+            self, log, item, changes_removed):
+        # The inner loop from the second part of removeAbandonedChange.
+        for item_change in item.changes:
+            for needed_change in self.resolveChangeReferences(
+                    item_change.getNeedsChanges(
+                        self.useDependenciesByTopic(item_change.project))):
+                for removed_change in changes_removed:
+                    if needed_change.equals(removed_change):
+                        log.debug("Change %s depends on abandoned change %s, "
+                                  "removing", item_change, removed_change)
+                        msg = f'Change {removed_change} is needed.'
                         item.setDequeuedNeedingChange(msg)
-                        try:
-                            self.reportItem(item)
-                        except exceptions.MergeFailure:
-                            pass
-                    self.removeItem(item)
+                        if item.live:
+                            try:
+                                self.reportItem(item)
+                            except exceptions.MergeFailure:
+                                pass
+                        self.dequeueItem(item)
+                        changes_removed.update(item.changes)
+                        return

     def reEnqueueChanges(self, item, changes):
         for change in changes:
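For illustration only: a minimal sketch of the two-pass removal shown above, using a simplified queue model (the Item class, remove_abandoned function, and depends_on mapping are hypothetical stand-ins, not Zuul's PipelineManager API). The first pass drops live items containing the abandoned change; the second drops live items that depend on anything just removed.

# Illustrative sketch only -- a simplified queue model, not Zuul's PipelineManager.
from dataclasses import dataclass
from typing import Dict, List, Set


@dataclass
class Item:
    changes: List[str]
    live: bool = True


def remove_abandoned(queue: List[Item], abandoned: str,
                     depends_on: Dict[str, Set[str]]) -> List[Item]:
    """Return the queue with the abandoned change and its dependents removed."""
    changes_removed: Set[str] = set()

    # First pass: remove live items containing the abandoned change.
    remaining = []
    for item in queue:
        if item.live and abandoned in item.changes:
            changes_removed.update(item.changes)
        else:
            remaining.append(item)

    if not changes_removed:
        return remaining

    # Second pass: remove live items that depend on anything just removed.
    result = []
    for item in remaining:
        needs: Set[str] = set()
        for change in item.changes:
            needs |= depends_on.get(change, set())
        if item.live and needs & changes_removed:
            changes_removed.update(item.changes)
        else:
            result.append(item)
    return result


# B depends on A; abandoning A removes both items in the same pass,
# so a later re-enqueue of A starts from an empty queue, just like the
# original enqueue.
queue = [Item(changes=["A"]), Item(changes=["B"])]
print(remove_abandoned(queue, "A", {"B": {"A"}}))  # []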
@@ -1659,7 +1717,8 @@ class PipelineManager(metaclass=ABCMeta):

            abort, needs_changes = self.getMissingNeededChanges(
                item.changes, change_queue, item.event,
-                dependency_graph=dependency_graph)
+                dependency_graph=dependency_graph,
+                item=item)
            if not (meets_reqs and not needs_changes):
                # It's not okay to enqueue this change, we should remove it.
                log.info("Dequeuing %s because "
@@ -183,7 +183,8 @@ class DependentPipelineManager(SharedQueuePipelineManager):
         return True

     def getMissingNeededChanges(self, changes, change_queue, event,
-                                dependency_graph=None, warnings=None):
+                                dependency_graph=None, warnings=None,
+                                item=None):
         log = get_annotated_logger(self.log, event)
         changes_needed = []
         abort = False
@@ -231,7 +232,7 @@
                    log.debug("  Needed change is in cycle")
                    continue
                if self.isChangeAlreadyInQueue(
-                        needed_change, change_queue):
+                        needed_change, change_queue, item):
                    log.debug("  Needed change is already "
                              "ahead in the queue")
                    continue
@@ -80,7 +80,7 @@ class IndependentPipelineManager(PipelineManager):
         return True

     def getMissingNeededChanges(self, changes, change_queue, event,
-                                dependency_graph=None):
+                                dependency_graph=None, item=None):
         log = get_annotated_logger(self.log, event)

         if self.pipeline.ignore_dependencies: