pep8 and pylint cleanup

This commit is contained in:
Vishvananda Ishaya
2010-08-13 12:51:38 -07:00
parent ed06bb0055
commit 2d8c53b39f
2 changed files with 122 additions and 38 deletions

View File

@@ -21,14 +21,13 @@ AMQP-based RPC. Queues have consumers and publishers.
No fan-out support yet. No fan-out support yet.
""" """
from carrot import connection from carrot import connection as carrot_connection
from carrot import messaging from carrot import messaging
import json import json
import logging import logging
import sys import sys
import uuid import uuid
from twisted.internet import defer from twisted.internet import defer
from twisted.internet import reactor
from twisted.internet import task from twisted.internet import task
from nova import exception from nova import exception
@@ -39,13 +38,15 @@ from nova import flags
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
_log = logging.getLogger('amqplib') LOG = logging.getLogger('amqplib')
_log.setLevel(logging.DEBUG) LOG.setLevel(logging.DEBUG)
class Connection(connection.BrokerConnection): class Connection(carrot_connection.BrokerConnection):
"""Connection instance object"""
@classmethod @classmethod
def instance(cls): def instance(cls):
"""Returns the instance"""
if not hasattr(cls, '_instance'): if not hasattr(cls, '_instance'):
params = dict(hostname=FLAGS.rabbit_host, params = dict(hostname=FLAGS.rabbit_host,
port=FLAGS.rabbit_port, port=FLAGS.rabbit_port,
@@ -56,18 +57,33 @@ class Connection(connection.BrokerConnection):
if FLAGS.fake_rabbit: if FLAGS.fake_rabbit:
params['backend_cls'] = fakerabbit.Backend params['backend_cls'] = fakerabbit.Backend
# NOTE(vish): magic is fun!
# pylint: disable=W0142
cls._instance = cls(**params) cls._instance = cls(**params)
return cls._instance return cls._instance
@classmethod @classmethod
def recreate(cls): def recreate(cls):
"""Recreates the connection instance
This is necessary to recover from some network errors/disconnects"""
del cls._instance del cls._instance
return cls.instance() return cls.instance()
class Consumer(messaging.Consumer): class Consumer(messaging.Consumer):
"""Consumer base class
Contains methods for connecting the fetch method to async loops
"""
def __init__(self, *args, **kwargs):
self.failed_connection = False
super(Consumer, self).__init__(*args, **kwargs)
# TODO(termie): it would be nice to give these some way of automatically # TODO(termie): it would be nice to give these some way of automatically
# cleaning up after themselves # cleaning up after themselves
def attach_to_tornado(self, io_inst=None): def attach_to_tornado(self, io_inst=None):
"""Attach a callback to tornado that fires 10 times a second"""
from tornado import ioloop from tornado import ioloop
if io_inst is None: if io_inst is None:
io_inst = ioloop.IOLoop.instance() io_inst = ioloop.IOLoop.instance()
@@ -79,33 +95,44 @@ class Consumer(messaging.Consumer):
attachToTornado = attach_to_tornado attachToTornado = attach_to_tornado
def fetch(self, *args, **kwargs): def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
"""Wraps the parent fetch with some logic for failed connections"""
# TODO(vish): the logic for failed connections and logging should be # TODO(vish): the logic for failed connections and logging should be
# refactored into some sort of connection manager object # refactored into some sort of connection manager object
try: try:
if getattr(self, 'failed_connection', False): if self.failed_connection:
# attempt to reconnect # NOTE(vish): conn is defined in the parent class, we can
# recreate it as long as we create the backend too
# pylint: disable=W0201
self.conn = Connection.recreate() self.conn = Connection.recreate()
self.backend = self.conn.create_backend() self.backend = self.conn.create_backend()
super(Consumer, self).fetch(*args, **kwargs) super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
if getattr(self, 'failed_connection', False): if self.failed_connection:
logging.error("Reconnected to queue") logging.error("Reconnected to queue")
self.failed_connection = False self.failed_connection = False
except Exception, ex: # NOTE(vish): This is catching all errors because we really don't
if not getattr(self, 'failed_connection', False): # exceptions to be logged 10 times a second if some
# persistent failure occurs.
except Exception: # pylint: disable=W0703
if not self.failed_connection:
logging.exception("Failed to fetch message from queue") logging.exception("Failed to fetch message from queue")
self.failed_connection = True self.failed_connection = True
def attach_to_twisted(self): def attach_to_twisted(self):
"""Attach a callback to twisted that fires 10 times a second"""
loop = task.LoopingCall(self.fetch, enable_callbacks=True) loop = task.LoopingCall(self.fetch, enable_callbacks=True)
loop.start(interval=0.1) loop.start(interval=0.1)
class Publisher(messaging.Publisher): class Publisher(messaging.Publisher):
"""Publisher base class"""
pass pass
class TopicConsumer(Consumer): class TopicConsumer(Consumer):
"""Consumes messages on a specific topic"""
exchange_type = "topic" exchange_type = "topic"
def __init__(self, connection=None, topic="broadcast"): def __init__(self, connection=None, topic="broadcast"):
self.queue = topic self.queue = topic
self.routing_key = topic self.routing_key = topic
@@ -115,14 +142,24 @@ class TopicConsumer(Consumer):
class AdapterConsumer(TopicConsumer): class AdapterConsumer(TopicConsumer):
"""Calls methods on a proxy object based on method and args"""
def __init__(self, connection=None, topic="broadcast", proxy=None): def __init__(self, connection=None, topic="broadcast", proxy=None):
_log.debug('Initing the Adapter Consumer for %s' % (topic)) LOG.debug('Initing the Adapter Consumer for %s' % (topic))
self.proxy = proxy self.proxy = proxy
super(AdapterConsumer, self).__init__(connection=connection, topic=topic) super(AdapterConsumer, self).__init__(connection=connection,
topic=topic)
@exception.wrap_exception @exception.wrap_exception
def receive(self, message_data, message): def receive(self, message_data, message):
_log.debug('received %s' % (message_data)) """Magically looks for a method on the proxy object and calls it
Message data should be a dictionary with two keys:
method: string representing the method to call
args: dictionary of arg: value
Example: {'method': 'echo', 'args': {'value': 42}}
"""
LOG.debug('received %s' % (message_data))
msg_id = message_data.pop('_msg_id', None) msg_id = message_data.pop('_msg_id', None)
method = message_data.get('method') method = message_data.get('method')
@@ -133,12 +170,14 @@ class AdapterConsumer(TopicConsumer):
# messages stay in the queue indefinitely, so for now # messages stay in the queue indefinitely, so for now
# we just log the message and send an error string # we just log the message and send an error string
# back to the caller # back to the caller
_log.warn('no method for message: %s' % (message_data)) LOG.warn('no method for message: %s' % (message_data))
msg_reply(msg_id, 'No method for message: %s' % message_data) msg_reply(msg_id, 'No method for message: %s' % message_data)
return return
node_func = getattr(self.proxy, str(method)) node_func = getattr(self.proxy, str(method))
node_args = dict((str(k), v) for k, v in args.iteritems()) node_args = dict((str(k), v) for k, v in args.iteritems())
# NOTE(vish): magic is fun!
# pylint: disable=W0142
d = defer.maybeDeferred(node_func, **node_args) d = defer.maybeDeferred(node_func, **node_args)
if msg_id: if msg_id:
d.addCallback(lambda rval: msg_reply(msg_id, rval, None)) d.addCallback(lambda rval: msg_reply(msg_id, rval, None))
@@ -147,7 +186,9 @@ class AdapterConsumer(TopicConsumer):
class TopicPublisher(Publisher): class TopicPublisher(Publisher):
"""Publishes messages on a specific topic"""
exchange_type = "topic" exchange_type = "topic"
def __init__(self, connection=None, topic="broadcast"): def __init__(self, connection=None, topic="broadcast"):
self.routing_key = topic self.routing_key = topic
self.exchange = FLAGS.control_exchange self.exchange = FLAGS.control_exchange
@@ -156,7 +197,9 @@ class TopicPublisher(Publisher):
class DirectConsumer(Consumer): class DirectConsumer(Consumer):
"""Consumes messages directly on a channel specified by msg_id"""
exchange_type = "direct" exchange_type = "direct"
def __init__(self, connection=None, msg_id=None): def __init__(self, connection=None, msg_id=None):
self.queue = msg_id self.queue = msg_id
self.routing_key = msg_id self.routing_key = msg_id
@@ -166,7 +209,9 @@ class DirectConsumer(Consumer):
class DirectPublisher(Publisher): class DirectPublisher(Publisher):
"""Publishes messages directly on a channel specified by msg_id"""
exchange_type = "direct" exchange_type = "direct"
def __init__(self, connection=None, msg_id=None): def __init__(self, connection=None, msg_id=None):
self.routing_key = msg_id self.routing_key = msg_id
self.exchange = msg_id self.exchange = msg_id
@@ -175,51 +220,62 @@ class DirectPublisher(Publisher):
def msg_reply(msg_id, reply=None, failure=None): def msg_reply(msg_id, reply=None, failure=None):
"""Sends a reply or an error on the channel signified by msg_id
failure should be a twisted failure object"""
if failure: if failure:
message = failure.getErrorMessage() message = failure.getErrorMessage()
traceback = failure.getTraceback() traceback = failure.getTraceback()
logging.error("Returning exception %s to caller", message) logging.error("Returning exception %s to caller", message)
logging.error(traceback) logging.error(traceback)
failure = (failure.type.__name__, str(failure.value), traceback) failure = (failure.type.__name__, str(failure.value), traceback)
conn = Connection.instance() conn = Connection.instance()
publisher = DirectPublisher(connection=conn, msg_id=msg_id) publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try: try:
publisher.send({'result': reply, 'failure': failure}) publisher.send({'result': reply, 'failure': failure})
except Exception, exc: except TypeError:
publisher.send( publisher.send(
{'result': dict((k, repr(v)) {'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()), for k, v in reply.__dict__.iteritems()),
'failure': failure 'failure': failure})
})
publisher.close() publisher.close()
class RemoteError(exception.Error): class RemoteError(exception.Error):
"""signifies that a remote class has raised an exception""" """Signifies that a remote class has raised an exception
def __init__(self, type, value, traceback):
self.type = type Containes a string representation of the type of the original exception,
the value of the original exception, and the traceback. These are
sent to the parent as a joined string so printing the exception
contains all of the relevent info."""
def __init__(self, exc_type, value, traceback):
self.exc_type = exc_type
self.value = value self.value = value
self.traceback = traceback self.traceback = traceback
super(RemoteError, self).__init__("%s %s\n%s" % (type, super(RemoteError, self).__init__("%s %s\n%s" % (exc_type,
value, value,
traceback)) traceback))
def call(topic, msg): def call(topic, msg):
_log.debug("Making asynchronous call...") """Sends a message on a topic and wait for a response"""
LOG.debug("Making asynchronous call...")
msg_id = uuid.uuid4().hex msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id}) msg.update({'_msg_id': msg_id})
_log.debug("MSG_ID is %s" % (msg_id)) LOG.debug("MSG_ID is %s" % (msg_id))
conn = Connection.instance() conn = Connection.instance()
d = defer.Deferred() d = defer.Deferred()
consumer = DirectConsumer(connection=conn, msg_id=msg_id) consumer = DirectConsumer(connection=conn, msg_id=msg_id)
def deferred_receive(data, message): def deferred_receive(data, message):
"""Acks message and callbacks or errbacks"""
message.ack() message.ack()
if data['failure']: if data['failure']:
return d.errback(RemoteError(*data['failure'])) return d.errback(RemoteError(*data['failure']))
else: else:
return d.callback(data['result']) return d.callback(data['result'])
consumer.register_callback(deferred_receive) consumer.register_callback(deferred_receive)
injected = consumer.attach_to_tornado() injected = consumer.attach_to_tornado()
@@ -233,7 +289,8 @@ def call(topic, msg):
def cast(topic, msg): def cast(topic, msg):
_log.debug("Making asynchronous cast...") """Sends a message on a topic without waiting for a response"""
LOG.debug("Making asynchronous cast...")
conn = Connection.instance() conn = Connection.instance()
publisher = TopicPublisher(connection=conn, topic=topic) publisher = TopicPublisher(connection=conn, topic=topic)
publisher.send(msg) publisher.send(msg)
@@ -241,16 +298,18 @@ def cast(topic, msg):
def generic_response(message_data, message): def generic_response(message_data, message):
_log.debug('response %s', message_data) """Logs a result and exits"""
LOG.debug('response %s', message_data)
message.ack() message.ack()
sys.exit(0) sys.exit(0)
def send_message(topic, message, wait=True): def send_message(topic, message, wait=True):
"""Sends a message for testing"""
msg_id = uuid.uuid4().hex msg_id = uuid.uuid4().hex
message.update({'_msg_id': msg_id}) message.update({'_msg_id': msg_id})
_log.debug('topic is %s', topic) LOG.debug('topic is %s', topic)
_log.debug('message %s', message) LOG.debug('message %s', message)
if wait: if wait:
consumer = messaging.Consumer(connection=Connection.instance(), consumer = messaging.Consumer(connection=Connection.instance(),
@@ -273,6 +332,8 @@ def send_message(topic, message, wait=True):
consumer.wait() consumer.wait()
# TODO: Replace with a docstring test
if __name__ == "__main__": if __name__ == "__main__":
# NOTE(vish): you can send messages from the command line using
# topic and a json sting representing a dictionary
# for the method
send_message(sys.argv[1], json.loads(sys.argv[2])) send_message(sys.argv[1], json.loads(sys.argv[2]))

View File

@@ -15,7 +15,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
"""
Unit Tests for remote procedure calls using queue
"""
import logging import logging
from twisted.internet import defer from twisted.internet import defer
@@ -29,7 +31,8 @@ FLAGS = flags.FLAGS
class RpcTestCase(test.BaseTestCase): class RpcTestCase(test.BaseTestCase):
def setUp(self): """Test cases for rpc"""
def setUp(self): # pylint: disable=C0103
super(RpcTestCase, self).setUp() super(RpcTestCase, self).setUp()
self.conn = rpc.Connection.instance() self.conn = rpc.Connection.instance()
self.receiver = TestReceiver() self.receiver = TestReceiver()
@@ -40,23 +43,43 @@ class RpcTestCase(test.BaseTestCase):
self.injected.append(self.consumer.attach_to_tornado(self.ioloop)) self.injected.append(self.consumer.attach_to_tornado(self.ioloop))
def test_call_succeed(self): def test_call_succeed(self):
"""Get a value through rpc call"""
value = 42 value = 42
result = yield rpc.call('test', {"method": "echo", "args": {"value": value}}) result = yield rpc.call('test', {"method": "echo",
"args": {"value": value}})
self.assertEqual(value, result) self.assertEqual(value, result)
def test_call_exception(self): def test_call_exception(self):
"""Test that exception gets passed back properly
rpc.call returns a RemoteError object. The value of the
exception is converted to a string, so we convert it back
to an int in the test.
"""
value = 42 value = 42
self.assertFailure(rpc.call('test', {"method": "fail", "args": {"value": value}}), rpc.RemoteError) self.assertFailure(rpc.call('test', {"method": "fail",
"args": {"value": value}}),
rpc.RemoteError)
try: try:
yield rpc.call('test', {"method": "fail", "args": {"value": value}}) yield rpc.call('test', {"method": "fail",
"args": {"value": value}})
self.fail("should have thrown rpc.RemoteError") self.fail("should have thrown rpc.RemoteError")
except rpc.RemoteError as exc: except rpc.RemoteError as exc:
self.assertEqual(int(exc.value), value) self.assertEqual(int(exc.value), value)
class TestReceiver(object): class TestReceiver(object):
def echo(self, value): """Simple Proxy class so the consumer has methods to call
Uses static methods because we aren't actually storing any state"""
@staticmethod
def echo(value):
"""Simply returns whatever value is sent in"""
logging.debug("Received %s", value) logging.debug("Received %s", value)
return defer.succeed(value) return defer.succeed(value)
def fail(self, value): @staticmethod
def fail(value):
"""Raises an exception with the value sent in"""
raise Exception(value) raise Exception(value)