Fix threading zmq poller and proxy

- Fixed universal proxy to not get stuck with multiple backends
- Fixed threading pollers/executors (proxy side)
    - Driver option to switch green/no-green impl.
    - Swtiched to no-green in real-world proxy (green left for unit tests)
- Minor names fixes in serializer

Change-Id: Id6508101521d8914228c639ed58ecd29db0ef456
This commit is contained in:
Oleksii Zamiatin 2015-07-14 23:03:22 +03:00
parent 12aff74f53
commit ebcadf3d5e
16 changed files with 123 additions and 79 deletions

View File

@ -14,9 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
eventlet.monkey_patch()
import contextlib
import logging
import sys
@ -30,6 +27,9 @@ from oslo_messaging._executors import base # FIXME(markmc)
CONF = cfg.CONF
CONF.register_opts(impl_zmq.zmq_opts)
CONF.register_opts(base._pool_opts)
# TODO(ozamiatin): Move this option assignment to an external config file
# Use efficient zmq poller in real-world deployment
CONF.rpc_zmq_native = True
def main():

View File

@ -75,8 +75,7 @@ class Listener(object):
def cleanup(self):
"""Cleanup listener.
Close connection used by listener if any. For some listeners like
zmq there is no connection so no need to close connection.
Close connection (socket) used by listener if any.
As this is listener specific method, overwrite it in to derived class
if cleanup of listener required.
"""

View File

