Local Fanout implementation

Fanout unit-test passes now
No matchmaker used yet (multi-host fanout wouldn't work)

Change-Id: I9362adab4f7c7eba8120b51efe1b8c2056df3bbe
This commit is contained in:
Oleksii Zamiatin
2015-07-02 18:11:42 +03:00
parent 76f44879e1
commit 7df65f2937
15 changed files with 318 additions and 64 deletions

View File

@@ -13,15 +13,30 @@
# under the License.
import abc
import logging
import six
from oslo_messaging._drivers.common import RPCException
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_topic
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
@six.add_metaclass(abc.ABCMeta)
class BaseProxy(object):
"""Base TCP-proxy.
TCP-proxy redirects messages received by TCP from clients to servers
over IPC. Consists of TCP-frontend and IPC-backend objects. Runs
in async executor.
"""
def __init__(self, conf, context):
super(BaseProxy, self).__init__()
self.conf = conf
@@ -30,7 +45,7 @@ class BaseProxy(object):
@abc.abstractmethod
def run(self):
"Main execution point of the proxy"
"""Main execution point of the proxy"""
def start(self):
self.executor.execute()
@@ -45,10 +60,47 @@ class BaseProxy(object):
@six.add_metaclass(abc.ABCMeta)
class BaseTcpFrontend(object):
def __init__(self, conf, poller, context):
"""Base frontend clause.
TCP-frontend is a part of TCP-proxy which receives incoming
messages from clients.
"""
def __init__(self, conf, poller, context,
socket_type=None,
port_number=None,
receive_meth=None):
"""Construct a TCP-frontend.
Its attributes are:
:param conf: Driver configuration object.
:type conf: ConfigOpts
:param poller: Messages poller-object green or threading.
:type poller: ZmqPoller
:param context: ZeroMQ context object.
:type context: zmq.Context
:param socket_type: ZeroMQ socket type.
:type socket_type: int
:param port_number: Current messaging pipeline port.
:type port_number: int
"""
self.conf = conf
self.poller = poller
self.context = context
try:
self.frontend = self.context.socket(socket_type)
bind_address = zmq_topic.get_tcp_bind_address(port_number)
LOG.info(_LI("Binding to TCP %s") % bind_address)
self.frontend.bind(bind_address)
self.poller.register(self.frontend, receive_meth)
except zmq.ZMQError as e:
errmsg = _LE("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use: %s") % str(e)
LOG.error(errmsg)
raise RPCException(errmsg)
def receive_incoming(self):
message, socket = self.poller.poll(1)
@@ -64,6 +116,14 @@ class BaseBackendMatcher(object):
self.backends = {}
self.poller = poller
@abc.abstractmethod
def redirect_to_backend(self, message):
"""Redirect message"""
@six.add_metaclass(abc.ABCMeta)
class DirectBackendMatcher(BaseBackendMatcher):
def redirect_to_backend(self, message):
backend, topic = self._match_backend(message)
self._send_message(backend, message, topic)
@@ -77,16 +137,16 @@ class BaseBackendMatcher(object):
@abc.abstractmethod
def _get_topic(self, message):
"Extract topic from message"
"""Extract topic from message"""
@abc.abstractmethod
def _get_ipc_address(self, topic):
"Get ipc backend address from topic"
"""Get ipc backend address from topic"""
@abc.abstractmethod
def _send_message(self, backend, message, topic):
"Backend specific sending logic"
"""Backend specific sending logic"""
@abc.abstractmethod
def _create_backend(self, ipc_address):
"Backend specific socket opening logic"
"""Backend specific socket opening logic"""

View File

@@ -17,7 +17,7 @@ import os
from oslo_utils import excutils
from oslo_messaging._drivers.zmq_driver.broker.zmq_call_proxy import CallProxy
from oslo_messaging._drivers.zmq_driver.broker import zmq_universal_proxy
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LE, _LI
@@ -44,7 +44,8 @@ class ZmqBroker(object):
super(ZmqBroker, self).__init__()
self.conf = conf
self.context = zmq.Context()
self.proxies = [CallProxy(conf, self.context)]
proxy = zmq_universal_proxy.UniversalProxy(conf, self.context)
self.proxies = [proxy]
self._create_ipc_dirs()
def _create_ipc_dirs(self):

View File

@@ -31,7 +31,7 @@ class CallProxy(base_proxy.BaseProxy):
def __init__(self, conf, context):
super(CallProxy, self).__init__(conf, context)
self.tcp_frontend = FrontendTcpRouter(self.conf, context)
self.backend_matcher = CallBackendMatcher(self.conf, context)
self.backend_matcher = DealerBackend(self.conf, context)
LOG.info(_LI("Starting call proxy thread"))
def run(self):
@@ -44,12 +44,12 @@ class CallProxy(base_proxy.BaseProxy):
self.tcp_frontend.redirect_outgoing_reply(reply)
class CallBackendMatcher(base_proxy.BaseBackendMatcher):
class DealerBackend(base_proxy.DirectBackendMatcher):
def __init__(self, conf, context):
super(CallBackendMatcher, self).__init__(conf,
zmq_async.get_poller(),
context)
super(DealerBackend, self).__init__(conf,
zmq_async.get_poller(),
context)
self.backend = self.context.socket(zmq.DEALER)
self.poller.register(self.backend)
@@ -80,19 +80,9 @@ class FrontendTcpRouter(base_proxy.BaseTcpFrontend):
def __init__(self, conf, context):
super(FrontendTcpRouter, self).__init__(conf,
zmq_async.get_poller(),
context)
try:
self.frontend = self.context.socket(zmq.ROUTER)
bind_address = zmq_topic.get_tcp_bind_address(conf.rpc_zmq_port)
LOG.info(_LI("Binding to TCP ROUTER %s") % bind_address)
self.frontend.bind(bind_address)
self.poller.register(self.frontend)
except zmq.ZMQError:
errmsg = _LE("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use.")
LOG.error(errmsg)
raise RPCException(errmsg)
context,
socket_type=zmq.ROUTER,
port_number=conf.rpc_zmq_port)
@staticmethod
def _reduce_empty(reply):

