diff --git a/api-ref/source/parameters.yaml b/api-ref/source/parameters.yaml index 221839c4..6146013e 100644 --- a/api-ref/source/parameters.yaml +++ b/api-ref/source/parameters.yaml @@ -133,6 +133,28 @@ pop: #### variables in request #################################################### +_dead_letter_queue: + type: string + in: body + required: False + description: | + The target the message will be moved to when the message can't processed + successfully after meet the max claim count. It's not supported to add + queue C as the dead letter queue for queue B where queue B has been set + as a dead letter queue for queue A. There is no default value for this + attribute. If it's not set explicitly, then that means there is no dead + letter queue for current queue. It is one of the ``reserved attributes`` + of Zaqar queues. + +_dead_letter_queue_messages_ttl: + type: integer + in: body + required: False + description: | + The new TTL setting for messages when moved to dead letter queue. If it's + not set, current TTL will be kept. It is one of the ``reserved attributes`` + of Zaqar queues. + _default_message_ttl: type: integer in: body @@ -145,6 +167,26 @@ _default_message_ttl: one of the ``reserved attributes`` of Zaqar queues. The value will be reverted to the default value after deleting it explicitly. +_flavor: + type: string + in: body + required: False + description: | + The flavor name which can tell Zaqar which storage pool will be used to + create the queue. It is one of the ``reserved attributes`` of Zaqar + queues. + +_max_claim_count: + type: integer + in: body + required: False + description: | + The max number the message can be claimed. Generally, + it means the message cannot be processed successfully. There is no default + value for this attribute. If it's not set, then that means this feature + won't be enabled for current queue. It is one of the + ``reserved attributes`` of Zaqar queues. + _max_messages_post_size: type: integer in: body diff --git a/api-ref/source/queues.inc b/api-ref/source/queues.inc index a2adb8c9..47ec4986 100644 --- a/api-ref/source/queues.inc +++ b/api-ref/source/queues.inc @@ -72,6 +72,10 @@ The body of the request is empty. exceed 64 bytes in length, and it is limited to US-ASCII letters, digits, underscores, and hyphens. +When create queue, user can specify metadata for the queue. Currently, Zaqar +supports below metadata: _flavor, _max_claim_count, _dead_letter_queue and +_dead_letter_queue_messages_ttl. + Normal response codes: 201, 204 @@ -88,6 +92,12 @@ Request Parameters .. rest_parameters:: parameters.yaml - queue_name: queue_name + - _dead_letter_queue: _dead_letter_queue + - _dead_letter_queue_messages_ttl: _dead_letter_queue_messages_ttl + - _default_message_ttl: _default_message_ttl + - _flavor: _flavor + - _max_claim_count: _max_claim_count + - _max_messages_post_size: _max_messages_post_size Request Example --------------- diff --git a/releasenotes/notes/support_dead_letter_queue_for_mongodb-c8b7303319e7f920.yaml b/releasenotes/notes/support_dead_letter_queue_for_mongodb-c8b7303319e7f920.yaml new file mode 100644 index 00000000..a3407144 --- /dev/null +++ b/releasenotes/notes/support_dead_letter_queue_for_mongodb-c8b7303319e7f920.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + Support for dead letter queue is added for MongoDB. With this feature, + message will be moved to the specified dead letter queue if it's claimed + many times but still can't successfully processed by a client. New reseved + metadata keys of queue are added: _max_claim_count, _dead_letter_queue and + _dead_letter_queue_messages_ttl. diff --git a/zaqar/storage/mongodb/claims.py b/zaqar/storage/mongodb/claims.py index 36ae5657..ee267d29 100644 --- a/zaqar/storage/mongodb/claims.py +++ b/zaqar/storage/mongodb/claims.py @@ -24,12 +24,16 @@ Field Mappings: import datetime from bson import objectid +from oslo_log import log as logging from oslo_utils import timeutils +from pymongo.collection import ReturnDocument from zaqar import storage from zaqar.storage import errors from zaqar.storage.mongodb import utils +LOG = logging.getLogger(__name__) + def _messages_iter(msg_iter): """Used to iterate through messages.""" @@ -125,6 +129,7 @@ class ClaimController(storage.Claim): time being, to execute an update on a limited number of records. """ msg_ctrl = self.driver.message_controller + queue_ctrl = self.driver.queue_controller ttl = metadata['ttl'] grace = metadata['grace'] @@ -142,19 +147,25 @@ class ClaimController(storage.Claim): 'id': oid, 't': ttl, 'e': claim_expires, + 'c': 0 # NOTE(flwang): A placeholder which will be updated later } # Get a list of active, not claimed nor expired # messages that could be claimed. - msgs = msg_ctrl._active(queue, projection={'_id': 1}, project=project, + msgs = msg_ctrl._active(queue, projection={'_id': 1, 'c': 1}, + project=project, limit=limit) messages = iter([]) - ids = [msg['_id'] for msg in msgs] + be_claimed = [(msg['_id'], msg['c'].get('c', 0)) for msg in msgs] + ids = [_id for _id, _ in be_claimed] if len(ids) == 0: return None, messages + # Get the maxClaimCount and deadLetterQueue from current queue's meta + queue_meta = queue_ctrl.get(queue, project=project) + now = timeutils.utcnow_ts() # NOTE(kgriffs): Set the claim field for @@ -186,6 +197,68 @@ class ClaimController(storage.Claim): {'$set': new_values}, upsert=False, multi=True) + if ('_max_claim_count' in queue_meta and + '_dead_letter_queue' in queue_meta): + LOG.debug(u"The list of messages being claimed: %(be_claimed)s", + {"be_claimed": be_claimed}) + + for _id, claimed_count in be_claimed: + # NOTE(flwang): We have claimed the message above, but we will + # update the claim count below. So that means, when the + # claimed_count equals queue_meta['_max_claim_count'], the + # message has met the threshold. And Zaqar will move it to the + # DLQ. + if claimed_count < queue_meta['_max_claim_count']: + # 1. Save the new max claim count for message + collection.update({'_id': _id, + 'c.id': oid}, + {'$set': {'c.c': claimed_count + 1}}, + upsert=False) + LOG.debug(u"Message %(id)s has been claimed %(count)d " + u"times.", {"id": str(_id), + "count": claimed_count + 1}) + else: + # 2. Check if the message's claim count has exceeded the + # max claim count defined in the queue, if so, move the + # message to the dead letter queue. + + # NOTE(flwang): We're moving message directly. That means, + # the queue and dead letter queue must be created on the + # same storage pool. It's a technical tradeoff, because if + # we re-send the message to the dead letter queue by + # message controller, then we will lost all the claim + # information. + dlq_name = queue_meta['_dead_letter_queue'] + new_msg = {'c.c': claimed_count, + 'p_q': utils.scope_queue_name(dlq_name, + project)} + dlq_ttl = queue_meta.get("_dead_letter_queue_messages_ttl") + if dlq_ttl: + new_msg['t'] = dlq_ttl + kwargs = {"return_document": ReturnDocument.AFTER} + msg = collection.find_one_and_update({'_id': _id, + 'c.id': oid}, + {'$set': new_msg}, + **kwargs) + dlq_collection = msg_ctrl._collection(dlq_name, project) + if not dlq_collection: + LOG.warning(u"Failed to find the message collection " + u"for queue %(dlq_name)s", {"dlq_name": + dlq_name}) + return None, iter([]) + result = dlq_collection.insert_one(msg) + if result.inserted_id: + collection.delete_one({'_id': _id}) + LOG.debug(u"Message %(id)s has met the max claim count " + u"%(count)d, now it has been moved to dead " + u"letter queue %(dlq_name)s.", + {"id": str(_id), "count": claimed_count, + "dlq_name": dlq_name}) + # NOTE(flwang): Because the claimed count has meet the + # max, so the current claim is not valid. And technically, + # it's failed to create the claim. + return None, iter([]) + if updated != 0: # NOTE(kgriffs): This extra step is necessary because # in between having gotten a list of active messages diff --git a/zaqar/storage/mongodb/messages.py b/zaqar/storage/mongodb/messages.py index a7c2148e..a4a98c76 100644 --- a/zaqar/storage/mongodb/messages.py +++ b/zaqar/storage/mongodb/messages.py @@ -637,7 +637,7 @@ class MessageController(storage.Message): 't': message['ttl'], 'e': now_dt + datetime.timedelta(seconds=message['ttl']), 'u': client_uuid, - 'c': {'id': None, 'e': now}, + 'c': {'id': None, 'e': now, 'c': 0}, 'b': message['body'] if 'body' in message else {}, 'k': next_marker + index, 'tx': None, @@ -815,7 +815,7 @@ class FIFOMessageController(MessageController): 't': message['ttl'], 'e': now_dt + datetime.timedelta(seconds=message['ttl']), 'u': client_uuid, - 'c': {'id': None, 'e': now}, + 'c': {'id': None, 'e': now, 'c': 0}, 'b': message['body'] if 'body' in message else {}, 'k': next_marker + index, 'tx': transaction, diff --git a/zaqar/tests/unit/storage/base.py b/zaqar/tests/unit/storage/base.py index 05ab36ad..57622505 100644 --- a/zaqar/tests/unit/storage/base.py +++ b/zaqar/tests/unit/storage/base.py @@ -1010,6 +1010,47 @@ class ClaimControllerTest(ControllerBaseTest): {'ttl': 40}, project=self.project) + def test_dead_letter_queue(self): + DLQ_name = "DLQ" + meta = {'ttl': 3, 'grace': 3} + self.queue_controller.create("DLQ", project=self.project) + # Set dead letter queeu metadata + metadata = {"_max_claim_count": 2, + "_dead_letter_queue": DLQ_name, + "_dead_letter_queue_messages_ttl": 9999} + self.queue_controller.set_metadata(self.queue_name, + metadata, + project=self.project) + + new_messages = [{'ttl': 3600, 'body': {"key": "value"}}] + + self.message_controller.post(self.queue_name, new_messages, + client_uuid=str(uuid.uuid1()), + project=self.project) + + claim_id, messages = self.controller.create(self.queue_name, meta, + project=self.project) + self.assertIsNotNone(claim_id) + self.assertEqual(1, len(list(messages))) + time.sleep(5) + claim_id, messages = self.controller.create(self.queue_name, meta, + project=self.project) + self.assertIsNotNone(claim_id) + messages = list(messages) + self.assertEqual(1, len(messages)) + time.sleep(5) + claim_id, messages = self.controller.create(self.queue_name, meta, + project=self.project) + self.assertIsNone(claim_id) + self.assertEqual(0, len(list(messages))) + + DLQ_messages = self.message_controller.list(DLQ_name, + project=self.project, + include_claimed=True) + expected_msg = list(next(DLQ_messages))[0] + self.assertEqual(9999, expected_msg["ttl"]) + self.assertEqual({"key": "value"}, expected_msg["body"]) + @ddt.ddt class SubscriptionControllerTest(ControllerBaseTest): diff --git a/zaqar/transport/validation.py b/zaqar/transport/validation.py index ed681c7d..45f64bf4 100644 --- a/zaqar/transport/validation.py +++ b/zaqar/transport/validation.py @@ -327,6 +327,21 @@ class Validator(object): ' and must be at least greater than 0.'), self._limits_conf.max_messages_post_size) + max_claim_count = queue_metadata.get('_max_claim_count', None) + if max_claim_count and not isinstance(max_claim_count, int): + msg = _(u'_max_claim_count must be integer.') + raise ValidationFailed(msg) + + dlq_ttl = queue_metadata.get('_dead_letter_queue_messages_ttl', None) + if dlq_ttl and not isinstance(dlq_ttl, int): + msg = _(u'_dead_letter_queue_messages_ttl must be integer.') + raise ValidationFailed(msg) + + if not (MIN_MESSAGE_TTL <= dlq_ttl <= + self._limits_conf.max_message_ttl): + msg = _(u'The TTL for a message may not exceed {0} seconds, ' + 'and must be at least {1} seconds long.') + def queue_purging(self, document): """Restrictions the resource types to be purged for a queue.