diff --git a/zaqar/api/v2/request.py b/zaqar/api/v2/request.py index 807f5014e..5c4ddb20f 100644 --- a/zaqar/api/v2/request.py +++ b/zaqar/api/v2/request.py @@ -58,7 +58,7 @@ class RequestSchema(v1_1.RequestSchema): 'ttl': {'type': 'integer'}, 'options': {'type': 'object'}, }, - 'required': ['queue_name', 'ttl'], + 'required': ['queue_name', ], } }, 'required': ['action', 'headers', 'body'] diff --git a/zaqar/storage/mongodb/subscriptions.py b/zaqar/storage/mongodb/subscriptions.py index ce6837bc9..05e9fe4aa 100644 --- a/zaqar/storage/mongodb/subscriptions.py +++ b/zaqar/storage/mongodb/subscriptions.py @@ -11,6 +11,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. +import datetime from oslo_utils import timeutils import pymongo.errors @@ -29,6 +30,11 @@ SUBSCRIPTIONS_INDEX = [ ('p', 1), ] +# For removing expired subscriptions +TTL_INDEX_FIELDS = [ + ('e', 1), +] + class SubscriptionController(base.Subscription): """Implements subscription resource operations using MongoDB. @@ -49,6 +55,14 @@ class SubscriptionController(base.Subscription): self._collection = self.driver.subscriptions_database.subscriptions self._queue_ctrl = self.driver.queue_controller self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True) + # NOTE(flwang): MongoDB will automatically delete the subscription + # from the subscriptions collection when the subscription's 'e' value + # is older than the number of seconds specified in expireAfterSeconds, + # i.e. 0 seconds older in this case. As such, the data expires at the + # specified 'e' value. + self._collection.ensure_index(TTL_INDEX_FIELDS, name='ttl', + expireAfterSeconds=0, + background=True) @utils.raises_conn_error def list(self, queue, project=None, marker=None, @@ -92,8 +106,8 @@ class SubscriptionController(base.Subscription): def create(self, queue, subscriber, ttl, options, project=None): source = queue now = timeutils.utcnow_ts() - ttl = int(ttl) - expires = now + ttl + now_dt = datetime.datetime.utcfromtimestamp(now) + expires = now_dt + datetime.timedelta(seconds=ttl) if not self._queue_ctrl.exists(source, project): raise errors.QueueDoesNotExist(source, project) diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_validation.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_validation.py index e6bbe9b01..b20ae9646 100644 --- a/zaqar/tests/unit/transport/wsgi/v2_0/test_validation.py +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_validation.py @@ -97,4 +97,19 @@ class TestValidation(base.V2Base): self.project_id, body='{"timespace": "Shangri-la"}', headers=empty_headers) + + def test_subscription_ttl(self): + # Normal case + body = '{"subscriber": "http://trigger.she", "ttl": 100, "options":{}}' + self.simulate_post(self.queue_path + '/subscriptions', + self.project_id, body=body, + headers=self.headers) + self.assertEqual(falcon.HTTP_201, self.srmock.status) + + # Very big TTL + body = ('{"subscriber": "http://a.c", "ttl": 99999999999999999' + ', "options":{}}') + self.simulate_post(self.queue_path + '/subscriptions', + self.project_id, body=body, + headers=self.headers) self.assertEqual(falcon.HTTP_400, self.srmock.status) diff --git a/zaqar/transport/base.py b/zaqar/transport/base.py index 8c847fa16..c99c6f006 100644 --- a/zaqar/transport/base.py +++ b/zaqar/transport/base.py @@ -33,7 +33,9 @@ _RESOURCE_DEFAULTS = ( cfg.IntOpt('default_claim_ttl', default=300, help=('Defines how long a message will be in claimed state.')), cfg.IntOpt('default_claim_grace', default=60, - help=('Defines the message grace period in seconds.')) + help=('Defines the message grace period in seconds.')), + cfg.IntOpt('default_subscription_ttl', default=3600, + help=('Defines how long a subscription will be available.')), ) _TRANSPORT_GROUP = 'transport' @@ -66,6 +68,10 @@ class ResourceDefaults(object): def claim_grace(self): return self._defaults.default_claim_grace + @property + def subscription_ttl(self): + return self._defaults.default_subscription_ttl + @six.add_metaclass(abc.ABCMeta) class DriverBase(object): diff --git a/zaqar/transport/validation.py b/zaqar/transport/validation.py index 40cfc6825..55120341a 100644 --- a/zaqar/transport/validation.py +++ b/zaqar/transport/validation.py @@ -14,9 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime import re from oslo_config import cfg +from oslo_utils import timeutils import six from zaqar.i18n import _ @@ -24,6 +26,7 @@ from zaqar.i18n import _ MIN_MESSAGE_TTL = 60 MIN_CLAIM_TTL = 60 MIN_CLAIM_GRACE = 60 +MIN_SUBSCRIPTION_TTL = 60 _TRANSPORT_LIMITS_OPTIONS = ( cfg.IntOpt('max_queues_per_page', default=20, @@ -295,7 +298,7 @@ class Validator(object): :param subscription: dict of subscription :raises: ValidationFailed if the subscription is invalid. """ - for p in ('subscriber', 'ttl', 'options'): + for p in ('subscriber',): if p not in subscription.keys(): raise ValidationFailed(_(u'Missing parameter %s in body.') % p) @@ -329,9 +332,29 @@ class Validator(object): raise ValidationFailed(msg) ttl = subscription.get('ttl', None) - if ttl and not isinstance(ttl, int): - msg = _(u'TTL must be an integer.') - raise ValidationFailed(msg) + if ttl: + if not isinstance(ttl, int): + msg = _(u'TTL must be an integer.') + raise ValidationFailed(msg) + + if ttl < MIN_SUBSCRIPTION_TTL: + msg = _(u'The TTL for a subscription ' + 'must be at least {0} seconds long.') + raise ValidationFailed(msg, MIN_SUBSCRIPTION_TTL) + + # NOTE(flwang): By this change, technically, user can set a very + # big TTL so as to get a very long subscription. + now = timeutils.utcnow_ts() + now_dt = datetime.datetime.utcfromtimestamp(now) + msg = _(u'The TTL seconds for a subscription plus current time' + ' must be less than {0}.') + try: + # NOTE(flwang): If below expression works, then we believe the + # ttl is acceptable otherwise it exceeds the max time of + # python. + now_dt + datetime.timedelta(seconds=ttl) + except OverflowError: + raise ValidationFailed(msg, datetime.datetime.max) def subscription_listing(self, limit=None, **kwargs): """Restrictions involving a list of subscriptions. diff --git a/zaqar/transport/wsgi/v2_0/__init__.py b/zaqar/transport/wsgi/v2_0/__init__.py index b68effa6b..3445ba216 100644 --- a/zaqar/transport/wsgi/v2_0/__init__.py +++ b/zaqar/transport/wsgi/v2_0/__init__.py @@ -99,7 +99,8 @@ def public_endpoints(driver, conf): # Subscription Endpoints ('/queues/{queue_name}/subscriptions', subscriptions.CollectionResource(driver._validate, - subscription_controller)), + subscription_controller, + defaults.subscription_ttl)), ('/queues/{queue_name}/subscriptions/{subscription_id}', subscriptions.ItemResource(driver._validate, diff --git a/zaqar/transport/wsgi/v2_0/subscriptions.py b/zaqar/transport/wsgi/v2_0/subscriptions.py index 85b859755..b9d60280a 100644 --- a/zaqar/transport/wsgi/v2_0/subscriptions.py +++ b/zaqar/transport/wsgi/v2_0/subscriptions.py @@ -105,11 +105,14 @@ class ItemResource(object): class CollectionResource(object): - __slots__ = ('_subscription_controller', '_validate') + __slots__ = ('_subscription_controller', '_validate', + '_default_subscription_ttl') - def __init__(self, validate, subscription_controller): + def __init__(self, validate, subscription_controller, + default_subscription_ttl): self._subscription_controller = subscription_controller self._validate = validate + self._default_subscription_ttl = default_subscription_ttl @decorators.TransportLog("Subscription collection") @acl.enforce("subscription:get_all") @@ -168,8 +171,8 @@ class CollectionResource(object): try: self._validate.subscription_posting(document) subscriber = document['subscriber'] - ttl = int(document['ttl']) - options = document['options'] + ttl = document.get('ttl', self._default_subscription_ttl) + options = document.get('options', {}) created = self._subscription_controller.create(queue_name, subscriber, ttl,