View File

@@ -0,0 +1,35 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_serializer
from oslo_messaging._drivers.zmq_driver import zmq_topic
zmq = zmq_async.import_zmq()
class PublisherBackend(base_proxy.BaseBackendMatcher):
def __init__(self, conf, context):
super(PublisherBackend, self).__init__(conf,
zmq_async.get_poller(),
context)
self.backend = self.context.socket(zmq.PUB)
self.backend.bind(zmq_topic.get_ipc_address_fanout(conf))
def redirect_to_backend(self, message):
topic_pos = zmq_serializer.MESSAGE_CALL_TOPIC_POSITION
self.backend.send_multipart(message[topic_pos:])

View File

@@ -0,0 +1,58 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy
from oslo_messaging._drivers.zmq_driver.broker import zmq_call_proxy
from oslo_messaging._drivers.zmq_driver.broker import zmq_fanout_proxy
from oslo_messaging._drivers.zmq_driver import zmq_serializer
from oslo_messaging._i18n import _LI
LOG = logging.getLogger(__name__)
class UniversalProxy(base_proxy.BaseProxy):
def __init__(self, conf, context):
super(UniversalProxy, self).__init__(conf, context)
self.tcp_frontend = zmq_call_proxy.FrontendTcpRouter(conf, context)
self.backend_matcher = BackendMatcher(conf, context)
call = zmq_serializer.CALL_TYPE
self.call_backend = self.backend_matcher.backends[call]
LOG.info(_LI("Starting universal-proxy thread"))
def run(self):
message = self.tcp_frontend.receive_incoming()
if message is not None:
self.backend_matcher.redirect_to_backend(message)
reply, socket = self.call_backend.receive_outgoing_reply()
if reply is not None:
self.tcp_frontend.redirect_outgoing_reply(reply)
class BackendMatcher(base_proxy.BaseBackendMatcher):
def __init__(self, conf, context):
super(BackendMatcher, self).__init__(conf, None, context)
direct_backend = zmq_call_proxy.DealerBackend(conf, context)
self.backends[zmq_serializer.CALL_TYPE] = direct_backend
self.backends[zmq_serializer.CAST_TYPE] = direct_backend
fanout_backend = zmq_fanout_proxy.PublisherBackend(conf, context)
self.backends[zmq_serializer.FANOUT_TYPE] = fanout_backend
def redirect_to_backend(self, message):
message_type = zmq_serializer.get_msg_type(message)
self.backends[message_type].redirect_to_backend(message)

View File

@@ -16,6 +16,7 @@ import logging
from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_serializer
from oslo_messaging._drivers.zmq_driver import zmq_topic
from oslo_messaging._i18n import _LE, _LI
@@ -33,7 +34,9 @@ class CallRequest(Request):
socket = self.zmq_context.socket(zmq.REQ)
super(CallRequest, self).__init__(conf, target, context,
message, socket, timeout, retry)
message, socket,
zmq_serializer.CALL_TYPE,
timeout, retry)
self.connect_address = zmq_topic.get_tcp_address_call(conf,
self.topic)

View File

@@ -17,6 +17,7 @@ import logging
from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_publisher
from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_serializer
from oslo_messaging._drivers.zmq_driver import zmq_topic
from oslo_messaging._i18n import _LE, _LI
@@ -30,8 +31,11 @@ class CastRequest(Request):
def __init__(self, conf, target, context,
message, socket, address, timeout=None, retry=None):
self.connect_address = address
fanout_type = zmq_serializer.FANOUT_TYPE
cast_type = zmq_serializer.CAST_TYPE
msg_type = fanout_type if target.fanout else cast_type
super(CastRequest, self).__init__(conf, target, context, message,
socket, timeout, retry)
socket, msg_type, timeout, retry)
def __call__(self, *args, **kwargs):
self.send_request()

