Sync latest rpc changes from oslo-incubator

Includes these commits:

 99b7c35 Convert kombu SSL version string into integer
 1a2df89 Enable H302 hacking check
 3969355 Fix exception arg typo
 3006787 Sanitize passwords in _safe_log
 323e465 Add conditional exception reraise
 dea334a Replace sys.exit by a RPCException
 719eba4 Don't reconnect to exclusive fanout consumers.
 22ec8ff Make AMQP based RPC consumer threads more robust
 13650b1 rpc: remove some unused serialization code
 e204885 Optionally reject messages on exception.
 688832f Remove unused zmq relay functionality

The sslutils module is included for its validate_ssl_version() method.

Change-Id: Ic2b64f68bfda0dde365221ab1efbc592b1e5bc6f
This commit is contained in:
Mark McLoughlin 2013-07-09 07:46:21 +01:00
parent 6872b1f19e
commit 9c1ba5c0f5
14 changed files with 245 additions and 74 deletions

View File

@ -1542,8 +1542,9 @@
# Options defined in nova.openstack.common.rpc.impl_kombu # Options defined in nova.openstack.common.rpc.impl_kombu
# #
# SSL version to use (valid only if SSL enabled) (string # SSL version to use (valid only if SSL enabled). valid values
# value) # are TLSv1, SSLv23 and SSLv3. SSLv2 may be available on some
# distributions (string value)
#kombu_ssl_version= #kombu_ssl_version=
# SSL key file (valid only if SSL enabled) (string value) # SSL key file (valid only if SSL enabled) (string value)
@ -2611,6 +2612,25 @@
#manager=nova.conductor.manager.ConductorManager #manager=nova.conductor.manager.ConductorManager
[ssl]
#
# Options defined in nova.openstack.common.sslutils
#
# CA certificate file to use to verify connecting clients
# (string value)
#ca_file=<None>
# Certificate file to use when starting the server securely
# (string value)
#cert_file=<None>
# Private key file to use when starting the server securely
# (string value)
#key_file=<None>
[cells] [cells]
# #
@ -3127,4 +3147,4 @@
#keymap=en-us #keymap=en-us
# Total option count: 624 # Total option count: 627

View File

@ -19,16 +19,15 @@
Exception related utilities. Exception related utilities.
""" """
import contextlib
import logging import logging
import sys import sys
import time
import traceback import traceback
from nova.openstack.common.gettextutils import _ from nova.openstack.common.gettextutils import _
@contextlib.contextmanager class save_and_reraise_exception(object):
def save_and_reraise_exception():
"""Save current exception, run some code and then re-raise. """Save current exception, run some code and then re-raise.
In some cases the exception context can be cleared, resulting in None In some cases the exception context can be cleared, resulting in None
@ -40,12 +39,60 @@ def save_and_reraise_exception():
To work around this, we save the exception state, run handler code, and To work around this, we save the exception state, run handler code, and
then re-raise the original exception. If another exception occurs, the then re-raise the original exception. If another exception occurs, the
saved exception is logged and the new exception is re-raised. saved exception is logged and the new exception is re-raised.
"""
type_, value, tb = sys.exc_info() In some cases the caller may not want to re-raise the exception, and
try: for those circumstances this context provides a reraise flag that
yield can be used to suppress the exception. For example:
except Exception: except Exception:
logging.error(_('Original exception being dropped: %s'), with save_and_reraise_exception() as ctxt:
traceback.format_exception(type_, value, tb)) decide_if_need_reraise()
raise if not should_be_reraised:
raise type_, value, tb ctxt.reraise = False
"""
def __init__(self):
self.reraise = True
def __enter__(self):
self.type_, self.value, self.tb, = sys.exc_info()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
logging.error(_('Original exception being dropped: %s'),
traceback.format_exception(self.type_,
self.value,
self.tb))
return False
if self.reraise:
raise self.type_, self.value, self.tb
def forever_retry_uncaught_exceptions(infunc):
def inner_func(*args, **kwargs):
last_log_time = 0
last_exc_message = None
exc_count = 0
while True:
try:
return infunc(*args, **kwargs)
except Exception as exc:
if exc.message == last_exc_message:
exc_count += 1
else:
exc_count = 1
# Do not log any more frequently than once a minute unless
# the exception message changes
cur_time = int(time.time())
if (cur_time - last_log_time > 60 or
exc.message != last_exc_message):
logging.exception(
_('Unexpected exception occurred %d time(s)... '
'retrying.') % exc_count)
last_log_time = cur_time
last_exc_message = exc.message
exc_count = 0
# This should be a very rare event. In case it isn't, do
# a sleep.
time.sleep(1)
return inner_func

View File

