# Copyright (c) 2014 Prashanth Raghu.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import uuid
from oslo_utils import encodeutils
from oslo_utils import timeutils
import redis
from zaqar.common import decorators
from zaqar import storage
from import errors
from import models
from import scripting
from import utils
from import utils as s_utils
Message = models.Message
MessageEnvelope = models.MessageEnvelope
MSGSET_INDEX_KEY = 'msgset_index'
# The rank counter is an atomic index to rank messages
# in a FIFO manner.
# NOTE(kgriffs): This value, in seconds, should be at least less than the
# minimum allowed TTL for messages (60 seconds).
# TODO(kgriffs): Tune this and/or make it configurable. Don't want
# it to be so large that it blocks other operations for more than
# 1-2 milliseconds.
class MessageController(storage.Message, scripting.Mixin):
"""Implements message resource operations using Redis.
Messages are scoped by project + queue.
Redis Data Structures:
1. Message id's list (Redis sorted set)
Each queue in the system has a set of message ids currently
in the queue. The list is sorted based on a ranking which is
incremented atomically using the counter(MESSAGE_RANK_COUNTER_SUFFIX)
also stored in the database for every queue.
Key: <project_id>.<queue_name>.messages
2. Index of message ID lists (Redis sorted set)
This is a sorted set that facilitates discovery of all the
message ID lists. This is necessary when performing
garbage collection on the IDs contained within these lists.
Key: msgset_index
3. Messages(Redis Hash):
Scoped by the UUID of the message, the redis datastructure
has the following information.
| Name | Field |
| id | id |
| ttl | t |
| expires | e |
| body | b |
| claim | c |
| claim expiry time | c.e |
| claim count | c.c |
| client uuid | u |
| created time | cr |
| delay expiry time | d |
| body checksum | cs |
4. Messages rank counter (Redis Hash):
Key: <project_id>.<queue_name>.rank_counter
script_names = ['index_messages']
def __init__(self, *args, **kwargs):
super(MessageController, self).__init__(*args, **kwargs)
self._client = self.driver.connection
def _queue_ctrl(self):
return self.driver.queue_controller
def _index_messages(self, msgset_key, counter_key, message_ids):
# NOTE(kgriffs): A watch on a pipe could also be used to ensure
# messages are inserted in order, but that would be less efficient.
func = self._scripts['index_messages']
arguments = [len(message_ids)] + message_ids
func(keys=[msgset_key, counter_key], args=arguments)
def _count(self, queue, project):
"""Return total number of messages in a queue.
Note: Some expired messages may be included in the count if
they haven't been GC'd yet. This is done for performance.
return self._client.zcard(utils.msgset_key(queue, project))
def _create_msgset(self, queue, project, pipe):
pipe.zadd(MSGSET_INDEX_KEY, {utils.msgset_key(queue, project): 1})
def _delete_msgset(self, queue, project, pipe):
pipe.zrem(MSGSET_INDEX_KEY, utils.msgset_key(queue, project))
def _delete_queue_messages(self, queue, project, pipe):
"""Method to remove all the messages belonging to a queue.
Will be referenced from the QueueController.
The pipe to execute deletion will be passed from the QueueController
executing the operation.
client = self._client
msgset_key = utils.msgset_key(queue, project)
message_ids = client.zrange(msgset_key, 0, -1)
for msg_id in message_ids:
# TODO(prashanthr_): Look for better ways to solve the issue.
def _find_first_unclaimed(self, queue, project, limit):
"""Find the first unclaimed message in the queue."""
msgset_key = utils.msgset_key(queue, project)
now = timeutils.utcnow_ts()
# TODO(kgriffs): Generalize this paging pattern (DRY)
offset = 0
while True:
msg_keys = self._client.zrange(msgset_key, offset,
offset + limit - 1)
if not msg_keys:
return None
offset += len(msg_keys)
messages = [MessageEnvelope.from_redis(msg_key, self._client)
for msg_key in msg_keys]
for msg in messages:
if msg and not utils.msg_claimed_filter(msg, now):
def _exists(self, message_id):
"""Check if message exists in the Queue."""
return self._client.exists(message_id)
def _get_first_message_id(self, queue, project, sort):
"""Fetch head/tail of the Queue.
Helper function to get the first message in the queue
sort > 0 get from the left else from the right.
msgset_key = utils.msgset_key(queue, project)
zrange = self._client.zrange if sort == 1 else self._client.zrevrange
message_ids = zrange(msgset_key, 0, 0)
return message_ids[0] if message_ids else None
def _get_claim(self, message_id):
"""Gets minimal claim doc for a message.
:returns: {'id': cid, 'expires': ts} IFF the message is claimed,
and that claim has not expired.
claim = self._client.hmget(message_id, 'c', 'c.e')
if claim == [None, None]:
# NOTE(kgriffs): message_id was not found
return None
info = {
# NOTE(kgriffs): A "None" claim is serialized as an empty str
'id': encodeutils.safe_decode(claim[0]) or None,
'expires': int(claim[1]),
# Is the message claimed?
now = timeutils.utcnow_ts()
if info['id'] and (now < info['expires']):
return info
# Not claimed
return None
def _list(self, queue, project=None, marker=None,
echo=False, client_uuid=None,
include_claimed=False, include_delayed=False,
if not self._queue_ctrl.exists(queue, project):
raise errors.QueueDoesNotExist(queue,
msgset_key = utils.msgset_key(queue, project)
client = self._client
if not marker and not include_claimed:
# NOTE(kgriffs): Skip claimed messages at the head
# of the queue; otherwise we would just filter them all
# out and likely end up with an empty list to return.
marker = self._find_first_unclaimed(queue, project, limit)
if marker:
start = client.zrank(msgset_key, marker) or 0
start = 0
rank = client.zrank(msgset_key, marker)
start = rank + 1 if rank else 0
message_ids = client.zrange(msgset_key, start,
start + (limit - 1))
messages = Message.from_redis_bulk(message_ids, client)
# NOTE(prashanthr_): Build a list of filters for checking
# the following:
# 1. Message is expired
# 2. Message is claimed
# 3. Message should not be echoed
now = timeutils.utcnow_ts()
filters = [functools.partial(utils.msg_expired_filter, now=now)]
if not include_claimed:
if not include_delayed:
if not echo:
marker = {}
yield _filter_messages(messages, filters, to_basic, marker)
yield marker['next']
def gc(self):
"""Garbage-collect expired message data.
Not all message data can be automatically expired. This method
cleans up the remainder.
:returns: Number of messages removed
claim_ctrl = self.driver.claim_controller
client = self._client
num_removed = 0
offset_msgsets = 0
while True:
# NOTE(kgriffs): Iterate across all message sets; there will
# be one set of message IDs per queue.
msgset_keys = client.zrange(MSGSET_INDEX_KEY,
offset_msgsets + GC_BATCH_SIZE - 1)
if not msgset_keys:
offset_msgsets += len(msgset_keys)
for msgset_key in msgset_keys:
msgset_key = encodeutils.safe_decode(msgset_key)
# NOTE(kgriffs): Drive the claim controller GC from
# here, because we already know the queue and project
# scope.
queue, project = utils.descope_message_ids_set(msgset_key)
claim_ctrl._gc(queue, project)
offset_mids = 0
while True:
# NOTE(kgriffs): Look up each message in the message set,
# see if it has expired, and if so, remove it from msgset.
mids = client.zrange(msgset_key, offset_mids,
offset_mids + GC_BATCH_SIZE - 1)
if not mids:
offset_mids += len(mids)
# NOTE(kgriffs): If redis expired the message, it will
# not exist, so all we have to do is remove mid from
# the msgset collection.
with client.pipeline() as pipe:
for mid in mids:
mid_exists_flags = pipe.execute()
with client.pipeline() as pipe:
for mid, exists in zip(mids, mid_exists_flags):
if not exists:
pipe.zrem(msgset_key, mid)
num_removed += 1
return num_removed
def list(self, queue, project=None, marker=None,
echo=False, client_uuid=None,
include_claimed=False, include_delayed=False):
return self._list(queue, project, marker, limit, echo,
client_uuid, include_claimed,
def first(self, queue, project=None, sort=1):
if sort not in (1, -1):
raise ValueError(u'sort must be either 1 (ascending) '
u'or -1 (descending)')
message_id = self._get_first_message_id(queue, project, sort)
if not message_id:
raise errors.QueueIsEmpty(queue, project)
message = Message.from_redis(message_id, self._client)
if message is None:
raise errors.QueueIsEmpty(queue, project)
now = timeutils.utcnow_ts()
return message.to_basic(now, include_created=True)
def get(self, queue, message_id, project=None):
if not self._queue_ctrl.exists(queue, project):
raise errors.QueueDoesNotExist(queue, project)
message = Message.from_redis(message_id, self._client)
now = timeutils.utcnow_ts()
if message and not utils.msg_expired_filter(message, now):
return message.to_basic(now)
raise errors.MessageDoesNotExist(message_id, queue, project)
def bulk_get(self, queue, message_ids, project=None):
if not self._queue_ctrl.exists(queue, project):
return iter([])
# NOTE(prashanthr_): Pipelining is used here purely
# for performance.
with self._client.pipeline() as pipe:
for mid in message_ids:
messages = pipe.execute()
# NOTE(kgriffs): Skip messages that may have been deleted
now = timeutils.utcnow_ts()
return (Message.from_hmap(msg).to_basic(now)
for msg in messages if msg)
def post(self, queue, messages, client_uuid, project=None):
msgset_key = utils.msgset_key(queue, project)
counter_key = utils.scope_queue_index(queue, project,
message_ids = []
now = timeutils.utcnow_ts()
with self._client.pipeline() as pipe:
for msg in messages:
prepared_msg = Message(
delay_expires=now + msg.get('delay', 0),
body=msg.get('body', {}),
checksum=s_utils.get_checksum(msg.get('body', None)) if
self.driver.conf.enable_checksum else None
# NOTE(kgriffs): If this call fails, we will return
# an error to the client and the messages will be
# orphaned, but Redis will remove them when they
# expire, so we will just pretend they don't exist
# in that case.
self._index_messages(msgset_key, counter_key, message_ids)
return message_ids
def delete(self, queue, message_id, project=None, claim=None):
claim_ctrl = self.driver.claim_controller
if not self._queue_ctrl.exists(queue, project):
# NOTE(kgriffs): The message does not exist, so
# it is essentially "already" deleted.
if not self._exists(message_id):
# TODO(kgriffs): Create decorator for validating claim and message
# IDs, since those are not checked at the transport layer. This
# decorator should be applied to all relevant methods.
if claim is not None:
except ValueError:
raise errors.ClaimDoesNotExist(claim, queue, project)
msg_claim = self._get_claim(message_id)
is_claimed = (msg_claim is not None)
# Authorize the request based on having the correct claim ID
if claim is None:
if is_claimed:
raise errors.MessageIsClaimed(message_id)
elif not is_claimed:
raise errors.MessageNotClaimed(message_id)
elif msg_claim['id'] != claim:
if not claim_ctrl._exists(queue, claim, project):
raise errors.ClaimDoesNotExist(claim, queue, project)
raise errors.MessageNotClaimedBy(message_id, claim)
msgset_key = utils.msgset_key(queue, project)
with self._client.pipeline() as pipe:
pipe.zrem(msgset_key, message_id)
if is_claimed:
claim_ctrl._del_message(queue, project, msg_claim['id'],
message_id, pipe)
def bulk_delete(self, queue, message_ids, project=None, claim_ids=None):
claim_ctrl = self.driver.claim_controller
if not self._queue_ctrl.exists(queue, project):
msgset_key = utils.msgset_key(queue, project)
with self._client.pipeline() as pipe:
for mid in message_ids:
if not self._exists(mid):
pipe.zrem(msgset_key, mid)
msg_claim = self._get_claim(mid)
if claim_ids and msg_claim is None:
raise errors.MessageNotClaimed(mid)
if msg_claim is not None:
if claim_ids and (msg_claim['id'] not in claim_ids):
raise errors.ClaimDoesNotMatch(msg_claim['id'], queue,
claim_ctrl._del_message(queue, project, msg_claim['id'],
mid, pipe)
def pop(self, queue, limit, project=None):
# Pop is implemented as a chain of the following operations:
# 1. Create a claim.
# 2. Delete the messages claimed.
# 3. Delete the claim.
claim_ctrl = self.driver.claim_controller
claim_id, messages = claim_ctrl.create(
queue, dict(ttl=1, grace=0), project, limit=limit)
message_ids = [message['id'] for message in messages]
self.bulk_delete(queue, message_ids, project)
# NOTE(prashanthr_): Creating a claim controller reference
# causes a recursive reference. Hence, using the reference
# from the driver.
claim_ctrl.delete(queue, claim_id, project)
return messages
def _filter_messages(messages, filters, to_basic, marker):
"""Create a filtering iterator over a list of messages.
The function accepts a list of filters to be filtered
before the message can be included as a part of the reply.
now = timeutils.utcnow_ts()
for msg in messages:
# NOTE(kgriffs): Message may have been deleted, so
# check each value to ensure we got a message back
if msg is None:
# NOTE(kgriffs): Check to see if any of the filters
# indiciate that this message should be skipped.
for should_skip in filters:
if should_skip(msg):
marker['next'] =
if to_basic:
yield msg.to_basic(now)
yield msg
QUEUES_SET_STORE_NAME = 'queues_set'
class MessageQueueHandler(object):
def __init__(self, driver, control_driver):
self.driver = driver
self._client = self.driver.connection
self._queue_ctrl = self.driver.queue_controller
self._message_ctrl = self.driver.message_controller
self._claim_ctrl = self.driver.claim_controller
def create(self, name, metadata=None, project=None):
with self._client.pipeline() as pipe:
self._message_ctrl._create_msgset(name, project, pipe)
except redis.exceptions.ResponseError:
return False
def delete(self, name, project=None):
with self._client.pipeline() as pipe:
self._message_ctrl._delete_msgset(name, project, pipe)
self._message_ctrl._delete_queue_messages(name, project, pipe)
def stats(self, name, project=None):
if not self._queue_ctrl.exists(name, project=project):
raise errors.QueueDoesNotExist(name, project)
total = self._message_ctrl._count(name, project)
if total:
claimed = self._claim_ctrl._count_messages(name, project)
claimed = 0
message_stats = {
'claimed': claimed,
'free': total - claimed,
'total': total,
if total:
newest = self._message_ctrl.first(name, project, -1)
oldest = self._message_ctrl.first(name, project, 1)
except errors.QueueIsEmpty:
message_stats['newest'] = newest
message_stats['oldest'] = oldest
return {'messages': message_stats}
class MessageTopicHandler(object):
def __init__(self, driver, control_driver):
self.driver = driver
self._client = self.driver.connection
self._topic_ctrl = self.driver.topic_controller
self._message_ctrl = self.driver.message_controller
def create(self, name, metadata=None, project=None):
with self._client.pipeline() as pipe:
self._message_ctrl._create_msgset(name, project, pipe)
except redis.exceptions.ResponseError:
return False
def delete(self, name, project=None):
with self._client.pipeline() as pipe:
self._message_ctrl._delete_msgset(name, project, pipe)
self._message_ctrl._delete_queue_messages(name, project, pipe)
def stats(self, name, project=None):
if not self._topic_ctrl.exists(name, project=project):
raise errors.TopicDoesNotExist(name, project)
total = self._message_ctrl._count(name, project)
message_stats = {
'total': total
if total:
newest = self._message_ctrl.first(name, project, -1)
oldest = self._message_ctrl.first(name, project, 1)
except errors.QueueIsEmpty:
message_stats['newest'] = newest
message_stats['oldest'] = oldest
return {'messages': message_stats}