Improve gerrit connection event queue logging

It can be difficult to understand what happens between the time
a Gerrit event is received over SSH and the time it is added to the
trigger event queue.  To address that:

* Generate the Zuul event id earlier so that we can match the event
  received from Gerrit to the event pulled off the connection event
  queue to build the trigger event (the uuid is currently generated
  at the time the trigger event is created).
* Log when the event is handled by the event connector (and how long
  ago it was received).
* Use an annotated logger when logging the connection event submission.
* Log the start and stop of the event connector election.
* Indicate how long the connection event queue is when we start
  processing it.

Change-Id: I2394290cd1612c47525c034342b28dae6cdf71cb
This commit is contained in:
James E. Blair
2022-06-16 15:18:38 -07:00
parent f2d4ff276b
commit 366cc2a64a
2 changed files with 31 additions and 10 deletions

View File

@@ -197,7 +197,13 @@ class GerritEventConnector(threading.Thread):
time.sleep(1)
def _run(self):
self.log.info("Won connection event queue election for %s",
self.connection.connection_name)
while not self._stopped and self.event_queue.election.is_still_valid():
qlen = len(self.event_queue)
if qlen:
self.log.debug("Connection event queue length for %s: %s",
self.connection.connection_name, qlen)
for event in self.event_queue:
try:
self._handleEvent(event)
@@ -207,27 +213,35 @@ class GerritEventConnector(threading.Thread):
return
self._connector_wake_event.wait(10)
self._connector_wake_event.clear()
self.log.info("Terminating connection event queue processing for %s",
self.connection.connection_name)
def _handleEvent(self, connection_event):
timestamp = connection_event["timestamp"]
data = connection_event["payload"]
if "zuul_event_id" in connection_event:
zuul_event_id = connection_event["zuul_event_id"]
else:
# TODO: This is for backwards compatibility; remove after 7.0.0
zuul_event_id = str(uuid4().hex)
log = get_annotated_logger(self.log, zuul_event_id)
now = time.time()
delay = max((timestamp + self.delay) - now, 0.0)
# Gerrit can produce inconsistent data immediately after an
# event, so ensure that we do not deliver the event to Zuul
# until at least a certain amount of time has passed. Note
# that if we receive several events in succession, we will
# only need to delay for the first event. In essence, Zuul
# should always be a constant number of seconds behind Gerrit.
now = time.time()
time.sleep(max((timestamp + self.delay) - now, 0.0))
log.debug("Handling event received %ss ago, delaying %ss",
now - timestamp, delay)
time.sleep(delay)
event = GerritTriggerEvent()
event.timestamp = timestamp
event.connection_name = self.connection.connection_name
# Gerrit events don't have an event id that could be used to globally
# identify this event in the system so we have to generate one.
event.zuul_event_id = str(uuid4().hex)
log = get_annotated_logger(self.log, event)
event.zuul_event_id = zuul_event_id
event.type = data.get('type')
event.uuid = data.get('uuid')
@@ -1196,8 +1210,11 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
if data.get('type') in GerritEventConnector.IGNORED_EVENTS:
return
# Gerrit events don't have an event id that could be used to globally
# identify this event in the system so we have to generate one.
event = {
"timestamp": time.time(),
"zuul_event_id": str(uuid4().hex),
"payload": data
}
self.event_queue.put(event)

View File

@@ -28,6 +28,7 @@ from kazoo.protocol.states import EventType
from zuul import model
from zuul.lib.collections import DefaultKeyDict
from zuul.lib.logutil import get_annotated_logger
from zuul.zk import ZooKeeperSimpleBase, sharding
from zuul.zk.election import SessionAwareElection
@@ -853,7 +854,7 @@ class PipelineTriggerEventQueue(TriggerEventQueue):
class ConnectionEventQueue(ZooKeeperEventQueue):
"""Connection events via ZooKeeper"""
log = logging.getLogger("zuul.zk.event_queues.ConnectionEventQueue")
log = logging.getLogger("zuul.ConnectionEventQueue")
def __init__(self, client, connection_name):
queue_root = "/".join((CONNECTION_ROOT, connection_name, "events"))
@@ -875,8 +876,11 @@ class ConnectionEventQueue(ZooKeeperEventQueue):
)
def put(self, data):
self.log.debug("Submitting connection event to queue %s: %s",
self.event_root, data)
log = self.log
if "zuul_event_id" in data:
log = get_annotated_logger(log, data["zuul_event_id"])
log.debug("Submitting connection event to queue %s: %s",
self.event_root, data)
self._put({'event_data': data})
def __iter__(self):