Merge "Add the serialization of exceptions for RPC calls."

This commit is contained in:
Jenkins 2012-04-10 20:11:45 +00:00 committed by Gerrit Code Review
commit 584c968259
12 changed files with 330 additions and 96 deletions

View File

@ -918,6 +918,11 @@
###### (StrOpt) path to s3 buckets ###### (StrOpt) path to s3 buckets
# buckets_path="$state_path/buckets" # buckets_path="$state_path/buckets"
######### defined in nova.rpc.common #########
###### (ListOpt) Modules of exceptions that are permitted to be recreated
# allowed_rpc_exception_modules="nova.exception"
######### defined in nova.rpc.impl_kombu ######### ######### defined in nova.rpc.impl_kombu #########
###### (StrOpt) SSL certification authority file (valid only if SSL enabled) ###### (StrOpt) SSL certification authority file (valid only if SSL enabled)

View File

@ -41,7 +41,6 @@ from nova import flags
from nova.image import s3 from nova.image import s3
from nova import log as logging from nova import log as logging
from nova import network from nova import network
from nova.rpc import common as rpc_common
from nova import utils from nova import utils
from nova import volume from nova import volume
@ -1253,15 +1252,8 @@ class CloudController(object):
def allocate_address(self, context, **kwargs): def allocate_address(self, context, **kwargs):
LOG.audit(_("Allocate address"), context=context) LOG.audit(_("Allocate address"), context=context)
try:
public_ip = self.network_api.allocate_floating_ip(context) public_ip = self.network_api.allocate_floating_ip(context)
return {'publicIp': public_ip} return {'publicIp': public_ip}
except rpc_common.RemoteError as ex:
# NOTE(tr3buchet) - why does this block exist?
if ex.exc_type == 'NoMoreFloatingIps':
raise exception.NoMoreFloatingIps()
else:
raise
def release_address(self, context, public_ip, **kwargs): def release_address(self, context, public_ip, **kwargs):
LOG.audit(_("Release address %s"), public_ip, context=context) LOG.audit(_("Release address %s"), public_ip, context=context)

View File

@ -152,16 +152,12 @@ class FloatingIPController(object):
try: try:
address = self.network_api.allocate_floating_ip(context, pool) address = self.network_api.allocate_floating_ip(context, pool)
ip = self.network_api.get_floating_ip_by_address(context, address) ip = self.network_api.get_floating_ip_by_address(context, address)
except rpc_common.RemoteError as ex: except exception.NoMoreFloatingIps:
# NOTE(tr3buchet) - why does this block exist?
if ex.exc_type == 'NoMoreFloatingIps':
if pool: if pool:
msg = _("No more floating ips in pool %s.") % pool msg = _("No more floating ips in pool %s.") % pool
else: else:
msg = _("No more floating ips available.") msg = _("No more floating ips available.")
raise webob.exc.HTTPBadRequest(explanation=msg) raise webob.exc.HTTPBadRequest(explanation=msg)
else:
raise
return _translate_floating_ip_view(ip) return _translate_floating_ip_view(ip)
@ -212,9 +208,6 @@ class FloatingIPActionController(wsgi.Controller):
except exception.FixedIpNotFoundForInstance: except exception.FixedIpNotFoundForInstance:
msg = _("No fixed ips associated to instance") msg = _("No fixed ips associated to instance")
raise webob.exc.HTTPBadRequest(explanation=msg) raise webob.exc.HTTPBadRequest(explanation=msg)
except rpc_common.RemoteError:
msg = _("Associate floating ip failed")
raise webob.exc.HTTPInternalServerError(explanation=msg)
return webob.Response(status_int=202) return webob.Response(status_int=202)

View File

@ -213,26 +213,10 @@ class API(base.Base):
'rxtx_factor': instance['instance_type']['rxtx_factor'], 'rxtx_factor': instance['instance_type']['rxtx_factor'],
'host': instance['host'], 'host': instance['host'],
'project_id': instance['project_id']} 'project_id': instance['project_id']}
try:
nw_info = rpc.call(context, FLAGS.network_topic, nw_info = rpc.call(context, FLAGS.network_topic,
{'method': 'get_instance_nw_info', {'method': 'get_instance_nw_info',
'args': args}) 'args': args})
return network_model.NetworkInfo.hydrate(nw_info) return network_model.NetworkInfo.hydrate(nw_info)
# FIXME(comstud) rpc calls raise RemoteError if the remote raises
# an exception. In the case here, because of a race condition,
# it's possible the remote will raise a InstanceNotFound when
# someone deletes the instance while this call is in progress.
#
# Unfortunately, we don't have access to the original exception
# class now.. but we do have the exception class's name. So,
# we're checking it here and raising a new exception.
#
# Ultimately we need RPC to be able to serialize more things like
# classes.
except rpc_common.RemoteError as err:
if err.exc_type == 'InstanceNotFound':
raise exception.InstanceNotFound(instance_id=instance['id'])
raise
def validate_networks(self, context, requested_networks): def validate_networks(self, context, requested_networks):
"""validate the networks passed at the time of creating """validate the networks passed at the time of creating

