Merge "Core modifications for future zones service."

This commit is contained in:
Jenkins
2012-02-16 21:41:43 +00:00
committed by Gerrit Code Review
7 changed files with 246 additions and 30 deletions

View File

@@ -441,7 +441,16 @@ global_opts = [
help='Cache glance images locally'), help='Cache glance images locally'),
cfg.BoolOpt('use_cow_images', cfg.BoolOpt('use_cow_images',
default=True, default=True,
help='Whether to use cow images') help='Whether to use cow images'),
cfg.StrOpt('compute_api_class',
default='nova.compute.api.API',
help='The compute API class to use'),
cfg.StrOpt('network_api_class',
default='nova.network.api.API',
help='The network API class to use'),
cfg.StrOpt('volume_api_class',
default='nova.volume.api.API',
help='The volume API class to use'),
] ]
FLAGS.register_opts(global_opts) FLAGS.register_opts(global_opts)

View File

@@ -161,6 +161,37 @@ def cleanup():
return _get_impl().cleanup() return _get_impl().cleanup()
def cast_to_server(context, server_params, topic, msg):
"""Invoke a remote method that does not return anything.
:param context: Information that identifies the user that has made this
request.
:param server_params: Connection information
:param topic: The topic to send the notification to.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:returns: None
"""
return _get_impl().cast_to_server(context, server_params, topic, msg)
def fanout_cast_to_server(context, server_params, topic, msg):
"""Broadcast to a remote method invocation with no return.
:param context: Information that identifies the user that has made this
request.
:param server_params: Connection information
:param topic: The topic to send the notification to.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:returns: None
"""
return _get_impl().fanout_cast_to_server(context, server_params, topic,
msg)
_RPCIMPL = None _RPCIMPL = None

View File

@@ -73,14 +73,15 @@ class ConnectionContext(rpc_common.Connection):
the pool. the pool.
""" """
def __init__(self, connection_pool, pooled=True): def __init__(self, connection_pool, pooled=True, server_params=None):
"""Create a new connection, or get one from the pool""" """Create a new connection, or get one from the pool"""
self.connection = None self.connection = None
self.connection_pool = connection_pool self.connection_pool = connection_pool
if pooled: if pooled:
self.connection = connection_pool.get() self.connection = connection_pool.get()
else: else:
self.connection = connection_pool.connection_cls() self.connection = connection_pool.connection_cls(
server_params=server_params)
self.pooled = pooled self.pooled = pooled
def __enter__(self): def __enter__(self):
@@ -353,6 +354,23 @@ def fanout_cast(context, topic, msg, connection_pool):
conn.fanout_send(topic, msg) conn.fanout_send(topic, msg)
def cast_to_server(context, server_params, topic, msg, connection_pool):
"""Sends a message on a topic to a specific server."""
pack_context(msg, context)
with ConnectionContext(connection_pool, pooled=False,
server_params=server_params) as conn:
conn.topic_send(topic, msg)
def fanout_cast_to_server(context, server_params, topic, msg,
connection_pool):
"""Sends a message on a fanout exchange to a specific server."""
pack_context(msg, context)
with ConnectionContext(connection_pool, pooled=False,
server_params=server_params) as conn:
conn.fanout_send(topic, msg)
def notify(context, topic, msg, connection_pool): def notify(context, topic, msg, connection_pool):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
LOG.debug(_('Sending notification on %s...'), topic) LOG.debug(_('Sending notification on %s...'), topic)

View File

