Merge "Refactor `self.event_queues` in tests"

This commit is contained in:
Zuul 2020-04-03 19:27:58 +00:00 committed by Gerrit Code Review
commit 33fef37bc5
1 changed files with 26 additions and 16 deletions

View File

@ -30,7 +30,8 @@ import queue
import random
import re
from logging import Logger
from typing import Callable, Optional, Any, Iterable
from queue import Queue
from typing import Callable, Optional, Any, Iterable, Generator, List
import requests
import select
@ -3691,7 +3692,7 @@ class ZuulTestCase(BaseTestCase):
gerritsource.GerritSource.replication_retry_interval = 0.5
gerritconnection.GerritEventConnector.delay = 0.0
self.event_queues = []
self.additional_event_queues = []
self.poller_events = {}
self.configure_connections()
@ -3707,12 +3708,11 @@ class ZuulTestCase(BaseTestCase):
self.builds = self.executor_server.running_builds
self.scheds = SchedulerTestManager()
sched_app = self.scheds.create(
self.scheds.create(
self.log, self.config, self.zk_config, self.connections)
self.event_queues = sched_app.event_queues + self.event_queues
if hasattr(self, 'fake_github'):
self.event_queues.append(
self.additional_event_queues.append(
self.fake_github.github_event_connector._event_forward_queue)
self.merge_server = None
@ -3722,6 +3722,12 @@ class ZuulTestCase(BaseTestCase):
self.addCleanup(self.shutdown)
self.addCleanup(self.assertFinalState)
def __event_queues(self, matcher) -> List[Queue]:
sched_queues = map(lambda app: app.event_queues,
self.scheds.filter(matcher))
return [item for sublist in sched_queues for item in sublist] + \
self.additional_event_queues
def configure_connections(self, source_only=False):
# Set up gerrit related fakes
# Set a changes database so multiple FakeGerrit's can report back to
@ -3744,7 +3750,7 @@ class ZuulTestCase(BaseTestCase):
if con.web_server:
self.addCleanup(con.web_server.stop)
self.event_queues.append(con.event_queue)
self.additional_event_queues.append(con.event_queue)
setattr(self, 'fake_' + name, con)
return con
@ -3783,7 +3789,7 @@ class ZuulTestCase(BaseTestCase):
changes_db=db,
upstream_root=self.upstream_root,
git_url_with_auth=self.git_url_with_auth)
self.event_queues.append(con.event_queue)
self.additional_event_queues.append(con.event_queue)
setattr(self, 'fake_' + name, con)
registerGithubProjects(con)
return con
@ -3800,7 +3806,7 @@ class ZuulTestCase(BaseTestCase):
self.rpcclient,
changes_db=db,
upstream_root=self.upstream_root)
self.event_queues.append(con.event_queue)
self.additional_event_queues.append(con.event_queue)
setattr(self, 'fake_' + name, con)
return con
@ -3816,7 +3822,7 @@ class ZuulTestCase(BaseTestCase):
self.rpcclient,
changes_db=db,
upstream_root=self.upstream_root)
self.event_queues.append(con.event_queue)
self.additional_event_queues.append(con.event_queue)
setattr(self, 'fake_' + name, con)
return con
@ -4351,12 +4357,16 @@ class ZuulTestCase(BaseTestCase):
return False
return True
def eventQueuesEmpty(self):
for event_queue in self.event_queues:
def __eventQueuesEmpty(self, matcher)\
-> Generator[bool, None, None]:
for event_queue in self.__event_queues(matcher):
yield event_queue.empty()
def eventQueuesJoin(self):
for event_queue in self.event_queues:
def __eventQueuesJoin(self, matcher) -> None:
for app in self.scheds.filter(matcher):
for event_queue in app.event_queues:
event_queue.join()
for event_queue in self.additional_event_queues:
event_queue.join()
def waitUntilSettled(self, msg="", matcher=None) -> None:
@ -4366,7 +4376,7 @@ class ZuulTestCase(BaseTestCase):
if time.time() - start > self.wait_timeout:
self.log.error("Timeout waiting for Zuul to settle")
self.log.error("Queue status:")
for event_queue in self.event_queues:
for event_queue in self.__event_queues(matcher):
self.log.error(" %s: %s" %
(event_queue, event_queue.empty()))
self.log.error("All builds waiting: %s" %
@ -4386,14 +4396,14 @@ class ZuulTestCase(BaseTestCase):
if self.__haveAllBuildsReported(matcher):
# Join ensures that the queue is empty _and_ events have been
# processed
self.eventQueuesJoin()
self.__eventQueuesJoin(matcher)
self.scheds.execute(
lambda app: app.sched.run_handler_lock.acquire())
if (self.__areAllMergeJobsWaiting(matcher) and
self.__haveAllBuildsReported(matcher) and
self.__areAllBuildsWaiting(matcher) and
self.__areAllNodeRequestsComplete(matcher) and
all(self.eventQueuesEmpty())):
all(self.__eventQueuesEmpty(matcher))):
# The queue empty check is placed at the end to
# ensure that if a component adds an event between
# when locked the run handler and checked that the