View File

@ -27,7 +27,6 @@ AMQP, but is deprecated and predates this code.
import inspect import inspect
import sys import sys
import traceback
import uuid import uuid
from eventlet import greenpool from eventlet import greenpool
@ -141,11 +140,7 @@ def msg_reply(msg_id, connection_pool, reply=None, failure=None, ending=False):
""" """
with ConnectionContext(connection_pool) as conn: with ConnectionContext(connection_pool) as conn:
if failure: if failure:
message = str(failure[1]) failure = rpc_common.serialize_remote_exception(failure)
tb = traceback.format_exception(*failure)
LOG.error(_("Returning exception %s to caller"), message)
LOG.error(tb)
failure = (failure[0].__name__, str(failure[1]), tb)
try: try:
msg = {'result': reply, 'failure': failure} msg = {'result': reply, 'failure': failure}
@ -285,7 +280,9 @@ class MulticallWaiter(object):
def __call__(self, data): def __call__(self, data):
"""The consume() callback will call this. Store the result.""" """The consume() callback will call this. Store the result."""
if data['failure']: if data['failure']:
self._result = rpc_common.RemoteError(*data['failure']) failure = data['failure']
self._result = rpc_common.deserialize_remote_exception(failure)
elif data.get('ending', False): elif data.get('ending', False):
self._got_ending = True self._got_ending = True
else: else:

View File

@ -18,11 +18,14 @@
# under the License. # under the License.
import copy import copy
import sys
import traceback
from nova import exception from nova import exception
from nova import flags from nova import flags
from nova import log as logging from nova import log as logging
from nova.openstack.common import cfg from nova.openstack.common import cfg
from nova import utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -37,9 +40,14 @@ rpc_opts = [
cfg.IntOpt('rpc_response_timeout', cfg.IntOpt('rpc_response_timeout',
default=60, default=60,
help='Seconds to wait for a response from call or multicall'), help='Seconds to wait for a response from call or multicall'),
cfg.IntOpt('allowed_rpc_exception_modules',
default=['nova.exception'],
help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'),
] ]
flags.FLAGS.register_opts(rpc_opts) flags.FLAGS.register_opts(rpc_opts)
FLAGS = flags.FLAGS
class RemoteError(exception.NovaException): class RemoteError(exception.NovaException):
@ -158,3 +166,74 @@ def _safe_log(log_func, msg, msg_data):
msg_data['auth_token'] = '<SANITIZED>' msg_data['auth_token'] = '<SANITIZED>'
return log_func(msg, msg_data) return log_func(msg, msg_data)
def serialize_remote_exception(failure_info):
"""Prepares exception data to be sent over rpc.
Failure_info should be a sys.exc_info() tuple.
"""
tb = traceback.format_exception(*failure_info)
failure = failure_info[1]
LOG.error(_("Returning exception %s to caller"), unicode(failure))
LOG.error(tb)
kwargs = {}
if hasattr(failure, 'kwargs'):
kwargs = failure.kwargs
data = {
'class': str(failure.__class__.__name__),
'module': str(failure.__class__.__module__),
'message': unicode(failure),
'tb': tb,
'args': failure.args,
'kwargs': kwargs
}
json_data = utils.dumps(data)
return json_data
def deserialize_remote_exception(data):
failure = utils.loads(str(data))
trace = failure.get('tb', [])
message = failure.get('message', "") + "\n" + "\n".join(trace)
name = failure.get('class')
module = failure.get('module')
# NOTE(ameade): We DO NOT want to allow just any module to be imported, in
# order to prevent arbitrary code execution.
if not module in FLAGS.allowed_rpc_exception_modules:
return RemoteError(name, failure.get('message'), trace)
try:
__import__(module)
mod = sys.modules[module]
klass = getattr(mod, name)
if not issubclass(klass, Exception):
raise TypeError("Can only deserialize Exceptions")
failure = klass(**failure.get('kwargs', {}))
except (AttributeError, TypeError, ImportError):
return RemoteError(name, failure.get('message'), trace)
ex_type = type(failure)
str_override = lambda self: message
new_ex_type = type(ex_type.__name__ + "_Remote", (ex_type,),
{'__str__': str_override})
try:
# NOTE(ameade): Dynamically create a new exception type and swap it in
# as the new type for the exception. This only works on user defined
# Exceptions and not core python exceptions. This is important because
# we cannot necessarily change an exception message so we must override
# the __str__ method.
failure.__class__ = new_ex_type
except TypeError as e:
# NOTE(ameade): If a core exception then just add the traceback to the
# first exception argument.
failure.args = (message,) + failure.args[1:]
return failure

