Make Gerrit Connection Replication Aware

If Zuul receives replication events and is configured to wait for them
(by setting a positive timeout value), the Gerrit connection will now
wait for replication to report completion before processing the related
events. This should allow Zuul deployments to rely on their replicas for
Zuul git operations.

While Zuul's test suite seems to indicate this works, there are some
things to keep in mind. We can only associate replication and primary
events using project,ref tuples. This means that if many updates are
made to the same project,ref simultaneously, Zuul must wait for all of
them to complete before processing any of the primary events.
Additionally, Gerrit replicas may have different performance and Zuul
may not talk to all of them. A potential follow-up improvement would be
to specify a matcher against replica names, limiting the waits to only
those replicas that impact Zuul.
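The project,ref pairing described above can be sketched as follows. This is a hypothetical illustration (the ReplicationTracker name and its methods are invented here), not the actual PeekQueue implementation: each ref-replication-scheduled event increments a pending counter for its (project, ref) tuple, each matching ref-replicated event decrements it, and a primary event for that tuple is released only once nothing is outstanding.

```python
import collections


class ReplicationTracker:
    """Hypothetical sketch of gating primary events on replication."""

    def __init__(self):
        # Count of outstanding replications per (project, ref) tuple.
        self.pending = collections.defaultdict(int)

    def handle(self, event):
        key = (event["project"], event["ref"])
        if event["type"] == "ref-replication-scheduled":
            self.pending[key] += 1
        elif event["type"] == "ref-replicated":
            # Guard against seeing a completion for a replication that
            # was scheduled before we started listening.
            if self.pending[key] > 0:
                self.pending[key] -= 1

    def can_release(self, project, ref):
        # A primary event (patchset-created, change-merged, ref-updated)
        # may proceed once no replication is outstanding for its tuple.
        return self.pending[(project, ref)] == 0
```

Note the limitation the paragraph above describes: because the key is only (project, ref), simultaneous updates to the same tuple all share one counter, so every one of them must complete before any primary event for that tuple is released.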

Change-Id: Ie1dfccac1f5a088547caa999352811ad6842f3e5
Author: Clark Boylan
Date: 2025-03-29 14:31:16 -07:00
Parent: dad87c1e99
Commit: c0746040e3
11 changed files with 680 additions and 82 deletions


@@ -126,6 +126,31 @@ The supported options in ``zuul.conf`` connections are:
User name to use when accessing Gerrit.
.. attr:: replication_timeout
   :default: 0
When set to a positive value Zuul will become replication event
aware. Zuul will wait this many seconds for replication to
complete for events like patchset-created, change-merged, and
ref-updated events before proceeding with processing the primary
event. This is useful if you are pointing Zuul to Gerrit
replicas which need replication to complete before Zuul can
successfully fetch updates. You should not set this value if
Zuul talks to Gerrit directly for git repo data.
Note that some necessary fields (like refName in
change-merged events) are not present in all events until
Gerrit 2.13 and newer. If your Gerrit is older you should
consider sticking with the default value of 0.
One major limitation of this feature is that Gerrit replication
events can only be mapped using project and ref values. This
means if you have multiple replication updates to the same project
and ref occurring simultaneously, Zuul must wait for all of them to
complete before it continues. For this reason you should set this
timeout to a small multiple (2 or 3) of your typical replication
time.
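For example, a deployment whose replication typically completes within 30 seconds might configure something like the following (a hypothetical sketch; the server, user, and key path are illustrative only):

```ini
[connection gerrit]
driver=gerrit
server=review.example.com
user=zuul
sshkey=/var/lib/zuul/.ssh/id_rsa
# Wait up to 2-3x the typical replication time (~30s here) for
# replication to complete before processing patchset-created,
# change-merged, and ref-updated events.
replication_timeout=90
```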
SSH Configuration
~~~~~~~~~~~~~~~~~


@@ -0,0 +1,10 @@
---
features:
- |
Add replication_timeout to Gerrit connection configuration. Setting
this to a positive value makes Zuul's Gerrit connection replication
aware. Zuul will wait up to this many seconds for replication to
complete for events like patchset-created, change-merged, and
ref-updated before continuing to process those primary events.
This is useful if you have configured Zuul to look at a Gerrit
replica for git data.


@@ -299,6 +299,52 @@ class FakeGerritChange(object):
comment['range'] = comment_range
self.comments.append(comment)
def getPatchsetReplicationStartedEvent(self, patchset):
ref = "refs/changes/%s/%s/%s" % \
(str(self.number).zfill(2)[-2:], self.number, patchset)
event = {"type": "ref-replication-scheduled",
"project": self.project,
"ref": ref,
"targetNode": "git@gitserver:22",
"targetUri": "ssh://git@gitserver:22/%s" % self.project,
"eventCreatedOn": int(time.time())}
return event
def getPatchsetReplicatedEvent(self, patchset):
ref = "refs/changes/%s/%s/%s" % \
(str(self.number).zfill(2)[-2:], self.number, patchset)
event = {"type": "ref-replicated",
"project": self.project,
"refStatus": "OK",
"status": "succeeded",
"ref": ref,
"targetNode": "git@gitserver:22",
"targetUri": "ssh://git@gitserver:22/%s" % self.project,
"eventCreatedOn": int(time.time())}
return event
def getChangeMergedReplicationStartedEvent(self):
ref = "refs/heads/%s" % self.branch
event = {"type": "ref-replication-scheduled",
"project": self.project,
"ref": ref,
"targetNode": "git@gitserver:22",
"targetUri": "ssh://git@gitserver:22/%s" % self.project,
"eventCreatedOn": int(time.time())}
return event
def getChangeMergedReplicatedEvent(self):
ref = "refs/heads/%s" % self.branch
event = {"type": "ref-replicated",
"project": self.project,
"refStatus": "OK",
"status": "succeeded",
"ref": ref,
"targetNode": "git@gitserver:22",
"targetUri": "ssh://git@gitserver:22/%s" % self.project,
"eventCreatedOn": int(time.time())}
return event
def getPatchsetCreatedEvent(self, patchset):
event = {"type": "patchset-created",
"change": {"project": self.project,
@@ -380,6 +426,7 @@ class FakeGerritChange(object):
"patchSet": self.patchsets[-1],
"change": self.data,
"type": "change-merged",
"refName": "refs/heads/%s" % self.branch,
"eventCreatedOn": 1487613810}
return event


@@ -0,0 +1,2 @@
- hosts: all
tasks: []


@@ -43,6 +43,14 @@
ref: ^(?!refs/).*$
precedence: low
- pipeline:
name: promote
manager: supercedent
trigger:
gerrit:
- event: change-merged
precedence: low
- pipeline:
name: periodic
manager: independent
@@ -91,6 +99,14 @@
label: ubuntu-xenial
run: playbooks/project-post.yaml
- job:
name: project-promote
nodeset:
nodes:
- name: static
label: ubuntu-xenial
run: playbooks/project-promote.yaml
- job:
name: project-test2
nodeset:
@@ -134,6 +150,9 @@
post:
jobs:
- project-post
promote:
jobs:
- project-promote
- project:
name: org/project1
@@ -159,6 +178,9 @@
post:
jobs:
- project-post
promote:
jobs:
- project-promote
- project:
name: org/project2


@@ -0,0 +1,39 @@
[statsd]
# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
# see: https://github.com/jsocol/pystatsd/issues/61
server=127.0.0.1
[scheduler]
tenant_config=main.yaml
[merger]
git_dir=/tmp/zuul-test/merger-git
git_user_email=zuul@example.com
git_user_name=zuul
[web]
root=http://zuul.example.com
[executor]
git_dir=/tmp/zuul-test/executor-git
load_multiplier=100
[connection gerrit]
driver=gerrit
server=review.example.com
user=jenkins
sshkey=fake_id_rsa_path
password=badpassword
# Set a short timeout so that we timeout within the test case
# runtime.
replication_timeout=10
[connection smtp]
driver=smtp
server=localhost
port=25
default_from=zuul@example.com
default_to=you@example.com
[database]
dburi=$MYSQL_FIXTURE_DBURI$


@@ -0,0 +1,39 @@
[statsd]
# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
# see: https://github.com/jsocol/pystatsd/issues/61
server=127.0.0.1
[scheduler]
tenant_config=main.yaml
[merger]
git_dir=/tmp/zuul-test/merger-git
git_user_email=zuul@example.com
git_user_name=zuul
[web]
root=http://zuul.example.com
[executor]
git_dir=/tmp/zuul-test/executor-git
load_multiplier=100
[connection gerrit]
driver=gerrit
server=review.example.com
user=jenkins
sshkey=fake_id_rsa_path
password=badpassword
# This is longer than our test case timeout so that we can ensure
# test cases aren't magically working because replication timed out.
replication_timeout=300
[connection smtp]
driver=smtp
server=localhost
port=25
default_from=zuul@example.com
default_to=you@example.com
[database]
dburi=$MYSQL_FIXTURE_DBURI$


@@ -508,6 +508,7 @@ class TestOnlineZKOperations(ZuulTestCase):
self.executor_server.release()
self.waitUntilSettled()
self.assertHistory([
dict(name='project-promote', result='SUCCESS', changes='1,1'),
dict(name='project-merge', result='SUCCESS', changes='2,1'),
dict(name='project-merge', result='SUCCESS', changes='3,1'),
dict(name='project-test1', result='SUCCESS', changes='3,1'),


@@ -35,7 +35,6 @@ from zuul.driver.gerrit import GerritDriver
from zuul.driver.gerrit.gerritconnection import (
ChangeNetworkConflict,
GerritConnection,
GerritEventProcessor,
PeekQueue,
)
@@ -1227,7 +1226,6 @@ class TestGerritConnection(ZuulTestCase):
# Gerrit emits change-merged events after ref-updated events for the
# change; make sure that job configuration changes take effect
# for post pipelines that trigger off of ref-updated.
GerritEventProcessor.delay = 10.0
in_repo_conf = textwrap.dedent(
"""
- job:
@@ -1249,6 +1247,7 @@ class TestGerritConnection(ZuulTestCase):
self.assertHistory([
dict(name='project-post', result='SUCCESS'),
dict(name='new-post-job', result='SUCCESS'),
dict(name='project-promote', result='SUCCESS'),
], ordered=False)
@@ -1335,6 +1334,184 @@ class TestGerritConnectionPreFilter(ZuulTestCase):
self.waitUntilSettled()
class TestGerritConnectionReplication(ZuulTestCase):
config_file = 'zuul-gerrit-replication.conf'
tenant_config_file = 'config/single-tenant/main.yaml'
def test_replication_new_patchset(self):
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
self.fake_gerrit.addEvent(A.getPatchsetReplicationStartedEvent(1))
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
old_wait_timeout = self.wait_timeout
self.wait_timeout = 10
with testtools.ExpectedException(Exception,
"Timeout waiting for Zuul to settle"):
# We expect this exception because ZK queues won't empty until
# we either timeout waiting for the replication event or get
# the replication event which happens below.
self.waitUntilSettled()
# In addition to checking that we have inflight items above we
# also ensure that we haven't run any jobs yet.
self.assertHistory([])
self.wait_timeout = old_wait_timeout
# Add a second patchset created event to ensure that we're not going
# to wait for that one to replicate.
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
# Finally complete replication and ensure everything settles out.
self.fake_gerrit.addEvent(A.getPatchsetReplicatedEvent(1))
self.waitUntilSettled()
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
dict(name='project-merge', result='SUCCESS', changes='2,1'),
dict(name='project-test1', result='SUCCESS', changes='2,1'),
dict(name='project-test2', result='SUCCESS', changes='2,1'),
dict(name='project1-project2-integration',
result='SUCCESS', changes='2,1'),
], ordered=False)
def test_replication_change_merged(self):
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.setMerged()
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
B.setMerged()
self.fake_gerrit.addEvent(A.getChangeMergedReplicationStartedEvent())
self.fake_gerrit.addEvent(A.getChangeMergedEvent())
old_wait_timeout = self.wait_timeout
self.wait_timeout = 10
with testtools.ExpectedException(Exception,
"Timeout waiting for Zuul to settle"):
# We expect this exception because ZK queues won't empty until
# we either timeout waiting for the replication event or get
# the replication event which happens below.
self.waitUntilSettled()
# In addition to checking that we have inflight items above we
# also ensure that we haven't run any jobs yet.
self.assertHistory([])
self.wait_timeout = old_wait_timeout
# Add a second merged change event to ensure that we're not going
# to wait for that one to replicate
self.fake_gerrit.addEvent(B.getChangeMergedEvent())
# Finally complete replication and ensure everything settles out.
self.fake_gerrit.addEvent(A.getChangeMergedReplicatedEvent())
self.waitUntilSettled()
self.assertHistory([
dict(name='project-promote', result='SUCCESS', changes='1,1'),
dict(name='project-promote', result='SUCCESS', changes='2,1'),
], ordered=False)
def test_replication_ref_updated(self):
# This test is admittedly a bit contrived but is a good exercise of
# the ref updated without change merged event combo alongside our
# wait for replication to complete.
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.setMerged()
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
B.setMerged()
self.fake_gerrit.addEvent(A.getChangeMergedReplicationStartedEvent())
# Change merged and ref updated events share the same replication
# events.
self.fake_gerrit.addEvent(A.getRefUpdatedEvent())
old_wait_timeout = self.wait_timeout
self.wait_timeout = 10
with testtools.ExpectedException(Exception,
"Timeout waiting for Zuul to settle"):
# We expect this exception because ZK queues won't empty until
# we either timeout waiting for the replication event or get
# the replication event which happens below.
self.waitUntilSettled()
# In addition to checking that we have inflight items above we
# also ensure that we haven't run any jobs yet.
self.assertHistory([])
self.wait_timeout = old_wait_timeout
# Add a second ref updated event to ensure that we're not going
# to wait for that one to replicate
self.fake_gerrit.addEvent(B.getRefUpdatedEvent())
# Finally complete replication and ensure everything settles out.
self.fake_gerrit.addEvent(A.getChangeMergedReplicatedEvent())
self.waitUntilSettled()
self.assertHistory([
dict(name='project-post', result='SUCCESS',
ref='refs/heads/master'),
dict(name='project-post', result='SUCCESS',
ref='refs/heads/master'),
], ordered=False)
def test_replication_change_merged_combo(self):
# This test tests the combo of ref updated and change merged events
# which occurs when gerrit merges changes.
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.setMerged()
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
B.setMerged()
self.fake_gerrit.addEvent(A.getChangeMergedReplicationStartedEvent())
# Change merged and ref updated events share the same replication
# events.
self.fake_gerrit.addEvent(A.getRefUpdatedEvent())
self.fake_gerrit.addEvent(A.getChangeMergedEvent())
old_wait_timeout = self.wait_timeout
self.wait_timeout = 10
with testtools.ExpectedException(Exception,
"Timeout waiting for Zuul to settle"):
# We expect this exception because ZK queues won't empty until
# we either timeout waiting for the replication event or get
# the replication event which happens below.
self.waitUntilSettled()
# In addition to checking that we have inflight items above we
# also ensure that we haven't run any jobs yet.
self.assertHistory([])
self.wait_timeout = old_wait_timeout
# Add a second ref updated event to ensure that we're not going
# to wait for that one to replicate
self.fake_gerrit.addEvent(B.getRefUpdatedEvent())
self.fake_gerrit.addEvent(B.getChangeMergedEvent())
# Finally complete replication and ensure everything settles out.
self.fake_gerrit.addEvent(A.getChangeMergedReplicatedEvent())
self.waitUntilSettled()
self.assertHistory([
dict(name='project-promote', result='SUCCESS', changes='1,1'),
dict(name='project-post', result='SUCCESS',
ref='refs/heads/master'),
dict(name='project-promote', result='SUCCESS', changes='2,1'),
dict(name='project-post', result='SUCCESS',
ref='refs/heads/master'),
], ordered=False)
class TestGerritConnectionReplicationTimeout(ZuulTestCase):
config_file = 'zuul-gerrit-replication-timeout.conf'
tenant_config_file = 'config/single-tenant/main.yaml'
def test_new_patchset_replication_timeout(self):
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
self.fake_gerrit.addEvent(A.getPatchsetReplicationStartedEvent(1))
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# We configure a 10 second timeout. Without replication completion
# events we know that we timed out if the jobs run.
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
dict(name='project-merge', result='SUCCESS', changes='2,1'),
dict(name='project-test1', result='SUCCESS', changes='2,1'),
dict(name='project-test2', result='SUCCESS', changes='2,1'),
dict(name='project1-project2-integration',
result='SUCCESS', changes='2,1'),
], ordered=False)
class TestGerritUnicodeRefs(ZuulTestCase):
config_file = 'zuul-gerrit-web.conf'
tenant_config_file = 'config/single-tenant/main.yaml'
@@ -1739,19 +1916,54 @@ class TestGerritPeekQueue(BaseTestCase):
"branch": "master",
"number": number,
},
"refName": "ref/heads/master",
"type": "change-merged",
}
})
e.zuul_event_ltime = ltime
return e
def make_replication_scheduled_event(self, ref, ltime):
e = zuul.model.ConnectionEvent({
"timestamp": time.time(),
"payload": {
"type": "ref-replication-scheduled",
"project": "org/project",
"ref": ref,
"refStatus": "OK",
"status": "succeeded",
"targetNode": "git@gitserver:22",
"targetUri": "ssh://git@gitserver:22/org/project",
"eventCreatedOn": int(time.time()),
}
})
e.zuul_event_ltime = ltime
return e
def make_replicated_event(self, ref, ltime):
e = zuul.model.ConnectionEvent({
"timestamp": time.time(),
"payload": {
"type": "ref-replicated",
"project": "org/project",
"ref": ref,
"refStatus": "OK",
"status": "succeeded",
"targetNode": "git@gitserver:22",
"targetUri": "ssh://git@gitserver:22/org/project",
"eventCreatedOn": int(time.time()),
}
})
e.zuul_event_ltime = ltime
return e
def test_peek_queue(self):
handled = []
def handler(x):
handled.append(x)
q = PeekQueue(handler)
q = PeekQueue(handler, 0)
# Check noop
q.run()
@@ -1817,12 +2029,15 @@ class TestGerritPeekQueue(BaseTestCase):
self.assertEqual([], handled)
# This is what the loop will actually do
q.timeout = 0
delay = q.run()
self.assertEqual(delay, None)
q.run()
q.run(end=True)
expected = [orig[0], orig[1]]
self.assertEqual(expected, handled)
self.assertEqual(1, handled[0].zuul_event_ltime)
self.assertEqual(2, handled[1].zuul_event_ltime)
q.timeout = 10
# Check if Gerrit changes the event order; it doesn't do this
# today, but we want to defend against that.
@@ -1850,6 +2065,77 @@ class TestGerritPeekQueue(BaseTestCase):
self.assertEqual(2, handled[1].zuul_event_ltime)
self.assertEqual(3, handled[2].zuul_event_ltime)
def test_peek_queue_with_replication(self):
handled = []
def handler(x):
handled.append(x)
q = PeekQueue(handler, 0)
q.replication_timeout = 5
# Check noop
q.run()
self.assertEqual([], handled)
# Check one at a time (typical case)
handled.clear()
orig = [
self.make_replication_scheduled_event('refs/heads/master', 1),
self.make_ref_updated_event('refs/heads/master', 'new1', 2),
self.make_change_merged_event(1, 'new1', 3),
self.make_replicated_event('refs/heads/master', 4),
]
q.append(orig[0])
q.run()
self.assertEqual([], handled)
q.append(orig[1])
q.run()
self.assertEqual([], handled)
q.append(orig[2])
q.run()
self.assertEqual([], handled)
q.append(orig[3])
q.run()
expected = [orig[0], orig[2], orig[1], orig[3]]
self.assertEqual(expected, handled)
self.assertEqual(1, handled[0].zuul_event_ltime)
self.assertEqual(2, handled[1].zuul_event_ltime)
self.assertEqual(2, handled[2].zuul_event_ltime)
self.assertEqual(4, handled[3].zuul_event_ltime)
# Check with replication timeout
handled.clear()
q.replication_timeout = 1
orig = [
self.make_replication_scheduled_event('refs/heads/master', 1),
self.make_ref_updated_event('refs/heads/master', 'new1', 2),
self.make_change_merged_event(1, 'new1', 3),
]
q.append(orig[0])
q.run()
self.assertEqual([], handled)
q.append(orig[1])
q.run()
self.assertEqual([], handled)
q.append(orig[2])
q.run()
self.assertEqual([], handled)
# Longer than our timeout
time.sleep(2)
q.run()
expected = [orig[0], orig[2], orig[1]]
self.assertEqual(expected, handled)
self.assertEqual(1, handled[0].zuul_event_ltime)
self.assertEqual(2, handled[1].zuul_event_ltime)
self.assertEqual(2, handled[2].zuul_event_ltime)
def test_peek_queue_busy_timeout(self):
# Our last line of defense on a busy system is the extra 10
# second timeout -- if we see an event that's 10 seconds later
@@ -1861,7 +2147,7 @@ class TestGerritPeekQueue(BaseTestCase):
def handler(x):
handled.append(x)
q = PeekQueue(handler)
q = PeekQueue(handler, 0)
orig = [
self.make_ref_updated_event('refs/heads/master', 'new1', 1),


@@ -653,6 +653,7 @@ class TestWeb(BaseTestWeb):
{'name': 'check', 'triggers': [gerrit_trigger]},
{'name': 'gate', 'triggers': [gerrit_trigger]},
{'name': 'post', 'triggers': [gerrit_trigger]},
{'name': 'promote', 'triggers': [gerrit_trigger]},
{'name': 'periodic', 'triggers': [timer_trigger]},
]
self.assertEqual(expected_list, data)
@@ -934,6 +935,53 @@ class TestWeb(BaseTestWeb):
'workspace_checkout': True,
}
]],
}, {'name': 'promote',
'jobs': [[
{'abstract': False,
'ansible_split_streams': None,
'ansible_version': None,
'attempts': 3,
'branches': None,
'deduplicate': 'auto',
'dependencies': [],
'description': None,
'files': [],
'final': False,
'failure_output': [],
'image_build_name': None,
'intermediate': False,
'irrelevant_files': [],
'match_on_config_updates': True,
'name': 'project-promote',
'override_checkout': None,
'parent': 'base',
'post_review': None,
'post_run': [],
'cleanup_run': [],
'pre_run': [],
'protected': None,
'provides': [],
'required_projects': [],
'requires': [],
'roles': [],
'run': [],
'semaphores': [],
'source_context': {'branch': 'master',
'path': 'zuul.yaml',
'project': 'common-config'},
'tags': [],
'timeout': None,
'variables': {},
'extra_variables': {},
'group_variables': {},
'host_variables': {},
'include_vars': [],
'variant_description': '',
'voting': True,
'workspace_scheme': 'golang',
'workspace_checkout': True,
}
]],
}
]
}]
@@ -1081,7 +1129,7 @@ class TestWeb(BaseTestWeb):
def test_jobs_list(self):
jobs = self.get_url("api/tenant/tenant-one/jobs").json()
self.assertEqual(len(jobs), 10)
self.assertEqual(len(jobs), 11)
resp = self.get_url("api/tenant/non-tenant/jobs")
self.assertEqual(404, resp.status_code)
@@ -1238,14 +1286,14 @@ class TestWeb(BaseTestWeb):
'_inheritance_path': [
'<Job base explicit: None implied: '
'{MatchAny:{ImpliedBranchMatcher:master}} '
'source: common-config/zuul.yaml@master#53>',
'source: common-config/zuul.yaml@master#61>',
'<Job project-test1 explicit: None '
'implied: '
'{MatchAny:{ImpliedBranchMatcher:master}} '
'source: common-config/zuul.yaml@master#66>',
'source: common-config/zuul.yaml@master#74>',
'<Job project-test1 explicit: None '
'implied: None source: '
'common-config/zuul.yaml@master#138>'],
'common-config/zuul.yaml@master#157>'],
'build': '00000000000000000000000000000000',
'buildset': None,
'branch': 'master',


@@ -329,37 +329,90 @@ class QueryHistory:
class PeekQueue:
# If we see events whose timestamp is this long since an event
# we're waiting for, give up the wait. This is an extra 10
# seconds beyond the 10 second propagation delay.
timeout = 20
# we're waiting for, give up the wait. This is a 10 second propagation
# delay. Replication timeouts are handled separately and are operator
# configurable.
timeout = 10
def __init__(self, handler):
def __init__(self, handler, replication_timeout):
self.queue = collections.deque()
self.handler = handler
self.replication_timeout = replication_timeout
self.change_merged_cache = cachetools.LRUCache(128)
def append(self, event):
self.queue.append(event)
def run(self, end=False):
def _getProjectAndRef(self, data):
# We need to match up ref-updated, change-merged, and patchset-created
# events to various replication events based on project, ref tuples.
# Unfortunately gerrit events don't use consistent data structures to
# present this data across the events. Pull the required info out
# based on the event type with fallback.
kind = data.get('type')
if kind == 'ref-updated':
refupdate = data.get('refUpdate', {})
return refupdate.get('project'), refupdate.get('refName')
elif kind == 'patchset-created':
change = data.get('change', {})
patchset = data.get('patchSet', {})
return change.get('project'), patchset.get('ref')
elif kind == 'change-merged':
change = data.get('change', {})
# change-merged events need to match up the branch name to
# replication events and not the change ref (it is already
# replicated and captured by patchset-created events).
return change.get('project'), data.get('refName')
elif (project := data.get('project')) and (ref := data.get('ref')):
return project, ref
else:
return None, None
def run(self):
if not self.queue:
return
# Try to do two things:
# Hold events until they have replicated and
# Attempt to match ref-updated events with change-merged
# events.
ref_replication = collections.defaultdict(list)
ref_updates = {}
new_event_list = collections.deque()
latest_time = 0
for event in self.queue:
data = event["payload"]
kind = data.get('type')
refupdate = data.get('refUpdate', {})
ref = refupdate.get('refName')
latest_time = max(event["timestamp"], latest_time)
inserted = False
if (kind == 'ref-updated' and
((not ref.startswith('refs/')) or
ref.startswith('refs/heads/'))):
if kind == 'ref-replication-scheduled':
# Note we can get many ref-replication-scheduled events for
# a single ref-replication-done event. We can also get
# overlapping ref-replication-scheduled events for the same
# ref. For this reason we don't use the -done events and
# instead rely on counting matching pairs of -scheduled and
# ref-replicated events. When all have paired up, or we time out,
# the related events are considered valid.
#
# Replication events don't use the same refUpdate and refName
# conventions...
ref = data.get('ref')
project = data.get('project')
ref_replication[(project, ref)].append(event)
elif kind == 'ref-replicated':
ref = data.get('ref')
project = data.get('project')
replication_events = ref_replication[(project, ref)]
# TODO(clarkb) Is it necessary to wait for success?
if (len(replication_events) and
data.get("status") == "succeeded"):
# It's possible we start listening when things have already
# started replicating. In that case we'll empty the
# replication events list early.
replication_events.pop()
elif (kind == 'ref-updated' and
((not ref.startswith('refs/')) or
ref.startswith('refs/heads/'))):
# This is a ref-updated event for a branch, we
# want to find its change-merged event.
newrev = refupdate.get('newRev')
@@ -391,37 +444,59 @@ class PeekQueue:
while new_event_list:
event = new_event_list.popleft()
data = event["payload"]
kind = data.get('type')
project, ref = self._getProjectAndRef(data)
ok = False
delay = None
now = time.time()
time_since_event = now - event["timestamp"]
# First check if replication is up to date.
if self.replication_timeout <= 0:
# We are not configured to look at Gerrit replication
# targets. Ignore replication status.
ok = True
elif ref and project and ref_replication[(project, ref)]:
if time_since_event >= self.replication_timeout:
# Waited long enough for replication
del ref_replication[(project, ref)]
ok = True
else:
# If replication hasn't completed wait longer
ok = False
# Wait max one second for replication to complete.
delay = min(self.replication_timeout - time_since_event, 1)
else:
# We have replicated or don't have replication events
# corresponding to our project,ref. Proceed.
ok = True
if not ok:
return delay
# Now check if ref-updated and change-merged events need to be
# reordered
kind = data.get('type')
if kind == 'ref-updated':
refupdate = data.get('refUpdate')
refupdate = data.get('refUpdate', {})
newrev = refupdate.get('newRev')
if newrev in ref_updates:
# We're waiting on data for this one
if end:
# It's been more than 10 seconds (gerrit
# event delay) since we saw the
# ref-updated event, and we're at the end
# of the list of events in zk, so it's
# probably not going to show up. Release
# it.
ok = True
elif latest_time - event["timestamp"] >= self.timeout:
# It's been a further 10 seconds since we saw
if time_since_event >= self.timeout:
# It's been 10 seconds since we saw
# the event, so it may be missing at this
# point; release it.
ok = True
# Otherwise, we're still waiting
else:
# Otherwise, we're still waiting
ok = False
delay = min(self.timeout - time_since_event, 1)
else:
# Not a branch ref-update
ok = True
else:
# Not a ref-update at all
# Not a ref-update and not waiting for replication
ok = True
if not ok:
# if we're still waiting for an event, don't send
# any more so that we preserve the order.
return
return delay
self.queue.remove(event)
self.handler(event)
@@ -434,39 +509,23 @@ class GerritEventConnector(BaseThreadPoolEventConnector):
'cache-eviction', # evict-cache plugin
'fetch-ref-replicated',
'fetch-ref-replication-scheduled',
'ref-replicated',
'ref-replication-scheduled',
'ref-replication-done'
)
log = logging.getLogger("zuul.GerritEventConnector")
def __init__(self, connection):
def __init__(self, connection, replication_timeout):
super().__init__(connection)
self._peek_queue = PeekQueue(self._peekQueueHandler)
self.replication_timeout = replication_timeout
self._peek_queue = PeekQueue(
self._peekQueueHandler, replication_timeout)
def _getEventProcessor(self, event):
return GerritEventProcessor(self, event).run
def _calculateDelay(self, connection_event):
timestamp = connection_event["timestamp"]
now = time.time()
delay = max((timestamp + GerritEventProcessor.delay) - now, 0.0)
# Gerrit can produce inconsistent data immediately after an
# event, So ensure that we do not deliver the event to Zuul
# until at least a certain amount of time has passed. Note
# that if we receive several events in succession, we will
# only need to delay for the first event. In essence, Zuul
# should always be a constant number of seconds behind Gerrit.
self.log.debug("Handling event received %ss ago, delaying %ss",
now - timestamp, delay)
time.sleep(delay)
return delay
def _dispatchEvents(self):
# This is the first half of the event dispatcher. It reads
# events from the webhook event queue and passes them to a
# events from the ssh stream event queue and passes them to a
# concurrent executor for pre-processing.
# This overrides the superclass in order to add the peek queue.
@@ -479,17 +538,16 @@ class GerritEventConnector(BaseThreadPoolEventConnector):
except ValueError:
event_id_offset = None
delay = None
for event in self.event_queue.iter(event_id_offset):
if self._stopped:
break
delay = self._calculateDelay(event)
self._peek_queue.append(event)
delay = self._peek_queue.run()
if delay:
return delay
self._peek_queue.append(event)
self._peek_queue.run()
self._peek_queue.run(end=True)
return self._peek_queue.run()
def _peekQueueHandler(self, event):
# Called when the peek queue has decided an event should be processed
@@ -505,7 +563,6 @@ class GerritEventConnector(BaseThreadPoolEventConnector):
class GerritEventProcessor:
tracer = trace.get_tracer("zuul")
delay = 10.0
def __init__(self, connector, connection_event):
self.connector = connector
@@ -539,6 +596,13 @@ class GerritEventProcessor:
data = connection_event["payload"]
event = GerritTriggerEvent.fromGerritEventDict(
data, timestamp, self.connection, self.zuul_event_id)
event_type = data.get("type")
# Recheck events as we may have only cared about replication events
# in preprocessing.
if not self.connection.shouldProcessEvent(event_type, event, False):
self.log.debug("Event did not match pre-filters after "
"preprocessing %s", event)
return []
min_change_ltime = self.zk_client.getCurrentLtime()
# In order to perform connection hygiene actions like those
@@ -618,7 +682,7 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
refname_bad_sequences = re2.compile(
r"[ \\*\[?:^~\x00-\x1F\x7F]|" # Forbidden characters
r"@{|\.\.|\.$|^@$|/$|^/|//+") # everything else we can check with re2
replication_timeout = 300
is_merged_replication_timeout = 300
replication_retry_interval = 5
_poller_class = GerritChecksPoller
_ref_watcher_class = GitWatcher
@@ -667,6 +731,8 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
self.event_source = self.EVENT_SOURCE_KINESIS
elif self.connection_config.get('gcloud_pubsub_project', None):
self.event_source = self.EVENT_SOURCE_GCLOUD_PUBSUB
self.replication_timeout = int(self.connection_config.get(
'replication_timeout', 0))
# Thread for whatever event source we use
self.event_thread = None
@@ -1239,7 +1305,7 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
ref: str, old_sha: str='') -> bool:
# Wait for the ref to show up in the repo
start = time.time()
while time.time() - start < self.replication_timeout:
while time.time() - start < self.is_merged_replication_timeout:
sha = self.getRefSha(project, ref)
if old_sha != sha:
return True
@@ -1364,18 +1430,38 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
# with user-specific branches.
return True
def shouldProcessEvent(self, event_type, event, replication):
valid_events = event._branch_ref_update or \
event.default_branch_changed or \
event.change_number
if replication:
valid_events = valid_events or \
event_type == "ref-replication-scheduled" or \
event_type == "ref-replicated"
if not (valid_events):
# Check if the events match specific filters and are
# valid.
for event_filter in self.watched_event_filters:
r = event_filter.preFilter(event)
if r:
break
else:
return False
return True
def addEvent(self, data):
# NOTE(mnaser): Certain plugins fire events which end up causing
# an unrecognized event log *and* a traceback if they
# do not contain full project information, we skip them
# here to keep logs clean.
if data.get('type') in GerritEventConnector.IGNORED_EVENTS:
event_type = data.get('type')
if event_type in GerritEventConnector.IGNORED_EVENTS:
return
# Due to notedb, a high percentage of all events Zuul
# processes are ref-updated of the /meta ref, and that is
# unlikely to be used in Zuul. Skip those here so that we
# reduce traffic on the event queue.
if data.get('type') == 'ref-updated':
if event_type == 'ref-updated':
refname = data.get('refUpdate', {}).get('refName', '')
if (refname.startswith('refs/changes/') and
refname.endswith('/meta')):
@@ -1386,20 +1472,12 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
event = GerritTriggerEvent.fromGerritEventDict(
data, None, self, None)
# If the event might trigger Zuul reconfiguration actions, we
# keep it; otherwise check to see if it matches our
# pre-filters:
if not (
event._branch_ref_update or
event.default_branch_changed or
event.change_number):
for event_filter in self.watched_event_filters:
r = event_filter.preFilter(event)
if r:
break
else:
self.log.debug("Event did not match pre-filters %s", event)
return
# If the event communicates replication info or might trigger
# reconfiguration actions we keep it; otherwise check to see if
# it matches our pre-filters:
if not self.shouldProcessEvent(event_type, event, True):
self.log.debug("Event did not match pre-filters %s", event)
return
event_uuid = uuid4().hex
attributes = {
@@ -2073,7 +2151,8 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
self.ref_watcher_thread.start()
def startEventConnector(self):
self.gerrit_event_connector = GerritEventConnector(self)
self.gerrit_event_connector = GerritEventConnector(
self, self.replication_timeout)
self.gerrit_event_connector.start()
def stopEventConnector(self):