Use rpc from openstack-common.

Final patch for blueprint common-rpc.

This patch removes nova.rpc in favor of the copy in openstack-common.

Change-Id: I9c2f6bdbe8cd0c44417f75284131dbf3c126d1dd
This commit is contained in:
Russell Bryant 2012-06-13 10:48:54 -04:00
parent 83e6cf7b92
commit ba3754e3ff
66 changed files with 114 additions and 1867 deletions

View File

@ -45,7 +45,7 @@ from nova import exception
from nova import flags
from nova import log as logging
from nova.openstack.common import cfg
from nova import rpc
from nova.openstack.common import rpc
delete_exchange_opt = cfg.BoolOpt('delete_exchange',

View File

@ -41,7 +41,7 @@ from nova import flags
from nova import log as logging
from nova.network import linux_net
from nova.openstack.common import importutils
from nova import rpc
from nova.openstack.common import rpc
from nova import utils
FLAGS = flags.FLAGS

View File

@ -56,7 +56,7 @@ from nova import db
from nova import exception
from nova import flags
from nova import log as logging
from nova import rpc
from nova.openstack.common import rpc
from nova import utils

View File

@ -90,9 +90,9 @@ from nova import log as logging
from nova.openstack.common import cfg
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import rpc
from nova.openstack.common import timeutils
from nova import quota
from nova import rpc
from nova.scheduler import rpcapi as scheduler_rpcapi
from nova import utils
from nova import version

View File

@ -35,8 +35,8 @@ if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'nova', '__init__.py')):
from nova import exception
from nova.openstack.common import cfg
from nova import rpc
from nova.rpc import impl_zmq
from nova.openstack.common import rpc
from nova.openstack.common.rpc import impl_zmq
from nova import utils
CONF = cfg.CONF

View File

@ -55,7 +55,7 @@ from nova import db
from nova import exception
from nova import flags
from nova import log as logging
from nova import rpc
from nova.openstack.common import rpc
from nova import utils
from nova.volume import utils as volume_utils

View File

@ -33,7 +33,7 @@ if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
from nova import flags
from nova import log as logging
from nova import rpc
from nova.openstack.common import rpc
from nova import service
from nova.vnc import xvp_proxy

View File

@ -32,8 +32,8 @@ from nova.compute import instance_types
from nova import exception
from nova import flags
from nova import log as logging
from nova.openstack.common.rpc import common as rpc_common
from nova.openstack.common import timeutils
from nova.rpc import common as rpc_common
from nova import utils

View File

@ -19,13 +19,13 @@ Client side of the cert manager RPC API.
"""
from nova import flags
import nova.rpc.proxy
import nova.openstack.common.rpc.proxy
FLAGS = flags.FLAGS
class CertAPI(nova.rpc.proxy.RpcProxy):
class CertAPI(nova.openstack.common.rpc.proxy.RpcProxy):
'''Client side of the cert rpc API.
API version history:

View File

@ -68,8 +68,8 @@ from nova.openstack.common import cfg
from nova.openstack.common import excutils
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import rpc
from nova.openstack.common import timeutils
from nova import rpc
from nova import utils
from nova.virt import driver
from nova import volume

View File

@ -20,8 +20,8 @@ Client side of the compute RPC API.
from nova import exception
from nova import flags
from nova import rpc
import nova.rpc.proxy
from nova.openstack.common import rpc
import nova.openstack.common.rpc.proxy
FLAGS = flags.FLAGS
@ -48,7 +48,7 @@ def _compute_topic(topic, ctxt, host, instance):
return rpc.queue_get_for(ctxt, topic, host)
class ComputeAPI(nova.rpc.proxy.RpcProxy):
class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
'''Client side of the compute rpc API.
API version history:
@ -358,7 +358,7 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy):
topic=_compute_topic(self.topic, ctxt, None, instance))
class SecurityGroupAPI(nova.rpc.proxy.RpcProxy):
class SecurityGroupAPI(nova.openstack.common.rpc.proxy.RpcProxy):
'''Client side of the security group rpc API.
API version history:

View File

@ -21,7 +21,7 @@ from nova.compute import rpcapi as compute_rpcapi
from nova.console import rpcapi as console_rpcapi
from nova.db import base
from nova import flags
from nova import rpc
from nova.openstack.common import rpc
from nova import utils

View File

@ -19,13 +19,13 @@ Client side of the console RPC API.
"""
from nova import flags
import nova.rpc.proxy
import nova.openstack.common.rpc.proxy
FLAGS = flags.FLAGS
class ConsoleAPI(nova.rpc.proxy.RpcProxy):
class ConsoleAPI(nova.openstack.common.rpc.proxy.RpcProxy):
'''Client side of the console rpc API.
API version history:

View File

@ -19,13 +19,13 @@ Client side of the consoleauth RPC API.
"""
from nova import flags
import nova.rpc.proxy
import nova.openstack.common.rpc.proxy
FLAGS = flags.FLAGS
class ConsoleAuthAPI(nova.rpc.proxy.RpcProxy):
class ConsoleAuthAPI(nova.openstack.common.rpc.proxy.RpcProxy):
'''Client side of the consoleauth rpc API.
API version history:

View File

@ -56,7 +56,7 @@ This module provides Manager, a base class for managers.
from nova.db import base
from nova import flags
from nova import log as logging
from nova.rpc import dispatcher as rpc_dispatcher
from nova.openstack.common.rpc import dispatcher as rpc_dispatcher
from nova.scheduler import rpcapi as scheduler_rpcapi
from nova import version

View File

@ -23,7 +23,7 @@ from nova.db import base
from nova import flags
from nova import log as logging
from nova.network import model as network_model
from nova import rpc
from nova.openstack.common import rpc
FLAGS = flags.FLAGS

View File

@ -67,10 +67,10 @@ from nova.openstack.common import cfg
from nova.openstack.common import excutils
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import rpc
from nova.openstack.common import timeutils
import nova.policy
from nova import quota
from nova import rpc
from nova import utils

View File

@ -28,7 +28,7 @@ from nova.network import manager
from nova.network.quantum import melange_ipam_lib
from nova.network.quantum import quantum_connection
from nova.openstack.common import cfg
from nova import rpc
from nova.openstack.common import rpc
from nova import utils
LOG = logging.getLogger(__name__)

View File

@ -19,7 +19,7 @@ import nova.context
from nova import flags
from nova import log as logging
from nova.openstack.common import cfg
from nova import rpc
from nova.openstack.common import rpc
LOG = logging.getLogger(__name__)

View File