View File

@ -77,12 +77,8 @@ class Consumer(object):
else: else:
res.append(rval) res.append(rval)
done.send(res) done.send(res)
except Exception: except Exception as e:
exc_info = sys.exc_info() done.send_exception(e)
done.send_exception(
rpc_common.RemoteError(exc_info[0].__name__,
str(exc_info[1]),
''.join(traceback.format_exception(*exc_info))))
thread = eventlet.greenthread.spawn(_inner) thread = eventlet.greenthread.spawn(_inner)
@ -161,7 +157,7 @@ def call(context, topic, msg, timeout=None):
def cast(context, topic, msg): def cast(context, topic, msg):
try: try:
call(context, topic, msg) call(context, topic, msg)
except rpc_common.RemoteError: except Exception:
pass pass
@ -184,5 +180,5 @@ def fanout_cast(context, topic, msg):
for consumer in CONSUMERS.get(topic, []): for consumer in CONSUMERS.get(topic, []):
try: try:
consumer.call(context, method, args, None) consumer.call(context, method, args, None)
except rpc_common.RemoteError: except Exception:
pass pass

View File

@ -362,7 +362,7 @@ class Scheduler(object):
{"method": 'compare_cpu', {"method": 'compare_cpu',
"args": {'cpu_info': oservice_ref['cpu_info']}}) "args": {'cpu_info': oservice_ref['cpu_info']}})
except rpc_common.RemoteError: except exception.InvalidCPUInfo:
src = instance_ref['host'] src = instance_ref['host']
LOG.exception(_("host %(dest)s is not compatible with " LOG.exception(_("host %(dest)s is not compatible with "
"original host %(src)s.") % locals()) "original host %(src)s.") % locals())
@ -446,17 +446,12 @@ class Scheduler(object):
available = available_gb * (1024 ** 3) available = available_gb * (1024 ** 3)
# Getting necessary disk size # Getting necessary disk size
try:
topic = db.queue_get_for(context, FLAGS.compute_topic, topic = db.queue_get_for(context, FLAGS.compute_topic,
instance_ref['host']) instance_ref['host'])
ret = rpc.call(context, topic, ret = rpc.call(context, topic,
{"method": 'get_instance_disk_info', {"method": 'get_instance_disk_info',
"args": {'instance_name': instance_ref['name']}}) "args": {'instance_name': instance_ref['name']}})
disk_infos = utils.loads(ret) disk_infos = utils.loads(ret)
except rpc_common.RemoteError:
LOG.exception(_("host %(dest)s is not compatible with "
"original host %(src)s.") % locals())
raise
necessary = 0 necessary = 0
if disk_over_commit: if disk_over_commit:

View File

