From 9bc9c0dc6a142540ffb45fb0a9f43cc4aa09e9ac Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Sun, 11 Sep 2016 21:07:02 -0400 Subject: [PATCH] Fixups to the inline documentation Rework the inline documentation for executors, RPC servers and clients, notifiers and notification listeners for clarity and flow. Change-Id: If4f1db853a7fc85340177fd2c9c43a479d72459d --- doc/source/executors.rst | 21 +++++- doc/source/server.rst | 6 +- oslo_messaging/notify/listener.py | 104 ++++++++++++++++----------- oslo_messaging/notify/messaging.py | 21 ++++++ oslo_messaging/notify/notifier.py | 27 +++---- oslo_messaging/rpc/client.py | 111 +++++++++++++++++++---------- oslo_messaging/rpc/server.py | 62 +++++++++------- oslo_messaging/target.py | 32 +++++---- 8 files changed, 251 insertions(+), 133 deletions(-) diff --git a/doc/source/executors.rst b/doc/source/executors.rst index 2fde5b812..e7d8a71c9 100644 --- a/doc/source/executors.rst +++ b/doc/source/executors.rst @@ -2,9 +2,24 @@ Executors ========= -Executors are providing the way an incoming message will be dispatched so that -the message can be used for meaningful work. Different types of executors are -supported, each with its own set of restrictions and capabilities. +Executors control how a received message is scheduled for processing +by a Server. This scheduling can be *synchronous* or *asynchronous*. + +A synchronous executor will process the message on the Server's +thread. This means the Server can process only one message at a time. +Other incoming messages will not be processed until the current +message is done processing. For example, in the case of an RPCServer +only one method call will be invoked at a time. A synchronous +executor guarantees that messages complete processing in the order +that they are received. + +An asynchronous executor will process received messages concurrently. +The Server thread will not be blocked by message processing and can +continue to service incoming messages. There are no ordering +guarantees - message processing may complete in a different order than +they were received. The executor may be configured to limit the +maximum number of messages that are processed at once. + Available Executors =================== diff --git a/doc/source/server.rst b/doc/source/server.rst index 0378d430e..f3b32543c 100644 --- a/doc/source/server.rst +++ b/doc/source/server.rst @@ -1,6 +1,6 @@ ------- -Server ------- +---------- +RPC Server +---------- .. automodule:: oslo_messaging.rpc.server diff --git a/oslo_messaging/notify/listener.py b/oslo_messaging/notify/listener.py index 386e79e62..968b6489e 100644 --- a/oslo_messaging/notify/listener.py +++ b/oslo_messaging/notify/listener.py @@ -13,8 +13,68 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -"""A notification listener exposes a number of endpoints, each of which -contain a set of methods. Each method corresponds to a notification priority. +"""A notification listener is used to process notification messages sent by a +notifier that uses the ``messaging`` driver. + +A notification listener subscribes to the topic - and optionally exchange - in +the supplied target. Notification messages sent by notifier clients to the +target's topic/exchange are received by the listener. + +If multiple listeners subscribe to the same target, the notification will be +received by only one of the listeners. The receiving listener is selected from +the group using a best-effort round-robin algorithm. + +This delivery pattern can be altered somewhat by specifying a pool name for the +listener. Listeners with the same pool name behave like a subgroup within the +group of listeners subscribed to the same topic/exchange. Each subgroup of +listeners will receive a copy of the notification to be consumed by one member +of the subgroup. Therefore, multiple copies of the notification will be +delivered - one to the group of listeners that have no pool name (if they +exist), and one to each subgroup of listeners that share the same pool name. + +Note that not all transport drivers have implemented support for listener +pools. Those drivers that do not support pools will raise a NotImplementedError +if a pool name is specified to get_notification_listener(). + +A notification listener exposes a number of endpoints, each of which contain a +set of methods. Each method's name corresponds to a notification's priority. +When a notification is received it is dispatched to the method named like the +notification's priority - e.g. ``info`` notifications are dispatched to the +info() method, etc. + +Optionally a notification endpoint can define a NotificationFilter. +Notification messages that do not match the filter's rules will *not* be passed +to the endpoint's methods. + +Parameters to endpoint methods are: the request context supplied by the client, +the publisher_id of the notification message, the event_type, the payload and +metadata. The metadata parameter is a mapping containing a unique message_id +and a timestamp. + +An endpoint method can explicitly return +oslo_messaging.NotificationResult.HANDLED to acknowledge a message or +oslo_messaging.NotificationResult.REQUEUE to requeue the message. Note that not +all transport drivers implement support for requeueing. In order to use this +feature, applications should assert that the feature is available by passing +allow_requeue=True to get_notification_listener(). If the driver does not +support requeueing, it will raise NotImplementedError at this point. + +The message is acknowledged only if all endpoints either return +oslo_messaging.NotificationResult.HANDLED or None. + +Each notification listener is associated with an executor which controls how +incoming notification messages will be received and dispatched. By default, the +most simple executor is used - the blocking executor. This executor processes +inbound notifications on the server's thread, blocking it from processing +additional notifications until it finishes with the current one. Refer to the +Executor documentation for descriptions of the other types of executors. + +*Note:* If the "eventlet" executor is used, the threading and time library need +to be monkeypatched. + +Notification listener have start(), stop() and wait() messages to begin +handling requests, stop handling requests, and wait for all in-process +requests to complete after the listener has been stopped. To create a notification listener, you supply a transport, list of targets and a list of endpoints. @@ -27,18 +87,6 @@ method:: which will load the appropriate transport driver according to the user's messaging configuration. See get_notification_transport() for more details. -The target supplied when creating a notification listener expresses the topic -and - optionally - the exchange to listen on. See Target for more details -on these attributes. - -Notification listener have start(), stop() and wait() messages to begin -handling requests, stop handling requests and wait for all in-process -requests to complete. - -Each notification listener is associated with an executor which integrates the -listener with a specific I/O handling framework. Currently, there are blocking -and eventlet executors available. - A simple example of a notification listener with multiple endpoints might be:: from oslo_config import cfg @@ -72,35 +120,9 @@ A simple example of a notification listener with multiple endpoints might be:: server.start() server.wait() -A notifier sends a notification on a topic with a priority, the notification -listener will receive this notification if the topic of this one have been set -in one of the targets and if an endpoint implements the method named like the -priority and if the notification match the NotificationFilter rule set into -the filter_rule attribute of the endpoint. - -Parameters to endpoint methods are the request context supplied by the client, -the publisher_id of the notification message, the event_type, the payload and -metadata. The metadata parameter is a mapping containing a unique message_id -and a timestamp. By supplying a serializer object, a listener can deserialize a request context -and arguments from - and serialize return values to - primitive types. - -By supplying a pool name you can create multiple groups of listeners consuming -notifications and that each group only receives one copy of each -notification. - -An endpoint method can explicitly return -oslo_messaging.NotificationResult.HANDLED to acknowledge a message or -oslo_messaging.NotificationResult.REQUEUE to requeue the message. - -The message is acknowledged only if all endpoints either return -oslo_messaging.NotificationResult.HANDLED or None. - -Note that not all transport drivers implement support for requeueing. In order -to use this feature, applications should assert that the feature is available -by passing allow_requeue=True to get_notification_listener(). If the driver -does not support requeueing, it will raise NotImplementedError at this point. +and arguments from primitive types. """ import itertools diff --git a/oslo_messaging/notify/messaging.py b/oslo_messaging/notify/messaging.py index e7642ca94..78f2e8825 100644 --- a/oslo_messaging/notify/messaging.py +++ b/oslo_messaging/notify/messaging.py @@ -15,6 +15,27 @@ # License for the specific language governing permissions and limitations # under the License. +""" +Notification drivers for sending notifications via messaging. + +The messaging drivers publish notification messages to notification +listeners. + +The driver will block the notifier's thread until the notification message has +been passed to the messaging transport. There is no guarantee that the +notification message will be consumed by a notification listener. + +Notification messages are sent 'at-most-once' - ensuring that they are not +duplicated. + +If the connection to the messaging service is not active when a notification is +sent this driver will block waiting for the connection to complete. If the +connection fails to complete, the driver will try to re-establish that +connection. By default this will continue indefinitely until the connection +completes. However, the retry parameter can be used to have the notification +send fail with a MessageDeliveryFailure after the given number of retries. +""" + import logging import oslo_messaging diff --git a/oslo_messaging/notify/notifier.py b/oslo_messaging/notify/notifier.py index 1840d559a..bc3c8a201 100644 --- a/oslo_messaging/notify/notifier.py +++ b/oslo_messaging/notify/notifier.py @@ -127,10 +127,11 @@ class Driver(object): :type msg: str :param priority: priority of the message :type priority: str - :param retry: an connection retries configuration - None or -1 means to retry forever - 0 means no retry - N means N retries + :param retry: connection retries configuration (used by the messaging + driver): + None or -1 means to retry forever. + 0 means no retry is attempted. + N means attempt at most N retries. :type retry: int """ pass @@ -235,10 +236,11 @@ class Notifier(object): :type topic: str :param serializer: an optional entity serializer :type serializer: Serializer - :param retry: an connection retries configuration - None or -1 means to retry forever - 0 means no retry - N means N retries + :param retry: connection retries configuration (used by the messaging + driver): + None or -1 means to retry forever. + 0 means no retry is attempted. + N means attempt at most N retries. :type retry: int :param topics: the topics which to send messages on :type topics: list of strings @@ -285,10 +287,11 @@ class Notifier(object): :param publisher_id: field in notifications sent, for example 'compute.host1' :type publisher_id: str - :param retry: an connection retries configuration - None or -1 means to retry forever - 0 means no retry - N means N retries + :param retry: connection retries configuration (used by the messaging + driver): + None or -1 means to retry forever. + 0 means no retry is attempted. + N means attempt at most N retries. :type retry: int """ return _SubNotifier._prepare(self, publisher_id, retry=retry) diff --git a/oslo_messaging/rpc/client.py b/oslo_messaging/rpc/client.py index 88e21a549..c04d3188d 100644 --- a/oslo_messaging/rpc/client.py +++ b/oslo_messaging/rpc/client.py @@ -222,19 +222,34 @@ class _CallContext(_BaseCallContext): class RPCClient(_BaseCallContext): - """A class for invoking methods on remote servers. + """A class for invoking methods on remote RPC servers. - The RPCClient class is responsible for sending method invocations to remote - servers via a messaging transport. + The RPCClient class is responsible for sending method invocations to and + receiving return values from remote RPC servers via a messaging transport. - A default target is supplied to the RPCClient constructor, but target - attributes can be overridden for individual method invocations using the - prepare() method. + Two RPC patterns are supported: RPC calls and RPC casts. + + An RPC cast is used when an RPC method does *not* return a value to + the caller. An RPC call is used when a return value is expected from the + method. For further information see the cast() and call() methods. + + The default target used for all subsequent calls and casts is supplied to + the RPCClient constructor. The client uses the target to control how the + RPC request is delivered to a server. If only the target's topic (and + optionally exchange) are set, then the RPC can be serviced by any server + that is listening to that topic (and exchange). If multiple servers are + listening on that topic/exchange, then one server is picked using a + best-effort round-robin algorithm. Alternatively, the client can set the + Target's ``server`` attribute to the name of a specific server to send the + RPC request to one particular server. In the case of RPC cast, the RPC + request can be broadcast to all servers listening to the Target's + topic/exchange by setting the Target's ``fanout`` property to ``True``. + + While the default target is set on construction, target attributes can be + overridden for individual method invocations using the prepare() method. A method invocation consists of a request context dictionary, a method name - and a dictionary of arguments. A cast() invocation just sends the request - and returns immediately. A call() invocation waits for the server to send - a return value. + and a dictionary of arguments. This class is intended to be used by wrapping it in another class which provides methods on the subclass to perform the remote invocation using @@ -275,10 +290,13 @@ class RPCClient(_BaseCallContext): but this is probably only useful in limited circumstances as a wrapper class will usually help to make the code much more obvious. - By default, cast() and call() will block until the message is successfully - sent. However, the retry parameter can be used to have message sending - fail with a MessageDeliveryFailure after the given number of retries. For - example:: + If the connection to the messaging service is not active when an RPC + request is made the client will block waiting for the connection to + complete. If the connection fails to complete, the client will try to + re-establish that connection. By default this will continue indefinitely + until the connection completes. However, the retry parameter can be used to + have the RPC request fail with a MessageDeliveryFailure after the given + number of retries. For example:: client = messaging.RPCClient(transport, target, retry=None) client.call(ctxt, 'sync') @@ -304,10 +322,10 @@ class RPCClient(_BaseCallContext): :type version_cap: str :param serializer: an optional entity serializer :type serializer: Serializer - :param retry: an optional default connection retries configuration - None or -1 means to retry forever - 0 means no retry - N means N retries + :param retry: an optional default connection retries configuration: + None or -1 means to retry forever. + 0 means no retry is attempted. + N means attempt at most N retries. :type retry: int """ if serializer is None: @@ -347,10 +365,10 @@ class RPCClient(_BaseCallContext): :type timeout: int or float :param version_cap: raise a RPCVersionCapError version exceeds this cap :type version_cap: str - :param retry: an optional connection retries configuration - None or -1 means to retry forever - 0 means no retry - N means N retries + :param retry: an optional connection retries configuration: + None or -1 means to retry forever. + 0 means no retry is attempted. + N means attempt at most N retries. :type retry: int """ return _CallContext._prepare(self, @@ -359,7 +377,22 @@ class RPCClient(_BaseCallContext): timeout, version_cap, retry) def cast(self, ctxt, method, **kwargs): - """Invoke a method and return immediately. + """Invoke a method without blocking for a return value. + + The cast() method is used to invoke an RPC method that does not return + a value. cast() RPC requests may be broadcast to all Servers listening + on a given topic by setting the fanout Target property to ``True``. + + The cast() operation is best-effort: cast() will block the + calling thread until the RPC request method is accepted by the + messaging transport, but cast() does *not* verify that the RPC method + has been invoked by the server. cast() does guarantee that the the + method will be not executed twice on a destination (e.g. 'at-most-once' + execution). + + There are no ordering guarantees across successive casts, even + among casts to the same destination. Therefore methods may be executed + in an order different from the order in which they are cast. Method arguments must either be primitive types or types supported by the client's serializer (if any). @@ -367,27 +400,37 @@ class RPCClient(_BaseCallContext): Similarly, the request context must be a dict unless the client's serializer supports serializing another type. - Note: cast does not ensure that the remote method will be executed on - each destination. But it does ensure that the method will be not - executed twice on a destination (e.g. 'at-most-once' execution). - - Note: there are no ordering guarantees across successive casts, even - among casts to the same destination. Therefore methods may be executed - in an order different from the order in which they are cast. - :param ctxt: a request context dict :type ctxt: dict :param method: the method name :type method: str :param kwargs: a dict of method arguments :type kwargs: dict - :raises: MessageDeliveryFailure + :raises: MessageDeliveryFailure if the messaging transport fails to + accept the request. + """ self.prepare().cast(ctxt, method, **kwargs) def call(self, ctxt, method, **kwargs): """Invoke a method and wait for a reply. + The call() method is used to invoke RPC methods that return a + value. Since only a single return value is permitted it is not possible + to call() to a fanout target. + + call() will block the calling thread until the messaging transport + provides the return value, a timeout occurs, or a non-recoverable error + occurs. + + call() guarantees that the RPC request is done 'at-most-once' which + ensures that the call will never be duplicated. However if the call + should fail or time out before the return value arrives then there are + no guarantees whether or not the method was invoked. + + Since call() blocks until completion of the RPC method, call()s from + the same thread are guaranteed to be processed in-order. + Method arguments must either be primitive types or types supported by the client's serializer (if any). Similarly, the request context must be a dict unless the client's serializer supports serializing another @@ -411,12 +454,6 @@ class RPCClient(_BaseCallContext): allowed_remote_exmods list, then a messaging.RemoteError exception is raised with all details of the remote exception. - Note: call is done 'at-most-once'. In case of we can't known - if the call have been done correctly, because we didn't get the - response on time, MessagingTimeout exception is raised. - The real reason can vary, transport failure, worker - doesn't answer in time or crash, ... - :param ctxt: a request context dict :type ctxt: dict :param method: the method name diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py index 51f3540fb..dddadb950 100644 --- a/oslo_messaging/rpc/server.py +++ b/oslo_messaging/rpc/server.py @@ -31,14 +31,49 @@ The target supplied when creating an RPC server expresses the topic, server name and - optionally - the exchange to listen on. See Target for more details on these attributes. +Multiple RPC Servers may listen to the same topic (and exchange) +simultaineously. See RPCClient for details regarding how RPC requests are +distributed to the Servers in this case. + Each endpoint object may have a target attribute which may have namespace and version fields set. By default, we use the 'null namespace' and version 1.0. Incoming method calls will be dispatched to the first endpoint with the requested method, a matching namespace and a compatible version number. -RPC servers have start(), stop() and wait() messages to begin handling -requests, stop handling requests and wait for all in-process requests to -complete. +The first parameter to method invocations is always the request context +supplied by the client. The remaining parameters are the arguments supplied to +the method by the client. Endpoint methods may return a value. If so the RPC +Server will send the returned value back to the requesting client via the +transport. + +The executor parameter controls how incoming messages will be received and +dispatched. By default, the most simple executor is used - the blocking +executor. This executor processes inbound RPC requests on the server's thread, +blocking it from processing additional requests until it finishes with the +current request. This includes time spent sending the reply message to the +transport if the method returns a result. Refer to the Executor documentation +for descriptions of the other types of executors. + +*Note:* If the "eventlet" executor is used, the threading and time library need +to be monkeypatched. + +The RPC reply operation is best-effort: the server will consider the message +containing the reply successfully sent once it is accepted by the messaging +transport. The server does not guarantee that the reply is processed by the +RPC client. If the send fails an error will be logged and the server will +continue to processing incoming RPC requests. + +Parameters to the method invocation and values returned from the method are +python primitive types. However the actual encoding of the data in the message +may not be in primitive form (e.g. the message payload may be a dictionary +encoded as an ASCII string using JSON). A serializer object is used to convert +incoming encoded message data to primitive types. The serializer is also used +to convert the return value from primitive types to an encoding suitable for +the message payload. + +RPC servers have start(), stop() and wait() methods to begin handling +requests, stop handling requests, and wait for all in-process requests to +complete after the Server has been stopped. A simple example of an RPC server with multiple endpoints might be:: @@ -81,20 +116,6 @@ A simple example of an RPC server with multiple endpoints might be:: server.stop() server.wait() -Clients can invoke methods on the server by sending the request to a topic and -it gets sent to one of the servers listening on the topic, or by sending the -request to a specific server listening on the topic, or by sending the request -to all servers listening on the topic (known as fanout). These modes are chosen -via the server and fanout attributes on Target but the mode used is transparent -to the server. - -The first parameter to method invocations is always the request context -supplied by the client. - -Parameters to the method invocation are primitive types and so must be the -return values from the methods. By supplying a serializer object, a server can -deserialize a request context and arguments from - and serialize return values -to - primitive types. """ __all__ = [ @@ -160,13 +181,6 @@ def get_rpc_server(transport, target, endpoints, executor='blocking', serializer=None, access_policy=None): """Construct an RPC server. - The executor parameter controls how incoming messages will be received and - dispatched. By default, the most simple executor is used - the blocking - executor. - - If the eventlet executor is used, the threading and time library need to be - monkeypatched. - :param transport: the messaging transport :type transport: Transport :param target: the exchange, topic and server to listen on diff --git a/oslo_messaging/target.py b/oslo_messaging/target.py index e91cc878a..fcf7e50ff 100644 --- a/oslo_messaging/target.py +++ b/oslo_messaging/target.py @@ -24,12 +24,16 @@ class Target(object): Different subsets of the information encapsulated in a Target object is relevant to various aspects of the API: - creating a server: + an RPC Server's target: topic and server is required; exchange is optional - an endpoint's target: + an RPC endpoint's target: namespace and version is optional - client sending a message: + an RPC client sending a message: topic is required, all other attributes optional + a Notification Server's target: + topic is required, exchange is optional; all other attributes ignored + a Notifier's target: + topic is required, exchange is optional; all other attributes ignored Its attributes are: @@ -38,24 +42,26 @@ class Target(object): :type exchange: str :param topic: A name which identifies the set of interfaces exposed by a server. Multiple servers may listen on a topic and messages will be - dispatched to one of the servers in a round-robin fashion. + dispatched to one of the servers selected in a best-effort round-robin + fashion (unless fanout is ``True``). :type topic: str - :param namespace: Identifies a particular interface (i.e. set of methods) - exposed by a server. The default interface has no namespace identifier - and is referred to as the null namespace. + :param namespace: Identifies a particular RPC interface (i.e. set of + methods) exposed by a server. The default interface has no namespace + identifier and is referred to as the null namespace. :type namespace: str - :param version: Interfaces have a major.minor version number associated + :param version: RPC interfaces have a major.minor version number associated with them. A minor number increment indicates a backwards compatible change and an incompatible change is indicated by a major number bump. Servers may implement multiple major versions and clients may require indicate that their message requires a particular minimum minor version. :type version: str - :param server: Clients can request that a message be directed to a specific - server, rather than just one of a pool of servers listening on the topic. + :param server: RPC Clients can request that a message be directed to a + specific server, rather than just one of a pool of servers listening on + the topic. :type server: str - :param fanout: Clients may request that a message be directed to all - servers listening on a topic by setting fanout to ``True``, rather than - just one of them. + :param fanout: Clients may request that a copy of the message be delivered + to all servers listening on a topic by setting fanout to ``True``, rather + than just one of them. :type fanout: bool :param legacy_namespaces: A server always accepts messages specified via the 'namespace' parameter, and may also accept messages defined via