Merge remote-tracking branch 'origin/feature/zmq' into merge-branch

Change-Id: If189d03131efc02045955508cef06fdd2ed590ee
This commit is contained in:
Davanum Srinivas 2015-09-15 11:06:26 -04:00
commit 97892e656a
53 changed files with 2379 additions and 2380 deletions

View File

@ -2,7 +2,7 @@ oslo.messaging
==============
The Oslo messaging API supports RPC and notifications over a number of
different messsaging transports.
different messaging transports.
Contents
========

View File

@ -1,40 +0,0 @@
#!/usr/bin/env python
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
eventlet.monkey_patch()
import contextlib
import sys
from oslo_config import cfg
from oslo_log import log
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._executors import base # FIXME(markmc)
CONF = cfg.CONF
CONF.register_opts(impl_zmq.zmq_opts)
CONF.register_opts(base._pool_opts)
def main():
CONF(sys.argv[1:], project='oslo')
log.setup(CONF, 'oslo.messaging')
with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
reactor.consume_in_thread()
reactor.wait()

View File

@ -75,8 +75,7 @@ class Listener(object):
def cleanup(self):
"""Cleanup listener.
Close connection used by listener if any. For some listeners like
zmq there is no connection so no need to close connection.
Close connection (socket) used by listener if any.
As this is listener specific method, overwrite it in to derived class
if cleanup of listener required.
"""

File diff suppressed because it is too large Load Diff

View File

@ -1,322 +0,0 @@
# Copyright 2011 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should except a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
import contextlib
import logging
import eventlet
from oslo_config import cfg
from oslo_messaging._i18n import _
from oslo_messaging._i18n import _LI
matchmaker_opts = [
cfg.IntOpt('matchmaker_heartbeat_freq',
default=300,
help='Heartbeat frequency.'),
cfg.IntOpt('matchmaker_heartbeat_ttl',
default=600,
help='Heartbeat time-to-live.'),
]
CONF = cfg.CONF
CONF.register_opts(matchmaker_opts)
LOG = logging.getLogger(__name__)
contextmanager = contextlib.contextmanager
class MatchMakerException(Exception):
"""Signified a match could not be found."""
message = _("Match not found by MatchMaker.")
class Exchange(object):
"""Implements lookups.
Subclass this to support hashtables, dns, etc.
"""
def __init__(self):
pass
def run(self, key):
raise NotImplementedError()
class Binding(object):
"""A binding on which to perform a lookup."""
def __init__(self):
pass
def test(self, key):
raise NotImplementedError()
class MatchMakerBase(object):
"""Match Maker Base Class.
Build off HeartbeatMatchMakerBase if building a heartbeat-capable
MatchMaker.
"""
def __init__(self):
# Array of tuples. Index [2] toggles negation, [3] is last-if-true
self.bindings = []
self.no_heartbeat_msg = _('Matchmaker does not implement '
'registration or heartbeat.')
def register(self, key, host):
"""Register a host on a backend.
Heartbeats, if applicable, may keepalive registration.
"""
pass
def ack_alive(self, key, host):
"""Acknowledge that a key.host is alive.
Used internally for updating heartbeats, but may also be used
publicly to acknowledge a system is alive (i.e. rpc message
successfully sent to host)
"""
pass
def is_alive(self, topic, host):
"""Checks if a host is alive."""
pass
def expire(self, topic, host):
"""Explicitly expire a host's registration."""
pass
def send_heartbeats(self):
"""Send all heartbeats.
Use start_heartbeat to spawn a heartbeat greenthread,
which loops this method.
"""
pass
def unregister(self, key, host):
"""Unregister a topic."""
pass
def start_heartbeat(self):
"""Spawn heartbeat greenthread."""
pass
def stop_heartbeat(self):
"""Destroys the heartbeat greenthread."""
pass
def add_binding(self, binding, rule, last=True):
self.bindings.append((binding, rule, False, last))
# NOTE(ewindisch): kept the following method in case we implement the
# underlying support.
# def add_negate_binding(self, binding, rule, last=True):
# self.bindings.append((binding, rule, True, last))
def queues(self, key):
workers = []
# bit is for negate bindings - if we choose to implement it.
# last stops processing rules if this matches.
for (binding, exchange, bit, last) in self.bindings:
if binding.test(key):
workers.extend(exchange.run(key))
# Support last.
if last:
return workers
return workers
class HeartbeatMatchMakerBase(MatchMakerBase):
"""Base for a heart-beat capable MatchMaker.
Provides common methods for registering, unregistering, and maintaining
heartbeats.
"""
def __init__(self):
self.hosts = set()
self._heart = None
self.host_topic = {}
super(HeartbeatMatchMakerBase, self).__init__()
def send_heartbeats(self):
"""Send all heartbeats.
Use start_heartbeat to spawn a heartbeat greenthread,
which loops this method.
"""
for key, host in self.host_topic.keys():
self.ack_alive(key, host)
def ack_alive(self, key, host):
"""Acknowledge that a host.topic is alive.
Used internally for updating heartbeats, but may also be used
publicly to acknowledge a system is alive (i.e. rpc message
successfully sent to host)
"""
raise NotImplementedError("Must implement ack_alive")
def backend_register(self, key, host):
"""Implements registration logic.
Called by register(self,key,host)
"""
raise NotImplementedError("Must implement backend_register")
def backend_unregister(self, key, key_host):
"""Implements de-registration logic.
Called by unregister(self,key,host)
"""
raise NotImplementedError("Must implement backend_unregister")
def register(self, key, host):
"""Register a host on a backend.
Heartbeats, if applicable, may keepalive registration.
"""
self.hosts.add(host)
self.host_topic[(key, host)] = host
key_host = '.'.join((key, host))
self.backend_register(key, key_host)
self.ack_alive(key, host)
def unregister(self, key, host):
"""Unregister a topic."""
if (key, host) in self.host_topic:
del self.host_topic[(key, host)]
self.hosts.discard(host)
self.backend_unregister(key, '.'.join((key, host)))
LOG.info(_LI("Matchmaker unregistered: %(key)s, %(host)s"),
{'key': key, 'host': host})
def start_heartbeat(self):
"""Implementation of MatchMakerBase.start_heartbeat.
Launches greenthread looping send_heartbeats(),
yielding for CONF.matchmaker_heartbeat_freq seconds
between iterations.
"""
if not self.hosts:
raise MatchMakerException(
_("Register before starting heartbeat."))
def do_heartbeat():
while True:
self.send_heartbeats()
eventlet.sleep(CONF.matchmaker_heartbeat_freq)
self._heart = eventlet.spawn(do_heartbeat)
def stop_heartbeat(self):
"""Destroys the heartbeat greenthread."""
if self._heart:
self._heart.kill()
class DirectBinding(Binding):
"""Specifies a host in the key via a '.' character.
Although dots are used in the key, the behavior here is
that it maps directly to a host, thus direct.
"""
def test(self, key):
return '.' in key
class TopicBinding(Binding):
"""Where a 'bare' key without dots.
AMQP generally considers topic exchanges to be those *with* dots,
but we deviate here in terminology as the behavior here matches
that of a topic exchange (whereas where there are dots, behavior
matches that of a direct exchange.
"""
def test(self, key):
return '.' not in key
class FanoutBinding(Binding):
"""Match on fanout keys, where key starts with 'fanout.' string."""
def test(self, key):
return key.startswith('fanout~')
class StubExchange(Exchange):
"""Exchange that does nothing."""
def run(self, key):
return [(key, None)]
class LocalhostExchange(Exchange):
"""Exchange where all direct topics are local."""
def __init__(self, host='localhost'):
self.host = host
super(Exchange, self).__init__()
def run(self, key):
return [('.'.join((key.split('.')[0], self.host)), self.host)]
class DirectExchange(Exchange):
"""Exchange where all topic keys are split, sending to second half.
i.e. "compute.host" sends a message to "compute.host" running on "host"
"""
def __init__(self):
super(Exchange, self).__init__()
def run(self, key):
e = key.split('.', 1)[1]
return [(key, e)]
class MatchMakerLocalhost(MatchMakerBase):
"""Match Maker where all bare topics resolve to localhost.
Useful for testing.
"""
def __init__(self, host='localhost'):
super(MatchMakerLocalhost, self).__init__()
self.add_binding(FanoutBinding(), LocalhostExchange(host))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), LocalhostExchange(host))
class MatchMakerStub(MatchMakerBase):
"""Match Maker where topics are untouched.
Useful for testing, or for AMQP/brokered queues.
Will not work where knowledge of hosts is known (i.e. zeromq)
"""
def __init__(self):
super(MatchMakerStub, self).__init__()
self.add_binding(FanoutBinding(), StubExchange())
self.add_binding(DirectBinding(), StubExchange())
self.add_binding(TopicBinding(), StubExchange())

View File

@ -1,145 +0,0 @@
# Copyright 2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should accept a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
from oslo_config import cfg
from oslo_utils import importutils
from oslo_messaging._drivers import matchmaker as mm_common
redis = importutils.try_import('redis')
matchmaker_redis_opts = [
cfg.StrOpt('host',
default='127.0.0.1',
help='Host to locate redis.'),
cfg.IntOpt('port',
default=6379,
help='Use this port to connect to redis host.'),
cfg.StrOpt('password',
help='Password for Redis server (optional).'),
]
CONF = cfg.CONF
opt_group = cfg.OptGroup(name='matchmaker_redis',
title='Options for Redis-based MatchMaker')
CONF.register_group(opt_group)
CONF.register_opts(matchmaker_redis_opts, opt_group)
class RedisExchange(mm_common.Exchange):
def __init__(self, matchmaker):
self.matchmaker = matchmaker
self.redis = matchmaker.redis
super(RedisExchange, self).__init__()
class RedisTopicExchange(RedisExchange):
"""Exchange where all topic keys are split, sending to second half.
i.e. "compute.host" sends a message to "compute" running on "host"
"""
def run(self, topic):
while True:
member_name = self.redis.srandmember(topic)
if not member_name:
# If this happens, there are no
# longer any members.
break
if not self.matchmaker.is_alive(topic, member_name):
continue
host = member_name.split('.', 1)[1]
return [(member_name, host)]
return []
class RedisFanoutExchange(RedisExchange):
"""Return a list of all hosts."""
def run(self, topic):
topic = topic.split('~', 1)[1]
hosts = self.redis.smembers(topic)
good_hosts = filter(
lambda host: self.matchmaker.is_alive(topic, host), hosts)
return [(x, x.split('.', 1)[1]) for x in good_hosts]
class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
"""MatchMaker registering and looking-up hosts with a Redis server."""
def __init__(self):
super(MatchMakerRedis, self).__init__()
if not redis:
raise ImportError("Failed to import module redis.")
self.redis = redis.StrictRedis(
host=CONF.matchmaker_redis.host,
port=CONF.matchmaker_redis.port,
password=CONF.matchmaker_redis.password)
self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self))
self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange())
self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self))
def ack_alive(self, key, host):
topic = "%s.%s" % (key, host)
if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl):
# If we could not update the expiration, the key
# might have been pruned. Re-register, creating a new
# key in Redis.
self.register(key, host)
def is_alive(self, topic, host):
# After redis 2.8, if the specialized key doesn't exist,
# TTL fuction would return -2. If key exists,
# but doesn't have expiration associated,
# TTL func would return -1. For more information,
# please visit http://redis.io/commands/ttl
if self.redis.ttl(host) == -2:
self.expire(topic, host)
return False
return True
def expire(self, topic, host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.delete(host)
pipe.srem(topic, host)
pipe.execute()
def backend_register(self, key, key_host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.sadd(key, key_host)
# No value is needed, we just
# care if it exists. Sets aren't viable
# because only keys can expire.
pipe.sadd(key_host, '')
pipe.execute()
def backend_unregister(self, key, key_host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.srem(key, key_host)
pipe.delete(key_host)
pipe.execute()

View File

@ -1,105 +0,0 @@
# Copyright 2011-2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should except a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
import itertools
import json
import logging
from oslo_config import cfg
from oslo_messaging._drivers import matchmaker as mm
from oslo_messaging._i18n import _LW
matchmaker_opts = [
# Matchmaker ring file
cfg.StrOpt('ringfile',
deprecated_name='matchmaker_ringfile',
deprecated_group='DEFAULT',
default='/etc/oslo/matchmaker_ring.json',
help='Matchmaker ring file (JSON).'),
]
CONF = cfg.CONF
CONF.register_opts(matchmaker_opts, 'matchmaker_ring')
LOG = logging.getLogger(__name__)
class RingExchange(mm.Exchange):
"""Match Maker where hosts are loaded from a static JSON formatted file.
__init__ takes optional ring dictionary argument, otherwise
loads the ringfile from CONF.mathcmaker_ringfile.
"""
def __init__(self, ring=None):
super(RingExchange, self).__init__()
if ring:
self.ring = ring
else:
fh = open(CONF.matchmaker_ring.ringfile, 'r')
self.ring = json.load(fh)
fh.close()
self.ring0 = {}
for k in self.ring.keys():
self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key):
return key in self.ring0
class RoundRobinRingExchange(RingExchange):
"""A Topic Exchange based on a hashmap."""
def __init__(self, ring=None):
super(RoundRobinRingExchange, self).__init__(ring)
def run(self, key):
if not self._ring_has(key):
LOG.warn(
_LW("No key defining hosts for topic '%s', "
"see ringfile"), key
)
return []
host = next(self.ring0[key])
return [(key + '.' + host, host)]
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
super(FanoutRingExchange, self).__init__(ring)
def run(self, key):
# Assume starts with "fanout~", strip it for lookup.
nkey = key.split('fanout~')[1:][0]
if not self._ring_has(nkey):
LOG.warn(
_LW("No key defining hosts for topic '%s', "
"see ringfile"), nkey
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
class MatchMakerRing(mm.MatchMakerBase):
"""Match Maker where hosts are loaded from a static hashmap."""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
self.add_binding(mm.FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(mm.DirectBinding(), mm.DirectExchange())
self.add_binding(mm.TopicBinding(), RoundRobinRingExchange(ring))

View File

@ -0,0 +1,101 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import uuid
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LI, _LW
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class DealerPublisher(zmq_publisher_base.PublisherMultisend):
def __init__(self, conf, matchmaker):
super(DealerPublisher, self).__init__(conf, matchmaker, zmq.DEALER)
self.ack_receiver = AcknowledgementReceiver()
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
dealer_socket, hosts = self._check_hosts_connections(request.target)
if not dealer_socket.connections:
# NOTE(ozamiatin): Here we can provide
# a queue for keeping messages to send them later
# when some listener appears. However such approach
# being more reliable will consume additional memory.
LOG.warning(_LW("Request %s was dropped because no connection")
% request.msg_type)
return
self.ack_receiver.track_socket(dealer_socket.handle)
if request.msg_type in zmq_names.MULTISEND_TYPES:
for _ in range(dealer_socket.connections_count()):
self._send_request(dealer_socket, request)
else:
self._send_request(dealer_socket, request)
def _send_request(self, socket, request):
message_id = str(uuid.uuid1())
socket.send(b'', zmq.SNDMORE)
socket.send_string(request.msg_type, zmq.SNDMORE)
socket.send_string(message_id, zmq.SNDMORE)
socket.send_pyobj(request.context, zmq.SNDMORE)
socket.send_pyobj(request.message)
LOG.info(_LI("Sending message %(message)s to a target %(target)s")
% {"message": request.message,
"target": request.target})
def cleanup(self):
self.ack_receiver.cleanup()
super(DealerPublisher, self).cleanup()
class AcknowledgementReceiver(object):
def __init__(self):
self.poller = zmq_async.get_poller()
self.thread = zmq_async.get_executor(self.poll_for_acknowledgements)
self.thread.execute()
def _receive_acknowledgement(self, socket):
empty = socket.recv()
assert empty == b"", "Empty delimiter expected"
ack_message = socket.recv_pyobj()
return ack_message
def track_socket(self, socket):
self.poller.register(socket, self._receive_acknowledgement)
def poll_for_acknowledgements(self):
ack_message, socket = self.poller.poll()
LOG.info(_LI("Message %s acknowledged")
% ack_message[zmq_names.FIELD_ID])
def cleanup(self):
self.thread.stop()
self.poller.close()

View File

@ -0,0 +1,47 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class PubPublisher(zmq_publisher_base.PublisherMultisend):
def __init__(self, conf, matchmaker):
super(PubPublisher, self).__init__(conf, matchmaker, zmq.PUB)
def send_request(self, request):
if request.msg_type not in zmq_names.NOTIFY_TYPES:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
pub_socket, hosts = self._check_hosts_connections(request.target)
self._send_request(pub_socket, request)
def _send_request(self, socket, request):
super(PubPublisher, self)._send_request(socket, request)
LOG.info(_LI("Publishing message %(message)s to a target %(target)s")
% {"message": request.message,
"target": request.target})

View File

@ -0,0 +1,141 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import six
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class UnsupportedSendPattern(rpc_common.RPCException):
"""Exception to raise from publishers in case of unsupported
sending pattern called.
"""
def __init__(self, pattern_name):
"""Construct exception object
:param pattern_name: Message type name from zmq_names
:type pattern_name: str
"""
errmsg = _LE("Sending pattern %s is unsupported.") % pattern_name
super(UnsupportedSendPattern, self).__init__(errmsg)
@six.add_metaclass(abc.ABCMeta)
class PublisherBase(object):
"""Abstract publisher class
Each publisher from zmq-driver client should implement
this interface to serve as a messages publisher.
Publisher can send request objects from zmq_request.
"""
def __init__(self, conf, matchmaker):
"""Construct publisher
Accept configuration object and Name Service interface object.
Create zmq.Context and connected sockets dictionary.
:param conf: configuration object
:type conf: oslo_config.CONF
:param matchmaker: Name Service interface object
:type matchmaker: matchmaker.MatchMakerBase
"""
self.conf = conf
self.zmq_context = zmq.Context()
self.matchmaker = matchmaker
self.outbound_sockets = {}
super(PublisherBase, self).__init__()
@abc.abstractmethod
def send_request(self, request):
"""Send request to consumer
:param request: Message data and destination container object
:type request: zmq_request.Request
"""
def _send_request(self, socket, request):
"""Send request to consumer.
Helper private method which defines basic sending behavior.
:param socket: Socket to publish message on
:type socket: zmq.Socket
:param request: Message data and destination container object
:type request: zmq_request.Request
"""
socket.send_string(request.msg_type, zmq.SNDMORE)
socket.send_pyobj(request.context, zmq.SNDMORE)
socket.send_pyobj(request.message)
def cleanup(self):
"""Cleanup publisher. Close allocated connections."""
for socket in self.outbound_sockets.values():
socket.setsockopt(zmq.LINGER, 0)
socket.close()
class PublisherMultisend(PublisherBase):
def __init__(self, conf, matchmaker, socket_type):
self.socket_type = socket_type
super(PublisherMultisend, self).__init__(conf, matchmaker)
def _check_hosts_connections(self, target):
# TODO(ozamiatin): Place for significant optimization
# Matchmaker cache should be implemented
hosts = self.matchmaker.get_hosts(target)
if str(target) in self.outbound_sockets:
socket = self.outbound_sockets[str(target)]
else:
socket = zmq_socket.ZmqSocket(self.zmq_context, self.socket_type)
self.outbound_sockets[str(target)] = socket
for host in hosts:
self._connect_to_host(socket, host, target)
return socket, hosts
def _connect_to_host(self, socket, host, target):
address = zmq_address.get_tcp_direct_address(host)
stype = zmq_names.socket_type_str(self.socket_type)
try:
LOG.info(_LI("Connecting %(stype)s to %(address)s for %(target)s")
% {"stype": stype,
"address": address,
"target": target})
socket.connect(address)
except zmq.ZMQError as e:
errmsg = _LE("Failed connecting %(stype) to %(address)s: %(e)s")\
% (stype, address, e)
LOG.error(_LE("Failed connecting %(stype) to %(address)s: %(e)s")
% (stype, address, e))
raise rpc_common.RPCException(errmsg)

View File

@ -0,0 +1,57 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LI, _LW
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class PushPublisher(zmq_publisher_base.PublisherMultisend):
def __init__(self, conf, matchmaker):
super(PushPublisher, self).__init__(conf, matchmaker, zmq.PUSH)
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
push_socket, hosts = self._check_hosts_connections(request.target)
if not push_socket.connections:
LOG.warning(_LW("Request %s was dropped because no connection")
% request.msg_type)
return
if request.msg_type in zmq_names.MULTISEND_TYPES:
for _ in range(push_socket.connections_count()):
self._send_request(push_socket, request)
else:
self._send_request(push_socket, request)
def _send_request(self, socket, request):
super(PushPublisher, self)._send_request(socket, request)
LOG.info(_LI("Publishing message %(message)s to a target %(target)s")
% {"message": request.message,
"target": request.target})

View File

@ -0,0 +1,85 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import logging
import oslo_messaging
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class ReqPublisher(zmq_publisher_base.PublisherBase):
def send_request(self, request):
if request.msg_type != zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
socket = self._connect_to_host(request.target)
self._send_request(socket, request)
return self._receive_reply(socket, request)
def _connect_to_host(self, target):
try:
self.zmq_context = zmq.Context()
socket = self.zmq_context.socket(zmq.REQ)
host = self.matchmaker.get_single_host(target)
connect_address = zmq_address.get_tcp_direct_address(host)
LOG.info(_LI("Connecting REQ to %s") % connect_address)
socket.connect(connect_address)
self.outbound_sockets[str(target)] = socket
return socket
except zmq.ZMQError as e:
errmsg = _LE("Error connecting to socket: %s") % str(e)
LOG.error(_LE("Error connecting to socket: %s") % str(e))
raise rpc_common.RPCException(errmsg)
@staticmethod
def _receive_reply(socket, request):
def _receive_method(socket):
return socket.recv_pyobj()
# NOTE(ozamiatin): Check for retry here (no retries now)
with contextlib.closing(zmq_async.get_reply_poller()) as poller:
poller.register(socket, recv_method=_receive_method)
reply, socket = poller.poll(timeout=request.timeout)
if reply is None:
raise oslo_messaging.MessagingTimeout(
"Timeout %s seconds was reached" % request.timeout)
if reply[zmq_names.FIELD_FAILURE]:
raise rpc_common.deserialize_remote_exception(
reply[zmq_names.FIELD_FAILURE],
request.allowed_remote_exmods)
else:
return reply[zmq_names.FIELD_REPLY]
def close(self):
# For contextlib compatibility
self.cleanup()

View File

@ -0,0 +1,72 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_req_publisher
from oslo_messaging._drivers.zmq_driver.client import zmq_request
from oslo_messaging._drivers.zmq_driver import zmq_async
zmq = zmq_async.import_zmq()
class ZmqClient(object):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
self.conf = conf
self.context = zmq.Context()
self.matchmaker = matchmaker
self.allowed_remote_exmods = allowed_remote_exmods or []
self.dealer_publisher = zmq_dealer_publisher.DealerPublisher(
conf, matchmaker)
def send_call(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.CallRequest(
target, context=context, message=message,
timeout=timeout, retry=retry,
allowed_remote_exmods=self.allowed_remote_exmods)) as request:
with contextlib.closing(zmq_req_publisher.ReqPublisher(
self.conf, self.matchmaker)) as req_publisher:
return req_publisher.send_request(request)
def send_cast(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.CastRequest(
target, context=context, message=message,
timeout=timeout, retry=retry)) as request:
self.dealer_publisher.send_request(request)
def send_fanout(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.FanoutRequest(
target, context=context, message=message,
timeout=timeout, retry=retry)) as request:
self.dealer_publisher.send_request(request)
def send_notify(self, target, context, message, version, retry=None):
with contextlib.closing(zmq_request.NotificationRequest(
target, context, message, version=version,
retry=retry)) as request:
self.dealer_publisher.send_request(request)
def send_notify_fanout(self, target, context, message, version,
retry=None):
with contextlib.closing(zmq_request.NotificationFanoutRequest(
target, context, message, version=version,
retry=retry)) as request:
self.dealer_publisher.send_request(request)
def cleanup(self):
self.dealer_publisher.cleanup()

View File

@ -0,0 +1,118 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import six
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
@six.add_metaclass(abc.ABCMeta)
class Request(object):
"""Zmq request abstract class
Represents socket (publisher) independent data object to publish.
Request object should contain all needed information for a publisher
to publish it, for instance: message payload, target, timeout
and retries etc.
"""
def __init__(self, target, context=None, message=None, retry=None):
"""Construct request object
:param target: Message destination target
:type target: oslo_messaging.Target
:param context: Message context
:type context: dict
:param message: Message payload to pass
:type message: dict
:param retry: an optional default connection retries configuration
None or -1 means to retry forever
0 means no retry
N means N retries
:type retry: int
"""
if self.msg_type not in zmq_names.MESSAGE_TYPES:
raise RuntimeError("Unknown message type!")
self.target = target
self.context = context
self.message = message
self.retry = retry
@abc.abstractproperty
def msg_type(self):
"""ZMQ message type"""
def close(self):
"""Nothing to close in base request"""
class RpcRequest(Request):
def __init__(self, *args, **kwargs):
message = kwargs.get("message")
if message['method'] is None:
errmsg = _LE("No method specified for RPC call")
LOG.error(_LE("No method specified for RPC call"))
raise KeyError(errmsg)
self.timeout = kwargs.pop("timeout")
assert self.timeout is not None, "Timeout should be specified!"
super(RpcRequest, self).__init__(*args, **kwargs)
class CallRequest(RpcRequest):
msg_type = zmq_names.CALL_TYPE
def __init__(self, *args, **kwargs):
self.allowed_remote_exmods = kwargs.pop("allowed_remote_exmods")
super(CallRequest, self).__init__(*args, **kwargs)
class CastRequest(RpcRequest):
msg_type = zmq_names.CAST_TYPE
class FanoutRequest(RpcRequest):
msg_type = zmq_names.CAST_FANOUT_TYPE
class NotificationRequest(Request):
msg_type = zmq_names.NOTIFY_TYPE
def __init__(self, *args, **kwargs):
self.version = kwargs.pop("version")
super(NotificationRequest, self).__init__(*args, **kwargs)
class NotificationFanoutRequest(NotificationRequest):
msg_type = zmq_names.NOTIFY_FANOUT_TYPE

View File

@ -0,0 +1,95 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import collections
import logging
import random
import six
import oslo_messaging
from oslo_messaging._i18n import _LI, _LW
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class MatchMakerBase(object):
def __init__(self, conf, *args, **kwargs):
super(MatchMakerBase, self).__init__(*args, **kwargs)
self.conf = conf
@abc.abstractmethod
def register(self, target, hostname):
"""Register target on nameserver.
:param target: the target for host
:type target: Target
:param hostname: host for the topic in "host:port" format
:type hostname: String
"""
@abc.abstractmethod
def get_hosts(self, target):
"""Get all hosts from nameserver by target.
:param target: the default target for invocations
:type target: Target
:returns: a list of "hostname:port" hosts
"""
def get_single_host(self, target):
"""Get a single host by target.
:param target: the target for messages
:type target: Target
:returns: a "hostname:port" host
"""
hosts = self.get_hosts(target)
if not hosts:
err_msg = "No hosts were found for target %s." % target
LOG.error(err_msg)
raise oslo_messaging.InvalidTarget(err_msg, target)
if len(hosts) == 1:
host = hosts[0]
LOG.info(_LI("A single host %(host)s found for target %(target)s.")
% {"host": host, "target": target})
else:
host = random.choice(hosts)
LOG.warning(_LW("Multiple hosts %(hosts)s were found for target "
" %(target)s. Using the random one - %(host)s.")
% {"hosts": hosts, "target": target, "host": host})
return host
class DummyMatchMaker(MatchMakerBase):
def __init__(self, conf, *args, **kwargs):
super(DummyMatchMaker, self).__init__(conf, *args, **kwargs)
self._cache = collections.defaultdict(list)
def register(self, target, hostname):
key = str(target)
if hostname not in self._cache[key]:
self._cache[key].append(hostname)
def get_hosts(self, target):
key = str(target)
return self._cache[key]

View File

@ -0,0 +1,77 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_config import cfg
import redis
from oslo_messaging._drivers.zmq_driver.matchmaker import base
LOG = logging.getLogger(__name__)
matchmaker_redis_opts = [
cfg.StrOpt('host',
default='127.0.0.1',
help='Host to locate redis.'),
cfg.IntOpt('port',
default=6379,
help='Use this port to connect to redis host.'),
cfg.StrOpt('password',
default='',
secret=True,
help='Password for Redis server (optional).'),
]
class RedisMatchMaker(base.MatchMakerBase):
def __init__(self, conf, *args, **kwargs):
super(RedisMatchMaker, self).__init__(conf, *args, **kwargs)
self.conf.register_opts(matchmaker_redis_opts, "matchmaker_redis")
self._redis = redis.StrictRedis(
host=self.conf.matchmaker_redis.host,
port=self.conf.matchmaker_redis.port,
password=self.conf.matchmaker_redis.password,
)
def _target_to_key(self, target):
attributes = ['topic', 'exchange', 'server']
prefix = "ZMQ-target"
key = ":".join((getattr(target, attr) or "*") for attr in attributes)
return "%s-%s" % (prefix, key)
def _get_keys_by_pattern(self, pattern):
return self._redis.keys(pattern)
def _get_hosts_by_key(self, key):
return self._redis.lrange(key, 0, -1)
def register(self, target, hostname):
key = self._target_to_key(target)
if hostname not in self._get_hosts_by_key(key):
self._redis.lpush(key, hostname)
def get_hosts(self, target):
pattern = self._target_to_key(target)
if "*" not in pattern:
# pattern have no placeholders, so this is valid key
return self._get_hosts_by_key(pattern)
hosts = []
for key in self._get_keys_by_pattern(pattern):
hosts.extend(self._get_hosts_by_key(key))