diff --git a/taskflow/engines/worker_based/worker.py b/taskflow/engines/worker_based/worker.py index 609339de..f6fe608f 100644 --- a/taskflow/engines/worker_based/worker.py +++ b/taskflow/engines/worker_based/worker.py @@ -71,9 +71,14 @@ class Worker(object): def __init__(self, exchange, topic, tasks, executor=None, **kwargs): self._topic = topic self._executor = executor - self._threads_count = kwargs.pop('threads_count', - tu.get_optimal_thread_count()) + self._threads_count = -1 if self._executor is None: + if 'threads_count' in kwargs: + self._threads_count = int(kwargs.pop('threads_count')) + if self._threads_count <= 0: + raise ValueError("threads_count provided must be > 0") + else: + self._threads_count = tu.get_optimal_thread_count() self._executor = futures.ThreadPoolExecutor(self._threads_count) self._endpoints = self._derive_endpoints(tasks) self._server = server.Server(topic, exchange, self._executor, @@ -114,11 +119,15 @@ class Worker(object): def run(self): """Run worker.""" - LOG.info("Starting the '%s' topic worker in %s threads." % - (self._topic, self._threads_count)) + if self._threads_count != -1: + LOG.info("Starting the '%s' topic worker in %s threads.", + self._topic, self._threads_count) + else: + LOG.info("Starting the '%s' topic worker using a %s.", self._topic, + self._executor) LOG.info("Tasks list:") for endpoint in self._endpoints: - LOG.info("|-- %s" % endpoint) + LOG.info("|-- %s", endpoint) self._server.start() def wait(self):