Add heartbeat() method to RpcIncomingMessage

This adds a heartbeat() method to RpcIncomingMessage to be used by a
subsequent patch implementation of active-call heartbeating. This is
unimplemented in all drivers for the moment.

Change-Id: If8ab0dc16e3bef69d5a826c31c0fe35e403ac6a1
This commit is contained in:
Dan Smith 2018-03-03 13:48:11 -08:00
parent d1f241a410
commit 930e6189e2
6 changed files with 28 additions and 0 deletions

View File

@ -178,6 +178,9 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
# the end.
self._message_operations_handler.do(self.message.requeue)
def heartbeat(self):
LOG.debug("Message heartbeat not implemented")
class ObsoleteReplyQueuesCache(object):
"""Cache of reply queue id that doesn't exists anymore.

View File

@ -154,6 +154,19 @@ class RpcIncomingMessage(IncomingMessage):
:raises: Does not raise an exception
"""
@abc.abstractmethod
def heartbeat(self):
"""Called by the server to send an RPC heartbeat message back to
the calling client.
If the client (is new enough to have) passed its timeout value during
the RPC call, this method will be called periodically by the server
to update the client's timeout timer while a long-running call is
executing.
:raises: Does not raise an exception
"""
@six.add_metaclass(abc.ABCMeta)
class PollStyleListener(object):

View File

@ -98,6 +98,9 @@ class ProtonIncomingMessage(base.RpcIncomingMessage):
self._correlation_id = message.id
self._disposition = disposition
def heartbeat(self):
LOG.debug("Message heartbeat not implemented")
def reply(self, reply=None, failure=None):
"""Schedule an RPCReplyTask to send the reply."""
if self._reply_to:

View File

@ -38,6 +38,9 @@ class FakeIncomingMessage(base.RpcIncomingMessage):
def requeue(self):
self.requeue_callback()
def heartbeat(self):
"""Heartbeat is not supported."""
class FakeListener(base.PollStyleListener):

View File

@ -315,6 +315,9 @@ class OsloKafkaMessage(base.RpcIncomingMessage):
def reply(self, reply=None, failure=None):
LOG.warning(_LW("reply is not supported"))
def heartbeat(self):
LOG.warning(_LW("heartbeat is not supported"))
class KafkaListener(base.PollStyleListener):

View File

@ -36,3 +36,6 @@ class ZmqIncomingMessage(base.RpcIncomingMessage):
def requeue(self):
"""Requeue is not supported."""
def heartbeat(self):
"""Heartbeat is not supported."""