diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 8fc72dbbb..abda9b0ba 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -178,6 +178,9 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): # the end. self._message_operations_handler.do(self.message.requeue) + def heartbeat(self): + LOG.debug("Message heartbeat not implemented") + class ObsoleteReplyQueuesCache(object): """Cache of reply queue id that doesn't exists anymore. diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py index c09ab6f42..f02344126 100644 --- a/oslo_messaging/_drivers/base.py +++ b/oslo_messaging/_drivers/base.py @@ -154,6 +154,19 @@ class RpcIncomingMessage(IncomingMessage): :raises: Does not raise an exception """ + @abc.abstractmethod + def heartbeat(self): + """Called by the server to send an RPC heartbeat message back to + the calling client. + + If the client (is new enough to have) passed its timeout value during + the RPC call, this method will be called periodically by the server + to update the client's timeout timer while a long-running call is + executing. + + :raises: Does not raise an exception + """ + @six.add_metaclass(abc.ABCMeta) class PollStyleListener(object): diff --git a/oslo_messaging/_drivers/impl_amqp1.py b/oslo_messaging/_drivers/impl_amqp1.py index 5c1973d02..5714a6124 100644 --- a/oslo_messaging/_drivers/impl_amqp1.py +++ b/oslo_messaging/_drivers/impl_amqp1.py @@ -98,6 +98,9 @@ class ProtonIncomingMessage(base.RpcIncomingMessage): self._correlation_id = message.id self._disposition = disposition + def heartbeat(self): + LOG.debug("Message heartbeat not implemented") + def reply(self, reply=None, failure=None): """Schedule an RPCReplyTask to send the reply.""" if self._reply_to: diff --git a/oslo_messaging/_drivers/impl_fake.py b/oslo_messaging/_drivers/impl_fake.py index 6898350e7..fd66133e9 100644 --- a/oslo_messaging/_drivers/impl_fake.py +++ b/oslo_messaging/_drivers/impl_fake.py @@ -38,6 +38,9 @@ class FakeIncomingMessage(base.RpcIncomingMessage): def requeue(self): self.requeue_callback() + def heartbeat(self): + """Heartbeat is not supported.""" + class FakeListener(base.PollStyleListener): diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 007727c9f..c63fe1b0a 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -315,6 +315,9 @@ class OsloKafkaMessage(base.RpcIncomingMessage): def reply(self, reply=None, failure=None): LOG.warning(_LW("reply is not supported")) + def heartbeat(self): + LOG.warning(_LW("heartbeat is not supported")) + class KafkaListener(base.PollStyleListener): diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index a7ddd091c..9810388de 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -36,3 +36,6 @@ class ZmqIncomingMessage(base.RpcIncomingMessage): def requeue(self): """Requeue is not supported.""" + + def heartbeat(self): + """Heartbeat is not supported."""