Use rpc from openstack-common.

Final patch for blueprint common-rpc.

This patch removes cinder.rpc in favor of the copy in openstack-common.

Change-Id: I9c2f6bdbe8cd0c44417f75284131dbf3c126d1dd
This commit is contained in:
Russell Bryant 2012-06-13 10:48:54 -04:00 committed by Mark McLoughlin
parent 84b7025547
commit 15f971de5b
36 changed files with 1396 additions and 1749 deletions

View File

@ -84,8 +84,8 @@ from cinder import flags
from cinder import log as logging
from cinder.openstack.common import cfg
from cinder.openstack.common import importutils
from cinder.openstack.common import rpc
from cinder import quota
from cinder import rpc
from cinder import utils
from cinder import version
from cinder.volume import volume_types

View File

@ -45,7 +45,7 @@ from cinder import exception
from cinder import flags
from cinder import log as logging
from cinder.openstack.common import cfg
from cinder import rpc
from cinder.openstack.common import rpc
delete_exchange_opt = \

View File

@ -56,7 +56,7 @@ This module provides Manager, a base class for managers.
from cinder.db import base
from cinder import flags
from cinder import log as logging
from cinder.rpc import dispatcher as rpc_dispatcher
from cinder.openstack.common.rpc import dispatcher as rpc_dispatcher
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import version

View File

@ -19,7 +19,7 @@ import cinder.context
from cinder import flags
from cinder import log as logging
from cinder.openstack.common import cfg
from cinder import rpc
from cinder.openstack.common import rpc
LOG = logging.getLogger(__name__)

View File

@ -31,7 +31,7 @@ from cinder.openstack.common import importutils
rpc_opts = [
cfg.StrOpt('rpc_backend',
default='cinder.rpc.impl_kombu',
default='%s.impl_kombu' % __package__,
help="The messaging module to use, defaults to kombu."),
cfg.IntOpt('rpc_thread_pool_size',
default=64,
@ -42,17 +42,23 @@ rpc_opts = [
cfg.IntOpt('rpc_response_timeout',
default=60,
help='Seconds to wait for a response from call or multicall'),
cfg.IntOpt('rpc_cast_timeout',
default=30,
help='Seconds to wait before a cast expires (TTL). '
'Only supported by impl_zmq.'),
cfg.ListOpt('allowed_rpc_exception_modules',
default=['cinder.exception'],
help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'),
default=['cinder.openstack.common.exception',
'nova.exception',
],
help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'),
cfg.StrOpt('control_exchange',
default='cinder',
default='nova',
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
cfg.BoolOpt('fake_rabbit',
default=False,
help='If passed, use a fake RabbitMQ provider'),
]
]
cfg.CONF.register_opts(rpc_opts)
@ -61,14 +67,14 @@ def create_connection(new=True):
"""Create a connection to the message bus used for rpc.
For some example usage of creating a connection and some consumers on that
connection, see cinder.service.
connection, see nova.service.
:param new: Whether or not to create a new connection. A new connection
will be created by default. If new is False, the
implementation is free to return an existing connection from a
pool.
:returns: An instance of cinder.rpc.common.Connection
:returns: An instance of openstack.common.rpc.common.Connection
"""
return _get_impl().create_connection(cfg.CONF, new=new)
@ -80,9 +86,9 @@ def call(context, topic, msg, timeout=None):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
cinder.rpc.common.Connection.create_consumer()
and only applies when the consumer was created
with fanout=False.
openstack.common.rpc.common.Connection.create_consumer()
and only applies when the consumer was created with
fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
@ -90,8 +96,8 @@ def call(context, topic, msg, timeout=None):
:returns: A dict from the remote method.
:raises: cinder.rpc.common.Timeout if a complete response is not received
before the timeout is reached.
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
@ -103,9 +109,9 @@ def cast(context, topic, msg):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
cinder.rpc.common.Connection.create_consumer()
and only applies when the consumer was created
with fanout=False.
openstack.common.rpc.common.Connection.create_consumer()
and only applies when the consumer was created with
fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
@ -124,9 +130,9 @@ def fanout_cast(context, topic, msg):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
cinder.rpc.common.Connection.create_consumer()
and only applies when the consumer was created
with fanout=True.
openstack.common.rpc.common.Connection.create_consumer()
and only applies when the consumer was created with
fanout=True.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
@ -146,9 +152,9 @@ def multicall(context, topic, msg, timeout=None):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
cinder.rpc.common.Connection.create_consumer()
and only applies when the consumer was created
with fanout=False.
openstack.common.rpc.common.Connection.create_consumer()
and only applies when the consumer was created with
fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
@ -159,8 +165,8 @@ def multicall(context, topic, msg, timeout=None):
returned and X is the Nth value that was returned by the remote
method.
:raises: cinder.rpc.common.Timeout if a complete response is not received
before the timeout is reached.
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
@ -224,7 +230,20 @@ def fanout_cast_to_server(context, server_params, topic, msg):
def queue_get_for(context, topic, host):
"""Get a queue name for a given topic + host."""
"""Get a queue name for a given topic + host.
This function only works if this naming convention is followed on the
consumer side, as well. For example, in nova, every instance of the
nova-foo service calls create_consumer() for two topics:
foo
foo.<host>
Messages sent to the 'foo' topic are distributed to exactly one instance of
the nova-foo service. The services are chosen in a round-robin fashion.
Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
<host>.
"""
return '%s.%s' % (topic, host)
@ -235,5 +254,11 @@ def _get_impl():
"""Delay import of rpc_backend until configuration is loaded."""
global _RPCIMPL
if _RPCIMPL is None:
_RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
try:
_RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
except ImportError:
# For backwards compatibility with older nova config.
impl = cfg.CONF.rpc_backend.replace('nova.rpc',
'nova.openstack.common.rpc')
_RPCIMPL = importutils.import_module(impl)
return _RPCIMPL

View File

@ -18,7 +18,7 @@
# under the License.
"""
Shared code between AMQP based cinder.rpc implementations.
Shared code between AMQP based openstack.common.rpc implementations.
The code in this module is shared between the rpc implemenations based on AMQP.
Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
@ -35,8 +35,9 @@ from eventlet import pools
from eventlet import semaphore
from cinder.openstack.common import excutils
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import local
import cinder.rpc.common as rpc_common
from cinder.openstack.common.rpc import common as rpc_common
LOG = logging.getLogger(__name__)
@ -74,13 +75,14 @@ def get_connection_pool(conf, connection_cls):
class ConnectionContext(rpc_common.Connection):
"""The class that is actually returned to the caller of
create_connection(). This is a essentially a wrapper around
Connection that supports 'with' and can return a new Connection or
one from a pool. It will also catch when an instance of this class
is to be deleted so that we can return Connections to the pool on
exceptions and so forth without making the caller be responsible for
catching all exceptions and making sure to return a connection to
the pool.
create_connection(). This is essentially a wrapper around
Connection that supports 'with'. It can also return a new
Connection, or one from a pool. The function will also catch
when an instance of this class is to be deleted. With that
we can return Connections to the pool on exceptions and so
forth without making the caller be responsible for catching
them. If possible the function makes sure to return a
connection to the pool.
"""
def __init__(self, conf, connection_pool, pooled=True, server_params=None):
@ -91,8 +93,9 @@ class ConnectionContext(rpc_common.Connection):
if pooled:
self.connection = connection_pool.get()
else:
self.connection = connection_pool.connection_cls(conf,
server_params=server_params)
self.connection = connection_pool.connection_cls(
conf,
server_params=server_params)
self.pooled = pooled
def __enter__(self):
@ -131,6 +134,9 @@ class ConnectionContext(rpc_common.Connection):
def create_consumer(self, topic, proxy, fanout=False):
self.connection.create_consumer(topic, proxy, fanout)
def create_worker(self, topic, proxy, pool_name):
self.connection.create_worker(topic, proxy, pool_name)
def consume_in_thread(self):
self.connection.consume_in_thread()
@ -157,8 +163,8 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
msg = {'result': reply, 'failure': failure}
except TypeError:
msg = {'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure}
for k, v in reply.__dict__.iteritems()),
'failure': failure}
if ending:
msg['ending'] = True
conn.direct_send(msg_id, msg)
@ -171,6 +177,12 @@ class RpcContext(rpc_common.CommonRpcContext):
self.conf = kwargs.pop('conf')
super(RpcContext, self).__init__(**kwargs)
def deepcopy(self):
values = self.to_dict()
values['conf'] = self.conf
values['msg_id'] = self.msg_id
return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False,
connection_pool=None):
if self.msg_id:
@ -278,8 +290,8 @@ class ProxyCallback(object):
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
self._connection = connection
self._iterator = connection.iterconsume(
timeout=timeout or conf.rpc_response_timeout)
self._iterator = connection.iterconsume(timeout=timeout or
conf.rpc_response_timeout)
self._result = None
self._done = False
self._got_ending = False
@ -298,7 +310,7 @@ class MulticallWaiter(object):
if data['failure']:
failure = data['failure']
self._result = rpc_common.deserialize_remote_exception(self._conf,
failure)
failure)
elif data.get('ending', False):
self._got_ending = True
@ -379,16 +391,16 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
"""Sends a message on a topic to a specific server."""
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
server_params=server_params) as conn:
conn.topic_send(topic, msg)
def fanout_cast_to_server(conf, context, server_params, topic, msg,
connection_pool):
connection_pool):
"""Sends a message on a fanout exchange to a specific server."""
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
server_params=server_params) as conn:
conn.fanout_send(topic, msg)

View File

@ -23,6 +23,7 @@ import sys
import traceback
from cinder.openstack.common import cfg
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import importutils
from cinder.openstack.common import jsonutils
from cinder.openstack.common import local
@ -119,8 +120,8 @@ class Connection(object):
:param conf: An openstack.common.cfg configuration object.
:param topic: This is a name associated with what to consume from.
Multiple instances of a service may consume from the same
topic. For example, all instances of cinder-compute
consume from a queue called "compute". In that case, the
topic. For example, all instances of nova-compute consume
from a queue called "compute". In that case, the
messages will get distributed amongst the consumers in a
round-robin fashion if fanout=False. If fanout=True,
every consumer associated with this topic will get a
@ -132,6 +133,25 @@ class Connection(object):
"""
raise NotImplementedError()
def create_worker(self, conf, topic, proxy, pool_name):
"""Create a worker on this connection.
A worker is like a regular consumer of messages directed to a
topic, except that it is part of a set of such consumers (the
"pool") which may run in parallel. Every pool of workers will
receive a given message, but only one worker in the pool will
be asked to process it. Load is distributed across the members
of the pool in round-robin fashion.
:param conf: An openstack.common.cfg configuration object.
:param topic: This is a name associated with what to consume from.
Multiple instances of a service may consume from the same
topic.
:param proxy: The object that will handle all incoming messages.
:param pool_name: String containing the name of the pool of workers
"""
raise NotImplementedError()
def consume_in_thread(self):
"""Spawn a thread to handle incoming messages.
@ -148,10 +168,8 @@ class Connection(object):
def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging."""
SANITIZE = {
'set_admin_password': ('new_pass',),
'run_instance': ('admin_password',),
}
SANITIZE = {'set_admin_password': ('new_pass',),
'run_instance': ('admin_password',), }
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
has_context_token = '_context_auth_token' in msg_data
@ -236,7 +254,7 @@ def deserialize_remote_exception(conf, data):
ex_type = type(failure)
str_override = lambda self: message
new_ex_type = type(ex_type.__name__ + "_Remote", (ex_type,),
{'__str__': str_override})
{'__str__': str_override, '__unicode__': str_override})
try:
# NOTE(ameade): Dynamically create a new exception type and swap it in
# as the new type for the exception. This only works on user defined
@ -268,20 +286,22 @@ class CommonRpcContext(object):
def from_dict(cls, values):
return cls(**values)
def deepcopy(self):
return self.from_dict(self.to_dict())
def update_store(self):
local.store.context = self
def elevated(self, read_deleted=None, overwrite=False):
"""Return a version of this context with admin flag set."""
# TODO(russellb) This method is a bit of a cinder-ism. It makes
# TODO(russellb) This method is a bit of a nova-ism. It makes
# some assumptions about the data in the request context sent
# across rpc, while the rest of this class does not. We could get
# rid of this if we changed the cinder code that uses this to
# rid of this if we changed the nova code that uses this to
# convert the RpcContext back to its native RequestContext doing
# something like
# cinder.context.RequestContext.from_dict(ctxt.to_dict())
# something like nova.context.RequestContext.from_dict(ctxt.to_dict())
context = copy.deepcopy(self)
context = self.deepcopy()
context.values['is_admin'] = True
context.values.setdefault('roles', [])

View File

@ -40,9 +40,48 @@ The conversion over to a versioned API must be done on both the client side and
server side of the API at the same time. However, as the code stands today,
there can be both versioned and unversioned APIs implemented in the same code
base.
EXAMPLES:
Nova was the first project to use versioned rpc APIs. Consider the compute rpc
API as an example. The client side is in nova/compute/rpcapi.py and the server
side is in nova/compute/manager.py.
Example 1) Adding a new method.
Adding a new method is a backwards compatible change. It should be added to
nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to
X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should
have a specific version specified to indicate the minimum API version that must
be implemented for the method to be supported. For example:
def get_host_uptime(self, ctxt, host):
topic = _compute_topic(self.topic, ctxt, host, None)
return self.call(ctxt, self.make_msg('get_host_uptime'), topic,
version='1.1')
In this case, version '1.1' is the first version that supported the
get_host_uptime() method.
Example 2) Adding a new parameter.
Adding a new parameter to an rpc method can be made backwards compatible. The
RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped.
The implementation of the method must not expect the parameter to be present.
def some_remote_method(self, arg1, arg2, newarg=None):
# The code needs to deal with newarg=None for cases
# where an older client sends a message without it.
pass
On the client side, the same changes should be made as in example 1. The
minimum version that supports the new parameter should be specified.
"""
from cinder.rpc import common as rpc_common
from cinder.openstack.common.rpc import common as rpc_common
class RpcDispatcher(object):
@ -92,14 +131,20 @@ class RpcDispatcher(object):
if not version:
version = '1.0'
had_compatible = False
for proxyobj in self.callbacks:
if hasattr(proxyobj, 'RPC_API_VERSION'):
rpc_api_version = proxyobj.RPC_API_VERSION
else:
rpc_api_version = '1.0'
is_compatible = self._is_compatible(rpc_api_version, version)
had_compatible = had_compatible or is_compatible
if not hasattr(proxyobj, method):
continue
if self._is_compatible(rpc_api_version, version):
if is_compatible:
return getattr(proxyobj, method)(ctxt, **kwargs)
raise rpc_common.UnsupportedRpcVersion(version=version)
if had_compatible:
raise AttributeError("No such RPC function '%s'" % method)
else:
raise rpc_common.UnsupportedRpcVersion(version=version)

View File

@ -18,15 +18,12 @@ queues. Casts will block, but this is very useful for tests.
"""
import inspect
import json
import signal
import sys
import time
import traceback
import eventlet
from cinder.rpc import common as rpc_common
from cinder.openstack.common import jsonutils
from cinder.openstack.common.rpc import common as rpc_common
CONSUMERS = {}
@ -37,6 +34,13 @@ class RpcContext(rpc_common.CommonRpcContext):
self._response = []
self._done = False
def deepcopy(self):
values = self.to_dict()
new_inst = self.__class__(**values)
new_inst._response = self._response
new_inst._done = self._done
return new_inst
def reply(self, reply=None, failure=None, ending=False):
if ending:
self._done = True
@ -117,7 +121,7 @@ def create_connection(conf, new=True):
def check_serialize(msg):
"""Make sure a message intended for rpc can be serialized."""
json.dumps(msg)
jsonutils.dumps(msg)
def multicall(conf, context, topic, msg, timeout=None):
@ -171,9 +175,10 @@ def fanout_cast(conf, context, topic, msg):
if not method:
return
args = msg.get('args', {})
version = msg.get('version', None)
for consumer in CONSUMERS.get(topic, []):
try:
consumer.call(context, method, args, None)
consumer.call(context, version, method, args, None)
except Exception:
pass

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
import itertools
import socket
import ssl
@ -24,13 +25,14 @@ import uuid
import eventlet
import greenlet
import kombu
import kombu.connection
import kombu.entity
import kombu.messaging
import kombu.connection
from cinder.openstack.common import cfg
from cinder.rpc import amqp as rpc_amqp
from cinder.rpc import common as rpc_common
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common.rpc import amqp as rpc_amqp
from cinder.openstack.common.rpc import common as rpc_common
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
@ -45,7 +47,7 @@ kombu_opts = [
cfg.StrOpt('kombu_ssl_ca_certs',
default='',
help=('SSL certification authority file '
'(valid only if SSL enabled)')),
'(valid only if SSL enabled)')),
cfg.StrOpt('rabbit_host',
default='localhost',
help='the RabbitMQ host'),
@ -79,7 +81,7 @@ kombu_opts = [
default=False,
help='use durable queues in RabbitMQ'),
]
]
cfg.CONF.register_opts(kombu_opts)
@ -170,55 +172,55 @@ class DirectConsumer(ConsumerBase):
"""
# Default options
options = {'durable': False,
'auto_delete': True,
'exclusive': True}
'auto_delete': True,
'exclusive': True}
options.update(kwargs)
exchange = kombu.entity.Exchange(
name=msg_id,
type='direct',
durable=options['durable'],
auto_delete=options['auto_delete'])
super(DirectConsumer, self).__init__(
channel,
callback,
tag,
name=msg_id,
exchange=exchange,
routing_key=msg_id,
**options)
exchange = kombu.entity.Exchange(name=msg_id,
type='direct',
durable=options['durable'],
auto_delete=options['auto_delete'])
super(DirectConsumer, self).__init__(channel,
callback,
tag,
name=msg_id,
exchange=exchange,
routing_key=msg_id,
**options)
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
def __init__(self, conf, channel, topic, callback, tag, **kwargs):
def __init__(self, conf, channel, topic, callback, tag, name=None,
**kwargs):
"""Init a 'topic' queue.
'channel' is the amqp channel to use
'topic' is the topic to listen on
'callback' is the callback to call when messages are received
'tag' is a unique ID for the consumer on the channel
:param channel: the amqp channel to use
:param topic: the topic to listen on
:paramtype topic: str
:param callback: the callback to call when messages are received
:param tag: a unique ID for the consumer on the channel
:param name: optional queue name, defaults to topic
:paramtype name: str
Other kombu options may be passed
Other kombu options may be passed as keyword arguments
"""
# Default options
options = {'durable': conf.rabbit_durable_queues,
'auto_delete': False,
'exclusive': False}
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
exchange = kombu.entity.Exchange(
name=conf.control_exchange,
type='topic',
durable=options['durable'],
auto_delete=options['auto_delete'])
super(TopicConsumer, self).__init__(
channel,
callback,
tag,
name=topic,
exchange=exchange,
routing_key=topic,
**options)
exchange = kombu.entity.Exchange(name=conf.control_exchange,
type='topic',
durable=options['durable'],
auto_delete=options['auto_delete'])
super(TopicConsumer, self).__init__(channel,
callback,
tag,
name=name or topic,
exchange=exchange,
routing_key=topic,
**options)
class FanoutConsumer(ConsumerBase):
@ -240,22 +242,17 @@ class FanoutConsumer(ConsumerBase):
# Default options
options = {'durable': False,
'auto_delete': True,
'exclusive': True}
'auto_delete': True,
'exclusive': True}
options.update(kwargs)
exchange = kombu.entity.Exchange(
name=exchange_name,
type='fanout',
durable=options['durable'],
auto_delete=options['auto_delete'])
super(FanoutConsumer, self).__init__(
channel,
callback,
tag,
name=queue_name,
exchange=exchange,
routing_key=topic,
**options)
exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
durable=options['durable'],
auto_delete=options['auto_delete'])
super(FanoutConsumer, self).__init__(channel, callback, tag,
name=queue_name,
exchange=exchange,
routing_key=topic,
**options)
class Publisher(object):
@ -273,9 +270,10 @@ class Publisher(object):
def reconnect(self, channel):
"""Re-establish the Producer after a rabbit reconnection"""
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
**self.kwargs)
**self.kwargs)
self.producer = kombu.messaging.Producer(exchange=self.exchange,
channel=channel, routing_key=self.routing_key)
channel=channel,
routing_key=self.routing_key)
def send(self, msg):
"""Send a message"""
@ -291,14 +289,11 @@ class DirectPublisher(Publisher):
"""
options = {'durable': False,
'auto_delete': True,
'exclusive': True}
'auto_delete': True,
'exclusive': True}
options.update(kwargs)
super(DirectPublisher, self).__init__(channel,
msg_id,
msg_id,
type='direct',
**options)
super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
type='direct', **options)
class TopicPublisher(Publisher):
@ -309,14 +304,11 @@ class TopicPublisher(Publisher):
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': conf.rabbit_durable_queues,
'auto_delete': False,
'exclusive': False}
'auto_delete': False,
'exclusive': False}
options.update(kwargs)
super(TopicPublisher, self).__init__(channel,
conf.control_exchange,
topic,
type='topic',
**options)
super(TopicPublisher, self).__init__(channel, conf.control_exchange,
topic, type='topic', **options)
class FanoutPublisher(Publisher):
@ -327,14 +319,11 @@ class FanoutPublisher(Publisher):
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': False,
'auto_delete': True,
'exclusive': True}
'auto_delete': True,
'exclusive': True}
options.update(kwargs)
super(FanoutPublisher, self).__init__(channel,
'%s_fanout' % topic,
None,
type='fanout',
**options)
super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
None, type='fanout', **options)
class NotifyPublisher(TopicPublisher):
@ -351,10 +340,10 @@ class NotifyPublisher(TopicPublisher):
# we do this to ensure that messages don't get dropped if the
# consumer is started after we do
queue = kombu.entity.Queue(channel=channel,
exchange=self.exchange,
durable=self.durable,
name=self.routing_key,
routing_key=self.routing_key)
exchange=self.exchange,
durable=self.durable,
name=self.routing_key,
routing_key=self.routing_key)
queue.declare()
@ -440,7 +429,7 @@ class Connection(object):
"""
if self.connection:
LOG.info(_("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % self.params)
"%(hostname)s:%(port)d") % self.params)
try:
self.connection.close()
except self.connection_errors:
@ -448,8 +437,7 @@ class Connection(object):
# Setting this in case the next statement fails, though
# it shouldn't be doing any network operations, yet.
self.connection = None
self.connection = kombu.connection.BrokerConnection(
**self.params)
self.connection = kombu.connection.BrokerConnection(**self.params)
self.connection_errors = self.connection.connection_errors
if self.memory_transport:
# Kludge to speed up tests.
@ -499,8 +487,8 @@ class Connection(object):
if self.max_retries and attempt == self.max_retries:
LOG.exception(_('Unable to connect to AMQP server on '
'%(hostname)s:%(port)d after %(max_retries)d '
'tries: %(err_str)s') % log_info)
'%(hostname)s:%(port)d after %(max_retries)d '
'tries: %(err_str)s') % log_info)
# NOTE(comstud): Copied from original code. There's
# really no better recourse because if this was a queue we
# need to consume on, we have no way to consume anymore.
@ -515,8 +503,8 @@ class Connection(object):
log_info['sleep_time'] = sleep_time
LOG.exception(_('AMQP server on %(hostname)s:%(port)d is'
' unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.') % log_info)
' unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.') % log_info)
time.sleep(sleep_time)
def ensure(self, error_callback, method, *args, **kwargs):
@ -566,11 +554,11 @@ class Connection(object):
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s") % log_info)
"%(err_str)s") % log_info)
def _declare_consumer():
consumer = consumer_cls(self.conf, self.channel, topic, callback,
self.consumer_num.next())
self.consumer_num.next())
self.consumers.append(consumer)
return consumer
@ -584,11 +572,11 @@ class Connection(object):
def _error_callback(exc):
if isinstance(exc, socket.timeout):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
str(exc))
str(exc))
info['do_consume'] = True
def _consume():
@ -622,7 +610,7 @@ class Connection(object):
def _error_callback(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.exception(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s") % log_info)
"'%(topic)s': %(err_str)s") % log_info)
def _publish():
publisher = cls(self.conf, self.channel, topic, **kwargs)
@ -632,14 +620,17 @@ class Connection(object):
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
In cinder's use, this is generally a msg_id queue used for
In nova's use, this is generally a msg_id queue used for
responses for call/multicall
"""
self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None):
def declare_topic_consumer(self, topic, callback=None, queue_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(TopicConsumer, topic, callback)
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
),
topic, callback)
def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer"""
@ -683,61 +674,77 @@ class Connection(object):
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
rpc_amqp.get_connection_pool(self, Connection))
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
if fanout:
self.declare_fanout_consumer(topic, proxy_cb)
else:
self.declare_topic_consumer(topic, proxy_cb)
def create_worker(self, topic, proxy, pool_name):
"""Create a worker that calls a method in a proxy object"""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
self.declare_topic_consumer(topic, proxy_cb, pool_name)
def create_connection(conf, new=True):
"""Create a connection"""
return rpc_amqp.create_connection(conf, new,
rpc_amqp.get_connection_pool(conf, Connection))
return rpc_amqp.create_connection(
conf, new,
rpc_amqp.get_connection_pool(conf, Connection))
def multicall(conf, context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
return rpc_amqp.multicall(conf, context, topic, msg, timeout,
rpc_amqp.get_connection_pool(conf, Connection))
return rpc_amqp.multicall(
conf, context, topic, msg, timeout,
rpc_amqp.get_connection_pool(conf, Connection))
def call(conf, context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
return rpc_amqp.call(conf, context, topic, msg, timeout,
rpc_amqp.get_connection_pool(conf, Connection))
return rpc_amqp.call(
conf, context, topic, msg, timeout,
rpc_amqp.get_connection_pool(conf, Connection))
def cast(conf, context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
return rpc_amqp.cast(conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
return rpc_amqp.cast(
conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
def fanout_cast(conf, context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
return rpc_amqp.fanout_cast(conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
return rpc_amqp.fanout_cast(
conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
def cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a topic to a specific server."""
return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
return rpc_amqp.cast_to_server(
conf, context, server_params, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
def fanout_cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server."""
return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
return rpc_amqp.cast_to_server(
conf, context, server_params, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
return rpc_amqp.notify(
conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
def cleanup():

View File

@ -15,8 +15,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
import itertools
import json
import logging
import time
import uuid
@ -27,8 +27,10 @@ import qpid.messaging
import qpid.messaging.exceptions
from cinder.openstack.common import cfg
from cinder.rpc import amqp as rpc_amqp
from cinder.rpc import common as rpc_common
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import jsonutils
from cinder.openstack.common.rpc import amqp as rpc_amqp
from cinder.openstack.common.rpc import common as rpc_common
LOG = logging.getLogger(__name__)
@ -75,7 +77,7 @@ qpid_opts = [
cfg.BoolOpt('qpid_tcp_nodelay',
default=True,
help='Disable Nagle algorithm'),
]
]
cfg.CONF.register_opts(qpid_opts)
@ -123,7 +125,7 @@ class ConsumerBase(object):
addr_opts["node"]["x-declare"].update(node_opts)
addr_opts["link"]["x-declare"].update(link_opts)
self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
self.reconnect(session)
@ -159,26 +161,29 @@ class DirectConsumer(ConsumerBase):
"""
super(DirectConsumer, self).__init__(session, callback,
"%s/%s" % (msg_id, msg_id),
{"type": "direct"},
msg_id,
{"exclusive": True})
"%s/%s" % (msg_id, msg_id),
{"type": "direct"},
msg_id,
{"exclusive": True})
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
def __init__(self, conf, session, topic, callback):
def __init__(self, conf, session, topic, callback, name=None):
"""Init a 'topic' queue.
'session' is the amqp session to use
'topic' is the topic to listen on
'callback' is the callback to call when messages are received
:param session: the amqp session to use
:param topic: is the topic to listen on
:paramtype topic: str
:param callback: the callback to call when messages are received
:param name: optional queue name, defaults to topic
"""
super(TopicConsumer, self).__init__(session, callback,
"%s/%s" % (conf.control_exchange, topic), {},
topic, {})
"%s/%s" % (conf.control_exchange,
topic),
{}, name or topic, {})
class FanoutConsumer(ConsumerBase):
@ -192,11 +197,12 @@ class FanoutConsumer(ConsumerBase):
'callback' is the callback to call when messages are received
"""
super(FanoutConsumer, self).__init__(session, callback,
"%s_fanout" % topic,
{"durable": False, "type": "fanout"},
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
{"exclusive": True})
super(FanoutConsumer, self).__init__(
session, callback,
"%s_fanout" % topic,
{"durable": False, "type": "fanout"},
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
{"exclusive": True})
class Publisher(object):
@ -224,7 +230,7 @@ class Publisher(object):
if node_opts:
addr_opts["node"]["x-declare"].update(node_opts)
self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
self.reconnect(session)
@ -250,8 +256,9 @@ class TopicPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
super(TopicPublisher, self).__init__(session,
"%s/%s" % (conf.control_exchange, topic))
super(TopicPublisher, self).__init__(
session,
"%s/%s" % (conf.control_exchange, topic))
class FanoutPublisher(Publisher):
@ -259,8 +266,9 @@ class FanoutPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'fanout' publisher.
"""
super(FanoutPublisher, self).__init__(session,
"%s_fanout" % topic, {"type": "fanout"})
super(FanoutPublisher, self).__init__(
session,
"%s_fanout" % topic, {"type": "fanout"})
class NotifyPublisher(Publisher):
@ -268,9 +276,10 @@ class NotifyPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
super(NotifyPublisher, self).__init__(session,
"%s/%s" % (conf.control_exchange, topic),
{"durable": True})
super(NotifyPublisher, self).__init__(
session,
"%s/%s" % (conf.control_exchange, topic),
{"durable": True})
class Connection(object):
@ -288,9 +297,9 @@ class Connection(object):
server_params = {}
default_params = dict(hostname=self.conf.qpid_hostname,
port=self.conf.qpid_port,
username=self.conf.qpid_username,
password=self.conf.qpid_password)
port=self.conf.qpid_port,
username=self.conf.qpid_username,
password=self.conf.qpid_password)
params = server_params
for key in default_params.keys():
@ -308,18 +317,18 @@ class Connection(object):
self.connection.reconnect = self.conf.qpid_reconnect
if self.conf.qpid_reconnect_timeout:
self.connection.reconnect_timeout = (
self.conf.qpid_reconnect_timeout)
self.conf.qpid_reconnect_timeout)
if self.conf.qpid_reconnect_limit:
self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
if self.conf.qpid_reconnect_interval_max:
self.connection.reconnect_interval_max = (
self.conf.qpid_reconnect_interval_max)
self.conf.qpid_reconnect_interval_max)
if self.conf.qpid_reconnect_interval_min:
self.connection.reconnect_interval_min = (
self.conf.qpid_reconnect_interval_min)
self.conf.qpid_reconnect_interval_min)
if self.conf.qpid_reconnect_interval:
self.connection.reconnect_interval = (
self.conf.qpid_reconnect_interval)
self.conf.qpid_reconnect_interval)
self.connection.hearbeat = self.conf.qpid_heartbeat
self.connection.protocol = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
@ -391,7 +400,7 @@ class Connection(object):
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s") % log_info)
"%(err_str)s") % log_info)
def _declare_consumer():
consumer = consumer_cls(self.conf, self.session, topic, callback)
@ -406,11 +415,11 @@ class Connection(object):
def _error_callback(exc):
if isinstance(exc, qpid.messaging.exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
str(exc))
str(exc))
def _consume():
nxt_receiver = self.session.next_receiver(timeout=timeout)
@ -440,7 +449,7 @@ class Connection(object):
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.exception(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s") % log_info)
"'%(topic)s': %(err_str)s") % log_info)
def _publisher_send():
publisher = cls(self.conf, self.session, topic)
@ -450,14 +459,17 @@ class Connection(object):
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
In cinder's use, this is generally a msg_id queue used for
In nova's use, this is generally a msg_id queue used for
responses for call/multicall
"""
self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None):
def declare_topic_consumer(self, topic, callback=None, queue_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(TopicConsumer, topic, callback)
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
),
topic, callback)
def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer"""
@ -501,8 +513,9 @@ class Connection(object):
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
rpc_amqp.get_connection_pool(self, Connection))
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
if fanout:
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
@ -513,53 +526,73 @@ class Connection(object):
return consumer
def create_worker(self, topic, proxy, pool_name):
"""Create a worker that calls a method in a proxy object"""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
name=pool_name)
self._register_consumer(consumer)
return consumer
def create_connection(conf, new=True):
"""Create a connection"""
return rpc_amqp.create_connection(conf, new,
rpc_amqp.get_connection_pool(conf, Connection))
return rpc_amqp.create_connection(
conf, new,
rpc_amqp.get_connection_pool(conf, Connection))
def multicall(conf, context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
return rpc_amqp.multicall(conf, context, topic, msg, timeout,
rpc_amqp.get_connection_pool(conf, Connection))
return rpc_amqp.multicall(
conf, context, topic, msg, timeout,
rpc_amqp.get_connection_pool(conf, Connection))
def call(conf, context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
return rpc_amqp.call(conf, context, topic, msg, timeout,
rpc_amqp.get_connection_pool(conf, Connection))
return rpc_amqp.call(
conf, context, topic, msg, timeout,
rpc_amqp.get_connection_pool(conf, Connection))
def cast(conf, context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
return rpc_amqp.cast(conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
return rpc_amqp.cast(
conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
def fanout_cast(conf, context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
return rpc_amqp.fanout_cast(conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
return rpc_amqp.fanout_cast(
conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
def cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a topic to a specific server."""
return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
return rpc_amqp.cast_to_server(
conf, context, server_params, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
def fanout_cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server."""
return rpc_amqp.fanout_cast_to_server(conf, context, server_params, topic,
msg, rpc_amqp.get_connection_pool(conf, Connection))
return rpc_amqp.fanout_cast_to_server(
conf, context, server_params, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
rpc_amqp.get_connection_pool(conf, Connection))
def cleanup():

View File

@ -0,0 +1,725 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pprint
import socket
import string
import sys
import types
import uuid
import eventlet
from eventlet.green import zmq
import greenlet
from cinder.openstack.common import cfg
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import importutils
from cinder.openstack.common import jsonutils
from cinder.openstack.common.rpc import common as rpc_common
# for convenience, are not modified.
pformat = pprint.pformat
Timeout = eventlet.timeout.Timeout
LOG = rpc_common.LOG
RemoteError = rpc_common.RemoteError
RPCException = rpc_common.RPCException
zmq_opts = [
cfg.StrOpt('rpc_zmq_bind_address', default='*',
help='ZeroMQ bind address. Should be a wildcard (*), '
'an ethernet interface, or IP. '
'The "host" option should point or resolve to this '
'address.'),
# The module.Class to use for matchmaking.
cfg.StrOpt(
'rpc_zmq_matchmaker',
default=('cinder.openstack.common.rpc.'
'matchmaker.MatchMakerLocalhost'),
help='MatchMaker driver',
),
# The following port is unassigned by IANA as of 2012-05-21
cfg.IntOpt('rpc_zmq_port', default=9501,
help='ZeroMQ receiver listening port'),
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
help='Directory for holding IPC sockets'),
cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
help='Name of this node. Must be a valid hostname, FQDN, or '
'IP address. Must match "host" option, if running Nova.')
]
# These globals are defined in register_opts(conf),
# a mandatory initialization call
FLAGS = None
ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object
def _serialize(data):
"""
Serialization wrapper
We prefer using JSON, but it cannot encode all types.
Error if a developer passes us bad data.
"""
try:
return str(jsonutils.dumps(data, ensure_ascii=True))
except TypeError:
LOG.error(_("JSON serialization failed."))
raise
def _deserialize(data):
"""
Deserialization wrapper
"""
LOG.debug(_("Deserializing: %s"), data)
return jsonutils.loads(data)
class ZmqSocket(object):
"""
A tiny wrapper around ZeroMQ to simplify the send/recv protocol
and connection management.
Can be used as a Context (supports the 'with' statement).
"""
def __init__(self, addr, zmq_type, bind=True, subscribe=None):
self.sock = ZMQ_CTX.socket(zmq_type)
self.addr = addr
self.type = zmq_type
self.subscriptions = []
# Support failures on sending/receiving on wrong socket type.
self.can_recv = zmq_type in (zmq.PULL, zmq.SUB)
self.can_send = zmq_type in (zmq.PUSH, zmq.PUB)
self.can_sub = zmq_type in (zmq.SUB, )
# Support list, str, & None for subscribe arg (cast to list)
do_sub = {
list: subscribe,
str: [subscribe],
type(None): []
}[type(subscribe)]
for f in do_sub:
self.subscribe(f)
str_data = {'addr': addr, 'type': self.socket_s(),
'subscribe': subscribe, 'bind': bind}
LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data)
LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data)
LOG.debug(_("-> bind: %(bind)s"), str_data)
try:
if bind:
self.sock.bind(addr)
else:
self.sock.connect(addr)
except Exception:
raise RPCException(_("Could not open socket."))
def socket_s(self):
"""Get socket type as string."""
t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER',
'DEALER')
return dict(map(lambda t: (getattr(zmq, t), t), t_enum))[self.type]
def subscribe(self, msg_filter):
"""Subscribe."""
if not self.can_sub:
raise RPCException("Cannot subscribe on this socket.")
LOG.debug(_("Subscribing to %s"), msg_filter)
try:
self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)
except Exception:
return
self.subscriptions.append(msg_filter)
def unsubscribe(self, msg_filter):
"""Unsubscribe."""
if msg_filter not in self.subscriptions:
return
self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter)
self.subscriptions.remove(msg_filter)
def close(self):
if self.sock is None or self.sock.closed:
return
# We must unsubscribe, or we'll leak descriptors.
if len(self.subscriptions) > 0:
for f in self.subscriptions:
try:
self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
except Exception:
pass
self.subscriptions = []
# Linger -1 prevents lost/dropped messages
try:
self.sock.close(linger=-1)
except Exception:
pass
self.sock = None
def recv(self):
if not self.can_recv:
raise RPCException(_("You cannot recv on this socket."))
return self.sock.recv_multipart()
def send(self, data):
if not self.can_send:
raise RPCException(_("You cannot send on this socket."))
self.sock.send_multipart(data)
class ZmqClient(object):
"""Client for ZMQ sockets."""
def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data):
self.outq.send([str(msg_id), str(topic), str('cast'),
_serialize(data)])
def close(self):
self.outq.close()
class RpcContext(rpc_common.CommonRpcContext):
"""Context that supports replying to a rpc.call."""
def __init__(self, **kwargs):
self.replies = []
super(RpcContext, self).__init__(**kwargs)
def deepcopy(self):
values = self.to_dict()
values['replies'] = self.replies
return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False):
if ending:
return
self.replies.append(reply)
@classmethod
def marshal(self, ctx):
ctx_data = ctx.to_dict()
return _serialize(ctx_data)
@classmethod
def unmarshal(self, data):
return RpcContext.from_dict(_deserialize(data))
class InternalContext(object):
"""Used by ConsumerBase as a private context for - methods."""
def __init__(self, proxy):
self.proxy = proxy
self.msg_waiter = None
def _get_response(self, ctx, proxy, topic, data):
"""Process a curried message and cast the result to topic."""
LOG.debug(_("Running func with context: %s"), ctx.to_dict())
data.setdefault('version', None)
data.setdefault('args', [])
try:
result = proxy.dispatch(
ctx, data['version'], data['method'], **data['args'])
return ConsumerBase.normalize_reply(result, ctx.replies)
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
pass
except Exception:
return {'exc':
rpc_common.serialize_remote_exception(sys.exc_info())}
def reply(self, ctx, proxy,
msg_id=None, context=None, topic=None, msg=None):
"""Reply to a casted call."""
# Our real method is curried into msg['args']
child_ctx = RpcContext.unmarshal(msg[0])
response = ConsumerBase.normalize_reply(
self._get_response(child_ctx, proxy, topic, msg[1]),
ctx.replies)
LOG.debug(_("Sending reply"))
cast(FLAGS, ctx, topic, {
'method': '-process_reply',
'args': {
'msg_id': msg_id,
'response': response
}
})
class ConsumerBase(object):
"""Base Consumer."""
def __init__(self):
self.private_ctx = InternalContext(None)
@classmethod
def normalize_reply(self, result, replies):
#TODO(ewindisch): re-evaluate and document this method.
if isinstance(result, types.GeneratorType):
return list(result)
elif replies:
return replies
else:
return [result]
def process(self, style, target, proxy, ctx, data):
# Method starting with - are
# processed internally. (non-valid method name)
method = data['method']
# Internal method
# uses internal context for safety.
if data['method'][0] == '-':
# For reply / process_reply
method = method[1:]
if method == 'reply':
self.private_ctx.reply(ctx, proxy, **data['args'])
return
data.setdefault('version', None)
data.setdefault('args', [])
proxy.dispatch(ctx, data['version'],
data['method'], **data['args'])
class ZmqBaseReactor(ConsumerBase):
"""
A consumer class implementing a
centralized casting broker (PULL-PUSH)
for RoundRobin requests.
"""
def __init__(self, conf):
super(ZmqBaseReactor, self).__init__()
self.conf = conf
self.mapping = {}
self.proxies = {}
self.threads = []
self.sockets = []
self.subscribe = {}
self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
def register(self, proxy, in_addr, zmq_type_in, out_addr=None,
zmq_type_out=None, in_bind=True, out_bind=True,
subscribe=None):
LOG.info(_("Registering reactor"))
if zmq_type_in not in (zmq.PULL, zmq.SUB):
raise RPCException("Bad input socktype")
# Items push in.
inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind,
subscribe=subscribe)
self.proxies[inq] = proxy
self.sockets.append(inq)
LOG.info(_("In reactor registered"))
if not out_addr:
return
if zmq_type_out not in (zmq.PUSH, zmq.PUB):
raise RPCException("Bad output socktype")
# Items push out.
outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
self.mapping[inq] = outq
self.mapping[outq] = inq
self.sockets.append(outq)
LOG.info(_("Out reactor registered"))
def consume_in_thread(self):
def _consume(sock):
LOG.info(_("Consuming socket"))
while True:
self.consume(sock)
for k in self.proxies.keys():
self.threads.append(
self.pool.spawn(_consume, k)
)
def wait(self):
for t in self.threads:
t.wait()
def close(self):
for s in self.sockets:
s.close()
for t in self.threads:
t.kill()
class ZmqProxy(ZmqBaseReactor):
"""
A consumer class implementing a
topic-based proxy, forwarding to
IPC sockets.
"""
def __init__(self, conf):
super(ZmqProxy, self).__init__(conf)
self.topic_proxy = {}
ipc_dir = conf.rpc_zmq_ipc_dir
self.topic_proxy['zmq_replies'] = \
ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
zmq.PUB, bind=True)
self.sockets.append(self.topic_proxy['zmq_replies'])
def consume(self, sock):
ipc_dir = self.conf.rpc_zmq_ipc_dir
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
msg_id, topic, style, in_msg = data
topic = topic.split('.', 1)[0]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
# Handle zmq_replies magic
if topic.startswith('fanout~'):
sock_type = zmq.PUB
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
inside = _deserialize(in_msg)
msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response)
data = [str(msg_id), _serialize(response)]
else:
sock_type = zmq.PUSH
if not topic in self.topic_proxy:
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
sock_type, bind=True)
self.topic_proxy[topic] = outq
self.sockets.append(outq)
LOG.info(_("Created topic proxy: %s"), topic)
# It takes some time for a pub socket to open,
# before we can have any faith in doing a send() to it.
if sock_type == zmq.PUB:
eventlet.sleep(.5)
LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
self.topic_proxy[topic].send(data)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
consumer for messages. Can also be
used as a 1:1 proxy
"""
def __init__(self, conf):
super(ZmqReactor, self).__init__(conf)
def consume(self, sock):
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
if sock in self.mapping:
LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
'data': data})
self.mapping[sock].send(data)
return
msg_id, topic, style, in_msg = data
ctx, request = _deserialize(in_msg)
ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock]
self.pool.spawn_n(self.process, style, topic,
proxy, ctx, request)
class Connection(rpc_common.Connection):
"""Manages connections and threads."""
def __init__(self, conf):
self.conf = conf
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
# Only consume on the base topic name.
topic = topic.split('.', 1)[0]
LOG.info(_("Create Consumer for topic (%(topic)s)") %
{'topic': topic})
# Subscription scenarios
if fanout:
subscribe = ('', fanout)[type(fanout) == str]
sock_type = zmq.SUB
topic = 'fanout~' + topic
else:
sock_type = zmq.PULL
subscribe = None
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
(self.conf.rpc_zmq_ipc_dir, topic)
LOG.debug(_("Consumer is a zmq.%s"),
['PULL', 'SUB'][sock_type == zmq.SUB])
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
def close(self):
self.reactor.close()
def wait(self):
self.reactor.wait()
def consume_in_thread(self):
self.reactor.consume_in_thread()
def _cast(addr, context, msg_id, topic, msg, timeout=None):
timeout_cast = timeout or FLAGS.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
with Timeout(timeout_cast, exception=rpc_common.Timeout):
try:
conn = ZmqClient(addr)
# assumes cast can't return an exception
conn.cast(msg_id, topic, payload)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
if 'conn' in vars():
conn.close()
def _call(addr, context, msg_id, topic, msg, timeout=None):
# timeout_response is how long we wait for a response
timeout = timeout or FLAGS.rpc_response_timeout
# The msg_id is used to track replies.
msg_id = str(uuid.uuid4().hex)
# Replies always come into the reply service.
reply_topic = "zmq_replies.%s" % FLAGS.rpc_zmq_host
LOG.debug(_("Creating payload"))
# Curry the original request into a reply method.
mcontext = RpcContext.marshal(context)
payload = {
'method': '-reply',
'args': {
'msg_id': msg_id,
'context': mcontext,
'topic': reply_topic,
'msg': [mcontext, msg]
}
}
LOG.debug(_("Creating queue socket for reply waiter"))
# Messages arriving async.
# TODO(ewindisch): have reply consumer with dynamic subscription mgmt
with Timeout(timeout, exception=rpc_common.Timeout):
try:
msg_waiter = ZmqSocket(
"ipc://%s/zmq_topic_zmq_replies" % FLAGS.rpc_zmq_ipc_dir,
zmq.SUB, subscribe=msg_id, bind=False
)
LOG.debug(_("Sending cast"))
_cast(addr, context, msg_id, topic, payload)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
responses = _deserialize(msg[-1])
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
finally:
if 'msg_waiter' in vars():
msg_waiter.close()
# It seems we don't need to do all of the following,
# but perhaps it would be useful for multicall?
# One effect of this is that we're checking all
# responses for Exceptions.
for resp in responses:
if isinstance(resp, types.DictType) and 'exc' in resp:
raise rpc_common.deserialize_remote_exception(FLAGS, resp['exc'])
return responses[-1]
def _multi_send(method, context, topic, msg, timeout=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
message to all relevant hosts.
"""
conf = FLAGS
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
queues = matchmaker.queues(topic)
LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results
if len(queues) == 0:
LOG.warn(_("No matchmaker results. Not casting."))
# While not strictly a timeout, callers know how to handle
# this exception and a timeout isn't too big a lie.
raise rpc_common.Timeout, "No match from matchmaker."
# This supports brokerless fanout (addresses > 1)
for queue in queues:
(_topic, ip_addr) = queue
_addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port)
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
_topic, _topic, msg, timeout)
return
return method(_addr, context, _topic, _topic, msg, timeout)
def create_connection(conf, new=True):
return Connection(conf)
def multicall(conf, *args, **kwargs):
"""Multiple calls."""
register_opts(conf)
return _multi_send(_call, *args, **kwargs)
def call(conf, *args, **kwargs):
"""Send a message, expect a response."""
register_opts(conf)
data = _multi_send(_call, *args, **kwargs)
return data[-1]
def cast(conf, *args, **kwargs):
"""Send a message expecting no reply."""
register_opts(conf)
_multi_send(_cast, *args, **kwargs)
def fanout_cast(conf, context, topic, msg, **kwargs):
"""Send a message to all listening and expect no reply."""
register_opts(conf)
# NOTE(ewindisch): fanout~ is used because it avoid splitting on .
# and acts as a non-subtle hint to the matchmaker and ZmqProxy.
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
def notify(conf, context, topic, msg, **kwargs):
"""
Send notification event.
Notifications are sent to topic-priority.
This differs from the AMQP drivers which send to topic.priority.
"""
register_opts(conf)
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
cast(conf, context, topic, msg, **kwargs)
def cleanup():
"""Clean up resources in use by implementation."""
global ZMQ_CTX
global matchmaker
matchmaker = None
ZMQ_CTX.destroy()
ZMQ_CTX = None
def register_opts(conf):
"""Registration of options for this driver."""
#NOTE(ewindisch): ZMQ_CTX and matchmaker
# are initialized here as this is as good
# an initialization method as any.
# We memoize through these globals
global ZMQ_CTX
global matchmaker
global FLAGS
if not FLAGS:
conf.register_opts(zmq_opts)
FLAGS = conf
# Don't re-set, if this method is called twice.
if not ZMQ_CTX:
ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
if not matchmaker:
# rpc_zmq_matchmaker should be set to a 'module.Class'
mm_path = conf.rpc_zmq_matchmaker.split('.')
mm_module = '.'.join(mm_path[:-1])
mm_class = mm_path[-1]
# Only initialize a class.
if mm_path[-1][0] not in string.ascii_uppercase:
LOG.error(_("Matchmaker could not be loaded.\n"
"rpc_zmq_matchmaker is not a class."))
raise RPCException(_("Error loading Matchmaker."))
mm_impl = importutils.import_module(mm_module)
mm_constructor = getattr(mm_impl, mm_class)
matchmaker = mm_constructor()
register_opts(cfg.CONF)

View File

@ -0,0 +1,258 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should except a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
import contextlib
import itertools
import json
import logging
from cinder.openstack.common import cfg
from cinder.openstack.common.gettextutils import _
matchmaker_opts = [
# Matchmaker ring file
cfg.StrOpt('matchmaker_ringfile',
default='/etc/nova/matchmaker_ring.json',
help='Matchmaker ring file (JSON)'),
]
CONF = cfg.CONF
CONF.register_opts(matchmaker_opts)
LOG = logging.getLogger(__name__)
contextmanager = contextlib.contextmanager
class MatchMakerException(Exception):
"""Signified a match could not be found."""
message = _("Match not found by MatchMaker.")
class Exchange(object):
"""
Implements lookups.
Subclass this to support hashtables, dns, etc.
"""
def __init__(self):
pass
def run(self, key):
raise NotImplementedError()
class Binding(object):
"""
A binding on which to perform a lookup.
"""
def __init__(self):
pass
def test(self, key):
raise NotImplementedError()
class MatchMakerBase(object):
"""Match Maker Base Class."""
def __init__(self):
# Array of tuples. Index [2] toggles negation, [3] is last-if-true
self.bindings = []
def add_binding(self, binding, rule, last=True):
self.bindings.append((binding, rule, False, last))
#NOTE(ewindisch): kept the following method in case we implement the
# underlying support.
#def add_negate_binding(self, binding, rule, last=True):
# self.bindings.append((binding, rule, True, last))
def queues(self, key):
workers = []
# bit is for negate bindings - if we choose to implement it.
# last stops processing rules if this matches.
for (binding, exchange, bit, last) in self.bindings:
if binding.test(key):
workers.extend(exchange.run(key))
# Support last.
if last:
return workers
return workers
class DirectBinding(Binding):
"""
Specifies a host in the key via a '.' character
Although dots are used in the key, the behavior here is
that it maps directly to a host, thus direct.
"""
def test(self, key):
if '.' in key:
return True
return False
class TopicBinding(Binding):
"""
Where a 'bare' key without dots.
AMQP generally considers topic exchanges to be those *with* dots,
but we deviate here in terminology as the behavior here matches
that of a topic exchange (whereas where there are dots, behavior
matches that of a direct exchange.
"""
def test(self, key):
if '.' not in key:
return True
return False
class FanoutBinding(Binding):
"""Match on fanout keys, where key starts with 'fanout.' string."""
def test(self, key):
if key.startswith('fanout~'):
return True
return False
class StubExchange(Exchange):
"""Exchange that does nothing."""
def run(self, key):
return [(key, None)]
class RingExchange(Exchange):
"""
Match Maker where hosts are loaded from a static file containing
a hashmap (JSON formatted).
__init__ takes optional ring dictionary argument, otherwise
loads the ringfile from CONF.mathcmaker_ringfile.
"""
def __init__(self, ring=None):
super(RingExchange, self).__init__()
if ring:
self.ring = ring
else:
fh = open(CONF.matchmaker_ringfile, 'r')
self.ring = json.load(fh)
fh.close()
self.ring0 = {}
for k in self.ring.keys():
self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key):
if key in self.ring0:
return True
return False
class RoundRobinRingExchange(RingExchange):
"""A Topic Exchange based on a hashmap."""
def __init__(self, ring=None):
super(RoundRobinRingExchange, self).__init__(ring)
def run(self, key):
if not self._ring_has(key):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (key, )
)
return []
host = next(self.ring0[key])
return [(key + '.' + host, host)]
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
super(FanoutRingExchange, self).__init__(ring)
def run(self, key):
# Assume starts with "fanout~", strip it for lookup.
nkey = key.split('fanout~')[1:][0]
if not self._ring_has(nkey):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (nkey, )
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
class LocalhostExchange(Exchange):
"""Exchange where all direct topics are local."""
def __init__(self):
super(Exchange, self).__init__()
def run(self, key):
return [(key.split('.')[0] + '.localhost', 'localhost')]
class DirectExchange(Exchange):
"""
Exchange where all topic keys are split, sending to second half.
i.e. "compute.host" sends a message to "compute" running on "host"
"""
def __init__(self):
super(Exchange, self).__init__()
def run(self, key):
b, e = key.split('.', 1)
return [(b, e)]
class MatchMakerRing(MatchMakerBase):
"""
Match Maker where hosts are loaded from a static hashmap.
"""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
class MatchMakerLocalhost(MatchMakerBase):
"""
Match Maker where all bare topics resolve to localhost.
Useful for testing.
"""
def __init__(self):
super(MatchMakerLocalhost, self).__init__()
self.add_binding(FanoutBinding(), LocalhostExchange())
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), LocalhostExchange())
class MatchMakerStub(MatchMakerBase):
"""
Match Maker where topics are untouched.
Useful for testing, or for AMQP/brokered queues.
Will not work where knowledge of hosts is known (i.e. zeromq)
"""
def __init__(self):
super(MatchMakerLocalhost, self).__init__()
self.add_binding(FanoutBinding(), StubExchange())
self.add_binding(DirectBinding(), StubExchange())
self.add_binding(TopicBinding(), StubExchange())

View File

@ -22,7 +22,7 @@ For more information about rpc API version numbers, see:
"""
from cinder import rpc
from cinder.openstack.common import rpc
class RpcProxy(object):
@ -127,7 +127,7 @@ class RpcProxy(object):
rpc.fanout_cast(context, self.topic, msg)
def cast_to_server(self, context, server_params, msg, topic=None,
version=None):
version=None):
"""rpc.cast_to_server() a remote method.
:param context: The request context

View File

@ -26,9 +26,8 @@ from cinder import flags
from cinder import log as logging
from cinder.openstack.common import cfg
from cinder.openstack.common import importutils
from cinder.openstack.common import rpc
from cinder.openstack.common import timeutils
from cinder import rpc
from cinder.rpc import common as rpc_common
from cinder import utils

View File

@ -19,13 +19,13 @@ Client side of the scheduler manager RPC API.
"""
from cinder import flags
import cinder.rpc.proxy
import cinder.openstack.common.rpc.proxy
FLAGS = flags.FLAGS
class SchedulerAPI(cinder.rpc.proxy.RpcProxy):
class SchedulerAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
'''Client side of the scheduler rpc API.
API version history:

View File

@ -34,7 +34,7 @@ from cinder import flags
from cinder import log as logging
from cinder.openstack.common import cfg
from cinder.openstack.common import importutils
from cinder import rpc
from cinder.openstack.common import rpc
from cinder import utils
from cinder import version
from cinder import wsgi

View File

@ -29,7 +29,7 @@ def set_defaults(conf):
conf.set_default('volume_driver', 'cinder.volume.driver.FakeISCSIDriver')
conf.set_default('connection_type', 'fake')
conf.set_default('fake_rabbit', True)
conf.set_default('rpc_backend', 'cinder.rpc.impl_fake')
conf.set_default('rpc_backend', 'cinder.openstack.common.rpc.impl_fake')
conf.set_default('iscsi_num_targets', 8)
conf.set_default('verbose', True)
conf.set_default('sql_connection', "sqlite://")

View File

@ -1,19 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# NOTE(vish): this forces the fixtures from tests/__init.py:setup() to work
from cinder.tests import *

View File

@ -1,270 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for remote procedure calls shared between all implementations
"""
import time
from eventlet import greenthread
import nose
from cinder import context
from cinder import exception
from cinder import flags
from cinder import log as logging
from cinder.rpc import amqp as rpc_amqp
from cinder.rpc import common as rpc_common
from cinder.rpc import dispatcher as rpc_dispatcher
from cinder import test
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
class BaseRpcTestCase(test.TestCase):
def setUp(self, supports_timeouts=True):
super(BaseRpcTestCase, self).setUp()
self.supports_timeouts = supports_timeouts
self.context = context.get_admin_context()
if self.rpc:
self.conn = self.rpc.create_connection(FLAGS, True)
receiver = TestReceiver()
self.dispatcher = rpc_dispatcher.RpcDispatcher([receiver])
self.conn.create_consumer('test', self.dispatcher, False)
self.conn.consume_in_thread()
def tearDown(self):
if self.rpc:
self.conn.close()
super(BaseRpcTestCase, self).tearDown()
def test_call_succeed(self):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
value = 42
result = self.rpc.call(FLAGS, self.context, 'test',
{"method": "echo", "args": {"value": value}})
self.assertEqual(value, result)
def test_call_succeed_despite_multiple_returns_yield(self):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
value = 42
result = self.rpc.call(FLAGS, self.context, 'test',
{"method": "echo_three_times_yield",
"args": {"value": value}})
self.assertEqual(value + 2, result)
def test_multicall_succeed_once(self):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
value = 42
result = self.rpc.multicall(FLAGS, self.context,
'test',
{"method": "echo",
"args": {"value": value}})
for i, x in enumerate(result):
if i > 0:
self.fail('should only receive one response')
self.assertEqual(value + i, x)
def test_multicall_three_nones(self):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
value = 42
result = self.rpc.multicall(FLAGS, self.context,
'test',
{"method": "multicall_three_nones",
"args": {"value": value}})
for i, x in enumerate(result):
self.assertEqual(x, None)
# i should have been 0, 1, and finally 2:
self.assertEqual(i, 2)
def test_multicall_succeed_three_times_yield(self):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
value = 42
result = self.rpc.multicall(FLAGS, self.context,
'test',
{"method": "echo_three_times_yield",
"args": {"value": value}})
for i, x in enumerate(result):
self.assertEqual(value + i, x)
def test_context_passed(self):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
"""Makes sure a context is passed through rpc call."""
value = 42
result = self.rpc.call(FLAGS, self.context,
'test', {"method": "context",
"args": {"value": value}})
self.assertEqual(self.context.to_dict(), result)
def test_nested_calls(self):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
"""Test that we can do an rpc.call inside another call."""
class Nested(object):
@staticmethod
def echo(context, queue, value):
"""Calls echo in the passed queue"""
LOG.debug(_("Nested received %(queue)s, %(value)s")
% locals())
# TODO(comstud):
# so, it will replay the context and use the same REQID?
# that's bizarre.
ret = self.rpc.call(FLAGS, context,
queue,
{"method": "echo",
"args": {"value": value}})
LOG.debug(_("Nested return %s"), ret)
return value
nested = Nested()
dispatcher = rpc_dispatcher.RpcDispatcher([nested])
conn = self.rpc.create_connection(FLAGS, True)
conn.create_consumer('nested', dispatcher, False)
conn.consume_in_thread()
value = 42
result = self.rpc.call(FLAGS, self.context,
'nested', {"method": "echo",
"args": {"queue": "test",
"value": value}})
conn.close()
self.assertEqual(value, result)
def test_call_timeout(self):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
"""Make sure rpc.call will time out"""
if not self.supports_timeouts:
raise nose.SkipTest(_("RPC backend does not support timeouts"))
value = 42
self.assertRaises(rpc_common.Timeout,
self.rpc.call,
FLAGS, self.context,
'test',
{"method": "block",
"args": {"value": value}}, timeout=1)
try:
self.rpc.call(FLAGS, self.context,
'test',
{"method": "block",
"args": {"value": value}},
timeout=1)
self.fail("should have thrown Timeout")
except rpc_common.Timeout as exc:
pass
class BaseRpcAMQPTestCase(BaseRpcTestCase):
"""Base test class for all AMQP-based RPC tests"""
def test_proxycallback_handles_exceptions(self):
"""Make sure exceptions unpacking messages don't cause hangs."""
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
orig_unpack = rpc_amqp.unpack_context
info = {'unpacked': False}
def fake_unpack_context(*args, **kwargs):
info['unpacked'] = True
raise test.TestingException('moo')
self.stubs.Set(rpc_amqp, 'unpack_context', fake_unpack_context)
value = 41
self.rpc.cast(FLAGS, self.context, 'test',
{"method": "echo", "args": {"value": value}})
# Wait for the cast to complete.
for x in xrange(50):
if info['unpacked']:
break
greenthread.sleep(0.1)
else:
self.fail("Timeout waiting for message to be consued")
# Now see if we get a response even though we raised an
# exception for the cast above.
self.stubs.Set(rpc_amqp, 'unpack_context', orig_unpack)
value = 42
result = self.rpc.call(FLAGS, self.context, 'test',
{"method": "echo",
"args": {"value": value}})
self.assertEqual(value, result)
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call.
Uses static methods because we aren't actually storing any state.
"""
@staticmethod
def echo(context, value):
"""Simply returns whatever value is sent in."""
LOG.debug(_("Received %s"), value)
return value
@staticmethod
def context(context, value):
"""Returns dictionary version of context."""
LOG.debug(_("Received %s"), context)
return context.to_dict()
@staticmethod
def multicall_three_nones(context, value):
yield None
yield None
yield None
@staticmethod
def echo_three_times_yield(context, value):
yield value
yield value + 1
yield value + 2
@staticmethod
def fail(context, value):
"""Raises an exception with the value sent in."""
raise NotImplementedError(value)
@staticmethod
def fail_converted(context, value):
"""Raises an exception with the value sent in."""
raise exception.ConvertedException(explanation=value)
@staticmethod
def block(context, value):
time.sleep(2)

View File

@ -1,147 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for 'common' functons used through rpc code.
"""
import json
import sys
from cinder import context
from cinder import exception
from cinder import flags
from cinder import log as logging
from cinder import test
from cinder.rpc import amqp as rpc_amqp
from cinder.rpc import common as rpc_common
from cinder.tests.rpc import common
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
def raise_exception():
raise Exception("test")
class FakeUserDefinedException(Exception):
def __init__(self):
Exception.__init__(self, "Test Message")
class RpcCommonTestCase(test.TestCase):
def test_serialize_remote_exception(self):
expected = {
'class': 'Exception',
'module': 'exceptions',
'message': 'test',
}
try:
raise_exception()
except Exception as exc:
failure = rpc_common.serialize_remote_exception(sys.exc_info())
failure = json.loads(failure)
#assure the traceback was added
self.assertEqual(expected['class'], failure['class'])
self.assertEqual(expected['module'], failure['module'])
self.assertEqual(expected['message'], failure['message'])
def test_serialize_remote_cinder_exception(self):
def raise_cinder_exception():
raise exception.CinderException("test", code=500)
expected = {
'class': 'CinderException',
'module': 'cinder.exception',
'kwargs': {'code': 500},
'message': 'test'
}
try:
raise_cinder_exception()
except Exception as exc:
failure = rpc_common.serialize_remote_exception(sys.exc_info())
failure = json.loads(failure)
#assure the traceback was added
self.assertEqual(expected['class'], failure['class'])
self.assertEqual(expected['module'], failure['module'])
self.assertEqual(expected['kwargs'], failure['kwargs'])
self.assertEqual(expected['message'], failure['message'])
def test_deserialize_remote_exception(self):
failure = {
'class': 'CinderException',
'module': 'cinder.exception',
'message': 'test message',
'tb': ['raise CinderException'],
}
serialized = json.dumps(failure)
after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized)
self.assertTrue(isinstance(after_exc, exception.CinderException))
self.assertTrue('test message' in unicode(after_exc))
#assure the traceback was added
self.assertTrue('raise CinderException' in unicode(after_exc))
def test_deserialize_remote_exception_bad_module(self):
failure = {
'class': 'popen2',
'module': 'os',
'kwargs': {'cmd': '/bin/echo failed'},
'message': 'foo',
}
serialized = json.dumps(failure)
after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized)
self.assertTrue(isinstance(after_exc, rpc_common.RemoteError))
def test_deserialize_remote_exception_user_defined_exception(self):
"""Ensure a user defined exception can be deserialized."""
self.flags(allowed_rpc_exception_modules=[self.__class__.__module__])
failure = {
'class': 'FakeUserDefinedException',
'module': self.__class__.__module__,
'tb': ['raise FakeUserDefinedException'],
}
serialized = json.dumps(failure)
after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized)
self.assertTrue(isinstance(after_exc, FakeUserDefinedException))
#assure the traceback was added
self.assertTrue('raise FakeUserDefinedException' in unicode(after_exc))
def test_deserialize_remote_exception_cannot_recreate(self):
"""Ensure a RemoteError is returned on initialization failure.
If an exception cannot be recreated with it's original class then a
RemoteError with the exception informations should still be returned.
"""
self.flags(allowed_rpc_exception_modules=[self.__class__.__module__])
failure = {
'class': 'FakeIDontExistException',
'module': self.__class__.__module__,
'tb': ['raise FakeIDontExistException'],
}
serialized = json.dumps(failure)
after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized)
self.assertTrue(isinstance(after_exc, rpc_common.RemoteError))
#assure the traceback was added
self.assertTrue('raise FakeIDontExistException' in unicode(after_exc))

View File

@ -1,109 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for rpc.dispatcher
"""
from cinder import context
from cinder.rpc import dispatcher
from cinder.rpc import common as rpc_common
from cinder import test
class RpcDispatcherTestCase(test.TestCase):
class API1(object):
RPC_API_VERSION = '1.0'
def __init__(self):
self.test_method_ctxt = None
self.test_method_arg1 = None
def test_method(self, ctxt, arg1):
self.test_method_ctxt = ctxt
self.test_method_arg1 = arg1
class API2(object):
RPC_API_VERSION = '2.1'
def __init__(self):
self.test_method_ctxt = None
self.test_method_arg1 = None
def test_method(self, ctxt, arg1):
self.test_method_ctxt = ctxt
self.test_method_arg1 = arg1
class API3(object):
RPC_API_VERSION = '3.5'
def __init__(self):
self.test_method_ctxt = None
self.test_method_arg1 = None
def test_method(self, ctxt, arg1):
self.test_method_ctxt = ctxt
self.test_method_arg1 = arg1
def setUp(self):
self.ctxt = context.RequestContext('fake_user', 'fake_project')
super(RpcDispatcherTestCase, self).setUp()
def tearDown(self):
super(RpcDispatcherTestCase, self).tearDown()
def _test_dispatch(self, version, expectations):
v2 = self.API2()
v3 = self.API3()
disp = dispatcher.RpcDispatcher([v2, v3])
disp.dispatch(self.ctxt, version, 'test_method', arg1=1)
self.assertEqual(v2.test_method_ctxt, expectations[0])
self.assertEqual(v2.test_method_arg1, expectations[1])
self.assertEqual(v3.test_method_ctxt, expectations[2])
self.assertEqual(v3.test_method_arg1, expectations[3])
def test_dispatch(self):
self._test_dispatch('2.1', (self.ctxt, 1, None, None))
self._test_dispatch('3.5', (None, None, self.ctxt, 1))
def test_dispatch_lower_minor_version(self):
self._test_dispatch('2.0', (self.ctxt, 1, None, None))
self._test_dispatch('3.1', (None, None, self.ctxt, 1))
def test_dispatch_higher_minor_version(self):
self.assertRaises(rpc_common.UnsupportedRpcVersion,
self._test_dispatch, '2.6', (None, None, None, None))
self.assertRaises(rpc_common.UnsupportedRpcVersion,
self._test_dispatch, '3.6', (None, None, None, None))
def test_dispatch_lower_major_version(self):
self.assertRaises(rpc_common.UnsupportedRpcVersion,
self._test_dispatch, '1.0', (None, None, None, None))
def test_dispatch_higher_major_version(self):
self.assertRaises(rpc_common.UnsupportedRpcVersion,
self._test_dispatch, '4.0', (None, None, None, None))
def test_dispatch_no_version_uses_v1(self):
v1 = self.API1()
disp = dispatcher.RpcDispatcher([v1])
disp.dispatch(self.ctxt, None, 'test_method', arg1=1)
self.assertEqual(v1.test_method_ctxt, self.ctxt)
self.assertEqual(v1.test_method_arg1, 1)

View File

@ -1,33 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for remote procedure calls using fake_impl
"""
from cinder import log as logging
from cinder.rpc import impl_fake
from cinder.tests.rpc import common
LOG = logging.getLogger(__name__)
class RpcFakeTestCase(common.BaseRpcTestCase):
def setUp(self):
self.rpc = impl_fake
super(RpcFakeTestCase, self).setUp()

View File

@ -1,371 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for remote procedure calls using kombu
"""
from cinder import context
from cinder import exception
from cinder import flags
from cinder import log as logging
from cinder import test
from cinder.rpc import amqp as rpc_amqp
from cinder.tests.rpc import common
try:
import kombu
from cinder.rpc import impl_kombu
except ImportError:
kombu = None
impl_kombu = None
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
class MyException(Exception):
pass
def _raise_exc_stub(stubs, times, obj, method, exc_msg,
exc_class=MyException):
info = {'called': 0}
orig_method = getattr(obj, method)
def _raise_stub(*args, **kwargs):
info['called'] += 1
if info['called'] <= times:
raise exc_class(exc_msg)
orig_method(*args, **kwargs)
stubs.Set(obj, method, _raise_stub)
return info
class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
def setUp(self):
if kombu:
self.rpc = impl_kombu
else:
self.rpc = None
super(RpcKombuTestCase, self).setUp()
def tearDown(self):
if kombu:
impl_kombu.cleanup()
super(RpcKombuTestCase, self).tearDown()
@test.skip_if(kombu is None, "Test requires kombu")
def test_reusing_connection(self):
"""Test that reusing a connection returns same one."""
conn_context = self.rpc.create_connection(FLAGS, new=False)
conn1 = conn_context.connection
conn_context.close()
conn_context = self.rpc.create_connection(FLAGS, new=False)
conn2 = conn_context.connection
conn_context.close()
self.assertEqual(conn1, conn2)
@test.skip_if(kombu is None, "Test requires kombu")
def test_topic_send_receive(self):
"""Test sending to a topic exchange/queue"""
conn = self.rpc.create_connection(FLAGS)
message = 'topic test message'
self.received_message = None
def _callback(message):
self.received_message = message
conn.declare_topic_consumer('a_topic', _callback)
conn.topic_send('a_topic', message)
conn.consume(limit=1)
conn.close()
self.assertEqual(self.received_message, message)
@test.skip_if(kombu is None, "Test requires kombu")
def test_direct_send_receive(self):
"""Test sending to a direct exchange/queue"""
conn = self.rpc.create_connection(FLAGS)
message = 'direct test message'
self.received_message = None
def _callback(message):
self.received_message = message
conn.declare_direct_consumer('a_direct', _callback)
conn.direct_send('a_direct', message)
conn.consume(limit=1)
conn.close()
self.assertEqual(self.received_message, message)
@test.skip_if(kombu is None, "Test requires kombu")
def test_cast_interface_uses_default_options(self):
"""Test kombu rpc.cast"""
ctxt = context.RequestContext('fake_user', 'fake_project')
class MyConnection(impl_kombu.Connection):
def __init__(myself, *args, **kwargs):
super(MyConnection, myself).__init__(*args, **kwargs)
self.assertEqual(myself.params,
{'hostname': FLAGS.rabbit_host,
'userid': FLAGS.rabbit_userid,
'password': FLAGS.rabbit_password,
'port': FLAGS.rabbit_port,
'virtual_host': FLAGS.rabbit_virtual_host,
'transport': 'memory'})
def topic_send(_context, topic, msg):
pass
MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
self.stubs.Set(impl_kombu, 'Connection', MyConnection)
impl_kombu.cast(FLAGS, ctxt, 'fake_topic', {'msg': 'fake'})
@test.skip_if(kombu is None, "Test requires kombu")
def test_cast_to_server_uses_server_params(self):
"""Test kombu rpc.cast"""
ctxt = context.RequestContext('fake_user', 'fake_project')
server_params = {'username': 'fake_username',
'password': 'fake_password',
'hostname': 'fake_hostname',
'port': 31337,
'virtual_host': 'fake_virtual_host'}
class MyConnection(impl_kombu.Connection):
def __init__(myself, *args, **kwargs):
super(MyConnection, myself).__init__(*args, **kwargs)
self.assertEqual(myself.params,
{'hostname': server_params['hostname'],
'userid': server_params['username'],
'password': server_params['password'],
'port': server_params['port'],
'virtual_host': server_params['virtual_host'],
'transport': 'memory'})
def topic_send(_context, topic, msg):
pass
MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
self.stubs.Set(impl_kombu, 'Connection', MyConnection)
impl_kombu.cast_to_server(FLAGS, ctxt, server_params,
'fake_topic', {'msg': 'fake'})
@test.skip_test("kombu memory transport seems buggy with fanout queues "
"as this test passes when you use rabbit (fake_rabbit=False)")
def test_fanout_send_receive(self):
"""Test sending to a fanout exchange and consuming from 2 queues"""
conn = self.rpc.create_connection()
conn2 = self.rpc.create_connection()
message = 'fanout test message'
self.received_message = None
def _callback(message):
self.received_message = message
conn.declare_fanout_consumer('a_fanout', _callback)
conn2.declare_fanout_consumer('a_fanout', _callback)
conn.fanout_send('a_fanout', message)
conn.consume(limit=1)
conn.close()
self.assertEqual(self.received_message, message)
self.received_message = None
conn2.consume(limit=1)
conn2.close()
self.assertEqual(self.received_message, message)
@test.skip_if(kombu is None, "Test requires kombu")
def test_declare_consumer_errors_will_reconnect(self):
# Test that any exception with 'timeout' in it causes a
# reconnection
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
'__init__', 'foo timeout foo')
conn = self.rpc.Connection(FLAGS)
result = conn.declare_consumer(self.rpc.DirectConsumer,
'test_topic', None)
self.assertEqual(info['called'], 3)
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
# Test that any exception in transport.connection_errors causes
# a reconnection
self.stubs.UnsetAll()
info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectConsumer,
'__init__', 'meow')
conn = self.rpc.Connection(FLAGS)
conn.connection_errors = (MyException, )
result = conn.declare_consumer(self.rpc.DirectConsumer,
'test_topic', None)
self.assertEqual(info['called'], 2)
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
@test.skip_if(kombu is None, "Test requires kombu")
def test_declare_consumer_ioerrors_will_reconnect(self):
"""Test that an IOError exception causes a reconnection"""
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
'__init__', 'Socket closed', exc_class=IOError)
conn = self.rpc.Connection(FLAGS)
result = conn.declare_consumer(self.rpc.DirectConsumer,
'test_topic', None)
self.assertEqual(info['called'], 3)
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
@test.skip_if(kombu is None, "Test requires kombu")
def test_publishing_errors_will_reconnect(self):
# Test that any exception with 'timeout' in it causes a
# reconnection when declaring the publisher class and when
# calling send()
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
'__init__', 'foo timeout foo')
conn = self.rpc.Connection(FLAGS)
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
self.assertEqual(info['called'], 3)
self.stubs.UnsetAll()
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
'send', 'foo timeout foo')
conn = self.rpc.Connection(FLAGS)
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
self.assertEqual(info['called'], 3)
# Test that any exception in transport.connection_errors causes
# a reconnection when declaring the publisher class and when
# calling send()
self.stubs.UnsetAll()
info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
'__init__', 'meow')
conn = self.rpc.Connection(FLAGS)
conn.connection_errors = (MyException, )
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
self.assertEqual(info['called'], 2)
self.stubs.UnsetAll()
info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
'send', 'meow')
conn = self.rpc.Connection(FLAGS)
conn.connection_errors = (MyException, )
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
self.assertEqual(info['called'], 2)
@test.skip_test("kombu memory transport hangs here on precise")
@test.skip_if(kombu is None, "Test requires kombu")
def test_iterconsume_errors_will_reconnect(self):
conn = self.rpc.Connection(FLAGS)
message = 'reconnect test message'
self.received_message = None
def _callback(message):
self.received_message = message
conn.declare_direct_consumer('a_direct', _callback)
conn.direct_send('a_direct', message)
info = _raise_exc_stub(self.stubs, 1, conn.connection,
'drain_events', 'foo timeout foo')
conn.consume(limit=1)
conn.close()
self.assertEqual(self.received_message, message)
# Only called once, because our stub goes away during reconnection
@test.skip_if(kombu is None, "Test requires kombu")
def test_call_exception(self):
"""Test that exception gets passed back properly.
rpc.call returns an Exception object. The value of the
exception is converted to a string.
"""
self.flags(allowed_rpc_exception_modules=['exceptions'])
value = "This is the exception message"
self.assertRaises(NotImplementedError,
self.rpc.call,
FLAGS,
self.context,
'test',
{"method": "fail",
"args": {"value": value}})
try:
self.rpc.call(FLAGS, self.context,
'test',
{"method": "fail",
"args": {"value": value}})
self.fail("should have thrown Exception")
except NotImplementedError as exc:
self.assertTrue(value in unicode(exc))
#Traceback should be included in exception message
self.assertTrue('raise NotImplementedError(value)' in unicode(exc))
@test.skip_if(kombu is None, "Test requires kombu")
def test_call_converted_exception(self):
"""Test that exception gets passed back properly.
rpc.call returns an Exception object. The value of the
exception is converted to a string.
"""
value = "This is the exception message"
self.assertRaises(exception.ConvertedException,
self.rpc.call,
FLAGS,
self.context,
'test',
{"method": "fail_converted",
"args": {"value": value}})
try:
self.rpc.call(FLAGS, self.context,
'test',
{"method": "fail_converted",
"args": {"value": value}})
self.fail("should have thrown Exception")
except exception.ConvertedException as exc:
self.assertTrue(value in unicode(exc))
#Traceback should be included in exception message
self.assertTrue('exception.ConvertedException' in unicode(exc))

View File

@ -1,66 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for remote procedure calls using kombu + ssl
"""
from cinder import flags
from cinder import test
try:
import kombu
from cinder.rpc import impl_kombu
except ImportError:
kombu = None
impl_kombu = None
# Flag settings we will ensure get passed to amqplib
SSL_VERSION = "SSLv2"
SSL_CERT = "/tmp/cert.blah.blah"
SSL_CA_CERT = "/tmp/cert.ca.blah.blah"
SSL_KEYFILE = "/tmp/keyfile.blah.blah"
FLAGS = flags.FLAGS
class RpcKombuSslTestCase(test.TestCase):
def setUp(self):
super(RpcKombuSslTestCase, self).setUp()
if kombu:
self.flags(kombu_ssl_keyfile=SSL_KEYFILE,
kombu_ssl_ca_certs=SSL_CA_CERT,
kombu_ssl_certfile=SSL_CERT,
kombu_ssl_version=SSL_VERSION,
rabbit_use_ssl=True)
@test.skip_if(kombu is None, "Test requires kombu")
def test_ssl_on_extended(self):
rpc = impl_kombu
conn = rpc.create_connection(FLAGS, True)
c = conn.connection
#This might be kombu version dependent...
#Since we are now peaking into the internals of kombu...
self.assertTrue(isinstance(c.connection.ssl, dict))
self.assertEqual(SSL_VERSION, c.connection.ssl.get("ssl_version"))
self.assertEqual(SSL_CERT, c.connection.ssl.get("certfile"))
self.assertEqual(SSL_CA_CERT, c.connection.ssl.get("ca_certs"))
self.assertEqual(SSL_KEYFILE, c.connection.ssl.get("keyfile"))
#That hash then goes into amqplib which then goes
#Into python ssl creation...

