From 4771afb5e2d1823606c5a51637ca82736de21989 Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Sat, 11 May 2013 13:19:59 +0100 Subject: [PATCH] Adding reply infrastructure This is pretty rough. --- openstack/common/messaging/_drivers/base.py | 4 ++++ openstack/common/messaging/_executors/base.py | 11 +++++++++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/openstack/common/messaging/_drivers/base.py b/openstack/common/messaging/_drivers/base.py index a515dd5ec..28b9c13d0 100644 --- a/openstack/common/messaging/_drivers/base.py +++ b/openstack/common/messaging/_drivers/base.py @@ -34,6 +34,10 @@ class Listener(object): # so the transport can ack the message pass + @abc.abstractmethod + def reply(self, reply=None, failure=None): + pass + class BaseDriver(object): diff --git a/openstack/common/messaging/_executors/base.py b/openstack/common/messaging/_executors/base.py index 8b3818078..5a4d939da 100644 --- a/openstack/common/messaging/_executors/base.py +++ b/openstack/common/messaging/_executors/base.py @@ -13,6 +13,7 @@ # under the License. import abc +import sys from openstack.common.gettextutils import _ from openstack.common import log as logging @@ -32,9 +33,15 @@ class ExecutorBase(object): def _process_one_message(self): message = self.listener.poll() try: - self.callback(message) + reply = self.callback(message) + if reply: + self.listener.reply(reply) except Exception: - _LOG.exception(_("Failed to process message... skipping it.")) + # sys.exc_info() is deleted by LOG.exception(). + exc_info = sys.exc_info() + _LOG.error(_("Failed to process message... skipping it."), + exc_info=exc_info) + self.listener.reply(failure=exc_info) finally: self.listener.done(message)