[zmq] Refactor publishers

This patch refactors publishers by separating responsibilities and
introducing senders and waiters within publishers.

Change-Id: I90df59d61af2b40b516a5151c67c184fcc91e366
Co-Authored-By: Oleksii Zamiatin <ozamiatin@mirantis.com>
This commit is contained in:
Gevorg Davoian 2016-06-22 19:09:32 +03:00
parent 58ad758dc2
commit ac484f6b26
19 changed files with 638 additions and 763 deletions

View File

@ -1,106 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from concurrent import futures
import futurist
import oslo_messaging
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_reply_waiter
from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class DealerCallPublisher(object):
"""Thread-safe CALL publisher
Used as faster and thread-safe publisher for CALL
instead of ReqPublisher.
"""
def __init__(self, conf, matchmaker, sockets_manager, sender=None,
reply_waiter=None):
super(DealerCallPublisher, self).__init__()
self.conf = conf
self.matchmaker = matchmaker
self.reply_waiter = reply_waiter or zmq_reply_waiter.ReplyWaiter(conf)
self.sockets_manager = sockets_manager
self.sender = sender or CallSender(self.sockets_manager,
self.reply_waiter)
def send_request(self, request):
reply_future = self.sender.send_request(request)
try:
reply = reply_future.result(timeout=request.timeout)
LOG.debug("Received reply %s", request.message_id)
except AssertionError:
LOG.error(_LE("Message format error in reply %s"),
request.message_id)
return None
except futures.TimeoutError:
raise oslo_messaging.MessagingTimeout(
"Timeout %(tout)s seconds was reached for message %(id)s" %
{"tout": request.timeout,
"id": request.message_id})
finally:
self.reply_waiter.untrack_id(request.message_id)
if reply.failure:
raise rpc_common.deserialize_remote_exception(
reply.failure,
request.allowed_remote_exmods)
else:
return reply.reply_body
def cleanup(self):
self.reply_waiter.cleanup()
self.sender.cleanup()
class CallSender(zmq_publisher_base.QueuedSender):
def __init__(self, sockets_manager, reply_waiter):
super(CallSender, self).__init__(sockets_manager,
self._do_send_request)
assert reply_waiter, "Valid ReplyWaiter expected!"
self.reply_waiter = reply_waiter
def _do_send_request(self, socket, request):
# DEALER socket specific envelope empty delimiter
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(request)
LOG.debug("Sent message_id %(message)s to a target %(target)s",
{"message": request.message_id,
"target": request.target})
def send_request(self, request):
reply_future = futurist.Future()
self.reply_waiter.track_reply(reply_future, request.message_id)
self.queue.put(request)
return reply_future
def _connect_socket(self, target):
socket = self.outbound_sockets.get_socket(target)
self.reply_waiter.poll_socket(socket)
return socket

View File

@ -12,78 +12,98 @@
# License for the specific language governing permissions and limitations
# under the License.
from concurrent import futures
import logging
from oslo_messaging._drivers.zmq_driver.client.publishers\
import retrying
import oslo_messaging
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class DealerPublisher(zmq_publisher_base.QueuedSender):
def __init__(self, conf, matchmaker):
def _send_message_data(socket, request):
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(request)
LOG.debug("Sent message_id %(message)s to a target %(target)s",
{"message": request.message_id,
"target": request.target})
def _do_send_request(socket, request):
if request.msg_type in zmq_names.MULTISEND_TYPES:
for _ in range(socket.connections_count()):
_send_message_data(socket, request)
else:
_send_message_data(socket, request)
sockets_manager = zmq_publisher_base.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
super(DealerPublisher, self).__init__(sockets_manager,
_do_send_request)
class DealerPublisher(zmq_publisher_base.PublisherBase):
"""Non-CALL publisher using direct connections."""
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
super(DealerPublisher, self).send_request(request)
raise zmq_publisher_base.UnsupportedSendPattern(
zmq_names.message_type_str(request.msg_type)
)
class DealerPublisherAsync(object):
"""This simplified publisher is to be used with eventlet only.
Eventlet takes care about zmq sockets sharing between green threads
using queued lock.
Use DealerPublisher for other concurrency models.
"""
def __init__(self, conf, matchmaker):
self.sockets_manager = zmq_publisher_base.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
@staticmethod
def _send_message_data(socket, request):
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(request)
LOG.debug("Sent message_id %(message)s to a target %(target)s",
{"message": request.message_id,
"target": request.target})
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
socket = self.sockets_manager.get_socket(request.target)
try:
socket = self.sockets_manager.get_socket(request.target)
except retrying.RetryError:
return
if request.msg_type in zmq_names.MULTISEND_TYPES:
for _ in range(socket.connections_count()):
self._send_message_data(socket, request)
self.sender.send(socket, request)
else:
self._send_message_data(socket, request)
self.sender.send(socket, request)
class DealerCallPublisher(zmq_publisher_base.PublisherBase):
"""CALL publisher using direct connections."""
def __init__(self, sockets_manager, sender, reply_receiver):
super(DealerCallPublisher, self).__init__(sockets_manager, sender)
self.reply_receiver = reply_receiver
@staticmethod
def _raise_timeout(request):
raise oslo_messaging.MessagingTimeout(
"Timeout %(tout)s seconds was reached for message %(msg_id)s" %
{"tout": request.timeout, "msg_id": request.message_id}
)
def send_request(self, request):
if request.msg_type != zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(
zmq_names.message_type_str(request.msg_type)
)
try:
socket = self._connect_socket(request.target)
except retrying.RetryError:
self._raise_timeout(request)
self.sender.send(socket, request)
self.reply_receiver.register_socket(socket)
return self._recv_reply(request)
def _connect_socket(self, target):
return self.sockets_manager.get_socket(target)
def _recv_reply(self, request):
reply_future, = self.reply_receiver.track_request(request)
try:
_, reply = reply_future.result(timeout=request.timeout)
except AssertionError:
LOG.error(_LE("Message format error in reply for %s"),
request.message_id)
return None
except futures.TimeoutError:
self._raise_timeout(request)
finally:
self.reply_receiver.untrack_request(request)
if reply.failure:
raise rpc_common.deserialize_remote_exception(
reply.failure, request.allowed_remote_exmods
)
else:
return reply.reply_body
def cleanup(self):
self.sockets_manager.cleanup()
self.reply_receiver.stop()
super(DealerCallPublisher, self).cleanup()

