Merge "[zmq] Reduce threading from python proxy"

This commit is contained in:
Jenkins 2016-04-22 13:21:13 +00:00 committed by Gerrit Code Review
commit 715b5b1c3f
12 changed files with 74 additions and 140 deletions

View File

@ -14,12 +14,12 @@
import argparse
import logging
import time
from oslo_config import cfg
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.zmq_driver.broker import zmq_proxy
from oslo_messaging._drivers.zmq_driver.broker import zmq_queue_proxy
from oslo_messaging import server
CONF = cfg.CONF
@ -62,13 +62,15 @@ def main():
raise Exception("Bad proxy type %s, should be one of %s" %
(args.proxy_type, PROXY_TYPES))
reactor = zmq_proxy.ZmqPublisher(CONF) if args.proxy_type == PUBLISHER \
else zmq_proxy.ZmqRouter(CONF)
reactor.start()
reactor = zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.PublisherProxy) \
if args.proxy_type == PUBLISHER \
else zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.RouterProxy)
try:
while True:
time.sleep(1)
reactor.run()
except KeyboardInterrupt:
reactor.close()
if __name__ == "__main__":
main()

View File

@ -1,50 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
from oslo_messaging._drivers.zmq_driver import zmq_async
@six.add_metaclass(abc.ABCMeta)
class BaseProxy(object):
"""Base TCP-proxy.
TCP-proxy redirects messages received by TCP from clients to servers
over IPC. Consists of TCP-frontend and IPC-backend objects. Runs
in async executor.
"""
def __init__(self, conf, context):
super(BaseProxy, self).__init__()
self.conf = conf
self.context = context
self.executor = zmq_async.get_executor(self.run,
zmq_concurrency='native')
@abc.abstractmethod
def run(self):
"""Main execution point of the proxy"""
def start(self):
self.executor.execute()
def stop(self):
self.executor.stop()
def wait(self):
self.executor.wait()

View File

