Initial commit for new zmq driver implementation

- Minimal RPC (CALL + direct CAST) implementation
- Has up and running oslo_messaging/tests/drivers/test_impl_zmq
- Pep8 fixed.
- Works over REQ/REP pipeline according to [1]
- Has a beginning of eventlet/threading behavior differentiation

Fanout and Notifier are not yet supported
Devstack not yet fixed
Functional tests not yet fixed

..[1] - https://review.openstack.org/#/c/171131/

Change-Id: I44cd48070bf7c7f46152fdf0e54664a7dee97de9
This commit is contained in:
Oleksii Zamiatin 2015-06-19 14:29:18 +03:00
parent 76ec03c8f9
commit 73cd49129f
29 changed files with 1490 additions and 1712 deletions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1 @@
__author__ = 'ozamiatin'

View File

@ -0,0 +1,92 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
from oslo_messaging._drivers.zmq_driver import zmq_async
@six.add_metaclass(abc.ABCMeta)
class BaseProxy(object):
def __init__(self, conf, context):
super(BaseProxy, self).__init__()
self.conf = conf
self.context = context
self.executor = zmq_async.get_executor(self.run)
@abc.abstractmethod
def run(self):
"Main execution point of the proxy"
def start(self):
self.executor.execute()
def stop(self):
self.executor.stop()
def wait(self):
self.executor.wait()
@six.add_metaclass(abc.ABCMeta)
class BaseTcpFrontend(object):
def __init__(self, conf, poller, context):
self.conf = conf
self.poller = poller
self.context = context
def receive_incoming(self):
message, socket = self.poller.poll(1)
return message
@six.add_metaclass(abc.ABCMeta)
class BaseBackendMatcher(object):
def __init__(self, conf, poller, context):
self.conf = conf
self.context = context
self.backends = {}
self.poller = poller
def redirect_to_backend(self, message):
backend, topic = self._match_backend(message)
self._send_message(backend, message, topic)
def _match_backend(self, message):
topic = self._get_topic(message)
ipc_address = self._get_ipc_address(topic)
if ipc_address not in self.backends:
self._create_backend(ipc_address)
return self.backend, topic
@abc.abstractmethod
def _get_topic(self, message):
"Extract topic from message"
@abc.abstractmethod
def _get_ipc_address(self, topic):
"Get ipc backend address from topic"
@abc.abstractmethod
def _send_message(self, backend, message, topic):
"Backend specific sending logic"
@abc.abstractmethod
def _create_backend(self, ipc_address):
"Backend specific socket opening logic"

View File

@ -0,0 +1,71 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
from oslo_utils import excutils
from oslo_messaging._drivers.zmq_driver.broker.zmq_call_proxy import CallProxy
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class ZmqBroker(object):
"""Local messaging IPC broker (nodes are still peers).
The main purpose is to have one TCP connection
(one TCP port assigned for ZMQ messaging) per node.
There could be a number of services running on a node.
Without such broker a number of opened TCP ports used for
messaging become unpredictable for the engine.
All messages are coming to TCP ROUTER socket and then
distributed between their targets by topic via IPC.
"""
def __init__(self, conf):
super(ZmqBroker, self).__init__()
self.conf = conf
self.context = zmq.Context()
self.proxies = [CallProxy(conf, self.context)]
self._create_ipc_dirs()
def _create_ipc_dirs(self):
ipc_dir = self.conf.rpc_zmq_ipc_dir
try:
os.makedirs("%s/fanout" % ipc_dir)
except os.error:
if not os.path.isdir(ipc_dir):
with excutils.save_and_reraise_exception():
LOG.error(_LE("Required IPC directory does not exist at"
" %s"), ipc_dir)
def start(self):
for proxy in self.proxies:
proxy.start()
def wait(self):
for proxy in self.proxies:
proxy.wait()
def close(self):
LOG.info(_LI("Broker shutting down ..."))
for proxy in self.proxies:
proxy.stop()

View File

@ -0,0 +1,110 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers.common import RPCException
import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_serializer
from oslo_messaging._drivers.zmq_driver import zmq_topic
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class CallProxy(base_proxy.BaseProxy):
def __init__(self, conf, context):
super(CallProxy, self).__init__(conf, context)
self.tcp_frontend = FrontendTcpRouter(self.conf, context)
self.backend_matcher = CallBackendMatcher(self.conf, context)
LOG.info(_LI("Starting call proxy thread"))
def run(self):
message = self.tcp_frontend.receive_incoming()
if message is not None:
self.backend_matcher.redirect_to_backend(message)
reply, socket = self.backend_matcher.receive_outgoing_reply()
if reply is not None:
self.tcp_frontend.redirect_outgoing_reply(reply)
class CallBackendMatcher(base_proxy.BaseBackendMatcher):
def __init__(self, conf, context):
super(CallBackendMatcher, self).__init__(conf,
zmq_async.get_poller(),
context)
self.backend = self.context.socket(zmq.DEALER)
self.poller.register(self.backend)
def receive_outgoing_reply(self):
reply_message = self.poller.poll(1)
return reply_message
def _get_topic(self, message):
topic, server = zmq_serializer.get_topic_from_call_message(message)
return zmq_topic.Topic(self.conf, topic, server)
def _get_ipc_address(self, topic):
return zmq_topic.get_ipc_address_call(self.conf, topic)
def _send_message(self, backend, message, topic):
# Empty needed for awaiting REP socket to work properly
# (DEALER-REP usage specific)
backend.send(b'', zmq.SNDMORE)
backend.send_multipart(message)
def _create_backend(self, ipc_address):
self.backend.connect(ipc_address)
self.backends[str(ipc_address)] = True
class FrontendTcpRouter(base_proxy.BaseTcpFrontend):
def __init__(self, conf, context):
super(FrontendTcpRouter, self).__init__(conf,
zmq_async.get_poller(),
context)
try:
self.frontend = self.context.socket(zmq.ROUTER)
bind_address = zmq_topic.get_tcp_bind_address(conf.rpc_zmq_port)
LOG.info(_LI("Binding to TCP ROUTER %s") % bind_address)
self.frontend.bind(bind_address)
self.poller.register(self.frontend)
except zmq.ZMQError:
errmsg = _LE("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use.")
LOG.error(errmsg)
raise RPCException(errmsg)
@staticmethod
def _reduce_empty(reply):
reply.pop(0)
return reply
def redirect_outgoing_reply(self, reply):
self._reduce_empty(reply)
try:
self.frontend.send_multipart(reply)
LOG.info(_LI("Redirecting reply to client %s") % reply)
except zmq.ZMQError:
errmsg = _LE("Failed redirecting reply to client %s") % reply
LOG.error(errmsg)
raise RPCException(errmsg)

View File

@ -0,0 +1,79 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import oslo_messaging._drivers.zmq_driver.broker.zmq_base_proxy as base_proxy
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_serializer
from oslo_messaging._drivers.zmq_driver import zmq_topic
from oslo_messaging._i18n import _LI
zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__)
class CastProxy(base_proxy.BaseProxy):
def __init__(self, conf, context):
super(CastProxy, self).__init__(conf, context)
self.tcp_frontend = FrontendTcpPull(self.conf, context)
self.backend_matcher = CastPushBackendMatcher(self.conf, context)
LOG.info(_LI("Starting cast proxy thread"))
def run(self):
message = self.tcp_frontend.receive_incoming()
if message is not None:
self.backend_matcher.redirect_to_backend(message)
class FrontendTcpPull(base_proxy.BaseTcpFrontend):
def __init__(self, conf, context):
super(FrontendTcpPull, self).__init__(conf, zmq_async.get_poller(),
context)
self.frontend = self.context.socket(zmq.PULL)
address = zmq_topic.get_tcp_bind_address(conf.rpc_zmq_fanout_port)
LOG.info(_LI("Binding to TCP PULL %s") % address)
self.frontend.bind(address)
self.poller.register(self.frontend)
def _receive_message(self):
message = self.poller.poll()
return message
class CastPushBackendMatcher(base_proxy.BaseBackendMatcher):
def __init__(self, conf, context):
super(CastPushBackendMatcher, self).__init__(conf,
zmq_async.get_poller(),
context)
self.backend = self.context.socket(zmq.PUSH)
def _get_topic(self, message):
topic, server = zmq_serializer.get_topic_from_cast_message(message)
return zmq_topic.Topic(self.conf, topic, server)
def _get_ipc_address(self, topic):
return zmq_topic.get_ipc_address_cast(self.conf, topic)
def _send_message(self, backend, message, topic):
backend.send_multipart(message)
def _create_backend(self, ipc_address):
LOG.debug("[Cast Proxy] Creating PUSH backend %s", ipc_address)
self.backend.connect(ipc_address)
self.backends[str(ipc_address)] = True

View File

@ -0,0 +1 @@
__author__ = 'ozamiatin'

View File

@ -0,0 +1,111 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
import eventlet
import six
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_poller
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class GreenPoller(zmq_poller.ZmqPoller):
def __init__(self):
self.incoming_queue = six.moves.queue.Queue()
self.green_pool = eventlet.GreenPool()
self.sockets = []
def register(self, socket, recv_method=None):
self.sockets.append(socket)
return self.green_pool.spawn(self._socket_receive, socket,
recv_method)
def _socket_receive(self, socket, recv_method=None):
while True:
if recv_method:
incoming = recv_method(socket)
else:
incoming = socket.recv_multipart()
self.incoming_queue.put((incoming, socket))
eventlet.sleep()
def poll(self, timeout=None):
incoming = None
try:
with eventlet.Timeout(timeout, exception=rpc_common.Timeout):
while incoming is None:
try:
incoming = self.incoming_queue.get_nowait()
except six.moves.queue.Empty:
eventlet.sleep()
except rpc_common.Timeout:
return None, None
return incoming[0], incoming[1]
class HoldReplyPoller(GreenPoller):
def __init__(self):
super(HoldReplyPoller, self).__init__()
self.event_by_socket = {}
def register(self, socket, recv_method=None):
super(HoldReplyPoller, self).register(socket, recv_method)
self.event_by_socket[socket] = threading.Event()
def resume_polling(self, socket):
pause = self.event_by_socket[socket]
pause.set()
def _socket_receive(self, socket, recv_method=None):
pause = self.event_by_socket[socket]
while True:
pause.clear()
if recv_method:
incoming = recv_method(socket)
else:
incoming = socket.recv_multipart()
self.incoming_queue.put((incoming, socket))
pause.wait()
class GreenExecutor(zmq_poller.Executor):
def __init__(self, method):
self._method = method
super(GreenExecutor, self).__init__(None)
def _loop(self):
while True:
self._method()
eventlet.sleep()
def execute(self):
self.thread = eventlet.spawn(self._loop)
def wait(self):
if self.thread is not None:
self.thread.wait()
def stop(self):
if self.thread is not None:
self.thread.kill()

View File

@ -0,0 +1,50 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
import zmq
from oslo_messaging._drivers.zmq_driver import zmq_poller
LOG = logging.getLogger(__name__)
class ThreadingPoller(zmq_poller.ZmqPoller):
def __init__(self):
self.poller = zmq.Poller()
def register(self, socket):
self.poller.register(socket, zmq.POLLOUT)
def poll(self, timeout=None):
socks = dict(self.poller.poll(timeout))
for socket in socks:
incoming = socket.recv()
return incoming
class ThreadingExecutor(zmq_poller.Executor):
def __init__(self, method):
thread = threading.Thread(target=method)
super(ThreadingExecutor, self).__init__(thread)
def execute(self):
self.thread.start()
def wait(self):
self.thread.join()

View File

@ -0,0 +1,49 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_topic
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class CallRequest(Request):
def __init__(self, conf, target, context, message, timeout=None,
retry=None):
try:
self.zmq_context = zmq.Context()
socket = self.zmq_context.socket(zmq.REQ)
super(CallRequest, self).__init__(conf, target, context,
message, socket, timeout, retry)
self.connect_address = zmq_topic.get_tcp_address_call(conf,
self.topic)
LOG.info(_LI("Connecting REQ to %s") % self.connect_address)
self.socket.connect(self.connect_address)
except zmq.ZMQError as e:
LOG.error(_LE("Error connecting to socket: %s") % str(e))
def receive_reply(self):
# NOTE(ozamiatin): Check for retry here (no retries now)
self.socket.setsockopt(zmq.RCVTIMEO, self.timeout)
reply = self.socket.recv_json()
return reply[u'reply']

View File

@ -0,0 +1,72 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_publisher
from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_topic
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class CastRequest(Request):
def __init__(self, conf, target, context,
message, socket, address, timeout=None, retry=None):
self.connect_address = address
super(CastRequest, self).__init__(conf, target, context, message,
socket, timeout, retry)
def __call__(self, *args, **kwargs):
self.send_request()
def send_request(self):
self.socket.send(b'', zmq.SNDMORE)
super(CastRequest, self).send_request()
def receive_reply(self):
# Ignore reply for CAST
pass
class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase):
def __init__(self, conf, matchmaker):
super(DealerCastPublisher, self).__init__(conf)
self.matchmaker = matchmaker
def cast(self, target, context,
message, timeout=None, retry=None):
topic = zmq_topic.Topic.from_target(self.conf, target)
connect_address = zmq_topic.get_tcp_address_call(self.conf, topic)
dealer_socket = self._create_socket(connect_address)
request = CastRequest(self.conf, target, context, message,
dealer_socket, connect_address, timeout, retry)
request.send_request()
def _create_socket(self, address):
if address in self.outbound_sockets:
return self.outbound_sockets[address]
try:
dealer_socket = self.zmq_context.socket(zmq.DEALER)
LOG.info(_LI("Connecting DEALER to %s") % address)
dealer_socket.connect(address)
except zmq.ZMQError:
LOG.error(_LE("Failed connecting DEALER to %s") % address)
return dealer_socket