View File

@ -13,171 +13,90 @@
# under the License.
import logging
import six
import time
import retrying
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_call_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_reply_waiter
import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_updater
zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class DealerPublisherProxy(object):
"""Used when publishing to a proxy. """
def __init__(self, conf, matchmaker, socket_to_proxy):
self.conf = conf
self.sockets_manager = zmq_publisher_base.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
self.socket = socket_to_proxy
self.routing_table = RoutingTable(conf, matchmaker)
self.connection_updater = PublisherConnectionUpdater(
conf, matchmaker, self.socket)
class DealerPublisherProxy(zmq_publisher_base.PublisherBase):
"""Non-CALL publisher via proxy."""
def __init__(self, sockets_manager, sender):
super(DealerPublisherProxy, self).__init__(sockets_manager, sender)
self.socket = sockets_manager.get_socket_to_publishers()
self.routing_table = zmq_routing_table.RoutingTable(self.conf,
self.matchmaker)
self.connection_updater = \
PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket)
def _get_routing_keys(self, request):
try:
if request.msg_type in zmq_names.DIRECT_TYPES:
return [self.routing_table.get_routable_host(request.target)]
else:
return \
[zmq_address.target_to_subscribe_filter(request.target)] \
if self.conf.use_pub_sub else \
self.routing_table.get_all_hosts(request.target)
except retrying.RetryError:
return []
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(
request.msg_type)
if self.conf.use_pub_sub:
routing_key = self.routing_table.get_routable_host(request.target) \
if request.msg_type in zmq_names.DIRECT_TYPES else \
zmq_address.target_to_subscribe_filter(request.target)
self._do_send_request(request, routing_key)
else:
routing_keys = \
[self.routing_table.get_routable_host(request.target)] \
if request.msg_type in zmq_names.DIRECT_TYPES else \
self.routing_table.get_all_hosts(request.target)
for routing_key in routing_keys:
self._do_send_request(request, routing_key)
def _do_send_request(self, request, routing_key):
self.socket.send(b'', zmq.SNDMORE)
self.socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
self.socket.send(six.b(routing_key), zmq.SNDMORE)
self.socket.send(six.b(request.message_id), zmq.SNDMORE)
self.socket.send_pyobj(request.context, zmq.SNDMORE)
self.socket.send_pyobj(request.message)
LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to "
"a target %(target)s",
{"message": request.message_id,
"target": request.target,
"addr": list(self.socket.connections)})
zmq_names.message_type_str(request.msg_type)
)
for routing_key in self._get_routing_keys(request):
request.routing_key = routing_key
self.sender.send(self.socket, request)
def cleanup(self):
self.connection_updater.stop()
self.socket.close()
super(DealerPublisherProxy, self).cleanup()
class DealerCallPublisherProxy(zmq_dealer_call_publisher.DealerCallPublisher):
class DealerCallPublisherProxy(zmq_dealer_publisher.DealerCallPublisher):
"""CALL publisher via proxy."""
def __init__(self, conf, matchmaker, sockets_manager):
reply_waiter = ReplyWaiterProxy(conf)
sender = CallSenderProxy(conf, matchmaker, sockets_manager,
reply_waiter)
def __init__(self, sockets_manager, sender, reply_waiter):
super(DealerCallPublisherProxy, self).__init__(
conf, matchmaker, sockets_manager, sender, reply_waiter)
sockets_manager, sender, reply_waiter
)
self.socket = self.sockets_manager.get_socket_to_publishers()
self.routing_table = zmq_routing_table.RoutingTable(self.conf,
self.matchmaker)
self.connection_updater = \
PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket)
class CallSenderProxy(zmq_dealer_call_publisher.CallSender):
def __init__(self, conf, matchmaker, sockets_manager, reply_waiter):
super(CallSenderProxy, self).__init__(
sockets_manager, reply_waiter)
self.socket = self.outbound_sockets.get_socket_to_publishers()
self.reply_waiter.poll_socket(self.socket)
self.routing_table = RoutingTable(conf, matchmaker)
self.connection_updater = PublisherConnectionUpdater(
conf, matchmaker, self.socket)
def send_request(self, request):
try:
request.routing_key = \
self.routing_table.get_routable_host(request.target)
except retrying.RetryError:
self._raise_timeout(request)
return super(DealerCallPublisherProxy, self).send_request(request)
def _connect_socket(self, target):
return self.socket
def _do_send_request(self, socket, request):
routing_key = self.routing_table.get_routable_host(request.target)
# DEALER socket specific envelope empty delimiter
socket.send(b'', zmq.SNDMORE)
socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
socket.send(six.b(routing_key), zmq.SNDMORE)
socket.send(six.b(request.message_id), zmq.SNDMORE)
socket.send_pyobj(request.context, zmq.SNDMORE)
socket.send_pyobj(request.message)
LOG.debug("Sent message_id %(message)s to a target %(target)s",
{"message": request.message_id,
"target": request.target})
class ReplyWaiterProxy(zmq_reply_waiter.ReplyWaiter):
def receive_method(self, socket):
empty = socket.recv()
assert empty == b'', "Empty expected!"
reply_id = socket.recv()
assert reply_id is not None, "Reply ID expected!"
message_type = int(socket.recv())
assert message_type == zmq_names.REPLY_TYPE, "Reply is expected!"
message_id = socket.recv()
reply = socket.recv_pyobj()
LOG.debug("Received reply %s", message_id)
return reply
class RoutingTable(object):
"""This class implements local routing-table cache
taken from matchmaker. Its purpose is to give the next routable
host id (remote DEALER's id) by request for specific target in
round-robin fashion.
"""
def __init__(self, conf, matchmaker):
self.conf = conf
self.matchmaker = matchmaker
self.routing_table = {}
self.routable_hosts = {}
def get_all_hosts(self, target):
self._update_routing_table(target)
return list(self.routable_hosts.get(str(target)) or [])
def get_routable_host(self, target):
self._update_routing_table(target)
hosts_for_target = self.routable_hosts[str(target)]
host = hosts_for_target.pop(0)
if not hosts_for_target:
self._renew_routable_hosts(target)
return host
def _is_tm_expired(self, tm):
return 0 <= self.conf.zmq_target_expire <= time.time() - tm
def _update_routing_table(self, target):
routing_record = self.routing_table.get(str(target))
if routing_record is None:
self._fetch_hosts(target)
self._renew_routable_hosts(target)
elif self._is_tm_expired(routing_record[1]):
self._fetch_hosts(target)
def _fetch_hosts(self, target):
self.routing_table[str(target)] = (self.matchmaker.get_hosts(
target, zmq_names.socket_type_str(zmq.DEALER)), time.time())
def _renew_routable_hosts(self, target):
hosts, _ = self.routing_table[str(target)]
self.routable_hosts[str(target)] = list(hosts)
def cleanup(self):
self.connection_updater.stop()
super(DealerCallPublisherProxy, self).cleanup()
self.socket.close()
class PublisherConnectionUpdater(zmq_updater.ConnectionUpdater):

