Merge "Non-blocking outgoing queue was implemented"

This commit is contained in:
Jenkins 2015-09-30 16:11:29 +00:00 committed by Gerrit Code Review
commit 3a5db723aa
19 changed files with 332 additions and 37 deletions

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,42 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import logging
import sys
from oslo_config import cfg
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.zmq_driver.broker import zmq_broker
from oslo_messaging._executors import impl_pooledexecutor
CONF = cfg.CONF
CONF.register_opts(impl_zmq.zmq_opts)
CONF.register_opts(impl_pooledexecutor._pool_opts)
# TODO(ozamiatin): Move this option assignment to an external config file
# Use efficient zmq poller in real-world deployment
CONF.rpc_zmq_native = True
def main():
CONF(sys.argv[1:], project='oslo')
logging.basicConfig(level=logging.DEBUG)
with contextlib.closing(zmq_broker.ZmqBroker(CONF)) as reactor:
reactor.start()
reactor.wait()
if __name__ == "__main__":
main()

View File

@ -80,6 +80,10 @@ zmq_opts = [
default=1,
help='The default number of seconds that poll should wait. '
'Poll raises timeout exception when timeout expired.'),
cfg.BoolOpt('zmq_use_broker',
default=True,
help='Shows whether zmq-messaging uses broker or not.')
]

View File

@ -0,0 +1,53 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import six
from oslo_messaging._drivers.zmq_driver import zmq_async
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class BaseProxy(object):
"""Base TCP-proxy.
TCP-proxy redirects messages received by TCP from clients to servers
over IPC. Consists of TCP-frontend and IPC-backend objects. Runs
in async executor.
"""
def __init__(self, conf, context):
super(BaseProxy, self).__init__()
self.conf = conf
self.context = context
self.executor = zmq_async.get_executor(self.run,
zmq_concurrency='native')
@abc.abstractmethod
def run(self):
"""Main execution point of the proxy"""
def start(self):
self.executor.execute()
def stop(self):
self.executor.stop()
def wait(self):
self.executor.wait()

View File

@ -0,0 +1,82 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
from oslo_utils import excutils
import six
from stevedore import driver
import zmq
from oslo_messaging._drivers.zmq_driver.broker import zmq_queue_proxy
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
class ZmqBroker(object):
"""Local messaging IPC broker (nodes are still peers).
The main purpose is to have native zeromq application.
Benefits of such approach are following:
1. No risk to block the main thread of the process by unpatched
native parts of the libzmq (c-library is completely monkey-patch
unfriendly)
2. Making use of standard zmq approaches as async pollers,
devices, queues etc.
3. Possibility to implement queue persistence not touching existing
clients (staying in a separate process).
"""
def __init__(self, conf):
super(ZmqBroker, self).__init__()
self.conf = conf
self._create_ipc_dirs()
self.matchmaker = driver.DriverManager(
'oslo.messaging.zmq.matchmaker',
self.conf.rpc_zmq_matchmaker,
).driver(self.conf)
self.context = zmq.Context()
self.queue = six.moves.queue.Queue()
self.proxies = [zmq_queue_proxy.OutgoingQueueProxy(
conf, self.context, self.queue, self.matchmaker),
zmq_queue_proxy.IncomingQueueProxy(
conf, self.context, self.queue)
]
def _create_ipc_dirs(self):
ipc_dir = self.conf.rpc_zmq_ipc_dir
try:
os.makedirs("%s/fanout" % ipc_dir)
except os.error:
if not os.path.isdir(ipc_dir):
with excutils.save_and_reraise_exception():
LOG.error(_LE("Required IPC directory does not exist at"
" %s"), ipc_dir)
def start(self):
for proxy in self.proxies:
proxy.start()
def wait(self):
for proxy in self.proxies:
proxy.wait()
def close(self):
LOG.info(_LI("Broker shutting down ..."))
for proxy in self.proxies:
proxy.stop()

View File

