diff --git a/nova/rpc.py b/nova/rpc.py index 62c6afff..ca622dae 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -1,12 +1,12 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright [2010] [Anso Labs, LLC] -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,18 +18,19 @@ AMQP-based RPC. Queues have consumers and publishers. No fan-out support yet. """ +import json import logging import sys import uuid from nova import vendor -import anyjson from carrot import connection from carrot import messaging from twisted.internet import defer from twisted.internet import reactor from twisted.internet import task +from nova import exception from nova import fakerabbit from nova import flags @@ -73,16 +74,20 @@ class Consumer(messaging.Consumer): attachToTornado = attach_to_tornado + @exception.wrap_exception + def fetch(self, *args, **kwargs): + super(Consumer, self).fetch(*args, **kwargs) + def attach_to_twisted(self): loop = task.LoopingCall(self.fetch, enable_callbacks=True) - loop.start(interval=0.001) + loop.start(interval=0.01) class Publisher(messaging.Publisher): pass class TopicConsumer(Consumer): - exchange_type = "topic" + exchange_type = "topic" def __init__(self, connection=None, topic="broadcast"): self.queue = topic self.routing_key = topic @@ -95,36 +100,42 @@ class AdapterConsumer(TopicConsumer): _log.debug('Initing the Adapter Consumer for %s' % (topic)) self.proxy = proxy super(AdapterConsumer, self).__init__(connection=connection, topic=topic) - + + @exception.wrap_exception def receive(self, message_data, message): _log.debug('received %s' % (message_data)) msg_id = message_data.pop('_msg_id', None) method = message_data.get('method') args = message_data.get('args', {}) + message.ack() if not method: + # vish: we may not want to ack here, but that means that bad messages + # stay in the queue indefinitely, so for now we just log the + # message and send an error string back to the caller + _log.warn('no method for message: %s' % (message_data)) + msg_reply(msg_id, 'No method for message: %s' % message_data) return - node_func = getattr(self.proxy, str(method)) + node_func = getattr(self.proxy, str(method)) node_args = dict((str(k), v) for k, v in args.iteritems()) d = defer.maybeDeferred(node_func, **node_args) if msg_id: d.addCallback(lambda rval: msg_reply(msg_id, rval)) d.addErrback(lambda e: msg_reply(msg_id, str(e))) - message.ack() return class TopicPublisher(Publisher): - exchange_type = "topic" + exchange_type = "topic" def __init__(self, connection=None, topic="broadcast"): self.routing_key = topic self.exchange = FLAGS.control_exchange super(TopicPublisher, self).__init__(connection=connection) - + class DirectConsumer(Consumer): - exchange_type = "direct" + exchange_type = "direct" def __init__(self, connection=None, msg_id=None): self.queue = msg_id self.routing_key = msg_id @@ -145,12 +156,12 @@ class DirectPublisher(Publisher): def msg_reply(msg_id, reply): conn = Connection.instance() publisher = DirectPublisher(connection=conn, msg_id=msg_id) - + try: publisher.send({'result': reply}) except TypeError: publisher.send( - {'result': dict((k, repr(v)) + {'result': dict((k, repr(v)) for k, v in reply.__dict__.iteritems()) }) publisher.close() @@ -161,7 +172,7 @@ def call(topic, msg): msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) _log.debug("MSG_ID is %s" % (msg_id)) - + conn = Connection.instance() d = defer.Deferred() consumer = DirectConsumer(connection=conn, msg_id=msg_id) @@ -198,7 +209,7 @@ def send_message(topic, message, wait=True): _log.debug('message %s', message) if wait: - consumer = messaging.Consumer(connection=rpc.Connection.instance(), + consumer = messaging.Consumer(connection=Connection.instance(), queue=msg_id, exchange=msg_id, auto_delete=True, @@ -206,7 +217,7 @@ def send_message(topic, message, wait=True): routing_key=msg_id) consumer.register_callback(generic_response) - publisher = messaging.Publisher(connection=rpc.Connection.instance(), + publisher = messaging.Publisher(connection=Connection.instance(), exchange="nova", exchange_type="topic", routing_key=topic) @@ -215,8 +226,8 @@ def send_message(topic, message, wait=True): if wait: consumer.wait() - -# TODO: Replace with a docstring test + +# TODO: Replace with a docstring test if __name__ == "__main__": - send_message(sys.argv[1], anyjson.deserialize(sys.argv[2])) + send_message(sys.argv[1], json.loads(sys.argv[2]))