diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py index c0cfdd5ce..eeb2791ba 100644 --- a/nova/rpc/__init__.py +++ b/nova/rpc/__init__.py @@ -3,6 +3,7 @@ # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. +# Copyright 2011 Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -26,32 +27,107 @@ flags.DEFINE_string('rpc_backend', 'nova.rpc.impl_kombu', "The messaging module to use, defaults to kombu.") + +def create_connection(new=True): + """Create a connection to the message bus used for rpc. + + For some example usage of creating a connection and some consumers on that + connection, see nova.service. + + :param new: Whether or not to create a new connection. A new connection + will be created by default. If new is False, the + implementation is free to return an existing connection from a + pool. + + :returns: An instance of nova.rpc.common.Connection + """ + return _get_impl().create_connection(new=new) + + +def call(context, topic, msg): + """Invoke a remote method that returns something. + + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the rpc message to. This correlates to the + topic argument of + nova.rpc.common.Connection.create_consumer() and only applies + when the consumer was created with fanout=False. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: A dict from the remote method. + """ + return _get_impl().call(context, topic, msg) + + +def cast(context, topic, msg): + """Invoke a remote method that does not return anything. + + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the rpc message to. This correlates to the + topic argument of + nova.rpc.common.Connection.create_consumer() and only applies + when the consumer was created with fanout=False. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: None + """ + return _get_impl().cast(context, topic, msg) + + +def fanout_cast(context, topic, msg): + """Broadcast a remote method invocation with no return. + + This method will get invoked on all consumers that were set up with this + topic name and fanout=True. + + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the rpc message to. This correlates to the + topic argument of + nova.rpc.common.Connection.create_consumer() and only applies + when the consumer was created with fanout=True. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: None + """ + return _get_impl().fanout_cast(context, topic, msg) + + +def multicall(context, topic, msg): + """Invoke a remote method and get back an iterator. + + In this case, the remote method will be returning multiple values in + separate messages, so the return values can be processed as the come in via + an iterator. + + :param context: Information that identifies the user that has made this + request. + :param topic: The topic to send the rpc message to. This correlates to the + topic argument of + nova.rpc.common.Connection.create_consumer() and only applies + when the consumer was created with fanout=False. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: An iterator. The iterator will yield a tuple (N, X) where N is + an index that starts at 0 and increases by one for each value + returned and X is the Nth value that was returned by the remote + method. + """ + return _get_impl().multicall(context, topic, msg) + + _RPCIMPL = None -def get_impl(): +def _get_impl(): """Delay import of rpc_backend until FLAGS are loaded.""" global _RPCIMPL if _RPCIMPL is None: _RPCIMPL = import_object(FLAGS.rpc_backend) return _RPCIMPL - - -def create_connection(new=True): - return get_impl().create_connection(new=new) - - -def call(context, topic, msg): - return get_impl().call(context, topic, msg) - - -def cast(context, topic, msg): - return get_impl().cast(context, topic, msg) - - -def fanout_cast(context, topic, msg): - return get_impl().fanout_cast(context, topic, msg) - - -def multicall(context, topic, msg): - return get_impl().multicall(context, topic, msg) diff --git a/nova/rpc/common.py b/nova/rpc/common.py index a7597d29b..43c4a1fae 100644 --- a/nova/rpc/common.py +++ b/nova/rpc/common.py @@ -1,3 +1,23 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2011 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + from nova import exception from nova import flags from nova import log as logging @@ -26,3 +46,57 @@ class RemoteError(exception.NovaException): self.value = value self.traceback = traceback super(RemoteError, self).__init__(**self.__dict__) + + +class Connection(object): + """A connection, returned by rpc.create_connection(). + + This class represents a connection to the message bus used for rpc. + An instance of this class should never be created by users of the rpc API. + Use rpc.create_connection() instead. + """ + def close(self): + """Close the connection. + + This method must be called when the connection will no longer be used. + It will ensure that any resources associated with the connection, such + as a network connection, and cleaned up. + """ + raise NotImplementedError() + + def create_consumer(self, topic, proxy, fanout=False): + """Create a consumer on this connection. + + A consumer is associated with a message queue on the backend message + bus. The consumer will read messages from the queue, unpack them, and + dispatch them to the proxy object. The contents of the message pulled + off of the queue will determine which method gets called on the proxy + object. + + :param topic: This is a name associated with what to consume from. + Multiple instances of a service may consume from the same + topic. For example, all instances of nova-compute consume + from a queue called "compute". In that case, the + messages will get distributed amongst the consumers in a + round-robin fashion if fanout=False. If fanout=True, + every consumer associated with this topic will get a + copy of every message. + :param proxy: The object that will handle all incoming messages. + :param fanout: Whether or not this is a fanout topic. See the + documentation for the topic parameter for some + additional comments on this. + """ + raise NotImplementedError() + + def consume_in_thread(self): + """Spawn a thread to handle incoming messages. + + Spawn a thread that will be responsible for handling all incoming + messages for consumers that were set up on this connection. + + Message dispatching inside of this is expected to be implemented in a + non-blocking manner. An example implementation would be having this + thread pull messages in for all of the consumers, but utilize a thread + pool for dispatching the messages to the proxy objects. + """ + raise NotImplementedError() diff --git a/nova/rpc/impl_carrot.py b/nova/rpc/impl_carrot.py index eed8cb10d..ce119d655 100644 --- a/nova/rpc/impl_carrot.py +++ b/nova/rpc/impl_carrot.py @@ -42,6 +42,7 @@ import greenlet from nova import context from nova import exception from nova import flags +from nova.rpc import common as rpc_common from nova.rpc.common import RemoteError, LOG from nova.testing import fake @@ -51,7 +52,7 @@ eventlet.monkey_patch() FLAGS = flags.FLAGS -class Connection(carrot_connection.BrokerConnection): +class Connection(carrot_connection.BrokerConnection, rpc_common.Connection): """Connection instance object.""" def __init__(self, *args, **kwargs): @@ -105,7 +106,7 @@ class Connection(carrot_connection.BrokerConnection): # ignore all errors pass self._rpc_consumers = [] - super(Connection, self).close() + carrot_connection.BrokerConnection.close(self) def consume_in_thread(self): """Consumer from all queues/consumers in a greenthread""" diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index 757e7636a..b16bc3c79 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -33,6 +33,7 @@ import greenlet from nova import context from nova import exception from nova import flags +from nova.rpc import common as rpc_common from nova.rpc.common import RemoteError, LOG # Needed for tests @@ -512,7 +513,7 @@ ConnectionPool = Pool( order_as_stack=True) -class ConnectionContext(object): +class ConnectionContext(rpc_common.Connection): """The class that is actually returned to the caller of create_connection(). This is a essentially a wrapper around Connection that supports 'with' and can return a new Connection or @@ -569,6 +570,12 @@ class ConnectionContext(object): """Caller is done with this connection.""" self._done() + def create_consumer(self, topic, proxy, fanout=False): + self.connection.create_consumer(topic, proxy, fanout) + + def consume_in_thread(self): + self.connection.consume_in_thread() + def __getattr__(self, key): """Proxy all other calls to the Connection instance""" if self.connection: