From 22fd32fb55d45b5f355a862b2edd78fde3c2086f Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 13 Mar 2015 14:29:34 -0700 Subject: [PATCH] Just let the future executors handle the max workers Instead of providing and retaining a thread count in the worker and action engine executors and checking it and handling the none case, we can just let the future types handle this already (which they already do). And when displaying this information in the worker banner use a new future executor attribute that is the maximum number of workers that will be ever created. Change-Id: I765c22936b53cdbb8a90195a764d4c67bcc3f34b --- taskflow/engines/action_engine/executor.py | 7 ++----- taskflow/engines/worker_based/worker.py | 13 +++++-------- taskflow/tests/unit/worker_based/test_worker.py | 14 ++++---------- taskflow/types/futures.py | 10 ++++++++++ 4 files changed, 21 insertions(+), 23 deletions(-) diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index b271beb8..bd222a55 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -417,11 +417,8 @@ class ParallelTaskExecutor(TaskExecutor): def start(self): if self._own_executor: - if self._max_workers is not None: - max_workers = self._max_workers - else: - max_workers = threading_utils.get_optimal_thread_count() - self._executor = self._create_executor(max_workers=max_workers) + self._executor = self._create_executor( + max_workers=self._max_workers) def stop(self): if self._own_executor: diff --git a/taskflow/engines/worker_based/worker.py b/taskflow/engines/worker_based/worker.py index 5e9ff85e..8a79133f 100644 --- a/taskflow/engines/worker_based/worker.py +++ b/taskflow/engines/worker_based/worker.py @@ -98,13 +98,9 @@ System details: self._topic = topic self._executor = executor self._owns_executor = False - self._threads_count = -1 if self._executor is None: - if threads_count is not None: - self._threads_count = int(threads_count) - else: - self._threads_count = tu.get_optimal_thread_count() - self._executor = futures.ThreadPoolExecutor(self._threads_count) + self._executor = futures.ThreadPoolExecutor( + max_workers=threads_count) self._owns_executor = True self._endpoints = self._derive_endpoints(tasks) self._exchange = exchange @@ -139,8 +135,9 @@ System details: tpl_params['transport_type'] = transport.driver_type tpl_params['connection_uri'] = connection_details.uri tpl_params['executor_type'] = reflection.get_class_name(self._executor) - if self._threads_count != -1: - tpl_params['executor_thread_count'] = self._threads_count + threads_count = getattr(self._executor, 'max_workers', None) + if threads_count is not None: + tpl_params['executor_thread_count'] = threads_count if self._endpoints: pretty_endpoints = [] for ep in self._endpoints: diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py index 07095efa..a475c51d 100644 --- a/taskflow/tests/unit/worker_based/test_worker.py +++ b/taskflow/tests/unit/worker_based/test_worker.py @@ -33,7 +33,6 @@ class TestWorker(test.MockTestCase): self.broker_url = 'test-url' self.exchange = 'test-exchange' self.topic = 'test-topic' - self.threads_count = 5 self.endpoint_count = 24 # patch classes @@ -42,11 +41,6 @@ class TestWorker(test.MockTestCase): self.server_mock, self.server_inst_mock = self.patchClass( worker.server, 'Server') - # other mocking - self.threads_count_mock = self.patch( - 'taskflow.engines.worker_based.worker.tu.get_optimal_thread_count') - self.threads_count_mock.return_value = self.threads_count - def worker(self, reset_master_mock=False, **kwargs): worker_kwargs = dict(exchange=self.exchange, topic=self.topic, @@ -62,7 +56,7 @@ class TestWorker(test.MockTestCase): self.worker() master_mock_calls = [ - mock.call.executor_class(self.threads_count), + mock.call.executor_class(max_workers=None), mock.call.Server(self.topic, self.exchange, self.executor_inst_mock, [], url=self.broker_url, @@ -70,7 +64,7 @@ class TestWorker(test.MockTestCase): transport=mock.ANY, retry_options=mock.ANY) ] - self.assertEqual(self.master_mock.mock_calls, master_mock_calls) + self.assertEqual(master_mock_calls, self.master_mock.mock_calls) def test_banner_writing(self): buf = six.StringIO() @@ -84,7 +78,7 @@ class TestWorker(test.MockTestCase): self.worker(threads_count=10) master_mock_calls = [ - mock.call.executor_class(10), + mock.call.executor_class(max_workers=10), mock.call.Server(self.topic, self.exchange, self.executor_inst_mock, [], url=self.broker_url, @@ -92,7 +86,7 @@ class TestWorker(test.MockTestCase): transport=mock.ANY, retry_options=mock.ANY) ] - self.assertEqual(self.master_mock.mock_calls, master_mock_calls) + self.assertEqual(master_mock_calls, self.master_mock.mock_calls) def test_creation_with_custom_executor(self): executor_mock = mock.MagicMock(name='executor') diff --git a/taskflow/types/futures.py b/taskflow/types/futures.py index 2a6f7b6f..cfa97214 100644 --- a/taskflow/types/futures.py +++ b/taskflow/types/futures.py @@ -114,6 +114,11 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor): # really want to use anyway). super(ThreadPoolExecutor, self).submit) + @property + def max_workers(self): + """The max number of workers that this executor will ever have.""" + return self._max_workers + @property def statistics(self): """:class:`.ExecutorStatistics` about the executors executions.""" @@ -153,6 +158,11 @@ class ProcessPoolExecutor(_process.ProcessPoolExecutor): """Accessor to determine if the executor is alive/active.""" return not self._shutdown_thread + @property + def max_workers(self): + """The max number of workers that this executor will ever have.""" + return self._max_workers + @property def statistics(self): """:class:`.ExecutorStatistics` about the executors executions."""