Fix the TTL issue of subscriptions for Redis
Currently, the TTL of subscriptons doesn't work. That said, after the ttl seconds, the subscription can't be deleted. This pathch fixes it and a functional test will be added in Zaqar client. Partial-Bug: #1529168 Change-Id: I0c6a965347701bcfbf6d5634c7c8303d1e74ecc2
This commit is contained in:
@@ -147,7 +147,7 @@ class SubscriptionEnvelope(object):
|
||||
pipe.hmset(self.id, hmap)
|
||||
pipe.expire(self.id, self.ttl)
|
||||
|
||||
def to_basic(self, now):
|
||||
def to_basic(self):
|
||||
basic_msg = {
|
||||
'id': self.id,
|
||||
'source': self.source,
|
||||
|
||||
@@ -31,7 +31,6 @@ SubscriptionEnvelope = models.SubscriptionEnvelope
|
||||
|
||||
SUBSET_INDEX_KEY = 'subset_index'
|
||||
SUBSCRIPTION_IDS_SUFFIX = 'subscriptions'
|
||||
SUBSCRIBERS_SUFFIX = 'subscribers'
|
||||
|
||||
|
||||
class SubscriptionController(base.Subscription):
|
||||
@@ -93,21 +92,14 @@ class SubscriptionController(base.Subscription):
|
||||
|
||||
subscription = SubscriptionEnvelope.from_redis(subscription_id,
|
||||
self._client)
|
||||
now = timeutils.utcnow_ts()
|
||||
|
||||
if subscription and not utils.subscription_expired_filter(subscription,
|
||||
now):
|
||||
return subscription.to_basic(now)
|
||||
if subscription:
|
||||
return subscription.to_basic()
|
||||
else:
|
||||
raise errors.SubscriptionDoesNotExist(subscription_id)
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
def create(self, queue, subscriber, ttl, options, project=None):
|
||||
member_key = utils.scope_subscribers_set(queue, project,
|
||||
SUBSCRIBERS_SUFFIX)
|
||||
if self._client.sismember(member_key, subscriber):
|
||||
return None
|
||||
subscription_id = str(uuid.uuid4())
|
||||
subset_key = utils.scope_subscription_ids_set(queue,
|
||||
project,
|
||||
@@ -115,7 +107,6 @@ class SubscriptionController(base.Subscription):
|
||||
|
||||
source = queue
|
||||
now = timeutils.utcnow_ts()
|
||||
ttl = int(ttl)
|
||||
expires = now + ttl
|
||||
|
||||
subscription = {'id': subscription_id,
|
||||
@@ -131,14 +122,49 @@ class SubscriptionController(base.Subscription):
|
||||
try:
|
||||
# Pipeline ensures atomic inserts.
|
||||
with self._client.pipeline() as pipe:
|
||||
pipe.sadd(member_key, subscriber)
|
||||
pipe.zadd(subset_key, 1,
|
||||
subscription_id).hmset(subscription_id, subscription)
|
||||
pipe.execute()
|
||||
if not self._is_duplicated_subscriber(subscriber,
|
||||
queue,
|
||||
project):
|
||||
pipe.zadd(subset_key, 1,
|
||||
subscription_id).hmset(subscription_id,
|
||||
subscription)
|
||||
pipe.expire(subscription_id, ttl)
|
||||
pipe.execute()
|
||||
else:
|
||||
return None
|
||||
return subscription_id
|
||||
except redis.exceptions.ResponseError:
|
||||
return None
|
||||
|
||||
def _is_duplicated_subscriber(self, subscriber, queue, project):
|
||||
"""Check if the subscriber is existing or not.
|
||||
|
||||
Given the limitation of Redis' expires(), it's hard to auto expire
|
||||
subscriber from the set and subscription id from the sorted set, so
|
||||
this method is used to do a ugly duplication check when adding a new
|
||||
subscription so that we don't need the set for subscriber. And as a
|
||||
side effect, this method will remove the unreachable subscription's id
|
||||
from the sorted set.
|
||||
"""
|
||||
subset_key = utils.scope_subscription_ids_set(queue,
|
||||
project,
|
||||
SUBSCRIPTION_IDS_SUFFIX)
|
||||
try:
|
||||
sub_ids = (q for q in self._client.zrange(subset_key, 0, -1))
|
||||
for s_id in sub_ids:
|
||||
subscription = self._client.hmget(s_id, ['s', 'u', 't', 'o'])
|
||||
if subscription == [None, None, None, None]:
|
||||
# NOTE(flwang): Under this check, that means the
|
||||
# subscription has been expired. So redis can't get
|
||||
# the subscription but the id is still there. So let's
|
||||
# delete the id for clean up.
|
||||
self._client.zrem(subset_key, s_id)
|
||||
if subscription[1] == subscriber:
|
||||
return True
|
||||
return False
|
||||
except redis.exceptions.ResponseError:
|
||||
return True
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
def exists(self, queue, subscription_id, project=None):
|
||||
@@ -168,12 +194,8 @@ class SubscriptionController(base.Subscription):
|
||||
def delete(self, queue, subscription_id, project=None):
|
||||
subset_key = utils.scope_subscription_ids_set(queue, project,
|
||||
SUBSCRIPTION_IDS_SUFFIX)
|
||||
member_key = utils.scope_subscribers_set(queue, project,
|
||||
SUBSCRIBERS_SUFFIX)
|
||||
subscriber = self._client.hget(subscription_id, 'u')
|
||||
# NOTE(prashanthr_): Pipelining is used to mitigate race conditions
|
||||
with self._client.pipeline() as pipe:
|
||||
pipe.srem(member_key, subscriber)
|
||||
pipe.zrem(subset_key, subscription_id)
|
||||
pipe.delete(subscription_id)
|
||||
pipe.execute()
|
||||
|
||||
@@ -122,14 +122,6 @@ def descope_subscription_ids_set(subset_key):
|
||||
return (tokens[1] or None, tokens[0] or None)
|
||||
|
||||
|
||||
def scope_subscribers_set(queue=None, project=None,
|
||||
subscriber_suffix=''):
|
||||
|
||||
return "%s.%s.%s" % (normalize_none_str(project),
|
||||
normalize_none_str(queue),
|
||||
subscriber_suffix)
|
||||
|
||||
|
||||
# NOTE(prashanthr_): Aliasing the scope_message_ids_set function
|
||||
# to be used in the pools and claims controller as similar
|
||||
# functionality is required to scope redis id's.
|
||||
@@ -241,12 +233,6 @@ def msg_expired_filter(message, now):
|
||||
return message.expires <= now
|
||||
|
||||
|
||||
def subscription_expired_filter(subscription, now):
|
||||
"""Return True if the subscription has expired."""
|
||||
|
||||
return subscription.expires <= now
|
||||
|
||||
|
||||
class QueueListCursor(object):
|
||||
|
||||
def __init__(self, client, queues, denormalizer):
|
||||
@@ -281,6 +267,12 @@ class SubscriptionListCursor(object):
|
||||
def next(self):
|
||||
curr = next(self.subscription_iter)
|
||||
subscription = self.client.hmget(curr, ['s', 'u', 't', 'o'])
|
||||
# NOTE(flwang): The expired subscription will be removed
|
||||
# automatically, but the key can't be deleted automatically as well.
|
||||
# Though we clean up those expired ids when create new subscription,
|
||||
# we still need to filter them out before a new subscription creation.
|
||||
if not subscription[0]:
|
||||
return self.next()
|
||||
return self.denormalizer(subscription, encodeutils.safe_decode(curr))
|
||||
|
||||
def __next__(self):
|
||||
|
||||
Reference in New Issue
Block a user