Merge branch 'loop' into combined

This commit is contained in:
Vishvananda Ishaya 2010-06-11 14:07:07 -07:00
commit 7275c83c5c
1 changed files with 31 additions and 20 deletions

View File

@ -1,12 +1,12 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright [2010] [Anso Labs, LLC]
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -18,18 +18,19 @@ AMQP-based RPC. Queues have consumers and publishers.
No fan-out support yet.
"""
import json
import logging
import sys
import uuid
from nova import vendor
import anyjson
from carrot import connection
from carrot import messaging
from twisted.internet import defer
from twisted.internet import reactor
from twisted.internet import task
from nova import exception
from nova import fakerabbit
from nova import flags
@ -73,16 +74,20 @@ class Consumer(messaging.Consumer):
attachToTornado = attach_to_tornado
@exception.wrap_exception
def fetch(self, *args, **kwargs):
super(Consumer, self).fetch(*args, **kwargs)
def attach_to_twisted(self):
loop = task.LoopingCall(self.fetch, enable_callbacks=True)
loop.start(interval=0.001)
loop.start(interval=0.01)
class Publisher(messaging.Publisher):
pass
class TopicConsumer(Consumer):
exchange_type = "topic"
exchange_type = "topic"
def __init__(self, connection=None, topic="broadcast"):
self.queue = topic
self.routing_key = topic
@ -95,36 +100,42 @@ class AdapterConsumer(TopicConsumer):
_log.debug('Initing the Adapter Consumer for %s' % (topic))
self.proxy = proxy
super(AdapterConsumer, self).__init__(connection=connection, topic=topic)
@exception.wrap_exception
def receive(self, message_data, message):
_log.debug('received %s' % (message_data))
msg_id = message_data.pop('_msg_id', None)
method = message_data.get('method')
args = message_data.get('args', {})
message.ack()
if not method:
# vish: we may not want to ack here, but that means that bad messages
# stay in the queue indefinitely, so for now we just log the
# message and send an error string back to the caller
_log.warn('no method for message: %s' % (message_data))
msg_reply(msg_id, 'No method for message: %s' % message_data)
return
node_func = getattr(self.proxy, str(method))
node_func = getattr(self.proxy, str(method))
node_args = dict((str(k), v) for k, v in args.iteritems())
d = defer.maybeDeferred(node_func, **node_args)
if msg_id:
d.addCallback(lambda rval: msg_reply(msg_id, rval))
d.addErrback(lambda e: msg_reply(msg_id, str(e)))
message.ack()
return
class TopicPublisher(Publisher):
exchange_type = "topic"
exchange_type = "topic"
def __init__(self, connection=None, topic="broadcast"):
self.routing_key = topic
self.exchange = FLAGS.control_exchange
super(TopicPublisher, self).__init__(connection=connection)
class DirectConsumer(Consumer):
exchange_type = "direct"
exchange_type = "direct"
def __init__(self, connection=None, msg_id=None):
self.queue = msg_id
self.routing_key = msg_id
@ -145,12 +156,12 @@ class DirectPublisher(Publisher):
def msg_reply(msg_id, reply):
conn = Connection.instance()
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
publisher.send({'result': reply})
except TypeError:
publisher.send(
{'result': dict((k, repr(v))
{'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems())
})
publisher.close()
@ -161,7 +172,7 @@ def call(topic, msg):
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
_log.debug("MSG_ID is %s" % (msg_id))
conn = Connection.instance()
d = defer.Deferred()
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
@ -198,7 +209,7 @@ def send_message(topic, message, wait=True):
_log.debug('message %s', message)
if wait:
consumer = messaging.Consumer(connection=rpc.Connection.instance(),
consumer = messaging.Consumer(connection=Connection.instance(),
queue=msg_id,
exchange=msg_id,
auto_delete=True,
@ -206,7 +217,7 @@ def send_message(topic, message, wait=True):
routing_key=msg_id)
consumer.register_callback(generic_response)
publisher = messaging.Publisher(connection=rpc.Connection.instance(),
publisher = messaging.Publisher(connection=Connection.instance(),
exchange="nova",
exchange_type="topic",
routing_key=topic)
@ -215,8 +226,8 @@ def send_message(topic, message, wait=True):
if wait:
consumer.wait()
# TODO: Replace with a docstring test
# TODO: Replace with a docstring test
if __name__ == "__main__":
send_message(sys.argv[1], anyjson.deserialize(sys.argv[2]))
send_message(sys.argv[1], json.loads(sys.argv[2]))