@ -20,6 +20,7 @@ import webob
from nova.api.openstack.compute.contrib import floating_ips from nova.api.openstack.compute.contrib import floating_ips
from nova import context from nova import context
from nova import db from nova import db
from nova import exception
from nova import network from nova import network
from nova import compute from nova import compute
from nova import rpc from nova import rpc
@ -184,6 +185,18 @@ class FloatingIpTest(test.TestCase):
self.assertEqual(res_dict['floating_ip']['ip'], '10.10.10.10') self.assertEqual(res_dict['floating_ip']['ip'], '10.10.10.10')
self.assertEqual(res_dict['floating_ip']['instance_id'], None) self.assertEqual(res_dict['floating_ip']['instance_id'], None)
def test_floating_ip_show_not_found(self):
def fake_get_floating_ip(*args, **kwargs):
raise exception.FloatingIpNotFound()
self.stubs.Set(network.api.API, "get_floating_ip",
fake_get_floating_ip)
req = fakes.HTTPRequest.blank('/v2/fake/os-floating-ips/9876')
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.show, req, 9876)
def test_show_associated_floating_ip(self): def test_show_associated_floating_ip(self):
def get_floating_ip(self, context, id): def get_floating_ip(self, context, id):
return {'id': 1, 'address': '10.10.10.10', 'pool': 'nova', return {'id': 1, 'address': '10.10.10.10', 'pool': 'nova',
@ -205,7 +218,7 @@ class FloatingIpTest(test.TestCase):
# test floating ip allocate/release(deallocate) # test floating ip allocate/release(deallocate)
def test_floating_ip_allocate_no_free_ips(self): def test_floating_ip_allocate_no_free_ips(self):
def fake_call(*args, **kwargs): def fake_call(*args, **kwargs):
raise(rpc_common.RemoteError('NoMoreFloatingIps', '', '')) raise exception.NoMoreFloatingIps()
self.stubs.Set(rpc, "call", fake_call) self.stubs.Set(rpc, "call", fake_call)

View File

@ -25,6 +25,7 @@ from eventlet import greenthread
import nose import nose
from nova import context from nova import context
from nova import exception
from nova import log as logging from nova import log as logging
from nova.rpc import amqp as rpc_amqp from nova.rpc import amqp as rpc_amqp
from nova.rpc import common as rpc_common from nova.rpc import common as rpc_common
@ -100,30 +101,6 @@ class BaseRpcTestCase(test.TestCase):
"args": {"value": value}}) "args": {"value": value}})
self.assertEqual(self.context.to_dict(), result) self.assertEqual(self.context.to_dict(), result)
def test_call_exception(self):
"""Test that exception gets passed back properly.
rpc.call returns a RemoteError object. The value of the
exception is converted to a string, so we convert it back
to an int in the test.
"""
value = 42
self.assertRaises(rpc_common.RemoteError,
self.rpc.call,
self.context,
'test',
{"method": "fail",
"args": {"value": value}})
try:
self.rpc.call(self.context,
'test',
{"method": "fail",
"args": {"value": value}})
self.fail("should have thrown RemoteError")
except rpc_common.RemoteError as exc:
self.assertEqual(int(exc.value), value)
def test_nested_calls(self): def test_nested_calls(self):
"""Test that we can do an rpc.call inside another call.""" """Test that we can do an rpc.call inside another call."""
class Nested(object): class Nested(object):
@ -248,7 +225,12 @@ class TestReceiver(object):
@staticmethod @staticmethod
def fail(context, value): def fail(context, value):
"""Raises an exception with the value sent in.""" """Raises an exception with the value sent in."""
raise Exception(value) raise NotImplementedError(value)
@staticmethod
def fail_converted(context, value):
"""Raises an exception with the value sent in."""
raise exception.ConvertedException(explanation=value)
@staticmethod @staticmethod
def block(context, value): def block(context, value):

View File

