Adding reply infrastructure

This is pretty rough.
This commit is contained in:
Mark McLoughlin 2013-05-11 13:19:59 +01:00
parent 9a7eb04e13
commit 4771afb5e2
2 changed files with 13 additions and 2 deletions

View File

@ -34,6 +34,10 @@ class Listener(object):
# so the transport can ack the message
pass
@abc.abstractmethod
def reply(self, reply=None, failure=None):
pass
class BaseDriver(object):

View File

@ -13,6 +13,7 @@
# under the License.
import abc
import sys
from openstack.common.gettextutils import _
from openstack.common import log as logging
@ -32,9 +33,15 @@ class ExecutorBase(object):
def _process_one_message(self):
message = self.listener.poll()
try:
self.callback(message)
reply = self.callback(message)
if reply:
self.listener.reply(reply)
except Exception:
_LOG.exception(_("Failed to process message... skipping it."))
# sys.exc_info() is deleted by LOG.exception().
exc_info = sys.exc_info()
_LOG.error(_("Failed to process message... skipping it."),
exc_info=exc_info)
self.listener.reply(failure=exc_info)
finally:
self.listener.done(message)