View File

@@ -20,6 +20,7 @@ import uuid
import six
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_serializer
from oslo_messaging._drivers.zmq_driver import zmq_topic
from oslo_messaging._i18n import _LE
@@ -32,7 +33,9 @@ zmq = zmq_async.import_zmq()
class Request(object):
def __init__(self, conf, target, context, message,
socket, timeout=None, retry=None):
socket, msg_type, timeout=None, retry=None):
assert msg_type in zmq_serializer.MESSAGE_TYPES, "Unknown msg type!"
if message['method'] is None:
errmsg = _LE("No method specified for RPC call")
@@ -40,6 +43,7 @@ class Request(object):
raise KeyError(errmsg)
self.msg_id = uuid.uuid4().hex
self.msg_type = msg_type
self.target = target
self.context = context
self.message = message
@@ -62,6 +66,7 @@ class Request(object):
return False
def send_request(self):
self.socket.send_string(self.msg_type, zmq.SNDMORE)
self.socket.send_string(str(self.topic), zmq.SNDMORE)
self.socket.send_string(self.msg_id, zmq.SNDMORE)
self.socket.send_json(self.context, zmq.SNDMORE)

View File

@@ -56,13 +56,19 @@ class ZmqIncomingRequest(base.IncomingMessage):
class CallResponder(zmq_base_consumer.ConsumerBase):
def __init__(self, listener, conf, poller, context):
super(CallResponder, self).__init__(listener, conf, poller, context)
def poll(self, timeout=None):
def _receive_message(self, socket):
try:
incoming, socket = self.poller.poll(timeout)
reply_id, context, message = incoming
reply_id = socket.recv()
empty = socket.recv()
assert empty == b'', 'Bad format: empty separator expected'
msg_type = socket.recv_string()
assert msg_type is not None, 'Bad format: msg type expected'
topic = socket.recv_string()
assert topic is not None, 'Bad format: topic string expected'
msg_id = socket.recv_string()
assert msg_id is not None, 'Bad format: message ID expected'
context = socket.recv_json()
message = socket.recv_json()
LOG.debug("[Server] REP Received message %s" % str(message))
incoming = ZmqIncomingRequest(self.listener,
context,
@@ -70,27 +76,13 @@ class CallResponder(zmq_base_consumer.ConsumerBase):
reply_id,
self.poller)
return incoming
except zmq.ZMQError as e:
LOG.error(_LE("Receiving message failed ... {}"), e)
def listen(self, target):
def _receive_message(socket):
reply_id = socket.recv()
empty = socket.recv()
assert empty == b'', 'Bad format: empty separator expected'
topic = socket.recv_string()
assert topic is not None, 'Bad format: topic string expected'
msg_id = socket.recv_string()
assert msg_id is not None, 'Bad format: message ID expected'
context = socket.recv_json()
message = socket.recv_json()
return (reply_id, context, message)
topic = topic_utils.Topic.from_target(self.conf, target)
ipc_rep_address = topic_utils.get_ipc_address_call(self.conf, topic)
rep_socket = self.context.socket(zmq.REP)
rep_socket.bind(ipc_rep_address)
self.sockets_per_topic[str(topic)] = rep_socket
self.poller.register(rep_socket, _receive_message)
self.poller.register(rep_socket, self._receive_message)

View File

@@ -0,0 +1,74 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import six
from oslo_messaging._drivers import base
from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_base_consumer
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_topic as topic_utils
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class ZmqFanoutMessage(base.IncomingMessage):
def __init__(self, listener, context, message, socket, poller):
super(ZmqFanoutMessage, self).__init__(listener, context, message)
poller.resume_polling(socket)
def reply(self, reply=None, failure=None, log_failure=True):
"""Reply is not needed for fanout(cast) messages"""
def acknowledge(self):
pass
def requeue(self):
pass
class FanoutConsumer(zmq_base_consumer.ConsumerBase):
def _receive_message(self, socket):
try:
topic = socket.recv_string()
assert topic is not None, 'Bad format: Topic is expected'
msg_id = socket.recv_string()
assert msg_id is not None, 'Bad format: message ID expected'
context = socket.recv_json()
message = socket.recv_json()
LOG.debug("[Server] REP Received message %s" % str(message))
incoming = ZmqFanoutMessage(self.listener, context, message,
socket, self.poller)
return incoming
except zmq.ZMQError as e:
LOG.error(_LE("Receiving message failed ... {}"), e)
def listen(self, target):
topic = topic_utils.Topic.from_target(self.conf, target)
ipc_address = topic_utils.get_ipc_address_fanout(self.conf)
sub_socket = self.context.socket(zmq.SUB)
sub_socket.connect(ipc_address)
if six.PY3:
sub_socket.setsockopt_string(zmq.SUBSCRIBE, str(topic))
else:
sub_socket.setsockopt(zmq.SUBSCRIBE, str(topic))
self.poller.register(sub_socket, self._receive_message)

