Merge "Merge remote-tracking branch 'origin/feature/zmq' into merge-branch"

This commit is contained in:
Jenkins 2015-09-16 14:31:53 +00:00 committed by Gerrit Code Review
commit bff2c802cf
53 changed files with 2379 additions and 2380 deletions

View File

@ -2,7 +2,7 @@ oslo.messaging
==============
The Oslo messaging API supports RPC and notifications over a number of
different messsaging transports.
different messaging transports.
Contents
========

View File

@ -1,40 +0,0 @@
#!/usr/bin/env python
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
eventlet.monkey_patch()
import contextlib
import sys
from oslo_config import cfg
from oslo_log import log
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._executors import base # FIXME(markmc)
CONF = cfg.CONF
CONF.register_opts(impl_zmq.zmq_opts)
CONF.register_opts(base._pool_opts)
def main():
CONF(sys.argv[1:], project='oslo')
log.setup(CONF, 'oslo.messaging')
with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
reactor.consume_in_thread()
reactor.wait()

View File

@ -75,8 +75,7 @@ class Listener(object):
def cleanup(self):
"""Cleanup listener.
Close connection used by listener if any. For some listeners like
zmq there is no connection so no need to close connection.
Close connection (socket) used by listener if any.
As this is listener specific method, overwrite it in to derived class
if cleanup of listener required.
"""

File diff suppressed because it is too large Load Diff

View File

@ -1,322 +0,0 @@
# Copyright 2011 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should except a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
import contextlib
import logging
import eventlet
from oslo_config import cfg
from oslo_messaging._i18n import _
from oslo_messaging._i18n import _LI
matchmaker_opts = [
cfg.IntOpt('matchmaker_heartbeat_freq',
default=300,
help='Heartbeat frequency.'),
cfg.IntOpt('matchmaker_heartbeat_ttl',
default=600,
help='Heartbeat time-to-live.'),
]
CONF = cfg.CONF
CONF.register_opts(matchmaker_opts)
LOG = logging.getLogger(__name__)
contextmanager = contextlib.contextmanager
class MatchMakerException(Exception):
"""Signified a match could not be found."""
message = _("Match not found by MatchMaker.")
class Exchange(object):
"""Implements lookups.
Subclass this to support hashtables, dns, etc.
"""
def __init__(self):
pass
def run(self, key):
raise NotImplementedError()
class Binding(object):
"""A binding on which to perform a lookup."""
def __init__(self):
pass
def test(self, key):
raise NotImplementedError()
class MatchMakerBase(object):
"""Match Maker Base Class.
Build off HeartbeatMatchMakerBase if building a heartbeat-capable
MatchMaker.
"""
def __init__(self):
# Array of tuples. Index [2] toggles negation, [3] is last-if-true
self.bindings = []
self.no_heartbeat_msg = _('Matchmaker does not implement '
'registration or heartbeat.')
def register(self, key, host):
"""Register a host on a backend.
Heartbeats, if applicable, may keepalive registration.
"""
pass
def ack_alive(self, key, host):
"""Acknowledge that a key.host is alive.
Used internally for updating heartbeats, but may also be used
publicly to acknowledge a system is alive (i.e. rpc message
successfully sent to host)
"""
pass
def is_alive(self, topic, host):
"""Checks if a host is alive."""
pass
def expire(self, topic, host):
"""Explicitly expire a host's registration."""
pass
def send_heartbeats(self):
"""Send all heartbeats.
Use start_heartbeat to spawn a heartbeat greenthread,
which loops this method.
"""
pass
def unregister(self, key, host):
"""Unregister a topic."""
pass
def start_heartbeat(self):
"""Spawn heartbeat greenthread."""
pass
def stop_heartbeat(self):
"""Destroys the heartbeat greenthread."""
pass
def add_binding(self, binding, rule, last=True):
self.bindings.append((binding, rule, False, last))
# NOTE(ewindisch): kept the following method in case we implement the
# underlying support.
# def add_negate_binding(self, binding, rule, last=True):
# self.bindings.append((binding, rule, True, last))
def queues(self, key):
workers = []
# bit is for negate bindings - if we choose to implement it.
# last stops processing rules if this matches.
for (binding, exchange, bit, last) in self.bindings:
if binding.test(key):
workers.extend(exchange.run(key))
# Support last.
if last:
return workers
return workers
class HeartbeatMatchMakerBase(MatchMakerBase):
"""Base for a heart-beat capable MatchMaker.
Provides common methods for registering, unregistering, and maintaining
heartbeats.
"""
def __init__(self):
self.hosts = set()
self._heart = None
self.host_topic = {}
super(HeartbeatMatchMakerBase, self).__init__()
def send_heartbeats(self):
"""Send all heartbeats.
Use start_heartbeat to spawn a heartbeat greenthread,
which loops this method.
"""
for key, host in self.host_topic.keys():
self.ack_alive(key, host)
def ack_alive(self, key, host):
"""Acknowledge that a host.topic is alive.
Used internally for updating heartbeats, but may also be used
publicly to acknowledge a system is alive (i.e. rpc message
successfully sent to host)
"""
raise NotImplementedError("Must implement ack_alive")
def backend_register(self, key, host):
"""Implements registration logic.
Called by register(self,key,host)
"""
raise NotImplementedError("Must implement backend_register")
def backend_unregister(self, key, key_host):
"""Implements de-registration logic.
Called by unregister(self,key,host)
"""
raise NotImplementedError("Must implement backend_unregister")
def register(self, key, host):
"""Register a host on a backend.
Heartbeats, if applicable, may keepalive registration.
"""
self.hosts.add(host)
self.host_topic[(key, host)] = host
key_host = '.'.join((key, host))
self.backend_register(key, key_host)
self.ack_alive(key, host)
def unregister(self, key, host):
"""Unregister a topic."""
if (key, host) in self.host_topic:
del self.host_topic[(key, host)]
self.hosts.discard(host)
self.backend_unregister(key, '.'.join((key, host)))
LOG.info(_LI("Matchmaker unregistered: %(key)s, %(host)s"),
{'key': key, 'host': host})
def start_heartbeat(self):
"""Implementation of MatchMakerBase.start_heartbeat.
Launches greenthread looping send_heartbeats(),
yielding for CONF.matchmaker_heartbeat_freq seconds
between iterations.
"""
if not self.hosts:
raise MatchMakerException(
_("Register before starting heartbeat."))
def do_heartbeat():
while True:
self.send_heartbeats()
eventlet.sleep(CONF.matchmaker_heartbeat_freq)
self._heart = eventlet.spawn(do_heartbeat)
def stop_heartbeat(self):
"""Destroys the heartbeat greenthread."""
if self._heart:
self._heart.kill()
class DirectBinding(Binding):
"""Specifies a host in the key via a '.' character.
Although dots are used in the key, the behavior here is
that it maps directly to a host, thus direct.
"""
def test(self, key):
return '.' in key
class TopicBinding(Binding):
"""Where a 'bare' key without dots.
AMQP generally considers topic exchanges to be those *with* dots,
but we deviate here in terminology as the behavior here matches
that of a topic exchange (whereas where there are dots, behavior
matches that of a direct exchange.
"""
def test(self, key):
return '.' not in key
class FanoutBinding(Binding):
"""Match on fanout keys, where key starts with 'fanout.' string."""
def test(self, key):
return key.startswith('fanout~')
class StubExchange(Exchange):
"""Exchange that does nothing."""
def run(self, key):
return [(key, None)]
class LocalhostExchange(Exchange):
"""Exchange where all direct topics are local."""
def __init__(self, host='localhost'):
self.host = host
super(Exchange, self).__init__()
def run(self, key):
return [('.'.join((key.split('.')[0], self.host)), self.host)]
class DirectExchange(Exchange):
"""Exchange where all topic keys are split, sending to second half.
i.e. "compute.host" sends a message to "compute.host" running on "host"
"""
def __init__(self):
super(Exchange, self).__init__()
def run(self, key):
e = key.split('.', 1)[1]
return [(key, e)]
class MatchMakerLocalhost(MatchMakerBase):
"""Match Maker where all bare topics resolve to localhost.
Useful for testing.
"""
def __init__(self, host='localhost'):
super(MatchMakerLocalhost, self).__init__()
self.add_binding(FanoutBinding(), LocalhostExchange(host))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), LocalhostExchange(host))
class MatchMakerStub(MatchMakerBase):
"""Match Maker where topics are untouched.
Useful for testing, or for AMQP/brokered queues.
Will not work where knowledge of hosts is known (i.e. zeromq)
"""
def __init__(self):
super(MatchMakerStub, self).__init__()
self.add_binding(FanoutBinding(), StubExchange())
self.add_binding(DirectBinding(), StubExchange())
self.add_binding(TopicBinding(), StubExchange())

View File