@ -45,9 +45,13 @@ zmq_opts = [
cfg.BoolOpt('rpc_zmq_all_req_rep',
default=True,
deprecated_group='DEFAULT',
help='Use REQ/REP pattern for all methods CALL/CAST/FANOUT.'),
cfg.BoolOpt('rpc_zmq_native',
default=False,
help='Switches ZeroMQ eventlet/threading way of usage.'
'Affects pollers, executors etc.'),
# The following port is unassigned by IANA as of 2012-05-21
cfg.IntOpt('rpc_zmq_port', default=9501,
help='ZeroMQ receiver listening port.'),

View File

@ -41,7 +41,8 @@ class BaseProxy(object):
super(BaseProxy, self).__init__()
self.conf = conf
self.context = context
self.executor = zmq_async.get_executor(self.run)
self.executor = zmq_async.get_executor(
self.run, native_zmq=conf.rpc_zmq_native)
@abc.abstractmethod
def run(self):
@ -132,9 +133,8 @@ class DirectBackendMatcher(BaseBackendMatcher):
def _match_backend(self, message):
topic = self._get_topic(message)
ipc_address = self._get_ipc_address(topic)
if ipc_address not in self.backends:
self._create_backend(ipc_address)
return self.backend, topic
backend = self._create_backend(ipc_address)
return backend, topic
@abc.abstractmethod
def _get_topic(self, message):

View File

@ -24,8 +24,6 @@ from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class ZmqBroker(object):
"""Local messaging IPC broker (nodes are still peers).
@ -42,6 +40,7 @@ class ZmqBroker(object):
def __init__(self, conf):
super(ZmqBroker, self).__init__()
zmq = zmq_async.import_zmq(native_zmq=conf.rpc_zmq_native)
self.conf = conf
self.context = zmq.Context()
proxy = zmq_universal_proxy.UniversalProxy(conf, self.context)

View File

@ -46,12 +46,11 @@ class CallProxy(base_proxy.BaseProxy):
class DealerBackend(base_proxy.DirectBackendMatcher):
def __init__(self, conf, context):
super(DealerBackend, self).__init__(conf,
zmq_async.get_poller(),
context)
self.backend = self.context.socket(zmq.DEALER)
self.poller.register(self.backend)
def __init__(self, conf, context, poller=None):
if poller is None:
poller = zmq_async.get_poller(
native_zmq=conf.rpc_zmq_native)
super(DealerBackend, self).__init__(conf, poller, context)
def receive_outgoing_reply(self):
reply_message = self.poller.poll(1)
@ -71,16 +70,22 @@ class DealerBackend(base_proxy.DirectBackendMatcher):
backend.send_multipart(message)
def _create_backend(self, ipc_address):
self.backend.connect(ipc_address)
self.backends[str(ipc_address)] = True
if ipc_address in self.backends:
return self.backends[ipc_address]
backend = self.context.socket(zmq.DEALER)
backend.connect(ipc_address)
self.poller.register(backend)
self.backends[ipc_address] = backend
return backend
class FrontendTcpRouter(base_proxy.BaseTcpFrontend):
def __init__(self, conf, context):
super(FrontendTcpRouter, self).__init__(conf,
zmq_async.get_poller(),
context,
def __init__(self, conf, context, poller=None):
if poller is None:
poller = zmq_async.get_poller(
native_zmq=conf.rpc_zmq_native)
super(FrontendTcpRouter, self).__init__(conf, poller, context,
socket_type=zmq.ROUTER,
port_number=conf.rpc_zmq_port)

View File

@ -42,8 +42,8 @@ class CastProxy(base_proxy.BaseProxy):
class FrontendTcpPull(base_proxy.BaseTcpFrontend):
def __init__(self, conf, context):
super(FrontendTcpPull, self).__init__(conf, zmq_async.get_poller(),
context)
poller = zmq_async.get_poller(native_zmq=conf.rpc_zmq_native)
super(FrontendTcpPull, self).__init__(conf, poller, context)
self.frontend = self.context.socket(zmq.PULL)
address = zmq_topic.get_tcp_bind_address(conf.rpc_zmq_fanout_port)
LOG.info(_LI("Binding to TCP PULL %s") % address)
@ -58,9 +58,8 @@ class FrontendTcpPull(base_proxy.BaseTcpFrontend):
class CastPushBackendMatcher(base_proxy.BaseBackendMatcher):
def __init__(self, conf, context):
super(CastPushBackendMatcher, self).__init__(conf,
zmq_async.get_poller(),
context)
poller = zmq_async.get_poller(native_zmq=conf.rpc_zmq_native)
super(CastPushBackendMatcher, self).__init__(conf, poller, context)
self.backend = self.context.socket(zmq.PUSH)
def _get_topic(self, message):

View File

@ -24,9 +24,8 @@ zmq = zmq_async.import_zmq()
class PublisherBackend(base_proxy.BaseBackendMatcher):
def __init__(self, conf, context):
super(PublisherBackend, self).__init__(conf,
zmq_async.get_poller(),
context)
poller = zmq_async.get_poller(native_zmq=conf.rpc_zmq_native)
super(PublisherBackend, self).__init__(conf, poller, context)
self.backend = self.context.socket(zmq.PUB)
self.backend.bind(zmq_topic.get_ipc_address_fanout(conf))

View File

@ -17,6 +17,7 @@ import logging
import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy
from oslo_messaging._drivers.zmq_driver.broker import zmq_call_proxy
from oslo_messaging._drivers.zmq_driver.broker import zmq_fanout_proxy
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_serializer
from oslo_messaging._i18n import _LI
@ -27,27 +28,34 @@ class UniversalProxy(base_proxy.BaseProxy):
def __init__(self, conf, context):
super(UniversalProxy, self).__init__(conf, context)
self.tcp_frontend = zmq_call_proxy.FrontendTcpRouter(conf, context)
self.backend_matcher = BackendMatcher(conf, context)
self.poller = zmq_async.get_poller(
native_zmq=conf.rpc_zmq_native)
self.tcp_frontend = zmq_call_proxy.FrontendTcpRouter(
conf, context, poller=self.poller)
self.backend_matcher = BackendMatcher(
conf, context, poller=self.poller)
call = zmq_serializer.CALL_TYPE
self.call_backend = self.backend_matcher.backends[call]
LOG.info(_LI("Starting universal-proxy thread"))
def run(self):
message = self.tcp_frontend.receive_incoming()
if message is not None:
self.backend_matcher.redirect_to_backend(message)
message, socket = self.poller.poll(self.conf.rpc_poll_timeout)
if message is None:
return
reply, socket = self.call_backend.receive_outgoing_reply()
if reply is not None:
self.tcp_frontend.redirect_outgoing_reply(reply)
LOG.info(_LI("Received message at universal proxy: %s") % str(message))
if socket == self.tcp_frontend.frontend:
self.backend_matcher.redirect_to_backend(message)
else:
self.tcp_frontend.redirect_outgoing_reply(message)
class BackendMatcher(base_proxy.BaseBackendMatcher):
def __init__(self, conf, context):
super(BackendMatcher, self).__init__(conf, None, context)
direct_backend = zmq_call_proxy.DealerBackend(conf, context)
def __init__(self, conf, context, poller=None):
super(BackendMatcher, self).__init__(conf, poller, context)
direct_backend = zmq_call_proxy.DealerBackend(conf, context, poller)
self.backends[zmq_serializer.CALL_TYPE] = direct_backend
self.backends[zmq_serializer.CAST_TYPE] = direct_backend
fanout_backend = zmq_fanout_proxy.PublisherBackend(conf, context)

View File

@ -15,36 +15,60 @@
import logging
import threading
from oslo_utils import eventletutils
import zmq
from oslo_messaging._drivers.zmq_driver import zmq_poller
LOG = logging.getLogger(__name__)
_threading = threading
if eventletutils.EVENTLET_AVAILABLE:
import eventlet
_threading = eventlet.patcher.original('threading')
class ThreadingPoller(zmq_poller.ZmqPoller):
def __init__(self):
self.poller = zmq.Poller()
self.recv_methods = {}
def register(self, socket):
self.poller.register(socket, zmq.POLLOUT)
def register(self, socket, recv_method=None):
if recv_method is not None:
self.recv_methods[socket] = recv_method
self.poller.register(socket, zmq.POLLIN)
def poll(self, timeout=None):
socks = dict(self.poller.poll(timeout))
for socket in socks:
incoming = socket.recv()
return incoming
timeout = timeout * 1000 # zmq poller waits milliseconds
sockets = dict(self.poller.poll(timeout=timeout))
if not sockets:
return None, None
for socket in sockets:
if socket in self.recv_methods:
return self.recv_methods[socket](socket)
else:
return socket.recv_multipart(), socket
class ThreadingExecutor(zmq_poller.Executor):
def __init__(self, method):
thread = threading.Thread(target=method)
super(ThreadingExecutor, self).__init__(thread)
self._method = method
super(ThreadingExecutor, self).__init__(
_threading.Thread(target=self._loop))
self._stop = _threading.Event()
def _loop(self):
while not self._stop.is_set():
self._method()
def execute(self):
self.thread.start()
def stop(self):
self._stop.set()
def wait(self):
self.thread.join()

View File

@ -60,8 +60,9 @@ class CallRequest(Request):
raise oslo_messaging.MessagingTimeout(
"Timeout %s seconds was reached" % self.timeout)
if reply['failure']:
if reply[zmq_serializer.FIELD_FAILURE]:
raise rpc_common.deserialize_remote_exception(
reply['failure'], self.allowed_remote_exmods)
reply[zmq_serializer.FIELD_FAILURE],
self.allowed_remote_exmods)
else:
return reply['reply']
return reply[zmq_serializer.FIELD_REPLY]

View File

@ -19,6 +19,7 @@ from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_base_consumer
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_serializer
from oslo_messaging._drivers.zmq_driver import zmq_topic as topic_utils
from oslo_messaging._i18n import _LE
@ -41,9 +42,9 @@ class ZmqIncomingRequest(base.IncomingMessage):
if failure is not None:
failure = rpc_common.serialize_remote_exception(failure,
log_failure)
message_reply = {u'reply': reply,
u'failure': failure,
u'log_failure': log_failure}
message_reply = {zmq_serializer.FIELD_REPLY: reply,
zmq_serializer.FIELD_FAILURE: failure,
zmq_serializer.FIELD_LOG_FAILURE: log_failure}
LOG.debug("Replying %s REP", (str(message_reply)))
self.received = True
self.reply_socket.send(self.reply_id, zmq.SNDMORE)

View File

@ -23,8 +23,12 @@ LOG = logging.getLogger(__name__)
green_zmq = importutils.try_import('eventlet.green.zmq')
def import_zmq():
imported_zmq = green_zmq or importutils.try_import('zmq')
def import_zmq(native_zmq=False):
if native_zmq:
imported_zmq = importutils.try_import('zmq')
else:
imported_zmq = green_zmq or importutils.try_import('zmq')
if imported_zmq is None:
errmsg = _LE("ZeroMQ not found!")
LOG.error(errmsg)
@ -32,28 +36,28 @@ def import_zmq():
return imported_zmq
def get_poller():
if green_zmq:
def get_poller(native_zmq=False):
if native_zmq or green_zmq is None:
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
return threading_poller.ThreadingPoller()
else:
from oslo_messaging._drivers.zmq_driver.poller import green_poller
return green_poller.GreenPoller()
else:
def get_reply_poller(native_zmq=False):
if native_zmq or green_zmq is None:
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
return threading_poller.ThreadingPoller()
def get_reply_poller():
if green_zmq:
else:
from oslo_messaging._drivers.zmq_driver.poller import green_poller
return green_poller.HoldReplyPoller()
else:
def get_executor(method, native_zmq=False):
if native_zmq or green_zmq is None:
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
return threading_poller.ThreadingPoller()
def get_executor(method):
if green_zmq is not None:
return threading_poller.ThreadingExecutor(method)
else:
from oslo_messaging._drivers.zmq_driver.poller import green_poller
return green_poller.GreenExecutor(method)
else:
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
return threading_poller.ThreadingExecutor()

View File

@ -26,6 +26,10 @@ LOG = logging.getLogger(__name__)
MESSAGE_CALL_TYPE_POSITION = 2
MESSAGE_CALL_TOPIC_POSITION = 3
FIELD_FAILURE = 'failure'
FIELD_REPLY = 'reply'
FIELD_LOG_FAILURE = 'log_failure'
CALL_TYPE = 'call'
CAST_TYPE = 'cast'
FANOUT_TYPE = 'fanout'

View File

@ -150,7 +150,7 @@ class TestZmqBasics(ZmqBaseTestCase):
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
self.assertIsNotNone(result)
self.assertTrue(result)
def test_send_noreply(self):
"""Cast() with topic."""

View File

@ -24,7 +24,4 @@ redis-server --port $ZMQ_REDIS_PORT &
oslo-messaging-zmq-receiver --config-file ${DATADIR}/zmq.conf > ${DATADIR}/receiver.log 2>&1 &
# FIXME(sileht): This does the same kind of setup that devstack does
# But this doesn't work yet, a zeromq maintener should take a look on that
$*