Merge "Improve test output by using named queues"

This commit is contained in:
Zuul 2021-03-24 22:46:21 +00:00 committed by Gerrit Code Review
commit 9bbbe24100
6 changed files with 49 additions and 28 deletions

View File

@ -28,7 +28,6 @@ import itertools
import json
import logging
import os
import queue
import random
import re
from collections import defaultdict, namedtuple
@ -86,6 +85,7 @@ from zuul.driver.gerrit import GerritDriver
from zuul.driver.github.githubconnection import GithubClientManager
from zuul.driver.elasticsearch import ElasticsearchDriver
from zuul.lib.connections import ConnectionRegistry
from zuul.lib.queue import NamedQueue
from psutil import Popen
import tests.fakegithub
@ -1158,7 +1158,7 @@ class FakeGerritConnection(gerritconnection.GerritConnection):
super(FakeGerritConnection, self).__init__(driver, connection_name,
connection_config)
self.event_queue = queue.Queue()
self.event_queue = NamedQueue('FakeGerritConnectionEventQueue')
self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
self.change_number = 0
self.changes = changes_db
@ -4938,27 +4938,15 @@ class ZuulTestCase(BaseTestCase):
i = i + 1
if time.time() - start > self.wait_timeout:
self.log.error("Timeout waiting for Zuul to settle")
self.log.error("Queue status:")
for event_queue in self.__event_queues(matcher):
self.log.error(" %s: %s" %
(event_queue, event_queue.empty()))
self.log.error(
"All ZK event queues empty: %s",
self._logQueueStatus(
self.log.error, matcher,
self.__areZooKeeperEventQueuesEmpty(matcher),
self.__areAllMergeJobsWaiting(matcher),
self.__haveAllBuildsReported(matcher),
self.__areAllBuildsWaiting(matcher),
self.__areAllNodeRequestsComplete(matcher),
all(self.__eventQueuesEmpty(matcher))
)
self.log.error("All builds waiting: %s" %
(self.__areAllBuildsWaiting(matcher),))
self.log.error("All merge jobs waiting: %s" %
(self.__areAllMergeJobsWaiting(matcher),))
self.log.error("All builds reported: %s" %
(self.__haveAllBuildsReported(matcher),))
self.log.error("All requests completed: %s" %
(self.__areAllNodeRequestsComplete(matcher),))
self.log.error("All event queues empty: %s" %
(all(self.__eventQueuesEmpty(matcher)),))
for app in self.scheds.filter(matcher):
self.log.error("[Sched: %s] Merge client jobs: %s" %
(app.sched, app.sched.merger.jobs,))
raise Exception("Timeout waiting for Zuul to settle")
# Make sure no new events show up while we're checking
@ -4994,6 +4982,25 @@ class ZuulTestCase(BaseTestCase):
self.executor_server.lock.release()
self.scheds.execute(lambda app: app.sched.wake_event.wait(0.1))
def _logQueueStatus(self, logger, matcher, all_zk_queues_empty,
all_merge_jobs_waiting, all_builds_reported,
all_builds_waiting, all_node_requests_completed,
all_event_queues_empty):
logger("Queue status:")
for event_queue in self.__event_queues(matcher):
self.log.debug(" %s: %s", event_queue, event_queue.empty())
logger("All ZK event queues empty: %s", all_zk_queues_empty)
logger("All merge jobs waiting: %s", all_merge_jobs_waiting)
logger("All builds reported: %s", all_builds_reported)
logger("All builds waiting: %s", all_builds_waiting)
logger("All requests completed: %s", all_node_requests_completed)
logger("All event queues empty: %s", all_event_queues_empty)
for app in self.scheds.filter(matcher):
logger(
"[Sched: %s] Merge client jobs: %s",
app.sched, app.sched.merger.jobs
)
def waitForPoll(self, poller, timeout=30):
self.log.debug("Wait for poll on %s", poller)
self.poller_events[poller].clear()

View File

@ -21,7 +21,6 @@ import math
import logging
import paramiko
import pprint
import queue
import re
import re2
import requests
@ -42,6 +41,7 @@ from zuul.driver.gerrit.gcloudauth import GCloudAuth
from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent
from zuul.driver.git.gitwatcher import GitWatcher
from zuul.lib.logutil import get_annotated_logger
from zuul.lib.queue import NamedQueue
from zuul.model import Ref, Tag, Branch, Project
# HTTP timeout in seconds
@ -527,7 +527,7 @@ class GerritConnection(BaseConnection):
self.watcher_thread = None
self.poller_thread = None
self.ref_watcher_thread = None
self.event_queue = queue.Queue()
self.event_queue = NamedQueue(f'GerritEventQueue<{connection_name}>')
self.client = None
self.watched_checkers = []
self.project_checker_map = {}

View File

@ -43,6 +43,7 @@ from github3.session import AppInstallationTokenAuth
from zuul.connection import CachedBranchConnection
from zuul.driver.github.graphql import GraphQLClient
from zuul.lib.gearworker import ZuulGearWorker
from zuul.lib.queue import NamedQueue
from zuul.web.handler import BaseWebController
from zuul.lib.logutil import get_annotated_logger
from zuul.model import Ref, Branch, Tag, Project
@ -713,7 +714,7 @@ class GithubEventConnector:
name='GithubEventForwarder', target=self.run_event_forwarder,
daemon=True)
self._thread_pool = concurrent.futures.ThreadPoolExecutor()
self._event_forward_queue = queue.Queue()
self._event_forward_queue = NamedQueue("GithubEventForwardQueue")
def stop(self):
self._stopped = True

View File

@ -18,7 +18,8 @@ import logging
import os
import socket
import threading
import queue
from zuul.lib.queue import NamedQueue
class CommandSocket(object):
@ -27,7 +28,7 @@ class CommandSocket(object):
def __init__(self, path):
self.running = False
self.path = path
self.queue = queue.Queue()
self.queue = NamedQueue('CommandSocketQueue')
def start(self):
self.running = True

View File

@ -14,6 +14,7 @@
# under the License.
import collections
import queue
import threading
@ -76,3 +77,14 @@ class MergedQueue(object):
while self.tasks:
self.join_condition.wait()
self.join_condition.release()
class NamedQueue(queue.Queue):
"""For identification of queues in logs"""
def __init__(self, name, maxsize=0):
super().__init__(maxsize)
self.name = name
def __repr__(self):
return f"<Queue {self.name} [{id(self)}]>"

View File

@ -19,7 +19,6 @@ import json
import logging
import os
import re
import queue
import socket
import sys
import threading
@ -37,6 +36,7 @@ from zuul.lib.ansible import AnsibleManager
from zuul.lib.config import get_default
from zuul.lib.gear_utils import getGearmanFunctions
from zuul.lib.logutil import get_annotated_logger
from zuul.lib.queue import NamedQueue
from zuul.lib.statsd import get_statsd, normalize_statsd_name
import zuul.lib.queue
import zuul.lib.repl
@ -147,7 +147,7 @@ class Scheduler(threading.Thread):
)
)
self.result_event_queue = queue.Queue()
self.result_event_queue = NamedQueue("ResultEventQueue")
self.global_watcher = GlobalEventWatcher(
self.zk_client, self.wake_event.set
)