@ -1,145 +0,0 @@
# Copyright 2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should accept a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
from oslo_config import cfg
from oslo_utils import importutils
from oslo_messaging._drivers import matchmaker as mm_common
redis = importutils.try_import('redis')
matchmaker_redis_opts = [
cfg.StrOpt('host',
default='127.0.0.1',
help='Host to locate redis.'),
cfg.IntOpt('port',
default=6379,
help='Use this port to connect to redis host.'),
cfg.StrOpt('password',
help='Password for Redis server (optional).'),
]
CONF = cfg.CONF
opt_group = cfg.OptGroup(name='matchmaker_redis',
title='Options for Redis-based MatchMaker')
CONF.register_group(opt_group)
CONF.register_opts(matchmaker_redis_opts, opt_group)
class RedisExchange(mm_common.Exchange):
def __init__(self, matchmaker):
self.matchmaker = matchmaker
self.redis = matchmaker.redis
super(RedisExchange, self).__init__()
class RedisTopicExchange(RedisExchange):
"""Exchange where all topic keys are split, sending to second half.
i.e. "compute.host" sends a message to "compute" running on "host"
"""
def run(self, topic):
while True:
member_name = self.redis.srandmember(topic)
if not member_name:
# If this happens, there are no
# longer any members.
break
if not self.matchmaker.is_alive(topic, member_name):
continue
host = member_name.split('.', 1)[1]
return [(member_name, host)]
return []
class RedisFanoutExchange(RedisExchange):
"""Return a list of all hosts."""
def run(self, topic):
topic = topic.split('~', 1)[1]
hosts = self.redis.smembers(topic)
good_hosts = filter(
lambda host: self.matchmaker.is_alive(topic, host), hosts)
return [(x, x.split('.', 1)[1]) for x in good_hosts]
class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
"""MatchMaker registering and looking-up hosts with a Redis server."""
def __init__(self):
super(MatchMakerRedis, self).__init__()
if not redis:
raise ImportError("Failed to import module redis.")
self.redis = redis.StrictRedis(
host=CONF.matchmaker_redis.host,
port=CONF.matchmaker_redis.port,
password=CONF.matchmaker_redis.password)
self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self))
self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange())
self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self))
def ack_alive(self, key, host):
topic = "%s.%s" % (key, host)
if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl):
# If we could not update the expiration, the key
# might have been pruned. Re-register, creating a new
# key in Redis.
self.register(key, host)
def is_alive(self, topic, host):
# After redis 2.8, if the specialized key doesn't exist,
# TTL fuction would return -2. If key exists,
# but doesn't have expiration associated,
# TTL func would return -1. For more information,
# please visit http://redis.io/commands/ttl
if self.redis.ttl(host) == -2:
self.expire(topic, host)
return False
return True
def expire(self, topic, host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.delete(host)
pipe.srem(topic, host)
pipe.execute()
def backend_register(self, key, key_host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.sadd(key, key_host)
# No value is needed, we just
# care if it exists. Sets aren't viable
# because only keys can expire.
pipe.sadd(key_host, '')
pipe.execute()
def backend_unregister(self, key, key_host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.srem(key, key_host)
pipe.delete(key_host)
pipe.execute()

View File

@ -1,105 +0,0 @@
# Copyright 2011-2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should except a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
import itertools
import json
import logging
from oslo_config import cfg
from oslo_messaging._drivers import matchmaker as mm
from oslo_messaging._i18n import _LW
matchmaker_opts = [
# Matchmaker ring file
cfg.StrOpt('ringfile',
deprecated_name='matchmaker_ringfile',
deprecated_group='DEFAULT',
default='/etc/oslo/matchmaker_ring.json',
help='Matchmaker ring file (JSON).'),
]
CONF = cfg.CONF
CONF.register_opts(matchmaker_opts, 'matchmaker_ring')
LOG = logging.getLogger(__name__)
class RingExchange(mm.Exchange):
"""Match Maker where hosts are loaded from a static JSON formatted file.
__init__ takes optional ring dictionary argument, otherwise
loads the ringfile from CONF.mathcmaker_ringfile.
"""
def __init__(self, ring=None):
super(RingExchange, self).__init__()
if ring:
self.ring = ring
else:
fh = open(CONF.matchmaker_ring.ringfile, 'r')
self.ring = json.load(fh)
fh.close()
self.ring0 = {}
for k in self.ring.keys():
self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key):
return key in self.ring0
class RoundRobinRingExchange(RingExchange):
"""A Topic Exchange based on a hashmap."""
def __init__(self, ring=None):
super(RoundRobinRingExchange, self).__init__(ring)
def run(self, key):
if not self._ring_has(key):
LOG.warn(
_LW("No key defining hosts for topic '%s', "
"see ringfile"), key
)
return []
host = next(self.ring0[key])
return [(key + '.' + host, host)]
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
super(FanoutRingExchange, self).__init__(ring)
def run(self, key):
# Assume starts with "fanout~", strip it for lookup.
nkey = key.split('fanout~')[1:][0]
if not self._ring_has(nkey):
LOG.warn(
_LW("No key defining hosts for topic '%s', "
"see ringfile"), nkey
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
class MatchMakerRing(mm.MatchMakerBase):
"""Match Maker where hosts are loaded from a static hashmap."""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
self.add_binding(mm.FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(mm.DirectBinding(), mm.DirectExchange())
self.add_binding(mm.TopicBinding(), RoundRobinRingExchange(ring))

View File

@ -0,0 +1,101 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import uuid
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LI, _LW
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class DealerPublisher(zmq_publisher_base.PublisherMultisend):
def __init__(self, conf, matchmaker):
super(DealerPublisher, self).__init__(conf, matchmaker, zmq.DEALER)
self.ack_receiver = AcknowledgementReceiver()
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
dealer_socket, hosts = self._check_hosts_connections(request.target)
if not dealer_socket.connections:
# NOTE(ozamiatin): Here we can provide
# a queue for keeping messages to send them later
# when some listener appears. However such approach
# being more reliable will consume additional memory.
LOG.warning(_LW("Request %s was dropped because no connection")
% request.msg_type)
return
self.ack_receiver.track_socket(dealer_socket.handle)
if request.msg_type in zmq_names.MULTISEND_TYPES:
for _ in range(dealer_socket.connections_count()):
self._send_request(dealer_socket, request)
else:
self._send_request(dealer_socket, request)
def _send_request(self, socket, request):
message_id = str(uuid.uuid1())
socket.send(b'', zmq.SNDMORE)
socket.send_string(request.msg_type, zmq.SNDMORE)
socket.send_string(message_id, zmq.SNDMORE)
socket.send_pyobj(request.context, zmq.SNDMORE)
socket.send_pyobj(request.message)
LOG.info(_LI("Sending message %(message)s to a target %(target)s")
% {"message": request.message,
"target": request.target})
def cleanup(self):
self.ack_receiver.cleanup()
super(DealerPublisher, self).cleanup()
class AcknowledgementReceiver(object):
def __init__(self):
self.poller = zmq_async.get_poller()
self.thread = zmq_async.get_executor(self.poll_for_acknowledgements)
self.thread.execute()
def _receive_acknowledgement(self, socket):
empty = socket.recv()
assert empty == b"", "Empty delimiter expected"
ack_message = socket.recv_pyobj()
return ack_message
def track_socket(self, socket):
self.poller.register(socket, self._receive_acknowledgement)
def poll_for_acknowledgements(self):
ack_message, socket = self.poller.poll()
LOG.info(_LI("Message %s acknowledged")
% ack_message[zmq_names.FIELD_ID])
def cleanup(self):
self.thread.stop()
self.poller.close()

View File

@ -0,0 +1,47 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class PubPublisher(zmq_publisher_base.PublisherMultisend):
def __init__(self, conf, matchmaker):
super(PubPublisher, self).__init__(conf, matchmaker, zmq.PUB)
def send_request(self, request):
if request.msg_type not in zmq_names.NOTIFY_TYPES:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
pub_socket, hosts = self._check_hosts_connections(request.target)
self._send_request(pub_socket, request)
def _send_request(self, socket, request):
super(PubPublisher, self)._send_request(socket, request)
LOG.info(_LI("Publishing message %(message)s to a target %(target)s")
% {"message": request.message,
"target": request.target})

View File

@ -0,0 +1,141 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import six
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class UnsupportedSendPattern(rpc_common.RPCException):
"""Exception to raise from publishers in case of unsupported
sending pattern called.
"""
def __init__(self, pattern_name):
"""Construct exception object
:param pattern_name: Message type name from zmq_names
:type pattern_name: str
"""
errmsg = _LE("Sending pattern %s is unsupported.") % pattern_name
super(UnsupportedSendPattern, self).__init__(errmsg)
@six.add_metaclass(abc.ABCMeta)
class PublisherBase(object):
"""Abstract publisher class
Each publisher from zmq-driver client should implement
this interface to serve as a messages publisher.
Publisher can send request objects from zmq_request.
"""
def __init__(self, conf, matchmaker):
"""Construct publisher
Accept configuration object and Name Service interface object.
Create zmq.Context and connected sockets dictionary.
:param conf: configuration object
:type conf: oslo_config.CONF
:param matchmaker: Name Service interface object
:type matchmaker: matchmaker.MatchMakerBase
"""
self.conf = conf
self.zmq_context = zmq.Context()
self.matchmaker = matchmaker
self.outbound_sockets = {}
super(PublisherBase, self).__init__()
@abc.abstractmethod
def send_request(self, request):
"""Send request to consumer
:param request: Message data and destination container object
:type request: zmq_request.Request
"""
def _send_request(self, socket, request):
"""Send request to consumer.
Helper private method which defines basic sending behavior.
:param socket: Socket to publish message on
:type socket: zmq.Socket
:param request: Message data and destination container object
:type request: zmq_request.Request
"""
socket.send_string(request.msg_type, zmq.SNDMORE)
socket.send_pyobj(request.context, zmq.SNDMORE)
socket.send_pyobj(request.message)
def cleanup(self):
"""Cleanup publisher. Close allocated connections."""
for socket in self.outbound_sockets.values():
socket.setsockopt(zmq.LINGER, 0)
socket.close()
class PublisherMultisend(PublisherBase):
def __init__(self, conf, matchmaker, socket_type):
self.socket_type = socket_type
super(PublisherMultisend, self).__init__(conf, matchmaker)
def _check_hosts_connections(self, target):
# TODO(ozamiatin): Place for significant optimization
# Matchmaker cache should be implemented
hosts = self.matchmaker.get_hosts(target)
if str(target) in self.outbound_sockets:
socket = self.outbound_sockets[str(target)]
else:
socket = zmq_socket.ZmqSocket(self.zmq_context, self.socket_type)
self.outbound_sockets[str(target)] = socket
for host in hosts:
self._connect_to_host(socket, host, target)
return socket, hosts
def _connect_to_host(self, socket, host, target):
address = zmq_address.get_tcp_direct_address(host)
stype = zmq_names.socket_type_str(self.socket_type)
try:
LOG.info(_LI("Connecting %(stype)s to %(address)s for %(target)s")
% {"stype": stype,
"address": address,
"target": target})
socket.connect(address)
except zmq.ZMQError as e:
errmsg = _LE("Failed connecting %(stype) to %(address)s: %(e)s")\
% (stype, address, e)
LOG.error(_LE("Failed connecting %(stype) to %(address)s: %(e)s")
% (stype, address, e))
raise rpc_common.RPCException(errmsg)

View File

@ -0,0 +1,57 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LI, _LW
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class PushPublisher(zmq_publisher_base.PublisherMultisend):
def __init__(self, conf, matchmaker):
super(PushPublisher, self).__init__(conf, matchmaker, zmq.PUSH)
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
push_socket, hosts = self._check_hosts_connections(request.target)
if not push_socket.connections:
LOG.warning(_LW("Request %s was dropped because no connection")
% request.msg_type)
return
if request.msg_type in zmq_names.MULTISEND_TYPES:
for _ in range(push_socket.connections_count()):
self._send_request(push_socket, request)
else:
self._send_request(push_socket, request)
def _send_request(self, socket, request):
super(PushPublisher, self)._send_request(socket, request)
LOG.info(_LI("Publishing message %(message)s to a target %(target)s")
% {"message": request.message,
"target": request.target})

View File

@ -0,0 +1,85 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import logging
import oslo_messaging
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class ReqPublisher(zmq_publisher_base.PublisherBase):
def send_request(self, request):
if request.msg_type != zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
socket = self._connect_to_host(request.target)
self._send_request(socket, request)
return self._receive_reply(socket, request)
def _connect_to_host(self, target):
try:
self.zmq_context = zmq.Context()
socket = self.zmq_context.socket(zmq.REQ)
host = self.matchmaker.get_single_host(target)
connect_address = zmq_address.get_tcp_direct_address(host)
LOG.info(_LI("Connecting REQ to %s") % connect_address)
socket.connect(connect_address)
self.outbound_sockets[str(target)] = socket
return socket
except zmq.ZMQError as e:
errmsg = _LE("Error connecting to socket: %s") % str(e)
LOG.error(_LE("Error connecting to socket: %s") % str(e))
raise rpc_common.RPCException(errmsg)
@staticmethod
def _receive_reply(socket, request):
def _receive_method(socket):
return socket.recv_pyobj()
# NOTE(ozamiatin): Check for retry here (no retries now)
with contextlib.closing(zmq_async.get_reply_poller()) as poller:
poller.register(socket, recv_method=_receive_method)
reply, socket = poller.poll(timeout=request.timeout)
if reply is None:
raise oslo_messaging.MessagingTimeout(
"Timeout %s seconds was reached" % request.timeout)
if reply[zmq_names.FIELD_FAILURE]:
raise rpc_common.deserialize_remote_exception(
reply[zmq_names.FIELD_FAILURE],
request.allowed_remote_exmods)
else:
return reply[zmq_names.FIELD_REPLY]
def close(self):
# For contextlib compatibility
self.cleanup()

View File

@ -0,0 +1,72 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_req_publisher
from oslo_messaging._drivers.zmq_driver.client import zmq_request
from oslo_messaging._drivers.zmq_driver import zmq_async
zmq = zmq_async.import_zmq()
class ZmqClient(object):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
self.conf = conf
self.context = zmq.Context()
self.matchmaker = matchmaker
self.allowed_remote_exmods = allowed_remote_exmods or []
self.dealer_publisher = zmq_dealer_publisher.DealerPublisher(
conf, matchmaker)
def send_call(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.CallRequest(
target, context=context, message=message,
timeout=timeout, retry=retry,
allowed_remote_exmods=self.allowed_remote_exmods)) as request:
with contextlib.closing(zmq_req_publisher.ReqPublisher(
self.conf, self.matchmaker)) as req_publisher:
return req_publisher.send_request(request)
def send_cast(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.CastRequest(
target, context=context, message=message,
timeout=timeout, retry=retry)) as request:
self.dealer_publisher.send_request(request)
def send_fanout(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.FanoutRequest(
target, context=context, message=message,
timeout=timeout, retry=retry)) as request:
self.dealer_publisher.send_request(request)
def send_notify(self, target, context, message, version, retry=None):
with contextlib.closing(zmq_request.NotificationRequest(
target, context, message, version=version,
retry=retry)) as request:
self.dealer_publisher.send_request(request)
def send_notify_fanout(self, target, context, message, version,
retry=None):
with contextlib.closing(zmq_request.NotificationFanoutRequest(
target, context, message, version=version,
retry=retry)) as request:
self.dealer_publisher.send_request(request)
def cleanup(self):
self.dealer_publisher.cleanup()

View File

@ -0,0 +1,118 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import six
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
@six.add_metaclass(abc.ABCMeta)
class Request(object):
"""Zmq request abstract class
Represents socket (publisher) independent data object to publish.
Request object should contain all needed information for a publisher
to publish it, for instance: message payload, target, timeout
and retries etc.
"""
def __init__(self, target, context=None, message=None, retry=None):
"""Construct request object
:param target: Message destination target
:type target: oslo_messaging.Target
:param context: Message context
:type context: dict
:param message: Message payload to pass
:type message: dict
:param retry: an optional default connection retries configuration
None or -1 means to retry forever
0 means no retry
N means N retries
:type retry: int
"""
if self.msg_type not in zmq_names.MESSAGE_TYPES:
raise RuntimeError("Unknown message type!")
self.target = target
self.context = context
self.message = message
self.retry = retry
@abc.abstractproperty
def msg_type(self):
"""ZMQ message type"""
def close(self):
"""Nothing to close in base request"""
class RpcRequest(Request):
def __init__(self, *args, **kwargs):
message = kwargs.get("message")
if message['method'] is None:
errmsg = _LE("No method specified for RPC call")
LOG.error(_LE("No method specified for RPC call"))
raise KeyError(errmsg)
self.timeout = kwargs.pop("timeout")
assert self.timeout is not None, "Timeout should be specified!"
super(RpcRequest, self).__init__(*args, **kwargs)
class CallRequest(RpcRequest):
msg_type = zmq_names.CALL_TYPE
def __init__(self, *args, **kwargs):
self.allowed_remote_exmods = kwargs.pop("allowed_remote_exmods")
super(CallRequest, self).__init__(*args, **kwargs)
class CastRequest(RpcRequest):
msg_type = zmq_names.CAST_TYPE
class FanoutRequest(RpcRequest):
msg_type = zmq_names.CAST_FANOUT_TYPE
class NotificationRequest(Request):
msg_type = zmq_names.NOTIFY_TYPE
def __init__(self, *args, **kwargs):
self.version = kwargs.pop("version")
super(NotificationRequest, self).__init__(*args, **kwargs)
class NotificationFanoutRequest(NotificationRequest):
msg_type = zmq_names.NOTIFY_FANOUT_TYPE

View File

@ -0,0 +1,95 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import collections
import logging
import random
import six
import oslo_messaging
from oslo_messaging._i18n import _LI, _LW
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class MatchMakerBase(object):
def __init__(self, conf, *args, **kwargs):
super(MatchMakerBase, self).__init__(*args, **kwargs)
self.conf = conf
@abc.abstractmethod
def register(self, target, hostname):
"""Register target on nameserver.
:param target: the target for host
:type target: Target
:param hostname: host for the topic in "host:port" format
:type hostname: String
"""
@abc.abstractmethod
def get_hosts(self, target):
"""Get all hosts from nameserver by target.
:param target: the default target for invocations
:type target: Target
:returns: a list of "hostname:port" hosts
"""
def get_single_host(self, target):
"""Get a single host by target.
:param target: the target for messages
:type target: Target
:returns: a "hostname:port" host
"""
hosts = self.get_hosts(target)
if not hosts:
err_msg = "No hosts were found for target %s." % target
LOG.error(err_msg)
raise oslo_messaging.InvalidTarget(err_msg, target)
if len(hosts) == 1:
host = hosts[0]
LOG.info(_LI("A single host %(host)s found for target %(target)s.")
% {"host": host, "target": target})
else:
host = random.choice(hosts)
LOG.warning(_LW("Multiple hosts %(hosts)s were found for target "
" %(target)s. Using the random one - %(host)s.")
% {"hosts": hosts, "target": target, "host": host})
return host
class DummyMatchMaker(MatchMakerBase):
def __init__(self, conf, *args, **kwargs):
super(DummyMatchMaker, self).__init__(conf, *args, **kwargs)
self._cache = collections.defaultdict(list)
def register(self, target, hostname):
key = str(target)
if hostname not in self._cache[key]:
self._cache[key].append(hostname)
def get_hosts(self, target):
key = str(target)
return self._cache[key]

View File

@ -0,0 +1,77 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_config import cfg
import redis
from oslo_messaging._drivers.zmq_driver.matchmaker import base
LOG = logging.getLogger(__name__)
matchmaker_redis_opts = [
cfg.StrOpt('host',
default='127.0.0.1',
help='Host to locate redis.'),
cfg.IntOpt('port',
default=6379,
help='Use this port to connect to redis host.'),
cfg.StrOpt('password',
default='',
secret=True,
help='Password for Redis server (optional).'),
]
class RedisMatchMaker(base.MatchMakerBase):
def __init__(self, conf, *args, **kwargs):
super(RedisMatchMaker, self).__init__(conf, *args, **kwargs)
self.conf.register_opts(matchmaker_redis_opts, "matchmaker_redis")
self._redis = redis.StrictRedis(
host=self.conf.matchmaker_redis.host,
port=self.conf.matchmaker_redis.port,
password=self.conf.matchmaker_redis.password,
)
def _target_to_key(self, target):
attributes = ['topic', 'exchange', 'server']
prefix = "ZMQ-target"
key = ":".join((getattr(target, attr) or "*") for attr in attributes)
return "%s-%s" % (prefix, key)
def _get_keys_by_pattern(self, pattern):
return self._redis.keys(pattern)
def _get_hosts_by_key(self, key):
return self._redis.lrange(key, 0, -1)
def register(self, target, hostname):
key = self._target_to_key(target)
if hostname not in self._get_hosts_by_key(key):
self._redis.lpush(key, hostname)
def get_hosts(self, target):
pattern = self._target_to_key(target)
if "*" not in pattern:
# pattern have no placeholders, so this is valid key
return self._get_hosts_by_key(pattern)
hosts = []
for key in self._get_keys_by_pattern(pattern):
hosts.extend(self._get_hosts_by_key(key))
return hosts

View File

@ -0,0 +1,114 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
import eventlet
from oslo_messaging._drivers.zmq_driver import zmq_poller
LOG = logging.getLogger(__name__)
class GreenPoller(zmq_poller.ZmqPoller):
def __init__(self):
self.incoming_queue = eventlet.queue.LightQueue()
self.green_pool = eventlet.GreenPool()
self.thread_by_socket = {}
def register(self, socket, recv_method=None):
if socket not in self.thread_by_socket:
self.thread_by_socket[socket] = self.green_pool.spawn(
self._socket_receive, socket, recv_method)
def _socket_receive(self, socket, recv_method=None):
while True:
if recv_method:
incoming = recv_method(socket)
else:
incoming = socket.recv_multipart()
self.incoming_queue.put((incoming, socket))
eventlet.sleep()
def poll(self, timeout=None):
try:
return self.incoming_queue.get(timeout=timeout)
except eventlet.queue.Empty:
return (None, None)
def close(self):
for thread in self.thread_by_socket.values():
thread.kill()
self.thread_by_socket = {}
class HoldReplyPoller(GreenPoller):
def __init__(self):
super(HoldReplyPoller, self).__init__()
self.event_by_socket = {}
self._is_running = threading.Event()
def register(self, socket, recv_method=None):
super(HoldReplyPoller, self).register(socket, recv_method)
self.event_by_socket[socket] = threading.Event()
def resume_polling(self, socket):
pause = self.event_by_socket[socket]
pause.set()
def _socket_receive(self, socket, recv_method=None):
pause = self.event_by_socket[socket]
while not self._is_running.is_set():
pause.clear()
if recv_method:
incoming = recv_method(socket)
else:
incoming = socket.recv_multipart()
self.incoming_queue.put((incoming, socket))
pause.wait()
def close(self):
self._is_running.set()
for pause in self.event_by_socket.values():
pause.set()
eventlet.sleep()
super(HoldReplyPoller, self).close()
class GreenExecutor(zmq_poller.Executor):
def __init__(self, method):
self._method = method
super(GreenExecutor, self).__init__(None)
def _loop(self):
while True:
self._method()
eventlet.sleep()
def execute(self):
self.thread = eventlet.spawn(self._loop)
def wait(self):
if self.thread is not None:
self.thread.wait()
def stop(self):
if self.thread is not None:
self.thread.kill()

View File

@ -0,0 +1,86 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
from oslo_utils import eventletutils
import zmq
from oslo_messaging._drivers.zmq_driver import zmq_poller
LOG = logging.getLogger(__name__)
_threading = threading
if eventletutils.EVENTLET_AVAILABLE:
import eventlet
_threading = eventlet.patcher.original('threading')
class ThreadingPoller(zmq_poller.ZmqPoller):
def __init__(self):
self.poller = zmq.Poller()
self.recv_methods = {}
def register(self, socket, recv_method=None):
if recv_method is not None:
self.recv_methods[socket] = recv_method
self.poller.register(socket, zmq.POLLIN)
def poll(self, timeout=None):
timeout = timeout * 1000 # zmq poller waits milliseconds
sockets = None
try:
sockets = dict(self.poller.poll(timeout=timeout))
except zmq.ZMQError as e:
LOG.debug("Polling terminated with error: %s" % e)
if not sockets:
return None, None
for socket in sockets:
if socket in self.recv_methods:
return self.recv_methods[socket](socket), socket
else:
return socket.recv_multipart(), socket
def resume_polling(self, socket):
pass # Nothing to do for threading poller
def close(self):
pass # Nothing to do for threading poller
class ThreadingExecutor(zmq_poller.Executor):
def __init__(self, method):
self._method = method
super(ThreadingExecutor, self).__init__(
_threading.Thread(target=self._loop))
self._stop = _threading.Event()
def _loop(self):
while not self._stop.is_set():
self._method()
def execute(self):
self.thread.start()
def stop(self):
self._stop.set()
def wait(self):
self.thread.join()

View File

@ -0,0 +1,86 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import six
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
@six.add_metaclass(abc.ABCMeta)
class ConsumerBase(object):
def __init__(self, conf, poller, server):
self.conf = conf
self.poller = poller
self.server = server
self.sockets = []
self.context = zmq.Context()
def subscribe_socket(self, socket_type):
try:
socket = zmq_socket.ZmqRandomPortSocket(
self.conf, self.context, socket_type)
self.sockets.append(socket)
self.poller.register(socket, self.receive_message)
LOG.info(_LI("Run %(stype)s consumer on %(addr)s:%(port)d"),
{"stype": socket_type,
"addr": socket.bind_address,
"port": socket.port})
return socket
except zmq.ZMQError as e:
errmsg = _LE("Failed binding to port %(port)d: %(e)s")\
% (self.port, e)
LOG.error(_LE("Failed binding to port %(port)d: %(e)s")
% (self.port, e))
raise rpc_common.RPCException(errmsg)
@abc.abstractmethod
def listen(self, target):
"""Associate new sockets with targets here"""
@abc.abstractmethod
def receive_message(self, target):
"""Method for poller - receiving message routine"""
def cleanup(self):
for socket in self.sockets:
if not socket.handle.closed:
socket.setsockopt(zmq.LINGER, 0)
socket.close()
self.sockets = []
class SingleSocketConsumer(ConsumerBase):
def __init__(self, conf, poller, server, socket_type):
super(SingleSocketConsumer, self).__init__(conf, poller, server)
self.socket = self.subscribe_socket(socket_type)
@property
def address(self):
return self.socket.bind_address
@property
def port(self):
return self.socket.port

View File

@ -0,0 +1,69 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers import base
from oslo_messaging._drivers.zmq_driver.server.consumers\
import zmq_consumer_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class PullIncomingMessage(base.IncomingMessage):
def __init__(self, listener, context, message):
super(PullIncomingMessage, self).__init__(listener, context, message)
def reply(self, reply=None, failure=None, log_failure=True):
"""Reply is not needed for non-call messages."""
def acknowledge(self):
"""Acknowledgments are not supported by this type of consumer."""
def requeue(self):
"""Requeueing is not supported."""
class PullConsumer(zmq_consumer_base.SingleSocketConsumer):
def __init__(self, conf, poller, server):
super(PullConsumer, self).__init__(conf, poller, server, zmq.PULL)
def listen(self, target):
LOG.info(_LI("Listen to target %s") % str(target))
# Do nothing here because we have a single socket
def receive_message(self, socket):
try:
msg_type = socket.recv_string()
assert msg_type is not None, 'Bad format: msg type expected'
context = socket.recv_pyobj()
message = socket.recv_pyobj()
LOG.info(_LI("Received %(msg_type)s message %(msg)s")
% {"msg_type": msg_type,
"msg": str(message)})
if msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES):
return PullIncomingMessage(self.server, context, message)
else:
LOG.error(_LE("Unknown message type: %s") % msg_type)
except zmq.ZMQError as e:
LOG.error(_LE("Receiving message failed: %s") % str(e))

View File

@ -0,0 +1,94 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers import base
from oslo_messaging._drivers.zmq_driver.server.consumers\
import zmq_consumer_base
from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class RouterIncomingMessage(base.IncomingMessage):
def __init__(self, listener, context, message, socket, reply_id, msg_id,
poller):
super(RouterIncomingMessage, self).__init__(listener, context, message)
self.socket = socket
self.reply_id = reply_id
self.msg_id = msg_id
self.message = message
poller.resume_polling(socket)
def reply(self, reply=None, failure=None, log_failure=True):
"""Reply is not needed for non-call messages"""
def acknowledge(self):
LOG.info("Sending acknowledge for %s", self.msg_id)
ack_message = {zmq_names.FIELD_ID: self.msg_id}
self.socket.send(self.reply_id, zmq.SNDMORE)
self.socket.send(b'', zmq.SNDMORE)
self.socket.send_pyobj(ack_message)
def requeue(self):
"""Requeue is not supported"""
class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
def __init__(self, conf, poller, server):
super(RouterConsumer, self).__init__(conf, poller, server, zmq.ROUTER)
def listen(self, target):
LOG.info(_LI("Listen to target %s") % str(target))
# Do nothing here because we have a single socket
def receive_message(self, socket):
try:
reply_id = socket.recv()
empty = socket.recv()
assert empty == b'', 'Bad format: empty delimiter expected'
msg_type = socket.recv_string()
assert msg_type is not None, 'Bad format: msg type expected'
msg_id = None
if msg_type != zmq_names.CALL_TYPE:
msg_id = socket.recv_string()
context = socket.recv_pyobj()
message = socket.recv_pyobj()
LOG.info(_LI("Received %(msg_type)s message %(msg)s")
% {"msg_type": msg_type,
"msg": str(message)})
if msg_type == zmq_names.CALL_TYPE:
return zmq_incoming_message.ZmqIncomingRequest(
self.server, context, message, socket, reply_id,
self.poller)
elif msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES):
return RouterIncomingMessage(
self.server, context, message, socket, reply_id,
msg_id, self.poller)
else:
LOG.error(_LE("Unknown message type: %s") % msg_type)
except zmq.ZMQError as e:
LOG.error(_LE("Receiving message failed: %s") % str(e))

View File

@ -0,0 +1,55 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class ZmqIncomingRequest(base.IncomingMessage):
def __init__(self, listener, context, message, socket, rep_id, poller):
super(ZmqIncomingRequest, self).__init__(listener, context, message)
self.reply_socket = socket
self.reply_id = rep_id
self.received = None
self.poller = poller
def reply(self, reply=None, failure=None, log_failure=True):
if failure is not None:
failure = rpc_common.serialize_remote_exception(failure,
log_failure)
message_reply = {zmq_names.FIELD_REPLY: reply,
zmq_names.FIELD_FAILURE: failure,
zmq_names.FIELD_LOG_FAILURE: log_failure}
LOG.info("Replying %s REP", (str(message_reply)))
self.received = True
self.reply_socket.send(self.reply_id, zmq.SNDMORE)
self.reply_socket.send(b'', zmq.SNDMORE)
self.reply_socket.send_pyobj(message_reply)
self.poller.resume_polling(self.reply_socket)
def requeue(self):
"""Requeue is not supported"""

View File

@ -0,0 +1,80 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import logging
from oslo_messaging._drivers import base
from oslo_messaging._drivers.zmq_driver.server.consumers\
import zmq_router_consumer
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class ZmqServer(base.Listener):
def __init__(self, driver, conf, matchmaker=None):
super(ZmqServer, self).__init__(driver)
self.matchmaker = matchmaker
self.poller = zmq_async.get_poller()
self.rpc_consumer = zmq_router_consumer.RouterConsumer(
conf, self.poller, self)
self.notify_consumer = self.rpc_consumer
self.consumers = [self.rpc_consumer]
def poll(self, timeout=None):
message, socket = self.poller.poll(
timeout or self.conf.rpc_poll_timeout)
return message
def stop(self):
consumer = self.rpc_consumer
LOG.info("Stop server %s:%d" % (consumer.address, consumer.port))
def cleanup(self):
self.poller.close()
for consumer in self.consumers:
consumer.cleanup()
def listen(self, target):
consumer = self.rpc_consumer
consumer.listen(target)
LOG.info("Listen to target %s on %s:%d" %
(target, consumer.address, consumer.port))
host = zmq_address.combine_address(self.conf.rpc_zmq_host,
consumer.port)
self.matchmaker.register(target=target,
hostname=host)
def listen_notification(self, targets_and_priorities):
consumer = self.notify_consumer
LOG.info("Listen for notifications on %s:%d"
% (consumer.address, consumer.port))
for target, priority in targets_and_priorities:
host = zmq_address.combine_address(self.conf.rpc_zmq_host,
consumer.port)
t = copy.deepcopy(target)
t.topic = target.topic + '.' + priority
self.matchmaker.register(target=t, hostname=host)
consumer.listen(t)

View File

@ -0,0 +1,25 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def combine_address(host, port):
return "%s:%s" % (host, port)
def get_tcp_direct_address(host):
return "tcp://%s" % (host)
def get_tcp_random_address(conf):
return "tcp://*"

View File

@ -0,0 +1,75 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers.zmq_driver.poller import green_poller
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
from oslo_messaging._i18n import _, _LE
from oslo_utils import importutils
LOG = logging.getLogger(__name__)
# Map zmq_concurrency config option names to the actual module name.
ZMQ_MODULES = {
'native': 'zmq',
'eventlet': 'eventlet.green.zmq',
}
def import_zmq(zmq_concurrency='eventlet'):
_raise_error_if_invalid_config_value(zmq_concurrency)
imported_zmq = importutils.try_import(ZMQ_MODULES[zmq_concurrency],
default='zmq')
if imported_zmq is None:
errmsg = _LE("ZeroMQ not found!")
LOG.error(_LE("ZeroMQ not found!"))
raise ImportError(errmsg)
return imported_zmq
def get_poller(zmq_concurrency='eventlet'):
_raise_error_if_invalid_config_value(zmq_concurrency)
if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available():
return green_poller.GreenPoller()
return threading_poller.ThreadingPoller()
def get_reply_poller(zmq_concurrency='eventlet'):
_raise_error_if_invalid_config_value(zmq_concurrency)
if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available():
return green_poller.HoldReplyPoller()
return threading_poller.ThreadingPoller()
def get_executor(method, zmq_concurrency='eventlet'):
_raise_error_if_invalid_config_value(zmq_concurrency)
if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available():
return green_poller.GreenExecutor(method)
return threading_poller.ThreadingExecutor(method)
def _is_eventlet_zmq_available():
return importutils.try_import('eventlet.green.zmq')
def _raise_error_if_invalid_config_value(zmq_concurrency):
if zmq_concurrency not in ZMQ_MODULES:
errmsg = _('Invalid zmq_concurrency value: %s')
raise ValueError(errmsg % zmq_concurrency)

View File

@ -0,0 +1,53 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging._drivers.zmq_driver import zmq_async
zmq = zmq_async.import_zmq()
ZMQ_SOCKET_STR = {zmq.DEALER: "DEALER",
zmq.ROUTER: "ROUTER",
zmq.PUSH: "PUSH",
zmq.PULL: "PULL",
zmq.REQ: "REQ",
zmq.REP: "REP",
zmq.PUB: "PUB",
zmq.SUB: "SUB"}
FIELD_FAILURE = 'failure'
FIELD_REPLY = 'reply'
FIELD_LOG_FAILURE = 'log_failure'
FIELD_ID = 'id'
CALL_TYPE = 'call'
CAST_TYPE = 'cast'
CAST_FANOUT_TYPE = 'cast-f'
NOTIFY_TYPE = 'notify'
NOTIFY_FANOUT_TYPE = 'notify-f'
MESSAGE_TYPES = (CALL_TYPE,
CAST_TYPE,
CAST_FANOUT_TYPE,
NOTIFY_TYPE,
NOTIFY_FANOUT_TYPE)
MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_FANOUT_TYPE)
DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, NOTIFY_TYPE)
CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE)
NOTIFY_TYPES = (NOTIFY_TYPE, NOTIFY_FANOUT_TYPE)
def socket_type_str(socket_type):
return ZMQ_SOCKET_STR[socket_type]