@ -29,7 +29,7 @@ import inspect
from oslo.config import cfg from oslo.config import cfg
from nova.openstack.common.gettextutils import _ from nova.openstack.common.gettextutils import _ # noqa
from nova.openstack.common import importutils from nova.openstack.common import importutils
from nova.openstack.common import local from nova.openstack.common import local
from nova.openstack.common import log as logging from nova.openstack.common import log as logging

View File

@ -36,7 +36,7 @@ from eventlet import queue
from eventlet import semaphore from eventlet import semaphore
from nova.openstack.common import excutils from nova.openstack.common import excutils
from nova.openstack.common.gettextutils import _ from nova.openstack.common.gettextutils import _ # noqa
from nova.openstack.common import local from nova.openstack.common import local
from nova.openstack.common import log as logging from nova.openstack.common import log as logging
from nova.openstack.common.rpc import common as rpc_common from nova.openstack.common.rpc import common as rpc_common
@ -221,12 +221,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
failure = rpc_common.serialize_remote_exception(failure, failure = rpc_common.serialize_remote_exception(failure,
log_failure) log_failure)
try: msg = {'result': reply, 'failure': failure}
msg = {'result': reply, 'failure': failure}
except TypeError:
msg = {'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure}
if ending: if ending:
msg['ending'] = True msg['ending'] = True
_add_unique_id(msg) _add_unique_id(msg)

View File

@ -24,7 +24,7 @@ import traceback
from oslo.config import cfg from oslo.config import cfg
import six import six
from nova.openstack.common.gettextutils import _ from nova.openstack.common.gettextutils import _ # noqa
from nova.openstack.common import importutils from nova.openstack.common import importutils
from nova.openstack.common import jsonutils from nova.openstack.common import jsonutils
from nova.openstack.common import local from nova.openstack.common import local
@ -261,41 +261,20 @@ class Connection(object):
def _safe_log(log_func, msg, msg_data): def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging.""" """Sanitizes the msg_data field before logging."""
SANITIZE = {'set_admin_password': [('args', 'new_pass')], SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
'run_instance': [('args', 'admin_password')],
'route_message': [('args', 'message', 'args', 'method_info',
'method_kwargs', 'password'),
('args', 'message', 'args', 'method_info',
'method_kwargs', 'admin_password')]}
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE def _fix_passwords(d):
has_context_token = '_context_auth_token' in msg_data """Sanitizes the password fields in the dictionary."""
has_token = 'auth_token' in msg_data for k in d.iterkeys():
if k.lower().find('password') != -1:
d[k] = '<SANITIZED>'
elif k.lower() in SANITIZE:
d[k] = '<SANITIZED>'
elif isinstance(d[k], dict):
_fix_passwords(d[k])
return d
if not any([has_method, has_context_token, has_token]): return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
return log_func(msg, msg_data)
msg_data = copy.deepcopy(msg_data)
if has_method:
for arg in SANITIZE.get(msg_data['method'], []):
try:
d = msg_data
for elem in arg[:-1]:
d = d[elem]
d[arg[-1]] = '<SANITIZED>'
except KeyError as e:
LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
{'item': arg,
'err': e})
if has_context_token:
msg_data['_context_auth_token'] = '<SANITIZED>'
if has_token:
msg_data['auth_token'] = '<SANITIZED>'
return log_func(msg, msg_data)
def serialize_remote_exception(failure_info, log_failure=True): def serialize_remote_exception(failure_info, log_failure=True):

View File

@ -18,7 +18,6 @@ import functools
import itertools import itertools
import socket import socket
import ssl import ssl
import sys
import time import time
import uuid import uuid
@ -30,15 +29,20 @@ import kombu.entity
import kombu.messaging import kombu.messaging
from oslo.config import cfg from oslo.config import cfg
from nova.openstack.common.gettextutils import _ from nova.openstack.common import excutils
from nova.openstack.common.gettextutils import _ # noqa
from nova.openstack.common import network_utils from nova.openstack.common import network_utils
from nova.openstack.common.rpc import amqp as rpc_amqp from nova.openstack.common.rpc import amqp as rpc_amqp
from nova.openstack.common.rpc import common as rpc_common from nova.openstack.common.rpc import common as rpc_common
from nova.openstack.common import sslutils
kombu_opts = [ kombu_opts = [
cfg.StrOpt('kombu_ssl_version', cfg.StrOpt('kombu_ssl_version',
default='', default='',
help='SSL version to use (valid only if SSL enabled)'), help='SSL version to use (valid only if SSL enabled). '
'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
'be available on some distributions'
),
cfg.StrOpt('kombu_ssl_keyfile', cfg.StrOpt('kombu_ssl_keyfile',
default='', default='',
help='SSL key file (valid only if SSL enabled)'), help='SSL key file (valid only if SSL enabled)'),
@ -477,7 +481,8 @@ class Connection(object):
# http://docs.python.org/library/ssl.html - ssl.wrap_socket # http://docs.python.org/library/ssl.html - ssl.wrap_socket
if self.conf.kombu_ssl_version: if self.conf.kombu_ssl_version:
ssl_params['ssl_version'] = self.conf.kombu_ssl_version ssl_params['ssl_version'] = sslutils.validate_ssl_version(
self.conf.kombu_ssl_version)
if self.conf.kombu_ssl_keyfile: if self.conf.kombu_ssl_keyfile:
ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
if self.conf.kombu_ssl_certfile: if self.conf.kombu_ssl_certfile:
@ -560,13 +565,11 @@ class Connection(object):
log_info.update(params) log_info.update(params)
if self.max_retries and attempt == self.max_retries: if self.max_retries and attempt == self.max_retries:
LOG.error(_('Unable to connect to AMQP server on ' msg = _('Unable to connect to AMQP server on '
'%(hostname)s:%(port)d after %(max_retries)d ' '%(hostname)s:%(port)d after %(max_retries)d '
'tries: %(err_str)s') % log_info) 'tries: %(err_str)s') % log_info
# NOTE(comstud): Copied from original code. There's LOG.error(msg)
# really no better recourse because if this was a queue we raise rpc_common.RPCException(msg)
# need to consume on, we have no way to consume anymore.
sys.exit(1)
if attempt == 1: if attempt == 1:
sleep_time = self.interval_start or 1 sleep_time = self.interval_start or 1
@ -748,6 +751,7 @@ class Connection(object):
def consume_in_thread(self): def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread.""" """Consumer from all queues/consumers in a greenthread."""
@excutils.forever_retry_uncaught_exceptions
def _consumer_thread(): def _consumer_thread():
try: try:
self.consume() self.consume()

View File

@ -24,7 +24,8 @@ import eventlet
import greenlet import greenlet
from oslo.config import cfg from oslo.config import cfg
from nova.openstack.common.gettextutils import _ from nova.openstack.common import excutils
from nova.openstack.common.gettextutils import _ # noqa
from nova.openstack.common import importutils from nova.openstack.common import importutils
from nova.openstack.common import jsonutils from nova.openstack.common import jsonutils
from nova.openstack.common import log as logging from nova.openstack.common import log as logging
@ -118,10 +119,17 @@ class ConsumerBase(object):
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
self.reconnect(session) self.connect(session)
def connect(self, session):
"""Declare the reciever on connect."""
self._declare_receiver(session)
def reconnect(self, session): def reconnect(self, session):
"""Re-declare the receiver after a qpid reconnect.""" """Re-declare the receiver after a qpid reconnect."""
self._declare_receiver(session)
def _declare_receiver(self, session):
self.session = session self.session = session
self.receiver = session.receiver(self.address) self.receiver = session.receiver(self.address)
self.receiver.capacity = 1 self.receiver.capacity = 1
@ -158,6 +166,9 @@ class ConsumerBase(object):
def get_receiver(self): def get_receiver(self):
return self.receiver return self.receiver
def get_node_name(self):
return self.address.split(';')[0]
class DirectConsumer(ConsumerBase): class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'.""" """Queue/consumer class for 'direct'."""
@ -207,6 +218,7 @@ class FanoutConsumer(ConsumerBase):
'topic' is the topic to listen on 'topic' is the topic to listen on
'callback' is the callback to call when messages are received 'callback' is the callback to call when messages are received
""" """
self.conf = conf
super(FanoutConsumer, self).__init__( super(FanoutConsumer, self).__init__(
session, callback, session, callback,
@ -215,6 +227,18 @@ class FanoutConsumer(ConsumerBase):
"%s_fanout_%s" % (topic, uuid.uuid4().hex), "%s_fanout_%s" % (topic, uuid.uuid4().hex),
{"exclusive": True}) {"exclusive": True})
def reconnect(self, session):
topic = self.get_node_name()
params = {
'session': session,
'topic': topic,
'callback': self.callback,
}
self.__init__(conf=self.conf, **params)
super(FanoutConsumer, self).reconnect(session)
class Publisher(object): class Publisher(object):
"""Base Publisher class.""" """Base Publisher class."""
@ -576,6 +600,7 @@ class Connection(object):
def consume_in_thread(self): def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread.""" """Consumer from all queues/consumers in a greenthread."""
@excutils.forever_retry_uncaught_exceptions
def _consumer_thread(): def _consumer_thread():
try: try:
self.consume() self.consume()

View File

@ -27,7 +27,7 @@ import greenlet
from oslo.config import cfg from oslo.config import cfg
from nova.openstack.common import excutils from nova.openstack.common import excutils
from nova.openstack.common.gettextutils import _ from nova.openstack.common.gettextutils import _ # noqa
from nova.openstack.common import importutils from nova.openstack.common import importutils
from nova.openstack.common import jsonutils from nova.openstack.common import jsonutils
from nova.openstack.common.rpc import common as rpc_common from nova.openstack.common.rpc import common as rpc_common

View File

@ -23,7 +23,7 @@ import contextlib
import eventlet import eventlet
from oslo.config import cfg from oslo.config import cfg
from nova.openstack.common.gettextutils import _ from nova.openstack.common.gettextutils import _ # noqa
from nova.openstack.common import log as logging from nova.openstack.common import log as logging

View File

@ -23,7 +23,7 @@ import json
from oslo.config import cfg from oslo.config import cfg
from nova.openstack.common.gettextutils import _ from nova.openstack.common.gettextutils import _ # noqa
from nova.openstack.common import log as logging from nova.openstack.common import log as logging
from nova.openstack.common.rpc import matchmaker as mm from nova.openstack.common.rpc import matchmaker as mm

View File

@ -69,7 +69,7 @@ class RpcProxy(object):
v = vers if vers else self.default_version v = vers if vers else self.default_version
if (self.version_cap and not if (self.version_cap and not
rpc_common.version_is_compatible(self.version_cap, v)): rpc_common.version_is_compatible(self.version_cap, v)):
raise rpc_common.RpcVersionCapError(version=self.version_cap) raise rpc_common.RpcVersionCapError(version_cap=self.version_cap)
msg['version'] = v msg['version'] = v
def _get_topic(self, topic): def _get_topic(self, topic):

View File

@ -17,7 +17,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from nova.openstack.common.gettextutils import _ from nova.openstack.common.gettextutils import _ # noqa
from nova.openstack.common import log as logging from nova.openstack.common import log as logging
from nova.openstack.common import rpc from nova.openstack.common import rpc
from nova.openstack.common.rpc import dispatcher as rpc_dispatcher from nova.openstack.common.rpc import dispatcher as rpc_dispatcher

View File

@ -0,0 +1,100 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import ssl
from oslo.config import cfg
from nova.openstack.common.gettextutils import _ # noqa
ssl_opts = [
cfg.StrOpt('ca_file',
default=None,
help="CA certificate file to use to verify "
"connecting clients"),
cfg.StrOpt('cert_file',
default=None,
help="Certificate file to use when starting "
"the server securely"),
cfg.StrOpt('key_file',
default=None,
help="Private key file to use when starting "
"the server securely"),
]
CONF = cfg.CONF
CONF.register_opts(ssl_opts, "ssl")
def is_enabled():
cert_file = CONF.ssl.cert_file
key_file = CONF.ssl.key_file
ca_file = CONF.ssl.ca_file
use_ssl = cert_file or key_file
if cert_file and not os.path.exists(cert_file):
raise RuntimeError(_("Unable to find cert_file : %s") % cert_file)
if ca_file and not os.path.exists(ca_file):
raise RuntimeError(_("Unable to find ca_file : %s") % ca_file)
if key_file and not os.path.exists(key_file):
raise RuntimeError(_("Unable to find key_file : %s") % key_file)
if use_ssl and (not cert_file or not key_file):
raise RuntimeError(_("When running server in SSL mode, you must "
"specify both a cert_file and key_file "
"option value in your configuration file"))
return use_ssl
def wrap(sock):
ssl_kwargs = {
'server_side': True,
'certfile': CONF.ssl.cert_file,
'keyfile': CONF.ssl.key_file,
'cert_reqs': ssl.CERT_NONE,
}
if CONF.ssl.ca_file:
ssl_kwargs['ca_certs'] = CONF.ssl.ca_file
ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
return ssl.wrap_socket(sock, **ssl_kwargs)
_SSL_PROTOCOLS = {
"tlsv1": ssl.PROTOCOL_TLSv1,
"sslv23": ssl.PROTOCOL_SSLv23,
"sslv3": ssl.PROTOCOL_SSLv3
}
try:
_SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
except AttributeError:
pass
def validate_ssl_version(version):
key = version.lower()
try:
return _SSL_PROTOCOLS[key]
except KeyError:
raise RuntimeError(_("Invalid SSL version : %s") % version)

View File

@ -30,6 +30,7 @@ module=redhat-eventlet.patch
module=rootwrap module=rootwrap
module=rpc module=rpc
module=service module=service
module=sslutils
module=strutils module=strutils
module=threadgroup module=threadgroup
module=timeutils module=timeutils