[zmq] Remove rpc_zmq_concurrency option

Closes-Bug: #1569800
Change-Id: Ia61a5dc9ecac202b936d1923af82ba9a7a85775b
This commit is contained in:
Gevorg Davoian 2016-06-03 18:24:27 +03:00
parent f7ceec7aa4
commit 6166b44cba
6 changed files with 43 additions and 141 deletions

View File

@ -32,8 +32,6 @@ from oslo_messaging import server
RPCException = rpc_common.RPCException
_MATCHMAKER_BACKENDS = ('redis', 'dummy')
_MATCHMAKER_DEFAULT = 'redis'
_CONCURRENCY_CHOICES = ('eventlet', 'native')
_CONCURRENCY_DEFAULT = 'eventlet'
LOG = logging.getLogger(__name__)
@ -48,10 +46,6 @@ zmq_opts = [
choices=_MATCHMAKER_BACKENDS,
help='MatchMaker driver.'),
cfg.StrOpt('rpc_zmq_concurrency', default=_CONCURRENCY_DEFAULT,
choices=_CONCURRENCY_CHOICES,
help='Type of concurrency used. Either "native" or "eventlet"'),
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1.'),

View File

@ -19,7 +19,7 @@ from stevedore import driver
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LI
zmq = zmq_async.import_zmq(zmq_concurrency='native')
zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__)

View File

@ -24,7 +24,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._i18n import _LI
zmq = zmq_async.import_zmq(zmq_concurrency='native')
zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__)
@ -35,7 +35,7 @@ class UniversalQueueProxy(object):
self.context = context
super(UniversalQueueProxy, self).__init__()
self.matchmaker = matchmaker
self.poller = zmq_async.get_poller(zmq_concurrency='native')
self.poller = zmq_async.get_poller()
self.fe_router_socket = zmq_socket.ZmqRandomPortSocket(
conf, context, zmq.ROUTER)

View File

@ -15,21 +15,13 @@
import logging
import threading
from oslo_utils import eventletutils
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_poller
zmq = zmq_async.import_zmq(zmq_concurrency='native')
zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__)
_threading = threading
if eventletutils.EVENTLET_AVAILABLE:
import eventlet
_threading = eventlet.patcher.original('threading')
class ThreadingPoller(zmq_poller.ZmqPoller):
@ -69,8 +61,8 @@ class ThreadingExecutor(zmq_poller.Executor):
def __init__(self, method):
self._method = method
super(ThreadingExecutor, self).__init__(
_threading.Thread(target=self._loop))
self._stop = _threading.Event()
threading.Thread(target=self._loop))
self._stop = threading.Event()
def _loop(self):
while not self._stop.is_set():

View File

@ -12,30 +12,20 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging._i18n import _
from oslo_utils import eventletutils
from oslo_utils import importutils
# Map zmq_concurrency config option names to the actual module name.
ZMQ_MODULES = {
'native': 'zmq',
'eventlet': 'eventlet.green.zmq',
}
def import_zmq(zmq_concurrency='eventlet'):
_raise_error_if_invalid_config_value(zmq_concurrency)
imported_zmq = importutils.try_import(ZMQ_MODULES[zmq_concurrency],
default=None)
def import_zmq():
imported_zmq = importutils.try_import(
'eventlet.green.zmq' if eventletutils.is_monkey_patched('thread') else
'zmq', default=None
)
return imported_zmq
def get_poller(zmq_concurrency='eventlet'):
_raise_error_if_invalid_config_value(zmq_concurrency)
if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available():
def get_poller():
if eventletutils.is_monkey_patched('thread'):
from oslo_messaging._drivers.zmq_driver.poller import green_poller
return green_poller.GreenPoller()
@ -43,10 +33,8 @@ def get_poller(zmq_concurrency='eventlet'):
return threading_poller.ThreadingPoller()
def get_executor(method, zmq_concurrency='eventlet'):
_raise_error_if_invalid_config_value(zmq_concurrency)
if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available():
def get_executor(method):
if eventletutils.is_monkey_patched('thread'):
from oslo_messaging._drivers.zmq_driver.poller import green_poller
return green_poller.GreenExecutor(method)
@ -54,26 +42,10 @@ def get_executor(method, zmq_concurrency='eventlet'):
return threading_poller.ThreadingExecutor(method)
def is_eventlet_concurrency(conf):
return _is_eventlet_zmq_available() and conf.rpc_zmq_concurrency == \
'eventlet'
def _is_eventlet_zmq_available():
return importutils.try_import('eventlet.green.zmq')
def _raise_error_if_invalid_config_value(zmq_concurrency):
if zmq_concurrency not in ZMQ_MODULES:
errmsg = _('Invalid zmq_concurrency value: %s')
raise ValueError(errmsg % zmq_concurrency)
def get_queue(zmq_concurrency='eventlet'):
_raise_error_if_invalid_config_value(zmq_concurrency)
if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available():
def get_queue():
if eventletutils.is_monkey_patched('thread'):
import eventlet
return eventlet.queue.Queue(), eventlet.queue.Empty
else:
import six
return six.moves.queue.Queue(), six.moves.queue.Empty
import six
return six.moves.queue.Queue(), six.moves.queue.Empty

View File