View File

@ -0,0 +1,106 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class ZmqPoller(object):
"""Base poller interface
Needed to poll on zmq sockets in green and native async manner.
Native poller implementation wraps zmq.Poller helper class.
Wrapping is needed to provide unified poller interface
in zmq-driver (for both native and zmq pollers). It makes some
difference with poller-helper from zmq library which doesn't actually
receive message.
The poller object should be obtained over:
poller = zmq_async.get_poller()
Then we have to register sockets for polling. We are able
to provide specific receiving method. By default poller calls
socket.recv_multipart.
def receive_message(socket):
id = socket.recv_string()
ctxt = socket.recv_json()
msg = socket.recv_json()
return (id, ctxt, msg)
poller.register(socket, recv_method=receive_message)
Further to receive a message we should call:
message, socket = poller.poll()
The 'message' here contains (id, ctxt, msg) tuple.
"""
@abc.abstractmethod
def register(self, socket, recv_method=None):
"""Register socket to poll
:param socket: Socket to subscribe for polling
:type socket: zmq.Socket
:param recv_method: Optional specific receiver procedure
Should return received message object
:type recv_method: callable
"""
@abc.abstractmethod
def poll(self, timeout=None):
"""Poll for messages
:param timeout: Optional polling timeout
None or -1 means poll forever
any positive value means timeout in seconds
:type timeout: int
:returns: (message, socket) tuple
"""
@abc.abstractmethod
def close(self):
"""Terminate polling"""
def resume_polling(self, socket):
"""Resume with polling
Some implementations of poller may provide hold polling before reply
This method is intended to excplicitly resume polling aftewards.
"""
@six.add_metaclass(abc.ABCMeta)
class Executor(object):
"""Base executor interface for threading/green async executors"""
def __init__(self, thread):
self.thread = thread
@abc.abstractmethod
def execute(self):
"""Run execution"""
@abc.abstractmethod
def stop(self):
"""Stop execution"""
@abc.abstractmethod
def wait(self):
"""Wait until pass"""

