Merge "Fix the TTL issue of subscriptions for MongoDB"
This commit is contained in:
commit
883bade05a
@ -58,7 +58,7 @@ class RequestSchema(v1_1.RequestSchema):
|
||||
'ttl': {'type': 'integer'},
|
||||
'options': {'type': 'object'},
|
||||
},
|
||||
'required': ['queue_name', 'ttl'],
|
||||
'required': ['queue_name', ],
|
||||
}
|
||||
},
|
||||
'required': ['action', 'headers', 'body']
|
||||
|
@ -11,6 +11,7 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations under
|
||||
# the License.
|
||||
import datetime
|
||||
|
||||
from oslo_utils import timeutils
|
||||
import pymongo.errors
|
||||
@ -29,6 +30,11 @@ SUBSCRIPTIONS_INDEX = [
|
||||
('p', 1),
|
||||
]
|
||||
|
||||
# For removing expired subscriptions
|
||||
TTL_INDEX_FIELDS = [
|
||||
('e', 1),
|
||||
]
|
||||
|
||||
|
||||
class SubscriptionController(base.Subscription):
|
||||
"""Implements subscription resource operations using MongoDB.
|
||||
@ -49,6 +55,14 @@ class SubscriptionController(base.Subscription):
|
||||
self._collection = self.driver.subscriptions_database.subscriptions
|
||||
self._queue_ctrl = self.driver.queue_controller
|
||||
self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True)
|
||||
# NOTE(flwang): MongoDB will automatically delete the subscription
|
||||
# from the subscriptions collection when the subscription's 'e' value
|
||||
# is older than the number of seconds specified in expireAfterSeconds,
|
||||
# i.e. 0 seconds older in this case. As such, the data expires at the
|
||||
# specified 'e' value.
|
||||
self._collection.ensure_index(TTL_INDEX_FIELDS, name='ttl',
|
||||
expireAfterSeconds=0,
|
||||
background=True)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def list(self, queue, project=None, marker=None,
|
||||
@ -92,8 +106,8 @@ class SubscriptionController(base.Subscription):
|
||||
def create(self, queue, subscriber, ttl, options, project=None):
|
||||
source = queue
|
||||
now = timeutils.utcnow_ts()
|
||||
ttl = int(ttl)
|
||||
expires = now + ttl
|
||||
now_dt = datetime.datetime.utcfromtimestamp(now)
|
||||
expires = now_dt + datetime.timedelta(seconds=ttl)
|
||||
|
||||
if not self._queue_ctrl.exists(source, project):
|
||||
raise errors.QueueDoesNotExist(source, project)
|
||||
|
@ -97,4 +97,19 @@ class TestValidation(base.V2Base):
|
||||
self.project_id,
|
||||
body='{"timespace": "Shangri-la"}',
|
||||
headers=empty_headers)
|
||||
|
||||
def test_subscription_ttl(self):
|
||||
# Normal case
|
||||
body = '{"subscriber": "http://trigger.she", "ttl": 100, "options":{}}'
|
||||
self.simulate_post(self.queue_path + '/subscriptions',
|
||||
self.project_id, body=body,
|
||||
headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||
|
||||
# Very big TTL
|
||||
body = ('{"subscriber": "http://a.c", "ttl": 99999999999999999'
|
||||
', "options":{}}')
|
||||
self.simulate_post(self.queue_path + '/subscriptions',
|
||||
self.project_id, body=body,
|
||||
headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
|
@ -33,7 +33,9 @@ _RESOURCE_DEFAULTS = (
|
||||
cfg.IntOpt('default_claim_ttl', default=300,
|
||||
help=('Defines how long a message will be in claimed state.')),
|
||||
cfg.IntOpt('default_claim_grace', default=60,
|
||||
help=('Defines the message grace period in seconds.'))
|
||||
help=('Defines the message grace period in seconds.')),
|
||||
cfg.IntOpt('default_subscription_ttl', default=3600,
|
||||
help=('Defines how long a subscription will be available.')),
|
||||
)
|
||||
|
||||
_TRANSPORT_GROUP = 'transport'
|
||||
@ -66,6 +68,10 @@ class ResourceDefaults(object):
|
||||
def claim_grace(self):
|
||||
return self._defaults.default_claim_grace
|
||||
|
||||
@property
|
||||
def subscription_ttl(self):
|
||||
return self._defaults.default_subscription_ttl
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class DriverBase(object):
|
||||
|
@ -14,9 +14,11 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import datetime
|
||||
import re
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
|
||||
from zaqar.i18n import _
|
||||
@ -24,6 +26,7 @@ from zaqar.i18n import _
|
||||
MIN_MESSAGE_TTL = 60
|
||||
MIN_CLAIM_TTL = 60
|
||||
MIN_CLAIM_GRACE = 60
|
||||
MIN_SUBSCRIPTION_TTL = 60
|
||||
|
||||
_TRANSPORT_LIMITS_OPTIONS = (
|
||||
cfg.IntOpt('max_queues_per_page', default=20,
|
||||
@ -295,7 +298,7 @@ class Validator(object):
|
||||
:param subscription: dict of subscription
|
||||
:raises: ValidationFailed if the subscription is invalid.
|
||||
"""
|
||||
for p in ('subscriber', 'ttl', 'options'):
|
||||
for p in ('subscriber',):
|
||||
if p not in subscription.keys():
|
||||
raise ValidationFailed(_(u'Missing parameter %s in body.') % p)
|
||||
|
||||
@ -329,10 +332,30 @@ class Validator(object):
|
||||
raise ValidationFailed(msg)
|
||||
|
||||
ttl = subscription.get('ttl', None)
|
||||
if ttl and not isinstance(ttl, int):
|
||||
if ttl:
|
||||
if not isinstance(ttl, int):
|
||||
msg = _(u'TTL must be an integer.')
|
||||
raise ValidationFailed(msg)
|
||||
|
||||
if ttl < MIN_SUBSCRIPTION_TTL:
|
||||
msg = _(u'The TTL for a subscription '
|
||||
'must be at least {0} seconds long.')
|
||||
raise ValidationFailed(msg, MIN_SUBSCRIPTION_TTL)
|
||||
|
||||
# NOTE(flwang): By this change, technically, user can set a very
|
||||
# big TTL so as to get a very long subscription.
|
||||
now = timeutils.utcnow_ts()
|
||||
now_dt = datetime.datetime.utcfromtimestamp(now)
|
||||
msg = _(u'The TTL seconds for a subscription plus current time'
|
||||
' must be less than {0}.')
|
||||
try:
|
||||
# NOTE(flwang): If below expression works, then we believe the
|
||||
# ttl is acceptable otherwise it exceeds the max time of
|
||||
# python.
|
||||
now_dt + datetime.timedelta(seconds=ttl)
|
||||
except OverflowError:
|
||||
raise ValidationFailed(msg, datetime.datetime.max)
|
||||
|
||||
def subscription_listing(self, limit=None, **kwargs):
|
||||
"""Restrictions involving a list of subscriptions.
|
||||
|
||||
|
@ -99,7 +99,8 @@ def public_endpoints(driver, conf):
|
||||
# Subscription Endpoints
|
||||
('/queues/{queue_name}/subscriptions',
|
||||
subscriptions.CollectionResource(driver._validate,
|
||||
subscription_controller)),
|
||||
subscription_controller,
|
||||
defaults.subscription_ttl)),
|
||||
|
||||
('/queues/{queue_name}/subscriptions/{subscription_id}',
|
||||
subscriptions.ItemResource(driver._validate,
|
||||
|
@ -105,11 +105,14 @@ class ItemResource(object):
|
||||
|
||||
class CollectionResource(object):
|
||||
|
||||
__slots__ = ('_subscription_controller', '_validate')
|
||||
__slots__ = ('_subscription_controller', '_validate',
|
||||
'_default_subscription_ttl')
|
||||
|
||||
def __init__(self, validate, subscription_controller):
|
||||
def __init__(self, validate, subscription_controller,
|
||||
default_subscription_ttl):
|
||||
self._subscription_controller = subscription_controller
|
||||
self._validate = validate
|
||||
self._default_subscription_ttl = default_subscription_ttl
|
||||
|
||||
@decorators.TransportLog("Subscription collection")
|
||||
@acl.enforce("subscription:get_all")
|
||||
@ -167,8 +170,8 @@ class CollectionResource(object):
|
||||
try:
|
||||
self._validate.subscription_posting(document)
|
||||
subscriber = document['subscriber']
|
||||
ttl = int(document['ttl'])
|
||||
options = document['options']
|
||||
ttl = document.get('ttl', self._default_subscription_ttl)
|
||||
options = document.get('options', {})
|
||||
created = self._subscription_controller.create(queue_name,
|
||||
subscriber,
|
||||
ttl,
|
||||
|
Loading…
Reference in New Issue
Block a user