Stop jobs when executor stops

If the executor stops while jobs are running, those jobs are not
explicitly aborted.  In production, the process exit would cause
all of the jobs to terminate and the gearman disconnection would
report a failure, however, in tests the python process may continue
and the ansible threads would essentially leak into a subsequent
test.  This is especially likely to happen if a test holds jobs in
build, and then fails while those jobs are still held.  Those threads
will continue to wait to be released while further tests continue
to run.  Because all tests assert that git.Repo objects are not
leaked, the outstanding reference that the leaked threads have
to a git.Repo object trips that assertion and all subsequent tests
in the same test runner fail.

This adds code to the executor shutdown to stop all jobs at the start
of the shutdown process.  It also adds a test which shuts down the
executor while jobs are held and asserts that after shutdown, those
threads are stopped, and no git repo objects are leaked.

Change-Id: I9d73775a13c289ef922c27b29162efcfca3950a9
This commit is contained in:
James E. Blair 2017-04-18 10:35:48 -07:00
parent 267e516cf4
commit a002b035ee
3 changed files with 63 additions and 5 deletions

View File

@ -769,6 +769,11 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
build.release()
super(RecordingExecutorServer, self).stopJob(job)
def stop(self):
for build in self.running_builds:
build.release()
super(RecordingExecutorServer, self).stop()
class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
def doMergeChanges(self, items):
@ -1398,15 +1403,14 @@ class ZuulTestCase(BaseTestCase):
self.webapp.start()
self.rpc.start()
self.executor_client.gearman.waitForServer()
# Cleanups are run in reverse order
self.addCleanup(self.assertCleanShutdown)
self.addCleanup(self.shutdown)
self.addCleanup(self.assertFinalState)
self.sched.reconfigure(self.config)
self.sched.resume()
def tearDown(self):
super(ZuulTestCase, self).tearDown()
self.assertFinalState()
def configure_connections(self):
# Set up gerrit related fakes
# Set a changes database so multiple FakeGerrit's can report back to
@ -1545,6 +1549,9 @@ class ZuulTestCase(BaseTestCase):
self.assertEqual(test_key, f.read())
def assertFinalState(self):
self.log.debug("Assert final state")
# Make sure no jobs are running
self.assertEqual({}, self.executor_server.job_workers)
# Make sure that git.Repo objects have been garbage collected.
repos = []
gc.collect()
@ -1585,6 +1592,9 @@ class ZuulTestCase(BaseTestCase):
self.log.error("More than one thread is running: %s" % threads)
self.printHistory()
def assertCleanShutdown(self):
pass
def init_repo(self, project):
parts = project.split('/')
path = os.path.join(self.upstream_root, *parts[:-1])
@ -1675,7 +1685,9 @@ class ZuulTestCase(BaseTestCase):
def areAllBuildsWaiting(self):
builds = self.executor_client.builds.values()
seen_builds = set()
for build in builds:
seen_builds.add(build.uuid)
client_job = None
for conn in self.executor_client.gearman.active_connections:
for j in conn.related_jobs.values():
@ -1713,6 +1725,11 @@ class ZuulTestCase(BaseTestCase):
else:
self.log.debug("%s is unassigned" % server_job)
return False
for (build_uuid, job_worker) in \
self.executor_server.job_workers.items():
if build_uuid not in seen_builds:
self.log.debug("%s is not finalized" % build_uuid)
return False
return True
def areAllNodeRequestsComplete(self):

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import gc
import json
import textwrap
@ -4638,6 +4639,39 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertIn('project-test2 : SKIPPED', A.messages[1])
class TestExecutor(ZuulTestCase):
tenant_config_file = 'config/single-tenant/main.yaml'
def assertFinalState(self):
# In this test, we expect to shut down in a non-final state,
# so skip these checks.
pass
def assertCleanShutdown(self):
self.log.debug("Assert clean shutdown")
# After shutdown, make sure no jobs are running
self.assertEqual({}, self.executor_server.job_workers)
# Make sure that git.Repo objects have been garbage collected.
repos = []
gc.collect()
for obj in gc.get_objects():
if isinstance(obj, git.Repo):
self.log.debug("Leaked git repo object: %s" % repr(obj))
repos.append(obj)
self.assertEqual(len(repos), 0)
def test_executor_shutdown(self):
"Test that the executor can shut down with jobs running"
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('code-review', 2)
self.fake_gerrit.addEvent(A.addApproval('approved', 1))
self.waitUntilSettled()
class TestDependencyGraph(ZuulTestCase):
tenant_config_file = 'config/dependency-graph/main.yaml'

View File

@ -344,10 +344,17 @@ class ExecutorServer(object):
def stop(self):
self.log.debug("Stopping")
self._running = False
self.worker.shutdown()
self._command_running = False
self.command_socket.stop()
self.update_queue.put(None)
for job_worker in self.job_workers.values():
try:
job_worker.stop()
except Exception:
self.log.exception("Exception sending stop command "
"to worker:")
self.worker.shutdown()
self.log.debug("Stopped")
def pause(self):