View File

@ -0,0 +1,82 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class ZmqSocket(object):
def __init__(self, context, socket_type):
self.context = context
self.socket_type = socket_type
self.handle = context.socket(socket_type)
self.connections = set()
def type_name(self):
return zmq_names.socket_type_str(self.socket_type)
def connections_count(self):
return len(self.connections)
def connect(self, address):
if address not in self.connections:
self.handle.connect(address)
self.connections.add(address)
def setsockopt(self, *args, **kwargs):
self.handle.setsockopt(*args, **kwargs)
def send(self, *args, **kwargs):
self.handle.send(*args, **kwargs)
def send_string(self, *args, **kwargs):
self.handle.send_string(*args, **kwargs)
def send_json(self, *args, **kwargs):
self.handle.send_json(*args, **kwargs)
def send_pyobj(self, *args, **kwargs):
self.handle.send_pyobj(*args, **kwargs)
def recv(self, *args, **kwargs):
return self.handle.recv(*args, **kwargs)
def recv_string(self, *args, **kwargs):
return self.handle.recv_string(*args, **kwargs)
def recv_json(self, *args, **kwargs):
return self.handle.recv_json(*args, **kwargs)
def recv_pyobj(self, *args, **kwargs):
return self.handle.recv_pyobj(*args, **kwargs)
def close(self, *args, **kwargs):
self.handle.close(*args, **kwargs)
class ZmqRandomPortSocket(ZmqSocket):
def __init__(self, conf, context, socket_type):
super(ZmqRandomPortSocket, self).__init__(context, socket_type)
self.conf = conf
self.bind_address = zmq_address.get_tcp_random_address(self.conf)
self.port = self.handle.bind_to_random_port(self.bind_address)

View File

@ -59,7 +59,8 @@ class ConfFixture(fixtures.Fixture):
_import_opts(self.conf,
'oslo_messaging._drivers.impl_zmq', 'zmq_opts')
_import_opts(self.conf,
'oslo_messaging._drivers.matchmaker_redis',
'oslo_messaging._drivers.zmq_driver.'
'matchmaker.matchmaker_redis',
'matchmaker_redis_opts',
'matchmaker_redis')
_import_opts(self.conf, 'oslo_messaging.rpc.client', '_client_opts')

View File

@ -25,10 +25,8 @@ from oslo_messaging._drivers import base as drivers_base
from oslo_messaging._drivers import impl_qpid
from oslo_messaging._drivers import impl_rabbit
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers import matchmaker
from oslo_messaging._drivers import matchmaker_redis
from oslo_messaging._drivers import matchmaker_ring
from oslo_messaging._drivers.protocols.amqp import opts as amqp_opts
from oslo_messaging._drivers.zmq_driver.matchmaker import matchmaker_redis
from oslo_messaging._executors import impl_pooledexecutor
from oslo_messaging.notify import notifier
from oslo_messaging.rpc import client
@ -37,7 +35,7 @@ from oslo_messaging import transport
_global_opt_lists = [
drivers_base.base_opts,
impl_zmq.zmq_opts,
matchmaker.matchmaker_opts,
matchmaker_redis.matchmaker_redis_opts,
impl_pooledexecutor._pool_opts,
notifier._notifier_opts,
client._client_opts,
@ -47,7 +45,6 @@ _global_opt_lists = [
_opts = [
(None, list(itertools.chain(*_global_opt_lists))),
('matchmaker_redis', matchmaker_redis.matchmaker_redis_opts),
('matchmaker_ring', matchmaker_ring.matchmaker_opts),
('oslo_messaging_amqp', amqp_opts.amqp1_opts),
('oslo_messaging_rabbit', list(itertools.chain(amqp.amqp_opts,
impl_rabbit.rabbit_opts))),

View File

@ -1,473 +0,0 @@
# Copyright 2014 Canonical, Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import socket
import fixtures
from oslo_utils import importutils
import testtools
try:
import zmq
except ImportError:
zmq = None
import oslo_messaging
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
# eventlet is not yet py3 compatible, so skip if not installed
eventlet = importutils.try_import('eventlet')
impl_zmq = importutils.try_import('oslo_messaging._drivers.impl_zmq')
LOG = logging.getLogger(__name__)
def get_unused_port():
"""Returns an unused port on localhost."""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('localhost', 0))
port = s.getsockname()[1]
s.close()
return port
class ZmqBaseTestCase(test_utils.BaseTestCase):
"""Base test case for all ZMQ tests that make use of the ZMQ Proxy"""
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_port': get_unused_port(),
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
self.config(**kwargs)
# Start RPC
LOG.info("Running internal zmq receiver.")
self.reactor = impl_zmq.ZmqProxy(self.conf)
self.reactor.consume_in_thread()
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestConfZmqDriverLoad, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
def test_driver_load(self):
transport = oslo_messaging.get_transport(self.conf)
self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
class stopRpc(object):
def __init__(self, attrs):
self.attrs = attrs
def __call__(self):
if self.attrs['reactor']:
self.attrs['reactor'].close()
if self.attrs['driver']:
self.attrs['driver'].cleanup()
class TestZmqBasics(ZmqBaseTestCase):
def test_start_stop_listener(self):
target = oslo_messaging.Target(topic='testtopic')
listener = self.driver.listen(target)
result = listener.poll(0.01)
self.assertEqual(result, None)
def test_send_receive_raises(self):
"""Call() without method."""
target = oslo_messaging.Target(topic='testtopic')
self.driver.listen(target)
self.assertRaises(
KeyError,
self.driver.send,
target, {}, {'tx_id': 1}, wait_for_reply=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqIncomingMessage')
def test_send_receive_topic(self, mock_msg):
"""Call() with method."""
mock_msg.return_value = msg = mock.MagicMock()
msg.received = received = mock.MagicMock()
received.failure = False
received.reply = True
msg.condition = condition = mock.MagicMock()
condition.wait.return_value = True
target = oslo_messaging.Target(topic='testtopic')
self.driver.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
self.assertEqual(result, True)
@mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
def test_send_receive_fanout(self, mock_call):
target = oslo_messaging.Target(topic='testtopic', fanout=True)
self.driver.listen(target)
mock_call.__name__ = '_call'
mock_call.return_value = [True]
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
self.assertEqual(result, True)
mock_call.assert_called_once_with(
self.driver,
'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'],
{}, 'fanout~testtopic.127.0.0.1',
{'tx_id': 1, 'method': 'hello-world'},
None, False, [], True)
@mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
def test_send_receive_direct(self, mock_call):
# Also verifies fix for bug http://pad.lv/1301723
target = oslo_messaging.Target(topic='testtopic', server='localhost')
self.driver.listen(target)
mock_call.__name__ = '_call'
mock_call.return_value = [True]
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
self.assertEqual(result, True)
mock_call.assert_called_once_with(
self.driver,
'tcp://localhost:%s' % self.conf['rpc_zmq_port'],
{}, 'testtopic.localhost',
{'tx_id': 1, 'method': 'hello-world'},
None, False, [], True)
class TestZmqSocket(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestZmqSocket, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
@mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
def test_zmqsocket_init_type_pull(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PULL, bind=False,
subscribe=None)
self.assertTrue(sock.can_recv)
self.assertFalse(sock.can_send)
self.assertFalse(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
@mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
def test_zmqsocket_init_type_sub(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.SUB, bind=False,
subscribe=None)
self.assertTrue(sock.can_recv)
self.assertFalse(sock.can_send)
self.assertTrue(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
@mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
def test_zmqsocket_init_type_push(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUSH, bind=False,
subscribe=None)
self.assertFalse(sock.can_recv)
self.assertTrue(sock.can_send)
self.assertFalse(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
@mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
def test_zmqsocket_init_type_pub(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUB, bind=False,
subscribe=None)
self.assertFalse(sock.can_recv)
self.assertTrue(sock.can_send)
self.assertFalse(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
class TestZmqIncomingMessage(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestZmqIncomingMessage, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
def test_zmqincomingmessage(self):
msg = impl_zmq.ZmqIncomingMessage(mock.Mock(), None, 'msg.foo')
msg.reply("abc")
self.assertIsInstance(
msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
self.assertIsInstance(
msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
self.assertEqual(msg.received.reply, "abc")
msg.requeue()
class TestZmqConnection(ZmqBaseTestCase):
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_create_consumer(self, mock_reactor):
mock_reactor.register = mock.Mock()
conn = impl_zmq.Connection(self.driver.conf, self.driver)
topic = 'topic.foo'
context = mock.Mock()
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
(self.internal_ipc_dir))
# No Fanout
conn.create_consumer(topic, context)
conn.reactor.register.assert_called_with(context, inaddr,
impl_zmq.zmq.PULL,
subscribe=None, in_bind=False)
# Reset for next bunch of checks
conn.reactor.register.reset_mock()
# Fanout
inaddr = ('ipc://%s/zmq_topic_fanout~topic' %
(self.internal_ipc_dir))
conn.create_consumer(topic, context, fanout='subscriber.foo')
conn.reactor.register.assert_called_with(context, inaddr,
impl_zmq.zmq.SUB,
subscribe='subscriber.foo',
in_bind=False)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor):
mock_reactor.register = mock.Mock()
conn = impl_zmq.Connection(self.driver.conf, self.driver)
topic = 'topic.foo'
context = mock.Mock()
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
(self.internal_ipc_dir))
conn.create_consumer(topic, context)
conn.reactor.register.assert_called_with(
context, inaddr, impl_zmq.zmq.PULL, subscribe=None, in_bind=False)
conn.reactor.register.reset_mock()
# Call again with same topic
conn.create_consumer(topic, context)
self.assertFalse(conn.reactor.register.called)
@mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker',
autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker):
conn = impl_zmq.Connection(self.driver.conf, self.driver)
conn.reactor.close = mock.Mock()
mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock()
conn.close()
self.assertTrue(mock_getmatchmaker.return_value.stop_heartbeat.called)
self.assertTrue(conn.reactor.close.called)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_wait(self, mock_reactor):
conn = impl_zmq.Connection(self.driver, self.driver)
conn.reactor.wait = mock.Mock()
conn.wait()
self.assertTrue(conn.reactor.wait.called)
@mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker',
autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_consume_in_thread(self, mock_reactor,
mock_getmatchmaker):
mock_getmatchmaker.return_value.start_heartbeat = mock.Mock()
conn = impl_zmq.Connection(self.driver, self.driver)
conn.reactor.consume_in_thread = mock.Mock()
conn.consume_in_thread()
self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called)
self.assertTrue(conn.reactor.consume_in_thread.called)
class TestZmqListener(ZmqBaseTestCase):
def test_zmqlistener_no_msg(self):
listener = impl_zmq.ZmqListener(self.driver)
# Timeout = 0 should return straight away since the queue is empty
listener.poll(timeout=0)
def test_zmqlistener_w_msg(self):
listener = impl_zmq.ZmqListener(self.driver)
kwargs = {'a': 1, 'b': 2}
m = mock.Mock()
ctxt = mock.Mock(autospec=impl_zmq.RpcContext)
message = {'namespace': 'name.space', 'method': m.fake_method,
'args': kwargs}
eventlet.spawn_n(listener.dispatch, ctxt, message)
resp = listener.poll(timeout=10)
msg = {'method': m.fake_method, 'namespace': 'name.space',
'args': kwargs}
self.assertEqual(resp.message, msg)
class TestZmqDriver(ZmqBaseTestCase):
@mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo_messaging._drivers.matchmaker.MatchMakerBase.queues',
autospec=True)
def test_zmqdriver_multi_send_cast_with_no_queues(self,
mock_queues,
mock_cast):
context = mock.Mock(autospec=impl_zmq.RpcContext)
topic = 'testtopic'
msg = 'jeronimo'
with mock.patch.object(impl_zmq.LOG, 'warn') as flog:
mock_queues.return_value = None
impl_zmq._multi_send(self.driver, mock_cast,
context, topic, msg)
self.assertEqual(1, flog.call_count)
args, kwargs = flog.call_args
self.assertIn('No matchmaker results', args[0])
@mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
@mock.patch('oslo_messaging._drivers.matchmaker.MatchMakerBase.queues',
autospec=True)
def test_zmqdriver_multi_send_call_with_no_queues(self,
mock_queues,
mock_call):
context = mock.Mock(autospec=impl_zmq.RpcContext)
topic = 'testtopic'
msg = 'jeronimo'
mock_queues.return_value = None
self.assertRaises(rpc_common.Timeout,
impl_zmq._multi_send, self.driver,
mock_call, context, topic, msg)
@mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
def test_zmqdriver_send(self, mock_multi_send, mock_cast):
context = mock.Mock(autospec=impl_zmq.RpcContext)
topic = 'testtopic'
msg = 'jeronimo'
self.driver.send(oslo_messaging.Target(topic=topic), context, msg,
False, 0, False)
mock_multi_send.assert_called_with(self.driver, mock_cast, context,
topic, msg,
allowed_remote_exmods=[],
envelope=False, pooled=True)
@mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
def test_zmqdriver_send_notification(self, mock_multi_send, mock_cast):
context = mock.Mock(autospec=impl_zmq.RpcContext)
topic = 'testtopic.foo'
topic_reformat = 'testtopic-foo'
msg = 'jeronimo'
self.driver.send_notification(oslo_messaging.Target(topic=topic),
context, msg, False, False)
mock_multi_send.assert_called_with(self.driver, mock_cast, context,
topic_reformat, msg,
allowed_remote_exmods=[],
envelope=False, pooled=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)
def test_zmqdriver_listen(self, mock_connection, mock_listener):
mock_listener.return_value = listener = mock.Mock()
mock_connection.return_value = conn = mock.Mock()
conn.create_consumer = mock.Mock()
conn.consume_in_thread = mock.Mock()
topic = 'testtopic.foo'
self.driver.listen(oslo_messaging.Target(topic=topic))
conn.create_consumer.assert_called_with(topic, listener, fanout=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)
def test_zmqdriver_listen_for_notification(self, mock_connection,
mock_listener):
mock_listener.return_value = listener = mock.Mock()
mock_connection.return_value = conn = mock.Mock()
conn.create_consumer = mock.Mock()
conn.consume_in_thread = mock.Mock()
topic = 'testtopic.foo'
data = [(oslo_messaging.Target(topic=topic), 0)]
# NOTE(jamespage): Pooling not supported, just pass None for now.
self.driver.listen_for_notifications(data, None)
conn.create_consumer.assert_called_with("%s-%s" % (topic, 0), listener)

View File

@ -1,69 +0,0 @@
# Copyright 2014 Canonical, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import importutils
import testtools
from oslo_messaging.tests import utils as test_utils
# NOTE(jamespage) matchmaker tied directly to eventlet
# which is not yet py3 compatible - skip if import fails
matchmaker = (
importutils.try_import('oslo_messaging._drivers.matchmaker'))
@testtools.skipIf(not matchmaker, "matchmaker/eventlet unavailable")
class MatchmakerTest(test_utils.BaseTestCase):
def test_fanout_binding(self):
matcher = matchmaker.MatchMakerBase()
matcher.add_binding(
matchmaker.FanoutBinding(), matchmaker.DirectExchange())
self.assertEqual(matcher.queues('hello.world'), [])
self.assertEqual(
matcher.queues('fanout~fantasy.unicorn'),
[('fanout~fantasy.unicorn', 'unicorn')])
self.assertEqual(
matcher.queues('fanout~fantasy.pony'),
[('fanout~fantasy.pony', 'pony')])
def test_topic_binding(self):
matcher = matchmaker.MatchMakerBase()
matcher.add_binding(
matchmaker.TopicBinding(), matchmaker.StubExchange())
self.assertEqual(
matcher.queues('hello-world'), [('hello-world', None)])
def test_direct_binding(self):
matcher = matchmaker.MatchMakerBase()
matcher.add_binding(
matchmaker.DirectBinding(), matchmaker.StubExchange())
self.assertEqual(
matcher.queues('hello.server'), [('hello.server', None)])
self.assertEqual(matcher.queues('hello-world'), [])
def test_localhost_match(self):
matcher = matchmaker.MatchMakerLocalhost()
self.assertEqual(
matcher.queues('hello.server'), [('hello.server', 'server')])
# Gets remapped due to localhost exchange
# all bindings default to first match.
self.assertEqual(
matcher.queues('fanout~testing.server'),
[('fanout~testing.localhost', 'localhost')])
self.assertEqual(
matcher.queues('hello-world'),
[('hello-world.localhost', 'localhost')])

View File

@ -1,97 +0,0 @@
# Copyright 2014 Canonical, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import importutils
import testtools
from oslo_messaging.tests import utils as test_utils
redis = importutils.try_import('redis')
matchmaker_redis = (
importutils.try_import('oslo_messaging._drivers.matchmaker_redis'))
def redis_available():
'''Helper to see if local redis server is running'''
if not redis:
return False
try:
c = redis.StrictRedis(socket_timeout=1)
c.ping()
return True
except redis.exceptions.ConnectionError:
return False
@testtools.skipIf(not matchmaker_redis, "matchmaker/eventlet unavailable")
@testtools.skipIf(not redis_available(), "redis unavailable")
class RedisMatchMakerTest(test_utils.BaseTestCase):
def setUp(self):
super(RedisMatchMakerTest, self).setUp()
self.ring_data = {
"conductor": ["controller1", "node1", "node2", "node3"],
"scheduler": ["controller1", "node1", "node2", "node3"],
"network": ["controller1", "node1", "node2", "node3"],
"cert": ["controller1"],
"console": ["controller1"],
"l3_agent.node1": ["node1"],
"consoleauth": ["controller1"]}
self.matcher = matchmaker_redis.MatchMakerRedis()
self.populate()
def tearDown(self):
super(RedisMatchMakerTest, self).tearDown()
c = redis.StrictRedis()
c.flushdb()
def populate(self):
for k, hosts in self.ring_data.items():
for h in hosts:
self.matcher.register(k, h)
def test_direct(self):
self.assertEqual(
self.matcher.queues('cert.controller1'),
[('cert.controller1', 'controller1')])
def test_register(self):
self.matcher.register('cert', 'keymaster')
self.assertEqual(
sorted(self.matcher.redis.smembers('cert')),
[b'cert.controller1', b'cert.keymaster'])
self.matcher.register('l3_agent.node1', 'node1')
self.assertEqual(
sorted(self.matcher.redis.smembers('l3_agent.node1')),
[b'l3_agent.node1.node1'])
def test_unregister(self):
self.matcher.unregister('conductor', 'controller1')
self.assertEqual(
sorted(self.matcher.redis.smembers('conductor')),
[b'conductor.node1', b'conductor.node2', b'conductor.node3'])
def test_ack_alive(self):
self.matcher.ack_alive('ack_alive', 'controller1')
self.assertEqual(
sorted(self.matcher.redis.smembers('ack_alive')),
[b'ack_alive.controller1'])
def test_is_alive(self):
self.assertEqual(
self.matcher.is_alive('conductor', 'conductor.controller1'),
True)
self.assertEqual(
self.matcher.is_alive('conductor', 'conductor.controller2'),
False)

View File

@ -1,73 +0,0 @@
# Copyright 2014 Canonical, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import importutils
import testtools
from oslo_messaging.tests import utils as test_utils
# NOTE(jamespage) matchmaker tied directly to eventlet
# which is not yet py3 compatible - skip if import fails
matchmaker_ring = (
importutils.try_import('oslo_messaging._drivers.matchmaker_ring'))
@testtools.skipIf(not matchmaker_ring, "matchmaker/eventlet unavailable")
class MatchmakerRingTest(test_utils.BaseTestCase):
def setUp(self):
super(MatchmakerRingTest, self).setUp()
self.ring_data = {
"conductor": ["controller1", "node1", "node2", "node3"],
"scheduler": ["controller1", "node1", "node2", "node3"],
"network": ["controller1", "node1", "node2", "node3"],
"cert": ["controller1"],
"console": ["controller1"],
"consoleauth": ["controller1"]}
self.matcher = matchmaker_ring.MatchMakerRing(self.ring_data)
def test_direct(self):
self.assertEqual(
self.matcher.queues('cert.controller1'),
[('cert.controller1', 'controller1')])
self.assertEqual(
self.matcher.queues('conductor.node1'),
[('conductor.node1', 'node1')])
def test_fanout(self):
self.assertEqual(
self.matcher.queues('fanout~conductor'),
[('fanout~conductor.controller1', 'controller1'),
('fanout~conductor.node1', 'node1'),
('fanout~conductor.node2', 'node2'),
('fanout~conductor.node3', 'node3')])
def test_bare_topic(self):
# Round robins through the hosts on the topic
self.assertEqual(
self.matcher.queues('scheduler'),
[('scheduler.controller1', 'controller1')])
self.assertEqual(
self.matcher.queues('scheduler'),
[('scheduler.node1', 'node1')])
self.assertEqual(
self.matcher.queues('scheduler'),
[('scheduler.node2', 'node2')])
self.assertEqual(
self.matcher.queues('scheduler'),
[('scheduler.node3', 'node3')])
# Cycles loop
self.assertEqual(
self.matcher.queues('scheduler'),
[('scheduler.controller1', 'controller1')])

View File

@ -0,0 +1,80 @@
# Copyright 2014 Canonical, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from stevedore import driver
import testscenarios
import oslo_messaging
from oslo_messaging.tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
class TestImplMatchmaker(test_utils.BaseTestCase):
scenarios = [
("dummy", {"rpc_zmq_matchmaker": "dummy"}),
("redis", {"rpc_zmq_matchmaker": "redis"}),
]
def setUp(self):
super(TestImplMatchmaker, self).setUp()
self.test_matcher = driver.DriverManager(
'oslo.messaging.zmq.matchmaker',
self.rpc_zmq_matchmaker,
).driver(self.conf)
if self.rpc_zmq_matchmaker == "redis":
self.addCleanup(self.test_matcher._redis.flushdb)
self.target = oslo_messaging.Target(topic="test_topic")
self.host1 = b"test_host1"
self.host2 = b"test_host2"
def test_register(self):
self.test_matcher.register(self.target, self.host1)
self.assertEqual(self.test_matcher.get_hosts(self.target),
[self.host1])
self.assertEqual(self.test_matcher.get_single_host(self.target),
self.host1)
def test_register_two_hosts(self):
self.test_matcher.register(self.target, self.host1)
self.test_matcher.register(self.target, self.host2)
self.assertItemsEqual(self.test_matcher.get_hosts(self.target),
[self.host1, self.host2])
self.assertIn(self.test_matcher.get_single_host(self.target),
[self.host1, self.host2])
def test_register_two_same_hosts(self):
self.test_matcher.register(self.target, self.host1)
self.test_matcher.register(self.target, self.host1)
self.assertEqual(self.test_matcher.get_hosts(self.target),
[self.host1])
self.assertEqual(self.test_matcher.get_single_host(self.target),
self.host1)
def test_get_hosts_wrong_topic(self):
target = oslo_messaging.Target(topic="no_such_topic")
self.assertEqual(self.test_matcher.get_hosts(target), [])
def test_get_single_host_wrong_topic(self):
target = oslo_messaging.Target(topic="no_such_topic")
self.assertRaises(oslo_messaging.InvalidTarget,
self.test_matcher.get_single_host, target)

View File

@ -0,0 +1,246 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
import fixtures
import testtools
import oslo_messaging
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _
from oslo_messaging.tests import utils as test_utils
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class TestServerListener(object):
def __init__(self, driver):
self.driver = driver
self.listener = None
self.executor = zmq_async.get_executor(self._run)
self._stop = threading.Event()
self._received = threading.Event()
self.message = None
def listen(self, target):
self.listener = self.driver.listen(target)
self.executor.execute()
def listen_notifications(self, targets_and_priorities):
self.listener = self.driver.listen_for_notifications(
targets_and_priorities, {})
self.executor.execute()
def _run(self):
try:
message = self.listener.poll()
if message is not None:
message.acknowledge()
self._received.set()
self.message = message
message.reply(reply=True)
except Exception:
LOG.exception(_("Unexpected exception occurred."))
def stop(self):
self.executor.stop()
class ZmqBaseTestCase(test_utils.BaseTestCase):
"""Base test case for all ZMQ tests """
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_ipc_dir': self.internal_ipc_dir,
'rpc_zmq_matchmaker': 'dummy'}
self.config(**kwargs)
# Get driver
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
self.listener = TestServerListener(self.driver)
self.addCleanup(stopRpc(self.__dict__))
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestConfZmqDriverLoad, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
def test_driver_load(self):
transport = oslo_messaging.get_transport(self.conf)
self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
class stopRpc(object):
def __init__(self, attrs):
self.attrs = attrs
def __call__(self):
if self.attrs['driver']:
self.attrs['driver'].cleanup()
if self.attrs['listener']:
self.attrs['listener'].stop()
class TestZmqBasics(ZmqBaseTestCase):
def test_send_receive_raises(self):
"""Call() without method."""
target = oslo_messaging.Target(topic='testtopic')
self.listener.listen(target)
self.assertRaises(
KeyError,
self.driver.send,
target, {}, {'tx_id': 1}, wait_for_reply=True)
def test_send_receive_topic(self):
"""Call() with topic."""
target = oslo_messaging.Target(topic='testtopic')
self.listener.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
self.assertTrue(result)
def test_send_noreply(self):
"""Cast() with topic."""
target = oslo_messaging.Target(topic='testtopic', server="my@server")
self.listener.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=False)
self.listener._received.wait()
self.assertIsNone(result)
self.assertEqual(True, self.listener._received.isSet())
method = self.listener.message.message[u'method']
self.assertEqual(u'hello-world', method)
def test_send_fanout(self):
target = oslo_messaging.Target(topic='testtopic', fanout=True)
self.listener.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=False)
self.listener._received.wait()
self.assertIsNone(result)
self.assertEqual(True, self.listener._received.isSet())
method = self.listener.message.message[u'method']
self.assertEqual(u'hello-world', method)
def test_send_receive_direct(self):
"""Call() without topic."""
target = oslo_messaging.Target(server='127.0.0.1')
self.listener.listen(target)
message = {'method': 'hello-world', 'tx_id': 1}
context = {}
result = self.driver.send(target, context, message,
wait_for_reply=True)
self.assertTrue(result)
def test_send_receive_notification(self):
"""Notify() test"""
target = oslo_messaging.Target(topic='t1',
server='notification@server')
self.listener.listen_notifications([(target, 'info')])
message = {'method': 'hello-world', 'tx_id': 1}
context = {}
target.topic = target.topic + '.info'
self.driver.send_notification(target, context, message, '3.0')
self.listener._received.wait(5)
self.assertTrue(self.listener._received.isSet())
class TestPoller(test_utils.BaseTestCase):
def setUp(self):
super(TestPoller, self).setUp()
self.poller = zmq_async.get_poller()
self.ctx = zmq.Context()
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
self.ADDR_REQ = "ipc://%s/request1" % self.internal_ipc_dir
def test_poll_blocking(self):
rep = self.ctx.socket(zmq.REP)
rep.bind(self.ADDR_REQ)
reply_poller = zmq_async.get_reply_poller()
reply_poller.register(rep)
def listener():
incoming, socket = reply_poller.poll()
self.assertEqual(b'Hello', incoming[0])
socket.send_string('Reply')
reply_poller.resume_polling(socket)
executor = zmq_async.get_executor(listener)
executor.execute()
req1 = self.ctx.socket(zmq.REQ)
req1.connect(self.ADDR_REQ)
req2 = self.ctx.socket(zmq.REQ)
req2.connect(self.ADDR_REQ)
req1.send_string('Hello')
req2.send_string('Hello')
reply = req1.recv_string()
self.assertEqual('Reply', reply)
reply = req2.recv_string()
self.assertEqual('Reply', reply)
def test_poll_timeout(self):
rep = self.ctx.socket(zmq.REP)
rep.bind(self.ADDR_REQ)
reply_poller = zmq_async.get_reply_poller()
reply_poller.register(rep)
incoming, socket = reply_poller.poll(1)
self.assertIsNone(incoming)
self.assertIsNone(socket)

View File

@ -0,0 +1,170 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_messaging._drivers.zmq_driver.poller import green_poller
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging.tests import utils as test_utils
class TestImportZmq(test_utils.BaseTestCase):
def setUp(self):
super(TestImportZmq, self).setUp()
def test_config_short_names_are_converted_to_correct_module_names(self):
mock_try_import = mock.Mock()
zmq_async.importutils.try_import = mock_try_import
zmq_async.importutils.try_import.return_value = 'mock zmq module'
self.assertEqual('mock zmq module', zmq_async.import_zmq('native'))
mock_try_import.assert_called_with('zmq', default='zmq')
zmq_async.importutils.try_import.return_value = 'mock eventlet module'
self.assertEqual('mock eventlet module',
zmq_async.import_zmq('eventlet'))
mock_try_import.assert_called_with('eventlet.green.zmq', default='zmq')
def test_when_no_args_then_default_zmq_module_is_loaded(self):
mock_try_import = mock.Mock()
zmq_async.importutils.try_import = mock_try_import
zmq_async.import_zmq()
mock_try_import.assert_called_with('eventlet.green.zmq', default='zmq')
def test_when_import_fails_then_raise_ImportError(self):
zmq_async.importutils.try_import = mock.Mock()
zmq_async.importutils.try_import.return_value = None
with self.assertRaisesRegexp(ImportError, "ZeroMQ not found!"):
zmq_async.import_zmq('native')
def test_invalid_config_value_raise_ValueError(self):
invalid_opt = 'x'
errmsg = 'Invalid zmq_concurrency value: x'
with self.assertRaisesRegexp(ValueError, errmsg):
zmq_async.import_zmq(invalid_opt)
class TestGetPoller(test_utils.BaseTestCase):
def setUp(self):
super(TestGetPoller, self).setUp()
def test_when_no_arg_to_get_poller_then_return_default_poller(self):
zmq_async._is_eventlet_zmq_available = lambda: True
actual = zmq_async.get_poller()
self.assertTrue(isinstance(actual, green_poller.GreenPoller))
def test_when_native_poller_requested_then_return_ThreadingPoller(self):
actual = zmq_async.get_poller('native')
self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller))
def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self):
zmq_async._is_eventlet_zmq_available = lambda: False
actual = zmq_async.get_poller('eventlet')
self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller))
def test_when_eventlet_is_available_then_return_GreenPoller(self):
zmq_async._is_eventlet_zmq_available = lambda: True
actual = zmq_async.get_poller('eventlet')
self.assertTrue(isinstance(actual, green_poller.GreenPoller))
def test_invalid_config_value_raise_ValueError(self):
invalid_opt = 'x'
errmsg = 'Invalid zmq_concurrency value: x'
with self.assertRaisesRegexp(ValueError, errmsg):
zmq_async.get_poller(invalid_opt)
class TestGetReplyPoller(test_utils.BaseTestCase):
def setUp(self):
super(TestGetReplyPoller, self).setUp()
def test_default_reply_poller_is_HoldReplyPoller(self):
zmq_async._is_eventlet_zmq_available = lambda: True
actual = zmq_async.get_reply_poller()
self.assertTrue(isinstance(actual, green_poller.HoldReplyPoller))
def test_when_eventlet_is_available_then_return_HoldReplyPoller(self):
zmq_async._is_eventlet_zmq_available = lambda: True
actual = zmq_async.get_reply_poller('eventlet')
self.assertTrue(isinstance(actual, green_poller.HoldReplyPoller))
def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self):
zmq_async._is_eventlet_zmq_available = lambda: False
actual = zmq_async.get_reply_poller('eventlet')
self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller))
def test_invalid_config_value_raise_ValueError(self):
invalid_opt = 'x'
errmsg = 'Invalid zmq_concurrency value: x'
with self.assertRaisesRegexp(ValueError, errmsg):
zmq_async.get_reply_poller(invalid_opt)
class TestGetExecutor(test_utils.BaseTestCase):
def setUp(self):
super(TestGetExecutor, self).setUp()
def test_default_executor_is_GreenExecutor(self):
zmq_async._is_eventlet_zmq_available = lambda: True
executor = zmq_async.get_executor('any method')
self.assertTrue(isinstance(executor, green_poller.GreenExecutor))
self.assertEqual('any method', executor._method)
def test_when_eventlet_module_is_available_then_return_GreenExecutor(self):
zmq_async._is_eventlet_zmq_available = lambda: True
executor = zmq_async.get_executor('any method', 'eventlet')
self.assertTrue(isinstance(executor, green_poller.GreenExecutor))
self.assertEqual('any method', executor._method)
def test_when_eventlet_is_unavailable_then_return_ThreadingExecutor(self):
zmq_async._is_eventlet_zmq_available = lambda: False
executor = zmq_async.get_executor('any method', 'eventlet')
self.assertTrue(isinstance(executor,
threading_poller.ThreadingExecutor))
self.assertEqual('any method', executor._method)
def test_invalid_config_value_raise_ValueError(self):
invalid_opt = 'x'
errmsg = 'Invalid zmq_concurrency value: x'
with self.assertRaisesRegexp(ValueError, errmsg):
zmq_async.get_executor('any method', invalid_opt)

