Fix Redis message controller getting stuck in while loop

The Redis driver was getting stuck in a loop while trying to create
a claim. The root cause turned out to be some logic that was trying
to skip past all the claimed messages at the head of the queue, but
did not advance the offset if no unclaimed messages were found,
leading to a neverending loop.

Change-Id: I8d2e1a7820c30e92a63ed938013f0e69dff83b2f
Partially-Implements: blueprint redis-storage-driver
This commit is contained in:
kgriffs 2014-09-02 09:35:19 -05:00
parent 83f192da6e
commit 16cf9550d4
2 changed files with 38 additions and 21 deletions

View File

@ -118,29 +118,32 @@ class MessageController(storage.Message):
for msg_id in message_ids:
pipe.delete(msg_id)
# TODO(prashanthr_): Looking for better ways to solve the issue.
# TODO(prashanthr_): Look for better ways to solve the issue.
def _find_first_unclaimed(self, queue, project, limit):
"""Find the first unclaimed message in the queue."""
msgset_key = utils.scope_message_ids_set(queue, project,
MESSAGE_IDS_SUFFIX)
marker = 0
now = timeutils.utcnow_ts()
# NOTE(prashanthr_): This will not be an infinite loop.
while True:
msg_keys = self._client.zrange(msgset_key, marker,
marker + limit)
if msg_keys:
messages = [Message.from_redis(self._client.hgetall(msg_key))
for msg_key in msg_keys]
# TODO(kgriffs): Generalize this paging pattern (DRY)
offset = 0
for msg in messages:
if not utils.msg_claimed_filter(msg, now):
return msg.id
else:
while True:
msg_keys = self._client.zrange(msgset_key, offset,
offset + limit - 1)
if not msg_keys:
return None
offset += len(msg_keys)
messages = [Message.from_redis(self._client.hgetall(msg_key))
for msg_key in msg_keys]
for msg in messages:
if not utils.msg_claimed_filter(msg, now):
return msg.id
def _exists(self, key):
"""Check if message exists in the Queue.
@ -194,9 +197,10 @@ class MessageController(storage.Message):
client = self._client
with self._client.pipeline() as pipe:
# NOTE(prashanthr_): Iterate through the queue to find the first
# unclaimed message.
if not marker and not include_claimed:
# NOTE(kgriffs): Skip unclaimed messages at the head
# of the queue; otherwise we would just filter them all
# out and likely end up with an empty list to return.
marker = self._find_first_unclaimed(queue, project, limit)
start = client.zrank(msgset_key, marker) or 0
else:

View File

@ -765,17 +765,30 @@ class ClaimControllerTest(ControllerBaseTest):
self.controller.get, self.queue_name,
claim_id, project=self.project)
def test_claim_create_default_limit(self):
def test_claim_create_default_limit_multi(self):
num_claims = 5
num_messages = storage.DEFAULT_MESSAGES_PER_CLAIM * num_claims
# NOTE(kgriffs): + 1 on num_messages to check for off-by-one error
_insert_fixtures(self.message_controller, self.queue_name,
project=self.project, client_uuid=uuid.uuid4(),
num=storage.DEFAULT_MESSAGES_PER_CLAIM + 1)
num=num_messages + 1)
meta = {'ttl': 70, 'grace': 30}
total_claimed = 0
claim_id, messages = self.controller.create(self.queue_name, meta,
project=self.project)
for _ in range(num_claims):
claim_id, messages = self.controller.create(
self.queue_name, meta, project=self.project)
messages = list(messages)
self.assertEqual(len(messages), storage.DEFAULT_MESSAGES_PER_CLAIM)
messages = list(messages)
num_claimed = len(messages)
self.assertEqual(num_claimed,
storage.DEFAULT_MESSAGES_PER_CLAIM)
total_claimed += num_claimed
self.assertEqual(total_claimed, num_messages)
def test_extend_lifetime(self):
_insert_fixtures(self.message_controller, self.queue_name,