Documenting main driver classes

Main classes destination and methods parameters.

Change-Id: I23d906855d616830dfc12c5e9e32881a4600b6ff
This commit is contained in:
Oleksii Zamiatin
2015-08-05 16:49:23 +03:00
parent 141f59bd9b
commit da4ee6361b
4 changed files with 194 additions and 9 deletions

View File

@@ -83,14 +83,35 @@ zmq_opts = [
class ZmqDriver(base.BaseDriver): class ZmqDriver(base.BaseDriver):
"""ZeroMQ Driver
"""ZeroMQ Driver implementation.
Provides implementation of RPC and Notifier APIs by means
of ZeroMQ library.
See :doc:`zmq_driver` for details. See :doc:`zmq_driver` for details.
""" """
def __init__(self, conf, url, default_exchange=None, def __init__(self, conf, url, default_exchange=None,
allowed_remote_exmods=None): allowed_remote_exmods=None):
"""Construct ZeroMQ driver.
Intialize driver options.
Construct matchmaker - pluggable interface to targets management
Name Service
Construct client and server controllers
:param conf: oslo messaging configuration object
:type conf: oslo_config.CONF
:param url: transport URL
:type url: TransportUrl
:param default_exchange: Not used in zmq implementation
:type default_exchange: None
:param allowed_remote_exmods: remote exception passing options
:type allowed_remote_exmods: list
"""
conf.register_opts(zmq_opts) conf.register_opts(zmq_opts)
conf.register_opts(executor_base._pool_opts) conf.register_opts(executor_base._pool_opts)
self.conf = conf self.conf = conf
@@ -108,6 +129,24 @@ class ZmqDriver(base.BaseDriver):
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None): retry=None):
"""Send RPC message to server
:param target: Message destination target
:type target: oslo_messaging.Target
:param ctxt: Message context
:type ctxt: dict
:param message: Message payload to pass
:type message: dict
:param wait_for_reply: Waiting for reply flag
:type wait_for_reply: bool
:param timeout: Reply waiting timeout in seconds
:type timeout: int
:param retry: an optional default connection retries configuration
None or -1 means to retry forever
0 means no retry
N means N retries
:type retry: int
"""
timeout = timeout or self.conf.rpc_response_timeout timeout = timeout or self.conf.rpc_response_timeout
if wait_for_reply: if wait_for_reply:
return self.client.send_call(target, ctxt, message, timeout, retry) return self.client.send_call(target, ctxt, message, timeout, retry)
@@ -117,6 +156,22 @@ class ZmqDriver(base.BaseDriver):
self.client.send_cast(target, ctxt, message, timeout, retry) self.client.send_cast(target, ctxt, message, timeout, retry)
def send_notification(self, target, ctxt, message, version, retry=None): def send_notification(self, target, ctxt, message, version, retry=None):
"""Send notification to server
:param target: Message destination target
:type target: oslo_messaging.Target
:param ctxt: Message context
:type ctxt: dict
:param message: Message payload to pass
:type message: dict
:param version: Messaging API version
:type version: str
:param retry: an optional default connection retries configuration
None or -1 means to retry forever
0 means no retry
N means N retries
:type retry: int
"""
if target.fanout: if target.fanout:
self.client.send_notify_fanout(target, ctxt, message, version, self.client.send_notify_fanout(target, ctxt, message, version,
retry) retry)
@@ -124,13 +179,27 @@ class ZmqDriver(base.BaseDriver):
self.client.send_notify(target, ctxt, message, version, retry) self.client.send_notify(target, ctxt, message, version, retry)
def listen(self, target): def listen(self, target):
"""Listen to a specified target on a server side
:param target: Message destination target
:type target: oslo_messaging.Target
"""
self.server.listen(target) self.server.listen(target)
return self.server return self.server
def listen_for_notifications(self, targets_and_priorities, pool): def listen_for_notifications(self, targets_and_priorities, pool):
"""Listen to a specified list of targets on a server side
:param targets_and_priorities: List of pairs (target, priority)
:type targets_and_priorities: list
:param pool: Not used for zmq implementation
:type pool: object
"""
self.server.listen_notification(targets_and_priorities) self.server.listen_notification(targets_and_priorities)
return self.server return self.server
def cleanup(self): def cleanup(self):
"""Cleanup all driver's connections finally
"""
self.client.cleanup() self.client.cleanup()
self.server.cleanup() self.server.cleanup()

View File

