From da4ee6361baeaf34ce3bfe24b53a2fc371ae9e75 Mon Sep 17 00:00:00 2001 From: Oleksii Zamiatin Date: Wed, 5 Aug 2015 16:49:23 +0300 Subject: [PATCH] Documenting main driver classes Main classes destination and methods parameters. Change-Id: I23d906855d616830dfc12c5e9e32881a4600b6ff --- oslo_messaging/_drivers/impl_zmq.py | 73 ++++++++++++++++++- .../client/publishers/zmq_publisher_base.py | 44 ++++++++++- .../_drivers/zmq_driver/client/zmq_request.py | 23 ++++++ .../_drivers/zmq_driver/zmq_poller.py | 63 ++++++++++++++-- 4 files changed, 194 insertions(+), 9 deletions(-) diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 7a408600..f0ff1a64 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -83,14 +83,35 @@ zmq_opts = [ class ZmqDriver(base.BaseDriver): - """ZeroMQ Driver + + """ZeroMQ Driver implementation. + + Provides implementation of RPC and Notifier APIs by means + of ZeroMQ library. See :doc:`zmq_driver` for details. - """ def __init__(self, conf, url, default_exchange=None, allowed_remote_exmods=None): + """Construct ZeroMQ driver. + + Intialize driver options. + + Construct matchmaker - pluggable interface to targets management + Name Service + + Construct client and server controllers + + :param conf: oslo messaging configuration object + :type conf: oslo_config.CONF + :param url: transport URL + :type url: TransportUrl + :param default_exchange: Not used in zmq implementation + :type default_exchange: None + :param allowed_remote_exmods: remote exception passing options + :type allowed_remote_exmods: list + """ conf.register_opts(zmq_opts) conf.register_opts(executor_base._pool_opts) self.conf = conf @@ -108,6 +129,24 @@ class ZmqDriver(base.BaseDriver): def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, retry=None): + """Send RPC message to server + + :param target: Message destination target + :type target: oslo_messaging.Target + :param ctxt: Message context + :type ctxt: dict + :param message: Message payload to pass + :type message: dict + :param wait_for_reply: Waiting for reply flag + :type wait_for_reply: bool + :param timeout: Reply waiting timeout in seconds + :type timeout: int + :param retry: an optional default connection retries configuration + None or -1 means to retry forever + 0 means no retry + N means N retries + :type retry: int + """ timeout = timeout or self.conf.rpc_response_timeout if wait_for_reply: return self.client.send_call(target, ctxt, message, timeout, retry) @@ -117,6 +156,22 @@ class ZmqDriver(base.BaseDriver): self.client.send_cast(target, ctxt, message, timeout, retry) def send_notification(self, target, ctxt, message, version, retry=None): + """Send notification to server + + :param target: Message destination target + :type target: oslo_messaging.Target + :param ctxt: Message context + :type ctxt: dict + :param message: Message payload to pass + :type message: dict + :param version: Messaging API version + :type version: str + :param retry: an optional default connection retries configuration + None or -1 means to retry forever + 0 means no retry + N means N retries + :type retry: int + """ if target.fanout: self.client.send_notify_fanout(target, ctxt, message, version, retry) @@ -124,13 +179,27 @@ class ZmqDriver(base.BaseDriver): self.client.send_notify(target, ctxt, message, version, retry) def listen(self, target): + """Listen to a specified target on a server side + + :param target: Message destination target + :type target: oslo_messaging.Target + """ self.server.listen(target) return self.server def listen_for_notifications(self, targets_and_priorities, pool): + """Listen to a specified list of targets on a server side + + :param targets_and_priorities: List of pairs (target, priority) + :type targets_and_priorities: list + :param pool: Not used for zmq implementation + :type pool: object + """ self.server.listen_notification(targets_and_priorities) return self.server def cleanup(self): + """Cleanup all driver's connections finally + """ self.client.cleanup() self.server.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index 0f32f588..a367e9ed 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -26,7 +26,16 @@ zmq = zmq_async.import_zmq() class UnsupportedSendPattern(rpc_common.RPCException): + """Exception to raise from publishers in case of unsupported + sending pattern called. + """ + def __init__(self, pattern_name): + """Construct exception object + + :param pattern_name: Message type name from zmq_names + :type pattern_name: str + """ errmsg = _LE("Sending pattern %s is unsupported.") % pattern_name super(UnsupportedSendPattern, self).__init__(errmsg) @@ -34,7 +43,27 @@ class UnsupportedSendPattern(rpc_common.RPCException): @six.add_metaclass(abc.ABCMeta) class PublisherBase(object): + """Abstract publisher class + + Each publisher from zmq-driver client should implement + this interface to serve as a messages publisher. + + Publisher can send request objects from zmq_request. + """ + def __init__(self, conf, matchmaker): + + """Construct publisher + + Accept configuration object and Name Service interface object. + Create zmq.Context and connected sockets dictionary. + + :param conf: configuration object + :type conf: oslo_config.CONF + :param matchmaker: Name Service interface object + :type matchmaker: matchmaker.MatchMakerBase + """ + self.conf = conf self.zmq_context = zmq.Context() self.matchmaker = matchmaker @@ -43,14 +72,27 @@ class PublisherBase(object): @abc.abstractmethod def send_request(self, request): - """Send request to consumer""" + """Send request to consumer + + :param request: Message data and destination container object + :type request: zmq_request.Request + """ def _send_request(self, socket, request): + """Send request to consumer. + Helper private method which defines basic sending behavior. + + :param socket: Socket to publish message on + :type socket: zmq.Socket + :param request: Message data and destination container object + :type request: zmq_request.Request + """ socket.send_string(request.msg_type, zmq.SNDMORE) socket.send_json(request.context, zmq.SNDMORE) socket.send_json(request.message) def cleanup(self): + """Cleanup publisher. Close allocated connections.""" for socket, hosts in self.outbound_sockets.values(): socket.setsockopt(zmq.LINGER, 0) socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py index 1caedff3..accebae1 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py @@ -29,8 +29,31 @@ zmq = zmq_async.import_zmq() @six.add_metaclass(abc.ABCMeta) class Request(object): + """Zmq request abstract class + + Represents socket (publisher) independent data object to publish. + Request object should contain all needed information for a publisher + to publish it, for instance: message payload, target, timeout + and retries etc. + """ + def __init__(self, target, context=None, message=None, retry=None): + """Construct request object + + :param target: Message destination target + :type target: oslo_messaging.Target + :param context: Message context + :type context: dict + :param message: Message payload to pass + :type message: dict + :param retry: an optional default connection retries configuration + None or -1 means to retry forever + 0 means no retry + N means N retries + :type retry: int + """ + if self.msg_type not in zmq_names.MESSAGE_TYPES: raise RuntimeError("Unknown message type!") diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_poller.py b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py index 437c841a..a62ea8a6 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py @@ -20,36 +20,87 @@ import six @six.add_metaclass(abc.ABCMeta) class ZmqPoller(object): + """Base poller interface + + Needed to poll on zmq sockets in green and native async manner. + Native poller implementation wraps zmq.Poller helper class. + Wrapping is needed to provide unified poller interface + in zmq-driver (for both native and zmq pollers). It makes some + difference with poller-helper from zmq library which doesn't actually + receive message. + + The poller object should be obtained over: + + poller = zmq_async.get_poller() + + Then we have to register sockets for polling. We are able + to provide specific receiving method. By default poller calls + socket.recv_multipart. + + def receive_message(socket): + id = socket.recv_string() + ctxt = socket.recv_json() + msg = socket.recv_json() + return (id, ctxt, msg) + + poller.register(socket, recv_method=receive_message) + + Further to receive a message we should call: + + message, socket = poller.poll() + + The 'message' here contains (id, ctxt, msg) tuple. + """ + @abc.abstractmethod def register(self, socket, recv_method=None): - """Register socket to poll""" + """Register socket to poll + + :param socket: Socket to subscribe for polling + :type socket: zmq.Socket + :param recv_method: Optional specific receiver procedure + Should return received message object + :type recv_method: callable + """ @abc.abstractmethod def poll(self, timeout=None): - """Poll for messages""" + """Poll for messages + + :param timeout: Optional polling timeout + None or -1 means poll forever + any positive value means timeout in seconds + :type timeout: int + :returns: (message, socket) tuple + """ @abc.abstractmethod def close(self): """Terminate polling""" def resume_polling(self, socket): - """Resume with polling""" + """Resume with polling + + Some implementations of poller may provide hold polling before reply + This method is intended to excplicitly resume polling aftewards. + """ @six.add_metaclass(abc.ABCMeta) class Executor(object): + """Base executor interface for threading/green async executors""" def __init__(self, thread): self.thread = thread @abc.abstractmethod def execute(self): - 'Run execution' + """Run execution""" @abc.abstractmethod def stop(self): - 'Stop execution' + """Stop execution""" @abc.abstractmethod def wait(self): - 'Wait until pass' + """Wait until pass"""