Merge "Move dependency cycle detection into pipelines" into feature/zuulv3
This commit is contained in:
commit
5f366e478b
|
@ -4173,6 +4173,7 @@ For CI problems and help debugging, contact ci@example.org"""
|
||||||
|
|
||||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
self.assertEqual(A.reported, 1)
|
||||||
|
|
||||||
# Create B->A
|
# Create B->A
|
||||||
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
|
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
|
||||||
|
@ -4181,41 +4182,33 @@ For CI problems and help debugging, contact ci@example.org"""
|
||||||
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
|
||||||
|
# Dep is there so zuul should have reported on B
|
||||||
|
self.assertEqual(B.reported, 1)
|
||||||
|
|
||||||
# Update A to add A->B (a cycle).
|
# Update A to add A->B (a cycle).
|
||||||
A.addPatchset()
|
A.addPatchset()
|
||||||
A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
|
A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
|
||||||
A.subject, B.data['id'])
|
A.subject, B.data['id'])
|
||||||
# Normally we would submit the patchset-created event for
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(2))
|
||||||
# processing here, however, we have no way of noting whether
|
self.waitUntilSettled()
|
||||||
# the dependency cycle detection correctly raised an
|
|
||||||
# exception, so instead, we reach into the source driver and
|
|
||||||
# call the method that would ultimately be called by the event
|
|
||||||
# processing.
|
|
||||||
|
|
||||||
tenant = self.sched.abide.tenants.get('tenant-one')
|
# Dependency cycle injected so zuul should not have reported again on A
|
||||||
(trusted, project) = tenant.getProject('org/project')
|
self.assertEqual(A.reported, 1)
|
||||||
source = project.source
|
|
||||||
|
|
||||||
# TODO(pabelanger): As we add more source / trigger APIs we should make
|
|
||||||
# it easier for users to create events for testing.
|
|
||||||
event = zuul.model.TriggerEvent()
|
|
||||||
event.trigger_name = 'gerrit'
|
|
||||||
event.change_number = '1'
|
|
||||||
event.patch_number = '2'
|
|
||||||
with testtools.ExpectedException(
|
|
||||||
Exception, "Dependency cycle detected"):
|
|
||||||
source.getChange(event, True)
|
|
||||||
self.log.debug("Got expected dependency cycle exception")
|
|
||||||
|
|
||||||
# Now if we update B to remove the depends-on, everything
|
# Now if we update B to remove the depends-on, everything
|
||||||
# should be okay. B; A->B
|
# should be okay. B; A->B
|
||||||
|
|
||||||
B.addPatchset()
|
B.addPatchset()
|
||||||
B.data['commitMessage'] = '%s\n' % (B.subject,)
|
B.data['commitMessage'] = '%s\n' % (B.subject,)
|
||||||
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(2))
|
||||||
|
self.waitUntilSettled()
|
||||||
|
|
||||||
source.getChange(event, True)
|
# Cycle was removed so now zuul should have reported again on A
|
||||||
event.change_number = '2'
|
self.assertEqual(A.reported, 2)
|
||||||
source.getChange(event, True)
|
|
||||||
|
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2))
|
||||||
|
self.waitUntilSettled()
|
||||||
|
self.assertEqual(B.reported, 2)
|
||||||
|
|
||||||
@simple_layout('layouts/disable_at.yaml')
|
@simple_layout('layouts/disable_at.yaml')
|
||||||
def test_disable_at(self):
|
def test_disable_at(self):
|
||||||
|
|
|
@ -31,20 +31,6 @@ from zuul import exceptions
|
||||||
from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent
|
from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent
|
||||||
|
|
||||||
|
|
||||||
# Walk the change dependency tree to find a cycle
|
|
||||||
def detect_cycle(change, history=None):
|
|
||||||
if history is None:
|
|
||||||
history = []
|
|
||||||
else:
|
|
||||||
history = history[:]
|
|
||||||
history.append(change.number)
|
|
||||||
for dep in change.needs_changes:
|
|
||||||
if dep.number in history:
|
|
||||||
raise Exception("Dependency cycle detected: %s in %s" % (
|
|
||||||
dep.number, history))
|
|
||||||
detect_cycle(dep, history)
|
|
||||||
|
|
||||||
|
|
||||||
class GerritEventConnector(threading.Thread):
|
class GerritEventConnector(threading.Thread):
|
||||||
"""Move events from Gerrit to the scheduler."""
|
"""Move events from Gerrit to the scheduler."""
|
||||||
|
|
||||||
|
@ -383,6 +369,13 @@ class GerritConnection(BaseConnection):
|
||||||
return records
|
return records
|
||||||
|
|
||||||
def _updateChange(self, change, history=None):
|
def _updateChange(self, change, history=None):
|
||||||
|
|
||||||
|
# In case this change is already in the history we have a cyclic
|
||||||
|
# dependency and don't need to update ourselves again as this gets
|
||||||
|
# done in a previous frame of the call stack.
|
||||||
|
if history and change.number in history:
|
||||||
|
return change
|
||||||
|
|
||||||
self.log.info("Updating %s" % (change,))
|
self.log.info("Updating %s" % (change,))
|
||||||
data = self.query(change.number)
|
data = self.query(change.number)
|
||||||
change._data = data
|
change._data = data
|
||||||
|
@ -432,18 +425,9 @@ class GerritConnection(BaseConnection):
|
||||||
if 'dependsOn' in data:
|
if 'dependsOn' in data:
|
||||||
parts = data['dependsOn'][0]['ref'].split('/')
|
parts = data['dependsOn'][0]['ref'].split('/')
|
||||||
dep_num, dep_ps = parts[3], parts[4]
|
dep_num, dep_ps = parts[3], parts[4]
|
||||||
if dep_num in history:
|
|
||||||
raise Exception("Dependency cycle detected: %s in %s" % (
|
|
||||||
dep_num, history))
|
|
||||||
self.log.debug("Updating %s: Getting git-dependent change %s,%s" %
|
self.log.debug("Updating %s: Getting git-dependent change %s,%s" %
|
||||||
(change, dep_num, dep_ps))
|
(change, dep_num, dep_ps))
|
||||||
dep = self._getChange(dep_num, dep_ps, history=history)
|
dep = self._getChange(dep_num, dep_ps, history=history)
|
||||||
# Because we are not forcing a refresh in _getChange, it
|
|
||||||
# may return without executing this code, so if we are
|
|
||||||
# updating our change to add ourselves to a dependency
|
|
||||||
# cycle, we won't detect it. By explicitly performing a
|
|
||||||
# walk of the dependency tree, we will.
|
|
||||||
detect_cycle(dep, history)
|
|
||||||
if (not dep.is_merged) and dep not in needs_changes:
|
if (not dep.is_merged) and dep not in needs_changes:
|
||||||
needs_changes.append(dep)
|
needs_changes.append(dep)
|
||||||
|
|
||||||
|
@ -451,19 +435,10 @@ class GerritConnection(BaseConnection):
|
||||||
change):
|
change):
|
||||||
dep_num = record['number']
|
dep_num = record['number']
|
||||||
dep_ps = record['currentPatchSet']['number']
|
dep_ps = record['currentPatchSet']['number']
|
||||||
if dep_num in history:
|
|
||||||
raise Exception("Dependency cycle detected: %s in %s" % (
|
|
||||||
dep_num, history))
|
|
||||||
self.log.debug("Updating %s: Getting commit-dependent "
|
self.log.debug("Updating %s: Getting commit-dependent "
|
||||||
"change %s,%s" %
|
"change %s,%s" %
|
||||||
(change, dep_num, dep_ps))
|
(change, dep_num, dep_ps))
|
||||||
dep = self._getChange(dep_num, dep_ps, history=history)
|
dep = self._getChange(dep_num, dep_ps, history=history)
|
||||||
# Because we are not forcing a refresh in _getChange, it
|
|
||||||
# may return without executing this code, so if we are
|
|
||||||
# updating our change to add ourselves to a dependency
|
|
||||||
# cycle, we won't detect it. By explicitly performing a
|
|
||||||
# walk of the dependency tree, we will.
|
|
||||||
detect_cycle(dep, history)
|
|
||||||
if (not dep.is_merged) and dep not in needs_changes:
|
if (not dep.is_merged) and dep not in needs_changes:
|
||||||
needs_changes.append(dep)
|
needs_changes.append(dep)
|
||||||
change.needs_changes = needs_changes
|
change.needs_changes = needs_changes
|
||||||
|
@ -475,7 +450,7 @@ class GerritConnection(BaseConnection):
|
||||||
dep_num, dep_ps = parts[3], parts[4]
|
dep_num, dep_ps = parts[3], parts[4]
|
||||||
self.log.debug("Updating %s: Getting git-needed change %s,%s" %
|
self.log.debug("Updating %s: Getting git-needed change %s,%s" %
|
||||||
(change, dep_num, dep_ps))
|
(change, dep_num, dep_ps))
|
||||||
dep = self._getChange(dep_num, dep_ps)
|
dep = self._getChange(dep_num, dep_ps, history=history)
|
||||||
if (not dep.is_merged) and dep.is_current_patchset:
|
if (not dep.is_merged) and dep.is_current_patchset:
|
||||||
needed_by_changes.append(dep)
|
needed_by_changes.append(dep)
|
||||||
|
|
||||||
|
@ -487,8 +462,11 @@ class GerritConnection(BaseConnection):
|
||||||
# Because a commit needed-by may be a cross-repo
|
# Because a commit needed-by may be a cross-repo
|
||||||
# dependency, cause that change to refresh so that it will
|
# dependency, cause that change to refresh so that it will
|
||||||
# reference the latest patchset of its Depends-On (this
|
# reference the latest patchset of its Depends-On (this
|
||||||
# change).
|
# change). In case the dep is already in history we already
|
||||||
dep = self._getChange(dep_num, dep_ps, refresh=True)
|
# refreshed this change so refresh is not needed in this case.
|
||||||
|
refresh = dep_num not in history
|
||||||
|
dep = self._getChange(
|
||||||
|
dep_num, dep_ps, refresh=refresh, history=history)
|
||||||
if (not dep.is_merged) and dep.is_current_patchset:
|
if (not dep.is_merged) and dep.is_current_patchset:
|
||||||
needed_by_changes.append(dep)
|
needed_by_changes.append(dep)
|
||||||
change.needed_by_changes = needed_by_changes
|
change.needed_by_changes = needed_by_changes
|
||||||
|
|
|
@ -176,7 +176,7 @@ class PipelineManager(object):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
|
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
|
||||||
change_queue):
|
change_queue, history=None):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def enqueueChangesBehind(self, change, quiet, ignore_requirements,
|
def enqueueChangesBehind(self, change, quiet, ignore_requirements,
|
||||||
|
@ -264,7 +264,7 @@ class PipelineManager(object):
|
||||||
|
|
||||||
def addChange(self, change, quiet=False, enqueue_time=None,
|
def addChange(self, change, quiet=False, enqueue_time=None,
|
||||||
ignore_requirements=False, live=True,
|
ignore_requirements=False, live=True,
|
||||||
change_queue=None):
|
change_queue=None, history=None):
|
||||||
self.log.debug("Considering adding change %s" % change)
|
self.log.debug("Considering adding change %s" % change)
|
||||||
|
|
||||||
# If we are adding a live change, check if it's a live item
|
# If we are adding a live change, check if it's a live item
|
||||||
|
@ -299,7 +299,7 @@ class PipelineManager(object):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
|
if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
|
||||||
change_queue):
|
change_queue, history=history):
|
||||||
self.log.debug("Failed to enqueue changes "
|
self.log.debug("Failed to enqueue changes "
|
||||||
"ahead of %s" % change)
|
"ahead of %s" % change)
|
||||||
return False
|
return False
|
||||||
|
|
|
@ -115,7 +115,15 @@ class DependentPipelineManager(PipelineManager):
|
||||||
change_queue=change_queue)
|
change_queue=change_queue)
|
||||||
|
|
||||||
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
|
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
|
||||||
change_queue):
|
change_queue, history=None):
|
||||||
|
if history and change.number in history:
|
||||||
|
# detected dependency cycle
|
||||||
|
self.log.warn("Dependency cycle detected")
|
||||||
|
return False
|
||||||
|
if hasattr(change, 'number'):
|
||||||
|
history = history or []
|
||||||
|
history.append(change.number)
|
||||||
|
|
||||||
ret = self.checkForChangesNeededBy(change, change_queue)
|
ret = self.checkForChangesNeededBy(change, change_queue)
|
||||||
if ret in [True, False]:
|
if ret in [True, False]:
|
||||||
return ret
|
return ret
|
||||||
|
@ -124,7 +132,7 @@ class DependentPipelineManager(PipelineManager):
|
||||||
for needed_change in ret:
|
for needed_change in ret:
|
||||||
r = self.addChange(needed_change, quiet=quiet,
|
r = self.addChange(needed_change, quiet=quiet,
|
||||||
ignore_requirements=ignore_requirements,
|
ignore_requirements=ignore_requirements,
|
||||||
change_queue=change_queue)
|
change_queue=change_queue, history=history)
|
||||||
if not r:
|
if not r:
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
|
@ -36,7 +36,15 @@ class IndependentPipelineManager(PipelineManager):
|
||||||
return DynamicChangeQueueContextManager(change_queue)
|
return DynamicChangeQueueContextManager(change_queue)
|
||||||
|
|
||||||
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
|
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
|
||||||
change_queue):
|
change_queue, history=None):
|
||||||
|
if history and change.number in history:
|
||||||
|
# detected dependency cycle
|
||||||
|
self.log.warn("Dependency cycle detected")
|
||||||
|
return False
|
||||||
|
if hasattr(change, 'number'):
|
||||||
|
history = history or []
|
||||||
|
history.append(change.number)
|
||||||
|
|
||||||
ret = self.checkForChangesNeededBy(change, change_queue)
|
ret = self.checkForChangesNeededBy(change, change_queue)
|
||||||
if ret in [True, False]:
|
if ret in [True, False]:
|
||||||
return ret
|
return ret
|
||||||
|
@ -50,7 +58,8 @@ class IndependentPipelineManager(PipelineManager):
|
||||||
# live).
|
# live).
|
||||||
r = self.addChange(needed_change, quiet=True,
|
r = self.addChange(needed_change, quiet=True,
|
||||||
ignore_requirements=True,
|
ignore_requirements=True,
|
||||||
live=False, change_queue=change_queue)
|
live=False, change_queue=change_queue,
|
||||||
|
history=history)
|
||||||
if not r:
|
if not r:
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
|
@ -154,7 +154,8 @@ class Pipeline(object):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def removeQueue(self, queue):
|
def removeQueue(self, queue):
|
||||||
self.queues.remove(queue)
|
if queue in self.queues:
|
||||||
|
self.queues.remove(queue)
|
||||||
|
|
||||||
def getChangesInQueue(self):
|
def getChangesInQueue(self):
|
||||||
changes = []
|
changes = []
|
||||||
|
|
Loading…
Reference in New Issue