@ -31,7 +31,7 @@ from nova.openstack.common import importutils
rpc_opts = [
cfg.StrOpt('rpc_backend',
default='nova.rpc.impl_kombu',
default='%s.impl_kombu' % __package__,
help="The messaging module to use, defaults to kombu."),
cfg.IntOpt('rpc_thread_pool_size',
default=64,
@ -47,9 +47,9 @@ rpc_opts = [
help='Seconds to wait before a cast expires (TTL). '
'Only supported by impl_zmq.'),
cfg.ListOpt('allowed_rpc_exception_modules',
default=['nova.exception'],
help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'),
default=['openstack.common.exception', 'nova.exception'],
help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'),
cfg.StrOpt('control_exchange',
default='nova',
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
@ -72,7 +72,7 @@ def create_connection(new=True):
implementation is free to return an existing connection from a
pool.
:returns: An instance of nova.rpc.common.Connection
:returns: An instance of openstack.common.rpc.common.Connection
"""
return _get_impl().create_connection(cfg.CONF, new=new)
@ -84,8 +84,9 @@ def call(context, topic, msg, timeout=None):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
nova.rpc.common.Connection.create_consumer() and only applies
when the consumer was created with fanout=False.
openstack.common.rpc.common.Connection.create_consumer()
and only applies when the consumer was created with
fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
@ -93,8 +94,8 @@ def call(context, topic, msg, timeout=None):
:returns: A dict from the remote method.
:raises: nova.rpc.common.Timeout if a complete response is not received
before the timeout is reached.
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
@ -106,8 +107,9 @@ def cast(context, topic, msg):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
nova.rpc.common.Connection.create_consumer() and only applies
when the consumer was created with fanout=False.
openstack.common.rpc.common.Connection.create_consumer()
and only applies when the consumer was created with
fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
@ -126,8 +128,9 @@ def fanout_cast(context, topic, msg):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
nova.rpc.common.Connection.create_consumer() and only applies
when the consumer was created with fanout=True.
openstack.common.rpc.common.Connection.create_consumer()
and only applies when the consumer was created with
fanout=True.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
@ -147,8 +150,9 @@ def multicall(context, topic, msg, timeout=None):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
nova.rpc.common.Connection.create_consumer() and only applies
when the consumer was created with fanout=False.
openstack.common.rpc.common.Connection.create_consumer()
and only applies when the consumer was created with
fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
@ -159,8 +163,8 @@ def multicall(context, topic, msg, timeout=None):
returned and X is the Nth value that was returned by the remote
method.
:raises: nova.rpc.common.Timeout if a complete response is not received
before the timeout is reached.
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
@ -248,5 +252,11 @@ def _get_impl():
"""Delay import of rpc_backend until configuration is loaded."""
global _RPCIMPL
if _RPCIMPL is None:
_RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
try:
_RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
except ImportError:
# For backwards compatibility with older nova config.
impl = cfg.CONF.rpc_backend.replace('nova.rpc',
'nova.openstack.common.rpc')
_RPCIMPL = importutils.import_module(impl)
return _RPCIMPL

View File

@ -18,7 +18,7 @@
# under the License.
"""
Shared code between AMQP based nova.rpc implementations.
Shared code between AMQP based openstack.common.rpc implementations.
The code in this module is shared between the rpc implemenations based on AMQP.
Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
@ -36,7 +36,7 @@ from eventlet import semaphore
from nova.openstack.common import excutils
from nova.openstack.common import local
import nova.rpc.common as rpc_common
from nova.openstack.common.rpc import common as rpc_common
LOG = logging.getLogger(__name__)

View File

@ -19,8 +19,10 @@
import copy
import logging
import sys
import traceback
from nova.openstack.common import cfg
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import local

View File

@ -42,7 +42,7 @@ there can be both versioned and unversioned APIs implemented in the same code
base.
"""
from nova.rpc import common as rpc_common
from nova.openstack.common.rpc import common as rpc_common
class RpcDispatcher(object):

View File

@ -18,12 +18,12 @@ queues. Casts will block, but this is very useful for tests.
"""
import inspect
import json
import time
import eventlet
from nova.openstack.common import jsonutils
from nova.rpc import common as rpc_common
from nova.openstack.common.rpc import common as rpc_common
CONSUMERS = {}
@ -121,7 +121,7 @@ def create_connection(conf, new=True):
def check_serialize(msg):
"""Make sure a message intended for rpc can be serialized."""
jsonutils.dumps(msg)
json.dumps(msg)
def multicall(conf, context, topic, msg, timeout=None):

View File

@ -30,8 +30,8 @@ import kombu.entity
import kombu.messaging
from nova.openstack.common import cfg
from nova.rpc import amqp as rpc_amqp
from nova.rpc import common as rpc_common
from nova.openstack.common.rpc import amqp as rpc_amqp
from nova.openstack.common.rpc import common as rpc_common
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
@ -139,10 +139,9 @@ class ConsumerBase(object):
message = self.channel.message_to_python(raw_message)
try:
callback(message.payload)
message.ack()
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
message.ack()
self.queue.consume(*args, callback=_callback, **options)

View File

@ -17,6 +17,7 @@
import functools
import itertools
import json
import logging
import time
import uuid
@ -27,9 +28,9 @@ import qpid.messaging
import qpid.messaging.exceptions
from nova.openstack.common import cfg
from nova.openstack.common import jsonutils
from nova.rpc import amqp as rpc_amqp
from nova.rpc import common as rpc_common
from nova.openstack.common.gettextutils import _
from nova.openstack.common.rpc import amqp as rpc_amqp
from nova.openstack.common.rpc import common as rpc_common
LOG = logging.getLogger(__name__)
@ -124,7 +125,7 @@ class ConsumerBase(object):
addr_opts["node"]["x-declare"].update(node_opts)
addr_opts["link"]["x-declare"].update(link_opts)
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
self.reconnect(session)
@ -227,7 +228,7 @@ class Publisher(object):
if node_opts:
addr_opts["node"]["x-declare"].update(node_opts)
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))
self.reconnect(session)

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import pprint
import string
import sys
@ -25,9 +26,9 @@ from eventlet.green import zmq
import greenlet
from nova.openstack.common import cfg
from nova.openstack.common.gettextutils import _
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.rpc import common as rpc_common
from nova.openstack.common.rpc import common as rpc_common
# for convenience, are not modified.
@ -45,7 +46,7 @@ zmq_opts = [
# The module.Class to use for matchmaking.
cfg.StrOpt('rpc_zmq_matchmaker',
default='nova.rpc.matchmaker.MatchMakerLocalhost',
default='openstack.common.rpc.matchmaker.MatchMakerLocalhost',
help='MatchMaker driver'),
# The following port is unassigned by IANA as of 2012-05-21
@ -55,7 +56,7 @@ zmq_opts = [
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/nova',
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
help='Directory for holding IPC sockets'),
]
@ -74,7 +75,7 @@ def _serialize(data):
Error if a developer passes us bad data.
"""
try:
return str(jsonutils.dumps(data))
return str(json.dumps(data, ensure_ascii=True))
except TypeError:
LOG.error(_("JSON serialization failed."))
raise
@ -85,7 +86,7 @@ def _deserialize(data):
Deserialization wrapper
"""
LOG.debug(_("Deserializing: %s"), data)
return jsonutils.loads(data)
return json.loads(data)
class ZmqSocket(object):

View File

@ -22,7 +22,7 @@ For more information about rpc API version numbers, see:
"""
from nova import rpc
from nova.openstack.common import rpc
class RpcProxy(object):

View File

@ -33,8 +33,8 @@ from nova import notifications
from nova.openstack.common import cfg
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import rpc
from nova.openstack.common import timeutils
from nova import rpc
from nova import utils

View File

@ -19,13 +19,13 @@ Client side of the scheduler manager RPC API.
"""
from nova import flags
import nova.rpc.proxy
import nova.openstack.common.rpc.proxy
FLAGS = flags.FLAGS
class SchedulerAPI(nova.rpc.proxy.RpcProxy):
class SchedulerAPI(nova.openstack.common.rpc.proxy.RpcProxy):
'''Client side of the scheduler rpc API.
API version history:

View File

@ -35,7 +35,7 @@ from nova import flags
from nova import log as logging
from nova.openstack.common import cfg
from nova.openstack.common import importutils
from nova import rpc
from nova.openstack.common import rpc
from nova import utils
from nova import version
from nova import wsgi

View File

@ -40,7 +40,7 @@ from nova.image import fake
from nova.image import s3
from nova import log as logging
from nova.network import api as network_api
from nova import rpc
from nova.openstack.common import rpc
from nova import test
from nova import utils

View File

@ -25,7 +25,7 @@ from nova import flags
from nova.image import fake
from nova import log as logging
from nova.openstack.common import importutils
from nova import rpc
from nova.openstack.common import rpc
from nova import test
LOG = logging.getLogger(__name__)

View File

@ -17,7 +17,7 @@ from lxml import etree
from nova.api.openstack.compute.contrib import certificates
from nova import context
from nova import rpc
from nova.openstack.common import rpc
from nova import test
from nova.tests.api.openstack import fakes

View File

@ -21,7 +21,7 @@ from nova.api.openstack import compute
import nova.db.api
from nova import flags
from nova.openstack.common import jsonutils
import nova.rpc
import nova.openstack.common.rpc
from nova import test
from nova.tests.api.openstack import fakes

View File

@ -24,7 +24,7 @@ from nova import context
from nova import db
from nova import exception
from nova import network
from nova import rpc
from nova.openstack.common import rpc
from nova import test
from nova.tests.api.openstack import fakes
from nova.tests import fake_network

View File

@ -18,7 +18,7 @@
from nova.api.openstack import compute
import nova.db.api
from nova.openstack.common import jsonutils
import nova.rpc
import nova.openstack.common.rpc
from nova import test
from nova.tests.api.openstack import fakes

View File

@ -37,7 +37,7 @@ from nova.db.sqlalchemy import models
from nova import flags
import nova.image.fake
from nova.openstack.common import jsonutils
import nova.rpc
import nova.openstack.common.rpc
from nova import test
from nova.tests.api.openstack import fakes
from nova.tests import fake_network
@ -1493,11 +1493,12 @@ class ServersControllerCreateTest(test.TestCase):
self.stubs.Set(nova.db, 'instance_system_metadata_update',
fake_method)
self.stubs.Set(nova.db, 'instance_get', instance_get)
self.stubs.Set(nova.rpc, 'cast', fake_method)
self.stubs.Set(nova.rpc, 'call', rpc_call_wrapper)
self.stubs.Set(nova.openstack.common.rpc, 'cast', fake_method)
self.stubs.Set(nova.openstack.common.rpc, 'call', rpc_call_wrapper)
self.stubs.Set(nova.db, 'instance_update_and_get_original',
server_update)
self.stubs.Set(nova.rpc, 'queue_get_for', queue_get_for)
self.stubs.Set(nova.openstack.common.rpc, 'queue_get_for',
queue_get_for)
self.stubs.Set(nova.network.manager.VlanManager, 'allocate_fixed_ip',
fake_method)

View File

@ -21,7 +21,7 @@ Unit Tests for nova.cert.rpcapi
from nova.cert import rpcapi as cert_rpcapi
from nova import context
from nova import flags
from nova import rpc
from nova.openstack.common import rpc
from nova import test

View File

@ -45,10 +45,10 @@ from nova.notifier import test_notifier
from nova.openstack.common import importutils
from nova.openstack.common import policy as common_policy
from nova.openstack.common import timeutils
from nova.openstack.common import rpc
from nova.openstack.common.rpc import common as rpc_common
import nova.policy
from nova import quota
from nova import rpc
from nova.rpc import common as rpc_common
from nova.scheduler import driver as scheduler_driver
from nova import test
from nova.tests import fake_network

View File

@ -21,7 +21,7 @@ Unit Tests for nova.compute.rpcapi
from nova.compute import rpcapi as compute_rpcapi
from nova import context
from nova import flags
from nova import rpc
from nova.openstack.common import rpc
from nova import test

View File

@ -21,7 +21,7 @@ Unit Tests for nova.console.rpcapi
from nova.console import rpcapi as console_rpcapi
from nova import context
from nova import flags
from nova import rpc
from nova.openstack.common import rpc
from nova import test

View File

@ -21,7 +21,7 @@ Unit Tests for nova.consoleauth.rpcapi
from nova.consoleauth import rpcapi as consoleauth_rpcapi
from nova import context
from nova import flags
from nova import rpc
from nova.openstack.common import rpc
from nova import test

View File

@ -42,7 +42,7 @@ def set_defaults(conf):
conf.set_default('iscsi_num_targets', 8)
conf.set_default('network_size', 8)
conf.set_default('num_networks', 2)
conf.set_default('rpc_backend', 'nova.rpc.impl_fake')
conf.set_default('rpc_backend', 'nova.openstack.common.rpc.impl_fake')
conf.set_default('sql_connection', "sqlite://")
conf.set_default('sqlite_synchronous', False)
conf.set_default('use_ipv6', True)

View File

@ -27,8 +27,8 @@ from nova import log as logging
from nova.network import linux_net
from nova.network import manager as network_manager
from nova.openstack.common import importutils
from nova.openstack.common import rpc
import nova.policy
from nova import rpc
from nova import test
from nova.tests import fake_network
from nova import utils

View File

@ -1,19 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# NOTE(vish): this forces the fixtures from tests/__init.py:setup() to work
from nova.tests import *

View File

@ -1,321 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for remote procedure calls shared between all implementations
"""
import time
import eventlet
from eventlet import greenthread
import nose
from nova import context
from nova import exception
from nova import flags
from nova import log as logging
from nova.rpc import amqp as rpc_amqp
from nova.rpc import common as rpc_common
from nova.rpc import dispatcher as rpc_dispatcher
from nova import test
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
class BaseRpcTestCase(test.TestCase):
def setUp(self, supports_timeouts=True, topic='test',
topic_nested='nested'):
super(BaseRpcTestCase, self).setUp()
self.topic = topic or self.topic
self.topic_nested = topic_nested or self.topic_nested
self.supports_timeouts = supports_timeouts
self.context = context.get_admin_context()
if self.rpc:
receiver = TestReceiver()
self.conn = self._create_consumer(receiver, self.topic)
def tearDown(self):
if self.rpc:
self.conn.close()
super(BaseRpcTestCase, self).tearDown()
def _create_consumer(self, proxy, topic, fanout=False):
dispatcher = rpc_dispatcher.RpcDispatcher([proxy])
conn = self.rpc.create_connection(FLAGS, True)
conn.create_consumer(topic, dispatcher, fanout)
conn.consume_in_thread()
return conn
def test_call_succeed(self):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
value = 42
result = self.rpc.call(FLAGS, self.context, self.topic,
{"method": "echo", "args": {"value": value}})
self.assertEqual(value, result)
def test_call_succeed_despite_multiple_returns_yield(self):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
value = 42
result = self.rpc.call(FLAGS, self.context, self.topic,
{"method": "echo_three_times_yield",
"args": {"value": value}})
self.assertEqual(value + 2, result)
def test_multicall_succeed_once(self):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
value = 42
result = self.rpc.multicall(FLAGS, self.context,
self.topic,
{"method": "echo",
"args": {"value": value}})
for i, x in enumerate(result):
if i > 0:
self.fail('should only receive one response')
self.assertEqual(value + i, x)
def test_multicall_three_nones(self):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
value = 42
result = self.rpc.multicall(FLAGS, self.context,
self.topic,
{"method": "multicall_three_nones",
"args": {"value": value}})
for i, x in enumerate(result):
self.assertEqual(x, None)
# i should have been 0, 1, and finally 2:
self.assertEqual(i, 2)
def test_multicall_succeed_three_times_yield(self):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
value = 42
result = self.rpc.multicall(FLAGS, self.context,
self.topic,
{"method": "echo_three_times_yield",
"args": {"value": value}})
for i, x in enumerate(result):
self.assertEqual(value + i, x)
def test_context_passed(self):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
"""Makes sure a context is passed through rpc call."""
value = 42
result = self.rpc.call(FLAGS, self.context,
self.topic, {"method": "context",
"args": {"value": value}})
self.assertEqual(self.context.to_dict(), result)
def _test_cast(self, fanout=False):
"""Test casts by pushing items through a channeled queue."""
# Not a true global, but capitalized so
# it is clear it is leaking scope into Nested()
QUEUE = eventlet.queue.Queue()
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
# We use the nested topic so we don't need QUEUE to be a proper
# global, and do not keep state outside this test.
class Nested(object):
@staticmethod
def put_queue(context, value):
LOG.debug("Got value in put_queue: %s", value)
QUEUE.put(value)
nested = Nested()
conn = self._create_consumer(nested, self.topic_nested, fanout)
value = 42
method = (self.rpc.cast, self.rpc.fanout_cast)[fanout]
method(FLAGS, self.context,
self.topic_nested,
{"method": "put_queue",
"args": {"value": value}})
try:
# If it does not succeed in 2 seconds, give up and assume
# failure.
result = QUEUE.get(True, 2)
except Exception:
self.assertEqual(value, None)
conn.close()
self.assertEqual(value, result)
def test_cast_success(self):
self._test_cast(False)
def test_fanout_success(self):
self._test_cast(True)
def test_nested_calls(self):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
"""Test that we can do an rpc.call inside another call."""
class Nested(object):
@staticmethod
def echo(context, queue, value):
"""Calls echo in the passed queue."""
LOG.debug(_("Nested received %(queue)s, %(value)s")
% locals())
# TODO(comstud):
# so, it will replay the context and use the same REQID?
# that's bizarre.
ret = self.rpc.call(FLAGS, context,
queue,
{"method": "echo",
"args": {"value": value}})
LOG.debug(_("Nested return %s"), ret)
return value
nested = Nested()
conn = self._create_consumer(nested, self.topic_nested)
value = 42
result = self.rpc.call(FLAGS, self.context,
self.topic_nested,
{"method": "echo",
"args": {"queue": "test", "value": value}})
conn.close()
self.assertEqual(value, result)
def test_call_timeout(self):
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
"""Make sure rpc.call will time out."""
if not self.supports_timeouts:
raise nose.SkipTest(_("RPC backend does not support timeouts"))
value = 42
self.assertRaises(rpc_common.Timeout,
self.rpc.call,
FLAGS, self.context,
self.topic,
{"method": "block",
"args": {"value": value}}, timeout=1)
try:
self.rpc.call(FLAGS, self.context,
self.topic,
{"method": "block",
"args": {"value": value}},
timeout=1)
self.fail("should have thrown Timeout")
except rpc_common.Timeout as exc:
pass
class BaseRpcAMQPTestCase(BaseRpcTestCase):
"""Base test class for all AMQP-based RPC tests."""
def test_proxycallback_handles_exceptions(self):
"""Make sure exceptions unpacking messages don't cause hangs."""
if not self.rpc:
raise nose.SkipTest('rpc driver not available.')
orig_unpack = rpc_amqp.unpack_context
info = {'unpacked': False}
def fake_unpack_context(*args, **kwargs):
info['unpacked'] = True
raise test.TestingException('moo')
self.stubs.Set(rpc_amqp, 'unpack_context', fake_unpack_context)
value = 41
self.rpc.cast(FLAGS, self.context, self.topic,
{"method": "echo", "args": {"value": value}})
# Wait for the cast to complete.
for x in xrange(50):
if info['unpacked']:
break
greenthread.sleep(0.1)
else:
self.fail("Timeout waiting for message to be consumed")
# Now see if we get a response even though we raised an
# exception for the cast above.
self.stubs.Set(rpc_amqp, 'unpack_context', orig_unpack)
value = 42
result = self.rpc.call(FLAGS, self.context, self.topic,
{"method": "echo",
"args": {"value": value}})
self.assertEqual(value, result)
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call.
Uses static methods because we aren't actually storing any state.
"""
@staticmethod
def echo(context, value):
"""Simply returns whatever value is sent in."""
LOG.debug(_("Received %s"), value)
return value
@staticmethod
def context(context, value):
"""Returns dictionary version of context."""
LOG.debug(_("Received %s"), context)
return context.to_dict()
@staticmethod
def multicall_three_nones(context, value):
yield None
yield None
yield None
@staticmethod
def echo_three_times_yield(context, value):
yield value
yield value + 1
yield value + 2
@staticmethod
def fail(context, value):
"""Raises an exception with the value sent in."""
raise NotImplementedError(value)
@staticmethod
def fail_converted(context, value):
"""Raises an exception with the value sent in."""
raise exception.ConvertedException(explanation=value)
@staticmethod
def block(context, value):
time.sleep(2)

View File

@ -1,144 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for 'common' functons used through rpc code.
"""
import sys
from nova import exception
from nova import flags
from nova import log as logging
from nova.openstack.common import jsonutils
from nova.rpc import common as rpc_common
from nova import test
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
def raise_exception():
raise Exception("test")
class FakeUserDefinedException(Exception):
def __init__(self):
Exception.__init__(self, "Test Message")
class RpcCommonTestCase(test.TestCase):
def test_serialize_remote_exception(self):
expected = {
'class': 'Exception',
'module': 'exceptions',
'message': 'test',
}
try:
raise_exception()
except Exception as exc:
failure = rpc_common.serialize_remote_exception(sys.exc_info())
failure = jsonutils.loads(failure)
#assure the traceback was added
self.assertEqual(expected['class'], failure['class'])
self.assertEqual(expected['module'], failure['module'])
self.assertEqual(expected['message'], failure['message'])
def test_serialize_remote_nova_exception(self):
def raise_nova_exception():
raise exception.NovaException("test", code=500)
expected = {
'class': 'NovaException',
'module': 'nova.exception',
'kwargs': {'code': 500},
'message': 'test'
}
try:
raise_nova_exception()
except Exception as exc:
failure = rpc_common.serialize_remote_exception(sys.exc_info())
failure = jsonutils.loads(failure)
#assure the traceback was added
self.assertEqual(expected['class'], failure['class'])
self.assertEqual(expected['module'], failure['module'])
self.assertEqual(expected['kwargs'], failure['kwargs'])
self.assertEqual(expected['message'], failure['message'])
def test_deserialize_remote_exception(self):
failure = {
'class': 'NovaException',
'module': 'nova.exception',
'message': 'test message',
'tb': ['raise NovaException'],
}
serialized = jsonutils.dumps(failure)
after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized)
self.assertTrue(isinstance(after_exc, exception.NovaException))
self.assertTrue('test message' in unicode(after_exc))
#assure the traceback was added
self.assertTrue('raise NovaException' in unicode(after_exc))
def test_deserialize_remote_exception_bad_module(self):
failure = {
'class': 'popen2',
'module': 'os',
'kwargs': {'cmd': '/bin/echo failed'},
'message': 'foo',
}
serialized = jsonutils.dumps(failure)
after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized)
self.assertTrue(isinstance(after_exc, rpc_common.RemoteError))
def test_deserialize_remote_exception_user_defined_exception(self):
"""Ensure a user defined exception can be deserialized."""
self.flags(allowed_rpc_exception_modules=[self.__class__.__module__])
failure = {
'class': 'FakeUserDefinedException',
'module': self.__class__.__module__,
'tb': ['raise FakeUserDefinedException'],
}
serialized = jsonutils.dumps(failure)
after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized)
self.assertTrue(isinstance(after_exc, FakeUserDefinedException))
#assure the traceback was added
self.assertTrue('raise FakeUserDefinedException' in unicode(after_exc))
def test_deserialize_remote_exception_cannot_recreate(self):
"""Ensure a RemoteError is returned on initialization failure.
If an exception cannot be recreated with it's original class then a
RemoteError with the exception informations should still be returned.
"""
self.flags(allowed_rpc_exception_modules=[self.__class__.__module__])
failure = {
'class': 'FakeIDontExistException',
'module': self.__class__.__module__,
'tb': ['raise FakeIDontExistException'],
}
serialized = jsonutils.dumps(failure)
after_exc = rpc_common.deserialize_remote_exception(FLAGS, serialized)
self.assertTrue(isinstance(after_exc, rpc_common.RemoteError))
#assure the traceback was added
self.assertTrue('raise FakeIDontExistException' in unicode(after_exc))

View File

@ -1,109 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for rpc.dispatcher
"""
from nova import context
from nova.rpc import common as rpc_common
from nova.rpc import dispatcher
from nova import test
class RpcDispatcherTestCase(test.TestCase):
class API1(object):
RPC_API_VERSION = '1.0'
def __init__(self):
self.test_method_ctxt = None
self.test_method_arg1 = None
def test_method(self, ctxt, arg1):
self.test_method_ctxt = ctxt
self.test_method_arg1 = arg1
class API2(object):
RPC_API_VERSION = '2.1'
def __init__(self):
self.test_method_ctxt = None
self.test_method_arg1 = None
def test_method(self, ctxt, arg1):
self.test_method_ctxt = ctxt
self.test_method_arg1 = arg1
class API3(object):
RPC_API_VERSION = '3.5'
def __init__(self):
self.test_method_ctxt = None
self.test_method_arg1 = None
def test_method(self, ctxt, arg1):
self.test_method_ctxt = ctxt
self.test_method_arg1 = arg1
def setUp(self):
self.ctxt = context.RequestContext('fake_user', 'fake_project')
super(RpcDispatcherTestCase, self).setUp()
def tearDown(self):
super(RpcDispatcherTestCase, self).tearDown()
def _test_dispatch(self, version, expectations):
v2 = self.API2()
v3 = self.API3()
disp = dispatcher.RpcDispatcher([v2, v3])
disp.dispatch(self.ctxt, version, 'test_method', arg1=1)
self.assertEqual(v2.test_method_ctxt, expectations[0])
self.assertEqual(v2.test_method_arg1, expectations[1])
self.assertEqual(v3.test_method_ctxt, expectations[2])
self.assertEqual(v3.test_method_arg1, expectations[3])
def test_dispatch(self):
self._test_dispatch('2.1', (self.ctxt, 1, None, None))
self._test_dispatch('3.5', (None, None, self.ctxt, 1))
def test_dispatch_lower_minor_version(self):
self._test_dispatch('2.0', (self.ctxt, 1, None, None))
self._test_dispatch('3.1', (None, None, self.ctxt, 1))
def test_dispatch_higher_minor_version(self):
self.assertRaises(rpc_common.UnsupportedRpcVersion,
self._test_dispatch, '2.6', (None, None, None, None))
self.assertRaises(rpc_common.UnsupportedRpcVersion,
self._test_dispatch, '3.6', (None, None, None, None))
def test_dispatch_lower_major_version(self):
self.assertRaises(rpc_common.UnsupportedRpcVersion,
self._test_dispatch, '1.0', (None, None, None, None))
def test_dispatch_higher_major_version(self):
self.assertRaises(rpc_common.UnsupportedRpcVersion,
self._test_dispatch, '4.0', (None, None, None, None))
def test_dispatch_no_version_uses_v1(self):
v1 = self.API1()
disp = dispatcher.RpcDispatcher([v1])
disp.dispatch(self.ctxt, None, 'test_method', arg1=1)
self.assertEqual(v1.test_method_ctxt, self.ctxt)
self.assertEqual(v1.test_method_arg1, 1)

View File

@ -1,33 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for remote procedure calls using fake_impl
"""
from nova import log as logging
from nova.rpc import impl_fake
from nova.tests.rpc import common
LOG = logging.getLogger(__name__)
class RpcFakeTestCase(common.BaseRpcTestCase):
def setUp(self):
self.rpc = impl_fake
super(RpcFakeTestCase, self).setUp()

View File

@ -1,395 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for remote procedure calls using kombu
"""
from nova import context
from nova import exception
from nova import flags
from nova import log as logging
from nova.rpc import amqp as rpc_amqp
from nova import test
from nova.tests.rpc import common
try:
import kombu
from nova.rpc import impl_kombu
except ImportError:
kombu = None
impl_kombu = None
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
class MyException(Exception):
pass
def _raise_exc_stub(stubs, times, obj, method, exc_msg,
exc_class=MyException):
info = {'called': 0}
orig_method = getattr(obj, method)
def _raise_stub(*args, **kwargs):
info['called'] += 1
if info['called'] <= times:
raise exc_class(exc_msg)
orig_method(*args, **kwargs)
stubs.Set(obj, method, _raise_stub)
return info
class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
def setUp(self):
if kombu:
self.rpc = impl_kombu
else:
self.rpc = None
super(RpcKombuTestCase, self).setUp()
def tearDown(self):
if kombu:
impl_kombu.cleanup()
super(RpcKombuTestCase, self).tearDown()
@test.skip_if(kombu is None, "Test requires kombu")
def test_reusing_connection(self):
"""Test that reusing a connection returns same one."""
conn_context = self.rpc.create_connection(FLAGS, new=False)
conn1 = conn_context.connection
conn_context.close()
conn_context = self.rpc.create_connection(FLAGS, new=False)
conn2 = conn_context.connection
conn_context.close()
self.assertEqual(conn1, conn2)
@test.skip_if(kombu is None, "Test requires kombu")
def test_topic_send_receive(self):
"""Test sending to a topic exchange/queue"""
conn = self.rpc.create_connection(FLAGS)
message = 'topic test message'
self.received_message = None
def _callback(message):
self.received_message = message
conn.declare_topic_consumer('a_topic', _callback)
conn.topic_send('a_topic', message)
conn.consume(limit=1)
conn.close()
self.assertEqual(self.received_message, message)
@test.skip_if(kombu is None, "Test requires kombu")
def test_topic_multiple_queues(self):
"""Test sending to a topic exchange with multiple queues"""
conn = self.rpc.create_connection(FLAGS)
message = 'topic test message'
self.received_message_1 = None
self.received_message_2 = None
def _callback1(message):
self.received_message_1 = message
def _callback2(message):
self.received_message_2 = message
conn.declare_topic_consumer('a_topic', _callback1, queue_name='queue1')
conn.declare_topic_consumer('a_topic', _callback2, queue_name='queue2')
conn.topic_send('a_topic', message)
conn.consume(limit=2)
conn.close()
self.assertEqual(self.received_message_1, message)
self.assertEqual(self.received_message_2, message)
@test.skip_if(kombu is None, "Test requires kombu")
def test_direct_send_receive(self):
"""Test sending to a direct exchange/queue"""
conn = self.rpc.create_connection(FLAGS)
message = 'direct test message'
self.received_message = None
def _callback(message):
self.received_message = message
conn.declare_direct_consumer('a_direct', _callback)
conn.direct_send('a_direct', message)
conn.consume(limit=1)
conn.close()
self.assertEqual(self.received_message, message)
@test.skip_if(kombu is None, "Test requires kombu")
def test_cast_interface_uses_default_options(self):
"""Test kombu rpc.cast"""
ctxt = context.RequestContext('fake_user', 'fake_project')
class MyConnection(impl_kombu.Connection):
def __init__(myself, *args, **kwargs):
super(MyConnection, myself).__init__(*args, **kwargs)
self.assertEqual(myself.params,
{'hostname': FLAGS.rabbit_host,
'userid': FLAGS.rabbit_userid,
'password': FLAGS.rabbit_password,
'port': FLAGS.rabbit_port,
'virtual_host': FLAGS.rabbit_virtual_host,
'transport': 'memory'})
def topic_send(_context, topic, msg):
pass
MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
self.stubs.Set(impl_kombu, 'Connection', MyConnection)
impl_kombu.cast(FLAGS, ctxt, 'fake_topic', {'msg': 'fake'})
@test.skip_if(kombu is None, "Test requires kombu")
def test_cast_to_server_uses_server_params(self):
"""Test kombu rpc.cast"""
ctxt = context.RequestContext('fake_user', 'fake_project')
server_params = {'username': 'fake_username',
'password': 'fake_password',
'hostname': 'fake_hostname',
'port': 31337,
'virtual_host': 'fake_virtual_host'}
class MyConnection(impl_kombu.Connection):
def __init__(myself, *args, **kwargs):
super(MyConnection, myself).__init__(*args, **kwargs)
self.assertEqual(myself.params,
{'hostname': server_params['hostname'],
'userid': server_params['username'],
'password': server_params['password'],
'port': server_params['port'],
'virtual_host': server_params['virtual_host'],
'transport': 'memory'})
def topic_send(_context, topic, msg):
pass
MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
self.stubs.Set(impl_kombu, 'Connection', MyConnection)
impl_kombu.cast_to_server(FLAGS, ctxt, server_params,
'fake_topic', {'msg': 'fake'})
@test.skip_test("kombu memory transport seems buggy with fanout queues "
"as this test passes when you use rabbit (fake_rabbit=False)")
def test_fanout_send_receive(self):
"""Test sending to a fanout exchange and consuming from 2 queues"""
conn = self.rpc.create_connection()
conn2 = self.rpc.create_connection()
message = 'fanout test message'
self.received_message = None
def _callback(message):
self.received_message = message
conn.declare_fanout_consumer('a_fanout', _callback)
conn2.declare_fanout_consumer('a_fanout', _callback)
conn.fanout_send('a_fanout', message)
conn.consume(limit=1)
conn.close()
self.assertEqual(self.received_message, message)
self.received_message = None
conn2.consume(limit=1)
conn2.close()
self.assertEqual(self.received_message, message)
@test.skip_if(kombu is None, "Test requires kombu")
def test_declare_consumer_errors_will_reconnect(self):
# Test that any exception with 'timeout' in it causes a
# reconnection
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
'__init__', 'foo timeout foo')
conn = self.rpc.Connection(FLAGS)
result = conn.declare_consumer(self.rpc.DirectConsumer,
'test_topic', None)
self.assertEqual(info['called'], 3)
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
# Test that any exception in transport.connection_errors causes
# a reconnection
self.stubs.UnsetAll()
info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectConsumer,
'__init__', 'meow')
conn = self.rpc.Connection(FLAGS)
conn.connection_errors = (MyException, )
result = conn.declare_consumer(self.rpc.DirectConsumer,
'test_topic', None)
self.assertEqual(info['called'], 2)
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
@test.skip_if(kombu is None, "Test requires kombu")
def test_declare_consumer_ioerrors_will_reconnect(self):
"""Test that an IOError exception causes a reconnection"""
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
'__init__', 'Socket closed', exc_class=IOError)
conn = self.rpc.Connection(FLAGS)
result = conn.declare_consumer(self.rpc.DirectConsumer,
'test_topic', None)
self.assertEqual(info['called'], 3)
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
@test.skip_if(kombu is None, "Test requires kombu")
def test_publishing_errors_will_reconnect(self):
# Test that any exception with 'timeout' in it causes a
# reconnection when declaring the publisher class and when
# calling send()
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
'__init__', 'foo timeout foo')
conn = self.rpc.Connection(FLAGS)
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
self.assertEqual(info['called'], 3)
self.stubs.UnsetAll()
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
'send', 'foo timeout foo')
conn = self.rpc.Connection(FLAGS)
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
self.assertEqual(info['called'], 3)
# Test that any exception in transport.connection_errors causes
# a reconnection when declaring the publisher class and when
# calling send()
self.stubs.UnsetAll()
info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
'__init__', 'meow')
conn = self.rpc.Connection(FLAGS)
conn.connection_errors = (MyException, )
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
self.assertEqual(info['called'], 2)
self.stubs.UnsetAll()
info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
'send', 'meow')
conn = self.rpc.Connection(FLAGS)
conn.connection_errors = (MyException, )
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
self.assertEqual(info['called'], 2)
@test.skip_if(kombu is None, "Test requires kombu")
def test_iterconsume_errors_will_reconnect(self):
conn = self.rpc.Connection(FLAGS)
message = 'reconnect test message'
self.received_message = None
def _callback(message):
self.received_message = message
conn.declare_direct_consumer('a_direct', _callback)
conn.direct_send('a_direct', message)
info = _raise_exc_stub(self.stubs, 1, conn.connection,
'drain_events', 'foo timeout foo')
conn.consume(limit=1)
conn.close()
self.assertEqual(self.received_message, message)
# Only called once, because our stub goes away during reconnection
@test.skip_if(kombu is None, "Test requires kombu")
def test_call_exception(self):
"""Test that exception gets passed back properly.
rpc.call returns an Exception object. The value of the
exception is converted to a string.
"""
self.flags(allowed_rpc_exception_modules=['exceptions'])
value = "This is the exception message"
self.assertRaises(NotImplementedError,
self.rpc.call,
FLAGS,
self.context,
'test',
{"method": "fail",
"args": {"value": value}})
try:
self.rpc.call(FLAGS, self.context,
'test',
{"method": "fail",
"args": {"value": value}})
self.fail("should have thrown Exception")
except NotImplementedError as exc:
self.assertTrue(value in unicode(exc))
#Traceback should be included in exception message
self.assertTrue('raise NotImplementedError(value)' in unicode(exc))
@test.skip_if(kombu is None, "Test requires kombu")
def test_call_converted_exception(self):
"""Test that exception gets passed back properly.
rpc.call returns an Exception object. The value of the
exception is converted to a string.
"""
value = "This is the exception message"
self.assertRaises(exception.ConvertedException,
self.rpc.call,
FLAGS,
self.context,
'test',
{"method": "fail_converted",
"args": {"value": value}})
try:
self.rpc.call(FLAGS, self.context,
'test',
{"method": "fail_converted",
"args": {"value": value}})
self.fail("should have thrown Exception")
except exception.ConvertedException as exc:
self.assertTrue(value in unicode(exc))
#Traceback should be included in exception message
self.assertTrue('exception.ConvertedException' in unicode(exc))

View File

@ -1,66 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for remote procedure calls using kombu + ssl
"""
from nova import flags
from nova import test
try:
import kombu
from nova.rpc import impl_kombu
except ImportError:
kombu = None
impl_kombu = None
# Flag settings we will ensure get passed to amqplib
SSL_VERSION = "SSLv2"
SSL_CERT = "/tmp/cert.blah.blah"
SSL_CA_CERT = "/tmp/cert.ca.blah.blah"
SSL_KEYFILE = "/tmp/keyfile.blah.blah"
FLAGS = flags.FLAGS
class RpcKombuSslTestCase(test.TestCase):
def setUp(self):
super(RpcKombuSslTestCase, self).setUp()
if kombu:
self.flags(kombu_ssl_keyfile=SSL_KEYFILE,
kombu_ssl_ca_certs=SSL_CA_CERT,
kombu_ssl_certfile=SSL_CERT,
kombu_ssl_version=SSL_VERSION,
rabbit_use_ssl=True)
@test.skip_if(kombu is None, "Test requires kombu")
def test_ssl_on_extended(self):
rpc = impl_kombu
conn = rpc.create_connection(FLAGS, True)
c = conn.connection
#This might be kombu version dependent...
#Since we are now peaking into the internals of kombu...
self.assertTrue(isinstance(c.connection.ssl, dict))
self.assertEqual(SSL_VERSION, c.connection.ssl.get("ssl_version"))
self.assertEqual(SSL_CERT, c.connection.ssl.get("certfile"))
self.assertEqual(SSL_CA_CERT, c.connection.ssl.get("ca_certs"))
self.assertEqual(SSL_KEYFILE, c.connection.ssl.get("keyfile"))
#That hash then goes into amqplib which then goes
#Into python ssl creation...

View File

@ -1,58 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import log as logging
from nova.rpc import matchmaker
from nova import test
LOG = logging.getLogger(__name__)
class _MatchMakerTestCase(test.TestCase):
def test_valid_host_matches(self):
queues = self.driver.queues(self.topic)
matched_hosts = map(lambda x: x[1], queues)
for host in matched_hosts:
self.assertIn(host, self.hosts)
def test_fanout_host_matches(self):
"""For known hosts, see if they're in fanout."""
queues = self.driver.queues("fanout~" + self.topic)
matched_hosts = map(lambda x: x[1], queues)
LOG.info("Received result from matchmaker: %s", queues)
for host in self.hosts:
self.assertIn(host, matched_hosts)
class MatchMakerFileTestCase(_MatchMakerTestCase):
def setUp(self):
self.topic = "test"
self.hosts = ['hello', 'world', 'foo', 'bar', 'baz']
ring = {
self.topic: self.hosts
}
self.driver = matchmaker.MatchMakerRing(ring)
super(MatchMakerFileTestCase, self).setUp()
class MatchMakerLocalhostTestCase(_MatchMakerTestCase):
def setUp(self):
self.driver = matchmaker.MatchMakerLocalhost()
self.topic = "test"
self.hosts = ['localhost']
super(MatchMakerLocalhostTestCase, self).setUp()

View File

@ -1,124 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for rpc.proxy
"""
import copy
from nova import context
from nova import rpc
from nova.rpc import proxy
from nova import test
class RpcProxyTestCase(test.TestCase):
def setUp(self):
super(RpcProxyTestCase, self).setUp()
def tearDown(self):
super(RpcProxyTestCase, self).tearDown()
def _test_rpc_method(self, rpc_method, has_timeout=False, has_retval=False,
server_params=None, supports_topic_override=True):
topic = 'fake_topic'
timeout = 123
rpc_proxy = proxy.RpcProxy(topic, '1.0')
ctxt = context.RequestContext('fake_user', 'fake_project')
msg = {'method': 'fake_method', 'args': {'x': 'y'}}
expected_msg = {'method': 'fake_method', 'args': {'x': 'y'},
'version': '1.0'}
expected_retval = 'hi' if has_retval else None
self.fake_args = None
self.fake_kwargs = None
def _fake_rpc_method(*args, **kwargs):
self.fake_args = args
self.fake_kwargs = kwargs
if has_retval:
return expected_retval
self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
args = [ctxt, msg]
if server_params:
args.insert(1, server_params)
# Base method usage
retval = getattr(rpc_proxy, rpc_method)(*args)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, topic, expected_msg]
if server_params:
expected_args.insert(1, server_params)
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
# overriding the version
retval = getattr(rpc_proxy, rpc_method)(*args, version='1.1')
self.assertEqual(retval, expected_retval)
new_msg = copy.deepcopy(expected_msg)
new_msg['version'] = '1.1'
expected_args = [ctxt, topic, new_msg]
if server_params:
expected_args.insert(1, server_params)
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
if has_timeout:
# set a timeout
retval = getattr(rpc_proxy, rpc_method)(ctxt, msg, timeout=timeout)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, topic, expected_msg, timeout]
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
if supports_topic_override:
# set a topic
new_topic = 'foo.bar'
retval = getattr(rpc_proxy, rpc_method)(*args, topic=new_topic)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, new_topic, expected_msg]
if server_params:
expected_args.insert(1, server_params)
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
def test_call(self):
self._test_rpc_method('call', has_timeout=True, has_retval=True)
def test_multicall(self):
self._test_rpc_method('multicall', has_timeout=True, has_retval=True)
def test_cast(self):
self._test_rpc_method('cast')
def test_fanout_cast(self):
self._test_rpc_method('fanout_cast', supports_topic_override=False)
def test_cast_to_server(self):
self._test_rpc_method('cast_to_server', server_params={'blah': 1})
def test_fanout_cast_to_server(self):
self._test_rpc_method('fanout_cast_to_server',
server_params={'blah': 1}, supports_topic_override=False)
def test_make_msg(self):
self.assertEqual(proxy.RpcProxy.make_msg('test_method', a=1, b=2),
{'method': 'test_method', 'args': {'a': 1, 'b': 2}})

View File

@ -1,370 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for remote procedure calls using qpid
"""
import mox
from nova import context
from nova import flags
from nova import log as logging
from nova.rpc import amqp as rpc_amqp
from nova import test
try:
from nova.rpc import impl_qpid
import qpid
except ImportError:
qpid = None
impl_qpid = None
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
class RpcQpidTestCase(test.TestCase):
"""
Exercise the public API of impl_qpid utilizing mox.
This set of tests utilizes mox to replace the Qpid objects and ensures
that the right operations happen on them when the various public rpc API
calls are exercised. The API calls tested here include:
nova.rpc.create_connection()
nova.rpc.common.Connection.create_consumer()
nova.rpc.common.Connection.close()
nova.rpc.cast()
nova.rpc.fanout_cast()
nova.rpc.call()
nova.rpc.multicall()
"""
def setUp(self):
super(RpcQpidTestCase, self).setUp()
self.mock_connection = None
self.mock_session = None
self.mock_sender = None
self.mock_receiver = None
if qpid:
self.orig_connection = qpid.messaging.Connection
self.orig_session = qpid.messaging.Session
self.orig_sender = qpid.messaging.Sender
self.orig_receiver = qpid.messaging.Receiver
qpid.messaging.Connection = lambda *_x, **_y: self.mock_connection
qpid.messaging.Session = lambda *_x, **_y: self.mock_session
qpid.messaging.Sender = lambda *_x, **_y: self.mock_sender
qpid.messaging.Receiver = lambda *_x, **_y: self.mock_receiver
def tearDown(self):
if qpid:
qpid.messaging.Connection = self.orig_connection
qpid.messaging.Session = self.orig_session
qpid.messaging.Sender = self.orig_sender
qpid.messaging.Receiver = self.orig_receiver
if impl_qpid:
# Need to reset this in case we changed the connection_cls
# in self._setup_to_server_tests()
impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection
super(RpcQpidTestCase, self).tearDown()
@test.skip_if(qpid is None, "Test requires qpid")
def test_create_connection(self):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_connection.opened().AndReturn(False)
self.mock_connection.open()
self.mock_connection.session().AndReturn(self.mock_session)
self.mock_connection.close()
self.mox.ReplayAll()
connection = impl_qpid.create_connection(FLAGS)
connection.close()
def _test_create_consumer(self, fanout):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
self.mock_connection.opened().AndReturn(False)
self.mock_connection.open()
self.mock_connection.session().AndReturn(self.mock_session)
if fanout:
# The link name includes a UUID, so match it with a regex.
expected_address = mox.Regex(r'^impl_qpid_test_fanout ; '
'{"node": {"x-declare": {"auto-delete": true, "durable": '
'false, "type": "fanout"}, "type": "topic"}, "create": '
'"always", "link": {"x-declare": {"auto-delete": true, '
'"exclusive": true, "durable": false}, "durable": true, '
'"name": "impl_qpid_test_fanout_.*"}}$')
else:
expected_address = ('nova/impl_qpid_test ; {"node": {"x-declare": '
'{"auto-delete": true, "durable": true}, "type": "topic"}, '
'"create": "always", "link": {"x-declare": {"auto-delete": '
'true, "exclusive": false, "durable": false}, "durable": '
'true, "name": "impl_qpid_test"}}')
self.mock_session.receiver(expected_address).AndReturn(
self.mock_receiver)
self.mock_receiver.capacity = 1
self.mock_connection.close()
self.mox.ReplayAll()
connection = impl_qpid.create_connection(FLAGS)
connection.create_consumer("impl_qpid_test",
lambda *_x, **_y: None,
fanout)
connection.close()
@test.skip_if(qpid is None, "Test requires qpid")
def test_create_consumer(self):
self._test_create_consumer(fanout=False)
@test.skip_if(qpid is None, "Test requires qpid")
def test_create_consumer_fanout(self):
self._test_create_consumer(fanout=True)
@test.skip_if(qpid is None, "Test requires qpid")
def test_create_worker(self):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
self.mock_connection.opened().AndReturn(False)
self.mock_connection.open()
self.mock_connection.session().AndReturn(self.mock_session)
expected_address = (
'nova/impl_qpid_test ; {"node": {"x-declare": '
'{"auto-delete": true, "durable": true}, "type": "topic"}, '
'"create": "always", "link": {"x-declare": {"auto-delete": '
'true, "exclusive": false, "durable": false}, "durable": '
'true, "name": "impl.qpid.test.workers"}}')
self.mock_session.receiver(expected_address).AndReturn(
self.mock_receiver)
self.mock_receiver.capacity = 1
self.mock_connection.close()
self.mox.ReplayAll()
connection = impl_qpid.create_connection(FLAGS)
connection.create_worker("impl_qpid_test",
lambda *_x, **_y: None,
'impl.qpid.test.workers',
)
connection.close()
def _test_cast(self, fanout, server_params=None):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_sender = self.mox.CreateMock(self.orig_sender)
self.mock_connection.opened().AndReturn(False)
self.mock_connection.open()
self.mock_connection.session().AndReturn(self.mock_session)
if fanout:
expected_address = ('impl_qpid_test_fanout ; '
'{"node": {"x-declare": {"auto-delete": true, '
'"durable": false, "type": "fanout"}, '
'"type": "topic"}, "create": "always"}')
else:
expected_address = ('nova/impl_qpid_test ; {"node": {"x-declare": '
'{"auto-delete": true, "durable": false}, "type": "topic"}, '
'"create": "always"}')
self.mock_session.sender(expected_address).AndReturn(self.mock_sender)
self.mock_sender.send(mox.IgnoreArg())
if not server_params:
# This is a pooled connection, so instead of closing it, it
# gets reset, which is just creating a new session on the
# connection.
self.mock_session.close()
self.mock_connection.session().AndReturn(self.mock_session)
self.mox.ReplayAll()
try:
ctx = context.RequestContext("user", "project")
args = [FLAGS, ctx, "impl_qpid_test",
{"method": "test_method", "args": {}}]
if server_params:
args.insert(2, server_params)
if fanout:
method = impl_qpid.fanout_cast_to_server
else:
method = impl_qpid.cast_to_server
else:
if fanout:
method = impl_qpid.fanout_cast
else:
method = impl_qpid.cast
method(*args)
finally:
while impl_qpid.Connection.pool.free_items:
# Pull the mock connection object out of the connection pool so
# that it doesn't mess up other test cases.
impl_qpid.Connection.pool.get()
@test.skip_if(qpid is None, "Test requires qpid")
def test_cast(self):
self._test_cast(fanout=False)
@test.skip_if(qpid is None, "Test requires qpid")
def test_fanout_cast(self):
self._test_cast(fanout=True)
def _setup_to_server_tests(self, server_params):
class MyConnection(impl_qpid.Connection):
def __init__(myself, *args, **kwargs):
super(MyConnection, myself).__init__(*args, **kwargs)
self.assertEqual(myself.connection.username,
server_params['username'])
self.assertEqual(myself.connection.password,
server_params['password'])
self.assertEqual(myself.broker,
server_params['hostname'] + ':' +
str(server_params['port']))
MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
self.stubs.Set(impl_qpid, 'Connection', MyConnection)
@test.skip_if(qpid is None, "Test requires qpid")
def test_cast_to_server(self):
server_params = {'username': 'fake_username',
'password': 'fake_password',
'hostname': 'fake_hostname',
'port': 31337}
self._setup_to_server_tests(server_params)
self._test_cast(fanout=False, server_params=server_params)
@test.skip_if(qpid is None, "Test requires qpid")
def test_fanout_cast_to_server(self):
server_params = {'username': 'fake_username',
'password': 'fake_password',
'hostname': 'fake_hostname',
'port': 31337}
self._setup_to_server_tests(server_params)
self._test_cast(fanout=True, server_params=server_params)
def _test_call(self, multi):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_sender = self.mox.CreateMock(self.orig_sender)
self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
self.mock_connection.opened().AndReturn(False)
self.mock_connection.open()
self.mock_connection.session().AndReturn(self.mock_session)
rcv_addr = mox.Regex(r'^.*/.* ; {"node": {"x-declare": {"auto-delete":'
' true, "durable": true, "type": "direct"}, "type": '
'"topic"}, "create": "always", "link": {"x-declare": '
'{"auto-delete": true, "exclusive": true, "durable": '
'false}, "durable": true, "name": ".*"}}')
self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver)
self.mock_receiver.capacity = 1
send_addr = ('nova/impl_qpid_test ; {"node": {"x-declare": '
'{"auto-delete": true, "durable": false}, "type": "topic"}, '
'"create": "always"}')
self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
self.mock_sender.send(mox.IgnoreArg())
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
{"result": "foo", "failure": False, "ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
if multi:
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(
qpid.messaging.Message(
{"result": "bar", "failure": False,
"ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(
qpid.messaging.Message(
{"result": "baz", "failure": False,
"ending": False}))
self.mock_session.acknowledge(mox.IgnoreArg())
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
self.mock_receiver)
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
{"failure": False, "ending": True}))
self.mock_session.acknowledge(mox.IgnoreArg())
self.mock_session.close()
self.mock_connection.session().AndReturn(self.mock_session)
self.mox.ReplayAll()
try:
ctx = context.RequestContext("user", "project")
if multi:
method = impl_qpid.multicall
else:
method = impl_qpid.call
res = method(FLAGS, ctx, "impl_qpid_test",
{"method": "test_method", "args": {}})
if multi:
self.assertEquals(list(res), ["foo", "bar", "baz"])
else:
self.assertEquals(res, "foo")
finally:
while impl_qpid.Connection.pool.free_items:
# Pull the mock connection object out of the connection pool so
# that it doesn't mess up other test cases.
impl_qpid.Connection.pool.get()
@test.skip_if(qpid is None, "Test requires qpid")
def test_call(self):
self._test_call(multi=False)
@test.skip_if(qpid is None, "Test requires qpid")
def test_multicall(self):
self._test_call(multi=True)
#
#from nova.tests.rpc import common
#
# Qpid does not have a handy in-memory transport like kombu, so it's not
# terribly straight forward to take advantage of the common unit tests.
# However, at least at the time of this writing, the common unit tests all pass
# with qpidd running.
#
# class RpcQpidCommonTestCase(common._BaseRpcTestCase):
# def setUp(self):
# self.rpc = impl_qpid
# super(RpcQpidCommonTestCase, self).setUp()
#
# def tearDown(self):
# super(RpcQpidCommonTestCase, self).tearDown()
#

View File

@ -1,128 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Cloudscaling Group, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for remote procedure calls using zeromq
"""
import os
from nova import exception
from nova import flags
from nova import log as logging
from nova import rpc
from nova import test
from nova.tests.rpc import common
from nova import utils
try:
from eventlet.green import zmq
from nova.rpc import impl_zmq
except ImportError:
zmq = None
impl_zmq = None
LOG = logging.getLogger(__name__)
FLAGS = flags.FLAGS
class _RpcZmqBaseTestCase(common.BaseRpcTestCase):
@test.skip_if(zmq is None, "Test requires zmq")
def setUp(self, topic='test', topic_nested='nested'):
if not impl_zmq:
return None
self.reactor = None
FLAGS.register_opts(rpc.rpc_opts)
self.rpc = impl_zmq
self.rpc.register_opts(FLAGS)
FLAGS.set_default('rpc_zmq_matchmaker',
'mod_matchmaker.MatchMakerLocalhost')
# We'll change this if we detect no daemon running.
ipc_dir = FLAGS.rpc_zmq_ipc_dir
# Only launch the router if it isn't running independently.
if not os.path.exists(os.path.join(ipc_dir, "zmq_topic_zmq_replies")):
LOG.info(_("Running internal zmq receiver."))
# The normal ipc_dir default needs to run as root,
# /tmp is easier within a testing environment.
FLAGS.set_default('rpc_zmq_ipc_dir', '/tmp/nova-zmq.ipc.test')
# Value has changed.
ipc_dir = FLAGS.rpc_zmq_ipc_dir
try:
# Only launch the receiver if it isn't running independently.
# This is checked again, with the (possibly) new ipc_dir.
if os.path.exists(os.path.join(ipc_dir, "zmq_topic_zmq_replies")):
LOG.warning(_("Detected zmq-receiver socket. "
"Assuming nova-rpc-zmq-receiver is running."))
return
if not os.path.isdir(ipc_dir):
os.mkdir(ipc_dir)
self.reactor = impl_zmq.ZmqProxy(FLAGS)
consume_in = "tcp://%s:%s" % \
(FLAGS.rpc_zmq_bind_address,
FLAGS.rpc_zmq_port)
consumption_proxy = impl_zmq.InternalContext(None)
self.reactor.register(consumption_proxy,
consume_in, zmq.PULL, out_bind=True)
self.reactor.consume_in_thread()
except zmq.ZMQError:
assert False, _("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use.")
except OSError:
assert False, _("Could not create IPC directory %s") % \
(ipc_dir, )
finally:
super(_RpcZmqBaseTestCase, self).setUp(
topic=topic, topic_nested=topic_nested)
def tearDown(self):
if not impl_zmq:
return None
if self.reactor:
self.reactor.close()
try:
utils.execute('rm', '-rf', FLAGS.rpc_zmq_ipc_dir)
except exception.ProcessExecutionError:
pass
super(_RpcZmqBaseTestCase, self).tearDown()
class RpcZmqBaseTopicTestCase(_RpcZmqBaseTestCase):
"""
This tests with topics such as 'test' and 'nested',
without any .host appended. Stresses the matchmaker.
"""
pass
class RpcZmqDirectTopicTestCase(_RpcZmqBaseTestCase):
"""
Test communication directly to a host,
tests use 'localhost'.
"""
def setUp(self):
super(RpcZmqDirectTopicTestCase, self).setUp(
topic='test.localhost',
topic_nested='nested.localhost')

View File

@ -20,7 +20,7 @@ Unit Tests for nova.scheduler.rpcapi
from nova import context
from nova import flags
from nova import rpc
from nova.openstack.common import rpc
from nova.scheduler import rpcapi as scheduler_rpcapi
from nova import test

View File

@ -28,9 +28,9 @@ from nova import db
from nova import exception
from nova import flags
from nova.openstack.common import jsonutils
from nova.openstack.common import rpc
from nova.openstack.common.rpc import common as rpc_common
from nova.openstack.common import timeutils
from nova import rpc
from nova.rpc import common as rpc_common
from nova.scheduler import driver
from nova.scheduler import manager
from nova import test

View File

@ -73,7 +73,7 @@ class NotifierTestCase(test.TestCase):
def mock_notify(cls, *args):
self.mock_notify = True
self.stubs.Set(nova.rpc, 'notify', mock_notify)
self.stubs.Set(nova.openstack.common.rpc, 'notify', mock_notify)
notifier_api.notify(ctxt, 'publisher_id', 'event_type',
nova.notifier.api.WARN, dict(a=3))
@ -96,7 +96,7 @@ class NotifierTestCase(test.TestCase):
def mock_notify(context, topic, msg):
self.test_topic = topic
self.stubs.Set(nova.rpc, 'notify', mock_notify)
self.stubs.Set(nova.openstack.common.rpc, 'notify', mock_notify)
notifier_api.notify(ctxt, 'publisher_id',
'event_type', 'DEBUG', dict(a=3))
self.assertEqual(self.test_topic, 'testnotify.debug')
@ -112,7 +112,7 @@ class NotifierTestCase(test.TestCase):
def mock_notify(context, topic, data):
msgs.append(data)
self.stubs.Set(nova.rpc, 'notify', mock_notify)
self.stubs.Set(nova.openstack.common.rpc, 'notify', mock_notify)
LOG.error('foo')
self.assertEqual(1, len(msgs))
msg = msgs[0]

View File

@ -26,9 +26,9 @@ from nova.db.sqlalchemy import api as sqa_api
from nova.db.sqlalchemy import models as sqa_models
from nova import exception
from nova import flags
from nova.openstack.common import rpc
from nova.openstack.common import timeutils
from nova import quota
from nova import rpc
from nova.scheduler import driver as scheduler_driver
from nova import test
from nova import volume

View File

@ -18,7 +18,7 @@
"""Tests for the testing base code."""
from nova import rpc
from nova.openstack.common import rpc
from nova import test

View File

@ -31,9 +31,9 @@ from nova import flags
from nova import log as logging
from nova.notifier import test_notifier
from nova.openstack.common import importutils
from nova.openstack.common import rpc
import nova.policy
from nova import quota
from nova import rpc
from nova import test
import nova.volume.api

View File

@ -28,7 +28,7 @@ from nova import flags
from nova import log as logging
from nova.openstack.common import cfg
from nova.openstack.common import jsonutils
from nova import rpc
from nova.openstack.common import rpc
from nova.virt.xenapi import vm_utils
LOG = logging.getLogger(__name__)

View File

@ -26,10 +26,10 @@ from nova.db import base
from nova import exception
from nova import flags
from nova import log as logging
from nova.openstack.common import rpc
from nova.openstack.common import timeutils
import nova.policy
from nova import quota
from nova import rpc
FLAGS = flags.FLAGS
flags.DECLARE('storage_availability_zone', 'nova.volume.manager')

View File

@ -1,7 +1,7 @@
[DEFAULT]
# The list of modules to copy from openstack-common
modules=cfg,excutils,importutils,iniparser,jsonutils,local,policy,setup,timeutils
modules=cfg,excutils,importutils,iniparser,jsonutils,local,policy,setup,timeutils,rpc
# The base module to hold the copy of openstack.common
base=nova