From 9db8bae0a197c067f065a1991bd573f9d6fffcfd Mon Sep 17 00:00:00 2001 From: Monty Taylor Date: Sat, 22 Sep 2018 06:46:37 -0500 Subject: [PATCH] Make RateLimitingTaskManager the TaskManager There isn't really a reason to not run the multi-threaded rate-limiting task manager all the time. Modify it slightly so that rate=None means "don't rate limit", which should keep the existing behavior. Change-Id: I3ede4fd0b12a65effade238c4ea967aca51869ba --- openstack/_adapter.py | 1 + openstack/connection.py | 1 + openstack/task_manager.py | 109 ++++++++---------- .../tests/unit/cloud/test_task_manager.py | 1 + 4 files changed, 48 insertions(+), 64 deletions(-) diff --git a/openstack/_adapter.py b/openstack/_adapter.py index 5340888b9..dfe2e3691 100644 --- a/openstack/_adapter.py +++ b/openstack/_adapter.py @@ -121,6 +121,7 @@ class OpenStackSDKAdapter(adapter.Adapter): session=session, *args, **kwargs) if not task_manager: task_manager = _task_manager.TaskManager(name=self.service_type) + task_manager.start() self.task_manager = task_manager diff --git a/openstack/connection.py b/openstack/connection.py index 3459d3a86..3e19f6757 100644 --- a/openstack/connection.py +++ b/openstack/connection.py @@ -290,6 +290,7 @@ class Connection(six.with_metaclass(_meta.ConnectionMeta, self.task_manager = task_manager or _task_manager.TaskManager( self.config.full_name) + self.task_manager.start() self._session = None self._proxies = {} diff --git a/openstack/task_manager.py b/openstack/task_manager.py index e2273bb76..6a58bb06a 100644 --- a/openstack/task_manager.py +++ b/openstack/task_manager.py @@ -95,11 +95,19 @@ class Task(object): class TaskManager(object): - def __init__(self, name, log=_log, workers=5, **kwargs): + def __init__(self, name, rate=None, log=_log, workers=5, **kwargs): self.name = name self._executor = None self._log = log self._workers = workers + self.daemon = True + self.queue = queue.Queue() + self._running = True + if rate is not None: + rate = float(rate) + self.rate = rate + self._thread = threading.Thread(name=name, target=self.run) + self._thread.daemon = True @property def executor(self): @@ -108,18 +116,42 @@ class TaskManager(object): max_workers=self._workers) return self._executor + def start(self): + self._thread.start() + def stop(self): - """ This is a direct action passthrough TaskManager """ + self._running = False + self.queue.put(None) if self._executor: self._executor.shutdown() - def run(self): - """ This is a direct action passthrough TaskManager """ - pass - def join(self): - """ This is a direct action passthrough TaskManager """ - pass + self._thread.join() + + def run(self): + last_ts = 0 + try: + while True: + task = self.queue.get() + if not task: + if not self._running: + break + continue + if self.rate: + while True: + delta = time.time() - last_ts + if delta >= self.rate: + break + time.sleep(self.rate - delta) + self._log.debug( + "TaskManager {name} queue size: {size})".format( + name=self.name, + size=self.queue.qsize())) + self.run_task(task) + self.queue.task_done() + except Exception: + self._log.exception("TaskManager died") + raise def submit_task(self, task): """Submit and execute the given task. @@ -131,7 +163,11 @@ class TaskManager(object): This method calls task.wait() so that it only returns when the task is complete. """ - self.run_task(task=task) + if not self._running: + raise exceptions.TaskManagerStopped( + "TaskManager {name} is no longer running".format( + name=self.name)) + self.queue.put(task) return task.wait() def submit_function( @@ -190,61 +226,6 @@ class TaskManager(object): self.name, task.name, elapsed_time) -class RateLimitingTaskManager(TaskManager): - - def __init__(self, name, rate, workers=5): - super(TaskManager, self).__init__( - name=name, workers=workers) - self.daemon = True - self.queue = queue.Queue() - self._running = True - self.rate = float(rate) - self._thread = threading.Thread(name=name, target=self.run) - self._thread.daemon = True - - def start(self): - self._thread.start() - - def stop(self): - self._running = False - self.queue.put(None) - - def join(self): - self._thread.join() - - def run(self): - last_ts = 0 - try: - while True: - task = self.queue.get() - if not task: - if not self._running: - break - continue - while True: - delta = time.time() - last_ts - if delta >= self.rate: - break - time.sleep(self.rate - delta) - self._log.debug( - "TaskManager {name} queue size: {size})".format( - name=self.name, - size=self.queue.qsize())) - self.run_task(task) - self.queue.task_done() - except Exception: - self._log.exception("TaskManager died") - raise - - def submit_task(self, task): - if not self._running: - raise exceptions.TaskManagerStopped( - "TaskManager {name} is no longer running".format( - name=self.name)) - self.queue.put(task) - return task.wait() - - def wait_for_futures(futures, raise_on_error=True, log=_log): '''Collect results or failures from a list of running future tasks.''' diff --git a/openstack/tests/unit/cloud/test_task_manager.py b/openstack/tests/unit/cloud/test_task_manager.py index 777cb90f4..6c4b19c0c 100644 --- a/openstack/tests/unit/cloud/test_task_manager.py +++ b/openstack/tests/unit/cloud/test_task_manager.py @@ -68,6 +68,7 @@ class TestTaskManager(base.TestCase): def setUp(self): super(TestTaskManager, self).setUp() self.manager = task_manager.TaskManager(name='test') + self.manager.start() def test_wait_re_raise(self): """Test that Exceptions thrown in a Task is reraised correctly