diff --git a/nova/flags.py b/nova/flags.py index 3b06760d..af490e28 100644 --- a/nova/flags.py +++ b/nova/flags.py @@ -441,7 +441,16 @@ global_opts = [ help='Cache glance images locally'), cfg.BoolOpt('use_cow_images', default=True, - help='Whether to use cow images') + help='Whether to use cow images'), + cfg.StrOpt('compute_api_class', + default='nova.compute.api.API', + help='The compute API class to use'), + cfg.StrOpt('network_api_class', + default='nova.network.api.API', + help='The network API class to use'), + cfg.StrOpt('volume_api_class', + default='nova.volume.api.API', + help='The volume API class to use'), ] FLAGS.register_opts(global_opts) diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py index 3ad6ddd2..412a32b7 100644 --- a/nova/rpc/__init__.py +++ b/nova/rpc/__init__.py @@ -161,6 +161,37 @@ def cleanup(): return _get_impl().cleanup() +def cast_to_server(context, server_params, topic, msg): + """Invoke a remote method that does not return anything. + + :param context: Information that identifies the user that has made this + request. + :param server_params: Connection information + :param topic: The topic to send the notification to. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: None + """ + return _get_impl().cast_to_server(context, server_params, topic, msg) + + +def fanout_cast_to_server(context, server_params, topic, msg): + """Broadcast to a remote method invocation with no return. + + :param context: Information that identifies the user that has made this + request. + :param server_params: Connection information + :param topic: The topic to send the notification to. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: None + """ + return _get_impl().fanout_cast_to_server(context, server_params, topic, + msg) + + _RPCIMPL = None diff --git a/nova/rpc/amqp.py b/nova/rpc/amqp.py index 01e12776..95655d38 100644 --- a/nova/rpc/amqp.py +++ b/nova/rpc/amqp.py @@ -73,14 +73,15 @@ class ConnectionContext(rpc_common.Connection): the pool. """ - def __init__(self, connection_pool, pooled=True): + def __init__(self, connection_pool, pooled=True, server_params=None): """Create a new connection, or get one from the pool""" self.connection = None self.connection_pool = connection_pool if pooled: self.connection = connection_pool.get() else: - self.connection = connection_pool.connection_cls() + self.connection = connection_pool.connection_cls( + server_params=server_params) self.pooled = pooled def __enter__(self): @@ -353,6 +354,23 @@ def fanout_cast(context, topic, msg, connection_pool): conn.fanout_send(topic, msg) +def cast_to_server(context, server_params, topic, msg, connection_pool): + """Sends a message on a topic to a specific server.""" + pack_context(msg, context) + with ConnectionContext(connection_pool, pooled=False, + server_params=server_params) as conn: + conn.topic_send(topic, msg) + + +def fanout_cast_to_server(context, server_params, topic, msg, + connection_pool): + """Sends a message on a fanout exchange to a specific server.""" + pack_context(msg, context) + with ConnectionContext(connection_pool, pooled=False, + server_params=server_params) as conn: + conn.fanout_send(topic, msg) + + def notify(context, topic, msg, connection_pool): """Sends a notification event on a topic.""" LOG.debug(_('Sending notification on %s...'), topic) diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index a90d06a7..99402f6b 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -27,8 +27,6 @@ import kombu.entity import kombu.messaging import kombu.connection -from nova import context -from nova import exception from nova import flags from nova.rpc import common as rpc_common from nova.rpc import amqp as rpc_amqp @@ -310,7 +308,7 @@ class NotifyPublisher(TopicPublisher): class Connection(object): """Connection object.""" - def __init__(self): + def __init__(self, server_params=None): self.consumers = [] self.consumer_thread = None self.max_retries = FLAGS.rabbit_max_retries @@ -323,11 +321,25 @@ class Connection(object): self.interval_max = 30 self.memory_transport = False - self.params = dict(hostname=FLAGS.rabbit_host, - port=FLAGS.rabbit_port, - userid=FLAGS.rabbit_userid, - password=FLAGS.rabbit_password, - virtual_host=FLAGS.rabbit_virtual_host) + if server_params is None: + server_params = {} + + # Keys to translate from server_params to kombu params + server_params_to_kombu_params = {'username': 'userid'} + + params = {} + for sp_key, value in server_params.iteritems(): + p_key = server_params_to_kombu_params.get(sp_key, sp_key) + params[p_key] = value + + params.setdefault('hostname', FLAGS.rabbit_host) + params.setdefault('port', FLAGS.rabbit_port) + params.setdefault('userid', FLAGS.rabbit_userid) + params.setdefault('password', FLAGS.rabbit_password) + params.setdefault('virtual_host', FLAGS.rabbit_virtual_host) + + self.params = params + if FLAGS.fake_rabbit: self.params['transport'] = 'memory' self.memory_transport = True @@ -588,10 +600,10 @@ class Connection(object): """Create a consumer that calls a method in a proxy object""" if fanout: self.declare_fanout_consumer(topic, - rpc_amqp.ProxyCallback(proxy, Connection.pool)) + rpc_amqp.ProxyCallback(proxy, Connection.pool)) else: self.declare_topic_consumer(topic, - rpc_amqp.ProxyCallback(proxy, Connection.pool)) + rpc_amqp.ProxyCallback(proxy, Connection.pool)) Connection.pool = rpc_amqp.Pool(connection_cls=Connection) @@ -622,6 +634,18 @@ def fanout_cast(context, topic, msg): return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool) +def cast_to_server(context, server_params, topic, msg): + """Sends a message on a topic to a specific server.""" + return rpc_amqp.cast_to_server(context, server_params, topic, msg, + Connection.pool) + + +def fanout_cast_to_server(context, server_params, topic, msg): + """Sends a message on a fanout exchange to a specific server.""" + return rpc_amqp.cast_to_server(context, server_params, topic, msg, + Connection.pool) + + def notify(context, topic, msg): """Sends a notification event on a topic.""" return rpc_amqp.notify(context, topic, msg, Connection.pool) diff --git a/nova/rpc/impl_qpid.py b/nova/rpc/impl_qpid.py index 10bc162f..98f8b06d 100644 --- a/nova/rpc/impl_qpid.py +++ b/nova/rpc/impl_qpid.py @@ -272,19 +272,31 @@ class NotifyPublisher(Publisher): class Connection(object): """Connection object.""" - def __init__(self): + def __init__(self, server_params=None): self.session = None self.consumers = {} self.consumer_thread = None - self.broker = FLAGS.qpid_hostname + ":" + FLAGS.qpid_port + if server_params is None: + server_params = {} + + default_params = dict(hostname=FLAGS.qpid_hostname, + port=FLAGS.qpid_port, + username=FLAGS.qpid_username, + password=FLAGS.qpid_password) + + params = server_params + for key in default_params.keys(): + params.setdefault(key, default_params[key]) + + self.broker = params['hostname'] + ":" + str(params['port']) # Create the connection - this does not open the connection self.connection = qpid.messaging.Connection(self.broker) # Check if flags are set and if so set them for the connection # before we call open - self.connection.username = FLAGS.qpid_username - self.connection.password = FLAGS.qpid_password + self.connection.username = params['username'] + self.connection.password = params['password'] self.connection.sasl_mechanisms = FLAGS.qpid_sasl_mechanisms self.connection.reconnect = FLAGS.qpid_reconnect self.connection.reconnect_timeout = FLAGS.qpid_reconnect_timeout @@ -474,10 +486,10 @@ class Connection(object): """Create a consumer that calls a method in a proxy object""" if fanout: consumer = FanoutConsumer(self.session, topic, - rpc_amqp.ProxyCallback(proxy, Connection.pool)) + rpc_amqp.ProxyCallback(proxy, Connection.pool)) else: consumer = TopicConsumer(self.session, topic, - rpc_amqp.ProxyCallback(proxy, Connection.pool)) + rpc_amqp.ProxyCallback(proxy, Connection.pool)) self._register_consumer(consumer) return consumer @@ -510,6 +522,18 @@ def fanout_cast(context, topic, msg): return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool) +def cast_to_server(context, server_params, topic, msg): + """Sends a message on a topic to a specific server.""" + return rpc_amqp.cast_to_server(context, server_params, topic, msg, + Connection.pool) + + +def fanout_cast_to_server(context, server_params, topic, msg): + """Sends a message on a fanout exchange to a specific server.""" + return rpc_amqp.fanout_cast_to_server(context, server_params, topic, + msg, Connection.pool) + + def notify(context, topic, msg): """Sends a notification event on a topic.""" return rpc_amqp.notify(context, topic, msg, Connection.pool) diff --git a/nova/tests/rpc/test_kombu.py b/nova/tests/rpc/test_kombu.py index 928582a2..f90d111f 100644 --- a/nova/tests/rpc/test_kombu.py +++ b/nova/tests/rpc/test_kombu.py @@ -19,12 +19,15 @@ Unit Tests for remote procedure calls using kombu """ +from nova import context +from nova import flags from nova import log as logging from nova import test +from nova.rpc import amqp as rpc_amqp from nova.rpc import impl_kombu from nova.tests.rpc import common - +FLAGS = flags.FLAGS LOG = logging.getLogger(__name__) @@ -99,6 +102,61 @@ class RpcKombuTestCase(common._BaseRpcTestCase): self.assertEqual(self.received_message, message) + def test_cast_interface_uses_default_options(self): + """Test kombu rpc.cast""" + + ctxt = context.RequestContext('fake_user', 'fake_project') + + class MyConnection(impl_kombu.Connection): + def __init__(myself, *args, **kwargs): + super(MyConnection, myself).__init__(*args, **kwargs) + self.assertEqual(myself.params, + {'hostname': FLAGS.rabbit_host, + 'userid': FLAGS.rabbit_userid, + 'password': FLAGS.rabbit_password, + 'port': FLAGS.rabbit_port, + 'virtual_host': FLAGS.rabbit_virtual_host, + 'transport': 'memory'}) + + def topic_send(_context, topic, msg): + pass + + MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection) + self.stubs.Set(impl_kombu, 'Connection', MyConnection) + + impl_kombu.cast(ctxt, 'fake_topic', {'msg': 'fake'}) + + def test_cast_to_server_uses_server_params(self): + """Test kombu rpc.cast""" + + ctxt = context.RequestContext('fake_user', 'fake_project') + + server_params = {'username': 'fake_username', + 'password': 'fake_password', + 'hostname': 'fake_hostname', + 'port': 31337, + 'virtual_host': 'fake_virtual_host'} + + class MyConnection(impl_kombu.Connection): + def __init__(myself, *args, **kwargs): + super(MyConnection, myself).__init__(*args, **kwargs) + self.assertEqual(myself.params, + {'hostname': server_params['hostname'], + 'userid': server_params['username'], + 'password': server_params['password'], + 'port': server_params['port'], + 'virtual_host': server_params['virtual_host'], + 'transport': 'memory'}) + + def topic_send(_context, topic, msg): + pass + + MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection) + self.stubs.Set(impl_kombu, 'Connection', MyConnection) + + impl_kombu.cast_to_server(ctxt, server_params, + 'fake_topic', {'msg': 'fake'}) + @test.skip_test("kombu memory transport seems buggy with fanout queues " "as this test passes when you use rabbit (fake_rabbit=False)") def test_fanout_send_receive(self): diff --git a/nova/tests/rpc/test_qpid.py b/nova/tests/rpc/test_qpid.py index 61d47fb5..fb8be4c7 100644 --- a/nova/tests/rpc/test_qpid.py +++ b/nova/tests/rpc/test_qpid.py @@ -24,6 +24,7 @@ import mox from nova import context from nova import log as logging +from nova.rpc import amqp as rpc_amqp from nova import test try: @@ -80,6 +81,10 @@ class RpcQpidTestCase(test.TestCase): qpid.messaging.Session = self.orig_session qpid.messaging.Sender = self.orig_sender qpid.messaging.Receiver = self.orig_receiver + if impl_qpid: + # Need to reset this in case we changed the connection_cls + # in self._setup_to_server_tests() + impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection self.mocker.ResetAll() @@ -147,13 +152,15 @@ class RpcQpidTestCase(test.TestCase): def test_create_consumer_fanout(self): self._test_create_consumer(fanout=True) - def _test_cast(self, fanout): + def _test_cast(self, fanout, server_params=None): + self.mock_connection = self.mocker.CreateMock(self.orig_connection) self.mock_session = self.mocker.CreateMock(self.orig_session) self.mock_sender = self.mocker.CreateMock(self.orig_sender) self.mock_connection.opened().AndReturn(False) self.mock_connection.open() + self.mock_connection.session().AndReturn(self.mock_session) if fanout: expected_address = ('impl_qpid_test_fanout ; ' @@ -166,22 +173,34 @@ class RpcQpidTestCase(test.TestCase): '"create": "always"}') self.mock_session.sender(expected_address).AndReturn(self.mock_sender) self.mock_sender.send(mox.IgnoreArg()) - # This is a pooled connection, so instead of closing it, it gets reset, - # which is just creating a new session on the connection. - self.mock_session.close() - self.mock_connection.session().AndReturn(self.mock_session) + if not server_params: + # This is a pooled connection, so instead of closing it, it + # gets reset, which is just creating a new session on the + # connection. + self.mock_session.close() + self.mock_connection.session().AndReturn(self.mock_session) self.mocker.ReplayAll() try: ctx = context.RequestContext("user", "project") - if fanout: - impl_qpid.fanout_cast(ctx, "impl_qpid_test", - {"method": "test_method", "args": {}}) + args = [ctx, "impl_qpid_test", + {"method": "test_method", "args": {}}] + + if server_params: + args.insert(1, server_params) + if fanout: + method = impl_qpid.fanout_cast_to_server + else: + method = impl_qpid.cast_to_server else: - impl_qpid.cast(ctx, "impl_qpid_test", - {"method": "test_method", "args": {}}) + if fanout: + method = impl_qpid.fanout_cast + else: + method = impl_qpid.cast + + method(*args) self.mocker.VerifyAll() finally: @@ -198,6 +217,39 @@ class RpcQpidTestCase(test.TestCase): def test_fanout_cast(self): self._test_cast(fanout=True) + def _setup_to_server_tests(self, server_params): + class MyConnection(impl_qpid.Connection): + def __init__(myself, *args, **kwargs): + super(MyConnection, myself).__init__(*args, **kwargs) + self.assertEqual(myself.connection.username, + server_params['username']) + self.assertEqual(myself.connection.password, + server_params['password']) + self.assertEqual(myself.broker, + server_params['hostname'] + ':' + + str(server_params['port'])) + + MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection) + self.stubs.Set(impl_qpid, 'Connection', MyConnection) + + @test.skip_if(qpid is None, "Test requires qpid") + def test_cast_to_server(self): + server_params = {'username': 'fake_username', + 'password': 'fake_password', + 'hostname': 'fake_hostname', + 'port': 31337} + self._setup_to_server_tests(server_params) + self._test_cast(fanout=False, server_params=server_params) + + @test.skip_if(qpid is None, "Test requires qpid") + def test_fanout_cast_to_server(self): + server_params = {'username': 'fake_username', + 'password': 'fake_password', + 'hostname': 'fake_hostname', + 'port': 31337} + self._setup_to_server_tests(server_params) + self._test_cast(fanout=True, server_params=server_params) + def _test_call(self, multi): self.mock_connection = self.mocker.CreateMock(self.orig_connection) self.mock_session = self.mocker.CreateMock(self.orig_session)