View File

@ -1,66 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LW
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class ReplyWaiter(object):
def __init__(self, conf):
self.conf = conf
self.replies = {}
self.poller = zmq_async.get_poller()
self.executor = zmq_async.get_executor(self.run_loop)
self.executor.execute()
self._lock = threading.Lock()
def track_reply(self, reply_future, message_id):
with self._lock:
self.replies[message_id] = reply_future
def untrack_id(self, message_id):
with self._lock:
self.replies.pop(message_id)
def poll_socket(self, socket):
self.poller.register(socket, recv_method=self.receive_method)
def receive_method(self, socket):
empty = socket.recv()
assert empty == b'', "Empty expected!"
reply = socket.recv_pyobj()
LOG.debug("Received reply %s", reply.message_id)
return reply
def run_loop(self):
reply, socket = self.poller.poll(
timeout=self.conf.rpc_poll_timeout)
if reply is not None:
call_future = self.replies.get(reply.message_id)
if call_future:
call_future.set_result(reply)
else:
LOG.warning(_LW("Received timed out reply: %s"),
reply.message_id)
def cleanup(self):
self.poller.close()

View File

@ -14,14 +14,11 @@
import abc
import logging
import time
import six
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
@ -56,7 +53,7 @@ class PublisherBase(object):
Publisher can send request objects from zmq_request.
"""
def __init__(self, sockets_manager):
def __init__(self, sockets_manager, sender):
"""Construct publisher
@ -66,10 +63,10 @@ class PublisherBase(object):
:param conf: configuration object
:type conf: oslo_config.CONF
"""
self.outbound_sockets = sockets_manager
self.sockets_manager = sockets_manager
self.conf = sockets_manager.conf
self.matchmaker = sockets_manager.matchmaker
super(PublisherBase, self).__init__()
self.sender = sender
@abc.abstractmethod
def send_request(self, request):
@ -79,126 +76,6 @@ class PublisherBase(object):
:type request: zmq_request.Request
"""
def _send_request(self, socket, request):
"""Send request to consumer.
Helper private method which defines basic sending behavior.
:param socket: Socket to publish message on
:type socket: zmq.Socket
:param request: Message data and destination container object
:type request: zmq_request.Request
"""
LOG.debug("Sending %(type)s message_id %(message)s to a target "
"%(target)s",
{"type": request.msg_type,
"message": request.message_id,
"target": request.target})
socket.send_pyobj(request)
def cleanup(self):
"""Cleanup publisher. Close allocated connections."""
self.outbound_sockets.cleanup()
class SocketsManager(object):
def __init__(self, conf, matchmaker, listener_type, socket_type):
self.conf = conf
self.matchmaker = matchmaker
self.listener_type = listener_type
self.socket_type = socket_type
self.zmq_context = zmq.Context()
self.outbound_sockets = {}
self.socket_to_publishers = None
self.socket_to_routers = None
def get_hosts(self, target):
return self.matchmaker.get_hosts(
target, zmq_names.socket_type_str(self.listener_type))
@staticmethod
def _key_from_target(target):
return target.topic if target.fanout else str(target)
def _get_hosts_and_connect(self, socket, target):
hosts = self.get_hosts(target)
self._connect_to_hosts(socket, target, hosts)
def _track_socket(self, socket, target):
key = self._key_from_target(target)
self.outbound_sockets[key] = (socket, time.time())
def _connect_to_hosts(self, socket, target, hosts):
for host in hosts:
socket.connect_to_host(host)
self._track_socket(socket, target)
def _check_for_new_hosts(self, target):
key = self._key_from_target(target)
socket, tm = self.outbound_sockets[key]
if 0 <= self.conf.zmq_target_expire <= time.time() - tm:
self._get_hosts_and_connect(socket, target)
return socket
def get_socket(self, target):
key = self._key_from_target(target)
if key in self.outbound_sockets:
socket = self._check_for_new_hosts(target)
else:
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
self.socket_type)
self._get_hosts_and_connect(socket, target)
return socket
def get_socket_to_publishers(self):
if self.socket_to_publishers is not None:
return self.socket_to_publishers
self.socket_to_publishers = zmq_socket.ZmqSocket(
self.conf, self.zmq_context, self.socket_type)
publishers = self.matchmaker.get_publishers()
for pub_address, router_address in publishers:
self.socket_to_publishers.connect_to_host(router_address)
return self.socket_to_publishers
def get_socket_to_routers(self):
if self.socket_to_routers is not None:
return self.socket_to_routers
self.socket_to_routers = zmq_socket.ZmqSocket(
self.conf, self.zmq_context, self.socket_type)
routers = self.matchmaker.get_routers()
for router_address in routers:
self.socket_to_routers.connect_to_host(router_address)
return self.socket_to_routers
def cleanup(self):
for socket, tm in self.outbound_sockets.values():
socket.close()
class QueuedSender(PublisherBase):
def __init__(self, sockets_manager, _do_send_request):
super(QueuedSender, self).__init__(sockets_manager)
self._do_send_request = _do_send_request
self.queue, self.empty_except = zmq_async.get_queue()
self.executor = zmq_async.get_executor(self.run_loop)
self.executor.execute()
def send_request(self, request):
self.queue.put(request)
def _connect_socket(self, target):
return self.outbound_sockets.get_socket(target)
def run_loop(self):
try:
request = self.queue.get(timeout=self.conf.rpc_poll_timeout)
except self.empty_except:
return
socket = self._connect_socket(request.target)
self._do_send_request(socket, request)
def cleanup(self):
self.executor.stop()
super(QueuedSender, self).cleanup()
self.sockets_manager.cleanup()