@ -0,0 +1,147 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for 'common' functons used through rpc code.
"""
import json
import sys
from nova import context
from nova import exception
from nova import flags
from nova import log as logging
from nova import test
from nova.rpc import amqp as rpc_amqp
from nova.rpc import common as rpc_common
from nova.tests.rpc import common
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
def raise_exception():
raise Exception("test")
class FakeUserDefinedException(Exception):
def __init__(self):
Exception.__init__(self, "Test Message")
class RpcCommonTestCase(test.TestCase):
def test_serialize_remote_exception(self):
expected = {
'class': 'Exception',
'module': 'exceptions',
'message': 'test',
}
try:
raise_exception()
except Exception as exc:
failure = rpc_common.serialize_remote_exception(sys.exc_info())
failure = json.loads(failure)
#assure the traceback was added
self.assertEqual(expected['class'], failure['class'])
self.assertEqual(expected['module'], failure['module'])
self.assertEqual(expected['message'], failure['message'])
def test_serialize_remote_nova_exception(self):
def raise_nova_exception():
raise exception.NovaException("test", code=500)
expected = {
'class': 'NovaException',
'module': 'nova.exception',
'kwargs': {'code': 500},
'message': 'test'
}
try:
raise_nova_exception()
except Exception as exc:
failure = rpc_common.serialize_remote_exception(sys.exc_info())
failure = json.loads(failure)
#assure the traceback was added
self.assertEqual(expected['class'], failure['class'])
self.assertEqual(expected['module'], failure['module'])
self.assertEqual(expected['kwargs'], failure['kwargs'])
self.assertEqual(expected['message'], failure['message'])
def test_deserialize_remote_exception(self):
failure = {
'class': 'NovaException',
'module': 'nova.exception',
'message': 'test message',
'tb': ['raise NovaException'],
}
serialized = json.dumps(failure)
after_exc = rpc_common.deserialize_remote_exception(serialized)
self.assertTrue(isinstance(after_exc, exception.NovaException))
self.assertTrue('test message' in unicode(after_exc))
#assure the traceback was added
self.assertTrue('raise NovaException' in unicode(after_exc))
def test_deserialize_remote_exception_bad_module(self):
failure = {
'class': 'popen2',
'module': 'os',
'kwargs': {'cmd': '/bin/echo failed'},
'message': 'foo',
}
serialized = json.dumps(failure)
after_exc = rpc_common.deserialize_remote_exception(serialized)
self.assertTrue(isinstance(after_exc, rpc_common.RemoteError))
def test_deserialize_remote_exception_user_defined_exception(self):
"""Ensure a user defined exception can be deserialized."""
self.flags(allowed_rpc_exception_modules=[self.__class__.__module__])
failure = {
'class': 'FakeUserDefinedException',
'module': self.__class__.__module__,
'tb': ['raise FakeUserDefinedException'],
}
serialized = json.dumps(failure)
after_exc = rpc_common.deserialize_remote_exception(serialized)
self.assertTrue(isinstance(after_exc, FakeUserDefinedException))
#assure the traceback was added
self.assertTrue('raise FakeUserDefinedException' in unicode(after_exc))
def test_deserialize_remote_exception_cannot_recreate(self):
"""Ensure a RemoteError is returned on initialization failure.
If an exception cannot be recreated with it's original class then a
RemoteError with the exception informations should still be returned.
"""
self.flags(allowed_rpc_exception_modules=[self.__class__.__module__])
failure = {
'class': 'FakeIDontExistException',
'module': self.__class__.__module__,
'tb': ['raise FakeIDontExistException'],
}
serialized = json.dumps(failure)
after_exc = rpc_common.deserialize_remote_exception(serialized)
self.assertTrue(isinstance(after_exc, rpc_common.RemoteError))
#assure the traceback was added
self.assertTrue('raise FakeIDontExistException' in unicode(after_exc))

View File

@ -20,6 +20,7 @@ Unit Tests for remote procedure calls using kombu
""" """
from nova import context from nova import context
from nova import exception
from nova import flags from nova import flags
from nova import log as logging from nova import log as logging
from nova import test from nova import test
@ -292,4 +293,54 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
self.assertEqual(self.received_message, message) self.assertEqual(self.received_message, message)
# Only called once, because our stub goes away during reconnection # Only called once, because our stub goes away during reconnection
self.assertEqual(info['called'], 1)
def test_call_exception(self):
"""Test that exception gets passed back properly.
rpc.call returns an Exception object. The value of the
exception is converted to a string.
"""
self.flags(allowed_rpc_exception_modules=['exceptions'])
value = "This is the exception message"
self.assertRaises(NotImplementedError,
self.rpc.call,
self.context,
'test',
{"method": "fail",
"args": {"value": value}})
try:
self.rpc.call(self.context,
'test',
{"method": "fail",
"args": {"value": value}})
self.fail("should have thrown Exception")
except NotImplementedError as exc:
self.assertTrue(value in unicode(exc))
#Traceback should be included in exception message
self.assertTrue('raise NotImplementedError(value)' in unicode(exc))
def test_call_converted_exception(self):
"""Test that exception gets passed back properly.
rpc.call returns an Exception object. The value of the
exception is converted to a string.
"""
value = "This is the exception message"
self.assertRaises(exception.ConvertedException,
self.rpc.call,
self.context,
'test',
{"method": "fail_converted",
"args": {"value": value}})
try:
self.rpc.call(self.context,
'test',
{"method": "fail_converted",
"args": {"value": value}})
self.fail("should have thrown Exception")
except exception.ConvertedException as exc:
self.assertTrue(value in unicode(exc))
#Traceback should be included in exception message
self.assertTrue('exception.ConvertedException' in unicode(exc))