@ -27,20 +27,9 @@ class TestImportZmq(test_utils.BaseTestCase):
def setUp(self):
super(TestImportZmq, self).setUp()
def test_config_short_names_are_converted_to_correct_module_names(self):
mock_try_import = mock.Mock()
zmq_async.importutils.try_import = mock_try_import
def test_when_eventlet_is_available_then_load_eventlet_green_zmq(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: True
zmq_async.importutils.try_import.return_value = 'mock zmq module'
self.assertEqual('mock zmq module', zmq_async.import_zmq('native'))
mock_try_import.assert_called_with('zmq', default=None)
zmq_async.importutils.try_import.return_value = 'mock eventlet module'
self.assertEqual('mock eventlet module',
zmq_async.import_zmq('eventlet'))
mock_try_import.assert_called_with('eventlet.green.zmq', default=None)
def test_when_no_args_then_default_zmq_module_is_loaded(self):
mock_try_import = mock.Mock()
zmq_async.importutils.try_import = mock_try_import
@ -48,12 +37,15 @@ class TestImportZmq(test_utils.BaseTestCase):
mock_try_import.assert_called_with('eventlet.green.zmq', default=None)
def test_invalid_config_value_raise_ValueError(self):
invalid_opt = 'x'
def test_when_evetlet_is_unavailable_then_load_zmq(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: False
errmsg = 'Invalid zmq_concurrency value: x'
with self.assertRaisesRegexp(ValueError, errmsg):
zmq_async.import_zmq(invalid_opt)
mock_try_import = mock.Mock()
zmq_async.importutils.try_import = mock_try_import
zmq_async.import_zmq()
mock_try_import.assert_called_with('zmq', default=None)
class TestGetPoller(test_utils.BaseTestCase):
@ -62,39 +54,20 @@ class TestGetPoller(test_utils.BaseTestCase):
def setUp(self):
super(TestGetPoller, self).setUp()
def test_when_no_arg_to_get_poller_then_return_default_poller(self):
zmq_async._is_eventlet_zmq_available = lambda: True
def test_when_eventlet_is_available_then_return_GreenPoller(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: True
actual = zmq_async.get_poller()
self.assertTrue(isinstance(actual, green_poller.GreenPoller))
def test_when_native_poller_requested_then_return_ThreadingPoller(self):
actual = zmq_async.get_poller('native')
self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller))
def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self):
zmq_async._is_eventlet_zmq_available = lambda: False
zmq_async.eventletutils.is_monkey_patched = lambda _: False
actual = zmq_async.get_poller('eventlet')
actual = zmq_async.get_poller()
self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller))
def test_when_eventlet_is_available_then_return_GreenPoller(self):
zmq_async._is_eventlet_zmq_available = lambda: True
actual = zmq_async.get_poller('eventlet')
self.assertTrue(isinstance(actual, green_poller.GreenPoller))
def test_invalid_config_value_raise_ValueError(self):
invalid_opt = 'x'
errmsg = 'Invalid zmq_concurrency value: x'
with self.assertRaisesRegexp(ValueError, errmsg):
zmq_async.get_poller(invalid_opt)
class TestGetReplyPoller(test_utils.BaseTestCase):
@ -102,34 +75,20 @@ class TestGetReplyPoller(test_utils.BaseTestCase):
def setUp(self):
super(TestGetReplyPoller, self).setUp()
def test_default_reply_poller_is_HoldReplyPoller(self):
zmq_async._is_eventlet_zmq_available = lambda: True
def test_when_eventlet_is_available_then_return_HoldReplyPoller(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: True
actual = zmq_async.get_poller()
self.assertTrue(isinstance(actual, green_poller.GreenPoller))
def test_when_eventlet_is_available_then_return_HoldReplyPoller(self):
zmq_async._is_eventlet_zmq_available = lambda: True
actual = zmq_async.get_poller('eventlet')
self.assertTrue(isinstance(actual, green_poller.GreenPoller))
def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self):
zmq_async._is_eventlet_zmq_available = lambda: False
zmq_async.eventletutils.is_monkey_patched = lambda _: False
actual = zmq_async.get_poller('eventlet')
actual = zmq_async.get_poller()
self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller))
def test_invalid_config_value_raise_ValueError(self):
invalid_opt = 'x'
errmsg = 'Invalid zmq_concurrency value: x'
with self.assertRaisesRegexp(ValueError, errmsg):
zmq_async.get_poller(invalid_opt)
class TestGetExecutor(test_utils.BaseTestCase):
@ -137,34 +96,19 @@ class TestGetExecutor(test_utils.BaseTestCase):
def setUp(self):
super(TestGetExecutor, self).setUp()
def test_default_executor_is_GreenExecutor(self):
zmq_async._is_eventlet_zmq_available = lambda: True
def test_when_eventlet_module_is_available_then_return_GreenExecutor(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: True
executor = zmq_async.get_executor('any method')
self.assertTrue(isinstance(executor, green_poller.GreenExecutor))
self.assertEqual('any method', executor._method)
def test_when_eventlet_module_is_available_then_return_GreenExecutor(self):
zmq_async._is_eventlet_zmq_available = lambda: True
executor = zmq_async.get_executor('any method', 'eventlet')
self.assertTrue(isinstance(executor, green_poller.GreenExecutor))
self.assertEqual('any method', executor._method)
def test_when_eventlet_is_unavailable_then_return_ThreadingExecutor(self):
zmq_async._is_eventlet_zmq_available = lambda: False
zmq_async.eventletutils.is_monkey_patched = lambda _: False
executor = zmq_async.get_executor('any method', 'eventlet')
executor = zmq_async.get_executor('any method')
self.assertTrue(isinstance(executor,
threading_poller.ThreadingExecutor))
self.assertEqual('any method', executor._method)
def test_invalid_config_value_raise_ValueError(self):
invalid_opt = 'x'
errmsg = 'Invalid zmq_concurrency value: x'
with self.assertRaisesRegexp(ValueError, errmsg):
zmq_async.get_executor('any method', invalid_opt)