View File

@ -1,52 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class PushPublisher(object):
def __init__(self, conf, matchmaker):
super(PushPublisher, self).__init__()
sockets_manager = zmq_publisher_base.SocketsManager(
conf, matchmaker, zmq.PULL, zmq.PUSH)
def _do_send_request(push_socket, request):
push_socket.send_pyobj(request)
LOG.debug("Sending message_id %(message)s to a target %(target)s",
{"message": request.message_id,
"target": request.target})
self.sender = zmq_publisher_base.QueuedSender(
sockets_manager, _do_send_request)
def send_request(self, request):
if request.msg_type != zmq_names.CAST_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
self.sender.send_request(request)
def cleanup(self):
self.sender.cleanup()

View File

@ -14,15 +14,14 @@
from oslo_messaging._drivers import common
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_call_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher_proxy
from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
@ -46,25 +45,34 @@ class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase):
if conf.use_router_proxy or not conf.use_pub_sub:
raise WrongClientException()
self.sockets_manager = zmq_publisher_base.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
self.sockets_manager = zmq_sockets_manager.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER
)
sender_proxy = zmq_senders.RequestSenderProxy(conf)
sender_direct = zmq_senders.RequestSenderDirect(conf)
receiver_direct = zmq_receivers.ReplyReceiverDirect(conf)
fanout_publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy(
conf, matchmaker, self.sockets_manager.get_socket_to_publishers())
self.sockets_manager, sender_proxy
)
super(ZmqClientMixDirectPubSub, self).__init__(
conf, matchmaker, allowed_remote_exmods,
publishers={
zmq_names.CALL_TYPE:
zmq_dealer_call_publisher.DealerCallPublisher(
conf, matchmaker, self.sockets_manager),
zmq_dealer_publisher.DealerCallPublisher(
self.sockets_manager, sender_direct, receiver_direct
),
zmq_names.CAST_FANOUT_TYPE: fanout_publisher,
zmq_names.NOTIFY_TYPE: fanout_publisher,
"default": zmq_dealer_publisher.DealerPublisherAsync(
conf, matchmaker)
"default":
zmq_dealer_publisher.DealerPublisher(self.sockets_manager,
sender_direct)
}
)
@ -82,18 +90,25 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase):
if conf.use_pub_sub or conf.use_router_proxy:
raise WrongClientException()
self.sockets_manager = zmq_publisher_base.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
self.sockets_manager = zmq_sockets_manager.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER
)
sender = zmq_senders.RequestSenderDirect(conf)
receiver = zmq_receivers.ReplyReceiverDirect(conf)
super(ZmqClientDirect, self).__init__(
conf, matchmaker, allowed_remote_exmods,
publishers={
zmq_names.CALL_TYPE:
zmq_dealer_call_publisher.DealerCallPublisher(
conf, matchmaker, self.sockets_manager),
zmq_dealer_publisher.DealerCallPublisher(
self.sockets_manager, sender, receiver
),
"default": zmq_dealer_publisher.DealerPublisher(
conf, matchmaker)
"default":
zmq_dealer_publisher.DealerPublisher(self.sockets_manager,
sender)
}
)
@ -113,18 +128,25 @@ class ZmqClientProxy(zmq_client_base.ZmqClientBase):
if not conf.use_router_proxy:
raise WrongClientException()
self.sockets_manager = zmq_publisher_base.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
self.sockets_manager = zmq_sockets_manager.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER
)
sender = zmq_senders.RequestSenderProxy(conf)
receiver = zmq_receivers.ReplyReceiverProxy(conf)
super(ZmqClientProxy, self).__init__(
conf, matchmaker, allowed_remote_exmods,
publishers={
zmq_names.CALL_TYPE:
zmq_dealer_publisher_proxy.DealerCallPublisherProxy(
conf, matchmaker, self.sockets_manager),
self.sockets_manager, sender, receiver
),
"default": zmq_dealer_publisher_proxy.DealerPublisherProxy(
conf, matchmaker,
self.sockets_manager.get_socket_to_publishers())
"default":
zmq_dealer_publisher_proxy.DealerPublisherProxy(
self.sockets_manager, sender
)
}
)