View File

@ -0,0 +1,40 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import six
from oslo_messaging._drivers.zmq_driver import zmq_async
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
@six.add_metaclass(abc.ABCMeta)
class CastPublisherBase(object):
def __init__(self, conf):
self.conf = conf
self.zmq_context = zmq.Context()
self.outbound_sockets = {}
super(CastPublisherBase, self).__init__()
@abc.abstractmethod
def cast(self, target, context,
message, timeout=None, retry=None):
"Send CAST to target"

View File

@ -0,0 +1,33 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_call_request
from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_dealer
class ZmqClient(object):
def __init__(self, conf, matchmaker=None):
self.conf = conf
self.cast_publisher = zmq_cast_dealer.DealerCastPublisher(conf,
matchmaker)
def call(self, target, context, message, timeout=None, retry=None):
request = zmq_call_request.CallRequest(self.conf, target, context,
message, timeout, retry)
return request()
def cast(self, target, context, message, timeout=None, retry=None):
self.cast_publisher.cast(target, context, message, timeout, retry)

View File

@ -0,0 +1,76 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from abc import abstractmethod
import logging
import uuid
import six
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_topic
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
@six.add_metaclass(abc.ABCMeta)
class Request(object):
def __init__(self, conf, target, context, message,
socket, timeout=None, retry=None):
if message['method'] is None:
errmsg = _LE("No method specified for RPC call")
LOG.error(errmsg)
raise KeyError(errmsg)
self.msg_id = uuid.uuid4().hex
self.target = target
self.context = context
self.message = message
self.timeout = self._to_milliseconds(conf, timeout)
self.retry = retry
self.reply = None
self.socket = socket
self.topic = zmq_topic.Topic.from_target(conf, target)
@staticmethod
def _to_milliseconds(conf, timeout):
return timeout * 1000 if timeout else conf.rpc_response_timeout * 1000
@property
def is_replied(self):
return self.reply is not None
@property
def is_timed_out(self):
return False
def send_request(self):
self.socket.send_string(str(self.topic), zmq.SNDMORE)
self.socket.send_string(self.msg_id, zmq.SNDMORE)
self.socket.send_json(self.context, zmq.SNDMORE)
self.socket.send_json(self.message)
def __call__(self):
self.send_request()
return self.receive_reply()
@abstractmethod
def receive_reply(self):
"Receive reply from server side"

View File

@ -0,0 +1,35 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class ConsumerBase(object):
def __init__(self, listener, conf, zmq_poller, context):
self.listener = listener
self.conf = conf
self.poller = zmq_poller
self.context = context
self.sockets_per_topic = {}
def poll(self, timeout=None):
pass
def stop(self):
pass
def cleanup(self):
pass
def listen(self, target):
pass

View File

@ -0,0 +1,96 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers import base
from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_base_consumer
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_topic as topic_utils
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class ZmqIncomingRequest(base.IncomingMessage):
def __init__(self, listener, context, message, socket, rep_id, poller):
super(ZmqIncomingRequest, self).__init__(listener, context, message)
self.reply_socket = socket
self.reply_id = rep_id
self.received = None
self.poller = poller
def reply(self, reply=None, failure=None, log_failure=True):
message_reply = {u'reply': reply,
u'failure': failure,
u'log_failure': log_failure}
LOG.debug("Replying %s REP", (str(message_reply)))
self.received = True
self.reply_socket.send(self.reply_id, zmq.SNDMORE)
self.reply_socket.send(b'', zmq.SNDMORE)
self.reply_socket.send_json(message_reply)
self.poller.resume_polling(self.reply_socket)
def acknowledge(self):
pass
def requeue(self):
pass
class CallResponder(zmq_base_consumer.ConsumerBase):
def __init__(self, listener, conf, poller, context):
super(CallResponder, self).__init__(listener, conf, poller, context)
def poll(self, timeout=None):
try:
incoming, socket = self.poller.poll(timeout)
reply_id, context, message = incoming
LOG.debug("[Server] REP Received message %s" % str(message))
incoming = ZmqIncomingRequest(self.listener,
context,
message, socket,
reply_id,
self.poller)
return incoming
except zmq.ZMQError as e:
LOG.error(_LE("Receiving message failed ... {}"), e)
def listen(self, target):
def _receive_message(socket):
reply_id = socket.recv()
empty = socket.recv()
assert empty == b'', 'Bad format: empty separator expected'
topic = socket.recv_string()
assert topic is not None, 'Bad format: topic string expected'
msg_id = socket.recv_string()
assert msg_id is not None, 'Bad format: message ID expected'
context = socket.recv_json()
message = socket.recv_json()
return (reply_id, context, message)
topic = topic_utils.Topic.from_target(self.conf, target)
ipc_rep_address = topic_utils.get_ipc_address_call(self.conf, topic)
rep_socket = self.context.socket(zmq.REP)
rep_socket.bind(ipc_rep_address)
self.sockets_per_topic[str(topic)] = rep_socket
self.poller.register(rep_socket, _receive_message)

View File

@ -0,0 +1,49 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers import base
from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_call_responder
from oslo_messaging._drivers.zmq_driver import zmq_async
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class ZmqServer(base.Listener):
def __init__(self, conf, matchmaker=None):
LOG.info("[Server] __init__")
self.conf = conf
self.context = zmq.Context()
poller = zmq_async.get_reply_poller()
self.call_responder = zmq_call_responder.CallResponder(self, conf,
poller,
self.context)
def poll(self, timeout=None):
incoming = self.call_responder.poll(timeout)
return incoming
def stop(self):
LOG.info("[Server] Stop")
def cleanup(self):
pass
def listen(self, target):
LOG.info("[Server] Listen to Target %s" % target)
self.call_responder.listen(target)

View File

@ -0,0 +1,59 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_utils import importutils
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
green_zmq = importutils.try_import('eventlet.green.zmq')
def import_zmq():
imported_zmq = green_zmq or importutils.try_import('zmq')
if imported_zmq is None:
errmsg = _LE("ZeroMQ not found!")
LOG.error(errmsg)
raise ImportError(errmsg)
return imported_zmq
def get_poller():
if green_zmq:
from oslo_messaging._drivers.zmq_driver.poller import green_poller
return green_poller.GreenPoller()
else:
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
return threading_poller.ThreadingPoller()
def get_reply_poller():
if green_zmq:
from oslo_messaging._drivers.zmq_driver.poller import green_poller
return green_poller.HoldReplyPoller()
else:
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
return threading_poller.ThreadingPoller()
def get_executor(method):
if green_zmq is not None:
from oslo_messaging._drivers.zmq_driver.poller import green_poller
return green_poller.GreenExecutor(method)
else:
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
return threading_poller.ThreadingExecutor()