View File

@@ -16,6 +16,7 @@ import logging
from oslo_messaging._drivers import base
from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_call_responder
from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_fanout_consumer
from oslo_messaging._drivers.zmq_driver import zmq_async
LOG = logging.getLogger(__name__)
@@ -29,14 +30,17 @@ class ZmqServer(base.Listener):
LOG.info("[Server] __init__")
self.conf = conf
self.context = zmq.Context()
poller = zmq_async.get_reply_poller()
self.call_responder = zmq_call_responder.CallResponder(self, conf,
poller,
self.context)
self.poller = zmq_async.get_reply_poller()
self.call_resp = zmq_call_responder.CallResponder(self, conf,
self.poller,
self.context)
self.fanout_resp = zmq_fanout_consumer.FanoutConsumer(self, conf,
self.poller,
self.context)
def poll(self, timeout=None):
incoming = self.call_responder.poll(timeout)
return incoming
incoming = self.poller.poll(timeout)
return incoming[0]
def stop(self):
LOG.info("[Server] Stop")
@@ -46,4 +50,7 @@ class ZmqServer(base.Listener):
def listen(self, target):
LOG.info("[Server] Listen to Target %s" % target)
self.call_responder.listen(target)
if target.fanout:
self.fanout_resp.listen(target)
else:
self.call_resp.listen(target)

View File

@@ -23,7 +23,26 @@ from oslo_messaging._i18n import _LE, _LW
LOG = logging.getLogger(__name__)
MESSAGE_CALL_TOPIC_POSITION = 2
MESSAGE_CALL_TYPE_POSITION = 2
MESSAGE_CALL_TOPIC_POSITION = 3
CALL_TYPE = 'call'
CAST_TYPE = 'cast'
FANOUT_TYPE = 'fanout'
NOTIFY_TYPE = 'notify'
MESSAGE_TYPES = (CALL_TYPE, CAST_TYPE, FANOUT_TYPE, NOTIFY_TYPE)
def get_msg_type(message):
type = message[MESSAGE_CALL_TYPE_POSITION]
if six.PY3:
type = type.decode('utf-8')
if type not in MESSAGE_TYPES:
errmsg = _LE("Unknown message type: %s") % str(type)
LOG.error(errmsg)
rpc_common.RPCException(errmsg)
return type
def _get_topic_from_msg(message, position):
@@ -46,7 +65,7 @@ def _get_topic_from_msg(message, position):
except Exception as e:
errmsg = _LE("Failed topic string parsing, %s") % str(e)
LOG.error(errmsg)
rpc_common.RPCException(errmsg)
raise rpc_common.RPCException(errmsg)
return topic_items[0], topic_items[1]

View File

@@ -29,6 +29,10 @@ def get_ipc_address_cast(conf, topic):
return "ipc://%s/fanout/%s" % (conf.rpc_zmq_ipc_dir, str(topic))
def get_ipc_address_fanout(conf):
return "ipc://%s/fanout_general" % conf.rpc_zmq_ipc_dir
class Topic(object):
def __init__(self, conf, topic, server=None, fanout=False):
@@ -58,4 +62,4 @@ class Topic(object):
return self._topic if self._topic else ""
def __str__(self, *args, **kwargs):
return "%s.%s" % (self.topic, self.server)
return u"%s.%s" % (self.topic, self.server)

View File

@@ -165,20 +165,21 @@ class TestZmqBasics(ZmqBaseTestCase):
method = self.listener.message.message[u'method']
self.assertEqual(u'hello-world', method)
@testtools.skip("Not implemented feature")
def test_send_fanout(self):
target = oslo_messaging.Target(topic='testtopic', fanout=True)
self.driver.listen(target)
self.listener.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=False)
self.listener._received.wait()
self.assertIsNone(result)
self.assertEqual(True, self.listener._received.isSet())
msg_pattern = "{'method': 'hello-world', 'tx_id': 1}"
self.assertEqual(msg_pattern, self.listener.message)
method = self.listener.message.message[u'method']
self.assertEqual(u'hello-world', method)
def test_send_receive_direct(self):
"""Call() without topic."""

View File

@@ -19,6 +19,7 @@ import six
import testscenarios
from oslo import messaging
from oslo_messaging._drivers import common as exceptions
from oslo_messaging.tests import utils as test_utils
from oslo_serialization import jsonutils