Call pre/post run task calls from TaskManager.submit_task()

Since I33325fb5be21264df0a68ceef2202ab7875f63ec, the task.run() call
in TaskManager.run_task() is now an asynchronous submission to the
threadpool, rather than a synchronous call to the task's actual
function.  This means taking the elapsed_time around this call is no
longer an indication of the task's runtime, but always comes out as
just a few μs for the insertion.

Move the pre and post calls into TaskManager.submit_task() where the
elapsed_time will reflect the time between insertion into the queue
and the wait() return of its result.

Update documentation for pre/post tasks, and add test-cases.

Depends-On: https://review.openstack.org/613438
Change-Id: I8617ab2895d1544a6902ae5a3d6a97b87bfd2ec9
This commit is contained in:
Ian Wienand 2018-10-26 14:47:44 +11:00 committed by Monty Taylor
parent 448cda91e3
commit dd5f0f6827
No known key found for this signature in database
GPG Key ID: 7BAE94BC7141A594
3 changed files with 54 additions and 6 deletions

View File

@ -167,8 +167,14 @@ class TaskManager(object):
raise exceptions.TaskManagerStopped(
"TaskManager {name} is no longer running".format(
name=self.name))
self.pre_run_task(task)
start = time.time()
self.queue.put(task)
return task.wait()
ret = task.wait()
end = time.time()
dt = end - start
self.post_run_task(dt, task)
return ret
def submit_function(
self, method, name=None, run_async=False, *args, **kwargs):
@ -204,6 +210,13 @@ class TaskManager(object):
method, name=name, run_async=True, *args, **kwargs)
def pre_run_task(self, task):
'''Callback when task enters the task queue
:param task: the task
Intended to be overridden by child classes to track task
progress.
'''
self._log.debug(
"Manager %s running task %s", self.name, task.name)
@ -213,14 +226,21 @@ class TaskManager(object):
# code is designed so that caller of submit_task (which may be
# in a different thread than this run_task) gets the
# exception.
self.pre_run_task(task)
start = time.time()
#
# Note all threads go through the threadpool, so this is an
# async call. submit_task will wait() for the final result.
task.run()
end = time.time()
dt = end - start
self.post_run_task(dt, task)
def post_run_task(self, elapsed_time, task):
'''Callback at task completion
:param float elapsed_time: time in seconds between task entering
queue and finishing
:param task: the task
This function is intended to be overridden by child classes to
monitor task runtimes.
'''
self._log.debug(
"Manager %s ran task %s in %ss",
self.name, task.name, elapsed_time)

View File

@ -17,6 +17,7 @@ import concurrent.futures
import fixtures
import mock
import threading
import time
from six.moves import queue
@ -105,6 +106,28 @@ class TestTaskManager(base.TestCase):
self.manager.submit_function(set, run_async=True)
self.assertTrue(mock_submit.called)
@mock.patch.object(task_manager.TaskManager, 'post_run_task')
@mock.patch.object(task_manager.TaskManager, 'pre_run_task')
def test_pre_post_calls(self, mock_pre, mock_post):
self.manager.submit_function(lambda: None)
mock_pre.assert_called_once()
mock_post.assert_called_once()
@mock.patch.object(task_manager.TaskManager, 'post_run_task')
@mock.patch.object(task_manager.TaskManager, 'pre_run_task')
def test_validate_timing(self, mock_pre, mock_post):
# Note the unit test setup has mocked out time.sleep() and
# done a * 0.0001, and the test should be under the 5
# second timeout. Thus with below, we should see at
# *least* a 1 second pause running the task.
self.manager.submit_function(lambda: time.sleep(10000))
mock_pre.assert_called_once()
mock_post.assert_called_once()
args, kwargs = mock_post.call_args_list[0]
self.assertTrue(args[0] > 1.0)
class ThreadingTaskManager(task_manager.TaskManager):
"""A subclass of TaskManager which exercises the thread-shifting

View File

@ -0,0 +1,5 @@
---
fixes:
- |
Fix a regression where the ``TaskManager.post_run_task`` ``elapsed_time``
argument was not reflecting the time taken by the actual task.