Merge "Help clarify rpc API with docs and a bit of code."
This commit is contained in:
@@ -3,6 +3,7 @@
|
|||||||
# Copyright 2010 United States Government as represented by the
|
# Copyright 2010 United States Government as represented by the
|
||||||
# Administrator of the National Aeronautics and Space Administration.
|
# Administrator of the National Aeronautics and Space Administration.
|
||||||
# All Rights Reserved.
|
# All Rights Reserved.
|
||||||
|
# Copyright 2011 Red Hat, Inc.
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
# not use this file except in compliance with the License. You may obtain
|
# not use this file except in compliance with the License. You may obtain
|
||||||
@@ -26,32 +27,107 @@ flags.DEFINE_string('rpc_backend',
|
|||||||
'nova.rpc.impl_kombu',
|
'nova.rpc.impl_kombu',
|
||||||
"The messaging module to use, defaults to kombu.")
|
"The messaging module to use, defaults to kombu.")
|
||||||
|
|
||||||
|
|
||||||
|
def create_connection(new=True):
|
||||||
|
"""Create a connection to the message bus used for rpc.
|
||||||
|
|
||||||
|
For some example usage of creating a connection and some consumers on that
|
||||||
|
connection, see nova.service.
|
||||||
|
|
||||||
|
:param new: Whether or not to create a new connection. A new connection
|
||||||
|
will be created by default. If new is False, the
|
||||||
|
implementation is free to return an existing connection from a
|
||||||
|
pool.
|
||||||
|
|
||||||
|
:returns: An instance of nova.rpc.common.Connection
|
||||||
|
"""
|
||||||
|
return _get_impl().create_connection(new=new)
|
||||||
|
|
||||||
|
|
||||||
|
def call(context, topic, msg):
|
||||||
|
"""Invoke a remote method that returns something.
|
||||||
|
|
||||||
|
:param context: Information that identifies the user that has made this
|
||||||
|
request.
|
||||||
|
:param topic: The topic to send the rpc message to. This correlates to the
|
||||||
|
topic argument of
|
||||||
|
nova.rpc.common.Connection.create_consumer() and only applies
|
||||||
|
when the consumer was created with fanout=False.
|
||||||
|
:param msg: This is a dict in the form { "method" : "method_to_invoke",
|
||||||
|
"args" : dict_of_kwargs }
|
||||||
|
|
||||||
|
:returns: A dict from the remote method.
|
||||||
|
"""
|
||||||
|
return _get_impl().call(context, topic, msg)
|
||||||
|
|
||||||
|
|
||||||
|
def cast(context, topic, msg):
|
||||||
|
"""Invoke a remote method that does not return anything.
|
||||||
|
|
||||||
|
:param context: Information that identifies the user that has made this
|
||||||
|
request.
|
||||||
|
:param topic: The topic to send the rpc message to. This correlates to the
|
||||||
|
topic argument of
|
||||||
|
nova.rpc.common.Connection.create_consumer() and only applies
|
||||||
|
when the consumer was created with fanout=False.
|
||||||
|
:param msg: This is a dict in the form { "method" : "method_to_invoke",
|
||||||
|
"args" : dict_of_kwargs }
|
||||||
|
|
||||||
|
:returns: None
|
||||||
|
"""
|
||||||
|
return _get_impl().cast(context, topic, msg)
|
||||||
|
|
||||||
|
|
||||||
|
def fanout_cast(context, topic, msg):
|
||||||
|
"""Broadcast a remote method invocation with no return.
|
||||||
|
|
||||||
|
This method will get invoked on all consumers that were set up with this
|
||||||
|
topic name and fanout=True.
|
||||||
|
|
||||||
|
:param context: Information that identifies the user that has made this
|
||||||
|
request.
|
||||||
|
:param topic: The topic to send the rpc message to. This correlates to the
|
||||||
|
topic argument of
|
||||||
|
nova.rpc.common.Connection.create_consumer() and only applies
|
||||||
|
when the consumer was created with fanout=True.
|
||||||
|
:param msg: This is a dict in the form { "method" : "method_to_invoke",
|
||||||
|
"args" : dict_of_kwargs }
|
||||||
|
|
||||||
|
:returns: None
|
||||||
|
"""
|
||||||
|
return _get_impl().fanout_cast(context, topic, msg)
|
||||||
|
|
||||||
|
|
||||||
|
def multicall(context, topic, msg):
|
||||||
|
"""Invoke a remote method and get back an iterator.
|
||||||
|
|
||||||
|
In this case, the remote method will be returning multiple values in
|
||||||
|
separate messages, so the return values can be processed as the come in via
|
||||||
|
an iterator.
|
||||||
|
|
||||||
|
:param context: Information that identifies the user that has made this
|
||||||
|
request.
|
||||||
|
:param topic: The topic to send the rpc message to. This correlates to the
|
||||||
|
topic argument of
|
||||||
|
nova.rpc.common.Connection.create_consumer() and only applies
|
||||||
|
when the consumer was created with fanout=False.
|
||||||
|
:param msg: This is a dict in the form { "method" : "method_to_invoke",
|
||||||
|
"args" : dict_of_kwargs }
|
||||||
|
|
||||||
|
:returns: An iterator. The iterator will yield a tuple (N, X) where N is
|
||||||
|
an index that starts at 0 and increases by one for each value
|
||||||
|
returned and X is the Nth value that was returned by the remote
|
||||||
|
method.
|
||||||
|
"""
|
||||||
|
return _get_impl().multicall(context, topic, msg)
|
||||||
|
|
||||||
|
|
||||||
_RPCIMPL = None
|
_RPCIMPL = None
|
||||||
|
|
||||||
|
|
||||||
def get_impl():
|
def _get_impl():
|
||||||
"""Delay import of rpc_backend until FLAGS are loaded."""
|
"""Delay import of rpc_backend until FLAGS are loaded."""
|
||||||
global _RPCIMPL
|
global _RPCIMPL
|
||||||
if _RPCIMPL is None:
|
if _RPCIMPL is None:
|
||||||
_RPCIMPL = import_object(FLAGS.rpc_backend)
|
_RPCIMPL = import_object(FLAGS.rpc_backend)
|
||||||
return _RPCIMPL
|
return _RPCIMPL
|
||||||
|
|
||||||
|
|
||||||
def create_connection(new=True):
|
|
||||||
return get_impl().create_connection(new=new)
|
|
||||||
|
|
||||||
|
|
||||||
def call(context, topic, msg):
|
|
||||||
return get_impl().call(context, topic, msg)
|
|
||||||
|
|
||||||
|
|
||||||
def cast(context, topic, msg):
|
|
||||||
return get_impl().cast(context, topic, msg)
|
|
||||||
|
|
||||||
|
|
||||||
def fanout_cast(context, topic, msg):
|
|
||||||
return get_impl().fanout_cast(context, topic, msg)
|
|
||||||
|
|
||||||
|
|
||||||
def multicall(context, topic, msg):
|
|
||||||
return get_impl().multicall(context, topic, msg)
|
|
||||||
|
|||||||
@@ -1,3 +1,23 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright 2010 United States Government as represented by the
|
||||||
|
# Administrator of the National Aeronautics and Space Administration.
|
||||||
|
# All Rights Reserved.
|
||||||
|
# Copyright 2011 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
|
||||||
from nova import exception
|
from nova import exception
|
||||||
from nova import flags
|
from nova import flags
|
||||||
from nova import log as logging
|
from nova import log as logging
|
||||||
@@ -26,3 +46,57 @@ class RemoteError(exception.NovaException):
|
|||||||
self.value = value
|
self.value = value
|
||||||
self.traceback = traceback
|
self.traceback = traceback
|
||||||
super(RemoteError, self).__init__(**self.__dict__)
|
super(RemoteError, self).__init__(**self.__dict__)
|
||||||
|
|
||||||
|
|
||||||
|
class Connection(object):
|
||||||
|
"""A connection, returned by rpc.create_connection().
|
||||||
|
|
||||||
|
This class represents a connection to the message bus used for rpc.
|
||||||
|
An instance of this class should never be created by users of the rpc API.
|
||||||
|
Use rpc.create_connection() instead.
|
||||||
|
"""
|
||||||
|
def close(self):
|
||||||
|
"""Close the connection.
|
||||||
|
|
||||||
|
This method must be called when the connection will no longer be used.
|
||||||
|
It will ensure that any resources associated with the connection, such
|
||||||
|
as a network connection, and cleaned up.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def create_consumer(self, topic, proxy, fanout=False):
|
||||||
|
"""Create a consumer on this connection.
|
||||||
|
|
||||||
|
A consumer is associated with a message queue on the backend message
|
||||||
|
bus. The consumer will read messages from the queue, unpack them, and
|
||||||
|
dispatch them to the proxy object. The contents of the message pulled
|
||||||
|
off of the queue will determine which method gets called on the proxy
|
||||||
|
object.
|
||||||
|
|
||||||
|
:param topic: This is a name associated with what to consume from.
|
||||||
|
Multiple instances of a service may consume from the same
|
||||||
|
topic. For example, all instances of nova-compute consume
|
||||||
|
from a queue called "compute". In that case, the
|
||||||
|
messages will get distributed amongst the consumers in a
|
||||||
|
round-robin fashion if fanout=False. If fanout=True,
|
||||||
|
every consumer associated with this topic will get a
|
||||||
|
copy of every message.
|
||||||
|
:param proxy: The object that will handle all incoming messages.
|
||||||
|
:param fanout: Whether or not this is a fanout topic. See the
|
||||||
|
documentation for the topic parameter for some
|
||||||
|
additional comments on this.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def consume_in_thread(self):
|
||||||
|
"""Spawn a thread to handle incoming messages.
|
||||||
|
|
||||||
|
Spawn a thread that will be responsible for handling all incoming
|
||||||
|
messages for consumers that were set up on this connection.
|
||||||
|
|
||||||
|
Message dispatching inside of this is expected to be implemented in a
|
||||||
|
non-blocking manner. An example implementation would be having this
|
||||||
|
thread pull messages in for all of the consumers, but utilize a thread
|
||||||
|
pool for dispatching the messages to the proxy objects.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError()
|
||||||
|
|||||||
@@ -42,6 +42,7 @@ import greenlet
|
|||||||
from nova import context
|
from nova import context
|
||||||
from nova import exception
|
from nova import exception
|
||||||
from nova import flags
|
from nova import flags
|
||||||
|
from nova.rpc import common as rpc_common
|
||||||
from nova.rpc.common import RemoteError, LOG
|
from nova.rpc.common import RemoteError, LOG
|
||||||
from nova.testing import fake
|
from nova.testing import fake
|
||||||
|
|
||||||
@@ -51,7 +52,7 @@ eventlet.monkey_patch()
|
|||||||
FLAGS = flags.FLAGS
|
FLAGS = flags.FLAGS
|
||||||
|
|
||||||
|
|
||||||
class Connection(carrot_connection.BrokerConnection):
|
class Connection(carrot_connection.BrokerConnection, rpc_common.Connection):
|
||||||
"""Connection instance object."""
|
"""Connection instance object."""
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
@@ -105,7 +106,7 @@ class Connection(carrot_connection.BrokerConnection):
|
|||||||
# ignore all errors
|
# ignore all errors
|
||||||
pass
|
pass
|
||||||
self._rpc_consumers = []
|
self._rpc_consumers = []
|
||||||
super(Connection, self).close()
|
carrot_connection.BrokerConnection.close(self)
|
||||||
|
|
||||||
def consume_in_thread(self):
|
def consume_in_thread(self):
|
||||||
"""Consumer from all queues/consumers in a greenthread"""
|
"""Consumer from all queues/consumers in a greenthread"""
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ import greenlet
|
|||||||
from nova import context
|
from nova import context
|
||||||
from nova import exception
|
from nova import exception
|
||||||
from nova import flags
|
from nova import flags
|
||||||
|
from nova.rpc import common as rpc_common
|
||||||
from nova.rpc.common import RemoteError, LOG
|
from nova.rpc.common import RemoteError, LOG
|
||||||
|
|
||||||
# Needed for tests
|
# Needed for tests
|
||||||
@@ -512,7 +513,7 @@ ConnectionPool = Pool(
|
|||||||
order_as_stack=True)
|
order_as_stack=True)
|
||||||
|
|
||||||
|
|
||||||
class ConnectionContext(object):
|
class ConnectionContext(rpc_common.Connection):
|
||||||
"""The class that is actually returned to the caller of
|
"""The class that is actually returned to the caller of
|
||||||
create_connection(). This is a essentially a wrapper around
|
create_connection(). This is a essentially a wrapper around
|
||||||
Connection that supports 'with' and can return a new Connection or
|
Connection that supports 'with' and can return a new Connection or
|
||||||
@@ -569,6 +570,12 @@ class ConnectionContext(object):
|
|||||||
"""Caller is done with this connection."""
|
"""Caller is done with this connection."""
|
||||||
self._done()
|
self._done()
|
||||||
|
|
||||||
|
def create_consumer(self, topic, proxy, fanout=False):
|
||||||
|
self.connection.create_consumer(topic, proxy, fanout)
|
||||||
|
|
||||||
|
def consume_in_thread(self):
|
||||||
|
self.connection.consume_in_thread()
|
||||||
|
|
||||||
def __getattr__(self, key):
|
def __getattr__(self, key):
|
||||||
"""Proxy all other calls to the Connection instance"""
|
"""Proxy all other calls to the Connection instance"""
|
||||||
if self.connection:
|
if self.connection:
|
||||||
|
|||||||
Reference in New Issue
Block a user