make rpc.call propogate exception info. Includes tests
This commit is contained in:
@@ -103,7 +103,7 @@ class CloudController(object):
|
|||||||
result = {}
|
result = {}
|
||||||
for instance in self.instdir.all:
|
for instance in self.instdir.all:
|
||||||
if instance['project_id'] == project_id:
|
if instance['project_id'] == project_id:
|
||||||
line = '%s slots=%d' % (instance['private_dns_name'],
|
line = '%s slots=%d' % (instance['private_dns_name'],
|
||||||
INSTANCE_TYPES[instance['instance_type']]['vcpus'])
|
INSTANCE_TYPES[instance['instance_type']]['vcpus'])
|
||||||
if instance['key_name'] in result:
|
if instance['key_name'] in result:
|
||||||
result[instance['key_name']].append(line)
|
result[instance['key_name']].append(line)
|
||||||
@@ -300,7 +300,7 @@ class CloudController(object):
|
|||||||
"user_id": context.user.id,
|
"user_id": context.user.id,
|
||||||
"project_id": context.project.id}})
|
"project_id": context.project.id}})
|
||||||
# NOTE(vish): rpc returned value is in the result key in the dictionary
|
# NOTE(vish): rpc returned value is in the result key in the dictionary
|
||||||
volume = self._get_volume(context, result['result'])
|
volume = self._get_volume(context, result)
|
||||||
defer.returnValue({'volumeSet': [self.format_volume(context, volume)]})
|
defer.returnValue({'volumeSet': [self.format_volume(context, volume)]})
|
||||||
|
|
||||||
def _get_address(self, context, public_ip):
|
def _get_address(self, context, public_ip):
|
||||||
@@ -423,7 +423,7 @@ class CloudController(object):
|
|||||||
i['key_name'] = instance.get('key_name', None)
|
i['key_name'] = instance.get('key_name', None)
|
||||||
if context.user.is_admin():
|
if context.user.is_admin():
|
||||||
i['key_name'] = '%s (%s, %s)' % (i['key_name'],
|
i['key_name'] = '%s (%s, %s)' % (i['key_name'],
|
||||||
instance.get('project_id', None),
|
instance.get('project_id', None),
|
||||||
instance.get('node_name', ''))
|
instance.get('node_name', ''))
|
||||||
i['product_codes_set'] = self._convert_to_set(
|
i['product_codes_set'] = self._convert_to_set(
|
||||||
instance.get('product_codes', None), 'product_code')
|
instance.get('product_codes', None), 'product_code')
|
||||||
@@ -471,11 +471,10 @@ class CloudController(object):
|
|||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def allocate_address(self, context, **kwargs):
|
def allocate_address(self, context, **kwargs):
|
||||||
network_topic = yield self._get_network_topic(context)
|
network_topic = yield self._get_network_topic(context)
|
||||||
alloc_result = yield rpc.call(network_topic,
|
public_ip = yield rpc.call(network_topic,
|
||||||
{"method": "allocate_elastic_ip",
|
{"method": "allocate_elastic_ip",
|
||||||
"args": {"user_id": context.user.id,
|
"args": {"user_id": context.user.id,
|
||||||
"project_id": context.project.id}})
|
"project_id": context.project.id}})
|
||||||
public_ip = alloc_result['result']
|
|
||||||
defer.returnValue({'addressSet': [{'publicIp': public_ip}]})
|
defer.returnValue({'addressSet': [{'publicIp': public_ip}]})
|
||||||
|
|
||||||
@rbac.allow('netadmin')
|
@rbac.allow('netadmin')
|
||||||
@@ -516,11 +515,10 @@ class CloudController(object):
|
|||||||
"""Retrieves the network host for a project"""
|
"""Retrieves the network host for a project"""
|
||||||
host = network_service.get_host_for_project(context.project.id)
|
host = network_service.get_host_for_project(context.project.id)
|
||||||
if not host:
|
if not host:
|
||||||
result = yield rpc.call(FLAGS.network_topic,
|
host = yield rpc.call(FLAGS.network_topic,
|
||||||
{"method": "set_network_host",
|
{"method": "set_network_host",
|
||||||
"args": {"user_id": context.user.id,
|
"args": {"user_id": context.user.id,
|
||||||
"project_id": context.project.id}})
|
"project_id": context.project.id}})
|
||||||
host = result['result']
|
|
||||||
defer.returnValue('%s.%s' %(FLAGS.network_topic, host))
|
defer.returnValue('%s.%s' %(FLAGS.network_topic, host))
|
||||||
|
|
||||||
@rbac.allow('projectmanager', 'sysadmin')
|
@rbac.allow('projectmanager', 'sysadmin')
|
||||||
@@ -563,13 +561,12 @@ class CloudController(object):
|
|||||||
vpn = False
|
vpn = False
|
||||||
if image_id == FLAGS.vpn_image_id:
|
if image_id == FLAGS.vpn_image_id:
|
||||||
vpn = True
|
vpn = True
|
||||||
allocate_result = yield rpc.call(network_topic,
|
allocate_data = yield rpc.call(network_topic,
|
||||||
{"method": "allocate_fixed_ip",
|
{"method": "allocate_fixed_ip",
|
||||||
"args": {"user_id": context.user.id,
|
"args": {"user_id": context.user.id,
|
||||||
"project_id": context.project.id,
|
"project_id": context.project.id,
|
||||||
"security_group": security_group,
|
"security_group": security_group,
|
||||||
"vpn": vpn}})
|
"vpn": vpn}})
|
||||||
allocate_data = allocate_result['result']
|
|
||||||
inst = self.instdir.new()
|
inst = self.instdir.new()
|
||||||
inst['image_id'] = image_id
|
inst['image_id'] = image_id
|
||||||
inst['kernel_id'] = kernel_id
|
inst['kernel_id'] = kernel_id
|
||||||
|
|||||||
38
nova/rpc.py
38
nova/rpc.py
@@ -40,7 +40,7 @@ FLAGS = flags.FLAGS
|
|||||||
|
|
||||||
|
|
||||||
_log = logging.getLogger('amqplib')
|
_log = logging.getLogger('amqplib')
|
||||||
_log.setLevel(logging.WARN)
|
_log.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
class Connection(connection.BrokerConnection):
|
class Connection(connection.BrokerConnection):
|
||||||
@@ -141,8 +141,8 @@ class AdapterConsumer(TopicConsumer):
|
|||||||
node_args = dict((str(k), v) for k, v in args.iteritems())
|
node_args = dict((str(k), v) for k, v in args.iteritems())
|
||||||
d = defer.maybeDeferred(node_func, **node_args)
|
d = defer.maybeDeferred(node_func, **node_args)
|
||||||
if msg_id:
|
if msg_id:
|
||||||
d.addCallback(lambda rval: msg_reply(msg_id, rval))
|
d.addCallback(lambda rval: msg_reply(msg_id, rval, None))
|
||||||
d.addErrback(lambda e: msg_reply(msg_id, str(e)))
|
d.addErrback(lambda e: msg_reply(msg_id, None, e))
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
@@ -174,20 +174,37 @@ class DirectPublisher(Publisher):
|
|||||||
super(DirectPublisher, self).__init__(connection=connection)
|
super(DirectPublisher, self).__init__(connection=connection)
|
||||||
|
|
||||||
|
|
||||||
def msg_reply(msg_id, reply):
|
def msg_reply(msg_id, reply=None, failure=None):
|
||||||
|
if failure:
|
||||||
|
message = failure.getErrorMessage()
|
||||||
|
traceback = failure.getTraceback()
|
||||||
|
logging.error("Returning exception %s to caller", message)
|
||||||
|
logging.error(traceback)
|
||||||
|
failure = (failure.type.__name__, str(failure.value), traceback)
|
||||||
conn = Connection.instance()
|
conn = Connection.instance()
|
||||||
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
|
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
publisher.send({'result': reply})
|
publisher.send({'result': reply, 'failure': failure})
|
||||||
except TypeError:
|
except Exception, exc:
|
||||||
publisher.send(
|
publisher.send(
|
||||||
{'result': dict((k, repr(v))
|
{'result': dict((k, repr(v))
|
||||||
for k, v in reply.__dict__.iteritems())
|
for k, v in reply.__dict__.iteritems()),
|
||||||
|
'failure': failure
|
||||||
})
|
})
|
||||||
publisher.close()
|
publisher.close()
|
||||||
|
|
||||||
|
|
||||||
|
class RemoteError(exception.Error):
|
||||||
|
"""signifies that a remote class has raised an exception"""
|
||||||
|
def __init__(self, type, value, traceback):
|
||||||
|
self.type = type
|
||||||
|
self.value = value
|
||||||
|
self.traceback = traceback
|
||||||
|
super(RemoteError, self).__init__("%s %s\n%s" % (type,
|
||||||
|
value,
|
||||||
|
traceback))
|
||||||
|
|
||||||
|
|
||||||
def call(topic, msg):
|
def call(topic, msg):
|
||||||
_log.debug("Making asynchronous call...")
|
_log.debug("Making asynchronous call...")
|
||||||
msg_id = uuid.uuid4().hex
|
msg_id = uuid.uuid4().hex
|
||||||
@@ -199,7 +216,10 @@ def call(topic, msg):
|
|||||||
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
|
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
|
||||||
def deferred_receive(data, message):
|
def deferred_receive(data, message):
|
||||||
message.ack()
|
message.ack()
|
||||||
d.callback(data)
|
if data['failure']:
|
||||||
|
return d.errback(RemoteError(*data['failure']))
|
||||||
|
else:
|
||||||
|
return d.callback(data['result'])
|
||||||
consumer.register_callback(deferred_receive)
|
consumer.register_callback(deferred_receive)
|
||||||
injected = consumer.attach_to_tornado()
|
injected = consumer.attach_to_tornado()
|
||||||
|
|
||||||
|
|||||||
62
nova/tests/rpc_unittest.py
Normal file
62
nova/tests/rpc_unittest.py
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright 2010 United States Government as represented by the
|
||||||
|
# Administrator of the National Aeronautics and Space Administration.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from twisted.internet import defer
|
||||||
|
|
||||||
|
from nova import flags
|
||||||
|
from nova import rpc
|
||||||
|
from nova import test
|
||||||
|
|
||||||
|
|
||||||
|
FLAGS = flags.FLAGS
|
||||||
|
|
||||||
|
|
||||||
|
class RpcTestCase(test.BaseTestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super(RpcTestCase, self).setUp()
|
||||||
|
self.conn = rpc.Connection.instance()
|
||||||
|
self.receiver = TestReceiver()
|
||||||
|
self.consumer = rpc.AdapterConsumer(connection=self.conn,
|
||||||
|
topic='test',
|
||||||
|
proxy=self.receiver)
|
||||||
|
|
||||||
|
self.injected.append(self.consumer.attach_to_tornado(self.ioloop))
|
||||||
|
|
||||||
|
def test_call_succeed(self):
|
||||||
|
value = 42
|
||||||
|
result = yield rpc.call('test', {"method": "echo", "args": {"value": value}})
|
||||||
|
self.assertEqual(value, result)
|
||||||
|
|
||||||
|
def test_call_exception(self):
|
||||||
|
value = 42
|
||||||
|
self.assertFailure(rpc.call('test', {"method": "fail", "args": {"value": value}}), rpc.RemoteError)
|
||||||
|
try:
|
||||||
|
yield rpc.call('test', {"method": "fail", "args": {"value": value}})
|
||||||
|
self.fail("should have thrown rpc.RemoteError")
|
||||||
|
except rpc.RemoteError as exc:
|
||||||
|
self.assertEqual(int(exc.value), value)
|
||||||
|
|
||||||
|
class TestReceiver(object):
|
||||||
|
def echo(self, value):
|
||||||
|
logging.debug("Received %s", value)
|
||||||
|
return defer.succeed(value)
|
||||||
|
|
||||||
|
def fail(self, value):
|
||||||
|
raise Exception(value)
|
||||||
@@ -59,6 +59,7 @@ from nova.tests.model_unittest import *
|
|||||||
from nova.tests.network_unittest import *
|
from nova.tests.network_unittest import *
|
||||||
from nova.tests.objectstore_unittest import *
|
from nova.tests.objectstore_unittest import *
|
||||||
from nova.tests.process_unittest import *
|
from nova.tests.process_unittest import *
|
||||||
|
from nova.tests.rpc_unittest import *
|
||||||
from nova.tests.validator_unittest import *
|
from nova.tests.validator_unittest import *
|
||||||
from nova.tests.volume_unittest import *
|
from nova.tests.volume_unittest import *
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user