@ -13,21 +13,18 @@
# under the License.
import logging
import os
from oslo_utils import excutils
from stevedore import driver
from oslo_messaging._drivers.zmq_driver.broker import zmq_queue_proxy
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LE, _LI
from oslo_messaging._i18n import _LI
zmq = zmq_async.import_zmq(zmq_concurrency='native')
LOG = logging.getLogger(__name__)
class ZmqProxy(object):
"""Base class for Publishers and Routers proxies.
"""Wrapper class for Publishers and Routers proxies.
The main reason to have a proxy is high complexity of TCP sockets number
growth with direct connections (when services connect directly to
each other). The general complexity for ZeroMQ+Openstack deployment
@ -40,54 +37,9 @@ class ZmqProxy(object):
Publisher is a server which performs broadcast to subscribers.
Router is used for direct message types in case of number of TCP socket
connections is critical for specific deployment. Generally 3 publishers
is enough for deployment. Routers should be
"""
is enough for deployment.
def __init__(self, conf):
super(ZmqProxy, self).__init__()
self.conf = conf
self._create_ipc_dirs()
self.matchmaker = driver.DriverManager(
'oslo.messaging.zmq.matchmaker',
self.conf.rpc_zmq_matchmaker,
).driver(self.conf)
self.context = zmq.Context()
self.proxies = []
def _create_ipc_dirs(self):
ipc_dir = self.conf.rpc_zmq_ipc_dir
try:
os.makedirs("%s/fanout" % ipc_dir)
except os.error:
if not os.path.isdir(ipc_dir):
with excutils.save_and_reraise_exception():
LOG.error(_LE("Required IPC directory does not exist at"
" %s"), ipc_dir)
def start(self):
for proxy in self.proxies:
proxy.start()
def wait(self):
for proxy in self.proxies:
proxy.wait()
def close(self):
LOG.info(_LI("Broker shutting down ..."))
for proxy in self.proxies:
proxy.stop()
class ZmqPublisher(ZmqProxy):
def __init__(self, conf):
super(ZmqPublisher, self).__init__(conf)
self.proxies.append(zmq_queue_proxy.PublisherProxy(
conf, self.context, self.matchmaker))
class ZmqRouter(ZmqProxy):
"""Router is used for direct messages in order to reduce the number of
Router is used for direct messages in order to reduce the number of
allocated TCP sockets in controller. The list of requirements to Router:
1. There may be any number of routers in the deployment. Routers are
@ -107,9 +59,22 @@ class ZmqRouter(ZmqProxy):
Those requirements should limit the performance impact caused by using
of proxies making proxies as lightweight as possible.
"""
def __init__(self, conf):
super(ZmqRouter, self).__init__(conf)
self.proxies.append(zmq_queue_proxy.RouterProxy(
conf, self.context, self.matchmaker))
def __init__(self, conf, proxy_cls):
super(ZmqProxy, self).__init__()
self.conf = conf
self.matchmaker = driver.DriverManager(
'oslo.messaging.zmq.matchmaker',
self.conf.rpc_zmq_matchmaker,
).driver(self.conf)
self.context = zmq.Context()
self.proxy = proxy_cls(conf, self.context, self.matchmaker)
def run(self):
self.proxy.run()
def close(self):
LOG.info(_LI("Proxy shutting down ..."))
self.proxy.cleanup()

View File

@ -15,7 +15,6 @@
import abc
import logging
from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher_proxy
from oslo_messaging._drivers.zmq_driver.client.publishers \
@ -30,10 +29,12 @@ zmq = zmq_async.import_zmq(zmq_concurrency='native')
LOG = logging.getLogger(__name__)
class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
class UniversalQueueProxy(object):
def __init__(self, conf, context, matchmaker):
super(UniversalQueueProxy, self).__init__(conf, context)
self.conf = conf
self.context = context
super(UniversalQueueProxy, self).__init__()
self.matchmaker = matchmaker
self.poller = zmq_async.get_poller(zmq_concurrency='native')
@ -75,6 +76,9 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
payload.insert(zmq_names.MULTIPART_IDX_ENVELOPE, envelope)
return payload
def cleanup(self):
self.router_socket.close()
class PublisherProxy(UniversalQueueProxy):
@ -92,15 +96,20 @@ class PublisherProxy(UniversalQueueProxy):
"router": self.router_address})
def _redirect_in_request(self, multipart_message):
LOG.debug("-> Redirecting request %s to TCP publisher",
multipart_message)
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
if self.conf.use_pub_sub and envelope.is_mult_send:
LOG.debug("-> Redirecting request %s to TCP publisher", envelope)
self.pub_publisher.send_request(multipart_message)
def _redirect_reply(self, multipart_message):
"""No reply is possible for publisher."""
def cleanup(self):
super(PublisherProxy, self).cleanup()
self.pub_publisher.cleanup()
self.matchmaker.unregister_publisher(
(self.pub_publisher.host, self.router_address))
class RouterProxy(UniversalQueueProxy):
@ -117,19 +126,22 @@ class RouterProxy(UniversalQueueProxy):
{"router": self.router_address})
def _redirect_in_request(self, multipart_message):
LOG.debug("-> Redirecting request %s to TCP publisher",
multipart_message)
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
LOG.debug("Envelope: %s", envelope)
LOG.debug("-> Redirecting request %s to TCP publisher", envelope)
if not envelope.is_mult_send:
self.dealer_publisher.send_request(multipart_message)
def _redirect_reply(self, multipart_message):
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
LOG.debug("Envelope.reply_id: %s", envelope.reply_id)
LOG.debug("<- Redirecting reply: %s", envelope)
response_binary = multipart_message[zmq_names.MULTIPART_IDX_BODY]
self.router_socket.send(envelope.reply_id, zmq.SNDMORE)
self.router_socket.send(b'', zmq.SNDMORE)
self.router_socket.send_pyobj(envelope, zmq.SNDMORE)
self.router_socket.send(response_binary)
def cleanup(self):
super(RouterProxy, self).cleanup()
self.dealer_publisher.cleanup()
self.matchmaker.unregister_router(self.router_address)

View File

@ -107,10 +107,14 @@ class CallSender(zmq_publisher_base.QueuedSender):
class CallSenderLight(CallSender):
def __init__(self, sockets_manager, _do_send_request, reply_waiter):
super(CallSenderLight, self).__init__(
sockets_manager, _do_send_request, reply_waiter)
self.socket = self.outbound_sockets.get_socket_to_routers()
self.reply_waiter.poll_socket(self.socket)
def _connect_socket(self, target):
socket = self.outbound_sockets.get_socket_to_routers()
self.reply_waiter.poll_socket(socket)
return socket
return self.socket
class ReplyWaiter(object):

View File

@ -60,7 +60,7 @@ class DealerPublisherProxy(object):
envelope = socket.recv_pyobj()
assert envelope is not None, "Invalid envelope!"
reply = socket.recv()
LOG.debug("Received reply %s", reply)
LOG.debug("Received reply %s", envelope)
return [envelope, reply]
def cleanup(self):

View File

@ -142,8 +142,9 @@ class SocketsManager(object):
return socket
def get_socket_to_hosts(self, target, hosts):
if str(target) in self.outbound_sockets:
socket = self._check_for_new_hosts(target)
key = str(target)
if key in self.outbound_sockets:
socket, tm = self.outbound_sockets[key]
else:
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
self.socket_type)

View File

@ -71,7 +71,8 @@ class Envelope(object):
def to_dict(self):
envelope = {zmq_names.FIELD_MSG_TYPE: self._msg_type,
zmq_names.FIELD_MSG_ID: self._message_id,
zmq_names.FIELD_TARGET: self._target}
zmq_names.FIELD_TARGET: self._target,
zmq_names.FIELD_TARGET_HOSTS: self._target_hosts}
envelope.update({k: v for k, v in self._kwargs.items()
if v is not None})
return envelope

View File

@ -38,22 +38,17 @@ class ThreadingPoller(zmq_poller.ZmqPoller):
self.recv_methods = {}
def register(self, socket, recv_method=None):
LOG.debug("Registering socket")
if socket in self.recv_methods:
return
LOG.debug("Registering socket")
if recv_method is not None:
self.recv_methods[socket] = recv_method
self.poller.register(socket, zmq.POLLIN)
def poll(self, timeout=None):
if timeout:
timeout *= 1000 # zmq poller expects milliseconds
sockets = None
sockets = {}
try:
sockets = dict(self.poller.poll(timeout=timeout))
sockets = dict(self.poller.poll())
except zmq.ZMQError as e:
LOG.debug("Polling terminated with error: %s", e)

View File

@ -26,6 +26,7 @@ FIELD_MSG_ID = 'message_id'
FIELD_MSG_TYPE = 'msg_type'
FIELD_REPLY_ID = 'reply_id'
FIELD_TARGET = 'target'
FIELD_TARGET_HOSTS = 'target_hosts'
IDX_REPLY_TYPE = 1

View File

@ -31,11 +31,12 @@ zmq = zmq_async.import_zmq()
class ZmqSocket(object):
def __init__(self, conf, context, socket_type):
def __init__(self, conf, context, socket_type, high_watermark=0):
self.conf = conf
self.context = context
self.socket_type = socket_type
self.handle = context.socket(socket_type)
self.handle.set_hwm(high_watermark)
self.close_linger = -1
if self.conf.rpc_cast_timeout > 0:
@ -124,8 +125,9 @@ class ZmqPortRangeExceededException(exceptions.MessagingException):
class ZmqRandomPortSocket(ZmqSocket):
def __init__(self, conf, context, socket_type):
super(ZmqRandomPortSocket, self).__init__(conf, context, socket_type)
def __init__(self, conf, context, socket_type, high_watermark=0):
super(ZmqRandomPortSocket, self).__init__(conf, context, socket_type,
high_watermark)
self.bind_address = zmq_address.get_tcp_random_address(self.conf)
try:

View File

@ -80,6 +80,7 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
'rpc_response_timeout': 5,
'rpc_zmq_ipc_dir': self.internal_ipc_dir,
'use_pub_sub': False,
'use_router_proxy': False,
'rpc_zmq_matchmaker': 'dummy'}
self.config(**kwargs)