Merge "Fixups for threads_count usage and logging"
This commit is contained in:
@@ -71,9 +71,14 @@ class Worker(object):
|
|||||||
def __init__(self, exchange, topic, tasks, executor=None, **kwargs):
|
def __init__(self, exchange, topic, tasks, executor=None, **kwargs):
|
||||||
self._topic = topic
|
self._topic = topic
|
||||||
self._executor = executor
|
self._executor = executor
|
||||||
self._threads_count = kwargs.pop('threads_count',
|
self._threads_count = -1
|
||||||
tu.get_optimal_thread_count())
|
|
||||||
if self._executor is None:
|
if self._executor is None:
|
||||||
|
if 'threads_count' in kwargs:
|
||||||
|
self._threads_count = int(kwargs.pop('threads_count'))
|
||||||
|
if self._threads_count <= 0:
|
||||||
|
raise ValueError("threads_count provided must be > 0")
|
||||||
|
else:
|
||||||
|
self._threads_count = tu.get_optimal_thread_count()
|
||||||
self._executor = futures.ThreadPoolExecutor(self._threads_count)
|
self._executor = futures.ThreadPoolExecutor(self._threads_count)
|
||||||
self._endpoints = self._derive_endpoints(tasks)
|
self._endpoints = self._derive_endpoints(tasks)
|
||||||
self._server = server.Server(topic, exchange, self._executor,
|
self._server = server.Server(topic, exchange, self._executor,
|
||||||
@@ -114,11 +119,15 @@ class Worker(object):
|
|||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""Run worker."""
|
"""Run worker."""
|
||||||
LOG.info("Starting the '%s' topic worker in %s threads." %
|
if self._threads_count != -1:
|
||||||
(self._topic, self._threads_count))
|
LOG.info("Starting the '%s' topic worker in %s threads.",
|
||||||
|
self._topic, self._threads_count)
|
||||||
|
else:
|
||||||
|
LOG.info("Starting the '%s' topic worker using a %s.", self._topic,
|
||||||
|
self._executor)
|
||||||
LOG.info("Tasks list:")
|
LOG.info("Tasks list:")
|
||||||
for endpoint in self._endpoints:
|
for endpoint in self._endpoints:
|
||||||
LOG.info("|-- %s" % endpoint)
|
LOG.info("|-- %s", endpoint)
|
||||||
self._server.start()
|
self._server.start()
|
||||||
|
|
||||||
def wait(self):
|
def wait(self):
|
||||||
|
|||||||
Reference in New Issue
Block a user