Fix for LoopingCall failing

Added in exception logging around amqp calls
Creating deferred in receive before ack() message
was causing IOError (interrupted system calls), probably
because the same message was getting processed twice
in some situations, causing the system calls to be doubled.
Moving the ack() earlier fixed the problem.
The code works now with an interval of 0 but that causes heavy
processor usage.
An interval of 0.01 keeps the cpu usage within reasonable limits
This commit is contained in:
Vishvananda Ishaya 2010-06-11 11:07:52 -07:00
parent 9c6e1d6648
commit a690001006
1 changed files with 13 additions and 2 deletions

View File

@ -30,6 +30,7 @@ from twisted.internet import defer
from twisted.internet import reactor
from twisted.internet import task
from nova import exception
from nova import fakerabbit
from nova import flags
@ -73,9 +74,13 @@ class Consumer(messaging.Consumer):
attachToTornado = attach_to_tornado
@exception.wrap_exception
def fetch(self, *args, **kwargs):
super(Consumer, self).fetch(*args, **kwargs)
def attach_to_twisted(self):
loop = task.LoopingCall(self.fetch, enable_callbacks=True)
loop.start(interval=0.001)
loop.start(interval=0.01)
class Publisher(messaging.Publisher):
pass
@ -96,13 +101,20 @@ class AdapterConsumer(TopicConsumer):
self.proxy = proxy
super(AdapterConsumer, self).__init__(connection=connection, topic=topic)
@exception.wrap_exception
def receive(self, message_data, message):
_log.debug('received %s' % (message_data))
msg_id = message_data.pop('_msg_id', None)
method = message_data.get('method')
args = message_data.get('args', {})
message.ack()
if not method:
# vish: we may not want to ack here, but that means that bad messages
# stay in the queue indefinitely, so for now we just log the
# message and send an error string back to the caller
_log.warn('no method for message: %s' % (message_data))
msg_reply(msg_id, 'No method for message: %s' % message_data)
return
node_func = getattr(self.proxy, str(method))
@ -111,7 +123,6 @@ class AdapterConsumer(TopicConsumer):
if msg_id:
d.addCallback(lambda rval: msg_reply(msg_id, rval))
d.addErrback(lambda e: msg_reply(msg_id, str(e)))
message.ack()
return