diff --git a/zaqar/queues/storage/redis/messages.py b/zaqar/queues/storage/redis/messages.py index 1978cae93..a414ac027 100644 --- a/zaqar/queues/storage/redis/messages.py +++ b/zaqar/queues/storage/redis/messages.py @@ -118,29 +118,32 @@ class MessageController(storage.Message): for msg_id in message_ids: pipe.delete(msg_id) - # TODO(prashanthr_): Looking for better ways to solve the issue. + # TODO(prashanthr_): Look for better ways to solve the issue. def _find_first_unclaimed(self, queue, project, limit): """Find the first unclaimed message in the queue.""" msgset_key = utils.scope_message_ids_set(queue, project, MESSAGE_IDS_SUFFIX) - marker = 0 now = timeutils.utcnow_ts() - # NOTE(prashanthr_): This will not be an infinite loop. - while True: - msg_keys = self._client.zrange(msgset_key, marker, - marker + limit) - if msg_keys: - messages = [Message.from_redis(self._client.hgetall(msg_key)) - for msg_key in msg_keys] + # TODO(kgriffs): Generalize this paging pattern (DRY) + offset = 0 - for msg in messages: - if not utils.msg_claimed_filter(msg, now): - return msg.id - else: + while True: + msg_keys = self._client.zrange(msgset_key, offset, + offset + limit - 1) + if not msg_keys: return None + offset += len(msg_keys) + + messages = [Message.from_redis(self._client.hgetall(msg_key)) + for msg_key in msg_keys] + + for msg in messages: + if not utils.msg_claimed_filter(msg, now): + return msg.id + def _exists(self, key): """Check if message exists in the Queue. @@ -194,9 +197,10 @@ class MessageController(storage.Message): client = self._client with self._client.pipeline() as pipe: - # NOTE(prashanthr_): Iterate through the queue to find the first - # unclaimed message. if not marker and not include_claimed: + # NOTE(kgriffs): Skip unclaimed messages at the head + # of the queue; otherwise we would just filter them all + # out and likely end up with an empty list to return. marker = self._find_first_unclaimed(queue, project, limit) start = client.zrank(msgset_key, marker) or 0 else: diff --git a/zaqar/tests/queues/storage/base.py b/zaqar/tests/queues/storage/base.py index aeff50da2..c0fafbe8b 100644 --- a/zaqar/tests/queues/storage/base.py +++ b/zaqar/tests/queues/storage/base.py @@ -765,17 +765,30 @@ class ClaimControllerTest(ControllerBaseTest): self.controller.get, self.queue_name, claim_id, project=self.project) - def test_claim_create_default_limit(self): + def test_claim_create_default_limit_multi(self): + num_claims = 5 + num_messages = storage.DEFAULT_MESSAGES_PER_CLAIM * num_claims + + # NOTE(kgriffs): + 1 on num_messages to check for off-by-one error _insert_fixtures(self.message_controller, self.queue_name, project=self.project, client_uuid=uuid.uuid4(), - num=storage.DEFAULT_MESSAGES_PER_CLAIM + 1) + num=num_messages + 1) + meta = {'ttl': 70, 'grace': 30} + total_claimed = 0 - claim_id, messages = self.controller.create(self.queue_name, meta, - project=self.project) + for _ in range(num_claims): + claim_id, messages = self.controller.create( + self.queue_name, meta, project=self.project) - messages = list(messages) - self.assertEqual(len(messages), storage.DEFAULT_MESSAGES_PER_CLAIM) + messages = list(messages) + num_claimed = len(messages) + self.assertEqual(num_claimed, + storage.DEFAULT_MESSAGES_PER_CLAIM) + + total_claimed += num_claimed + + self.assertEqual(total_claimed, num_messages) def test_extend_lifetime(self): _insert_fixtures(self.message_controller, self.queue_name,