Use explicit WBE worker object arguments (instead of kwargs)

Removes the kwargs usage that is now uniform across the other
WBE components from the workers module so that the usage of
kwargs for setting up these objects no longer is valid.

Change-Id: I4e25b88c5d2f7e2d7933ff270e2782cebe227025
This commit is contained in:
Joshua Harlow 2015-01-23 00:41:58 -08:00
parent 292adc5a62
commit fc9cb88228
2 changed files with 27 additions and 14 deletions

View File

@ -20,13 +20,13 @@ import socket
import string import string
import sys import sys
from concurrent import futures
from oslo_utils import reflection from oslo_utils import reflection
from taskflow.engines.worker_based import endpoint from taskflow.engines.worker_based import endpoint
from taskflow.engines.worker_based import server from taskflow.engines.worker_based import server
from taskflow import logging from taskflow import logging
from taskflow import task as t_task from taskflow import task as t_task
from taskflow.types import futures
from taskflow.utils import misc from taskflow.utils import misc
from taskflow.utils import threading_utils as tu from taskflow.utils import threading_utils as tu
from taskflow import version from taskflow import version
@ -77,23 +77,26 @@ class Worker(object):
will be used to create tasks from. will be used to create tasks from.
:param executor: custom executor object that can used for processing :param executor: custom executor object that can used for processing
requests in separate threads (if not provided one will be created) requests in separate threads (if not provided one will be created)
:param threads_count: threads count to be passed to the default executor :param threads_count: threads count to be passed to the
default executor (used only if an executor is not
passed in)
:param transport: transport to be used (e.g. amqp, memory, etc.) :param transport: transport to be used (e.g. amqp, memory, etc.)
:param transport_options: transport specific options :param transport_options: transport specific options
:param retry_options: retry specific options (used to configure how kombu :param retry_options: retry specific options (used to configure how kombu
handles retrying under tolerable/transient failures). handles retrying under tolerable/transient failures).
""" """
def __init__(self, exchange, topic, tasks, executor=None, **kwargs): def __init__(self, exchange, topic, tasks,
executor=None, threads_count=None, url=None,
transport=None, transport_options=None,
retry_options=None):
self._topic = topic self._topic = topic
self._executor = executor self._executor = executor
self._owns_executor = False self._owns_executor = False
self._threads_count = -1 self._threads_count = -1
if self._executor is None: if self._executor is None:
if 'threads_count' in kwargs: if threads_count is not None:
self._threads_count = int(kwargs.pop('threads_count')) self._threads_count = int(threads_count)
if self._threads_count <= 0:
raise ValueError("threads_count provided must be > 0")
else: else:
self._threads_count = tu.get_optimal_thread_count() self._threads_count = tu.get_optimal_thread_count()
self._executor = futures.ThreadPoolExecutor(self._threads_count) self._executor = futures.ThreadPoolExecutor(self._threads_count)
@ -101,7 +104,10 @@ class Worker(object):
self._endpoints = self._derive_endpoints(tasks) self._endpoints = self._derive_endpoints(tasks)
self._exchange = exchange self._exchange = exchange
self._server = server.Server(topic, exchange, self._executor, self._server = server.Server(topic, exchange, self._executor,
self._endpoints, **kwargs) self._endpoints, url=url,
transport=transport,
transport_options=transport_options,
retry_options=retry_options)
@staticmethod @staticmethod
def _derive_endpoints(tasks): def _derive_endpoints(tasks):

View File

@ -64,7 +64,11 @@ class TestWorker(test.MockTestCase):
master_mock_calls = [ master_mock_calls = [
mock.call.executor_class(self.threads_count), mock.call.executor_class(self.threads_count),
mock.call.Server(self.topic, self.exchange, mock.call.Server(self.topic, self.exchange,
self.executor_inst_mock, [], url=self.broker_url) self.executor_inst_mock, [],
url=self.broker_url,
transport_options=mock.ANY,
transport=mock.ANY,
retry_options=mock.ANY)
] ]
self.assertEqual(self.master_mock.mock_calls, master_mock_calls) self.assertEqual(self.master_mock.mock_calls, master_mock_calls)
@ -83,20 +87,23 @@ class TestWorker(test.MockTestCase):
mock.call.executor_class(10), mock.call.executor_class(10),
mock.call.Server(self.topic, self.exchange, mock.call.Server(self.topic, self.exchange,
self.executor_inst_mock, [], self.executor_inst_mock, [],
url=self.broker_url) url=self.broker_url,
transport_options=mock.ANY,
transport=mock.ANY,
retry_options=mock.ANY)
] ]
self.assertEqual(self.master_mock.mock_calls, master_mock_calls) self.assertEqual(self.master_mock.mock_calls, master_mock_calls)
def test_creation_with_negative_threads_count(self):
self.assertRaises(ValueError, self.worker, threads_count=-10)
def test_creation_with_custom_executor(self): def test_creation_with_custom_executor(self):
executor_mock = mock.MagicMock(name='executor') executor_mock = mock.MagicMock(name='executor')
self.worker(executor=executor_mock) self.worker(executor=executor_mock)
master_mock_calls = [ master_mock_calls = [
mock.call.Server(self.topic, self.exchange, executor_mock, [], mock.call.Server(self.topic, self.exchange, executor_mock, [],
url=self.broker_url) url=self.broker_url,
transport_options=mock.ANY,
transport=mock.ANY,
retry_options=mock.ANY)
] ]
self.assertEqual(self.master_mock.mock_calls, master_mock_calls) self.assertEqual(self.master_mock.mock_calls, master_mock_calls)