Document the public RPC API

I think I've covered everything that's public, which actually turns out
not to be a whole lot.

Still need to document the notifications API.
This commit is contained in:
Mark McLoughlin 2013-06-07 11:10:24 +01:00
parent e6a237d766
commit f67c090277
6 changed files with 307 additions and 0 deletions

View File

@ -47,6 +47,21 @@ class MessageHandlingServer(object):
super(MessageHandlingServer, self).__init__()
def start(self):
"""Start handling incoming messages.
This method causes the server to begin polling the transport for
incoming messages and passing them to the dispatcher. Message
processing will continue until the stop() method is called.
The executor controls how the server integrates with the applications
I/O handling strategy - it may choose to poll for messages in a new
process, thread or co-operatively scheduled coroutine or simply by
registering a callback with an event loop. Similarly, the executor may
choose to dispatch messages in a new thread, coroutine or simply the
current thread. An RPCServer subclass is available for each I/O
strategy supported by the library, so choose the subclass appropraite
for your program.
"""
if self._executor is not None:
return
listener = self.transport._listen(self.target)
@ -55,10 +70,22 @@ class MessageHandlingServer(object):
self._executor.start()
def stop(self):
"""Stop handling incoming messages.
Once this method returns, no new incoming messages will be handled by
the server. However, the server may still be in the process of handling
some messages.
"""
if self._executor is not None:
self._executor.stop()
def wait(self):
"""Wait for message processing to complete.
After calling stop(), there may still be some some existing messages
which have not been completely processed. The wait() method blocks
until all message processing has completed.
"""
if self._executor is not None:
self._executor.wait()
self._executor = None

View File

