Fix unknown job detection

If geard loses track of a job, this code is supposed to detect it,
but the attribute it inspects is now set and read in different
classes, so the name-mangled double-underscore attribute no longer
resolves; change it to a single underscore so the lookup works again.
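
For background, Python mangles an attribute written as "__name" inside
a class body into "_ClassName__name", so an attribute stored from one
class cannot be read back under the same spelling from another class;
a single leading underscore is not mangled.  A minimal illustration
(the class names here are stand-ins, not Zuul's):

    class Client:
        def attach(self, build):
            # Stored as build._Client__job (mangled in this class).
            build.__job = "gearman-handle"


    class Poller:
        def check(self, build):
            # Looked up as build._Poller__job (mangled differently),
            # so the attribute set by Client is never found.
            return build.__job


    class Build:
        pass


    build = Build()
    Client().attach(build)
    try:
        Poller().check(build)
    except AttributeError:
        print("dunder attribute set in Client is invisible to Poller")

    # A single underscore is not mangled and works across classes.
    build._job = "gearman-handle"
    print(build._job)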

This also corrects the mirror situation, where the scheduler is
disconnected from the gearman server.  That would eventually fall
through to the lost-build handler, but since we get an unambiguous
disconnect event, restore explicit handling of it.
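
The restored handling amounts to translating the disconnect into a
completed-build result that the scheduler can act on; the new test
below expects such a build to be retried.  A rough, self-contained
sketch of that pattern, using simplified stand-in names rather than
Zuul's actual event classes:

    import queue
    import time

    # Stand-in for the scheduler's per-pipeline result event queue.
    result_events = queue.Queue()


    def on_disconnect(build_uuid, known_builds):
        # A disconnect is unambiguous: if we still know about the
        # build, report it as DISCONNECT so the scheduler can retry it.
        if build_uuid in known_builds:
            result = {"result": "DISCONNECT", "end_time": time.time()}
            result_events.put((build_uuid, result))


    def on_unknown_job(build_uuid):
        # geard no longer knows the job at all, so the best we can do
        # is report the build as LOST.
        result = {"result": "LOST", "end_time": time.time()}
        result_events.put((build_uuid, result))


    on_disconnect("abc123", known_builds={"abc123"})
    print(result_events.get_nowait())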

A test case for each of these is added.

Change-Id: I41a926c488717f8bc4cd8a173beaa9458e7cb6e2
James E. Blair 2021-06-09 10:00:52 -07:00
parent 3a0bbd205c
commit 638b1e1090
2 changed files with 106 additions and 12 deletions


@@ -5890,6 +5890,85 @@ For CI problems and help debugging, contact ci@example.org"""
         self.assertEqual(A.reported, 1)
         self.assertIn('RETRY_LIMIT', A.messages[0])
 
+    def test_executor_disconnect(self):
+        "Test that jobs are completed after an executor disconnect"
+
+        self.executor_server.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        # Forcibly disconnect the executor from gearman
+        for connection in self.executor_server.executor_gearworker.\
+                gearman.active_connections:
+            connection.conn.shutdown(socket.SHUT_RDWR)
+
+        # Wake up the cleanup thread since it is on a 5 minute interval
+        self.scheds.first.sched.executor.cleanup_thread.wake_event.set()
+
+        # Find the build in the scheduler so we can check its status
+        tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
+        items = tenant.layout.pipelines['gate'].getAllItems()
+        builds = items[0].current_build_set.getBuilds()
+        build = builds[0]
+
+        # Wait for the build to be reported as lost
+        for x in iterate_timeout(30, 'lost build'):
+            if build.result == 'LOST':
+                break
+
+        # If we didn't timeout, then it worked; we're done
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+
+        # LOST builds aren't recorded in the test history; instead the
+        # original build will be reported as success since it
+        # continued after our fake disconnect. However, the fact that
+        # it's the *only* build, and the other two jobs are not
+        # present is extra confirmation that the scheduler saw the
+        # build as lost.
+        self.assertHistory([
+            dict(name='project-merge', result='SUCCESS', changes='1,1'),
+        ])
+
+    def test_scheduler_disconnect(self):
+        "Test that jobs are completed after a scheduler disconnect"
+
+        self.executor_server.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        # Forcibly disconnect the scheduler from gearman
+        for connection in self.scheds.first.sched.executor.gearman.\
+                active_connections:
+            connection.conn.shutdown(socket.SHUT_RDWR)
+
+        # Find the build in the scheduler so we can check its status
+        tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
+        items = tenant.layout.pipelines['gate'].getAllItems()
+        builds = items[0].current_build_set.getBuilds()
+        build = builds[0]
+
+        # Wait for the build to be reported as lost
+        for x in iterate_timeout(30, 'lost build'):
+            if build.result == 'RETRY':
+                break
+
+        # If we didn't timeout, then it worked; we're done
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+
+        # There's an extra merge build due to the retry
+        self.assertHistory([
+            dict(name='project-merge', result='SUCCESS', changes='1,1'),
+            dict(name='project-merge', result='SUCCESS', changes='1,1'),
+            dict(name='project-test1', result='SUCCESS', changes='1,1'),
+            dict(name='project-test2', result='SUCCESS', changes='1,1'),
+        ], ordered=False)
+
     def test_zookeeper_disconnect(self):
         "Test that jobs are executed after a zookeeper disconnect"


@@ -107,12 +107,12 @@ class ZuulGearmanClient(gear.Client):
     def handleStatusRes(self, packet):
         try:
-            job = super(ZuulGearmanClient, self).handleStatusRes(packet)
+            super(ZuulGearmanClient, self).handleStatusRes(packet)
         except gear.UnknownJobError:
             handle = packet.getArgument(0)
             for build in self.__zuul_gearman.builds.values():
-                if build.__gearman_job.handle == handle:
-                    self.__zuul_gearman.onUnknownJob(job)
+                if build._gearman_job.handle == handle:
+                    self.__zuul_gearman.onUnknownJob(build)
 
 
 class ExecutorClient(object):
@@ -224,7 +224,7 @@ class ExecutorClient(object):
         gearman_job = gear.TextJob(
             function_name, json_dumps(params), unique=uuid)
-        build.__gearman_job = gearman_job
+        build._gearman_job = gearman_job
         build.__gearman_worker = None
 
         if pipeline.precedence == PRECEDENCE_NORMAL:
@@ -262,7 +262,7 @@ class ExecutorClient(object):
         build.canceled = True
         try:
-            job = build.__gearman_job  # noqa
+            job = build._gearman_job  # noqa
         except AttributeError:
             log.debug("Build has no associated gearman job")
             return False
@@ -311,14 +311,29 @@ class ExecutorClient(object):
         self.log.info("Gearman job %s lost due to disconnect" % job)
         self.onBuildCompleted(job, 'DISCONNECT')
+        build = self.builds.get(job.unique)
+        if build:
+            tenant_name = build.build_set.item.pipeline.tenant.name
+            pipeline_name = build.build_set.item.pipeline.name
+            result = {"result": "DISCONNECT", "end_time": time.time()}
+            event = BuildCompletedEvent(build.uuid, result)
+            self.result_events[tenant_name][pipeline_name].put(event)
 
-    def onUnknownJob(self, job):
-        self.log.info("Gearman job %s lost due to unknown handle" % job)
-        self.onBuildCompleted(job, 'LOST')
+    def onUnknownJob(self, build):
+        self.log.info("Gearman job for build %s lost "
+                      "due to unknown handle" % build)
+        # We don't need to call onBuildCompleted, because by
+        # definition, we have no record of the job.
+        tenant_name = build.build_set.item.pipeline.tenant.name
+        pipeline_name = build.build_set.item.pipeline.name
+        result = {"result": "LOST", "end_time": time.time()}
+        event = BuildCompletedEvent(build.uuid, result)
+        self.result_events[tenant_name][pipeline_name].put(event)
 
     def cancelJobInQueue(self, build):
         log = get_annotated_logger(self.log, build.zuul_event_id,
                                    build=build.uuid)
-        job = build.__gearman_job
+        job = build._gearman_job
         req = gear.CancelJobAdminRequest(job.handle)
         job.connection.sendAdminRequest(req, timeout=300)
@@ -341,7 +356,7 @@ class ExecutorClient(object):
         if not build.__gearman_worker:
             log.error("Build %s has no manager while canceling", build)
         stop_uuid = str(uuid4().hex)
-        data = dict(uuid=build.__gearman_job.unique,
+        data = dict(uuid=build._gearman_job.unique,
                     zuul_event_id=build.zuul_event_id)
         stop_job = gear.TextJob("executor:stop:%s" % build.__gearman_worker,
                                 json_dumps(data), unique=stop_uuid)
@@ -356,7 +371,7 @@ class ExecutorClient(object):
         if not build.__gearman_worker:
             log.error("Build %s has no manager while resuming", build)
         resume_uuid = str(uuid4().hex)
-        data = dict(uuid=build.__gearman_job.unique,
+        data = dict(uuid=build._gearman_job.unique,
                     zuul_event_id=build.zuul_event_id)
         stop_job = gear.TextJob("executor:resume:%s" % build.__gearman_worker,
                                 json_dumps(data), unique=resume_uuid)
@@ -373,7 +388,7 @@ class ExecutorClient(object):
             if build.result:
                 # The build has finished, it will be removed
                 continue
-            job = build.__gearman_job
+            job = build._gearman_job
             if not job.handle:
                 # The build hasn't been enqueued yet
                 continue