Don't wait for task in submit_task

The exception shifting performed by the Task class is meant to
protect the TaskManager run method from encountering any exceptions
done in the course of running a task.  This is especially important
for subclasses of TaskManager which implement alternate run
strategies.  The exception is shifted so that it is raised by the
Task.wait method.  In a multi-thread environment, it is the caller
of the submit_task method which should receive the exception, and
therefore it is submit_task which should call Task.wait.  Currently
it is the _run_task method which calls Task.wait, which means the
TaskManager itself receives the exception.

Change-Id: I3a6e61601164811fdd255ae54470011768c99a7d
This commit is contained in:
James E. Blair 2018-08-03 13:50:55 -07:00 committed by Monty Taylor
parent 2b864f71ab
commit 75eaaab0b8
No known key found for this signature in database
GPG Key ID: 7BAE94BC7141A594
2 changed files with 108 additions and 2 deletions

View File

@ -121,8 +121,21 @@ class TaskManager(object):
:param task: The task to execute.
:param bool raw: If True, return the raw result as received from the
underlying client call.
This method calls task.wait() so that it only returns when the
task is complete.
"""
return self.run_task(task=task)
if task.run_async:
# Async tasks run the wait lower in the stack because the wait
# is just returning the concurrent Future object. That future
# object handles the exception shifting across threads.
return self.run_task(task=task)
else:
# It's important that we call task.wait() here, rather than in
# the run_task call stack below here, since subclasses may
# cause run_task to be called from a different thread.
self.run_task(task=task)
return task.wait()
def submit_function(self, method, name=None, *args, **kwargs):
""" Allows submitting an arbitrary method for work.
@ -164,9 +177,14 @@ class TaskManager(object):
def _run_task_async(self, task):
self._log.debug(
"Manager %s submitting task %s", self.name, task.name)
return self.executor.submit(self._run_task, task)
return self.executor.submit(self._run_task_wait, task)
def _run_task(self, task):
# Never call task.wait() in the run_task call stack because we
# might be running in another thread. The exception-shifting
# code is designed so that caller of submit_task (which may be
# in a different thread than this run_task) gets the
# exception.
self.pre_run_task(task)
start = time.time()
task.run()
@ -174,6 +192,12 @@ class TaskManager(object):
dt = end - start
self.post_run_task(dt, task)
def _run_task_wait(self, task):
# For async tasks, the action being performed is getting a
# future back from concurrent.futures.ThreadPoolExecutor.
# We do need to run the wait because the Future object is
# handling the exception shifting for us.
self._run_task(task)
return task.wait()

View File

@ -14,7 +14,10 @@
import concurrent.futures
import fixtures
import mock
import queue
import threading
from openstack import task_manager
from openstack.tests.unit import base
@ -106,3 +109,82 @@ class TestTaskManager(base.TestCase):
def test_async(self, mock_submit):
self.manager.submit_task(TaskTestAsync())
self.assertTrue(mock_submit.called)
class ThreadingTaskManager(task_manager.TaskManager):
"""A subclass of TaskManager which exercises the thread-shifting
exception handling behavior."""
def __init__(self, *args, **kw):
super(ThreadingTaskManager, self).__init__(
*args, **kw)
self.queue = queue.Queue()
self._running = True
self._thread = threading.Thread(name=self.name, target=self.run)
self._thread.daemon = True
self.failed = False
def start(self):
self._thread.start()
def stop(self):
self._running = False
self.queue.put(None)
def join(self):
self._thread.join()
def run(self):
# No exception should ever cause this method to hit its
# exception handler.
try:
while True:
task = self.queue.get()
if not task:
if not self._running:
break
continue
self.run_task(task)
self.queue.task_done()
except Exception:
self.failed = True
raise
def submit_task(self, task, raw=False):
# An important part of the exception-shifting feature is that
# this method should raise the exception.
self.queue.put(task)
return task.wait()
class ThreadingTaskManagerFixture(fixtures.Fixture):
def _setUp(self):
self.manager = ThreadingTaskManager(name='threading test')
self.manager.start()
self.addCleanup(self._cleanup)
def _cleanup(self):
self.manager.stop()
self.manager.join()
class TestThreadingTaskManager(base.TestCase):
def setUp(self):
super(TestThreadingTaskManager, self).setUp()
f = self.useFixture(ThreadingTaskManagerFixture())
self.manager = f.manager
def test_wait_re_raise(self):
"""Test that Exceptions thrown in a Task is reraised correctly
This test is aimed to six.reraise(), called in Task::wait().
Specifically, we test if we get the same behaviour with all the
configured interpreters (e.g. py27, p35, ...)
"""
self.assertRaises(TestException, self.manager.submit_task, TaskTest())
# Stop the manager and join the run thread to ensure the
# exception handler has run.
self.manager.stop()
self.manager.join()
self.assertFalse(self.manager.failed)