@ -19,7 +19,27 @@ from openstack.common.messaging.rpc import server
class EventletRPCServer(server._RPCServer):
"""An RPC server which integrates with eventlet.
This is an RPC server which polls for incoming messages from a greenthread
and dispatches each message in its own greenthread.
The stop() method kills the message polling greenthread and the wait()
method waits for all message dispatch greenthreads to complete.
"""
def __init__(self, transport, target, endpoints, serializer=None):
"""Construct a new eventlet RPC server.
:param transport: the messaging transport
:type transport: Transport
:param target: the exchange, topic and server to listen on
:type target: Target
:param endpoints: a list of endpoint objects
:type endpoints: list
:param serializer: an optional entity serializer
:type serializer: Serializer
"""
executor_cls = impl_eventlet.EventletExecutor
super(EventletRPCServer, self).__init__(transport,
target,

View File

@ -78,6 +78,7 @@ class _CallContext(object):
return msg
def cast(self, ctxt, method, **kwargs):
"""Invoke a method and return immediately. See RPCClient.cast()."""
msg = self._make_message(ctxt, method, kwargs)
self.transport._send(self.target, ctxt, msg)
@ -99,6 +100,7 @@ class _CallContext(object):
version_cap=self.version_cap)
def call(self, ctxt, method, **kwargs):
"""Invoke a method and wait for a reply. See RPCClient.call()."""
msg = self._make_message(ctxt, method, kwargs)
timeout = self.timeout
@ -117,9 +119,68 @@ class _CallContext(object):
class RPCClient(object):
"""A class for invoking methods on remote servers.
The RPCClient class is responsible for sending method invocations to remote
servers via a messaging transport.
A default target is supplied to the RPCClient constructor, but target
attributes can be overridden for individual method invocations using the
prepare() method.
A method invocation consists of a request context dictionary, a method name
and a dictionary of arguments. A cast() invocation just sends the request
and returns immediately. A call() invocation waits for the server to send
a return value.
This class is intended to be used by subclassing it and providing methods
on the subclass which will perform the remote invocation using call() or
cast()::
class TestClient(messaging.RPCClient):
def __init__(self, transport):
target = messaging.Target(topic='testtopic', version='2.0')
super(Client, self).__init__(transport, target)
def test(self, ctxt, arg):
return self.call(ctxt, 'test', arg=arg)
An example of using the prepare() method to override some attributes of the
default target::
def test(self, ctxt, arg):
cctxt = self.prepare(version='2.5')
return cctxt.call(ctxt, 'test', arg=arg)
RPCClient have a number of other properties - timeout, check_for_lock and
version_cap - which may make sense to override for some method invocations,
so they too can be passed to prepare()::
def test(self, ctxt, arg):
cctxt = self.prepare(check_for_lock=False, timeout=10)
return cctxt.call(ctxt, 'test', arg=arg)
"""
def __init__(self, transport, target,
timeout=None, check_for_lock=None,
version_cap=None, serializer=None):
"""Construct an RPC client.
:param transport: a messaging transport handle
:type transport: Transport
:param target: the default target for invocations
:type target: Target
:param timeout: an optional default timeout (in seconds) for call()s
:type timeout: int or float
:param check_for_lock: warn if a lockutils.synchronized lock is held
:type check_for_lock: bool
:param version_cap: raise a RpcVersionCapError version exceeds this cap
:type version_cap: str
:param serializer: an optional entity serializer
:type serializer: Serializer
"""
self.conf = transport.conf
self.conf.register_opts(_client_opts)
@ -137,6 +198,36 @@ class RPCClient(object):
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
timeout=_marker, check_for_lock=_marker, version_cap=_marker):
"""Prepare a method invocation context.
Use this method to override client properties for an individual method
invocation. For example::
def test(self, ctxt, arg):
cctxt = self.prepare(version='2.5')
return cctxt.call(ctxt, 'test', arg=arg)
:param exchange: see Target.exchange
:type exchange: str
:param topic: see Target.topic
:type topic: str
:param namespace: see Target.namespace
:type namespace: str
:param version: requirement the server must support, see Target.version
:type version: str
:param server: send to a specific server, see Target.server
:type server: str
:param fanout: send to all servers on topic, see Target.fanout
:type fanout: bool
:param timeout: an optional default timeout (in seconds) for call()s
:type timeout: int or float
:param check_for_lock: warn if a lockutils.synchronized lock is held
:type check_for_lock: bool
:param version_cap: raise a RpcVersionCapError version exceeds this cap
:type version_cap: str
:param serializer: an optional entity serializer
:type serializer: Serializer
"""
kwargs = dict(
exchange=exchange,
topic=topic,
@ -160,7 +251,32 @@ class RPCClient(object):
version_cap)
def cast(self, ctxt, method, **kwargs):
"""Invoke a method and return immediately.
Method arguments must either be primitive types or types supported by
the client's serializer (if any).
:param ctxt: a request context dict
:type ctxt: dict
:param method: the method name
:type method: str
:param kwargs: a dict of method arguments
:param kwargs: dict
"""
self.prepare().cast(ctxt, method, **kwargs)
def call(self, ctxt, method, **kwargs):
"""Invoke a method and wait for a reply.
Method arguments must either be primitive types or types supported by
the client's serializer (if any).
:param ctxt: a request context dict
:type ctxt: dict
:param method: the method name
:type method: str
:param kwargs: a dict of method arguments
:param kwargs: dict
:raises: MessagingTimeout
"""
return self.prepare().call(ctxt, method, **kwargs)

View File

