From 7a04df263cf75b35e558cb11e08ab811192c944a Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Tue, 17 Oct 2017 08:44:52 -0700 Subject: [PATCH] Several executor threading fixes * Add a try/except handler to the governor thread * Add names to all executor threads * Show thread names in the stack dump handler Change-Id: I7551a901ffba175224d417f653391c4568f9975b --- tests/base.py | 2 +- tests/unit/test_stack_dump.py | 1 + zuul/cmd/__init__.py | 11 ++++++++++- zuul/executor/server.py | 27 ++++++++++++++++++--------- 4 files changed, 30 insertions(+), 11 deletions(-) diff --git a/tests/base.py b/tests/base.py index 797bfe671c..a02ee5ab54 100755 --- a/tests/base.py +++ b/tests/base.py @@ -2384,7 +2384,7 @@ class ZuulTestCase(BaseTestCase): # before noticing they should exit, but they should exit on their own. # Further the pydevd threads also need to be whitelisted so debugging # e.g. in PyCharm is possible without breaking shutdown. - whitelist = ['executor-watchdog', + whitelist = ['watchdog', 'pydevd.CommandThread', 'pydevd.Reader', 'pydevd.Writer', diff --git a/tests/unit/test_stack_dump.py b/tests/unit/test_stack_dump.py index 824e04c533..13e83c634e 100644 --- a/tests/unit/test_stack_dump.py +++ b/tests/unit/test_stack_dump.py @@ -31,4 +31,5 @@ class TestStackDump(testtools.TestCase): zuul.cmd.stack_dump_handler(signal.SIGUSR2, None) self.assertIn("Thread", self.log_fixture.output) + self.assertIn("MainThread", self.log_fixture.output) self.assertIn("test_stack_dump_logs", self.log_fixture.output) diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py index 1870890def..86f7f12f51 100755 --- a/zuul/cmd/__init__.py +++ b/zuul/cmd/__init__.py @@ -23,6 +23,7 @@ import os import signal import sys import traceback +import threading yappi = extras.try_import('yappi') objgraph = extras.try_import('objgraph') @@ -41,9 +42,17 @@ def stack_dump_handler(signum, frame): log = logging.getLogger("zuul.stack_dump") log.debug("Beginning debug handler") try: + threads = {} + for t in threading.enumerate(): + threads[t.ident] = t log_str = "" for thread_id, stack_frame in sys._current_frames().items(): - log_str += "Thread: %s\n" % thread_id + thread = threads.get(thread_id) + if thread: + thread_name = thread.name + else: + thread_name = thread.ident + log_str += "Thread: %s %s\n" % (thread_id, thread_name) log_str += "".join(traceback.format_stack(stack_frame)) log.debug(log_str) except Exception: diff --git a/zuul/executor/server.py b/zuul/executor/server.py index 5998922e63..6af538c4d9 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -95,7 +95,7 @@ class DiskAccountant(object): if cache_dir == jobs_base: raise Exception("Cache dir and jobs dir cannot be the same") self.thread = threading.Thread(target=self._run, - name='executor-diskaccountant') + name='diskaccountant') self.thread.daemon = True self._running = False self.jobs_base = jobs_base @@ -154,7 +154,7 @@ class Watchdog(object): self.function = function self.args = args self.thread = threading.Thread(target=self._run, - name='executor-watchdog') + name='watchdog') self.thread.daemon = True self.timed_out = None @@ -556,7 +556,8 @@ class AnsibleJob(object): def run(self): self.running = True - self.thread = threading.Thread(target=self.execute) + self.thread = threading.Thread(target=self.execute, + name='build-%s' % self.job.unique) self.thread.start() def stop(self, reason=None): @@ -1645,22 +1646,27 @@ class ExecutorServer(object): self.log.debug("Starting command processor") self.command_socket.start() - self.command_thread = threading.Thread(target=self.runCommand) + self.command_thread = threading.Thread(target=self.runCommand, + name='command') self.command_thread.daemon = True self.command_thread.start() self.log.debug("Starting worker") - self.update_thread = threading.Thread(target=self._updateLoop) + self.update_thread = threading.Thread(target=self._updateLoop, + name='update') self.update_thread.daemon = True self.update_thread.start() - self.merger_thread = threading.Thread(target=self.run_merger) + self.merger_thread = threading.Thread(target=self.run_merger, + name='merger') self.merger_thread.daemon = True self.merger_thread.start() - self.executor_thread = threading.Thread(target=self.run_executor) + self.executor_thread = threading.Thread(target=self.run_executor, + name='executor') self.executor_thread.daemon = True self.executor_thread.start() self.governor_stop_event = threading.Event() - self.governor_thread = threading.Thread(target=self.run_governor) + self.governor_thread = threading.Thread(target=self.run_governor, + name='governor') self.governor_thread.daemon = True self.governor_thread.start() self.disk_accountant.start() @@ -1869,7 +1875,10 @@ class ExecutorServer(object): def run_governor(self): while not self.governor_stop_event.wait(30): - self.manageLoad() + try: + self.manageLoad() + except Exception: + self.log.exception("Exception in governor thread:") def manageLoad(self): ''' Apply some heuristics to decide whether or not we should