Refactor executor_client in tests

Replace `self.executor_client` with `scheduler.executor`.

This change only touches tests.

Change-Id: I22b01f2eff881e18633e5bab1ec390f3b5367a4d
Story: 2007192
This commit is contained in:
Jan Kubovy 2020-02-25 10:20:43 +01:00
parent 7df5508dbd
commit bc68c48e5b
3 changed files with 86 additions and 81 deletions

View File

@ -3715,7 +3715,6 @@ class ZuulTestCase(BaseTestCase):
self.event_queues.append(
self.fake_github.github_event_connector._event_forward_queue)
self.executor_client = sched_app.sched.executor
self.merge_client = sched_app.sched.merger
self.merge_server = None
self.nodepool = sched_app.sched.nodepool
@ -4113,7 +4112,7 @@ class ZuulTestCase(BaseTestCase):
self.log.debug("Shutting down after tests")
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.executor_client.stop()
self.scheds.execute(lambda app: app.sched.executor.stop())
self.merge_client.stop()
if self.merge_server:
self.merge_server.stop()
@ -4249,76 +4248,80 @@ class ZuulTestCase(BaseTestCase):
parameters = json.loads(job.arguments)
return parameters[name]
def haveAllBuildsReported(self):
# See if Zuul is waiting on a meta job to complete
if self.executor_client.meta_jobs:
return False
# Find out if every build that the worker has completed has been
# reported back to Zuul. If it hasn't then that means a Gearman
# event is still in transit and the system is not stable.
for build in self.history:
zbuild = self.executor_client.builds.get(build.uuid)
if not zbuild:
# It has already been reported
continue
# It hasn't been reported yet.
return False
# Make sure that none of the worker connections are in GRAB_WAIT
worker = self.executor_server.executor_gearworker.gearman
for connection in worker.active_connections:
if connection.state == 'GRAB_WAIT':
def __haveAllBuildsReported(self, matcher) -> bool:
for app in self.scheds.filter(matcher):
executor_client = app.sched.executor
# See if Zuul is waiting on a meta job to complete
if executor_client.meta_jobs:
return False
# Find out if every build that the worker has completed has been
# reported back to Zuul. If it hasn't then that means a Gearman
# event is still in transit and the system is not stable.
for build in self.history:
zbuild = executor_client.builds.get(build.uuid)
if not zbuild:
# It has already been reported
continue
# It hasn't been reported yet.
return False
# Make sure that none of the worker connections are in GRAB_WAIT
worker = self.executor_server.executor_gearworker.gearman
for connection in worker.active_connections:
if connection.state == 'GRAB_WAIT':
return False
return True
def areAllBuildsWaiting(self):
builds = self.executor_client.builds.values()
seen_builds = set()
for build in builds:
seen_builds.add(build.uuid)
client_job = None
for conn in self.executor_client.gearman.active_connections:
for j in conn.related_jobs.values():
if j.unique == build.uuid:
client_job = j
break
if not client_job:
self.log.debug("%s is not known to the gearman client" %
build)
return False
if not client_job.handle:
self.log.debug("%s has no handle" % client_job)
return False
server_job = self.gearman_server.jobs.get(client_job.handle)
if not server_job:
self.log.debug("%s is not known to the gearman server" %
client_job)
return False
if not hasattr(server_job, 'waiting'):
self.log.debug("%s is being enqueued" % server_job)
return False
if server_job.waiting:
continue
if build.url is None:
self.log.debug("%s has not reported start" % build)
return False
# using internal ServerJob which offers no Text interface
worker_build = self.executor_server.job_builds.get(
server_job.unique.decode('utf8'))
if worker_build:
if build.paused:
def __areAllBuildsWaiting(self, matcher) -> bool:
for app in self.scheds.filter(matcher):
executor_client = app.sched.executor
builds = executor_client.builds.values()
seen_builds = set()
for build in builds:
seen_builds.add(build.uuid)
client_job = None
for conn in executor_client.gearman.active_connections:
for j in conn.related_jobs.values():
if j.unique == build.uuid:
client_job = j
break
if not client_job:
self.log.debug("%s is not known to the gearman client" %
build)
return False
if not client_job.handle:
self.log.debug("%s has no handle" % client_job)
return False
server_job = self.gearman_server.jobs.get(client_job.handle)
if not server_job:
self.log.debug("%s is not known to the gearman server" %
client_job)
return False
if not hasattr(server_job, 'waiting'):
self.log.debug("%s is being enqueued" % server_job)
return False
if server_job.waiting:
continue
if worker_build.isWaiting():
continue
self.log.debug("%s is running" % worker_build)
return False
else:
self.log.debug("%s is unassigned" % server_job)
return False
for (build_uuid, job_worker) in \
self.executor_server.job_workers.items():
if build_uuid not in seen_builds:
self.log.debug("%s is not finalized" % build_uuid)
return False
if build.url is None:
self.log.debug("%s has not reported start" % build)
return False
# using internal ServerJob which offers no Text interface
worker_build = self.executor_server.job_builds.get(
server_job.unique.decode('utf8'))
if worker_build:
if build.paused:
continue
if worker_build.isWaiting():
continue
self.log.debug("%s is running" % worker_build)
return False
else:
self.log.debug("%s is unassigned" % server_job)
return False
for (build_uuid, job_worker) in \
self.executor_server.job_workers.items():
if build_uuid not in seen_builds:
self.log.debug("%s is not finalized" % build_uuid)
return False
return True
def areAllNodeRequestsComplete(self):
@ -4356,7 +4359,7 @@ class ZuulTestCase(BaseTestCase):
for event_queue in self.event_queues:
event_queue.join()
def waitUntilSettled(self, msg=""):
def waitUntilSettled(self, msg="", matcher=None) -> None:
self.log.debug("Waiting until settled... (%s)", msg)
start = time.time()
while True:
@ -4367,9 +4370,9 @@ class ZuulTestCase(BaseTestCase):
self.log.error(" %s: %s" %
(event_queue, event_queue.empty()))
self.log.error("All builds waiting: %s" %
(self.areAllBuildsWaiting(),))
(self.__areAllBuildsWaiting(matcher),))
self.log.error("All builds reported: %s" %
(self.haveAllBuildsReported(),))
(self.__haveAllBuildsReported(matcher),))
self.log.error("All requests completed: %s" %
(self.areAllNodeRequestsComplete(),))
self.log.error("Merge client jobs: %s" %
@ -4379,15 +4382,15 @@ class ZuulTestCase(BaseTestCase):
self.executor_server.lock.acquire()
# have all build states propogated to zuul?
if self.haveAllBuildsReported():
if self.__haveAllBuildsReported(matcher):
# Join ensures that the queue is empty _and_ events have been
# processed
self.eventQueuesJoin()
self.scheds.execute(
lambda app: app.sched.run_handler_lock.acquire())
if (self.areAllMergeJobsWaiting() and
self.haveAllBuildsReported() and
self.areAllBuildsWaiting() and
self.__haveAllBuildsReported(matcher) and
self.__areAllBuildsWaiting(matcher) and
self.areAllNodeRequestsComplete() and
all(self.eventQueuesEmpty())):
# The queue empty check is placed at the end to

View File

@ -70,9 +70,11 @@ class TestInventoryBase(ZuulTestCase):
self.waitUntilSettled()
def cancelExecutorJobs(self):
builds = [b for b in self.executor_client.builds.values()]
for build in builds:
self.executor_client.cancelJobInQueue(build)
for app in self.scheds:
executor_client = app.sched.executor
builds = [b for b in executor_client.builds.values()]
for build in builds:
executor_client.cancelJobInQueue(build)
class TestInventoryGithub(TestInventoryBase):

View File

@ -5275,17 +5275,17 @@ class TestScheduler(ZuulTestCase):
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
self.assertEqual(len(self.executor_client.builds), 1)
self.assertEqual(len(self.scheds.first.sched.executor.builds), 1)
self.log.debug('Current builds:')
self.log.debug(self.executor_client.builds)
self.log.debug(self.scheds.first.sched.executor.builds)
start = time.time()
while True:
if time.time() - start > 10:
raise Exception("Timeout waiting for gearman server to report "
+ "back to the client")
build = list(self.executor_client.builds.values())[0]
build = list(self.scheds.first.sched.executor.builds.values())[0]
if build.worker.name == self.executor_server.hostname:
break
else:
@ -5470,7 +5470,7 @@ For CI problems and help debugging, contact ci@example.org"""
if time.time() - start > 10:
raise Exception("Timeout waiting for gearman server to report "
+ "back to the client")
build = list(self.executor_client.builds.values())[0]
build = list(self.scheds.first.sched.executor.builds.values())[0]
if build.worker.name == self.executor_server.hostname:
break
else: