Implements a simplified messaging abstraction with the least amount of impact to the code base.
This commit is contained in:
598
nova/rpc.py
598
nova/rpc.py
@@ -16,597 +16,51 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""AMQP-based RPC.
|
||||
|
||||
Queues have consumers and publishers.
|
||||
|
||||
No fan-out support yet.
|
||||
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
import types
|
||||
import uuid
|
||||
|
||||
from carrot import connection as carrot_connection
|
||||
from carrot import messaging
|
||||
from eventlet import greenpool
|
||||
from eventlet import pools
|
||||
from eventlet import queue
|
||||
import greenlet
|
||||
|
||||
from nova import context
|
||||
from nova import exception
|
||||
from nova import fakerabbit
|
||||
from nova.utils import load_module
|
||||
from nova.rpc_backends.common import RemoteError, LOG
|
||||
from nova import flags
|
||||
from nova import log as logging
|
||||
from nova import utils
|
||||
|
||||
|
||||
LOG = logging.getLogger('nova.rpc')
|
||||
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
flags.DEFINE_integer('rpc_thread_pool_size', 1024,
|
||||
'Size of RPC thread pool')
|
||||
flags.DEFINE_integer('rpc_conn_pool_size', 30,
|
||||
'Size of RPC connection pool')
|
||||
flags.DEFINE_string('rpc_backend',
|
||||
'nova.rpc_backends.amqp',
|
||||
"The messaging module to use, defaults to AMQP.")
|
||||
|
||||
RPCIMPL = load_module(FLAGS.rpc_backend)
|
||||
|
||||
class Connection(carrot_connection.BrokerConnection):
|
||||
"""Connection instance object."""
|
||||
|
||||
@classmethod
|
||||
def instance(cls, new=True):
|
||||
"""Returns the instance."""
|
||||
if new or not hasattr(cls, '_instance'):
|
||||
params = dict(hostname=FLAGS.rabbit_host,
|
||||
port=FLAGS.rabbit_port,
|
||||
ssl=FLAGS.rabbit_use_ssl,
|
||||
userid=FLAGS.rabbit_userid,
|
||||
password=FLAGS.rabbit_password,
|
||||
virtual_host=FLAGS.rabbit_virtual_host)
|
||||
def create_connection(new=True):
|
||||
return RPCIMPL.Connection.instance(new=True)
|
||||
|
||||
if FLAGS.fake_rabbit:
|
||||
params['backend_cls'] = fakerabbit.Backend
|
||||
|
||||
# NOTE(vish): magic is fun!
|
||||
# pylint: disable=W0142
|
||||
if new:
|
||||
return cls(**params)
|
||||
else:
|
||||
cls._instance = cls(**params)
|
||||
return cls._instance
|
||||
def create_consumer(conn, topic, proxy, fanout=False):
|
||||
if fanout:
|
||||
return RPCIMPL.FanoutAdapterConsumer(
|
||||
connection=conn,
|
||||
topic=topic,
|
||||
proxy=proxy)
|
||||
else:
|
||||
return RPCIMPL.TopicAdapterConsumer(
|
||||
connection=conn,
|
||||
topic=topic,
|
||||
proxy=proxy)
|
||||
|
||||
@classmethod
|
||||
def recreate(cls):
|
||||
"""Recreates the connection instance.
|
||||
|
||||
This is necessary to recover from some network errors/disconnects.
|
||||
|
||||
"""
|
||||
try:
|
||||
del cls._instance
|
||||
except AttributeError, e:
|
||||
# The _instance stuff is for testing purposes. Usually we don't use
|
||||
# it. So don't freak out if it doesn't exist.
|
||||
pass
|
||||
return cls.instance()
|
||||
|
||||
|
||||
class Pool(pools.Pool):
|
||||
"""Class that implements a Pool of Connections."""
|
||||
|
||||
# TODO(comstud): Timeout connections not used in a while
|
||||
def create(self):
|
||||
LOG.debug('Creating new connection')
|
||||
return Connection.instance(new=True)
|
||||
|
||||
# Create a ConnectionPool to use for RPC calls. We'll order the
|
||||
# pool as a stack (LIFO), so that we can potentially loop through and
|
||||
# timeout old unused connections at some point
|
||||
ConnectionPool = Pool(
|
||||
max_size=FLAGS.rpc_conn_pool_size,
|
||||
order_as_stack=True)
|
||||
|
||||
|
||||
class Consumer(messaging.Consumer):
|
||||
"""Consumer base class.
|
||||
|
||||
Contains methods for connecting the fetch method to async loops.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
for i in xrange(FLAGS.rabbit_max_retries):
|
||||
if i > 0:
|
||||
time.sleep(FLAGS.rabbit_retry_interval)
|
||||
try:
|
||||
super(Consumer, self).__init__(*args, **kwargs)
|
||||
self.failed_connection = False
|
||||
break
|
||||
except Exception as e: # Catching all because carrot sucks
|
||||
fl_host = FLAGS.rabbit_host
|
||||
fl_port = FLAGS.rabbit_port
|
||||
fl_intv = FLAGS.rabbit_retry_interval
|
||||
LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is'
|
||||
' unreachable: %(e)s. Trying again in %(fl_intv)d'
|
||||
' seconds.') % locals())
|
||||
self.failed_connection = True
|
||||
if self.failed_connection:
|
||||
LOG.error(_('Unable to connect to AMQP server '
|
||||
'after %d tries. Shutting down.'),
|
||||
FLAGS.rabbit_max_retries)
|
||||
sys.exit(1)
|
||||
|
||||
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
|
||||
"""Wraps the parent fetch with some logic for failed connection."""
|
||||
# TODO(vish): the logic for failed connections and logging should be
|
||||
# refactored into some sort of connection manager object
|
||||
try:
|
||||
if self.failed_connection:
|
||||
# NOTE(vish): connection is defined in the parent class, we can
|
||||
# recreate it as long as we create the backend too
|
||||
# pylint: disable=W0201
|
||||
self.connection = Connection.recreate()
|
||||
self.backend = self.connection.create_backend()
|
||||
self.declare()
|
||||
return super(Consumer, self).fetch(no_ack,
|
||||
auto_ack,
|
||||
enable_callbacks)
|
||||
if self.failed_connection:
|
||||
LOG.error(_('Reconnected to queue'))
|
||||
self.failed_connection = False
|
||||
# NOTE(vish): This is catching all errors because we really don't
|
||||
# want exceptions to be logged 10 times a second if some
|
||||
# persistent failure occurs.
|
||||
except Exception, e: # pylint: disable=W0703
|
||||
if not self.failed_connection:
|
||||
LOG.exception(_('Failed to fetch message from queue: %s' % e))
|
||||
self.failed_connection = True
|
||||
|
||||
def attach_to_eventlet(self):
|
||||
"""Only needed for unit tests!"""
|
||||
timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
|
||||
timer.start(0.1)
|
||||
return timer
|
||||
|
||||
|
||||
class AdapterConsumer(Consumer):
|
||||
"""Calls methods on a proxy object based on method and args."""
|
||||
|
||||
def __init__(self, connection=None, topic='broadcast', proxy=None):
|
||||
LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
|
||||
self.proxy = proxy
|
||||
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
|
||||
super(AdapterConsumer, self).__init__(connection=connection,
|
||||
topic=topic)
|
||||
self.register_callback(self.process_data)
|
||||
|
||||
def process_data(self, message_data, message):
|
||||
"""Consumer callback to call a method on a proxy object.
|
||||
|
||||
Parses the message for validity and fires off a thread to call the
|
||||
proxy object method.
|
||||
|
||||
Message data should be a dictionary with two keys:
|
||||
method: string representing the method to call
|
||||
args: dictionary of arg: value
|
||||
|
||||
Example: {'method': 'echo', 'args': {'value': 42}}
|
||||
|
||||
"""
|
||||
LOG.debug(_('received %s') % message_data)
|
||||
# This will be popped off in _unpack_context
|
||||
msg_id = message_data.get('_msg_id', None)
|
||||
ctxt = _unpack_context(message_data)
|
||||
|
||||
method = message_data.get('method')
|
||||
args = message_data.get('args', {})
|
||||
message.ack()
|
||||
if not method:
|
||||
# NOTE(vish): we may not want to ack here, but that means that bad
|
||||
# messages stay in the queue indefinitely, so for now
|
||||
# we just log the message and send an error string
|
||||
# back to the caller
|
||||
LOG.warn(_('no method for message: %s') % message_data)
|
||||
if msg_id:
|
||||
msg_reply(msg_id,
|
||||
_('No method for message: %s') % message_data)
|
||||
return
|
||||
self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)
|
||||
|
||||
@exception.wrap_exception()
|
||||
def _process_data(self, msg_id, ctxt, method, args):
|
||||
"""Thread that maigcally looks for a method on the proxy
|
||||
object and calls it.
|
||||
"""
|
||||
|
||||
node_func = getattr(self.proxy, str(method))
|
||||
node_args = dict((str(k), v) for k, v in args.iteritems())
|
||||
# NOTE(vish): magic is fun!
|
||||
try:
|
||||
rval = node_func(context=ctxt, **node_args)
|
||||
if msg_id:
|
||||
# Check if the result was a generator
|
||||
if isinstance(rval, types.GeneratorType):
|
||||
for x in rval:
|
||||
msg_reply(msg_id, x, None)
|
||||
else:
|
||||
msg_reply(msg_id, rval, None)
|
||||
|
||||
# This final None tells multicall that it is done.
|
||||
msg_reply(msg_id, None, None)
|
||||
elif isinstance(rval, types.GeneratorType):
|
||||
# NOTE(vish): this iterates through the generator
|
||||
list(rval)
|
||||
except Exception as e:
|
||||
logging.exception('Exception during message handling')
|
||||
if msg_id:
|
||||
msg_reply(msg_id, None, sys.exc_info())
|
||||
return
|
||||
|
||||
|
||||
class TopicAdapterConsumer(AdapterConsumer):
|
||||
"""Consumes messages on a specific topic."""
|
||||
|
||||
exchange_type = 'topic'
|
||||
|
||||
def __init__(self, connection=None, topic='broadcast', proxy=None):
|
||||
self.queue = topic
|
||||
self.routing_key = topic
|
||||
self.exchange = FLAGS.control_exchange
|
||||
self.durable = False
|
||||
super(TopicAdapterConsumer, self).__init__(connection=connection,
|
||||
topic=topic, proxy=proxy)
|
||||
|
||||
|
||||
class FanoutAdapterConsumer(AdapterConsumer):
|
||||
"""Consumes messages from a fanout exchange."""
|
||||
|
||||
exchange_type = 'fanout'
|
||||
|
||||
def __init__(self, connection=None, topic='broadcast', proxy=None):
|
||||
self.exchange = '%s_fanout' % topic
|
||||
self.routing_key = topic
|
||||
unique = uuid.uuid4().hex
|
||||
self.queue = '%s_fanout_%s' % (topic, unique)
|
||||
self.durable = False
|
||||
# Fanout creates unique queue names, so we should auto-remove
|
||||
# them when done, so they're not left around on restart.
|
||||
# Also, we're the only one that should be consuming. exclusive
|
||||
# implies auto_delete, so we'll just set that..
|
||||
self.exclusive = True
|
||||
LOG.info(_('Created "%(exchange)s" fanout exchange '
|
||||
'with "%(key)s" routing key'),
|
||||
dict(exchange=self.exchange, key=self.routing_key))
|
||||
super(FanoutAdapterConsumer, self).__init__(connection=connection,
|
||||
topic=topic, proxy=proxy)
|
||||
|
||||
|
||||
class ConsumerSet(object):
|
||||
"""Groups consumers to listen on together on a single connection."""
|
||||
|
||||
def __init__(self, connection, consumer_list):
|
||||
self.consumer_list = set(consumer_list)
|
||||
self.consumer_set = None
|
||||
self.enabled = True
|
||||
self.init(connection)
|
||||
|
||||
def init(self, conn):
|
||||
if not conn:
|
||||
conn = Connection.instance(new=True)
|
||||
if self.consumer_set:
|
||||
self.consumer_set.close()
|
||||
self.consumer_set = messaging.ConsumerSet(conn)
|
||||
for consumer in self.consumer_list:
|
||||
consumer.connection = conn
|
||||
# consumer.backend is set for us
|
||||
self.consumer_set.add_consumer(consumer)
|
||||
|
||||
def reconnect(self):
|
||||
self.init(None)
|
||||
|
||||
def wait(self, limit=None):
|
||||
running = True
|
||||
while running:
|
||||
it = self.consumer_set.iterconsume(limit=limit)
|
||||
if not it:
|
||||
break
|
||||
while True:
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration:
|
||||
return
|
||||
except greenlet.GreenletExit:
|
||||
running = False
|
||||
break
|
||||
except Exception as e:
|
||||
LOG.exception(_("Exception while processing consumer"))
|
||||
self.reconnect()
|
||||
# Break to outer loop
|
||||
break
|
||||
|
||||
def close(self):
|
||||
self.consumer_set.close()
|
||||
|
||||
|
||||
class Publisher(messaging.Publisher):
|
||||
"""Publisher base class."""
|
||||
pass
|
||||
|
||||
|
||||
class TopicPublisher(Publisher):
|
||||
"""Publishes messages on a specific topic."""
|
||||
|
||||
exchange_type = 'topic'
|
||||
|
||||
def __init__(self, connection=None, topic='broadcast'):
|
||||
self.routing_key = topic
|
||||
self.exchange = FLAGS.control_exchange
|
||||
self.durable = False
|
||||
super(TopicPublisher, self).__init__(connection=connection)
|
||||
|
||||
|
||||
class FanoutPublisher(Publisher):
|
||||
"""Publishes messages to a fanout exchange."""
|
||||
|
||||
exchange_type = 'fanout'
|
||||
|
||||
def __init__(self, topic, connection=None):
|
||||
self.exchange = '%s_fanout' % topic
|
||||
self.queue = '%s_fanout' % topic
|
||||
self.durable = False
|
||||
self.auto_delete = True
|
||||
LOG.info(_('Creating "%(exchange)s" fanout exchange'),
|
||||
dict(exchange=self.exchange))
|
||||
super(FanoutPublisher, self).__init__(connection=connection)
|
||||
|
||||
|
||||
class DirectConsumer(Consumer):
|
||||
"""Consumes messages directly on a channel specified by msg_id."""
|
||||
|
||||
exchange_type = 'direct'
|
||||
|
||||
def __init__(self, connection=None, msg_id=None):
|
||||
self.queue = msg_id
|
||||
self.routing_key = msg_id
|
||||
self.exchange = msg_id
|
||||
self.auto_delete = True
|
||||
self.exclusive = True
|
||||
super(DirectConsumer, self).__init__(connection=connection)
|
||||
|
||||
|
||||
class DirectPublisher(Publisher):
|
||||
"""Publishes messages directly on a channel specified by msg_id."""
|
||||
|
||||
exchange_type = 'direct'
|
||||
|
||||
def __init__(self, connection=None, msg_id=None):
|
||||
self.routing_key = msg_id
|
||||
self.exchange = msg_id
|
||||
self.auto_delete = True
|
||||
super(DirectPublisher, self).__init__(connection=connection)
|
||||
|
||||
|
||||
def msg_reply(msg_id, reply=None, failure=None):
|
||||
"""Sends a reply or an error on the channel signified by msg_id.
|
||||
|
||||
Failure should be a sys.exc_info() tuple.
|
||||
|
||||
"""
|
||||
if failure:
|
||||
message = str(failure[1])
|
||||
tb = traceback.format_exception(*failure)
|
||||
LOG.error(_("Returning exception %s to caller"), message)
|
||||
LOG.error(tb)
|
||||
failure = (failure[0].__name__, str(failure[1]), tb)
|
||||
|
||||
with ConnectionPool.item() as conn:
|
||||
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
|
||||
try:
|
||||
publisher.send({'result': reply, 'failure': failure})
|
||||
except TypeError:
|
||||
publisher.send(
|
||||
{'result': dict((k, repr(v))
|
||||
for k, v in reply.__dict__.iteritems()),
|
||||
'failure': failure})
|
||||
|
||||
publisher.close()
|
||||
|
||||
|
||||
class RemoteError(exception.Error):
|
||||
"""Signifies that a remote class has raised an exception.
|
||||
|
||||
Containes a string representation of the type of the original exception,
|
||||
the value of the original exception, and the traceback. These are
|
||||
sent to the parent as a joined string so printing the exception
|
||||
contains all of the relevent info.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, exc_type, value, traceback):
|
||||
self.exc_type = exc_type
|
||||
self.value = value
|
||||
self.traceback = traceback
|
||||
super(RemoteError, self).__init__('%s %s\n%s' % (exc_type,
|
||||
value,
|
||||
traceback))
|
||||
|
||||
|
||||
def _unpack_context(msg):
|
||||
"""Unpack context from msg."""
|
||||
context_dict = {}
|
||||
for key in list(msg.keys()):
|
||||
# NOTE(vish): Some versions of python don't like unicode keys
|
||||
# in kwargs.
|
||||
key = str(key)
|
||||
if key.startswith('_context_'):
|
||||
value = msg.pop(key)
|
||||
context_dict[key[9:]] = value
|
||||
context_dict['msg_id'] = msg.pop('_msg_id', None)
|
||||
LOG.debug(_('unpacked context: %s'), context_dict)
|
||||
return RpcContext.from_dict(context_dict)
|
||||
|
||||
|
||||
def _pack_context(msg, context):
|
||||
"""Pack context into msg.
|
||||
|
||||
Values for message keys need to be less than 255 chars, so we pull
|
||||
context out into a bunch of separate keys. If we want to support
|
||||
more arguments in rabbit messages, we may want to do the same
|
||||
for args at some point.
|
||||
|
||||
"""
|
||||
context_d = dict([('_context_%s' % key, value)
|
||||
for (key, value) in context.to_dict().iteritems()])
|
||||
msg.update(context_d)
|
||||
|
||||
|
||||
class RpcContext(context.RequestContext):
|
||||
def __init__(self, *args, **kwargs):
|
||||
msg_id = kwargs.pop('msg_id', None)
|
||||
self.msg_id = msg_id
|
||||
super(RpcContext, self).__init__(*args, **kwargs)
|
||||
|
||||
def reply(self, *args, **kwargs):
|
||||
msg_reply(self.msg_id, *args, **kwargs)
|
||||
|
||||
|
||||
def multicall(context, topic, msg):
|
||||
"""Make a call that returns multiple times."""
|
||||
LOG.debug(_('Making asynchronous call on %s ...'), topic)
|
||||
msg_id = uuid.uuid4().hex
|
||||
msg.update({'_msg_id': msg_id})
|
||||
LOG.debug(_('MSG_ID is %s') % (msg_id))
|
||||
_pack_context(msg, context)
|
||||
|
||||
con_conn = ConnectionPool.get()
|
||||
consumer = DirectConsumer(connection=con_conn, msg_id=msg_id)
|
||||
wait_msg = MulticallWaiter(consumer)
|
||||
consumer.register_callback(wait_msg)
|
||||
|
||||
publisher = TopicPublisher(connection=con_conn, topic=topic)
|
||||
publisher.send(msg)
|
||||
publisher.close()
|
||||
|
||||
return wait_msg
|
||||
|
||||
|
||||
class MulticallWaiter(object):
|
||||
def __init__(self, consumer):
|
||||
self._consumer = consumer
|
||||
self._results = queue.Queue()
|
||||
self._closed = False
|
||||
|
||||
def close(self):
|
||||
self._closed = True
|
||||
self._consumer.close()
|
||||
ConnectionPool.put(self._consumer.connection)
|
||||
|
||||
def __call__(self, data, message):
|
||||
"""Acks message and sets result."""
|
||||
message.ack()
|
||||
if data['failure']:
|
||||
self._results.put(RemoteError(*data['failure']))
|
||||
else:
|
||||
self._results.put(data['result'])
|
||||
|
||||
def __iter__(self):
|
||||
return self.wait()
|
||||
|
||||
def wait(self):
|
||||
while True:
|
||||
rv = None
|
||||
while rv is None and not self._closed:
|
||||
try:
|
||||
rv = self._consumer.fetch(enable_callbacks=True)
|
||||
except Exception:
|
||||
self.close()
|
||||
raise
|
||||
time.sleep(0.01)
|
||||
|
||||
result = self._results.get()
|
||||
if isinstance(result, Exception):
|
||||
self.close()
|
||||
raise result
|
||||
if result == None:
|
||||
self.close()
|
||||
raise StopIteration
|
||||
yield result
|
||||
def create_consumer_set(conn, consumers):
|
||||
return RPCIMPL.ConsumerSet(connection=conn, consumer_list=consumers)
|
||||
|
||||
|
||||
def call(context, topic, msg):
|
||||
"""Sends a message on a topic and wait for a response."""
|
||||
rv = multicall(context, topic, msg)
|
||||
# NOTE(vish): return the last result from the multicall
|
||||
rv = list(rv)
|
||||
if not rv:
|
||||
return
|
||||
return rv[-1]
|
||||
return RPCIMPL.call(context, topic, msg)
|
||||
|
||||
|
||||
def cast(context, topic, msg):
|
||||
"""Sends a message on a topic without waiting for a response."""
|
||||
LOG.debug(_('Making asynchronous cast on %s...'), topic)
|
||||
_pack_context(msg, context)
|
||||
with ConnectionPool.item() as conn:
|
||||
publisher = TopicPublisher(connection=conn, topic=topic)
|
||||
publisher.send(msg)
|
||||
publisher.close()
|
||||
return RPCIMPL.cast(context, topic, msg)
|
||||
|
||||
|
||||
def fanout_cast(context, topic, msg):
|
||||
"""Sends a message on a fanout exchange without waiting for a response."""
|
||||
LOG.debug(_('Making asynchronous fanout cast...'))
|
||||
_pack_context(msg, context)
|
||||
with ConnectionPool.item() as conn:
|
||||
publisher = FanoutPublisher(topic, connection=conn)
|
||||
publisher.send(msg)
|
||||
publisher.close()
|
||||
return RPCIMPL.fanout_cast(context, topic, msg)
|
||||
|
||||
|
||||
def generic_response(message_data, message):
|
||||
"""Logs a result and exits."""
|
||||
LOG.debug(_('response %s'), message_data)
|
||||
message.ack()
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def send_message(topic, message, wait=True):
|
||||
"""Sends a message for testing."""
|
||||
msg_id = uuid.uuid4().hex
|
||||
message.update({'_msg_id': msg_id})
|
||||
LOG.debug(_('topic is %s'), topic)
|
||||
LOG.debug(_('message %s'), message)
|
||||
|
||||
if wait:
|
||||
consumer = messaging.Consumer(connection=Connection.instance(),
|
||||
queue=msg_id,
|
||||
exchange=msg_id,
|
||||
auto_delete=True,
|
||||
exchange_type='direct',
|
||||
routing_key=msg_id)
|
||||
consumer.register_callback(generic_response)
|
||||
|
||||
publisher = messaging.Publisher(connection=Connection.instance(),
|
||||
exchange=FLAGS.control_exchange,
|
||||
durable=False,
|
||||
exchange_type='topic',
|
||||
routing_key=topic)
|
||||
publisher.send(message)
|
||||
publisher.close()
|
||||
|
||||
if wait:
|
||||
consumer.wait()
|
||||
consumer.close()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# You can send messages from the command line using
|
||||
# topic and a json string representing a dictionary
|
||||
# for the method
|
||||
send_message(sys.argv[1], json.loads(sys.argv[2]))
|
||||
def multicall(context, topic, msg):
|
||||
return RPCIMPL.multicall(context, topic, msg)
|
||||
|
0
nova/rpc_backends/__init__.py
Normal file
0
nova/rpc_backends/__init__.py
Normal file
591
nova/rpc_backends/amqp.py
Normal file
591
nova/rpc_backends/amqp.py
Normal file
@@ -0,0 +1,591 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""AMQP-based RPC.
|
||||
|
||||
Queues have consumers and publishers.
|
||||
|
||||
No fan-out support yet.
|
||||
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
import types
|
||||
import uuid
|
||||
|
||||
from carrot import connection as carrot_connection
|
||||
from carrot import messaging
|
||||
from eventlet import greenpool
|
||||
from eventlet import pools
|
||||
from eventlet import queue
|
||||
import greenlet
|
||||
|
||||
from nova import context
|
||||
from nova import exception
|
||||
from nova import fakerabbit
|
||||
from nova import flags
|
||||
from nova import log as logging
|
||||
from nova import utils
|
||||
from nova.rpc_backends.common import RemoteError, LOG
|
||||
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
flags.DEFINE_integer('rpc_thread_pool_size', 1024,
|
||||
'Size of RPC thread pool')
|
||||
flags.DEFINE_integer('rpc_conn_pool_size', 30,
|
||||
'Size of RPC connection pool')
|
||||
|
||||
|
||||
class Connection(carrot_connection.BrokerConnection):
|
||||
"""Connection instance object."""
|
||||
|
||||
@classmethod
|
||||
def instance(cls, new=True):
|
||||
"""Returns the instance."""
|
||||
if new or not hasattr(cls, '_instance'):
|
||||
params = dict(hostname=FLAGS.rabbit_host,
|
||||
port=FLAGS.rabbit_port,
|
||||
ssl=FLAGS.rabbit_use_ssl,
|
||||
userid=FLAGS.rabbit_userid,
|
||||
password=FLAGS.rabbit_password,
|
||||
virtual_host=FLAGS.rabbit_virtual_host)
|
||||
|
||||
if FLAGS.fake_rabbit:
|
||||
params['backend_cls'] = fakerabbit.Backend
|
||||
|
||||
# NOTE(vish): magic is fun!
|
||||
# pylint: disable=W0142
|
||||
if new:
|
||||
return cls(**params)
|
||||
else:
|
||||
cls._instance = cls(**params)
|
||||
return cls._instance
|
||||
|
||||
@classmethod
|
||||
def recreate(cls):
|
||||
"""Recreates the connection instance.
|
||||
|
||||
This is necessary to recover from some network errors/disconnects.
|
||||
|
||||
"""
|
||||
try:
|
||||
del cls._instance
|
||||
except AttributeError, e:
|
||||
# The _instance stuff is for testing purposes. Usually we don't use
|
||||
# it. So don't freak out if it doesn't exist.
|
||||
pass
|
||||
return cls.instance()
|
||||
|
||||
|
||||
class Pool(pools.Pool):
|
||||
"""Class that implements a Pool of Connections."""
|
||||
|
||||
# TODO(comstud): Timeout connections not used in a while
|
||||
def create(self):
|
||||
LOG.debug('Creating new connection')
|
||||
return Connection.instance(new=True)
|
||||
|
||||
# Create a ConnectionPool to use for RPC calls. We'll order the
|
||||
# pool as a stack (LIFO), so that we can potentially loop through and
|
||||
# timeout old unused connections at some point
|
||||
ConnectionPool = Pool(
|
||||
max_size=FLAGS.rpc_conn_pool_size,
|
||||
order_as_stack=True)
|
||||
|
||||
|
||||
class Consumer(messaging.Consumer):
|
||||
"""Consumer base class.
|
||||
|
||||
Contains methods for connecting the fetch method to async loops.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
for i in xrange(FLAGS.rabbit_max_retries):
|
||||
if i > 0:
|
||||
time.sleep(FLAGS.rabbit_retry_interval)
|
||||
try:
|
||||
super(Consumer, self).__init__(*args, **kwargs)
|
||||
self.failed_connection = False
|
||||
break
|
||||
except Exception as e: # Catching all because carrot sucks
|
||||
fl_host = FLAGS.rabbit_host
|
||||
fl_port = FLAGS.rabbit_port
|
||||
fl_intv = FLAGS.rabbit_retry_interval
|
||||
LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is'
|
||||
' unreachable: %(e)s. Trying again in %(fl_intv)d'
|
||||
' seconds.') % locals())
|
||||
self.failed_connection = True
|
||||
if self.failed_connection:
|
||||
LOG.error(_('Unable to connect to AMQP server '
|
||||
'after %d tries. Shutting down.'),
|
||||
FLAGS.rabbit_max_retries)
|
||||
sys.exit(1)
|
||||
|
||||
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
|
||||
"""Wraps the parent fetch with some logic for failed connection."""
|
||||
# TODO(vish): the logic for failed connections and logging should be
|
||||
# refactored into some sort of connection manager object
|
||||
try:
|
||||
if self.failed_connection:
|
||||
# NOTE(vish): connection is defined in the parent class, we can
|
||||
# recreate it as long as we create the backend too
|
||||
# pylint: disable=W0201
|
||||
self.connection = Connection.recreate()
|
||||
self.backend = self.connection.create_backend()
|
||||
self.declare()
|
||||
return super(Consumer, self).fetch(no_ack,
|
||||
auto_ack,
|
||||
enable_callbacks)
|
||||
if self.failed_connection:
|
||||
LOG.error(_('Reconnected to queue'))
|
||||
self.failed_connection = False
|
||||
# NOTE(vish): This is catching all errors because we really don't
|
||||
# want exceptions to be logged 10 times a second if some
|
||||
# persistent failure occurs.
|
||||
except Exception, e: # pylint: disable=W0703
|
||||
if not self.failed_connection:
|
||||
LOG.exception(_('Failed to fetch message from queue: %s' % e))
|
||||
self.failed_connection = True
|
||||
|
||||
def attach_to_eventlet(self):
|
||||
"""Only needed for unit tests!"""
|
||||
timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
|
||||
timer.start(0.1)
|
||||
return timer
|
||||
|
||||
|
||||
class AdapterConsumer(Consumer):
|
||||
"""Calls methods on a proxy object based on method and args."""
|
||||
|
||||
def __init__(self, connection=None, topic='broadcast', proxy=None):
|
||||
LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
|
||||
self.proxy = proxy
|
||||
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
|
||||
super(AdapterConsumer, self).__init__(connection=connection,
|
||||
topic=topic)
|
||||
self.register_callback(self.process_data)
|
||||
|
||||
def process_data(self, message_data, message):
|
||||
"""Consumer callback to call a method on a proxy object.
|
||||
|
||||
Parses the message for validity and fires off a thread to call the
|
||||
proxy object method.
|
||||
|
||||
Message data should be a dictionary with two keys:
|
||||
method: string representing the method to call
|
||||
args: dictionary of arg: value
|
||||
|
||||
Example: {'method': 'echo', 'args': {'value': 42}}
|
||||
|
||||
"""
|
||||
LOG.debug(_('received %s') % message_data)
|
||||
# This will be popped off in _unpack_context
|
||||
msg_id = message_data.get('_msg_id', None)
|
||||
ctxt = _unpack_context(message_data)
|
||||
|
||||
method = message_data.get('method')
|
||||
args = message_data.get('args', {})
|
||||
message.ack()
|
||||
if not method:
|
||||
# NOTE(vish): we may not want to ack here, but that means that bad
|
||||
# messages stay in the queue indefinitely, so for now
|
||||
# we just log the message and send an error string
|
||||
# back to the caller
|
||||
LOG.warn(_('no method for message: %s') % message_data)
|
||||
if msg_id:
|
||||
msg_reply(msg_id,
|
||||
_('No method for message: %s') % message_data)
|
||||
return
|
||||
self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)
|
||||
|
||||
@exception.wrap_exception()
|
||||
def _process_data(self, msg_id, ctxt, method, args):
|
||||
"""Thread that maigcally looks for a method on the proxy
|
||||
object and calls it.
|
||||
"""
|
||||
|
||||
node_func = getattr(self.proxy, str(method))
|
||||
node_args = dict((str(k), v) for k, v in args.iteritems())
|
||||
# NOTE(vish): magic is fun!
|
||||
try:
|
||||
rval = node_func(context=ctxt, **node_args)
|
||||
if msg_id:
|
||||
# Check if the result was a generator
|
||||
if isinstance(rval, types.GeneratorType):
|
||||
for x in rval:
|
||||
msg_reply(msg_id, x, None)
|
||||
else:
|
||||
msg_reply(msg_id, rval, None)
|
||||
|
||||
# This final None tells multicall that it is done.
|
||||
msg_reply(msg_id, None, None)
|
||||
elif isinstance(rval, types.GeneratorType):
|
||||
# NOTE(vish): this iterates through the generator
|
||||
list(rval)
|
||||
except Exception as e:
|
||||
logging.exception('Exception during message handling')
|
||||
if msg_id:
|
||||
msg_reply(msg_id, None, sys.exc_info())
|
||||
return
|
||||
|
||||
|
||||
class TopicAdapterConsumer(AdapterConsumer):
|
||||
"""Consumes messages on a specific topic."""
|
||||
|
||||
exchange_type = 'topic'
|
||||
|
||||
def __init__(self, connection=None, topic='broadcast', proxy=None):
|
||||
self.queue = topic
|
||||
self.routing_key = topic
|
||||
self.exchange = FLAGS.control_exchange
|
||||
self.durable = False
|
||||
super(TopicAdapterConsumer, self).__init__(connection=connection,
|
||||
topic=topic, proxy=proxy)
|
||||
|
||||
|
||||
class FanoutAdapterConsumer(AdapterConsumer):
|
||||
"""Consumes messages from a fanout exchange."""
|
||||
|
||||
exchange_type = 'fanout'
|
||||
|
||||
def __init__(self, connection=None, topic='broadcast', proxy=None):
|
||||
self.exchange = '%s_fanout' % topic
|
||||
self.routing_key = topic
|
||||
unique = uuid.uuid4().hex
|
||||
self.queue = '%s_fanout_%s' % (topic, unique)
|
||||
self.durable = False
|
||||
# Fanout creates unique queue names, so we should auto-remove
|
||||
# them when done, so they're not left around on restart.
|
||||
# Also, we're the only one that should be consuming. exclusive
|
||||
# implies auto_delete, so we'll just set that..
|
||||
self.exclusive = True
|
||||
LOG.info(_('Created "%(exchange)s" fanout exchange '
|
||||
'with "%(key)s" routing key'),
|
||||
dict(exchange=self.exchange, key=self.routing_key))
|
||||
super(FanoutAdapterConsumer, self).__init__(connection=connection,
|
||||
topic=topic, proxy=proxy)
|
||||
|
||||
|
||||
class ConsumerSet(object):
|
||||
"""Groups consumers to listen on together on a single connection."""
|
||||
|
||||
def __init__(self, connection, consumer_list):
|
||||
self.consumer_list = set(consumer_list)
|
||||
self.consumer_set = None
|
||||
self.enabled = True
|
||||
self.init(connection)
|
||||
|
||||
def init(self, conn):
|
||||
if not conn:
|
||||
conn = Connection.instance(new=True)
|
||||
if self.consumer_set:
|
||||
self.consumer_set.close()
|
||||
self.consumer_set = messaging.ConsumerSet(conn)
|
||||
for consumer in self.consumer_list:
|
||||
consumer.connection = conn
|
||||
# consumer.backend is set for us
|
||||
self.consumer_set.add_consumer(consumer)
|
||||
|
||||
def reconnect(self):
|
||||
self.init(None)
|
||||
|
||||
def wait(self, limit=None):
|
||||
running = True
|
||||
while running:
|
||||
it = self.consumer_set.iterconsume(limit=limit)
|
||||
if not it:
|
||||
break
|
||||
while True:
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration:
|
||||
return
|
||||
except greenlet.GreenletExit:
|
||||
running = False
|
||||
break
|
||||
except Exception as e:
|
||||
LOG.exception(_("Exception while processing consumer"))
|
||||
self.reconnect()
|
||||
# Break to outer loop
|
||||
break
|
||||
|
||||
def close(self):
|
||||
self.consumer_set.close()
|
||||
|
||||
|
||||
class Publisher(messaging.Publisher):
|
||||
"""Publisher base class."""
|
||||
pass
|
||||
|
||||
|
||||
class TopicPublisher(Publisher):
|
||||
"""Publishes messages on a specific topic."""
|
||||
|
||||
exchange_type = 'topic'
|
||||
|
||||
def __init__(self, connection=None, topic='broadcast'):
|
||||
self.routing_key = topic
|
||||
self.exchange = FLAGS.control_exchange
|
||||
self.durable = False
|
||||
super(TopicPublisher, self).__init__(connection=connection)
|
||||
|
||||
|
||||
class FanoutPublisher(Publisher):
|
||||
"""Publishes messages to a fanout exchange."""
|
||||
|
||||
exchange_type = 'fanout'
|
||||
|
||||
def __init__(self, topic, connection=None):
|
||||
self.exchange = '%s_fanout' % topic
|
||||
self.queue = '%s_fanout' % topic
|
||||
self.durable = False
|
||||
self.auto_delete = True
|
||||
LOG.info(_('Creating "%(exchange)s" fanout exchange'),
|
||||
dict(exchange=self.exchange))
|
||||
super(FanoutPublisher, self).__init__(connection=connection)
|
||||
|
||||
|
||||
class DirectConsumer(Consumer):
|
||||
"""Consumes messages directly on a channel specified by msg_id."""
|
||||
|
||||
exchange_type = 'direct'
|
||||
|
||||
def __init__(self, connection=None, msg_id=None):
|
||||
self.queue = msg_id
|
||||
self.routing_key = msg_id
|
||||
self.exchange = msg_id
|
||||
self.auto_delete = True
|
||||
self.exclusive = True
|
||||
super(DirectConsumer, self).__init__(connection=connection)
|
||||
|
||||
|
||||
class DirectPublisher(Publisher):
|
||||
"""Publishes messages directly on a channel specified by msg_id."""
|
||||
|
||||
exchange_type = 'direct'
|
||||
|
||||
def __init__(self, connection=None, msg_id=None):
|
||||
self.routing_key = msg_id
|
||||
self.exchange = msg_id
|
||||
self.auto_delete = True
|
||||
super(DirectPublisher, self).__init__(connection=connection)
|
||||
|
||||
|
||||
def msg_reply(msg_id, reply=None, failure=None):
|
||||
"""Sends a reply or an error on the channel signified by msg_id.
|
||||
|
||||
Failure should be a sys.exc_info() tuple.
|
||||
|
||||
"""
|
||||
if failure:
|
||||
message = str(failure[1])
|
||||
tb = traceback.format_exception(*failure)
|
||||
LOG.error(_("Returning exception %s to caller"), message)
|
||||
LOG.error(tb)
|
||||
failure = (failure[0].__name__, str(failure[1]), tb)
|
||||
|
||||
with ConnectionPool.item() as conn:
|
||||
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
|
||||
try:
|
||||
publisher.send({'result': reply, 'failure': failure})
|
||||
except TypeError:
|
||||
publisher.send(
|
||||
{'result': dict((k, repr(v))
|
||||
for k, v in reply.__dict__.iteritems()),
|
||||
'failure': failure})
|
||||
|
||||
publisher.close()
|
||||
|
||||
|
||||
def _unpack_context(msg):
|
||||
"""Unpack context from msg."""
|
||||
context_dict = {}
|
||||
for key in list(msg.keys()):
|
||||
# NOTE(vish): Some versions of python don't like unicode keys
|
||||
# in kwargs.
|
||||
key = str(key)
|
||||
if key.startswith('_context_'):
|
||||
value = msg.pop(key)
|
||||
context_dict[key[9:]] = value
|
||||
context_dict['msg_id'] = msg.pop('_msg_id', None)
|
||||
LOG.debug(_('unpacked context: %s'), context_dict)
|
||||
return RpcContext.from_dict(context_dict)
|
||||
|
||||
|
||||
def _pack_context(msg, context):
|
||||
"""Pack context into msg.
|
||||
|
||||
Values for message keys need to be less than 255 chars, so we pull
|
||||
context out into a bunch of separate keys. If we want to support
|
||||
more arguments in rabbit messages, we may want to do the same
|
||||
for args at some point.
|
||||
|
||||
"""
|
||||
context_d = dict([('_context_%s' % key, value)
|
||||
for (key, value) in context.to_dict().iteritems()])
|
||||
msg.update(context_d)
|
||||
|
||||
|
||||
class RpcContext(context.RequestContext):
|
||||
def __init__(self, *args, **kwargs):
|
||||
msg_id = kwargs.pop('msg_id', None)
|
||||
self.msg_id = msg_id
|
||||
super(RpcContext, self).__init__(*args, **kwargs)
|
||||
|
||||
def reply(self, *args, **kwargs):
|
||||
msg_reply(self.msg_id, *args, **kwargs)
|
||||
|
||||
|
||||
def multicall(context, topic, msg):
|
||||
"""Make a call that returns multiple times."""
|
||||
LOG.debug(_('Making asynchronous call on %s ...'), topic)
|
||||
msg_id = uuid.uuid4().hex
|
||||
msg.update({'_msg_id': msg_id})
|
||||
LOG.debug(_('MSG_ID is %s') % (msg_id))
|
||||
_pack_context(msg, context)
|
||||
|
||||
con_conn = ConnectionPool.get()
|
||||
consumer = DirectConsumer(connection=con_conn, msg_id=msg_id)
|
||||
wait_msg = MulticallWaiter(consumer)
|
||||
consumer.register_callback(wait_msg)
|
||||
|
||||
publisher = TopicPublisher(connection=con_conn, topic=topic)
|
||||
publisher.send(msg)
|
||||
publisher.close()
|
||||
|
||||
return wait_msg
|
||||
|
||||
|
||||
class MulticallWaiter(object):
|
||||
def __init__(self, consumer):
|
||||
self._consumer = consumer
|
||||
self._results = queue.Queue()
|
||||
self._closed = False
|
||||
|
||||
def close(self):
|
||||
self._closed = True
|
||||
self._consumer.close()
|
||||
ConnectionPool.put(self._consumer.connection)
|
||||
|
||||
def __call__(self, data, message):
|
||||
"""Acks message and sets result."""
|
||||
message.ack()
|
||||
if data['failure']:
|
||||
self._results.put(RemoteError(*data['failure']))
|
||||
else:
|
||||
self._results.put(data['result'])
|
||||
|
||||
def __iter__(self):
|
||||
return self.wait()
|
||||
|
||||
def wait(self):
|
||||
while True:
|
||||
rv = None
|
||||
while rv is None and not self._closed:
|
||||
try:
|
||||
rv = self._consumer.fetch(enable_callbacks=True)
|
||||
except Exception:
|
||||
self.close()
|
||||
raise
|
||||
time.sleep(0.01)
|
||||
|
||||
result = self._results.get()
|
||||
if isinstance(result, Exception):
|
||||
self.close()
|
||||
raise result
|
||||
if result == None:
|
||||
self.close()
|
||||
raise StopIteration
|
||||
yield result
|
||||
|
||||
|
||||
def call(context, topic, msg):
|
||||
"""Sends a message on a topic and wait for a response."""
|
||||
rv = multicall(context, topic, msg)
|
||||
# NOTE(vish): return the last result from the multicall
|
||||
rv = list(rv)
|
||||
if not rv:
|
||||
return
|
||||
return rv[-1]
|
||||
|
||||
|
||||
def cast(context, topic, msg):
|
||||
"""Sends a message on a topic without waiting for a response."""
|
||||
LOG.debug(_('Making asynchronous cast on %s...'), topic)
|
||||
_pack_context(msg, context)
|
||||
with ConnectionPool.item() as conn:
|
||||
publisher = TopicPublisher(connection=conn, topic=topic)
|
||||
publisher.send(msg)
|
||||
publisher.close()
|
||||
|
||||
|
||||
def fanout_cast(context, topic, msg):
|
||||
"""Sends a message on a fanout exchange without waiting for a response."""
|
||||
LOG.debug(_('Making asynchronous fanout cast...'))
|
||||
_pack_context(msg, context)
|
||||
with ConnectionPool.item() as conn:
|
||||
publisher = FanoutPublisher(topic, connection=conn)
|
||||
publisher.send(msg)
|
||||
publisher.close()
|
||||
|
||||
|
||||
def generic_response(message_data, message):
|
||||
"""Logs a result and exits."""
|
||||
LOG.debug(_('response %s'), message_data)
|
||||
message.ack()
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def send_message(topic, message, wait=True):
|
||||
"""Sends a message for testing."""
|
||||
msg_id = uuid.uuid4().hex
|
||||
message.update({'_msg_id': msg_id})
|
||||
LOG.debug(_('topic is %s'), topic)
|
||||
LOG.debug(_('message %s'), message)
|
||||
|
||||
if wait:
|
||||
consumer = messaging.Consumer(connection=Connection.instance(),
|
||||
queue=msg_id,
|
||||
exchange=msg_id,
|
||||
auto_delete=True,
|
||||
exchange_type='direct',
|
||||
routing_key=msg_id)
|
||||
consumer.register_callback(generic_response)
|
||||
|
||||
publisher = messaging.Publisher(connection=Connection.instance(),
|
||||
exchange=FLAGS.control_exchange,
|
||||
durable=False,
|
||||
exchange_type='topic',
|
||||
routing_key=topic)
|
||||
publisher.send(message)
|
||||
publisher.close()
|
||||
|
||||
if wait:
|
||||
consumer.wait()
|
||||
consumer.close()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# You can send messages from the command line using
|
||||
# topic and a json string representing a dictionary
|
||||
# for the method
|
||||
send_message(sys.argv[1], json.loads(sys.argv[2]))
|
23
nova/rpc_backends/common.py
Normal file
23
nova/rpc_backends/common.py
Normal file
@@ -0,0 +1,23 @@
|
||||
from nova import exception
|
||||
from nova import log as logging
|
||||
|
||||
LOG = logging.getLogger('nova.rpc')
|
||||
|
||||
|
||||
class RemoteError(exception.Error):
|
||||
"""Signifies that a remote class has raised an exception.
|
||||
|
||||
Containes a string representation of the type of the original exception,
|
||||
the value of the original exception, and the traceback. These are
|
||||
sent to the parent as a joined string so printing the exception
|
||||
contains all of the relevent info.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, exc_type, value, traceback):
|
||||
self.exc_type = exc_type
|
||||
self.value = value
|
||||
self.traceback = traceback
|
||||
super(RemoteError, self).__init__('%s %s\n%s' % (exc_type,
|
||||
value,
|
||||
traceback))
|
@@ -39,7 +39,7 @@ class AdminApiTestCase(test.TestCase):
|
||||
super(AdminApiTestCase, self).setUp()
|
||||
self.flags(connection_type='fake')
|
||||
|
||||
self.conn = rpc.Connection.instance()
|
||||
self.conn = rpc.create_connection()
|
||||
|
||||
# set up our cloud
|
||||
self.api = admin.AdminController()
|
||||
|
@@ -50,7 +50,7 @@ class CloudTestCase(test.TestCase):
|
||||
self.flags(connection_type='fake',
|
||||
stub_network=True)
|
||||
|
||||
self.conn = rpc.Connection.instance()
|
||||
self.conn = rpc.create_connection()
|
||||
|
||||
# set up our cloud
|
||||
self.cloud = cloud.CloudController()
|
||||
@@ -269,63 +269,24 @@ class CloudTestCase(test.TestCase):
|
||||
delete = self.cloud.delete_security_group
|
||||
self.assertRaises(exception.ApiError, delete, self.context)
|
||||
|
||||
def test_authorize_security_group_ingress(self):
|
||||
def test_authorize_revoke_security_group_ingress(self):
|
||||
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
|
||||
sec = db.security_group_create(self.context, kwargs)
|
||||
authz = self.cloud.authorize_security_group_ingress
|
||||
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
|
||||
self.assertTrue(authz(self.context, group_name=sec['name'], **kwargs))
|
||||
|
||||
def test_authorize_security_group_ingress_ip_permissions_ip_ranges(self):
|
||||
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
|
||||
sec = db.security_group_create(self.context, kwargs)
|
||||
authz = self.cloud.authorize_security_group_ingress
|
||||
kwargs = {'ip_permissions': [{'to_port': 81, 'from_port': 81,
|
||||
'ip_ranges':
|
||||
{'1': {'cidr_ip': u'0.0.0.0/0'},
|
||||
'2': {'cidr_ip': u'10.10.10.10/32'}},
|
||||
'ip_protocol': u'tcp'}]}
|
||||
self.assertTrue(authz(self.context, group_name=sec['name'], **kwargs))
|
||||
|
||||
def test_authorize_security_group_ingress_ip_permissions_groups(self):
|
||||
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
|
||||
sec = db.security_group_create(self.context, kwargs)
|
||||
authz = self.cloud.authorize_security_group_ingress
|
||||
kwargs = {'ip_permissions': [{'to_port': 81, 'from_port': 81,
|
||||
'ip_ranges':{'1': {'cidr_ip': u'0.0.0.0/0'},
|
||||
'2': {'cidr_ip': u'10.10.10.10/32'}},
|
||||
'groups': {'1': {'user_id': u'someuser',
|
||||
'group_name': u'somegroup1'},
|
||||
'2': {'user_id': u'someuser',
|
||||
'group_name': u'othergroup2'}},
|
||||
'ip_protocol': u'tcp'}]}
|
||||
self.assertTrue(authz(self.context, group_name=sec['name'], **kwargs))
|
||||
|
||||
def test_revoke_security_group_ingress(self):
|
||||
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
|
||||
sec = db.security_group_create(self.context, kwargs)
|
||||
authz = self.cloud.authorize_security_group_ingress
|
||||
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
|
||||
authz(self.context, group_id=sec['id'], **kwargs)
|
||||
authz(self.context, group_name=sec['name'], **kwargs)
|
||||
revoke = self.cloud.revoke_security_group_ingress
|
||||
self.assertTrue(revoke(self.context, group_name=sec['name'], **kwargs))
|
||||
|
||||
def test_revoke_security_group_ingress_by_id(self):
|
||||
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
|
||||
sec = db.security_group_create(self.context, kwargs)
|
||||
authz = self.cloud.authorize_security_group_ingress
|
||||
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
|
||||
authz(self.context, group_id=sec['id'], **kwargs)
|
||||
revoke = self.cloud.revoke_security_group_ingress
|
||||
self.assertTrue(revoke(self.context, group_id=sec['id'], **kwargs))
|
||||
|
||||
def test_authorize_security_group_ingress_by_id(self):
|
||||
def test_authorize_revoke_security_group_ingress_by_id(self):
|
||||
sec = db.security_group_create(self.context,
|
||||
{'project_id': self.context.project_id,
|
||||
'name': 'test'})
|
||||
authz = self.cloud.authorize_security_group_ingress
|
||||
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
|
||||
self.assertTrue(authz(self.context, group_id=sec['id'], **kwargs))
|
||||
authz(self.context, group_id=sec['id'], **kwargs)
|
||||
revoke = self.cloud.revoke_security_group_ingress
|
||||
self.assertTrue(revoke(self.context, group_id=sec['id'], **kwargs))
|
||||
|
||||
def test_authorize_security_group_ingress_missing_protocol_params(self):
|
||||
sec = db.security_group_create(self.context,
|
||||
@@ -947,21 +908,6 @@ class CloudTestCase(test.TestCase):
|
||||
self._wait_for_running(ec2_instance_id)
|
||||
return ec2_instance_id
|
||||
|
||||
def test_rescue_unrescue_instance(self):
|
||||
instance_id = self._run_instance(
|
||||
image_id='ami-1',
|
||||
instance_type=FLAGS.default_instance_type,
|
||||
max_count=1)
|
||||
self.cloud.rescue_instance(context=self.context,
|
||||
instance_id=instance_id)
|
||||
# NOTE(vish): This currently does no validation, it simply makes sure
|
||||
# that the code path doesn't throw an exception.
|
||||
self.cloud.unrescue_instance(context=self.context,
|
||||
instance_id=instance_id)
|
||||
# TODO(soren): We need this until we can stop polling in the rpc code
|
||||
# for unit tests.
|
||||
self.cloud.terminate_instances(self.context, [instance_id])
|
||||
|
||||
def test_console_output(self):
|
||||
instance_id = self._run_instance(
|
||||
image_id='ami-1',
|
||||
|
@@ -33,11 +33,12 @@ LOG = logging.getLogger('nova.tests.rpc')
|
||||
class RpcTestCase(test.TestCase):
|
||||
def setUp(self):
|
||||
super(RpcTestCase, self).setUp()
|
||||
self.conn = rpc.Connection.instance(True)
|
||||
self.conn = rpc.create_connection(True)
|
||||
self.receiver = TestReceiver()
|
||||
self.consumer = rpc.TopicAdapterConsumer(connection=self.conn,
|
||||
topic='test',
|
||||
proxy=self.receiver)
|
||||
self.consumer = rpc.create_consumer(self.conn,
|
||||
'test',
|
||||
self.receiver,
|
||||
False)
|
||||
self.consumer.attach_to_eventlet()
|
||||
self.context = context.get_admin_context()
|
||||
|
||||
@@ -129,6 +130,8 @@ class RpcTestCase(test.TestCase):
|
||||
"""Calls echo in the passed queue"""
|
||||
LOG.debug(_("Nested received %(queue)s, %(value)s")
|
||||
% locals())
|
||||
# TODO: so, it will replay the context and use the same REQID?
|
||||
# that's bizarre.
|
||||
ret = rpc.call(context,
|
||||
queue,
|
||||
{"method": "echo",
|
||||
@@ -137,10 +140,11 @@ class RpcTestCase(test.TestCase):
|
||||
return value
|
||||
|
||||
nested = Nested()
|
||||
conn = rpc.Connection.instance(True)
|
||||
consumer = rpc.TopicAdapterConsumer(connection=conn,
|
||||
topic='nested',
|
||||
proxy=nested)
|
||||
conn = rpc.create_connection(True)
|
||||
consumer = rpc.create_consumer(conn,
|
||||
'nested',
|
||||
nested,
|
||||
False)
|
||||
consumer.attach_to_eventlet()
|
||||
value = 42
|
||||
result = rpc.call(self.context,
|
||||
@@ -149,47 +153,6 @@ class RpcTestCase(test.TestCase):
|
||||
"value": value}})
|
||||
self.assertEqual(value, result)
|
||||
|
||||
def test_connectionpool_single(self):
|
||||
"""Test that ConnectionPool recycles a single connection."""
|
||||
conn1 = rpc.ConnectionPool.get()
|
||||
rpc.ConnectionPool.put(conn1)
|
||||
conn2 = rpc.ConnectionPool.get()
|
||||
rpc.ConnectionPool.put(conn2)
|
||||
self.assertEqual(conn1, conn2)
|
||||
|
||||
def test_connectionpool_double(self):
|
||||
"""Test that ConnectionPool returns and reuses separate connections.
|
||||
|
||||
When called consecutively we should get separate connections and upon
|
||||
returning them those connections should be reused for future calls
|
||||
before generating a new connection.
|
||||
|
||||
"""
|
||||
conn1 = rpc.ConnectionPool.get()
|
||||
conn2 = rpc.ConnectionPool.get()
|
||||
|
||||
self.assertNotEqual(conn1, conn2)
|
||||
rpc.ConnectionPool.put(conn1)
|
||||
rpc.ConnectionPool.put(conn2)
|
||||
|
||||
conn3 = rpc.ConnectionPool.get()
|
||||
conn4 = rpc.ConnectionPool.get()
|
||||
self.assertEqual(conn1, conn3)
|
||||
self.assertEqual(conn2, conn4)
|
||||
|
||||
def test_connectionpool_limit(self):
|
||||
"""Test connection pool limit and connection uniqueness."""
|
||||
max_size = FLAGS.rpc_conn_pool_size
|
||||
conns = []
|
||||
|
||||
for i in xrange(max_size):
|
||||
conns.append(rpc.ConnectionPool.get())
|
||||
|
||||
self.assertFalse(rpc.ConnectionPool.free_items)
|
||||
self.assertEqual(rpc.ConnectionPool.current_size,
|
||||
rpc.ConnectionPool.max_size)
|
||||
self.assertEqual(len(set(conns)), max_size)
|
||||
|
||||
|
||||
class TestReceiver(object):
|
||||
"""Simple Proxy class so the consumer has methods to call.
|
||||
|
68
nova/tests/test_rpc_amqp.py
Normal file
68
nova/tests/test_rpc_amqp.py
Normal file
@@ -0,0 +1,68 @@
|
||||
from nova import context
|
||||
from nova import flags
|
||||
from nova import log as logging
|
||||
from nova import rpc
|
||||
from nova.rpc_backends import amqp
|
||||
from nova import test
|
||||
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
LOG = logging.getLogger('nova.tests.rpc')
|
||||
|
||||
|
||||
class RpcAMQPTestCase(test.TestCase):
|
||||
def setUp(self):
|
||||
super(RpcAMQPTestCase, self).setUp()
|
||||
self.conn = rpc.create_connection(True)
|
||||
self.receiver = TestReceiver()
|
||||
self.consumer = rpc.create_consumer(self.conn,
|
||||
'test',
|
||||
self.receiver,
|
||||
False)
|
||||
self.consumer.attach_to_eventlet()
|
||||
self.context = context.get_admin_context()
|
||||
|
||||
def test_connectionpool_single(self):
|
||||
"""Test that ConnectionPool recycles a single connection."""
|
||||
conn1 = amqp.ConnectionPool.get()
|
||||
amqp.ConnectionPool.put(conn1)
|
||||
conn2 = amqp.ConnectionPool.get()
|
||||
amqp.ConnectionPool.put(conn2)
|
||||
self.assertEqual(conn1, conn2)
|
||||
|
||||
|
||||
class TestReceiver(object):
|
||||
"""Simple Proxy class so the consumer has methods to call.
|
||||
|
||||
Uses static methods because we aren't actually storing any state.
|
||||
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def echo(context, value):
|
||||
"""Simply returns whatever value is sent in."""
|
||||
LOG.debug(_("Received %s"), value)
|
||||
return value
|
||||
|
||||
@staticmethod
|
||||
def context(context, value):
|
||||
"""Returns dictionary version of context."""
|
||||
LOG.debug(_("Received %s"), context)
|
||||
return context.to_dict()
|
||||
|
||||
@staticmethod
|
||||
def echo_three_times(context, value):
|
||||
context.reply(value)
|
||||
context.reply(value + 1)
|
||||
context.reply(value + 2)
|
||||
|
||||
@staticmethod
|
||||
def echo_three_times_yield(context, value):
|
||||
yield value
|
||||
yield value + 1
|
||||
yield value + 2
|
||||
|
||||
@staticmethod
|
||||
def fail(context, value):
|
||||
"""Raises an exception with the value sent in."""
|
||||
raise Exception(value)
|
Reference in New Issue
Block a user