Fix trigger event forwarding bug
The change to move trigger events into ZK had an error. It tries to determine if it needs to forward each trigger event to each pipeline in each tenant. To do that, it re-uses the pipeline management functions to determine if an event might be relevant to a change in that pipeline (for example, it is an abandon event). Those comparisons have not been used to compare items of different types before (changes to tags, for example), and raise exceptions in those cases. To correct this, improve the equality comparisons for those types. Additionally, when this exception was raised, it caused any further processing of the event to abort, so any pipelines or tenants after the tenant-pipeline which caused the error would not receive the event. To make the system more robust so that an error related to one tenant does not affect others, we add an extra exception handler to log those errors and proceed to the next tenant. Change-Id: I9b62b83d51f66bee85cdb968d24bd5367e62e35d
This commit is contained in:
parent
92f43d874a
commit
fd6181baf1
|
@ -0,0 +1 @@
|
|||
---
|
|
@ -0,0 +1,33 @@
|
|||
- pipeline:
|
||||
name: check
|
||||
manager: independent
|
||||
trigger:
|
||||
gerrit:
|
||||
- event: patchset-created
|
||||
success:
|
||||
gerrit:
|
||||
Verified: 1
|
||||
failure:
|
||||
gerrit:
|
||||
Verified: -1
|
||||
|
||||
- pipeline:
|
||||
name: tag
|
||||
manager: independent
|
||||
trigger:
|
||||
gerrit:
|
||||
- event: ref-updated
|
||||
ref: ^refs/tags/.*$
|
||||
|
||||
- job:
|
||||
name: base
|
||||
parent: null
|
||||
run: playbooks/run.yaml
|
||||
|
||||
- job:
|
||||
name: checkjob
|
||||
parent: base
|
||||
|
||||
- job:
|
||||
name: tagjob
|
||||
parent: base
|
|
@ -0,0 +1 @@
|
|||
test
|
|
@ -0,0 +1,7 @@
|
|||
- project:
|
||||
check:
|
||||
jobs:
|
||||
- checkjob
|
||||
tag:
|
||||
jobs:
|
||||
- tagjob
|
|
@ -0,0 +1 @@
|
|||
test
|
|
@ -0,0 +1,7 @@
|
|||
- project:
|
||||
check:
|
||||
jobs:
|
||||
- checkjob
|
||||
tag:
|
||||
jobs:
|
||||
- tagjob
|
|
@ -0,0 +1,17 @@
|
|||
- tenant:
|
||||
name: tenant-one
|
||||
source:
|
||||
gerrit:
|
||||
config-projects:
|
||||
- common-config
|
||||
untrusted-projects:
|
||||
- org/project1
|
||||
|
||||
- tenant:
|
||||
name: tenant-two
|
||||
source:
|
||||
gerrit:
|
||||
config-projects:
|
||||
- common-config
|
||||
untrusted-projects:
|
||||
- org/project2
|
|
@ -774,3 +774,38 @@ class TestFreezable(BaseTestCase):
|
|||
# still shows the ordered behavior.
|
||||
self.assertTrue(isinstance(o.odict2, types.MappingProxyType))
|
||||
self.assertEqual(list(o.odict2.keys()), seq)
|
||||
|
||||
|
||||
class TestRef(BaseTestCase):
|
||||
def test_ref_equality(self):
|
||||
change1 = model.Change('project1')
|
||||
change1.ref = '/change1'
|
||||
change1b = model.Change('project1')
|
||||
change1b.ref = '/change1'
|
||||
change2 = model.Change('project2')
|
||||
change2.ref = '/change2'
|
||||
self.assertFalse(change1.equals(change2))
|
||||
self.assertTrue(change1.equals(change1b))
|
||||
|
||||
tag1 = model.Tag('project1')
|
||||
tag1.ref = '/tag1'
|
||||
tag1b = model.Tag('project1')
|
||||
tag1b.ref = '/tag1'
|
||||
tag2 = model.Tag('project2')
|
||||
tag2.ref = '/tag2'
|
||||
self.assertFalse(tag1.equals(tag2))
|
||||
self.assertTrue(tag1.equals(tag1b))
|
||||
|
||||
self.assertFalse(tag1.equals(change1))
|
||||
|
||||
branch1 = model.Branch('project1')
|
||||
branch1.ref = '/branch1'
|
||||
branch1b = model.Branch('project1')
|
||||
branch1b.ref = '/branch1'
|
||||
branch2 = model.Branch('project2')
|
||||
branch2.ref = '/branch2'
|
||||
self.assertFalse(branch1.equals(branch2))
|
||||
self.assertTrue(branch1.equals(branch1b))
|
||||
|
||||
self.assertFalse(branch1.equals(change1))
|
||||
self.assertFalse(branch1.equals(tag1))
|
||||
|
|
|
@ -31,6 +31,7 @@ import git
|
|||
import testtools
|
||||
from testtools.matchers import AfterPreprocessing, MatchesRegex
|
||||
from zuul.scheduler import Scheduler
|
||||
import fixtures
|
||||
|
||||
import zuul.change_matcher
|
||||
from zuul.driver.gerrit import gerritreporter
|
||||
|
@ -8862,3 +8863,61 @@ class TestReconfigureBranchCreateDeleteHttp(TestReconfigureBranch):
|
|||
self._expectReconfigure(True)
|
||||
self._deleteBranch()
|
||||
self._expectReconfigure(True)
|
||||
|
||||
|
||||
class TestEventProcessing(ZuulTestCase):
|
||||
tenant_config_file = 'config/event-processing/main.yaml'
|
||||
|
||||
# Some regression tests for ZK-distributed event processing
|
||||
|
||||
def test_independent_tenants(self):
|
||||
# Test that an exception in one tenant doesn't break others
|
||||
|
||||
orig = zuul.scheduler.Scheduler._forward_trigger_event
|
||||
|
||||
def patched_forward(obj, *args, **kw):
|
||||
if args[1].name == 'tenant-one':
|
||||
raise Exception("test")
|
||||
return orig(obj, *args, **kw)
|
||||
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'zuul.scheduler.Scheduler._forward_trigger_event',
|
||||
patched_forward))
|
||||
|
||||
A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A')
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
self.assertHistory([
|
||||
dict(name='checkjob', result='SUCCESS', changes='1,1'),
|
||||
], ordered=False)
|
||||
|
||||
@skip("Failure can only be detected in logs; see test_ref_equality")
|
||||
def test_change_types(self):
|
||||
# Test that when we decide whether to forward events, we can
|
||||
# compare items with different change types (branch vs
|
||||
# change).
|
||||
|
||||
# We can't detect a failure here except by observing the logs;
|
||||
# this test is left in case that's useful in the future, but
|
||||
# automated detection of this failure case is handled by
|
||||
# test_ref_equality.
|
||||
|
||||
# Enqueue a tag
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
event = self.fake_gerrit.addFakeTag('org/project1', 'master', 'foo')
|
||||
self.fake_gerrit.addEvent(event)
|
||||
|
||||
# Enqueue a change and make sure the scheduler is able to
|
||||
# compare the two when forwarding the event
|
||||
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertHistory([
|
||||
dict(name='tagjob', result='SUCCESS'),
|
||||
dict(name='checkjob', result='SUCCESS', changes='1,1'),
|
||||
], ordered=False)
|
||||
|
|
|
@ -3419,7 +3419,9 @@ class Change(Branch):
|
|||
return '<Change 0x%x %s %s>' % (id(self), pname, self._id())
|
||||
|
||||
def equals(self, other):
|
||||
if self.number == other.number and self.patchset == other.patchset:
|
||||
if (super().equals(other) and
|
||||
isinstance(other, Change) and
|
||||
self.number == other.number and self.patchset == other.patchset):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
|
|
@ -1101,23 +1101,22 @@ class Scheduler(threading.Thread):
|
|||
|
||||
def process_global_trigger_queue(self):
|
||||
for event in self.trigger_events:
|
||||
log = get_annotated_logger(
|
||||
self.log, event.zuul_event_id
|
||||
)
|
||||
log = get_annotated_logger(self.log, event.zuul_event_id)
|
||||
log.debug("Forwarding trigger event %s", event)
|
||||
try:
|
||||
for tenant in self.abide.tenants.values():
|
||||
self._forward_trigger_event(event, tenant)
|
||||
try:
|
||||
self._forward_trigger_event(event, tenant)
|
||||
except Exception:
|
||||
log.exception("Unable to forward event %s "
|
||||
"to tenant %s", event, tenant.name)
|
||||
finally:
|
||||
self.trigger_events.ack(event)
|
||||
|
||||
def _forward_trigger_event(self, event, tenant):
|
||||
log = get_annotated_logger(
|
||||
self.log, event.zuul_event_id
|
||||
)
|
||||
_, project = tenant.getProject(
|
||||
event.canonical_project_name
|
||||
)
|
||||
log = get_annotated_logger(self.log, event.zuul_event_id)
|
||||
trusted, project = tenant.getProject(event.canonical_project_name)
|
||||
|
||||
if project is None:
|
||||
return
|
||||
|
||||
|
|
Loading…
Reference in New Issue