@ -0,0 +1,78 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import six
import zmq
from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LI
LOG = logging.getLogger(__name__)
class OutgoingQueueProxy(zmq_base_proxy.BaseProxy):
def __init__(self, conf, context, queue, matchmaker):
super(OutgoingQueueProxy, self).__init__(conf, context)
self.queue = queue
self.matchmaker = matchmaker
self.publisher = zmq_dealer_publisher.DealerPublisher(
conf, matchmaker)
LOG.info(_LI("Polling at outgoing proxy ..."))
def run(self):
try:
request = self.queue.get(timeout=self.conf.rpc_poll_timeout)
LOG.info(_LI("Redirecting request %s to TCP publisher ...")
% request)
self.publisher.send_request(request)
except six.moves.queue.Empty:
return
class IncomingQueueProxy(zmq_base_proxy.BaseProxy):
def __init__(self, conf, context, queue):
super(IncomingQueueProxy, self).__init__(conf, context)
self.poller = zmq_async.get_poller(
zmq_concurrency='native')
self.queue = queue
self.socket = context.socket(zmq.ROUTER)
self.socket.bind(zmq_address.get_broker_address(conf))
self.poller.register(self.socket, self.receive_request)
LOG.info(_LI("Polling at incoming proxy ..."))
def run(self):
request, socket = self.poller.poll(self.conf.rpc_poll_timeout)
if request is None:
return
LOG.info(_LI("Received request and queue it: %s") % str(request))
self.queue.put(request)
def receive_request(self, socket):
reply_id = socket.recv()
assert reply_id is not None, "Valid id expected"
empty = socket.recv()
assert empty == b'', "Empty delimiter expected"
return socket.recv_pyobj()

View File