View File

@ -101,6 +101,8 @@ class CallTestCase(utils.SkipIfNoTransportURL):
self.assertEqual(0, s.endpoint.ival)
def test_timeout(self):
if self.url.startswith("zmq"):
self.skipTest("Skip CallTestCase.test_timeout for ZMQ driver")
transport = self.useFixture(utils.TransportFixture(self.url))
target = oslo_messaging.Target(topic="no_such_topic")
c = utils.ClientStub(transport.transport, target, timeout=1)
@ -111,8 +113,7 @@ class CallTestCase(utils.SkipIfNoTransportURL):
group = self.useFixture(utils.RpcServerGroupFixture(self.url))
client = group.client(1)
client.add(increment=2)
f = lambda: client.subtract(increment=3)
self.assertThat(f, matchers.raises(ValueError))
self.assertRaises(ValueError, client.subtract, increment=3)
def test_timeout_with_concurrently_queues(self):
transport = self.useFixture(utils.TransportFixture(self.url))

View File

@ -125,7 +125,7 @@ class RpcServerGroupFixture(fixtures.Fixture):
# NOTE(sileht): topic and servier_name must be uniq
# to be able to run all tests in parallel
self.topic = topic or str(uuid.uuid4())
self.names = names or ["server_%i_%s" % (i, uuid.uuid4())
self.names = names or ["server_%i_%s" % (i, str(uuid.uuid4())[:8])
for i in range(3)]
self.exchange = exchange
self.targets = [self._target(server=n) for n in self.names]

View File

@ -32,11 +32,10 @@ class OptsTestCase(test_utils.BaseTestCase):
super(OptsTestCase, self).setUp()
def _test_list_opts(self, result):
self.assertEqual(6, len(result))
self.assertEqual(5, len(result))
groups = [g for (g, l) in result]
self.assertIn(None, groups)
self.assertIn('matchmaker_ring', groups)
self.assertIn('matchmaker_redis', groups)
self.assertIn('oslo_messaging_amqp', groups)
self.assertIn('oslo_messaging_rabbit', groups)

View File

@ -97,3 +97,4 @@ class TimerTestCase(test_utils.BaseTestCase):
remaining = t.check_return(callback, 1, a='b')
self.assertEqual(0, remaining)
callback.assert_called_once_with(1, a='b')

View File

@ -22,9 +22,4 @@ EOF
redis-server --port $ZMQ_REDIS_PORT &
oslo-messaging-zmq-receiver --config-file ${DATADIR}/zmq.conf > ${DATADIR}/receiver.log 2>&1 &
# FIXME(sileht): This does the same kind of setup that devstack does
# But this doesn't work yet, a zeromq maintener should take a look on that
$*

View File

@ -52,9 +52,8 @@ oslo.messaging.notify.drivers =
oslo.messaging.zmq.matchmaker =
# Matchmakers for ZeroMQ
redis = oslo_messaging._drivers.matchmaker_redis:MatchMakerRedis
ring = oslo_messaging._drivers.matchmaker_ring:MatchMakerRing
local = oslo_messaging._drivers.matchmaker:MatchMakerLocalhost
dummy = oslo_messaging._drivers.zmq_driver.matchmaker.base:DummyMatchMaker
redis = oslo_messaging._drivers.zmq_driver.matchmaker.matchmaker_redis:RedisMatchMaker
oslo.config.opts =
oslo.messaging = oslo_messaging.opts:list_opts

View File

@ -25,7 +25,6 @@ import oslo_messaging as messaging
from oslo_messaging import notify # noqa
from oslo_messaging import rpc # noqa
LOG = logging.getLogger()
USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\