Ensure single instance for active event gathering

Active event gathering uses leader election to ensure that there is
only one instance at a time receiving events.

Change-Id: Ib9e095430fa82be2327dcf7fd01ee4275b17415f
This commit is contained in:
Simon Westphahl 2021-03-29 15:56:04 +02:00
parent 7397d030ed
commit ebcbb544be
3 changed files with 101 additions and 65 deletions

View File

@ -42,7 +42,7 @@ from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent
from zuul.driver.git.gitwatcher import GitWatcher from zuul.driver.git.gitwatcher import GitWatcher
from zuul.lib.logutil import get_annotated_logger from zuul.lib.logutil import get_annotated_logger
from zuul.model import Ref, Tag, Branch, Project from zuul.model import Ref, Tag, Branch, Project
from zuul.zk.event_queues import ConnectionEventQueue from zuul.zk.event_queues import ConnectionEventQueue, EventReceiverElection
# HTTP timeout in seconds # HTTP timeout in seconds
TIMEOUT = 30 TIMEOUT = 30
@ -335,6 +335,11 @@ class GerritWatcher(threading.Thread):
self.hostname = hostname self.hostname = hostname
self.port = port self.port = port
self.gerrit_connection = gerrit_connection self.gerrit_connection = gerrit_connection
self._stop_event = threading.Event()
self.watcher_election = EventReceiverElection(
gerrit_connection.sched.zk_client,
gerrit_connection.connection_name,
"watcher")
self.keepalive = keepalive self.keepalive = keepalive
self._stopped = False self._stopped = False
@ -392,10 +397,6 @@ class GerritWatcher(threading.Thread):
if ret and ret not in [-1, 130]: if ret and ret not in [-1, 130]:
raise Exception("Gerrit error executing stream-events") raise Exception("Gerrit error executing stream-events")
except Exception:
self.log.exception("Exception on ssh event stream with %s:",
self.gerrit_connection.connection_name)
time.sleep(5)
finally: finally:
# If we don't close on exceptions to connect we can leak the # If we don't close on exceptions to connect we can leak the
# connection and DoS Gerrit. # connection and DoS Gerrit.
@ -403,11 +404,18 @@ class GerritWatcher(threading.Thread):
def run(self): def run(self):
while not self._stopped: while not self._stopped:
self._run() try:
self.watcher_election.run(self._run)
except Exception:
self.log.exception("Exception on ssh event stream with %s:",
self.gerrit_connection.connection_name)
self._stop_event.wait(5)
def stop(self): def stop(self):
self.log.debug("Stopping watcher") self.log.debug("Stopping watcher")
self._stopped = True self._stopped = True
self._stop_event.set()
self.watcher_election.cancel(self._run)
class GerritPoller(threading.Thread): class GerritPoller(threading.Thread):
@ -419,6 +427,8 @@ class GerritPoller(threading.Thread):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.connection = connection self.connection = connection
self.last_merged_poll = 0 self.last_merged_poll = 0
self.poller_election = EventReceiverElection(
connection.sched.zk_client, connection.connection_name, "poller")
self._stopped = False self._stopped = False
self._stop_event = threading.Event() self._stop_event = threading.Event()
@ -449,43 +459,30 @@ class GerritPoller(threading.Thread):
}} }}
def _poll_checkers(self): def _poll_checkers(self):
try: for checker in self.connection.watched_checkers:
for checker in self.connection.watched_checkers: changes = self.connection.get(
changes = self.connection.get( 'plugins/checks/checks.pending/?'
'plugins/checks/checks.pending/?' 'query=checker:%s+(state:NOT_STARTED)' % checker)
'query=checker:%s+(state:NOT_STARTED)' % checker) for change in changes:
for change in changes: for uuid, check in change['pending_checks'].items():
for uuid, check in change['pending_checks'].items(): event = self._makePendingCheckEvent(
event = self._makePendingCheckEvent( change, uuid, check)
change, uuid, check) self.connection.addEvent(event)
self.connection.addEvent(event)
except Exception:
self.log.exception("Exception on Gerrit poll with %s:",
self.connection.connection_name)
def _poll_merged_changes(self): def _poll_merged_changes(self):
try: now = datetime.datetime.utcnow()
now = datetime.datetime.utcnow() age = self.last_merged_poll
age = self.last_merged_poll if age:
if age: # Allow an extra 4 seconds for request time
# Allow an extra 4 seconds for request time age = int(math.ceil((now - age).total_seconds())) + 4
age = int(math.ceil((now - age).total_seconds())) + 4 changes = self.connection.simpleQueryHTTP(
changes = self.connection.simpleQueryHTTP( "status:merged -age:%ss" % (age,))
"status:merged -age:%ss" % (age,)) self.last_merged_poll = now
self.last_merged_poll = now for change in changes:
for change in changes: event = self._makeChangeMergedEvent(change)
event = self._makeChangeMergedEvent(change) self.connection.addEvent(event)
self.connection.addEvent(event)
except Exception:
self.log.exception("Exception on Gerrit poll with %s:",
self.connection.connection_name)
def _run(self): def _run(self):
self._poll_checkers()
if not self.connection.enable_stream_events:
self._poll_merged_changes()
def run(self):
last_start = time.time() last_start = time.time()
while not self._stopped: while not self._stopped:
next_start = last_start + self.poll_interval next_start = last_start + self.poll_interval
@ -493,12 +490,24 @@ class GerritPoller(threading.Thread):
if self._stopped: if self._stopped:
break break
last_start = time.time() last_start = time.time()
self._run()
self._poll_checkers()
if not self.connection.enable_stream_events:
self._poll_merged_changes()
def run(self):
while not self._stopped:
try:
self.poller_election.run(self._run)
except Exception:
self.log.exception("Exception on Gerrit poll with %s:",
self.connection.connection_name)
def stop(self): def stop(self):
self.log.debug("Stopping watcher") self.log.debug("Stopping watcher")
self._stopped = True self._stopped = True
self._stop_event.set() self._stop_event.set()
self.poller_election.cancel()
class GerritConnection(BaseConnection): class GerritConnection(BaseConnection):
@ -1540,7 +1549,8 @@ class GerritConnection(BaseConnection):
self, self,
self.baseurl, self.baseurl,
self.ref_watcher_poll_interval, self.ref_watcher_poll_interval,
self.refWatcherCallback) self.refWatcherCallback,
election_name="ref-watcher")
self.ref_watcher_thread.start() self.ref_watcher_thread.start()
def _stop_event_connector(self): def _stop_event_connector(self):

