Dispatch Gerrit events via Zookeeper

Change-Id: I02b0c45b3e73f545f864588611c65ed7aa5f5d2b
This commit is contained in:
Simon Westphahl 2021-03-29 15:54:00 +02:00
parent c861ec6dbc
commit 7397d030ed
2 changed files with 45 additions and 30 deletions

View File

@ -86,7 +86,6 @@ from zuul.driver.github.githubconnection import GithubClientManager
from zuul.driver.elasticsearch import ElasticsearchDriver from zuul.driver.elasticsearch import ElasticsearchDriver
from zuul.lib.collections import DefaultKeyDict from zuul.lib.collections import DefaultKeyDict
from zuul.lib.connections import ConnectionRegistry from zuul.lib.connections import ConnectionRegistry
from zuul.lib.queue import NamedQueue
from zuul.zk import ZooKeeperClient from zuul.zk import ZooKeeperClient
from zuul.zk.event_queues import ConnectionEventQueue from zuul.zk.event_queues import ConnectionEventQueue
from psutil import Popen from psutil import Popen
@ -236,7 +235,6 @@ class GerritDriverMock(GerritDriver):
if connection.web_server: if connection.web_server:
self.add_cleanup(connection.web_server.stop) self.add_cleanup(connection.web_server.stop)
self.additional_event_queues.append(connection.event_queue)
setattr(self.registry, 'fake_' + name, connection) setattr(self.registry, 'fake_' + name, connection)
return connection return connection
@ -1159,7 +1157,6 @@ class FakeGerritConnection(gerritconnection.GerritConnection):
super(FakeGerritConnection, self).__init__(driver, connection_name, super(FakeGerritConnection, self).__init__(driver, connection_name,
connection_config) connection_config)
self.event_queue = NamedQueue('FakeGerritConnectionEventQueue')
self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit') self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
self.change_number = 0 self.change_number = 0
self.changes = changes_db self.changes = changes_db

View File

@ -41,8 +41,8 @@ from zuul.driver.gerrit.gcloudauth import GCloudAuth
from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent
from zuul.driver.git.gitwatcher import GitWatcher from zuul.driver.git.gitwatcher import GitWatcher
from zuul.lib.logutil import get_annotated_logger from zuul.lib.logutil import get_annotated_logger
from zuul.lib.queue import NamedQueue
from zuul.model import Ref, Tag, Branch, Project from zuul.model import Ref, Tag, Branch, Project
from zuul.zk.event_queues import ConnectionEventQueue
# HTTP timeout in seconds # HTTP timeout in seconds
TIMEOUT = 30 TIMEOUT = 30
@ -131,16 +131,44 @@ class GerritEventConnector(threading.Thread):
super(GerritEventConnector, self).__init__() super(GerritEventConnector, self).__init__()
self.daemon = True self.daemon = True
self.connection = connection self.connection = connection
self.event_queue = connection.event_queue
self._stopped = False self._stopped = False
self._connector_wake_event = threading.Event()
def stop(self): def stop(self):
self._stopped = True self._stopped = True
self.connection.addEvent(None) self._connector_wake_event.set()
self.event_queue.election.cancel()
def _handleEvent(self): def _onNewEvent(self):
ts, data = self.connection.getEvent() self._connector_wake_event.set()
# Stop the data watch in case the connector was stopped
return not self._stopped
def run(self):
self.event_queue.registerEventWatch(self._onNewEvent)
while not self._stopped:
try:
self.event_queue.election.run(self._run)
except Exception:
self.log.exception("Exception moving Gerrit event:")
def _run(self):
while not self._stopped:
for event in self.event_queue:
try:
self._handleEvent(event)
finally:
self.event_queue.ack(event)
if self._stopped: if self._stopped:
return return
self._connector_wake_event.wait(10)
self._connector_wake_event.clear()
def _handleEvent(self, connection_event):
timestamp = connection_event["timestamp"]
data = connection_event["payload"]
# Gerrit can produce inconsistent data immediately after an # Gerrit can produce inconsistent data immediately after an
# event, So ensure that we do not deliver the event to Zuul # event, So ensure that we do not deliver the event to Zuul
# until at least a certain amount of time has passed. Note # until at least a certain amount of time has passed. Note
@ -148,9 +176,9 @@ class GerritEventConnector(threading.Thread):
# only need to delay for the first event. In essence, Zuul # only need to delay for the first event. In essence, Zuul
# should always be a constant number of seconds behind Gerrit. # should always be a constant number of seconds behind Gerrit.
now = time.time() now = time.time()
time.sleep(max((ts + self.delay) - now, 0.0)) time.sleep(max((timestamp + self.delay) - now, 0.0))
event = GerritTriggerEvent() event = GerritTriggerEvent()
event.timestamp = ts event.timestamp = timestamp
# Gerrit events don't have an event id that could be used to globally # Gerrit events don't have an event id that could be used to globally
# identify this event in the system so we have to generate one. # identify this event in the system so we have to generate one.
@ -294,17 +322,6 @@ class GerritEventConnector(threading.Thread):
event.patch_number, event.patch_number,
refresh=True, event=event) refresh=True, event=event)
def run(self):
while True:
if self._stopped:
return
try:
self._handleEvent()
except Exception:
self.log.exception("Exception moving Gerrit event:")
finally:
self.connection.eventDone()
class GerritWatcher(threading.Thread): class GerritWatcher(threading.Thread):
log = logging.getLogger("gerrit.GerritWatcher") log = logging.getLogger("gerrit.GerritWatcher")
@ -527,7 +544,6 @@ class GerritConnection(BaseConnection):
self.watcher_thread = None self.watcher_thread = None
self.poller_thread = None self.poller_thread = None
self.ref_watcher_thread = None self.ref_watcher_thread = None
self.event_queue = NamedQueue(f'GerritEventQueue<{connection_name}>')
self.client = None self.client = None
self.watched_checkers = [] self.watched_checkers = []
self.project_checker_map = {} self.project_checker_map = {}
@ -1031,13 +1047,11 @@ class GerritConnection(BaseConnection):
return heads return heads
def addEvent(self, data): def addEvent(self, data):
return self.event_queue.put((time.time(), data)) event = {
"timestamp": time.time(),
def getEvent(self): "payload": data
return self.event_queue.get() }
self.event_queue.put(event)
def eventDone(self):
self.event_queue.task_done()
def review(self, item, message, submit, labels, checks_api, def review(self, item, message, submit, labels, checks_api,
file_comments, zuul_event_id=None): file_comments, zuul_event_id=None):
@ -1468,6 +1482,10 @@ class GerritConnection(BaseConnection):
except Exception: except Exception:
self.log.exception("Unable to determine remote Gerrit version") self.log.exception("Unable to determine remote Gerrit version")
self.log.info("Creating Zookeeper event queue")
self.event_queue = ConnectionEventQueue(self.sched.zk_client,
self.connection_name)
if self.enable_stream_events: if self.enable_stream_events:
self._start_watcher_thread() self._start_watcher_thread()
else: else: