Merge "Fix Redis message controller getting stuck in while loop"
This commit is contained in:
commit
68015690f5
@ -118,29 +118,32 @@ class MessageController(storage.Message):
|
|||||||
for msg_id in message_ids:
|
for msg_id in message_ids:
|
||||||
pipe.delete(msg_id)
|
pipe.delete(msg_id)
|
||||||
|
|
||||||
# TODO(prashanthr_): Looking for better ways to solve the issue.
|
# TODO(prashanthr_): Look for better ways to solve the issue.
|
||||||
def _find_first_unclaimed(self, queue, project, limit):
|
def _find_first_unclaimed(self, queue, project, limit):
|
||||||
"""Find the first unclaimed message in the queue."""
|
"""Find the first unclaimed message in the queue."""
|
||||||
|
|
||||||
msgset_key = utils.scope_message_ids_set(queue, project,
|
msgset_key = utils.scope_message_ids_set(queue, project,
|
||||||
MESSAGE_IDS_SUFFIX)
|
MESSAGE_IDS_SUFFIX)
|
||||||
marker = 0
|
|
||||||
now = timeutils.utcnow_ts()
|
now = timeutils.utcnow_ts()
|
||||||
|
|
||||||
# NOTE(prashanthr_): This will not be an infinite loop.
|
# TODO(kgriffs): Generalize this paging pattern (DRY)
|
||||||
while True:
|
offset = 0
|
||||||
msg_keys = self._client.zrange(msgset_key, marker,
|
|
||||||
marker + limit)
|
|
||||||
if msg_keys:
|
|
||||||
messages = [Message.from_redis(self._client.hgetall(msg_key))
|
|
||||||
for msg_key in msg_keys]
|
|
||||||
|
|
||||||
for msg in messages:
|
while True:
|
||||||
if not utils.msg_claimed_filter(msg, now):
|
msg_keys = self._client.zrange(msgset_key, offset,
|
||||||
return msg.id
|
offset + limit - 1)
|
||||||
else:
|
if not msg_keys:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
offset += len(msg_keys)
|
||||||
|
|
||||||
|
messages = [Message.from_redis(self._client.hgetall(msg_key))
|
||||||
|
for msg_key in msg_keys]
|
||||||
|
|
||||||
|
for msg in messages:
|
||||||
|
if not utils.msg_claimed_filter(msg, now):
|
||||||
|
return msg.id
|
||||||
|
|
||||||
def _exists(self, key):
|
def _exists(self, key):
|
||||||
"""Check if message exists in the Queue.
|
"""Check if message exists in the Queue.
|
||||||
|
|
||||||
@ -194,9 +197,10 @@ class MessageController(storage.Message):
|
|||||||
client = self._client
|
client = self._client
|
||||||
|
|
||||||
with self._client.pipeline() as pipe:
|
with self._client.pipeline() as pipe:
|
||||||
# NOTE(prashanthr_): Iterate through the queue to find the first
|
|
||||||
# unclaimed message.
|
|
||||||
if not marker and not include_claimed:
|
if not marker and not include_claimed:
|
||||||
|
# NOTE(kgriffs): Skip unclaimed messages at the head
|
||||||
|
# of the queue; otherwise we would just filter them all
|
||||||
|
# out and likely end up with an empty list to return.
|
||||||
marker = self._find_first_unclaimed(queue, project, limit)
|
marker = self._find_first_unclaimed(queue, project, limit)
|
||||||
start = client.zrank(msgset_key, marker) or 0
|
start = client.zrank(msgset_key, marker) or 0
|
||||||
else:
|
else:
|
||||||
|
@ -765,17 +765,30 @@ class ClaimControllerTest(ControllerBaseTest):
|
|||||||
self.controller.get, self.queue_name,
|
self.controller.get, self.queue_name,
|
||||||
claim_id, project=self.project)
|
claim_id, project=self.project)
|
||||||
|
|
||||||
def test_claim_create_default_limit(self):
|
def test_claim_create_default_limit_multi(self):
|
||||||
|
num_claims = 5
|
||||||
|
num_messages = storage.DEFAULT_MESSAGES_PER_CLAIM * num_claims
|
||||||
|
|
||||||
|
# NOTE(kgriffs): + 1 on num_messages to check for off-by-one error
|
||||||
_insert_fixtures(self.message_controller, self.queue_name,
|
_insert_fixtures(self.message_controller, self.queue_name,
|
||||||
project=self.project, client_uuid=uuid.uuid4(),
|
project=self.project, client_uuid=uuid.uuid4(),
|
||||||
num=storage.DEFAULT_MESSAGES_PER_CLAIM + 1)
|
num=num_messages + 1)
|
||||||
|
|
||||||
meta = {'ttl': 70, 'grace': 30}
|
meta = {'ttl': 70, 'grace': 30}
|
||||||
|
total_claimed = 0
|
||||||
|
|
||||||
claim_id, messages = self.controller.create(self.queue_name, meta,
|
for _ in range(num_claims):
|
||||||
project=self.project)
|
claim_id, messages = self.controller.create(
|
||||||
|
self.queue_name, meta, project=self.project)
|
||||||
|
|
||||||
messages = list(messages)
|
messages = list(messages)
|
||||||
self.assertEqual(len(messages), storage.DEFAULT_MESSAGES_PER_CLAIM)
|
num_claimed = len(messages)
|
||||||
|
self.assertEqual(num_claimed,
|
||||||
|
storage.DEFAULT_MESSAGES_PER_CLAIM)
|
||||||
|
|
||||||
|
total_claimed += num_claimed
|
||||||
|
|
||||||
|
self.assertEqual(total_claimed, num_messages)
|
||||||
|
|
||||||
def test_extend_lifetime(self):
|
def test_extend_lifetime(self):
|
||||||
_insert_fixtures(self.message_controller, self.queue_name,
|
_insert_fixtures(self.message_controller, self.queue_name,
|
||||||
|
Loading…
Reference in New Issue
Block a user