View File

@ -0,0 +1,33 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging._drivers import common as rpc_common
class RpcContext(rpc_common.CommonRpcContext):
"""Context that supports replying to a rpc.call."""
def __init__(self, **kwargs):
self.replies = []
super(RpcContext, self).__init__(**kwargs)
def deepcopy(self):
values = self.to_dict()
values['replies'] = self.replies
return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False):
if ending:
return
self.replies.append(reply)

View File

@ -0,0 +1,48 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class ZmqPoller(object):
@abc.abstractmethod
def register(self, socket, recv_method=None):
'Register socket to poll'
@abc.abstractmethod
def poll(self, timeout=None):
'Poll for messages'
@six.add_metaclass(abc.ABCMeta)
class Executor(object):
def __init__(self, thread):
self.thread = thread
@abc.abstractmethod
def execute(self):
'Run execution'
@abc.abstractmethod
def stop(self):
'Stop execution'
@abc.abstractmethod
def wait(self):
'Wait until pass'

View File

@ -0,0 +1,54 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import re
import six
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._i18n import _LE, _LW
LOG = logging.getLogger(__name__)
MESSAGE_CALL_TOPIC_POSITION = 2
def _get_topic_from_msg(message, position):
pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
topic = message[position]
topic_items = None
if six.PY3:
topic = topic.decode('utf-8')
try:
# The topic is received over the network,
# don't trust this input.
if badchars.search(topic) is not None:
emsg = _LW("Topic contained dangerous characters")
LOG.warn(emsg)
raise rpc_common.RPCException(emsg)
topic_items = topic.split('.', 1)
except Exception as e:
errmsg = _LE("Failed topic string parsing, %s") % str(e)
LOG.error(errmsg)
rpc_common.RPCException(errmsg)
return topic_items[0], topic_items[1]
def get_topic_from_call_message(message):
return _get_topic_from_msg(message, MESSAGE_CALL_TOPIC_POSITION)

View File

@ -0,0 +1,61 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def get_ipc_address_call(conf, topic):
return "ipc://%s/%s" % (conf.rpc_zmq_ipc_dir, str(topic))
def get_tcp_bind_address(port):
return "tcp://*:%s" % port
def get_tcp_address_call(conf, topic):
return "tcp://%s:%s" % (topic.server, conf.rpc_zmq_port)
def get_ipc_address_cast(conf, topic):
return "ipc://%s/fanout/%s" % (conf.rpc_zmq_ipc_dir, str(topic))
class Topic(object):
def __init__(self, conf, topic, server=None, fanout=False):
if server is None:
self.server = conf.rpc_zmq_host
else:
self.server = server
self._topic = topic
self.fanout = fanout
@staticmethod
def _extract_cinder_server(server):
return server.split('@', 1)[0]
@staticmethod
def from_target(conf, target):
if target.server is not None:
return Topic(conf, target.topic, target.server,
fanout=target.fanout)
else:
return Topic(conf, target.topic, fanout=target.fanout)
@property
def topic(self):
return self._topic if self._topic else ""
def __str__(self, *args, **kwargs):
return "%s.%s" % (self.topic, self.server)

View File

