From 16cf9550d475c85f7152237f038adcb986dbec21 Mon Sep 17 00:00:00 2001 From: kgriffs Date: Tue, 2 Sep 2014 09:35:19 -0500 Subject: [PATCH] Fix Redis message controller getting stuck in while loop The Redis driver was getting stuck in a loop while trying to create a claim. The root cause turned out to be some logic that was trying to skip past all the claimed messages at the head of the queue, but did not advance the offset if no unclaimed messages were found, leading to a neverending loop. Change-Id: I8d2e1a7820c30e92a63ed938013f0e69dff83b2f Partially-Implements: blueprint redis-storage-driver --- zaqar/queues/storage/redis/messages.py | 34 ++++++++++++++------------ zaqar/tests/queues/storage/base.py | 25 ++++++++++++++----- 2 files changed, 38 insertions(+), 21 deletions(-) diff --git a/zaqar/queues/storage/redis/messages.py b/zaqar/queues/storage/redis/messages.py index 1978cae93..a414ac027 100644 --- a/zaqar/queues/storage/redis/messages.py +++ b/zaqar/queues/storage/redis/messages.py @@ -118,29 +118,32 @@ class MessageController(storage.Message): for msg_id in message_ids: pipe.delete(msg_id) - # TODO(prashanthr_): Looking for better ways to solve the issue. + # TODO(prashanthr_): Look for better ways to solve the issue. def _find_first_unclaimed(self, queue, project, limit): """Find the first unclaimed message in the queue.""" msgset_key = utils.scope_message_ids_set(queue, project, MESSAGE_IDS_SUFFIX) - marker = 0 now = timeutils.utcnow_ts() - # NOTE(prashanthr_): This will not be an infinite loop. - while True: - msg_keys = self._client.zrange(msgset_key, marker, - marker + limit) - if msg_keys: - messages = [Message.from_redis(self._client.hgetall(msg_key)) - for msg_key in msg_keys] + # TODO(kgriffs): Generalize this paging pattern (DRY) + offset = 0 - for msg in messages: - if not utils.msg_claimed_filter(msg, now): - return msg.id - else: + while True: + msg_keys = self._client.zrange(msgset_key, offset, + offset + limit - 1) + if not msg_keys: return None + offset += len(msg_keys) + + messages = [Message.from_redis(self._client.hgetall(msg_key)) + for msg_key in msg_keys] + + for msg in messages: + if not utils.msg_claimed_filter(msg, now): + return msg.id + def _exists(self, key): """Check if message exists in the Queue. @@ -194,9 +197,10 @@ class MessageController(storage.Message): client = self._client with self._client.pipeline() as pipe: - # NOTE(prashanthr_): Iterate through the queue to find the first - # unclaimed message. if not marker and not include_claimed: + # NOTE(kgriffs): Skip unclaimed messages at the head + # of the queue; otherwise we would just filter them all + # out and likely end up with an empty list to return. marker = self._find_first_unclaimed(queue, project, limit) start = client.zrank(msgset_key, marker) or 0 else: diff --git a/zaqar/tests/queues/storage/base.py b/zaqar/tests/queues/storage/base.py index aeff50da2..c0fafbe8b 100644 --- a/zaqar/tests/queues/storage/base.py +++ b/zaqar/tests/queues/storage/base.py @@ -765,17 +765,30 @@ class ClaimControllerTest(ControllerBaseTest): self.controller.get, self.queue_name, claim_id, project=self.project) - def test_claim_create_default_limit(self): + def test_claim_create_default_limit_multi(self): + num_claims = 5 + num_messages = storage.DEFAULT_MESSAGES_PER_CLAIM * num_claims + + # NOTE(kgriffs): + 1 on num_messages to check for off-by-one error _insert_fixtures(self.message_controller, self.queue_name, project=self.project, client_uuid=uuid.uuid4(), - num=storage.DEFAULT_MESSAGES_PER_CLAIM + 1) + num=num_messages + 1) + meta = {'ttl': 70, 'grace': 30} + total_claimed = 0 - claim_id, messages = self.controller.create(self.queue_name, meta, - project=self.project) + for _ in range(num_claims): + claim_id, messages = self.controller.create( + self.queue_name, meta, project=self.project) - messages = list(messages) - self.assertEqual(len(messages), storage.DEFAULT_MESSAGES_PER_CLAIM) + messages = list(messages) + num_claimed = len(messages) + self.assertEqual(num_claimed, + storage.DEFAULT_MESSAGES_PER_CLAIM) + + total_claimed += num_claimed + + self.assertEqual(total_claimed, num_messages) def test_extend_lifetime(self): _insert_fixtures(self.message_controller, self.queue_name,