@ -17,6 +17,82 @@ from openstack.common.messaging import _server
from openstack.common.messaging._executors import impl_blocking
from openstack.common.messaging.rpc import _dispatcher
"""
An RPC server exposes a number of endpoints, each of which contain a set of
methods which may be invoked remotely by clients over a given transport.
To create an RPC server, you supply a transport, target and a list of
endpoints.
A transport can be obtained simply by calling the get_transport() method:
transport = messaging.get_transport(conf)
which will load the appropriate transport driver according to the user's
messaging configuration configuration. See get_transport() for more details.
The target supplied when creating an RPC server expresses the topic, server
name and - optionally - the exchange to listen on. See Target for more details
on these attributes.
Each endpoint object may have a target attribute which may have namespace and
version fields set. By default, we use the 'null namespace' and version 1.0.
Incoming method calls will be dispatched to the first endpoint with the
requested method, a matching namespace and a compatible version number.
RPC servers have start(), stop() and wait() messages to begin handling
requests, stop handling requests and wait for all in-process requests to
complete.
An RPC server class is provided for each supported I/O handling framework.
Currently BlockingRPCServer and eventlet.RPCServer are available.
A simple example of an RPC server with multiple endpoints might be:
from oslo.config import cfg
from openstack.common import messaging
class ServerControlEndpoint(object):
target = messaging.Target(namespace='control',
version='2.0')
def __init__(self, server):
self.server = server
def stop(self, ctx):
self.server.stop()
class TestEndpoint(object):
def test(self, ctx, arg):
return arg
transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test', server='server1')
endpoints = [
ServerControlEndpoint(self),
TestEndpoint(),
]
server = messaging.BlockingRPCServer(transport, target, endpoints)
server.start()
server.wait()
Clients can invoke methods on the server by sending the request to a topic and
it gets sent to one of the servers listening on the topic, or by sending the
request to a specific server listening on the topic, or by sending the request
to all servers listening on the topic (known as fanout). These modes are chosen
via the server and fanout attributes on Target but the mode used is transparent
to the server.
The first parameter to method invocations is always the request context
supplied by the client.
Parameters to the method invocation are primitive types and so must be the
return values from the methods. By supplying a serializer object, a server can
deserialize arguments from - serialize return values to - primitive types.
"""
class _RPCServer(_server.MessageHandlingServer):
@ -30,7 +106,30 @@ class _RPCServer(_server.MessageHandlingServer):
class BlockingRPCServer(_RPCServer):
"""An RPC server which blocks and dispatches in the current thread.
The blocking RPC server is a very simple RPC server whose start() method
functions as a request processing loop - i.e. it blocks, processes messages
and only returns when stop() is called from a dispatched method.
Method calls are dispatched in the current thread, so only a single method
call can be executing at once.
This class is likely to only be useful for simple demo programs.
"""
def __init__(self, transport, target, endpoints, serializer=None):
"""Construct a new blocking RPC server.
:param transport: the messaging transport
:type transport: Transport
:param target: the exchange, topic and server to listen on
:type target: Target
:param endpoints: a list of endpoint objects
:type endpoints: list
:param serializer: an optional entity serializer
:type serializer: Serializer
"""
executor_cls = impl_blocking.BlockingExecutor
super(BlockingRPCServer, self).__init__(transport,
target,

View File

@ -21,6 +21,16 @@ class Target(object):
A Target encapsulates all the information to identify where a message
should be sent or what messages a server is listening for.
Different subsets of the information encapsulated in a Target object is
relevant to various aspects of the API::
creating a server:
topic and server is required; exchange is optional
an endpoint's target:
namespace and version is optional
client sending a message:
topic is required, all other attributes optional
Its attributes are:
:param exchange: A scope for topics. Leave unspecified to default to the

View File

@ -40,12 +40,26 @@ _transport_opts = [
def set_defaults(control_exchange):
"""Set defaults for messaging transport configuration options.
:param control_exchange: the default exchange under which topics are scoped
:type control_exchange: str
"""
cfg.set_defaults(_transport_opts,
control_exchange=control_exchange)
class Transport(object):
"""A messaging transport.
This is a mostly opaque handle for an underlying messaging transport
driver.
It has a single 'conf' property which is the cfg.ConfigOpts instance used
to construct the transport object.
"""
def __init__(self, driver):
self.conf = driver.conf
self._driver = driver
@ -62,6 +76,27 @@ class Transport(object):
def get_transport(conf, url=None):
"""A factory method for Transport objects.
This method will construct a Transport object from transport configuration
gleaned from the user's configuration and, optionally, a transport URL.
If a transport URL is supplied as a parameter, any transport configuration
contained in it takes precedence. If no transport URL is supplied, but
there is a transport URL supplied in the user's configuration then that
URL will take the place of the url parameter. In both cases, any
configuration not supplied in the transport URL may be taken from
individual configuration parameters in the user's configuration.
An example transport URL might be::
rabbit://me:passwd@host:5672/myexchange
:param conf: the user configuration
:type conf: cfg.ConfigOpts
:param url: a transport URL
:type url: str
"""
conf.register_opts(_transport_opts)
url = url or conf.transport_url