Sync latest rpc changes from oslo-incubator

Includes these commits:

 3006787 Sanitize passwords in _safe_log
 c37f6aa Add support for heartbeating in the kombu RPC driver
 323e465 Add conditional exception reraise
 dea334a Replace sys.exit by a RPCException
 719eba4 Don't reconnect to exclusive fanout consumers.
 22ec8ff Make AMQP based RPC consumer threads more robust
 13650b1 rpc: remove some unused serialization code
 e204885 Optionally reject messages on exception.
 688832f Remove unused zmq relay functionality

Change-Id: I8629c271bc00571783f9b77ec36034945df9a583
Author: Mark McLoughlin
Date:   2013-07-09 07:46:21 +01:00
Parent: 91eff71753
Commit: 386a8c21ec

5 changed files with 108 additions and 59 deletions

nova/openstack/common/excutils.py

@@ -19,16 +19,15 @@
 Exception related utilities.
 """
 
-import contextlib
 import logging
 import sys
+import time
 import traceback
 
 from nova.openstack.common.gettextutils import _
 
 
-@contextlib.contextmanager
-def save_and_reraise_exception():
+class save_and_reraise_exception(object):
     """Save current exception, run some code and then re-raise.
 
     In some cases the exception context can be cleared, resulting in None
@@ -40,12 +39,60 @@ def save_and_reraise_exception():
     To work around this, we save the exception state, run handler code, and
     then re-raise the original exception. If another exception occurs, the
     saved exception is logged and the new exception is re-raised.
-    """
-    type_, value, tb = sys.exc_info()
-    try:
-        yield
-    except Exception:
-        logging.error(_('Original exception being dropped: %s'),
-                      traceback.format_exception(type_, value, tb))
-        raise
-    raise type_, value, tb
+
+    In some cases the caller may not want to re-raise the exception, and
+    for those circumstances this context provides a reraise flag that
+    can be used to suppress the exception. For example:
+
+    except Exception:
+        with save_and_reraise_exception() as ctxt:
+            decide_if_need_reraise()
+            if not should_be_reraised:
+                ctxt.reraise = False
+    """
+    def __init__(self):
+        self.reraise = True
+
+    def __enter__(self):
+        self.type_, self.value, self.tb, = sys.exc_info()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if exc_type is not None:
+            logging.error(_('Original exception being dropped: %s'),
+                          traceback.format_exception(self.type_,
+                                                     self.value,
+                                                     self.tb))
+            return False
+        if self.reraise:
+            raise self.type_, self.value, self.tb
+
+
+def forever_retry_uncaught_exceptions(infunc):
+    def inner_func(*args, **kwargs):
+        last_log_time = 0
+        last_exc_message = None
+        exc_count = 0
+        while True:
+            try:
+                return infunc(*args, **kwargs)
+            except Exception as exc:
+                if exc.message == last_exc_message:
+                    exc_count += 1
+                else:
+                    exc_count = 1
+                # Do not log any more frequently than once a minute unless
+                # the exception message changes
+                cur_time = int(time.time())
+                if (cur_time - last_log_time > 60 or
+                        exc.message != last_exc_message):
+                    logging.exception(
+                        _('Unexpected exception occurred %d time(s)... '
+                          'retrying.') % exc_count)
+                    last_log_time = cur_time
+                    last_exc_message = exc.message
+                    exc_count = 0
+                # This should be a very rare event. In case it isn't, do
+                # a sleep.
+                time.sleep(1)
+    return inner_func
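
Usage sketch (not part of the commit): the new reraise flag lets handler code decide, after the fact, whether the saved exception should propagate. The remove_record() helper below is hypothetical.

from nova.openstack.common import excutils


def remove_record(records, key, ignore_missing=False):
    """Delete records[key]; optionally swallow a missing-key error."""
    try:
        del records[key]
    except KeyError:
        with excutils.save_and_reraise_exception() as ctxt:
            # Handler code runs here with the original exception saved.
            # On exit the KeyError is re-raised unless reraise is cleared.
            if ignore_missing:
                ctxt.reraise = False

The same module's forever_retry_uncaught_exceptions decorator is what the kombu and qpid drivers below now wrap around their consumer threads.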

nova/openstack/common/rpc/amqp.py

@@ -221,12 +221,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
             failure = rpc_common.serialize_remote_exception(failure,
                                                             log_failure)
 
-        try:
-            msg = {'result': reply, 'failure': failure}
-        except TypeError:
-            msg = {'result': dict((k, repr(v))
-                   for k, v in reply.__dict__.iteritems()),
-                   'failure': failure}
+        msg = {'result': reply, 'failure': failure}
         if ending:
             msg['ending'] = True
         _add_unique_id(msg)

nova/openstack/common/rpc/common.py

@@ -261,41 +261,20 @@ class Connection(object):
 
 def _safe_log(log_func, msg, msg_data):
     """Sanitizes the msg_data field before logging."""
-    SANITIZE = {'set_admin_password': [('args', 'new_pass')],
-                'run_instance': [('args', 'admin_password')],
-                'route_message': [('args', 'message', 'args', 'method_info',
-                                   'method_kwargs', 'password'),
-                                  ('args', 'message', 'args', 'method_info',
-                                   'method_kwargs', 'admin_password')]}
+    SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
 
-    has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
-    has_context_token = '_context_auth_token' in msg_data
-    has_token = 'auth_token' in msg_data
+    def _fix_passwords(d):
+        """Sanitizes the password fields in the dictionary."""
+        for k in d.iterkeys():
+            if k.lower().find('password') != -1:
+                d[k] = '<SANITIZED>'
+            elif k.lower() in SANITIZE:
+                d[k] = '<SANITIZED>'
+            elif isinstance(d[k], dict):
+                _fix_passwords(d[k])
+        return d
 
-    if not any([has_method, has_context_token, has_token]):
-        return log_func(msg, msg_data)
-
-    msg_data = copy.deepcopy(msg_data)
-
-    if has_method:
-        for arg in SANITIZE.get(msg_data['method'], []):
-            try:
-                d = msg_data
-                for elem in arg[:-1]:
-                    d = d[elem]
-                d[arg[-1]] = '<SANITIZED>'
-            except KeyError as e:
-                LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
-                         {'item': arg,
-                          'err': e})
-
-    if has_context_token:
-        msg_data['_context_auth_token'] = '<SANITIZED>'
-
-    if has_token:
-        msg_data['auth_token'] = '<SANITIZED>'
-
-    return log_func(msg, msg_data)
+    return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
 
 
 def serialize_remote_exception(failure_info, log_failure=True):
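
Illustration (not part of the commit) of what the rewritten sanitizer does: any key whose name contains 'password', plus the fixed SANITIZE names, is masked at any nesting depth before the message reaches the log. The msg_data dict below is made up for the example.

import logging

from nova.openstack.common.rpc import common as rpc_common

logging.basicConfig(level=logging.DEBUG)
LOG = logging.getLogger(__name__)

msg_data = {
    'method': 'set_admin_password',
    'args': {'new_pass': 's3cret', 'admin_password': 's3cret'},
    '_context_auth_token': 'abc123',
}
# Logs the message with 'new_pass', 'admin_password' and
# '_context_auth_token' replaced by '<SANITIZED>'; the nested 'args'
# dict is handled by the recursive descent into sub-dictionaries.
rpc_common._safe_log(LOG.debug, 'sending message: %s', msg_data)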

nova/openstack/common/rpc/impl_kombu.py

@@ -18,7 +18,6 @@ import functools
 import itertools
 import socket
 import ssl
-import sys
 import time
 import uuid
 
@@ -30,6 +29,7 @@ import kombu.entity
 import kombu.messaging
 from oslo.config import cfg
 
+from nova.openstack.common import excutils
 from nova.openstack.common.gettextutils import _
 from nova.openstack.common import network_utils
 from nova.openstack.common.rpc import amqp as rpc_amqp
@@ -82,6 +82,9 @@ kombu_opts = [
                default=0,
                help='maximum retries with trying to connect to RabbitMQ '
                     '(the default of 0 implies an infinite retry count)'),
+    cfg.IntOpt('rabbit_heartbeat',
+               default=60,
+               help='Seconds between connection keepalive heartbeats'),
     cfg.BoolOpt('rabbit_durable_queues',
                 default=False,
                 help='use durable queues in RabbitMQ'),
@@ -449,6 +452,7 @@ class Connection(object):
                 'userid': self.conf.rabbit_userid,
                 'password': self.conf.rabbit_password,
                 'virtual_host': self.conf.rabbit_virtual_host,
+                'heartbeat': self.conf.rabbit_heartbeat,
             }
 
             for sp_key, value in server_params.iteritems():
@@ -560,13 +564,11 @@ class Connection(object):
             log_info.update(params)
 
             if self.max_retries and attempt == self.max_retries:
-                LOG.error(_('Unable to connect to AMQP server on '
-                            '%(hostname)s:%(port)d after %(max_retries)d '
-                            'tries: %(err_str)s') % log_info)
-                # NOTE(comstud): Copied from original code. There's
-                # really no better recourse because if this was a queue we
-                # need to consume on, we have no way to consume anymore.
-                sys.exit(1)
+                msg = _('Unable to connect to AMQP server on '
+                        '%(hostname)s:%(port)d after %(max_retries)d '
+                        'tries: %(err_str)s') % log_info
+                LOG.error(msg)
+                raise rpc_common.RPCException(msg)
 
             if attempt == 1:
                 sleep_time = self.interval_start or 1
@@ -748,6 +750,7 @@ class Connection(object):
 
     def consume_in_thread(self):
         """Consumer from all queues/consumers in a greenthread."""
+        @excutils.forever_retry_uncaught_exceptions
        def _consumer_thread():
             try:
                 self.consume()
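
A hypothetical caller-side sketch of the error-handling change above: with rabbit_max_retries set to a finite value, an unreachable broker now surfaces as RPCException instead of terminating the process via sys.exit(1), so callers can catch it. The try_connect() helper is an assumption for illustration, not driver API.

from nova.openstack.common.rpc import common as rpc_common
from nova.openstack.common.rpc import impl_kombu


def try_connect(conf):
    """Return a broker connection, or None if the broker is unreachable."""
    try:
        return impl_kombu.Connection(conf)
    except rpc_common.RPCException:
        # Previously unreachable: the driver called sys.exit(1) here.
        # Now the caller can log, back off and retry, or degrade gracefully.
        return None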

nova/openstack/common/rpc/impl_qpid.py

@@ -24,6 +24,7 @@ import eventlet
 import greenlet
 from oslo.config import cfg
 
+from nova.openstack.common import excutils
 from nova.openstack.common.gettextutils import _
 from nova.openstack.common import importutils
 from nova.openstack.common import jsonutils
@@ -118,10 +119,17 @@ class ConsumerBase(object):
 
         self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
 
-        self.reconnect(session)
+        self.connect(session)
+
+    def connect(self, session):
+        """Declare the reciever on connect."""
+        self._declare_receiver(session)
 
     def reconnect(self, session):
         """Re-declare the receiver after a qpid reconnect."""
+        self._declare_receiver(session)
+
+    def _declare_receiver(self, session):
         self.session = session
         self.receiver = session.receiver(self.address)
         self.receiver.capacity = 1
@@ -158,6 +166,9 @@ class ConsumerBase(object):
     def get_receiver(self):
         return self.receiver
 
+    def get_node_name(self):
+        return self.address.split(';')[0]
+
 
 class DirectConsumer(ConsumerBase):
     """Queue/consumer class for 'direct'."""
@@ -207,6 +218,7 @@ class FanoutConsumer(ConsumerBase):
         'topic' is the topic to listen on
         'callback' is the callback to call when messages are received
         """
+        self.conf = conf
 
         super(FanoutConsumer, self).__init__(
             session, callback,
@@ -215,6 +227,18 @@ class FanoutConsumer(ConsumerBase):
             "%s_fanout_%s" % (topic, uuid.uuid4().hex),
             {"exclusive": True})
 
+    def reconnect(self, session):
+        topic = self.get_node_name()
+        params = {
+            'session': session,
+            'topic': topic,
+            'callback': self.callback,
+        }
+        self.__init__(conf=self.conf, **params)
+
+        super(FanoutConsumer, self).reconnect(session)
+
 
 class Publisher(object):
     """Base Publisher class."""
@@ -576,6 +600,7 @@ class Connection(object):
 
     def consume_in_thread(self):
         """Consumer from all queues/consumers in a greenthread."""
+        @excutils.forever_retry_uncaught_exceptions
        def _consumer_thread():
             try:
                 self.consume()
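
For context, a hedged sketch of the pattern both drivers now share: the consumer greenthread is wrapped with forever_retry_uncaught_exceptions so an unexpected error is logged (throttled to roughly once a minute per distinct message) and the loop is restarted, instead of the error silently killing the thread. The run_consumer_loop() wrapper below is hypothetical.

import eventlet

from nova.openstack.common import excutils


def run_consumer_loop(connection):
    """Spawn a greenthread that keeps consuming even after uncaught errors."""

    @excutils.forever_retry_uncaught_exceptions
    def _consumer_thread():
        # connection.consume() blocks; with the decorator, any uncaught
        # exception is logged and the call is retried after a short sleep.
        connection.consume()

    return eventlet.spawn(_consumer_thread)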