Checking barbican resource id in URI is a valid uuid

Adding uuid format checks for id used in secrets, containers, consumers
and transport keys resources.

Addressed consumer test failure related to recent change.

Change-Id: Ib8aa5e7fc9049e9e817bf02707850979c020f5af
Closes-Bug: #1603828
This commit is contained in:
Arun Kant 2016-08-10 14:26:26 -07:00
parent 2c6a9a9bb0
commit 56fff40f7d
10 changed files with 95 additions and 22 deletions

View File

@ -39,6 +39,11 @@ def _consumer_ownership_mismatch():
'can delete it.'))
def _invalid_consumer_id():
"""Throw exception indicating consumer id is invalid."""
pecan.abort(404, u._('Not Found. Provided consumer id is invalid.'))
class ContainerConsumerController(controllers.ACLMixin):
"""Handles Consumer entity retrieval and deletion requests."""
@ -85,6 +90,8 @@ class ContainerConsumersController(controllers.ACLMixin):
@pecan.expose()
def _lookup(self, consumer_id, *remainder):
if not utils.validate_id_is_uuid(consumer_id):
_invalid_consumer_id()()
return ContainerConsumerController(consumer_id), remainder
@pecan.expose(generic=True)

View File

@ -37,6 +37,11 @@ def container_not_found():
'another castle.'))
def invalid_container_id():
"""Throw exception indicating container id is invalid."""
pecan.abort(404, u._('Not Found. Provided container id is invalid.'))
class ContainerController(controllers.ACLMixin):
"""Handles Container entity retrieval and deletion requests."""
@ -117,6 +122,8 @@ class ContainersController(controllers.ACLMixin):
@pecan.expose()
def _lookup(self, container_id, *remainder):
if not utils.validate_id_is_uuid(container_id):
invalid_container_id()
container = self.container_repo.get_container_by_id(
entity_id=container_id, suppress_exception=True)
if not container:

View File

@ -40,6 +40,11 @@ def _secret_not_found():
'another castle.'))
def _invalid_secret_id():
"""Throw exception indicating secret id is invalid."""
pecan.abort(404, u._('Not Found. Provided secret id is invalid.'))
def _secret_payload_not_found():
"""Throw exception indicating secret's payload is not found."""
pecan.abort(404, u._('Not Found. Sorry but your secret has no payload.'))
@ -319,6 +324,8 @@ class SecretsController(controllers.ACLMixin):
# check, the execution only gets here if authentication of the user was
# previously successful.
if not utils.validate_id_is_uuid(secret_id):
_invalid_secret_id()()
secret = self.secret_repo.get_secret_by_id(
entity_id=secret_id, suppress_exception=True)
if not secret:

View File

@ -33,6 +33,11 @@ def _transport_key_not_found():
pecan.abort(404, u._('Not Found. Transport Key not found.'))
def _invalid_transport_key_id():
"""Throw exception indicating transport key id is invalid."""
pecan.abort(404, u._('Not Found. Provided transport key id is invalid.'))
class TransportKeyController(controllers.ACLMixin):
"""Handles transport key retrieval requests."""
@ -83,6 +88,8 @@ class TransportKeysController(controllers.ACLMixin):
@pecan.expose()
def _lookup(self, transport_key_id, *remainder):
if not utils.validate_id_is_uuid(transport_key_id):
_invalid_transport_key_id()
return TransportKeyController(transport_key_id, self.repo), remainder
@pecan.expose(generic=True)

View File

@ -188,3 +188,19 @@ def generate_uuid():
def is_multiple_backends_enabled():
secretstore_conf = config.get_module_config('secretstore')
return secretstore_conf.secretstore.enable_multiple_secret_stores
def validate_id_is_uuid(input_id, version=4):
"""Validates provided id is uuid4 format value.
Returns true when provided id is a valid version 4 uuid otherwise
returns False.
This validation is to be used only for ids which are generated by barbican
(e.g. not for keystone project_id)
"""
try:
value = uuid.UUID(input_id, version=version)
except Exception:
return False
return str(value) == input_id

View File

@ -259,6 +259,12 @@ class WhenGettingOrDeletingContainerUsingContainerResource(
container_uuid, self.project_id)
def test_should_throw_exception_for_get_when_container_not_found(self):
resp = self.app.get(
'/containers/{0}/'.format(utils.generate_test_valid_uuid()),
expect_errors=True)
self.assertEqual(404, resp.status_int)
def test_should_throw_exception_for_get_when_invalid_container_id(self):
resp = self.app.get('/containers/bad_id/', expect_errors=True)
self.assertEqual(404, resp.status_int)

View File

@ -372,8 +372,18 @@ class WhenGettingPuttingOrDeletingSecret(utils.BarbicanAPIBaseTestCase):
self.assertEqual(decoded, get_resp.body)
def test_returns_404_on_get_when_not_found(self):
"""Test with valid uuid which is not present in DB."""
get_resp = self.app.get(
'/secrets/98c876d9-aaac-44e4-8ea8-441932962b05',
'/secrets/{0}'.format(utils.generate_test_valid_uuid()),
headers={'Accept': 'application/json'},
expect_errors=True
)
self.assertEqual(404, get_resp.status_int)
def test_returns_404_on_get_invalid_secret_id(self):
"""Test where uuid provided is not valid."""
get_resp = self.app.get(
'/secrets/98c876d9-aaac-44e4-8ea8-invalid-id',
headers={'Accept': 'application/json'},
expect_errors=True
)

View File

@ -227,7 +227,7 @@ class BaseSecretsResource(FunctionalTest):
# Set up mocked secret
self.secret = models.Secret()
self.secret.id = utils.generate_test_uuid(tail_value=1)
self.secret.id = utils.generate_test_valid_uuid()
# Set up mocked secret repo
self.secret_repo = mock.MagicMock()
@ -289,7 +289,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.external_project_id = 'keystone1234'
self.name = 'name1234'
secret_id = utils.generate_test_uuid(tail_value=1)
secret_id = utils.generate_test_valid_uuid()
datum_id = "iddatum1"
kek_id = "idkek1"
@ -780,7 +780,7 @@ class WhenCreatingConsumersUsingConsumersResource(FunctionalTest):
# Set up mocked container
self.container = create_container(
id_ref='id1',
id_ref=utils.generate_test_valid_uuid(),
project_id=self.project_internal_id,
external_project_id=self.external_project_id)
@ -866,17 +866,17 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest):
# Set up mocked container
self.container = create_container(
id_ref='id1',
id_ref=utils.generate_test_valid_uuid(),
project_id=self.project_internal_id,
external_project_id=self.external_project_id)
# Set up mocked consumers
self.consumer = create_consumer(self.container.id,
self.project_internal_id,
id_ref='id2')
self.consumer2 = create_consumer(self.container.id,
self.project_internal_id,
id_ref='id3')
self.consumer = create_consumer(
self.container.id, self.project_internal_id,
id_ref=utils.generate_test_valid_uuid())
self.consumer2 = create_consumer(
self.container.id, self.project_internal_id,
id_ref=utils.generate_test_valid_uuid())
self.consumer_ref = {
'name': self.consumer.name,
@ -1073,17 +1073,17 @@ class WhenPerformingUnallowedOperationsOnConsumers(FunctionalTest):
# Set up mocked container
self.container = create_container(
id_ref='id1',
id_ref=utils.generate_test_valid_uuid(),
project_id=self.project_internal_id,
external_project_id=self.external_project_id)
# Set up mocked container consumers
self.consumer = create_consumer(self.container.id,
self.project_internal_id,
id_ref='id2')
self.consumer2 = create_consumer(self.container.id,
self.project_internal_id,
id_ref='id3')
self.consumer = create_consumer(
self.container.id, self.project_internal_id,
id_ref=utils.generate_test_valid_uuid())
self.consumer2 = create_consumer(
self.container.id, self.project_internal_id,
id_ref=utils.generate_test_valid_uuid())
self.consumer_ref = {
'name': self.consumer.name,
@ -1178,7 +1178,7 @@ class WhenOwnershipMismatch(FunctionalTest):
# Set up mocked container
self.container = create_container(
id_ref='id1',
id_ref=utils.generate_test_valid_uuid(),
project_id=self.project_internal_id,
external_project_id='differentProjectId')

View File

@ -290,7 +290,7 @@ class WhenGettingOrDeletingTransKeyUsingTransportKeyResource(FunctionalTest):
def _init(self):
self.external_project_id = 'keystoneid1234'
self.transport_key = SAMPLE_TRANSPORT_KEY
self.tkey_id = "id1"
self.tkey_id = utils.generate_test_valid_uuid()
self.tkey = create_transport_key(
id_ref=self.tkey_id,
@ -308,7 +308,15 @@ class WhenGettingOrDeletingTransKeyUsingTransportKeyResource(FunctionalTest):
def test_should_throw_exception_for_get_when_trans_key_not_found(self):
self.repo.get.return_value = None
resp = self.app.get(
'/transport_keys/{0}/'.format(self.tkey.id),
'/transport_keys/{0}/'.format(utils.generate_test_valid_uuid()),
expect_errors=True
)
self.assertEqual(404, resp.status_int)
def test_should_throw_exception_for_get_when_trans_key_invalid(self):
resp = self.app.get(
'/transport_keys/{0}/'.format("invalid_key_id"),
expect_errors=True
)
self.assertEqual(404, resp.status_int)

View File

@ -78,7 +78,7 @@ class BarbicanAPIBaseTestCase(oslotest.BaseTestCase):
database_utils.setup_in_memory_db()
# Generic project id to perform actions under
self.project_id = str(uuid.uuid4())
self.project_id = generate_test_valid_uuid()
# Build the test app
wsgi_app = app.build_wsgi_app(
@ -640,6 +640,11 @@ def generate_test_uuid(tail_value=0):
pad=12)
def generate_test_valid_uuid():
"""Returns a valid uuid value, similar to uuid generated in barbican"""
return str(uuid.uuid4())
def get_symmetric_key():
s = b"MIICdgIBADANBgkqhkiG9w=="
return s