@@ -27,8 +27,6 @@ import kombu.entity
import kombu.messaging import kombu.messaging
import kombu.connection import kombu.connection
from nova import context
from nova import exception
from nova import flags from nova import flags
from nova.rpc import common as rpc_common from nova.rpc import common as rpc_common
from nova.rpc import amqp as rpc_amqp from nova.rpc import amqp as rpc_amqp
@@ -310,7 +308,7 @@ class NotifyPublisher(TopicPublisher):
class Connection(object): class Connection(object):
"""Connection object.""" """Connection object."""
def __init__(self): def __init__(self, server_params=None):
self.consumers = [] self.consumers = []
self.consumer_thread = None self.consumer_thread = None
self.max_retries = FLAGS.rabbit_max_retries self.max_retries = FLAGS.rabbit_max_retries
@@ -323,11 +321,25 @@ class Connection(object):
self.interval_max = 30 self.interval_max = 30
self.memory_transport = False self.memory_transport = False
self.params = dict(hostname=FLAGS.rabbit_host, if server_params is None:
port=FLAGS.rabbit_port, server_params = {}
userid=FLAGS.rabbit_userid,
password=FLAGS.rabbit_password, # Keys to translate from server_params to kombu params
virtual_host=FLAGS.rabbit_virtual_host) server_params_to_kombu_params = {'username': 'userid'}
params = {}
for sp_key, value in server_params.iteritems():
p_key = server_params_to_kombu_params.get(sp_key, sp_key)
params[p_key] = value
params.setdefault('hostname', FLAGS.rabbit_host)
params.setdefault('port', FLAGS.rabbit_port)
params.setdefault('userid', FLAGS.rabbit_userid)
params.setdefault('password', FLAGS.rabbit_password)
params.setdefault('virtual_host', FLAGS.rabbit_virtual_host)
self.params = params
if FLAGS.fake_rabbit: if FLAGS.fake_rabbit:
self.params['transport'] = 'memory' self.params['transport'] = 'memory'
self.memory_transport = True self.memory_transport = True
@@ -588,10 +600,10 @@ class Connection(object):
"""Create a consumer that calls a method in a proxy object""" """Create a consumer that calls a method in a proxy object"""
if fanout: if fanout:
self.declare_fanout_consumer(topic, self.declare_fanout_consumer(topic,
rpc_amqp.ProxyCallback(proxy, Connection.pool)) rpc_amqp.ProxyCallback(proxy, Connection.pool))
else: else:
self.declare_topic_consumer(topic, self.declare_topic_consumer(topic,
rpc_amqp.ProxyCallback(proxy, Connection.pool)) rpc_amqp.ProxyCallback(proxy, Connection.pool))
Connection.pool = rpc_amqp.Pool(connection_cls=Connection) Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
@@ -622,6 +634,18 @@ def fanout_cast(context, topic, msg):
return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool) return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
def cast_to_server(context, server_params, topic, msg):
"""Sends a message on a topic to a specific server."""
return rpc_amqp.cast_to_server(context, server_params, topic, msg,
Connection.pool)
def fanout_cast_to_server(context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server."""
return rpc_amqp.cast_to_server(context, server_params, topic, msg,
Connection.pool)
def notify(context, topic, msg): def notify(context, topic, msg):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
return rpc_amqp.notify(context, topic, msg, Connection.pool) return rpc_amqp.notify(context, topic, msg, Connection.pool)

View File

@@ -272,19 +272,31 @@ class NotifyPublisher(Publisher):
class Connection(object): class Connection(object):
"""Connection object.""" """Connection object."""
def __init__(self): def __init__(self, server_params=None):
self.session = None self.session = None
self.consumers = {} self.consumers = {}
self.consumer_thread = None self.consumer_thread = None
self.broker = FLAGS.qpid_hostname + ":" + FLAGS.qpid_port if server_params is None:
server_params = {}
default_params = dict(hostname=FLAGS.qpid_hostname,
port=FLAGS.qpid_port,
username=FLAGS.qpid_username,
password=FLAGS.qpid_password)
params = server_params
for key in default_params.keys():
params.setdefault(key, default_params[key])
self.broker = params['hostname'] + ":" + str(params['port'])
# Create the connection - this does not open the connection # Create the connection - this does not open the connection
self.connection = qpid.messaging.Connection(self.broker) self.connection = qpid.messaging.Connection(self.broker)
# Check if flags are set and if so set them for the connection # Check if flags are set and if so set them for the connection
# before we call open # before we call open
self.connection.username = FLAGS.qpid_username self.connection.username = params['username']
self.connection.password = FLAGS.qpid_password self.connection.password = params['password']
self.connection.sasl_mechanisms = FLAGS.qpid_sasl_mechanisms self.connection.sasl_mechanisms = FLAGS.qpid_sasl_mechanisms
self.connection.reconnect = FLAGS.qpid_reconnect self.connection.reconnect = FLAGS.qpid_reconnect
self.connection.reconnect_timeout = FLAGS.qpid_reconnect_timeout self.connection.reconnect_timeout = FLAGS.qpid_reconnect_timeout
@@ -474,10 +486,10 @@ class Connection(object):
"""Create a consumer that calls a method in a proxy object""" """Create a consumer that calls a method in a proxy object"""
if fanout: if fanout:
consumer = FanoutConsumer(self.session, topic, consumer = FanoutConsumer(self.session, topic,
rpc_amqp.ProxyCallback(proxy, Connection.pool)) rpc_amqp.ProxyCallback(proxy, Connection.pool))
else: else:
consumer = TopicConsumer(self.session, topic, consumer = TopicConsumer(self.session, topic,
rpc_amqp.ProxyCallback(proxy, Connection.pool)) rpc_amqp.ProxyCallback(proxy, Connection.pool))
self._register_consumer(consumer) self._register_consumer(consumer)
return consumer return consumer
@@ -510,6 +522,18 @@ def fanout_cast(context, topic, msg):
return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool) return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
def cast_to_server(context, server_params, topic, msg):
"""Sends a message on a topic to a specific server."""
return rpc_amqp.cast_to_server(context, server_params, topic, msg,
Connection.pool)
def fanout_cast_to_server(context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server."""
return rpc_amqp.fanout_cast_to_server(context, server_params, topic,
msg, Connection.pool)
def notify(context, topic, msg): def notify(context, topic, msg):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
return rpc_amqp.notify(context, topic, msg, Connection.pool) return rpc_amqp.notify(context, topic, msg, Connection.pool)

View File

@@ -19,12 +19,15 @@
Unit Tests for remote procedure calls using kombu Unit Tests for remote procedure calls using kombu
""" """
from nova import context
from nova import flags
from nova import log as logging from nova import log as logging
from nova import test from nova import test
from nova.rpc import amqp as rpc_amqp
from nova.rpc import impl_kombu from nova.rpc import impl_kombu
from nova.tests.rpc import common from nova.tests.rpc import common
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@@ -99,6 +102,61 @@ class RpcKombuTestCase(common._BaseRpcTestCase):
self.assertEqual(self.received_message, message) self.assertEqual(self.received_message, message)
def test_cast_interface_uses_default_options(self):
"""Test kombu rpc.cast"""
ctxt = context.RequestContext('fake_user', 'fake_project')
class MyConnection(impl_kombu.Connection):
def __init__(myself, *args, **kwargs):
super(MyConnection, myself).__init__(*args, **kwargs)
self.assertEqual(myself.params,
{'hostname': FLAGS.rabbit_host,
'userid': FLAGS.rabbit_userid,
'password': FLAGS.rabbit_password,
'port': FLAGS.rabbit_port,
'virtual_host': FLAGS.rabbit_virtual_host,
'transport': 'memory'})
def topic_send(_context, topic, msg):
pass
MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection)
self.stubs.Set(impl_kombu, 'Connection', MyConnection)
impl_kombu.cast(ctxt, 'fake_topic', {'msg': 'fake'})
def test_cast_to_server_uses_server_params(self):
"""Test kombu rpc.cast"""
ctxt = context.RequestContext('fake_user', 'fake_project')
server_params = {'username': 'fake_username',
'password': 'fake_password',
'hostname': 'fake_hostname',
'port': 31337,
'virtual_host': 'fake_virtual_host'}
class MyConnection(impl_kombu.Connection):
def __init__(myself, *args, **kwargs):
super(MyConnection, myself).__init__(*args, **kwargs)
self.assertEqual(myself.params,
{'hostname': server_params['hostname'],
'userid': server_params['username'],
'password': server_params['password'],
'port': server_params['port'],
'virtual_host': server_params['virtual_host'],
'transport': 'memory'})
def topic_send(_context, topic, msg):
pass
MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection)
self.stubs.Set(impl_kombu, 'Connection', MyConnection)
impl_kombu.cast_to_server(ctxt, server_params,
'fake_topic', {'msg': 'fake'})
@test.skip_test("kombu memory transport seems buggy with fanout queues " @test.skip_test("kombu memory transport seems buggy with fanout queues "
"as this test passes when you use rabbit (fake_rabbit=False)") "as this test passes when you use rabbit (fake_rabbit=False)")
def test_fanout_send_receive(self): def test_fanout_send_receive(self):

