don't allow get/update/delete subscirtions with wrong queue

A subscription is always belong to a queue. We should
not allow users to get/update/delete the subscription
if they provide wrong queue name.

Change-Id: I4728fa5139962aa621d0af6fb385613435d3b1be
Closes-bug: #1653847
This commit is contained in:
wangxiyuan 2017-01-04 10:12:26 +08:00
parent b41802c3ad
commit 455cf6eb5c
4 changed files with 53 additions and 14 deletions

View File

@ -90,7 +90,8 @@ class SubscriptionController(base.Subscription):
@utils.raises_conn_error
def get(self, queue, subscription_id, project=None):
res = self._collection.find_one({'_id': utils.to_oid(subscription_id),
'p': project})
'p': project,
's': queue})
if not res:
raise errors.SubscriptionDoesNotExist(subscription_id)
@ -143,7 +144,8 @@ class SubscriptionController(base.Subscription):
try:
res = self._collection.update(
{'_id': utils.to_oid(subscription_id),
'p': project},
'p': project,
's': queue},
{'$set': fields},
upsert=False)
except pymongo.errors.DuplicateKeyError:
@ -154,7 +156,8 @@ class SubscriptionController(base.Subscription):
@utils.raises_conn_error
def delete(self, queue, subscription_id, project=None):
self._collection.remove({'_id': utils.to_oid(subscription_id),
'p': project}, w=0)
'p': project,
's': queue}, w=0)
@utils.raises_conn_error
def get_with_subscriber(self, queue, subscriber, project=None):

View File

@ -93,8 +93,10 @@ class SubscriptionController(base.Subscription):
@utils.raises_conn_error
@utils.retries_on_connection_error
def get(self, queue, subscription_id, project=None):
subscription = SubscriptionEnvelope.from_redis(subscription_id,
self._client)
subscription = None
if self.exists(queue, subscription_id, project):
subscription = SubscriptionEnvelope.from_redis(subscription_id,
self._client)
if subscription:
now = timeutils.utcnow_ts()
return subscription.to_basic(now)
@ -230,11 +232,13 @@ class SubscriptionController(base.Subscription):
def delete(self, queue, subscription_id, project=None):
subset_key = utils.scope_subscription_ids_set(queue, project,
SUBSCRIPTION_IDS_SUFFIX)
# NOTE(prashanthr_): Pipelining is used to mitigate race conditions
with self._client.pipeline() as pipe:
pipe.zrem(subset_key, subscription_id)
pipe.delete(subscription_id)
pipe.execute()
if self._client.zrank(subset_key, subscription_id) is not None:
# NOTE(prashanthr_): Pipelining is used to mitigate race conditions
with self._client.pipeline() as pipe:
pipe.zrem(subset_key, subscription_id)
pipe.delete(subscription_id)
pipe.execute()
@utils.raises_conn_error
@utils.retries_on_connection_error

View File

@ -239,10 +239,9 @@ class TestSubscriptionsNegative(base.BaseV2MessagingTest):
subscription_id = results[0][1]["subscription_id"]
non_existent_queue = data_utils.rand_name('rand_queuename')
update_rbody = {'ttl': 1000}
resp, _ = self.client.update_subscription(non_existent_queue,
subscription_id,
update_rbody)
self.assertEqual('204', resp['status'])
self.assertRaises(lib_exc.NotFound, self.client.update_subscription,
non_existent_queue, subscription_id, update_rbody)
for result in results:
subscription_id = result[1]["subscription_id"]
self.delete_subscription(self.queue_name, subscription_id)

View File

@ -1175,6 +1175,39 @@ class SubscriptionControllerTest(ControllerBaseTest):
project=self.project)
self.assertIsNone(s_id)
def test_get_update_delete_on_non_existing_queue(self):
self._precreate_queue(precreate_queue=True)
s_id = self.subscription_controller.create(
self.source,
self.subscriber,
self.ttl,
self.options,
project=self.project)
self.addCleanup(self.subscription_controller.delete, self.source, s_id,
self.project)
self.assertIsNotNone(s_id)
non_existing_queue = "fake_name"
# get
self.assertRaises(errors.SubscriptionDoesNotExist,
self.subscription_controller.get,
non_existing_queue, s_id, project=self.project)
# update
body = {
"subscriber": self.subscriber,
"ttl": self.ttl,
"options": self.options
}
self.assertRaises(errors.SubscriptionDoesNotExist,
self.subscription_controller.update,
non_existing_queue, s_id, project=self.project,
**body)
# delete
self.subscription_controller.delete(non_existing_queue, s_id,
project=self.project)
s_id = self.subscription_controller.get(self.queue_name, s_id,
project=self.project)
self.assertIsNotNone(s_id)
def test_nonexist_source(self):
try:
s_id = self.subscription_controller.create('fake_queue_name',