View File

@ -1,124 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for rpc.proxy
"""
import copy
from cinder import context
from cinder import rpc
from cinder.rpc import proxy
from cinder import test
class RpcProxyTestCase(test.TestCase):
def setUp(self):
super(RpcProxyTestCase, self).setUp()
def tearDown(self):
super(RpcProxyTestCase, self).tearDown()
def _test_rpc_method(self, rpc_method, has_timeout=False, has_retval=False,
server_params=None, supports_topic_override=True):
topic = 'fake_topic'
timeout = 123
rpc_proxy = proxy.RpcProxy(topic, '1.0')
ctxt = context.RequestContext('fake_user', 'fake_project')
msg = {'method': 'fake_method', 'args': {'x': 'y'}}
expected_msg = {'method': 'fake_method', 'args': {'x': 'y'},
'version': '1.0'}
expected_retval = 'hi' if has_retval else None
self.fake_args = None
self.fake_kwargs = None
def _fake_rpc_method(*args, **kwargs):
self.fake_args = args
self.fake_kwargs = kwargs
if has_retval:
return expected_retval
self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
args = [ctxt, msg]
if server_params:
args.insert(1, server_params)
# Base method usage
retval = getattr(rpc_proxy, rpc_method)(*args)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, topic, expected_msg]
if server_params:
expected_args.insert(1, server_params)
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
# overriding the version
retval = getattr(rpc_proxy, rpc_method)(*args, version='1.1')
self.assertEqual(retval, expected_retval)
new_msg = copy.deepcopy(expected_msg)
new_msg['version'] = '1.1'
expected_args = [ctxt, topic, new_msg]
if server_params:
expected_args.insert(1, server_params)
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
if has_timeout:
# set a timeout
retval = getattr(rpc_proxy, rpc_method)(ctxt, msg, timeout=timeout)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, topic, expected_msg, timeout]
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
if supports_topic_override:
# set a topic
new_topic = 'foo.bar'
retval = getattr(rpc_proxy, rpc_method)(*args, topic=new_topic)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, new_topic, expected_msg]
if server_params:
expected_args.insert(1, server_params)
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
def test_call(self):
self._test_rpc_method('call', has_timeout=True, has_retval=True)
def test_multicall(self):
self._test_rpc_method('multicall', has_timeout=True, has_retval=True)
def test_cast(self):
self._test_rpc_method('cast')
def test_fanout_cast(self):
self._test_rpc_method('fanout_cast', supports_topic_override=False)
def test_cast_to_server(self):
self._test_rpc_method('cast_to_server', server_params={'blah': 1})
def test_fanout_cast_to_server(self):
self._test_rpc_method('fanout_cast_to_server',
server_params={'blah': 1}, supports_topic_override=False)
def test_make_msg(self):
self.assertEqual(proxy.RpcProxy.make_msg('test_method', a=1, b=2),
{'method': 'test_method', 'args': {'a': 1, 'b': 2}})

View File

@ -1,343 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for remote procedure calls using qpid
"""
import mox
from cinder import context
from cinder import flags
from cinder import log as logging
from cinder.rpc import amqp as rpc_amqp
from cinder import test
try:
import qpid
from cinder.rpc import impl_qpid
except ImportError:
qpid = None
impl_qpid = None
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
class RpcQpidTestCase(test.TestCase):
"""
Exercise the public API of impl_qpid utilizing mox.
This set of tests utilizes mox to replace the Qpid objects and ensures
that the right operations happen on them when the various public rpc API
calls are exercised. The API calls tested here include:
cinder.rpc.create_connection()
cinder.rpc.common.Connection.create_consumer()
cinder.rpc.common.Connection.close()
cinder.rpc.cast()
cinder.rpc.fanout_cast()
cinder.rpc.call()
cinder.rpc.multicall()
"""
def setUp(self):
super(RpcQpidTestCase, self).setUp()
self.mock_connection = None
self.mock_session = None
self.mock_sender = None
self.mock_receiver = None
if qpid:
self.orig_connection = qpid.messaging.Connection
self.orig_session = qpid.messaging.Session
self.orig_sender = qpid.messaging.Sender
self.orig_receiver = qpid.messaging.Receiver
qpid.messaging.Connection = lambda *_x, **_y: self.mock_connection
qpid.messaging.Session = lambda *_x, **_y: self.mock_session
qpid.messaging.Sender = lambda *_x, **_y: self.mock_sender
qpid.messaging.Receiver = lambda *_x, **_y: self.mock_receiver
def tearDown(self):
if qpid:
qpid.messaging.Connection = self.orig_connection
qpid.messaging.Session = self.orig_session
qpid.messaging.Sender = self.orig_sender
qpid.messaging.Receiver = self.orig_receiver
if impl_qpid:
# Need to reset this in case we changed the connection_cls
# in self._setup_to_server_tests()
impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection
super(RpcQpidTestCase, self).tearDown()
@test.skip_if(qpid is None, "Test requires qpid")
def test_create_connection(self):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_connection.opened().AndReturn(False)
self.mock_connection.open()
self.mock_connection.session().AndReturn(self.mock_session)
self.mock_connection.close()
self.mox.ReplayAll()
connection = impl_qpid.create_connection(FLAGS)
connection.close()
def _test_create_consumer(self, fanout):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
self.mock_connection.opened().AndReturn(False)
self.mock_connection.open()
self.mock_connection.session().AndReturn(self.mock_session)
if fanout:
# The link name includes a UUID, so match it with a regex.
expected_address = mox.Regex(r'^impl_qpid_test_fanout ; '
'{"node": {"x-declare": {"auto-delete": true, "durable": '
'false, "type": "fanout"}, "type": "topic"}, "create": '
'"always", "link": {"x-declare": {"auto-delete": true, '
'"exclusive": true, "durable": false}, "durable": true, '
'"name": "impl_qpid_test_fanout_.*"}}$')
else:
expected_address = (
'cinder/impl_qpid_test ; {"node": {"x-declare": '
'{"auto-delete": true, "durable": true}, "type": "topic"}, '
'"create": "always", "link": {"x-declare": {"auto-delete": '
'true, "exclusive": false, "durable": false}, "durable": '
'true, "name": "impl_qpid_test"}}')
self.mock_session.receiver(expected_address).AndReturn(
self.mock_receiver)
self.mock_receiver.capacity = 1
self.mock_connection.close()
self.mox.ReplayAll()
connection = impl_qpid.create_connection(FLAGS)
connection.create_consumer("impl_qpid_test",
lambda *_x, **_y: None,
fanout)
connection.close()
@test.skip_if(qpid is None, "Test requires qpid")
def test_create_consumer(self):
self._test_create_consumer(fanout=False)
@test.skip_if(qpid is None, "Test requires qpid")
def test_create_consumer_fanout(self):
self._test_create_consumer(fanout=True)
def _test_cast(self, fanout, server_params=None):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_sender = self.mox.CreateMock(self.orig_sender)
self.mock_connection.opened().AndReturn(False)
self.mock_connection.open()
self.mock_connection.session().AndReturn(self.mock_session)
if fanout:
expected_address = ('impl_qpid_test_fanout ; '
'{"node": {"x-declare": {"auto-delete": true, '
'"durable": false, "type": "fanout"}, '
'"type": "topic"}, "create": "always"}')
else:
expected_address = (
'cinder/impl_qpid_test ; {"node": {"x-declare": '
'{"auto-delete": true, "durable": false}, "type": "topic"}, '
'"create": "always"}')
self.mock_session.sender(expected_address).AndReturn(self.mock_sender)
self.mock_sender.send(mox.IgnoreArg())
if not server_params:
# This is a pooled connection, so instead of closing it, it
# gets reset, which is just creating a new session on the
# connection.
self.mock_session.close()
self.mock_connection.session().AndReturn(self.mock_session)
self.mox.ReplayAll()
try:
ctx = context.RequestContext("user", "project")
args = [FLAGS, ctx, "impl_qpid_test",
{"method": "test_method", "args": {}}]
if server_params:
args.insert(2, server_params)
if fanout:
method = impl_qpid.fanout_cast_to_server
else:
method = impl_qpid.cast_to_server
else:
if fanout:
method = impl_qpid.fanout_cast
else:
method = impl_qpid.cast
method(*args)
finally:
while impl_qpid.Connection.pool.free_items:
# Pull the mock connection object out of the connection pool so
# that it doesn't mess up other test cases.
impl_qpid.Connection.pool.get()
@test.skip_if(qpid is None, "Test requires qpid")
def test_cast(self):
self._test_cast(fanout=False)
@test.skip_if(qpid is None, "Test requires qpid")
def test_fanout_cast(self):
self._test_cast(fanout=True)
def _setup_to_server_tests(self, server_params):
class MyConnection(impl_qpid.Connection):
def __init__(myself, *args, **kwargs):
super(MyConnection, myself).__init__(*args, **kwargs)
self.assertEqual(myself.connection.username,
server_params['username'])
self.assertEqual(myself.connection.password,
server_params['password'])
self.assertEqual(myself.broker,
server_params['hostname'] + ':' +
str(server_params['port']))
MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
self.stubs.Set(impl_qpid, 'Connection', MyConnection)
@test.skip_if(qpid is None, "Test requires qpid")
def test_cast_to_server(self):
server_params = {'username': 'fake_username',
'password': 'fake_password',
'hostname': 'fake_hostname',
'port': 31337}
self._setup_to_server_tests(server_params)
self._test_cast(fanout=False, server_params=server_params)
@test.skip_if(qpid is None, "Test requires qpid")
def test_fanout_cast_to_server(self):
server_params = {'username': 'fake_username',
'password': 'fake_password',
'hostname': 'fake_hostname',
'port': 31337}
self._setup_to_server_tests(server_params)
self._test_cast(fanout=True, server_params=server_params)
def _test_call(self, multi):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_sender = self.mox.CreateMock(self.orig_sender)
self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
self.mock_connection.opened().AndReturn(False)
self.mock_connection.open()
self.mock_connection.session().AndReturn(self.mock_session)
rcv_addr = mox.Regex(r'^.*/.* ; {"node": {"x-declare": {"auto-delete":'
' true, "durable": true, "type": "direct"}, "type": '
'"topic"}, "create": "always", "link": {"x-declare": '
'{"auto-delete": true, "exclusive": true, "durable": '
'false}, "durable": true, "name": ".*"}}')
self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver)
self.mock_receiver.capacity = 1
send_addr = ('cinder/impl_qpid_test ; {"node": {"x-declare": '
'{"auto-delete": true, "durable": false}, "type": "topic"}, '
'"create": "always"}')
self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
self.mock_sender.send(mox.IgnoreArg())
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
{"result": "foo", "failure": False, "ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
if multi:
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(
qpid.messaging.Message(
{"result": "bar", "failure": False,
"ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(
qpid.messaging.Message(
{"result": "baz", "failure": False,
"ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
{"failure": False, "ending": True}))
self.mock_session.acknowledge(mox.IgnoreArg())
self.mock_session.close()
self.mock_connection.session().AndReturn(self.mock_session)
self.mox.ReplayAll()
try:
ctx = context.RequestContext("user", "project")
if multi:
method = impl_qpid.multicall
else:
method = impl_qpid.call
res = method(FLAGS, ctx, "impl_qpid_test",
{"method": "test_method", "args": {}})
if multi:
self.assertEquals(list(res), ["foo", "bar", "baz"])
else:
self.assertEquals(res, "foo")
finally:
while impl_qpid.Connection.pool.free_items:
# Pull the mock connection object out of the connection pool so
# that it doesn't mess up other test cases.
impl_qpid.Connection.pool.get()
@test.skip_if(qpid is None, "Test requires qpid")
def test_call(self):
self._test_call(multi=False)
@test.skip_if(qpid is None, "Test requires qpid")
def test_multicall(self):
self._test_call(multi=True)
#
#from cinder.tests.rpc import common
#
# Qpid does not have a handy in-memory transport like kombu, so it's not
# terribly straight forward to take advantage of the common unit tests.
# However, at least at the time of this writing, the common unit tests all pass
# with qpidd running.
#
# class RpcQpidCommonTestCase(common._BaseRpcTestCase):
# def setUp(self):
# self.rpc = impl_qpid
# super(RpcQpidCommonTestCase, self).setUp()
#
# def tearDown(self):
# super(RpcQpidCommonTestCase, self).tearDown()
#

View File

@ -20,7 +20,7 @@ Unit Tests for cinder.scheduler.rpcapi
from cinder import context
from cinder import flags
from cinder import rpc
from cinder.openstack.common import rpc
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import test

View File

@ -27,9 +27,9 @@ from cinder import db
from cinder import exception
from cinder import flags
from cinder.notifier import api as notifier
from cinder import rpc
from cinder.openstack.common import rpc
from cinder.openstack.common.rpc import common as rpc_common
from cinder.openstack.common import timeutils
from cinder.rpc import common as rpc_common
from cinder.scheduler import driver
from cinder.scheduler import manager
from cinder import test

View File

@ -67,7 +67,7 @@ class NotifierTestCase(test.TestCase):
def mock_notify(cls, *args):
self.mock_notify = True
self.stubs.Set(cinder.rpc, 'notify', mock_notify)
self.stubs.Set(cinder.openstack.common.rpc, 'notify', mock_notify)
notifier_api.notify('publisher_id', 'event_type',
cinder.notifier.api.WARN, dict(a=3))
@ -90,7 +90,7 @@ class NotifierTestCase(test.TestCase):
def mock_notify(context, topic, msg):
self.test_topic = topic
self.stubs.Set(cinder.rpc, 'notify', mock_notify)
self.stubs.Set(cinder.openstack.common.rpc, 'notify', mock_notify)
notifier_api.notify('publisher_id', 'event_type', 'DEBUG', dict(a=3))
self.assertEqual(self.test_topic, 'testnotify.debug')
@ -105,7 +105,7 @@ class NotifierTestCase(test.TestCase):
def mock_notify(context, topic, data):
msgs.append(data)
self.stubs.Set(cinder.rpc, 'notify', mock_notify)
self.stubs.Set(cinder.openstack.common.rpc, 'notify', mock_notify)
LOG.error('foo')
self.assertEqual(1, len(msgs))
msg = msgs[0]

View File

@ -18,10 +18,10 @@
from cinder import context
from cinder import db
from cinder import exception
from cinder import flags
from cinder import quota
from cinder import exception
from cinder import rpc
from cinder.openstack.common import rpc
from cinder import test
from cinder import volume
from cinder.scheduler import driver as scheduler_driver

View File

@ -18,7 +18,7 @@
"""Tests for the testing base code."""
from cinder import rpc
from cinder.openstack.common import rpc
from cinder import test

View File

@ -30,8 +30,8 @@ from cinder import db
from cinder import flags
from cinder import log as logging
from cinder.openstack.common import importutils
from cinder.openstack.common import rpc
import cinder.policy
from cinder import rpc
from cinder import test
import cinder.volume.api

View File

@ -27,10 +27,10 @@ from eventlet import greenthread
from cinder import exception
from cinder import flags
from cinder import log as logging
from cinder.openstack.common import rpc
import cinder.policy
from cinder.openstack.common import timeutils
from cinder import quota
from cinder import rpc
from cinder import utils
from cinder.db import base

View File

@ -46,8 +46,8 @@ from cinder import manager
from cinder.openstack.common import cfg
from cinder.openstack.common import excutils
from cinder.openstack.common import importutils
from cinder.openstack.common import rpc
from cinder.openstack.common import timeutils
from cinder import rpc
from cinder import utils
from cinder.volume import volume_types

View File

@ -1,7 +1,7 @@
[DEFAULT]
# The list of modules to copy from openstack-common
modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,timeutils
modules=cfg,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,rpc,timeutils
# The base module to hold the copy of openstack.common
base=cinder