Merge "Fix the TTL issue of subscriptions for Redis"
This commit is contained in:
commit
8bfb318484
@ -147,7 +147,7 @@ class SubscriptionEnvelope(object):
|
||||
pipe.hmset(self.id, hmap)
|
||||
pipe.expire(self.id, self.ttl)
|
||||
|
||||
def to_basic(self, now):
|
||||
def to_basic(self):
|
||||
basic_msg = {
|
||||
'id': self.id,
|
||||
'source': self.source,
|
||||
|
@ -31,7 +31,6 @@ SubscriptionEnvelope = models.SubscriptionEnvelope
|
||||
|
||||
SUBSET_INDEX_KEY = 'subset_index'
|
||||
SUBSCRIPTION_IDS_SUFFIX = 'subscriptions'
|
||||
SUBSCRIBERS_SUFFIX = 'subscribers'
|
||||
|
||||
|
||||
class SubscriptionController(base.Subscription):
|
||||
@ -93,21 +92,14 @@ class SubscriptionController(base.Subscription):
|
||||
|
||||
subscription = SubscriptionEnvelope.from_redis(subscription_id,
|
||||
self._client)
|
||||
now = timeutils.utcnow_ts()
|
||||
|
||||
if subscription and not utils.subscription_expired_filter(subscription,
|
||||
now):
|
||||
return subscription.to_basic(now)
|
||||
if subscription:
|
||||
return subscription.to_basic()
|
||||
else:
|
||||
raise errors.SubscriptionDoesNotExist(subscription_id)
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
def create(self, queue, subscriber, ttl, options, project=None):
|
||||
member_key = utils.scope_subscribers_set(queue, project,
|
||||
SUBSCRIBERS_SUFFIX)
|
||||
if self._client.sismember(member_key, subscriber):
|
||||
return None
|
||||
subscription_id = str(uuid.uuid4())
|
||||
subset_key = utils.scope_subscription_ids_set(queue,
|
||||
project,
|
||||
@ -115,7 +107,6 @@ class SubscriptionController(base.Subscription):
|
||||
|
||||
source = queue
|
||||
now = timeutils.utcnow_ts()
|
||||
ttl = int(ttl)
|
||||
expires = now + ttl
|
||||
|
||||
subscription = {'id': subscription_id,
|
||||
@ -131,14 +122,49 @@ class SubscriptionController(base.Subscription):
|
||||
try:
|
||||
# Pipeline ensures atomic inserts.
|
||||
with self._client.pipeline() as pipe:
|
||||
pipe.sadd(member_key, subscriber)
|
||||
pipe.zadd(subset_key, 1,
|
||||
subscription_id).hmset(subscription_id, subscription)
|
||||
pipe.execute()
|
||||
if not self._is_duplicated_subscriber(subscriber,
|
||||
queue,
|
||||
project):
|
||||
pipe.zadd(subset_key, 1,
|
||||
subscription_id).hmset(subscription_id,
|
||||
subscription)
|
||||
pipe.expire(subscription_id, ttl)
|
||||
pipe.execute()
|
||||
else:
|
||||
return None
|
||||
return subscription_id
|
||||
except redis.exceptions.ResponseError:
|
||||
return None
|
||||
|
||||
def _is_duplicated_subscriber(self, subscriber, queue, project):
|
||||
"""Check if the subscriber is existing or not.
|
||||
|
||||
Given the limitation of Redis' expires(), it's hard to auto expire
|
||||
subscriber from the set and subscription id from the sorted set, so
|
||||
this method is used to do a ugly duplication check when adding a new
|
||||
subscription so that we don't need the set for subscriber. And as a
|
||||
side effect, this method will remove the unreachable subscription's id
|
||||
from the sorted set.
|
||||
"""
|
||||
subset_key = utils.scope_subscription_ids_set(queue,
|
||||
project,
|
||||
SUBSCRIPTION_IDS_SUFFIX)
|
||||
try:
|
||||
sub_ids = (q for q in self._client.zrange(subset_key, 0, -1))
|
||||
for s_id in sub_ids:
|
||||
subscription = self._client.hmget(s_id, ['s', 'u', 't', 'o'])
|
||||
if subscription == [None, None, None, None]:
|
||||
# NOTE(flwang): Under this check, that means the
|
||||
# subscription has been expired. So redis can't get
|
||||
# the subscription but the id is still there. So let's
|
||||
# delete the id for clean up.
|
||||
self._client.zrem(subset_key, s_id)
|
||||
if subscription[1] == subscriber:
|
||||
return True
|
||||
return False
|
||||
except redis.exceptions.ResponseError:
|
||||
return True
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
def exists(self, queue, subscription_id, project=None):
|
||||
@ -168,12 +194,8 @@ class SubscriptionController(base.Subscription):
|
||||
def delete(self, queue, subscription_id, project=None):
|
||||
subset_key = utils.scope_subscription_ids_set(queue, project,
|
||||
SUBSCRIPTION_IDS_SUFFIX)
|
||||
member_key = utils.scope_subscribers_set(queue, project,
|
||||
SUBSCRIBERS_SUFFIX)
|
||||
subscriber = self._client.hget(subscription_id, 'u')
|
||||
# NOTE(prashanthr_): Pipelining is used to mitigate race conditions
|
||||
with self._client.pipeline() as pipe:
|
||||
pipe.srem(member_key, subscriber)
|
||||
pipe.zrem(subset_key, subscription_id)
|
||||
pipe.delete(subscription_id)
|
||||
pipe.execute()
|
||||
|
@ -122,14 +122,6 @@ def descope_subscription_ids_set(subset_key):
|
||||
return (tokens[1] or None, tokens[0] or None)
|
||||
|
||||
|
||||
def scope_subscribers_set(queue=None, project=None,
|
||||
subscriber_suffix=''):
|
||||
|
||||
return "%s.%s.%s" % (normalize_none_str(project),
|
||||
normalize_none_str(queue),
|
||||
subscriber_suffix)
|
||||
|
||||
|
||||
# NOTE(prashanthr_): Aliasing the scope_message_ids_set function
|
||||
# to be used in the pools and claims controller as similar
|
||||
# functionality is required to scope redis id's.
|
||||
@ -241,12 +233,6 @@ def msg_expired_filter(message, now):
|
||||
return message.expires <= now
|
||||
|
||||
|
||||
def subscription_expired_filter(subscription, now):
|
||||
"""Return True if the subscription has expired."""
|
||||
|
||||
return subscription.expires <= now
|
||||
|
||||
|
||||
class QueueListCursor(object):
|
||||
|
||||
def __init__(self, client, queues, denormalizer):
|
||||
@ -281,6 +267,12 @@ class SubscriptionListCursor(object):
|
||||
def next(self):
|
||||
curr = next(self.subscription_iter)
|
||||
subscription = self.client.hmget(curr, ['s', 'u', 't', 'o'])
|
||||
# NOTE(flwang): The expired subscription will be removed
|
||||
# automatically, but the key can't be deleted automatically as well.
|
||||
# Though we clean up those expired ids when create new subscription,
|
||||
# we still need to filter them out before a new subscription creation.
|
||||
if not subscription[0]:
|
||||
return self.next()
|
||||
return self.denormalizer(subscription, encodeutils.safe_decode(curr))
|
||||
|
||||
def __next__(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user