From 760766240903301297e87b33bc10169786512d8c Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 26 Feb 2014 13:46:01 -0800 Subject: [PATCH] Avoid shutting down of a passed executor For the case where an executor is being provided we should not shutdown the executor on-behalf of the provider since they would likely expect that they can still use the executor elsewhere. If the executor is shutdown then further usage is not possible. Change-Id: Ia83d8c2f1df06200e32e9b0c7340f32e74fd5d85 --- taskflow/engines/worker_based/server.py | 1 - taskflow/engines/worker_based/worker.py | 4 ++++ taskflow/tests/unit/worker_based/test_server.py | 3 +-- taskflow/tests/unit/worker_based/test_worker.py | 3 ++- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py index 68df7f71..6b0d8848 100644 --- a/taskflow/engines/worker_based/server.py +++ b/taskflow/engines/worker_based/server.py @@ -173,4 +173,3 @@ class Server(object): def stop(self): """Stop processing incoming requests.""" self._proxy.stop() - self._executor.shutdown() diff --git a/taskflow/engines/worker_based/worker.py b/taskflow/engines/worker_based/worker.py index f6fe608f..49ac0448 100644 --- a/taskflow/engines/worker_based/worker.py +++ b/taskflow/engines/worker_based/worker.py @@ -71,6 +71,7 @@ class Worker(object): def __init__(self, exchange, topic, tasks, executor=None, **kwargs): self._topic = topic self._executor = executor + self._owns_executor = False self._threads_count = -1 if self._executor is None: if 'threads_count' in kwargs: @@ -80,6 +81,7 @@ class Worker(object): else: self._threads_count = tu.get_optimal_thread_count() self._executor = futures.ThreadPoolExecutor(self._threads_count) + self._owns_executor = True self._endpoints = self._derive_endpoints(tasks) self._server = server.Server(topic, exchange, self._executor, self._endpoints, **kwargs) @@ -137,3 +139,5 @@ class Worker(object): def stop(self): """Stop worker.""" self._server.stop() + if self._owns_executor: + self._executor.shutdown() diff --git a/taskflow/tests/unit/worker_based/test_server.py b/taskflow/tests/unit/worker_based/test_server.py index 9349aeee..707a82ab 100644 --- a/taskflow/tests/unit/worker_based/test_server.py +++ b/taskflow/tests/unit/worker_based/test_server.py @@ -371,7 +371,6 @@ class TestServer(test.MockTestCase): # check calls master_mock_calls = [ - mock.call.proxy.stop(), - mock.call.executor.shutdown() + mock.call.proxy.stop() ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls) diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py index c2d6dcfc..f78f150a 100644 --- a/taskflow/tests/unit/worker_based/test_worker.py +++ b/taskflow/tests/unit/worker_based/test_worker.py @@ -119,7 +119,8 @@ class TestWorker(test.MockTestCase): self.worker(reset_master_mock=True).stop() master_mock_calls = [ - mock.call.server.stop() + mock.call.server.stop(), + mock.call.executor.shutdown() ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls)