Update common to get new kombu serialization code

There are some changes in the serialization format for
messages sent with the kombu driver under some situations,
so bring in the new version of the driver to ensure we
can communicate with other components also using the new
format.

Change-Id: Iffd102cb780d13a40e93047a59917dc56c4d56ea
Signed-off-by: Doug Hellmann <doug.hellmann@dreamhost.com>
This commit is contained in:
Doug Hellmann
2013-02-20 08:50:32 -05:00
parent 22b1e58201
commit cf84207743
8 changed files with 341 additions and 76 deletions

View File

@@ -51,12 +51,20 @@ def _print_greenthreads():
print
def _print_nativethreads():
for threadId, stack in sys._current_frames().items():
print threadId
traceback.print_stack(stack)
print
def initialize_if_enabled():
backdoor_locals = {
'exit': _dont_use_this, # So we don't exit the entire process
'quit': _dont_use_this, # So we don't exit the entire process
'fo': _find_objects,
'pgt': _print_greenthreads,
'pnt': _print_nativethreads,
}
if CONF.backdoor_port is None:

View File

@@ -325,16 +325,11 @@ def _create_logging_excepthook(product_name):
def setup(product_name):
"""Setup logging."""
sys.excepthook = _create_logging_excepthook(product_name)
if CONF.log_config:
try:
logging.config.fileConfig(CONF.log_config)
except Exception:
traceback.print_exc()
raise
logging.config.fileConfig(CONF.log_config)
else:
_setup_logging_from_conf(product_name)
sys.excepthook = _create_logging_excepthook(product_name)
def set_defaults(logging_context_format_string):

View File

@@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from ceilometer.openstack.common import jsonutils

View File

@@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from ceilometer.openstack.common import context as req_context

View File

@@ -32,13 +32,27 @@ import uuid
from eventlet import greenpool
from eventlet import pools
from eventlet import semaphore
from eventlet import queue
# TODO(pekowsk): Remove import cfg and below comment in Havana.
# This import should no longer be needed when the amqp_rpc_single_reply_queue
# option is removed.
from oslo.config import cfg
from ceilometer.openstack.common import excutils
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import local
from ceilometer.openstack.common import log as logging
from ceilometer.openstack.common.rpc import common as rpc_common
# TODO(pekowski): Remove this option in Havana.
amqp_opts = [
cfg.BoolOpt('amqp_rpc_single_reply_queue',
default=False,
help='Enable a fast single reply queue if using AMQP based '
'RPC like RabbitMQ or Qpid.'),
]
cfg.CONF.register_opts(amqp_opts)
LOG = logging.getLogger(__name__)
@@ -51,6 +65,7 @@ class Pool(pools.Pool):
kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
kwargs.setdefault("order_as_stack", True)
super(Pool, self).__init__(*args, **kwargs)
self.reply_proxy = None
# TODO(comstud): Timeout connections not used in a while
def create(self):
@@ -60,6 +75,16 @@ class Pool(pools.Pool):
def empty(self):
while self.free_items:
self.get().close()
# Force a new connection pool to be created.
# Note that this was added due to failing unit test cases. The issue
# is the above "while loop" gets all the cached connections from the
# pool and closes them, but never returns them to the pool, a pool
# leak. The unit tests hang waiting for an item to be returned to the
# pool. The unit tests get here via the teatDown() method. In the run
# time code, it gets here via cleanup() and only appears in service.py
# just before doing a sys.exit(), so cleanup() only happens once and
# the leakage is not a problem.
self.connection_cls.pool = None
_pool_create_sem = semaphore.Semaphore()
@@ -154,8 +179,45 @@ class ConnectionContext(rpc_common.Connection):
raise rpc_common.InvalidRPCConnectionReuse()
def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
ending=False, log_failure=True):
class ReplyProxy(ConnectionContext):
""" Connection class for RPC replies / callbacks """
def __init__(self, conf, connection_pool):
self._call_waiters = {}
self._num_call_waiters = 0
self._num_call_waiters_wrn_threshhold = 10
self._reply_q = 'reply_' + uuid.uuid4().hex
super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
self.declare_direct_consumer(self._reply_q, self._process_data)
self.consume_in_thread()
def _process_data(self, message_data):
msg_id = message_data.pop('_msg_id', None)
waiter = self._call_waiters.get(msg_id)
if not waiter:
LOG.warn(_('no calling threads waiting for msg_id : %s'
', message : %s') % (msg_id, message_data))
else:
waiter.put(message_data)
def add_call_waiter(self, waiter, msg_id):
self._num_call_waiters += 1
if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
LOG.warn(_('Number of call waiters is greater than warning '
'threshhold: %d. There could be a MulticallProxyWaiter '
'leak.') % self._num_call_waiters_wrn_threshhold)
self._num_call_waiters_wrn_threshhold *= 2
self._call_waiters[msg_id] = waiter
def del_call_waiter(self, msg_id):
self._num_call_waiters -= 1
del self._call_waiters[msg_id]
def get_reply_q(self):
return self._reply_q
def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
failure=None, ending=False, log_failure=True):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
@@ -174,13 +236,21 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
'failure': failure}
if ending:
msg['ending'] = True
conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
# If a reply_q exists, add the msg_id to the reply and pass the
# reply_q to direct_send() to use it as the response queue.
# Otherwise use the msg_id for backward compatibilty.
if reply_q:
msg['_msg_id'] = msg_id
conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
else:
conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext):
"""Context that supports replying to a rpc.call"""
def __init__(self, **kwargs):
self.msg_id = kwargs.pop('msg_id', None)
self.reply_q = kwargs.pop('reply_q', None)
self.conf = kwargs.pop('conf')
super(RpcContext, self).__init__(**kwargs)
@@ -188,13 +258,14 @@ class RpcContext(rpc_common.CommonRpcContext):
values = self.to_dict()
values['conf'] = self.conf
values['msg_id'] = self.msg_id
values['reply_q'] = self.reply_q
return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False,
connection_pool=None, log_failure=True):
if self.msg_id:
msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
ending, log_failure)
msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
reply, failure, ending, log_failure)
if ending:
self.msg_id = None
@@ -210,6 +281,7 @@ def unpack_context(conf, msg):
value = msg.pop(key)
context_dict[key[9:]] = value
context_dict['msg_id'] = msg.pop('_msg_id', None)
context_dict['reply_q'] = msg.pop('_reply_q', None)
context_dict['conf'] = conf
ctx = RpcContext.from_dict(context_dict)
rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
@@ -339,6 +411,65 @@ class ProxyCallback(_ThreadPoolWithWait):
connection_pool=self.connection_pool)
class MulticallProxyWaiter(object):
def __init__(self, conf, msg_id, timeout, connection_pool):
self._msg_id = msg_id
self._timeout = timeout or conf.rpc_response_timeout
self._reply_proxy = connection_pool.reply_proxy
self._done = False
self._got_ending = False
self._conf = conf
self._dataqueue = queue.LightQueue()
# Add this caller to the reply proxy's call_waiters
self._reply_proxy.add_call_waiter(self, self._msg_id)
def put(self, data):
self._dataqueue.put(data)
def done(self):
if self._done:
return
self._done = True
# Remove this caller from reply proxy's call_waiters
self._reply_proxy.del_call_waiter(self._msg_id)
def _process_data(self, data):
result = None
if data['failure']:
failure = data['failure']
result = rpc_common.deserialize_remote_exception(self._conf,
failure)
elif data.get('ending', False):
self._got_ending = True
else:
result = data['result']
return result
def __iter__(self):
"""Return a result until we get a reply with an 'ending" flag"""
if self._done:
raise StopIteration
while True:
try:
data = self._dataqueue.get(timeout=self._timeout)
result = self._process_data(data)
except queue.Empty:
LOG.exception(_('Timed out waiting for RPC response.'))
self.done()
raise rpc_common.Timeout()
except Exception:
with excutils.save_and_reraise_exception():
self.done()
if self._got_ending:
self.done()
raise StopIteration
if isinstance(result, Exception):
self.done()
raise result
yield result
#TODO(pekowski): Remove MulticallWaiter() in Havana.
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
self._connection = connection
@@ -394,22 +525,40 @@ def create_connection(conf, new, connection_pool):
return ConnectionContext(conf, connection_pool, pooled=not new)
_reply_proxy_create_sem = semaphore.Semaphore()
def multicall(conf, context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
# TODO(pekowski): Remove all these comments in Havana.
# For amqp_rpc_single_reply_queue = False,
# Can't use 'with' for multicall, as it returns an iterator
# that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into
# the pool
# For amqp_rpc_single_reply_queue = True,
# The 'with' statement is mandatory for closing the connection
LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
pack_context(msg, context)
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
# TODO(pekowski): Remove this flag and the code under the if clause
# in Havana.
if not conf.amqp_rpc_single_reply_queue:
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
else:
with _reply_proxy_create_sem:
if not connection_pool.reply_proxy:
connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg

View File

@@ -17,9 +17,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import copy
import sys
import traceback
import uuid
from oslo.config import cfg
@@ -46,33 +48,40 @@ This version number applies to the message envelope that is used in the
serialization done inside the rpc layer. See serialize_msg() and
deserialize_msg().
The current message format (version 2.0) is very simple. It is:
The current message format (version 2.1) is very simple. It is:
{
'ceilometer.version': <RPC Envelope Version as a String>,
'ceilometer.message': <Application Message Payload, JSON encoded>
'ceilometer.nonce': <Unique message identifier>
}
Message format version '1.0' is just considered to be the messages we sent
without a message envelope.
So, the current message envelope just includes the envelope version. It may
eventually contain additional information, such as a signature for the message
payload.
Message format version '2.0' sent ceilometer.message containing a JSON encoded
Application Message Payload without Hashed Parameters.
We will JSON encode the application message payload. The message envelope,
The message format is intended eventually contain additional information,
such as a signature for the message payload.
We will JSON encode the application message payload. The message,
which includes the JSON encoded application message body, will be passed down
to the messaging libraries as a dict.
'''
_RPC_ENVELOPE_VERSION = '2.0'
_RPC_ENVELOPE_VERSION = '2.1'
_VERSION_KEY = 'ceilometer.version'
_MESSAGE_KEY = 'ceilometer.message'
_NONCE_KEY = 'ceilometer.nonce'
# TODO(russellb) Turn this on after Grizzly.
_SEND_RPC_ENVELOPE = False
DUP_MSG_CHECK_SIZE = 512 # Arbitrary - make configurable.
SEEN_MSGS = collections.deque([], maxlen=DUP_MSG_CHECK_SIZE)
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
@@ -125,6 +134,10 @@ class Timeout(RPCException):
message = _("Timeout while waiting on RPC response.")
class DuplicatedMessageError(RPCException):
message = _("Received replayed message(%(msg_id)s). Ignoring.")
class InvalidRPCConnectionReuse(RPCException):
message = _("Invalid reuse of an RPC connection.")
@@ -139,6 +152,10 @@ class UnsupportedRpcEnvelopeVersion(RPCException):
"not supported by this endpoint.")
class InvalidRpcEnvelope(RPCException):
message = _("RPC envelope was malformed.")
class Connection(object):
"""A connection, returned by rpc.create_connection().
@@ -438,17 +455,32 @@ def version_is_compatible(imp_version, version):
def serialize_msg(raw_msg, force_envelope=False):
msg_identifier = uuid.uuid4().hex
if not _SEND_RPC_ENVELOPE and not force_envelope:
if isinstance(raw_msg, dict):
raw_msg['_nonce'] = msg_identifier
return raw_msg
"""Make an RPC message envelope"""
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
# information about this format.
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
_MESSAGE_KEY: jsonutils.dumps(raw_msg)}
_MESSAGE_KEY: jsonutils.dumps(raw_msg),
_NONCE_KEY: msg_identifier}
return msg
def _raise_if_duplicate(duplicate_key):
"""Check if a message is a duplicate based on key."""
if not duplicate_key:
return
if duplicate_key in SEEN_MSGS:
raise DuplicatedMessageError(duplicate_key)
SEEN_MSGS.append(duplicate_key)
def deserialize_msg(msg):
# NOTE(russellb): Hang on to your hats, this road is about to
# get a little bumpy.
@@ -473,21 +505,32 @@ def deserialize_msg(msg):
# This case covers return values from rpc.call() from before message
# envelopes were used. (messages to call a method were always a dict)
has_envelope = True
base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
if not isinstance(msg, dict):
# See #2 above.
return msg
base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
if not all(map(lambda key: key in msg, base_envelope_keys)):
# See #1.b above.
return msg
# At this point we think we have the message envelope
# format we were expecting. (#1.a above)
if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
elif not all(map(lambda key: key in msg, base_envelope_keys)):
# See #1.b above.
has_envelope = False
elif not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
nonce = None
raw_msg = None
raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
if has_envelope and '_NONCE_KEY' in msg: # envelope v2.1
_raise_if_duplicate(msg[_NONCE_KEY])
# Here, we can delay jsonutils.loads until
# after we have verified the message.
raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
elif has_envelope: # envelope v2.0
raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
nonce = raw_msg.get('_nonce')
_raise_if_duplicate(nonce)
else: # no envelope ("v1.0")
raw_msg = msg
nonce = raw_msg.get('_nonce')
_raise_if_duplicate(nonce)
return raw_msg

View File

@@ -98,6 +98,21 @@ cfg.CONF.register_opts(kombu_opts)
LOG = rpc_common.LOG
def _unflatten_envelope(packenv):
"""Unflattens the RPC envelope.
Takes a list and returns a dictionary.
i.e. [1,2,3,4] => {1: 2, 3: 4}
"""
i = iter(packenv)
h = {}
try:
while True:
k = i.next()
h[k] = i.next()
except StopIteration:
return h
def _get_queue_arguments(conf):
"""Construct the arguments for declaring a queue.
@@ -163,8 +178,13 @@ class ConsumerBase(object):
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
try:
msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
if '\0' in message.payload:
msg = _unflatten_envelope(message.payload.split('\0'))
else:
msg = message.payload
raw_msg = rpc_common.deserialize_msg(msg)
callback(raw_msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
@@ -198,6 +218,7 @@ class DirectConsumer(ConsumerBase):
"""
# Default options
options = {'durable': False,
'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True,
'exclusive': False}
options.update(kwargs)
@@ -306,6 +327,10 @@ class Publisher(object):
def send(self, msg, timeout=None):
"""Send a message"""
if isinstance(msg, dict) and rpc_common._VERSION_KEY in msg:
# Fast-serialize envelopes to avoid Kombu's JSON serialization.
msg = '\0'.join(reduce(lambda x, y: x + y, msg.items()))
if timeout:
#
# AMQP TTL is in milliseconds when set in the header.

View File

@@ -216,12 +216,18 @@ class ZmqClient(object):
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
def cast(self, msg_id, topic, data, envelope=False):
msg_id = msg_id or 0
if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
self.outq.send(map(bytes,
(msg_id, topic, 'cast', _serialize(data))))
return
rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
self.outq.send(map(bytes,
(msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
def close(self):
self.outq.close()
@@ -320,7 +326,7 @@ class ConsumerBase(object):
else:
return [result]
def process(self, style, target, proxy, ctx, data):
def process(self, proxy, ctx, data):
data.setdefault('version', None)
data.setdefault('args', {})
@@ -432,12 +438,14 @@ class ZmqProxy(ZmqBaseReactor):
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
msg_id, topic, style, in_msg = data
topic = topic.split('.', 1)[0]
topic = data[1]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
if topic.startswith('fanout~'):
sock_type = zmq.PUB
topic = topic.split('.', 1)[0]
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
else:
sock_type = zmq.PUSH
@@ -520,6 +528,21 @@ class ZmqProxy(ZmqBaseReactor):
super(ZmqProxy, self).consume_in_thread()
def unflatten_envelope(packenv):
"""Unflattens the RPC envelope.
Takes a list and returns a dictionary.
i.e. [1,2,3,4] => {1: 2, 3: 4}
"""
i = iter(packenv)
h = {}
try:
while True:
k = i.next()
h[k] = i.next()
except StopIteration:
return h
class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
@@ -540,38 +563,50 @@ class ZmqReactor(ZmqBaseReactor):
self.mapping[sock].send(data)
return
msg_id, topic, style, in_msg = data
ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock]
self.pool.spawn_n(self.process, style, topic,
proxy, ctx, request)
if data[2] == 'cast': # Legacy protocol
packenv = data[3]
ctx, msg = _deserialize(packenv)
request = rpc_common.deserialize_msg(msg)
ctx = RpcContext.unmarshal(ctx)
elif data[2] == 'impl_zmq_v2':
packenv = data[4:]
msg = unflatten_envelope(packenv)
request = rpc_common.deserialize_msg(msg)
# Unmarshal only after verifying the message.
ctx = RpcContext.unmarshal(data[3])
else:
LOG.error(_("ZMQ Envelope version unsupported or unknown."))
return
self.pool.spawn_n(self.process, proxy, ctx, request)
class Connection(rpc_common.Connection):
"""Manages connections and threads."""
def __init__(self, conf):
self.topics = []
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
# Only consume on the base topic name.
topic = topic.split('.', 1)[0]
LOG.info(_("Create Consumer for topic (%(topic)s)") %
{'topic': topic})
# Subscription scenarios
if fanout:
subscribe = ('', fanout)[type(fanout) == str]
sock_type = zmq.SUB
topic = 'fanout~' + topic
subscribe = ('', fanout)[type(fanout) == str]
topic = 'fanout~' + topic.split('.', 1)[0]
else:
sock_type = zmq.PULL
subscribe = None
topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
if topic in self.topics:
LOG.info(_("Skipping topic registration. Already registered."))
return
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
@@ -582,9 +617,11 @@ class Connection(rpc_common.Connection):
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
self.topics.append(topic)
def close(self):
self.reactor.close()
self.topics = []
def wait(self):
self.reactor.wait()
@@ -593,8 +630,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
def _cast(addr, context, topic, msg, timeout=None, serialize=True,
force_envelope=False, _msg_id=None):
def _cast(addr, context, topic, msg, timeout=None, envelope=False,
_msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@@ -603,7 +640,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True,
conn = ZmqClient(addr)
# assumes cast can't return an exception
conn.cast(_msg_id, topic, payload, serialize, force_envelope)
conn.cast(_msg_id, topic, payload, envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@@ -612,7 +649,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True,
def _call(addr, context, topic, msg, timeout=None,
serialize=True, force_envelope=False):
envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@@ -642,20 +679,31 @@ def _call(addr, context, topic, msg, timeout=None,
with Timeout(timeout, exception=rpc_common.Timeout):
try:
msg_waiter = ZmqSocket(
"ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
"ipc://%s/zmq_topic_zmq_replies.%s" %
(CONF.rpc_zmq_ipc_dir,
CONF.rpc_zmq_host),
zmq.SUB, subscribe=msg_id, bind=False
)
LOG.debug(_("Sending cast"))
_cast(addr, context, topic, payload,
serialize=serialize, force_envelope=force_envelope)
_cast(addr, context, topic, payload, envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
responses = _deserialize(msg[-1])[-1]['args']['response']
if msg[2] == 'cast': # Legacy version
raw_msg = _deserialize(msg[-1])[-1]
elif msg[2] == 'impl_zmq_v2':
rpc_envelope = unflatten_envelope(msg[4:])
raw_msg = rpc_common.deserialize_msg(rpc_envelope)
else:
raise rpc_common.UnsupportedRpcEnvelopeVersion(
_("Unsupported or unknown ZMQ envelope returned."))
responses = raw_msg['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
@@ -676,8 +724,8 @@ def _call(addr, context, topic, msg, timeout=None,
return responses[-1]
def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
force_envelope=False, _msg_id=None):
def _multi_send(method, context, topic, msg, timeout=None,
envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@@ -703,11 +751,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
_topic, msg, timeout, serialize,
force_envelope, _msg_id)
_topic, msg, timeout, envelope,
_msg_id)
return
return method(_addr, context, _topic, msg, timeout,
serialize, force_envelope)
envelope)
def create_connection(conf, new=True):
@@ -746,8 +794,7 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
kwargs['serialize'] = kwargs.pop('envelope')
kwargs['force_envelope'] = True
kwargs['envelope'] = kwargs.get('envelope', True)
cast(conf, context, topic, msg, **kwargs)