View File

@ -17,18 +17,19 @@
import os import os
import logging import logging
import threading import threading
import time
import git import git
from zuul.driver.git.gitmodel import EMPTY_GIT_REF from zuul.driver.git.gitmodel import EMPTY_GIT_REF
from zuul.zk.event_queues import EventReceiverElection
# This class may be used by any driver to implement git head polling. # This class may be used by any driver to implement git head polling.
class GitWatcher(threading.Thread): class GitWatcher(threading.Thread):
log = logging.getLogger("zuul.connection.git.watcher") log = logging.getLogger("zuul.connection.git.watcher")
def __init__(self, connection, baseurl, poll_delay, callback): def __init__(self, connection, baseurl, poll_delay, callback,
election_name="watcher"):
"""Watch for branch changes """Watch for branch changes
Watch every project listed in the connection and call a Watch every project listed in the connection and call a
@ -43,6 +44,8 @@ class GitWatcher(threading.Thread):
:param function callback: :param function callback:
A callback method to be called for each updated ref. The sole A callback method to be called for each updated ref. The sole
argument is a dictionary describing the update. argument is a dictionary describing the update.
:param str election_name:
Name to use in the Zookeeper election of the watcher.
""" """
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.daemon = True self.daemon = True
@ -50,8 +53,13 @@ class GitWatcher(threading.Thread):
self.baseurl = baseurl self.baseurl = baseurl
self.poll_delay = poll_delay self.poll_delay = poll_delay
self._stopped = False self._stopped = False
self._stop_event = threading.Event()
self.projects_refs = {} self.projects_refs = {}
self.callback = callback self.callback = callback
self.watcher_election = EventReceiverElection(
connection.sched.zk_client,
connection.connection_name,
election_name)
# This is used by the test framework # This is used by the test framework
self._event_count = 0 self._event_count = 0
self._pause = False self._pause = False
@ -110,37 +118,44 @@ class GitWatcher(threading.Thread):
events.append(event) events.append(event)
return events return events
def _run(self): def _poll(self):
self.log.debug("Walk through projects refs for connection: %s" % self.log.debug("Walk through projects refs for connection: %s" %
self.connection.connection_name) self.connection.connection_name)
try: for project in self.connection.projects:
for project in self.connection.projects: refs = self.lsRemote(project)
refs = self.lsRemote(project) self.log.debug("Read %s refs for project %s",
self.log.debug("Read %s refs for project %s", len(refs), project)
len(refs), project) if not self.projects_refs.get(project):
if not self.projects_refs.get(project): # State for this project does not exist yet so add it.
# State for this project does not exist yet so add it. # No event will be triggered in this loop as
# No event will be triggered in this loop as # projects_refs['project'] and refs are equal
# projects_refs['project'] and refs are equal
self.projects_refs[project] = refs
events = self.compareRefs(project, refs)
self.projects_refs[project] = refs self.projects_refs[project] = refs
# Send events to the scheduler events = self.compareRefs(project, refs)
for event in events: self.projects_refs[project] = refs
self.log.debug("Sending event: %s" % event) # Send events to the scheduler
self.callback(event) for event in events:
self._event_count += 1 self.log.debug("Sending event: %s" % event)
except Exception as e: self.callback(event)
self.log.debug("Unexpected issue in _run loop: %s" % str(e)) self._event_count += 1
def run(self): def _run(self):
while not self._stopped: while not self._stopped:
if not self._pause: if not self._pause:
self._run() self._poll()
# Polling wait delay # Polling wait delay
else: else:
self.log.debug("Watcher is on pause") self.log.debug("Watcher is on pause")
time.sleep(self.poll_delay) self._stop_event.wait(self.poll_delay)
def run(self):
while not self._stopped:
try:
self.watcher_election.run(self._run)
except Exception:
self.log.exception("Unexpected issue in _run loop:")
def stop(self): def stop(self):
self.log.debug("Stopping watcher")
self._stopped = True self._stopped = True
self._stop_event.set()
self.watcher_election.cancel()

View File

@ -25,6 +25,7 @@ from contextlib import suppress
from kazoo.exceptions import NoNodeError from kazoo.exceptions import NoNodeError
from kazoo.protocol.states import EventType from kazoo.protocol.states import EventType
from kazoo.recipe.election import Election
from zuul import model from zuul import model
from zuul.lib.collections import DefaultKeyDict from zuul.lib.collections import DefaultKeyDict
@ -581,3 +582,13 @@ class ConnectionEventQueue(ZooKeeperEventQueue):
event = model.ConnectionEvent.fromDict(data) event = model.ConnectionEvent.fromDict(data)
event.ack_ref = ack_ref event.ack_ref = ack_ref
yield event yield event
class EventReceiverElection(Election):
"""Election for a singleton event receiver."""
def __init__(self, client, connection_name, receiver_name):
self.election_root = "/".join(
(CONNECTION_ROOT, connection_name, f"election-{receiver_name}")
)
super().__init__(client.client, self.election_root)