Gerrit: add polling support for refs

In a Gerrit with no stream-events available, we may need to poll
the server to artificially create ref-updated events for post
and tag pipelines.  This adds support for that, reusing the code
in the git driver.

Change-Id: I3b1d8c6f02ce90bd9cb4d425ddfde28544665891
This commit is contained in:
James E. Blair 2020-02-05 14:27:55 -08:00
parent cbaa384e47
commit 9be39dc67d
4 changed files with 135 additions and 3 deletions

View File

@ -62,6 +62,7 @@ import paramiko
import tests.fakegithub
import zuul.driver.gerrit.gerritsource as gerritsource
import zuul.driver.gerrit.gerritconnection as gerritconnection
import zuul.driver.git.gitwatcher as gitwatcher
import zuul.driver.github.githubconnection as githubconnection
import zuul.driver.pagure.pagureconnection as pagureconnection
import zuul.driver.github
@ -866,6 +867,26 @@ class FakeGerritPoller(gerritconnection.GerritPoller):
return r
class FakeGerritRefWatcher(gitwatcher.GitWatcher):
"""A Fake Gerrit ref watcher.
This subclasses
:py:class:`~zuul.connection.git.GitWatcher`.
"""
def __init__(self, *args, **kw):
super(FakeGerritRefWatcher, self).__init__(*args, **kw)
self.baseurl = self.connection.upstream_root
self.poll_delay = 1
def _run(self, *args, **kw):
r = super(FakeGerritRefWatcher, self)._run(*args, **kw)
# Set the event so tests can confirm that the watcher has run
# after they changed something.
self.connection._ref_watcher_event.set()
return r
class FakeGerritConnection(gerritconnection.GerritConnection):
"""A Fake Gerrit connection for use in tests.
@ -876,9 +897,11 @@ class FakeGerritConnection(gerritconnection.GerritConnection):
log = logging.getLogger("zuul.test.FakeGerritConnection")
_poller_class = FakeGerritPoller
_ref_watcher_class = FakeGerritRefWatcher
def __init__(self, driver, connection_name, connection_config,
changes_db=None, upstream_root=None, poller_event=None):
changes_db=None, upstream_root=None, poller_event=None,
ref_watcher_event=None):
if connection_config.get('password'):
self.web_server = GerritWebServer(self)
@ -899,6 +922,7 @@ class FakeGerritConnection(gerritconnection.GerritConnection):
self.upstream_root = upstream_root
self.fake_checkers = []
self._poller_event = poller_event
self._ref_watcher_event = ref_watcher_event
def addFakeChecker(self, **kw):
self.fake_checkers.append(kw)
@ -3379,11 +3403,14 @@ class ZuulTestCase(BaseTestCase):
def getGerritConnection(driver, name, config):
db = self.gerrit_changes_dbs.setdefault(config['server'], {})
event = self.poller_events.setdefault(name, threading.Event())
poll_event = self.poller_events.setdefault(name, threading.Event())
ref_event = self.poller_events.setdefault(name + '-ref',
threading.Event())
con = FakeGerritConnection(driver, name, config,
changes_db=db,
upstream_root=self.upstream_root,
poller_event=event)
poller_event=poll_event,
ref_watcher_event=ref_event)
if con.web_server:
self.addCleanup(con.web_server.stop)

View File

@ -0,0 +1,37 @@
- pipeline:
name: post
manager: independent
trigger:
gerrit:
- event: ref-updated
ref: ^refs/heads/.*$
- pipeline:
name: tag
manager: independent
trigger:
gerrit:
- event: ref-updated
ref: ^refs/tags/.*$
- job:
name: base
parent: null
run: playbooks/base.yaml
- job:
name: post-job
run: playbooks/post-job.yaml
- job:
name: tag-job
run: playbooks/post-job.yaml
- project:
name: org/project
post:
jobs:
- post-job
tag:
jobs:
- tag-job

View File

@ -420,3 +420,39 @@ class TestPolling(ZuulTestCase):
dict(name='test-job', result='SUCCESS', changes='2,1'),
dict(name='test-job2', result='SUCCESS', changes='2,1'),
], ordered=False)
@simple_layout('layouts/gerrit-poll-post.yaml')
def test_post(self):
# Test that ref-updated events trigger post jobs.
self.waitUntilSettled()
# Wait for an initial poll to get the original sha.
self.waitForPoll('gerrit-ref')
# Merge a change.
self.create_commit('org/project')
# Wait for the job to run.
self.waitForPoll('gerrit-ref')
self.waitUntilSettled()
self.assertHistory([
dict(name='post-job', result='SUCCESS'),
])
@simple_layout('layouts/gerrit-poll-post.yaml')
def test_tag(self):
# Test that ref-updated events trigger post jobs.
self.waitUntilSettled()
# Wait for an initial poll to get the original sha.
self.waitForPoll('gerrit-ref')
# Merge a change.
self.fake_gerrit.addFakeTag('org/project', 'master', 'foo')
# Wait for the job to run.
self.waitForPoll('gerrit-ref')
self.waitUntilSettled()
self.assertHistory([
dict(name='tag-job', result='SUCCESS'),
])

View File

@ -40,6 +40,7 @@ from zuul.connection import BaseConnection
from zuul.driver.gerrit.auth import FormAuth
from zuul.driver.gerrit.gcloudauth import GCloudAuth
from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent
from zuul.driver.git.gitwatcher import GitWatcher
from zuul.lib.logutil import get_annotated_logger
from zuul.model import Ref, Tag, Branch, Project
@ -477,6 +478,8 @@ class GerritConnection(BaseConnection):
replication_timeout = 300
replication_retry_interval = 5
_poller_class = GerritPoller
_ref_watcher_class = GitWatcher
ref_watcher_poll_interval = 60
def __init__(self, driver, connection_name, connection_config):
super(GerritConnection, self).__init__(driver, connection_name,
@ -505,6 +508,7 @@ class GerritConnection(BaseConnection):
self.enable_stream_events = False
self.watcher_thread = None
self.poller_thread = None
self.ref_watcher_thread = None
self.event_queue = queue.Queue()
self.client = None
self.watched_checkers = []
@ -1388,6 +1392,18 @@ class GerritConnection(BaseConnection):
self.log.info("Remote version is: %s (parsed as %s)" %
(version, self.version))
def refWatcherCallback(self, data):
event = {
'type': 'ref-updated',
'refUpdate': {
'project': data['project'],
'refName': data['ref'],
'oldRev': data['oldrev'],
'newRev': data['newrev'],
}
}
self.addEvent(event)
def onLoad(self):
self.log.debug("Starting Gerrit Connection/Watchers")
try:
@ -1398,6 +1414,8 @@ class GerritConnection(BaseConnection):
if self.enable_stream_events:
self._start_watcher_thread()
else:
self._start_ref_watcher_thread()
self._start_poller_thread()
self._start_event_connector()
@ -1405,6 +1423,7 @@ class GerritConnection(BaseConnection):
self.log.debug("Stopping Gerrit Connection/Watchers")
self._stop_watcher_thread()
self._stop_poller_thread()
self._stop_ref_watcher_thread()
self._stop_event_connector()
def _stop_watcher_thread(self):
@ -1431,6 +1450,19 @@ class GerritConnection(BaseConnection):
self.poller_thread = self._poller_class(self)
self.poller_thread.start()
def _stop_ref_watcher_thread(self):
if self.ref_watcher_thread:
self.ref_watcher_thread.stop()
self.ref_watcher_thread.join()
def _start_ref_watcher_thread(self):
self.ref_watcher_thread = self._ref_watcher_class(
self,
self.baseurl,
self.ref_watcher_poll_interval,
self.refWatcherCallback)
self.ref_watcher_thread.start()
def _stop_event_connector(self):
if self.gerrit_event_connector:
self.gerrit_event_connector.stop()