From 671316ae6d59efee379b3295fa2b666192b3fda3 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Fri, 11 Jun 2010 10:48:16 -0700 Subject: [PATCH 1/2] get rid of anyjson in rpc and fix bad reference to rpc.Connection --- nova/rpc.py | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/nova/rpc.py b/nova/rpc.py index 62c6afff..16027a73 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -1,12 +1,12 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright [2010] [Anso Labs, LLC] -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,12 +18,12 @@ AMQP-based RPC. Queues have consumers and publishers. No fan-out support yet. """ +import json import logging import sys import uuid from nova import vendor -import anyjson from carrot import connection from carrot import messaging from twisted.internet import defer @@ -82,7 +82,7 @@ class Publisher(messaging.Publisher): class TopicConsumer(Consumer): - exchange_type = "topic" + exchange_type = "topic" def __init__(self, connection=None, topic="broadcast"): self.queue = topic self.routing_key = topic @@ -95,7 +95,7 @@ class AdapterConsumer(TopicConsumer): _log.debug('Initing the Adapter Consumer for %s' % (topic)) self.proxy = proxy super(AdapterConsumer, self).__init__(connection=connection, topic=topic) - + def receive(self, message_data, message): _log.debug('received %s' % (message_data)) msg_id = message_data.pop('_msg_id', None) @@ -105,7 +105,7 @@ class AdapterConsumer(TopicConsumer): if not method: return - node_func = getattr(self.proxy, str(method)) + node_func = getattr(self.proxy, str(method)) node_args = dict((str(k), v) for k, v in args.iteritems()) d = defer.maybeDeferred(node_func, **node_args) if msg_id: @@ -116,15 +116,15 @@ class AdapterConsumer(TopicConsumer): class TopicPublisher(Publisher): - exchange_type = "topic" + exchange_type = "topic" def __init__(self, connection=None, topic="broadcast"): self.routing_key = topic self.exchange = FLAGS.control_exchange super(TopicPublisher, self).__init__(connection=connection) - + class DirectConsumer(Consumer): - exchange_type = "direct" + exchange_type = "direct" def __init__(self, connection=None, msg_id=None): self.queue = msg_id self.routing_key = msg_id @@ -145,12 +145,12 @@ class DirectPublisher(Publisher): def msg_reply(msg_id, reply): conn = Connection.instance() publisher = DirectPublisher(connection=conn, msg_id=msg_id) - + try: publisher.send({'result': reply}) except TypeError: publisher.send( - {'result': dict((k, repr(v)) + {'result': dict((k, repr(v)) for k, v in reply.__dict__.iteritems()) }) publisher.close() @@ -161,7 +161,7 @@ def call(topic, msg): msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) _log.debug("MSG_ID is %s" % (msg_id)) - + conn = Connection.instance() d = defer.Deferred() consumer = DirectConsumer(connection=conn, msg_id=msg_id) @@ -198,7 +198,7 @@ def send_message(topic, message, wait=True): _log.debug('message %s', message) if wait: - consumer = messaging.Consumer(connection=rpc.Connection.instance(), + consumer = messaging.Consumer(connection=Connection.instance(), queue=msg_id, exchange=msg_id, auto_delete=True, @@ -206,7 +206,7 @@ def send_message(topic, message, wait=True): routing_key=msg_id) consumer.register_callback(generic_response) - publisher = messaging.Publisher(connection=rpc.Connection.instance(), + publisher = messaging.Publisher(connection=Connection.instance(), exchange="nova", exchange_type="topic", routing_key=topic) @@ -215,8 +215,8 @@ def send_message(topic, message, wait=True): if wait: consumer.wait() - -# TODO: Replace with a docstring test + +# TODO: Replace with a docstring test if __name__ == "__main__": - send_message(sys.argv[1], anyjson.deserialize(sys.argv[2])) + send_message(sys.argv[1], json.loads(sys.argv[2])) From 6af7c4d8076374150b0800a6dffa5d81d63a9ab1 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Fri, 11 Jun 2010 11:07:52 -0700 Subject: [PATCH 2/2] Fix for LoopingCall failing Added in exception logging around amqp calls Creating deferred in receive before ack() message was causing IOError (interrupted system calls), probably because the same message was getting processed twice in some situations, causing the system calls to be doubled. Moving the ack() earlier fixed the problem. The code works now with an interval of 0 but that causes heavy processor usage. An interval of 0.01 keeps the cpu usage within reasonable limits --- nova/rpc.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/nova/rpc.py b/nova/rpc.py index 16027a73..ca622dae 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -30,6 +30,7 @@ from twisted.internet import defer from twisted.internet import reactor from twisted.internet import task +from nova import exception from nova import fakerabbit from nova import flags @@ -73,9 +74,13 @@ class Consumer(messaging.Consumer): attachToTornado = attach_to_tornado + @exception.wrap_exception + def fetch(self, *args, **kwargs): + super(Consumer, self).fetch(*args, **kwargs) + def attach_to_twisted(self): loop = task.LoopingCall(self.fetch, enable_callbacks=True) - loop.start(interval=0.001) + loop.start(interval=0.01) class Publisher(messaging.Publisher): pass @@ -96,13 +101,20 @@ class AdapterConsumer(TopicConsumer): self.proxy = proxy super(AdapterConsumer, self).__init__(connection=connection, topic=topic) + @exception.wrap_exception def receive(self, message_data, message): _log.debug('received %s' % (message_data)) msg_id = message_data.pop('_msg_id', None) method = message_data.get('method') args = message_data.get('args', {}) + message.ack() if not method: + # vish: we may not want to ack here, but that means that bad messages + # stay in the queue indefinitely, so for now we just log the + # message and send an error string back to the caller + _log.warn('no method for message: %s' % (message_data)) + msg_reply(msg_id, 'No method for message: %s' % message_data) return node_func = getattr(self.proxy, str(method)) @@ -111,7 +123,6 @@ class AdapterConsumer(TopicConsumer): if msg_id: d.addCallback(lambda rval: msg_reply(msg_id, rval)) d.addErrback(lambda e: msg_reply(msg_id, str(e))) - message.ack() return