View File

@@ -24,6 +24,7 @@ import mox
from nova import context from nova import context
from nova import log as logging from nova import log as logging
from nova.rpc import amqp as rpc_amqp
from nova import test from nova import test
try: try:
@@ -80,6 +81,10 @@ class RpcQpidTestCase(test.TestCase):
qpid.messaging.Session = self.orig_session qpid.messaging.Session = self.orig_session
qpid.messaging.Sender = self.orig_sender qpid.messaging.Sender = self.orig_sender
qpid.messaging.Receiver = self.orig_receiver qpid.messaging.Receiver = self.orig_receiver
if impl_qpid:
# Need to reset this in case we changed the connection_cls
# in self._setup_to_server_tests()
impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection
self.mocker.ResetAll() self.mocker.ResetAll()
@@ -147,13 +152,15 @@ class RpcQpidTestCase(test.TestCase):
def test_create_consumer_fanout(self): def test_create_consumer_fanout(self):
self._test_create_consumer(fanout=True) self._test_create_consumer(fanout=True)
def _test_cast(self, fanout): def _test_cast(self, fanout, server_params=None):
self.mock_connection = self.mocker.CreateMock(self.orig_connection) self.mock_connection = self.mocker.CreateMock(self.orig_connection)
self.mock_session = self.mocker.CreateMock(self.orig_session) self.mock_session = self.mocker.CreateMock(self.orig_session)
self.mock_sender = self.mocker.CreateMock(self.orig_sender) self.mock_sender = self.mocker.CreateMock(self.orig_sender)
self.mock_connection.opened().AndReturn(False) self.mock_connection.opened().AndReturn(False)
self.mock_connection.open() self.mock_connection.open()
self.mock_connection.session().AndReturn(self.mock_session) self.mock_connection.session().AndReturn(self.mock_session)
if fanout: if fanout:
expected_address = ('impl_qpid_test_fanout ; ' expected_address = ('impl_qpid_test_fanout ; '
@@ -166,22 +173,34 @@ class RpcQpidTestCase(test.TestCase):
'"create": "always"}') '"create": "always"}')
self.mock_session.sender(expected_address).AndReturn(self.mock_sender) self.mock_session.sender(expected_address).AndReturn(self.mock_sender)
self.mock_sender.send(mox.IgnoreArg()) self.mock_sender.send(mox.IgnoreArg())
# This is a pooled connection, so instead of closing it, it gets reset, if not server_params:
# which is just creating a new session on the connection. # This is a pooled connection, so instead of closing it, it
self.mock_session.close() # gets reset, which is just creating a new session on the
self.mock_connection.session().AndReturn(self.mock_session) # connection.
self.mock_session.close()
self.mock_connection.session().AndReturn(self.mock_session)
self.mocker.ReplayAll() self.mocker.ReplayAll()
try: try:
ctx = context.RequestContext("user", "project") ctx = context.RequestContext("user", "project")
if fanout: args = [ctx, "impl_qpid_test",
impl_qpid.fanout_cast(ctx, "impl_qpid_test", {"method": "test_method", "args": {}}]
{"method": "test_method", "args": {}})
if server_params:
args.insert(1, server_params)
if fanout:
method = impl_qpid.fanout_cast_to_server
else:
method = impl_qpid.cast_to_server
else: else:
impl_qpid.cast(ctx, "impl_qpid_test", if fanout:
{"method": "test_method", "args": {}}) method = impl_qpid.fanout_cast
else:
method = impl_qpid.cast
method(*args)
self.mocker.VerifyAll() self.mocker.VerifyAll()
finally: finally:
@@ -198,6 +217,39 @@ class RpcQpidTestCase(test.TestCase):
def test_fanout_cast(self): def test_fanout_cast(self):
self._test_cast(fanout=True) self._test_cast(fanout=True)
def _setup_to_server_tests(self, server_params):
class MyConnection(impl_qpid.Connection):
def __init__(myself, *args, **kwargs):
super(MyConnection, myself).__init__(*args, **kwargs)
self.assertEqual(myself.connection.username,
server_params['username'])
self.assertEqual(myself.connection.password,
server_params['password'])
self.assertEqual(myself.broker,
server_params['hostname'] + ':' +
str(server_params['port']))
MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection)
self.stubs.Set(impl_qpid, 'Connection', MyConnection)
@test.skip_if(qpid is None, "Test requires qpid")
def test_cast_to_server(self):
server_params = {'username': 'fake_username',
'password': 'fake_password',
'hostname': 'fake_hostname',
'port': 31337}
self._setup_to_server_tests(server_params)
self._test_cast(fanout=False, server_params=server_params)
@test.skip_if(qpid is None, "Test requires qpid")
def test_fanout_cast_to_server(self):
server_params = {'username': 'fake_username',
'password': 'fake_password',
'hostname': 'fake_hostname',
'port': 31337}
self._setup_to_server_tests(server_params)
self._test_cast(fanout=True, server_params=server_params)
def _test_call(self, multi): def _test_call(self, multi):
self.mock_connection = self.mocker.CreateMock(self.orig_connection) self.mock_connection = self.mocker.CreateMock(self.orig_connection)
self.mock_session = self.mocker.CreateMock(self.orig_session) self.mock_session = self.mocker.CreateMock(self.orig_session)