View File

@ -0,0 +1,140 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import threading
import futurist
import six
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
@six.add_metaclass(abc.ABCMeta)
class ReceiverBase(object):
"""Base response receiving interface."""
def __init__(self, conf):
self.conf = conf
self._lock = threading.Lock()
self._requests = {}
self._poller = zmq_async.get_poller()
self._executor = zmq_async.get_executor(method=self._run_loop)
self._executor.execute()
@abc.abstractproperty
def message_types(self):
"""A list of supported incoming response types."""
def register_socket(self, socket):
"""Register a socket for receiving data."""
self._poller.register(socket, recv_method=self.recv_response)
@abc.abstractmethod
def recv_response(self, socket):
"""Receive a response and return a tuple of the form
(reply_id, message_type, message_id, response).
"""
def track_request(self, request):
"""Track a request via already registered sockets and return
a list of futures for monitoring all types of responses.
"""
futures = []
for message_type in self.message_types:
future = futurist.Future()
self._set_future(request.message_id, message_type, future)
futures.append(future)
return futures
def untrack_request(self, request):
"""Untrack a request and stop monitoring any responses."""
for message_type in self.message_types:
self._pop_future(request.message_id, message_type)
def stop(self):
self._poller.close()
self._executor.stop()
def _get_future(self, message_id, message_type):
with self._lock:
return self._requests.get((message_id, message_type))
def _set_future(self, message_id, message_type, future):
with self._lock:
self._requests[(message_id, message_type)] = future
def _pop_future(self, message_id, message_type):
with self._lock:
return self._requests.pop((message_id, message_type), None)
def _run_loop(self):
data, socket = self._poller.poll(timeout=self.conf.rpc_poll_timeout)
if data is None:
return
reply_id, message_type, message_id, response = data
assert message_type in self.message_types, \
"%s is not supported!" % zmq_names.message_type_str(message_type)
future = self._get_future(message_id, message_type)
if future is not None:
LOG.debug("Received %(msg_type)s for %(msg_id)s",
{"msg_type": zmq_names.message_type_str(message_type),
"msg_id": message_id})
future.set_result((reply_id, response))
class AckReceiver(ReceiverBase):
message_types = (zmq_names.ACK_TYPE,)
class ReplyReceiver(ReceiverBase):
message_types = (zmq_names.REPLY_TYPE,)
class ReplyReceiverProxy(ReplyReceiver):
def recv_response(self, socket):
empty = socket.recv()
assert empty == b'', "Empty expected!"
reply_id = socket.recv()
assert reply_id is not None, "Reply ID expected!"
message_type = int(socket.recv())
assert message_type == zmq_names.REPLY_TYPE, "Reply is expected!"
message_id = socket.recv()
reply = socket.recv_pyobj()
LOG.debug("Received reply for %s", message_id)
return reply_id, message_type, message_id, reply
class ReplyReceiverDirect(ReplyReceiver):
def recv_response(self, socket):
empty = socket.recv()
assert empty == b'', "Empty expected!"
reply = socket.recv_pyobj()
LOG.debug("Received reply for %s", reply.message_id)
return reply.reply_id, reply.type_, reply.message_id, reply
class AckAndReplyReceiver(ReceiverBase):
message_types = (zmq_names.ACK_TYPE, zmq_names.REPLY_TYPE)

View File

@ -0,0 +1,65 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
zmq = zmq_async.import_zmq()
class RoutingTable(object):
"""This class implements local routing-table cache
taken from matchmaker. Its purpose is to give the next routable
host id (remote DEALER's id) by request for specific target in
round-robin fashion.
"""
def __init__(self, conf, matchmaker):
self.conf = conf
self.matchmaker = matchmaker
self.routing_table = {}
self.routable_hosts = {}
def get_all_hosts(self, target):
self._update_routing_table(target)
return list(self.routable_hosts.get(str(target)) or [])
def get_routable_host(self, target):
self._update_routing_table(target)
hosts_for_target = self.routable_hosts[str(target)]
host = hosts_for_target.pop(0)
if not hosts_for_target:
self._renew_routable_hosts(target)
return host
def _is_tm_expired(self, tm):
return 0 <= self.conf.zmq_target_expire <= time.time() - tm
def _update_routing_table(self, target):
routing_record = self.routing_table.get(str(target))
if routing_record is None:
self._fetch_hosts(target)
self._renew_routable_hosts(target)
elif self._is_tm_expired(routing_record[1]):
self._fetch_hosts(target)
def _fetch_hosts(self, target):
self.routing_table[str(target)] = (self.matchmaker.get_hosts(
target, zmq_names.socket_type_str(zmq.DEALER)), time.time())
def _renew_routable_hosts(self, target):
hosts, _ = self.routing_table[str(target)]
self.routable_hosts[str(target)] = list(hosts)

