
This change introduces the possibility for a consumer application to change the defaults of oslo.messaging config options. The same API interface as oslo.db has been adopted. The first option exposed is executor_thread_pool_size. This option was previously called rpc_thread_pool_size, but since it is used by both notifications and RPC, 'executor_thread_pool_size' is a better name. Changing the executor_thread_pool_size default will be useful for ceilometer: the default of 64 is far too small to consume many notifications at once for batching, while it is clearly sufficient for RPC.
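For example, a consumer application can now raise the default before it parses its own configuration. A minimal sketch (this uses the generic oslo.config cfg.set_defaults helper on the driver's private _pool_opts list purely for illustration; the exact public helper exposed by this change may differ):

    from oslo_config import cfg
    from oslo_messaging._executors import impl_pooledexecutor

    # Illustrative value: batching many notifications at once needs a
    # larger pool than the stock default of 64.
    cfg.set_defaults(impl_pooledexecutor._pool_opts,
                     executor_thread_pool_size=1000)

Change-Id: Iea0d7a72e38d27c600403c815258aa5eee0d0c8c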
# Copyright 2011 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import collections
import logging
import os
import pprint
import re
import socket
import sys
import threading
import types
import uuid

import eventlet
import greenlet
from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_utils import excutils
from oslo_utils import importutils
import six
from six import moves
from stevedore import driver

from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers import pool
from oslo_messaging._executors import impl_pooledexecutor  # FIXME(markmc)
from oslo_messaging._i18n import _, _LE, _LW


zmq = importutils.try_import('eventlet.green.zmq')

# Aliased here for convenience; these are not modified.
pformat = pprint.pformat
Timeout = eventlet.timeout.Timeout
LOG = logging.getLogger(__name__)
RPCException = rpc_common.RPCException

zmq_opts = [
    cfg.StrOpt('rpc_zmq_bind_address', default='*',
               help='ZeroMQ bind address. Should be a wildcard (*), '
                    'an ethernet interface, or IP. '
                    'The "host" option should point or resolve to this '
                    'address.'),

    # The module.Class to use for matchmaking.
    cfg.StrOpt(
        'rpc_zmq_matchmaker',
        default='local',
        help='MatchMaker driver.',
    ),

    # The following port is unassigned by IANA as of 2012-05-21
    cfg.IntOpt('rpc_zmq_port', default=9501,
               help='ZeroMQ receiver listening port.'),

    cfg.IntOpt('rpc_zmq_contexts', default=1,
               help='Number of ZeroMQ contexts, defaults to 1.'),

    cfg.IntOpt('rpc_zmq_topic_backlog',
               help='Maximum number of ingress messages to locally buffer '
                    'per topic. Default is unlimited.'),

    cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
               help='Directory for holding IPC sockets.'),

    cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
               sample_default='localhost',
               help='Name of this node. Must be a valid hostname, FQDN, or '
                    'IP address. Must match "host" option, if running Nova.'),

    cfg.IntOpt('rpc_cast_timeout',
               default=30,
               help='Seconds to wait before a cast expires (TTL). '
                    'Only supported by impl_zmq.'),
]
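
# The options above land in the consuming service's configuration file.
# An illustrative (not prescriptive) snippet, using the defaults:
#
#   [DEFAULT]
#   rpc_zmq_bind_address = *
#   rpc_zmq_port = 9501
#   rpc_zmq_ipc_dir = /var/run/openstack
#   rpc_cast_timeout = 30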

CONF = cfg.CONF

matchmaker = None  # memoized matchmaker object


def _serialize(data):
    """Serialization wrapper.

    We prefer using JSON, but it cannot encode all types.
    Raises an error if a developer passes us bad data.
    """
    try:
        return jsonutils.dumps(data, ensure_ascii=True)
    except TypeError:
        with excutils.save_and_reraise_exception():
            LOG.error(_("JSON serialization failed."))


def _deserialize(data):
    """Deserialization wrapper."""
    LOG.debug("Deserializing: %r", data)
    return jsonutils.loads(data)


class ZmqSocket(object):
    """A tiny wrapper around ZeroMQ.

    Simplifies the send/recv protocol and connection management.
    """

    def __init__(self, addr, zmq_type, bind=True, subscribe=None, ctxt=None):
        self.ctxt = ctxt or zmq.Context(CONF.rpc_zmq_contexts)
        self.sock = self.ctxt.socket(zmq_type)

        # Enable IPv6-support in libzmq.
        # When IPv6 is enabled, a socket will connect to, or accept
        # connections from, both IPv4 and IPv6 hosts.
        try:
            self.sock.ipv6 = True
        except AttributeError:
            # NOTE(dhellmann): Sometimes the underlying library does
            # not recognize the IPV6 option. There's nothing we can
            # really do in that case, so ignore the error and keep
            # trying to work.
            pass

        self.addr = addr
        self.type = zmq_type
        self.subscriptions = []

        # Support failures on sending/receiving on wrong socket type.
        self.can_recv = zmq_type in (zmq.PULL, zmq.SUB)
        self.can_send = zmq_type in (zmq.PUSH, zmq.PUB)
        self.can_sub = zmq_type in (zmq.SUB, )

        # Support list, str, & None for subscribe arg (cast to list)
        do_sub = {
            list: subscribe,
            str: [subscribe],
            type(None): []
        }[type(subscribe)]

        for f in do_sub:
            self.subscribe(f)

        str_data = {'addr': addr, 'type': self.socket_s(),
                    'subscribe': subscribe, 'bind': bind}

        LOG.debug("Connecting to %(addr)s with %(type)s", str_data)
        LOG.debug("-> Subscribed to %(subscribe)s", str_data)
        LOG.debug("-> bind: %(bind)s", str_data)

        try:
            if bind:
                self.sock.bind(addr)
            else:
                self.sock.connect(addr)
        except Exception:
            raise RPCException(_("Could not open socket."))

    def socket_s(self):
        """Get socket type as string."""
        t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER',
                  'DEALER')
        return dict(map(lambda t: (getattr(zmq, t), t), t_enum))[self.type]

    def subscribe(self, msg_filter):
        """Subscribe."""
        if not self.can_sub:
            raise RPCException("Cannot subscribe on this socket.")
        LOG.debug("Subscribing to %s", msg_filter)

        try:
            arg = msg_filter
            if six.PY3:
                arg = arg.encode('utf-8')
            self.sock.setsockopt(zmq.SUBSCRIBE, arg)
        except Exception:
            return

        self.subscriptions.append(msg_filter)

    def unsubscribe(self, msg_filter):
        """Unsubscribe."""
        if msg_filter not in self.subscriptions:
            return
        arg = msg_filter
        if six.PY3:
            arg = arg.encode('utf-8')
        self.sock.setsockopt(zmq.UNSUBSCRIBE, arg)
        self.subscriptions.remove(msg_filter)

    @property
    def closed(self):
        return self.sock is None or self.sock.closed

    def close(self):
        if self.sock is None or self.sock.closed:
            return

        # We must unsubscribe, or we'll leak descriptors.
        if self.subscriptions:
            for f in self.subscriptions:
                try:
                    self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
                except Exception:
                    pass
            self.subscriptions = []

        try:
            # Default is to linger
            self.sock.close()
            self.ctxt.term()
        except Exception:
            # While this is a bad thing to happen,
            # it would be much worse if some of the code calling this
            # were to fail. For now, let's log, and later evaluate
            # if we can safely raise here.
            LOG.error("ZeroMQ socket could not be closed.")
        self.sock = None

    def recv(self, **kwargs):
        if not self.can_recv:
            raise RPCException(_("You cannot recv on this socket."))
        return self.sock.recv_multipart(**kwargs)

    def send(self, data, **kwargs):
        if not self.can_send:
            raise RPCException(_("You cannot send on this socket."))
        self.sock.send_multipart(data, **kwargs)


class ZmqClient(object):
    """Client for ZMQ sockets."""

    def __init__(self, addr, ctxt=None):
        self.address = addr
        self.outq = ZmqSocket(addr, zmq.PUSH, bind=False, ctxt=ctxt)

    def cast(self, msg_id, topic, data, envelope):
        msg_id = msg_id or '0'

        if six.PY3:
            msg_id = msg_id.encode('utf-8')

        if not envelope:
            data = _serialize(data)
            if six.PY3:
                data = data.encode('utf-8')
            data = (msg_id, topic, b'cast', data)
            self.outq.send([bytes(item) for item in data])
            return

        rpc_envelope = rpc_common.serialize_msg(data[1])
        zmq_msg = moves.reduce(lambda x, y: x + y, rpc_envelope.items())
        data = (msg_id, topic, b'impl_zmq_v2', data[0]) + zmq_msg
        self.outq.send([bytes(item) for item in data])

    def close(self):
        self.outq.close()


class ZmqClientContext(object):
    """This is essentially a wrapper around ZmqClient that supports 'with'.

    It can also return a new ZmqClient, or one from a pool.

    The class also catches when an instance is about to be deleted, so
    ZmqClients can be returned to the pool on exceptions and so forth
    without making the caller responsible for catching them. If possible,
    the class makes sure to return the client to the pool.

    Based on amqp.ConnectionContext.
    """

    def __init__(self, address, connection_pool=None, pooled=False):
        self.connection = None
        self.connection_pool = connection_pool
        self.pooled = pooled
        if self.pooled and self.connection_pool is not None:
            self.connection = self.connection_pool.get(address)
        else:
            self.connection = ZmqClient(address)

    def __enter__(self):
        """Return self when used in a 'with' statement."""
        return self

    def _done(self):
        """If the client came from a pool, clean it up and put it back.

        If it did not come from a pool, close it.
        """
        if self.connection:
            if self.pooled and self.connection_pool is not None:
                # Reset the connection so it's ready for the next caller
                # to grab from the pool
                self.connection_pool.put(self.connection)
            else:
                try:
                    self.connection.close()
                except Exception:
                    pass
            self.connection = None

    def __exit__(self, exc_type, exc_value, tb):
        """End of 'with' statement. We're done here."""
        self._done()

    def __del__(self):
        """Caller is done with this client. Make sure we cleaned up."""
        self._done()

    def close(self):
        """Caller is done with this client."""
        self._done()

    def __getattr__(self, key):
        """Proxy all other calls to the ZmqClient instance."""
        if self.connection:
            return getattr(self.connection, key)
        else:
            raise rpc_common.InvalidRPCConnectionReuse()


class RpcContext(rpc_common.CommonRpcContext):
    """Context that supports replying to a rpc.call."""

    def __init__(self, **kwargs):
        self.replies = []
        super(RpcContext, self).__init__(**kwargs)

    def deepcopy(self):
        values = self.to_dict()
        values['replies'] = self.replies
        return self.__class__(**values)

    def reply(self, reply=None, failure=None, ending=False):
        if ending:
            return
        self.replies.append(reply)

    @classmethod
    def marshal(cls, ctx):
        if not isinstance(ctx, dict):
            ctx_data = ctx.to_dict()
        else:
            ctx_data = ctx
        return _serialize(ctx_data)

    @classmethod
    def unmarshal(cls, data):
        return RpcContext.from_dict(_deserialize(data))


class InternalContext(object):
    """Used by ConsumerBase as a private context for '-' (internal) methods."""

    def __init__(self, proxy):
        self.proxy = proxy
        self.msg_waiter = None

    def _get_response(self, ctx, proxy, topic, data):
        """Process a curried message and cast the result to topic."""
        LOG.debug("Running func with context: %s", ctx.to_dict())
        data.setdefault('version', None)
        data.setdefault('args', {})

        try:
            if not data.get("method"):
                raise KeyError
            result = proxy.dispatch(ctx, data)
            return ConsumerBase.normalize_reply(result, ctx.replies)
        except greenlet.GreenletExit:
            # ignore these since they are just from shutdowns
            pass
        except rpc_common.ClientException as e:
            LOG.debug("Expected exception during message handling (%s)",
                      e._exc_info[1])
            return {'exc':
                    rpc_common.serialize_remote_exception(e._exc_info,
                                                          log_failure=False)}
        except Exception:
            LOG.error(_("Exception during message handling"))
            return {'exc':
                    rpc_common.serialize_remote_exception(sys.exc_info())}

    def reply(self, driver, ctx, proxy,
              msg_id=None, context=None, topic=None, msg=None):
        """Reply to a cast call."""
        # NOTE(ewindisch): context kwarg exists for Grizzly compat.
        #                  this may be able to be removed earlier than
        #                  'I' if ConsumerBase.process were refactored.
        if type(msg) is list:
            payload = msg[-1]
        else:
            payload = msg

        response = ConsumerBase.normalize_reply(
            self._get_response(ctx, proxy, topic, payload),
            ctx.replies)

        LOG.debug("Sending reply")
        _multi_send(driver, _cast, ctx, topic, {
            'method': '-process_reply',
            'args': {
                'msg_id': msg_id,  # Include for Folsom compat.
                'response': response
            }
        }, _msg_id=msg_id, pooled=True)


class ConsumerBase(object):
    """Base Consumer."""

    def __init__(self, driver):
        self.driver = driver
        self.private_ctx = InternalContext(None)

    @classmethod
    def normalize_reply(cls, result, replies):
        # TODO(ewindisch): re-evaluate and document this method.
        if isinstance(result, types.GeneratorType):
            return list(result)
        elif replies:
            return replies
        else:
            return [result]

    def process(self, proxy, ctx, data):
        data.setdefault('version', None)
        data.setdefault('args', {})

        # Methods starting with '-' are processed internally
        # (the '-' makes them invalid as regular method names).
        method = data.get('method')
        # Internal methods use the internal context for safety.
        if method == '-reply':
            self.private_ctx.reply(self.driver, ctx, proxy, **data['args'])
            return

        proxy.dispatch(ctx, data)


class ZmqBaseReactor(ConsumerBase):
    """A consumer class implementing a centralized casting broker (PULL-PUSH).

    Used for RoundRobin requests.
    """

    def __init__(self, conf, driver=None):
        super(ZmqBaseReactor, self).__init__(driver)

        self.driver = driver
        self.proxies = {}
        self.threads = []
        self.sockets = []
        self.subscribe = {}

        self.pool = eventlet.greenpool.GreenPool(
            conf.executor_thread_pool_size)

    def register(self, proxy, in_addr, zmq_type_in,
                 in_bind=True, subscribe=None):

        LOG.info(_("Registering reactor"))

        if zmq_type_in not in (zmq.PULL, zmq.SUB):
            raise RPCException("Bad input socktype")

        # Items push in.
        inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind,
                        subscribe=subscribe)

        self.proxies[inq] = proxy
        self.sockets.append(inq)

        LOG.info(_("In reactor registered"))

    def consume_in_thread(self):
        def _consume(sock):
            LOG.info(_("Consuming socket"))
            while not sock.closed:
                self.consume(sock)

        for k in self.proxies.keys():
            self.threads.append(
                self.pool.spawn(_consume, k)
            )

    def wait(self):
        for t in self.threads:
            t.wait()

    def close(self):
        for t in self.threads:
            t.kill()

        for s in self.sockets:
            s.close()


class ZmqProxy(ZmqBaseReactor):
    """A consumer class implementing a topic-based proxy.

    Forwards to IPC sockets.
    """

    def __init__(self, conf):
        super(ZmqProxy, self).__init__(conf)
        pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
        self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))

        self.topic_proxy = {}

    def consume(self, sock):
        ipc_dir = CONF.rpc_zmq_ipc_dir

        data = sock.recv(copy=False)
        topic = data[1].bytes
        if six.PY3:
            topic = topic.decode('utf-8')

        if topic.startswith('fanout~'):
            sock_type = zmq.PUB
            topic = topic.split('.', 1)[0]
        elif topic.startswith('zmq_replies'):
            sock_type = zmq.PUB
        else:
            sock_type = zmq.PUSH

        if topic not in self.topic_proxy:
            def publisher(waiter):
                LOG.info(_("Creating proxy for topic: %s"), topic)

                try:
                    # The topic is received over the network,
                    # don't trust this input.
                    if self.badchars.search(topic) is not None:
                        emsg = _("Topic contained dangerous characters.")
                        LOG.warn(emsg)
                        raise RPCException(emsg)

                    out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
                                         (ipc_dir, topic),
                                         sock_type, bind=True)
                except RPCException:
                    waiter.send_exception(*sys.exc_info())
                    return

                self.topic_proxy[topic] = eventlet.queue.LightQueue(
                    CONF.rpc_zmq_topic_backlog)
                self.sockets.append(out_sock)

                # It takes some time for a pub socket to open,
                # before we can have any faith in doing a send() to it.
                if sock_type == zmq.PUB:
                    eventlet.sleep(.5)

                waiter.send(True)

                while True:
                    data = self.topic_proxy[topic].get()
                    out_sock.send(data, copy=False)

            wait_sock_creation = eventlet.event.Event()
            eventlet.spawn(publisher, wait_sock_creation)

            try:
                wait_sock_creation.wait()
            except RPCException:
                LOG.error(_("Topic socket file creation failed."))
                return

        try:
            self.topic_proxy[topic].put_nowait(data)
        except eventlet.queue.Full:
            LOG.error(_("Local per-topic backlog buffer full for topic "
                        "%s. Dropping message."), topic)

    def consume_in_thread(self):
        """Runs the ZmqProxy service."""
        ipc_dir = CONF.rpc_zmq_ipc_dir
        consume_in = "tcp://%s:%s" % \
            (CONF.rpc_zmq_bind_address,
             CONF.rpc_zmq_port)
        consumption_proxy = InternalContext(None)

        try:
            os.makedirs(ipc_dir)
        except os.error:
            if not os.path.isdir(ipc_dir):
                with excutils.save_and_reraise_exception():
                    LOG.error(_("Required IPC directory does not exist at"
                                " %s"), ipc_dir)
        try:
            self.register(consumption_proxy,
                          consume_in,
                          zmq.PULL)
        except zmq.ZMQError:
            if os.access(ipc_dir, os.X_OK):
                with excutils.save_and_reraise_exception():
                    LOG.error(_("Permission denied to IPC directory at"
                                " %s"), ipc_dir)
            with excutils.save_and_reraise_exception():
                LOG.error(_("Could not create ZeroMQ receiver daemon. "
                            "Socket may already be in use."))

        super(ZmqProxy, self).consume_in_thread()


def unflatten_envelope(packenv):
    """Unflattens the RPC envelope.

    Takes a list and returns a dictionary.
    i.e. [1,2,3,4] => {1: 2, 3: 4}
    """
    i = iter(packenv)
    h = {}
    try:
        while True:
            k = six.next(i)
            h[k] = six.next(i)
    except StopIteration:
        return h


class ZmqReactor(ZmqBaseReactor):
    """A consumer class for messages.

    Can also be used as a 1:1 proxy.
    """

    def __init__(self, conf, driver):
        super(ZmqReactor, self).__init__(conf, driver)

    def consume(self, sock):
        # TODO(ewindisch): use zero-copy (i.e. references, not copying)
        data = sock.recv()
        LOG.debug("CONSUMER RECEIVED DATA: %s", data)

        proxy = self.proxies[sock]

        if data[2] == b'cast':  # Legacy protocol
            packenv = data[3]

            ctx, msg = _deserialize(packenv)
            request = rpc_common.deserialize_msg(msg)
            ctx = RpcContext.unmarshal(ctx)
        elif data[2] == b'impl_zmq_v2':
            packenv = data[4:]

            msg = unflatten_envelope(packenv)
            request = rpc_common.deserialize_msg(msg)

            # Unmarshal only after verifying the message.
            ctx = RpcContext.unmarshal(data[3])
        else:
            LOG.error(_("ZMQ Envelope version unsupported or unknown."))
            return

        self.pool.spawn_n(self.process, proxy, ctx, request)


class Connection(rpc_common.Connection):
    """Manages connections and threads."""

    def __init__(self, conf, driver):
        self.topics = []
        self.reactor = ZmqReactor(conf, driver)

    def create_consumer(self, topic, proxy, fanout=False):
        # Register with matchmaker.
        _get_matchmaker().register(topic, CONF.rpc_zmq_host)

        # Subscription scenarios
        if fanout:
            sock_type = zmq.SUB
            subscribe = ('', fanout)[type(fanout) == str]
            topic = 'fanout~' + topic.split('.', 1)[0]
        else:
            sock_type = zmq.PULL
            subscribe = None
            topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))

        if topic in self.topics:
            LOG.info(_("Skipping topic registration. Already registered."))
            return

        # Receive messages from (local) proxy
        inaddr = "ipc://%s/zmq_topic_%s" % \
            (CONF.rpc_zmq_ipc_dir, topic)

        LOG.debug("Consumer is a zmq.%s",
                  ['PULL', 'SUB'][sock_type == zmq.SUB])

        self.reactor.register(proxy, inaddr, sock_type,
                              subscribe=subscribe, in_bind=False)
        self.topics.append(topic)

    def close(self):
        mm = _get_matchmaker()
        mm.stop_heartbeat()
        for topic in self.topics:
            try:
                mm.unregister(topic, CONF.rpc_zmq_host)
            except Exception as err:
                LOG.error(_LE('Unable to unregister topic %(topic)s'
                              ' from matchmaker: %(err)s'),
                          {'topic': topic, 'err': err})

        self.reactor.close()
        self.topics = []

    def wait(self):
        self.reactor.wait()

    def consume_in_thread(self):
        _get_matchmaker().start_heartbeat()
        self.reactor.consume_in_thread()


def _cast(driver, addr, context, topic, msg, timeout=None, envelope=False,
          _msg_id=None, allowed_remote_exmods=None, pooled=False):
    allowed_remote_exmods = allowed_remote_exmods or []
    timeout_cast = timeout or CONF.rpc_cast_timeout
    payload = [RpcContext.marshal(context), msg]
    if six.PY3:
        topic = topic.encode('utf-8')

    with Timeout(timeout_cast, exception=rpc_common.Timeout):
        with driver.get_connection(addr, pooled) as conn:
            try:
                # assumes cast can't return an exception
                conn.cast(_msg_id, topic, payload, envelope)
            except zmq.ZMQError:
                raise RPCException("Cast failed. ZMQ Socket Exception")


def _call(driver, addr, context, topic, msg, timeout=None,
          envelope=False, allowed_remote_exmods=None, pooled=False):
    allowed_remote_exmods = allowed_remote_exmods or []
    # timeout_response is how long we wait for a response
    timeout = timeout or CONF.rpc_response_timeout

    # The msg_id is used to track replies.
    msg_id = uuid.uuid4().hex

    # Replies always come into the reply service.
    reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host

    LOG.debug("Creating payload")
    # Curry the original request into a reply method.
    mcontext = RpcContext.marshal(context)
    payload = {
        'method': '-reply',
        'args': {
            'msg_id': msg_id,
            'topic': reply_topic,
            # TODO(ewindisch): safe to remove mcontext in I.
            'msg': [mcontext, msg]
        }
    }

    LOG.debug("Creating queue socket for reply waiter")

    # Messages arriving async.
    # TODO(ewindisch): have reply consumer with dynamic subscription mgmt
    with Timeout(timeout, exception=rpc_common.Timeout):
        try:
            msg_waiter = ZmqSocket(
                "ipc://%s/zmq_topic_zmq_replies.%s" %
                (CONF.rpc_zmq_ipc_dir,
                 CONF.rpc_zmq_host),
                zmq.SUB, subscribe=msg_id, bind=False
            )

            LOG.debug("Sending cast: %s", topic)
            _cast(driver, addr, context, topic, payload, envelope=envelope,
                  pooled=pooled)

            LOG.debug("Cast sent; Waiting reply")
            # Blocks until receives reply
            msg = msg_waiter.recv()
            if msg is None:
                raise rpc_common.Timeout()
            LOG.debug("Received message: %s", msg)
            LOG.debug("Unpacking response")

            if msg[2] == b'cast':  # Legacy version
                raw_msg = _deserialize(msg[-1])[-1]
            elif msg[2] == b'impl_zmq_v2':
                rpc_envelope = unflatten_envelope(msg[4:])
                raw_msg = rpc_common.deserialize_msg(rpc_envelope)
            else:
                raise rpc_common.UnsupportedRpcEnvelopeVersion(
                    _("Unsupported or unknown ZMQ envelope returned."))

            responses = raw_msg['args']['response']
        # ZMQError trumps the Timeout error.
        except zmq.ZMQError:
            raise RPCException("ZMQ Socket Error")
        except (IndexError, KeyError):
            raise RPCException(_("RPC Message Invalid."))
        finally:
            if 'msg_waiter' in vars():
                msg_waiter.close()

    # It seems we don't need to do all of the following,
    # but perhaps it would be useful for multicall?
    # One effect of this is that we're checking all
    # responses for Exceptions.
    for resp in responses:
        if isinstance(resp, dict) and 'exc' in resp:
            raise rpc_common.deserialize_remote_exception(
                resp['exc'], allowed_remote_exmods)

    return responses[-1]


def _multi_send(driver, method, context, topic, msg, timeout=None,
                envelope=False, _msg_id=None, allowed_remote_exmods=None,
                pooled=False):
    """Wraps the sending of messages.

    Dispatches to the matchmaker and sends message to all relevant hosts.
    """
    allowed_remote_exmods = allowed_remote_exmods or []
    conf = CONF
    LOG.debug(' '.join(map(pformat, (topic, msg))))

    queues = _get_matchmaker().queues(topic)
    LOG.debug("Sending message(s) to: %s", queues)

    # Don't stack if we have no matchmaker results
    if not queues:
        warn_log = _LW("No matchmaker results. Not sending.")

        if method.__name__ == '_cast':
            LOG.warn(warn_log)
            return

        # While not strictly a timeout, callers know how to handle
        # this exception and a timeout isn't too big a lie.
        raise rpc_common.Timeout(warn_log)

    # This supports brokerless fanout (addresses > 1)
    return_val = None
    for queue in queues:
        _topic, ip_addr = queue
        _addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port)

        if method.__name__ == '_cast':
            eventlet.spawn_n(method, driver, _addr, context,
                             _topic, msg, timeout, envelope, _msg_id,
                             None, pooled)
        else:
            return_val = method(driver, _addr, context, _topic, msg, timeout,
                                envelope, allowed_remote_exmods, pooled)

    return return_val


def _get_matchmaker(*args, **kwargs):
    global matchmaker
    mm_name = CONF.rpc_zmq_matchmaker

    # Back compatibility for old class names
    mm_mapping = {
        'oslo_messaging._drivers.matchmaker_redis.MatchMakerRedis': 'redis',
        'oslo_messaging._drivers.matchmaker_ring.MatchMakerRing': 'ring',
        'oslo_messaging._drivers.matchmaker.MatchMakerLocalhost': 'local',
        'oslo.messaging._drivers.matchmaker_redis.MatchMakerRedis': 'redis',
        'oslo.messaging._drivers.matchmaker_ring.MatchMakerRing': 'ring',
        'oslo.messaging._drivers.matchmaker.MatchMakerLocalhost': 'local'}
    if mm_name in mm_mapping:
        LOG.warn(_LW('rpc_zmq_matchmaker = %(old_val)s is deprecated. '
                     'It is suggested to change the value to %(new_val)s.'),
                 {'old_val': mm_name, 'new_val': mm_mapping[mm_name]})
        mm_name = mm_mapping[mm_name]

    if not matchmaker:
        mgr = driver.DriverManager('oslo.messaging.zmq.matchmaker',
                                   mm_name)
        matchmaker = mgr.driver(*args, **kwargs)
    return matchmaker


class ZmqIncomingMessage(base.IncomingMessage):

    ReceivedReply = collections.namedtuple(
        'ReceivedReply', ['reply', 'failure', 'log_failure'])

    def __init__(self, listener, ctxt, message):
        super(ZmqIncomingMessage, self).__init__(listener, ctxt, message)
        self.condition = threading.Condition()
        self.received = None

    def reply(self, reply=None, failure=None, log_failure=True):
        self.received = self.ReceivedReply(reply, failure, log_failure)
        with self.condition:
            self.condition.notify()

    def requeue(self):
        LOG.debug("WARNING: requeue not supported")


class ZmqListener(base.Listener):

    def __init__(self, driver):
        super(ZmqListener, self).__init__(driver)
        self.incoming_queue = moves.queue.Queue()

    def dispatch(self, ctxt, message):
        incoming = ZmqIncomingMessage(self,
                                      ctxt.to_dict(),
                                      message)

        self.incoming_queue.put(incoming)

        with incoming.condition:
            incoming.condition.wait()

        assert incoming.received

        if incoming.received.failure:
            raise incoming.received.failure
        else:
            return incoming.received.reply

    def poll(self, timeout=None):
        try:
            return self.incoming_queue.get(timeout=timeout)
        except six.moves.queue.Empty:
            # timeout
            return None


class ZmqClientPool(pool.Pool):
    """Class that implements a pool of Zmq Clients for a single endpoint."""

    def __init__(self, conf, address, connection_cls, ctxt):
        self.connection_cls = connection_cls
        self.ctxt = ctxt
        self.address = address
        super(ZmqClientPool, self).__init__(conf.rpc_conn_pool_size)

    def create(self):
        LOG.debug('Pool creating new ZMQ connection for %s', self.address)
        return self.connection_cls(self.address, self.ctxt)

    def empty(self):
        for item in self.iter_free():
            item.close()


class ZmqClientPoolManager(object):
    """Class that manages pools of clients for Zmq endpoints."""

    def __init__(self, conf, ctxt=None):
        self._pools = {}
        self._lock = threading.Lock()
        self.conf = conf
        self.ctxt = ctxt

    def get(self, address):
        if address not in self._pools:
            with self._lock:
                if address not in self._pools:
                    self._pools[address] = ZmqClientPool(self.conf,
                                                         address,
                                                         ZmqClient,
                                                         self.ctxt)
        return self._pools[address].get()

    def put(self, item):
        self._pools[item.address].put(item)

    def empty(self):
        for p in self._pools:
            self._pools[p].empty()


class ZmqDriver(base.BaseDriver):
    """ZeroMQ Driver

    See :doc:`zmq_driver` for details.
    """

    # FIXME(markmc): allow this driver to be used without eventlet

    def __init__(self, conf, url, default_exchange=None,
                 allowed_remote_exmods=None):
        if not zmq:
            raise ImportError("Failed to import eventlet.green.zmq")
        conf.register_opts(zmq_opts)
        conf.register_opts(impl_pooledexecutor._pool_opts)
        conf.register_opts(base.base_opts)

        super(ZmqDriver, self).__init__(conf, url, default_exchange,
                                        allowed_remote_exmods)

        # FIXME(markmc): handle default_exchange

        # FIXME(markmc): handle transport URL
        if self._url.hosts:
            raise NotImplementedError('The ZeroMQ driver does not yet '
                                      'support transport URLs')

        # FIXME(markmc): use self.conf everywhere
        if self.conf is not CONF:
            raise NotImplementedError('The ZeroMQ driver currently only '
                                      'works with oslo.config.cfg.CONF')

        self.listeners = []

        # NOTE(jamespage): Create pool manager on first use to deal with
        #                  os.fork calls in openstack daemons.
        self._pool = None
        self._pid = None
        self._lock = threading.Lock()

    def _configure_pool_manager(func):
        """Causes a new pool manager to be created when the messaging
        service is first used by the current process. This is important
        as all connections in the pools managed by the pool manager will
        share the same ZMQ context, which must not be shared across OS
        processes.
        """
        def wrap(self, *args, **kws):
            with self._lock:
                old_pid = self._pid
                self._pid = os.getpid()

                if old_pid != self._pid:
                    # Create fresh pool manager for the current process
                    # along with a new ZMQ context.
                    self._pool = ZmqClientPoolManager(
                        self.conf,
                        zmq.Context(self.conf.rpc_zmq_contexts)
                    )
            return func(self, *args, **kws)
        return wrap

    def _send(self, target, ctxt, message,
              wait_for_reply=None, timeout=None, envelope=False):

        if wait_for_reply:
            method = _call
        else:
            method = _cast

        topic = target.topic
        if target.fanout:
            # NOTE(ewindisch): fanout~ is used because it avoids splitting
            # on '.' and acts as a non-subtle hint to the matchmaker and
            # ZmqProxy.
            topic = 'fanout~' + topic
        elif target.server:
            topic = '%s.%s' % (topic, target.server)

        reply = _multi_send(self, method, ctxt, topic, message,
                            envelope=envelope,
                            allowed_remote_exmods=self._allowed_remote_exmods,
                            pooled=True)

        if wait_for_reply:
            return reply[-1]

    @_configure_pool_manager
    def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
             retry=None):
        # NOTE(sileht): retry is not implemented because this driver
        #               never retries anything
        return self._send(target, ctxt, message, wait_for_reply, timeout)

    @_configure_pool_manager
    def send_notification(self, target, ctxt, message, version, retry=None):
        # NOTE(ewindisch): dot-priority in rpc notifier does not
        #                  work with our assumptions.
        # NOTE(sileht): retry is not implemented because this driver
        #               never retries anything
        target = target(topic=target.topic.replace('.', '-'))
        return self._send(target, ctxt, message, envelope=(version == 2.0))

    @_configure_pool_manager
    def listen(self, target):
        conn = Connection(self.conf, self)

        listener = ZmqListener(self)

        conn.create_consumer(target.topic, listener)
        conn.create_consumer('%s.%s' % (target.topic, target.server),
                             listener)
        conn.create_consumer(target.topic, listener, fanout=True)

        conn.consume_in_thread()
        self.listeners.append(conn)

        return listener

    @_configure_pool_manager
    def listen_for_notifications(self, targets_and_priorities, pool):
        # NOTE(sileht): this listener implementation is limited
        # because zeromq doesn't support:
        # * requeueing messages
        # * listener pools
        conn = Connection(self.conf, self)

        listener = ZmqListener(self)
        for target, priority in targets_and_priorities:
            # NOTE(ewindisch): dot-priority in rpc notifier does not
            #                  work with our assumptions.
            # NOTE(sileht): create_consumer doesn't support target.exchange
            conn.create_consumer('%s-%s' % (target.topic, priority),
                                 listener)
        conn.consume_in_thread()
        self.listeners.append(conn)

        return listener

    def cleanup(self):
        for c in self.listeners:
            c.close()
        self.listeners = []
        if self._pool:
            self._pool.empty()

    def get_connection(self, address, pooled=False):
        return ZmqClientContext(address, self._pool, pooled)