@ -13,7 +13,6 @@
# under the License.
import logging
import uuid
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
@ -58,13 +57,8 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
def _send_request(self, socket, request):
message_id = str(uuid.uuid1())
socket.send(b'', zmq.SNDMORE)
socket.send_string(request.msg_type, zmq.SNDMORE)
socket.send_string(message_id, zmq.SNDMORE)
socket.send_pyobj(request.context, zmq.SNDMORE)
socket.send_pyobj(request.message)
socket.send_pyobj(request)
LOG.info(_LI("Sending message %(message)s to a target %(target)s")
% {"message": request.message,
@ -75,6 +69,26 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
super(DealerPublisher, self).cleanup()
class DealerPublisherLight(zmq_publisher_base.PublisherBase):
def __init__(self, conf, address):
super(DealerPublisherLight, self).__init__(conf)
self.socket = self.zmq_context.socket(zmq.DEALER)
self.socket.connect(address)
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
self.socket.send(b'', zmq.SNDMORE)
self.socket.send_pyobj(request)
def cleanup(self):
self.socket.setsockopt(zmq.LINGER, 0)
self.socket.close()
class AcknowledgementReceiver(object):
def __init__(self):

View File

@ -56,7 +56,7 @@ class PublisherBase(object):
Publisher can send request objects from zmq_request.
"""
def __init__(self, conf, matchmaker):
def __init__(self, conf):
"""Construct publisher
@ -65,13 +65,10 @@ class PublisherBase(object):
:param conf: configuration object
:type conf: oslo_config.CONF
:param matchmaker: Name Service interface object
:type matchmaker: matchmaker.MatchMakerBase
"""
self.conf = conf
self.zmq_context = zmq.Context()
self.matchmaker = matchmaker
self.outbound_sockets = {}
super(PublisherBase, self).__init__()
@ -92,9 +89,7 @@ class PublisherBase(object):
:param request: Message data and destination container object
:type request: zmq_request.Request
"""
socket.send_string(request.msg_type, zmq.SNDMORE)
socket.send_pyobj(request.context, zmq.SNDMORE)
socket.send_pyobj(request.message)
socket.send_pyobj(request)
def cleanup(self):
"""Cleanup publisher. Close allocated connections."""
@ -106,8 +101,19 @@ class PublisherBase(object):
class PublisherMultisend(PublisherBase):
def __init__(self, conf, matchmaker, socket_type):
"""Construct publisher multi-send
Base class for fanout-sending publishers.
:param conf: configuration object
:type conf: oslo_config.CONF
:param matchmaker: Name Service interface object
:type matchmaker: matchmaker.MatchMakerBase
"""
super(PublisherMultisend, self).__init__(conf)
self.socket_type = socket_type
super(PublisherMultisend, self).__init__(conf, matchmaker)
self.matchmaker = matchmaker
def _check_hosts_connections(self, target):
# TODO(ozamiatin): Place for significant optimization
@ -126,6 +132,7 @@ class PublisherMultisend(PublisherBase):
def _connect_to_host(self, socket, host, target):
address = zmq_address.get_tcp_direct_address(host)
LOG.info(address)
stype = zmq_names.socket_type_str(self.socket_type)
try:
LOG.info(_LI("Connecting %(stype)s to %(address)s for %(target)s")

View File

@ -31,6 +31,10 @@ zmq = zmq_async.import_zmq()
class ReqPublisher(zmq_publisher_base.PublisherBase):
def __init__(self, conf, matchmaker):
super(ReqPublisher, self).__init__(conf)
self.matchmaker = matchmaker
def send_request(self, request):
if request.msg_type != zmq_names.CALL_TYPE:

View File

@ -19,6 +19,7 @@ from oslo_messaging._drivers.zmq_driver.client.publishers\
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_req_publisher
from oslo_messaging._drivers.zmq_driver.client import zmq_request
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
zmq = zmq_async.import_zmq()
@ -31,8 +32,14 @@ class ZmqClient(object):
self.context = zmq.Context()
self.matchmaker = matchmaker
self.allowed_remote_exmods = allowed_remote_exmods or []
self.dealer_publisher = zmq_dealer_publisher.DealerPublisher(
conf, matchmaker)
self.dealer_publisher = None
if self.conf.zmq_use_broker:
self.dealer_publisher = zmq_dealer_publisher.DealerPublisherLight(
conf, zmq_address.get_broker_address(self.conf))
else:
self.dealer_publisher = zmq_dealer_publisher.DealerPublisher(
conf, matchmaker)
def send_call(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.CallRequest(

View File

@ -14,6 +14,7 @@
import abc
import logging
import uuid
import six
@ -61,6 +62,7 @@ class Request(object):
self.context = context
self.message = message
self.retry = retry
self.message_id = str(uuid.uuid1())
@abc.abstractproperty
def msg_type(self):

View File

@ -41,7 +41,7 @@ class ThreadingPoller(zmq_poller.ZmqPoller):
self.poller.register(socket, zmq.POLLIN)
def poll(self, timeout=None):
timeout = timeout * 1000 # zmq poller waits milliseconds
timeout *= 1000 # zmq poller waits milliseconds
sockets = None
try:

View File

@ -81,29 +81,22 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
reply_id = socket.recv()
empty = socket.recv()
assert empty == b'', 'Bad format: empty delimiter expected'
msg_type = socket.recv_string()
assert msg_type is not None, 'Bad format: msg type expected'
request = socket.recv_pyobj()
msg_id = None
if msg_type != zmq_names.CALL_TYPE:
msg_id = socket.recv_string()
context = socket.recv_pyobj()
message = socket.recv_pyobj()
LOG.info(_LI("Received %(msg_type)s message %(msg)s")
% {"msg_type": msg_type,
"msg": str(message)})
% {"msg_type": request.msg_type,
"msg": str(request.message)})
if msg_type == zmq_names.CALL_TYPE:
if request.msg_type == zmq_names.CALL_TYPE:
return zmq_incoming_message.ZmqIncomingRequest(
self.server, context, message, socket, reply_id,
self.poller)
elif msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES):
self.server, request.context, request.message, socket,
reply_id, self.poller)
elif request.msg_type in zmq_names.NON_BLOCKING_TYPES:
return RouterIncomingMessage(
self.server, context, message, socket, reply_id,
msg_id, self.poller)
self.server, request.context, request.message, socket,
reply_id, request.message_id, self.poller)
else:
LOG.error(_LE("Unknown message type: %s") % msg_type)
LOG.error(_LE("Unknown message type: %s") % request.msg_type)
except zmq.ZMQError as e:
LOG.error(_LE("Receiving message failed: %s") % str(e))

View File

@ -18,8 +18,12 @@ def combine_address(host, port):
def get_tcp_direct_address(host):
return "tcp://%s" % (host)
return "tcp://%s" % str(host)
def get_tcp_random_address(conf):
return "tcp://*"
def get_broker_address(conf):
return "ipc://%s/zmq-broker" % conf.rpc_zmq_ipc_dir

View File

@ -47,6 +47,7 @@ MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_FANOUT_TYPE)
DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, NOTIFY_TYPE)
CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE)
NOTIFY_TYPES = (NOTIFY_TYPE, NOTIFY_FANOUT_TYPE)
NON_BLOCKING_TYPES = CAST_TYPES + NOTIFY_TYPES
def socket_type_str(socket_type):

View File

@ -77,6 +77,7 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_ipc_dir': self.internal_ipc_dir,
'zmq_use_broker': False,
'rpc_zmq_matchmaker': 'dummy'}
self.config(**kwargs)

View File

@ -22,4 +22,6 @@ EOF
redis-server --port $ZMQ_REDIS_PORT &
oslo-messaging-zmq-broker --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-broker.log 2>&1 &
$*

View File

@ -22,7 +22,7 @@ packages =
[entry_points]
console_scripts =
oslo-messaging-zmq-receiver = oslo_messaging._cmd.zmq_receiver:main
oslo-messaging-zmq-broker = oslo_messaging._cmd.zmq_broker:main
oslo.messaging.drivers =
rabbit = oslo_messaging._drivers.impl_rabbit:RabbitDriver