Introduce message types for WBE protocol
* Abstract message class added - all messages types are derived from it now and have to implement the `to_dict` method, so it can be serialized and transferred with broker; * Implemented the `Response` message type, that restores failures from dictionary on creation; * Corrected and improved unit tests; Change-Id: I10e017a613f0422420d0244b9f8786f988863107
This commit is contained in:
@@ -36,23 +36,32 @@ class Server(object):
|
||||
self._endpoints = dict([(endpoint.name, endpoint)
|
||||
for endpoint in endpoints])
|
||||
|
||||
def _on_message(self, request, message):
|
||||
"""This method is called on incoming request."""
|
||||
LOG.debug("Got request: %s", request)
|
||||
# NOTE(skudriashev): Process all incoming requests only if proxy is
|
||||
def _on_message(self, data, message):
|
||||
"""This method is called on incoming message."""
|
||||
LOG.debug("Got message: %s", data)
|
||||
# NOTE(skudriashev): Process all incoming messages only if proxy is
|
||||
# running, otherwise requeue them.
|
||||
if self._proxy.is_running:
|
||||
# NOTE(skudriashev): Process request only if message has been
|
||||
# acknowledged successfully.
|
||||
try:
|
||||
# acknowledge message
|
||||
# acknowledge message before processing
|
||||
message.ack()
|
||||
except kombu_exc.MessageStateError:
|
||||
LOG.exception("Failed to acknowledge AMQP message.")
|
||||
else:
|
||||
LOG.debug("AMQP message acknowledged.")
|
||||
# spawn new thread to process request
|
||||
self._executor.submit(self._process_request, request, message)
|
||||
try:
|
||||
msg_type = message.properties['type']
|
||||
except KeyError:
|
||||
LOG.warning("The 'type' message property is missing.")
|
||||
else:
|
||||
if msg_type == pr.REQUEST:
|
||||
# spawn new thread to process request
|
||||
self._executor.submit(self._process_request, data,
|
||||
message)
|
||||
else:
|
||||
LOG.warning("Unexpected message type: %s", msg_type)
|
||||
else:
|
||||
try:
|
||||
# requeue message
|
||||
@@ -100,7 +109,7 @@ class Server(object):
|
||||
|
||||
def _reply(self, reply_to, task_uuid, state=pr.FAILURE, **kwargs):
|
||||
"""Send reply to the `reply_to` queue."""
|
||||
response = dict(state=state, **kwargs)
|
||||
response = pr.Response(state, **kwargs)
|
||||
LOG.debug("Sending reply: %s", response)
|
||||
try:
|
||||
self._proxy.publish(response, reply_to, correlation_id=task_uuid)
|
||||
@@ -115,7 +124,7 @@ class Server(object):
|
||||
|
||||
def _process_request(self, request, message):
|
||||
"""Process request in separate thread and reply back."""
|
||||
# NOTE(skudriashev): parse broker message first to get the `reply_to`
|
||||
# NOTE(skudriashev): Parse broker message first to get the `reply_to`
|
||||
# and the `task_uuid` parameters to have possibility to reply back.
|
||||
try:
|
||||
reply_to, task_uuid = self._parse_message(message)
|
||||
|
Reference in New Issue
Block a user