diff --git a/zaqar/storage/base.py b/zaqar/storage/base.py index a4a5b1711..1971f6813 100644 --- a/zaqar/storage/base.py +++ b/zaqar/storage/base.py @@ -665,6 +665,8 @@ class Subscription(ControllerBase): :param kwargs: one of: `source`, `subscriber`, `ttl`, `options` :type kwargs: dict :raises: SubscriptionDoesNotExist if not found + :raises: SubscriptionAlreadyExists on attempt to update in a way to + create duplicate subscription """ raise NotImplementedError diff --git a/zaqar/storage/errors.py b/zaqar/storage/errors.py index cb24a8985..0a534927e 100644 --- a/zaqar/storage/errors.py +++ b/zaqar/storage/errors.py @@ -204,3 +204,9 @@ class PoolCapabilitiesMismatch(ExceptionBase): class PoolAlreadyExists(Conflict): msg_format = u'The database URI is in use by another pool.' + + +class SubscriptionAlreadyExists(Conflict): + + msg_format = (u'Such subscription already exists. Subscriptions ' + u'are unique by project + queue + subscriber URI.') diff --git a/zaqar/storage/mongodb/subscriptions.py b/zaqar/storage/mongodb/subscriptions.py index 05e9fe4aa..9d5ce0cdf 100644 --- a/zaqar/storage/mongodb/subscriptions.py +++ b/zaqar/storage/mongodb/subscriptions.py @@ -45,7 +45,7 @@ class SubscriptionController(base.Subscription): 's': source :: six.text_type 'u': subscriber:: six.text_type 't': ttl:: int - 'e': expires: int + 'e': expires: datetime.datetime 'o': options :: dict 'p': project :: six.text_type """ @@ -136,12 +136,14 @@ class SubscriptionController(base.Subscription): key_transform=key_transform) assert fields, ('`subscriber`, `ttl`, ' 'or `options` not found in kwargs') - - res = self._collection.update({'_id': utils.to_oid(subscription_id), - 'p': project}, - {'$set': fields}, - upsert=False) - + try: + res = self._collection.update( + {'_id': utils.to_oid(subscription_id), + 'p': project}, + {'$set': fields}, + upsert=False) + except pymongo.errors.DuplicateKeyError: + raise errors.SubscriptionAlreadyExists() if not res['updatedExisting']: raise errors.SubscriptionDoesNotExist(subscription_id) diff --git a/zaqar/storage/redis/subscriptions.py b/zaqar/storage/redis/subscriptions.py index 0c187376f..acc91dba3 100644 --- a/zaqar/storage/redis/subscriptions.py +++ b/zaqar/storage/redis/subscriptions.py @@ -183,6 +183,24 @@ class SubscriptionController(base.Subscription): key_transform=key_transform) assert fields, ('`subscriber`, `ttl`, ' 'or `options` not found in kwargs') + + # Let's get our subscription by ID. If it does not exist, + # SubscriptionDoesNotExist error will be raised internally. + subscription_to_update = self.get(queue, subscription_id, + project=project) + + new_subscriber = fields.get('u', None) + + # Let's do some checks to prevent subscription duplication. + if new_subscriber: + # Check if 'new_subscriber' is really new for our subscription. + if subscription_to_update['subscriber'] != new_subscriber: + # It's new. We should raise error if this subscriber already + # exists for the queue and project. + if self._is_duplicated_subscriber(new_subscriber, queue, + project): + raise errors.SubscriptionAlreadyExists() + # NOTE(Eva-i): if there are new options, we need to pack them before # sending to the database. new_options = fields.get('o', None) diff --git a/zaqar/tests/unit/storage/base.py b/zaqar/tests/unit/storage/base.py index e5e04088c..90028baf8 100644 --- a/zaqar/tests/unit/storage/base.py +++ b/zaqar/tests/unit/storage/base.py @@ -1095,6 +1095,47 @@ class SubscriptionControllerTest(ControllerBaseTest): self.options, self.project) + def test_update_raises_if_try_to_update_to_existing_subscription(self): + # create two subscriptions: fake_0 and fake_1 + ids = [] + for s in six.moves.xrange(2): + subscriber = 'http://fake_{0}'.format(s) + s_id = self.subscription_controller.create( + self.source, + subscriber, + self.ttl, + self.options, + project=self.project) + self.addCleanup(self.subscription_controller.delete, self.source, + s_id, self.project) + ids.append(s_id) + # update fake_0 to fake_2, success + update_fields = { + 'subscriber': 'http://fake_2' + } + self.subscription_controller.update(self.queue_name, + ids[0], + project=self.project, + **update_fields) + # update fake_1 to fake_2, raise error + self.assertRaises(errors.SubscriptionAlreadyExists, + self.subscription_controller.update, + self.queue_name, + ids[1], + project=self.project, + **update_fields) + + def test_update_raises_if_subscription_does_not_exist(self): + update_fields = { + 'subscriber': 'http://fake' + } + self.assertRaises(errors.SubscriptionDoesNotExist, + self.subscription_controller.update, + self.queue_name, + 'notexists', + project=self.project, + **update_fields) + class PoolsControllerTest(ControllerBaseTest): """Pools Controller base tests. diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py index 902cc4a9c..38ce86b11 100644 --- a/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py @@ -283,6 +283,18 @@ class TestSubscriptionsMongoDB(base.V2Base): headers=self.headers) self.assertEqual(falcon.HTTP_404, self.srmock.status) + def test_patch_to_duplicate_raise_409(self): + self._create_subscription() + toupdate = self._create_subscription(subscriber='http://update.me', + ttl=600, + options='{"a":1}') + toupdate_sid = jsonutils.loads(toupdate[0])['subscription_id'] + doc = {'subscriber': 'http://triger.me'} + self.simulate_patch(self.subscription_path + '/' + toupdate_sid, + body=jsonutils.dumps(doc), + headers=self.headers) + self.assertEqual(falcon.HTTP_409, self.srmock.status) + def test_patch_no_body(self): self._create_subscription() resp = self.simulate_get(self.subscription_path, diff --git a/zaqar/transport/wsgi/v2_0/subscriptions.py b/zaqar/transport/wsgi/v2_0/subscriptions.py index 2a6e69d57..9b4c1ece6 100644 --- a/zaqar/transport/wsgi/v2_0/subscriptions.py +++ b/zaqar/transport/wsgi/v2_0/subscriptions.py @@ -91,6 +91,9 @@ class ItemResource(object): except storage_errors.SubscriptionDoesNotExist as ex: LOG.debug(ex) raise wsgi_errors.HTTPNotFound(six.text_type(ex)) + except storage_errors.SubscriptionAlreadyExists as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPConflict(six.text_type(ex)) except validation.ValidationFailed as ex: LOG.debug(ex) raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))