View File

@ -0,0 +1,94 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import six
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
@six.add_metaclass(abc.ABCMeta)
class SenderBase(object):
"""Base request/ack/reply sending interface."""
def __init__(self, conf):
self.conf = conf
@abc.abstractmethod
def send(self, socket, message):
pass
class RequestSenderProxy(SenderBase):
def send(self, socket, request):
socket.send(b'', zmq.SNDMORE)
socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
socket.send(six.b(request.routing_key), zmq.SNDMORE)
socket.send(six.b(request.message_id), zmq.SNDMORE)
socket.send_pyobj(request.context, zmq.SNDMORE)
socket.send_pyobj(request.message)
LOG.debug("->[proxy:%(addr)s] Sending %(msg_type)s message "
"%(msg_id)s to target %(target)s",
{"addr": list(socket.connections),
"msg_type": zmq_names.message_type_str(request.msg_type),
"msg_id": request.message_id,
"target": request.target})
class ReplySenderProxy(SenderBase):
def send(self, socket, reply):
LOG.debug("Replying to %s", reply.message_id)
assert reply.type_ == zmq_names.REPLY_TYPE, "Reply expected!"
socket.send(b'', zmq.SNDMORE)
socket.send(six.b(str(reply.type_)), zmq.SNDMORE)
socket.send(reply.reply_id, zmq.SNDMORE)
socket.send(reply.message_id, zmq.SNDMORE)
socket.send_pyobj(reply)
class RequestSenderDirect(SenderBase):
def send(self, socket, request):
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(request)
LOG.debug("Sending %(msg_type)s message %(msg_id)s to "
"target %(target)s",
{"msg_type": zmq_names.message_type_str(request.msg_type),
"msg_id": request.message_id,
"target": request.target})
class ReplySenderDirect(SenderBase):
def send(self, socket, reply):
LOG.debug("Replying to %s", reply.message_id)
assert reply.type_ == zmq_names.REPLY_TYPE, "Reply expected!"
socket.send(reply.reply_id, zmq.SNDMORE)
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(reply)

View File

@ -0,0 +1,96 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
zmq = zmq_async.import_zmq()
class SocketsManager(object):
def __init__(self, conf, matchmaker, listener_type, socket_type):
self.conf = conf
self.matchmaker = matchmaker
self.listener_type = listener_type
self.socket_type = socket_type
self.zmq_context = zmq.Context()
self.outbound_sockets = {}
self.socket_to_publishers = None
self.socket_to_routers = None
def get_hosts(self, target):
return self.matchmaker.get_hosts(
target, zmq_names.socket_type_str(self.listener_type))
@staticmethod
def _key_from_target(target):
return target.topic if target.fanout else str(target)
def _get_hosts_and_connect(self, socket, target):
hosts = self.get_hosts(target)
self._connect_to_hosts(socket, target, hosts)
def _track_socket(self, socket, target):
key = self._key_from_target(target)
self.outbound_sockets[key] = (socket, time.time())
def _connect_to_hosts(self, socket, target, hosts):
for host in hosts:
socket.connect_to_host(host)
self._track_socket(socket, target)
def _check_for_new_hosts(self, target):
key = self._key_from_target(target)
socket, tm = self.outbound_sockets[key]
if 0 <= self.conf.zmq_target_expire <= time.time() - tm:
self._get_hosts_and_connect(socket, target)
return socket
def get_socket(self, target):
key = self._key_from_target(target)
if key in self.outbound_sockets:
socket = self._check_for_new_hosts(target)
else:
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
self.socket_type)
self._get_hosts_and_connect(socket, target)
return socket
def get_socket_to_publishers(self):
if self.socket_to_publishers is not None:
return self.socket_to_publishers
self.socket_to_publishers = zmq_socket.ZmqSocket(
self.conf, self.zmq_context, self.socket_type)
publishers = self.matchmaker.get_publishers()
for pub_address, router_address in publishers:
self.socket_to_publishers.connect_to_host(router_address)
return self.socket_to_publishers
def get_socket_to_routers(self):
if self.socket_to_routers is not None:
return self.socket_to_routers
self.socket_to_routers = zmq_socket.ZmqSocket(
self.conf, self.zmq_context, self.socket_type)
routers = self.matchmaker.get_routers()
for router_address in routers:
self.socket_to_routers.connect_to_host(router_address)
return self.socket_to_routers
def cleanup(self):
for socket, tm in self.outbound_sockets.values():
socket.close()

View File

