Forbid update subscription to duplicate
Now if users update a subscription in a queue, it doesn't check the legality of the new subscriber. We should forbid update subscription to duplicate and return 409 response in such case. This patch modifies Redis and MongoDB storage controllers and API v2. It also adds unit tests for them. The functional test will be updated in zaqar client. APIimpact Closes-Bug: #1545548 Closes-Bug: #1547131 Co-Authored-By: wangxiyuan <wangxiyuan@huawei.com> Co-Authored-By: Eva Balycheva <ubershy@gmail.com> Change-Id: I28fe7a114860488c503642417690bf5bfbd99678
This commit is contained in:
parent
146e37c70f
commit
f94a9ee4ce
@ -665,6 +665,8 @@ class Subscription(ControllerBase):
|
||||
:param kwargs: one of: `source`, `subscriber`, `ttl`, `options`
|
||||
:type kwargs: dict
|
||||
:raises: SubscriptionDoesNotExist if not found
|
||||
:raises: SubscriptionAlreadyExists on attempt to update in a way to
|
||||
create duplicate subscription
|
||||
"""
|
||||
|
||||
raise NotImplementedError
|
||||
|
@ -204,3 +204,9 @@ class PoolCapabilitiesMismatch(ExceptionBase):
|
||||
class PoolAlreadyExists(Conflict):
|
||||
|
||||
msg_format = u'The database URI is in use by another pool.'
|
||||
|
||||
|
||||
class SubscriptionAlreadyExists(Conflict):
|
||||
|
||||
msg_format = (u'Such subscription already exists. Subscriptions '
|
||||
u'are unique by project + queue + subscriber URI.')
|
||||
|
@ -45,7 +45,7 @@ class SubscriptionController(base.Subscription):
|
||||
's': source :: six.text_type
|
||||
'u': subscriber:: six.text_type
|
||||
't': ttl:: int
|
||||
'e': expires: int
|
||||
'e': expires: datetime.datetime
|
||||
'o': options :: dict
|
||||
'p': project :: six.text_type
|
||||
"""
|
||||
@ -136,12 +136,14 @@ class SubscriptionController(base.Subscription):
|
||||
key_transform=key_transform)
|
||||
assert fields, ('`subscriber`, `ttl`, '
|
||||
'or `options` not found in kwargs')
|
||||
|
||||
res = self._collection.update({'_id': utils.to_oid(subscription_id),
|
||||
'p': project},
|
||||
{'$set': fields},
|
||||
upsert=False)
|
||||
|
||||
try:
|
||||
res = self._collection.update(
|
||||
{'_id': utils.to_oid(subscription_id),
|
||||
'p': project},
|
||||
{'$set': fields},
|
||||
upsert=False)
|
||||
except pymongo.errors.DuplicateKeyError:
|
||||
raise errors.SubscriptionAlreadyExists()
|
||||
if not res['updatedExisting']:
|
||||
raise errors.SubscriptionDoesNotExist(subscription_id)
|
||||
|
||||
|
@ -183,6 +183,24 @@ class SubscriptionController(base.Subscription):
|
||||
key_transform=key_transform)
|
||||
assert fields, ('`subscriber`, `ttl`, '
|
||||
'or `options` not found in kwargs')
|
||||
|
||||
# Let's get our subscription by ID. If it does not exist,
|
||||
# SubscriptionDoesNotExist error will be raised internally.
|
||||
subscription_to_update = self.get(queue, subscription_id,
|
||||
project=project)
|
||||
|
||||
new_subscriber = fields.get('u', None)
|
||||
|
||||
# Let's do some checks to prevent subscription duplication.
|
||||
if new_subscriber:
|
||||
# Check if 'new_subscriber' is really new for our subscription.
|
||||
if subscription_to_update['subscriber'] != new_subscriber:
|
||||
# It's new. We should raise error if this subscriber already
|
||||
# exists for the queue and project.
|
||||
if self._is_duplicated_subscriber(new_subscriber, queue,
|
||||
project):
|
||||
raise errors.SubscriptionAlreadyExists()
|
||||
|
||||
# NOTE(Eva-i): if there are new options, we need to pack them before
|
||||
# sending to the database.
|
||||
new_options = fields.get('o', None)
|
||||
|
@ -1095,6 +1095,47 @@ class SubscriptionControllerTest(ControllerBaseTest):
|
||||
self.options,
|
||||
self.project)
|
||||
|
||||
def test_update_raises_if_try_to_update_to_existing_subscription(self):
|
||||
# create two subscriptions: fake_0 and fake_1
|
||||
ids = []
|
||||
for s in six.moves.xrange(2):
|
||||
subscriber = 'http://fake_{0}'.format(s)
|
||||
s_id = self.subscription_controller.create(
|
||||
self.source,
|
||||
subscriber,
|
||||
self.ttl,
|
||||
self.options,
|
||||
project=self.project)
|
||||
self.addCleanup(self.subscription_controller.delete, self.source,
|
||||
s_id, self.project)
|
||||
ids.append(s_id)
|
||||
# update fake_0 to fake_2, success
|
||||
update_fields = {
|
||||
'subscriber': 'http://fake_2'
|
||||
}
|
||||
self.subscription_controller.update(self.queue_name,
|
||||
ids[0],
|
||||
project=self.project,
|
||||
**update_fields)
|
||||
# update fake_1 to fake_2, raise error
|
||||
self.assertRaises(errors.SubscriptionAlreadyExists,
|
||||
self.subscription_controller.update,
|
||||
self.queue_name,
|
||||
ids[1],
|
||||
project=self.project,
|
||||
**update_fields)
|
||||
|
||||
def test_update_raises_if_subscription_does_not_exist(self):
|
||||
update_fields = {
|
||||
'subscriber': 'http://fake'
|
||||
}
|
||||
self.assertRaises(errors.SubscriptionDoesNotExist,
|
||||
self.subscription_controller.update,
|
||||
self.queue_name,
|
||||
'notexists',
|
||||
project=self.project,
|
||||
**update_fields)
|
||||
|
||||
|
||||
class PoolsControllerTest(ControllerBaseTest):
|
||||
"""Pools Controller base tests.
|
||||
|
@ -283,6 +283,18 @@ class TestSubscriptionsMongoDB(base.V2Base):
|
||||
headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_404, self.srmock.status)
|
||||
|
||||
def test_patch_to_duplicate_raise_409(self):
|
||||
self._create_subscription()
|
||||
toupdate = self._create_subscription(subscriber='http://update.me',
|
||||
ttl=600,
|
||||
options='{"a":1}')
|
||||
toupdate_sid = jsonutils.loads(toupdate[0])['subscription_id']
|
||||
doc = {'subscriber': 'http://triger.me'}
|
||||
self.simulate_patch(self.subscription_path + '/' + toupdate_sid,
|
||||
body=jsonutils.dumps(doc),
|
||||
headers=self.headers)
|
||||
self.assertEqual(falcon.HTTP_409, self.srmock.status)
|
||||
|
||||
def test_patch_no_body(self):
|
||||
self._create_subscription()
|
||||
resp = self.simulate_get(self.subscription_path,
|
||||
|
@ -91,6 +91,9 @@ class ItemResource(object):
|
||||
except storage_errors.SubscriptionDoesNotExist as ex:
|
||||
LOG.debug(ex)
|
||||
raise wsgi_errors.HTTPNotFound(six.text_type(ex))
|
||||
except storage_errors.SubscriptionAlreadyExists as ex:
|
||||
LOG.debug(ex)
|
||||
raise wsgi_errors.HTTPConflict(six.text_type(ex))
|
||||
except validation.ValidationFailed as ex:
|
||||
LOG.debug(ex)
|
||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||
|
Loading…
Reference in New Issue
Block a user