@ -1,5 +1,4 @@
# Copyright 2014 Canonical, Ltd. # Copyright 2015 Mirantis, Inc.
# All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -15,28 +14,52 @@
import logging import logging
import socket import socket
import threading
import fixtures import fixtures
from oslo_utils import importutils
import testtools import testtools
try:
import zmq
except ImportError:
zmq = None
import oslo_messaging import oslo_messaging
from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.zmq_driver.broker.zmq_broker import ZmqBroker
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _
from oslo_messaging.tests import utils as test_utils from oslo_messaging.tests import utils as test_utils
from six.moves import mock
# eventlet is not yet py3 compatible, so skip if not installed
eventlet = importutils.try_import('eventlet')
impl_zmq = importutils.try_import('oslo_messaging._drivers.impl_zmq')
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class TestRPCServerListener(object):
def __init__(self, driver):
self.driver = driver
self.target = None
self.listener = None
self.executor = zmq_async.get_executor(self._run)
self._stop = threading.Event()
self._received = threading.Event()
self.message = None
def listen(self, target):
self.target = target
self.listener = self.driver.listen(self.target)
self.executor.execute()
def _run(self):
try:
message = self.listener.poll()
if message is not None:
self._received.set()
self.message = message
message.reply(reply=True)
except Exception:
LOG.exception(_("Unexpected exception occurred."))
def stop(self):
self.executor.stop()
def get_unused_port(): def get_unused_port():
"""Returns an unused port on localhost.""" """Returns an unused port on localhost."""
@ -70,10 +93,11 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
# Start RPC # Start RPC
LOG.info("Running internal zmq receiver.") LOG.info("Running internal zmq receiver.")
self.reactor = impl_zmq.ZmqProxy(self.conf) self.broker = ZmqBroker(self.conf)
self.reactor.consume_in_thread() self.broker.start()
self.listener = TestRPCServerListener(self.driver)
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__)) self.addCleanup(stopRpc(self.__dict__))
@ -94,380 +118,127 @@ class stopRpc(object):
self.attrs = attrs self.attrs = attrs
def __call__(self): def __call__(self):
if self.attrs['reactor']: if self.attrs['broker']:
self.attrs['reactor'].close() self.attrs['broker'].close()
if self.attrs['driver']: if self.attrs['driver']:
self.attrs['driver'].cleanup() self.attrs['driver'].cleanup()
if self.attrs['listener']:
self.attrs['listener'].stop()
class TestZmqBasics(ZmqBaseTestCase): class TestZmqBasics(ZmqBaseTestCase):
def test_start_stop_listener(self):
target = oslo_messaging.Target(topic='testtopic')
listener = self.driver.listen(target)
result = listener.poll(0.01)
self.assertEqual(result, None)
def test_send_receive_raises(self): def test_send_receive_raises(self):
"""Call() without method.""" """Call() without method."""
target = oslo_messaging.Target(topic='testtopic') target = oslo_messaging.Target(topic='testtopic')
self.driver.listen(target) self.listener.listen(target)
self.assertRaises( self.assertRaises(
KeyError, KeyError,
self.driver.send, self.driver.send,
target, {}, {'tx_id': 1}, wait_for_reply=True) target, {}, {'tx_id': 1}, wait_for_reply=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqIncomingMessage') def test_send_receive_topic(self):
def test_send_receive_topic(self, mock_msg): """Call() with topic."""
"""Call() with method."""
mock_msg.return_value = msg = mock.MagicMock()
msg.received = received = mock.MagicMock()
received.failure = False
received.reply = True
msg.condition = condition = mock.MagicMock()
condition.wait.return_value = True
target = oslo_messaging.Target(topic='testtopic') target = oslo_messaging.Target(topic='testtopic')
self.driver.listen(target) self.listener.listen(target)
result = self.driver.send( result = self.driver.send(
target, {}, target, {},
{'method': 'hello-world', 'tx_id': 1}, {'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True) wait_for_reply=True)
self.assertEqual(result, True) self.assertIsNotNone(result)
@mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True) def test_send_noreply(self):
def test_send_receive_fanout(self, mock_call): """Cast() with topic."""
target = oslo_messaging.Target(topic='testtopic', server="127.0.0.1")
self.listener.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=False)
self.listener._received.wait()
self.assertIsNone(result)
self.assertEqual(True, self.listener._received.isSet())
method = self.listener.message.message[u'method']
self.assertEqual(u'hello-world', method)
@testtools.skip("Not implemented feature")
def test_send_fanout(self):
target = oslo_messaging.Target(topic='testtopic', fanout=True) target = oslo_messaging.Target(topic='testtopic', fanout=True)
self.driver.listen(target) self.driver.listen(target)
mock_call.__name__ = '_call'
mock_call.return_value = [True]
result = self.driver.send( result = self.driver.send(
target, {}, target, {},
{'method': 'hello-world', 'tx_id': 1}, {'method': 'hello-world', 'tx_id': 1},
wait_for_reply=False)
self.assertIsNone(result)
self.assertEqual(True, self.listener._received.isSet())
msg_pattern = "{'method': 'hello-world', 'tx_id': 1}"
self.assertEqual(msg_pattern, self.listener.message)
def test_send_receive_direct(self):
"""Call() without topic."""
target = oslo_messaging.Target(server='127.0.0.1')
self.listener.listen(target)
message = {'method': 'hello-world', 'tx_id': 1}
context = {}
result = self.driver.send(target, context, message,
wait_for_reply=True) wait_for_reply=True)
self.assertTrue(result)
self.assertEqual(result, True)
mock_call.assert_called_once_with(
self.driver,
'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'],
{}, 'fanout~testtopic.127.0.0.1',
{'tx_id': 1, 'method': 'hello-world'},
None, False, [], True)
@mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
def test_send_receive_direct(self, mock_call):
# Also verifies fix for bug http://pad.lv/1301723
target = oslo_messaging.Target(topic='testtopic', server='localhost')
self.driver.listen(target)
mock_call.__name__ = '_call'
mock_call.return_value = [True]
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
self.assertEqual(result, True)
mock_call.assert_called_once_with(
self.driver,
'tcp://localhost:%s' % self.conf['rpc_zmq_port'],
{}, 'testtopic.localhost',
{'tx_id': 1, 'method': 'hello-world'},
None, False, [], True)
class TestZmqSocket(test_utils.BaseTestCase): class TestPoller(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self): def setUp(self):
super(TestZmqSocket, self).setUp() super(TestPoller, self).setUp()
self.messaging_conf.transport_driver = 'zmq' self.poller = zmq_async.get_poller()
# Get driver self.ctx = zmq.Context()
transport = oslo_messaging.get_transport(self.conf) self.ADDR_REQ = "ipc://request1"
self.driver = transport._driver
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') def test_poll_blocking(self):
@mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
def test_zmqsocket_init_type_pull(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PULL, bind=False, rep = self.ctx.socket(zmq.REP)
subscribe=None) rep.bind(self.ADDR_REQ)
self.assertTrue(sock.can_recv)
self.assertFalse(sock.can_send)
self.assertFalse(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') reply_poller = zmq_async.get_reply_poller()
@mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context') reply_poller.register(rep)
def test_zmqsocket_init_type_sub(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.SUB, bind=False, def listener():
subscribe=None) incoming, socket = reply_poller.poll()
self.assertTrue(sock.can_recv) self.assertEqual(b'Hello', incoming[0])
self.assertFalse(sock.can_send) socket.send_string('Reply')
self.assertTrue(sock.can_sub) reply_poller.resume_polling(socket)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') executor = zmq_async.get_executor(listener)
@mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context') executor.execute()
def test_zmqsocket_init_type_push(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUSH, bind=False, req1 = self.ctx.socket(zmq.REQ)
subscribe=None) req1.connect(self.ADDR_REQ)
self.assertFalse(sock.can_recv)
self.assertTrue(sock.can_send)
self.assertFalse(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') req2 = self.ctx.socket(zmq.REQ)
@mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context') req2.connect(self.ADDR_REQ)
def test_zmqsocket_init_type_pub(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUB, bind=False, req1.send_string('Hello')
subscribe=None) req2.send_string('Hello')
self.assertFalse(sock.can_recv)
self.assertTrue(sock.can_send)
self.assertFalse(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
reply = req1.recv_string()
self.assertEqual('Reply', reply)
class TestZmqIncomingMessage(test_utils.BaseTestCase): reply = req2.recv_string()
self.assertEqual('Reply', reply)
@testtools.skipIf(zmq is None, "zmq not available") def test_poll_timeout(self):
def setUp(self): rep = self.ctx.socket(zmq.REP)
super(TestZmqIncomingMessage, self).setUp() rep.bind(self.ADDR_REQ)
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
def test_zmqincomingmessage(self): reply_poller = zmq_async.get_reply_poller()
msg = impl_zmq.ZmqIncomingMessage(mock.Mock(), None, 'msg.foo') reply_poller.register(rep)
msg.reply("abc")
self.assertIsInstance(
msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
self.assertIsInstance(
msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
self.assertEqual(msg.received.reply, "abc")
msg.requeue()
incoming, socket = reply_poller.poll(1)
class TestZmqConnection(ZmqBaseTestCase): self.assertIsNone(incoming)
self.assertIsNone(socket)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_create_consumer(self, mock_reactor):
mock_reactor.register = mock.Mock()
conn = impl_zmq.Connection(self.driver.conf, self.driver)
topic = 'topic.foo'
context = mock.Mock()
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
(self.internal_ipc_dir))
# No Fanout
conn.create_consumer(topic, context)
conn.reactor.register.assert_called_with(context, inaddr,
impl_zmq.zmq.PULL,
subscribe=None, in_bind=False)
# Reset for next bunch of checks
conn.reactor.register.reset_mock()
# Fanout
inaddr = ('ipc://%s/zmq_topic_fanout~topic' %
(self.internal_ipc_dir))
conn.create_consumer(topic, context, fanout='subscriber.foo')
conn.reactor.register.assert_called_with(context, inaddr,
impl_zmq.zmq.SUB,
subscribe='subscriber.foo',
in_bind=False)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor):
mock_reactor.register = mock.Mock()
conn = impl_zmq.Connection(self.driver.conf, self.driver)
topic = 'topic.foo'
context = mock.Mock()
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
(self.internal_ipc_dir))
conn.create_consumer(topic, context)
conn.reactor.register.assert_called_with(
context, inaddr, impl_zmq.zmq.PULL, subscribe=None, in_bind=False)
conn.reactor.register.reset_mock()
# Call again with same topic
conn.create_consumer(topic, context)
self.assertFalse(conn.reactor.register.called)
@mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker',
autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker):
conn = impl_zmq.Connection(self.driver.conf, self.driver)
conn.reactor.close = mock.Mock()
mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock()
conn.close()
self.assertTrue(mock_getmatchmaker.return_value.stop_heartbeat.called)
self.assertTrue(conn.reactor.close.called)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_wait(self, mock_reactor):
conn = impl_zmq.Connection(self.driver, self.driver)
conn.reactor.wait = mock.Mock()
conn.wait()
self.assertTrue(conn.reactor.wait.called)
@mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker',
autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_consume_in_thread(self, mock_reactor,
mock_getmatchmaker):
mock_getmatchmaker.return_value.start_heartbeat = mock.Mock()
conn = impl_zmq.Connection(self.driver, self.driver)
conn.reactor.consume_in_thread = mock.Mock()
conn.consume_in_thread()
self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called)
self.assertTrue(conn.reactor.consume_in_thread.called)
class TestZmqListener(ZmqBaseTestCase):
def test_zmqlistener_no_msg(self):
listener = impl_zmq.ZmqListener(self.driver)
# Timeout = 0 should return straight away since the queue is empty
listener.poll(timeout=0)
def test_zmqlistener_w_msg(self):
listener = impl_zmq.ZmqListener(self.driver)
kwargs = {'a': 1, 'b': 2}
m = mock.Mock()
ctxt = mock.Mock(autospec=impl_zmq.RpcContext)
message = {'namespace': 'name.space', 'method': m.fake_method,
'args': kwargs}
eventlet.spawn_n(listener.dispatch, ctxt, message)
resp = listener.poll(timeout=10)
msg = {'method': m.fake_method, 'namespace': 'name.space',
'args': kwargs}
self.assertEqual(resp.message, msg)
class TestZmqDriver(ZmqBaseTestCase):
@mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo_messaging._drivers.matchmaker.MatchMakerBase.queues',
autospec=True)
def test_zmqdriver_multi_send_cast_with_no_queues(self,
mock_queues,
mock_cast):
context = mock.Mock(autospec=impl_zmq.RpcContext)
topic = 'testtopic'
msg = 'jeronimo'
with mock.patch.object(impl_zmq.LOG, 'warn') as flog:
mock_queues.return_value = None
impl_zmq._multi_send(self.driver, mock_cast,
context, topic, msg)
self.assertEqual(1, flog.call_count)
args, kwargs = flog.call_args
self.assertIn('No matchmaker results', args[0])
@mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
@mock.patch('oslo_messaging._drivers.matchmaker.MatchMakerBase.queues',
autospec=True)
def test_zmqdriver_multi_send_call_with_no_queues(self,
mock_queues,
mock_call):
context = mock.Mock(autospec=impl_zmq.RpcContext)
topic = 'testtopic'
msg = 'jeronimo'
mock_queues.return_value = None
self.assertRaises(rpc_common.Timeout,
impl_zmq._multi_send, self.driver,
mock_call, context, topic, msg)
@mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
def test_zmqdriver_send(self, mock_multi_send, mock_cast):
context = mock.Mock(autospec=impl_zmq.RpcContext)
topic = 'testtopic'
msg = 'jeronimo'
self.driver.send(oslo_messaging.Target(topic=topic), context, msg,
False, 0, False)
mock_multi_send.assert_called_with(self.driver, mock_cast, context,
topic, msg,
allowed_remote_exmods=[],
envelope=False, pooled=True)
@mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
def test_zmqdriver_send_notification(self, mock_multi_send, mock_cast):
context = mock.Mock(autospec=impl_zmq.RpcContext)
topic = 'testtopic.foo'
topic_reformat = 'testtopic-foo'
msg = 'jeronimo'
self.driver.send_notification(oslo_messaging.Target(topic=topic),
context, msg, False, False)
mock_multi_send.assert_called_with(self.driver, mock_cast, context,
topic_reformat, msg,
allowed_remote_exmods=[],
envelope=False, pooled=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)
def test_zmqdriver_listen(self, mock_connection, mock_listener):
mock_listener.return_value = listener = mock.Mock()
mock_connection.return_value = conn = mock.Mock()
conn.create_consumer = mock.Mock()
conn.consume_in_thread = mock.Mock()
topic = 'testtopic.foo'
self.driver.listen(oslo_messaging.Target(topic=topic))
conn.create_consumer.assert_called_with(topic, listener, fanout=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)
def test_zmqdriver_listen_for_notification(self, mock_connection,
mock_listener):
mock_listener.return_value = listener = mock.Mock()
mock_connection.return_value = conn = mock.Mock()
conn.create_consumer = mock.Mock()
conn.consume_in_thread = mock.Mock()
topic = 'testtopic.foo'
data = [(oslo_messaging.Target(topic=topic), 0)]
# NOTE(jamespage): Pooling not supported, just pass None for now.
self.driver.listen_for_notifications(data, None)
conn.create_consumer.assert_called_with("%s-%s" % (topic, 0), listener)