@ -14,15 +14,12 @@
import logging
import six
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver.client import zmq_response
from oslo_messaging._drivers.zmq_driver.server.consumers\
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager
from oslo_messaging._drivers.zmq_driver.server.consumers \
import zmq_consumer_base
from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_updater
@ -33,54 +30,11 @@ LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class DealerIncomingMessage(base.RpcIncomingMessage):
def __init__(self, context, message):
super(DealerIncomingMessage, self).__init__(context, message)
def reply(self, reply=None, failure=None):
"""Reply is not needed for non-call messages"""
def acknowledge(self):
"""Not sending acknowledge"""
def requeue(self):
"""Requeue is not supported"""
class DealerIncomingRequest(base.RpcIncomingMessage):
def __init__(self, socket, reply_id, message_id, context, message):
super(DealerIncomingRequest, self).__init__(context, message)
self.reply_socket = socket
self.reply_id = reply_id
self.message_id = message_id
def reply(self, reply=None, failure=None):
if failure is not None:
failure = rpc_common.serialize_remote_exception(failure)
response = zmq_response.Response(type=zmq_names.REPLY_TYPE,
message_id=self.message_id,
reply_id=self.reply_id,
reply_body=reply,
failure=failure)
LOG.debug("Replying %s", self.message_id)
self.reply_socket.send(b'', zmq.SNDMORE)
self.reply_socket.send(six.b(str(zmq_names.REPLY_TYPE)), zmq.SNDMORE)
self.reply_socket.send(self.reply_id, zmq.SNDMORE)
self.reply_socket.send(self.message_id, zmq.SNDMORE)
self.reply_socket.send_pyobj(response)
def requeue(self):
"""Requeue is not supported"""
class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
def __init__(self, conf, poller, server):
self.sockets_manager = zmq_publisher_base.SocketsManager(
self.sender = zmq_senders.ReplySenderProxy(conf)
self.sockets_manager = zmq_sockets_manager.SocketsManager(
conf, server.matchmaker, zmq.ROUTER, zmq.DEALER)
self.host = None
super(DealerConsumer, self).__init__(conf, poller, server, zmq.DEALER)
@ -91,6 +45,7 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
def subscribe_socket(self, socket_type):
try:
socket = self.sockets_manager.get_socket_to_routers()
self.sockets.append(socket)
self.host = socket.handle.identity
self.poller.register(socket, self.receive_message)
return socket
@ -110,10 +65,12 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
LOG.debug("[%(host)s] Received message %(id)s",
{"host": self.host, "id": message_id})
if message_type == zmq_names.CALL_TYPE:
return DealerIncomingRequest(
socket, reply_id, message_id, context, message)
return zmq_incoming_message.ZmqIncomingMessage(
context, message, reply_id, message_id, socket, self.sender
)
elif message_type in zmq_names.NON_BLOCKING_TYPES:
return DealerIncomingMessage(context, message)
return zmq_incoming_message.ZmqIncomingMessage(context,
message)
else:
LOG.error(_LE("Unknown message type: %s"),
zmq_names.message_type_str(message_type))
@ -122,6 +79,7 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
def cleanup(self):
LOG.info(_LI("[%s] Destroy DEALER consumer"), self.host)
self.connection_updater.cleanup()
super(DealerConsumer, self).cleanup()

View File

@ -1,69 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers import base
from oslo_messaging._drivers.zmq_driver.server.consumers\
import zmq_consumer_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class PullIncomingMessage(base.RpcIncomingMessage):
def __init__(self, context, message):
super(PullIncomingMessage, self).__init__(context, message)
def reply(self, reply=None, failure=None):
"""Reply is not needed for non-call messages."""
def acknowledge(self):
"""Acknowledgments are not supported by this type of consumer."""
def requeue(self):
"""Requeueing is not supported."""
class PullConsumer(zmq_consumer_base.SingleSocketConsumer):
def __init__(self, conf, poller, server):
super(PullConsumer, self).__init__(conf, poller, server, zmq.PULL)
LOG.info(_LI("[%s] Run PULL consumer"), self.host)
def receive_message(self, socket):
try:
request = socket.recv_pyobj()
msg_type = request.msg_type
assert msg_type is not None, 'Bad format: msg type expected'
context = request.context
message = request.message
LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s",
{"host": self.host,
"type": request.msg_type,
"id": request.message_id,
"target": request.target})
if msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES):
return PullIncomingMessage(context, message)
else:
LOG.error(_LE("Unknown message type: %s"), msg_type)
except (zmq.ZMQError, AssertionError) as e:
LOG.error(_LE("Receiving message failed: %s"), str(e))

View File

@ -14,7 +14,7 @@
import logging
from oslo_messaging._drivers import base
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
from oslo_messaging._drivers.zmq_driver.server.consumers\
import zmq_consumer_base
from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
@ -27,29 +27,10 @@ LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class RouterIncomingMessage(base.RpcIncomingMessage):
def __init__(self, context, message, socket, reply_id, msg_id,
poller):
super(RouterIncomingMessage, self).__init__(context, message)
self.socket = socket
self.reply_id = reply_id
self.msg_id = msg_id
self.message = message
def reply(self, reply=None, failure=None):
"""Reply is not needed for non-call messages"""
def acknowledge(self):
LOG.debug("Not sending acknowledge for %s", self.msg_id)
def requeue(self):
"""Requeue is not supported"""
class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
def __init__(self, conf, poller, server):
self.sender = zmq_senders.ReplySenderDirect(conf)
super(RouterConsumer, self).__init__(conf, poller, server, zmq.ROUTER)
LOG.info(_LI("[%s] Run ROUTER consumer"), self.host)
@ -70,14 +51,19 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
"target": request.target})
if request.msg_type == zmq_names.CALL_TYPE:
return zmq_incoming_message.ZmqIncomingRequest(
socket, reply_id, request, self.poller)
return zmq_incoming_message.ZmqIncomingMessage(
request.context, request.message, reply_id,
request.message_id, socket, self.sender
)
elif request.msg_type in zmq_names.NON_BLOCKING_TYPES:
return RouterIncomingMessage(
request.context, request.message, socket, reply_id,
request.message_id, self.poller)
return zmq_incoming_message.ZmqIncomingMessage(request.context,
request.message)
else:
LOG.error(_LE("Unknown message type: %s"), request.msg_type)
LOG.error(_LE("Unknown message type: %s"),
zmq_names.message_type_str(request.msg_type))
except (zmq.ZMQError, AssertionError) as e:
LOG.error(_LE("Receiving message failed: %s"), str(e))
def cleanup(self):
LOG.info(_LI("[%s] Destroy ROUTER consumer"), self.host)
super(RouterConsumer, self).cleanup()

View File

@ -16,9 +16,9 @@ import logging
import six
from oslo_messaging._drivers import base
from oslo_messaging._drivers.zmq_driver.server.consumers\
from oslo_messaging._drivers.zmq_driver.server.consumers \
import zmq_consumer_base
from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_socket
@ -29,21 +29,6 @@ LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class SubIncomingMessage(base.RpcIncomingMessage):
def __init__(self, context, message):
super(SubIncomingMessage, self).__init__(context, message)
def reply(self, reply=None, failure=None):
"""Reply is not needed for non-call messages."""
def acknowledge(self):
"""Requeue is not supported"""
def requeue(self):
"""Requeue is not supported"""
class SubConsumer(zmq_consumer_base.ConsumerBase):
def __init__(self, conf, poller, server):
@ -89,8 +74,7 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
context, message = self._receive_request(socket)
if not message:
return None
return SubIncomingMessage(context, message)
return zmq_incoming_message.ZmqIncomingMessage(context, message)
except (zmq.ZMQError, AssertionError) as e:
LOG.error(_LE("Receiving message failed: %s"), str(e))

View File

@ -21,38 +21,41 @@ from oslo_messaging._drivers.zmq_driver.client import zmq_response
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class ZmqIncomingRequest(base.RpcIncomingMessage):
class ZmqIncomingMessage(base.RpcIncomingMessage):
def __init__(self, socket, rep_id, request, poller):
super(ZmqIncomingRequest, self).__init__(request.context,
request.message)
self.reply_socket = socket
self.reply_id = rep_id
self.request = request
self.received = None
self.poller = poller
def __init__(self, context, message, reply_id=None, message_id=None,
socket=None, sender=None):
if sender is not None:
assert socket is not None, "Valid socket expected!"
assert message_id is not None, "Valid message ID expected!"
assert reply_id is not None, "Valid reply ID expected!"
super(ZmqIncomingMessage, self).__init__(context, message)
self.reply_id = reply_id
self.message_id = message_id
self.socket = socket
self.sender = sender
def acknowledge(self):
"""Not sending acknowledge"""
def reply(self, reply=None, failure=None):
if failure is not None:
failure = rpc_common.serialize_remote_exception(failure)
response = zmq_response.Response(type=zmq_names.REPLY_TYPE,
message_id=self.request.message_id,
reply_id=self.reply_id,
reply_body=reply,
failure=failure)
LOG.debug("Replying %s", (str(self.request.message_id)))
self.received = True
self.reply_socket.send(self.reply_id, zmq.SNDMORE)
self.reply_socket.send(b'', zmq.SNDMORE)
self.reply_socket.send_pyobj(response)
if self.sender is not None:
if failure is not None:
failure = rpc_common.serialize_remote_exception(failure)
reply = zmq_response.Response(type=zmq_names.REPLY_TYPE,
message_id=self.message_id,
reply_id=self.reply_id,
reply_body=reply,
failure=failure)
self.sender.send(self.socket, reply)
def requeue(self):
"""Requeue is not supported"""

View File

@ -69,8 +69,8 @@ def socket_type_str(socket_type):
def message_type_str(message_type):
msg_type_str = {CALL_TYPE: "CALL",
CAST_TYPE: "CAST",
CAST_FANOUT_TYPE: "CAST_FANOUT_TYPE",
NOTIFY_TYPE: "NOTIFY_TYPE",
REPLY_TYPE: "REPLY_TYPE",
ACK_TYPE: "ACK_TYPE"}
CAST_FANOUT_TYPE: "CAST_FANOUT",
NOTIFY_TYPE: "NOTIFY",
REPLY_TYPE: "REPLY",
ACK_TYPE: "ACK"}
return msg_type_str[message_type]

View File

@ -31,6 +31,8 @@ class UpdaterBase(object):
self.conf = conf
self.matchmaker = matchmaker
self.update_method = update_method
# make first update immediately
self.update_method()
self.executor = zmq_async.get_executor(method=self._update_loop)
self.executor.execute()

View File

@ -300,6 +300,8 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
zmq_redis_port = os.environ.get('ZMQ_REDIS_PORT')
if zmq_redis_port:
self.config(port=zmq_redis_port, group="matchmaker_redis")
self.config(check_timeout=10000, group="matchmaker_redis")
self.config(wait_timeout=1000, group="matchmaker_redis")
zmq_use_pub_sub = os.environ.get('ZMQ_USE_PUB_SUB')
if zmq_use_pub_sub:
self.config(use_pub_sub=zmq_use_pub_sub)