merging trunk

This commit is contained in:
Brian Waldon
2011-09-01 11:08:41 -04:00
24 changed files with 1335 additions and 400 deletions

View File

@@ -102,9 +102,8 @@ if __name__ == '__main__':
logging.setup()
begin, end = time_period(FLAGS.instance_usage_audit_period)
print "Creating usages for %s until %s" % (str(begin), str(end))
instances = db.instance_get_active_by_window(context.get_admin_context(),
begin,
end)
ctxt = context.get_admin_context()
instances = db.instance_get_active_by_window_joined(ctxt, begin, end)
print "%s instances" % len(instances)
for instance_ref in instances:
usage_info = utils.usage_from_instance(instance_ref,

View File

@@ -113,11 +113,10 @@ class AjaxConsoleProxy(object):
AjaxConsoleProxy.tokens[kwargs['token']] = \
{'args': kwargs, 'last_activity': time.time()}
conn = rpc.create_connection(new=True)
consumer = rpc.create_consumer(
conn,
FLAGS.ajax_console_proxy_topic,
TopicProxy)
self.conn = rpc.create_connection(new=True)
self.conn.create_consumer(
FLAGS.ajax_console_proxy_topic,
TopicProxy)
def delete_expired_tokens():
now = time.time()
@@ -129,7 +128,7 @@ class AjaxConsoleProxy(object):
for k in to_delete:
del AjaxConsoleProxy.tokens[k]
utils.LoopingCall(consumer.fetch, enable_callbacks=True).start(0.1)
self.conn.consume_in_thread()
utils.LoopingCall(delete_expired_tokens).start(1)
if __name__ == '__main__':
@@ -142,3 +141,4 @@ if __name__ == '__main__':
server = wsgi.Server("AJAX Console Proxy", acp, port=acp_port)
service.serve(server)
service.wait()
self.conn.close()

View File

@@ -61,7 +61,7 @@ class ApiError(Error):
super(ApiError, self).__init__(outstr)
class BuildInProgress(Error):
class RebuildRequiresActiveInstance(Error):
pass
@@ -533,6 +533,10 @@ class NoMoreFloatingIps(FloatingIpNotFound):
message = _("Zero floating ips available.")
class FloatingIpAlreadyInUse(NovaException):
message = _("Floating ip %(address)s already in use by %(fixed_ip)s.")
class NoFloatingIpsDefined(NotFound):
message = _("Zero floating ips exist.")

View File

@@ -303,8 +303,12 @@ DEFINE_bool('rabbit_use_ssl', False, 'connect over SSL')
DEFINE_string('rabbit_userid', 'guest', 'rabbit userid')
DEFINE_string('rabbit_password', 'guest', 'rabbit password')
DEFINE_string('rabbit_virtual_host', '/', 'rabbit virtual host')
DEFINE_integer('rabbit_retry_interval', 10, 'rabbit connection retry interval')
DEFINE_integer('rabbit_max_retries', 12, 'rabbit connection attempts')
DEFINE_integer('rabbit_retry_interval', 1,
'rabbit connection retry interval to start')
DEFINE_integer('rabbit_retry_backoff', 2,
'rabbit connection retry backoff in seconds')
DEFINE_integer('rabbit_max_retries', 0,
'maximum rabbit connection attempts (0=try forever)')
DEFINE_string('control_exchange', 'nova', 'the main exchange to connect to')
DEFINE_boolean('rabbit_durable_queues', False, 'use durable queues')
DEFINE_list('enabled_apis', ['ec2', 'osapi'],

View File

@@ -122,4 +122,5 @@ def notify(publisher_id, event_type, priority, payload):
driver.notify(msg)
except Exception, e:
LOG.exception(_("Problem '%(e)s' attempting to "
"send to notification system." % locals()))
"send to notification system. Payload=%(payload)s" %
locals()))

View File

@@ -23,44 +23,35 @@ from nova import flags
FLAGS = flags.FLAGS
flags.DEFINE_string('rpc_backend',
'nova.rpc.amqp',
"The messaging module to use, defaults to AMQP.")
'nova.rpc.impl_kombu',
"The messaging module to use, defaults to kombu.")
RPCIMPL = import_object(FLAGS.rpc_backend)
_RPCIMPL = None
def get_impl():
"""Delay import of rpc_backend until FLAGS are loaded."""
global _RPCIMPL
if _RPCIMPL is None:
_RPCIMPL = import_object(FLAGS.rpc_backend)
return _RPCIMPL
def create_connection(new=True):
return RPCIMPL.Connection.instance(new=True)
def create_consumer(conn, topic, proxy, fanout=False):
if fanout:
return RPCIMPL.FanoutAdapterConsumer(
connection=conn,
topic=topic,
proxy=proxy)
else:
return RPCIMPL.TopicAdapterConsumer(
connection=conn,
topic=topic,
proxy=proxy)
def create_consumer_set(conn, consumers):
return RPCIMPL.ConsumerSet(connection=conn, consumer_list=consumers)
return get_impl().create_connection(new=new)
def call(context, topic, msg):
return RPCIMPL.call(context, topic, msg)
return get_impl().call(context, topic, msg)
def cast(context, topic, msg):
return RPCIMPL.cast(context, topic, msg)
return get_impl().cast(context, topic, msg)
def fanout_cast(context, topic, msg):
return RPCIMPL.fanout_cast(context, topic, msg)
return get_impl().fanout_cast(context, topic, msg)
def multicall(context, topic, msg):
return RPCIMPL.multicall(context, topic, msg)
return get_impl().multicall(context, topic, msg)

View File

@@ -1,8 +1,14 @@
from nova import exception
from nova import flags
from nova import log as logging
LOG = logging.getLogger('nova.rpc')
flags.DEFINE_integer('rpc_thread_pool_size', 1024,
'Size of RPC thread pool')
flags.DEFINE_integer('rpc_conn_pool_size', 30,
'Size of RPC connection pool')
class RemoteError(exception.Error):
"""Signifies that a remote class has raised an exception.

View File

@@ -33,6 +33,7 @@ import uuid
from carrot import connection as carrot_connection
from carrot import messaging
import eventlet
from eventlet import greenpool
from eventlet import pools
from eventlet import queue
@@ -42,21 +43,22 @@ from nova import context
from nova import exception
from nova import fakerabbit
from nova import flags
from nova import log as logging
from nova import utils
from nova.rpc.common import RemoteError, LOG
# Needed for tests
eventlet.monkey_patch()
FLAGS = flags.FLAGS
flags.DEFINE_integer('rpc_thread_pool_size', 1024,
'Size of RPC thread pool')
flags.DEFINE_integer('rpc_conn_pool_size', 30,
'Size of RPC connection pool')
class Connection(carrot_connection.BrokerConnection):
"""Connection instance object."""
def __init__(self, *args, **kwargs):
super(Connection, self).__init__(*args, **kwargs)
self._rpc_consumers = []
self._rpc_consumer_thread = None
@classmethod
def instance(cls, new=True):
"""Returns the instance."""
@@ -94,13 +96,63 @@ class Connection(carrot_connection.BrokerConnection):
pass
return cls.instance()
def close(self):
self.cancel_consumer_thread()
for consumer in self._rpc_consumers:
try:
consumer.close()
except Exception:
# ignore all errors
pass
self._rpc_consumers = []
super(Connection, self).close()
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread"""
consumer_set = ConsumerSet(connection=self,
consumer_list=self._rpc_consumers)
def _consumer_thread():
try:
consumer_set.wait()
except greenlet.GreenletExit:
return
if self._rpc_consumer_thread is None:
self._rpc_consumer_thread = eventlet.spawn(_consumer_thread)
return self._rpc_consumer_thread
def cancel_consumer_thread(self):
"""Cancel a consumer thread"""
if self._rpc_consumer_thread is not None:
self._rpc_consumer_thread.kill()
try:
self._rpc_consumer_thread.wait()
except greenlet.GreenletExit:
pass
self._rpc_consumer_thread = None
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls methods in the proxy"""
if fanout:
consumer = FanoutAdapterConsumer(
connection=self,
topic=topic,
proxy=proxy)
else:
consumer = TopicAdapterConsumer(
connection=self,
topic=topic,
proxy=proxy)
self._rpc_consumers.append(consumer)
class Pool(pools.Pool):
"""Class that implements a Pool of Connections."""
# TODO(comstud): Timeout connections not used in a while
def create(self):
LOG.debug('Creating new connection')
LOG.debug('Pool creating new connection')
return Connection.instance(new=True)
# Create a ConnectionPool to use for RPC calls. We'll order the
@@ -119,25 +171,34 @@ class Consumer(messaging.Consumer):
"""
def __init__(self, *args, **kwargs):
for i in xrange(FLAGS.rabbit_max_retries):
if i > 0:
time.sleep(FLAGS.rabbit_retry_interval)
max_retries = FLAGS.rabbit_max_retries
sleep_time = FLAGS.rabbit_retry_interval
tries = 0
while True:
tries += 1
if tries > 1:
time.sleep(sleep_time)
# backoff for next retry attempt.. if there is one
sleep_time += FLAGS.rabbit_retry_backoff
if sleep_time > 30:
sleep_time = 30
try:
super(Consumer, self).__init__(*args, **kwargs)
self.failed_connection = False
break
except Exception as e: # Catching all because carrot sucks
self.failed_connection = True
if max_retries > 0 and tries == max_retries:
break
fl_host = FLAGS.rabbit_host
fl_port = FLAGS.rabbit_port
fl_intv = FLAGS.rabbit_retry_interval
fl_intv = sleep_time
LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is'
' unreachable: %(e)s. Trying again in %(fl_intv)d'
' seconds.') % locals())
self.failed_connection = True
if self.failed_connection:
LOG.error(_('Unable to connect to AMQP server '
'after %d tries. Shutting down.'),
FLAGS.rabbit_max_retries)
'after %(tries)d tries. Shutting down.') % locals())
sys.exit(1)
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
@@ -166,12 +227,6 @@ class Consumer(messaging.Consumer):
LOG.exception(_('Failed to fetch message from queue: %s' % e))
self.failed_connection = True
def attach_to_eventlet(self):
"""Only needed for unit tests!"""
timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
timer.start(0.1)
return timer
class AdapterConsumer(Consumer):
"""Calls methods on a proxy object based on method and args."""
@@ -242,7 +297,7 @@ class AdapterConsumer(Consumer):
# NOTE(vish): this iterates through the generator
list(rval)
except Exception as e:
logging.exception('Exception during message handling')
LOG.exception('Exception during message handling')
if msg_id:
msg_reply(msg_id, None, sys.exc_info())
return
@@ -520,6 +575,11 @@ class MulticallWaiter(object):
yield result
def create_connection(new=True):
"""Create a connection"""
return Connection.instance(new=new)
def call(context, topic, msg):
"""Sends a message on a topic and wait for a response."""
rv = multicall(context, topic, msg)

781
nova/rpc/impl_kombu.py Normal file
View File

@@ -0,0 +1,781 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import kombu
import kombu.entity
import kombu.messaging
import kombu.connection
import itertools
import sys
import time
import traceback
import types
import uuid
import eventlet
from eventlet import greenpool
from eventlet import pools
import greenlet
from nova import context
from nova import exception
from nova import flags
from nova.rpc.common import RemoteError, LOG
# Needed for tests
eventlet.monkey_patch()
FLAGS = flags.FLAGS
class ConsumerBase(object):
"""Consumer base class."""
def __init__(self, channel, callback, tag, **kwargs):
"""Declare a queue on an amqp channel.
'channel' is the amqp channel to use
'callback' is the callback to call when messages are received
'tag' is a unique ID for the consumer on the channel
queue name, exchange name, and other kombu options are
passed in here as a dictionary.
"""
self.callback = callback
self.tag = str(tag)
self.kwargs = kwargs
self.queue = None
self.reconnect(channel)
def reconnect(self, channel):
"""Re-declare the queue after a rabbit reconnect"""
self.channel = channel
self.kwargs['channel'] = channel
self.queue = kombu.entity.Queue(**self.kwargs)
self.queue.declare()
def consume(self, *args, **kwargs):
"""Actually declare the consumer on the amqp channel. This will
start the flow of messages from the queue. Using the
Connection.iterconsume() iterator will process the messages,
calling the appropriate callback.
If a callback is specified in kwargs, use that. Otherwise,
use the callback passed during __init__()
If kwargs['nowait'] is True, then this call will block until
a message is read.
Messages will automatically be acked if the callback doesn't
raise an exception
"""
options = {'consumer_tag': self.tag}
options['nowait'] = kwargs.get('nowait', False)
callback = kwargs.get('callback', self.callback)
if not callback:
raise ValueError("No callback defined")
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
callback(message.payload)
message.ack()
self.queue.consume(*args, callback=_callback, **options)
def cancel(self):
"""Cancel the consuming from the queue, if it has started"""
try:
self.queue.cancel(self.tag)
except KeyError, e:
# NOTE(comstud): Kludge to get around a amqplib bug
if str(e) != "u'%s'" % self.tag:
raise
self.queue = None
class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'"""
def __init__(self, channel, msg_id, callback, tag, **kwargs):
"""Init a 'direct' queue.
'channel' is the amqp channel to use
'msg_id' is the msg_id to listen on
'callback' is the callback to call when messages are received
'tag' is a unique ID for the consumer on the channel
Other kombu options may be passed
"""
# Default options
options = {'durable': False,
'auto_delete': True,
'exclusive': True}
options.update(kwargs)
exchange = kombu.entity.Exchange(
name=msg_id,
type='direct',
durable=options['durable'],
auto_delete=options['auto_delete'])
super(DirectConsumer, self).__init__(
channel,
callback,
tag,
name=msg_id,
exchange=exchange,
routing_key=msg_id,
**options)
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
def __init__(self, channel, topic, callback, tag, **kwargs):
"""Init a 'topic' queue.
'channel' is the amqp channel to use
'topic' is the topic to listen on
'callback' is the callback to call when messages are received
'tag' is a unique ID for the consumer on the channel
Other kombu options may be passed
"""
# Default options
options = {'durable': FLAGS.rabbit_durable_queues,
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
exchange = kombu.entity.Exchange(
name=FLAGS.control_exchange,
type='topic',
durable=options['durable'],
auto_delete=options['auto_delete'])
super(TopicConsumer, self).__init__(
channel,
callback,
tag,
name=topic,
exchange=exchange,
routing_key=topic,
**options)
class FanoutConsumer(ConsumerBase):
"""Consumer class for 'fanout'"""
def __init__(self, channel, topic, callback, tag, **kwargs):
"""Init a 'fanout' queue.
'channel' is the amqp channel to use
'topic' is the topic to listen on
'callback' is the callback to call when messages are received
'tag' is a unique ID for the consumer on the channel
Other kombu options may be passed
"""
unique = uuid.uuid4().hex
exchange_name = '%s_fanout' % topic
queue_name = '%s_fanout_%s' % (topic, unique)
# Default options
options = {'durable': False,
'auto_delete': True,
'exclusive': True}
options.update(kwargs)
exchange = kombu.entity.Exchange(
name=exchange_name,
type='fanout',
durable=options['durable'],
auto_delete=options['auto_delete'])
super(FanoutConsumer, self).__init__(
channel,
callback,
tag,
name=queue_name,
exchange=exchange,
routing_key=topic,
**options)
class Publisher(object):
"""Base Publisher class"""
def __init__(self, channel, exchange_name, routing_key, **kwargs):
"""Init the Publisher class with the exchange_name, routing_key,
and other options
"""
self.exchange_name = exchange_name
self.routing_key = routing_key
self.kwargs = kwargs
self.reconnect(channel)
def reconnect(self, channel):
"""Re-establish the Producer after a rabbit reconnection"""
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
**self.kwargs)
self.producer = kombu.messaging.Producer(exchange=self.exchange,
channel=channel, routing_key=self.routing_key)
def send(self, msg):
"""Send a message"""
self.producer.publish(msg)
class DirectPublisher(Publisher):
"""Publisher class for 'direct'"""
def __init__(self, channel, msg_id, **kwargs):
"""init a 'direct' publisher.
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': False,
'auto_delete': True,
'exclusive': True}
options.update(kwargs)
super(DirectPublisher, self).__init__(channel,
msg_id,
msg_id,
type='direct',
**options)
class TopicPublisher(Publisher):
"""Publisher class for 'topic'"""
def __init__(self, channel, topic, **kwargs):
"""init a 'topic' publisher.
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': FLAGS.rabbit_durable_queues,
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
super(TopicPublisher, self).__init__(channel,
FLAGS.control_exchange,
topic,
type='topic',
**options)
class FanoutPublisher(Publisher):
"""Publisher class for 'fanout'"""
def __init__(self, channel, topic, **kwargs):
"""init a 'fanout' publisher.
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': False,
'auto_delete': True,
'exclusive': True}
options.update(kwargs)
super(FanoutPublisher, self).__init__(channel,
'%s_fanout' % topic,
None,
type='fanout',
**options)
class Connection(object):
"""Connection object."""
def __init__(self):
self.consumers = []
self.consumer_thread = None
self.max_retries = FLAGS.rabbit_max_retries
# Try forever?
if self.max_retries <= 0:
self.max_retries = None
self.interval_start = FLAGS.rabbit_retry_interval
self.interval_stepping = FLAGS.rabbit_retry_backoff
# max retry-interval = 30 seconds
self.interval_max = 30
self.memory_transport = False
self.params = dict(hostname=FLAGS.rabbit_host,
port=FLAGS.rabbit_port,
userid=FLAGS.rabbit_userid,
password=FLAGS.rabbit_password,
virtual_host=FLAGS.rabbit_virtual_host)
if FLAGS.fake_rabbit:
self.params['transport'] = 'memory'
self.memory_transport = True
else:
self.memory_transport = False
self.connection = None
self.reconnect()
def reconnect(self):
"""Handles reconnecting and re-estblishing queues"""
if self.connection:
try:
self.connection.close()
except self.connection.connection_errors:
pass
time.sleep(1)
self.connection = kombu.connection.BrokerConnection(**self.params)
if self.memory_transport:
# Kludge to speed up tests.
self.connection.transport.polling_interval = 0.0
self.consumer_num = itertools.count(1)
try:
self.connection.ensure_connection(errback=self.connect_error,
max_retries=self.max_retries,
interval_start=self.interval_start,
interval_step=self.interval_stepping,
interval_max=self.interval_max)
except self.connection.connection_errors, e:
# We should only get here if max_retries is set. We'll go
# ahead and exit in this case.
err_str = str(e)
max_retries = self.max_retries
LOG.error(_('Unable to connect to AMQP server '
'after %(max_retries)d tries: %(err_str)s') % locals())
sys.exit(1)
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d' %
self.params))
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
if self.memory_transport:
self.channel._new_queue('ae.undeliver')
for consumer in self.consumers:
consumer.reconnect(self.channel)
if self.consumers:
LOG.debug(_("Re-established AMQP queues"))
def get_channel(self):
"""Convenience call for bin/clear_rabbit_queues"""
return self.channel
def connect_error(self, exc, interval):
"""Callback when there are connection re-tries by kombu"""
info = self.params.copy()
info['intv'] = interval
info['e'] = exc
LOG.error(_('AMQP server on %(hostname)s:%(port)d is'
' unreachable: %(e)s. Trying again in %(intv)d'
' seconds.') % info)
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
self.connection.release()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
self.channel.close()
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
if self.memory_transport:
self.channel._new_queue('ae.undeliver')
self.consumers = []
def declare_consumer(self, consumer_cls, topic, callback):
"""Create a Consumer using the class that was passed in and
add it to our list of consumers
"""
consumer = consumer_cls(self.channel, topic, callback,
self.consumer_num.next())
self.consumers.append(consumer)
return consumer
def iterconsume(self, limit=None):
"""Return an iterator that will consume from all queues/consumers"""
while True:
try:
queues_head = self.consumers[:-1]
queues_tail = self.consumers[-1]
for queue in queues_head:
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
for iteration in itertools.count(0):
if limit and iteration >= limit:
raise StopIteration
yield self.connection.drain_events()
except self.connection.connection_errors, e:
LOG.exception(_('Failed to consume message from queue: '
'%s' % str(e)))
self.reconnect()
def cancel_consumer_thread(self):
"""Cancel a consumer thread"""
if self.consumer_thread is not None:
self.consumer_thread.kill()
try:
self.consumer_thread.wait()
except greenlet.GreenletExit:
pass
self.consumer_thread = None
def publisher_send(self, cls, topic, msg):
"""Send to a publisher based on the publisher class"""
while True:
publisher = None
try:
publisher = cls(self.channel, topic)
publisher.send(msg)
return
except self.connection.connection_errors, e:
LOG.exception(_('Failed to publish message %s' % str(e)))
try:
self.reconnect()
if publisher:
publisher.reconnect(self.channel)
except self.connection.connection_errors, e:
pass
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
In nova's use, this is generally a msg_id queue used for
responses for call/multicall
"""
self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None):
"""Create a 'topic' consumer."""
self.declare_consumer(TopicConsumer, topic, callback)
def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer"""
self.declare_consumer(FanoutConsumer, topic, callback)
def direct_send(self, msg_id, msg):
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, topic, msg):
"""Send a 'topic' message"""
self.publisher_send(TopicPublisher, topic, msg)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""
self.publisher_send(FanoutPublisher, topic, msg)
def consume(self, limit=None):
"""Consume from all queues/consumers"""
it = self.iterconsume(limit=limit)
while True:
try:
it.next()
except StopIteration:
return
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread"""
def _consumer_thread():
try:
self.consume()
except greenlet.GreenletExit:
return
if self.consumer_thread is None:
self.consumer_thread = eventlet.spawn(_consumer_thread)
return self.consumer_thread
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
if fanout:
self.declare_fanout_consumer(topic, ProxyCallback(proxy))
else:
self.declare_topic_consumer(topic, ProxyCallback(proxy))
class Pool(pools.Pool):
"""Class that implements a Pool of Connections."""
# TODO(comstud): Timeout connections not used in a while
def create(self):
LOG.debug('Pool creating new connection')
return Connection()
# Create a ConnectionPool to use for RPC calls. We'll order the
# pool as a stack (LIFO), so that we can potentially loop through and
# timeout old unused connections at some point
ConnectionPool = Pool(
max_size=FLAGS.rpc_conn_pool_size,
order_as_stack=True)
class ConnectionContext(object):
"""The class that is actually returned to the caller of
create_connection(). This is a essentially a wrapper around
Connection that supports 'with' and can return a new Connection or
one from a pool. It will also catch when an instance of this class
is to be deleted so that we can return Connections to the pool on
exceptions and so forth without making the caller be responsible for
catching all exceptions and making sure to return a connection to
the pool.
"""
def __init__(self, pooled=True):
"""Create a new connection, or get one from the pool"""
self.connection = None
if pooled:
self.connection = ConnectionPool.get()
else:
self.connection = Connection()
self.pooled = pooled
def __enter__(self):
"""with ConnectionContext() should return self"""
return self
def _done(self):
"""If the connection came from a pool, clean it up and put it back.
If it did not come from a pool, close it.
"""
if self.connection:
if self.pooled:
# Reset the connection so it's ready for the next caller
# to grab from the pool
self.connection.reset()
ConnectionPool.put(self.connection)
else:
try:
self.connection.close()
except Exception:
# There's apparently a bug in kombu 'memory' transport
# which causes an assert failure.
# But, we probably want to ignore all exceptions when
# trying to close a connection, anyway...
pass
self.connection = None
def __exit__(self, t, v, tb):
"""end of 'with' statement. We're done here."""
self._done()
def __del__(self):
"""Caller is done with this connection. Make sure we cleaned up."""
self._done()
def close(self):
"""Caller is done with this connection."""
self._done()
def __getattr__(self, key):
"""Proxy all other calls to the Connection instance"""
if self.connection:
return getattr(self.connection, key)
else:
raise exception.InvalidRPCConnectionReuse()
class ProxyCallback(object):
"""Calls methods on a proxy object based on method and args."""
def __init__(self, proxy):
self.proxy = proxy
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object.
Parses the message for validity and fires off a thread to call the
proxy object method.
Message data should be a dictionary with two keys:
method: string representing the method to call
args: dictionary of arg: value
Example: {'method': 'echo', 'args': {'value': 42}}
"""
LOG.debug(_('received %s') % message_data)
ctxt = _unpack_context(message_data)
method = message_data.get('method')
args = message_data.get('args', {})
if not method:
LOG.warn(_('no method for message: %s') % message_data)
ctxt.reply(_('No method for message: %s') % message_data)
return
self.pool.spawn_n(self._process_data, ctxt, method, args)
@exception.wrap_exception()
def _process_data(self, ctxt, method, args):
"""Thread that maigcally looks for a method on the proxy
object and calls it.
"""
node_func = getattr(self.proxy, str(method))
node_args = dict((str(k), v) for k, v in args.iteritems())
# NOTE(vish): magic is fun!
try:
rval = node_func(context=ctxt, **node_args)
# Check if the result was a generator
if isinstance(rval, types.GeneratorType):
for x in rval:
ctxt.reply(x, None)
else:
ctxt.reply(rval, None)
# This final None tells multicall that it is done.
ctxt.reply(None, None)
except Exception as e:
LOG.exception('Exception during message handling')
ctxt.reply(None, sys.exc_info())
return
def _unpack_context(msg):
"""Unpack context from msg."""
context_dict = {}
for key in list(msg.keys()):
# NOTE(vish): Some versions of python don't like unicode keys
# in kwargs.
key = str(key)
if key.startswith('_context_'):
value = msg.pop(key)
context_dict[key[9:]] = value
context_dict['msg_id'] = msg.pop('_msg_id', None)
LOG.debug(_('unpacked context: %s'), context_dict)
return RpcContext.from_dict(context_dict)
def _pack_context(msg, context):
"""Pack context into msg.
Values for message keys need to be less than 255 chars, so we pull
context out into a bunch of separate keys. If we want to support
more arguments in rabbit messages, we may want to do the same
for args at some point.
"""
context_d = dict([('_context_%s' % key, value)
for (key, value) in context.to_dict().iteritems()])
msg.update(context_d)
class RpcContext(context.RequestContext):
"""Context that supports replying to a rpc.call"""
def __init__(self, *args, **kwargs):
msg_id = kwargs.pop('msg_id', None)
self.msg_id = msg_id
super(RpcContext, self).__init__(*args, **kwargs)
def reply(self, *args, **kwargs):
if self.msg_id:
msg_reply(self.msg_id, *args, **kwargs)
class MulticallWaiter(object):
def __init__(self, connection):
self._connection = connection
self._iterator = connection.iterconsume()
self._result = None
self._done = False
def done(self):
self._done = True
self._connection.close()
def __call__(self, data):
"""The consume() callback will call this. Store the result."""
if data['failure']:
self._result = RemoteError(*data['failure'])
else:
self._result = data['result']
def __iter__(self):
"""Return a result until we get a 'None' response from consumer"""
if self._done:
raise StopIteration
while True:
self._iterator.next()
result = self._result
if isinstance(result, Exception):
self.done()
raise result
if result == None:
self.done()
raise StopIteration
yield result
def create_connection(new=True):
"""Create a connection"""
return ConnectionContext(pooled=not new)
def multicall(context, topic, msg):
"""Make a call that returns multiple times."""
# Can't use 'with' for multicall, as it returns an iterator
# that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into
# the pool
LOG.debug(_('Making asynchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
_pack_context(msg, context)
conn = ConnectionContext()
wait_msg = MulticallWaiter(conn)
conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, msg)
return wait_msg
def call(context, topic, msg):
"""Sends a message on a topic and wait for a response."""
rv = multicall(context, topic, msg)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:
return
return rv[-1]
def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
_pack_context(msg, context)
with ConnectionContext() as conn:
conn.topic_send(topic, msg)
def fanout_cast(context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
_pack_context(msg, context)
with ConnectionContext() as conn:
conn.fanout_send(topic, msg)
def msg_reply(msg_id, reply=None, failure=None):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
"""
with ConnectionContext() as conn:
if failure:
message = str(failure[1])
tb = traceback.format_exception(*failure)
LOG.error(_("Returning exception %s to caller"), message)
LOG.error(tb)
failure = (failure[0].__name__, str(failure[1]), tb)
try:
msg = {'result': reply, 'failure': failure}
except TypeError:
msg = {'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure}
conn.direct_send(msg_id, msg)

View File

@@ -30,6 +30,7 @@ from nova import log as logging
from nova import rpc
from nova import utils
from nova.compute import power_state
from nova.compute import vm_states
from nova.api.ec2 import ec2utils
@@ -104,10 +105,8 @@ class Scheduler(object):
dest, block_migration)
# Changing instance_state.
db.instance_set_state(context,
instance_id,
power_state.PAUSED,
'migrating')
values = {"vm_state": vm_states.MIGRATING}
db.instance_update(context, instance_id, values)
# Changing volume state
for volume_ref in instance_ref['volumes']:
@@ -129,8 +128,7 @@ class Scheduler(object):
"""
# Checking instance is running.
if (power_state.RUNNING != instance_ref['state'] or \
'running' != instance_ref['state_description']):
if instance_ref['power_state'] != power_state.RUNNING:
instance_id = ec2utils.id_to_ec2_id(instance_ref['id'])
raise exception.InstanceNotRunning(instance_id=instance_id)

View File

@@ -153,26 +153,15 @@ class Service(object):
self.topic)
# Share this same connection for these Consumers
consumer_all = rpc.create_consumer(self.conn, self.topic, self,
fanout=False)
self.conn.create_consumer(self.topic, self, fanout=False)
node_topic = '%s.%s' % (self.topic, self.host)
consumer_node = rpc.create_consumer(self.conn, node_topic, self,
fanout=False)
self.conn.create_consumer(node_topic, self, fanout=False)
fanout = rpc.create_consumer(self.conn, self.topic, self, fanout=True)
self.conn.create_consumer(self.topic, self, fanout=True)
consumers = [consumer_all, consumer_node, fanout]
consumer_set = rpc.create_consumer_set(self.conn, consumers)
# Wait forever, processing these consumers
def _wait():
try:
consumer_set.wait()
finally:
consumer_set.close()
self.consumer_set_thread = eventlet.spawn(_wait)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
if self.report_interval:
pulse = utils.LoopingCall(self.report_state)
@@ -237,10 +226,11 @@ class Service(object):
logging.warn(_('Service killed that has no database entry'))
def stop(self):
self.consumer_set_thread.kill()
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.consumer_set_thread.wait()
except greenlet.GreenletExit:
self.conn.close()
except Exception:
pass
for x in self.timers:
try:

View File

@@ -40,6 +40,7 @@ from nova.scheduler import driver
from nova.scheduler import manager
from nova.scheduler import multi
from nova.compute import power_state
from nova.compute import vm_states
FLAGS = flags.FLAGS
@@ -94,6 +95,9 @@ class SchedulerTestCase(test.TestCase):
inst['vcpus'] = kwargs.get('vcpus', 1)
inst['memory_mb'] = kwargs.get('memory_mb', 10)
inst['local_gb'] = kwargs.get('local_gb', 20)
inst['vm_state'] = kwargs.get('vm_state', vm_states.ACTIVE)
inst['power_state'] = kwargs.get('power_state', power_state.RUNNING)
inst['task_state'] = kwargs.get('task_state', None)
return db.instance_create(ctxt, inst)
def test_fallback(self):
@@ -271,8 +275,9 @@ class SimpleDriverTestCase(test.TestCase):
inst['memory_mb'] = kwargs.get('memory_mb', 20)
inst['local_gb'] = kwargs.get('local_gb', 30)
inst['launched_on'] = kwargs.get('launghed_on', 'dummy')
inst['state_description'] = kwargs.get('state_description', 'running')
inst['state'] = kwargs.get('state', power_state.RUNNING)
inst['vm_state'] = kwargs.get('vm_state', vm_states.ACTIVE)
inst['task_state'] = kwargs.get('task_state', None)
inst['power_state'] = kwargs.get('power_state', power_state.RUNNING)
return db.instance_create(self.context, inst)['id']
def _create_volume(self):
@@ -664,14 +669,14 @@ class SimpleDriverTestCase(test.TestCase):
block_migration=False)
i_ref = db.instance_get(self.context, instance_id)
self.assertTrue(i_ref['state_description'] == 'migrating')
self.assertTrue(i_ref['vm_state'] == vm_states.MIGRATING)
db.instance_destroy(self.context, instance_id)
db.volume_destroy(self.context, v_ref['id'])
def test_live_migration_src_check_instance_not_running(self):
"""The instance given by instance_id is not running."""
instance_id = self._create_instance(state_description='migrating')
instance_id = self._create_instance(power_state=power_state.NOSTATE)
i_ref = db.instance_get(self.context, instance_id)
try:

View File

@@ -38,8 +38,6 @@ class AdminApiTestCase(test.TestCase):
super(AdminApiTestCase, self).setUp()
self.flags(connection_type='fake')
self.conn = rpc.create_connection()
# set up our cloud
self.api = admin.AdminController()

View File

@@ -38,6 +38,7 @@ from nova import test
from nova import utils
from nova.api.ec2 import cloud
from nova.api.ec2 import ec2utils
from nova.compute import vm_states
from nova.image import fake
@@ -51,8 +52,6 @@ class CloudTestCase(test.TestCase):
self.flags(connection_type='fake',
stub_network=True)
self.conn = rpc.create_connection()
# set up our cloud
self.cloud = cloud.CloudController()
@@ -494,8 +493,11 @@ class CloudTestCase(test.TestCase):
inst2 = db.instance_create(self.context, args2)
db.instance_destroy(self.context, inst1.id)
result = self.cloud.describe_instances(self.context)
result = result['reservationSet'][0]['instancesSet']
self.assertEqual(result[0]['instanceId'],
result1 = result['reservationSet'][0]['instancesSet']
self.assertEqual(result1[0]['instanceId'],
ec2utils.id_to_ec2_id(inst1.id))
result2 = result['reservationSet'][1]['instancesSet']
self.assertEqual(result2[0]['instanceId'],
ec2utils.id_to_ec2_id(inst2.id))
def _block_device_mapping_create(self, instance_id, mappings):
@@ -1163,7 +1165,7 @@ class CloudTestCase(test.TestCase):
self.compute = self.start_service('compute')
def _wait_for_state(self, ctxt, instance_id, predicate):
"""Wait for an stopping instance to be a given state"""
"""Wait for a stopped instance to be a given state"""
id = ec2utils.ec2_id_to_id(instance_id)
while True:
info = self.cloud.compute_api.get(context=ctxt, instance_id=id)
@@ -1174,12 +1176,16 @@ class CloudTestCase(test.TestCase):
def _wait_for_running(self, instance_id):
def is_running(info):
return info['state_description'] == 'running'
vm_state = info["vm_state"]
task_state = info["task_state"]
return vm_state == vm_states.ACTIVE and task_state == None
self._wait_for_state(self.context, instance_id, is_running)
def _wait_for_stopped(self, instance_id):
def is_stopped(info):
return info['state_description'] == 'stopped'
vm_state = info["vm_state"]
task_state = info["task_state"]
return vm_state == vm_states.STOPPED and task_state == None
self._wait_for_state(self.context, instance_id, is_stopped)
def _wait_for_terminate(self, instance_id):
@@ -1562,7 +1568,7 @@ class CloudTestCase(test.TestCase):
'id': 0,
'root_device_name': '/dev/sdh',
'security_groups': [{'name': 'fake0'}, {'name': 'fake1'}],
'state_description': 'stopping',
'vm_state': vm_states.STOPPED,
'instance_type': {'name': 'fake_type'},
'kernel_id': 1,
'ramdisk_id': 2,
@@ -1606,7 +1612,7 @@ class CloudTestCase(test.TestCase):
self.assertEqual(groupSet, expected_groupSet)
self.assertEqual(get_attribute('instanceInitiatedShutdownBehavior'),
{'instance_id': 'i-12345678',
'instanceInitiatedShutdownBehavior': 'stop'})
'instanceInitiatedShutdownBehavior': 'stopped'})
self.assertEqual(get_attribute('instanceType'),
{'instance_id': 'i-12345678',
'instanceType': 'fake_type'})

View File

@@ -24,6 +24,7 @@ from nova import compute
from nova.compute import instance_types
from nova.compute import manager as compute_manager
from nova.compute import power_state
from nova.compute import vm_states
from nova import context
from nova import db
from nova.db.sqlalchemy import models
@@ -763,8 +764,8 @@ class ComputeTestCase(test.TestCase):
'block_migration': False,
'disk': None}}).\
AndRaise(rpc.RemoteError('', '', ''))
dbmock.instance_update(c, i_ref['id'], {'state_description': 'running',
'state': power_state.RUNNING,
dbmock.instance_update(c, i_ref['id'], {'vm_state': vm_states.ACTIVE,
'task_state': None,
'host': i_ref['host']})
for v in i_ref['volumes']:
dbmock.volume_update(c, v['id'], {'status': 'in-use'})
@@ -795,8 +796,8 @@ class ComputeTestCase(test.TestCase):
'block_migration': False,
'disk': None}}).\
AndRaise(rpc.RemoteError('', '', ''))
dbmock.instance_update(c, i_ref['id'], {'state_description': 'running',
'state': power_state.RUNNING,
dbmock.instance_update(c, i_ref['id'], {'vm_state': vm_states.ACTIVE,
'task_state': None,
'host': i_ref['host']})
self.compute.db = dbmock
@@ -841,8 +842,8 @@ class ComputeTestCase(test.TestCase):
c = context.get_admin_context()
instance_id = self._create_instance()
i_ref = db.instance_get(c, instance_id)
db.instance_update(c, i_ref['id'], {'state_description': 'migrating',
'state': power_state.PAUSED})
db.instance_update(c, i_ref['id'], {'vm_state': vm_states.MIGRATING,
'power_state': power_state.PAUSED})
v_ref = db.volume_create(c, {'size': 1, 'instance_id': instance_id})
fix_addr = db.fixed_ip_create(c, {'address': '1.1.1.1',
'instance_id': instance_id})
@@ -903,7 +904,7 @@ class ComputeTestCase(test.TestCase):
instances = db.instance_get_all(context.get_admin_context())
LOG.info(_("After force-killing instances: %s"), instances)
self.assertEqual(len(instances), 1)
self.assertEqual(power_state.SHUTOFF, instances[0]['state'])
self.assertEqual(power_state.NOSTATE, instances[0]['power_state'])
def test_get_all_by_name_regexp(self):
"""Test searching instances by name (display_name)"""
@@ -1323,25 +1324,28 @@ class ComputeTestCase(test.TestCase):
"""Test searching instances by state"""
c = context.get_admin_context()
instance_id1 = self._create_instance({'state': power_state.SHUTDOWN})
instance_id1 = self._create_instance({
'power_state': power_state.SHUTDOWN,
})
instance_id2 = self._create_instance({
'id': 2,
'state': power_state.RUNNING})
'id': 2,
'power_state': power_state.RUNNING,
})
instance_id3 = self._create_instance({
'id': 10,
'state': power_state.RUNNING})
'id': 10,
'power_state': power_state.RUNNING,
})
instances = self.compute_api.get_all(c,
search_opts={'state': power_state.SUSPENDED})
search_opts={'power_state': power_state.SUSPENDED})
self.assertEqual(len(instances), 0)
instances = self.compute_api.get_all(c,
search_opts={'state': power_state.SHUTDOWN})
search_opts={'power_state': power_state.SHUTDOWN})
self.assertEqual(len(instances), 1)
self.assertEqual(instances[0].id, instance_id1)
instances = self.compute_api.get_all(c,
search_opts={'state': power_state.RUNNING})
search_opts={'power_state': power_state.RUNNING})
self.assertEqual(len(instances), 2)
instance_ids = [instance.id for instance in instances]
self.assertTrue(instance_id2 in instance_ids)
@@ -1349,7 +1353,7 @@ class ComputeTestCase(test.TestCase):
# Test passing a list as search arg
instances = self.compute_api.get_all(c,
search_opts={'state': [power_state.SHUTDOWN,
search_opts={'power_state': [power_state.SHUTDOWN,
power_state.RUNNING]})
self.assertEqual(len(instances), 3)

View File

@@ -91,5 +91,7 @@ class DbApiTestCase(test.TestCase):
inst2 = db.instance_create(self.context, args2)
db.instance_destroy(self.context, inst1.id)
result = db.instance_get_all_by_filters(self.context.elevated(), {})
self.assertEqual(1, len(result))
self.assertEqual(2, len(result))
self.assertEqual(result[0].id, inst2.id)
self.assertEqual(result[1].id, inst1.id)
self.assertTrue(result[1].deleted)

View File

@@ -371,6 +371,22 @@ class VlanNetworkTestCase(test.TestCase):
self.mox.ReplayAll()
self.network.validate_networks(self.context, requested_networks)
def test_cant_associate_associated_floating_ip(self):
ctxt = context.RequestContext('testuser', 'testproject',
is_admin=False)
def fake_floating_ip_get_by_address(context, address):
return {'address': '10.10.10.10',
'fixed_ip': {'address': '10.0.0.1'}}
self.stubs.Set(self.network.db, 'floating_ip_get_by_address',
fake_floating_ip_get_by_address)
self.assertRaises(exception.FloatingIpAlreadyInUse,
self.network.associate_floating_ip,
ctxt,
mox.IgnoreArg(),
mox.IgnoreArg())
class CommonNetworkTestCase(test.TestCase):

View File

@@ -22,168 +22,16 @@ Unit Tests for remote procedure calls using queue
from nova import context
from nova import log as logging
from nova import rpc
from nova import test
from nova.tests import test_rpc_common
LOG = logging.getLogger('nova.tests.rpc')
class RpcTestCase(test.TestCase):
class RpcTestCase(test_rpc_common._BaseRpcTestCase):
def setUp(self):
self.rpc = rpc
super(RpcTestCase, self).setUp()
self.conn = rpc.create_connection(True)
self.receiver = TestReceiver()
self.consumer = rpc.create_consumer(self.conn,
'test',
self.receiver,
False)
self.consumer.attach_to_eventlet()
self.context = context.get_admin_context()
def test_call_succeed(self):
value = 42
result = rpc.call(self.context, 'test', {"method": "echo",
"args": {"value": value}})
self.assertEqual(value, result)
def test_call_succeed_despite_multiple_returns(self):
value = 42
result = rpc.call(self.context, 'test', {"method": "echo_three_times",
"args": {"value": value}})
self.assertEqual(value + 2, result)
def test_call_succeed_despite_multiple_returns_yield(self):
value = 42
result = rpc.call(self.context, 'test',
{"method": "echo_three_times_yield",
"args": {"value": value}})
self.assertEqual(value + 2, result)
def test_multicall_succeed_once(self):
value = 42
result = rpc.multicall(self.context,
'test',
{"method": "echo",
"args": {"value": value}})
for i, x in enumerate(result):
if i > 0:
self.fail('should only receive one response')
self.assertEqual(value + i, x)
def test_multicall_succeed_three_times(self):
value = 42
result = rpc.multicall(self.context,
'test',
{"method": "echo_three_times",
"args": {"value": value}})
for i, x in enumerate(result):
self.assertEqual(value + i, x)
def test_multicall_succeed_three_times_yield(self):
value = 42
result = rpc.multicall(self.context,
'test',
{"method": "echo_three_times_yield",
"args": {"value": value}})
for i, x in enumerate(result):
self.assertEqual(value + i, x)
def test_context_passed(self):
"""Makes sure a context is passed through rpc call."""
value = 42
result = rpc.call(self.context,
'test', {"method": "context",
"args": {"value": value}})
self.assertEqual(self.context.to_dict(), result)
def test_call_exception(self):
"""Test that exception gets passed back properly.
rpc.call returns a RemoteError object. The value of the
exception is converted to a string, so we convert it back
to an int in the test.
"""
value = 42
self.assertRaises(rpc.RemoteError,
rpc.call,
self.context,
'test',
{"method": "fail",
"args": {"value": value}})
try:
rpc.call(self.context,
'test',
{"method": "fail",
"args": {"value": value}})
self.fail("should have thrown rpc.RemoteError")
except rpc.RemoteError as exc:
self.assertEqual(int(exc.value), value)
def test_nested_calls(self):
"""Test that we can do an rpc.call inside another call."""
class Nested(object):
@staticmethod
def echo(context, queue, value):
"""Calls echo in the passed queue"""
LOG.debug(_("Nested received %(queue)s, %(value)s")
% locals())
# TODO: so, it will replay the context and use the same REQID?
# that's bizarre.
ret = rpc.call(context,
queue,
{"method": "echo",
"args": {"value": value}})
LOG.debug(_("Nested return %s"), ret)
return value
nested = Nested()
conn = rpc.create_connection(True)
consumer = rpc.create_consumer(conn,
'nested',
nested,
False)
consumer.attach_to_eventlet()
value = 42
result = rpc.call(self.context,
'nested', {"method": "echo",
"args": {"queue": "test",
"value": value}})
self.assertEqual(value, result)
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call.
Uses static methods because we aren't actually storing any state.
"""
@staticmethod
def echo(context, value):
"""Simply returns whatever value is sent in."""
LOG.debug(_("Received %s"), value)
return value
@staticmethod
def context(context, value):
"""Returns dictionary version of context."""
LOG.debug(_("Received %s"), context)
return context.to_dict()
@staticmethod
def echo_three_times(context, value):
context.reply(value)
context.reply(value + 1)
context.reply(value + 2)
@staticmethod
def echo_three_times_yield(context, value):
yield value
yield value + 1
yield value + 2
@staticmethod
def fail(context, value):
"""Raises an exception with the value sent in."""
raise Exception(value)
def tearDown(self):
super(RpcTestCase, self).tearDown()

View File

@@ -1,88 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Openstack, LLC.
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For RPC AMQP.
"""
from nova import context
from nova import log as logging
from nova import rpc
from nova.rpc import amqp
from nova import test
LOG = logging.getLogger('nova.tests.rpc')
class RpcAMQPTestCase(test.TestCase):
def setUp(self):
super(RpcAMQPTestCase, self).setUp()
self.conn = rpc.create_connection(True)
self.receiver = TestReceiver()
self.consumer = rpc.create_consumer(self.conn,
'test',
self.receiver,
False)
self.consumer.attach_to_eventlet()
self.context = context.get_admin_context()
def test_connectionpool_single(self):
"""Test that ConnectionPool recycles a single connection."""
conn1 = amqp.ConnectionPool.get()
amqp.ConnectionPool.put(conn1)
conn2 = amqp.ConnectionPool.get()
amqp.ConnectionPool.put(conn2)
self.assertEqual(conn1, conn2)
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call.
Uses static methods because we aren't actually storing any state.
"""
@staticmethod
def echo(context, value):
"""Simply returns whatever value is sent in."""
LOG.debug(_("Received %s"), value)
return value
@staticmethod
def context(context, value):
"""Returns dictionary version of context."""
LOG.debug(_("Received %s"), context)
return context.to_dict()
@staticmethod
def echo_three_times(context, value):
context.reply(value)
context.reply(value + 1)
context.reply(value + 2)
@staticmethod
def echo_three_times_yield(context, value):
yield value
yield value + 1
yield value + 2
@staticmethod
def fail(context, value):
"""Raises an exception with the value sent in."""
raise Exception(value)

View File

@@ -0,0 +1,45 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for remote procedure calls using carrot
"""
from nova import context
from nova import log as logging
from nova.rpc import impl_carrot
from nova.tests import test_rpc_common
LOG = logging.getLogger('nova.tests.rpc')
class RpcCarrotTestCase(test_rpc_common._BaseRpcTestCase):
def setUp(self):
self.rpc = impl_carrot
super(RpcCarrotTestCase, self).setUp()
def tearDown(self):
super(RpcCarrotTestCase, self).tearDown()
def test_connectionpool_single(self):
"""Test that ConnectionPool recycles a single connection."""
conn1 = self.rpc.ConnectionPool.get()
self.rpc.ConnectionPool.put(conn1)
conn2 = self.rpc.ConnectionPool.get()
self.rpc.ConnectionPool.put(conn2)
self.assertEqual(conn1, conn2)

View File

@@ -0,0 +1,189 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for remote procedure calls shared between all implementations
"""
from nova import context
from nova import log as logging
from nova.rpc.common import RemoteError
from nova import test
LOG = logging.getLogger('nova.tests.rpc')
class _BaseRpcTestCase(test.TestCase):
def setUp(self):
super(_BaseRpcTestCase, self).setUp()
self.conn = self.rpc.create_connection(True)
self.receiver = TestReceiver()
self.conn.create_consumer('test', self.receiver, False)
self.conn.consume_in_thread()
self.context = context.get_admin_context()
def tearDown(self):
self.conn.close()
super(_BaseRpcTestCase, self).tearDown()
def test_call_succeed(self):
value = 42
result = self.rpc.call(self.context, 'test', {"method": "echo",
"args": {"value": value}})
self.assertEqual(value, result)
def test_call_succeed_despite_multiple_returns(self):
value = 42
result = self.rpc.call(self.context, 'test',
{"method": "echo_three_times",
"args": {"value": value}})
self.assertEqual(value + 2, result)
def test_call_succeed_despite_multiple_returns_yield(self):
value = 42
result = self.rpc.call(self.context, 'test',
{"method": "echo_three_times_yield",
"args": {"value": value}})
self.assertEqual(value + 2, result)
def test_multicall_succeed_once(self):
value = 42
result = self.rpc.multicall(self.context,
'test',
{"method": "echo",
"args": {"value": value}})
for i, x in enumerate(result):
if i > 0:
self.fail('should only receive one response')
self.assertEqual(value + i, x)
def test_multicall_succeed_three_times(self):
value = 42
result = self.rpc.multicall(self.context,
'test',
{"method": "echo_three_times",
"args": {"value": value}})
for i, x in enumerate(result):
self.assertEqual(value + i, x)
def test_multicall_succeed_three_times_yield(self):
value = 42
result = self.rpc.multicall(self.context,
'test',
{"method": "echo_three_times_yield",
"args": {"value": value}})
for i, x in enumerate(result):
self.assertEqual(value + i, x)
def test_context_passed(self):
"""Makes sure a context is passed through rpc call."""
value = 42
result = self.rpc.call(self.context,
'test', {"method": "context",
"args": {"value": value}})
self.assertEqual(self.context.to_dict(), result)
def test_call_exception(self):
"""Test that exception gets passed back properly.
rpc.call returns a RemoteError object. The value of the
exception is converted to a string, so we convert it back
to an int in the test.
"""
value = 42
self.assertRaises(RemoteError,
self.rpc.call,
self.context,
'test',
{"method": "fail",
"args": {"value": value}})
try:
self.rpc.call(self.context,
'test',
{"method": "fail",
"args": {"value": value}})
self.fail("should have thrown RemoteError")
except RemoteError as exc:
self.assertEqual(int(exc.value), value)
def test_nested_calls(self):
"""Test that we can do an rpc.call inside another call."""
class Nested(object):
@staticmethod
def echo(context, queue, value):
"""Calls echo in the passed queue"""
LOG.debug(_("Nested received %(queue)s, %(value)s")
% locals())
# TODO: so, it will replay the context and use the same REQID?
# that's bizarre.
ret = self.rpc.call(context,
queue,
{"method": "echo",
"args": {"value": value}})
LOG.debug(_("Nested return %s"), ret)
return value
nested = Nested()
conn = self.rpc.create_connection(True)
conn.create_consumer('nested', nested, False)
conn.consume_in_thread()
value = 42
result = self.rpc.call(self.context,
'nested', {"method": "echo",
"args": {"queue": "test",
"value": value}})
conn.close()
self.assertEqual(value, result)
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call.
Uses static methods because we aren't actually storing any state.
"""
@staticmethod
def echo(context, value):
"""Simply returns whatever value is sent in."""
LOG.debug(_("Received %s"), value)
return value
@staticmethod
def context(context, value):
"""Returns dictionary version of context."""
LOG.debug(_("Received %s"), context)
return context.to_dict()
@staticmethod
def echo_three_times(context, value):
context.reply(value)
context.reply(value + 1)
context.reply(value + 2)
@staticmethod
def echo_three_times_yield(context, value):
yield value
yield value + 1
yield value + 2
@staticmethod
def fail(context, value):
"""Raises an exception with the value sent in."""
raise Exception(value)

View File

@@ -0,0 +1,110 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for remote procedure calls using kombu
"""
from nova import context
from nova import log as logging
from nova import test
from nova.rpc import impl_kombu
from nova.tests import test_rpc_common
LOG = logging.getLogger('nova.tests.rpc')
class RpcKombuTestCase(test_rpc_common._BaseRpcTestCase):
def setUp(self):
self.rpc = impl_kombu
super(RpcKombuTestCase, self).setUp()
def tearDown(self):
super(RpcKombuTestCase, self).tearDown()
def test_reusing_connection(self):
"""Test that reusing a connection returns same one."""
conn_context = self.rpc.create_connection(new=False)
conn1 = conn_context.connection
conn_context.close()
conn_context = self.rpc.create_connection(new=False)
conn2 = conn_context.connection
conn_context.close()
self.assertEqual(conn1, conn2)
def test_topic_send_receive(self):
"""Test sending to a topic exchange/queue"""
conn = self.rpc.create_connection()
message = 'topic test message'
self.received_message = None
def _callback(message):
self.received_message = message
conn.declare_topic_consumer('a_topic', _callback)
conn.topic_send('a_topic', message)
conn.consume(limit=1)
conn.close()
self.assertEqual(self.received_message, message)
def test_direct_send_receive(self):
"""Test sending to a direct exchange/queue"""
conn = self.rpc.create_connection()
message = 'direct test message'
self.received_message = None
def _callback(message):
self.received_message = message
conn.declare_direct_consumer('a_direct', _callback)
conn.direct_send('a_direct', message)
conn.consume(limit=1)
conn.close()
self.assertEqual(self.received_message, message)
@test.skip_test("kombu memory transport seems buggy with fanout queues "
"as this test passes when you use rabbit (fake_rabbit=False)")
def test_fanout_send_receive(self):
"""Test sending to a fanout exchange and consuming from 2 queues"""
conn = self.rpc.create_connection()
conn2 = self.rpc.create_connection()
message = 'fanout test message'
self.received_message = None
def _callback(message):
self.received_message = message
conn.declare_fanout_consumer('a_fanout', _callback)
conn2.declare_fanout_consumer('a_fanout', _callback)
conn.fanout_send('a_fanout', message)
conn.consume(limit=1)
conn.close()
self.assertEqual(self.received_message, message)
self.received_message = None
conn2.consume(limit=1)
conn2.close()
self.assertEqual(self.received_message, message)

View File

@@ -16,7 +16,6 @@
"""Test suite for XenAPI."""
import eventlet
import functools
import json
import os
@@ -203,42 +202,6 @@ class XenAPIVMTestCase(test.TestCase):
self.context = context.RequestContext(self.user_id, self.project_id)
self.conn = xenapi_conn.get_connection(False)
def test_parallel_builds(self):
stubs.stubout_loopingcall_delay(self.stubs)
def _do_build(id, proj, user, *args):
values = {
'id': id,
'project_id': proj,
'user_id': user,
'image_ref': 1,
'kernel_id': 2,
'ramdisk_id': 3,
'instance_type_id': '3', # m1.large
'os_type': 'linux',
'architecture': 'x86-64'}
network_info = [({'bridge': 'fa0', 'id': 0, 'injected': False},
{'broadcast': '192.168.0.255',
'dns': ['192.168.0.1'],
'gateway': '192.168.0.1',
'gateway6': 'dead:beef::1',
'ip6s': [{'enabled': '1',
'ip': 'dead:beef::dcad:beff:feef:0',
'netmask': '64'}],
'ips': [{'enabled': '1',
'ip': '192.168.0.100',
'netmask': '255.255.255.0'}],
'label': 'fake',
'mac': 'DE:AD:BE:EF:00:00',
'rxtx_cap': 3})]
instance = db.instance_create(self.context, values)
self.conn.spawn(self.context, instance, network_info)
gt1 = eventlet.spawn(_do_build, 1, self.project_id, self.user_id)
gt2 = eventlet.spawn(_do_build, 2, self.project_id, self.user_id)
gt1.wait()
gt2.wait()
def test_list_instances_0(self):
instances = self.conn.list_instances()
self.assertEquals(instances, [])

View File

@@ -23,6 +23,8 @@ import time
from nova import db
from nova import utils
from nova.compute import task_states
from nova.compute import vm_states
def stub_out_db_instance_api(stubs):
@@ -64,7 +66,8 @@ def stub_out_db_instance_api(stubs):
'image_ref': values['image_ref'],
'kernel_id': values['kernel_id'],
'ramdisk_id': values['ramdisk_id'],
'state_description': 'scheduling',
'vm_state': vm_states.BUILDING,
'task_state': task_states.SCHEDULING,
'user_id': values['user_id'],
'project_id': values['project_id'],
'launch_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),