View File

@ -1,5 +1,4 @@
# Copyright 2014 Canonical, Ltd. # Copyright 2015 Mirantis, Inc.
# All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -15,28 +14,52 @@
import logging import logging
import socket import socket
import threading
import fixtures import fixtures
import testtools import testtools
from six.moves import mock import oslo_messaging
from oslo_messaging._drivers import impl_zmq
try: from oslo_messaging._drivers.zmq_driver.broker.zmq_broker import ZmqBroker
import zmq from oslo_messaging._drivers.zmq_driver import zmq_async
except ImportError: from oslo_messaging._i18n import _
zmq = None
from oslo import messaging
from oslo.utils import importutils
from oslo_messaging.tests import utils as test_utils from oslo_messaging.tests import utils as test_utils
# eventlet is not yet py3 compatible, so skip if not installed
eventlet = importutils.try_import('eventlet')
impl_zmq = importutils.try_import('oslo_messaging._drivers.impl_zmq')
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class TestRPCServerListener(object):
def __init__(self, driver):
self.driver = driver
self.target = None
self.listener = None
self.executor = zmq_async.get_executor(self._run)
self._stop = threading.Event()
self._received = threading.Event()
self.message = None
def listen(self, target):
self.target = target
self.listener = self.driver.listen(self.target)
self.executor.execute()
def _run(self):
try:
message = self.listener.poll()
if message is not None:
self._received.set()
self.message = message
message.reply(reply=True)
except Exception:
LOG.exception(_("Unexpected exception occurred."))
def stop(self):
self.executor.stop()
def get_unused_port(): def get_unused_port():
"""Returns an unused port on localhost.""" """Returns an unused port on localhost."""
@ -56,7 +79,7 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
super(ZmqBaseTestCase, self).setUp() super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq' self.messaging_conf.transport_driver = 'zmq'
# Get driver # Get driver
transport = messaging.get_transport(self.conf) transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver self.driver = transport._driver
# Set config values # Set config values
@ -70,10 +93,11 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
# Start RPC # Start RPC
LOG.info("Running internal zmq receiver.") LOG.info("Running internal zmq receiver.")
self.reactor = impl_zmq.ZmqProxy(self.conf) self.broker = ZmqBroker(self.conf)
self.reactor.consume_in_thread() self.broker.start()
self.listener = TestRPCServerListener(self.driver)
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__)) self.addCleanup(stopRpc(self.__dict__))
@ -85,7 +109,7 @@ class TestConfZmqDriverLoad(test_utils.BaseTestCase):
self.messaging_conf.transport_driver = 'zmq' self.messaging_conf.transport_driver = 'zmq'
def test_driver_load(self): def test_driver_load(self):
transport = messaging.get_transport(self.conf) transport = oslo_messaging.get_transport(self.conf)
self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver) self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
@ -94,347 +118,127 @@ class stopRpc(object):
self.attrs = attrs self.attrs = attrs
def __call__(self): def __call__(self):
if self.attrs['reactor']: if self.attrs['broker']:
self.attrs['reactor'].close() self.attrs['broker'].close()
if self.attrs['driver']: if self.attrs['driver']:
self.attrs['driver'].cleanup() self.attrs['driver'].cleanup()
if self.attrs['listener']:
self.attrs['listener'].stop()
class TestZmqBasics(ZmqBaseTestCase): class TestZmqBasics(ZmqBaseTestCase):
def test_start_stop_listener(self):
target = messaging.Target(topic='testtopic')
listener = self.driver.listen(target)
result = listener.poll(0.01)
self.assertEqual(result, None)
def test_send_receive_raises(self): def test_send_receive_raises(self):
"""Call() without method.""" """Call() without method."""
target = messaging.Target(topic='testtopic') target = oslo_messaging.Target(topic='testtopic')
self.driver.listen(target) self.listener.listen(target)
self.assertRaises( self.assertRaises(
KeyError, KeyError,
self.driver.send, self.driver.send,
target, {}, {'tx_id': 1}, wait_for_reply=True) target, {}, {'tx_id': 1}, wait_for_reply=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqIncomingMessage') def test_send_receive_topic(self):
def test_send_receive_topic(self, mock_msg): """Call() with topic."""
"""Call() with method."""
mock_msg.return_value = msg = mock.MagicMock()
msg.received = received = mock.MagicMock()
received.failure = False
received.reply = True
msg.condition = condition = mock.MagicMock()
condition.wait.return_value = True
target = messaging.Target(topic='testtopic') target = oslo_messaging.Target(topic='testtopic')
self.driver.listen(target) self.listener.listen(target)
result = self.driver.send( result = self.driver.send(
target, {}, target, {},
{'method': 'hello-world', 'tx_id': 1}, {'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True) wait_for_reply=True)
self.assertEqual(result, True) self.assertIsNotNone(result)
@mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True) def test_send_noreply(self):
def test_send_receive_fanout(self, mock_call): """Cast() with topic."""
target = messaging.Target(topic='testtopic', fanout=True)
target = oslo_messaging.Target(topic='testtopic', server="127.0.0.1")
self.listener.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=False)
self.listener._received.wait()
self.assertIsNone(result)
self.assertEqual(True, self.listener._received.isSet())
method = self.listener.message.message[u'method']
self.assertEqual(u'hello-world', method)
@testtools.skip("Not implemented feature")
def test_send_fanout(self):
target = oslo_messaging.Target(topic='testtopic', fanout=True)
self.driver.listen(target) self.driver.listen(target)
mock_call.__name__ = '_call'
mock_call.return_value = [True]
result = self.driver.send( result = self.driver.send(
target, {}, target, {},
{'method': 'hello-world', 'tx_id': 1}, {'method': 'hello-world', 'tx_id': 1},
wait_for_reply=False)
self.assertIsNone(result)
self.assertEqual(True, self.listener._received.isSet())
msg_pattern = "{'method': 'hello-world', 'tx_id': 1}"
self.assertEqual(msg_pattern, self.listener.message)
def test_send_receive_direct(self):
"""Call() without topic."""
target = oslo_messaging.Target(server='127.0.0.1')
self.listener.listen(target)
message = {'method': 'hello-world', 'tx_id': 1}
context = {}
result = self.driver.send(target, context, message,
wait_for_reply=True) wait_for_reply=True)
self.assertTrue(result)
self.assertEqual(result, True)
mock_call.assert_called_once_with(
self.driver,
'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'],
{}, 'fanout~testtopic.127.0.0.1',
{'tx_id': 1, 'method': 'hello-world'},
None, False, [], True)
@mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
def test_send_receive_direct(self, mock_call):
# Also verifies fix for bug http://pad.lv/1301723
target = messaging.Target(topic='testtopic', server='localhost')
self.driver.listen(target)
mock_call.__name__ = '_call'
mock_call.return_value = [True]
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
self.assertEqual(result, True)
mock_call.assert_called_once_with(
self.driver,
'tcp://localhost:%s' % self.conf['rpc_zmq_port'],
{}, 'testtopic.localhost',
{'tx_id': 1, 'method': 'hello-world'},
None, False, [], True)
class TestZmqSocket(test_utils.BaseTestCase): class TestPoller(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self): def setUp(self):
super(TestZmqSocket, self).setUp() super(TestPoller, self).setUp()
self.messaging_conf.transport_driver = 'zmq' self.poller = zmq_async.get_poller()
# Get driver self.ctx = zmq.Context()
transport = messaging.get_transport(self.conf) self.ADDR_REQ = "ipc://request1"
self.driver = transport._driver
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') def test_poll_blocking(self):
@mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
def test_zmqsocket_init_type_pull(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PULL, bind=False, rep = self.ctx.socket(zmq.REP)
subscribe=None) rep.bind(self.ADDR_REQ)
self.assertTrue(sock.can_recv)
self.assertFalse(sock.can_send)
self.assertFalse(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') reply_poller = zmq_async.get_reply_poller()
@mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context') reply_poller.register(rep)
def test_zmqsocket_init_type_sub(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.SUB, bind=False, def listener():
subscribe=None) incoming, socket = reply_poller.poll()
self.assertTrue(sock.can_recv) self.assertEqual(b'Hello', incoming[0])
self.assertFalse(sock.can_send) socket.send_string('Reply')
self.assertTrue(sock.can_sub) reply_poller.resume_polling(socket)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') executor = zmq_async.get_executor(listener)
@mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context') executor.execute()
def test_zmqsocket_init_type_push(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUSH, bind=False, req1 = self.ctx.socket(zmq.REQ)
subscribe=None) req1.connect(self.ADDR_REQ)
self.assertFalse(sock.can_recv)
self.assertTrue(sock.can_send)
self.assertFalse(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') req2 = self.ctx.socket(zmq.REQ)
@mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context') req2.connect(self.ADDR_REQ)
def test_zmqsocket_init_type_pub(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUB, bind=False, req1.send_string('Hello')
subscribe=None) req2.send_string('Hello')
self.assertFalse(sock.can_recv)
self.assertTrue(sock.can_send)
self.assertFalse(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
reply = req1.recv_string()
self.assertEqual('Reply', reply)
class TestZmqIncomingMessage(test_utils.BaseTestCase): reply = req2.recv_string()
self.assertEqual('Reply', reply)
@testtools.skipIf(zmq is None, "zmq not available") def test_poll_timeout(self):
def setUp(self): rep = self.ctx.socket(zmq.REP)
super(TestZmqIncomingMessage, self).setUp() rep.bind(self.ADDR_REQ)
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
def test_zmqincomingmessage(self): reply_poller = zmq_async.get_reply_poller()
msg = impl_zmq.ZmqIncomingMessage(mock.Mock(), None, 'msg.foo') reply_poller.register(rep)
msg.reply("abc")
self.assertIsInstance(
msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
self.assertIsInstance(
msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
self.assertEqual(msg.received.reply, "abc")
msg.requeue()
incoming, socket = reply_poller.poll(1)
class TestZmqConnection(ZmqBaseTestCase): self.assertIsNone(incoming)
self.assertIsNone(socket)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_create_consumer(self, mock_reactor):
mock_reactor.register = mock.Mock()
conn = impl_zmq.Connection(self.driver.conf, self.driver)
topic = 'topic.foo'
context = mock.Mock()
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
(self.internal_ipc_dir))
# No Fanout
conn.create_consumer(topic, context)
conn.reactor.register.assert_called_with(context, inaddr,
impl_zmq.zmq.PULL,
subscribe=None, in_bind=False)
# Reset for next bunch of checks
conn.reactor.register.reset_mock()
# Fanout
inaddr = ('ipc://%s/zmq_topic_fanout~topic' %
(self.internal_ipc_dir))
conn.create_consumer(topic, context, fanout='subscriber.foo')
conn.reactor.register.assert_called_with(context, inaddr,
impl_zmq.zmq.SUB,
subscribe='subscriber.foo',
in_bind=False)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor):
mock_reactor.register = mock.Mock()
conn = impl_zmq.Connection(self.driver.conf, self.driver)
topic = 'topic.foo'
context = mock.Mock()
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
(self.internal_ipc_dir))
conn.create_consumer(topic, context)
conn.reactor.register.assert_called_with(
context, inaddr, impl_zmq.zmq.PULL, subscribe=None, in_bind=False)
conn.reactor.register.reset_mock()
# Call again with same topic
conn.create_consumer(topic, context)
self.assertFalse(conn.reactor.register.called)
@mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker',
autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker):
conn = impl_zmq.Connection(self.driver.conf, self.driver)
conn.reactor.close = mock.Mock()
mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock()
conn.close()
self.assertTrue(mock_getmatchmaker.return_value.stop_heartbeat.called)
self.assertTrue(conn.reactor.close.called)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_wait(self, mock_reactor):
conn = impl_zmq.Connection(self.driver.conf, self.driver)
conn.reactor.wait = mock.Mock()
conn.wait()
self.assertTrue(conn.reactor.wait.called)
@mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker',
autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_consume_in_thread(self, mock_reactor,
mock_getmatchmaker):
mock_getmatchmaker.return_value.start_heartbeat = mock.Mock()
conn = impl_zmq.Connection(self.driver.conf, self.driver)
conn.reactor.consume_in_thread = mock.Mock()
conn.consume_in_thread()
self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called)
self.assertTrue(conn.reactor.consume_in_thread.called)
class TestZmqListener(ZmqBaseTestCase):
def test_zmqlistener_no_msg(self):
listener = impl_zmq.ZmqListener(self.driver)
# Timeout = 0 should return straight away since the queue is empty
listener.poll(timeout=0)
def test_zmqlistener_w_msg(self):
listener = impl_zmq.ZmqListener(self.driver)
kwargs = {'a': 1, 'b': 2}
m = mock.Mock()
ctxt = mock.Mock(autospec=impl_zmq.RpcContext)
message = {'namespace': 'name.space', 'method': m.fake_method,
'args': kwargs}
eventlet.spawn_n(listener.dispatch, ctxt, message)
resp = listener.poll(timeout=10)
msg = {'method': m.fake_method, 'namespace': 'name.space',
'args': kwargs}
self.assertEqual(resp.message, msg)
class TestZmqDriver(ZmqBaseTestCase):
@mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
def test_zmqdriver_send(self, mock_multi_send, mock_cast):
context = mock.Mock(autospec=impl_zmq.RpcContext)
topic = 'testtopic'
msg = 'jeronimo'
self.driver.send(messaging.Target(topic=topic), context, msg,
False, 0, False)
mock_multi_send.assert_called_with(self.driver, mock_cast, context,
topic, msg,
allowed_remote_exmods=[],
envelope=False, pooled=True)
@mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
def test_zmqdriver_send_notification(self, mock_multi_send, mock_cast):
context = mock.Mock(autospec=impl_zmq.RpcContext)
topic = 'testtopic.foo'
topic_reformat = 'testtopic-foo'
msg = 'jeronimo'
self.driver.send_notification(messaging.Target(topic=topic), context,
msg, False, False)
mock_multi_send.assert_called_with(self.driver, mock_cast, context,
topic_reformat, msg,
allowed_remote_exmods=[],
envelope=False, pooled=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)
def test_zmqdriver_listen(self, mock_connection, mock_listener):
mock_listener.return_value = listener = mock.Mock()
mock_connection.return_value = conn = mock.Mock()
conn.create_consumer = mock.Mock()
conn.consume_in_thread = mock.Mock()
topic = 'testtopic.foo'
self.driver.listen(messaging.Target(topic=topic))
conn.create_consumer.assert_called_with(topic, listener, fanout=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)
def test_zmqdriver_listen_for_notification(self, mock_connection,
mock_listener):
mock_listener.return_value = listener = mock.Mock()
mock_connection.return_value = conn = mock.Mock()
conn.create_consumer = mock.Mock()
conn.consume_in_thread = mock.Mock()
topic = 'testtopic.foo'
data = [(messaging.Target(topic=topic), 0)]
# NOTE(jamespage): Pooling not supported, just pass None for now.
self.driver.listen_for_notifications(data, None)
conn.create_consumer.assert_called_with("%s-%s" % (topic, 0), listener)