Make poll() just return the message

The driver shouldn't be pulling the namespace and version from the
message since that's RPC specific stuff.

Also, it's not terribly useful for the driver to pass back a target
object describing the exchange and topic the message was received on
since that's implicit in the listener.
This commit is contained in:
Mark McLoughlin 2013-05-11 16:17:00 +01:00
parent 7c3697f77b
commit 93447e8381
3 changed files with 7 additions and 9 deletions

View File

@ -26,9 +26,7 @@ class Listener(object):
@abc.abstractmethod @abc.abstractmethod
def poll(self): def poll(self):
# returns (target, message) # returns message
# target includes the (exchange, topic, namespace, version) which the
# message was sent to
pass pass
@abc.abstractmethod @abc.abstractmethod

View File

@ -30,9 +30,9 @@ class ExecutorBase(object):
self.callback = callback self.callback = callback
def _process_one_message(self): def _process_one_message(self):
(target, message) = self.listener.poll() message = self.listener.poll()
try: try:
self.callback(target, message) self.callback(message)
except Exception: except Exception:
_LOG.exception(_("Failed to process message... skipping it.")) _LOG.exception(_("Failed to process message... skipping it."))
finally: finally:

View File

@ -57,15 +57,15 @@ class RPCDispatcher(object):
endpoint_version = endpoint.target.version or '1.0' endpoint_version = endpoint.target.version or '1.0'
return utils.version_is_compatible(endpoint_version, version) return utils.version_is_compatible(endpoint_version, version)
def __call__(self, target, message): def __call__(self, message):
method = message.get('method') method = message.get('method')
args = message.get('args', {}) args = message.get('args', {})
namespace = message.get('namespace')
version = target.version or '1.0' version = message.get('version', '1.0')
found_compatible = False found_compatible = False
for endpoint in self.endpoints: for endpoint in self.endpoints:
if target.namespace != endpoint.target.namespace: if namespace != endpoint.target.namespace:
continue continue
is_compatible = self._is_compatible(endpoint, version) is_compatible = self._is_compatible(endpoint, version)