@@ -26,7 +26,16 @@ zmq = zmq_async.import_zmq()
class UnsupportedSendPattern(rpc_common.RPCException): class UnsupportedSendPattern(rpc_common.RPCException):
"""Exception to raise from publishers in case of unsupported
sending pattern called.
"""
def __init__(self, pattern_name): def __init__(self, pattern_name):
"""Construct exception object
:param pattern_name: Message type name from zmq_names
:type pattern_name: str
"""
errmsg = _LE("Sending pattern %s is unsupported.") % pattern_name errmsg = _LE("Sending pattern %s is unsupported.") % pattern_name
super(UnsupportedSendPattern, self).__init__(errmsg) super(UnsupportedSendPattern, self).__init__(errmsg)
@@ -34,7 +43,27 @@ class UnsupportedSendPattern(rpc_common.RPCException):
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class PublisherBase(object): class PublisherBase(object):
"""Abstract publisher class
Each publisher from zmq-driver client should implement
this interface to serve as a messages publisher.
Publisher can send request objects from zmq_request.
"""
def __init__(self, conf, matchmaker): def __init__(self, conf, matchmaker):
"""Construct publisher
Accept configuration object and Name Service interface object.
Create zmq.Context and connected sockets dictionary.
:param conf: configuration object
:type conf: oslo_config.CONF
:param matchmaker: Name Service interface object
:type matchmaker: matchmaker.MatchMakerBase
"""
self.conf = conf self.conf = conf
self.zmq_context = zmq.Context() self.zmq_context = zmq.Context()
self.matchmaker = matchmaker self.matchmaker = matchmaker
@@ -43,14 +72,27 @@ class PublisherBase(object):
@abc.abstractmethod @abc.abstractmethod
def send_request(self, request): def send_request(self, request):
"""Send request to consumer""" """Send request to consumer
:param request: Message data and destination container object
:type request: zmq_request.Request
"""
def _send_request(self, socket, request): def _send_request(self, socket, request):
"""Send request to consumer.
Helper private method which defines basic sending behavior.
:param socket: Socket to publish message on
:type socket: zmq.Socket
:param request: Message data and destination container object
:type request: zmq_request.Request
"""
socket.send_string(request.msg_type, zmq.SNDMORE) socket.send_string(request.msg_type, zmq.SNDMORE)
socket.send_json(request.context, zmq.SNDMORE) socket.send_json(request.context, zmq.SNDMORE)
socket.send_json(request.message) socket.send_json(request.message)
def cleanup(self): def cleanup(self):
"""Cleanup publisher. Close allocated connections."""
for socket, hosts in self.outbound_sockets.values(): for socket, hosts in self.outbound_sockets.values():
socket.setsockopt(zmq.LINGER, 0) socket.setsockopt(zmq.LINGER, 0)
socket.close() socket.close()

View File

@@ -29,8 +29,31 @@ zmq = zmq_async.import_zmq()
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class Request(object): class Request(object):
"""Zmq request abstract class
Represents socket (publisher) independent data object to publish.
Request object should contain all needed information for a publisher
to publish it, for instance: message payload, target, timeout
and retries etc.
"""
def __init__(self, target, context=None, message=None, retry=None): def __init__(self, target, context=None, message=None, retry=None):
"""Construct request object
:param target: Message destination target
:type target: oslo_messaging.Target
:param context: Message context
:type context: dict
:param message: Message payload to pass
:type message: dict
:param retry: an optional default connection retries configuration
None or -1 means to retry forever
0 means no retry
N means N retries
:type retry: int
"""
if self.msg_type not in zmq_names.MESSAGE_TYPES: if self.msg_type not in zmq_names.MESSAGE_TYPES:
raise RuntimeError("Unknown message type!") raise RuntimeError("Unknown message type!")

View File

@@ -20,36 +20,87 @@ import six
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class ZmqPoller(object): class ZmqPoller(object):
"""Base poller interface
Needed to poll on zmq sockets in green and native async manner.
Native poller implementation wraps zmq.Poller helper class.
Wrapping is needed to provide unified poller interface
in zmq-driver (for both native and zmq pollers). It makes some
difference with poller-helper from zmq library which doesn't actually
receive message.
The poller object should be obtained over:
poller = zmq_async.get_poller()
Then we have to register sockets for polling. We are able
to provide specific receiving method. By default poller calls
socket.recv_multipart.
def receive_message(socket):
id = socket.recv_string()
ctxt = socket.recv_json()
msg = socket.recv_json()
return (id, ctxt, msg)
poller.register(socket, recv_method=receive_message)
Further to receive a message we should call:
message, socket = poller.poll()
The 'message' here contains (id, ctxt, msg) tuple.
"""
@abc.abstractmethod @abc.abstractmethod
def register(self, socket, recv_method=None): def register(self, socket, recv_method=None):
"""Register socket to poll""" """Register socket to poll
:param socket: Socket to subscribe for polling
:type socket: zmq.Socket
:param recv_method: Optional specific receiver procedure
Should return received message object
:type recv_method: callable
"""
@abc.abstractmethod @abc.abstractmethod
def poll(self, timeout=None): def poll(self, timeout=None):
"""Poll for messages""" """Poll for messages
:param timeout: Optional polling timeout
None or -1 means poll forever
any positive value means timeout in seconds
:type timeout: int
:returns: (message, socket) tuple
"""
@abc.abstractmethod @abc.abstractmethod
def close(self): def close(self):
"""Terminate polling""" """Terminate polling"""
def resume_polling(self, socket): def resume_polling(self, socket):
"""Resume with polling""" """Resume with polling
Some implementations of poller may provide hold polling before reply
This method is intended to excplicitly resume polling aftewards.
"""
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class Executor(object): class Executor(object):
"""Base executor interface for threading/green async executors"""
def __init__(self, thread): def __init__(self, thread):
self.thread = thread self.thread = thread
@abc.abstractmethod @abc.abstractmethod
def execute(self): def execute(self):
'Run execution' """Run execution"""
@abc.abstractmethod @abc.abstractmethod
def stop(self): def stop(self):
'Stop execution' """Stop execution"""
@abc.abstractmethod @abc.abstractmethod
def wait(self): def wait(self):
'Wait until pass' """Wait until pass"""