Browse Source

Several executor threading fixes

* Add a try/except handler to the governor thread
* Add names to all executor threads
* Show thread names in the stack dump handler

Change-Id: I7551a901ffba175224d417f653391c4568f9975b
changes/73/512673/2
James E. Blair 3 years ago
parent
commit
7a04df263c
4 changed files with 30 additions and 11 deletions
  1. +1
    -1
      tests/base.py
  2. +1
    -0
      tests/unit/test_stack_dump.py
  3. +10
    -1
      zuul/cmd/__init__.py
  4. +18
    -9
      zuul/executor/server.py

+ 1
- 1
tests/base.py View File

@ -2384,7 +2384,7 @@ class ZuulTestCase(BaseTestCase):
# before noticing they should exit, but they should exit on their own.
# Further the pydevd threads also need to be whitelisted so debugging
# e.g. in PyCharm is possible without breaking shutdown.
whitelist = ['executor-watchdog',
whitelist = ['watchdog',
'pydevd.CommandThread',
'pydevd.Reader',
'pydevd.Writer',


+ 1
- 0
tests/unit/test_stack_dump.py View File

@ -31,4 +31,5 @@ class TestStackDump(testtools.TestCase):
zuul.cmd.stack_dump_handler(signal.SIGUSR2, None)
self.assertIn("Thread", self.log_fixture.output)
self.assertIn("MainThread", self.log_fixture.output)
self.assertIn("test_stack_dump_logs", self.log_fixture.output)

+ 10
- 1
zuul/cmd/__init__.py View File

@ -23,6 +23,7 @@ import os
import signal
import sys
import traceback
import threading
yappi = extras.try_import('yappi')
objgraph = extras.try_import('objgraph')
@ -41,9 +42,17 @@ def stack_dump_handler(signum, frame):
log = logging.getLogger("zuul.stack_dump")
log.debug("Beginning debug handler")
try:
threads = {}
for t in threading.enumerate():
threads[t.ident] = t
log_str = ""
for thread_id, stack_frame in sys._current_frames().items():
log_str += "Thread: %s\n" % thread_id
thread = threads.get(thread_id)
if thread:
thread_name = thread.name
else:
thread_name = thread.ident
log_str += "Thread: %s %s\n" % (thread_id, thread_name)
log_str += "".join(traceback.format_stack(stack_frame))
log.debug(log_str)
except Exception:


+ 18
- 9
zuul/executor/server.py View File

@ -95,7 +95,7 @@ class DiskAccountant(object):
if cache_dir == jobs_base:
raise Exception("Cache dir and jobs dir cannot be the same")
self.thread = threading.Thread(target=self._run,
name='executor-diskaccountant')
name='diskaccountant')
self.thread.daemon = True
self._running = False
self.jobs_base = jobs_base
@ -154,7 +154,7 @@ class Watchdog(object):
self.function = function
self.args = args
self.thread = threading.Thread(target=self._run,
name='executor-watchdog')
name='watchdog')
self.thread.daemon = True
self.timed_out = None
@ -556,7 +556,8 @@ class AnsibleJob(object):
def run(self):
self.running = True
self.thread = threading.Thread(target=self.execute)
self.thread = threading.Thread(target=self.execute,
name='build-%s' % self.job.unique)
self.thread.start()
def stop(self, reason=None):
@ -1645,22 +1646,27 @@ class ExecutorServer(object):
self.log.debug("Starting command processor")
self.command_socket.start()
self.command_thread = threading.Thread(target=self.runCommand)
self.command_thread = threading.Thread(target=self.runCommand,
name='command')
self.command_thread.daemon = True
self.command_thread.start()
self.log.debug("Starting worker")
self.update_thread = threading.Thread(target=self._updateLoop)
self.update_thread = threading.Thread(target=self._updateLoop,
name='update')
self.update_thread.daemon = True
self.update_thread.start()
self.merger_thread = threading.Thread(target=self.run_merger)
self.merger_thread = threading.Thread(target=self.run_merger,
name='merger')
self.merger_thread.daemon = True
self.merger_thread.start()
self.executor_thread = threading.Thread(target=self.run_executor)
self.executor_thread = threading.Thread(target=self.run_executor,
name='executor')
self.executor_thread.daemon = True
self.executor_thread.start()
self.governor_stop_event = threading.Event()
self.governor_thread = threading.Thread(target=self.run_governor)
self.governor_thread = threading.Thread(target=self.run_governor,
name='governor')
self.governor_thread.daemon = True
self.governor_thread.start()
self.disk_accountant.start()
@ -1869,7 +1875,10 @@ class ExecutorServer(object):
def run_governor(self):
while not self.governor_stop_event.wait(30):
self.manageLoad()
try:
self.manageLoad()
except Exception:
self.log.exception("Exception in governor thread:")
def manageLoad(self):
''' Apply some heuristics to decide whether or not we should


Loading…
Cancel
Save