From f94a9ee4cee28e79fbb4af1da66ab876f2c3079f Mon Sep 17 00:00:00 2001 From: wangxiyuan Date: Sun, 14 Feb 2016 17:30:35 +0800 Subject: [PATCH] Forbid update subscription to duplicate Now if users update a subscription in a queue, it doesn't check the legality of the new subscriber. We should forbid update subscription to duplicate and return 409 response in such case. This patch modifies Redis and MongoDB storage controllers and API v2. It also adds unit tests for them. The functional test will be updated in zaqar client. APIimpact Closes-Bug: #1545548 Closes-Bug: #1547131 Co-Authored-By: wangxiyuan Co-Authored-By: Eva Balycheva Change-Id: I28fe7a114860488c503642417690bf5bfbd99678 --- zaqar/storage/base.py | 2 + zaqar/storage/errors.py | 6 +++ zaqar/storage/mongodb/subscriptions.py | 16 ++++---- zaqar/storage/redis/subscriptions.py | 18 ++++++++ zaqar/tests/unit/storage/base.py | 41 +++++++++++++++++++ .../transport/wsgi/v2_0/test_subscriptions.py | 12 ++++++ zaqar/transport/wsgi/v2_0/subscriptions.py | 3 ++ 7 files changed, 91 insertions(+), 7 deletions(-) diff --git a/zaqar/storage/base.py b/zaqar/storage/base.py index a4a5b1711..1971f6813 100644 --- a/zaqar/storage/base.py +++ b/zaqar/storage/base.py @@ -665,6 +665,8 @@ class Subscription(ControllerBase): :param kwargs: one of: `source`, `subscriber`, `ttl`, `options` :type kwargs: dict :raises: SubscriptionDoesNotExist if not found + :raises: SubscriptionAlreadyExists on attempt to update in a way to + create duplicate subscription """ raise NotImplementedError diff --git a/zaqar/storage/errors.py b/zaqar/storage/errors.py index cb24a8985..0a534927e 100644 --- a/zaqar/storage/errors.py +++ b/zaqar/storage/errors.py @@ -204,3 +204,9 @@ class PoolCapabilitiesMismatch(ExceptionBase): class PoolAlreadyExists(Conflict): msg_format = u'The database URI is in use by another pool.' + + +class SubscriptionAlreadyExists(Conflict): + + msg_format = (u'Such subscription already exists. Subscriptions ' + u'are unique by project + queue + subscriber URI.') diff --git a/zaqar/storage/mongodb/subscriptions.py b/zaqar/storage/mongodb/subscriptions.py index 05e9fe4aa..9d5ce0cdf 100644 --- a/zaqar/storage/mongodb/subscriptions.py +++ b/zaqar/storage/mongodb/subscriptions.py @@ -45,7 +45,7 @@ class SubscriptionController(base.Subscription): 's': source :: six.text_type 'u': subscriber:: six.text_type 't': ttl:: int - 'e': expires: int + 'e': expires: datetime.datetime 'o': options :: dict 'p': project :: six.text_type """ @@ -136,12 +136,14 @@ class SubscriptionController(base.Subscription): key_transform=key_transform) assert fields, ('`subscriber`, `ttl`, ' 'or `options` not found in kwargs') - - res = self._collection.update({'_id': utils.to_oid(subscription_id), - 'p': project}, - {'$set': fields}, - upsert=False) - + try: + res = self._collection.update( + {'_id': utils.to_oid(subscription_id), + 'p': project}, + {'$set': fields}, + upsert=False) + except pymongo.errors.DuplicateKeyError: + raise errors.SubscriptionAlreadyExists() if not res['updatedExisting']: raise errors.SubscriptionDoesNotExist(subscription_id) diff --git a/zaqar/storage/redis/subscriptions.py b/zaqar/storage/redis/subscriptions.py index 0c187376f..acc91dba3 100644 --- a/zaqar/storage/redis/subscriptions.py +++ b/zaqar/storage/redis/subscriptions.py @@ -183,6 +183,24 @@ class SubscriptionController(base.Subscription): key_transform=key_transform) assert fields, ('`subscriber`, `ttl`, ' 'or `options` not found in kwargs') + + # Let's get our subscription by ID. If it does not exist, + # SubscriptionDoesNotExist error will be raised internally. + subscription_to_update = self.get(queue, subscription_id, + project=project) + + new_subscriber = fields.get('u', None) + + # Let's do some checks to prevent subscription duplication. + if new_subscriber: + # Check if 'new_subscriber' is really new for our subscription. + if subscription_to_update['subscriber'] != new_subscriber: + # It's new. We should raise error if this subscriber already + # exists for the queue and project. + if self._is_duplicated_subscriber(new_subscriber, queue, + project): + raise errors.SubscriptionAlreadyExists() + # NOTE(Eva-i): if there are new options, we need to pack them before # sending to the database. new_options = fields.get('o', None) diff --git a/zaqar/tests/unit/storage/base.py b/zaqar/tests/unit/storage/base.py index e5e04088c..90028baf8 100644 --- a/zaqar/tests/unit/storage/base.py +++ b/zaqar/tests/unit/storage/base.py @@ -1095,6 +1095,47 @@ class SubscriptionControllerTest(ControllerBaseTest): self.options, self.project) + def test_update_raises_if_try_to_update_to_existing_subscription(self): + # create two subscriptions: fake_0 and fake_1 + ids = [] + for s in six.moves.xrange(2): + subscriber = 'http://fake_{0}'.format(s) + s_id = self.subscription_controller.create( + self.source, + subscriber, + self.ttl, + self.options, + project=self.project) + self.addCleanup(self.subscription_controller.delete, self.source, + s_id, self.project) + ids.append(s_id) + # update fake_0 to fake_2, success + update_fields = { + 'subscriber': 'http://fake_2' + } + self.subscription_controller.update(self.queue_name, + ids[0], + project=self.project, + **update_fields) + # update fake_1 to fake_2, raise error + self.assertRaises(errors.SubscriptionAlreadyExists, + self.subscription_controller.update, + self.queue_name, + ids[1], + project=self.project, + **update_fields) + + def test_update_raises_if_subscription_does_not_exist(self): + update_fields = { + 'subscriber': 'http://fake' + } + self.assertRaises(errors.SubscriptionDoesNotExist, + self.subscription_controller.update, + self.queue_name, + 'notexists', + project=self.project, + **update_fields) + class PoolsControllerTest(ControllerBaseTest): """Pools Controller base tests. diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py index 902cc4a9c..38ce86b11 100644 --- a/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py @@ -283,6 +283,18 @@ class TestSubscriptionsMongoDB(base.V2Base): headers=self.headers) self.assertEqual(falcon.HTTP_404, self.srmock.status) + def test_patch_to_duplicate_raise_409(self): + self._create_subscription() + toupdate = self._create_subscription(subscriber='http://update.me', + ttl=600, + options='{"a":1}') + toupdate_sid = jsonutils.loads(toupdate[0])['subscription_id'] + doc = {'subscriber': 'http://triger.me'} + self.simulate_patch(self.subscription_path + '/' + toupdate_sid, + body=jsonutils.dumps(doc), + headers=self.headers) + self.assertEqual(falcon.HTTP_409, self.srmock.status) + def test_patch_no_body(self): self._create_subscription() resp = self.simulate_get(self.subscription_path, diff --git a/zaqar/transport/wsgi/v2_0/subscriptions.py b/zaqar/transport/wsgi/v2_0/subscriptions.py index 2a6e69d57..9b4c1ece6 100644 --- a/zaqar/transport/wsgi/v2_0/subscriptions.py +++ b/zaqar/transport/wsgi/v2_0/subscriptions.py @@ -91,6 +91,9 @@ class ItemResource(object): except storage_errors.SubscriptionDoesNotExist as ex: LOG.debug(ex) raise wsgi_errors.HTTPNotFound(six.text_type(ex)) + except storage_errors.SubscriptionAlreadyExists as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPConflict(six.text_type(ex)) except validation.ValidationFailed as ex: LOG.debug(ex) raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))