Merge "Add Secret Consumer Controllers and their tests"
This commit is contained in:
commit
a699da6408
@ -44,7 +44,7 @@ def _invalid_consumer_id():
|
||||
|
||||
|
||||
class ContainerConsumerController(controllers.ACLMixin):
|
||||
"""Handles Consumer entity retrieval and deletion requests."""
|
||||
"""Handles Container Consumer entity retrieval and deletion requests"""
|
||||
|
||||
def __init__(self, consumer_id):
|
||||
self.consumer_id = consumer_id
|
||||
@ -76,7 +76,7 @@ class ContainerConsumerController(controllers.ACLMixin):
|
||||
|
||||
|
||||
class ContainerConsumersController(controllers.ACLMixin):
|
||||
"""Handles Consumer creation requests."""
|
||||
"""Handles Container Consumer creation requests"""
|
||||
|
||||
def __init__(self, container_id):
|
||||
self.container_id = container_id
|
||||
@ -131,7 +131,7 @@ class ContainerConsumersController(controllers.ACLMixin):
|
||||
)
|
||||
resp_ctrs_overall.update({'total': total})
|
||||
|
||||
LOG.info('Retrieved a consumer list for project: %s',
|
||||
LOG.info('Retrieved a container consumer list for project: %s',
|
||||
external_project_id)
|
||||
return resp_ctrs_overall
|
||||
|
||||
@ -157,7 +157,7 @@ class ContainerConsumersController(controllers.ACLMixin):
|
||||
url = hrefs.convert_consumer_to_href(new_consumer.container_id)
|
||||
pecan.response.headers['Location'] = url
|
||||
|
||||
LOG.info('Created a consumer for project: %s',
|
||||
LOG.info('Created a container consumer for project: %s',
|
||||
external_project_id)
|
||||
|
||||
return self._return_container_data(self.container_id)
|
||||
@ -182,7 +182,7 @@ class ContainerConsumersController(controllers.ACLMixin):
|
||||
)
|
||||
if not consumer:
|
||||
_consumer_not_found()
|
||||
LOG.debug("Found consumer: %s", consumer)
|
||||
LOG.debug("Found container consumer: %s", consumer)
|
||||
|
||||
container = self._get_container(self.container_id)
|
||||
owner_of_consumer = consumer.project_id == project.id
|
||||
@ -195,11 +195,11 @@ class ContainerConsumersController(controllers.ACLMixin):
|
||||
self.consumer_repo.delete_entity_by_id(consumer.id,
|
||||
external_project_id)
|
||||
except exception.NotFound:
|
||||
LOG.exception('Problem deleting consumer')
|
||||
LOG.exception('Problem deleting container consumer')
|
||||
_consumer_not_found()
|
||||
|
||||
ret_data = self._return_container_data(self.container_id)
|
||||
LOG.info('Deleted a consumer for project: %s',
|
||||
LOG.info('Deleted a container consumer for project: %s',
|
||||
external_project_id)
|
||||
return ret_data
|
||||
|
||||
@ -222,3 +222,183 @@ class ContainerConsumersController(controllers.ACLMixin):
|
||||
return hrefs.convert_to_hrefs(
|
||||
hrefs.convert_to_hrefs(dict_fields)
|
||||
)
|
||||
|
||||
|
||||
class SecretConsumerController(controllers.ACLMixin):
|
||||
"""Handles Secret Consumer entity retrieval and deletion requests"""
|
||||
|
||||
def __init__(self, consumer_id):
|
||||
self.consumer_id = consumer_id
|
||||
self.consumer_repo = repo.get_secret_consumer_repository()
|
||||
self.validator = validators.SecretConsumerValidator()
|
||||
|
||||
@pecan.expose(generic=True)
|
||||
def index(self):
|
||||
pecan.abort(405) # HTTP 405 Method Not Allowed as default
|
||||
|
||||
@index.when(method='GET', template='json')
|
||||
@controllers.handle_exceptions(u._('SecretConsumer retrieval'))
|
||||
@controllers.enforce_rbac('consumer:get')
|
||||
def on_get(self, external_project_id):
|
||||
consumer = self.consumer_repo.get(
|
||||
entity_id=self.consumer_id,
|
||||
suppress_exception=True)
|
||||
if not consumer:
|
||||
_consumer_not_found()
|
||||
|
||||
dict_fields = consumer.to_dict_fields()
|
||||
|
||||
LOG.info('Retrieved a secret consumer for project: %s',
|
||||
external_project_id)
|
||||
|
||||
return hrefs.convert_to_hrefs(
|
||||
hrefs.convert_to_hrefs(dict_fields)
|
||||
)
|
||||
|
||||
|
||||
class SecretConsumersController(controllers.ACLMixin):
|
||||
"""Handles Secret Consumer creation requests"""
|
||||
|
||||
def __init__(self, secret_id):
|
||||
self.secret_id = secret_id
|
||||
self.consumer_repo = repo.get_secret_consumer_repository()
|
||||
self.secret_repo = repo.get_secret_repository()
|
||||
self.project_repo = repo.get_project_repository()
|
||||
self.validator = validators.SecretConsumerValidator()
|
||||
self.quota_enforcer = quota.QuotaEnforcer('consumers',
|
||||
self.consumer_repo)
|
||||
|
||||
@pecan.expose()
|
||||
def _lookup(self, consumer_id, *remainder):
|
||||
if not utils.validate_id_is_uuid(consumer_id):
|
||||
_invalid_consumer_id()()
|
||||
return SecretConsumerController(consumer_id), remainder
|
||||
|
||||
@pecan.expose(generic=True)
|
||||
def index(self, **kwargs):
|
||||
pecan.abort(405) # HTTP 405 Method Not Allowed as default
|
||||
|
||||
@index.when(method='GET', template='json')
|
||||
@controllers.handle_exceptions(u._('SecretConsumers(s) retrieval'))
|
||||
@controllers.enforce_rbac('consumers:get')
|
||||
def on_get(self, external_project_id, **kw):
|
||||
LOG.debug('Start consumers on_get '
|
||||
'for secret-ID %s:', self.secret_id)
|
||||
result = self.consumer_repo.get_by_secret_id(
|
||||
self.secret_id,
|
||||
offset_arg=kw.get('offset', 0),
|
||||
limit_arg=kw.get('limit'),
|
||||
suppress_exception=True
|
||||
)
|
||||
|
||||
consumers, offset, limit, total = result
|
||||
|
||||
if not consumers:
|
||||
resp_ctrs_overall = {'consumers': [], 'total': total}
|
||||
else:
|
||||
resp_ctrs = [
|
||||
hrefs.convert_to_hrefs(c.to_dict_fields())
|
||||
for c in consumers
|
||||
]
|
||||
consumer_path = "secrets/{secret_id}/consumers".format(
|
||||
secret_id=self.secret_id)
|
||||
|
||||
resp_ctrs_overall = hrefs.add_nav_hrefs(
|
||||
consumer_path,
|
||||
offset,
|
||||
limit,
|
||||
total,
|
||||
{'consumers': resp_ctrs}
|
||||
)
|
||||
resp_ctrs_overall.update({'total': total})
|
||||
|
||||
LOG.info('Retrieved a consumer list for project: %s',
|
||||
external_project_id)
|
||||
return resp_ctrs_overall
|
||||
|
||||
@index.when(method='POST', template='json')
|
||||
@controllers.handle_exceptions(u._('SecretConsumer creation'))
|
||||
@controllers.enforce_rbac('consumers:post')
|
||||
@controllers.enforce_content_types(['application/json'])
|
||||
def on_post(self, external_project_id, **kwargs):
|
||||
|
||||
project = res.get_or_create_project(external_project_id)
|
||||
data = api.load_body(pecan.request, validator=self.validator)
|
||||
LOG.debug('Start on_post...%s', data)
|
||||
|
||||
secret = self._get_secret(self.secret_id)
|
||||
|
||||
self.quota_enforcer.enforce(project)
|
||||
|
||||
new_consumer = models.SecretConsumerMetadatum(
|
||||
self.secret_id,
|
||||
project.id,
|
||||
data["service"],
|
||||
data["resource_type"],
|
||||
data["resource_id"],
|
||||
)
|
||||
self.consumer_repo.create_or_update_from(new_consumer, secret)
|
||||
|
||||
url = hrefs.convert_consumer_to_href(new_consumer.secret_id)
|
||||
pecan.response.headers['Location'] = url
|
||||
|
||||
LOG.info('Created a consumer for project: %s',
|
||||
external_project_id)
|
||||
|
||||
return self._return_secret_data(self.secret_id)
|
||||
|
||||
@index.when(method='DELETE', template='json')
|
||||
@controllers.handle_exceptions(u._('SecretConsumer deletion'))
|
||||
@controllers.enforce_rbac('consumers:delete')
|
||||
@controllers.enforce_content_types(['application/json'])
|
||||
def on_delete(self, external_project_id, **kwargs):
|
||||
data = api.load_body(pecan.request, validator=self.validator)
|
||||
LOG.debug('Start on_delete...%s', data)
|
||||
project = self.project_repo.find_by_external_project_id(
|
||||
external_project_id, suppress_exception=True)
|
||||
if not project:
|
||||
_consumer_not_found()
|
||||
|
||||
consumer = self.consumer_repo.get_by_values(
|
||||
self.secret_id,
|
||||
data["resource_id"],
|
||||
suppress_exception=True
|
||||
)
|
||||
if not consumer:
|
||||
_consumer_not_found()
|
||||
LOG.debug("Found consumer: %s", consumer)
|
||||
|
||||
secret = self._get_secret(self.secret_id)
|
||||
owner_of_consumer = consumer.project_id == project.id
|
||||
owner_of_secret = secret.project.external_id \
|
||||
== external_project_id
|
||||
if not owner_of_consumer and not owner_of_secret:
|
||||
_consumer_ownership_mismatch()
|
||||
|
||||
try:
|
||||
self.consumer_repo.delete_entity_by_id(consumer.id,
|
||||
external_project_id)
|
||||
except exception.NotFound:
|
||||
LOG.exception('Problem deleting consumer')
|
||||
_consumer_not_found()
|
||||
|
||||
ret_data = self._return_secret_data(self.secret_id)
|
||||
LOG.info('Deleted a consumer for project: %s',
|
||||
external_project_id)
|
||||
return ret_data
|
||||
|
||||
def _get_secret(self, secret_id):
|
||||
secret = self.secret_repo.get_secret_by_id(
|
||||
secret_id, suppress_exception=True)
|
||||
if not secret:
|
||||
controllers.secrets.secret_not_found()
|
||||
return secret
|
||||
|
||||
def _return_secret_data(self, secret_id):
|
||||
secret = self._get_secret(secret_id)
|
||||
|
||||
dict_fields = secret.to_dict_fields()
|
||||
|
||||
return hrefs.convert_to_hrefs(
|
||||
hrefs.convert_to_hrefs(dict_fields)
|
||||
)
|
||||
|
@ -17,6 +17,7 @@ from six.moves.urllib import parse
|
||||
from barbican import api
|
||||
from barbican.api import controllers
|
||||
from barbican.api.controllers import acls
|
||||
from barbican.api.controllers import consumers
|
||||
from barbican.api.controllers import secretmeta
|
||||
from barbican.common import accept
|
||||
from barbican.common import exception
|
||||
@ -76,6 +77,8 @@ class SecretController(controllers.ACLMixin):
|
||||
def __init__(self, secret):
|
||||
LOG.debug('=== Creating SecretController ===')
|
||||
self.secret = secret
|
||||
self.consumers = consumers.SecretConsumersController(secret.id)
|
||||
self.consumer_repo = repo.get_secret_consumer_repository()
|
||||
self.transport_key_repo = repo.get_transport_key_repository()
|
||||
|
||||
def get_acl_tuple(self, req, **kwargs):
|
||||
@ -253,9 +256,20 @@ class SecretController(controllers.ACLMixin):
|
||||
@controllers.handle_exceptions(u._('Secret deletion'))
|
||||
@controllers.enforce_rbac('secret:delete')
|
||||
def on_delete(self, external_project_id, **kwargs):
|
||||
secret_consumers = self.consumer_repo.get_by_secret_id(
|
||||
self.secret.id,
|
||||
suppress_exception=True
|
||||
)
|
||||
plugin.delete_secret(self.secret, external_project_id)
|
||||
LOG.info('Deleted secret for project: %s', external_project_id)
|
||||
|
||||
for consumer in secret_consumers[0]:
|
||||
try:
|
||||
self.consumer_repo.delete_entity_by_id(
|
||||
consumer.id, external_project_id)
|
||||
except exception.NotFound: # nosec
|
||||
pass
|
||||
|
||||
|
||||
class SecretsController(controllers.ACLMixin):
|
||||
"""Handles Secret creation requests."""
|
||||
|
@ -374,6 +374,13 @@ class Secret(BASE, SoftDeleteMixIn, ModelBase):
|
||||
'bit_length': self.bit_length,
|
||||
'mode': self.mode,
|
||||
'creator_id': self.creator_id,
|
||||
"consumers": [
|
||||
{
|
||||
"service": consumer.service,
|
||||
"resource_type": consumer.resource_type,
|
||||
"resource_id": consumer.resource_id,
|
||||
} for consumer in self.consumers if not consumer.deleted
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
|
@ -17,10 +17,10 @@ import os
|
||||
from barbican.tests import utils
|
||||
|
||||
|
||||
class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
|
||||
class WhenTestingContainerConsumersResource(utils.BarbicanAPIBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(WhenTestingConsumersResource, self).setUp()
|
||||
super(WhenTestingContainerConsumersResource, self).setUp()
|
||||
|
||||
self.container_name = "Im_a_container"
|
||||
self.container_type = "generic"
|
||||
@ -48,7 +48,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
|
||||
)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_resp, consumer = create_consumer(
|
||||
consumer_resp, consumer = create_container_consumer(
|
||||
self.app,
|
||||
container_id=container_uuid,
|
||||
name=self.consumer_a["name"],
|
||||
@ -66,7 +66,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
|
||||
)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_resp, consumers = create_consumer(
|
||||
consumer_resp, consumers = create_container_consumer(
|
||||
self.app,
|
||||
container_id=container_uuid,
|
||||
name=self.consumer_a["name"],
|
||||
@ -74,7 +74,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
|
||||
)
|
||||
self.assertEqual(200, consumer_resp.status_int)
|
||||
|
||||
consumer_resp, consumers = create_consumer(
|
||||
consumer_resp, consumers = create_container_consumer(
|
||||
self.app,
|
||||
container_id=container_uuid,
|
||||
name=self.consumer_b["name"],
|
||||
@ -82,7 +82,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
|
||||
)
|
||||
self.assertEqual(200, consumer_resp.status_int)
|
||||
|
||||
consumer_resp, consumers = create_consumer(
|
||||
consumer_resp, consumers = create_container_consumer(
|
||||
self.app,
|
||||
container_id=container_uuid,
|
||||
name=self.consumer_c["name"],
|
||||
@ -116,7 +116,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
|
||||
)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_resp, consumers = create_consumer(
|
||||
consumer_resp, consumers = create_container_consumer(
|
||||
self.app,
|
||||
container_id=container_uuid,
|
||||
name=self.consumer_a["name"],
|
||||
@ -124,7 +124,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
|
||||
)
|
||||
self.assertEqual(200, consumer_resp.status_int)
|
||||
|
||||
consumer_resp, consumers = create_consumer(
|
||||
consumer_resp, consumers = create_container_consumer(
|
||||
self.app,
|
||||
container_id=container_uuid,
|
||||
name=self.consumer_b["name"],
|
||||
@ -132,7 +132,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
|
||||
)
|
||||
self.assertEqual(200, consumer_resp.status_int)
|
||||
|
||||
consumer_resp, consumers = create_consumer(
|
||||
consumer_resp, consumers = create_container_consumer(
|
||||
self.app,
|
||||
container_id=container_uuid,
|
||||
name=self.consumer_c["name"],
|
||||
@ -170,7 +170,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
|
||||
)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_resp, consumers = create_consumer(
|
||||
consumer_resp, consumers = create_container_consumer(
|
||||
self.app,
|
||||
container_id=container_uuid,
|
||||
name=self.consumer_a["name"],
|
||||
@ -208,7 +208,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
|
||||
self.assertEqual([], consumer_get_resp.json["consumers"])
|
||||
|
||||
def test_fail_create_container_not_found(self):
|
||||
consumer_resp, consumers = create_consumer(
|
||||
consumer_resp, consumers = create_container_consumer(
|
||||
self.app,
|
||||
container_id="bad_container_id",
|
||||
name=self.consumer_a["name"],
|
||||
@ -271,7 +271,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
|
||||
)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_resp, consumer = create_consumer(
|
||||
consumer_resp, consumer = create_container_consumer(
|
||||
self.app,
|
||||
container_id=container_uuid,
|
||||
url="http://theurl",
|
||||
@ -287,7 +287,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
|
||||
)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_resp, consumer = create_consumer(
|
||||
consumer_resp, consumer = create_container_consumer(
|
||||
self.app,
|
||||
container_id=container_uuid,
|
||||
name="thename",
|
||||
@ -303,7 +303,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
|
||||
)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_resp, consumer = create_consumer(
|
||||
consumer_resp, consumer = create_container_consumer(
|
||||
self.app,
|
||||
container_id=container_uuid,
|
||||
name="",
|
||||
@ -320,7 +320,7 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
|
||||
)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_resp, consumer = create_consumer(
|
||||
consumer_resp, consumer = create_container_consumer(
|
||||
self.app,
|
||||
container_id=container_uuid,
|
||||
name="thename",
|
||||
@ -330,6 +330,336 @@ class WhenTestingConsumersResource(utils.BarbicanAPIBaseTestCase):
|
||||
self.assertEqual(400, consumer_resp.status_int)
|
||||
|
||||
|
||||
class WhenTestingSecretConsumersResource(utils.BarbicanAPIBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(WhenTestingSecretConsumersResource, self).setUp()
|
||||
|
||||
self.consumer_a = {
|
||||
"service": "service_a",
|
||||
"resource_type": "resource_type_a",
|
||||
"resource_id": "resource_id_a",
|
||||
}
|
||||
|
||||
self.consumer_b = {
|
||||
"service": "service_b",
|
||||
"resource_type": "resource_type_b",
|
||||
"resource_id": "resource_id_b",
|
||||
}
|
||||
|
||||
self.consumer_c = {
|
||||
"service": "service_c",
|
||||
"resource_type": "resource_type_c",
|
||||
"resource_id": "resource_id_c",
|
||||
}
|
||||
|
||||
def test_can_create_new_consumer(self):
|
||||
resp, secret_id = create_secret(self.app)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_resp, consumer = create_secret_consumer(
|
||||
self.app,
|
||||
secret_id=secret_id,
|
||||
service=self.consumer_a["service"],
|
||||
resource_type=self.consumer_a["resource_type"],
|
||||
resource_id=self.consumer_a["resource_id"],
|
||||
)
|
||||
|
||||
self.assertEqual(200, consumer_resp.status_int)
|
||||
self.assertEqual([self.consumer_a], consumer)
|
||||
|
||||
def test_can_get_consumers(self):
|
||||
resp, secret_id = create_secret(self.app)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_resp, consumers = create_secret_consumer(
|
||||
self.app,
|
||||
secret_id=secret_id,
|
||||
service=self.consumer_a["service"],
|
||||
resource_type=self.consumer_a["resource_type"],
|
||||
resource_id=self.consumer_a["resource_id"],
|
||||
)
|
||||
self.assertEqual(200, consumer_resp.status_int)
|
||||
|
||||
consumer_resp, consumers = create_secret_consumer(
|
||||
self.app,
|
||||
secret_id=secret_id,
|
||||
service=self.consumer_b["service"],
|
||||
resource_type=self.consumer_b["resource_type"],
|
||||
resource_id=self.consumer_b["resource_id"],
|
||||
)
|
||||
self.assertEqual(200, consumer_resp.status_int)
|
||||
|
||||
consumer_resp, consumers = create_secret_consumer(
|
||||
self.app,
|
||||
secret_id=secret_id,
|
||||
service=self.consumer_c["service"],
|
||||
resource_type=self.consumer_c["resource_type"],
|
||||
resource_id=self.consumer_c["resource_id"],
|
||||
)
|
||||
self.assertEqual(200, consumer_resp.status_int)
|
||||
|
||||
consumer_get_resp = self.app.get(
|
||||
'/secrets/{secret_id}/consumers/'.format(
|
||||
secret_id=secret_id))
|
||||
|
||||
self.assertEqual(200, consumer_get_resp.status_int)
|
||||
self.assertIn(consumers[0]["service"],
|
||||
consumer_get_resp.json["consumers"][0]["service"])
|
||||
self.assertIn(consumers[0]["resource_type"],
|
||||
consumer_get_resp.json["consumers"][0]["resource_type"])
|
||||
self.assertIn(consumers[0]["resource_id"],
|
||||
consumer_get_resp.json["consumers"][0]["resource_id"])
|
||||
self.assertIn(consumers[1]["service"],
|
||||
consumer_get_resp.json["consumers"][1]["service"])
|
||||
self.assertIn(consumers[1]["resource_type"],
|
||||
consumer_get_resp.json["consumers"][1]["resource_type"])
|
||||
self.assertIn(consumers[1]["resource_id"],
|
||||
consumer_get_resp.json["consumers"][1]["resource_id"])
|
||||
self.assertIn(consumers[2]["service"],
|
||||
consumer_get_resp.json["consumers"][2]["service"])
|
||||
self.assertIn(consumers[2]["resource_type"],
|
||||
consumer_get_resp.json["consumers"][2]["resource_type"])
|
||||
self.assertIn(consumers[2]["resource_id"],
|
||||
consumer_get_resp.json["consumers"][2]["resource_id"])
|
||||
|
||||
def test_can_get_consumers_with_limit_and_offset(self):
|
||||
resp, secret_id = create_secret(self.app)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_resp, consumers = create_secret_consumer(
|
||||
self.app,
|
||||
secret_id=secret_id,
|
||||
service=self.consumer_a["service"],
|
||||
resource_type=self.consumer_a["resource_type"],
|
||||
resource_id=self.consumer_a["resource_id"],
|
||||
)
|
||||
self.assertEqual(200, consumer_resp.status_int)
|
||||
|
||||
consumer_resp, consumers = create_secret_consumer(
|
||||
self.app,
|
||||
secret_id=secret_id,
|
||||
service=self.consumer_b["service"],
|
||||
resource_type=self.consumer_b["resource_type"],
|
||||
resource_id=self.consumer_b["resource_id"],
|
||||
)
|
||||
self.assertEqual(200, consumer_resp.status_int)
|
||||
|
||||
consumer_resp, consumers = create_secret_consumer(
|
||||
self.app,
|
||||
secret_id=secret_id,
|
||||
service=self.consumer_c["service"],
|
||||
resource_type=self.consumer_c["resource_type"],
|
||||
resource_id=self.consumer_c["resource_id"],
|
||||
)
|
||||
self.assertEqual(200, consumer_resp.status_int)
|
||||
|
||||
consumer_get_resp = self.app.get(
|
||||
'/secrets/{secret_id}/consumers/?limit=1&offset=1'.format(
|
||||
secret_id=secret_id))
|
||||
self.assertEqual(200, consumer_get_resp.status_int)
|
||||
|
||||
secret_url = resp.json["secret_ref"]
|
||||
|
||||
prev_cons = u"{secret_url}/consumers?limit=1&offset=0".format(
|
||||
secret_url=secret_url)
|
||||
self.assertEqual(prev_cons, consumer_get_resp.json["previous"])
|
||||
|
||||
next_cons = u"{secret_url}/consumers?limit=1&offset=2".format(
|
||||
secret_url=secret_url)
|
||||
self.assertEqual(next_cons, consumer_get_resp.json["next"])
|
||||
|
||||
self.assertEqual(
|
||||
self.consumer_b["service"],
|
||||
consumer_get_resp.json["consumers"][0]["service"]
|
||||
)
|
||||
self.assertEqual(
|
||||
self.consumer_b["resource_type"],
|
||||
consumer_get_resp.json["consumers"][0]["resource_type"]
|
||||
)
|
||||
self.assertEqual(
|
||||
self.consumer_b["resource_id"],
|
||||
consumer_get_resp.json["consumers"][0]["resource_id"]
|
||||
)
|
||||
|
||||
self.assertEqual(3, consumer_get_resp.json["total"])
|
||||
|
||||
def test_can_delete_consumer(self):
|
||||
resp, secret_id = create_secret(self.app)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_resp, consumers = create_secret_consumer(
|
||||
self.app,
|
||||
secret_id=secret_id,
|
||||
service=self.consumer_a["service"],
|
||||
resource_type=self.consumer_a["resource_type"],
|
||||
resource_id=self.consumer_a["resource_id"],
|
||||
)
|
||||
self.assertEqual(200, consumer_resp.status_int)
|
||||
|
||||
request = {
|
||||
"service": self.consumer_a["service"],
|
||||
"resource_type": self.consumer_a["resource_type"],
|
||||
"resource_id": self.consumer_a["resource_id"],
|
||||
}
|
||||
cleaned_request = {key: val for key, val in request.items()
|
||||
if val is not None}
|
||||
|
||||
consumer_del_resp = self.app.delete_json(
|
||||
'/secrets/{secret_id}/consumers/'.format(
|
||||
secret_id=secret_id
|
||||
), cleaned_request, headers={'Content-Type': 'application/json'})
|
||||
|
||||
self.assertEqual(200, consumer_del_resp.status_int)
|
||||
|
||||
def test_can_get_no_consumers(self):
|
||||
resp, secret_id = create_secret(self.app)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_get_resp = self.app.get(
|
||||
'/secrets/{secret_id}/consumers/'.format(
|
||||
secret_id=secret_id))
|
||||
|
||||
self.assertEqual(200, consumer_get_resp.status_int)
|
||||
self.assertEqual([], consumer_get_resp.json["consumers"])
|
||||
|
||||
def test_fail_create_secret_not_found(self):
|
||||
consumer_resp, consumers = create_secret_consumer(
|
||||
self.app,
|
||||
secret_id="bad_secret_id",
|
||||
service=self.consumer_a["service"],
|
||||
resource_type=self.consumer_a["resource_type"],
|
||||
resource_id=self.consumer_a["resource_id"],
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(404, consumer_resp.status_int)
|
||||
|
||||
def test_fail_get_secret_not_found(self):
|
||||
consumer_get_resp = self.app.get(
|
||||
'/secrets/{secret_id}/consumers/'.format(
|
||||
secret_id="bad_secret_id"), expect_errors=True)
|
||||
|
||||
self.assertEqual(404, consumer_get_resp.status_int)
|
||||
|
||||
def test_fail_delete_secret_not_found(self):
|
||||
request = {
|
||||
"service": self.consumer_a["service"],
|
||||
"resource_type": self.consumer_a["resource_type"],
|
||||
"resource_id": self.consumer_a["resource_id"],
|
||||
}
|
||||
cleaned_request = {key: val for key, val in request.items()
|
||||
if val is not None}
|
||||
|
||||
consumer_del_resp = self.app.delete_json(
|
||||
'/secrets/{secret_id}/consumers/'.format(
|
||||
secret_id="bad_secret_id"
|
||||
), cleaned_request, headers={'Content-Type': 'application/json'},
|
||||
expect_errors=True)
|
||||
|
||||
self.assertEqual(404, consumer_del_resp.status_int)
|
||||
|
||||
def test_fail_delete_consumer_not_found(self):
|
||||
resp, secret_id = create_secret(self.app)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
request = {
|
||||
"service": self.consumer_a["service"],
|
||||
"resource_type": self.consumer_a["resource_type"],
|
||||
"resource_id": self.consumer_a["resource_id"],
|
||||
}
|
||||
cleaned_request = {key: val for key, val in request.items()
|
||||
if val is not None}
|
||||
|
||||
consumer_del_resp = self.app.delete_json(
|
||||
'/secrets/{secret_id}/consumers/'.format(
|
||||
secret_id=secret_id
|
||||
), cleaned_request, headers={'Content-Type': 'application/json'},
|
||||
expect_errors=True)
|
||||
|
||||
self.assertEqual(404, consumer_del_resp.status_int)
|
||||
|
||||
def test_fail_create_no_service(self):
|
||||
resp, secret_id = create_secret(self.app)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_resp, consumer = create_secret_consumer(
|
||||
self.app,
|
||||
secret_id=secret_id,
|
||||
resource_type="resource_type",
|
||||
resource_id="resource_id",
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(400, consumer_resp.status_int)
|
||||
|
||||
def test_fail_create_no_resource_type(self):
|
||||
resp, secret_id = create_secret(self.app)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_resp, consumer = create_secret_consumer(
|
||||
self.app,
|
||||
secret_id=secret_id,
|
||||
service="service",
|
||||
resource_id="resource_id",
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(400, consumer_resp.status_int)
|
||||
|
||||
def test_fail_create_no_resource_id(self):
|
||||
resp, secret_id = create_secret(self.app)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_resp, consumer = create_secret_consumer(
|
||||
self.app,
|
||||
secret_id=secret_id,
|
||||
service="service",
|
||||
resource_type="resource_type",
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(400, consumer_resp.status_int)
|
||||
|
||||
def test_fail_create_empty_service(self):
|
||||
resp, secret_id = create_secret(self.app)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_resp, consumer = create_secret_consumer(
|
||||
self.app,
|
||||
secret_id=secret_id,
|
||||
service="",
|
||||
resource_type="resource_type",
|
||||
resource_id="resource_id",
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(400, consumer_resp.status_int)
|
||||
|
||||
def test_fail_create_empty_resource_type(self):
|
||||
resp, secret_id = create_secret(self.app)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_resp, consumer = create_secret_consumer(
|
||||
self.app,
|
||||
secret_id=secret_id,
|
||||
service="service",
|
||||
resource_type="",
|
||||
resource_id="resource_id",
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(400, consumer_resp.status_int)
|
||||
|
||||
def test_fail_create_empty_resource_id(self):
|
||||
resp, secret_id = create_secret(self.app)
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
consumer_resp, consumer = create_secret_consumer(
|
||||
self.app,
|
||||
secret_id=secret_id,
|
||||
service="service",
|
||||
resource_type="resource_type",
|
||||
resource_id="",
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(400, consumer_resp.status_int)
|
||||
|
||||
|
||||
# ----------------------- Helper Functions ---------------------------
|
||||
def create_container(app, name=None, container_type=None, expect_errors=False,
|
||||
headers=None):
|
||||
@ -355,7 +685,7 @@ def create_container(app, name=None, container_type=None, expect_errors=False,
|
||||
return resp, created_uuid
|
||||
|
||||
|
||||
def create_consumer(app, container_id=None, name=None, url=None,
|
||||
def create_container_consumer(app, container_id=None, name=None, url=None,
|
||||
expect_errors=False, headers=None):
|
||||
request = {
|
||||
'name': name,
|
||||
@ -377,3 +707,38 @@ def create_consumer(app, container_id=None, name=None, url=None,
|
||||
consumers = resp.json.get('consumers', '')
|
||||
|
||||
return resp, consumers
|
||||
|
||||
|
||||
def create_secret(app, expect_errors=False):
|
||||
resp = app.post_json('/secrets/', {}, expect_errors=expect_errors)
|
||||
|
||||
secret_id = None
|
||||
if resp.status_int == 201:
|
||||
secret_ref = resp.json.get('secret_ref', '')
|
||||
_, secret_id = os.path.split(secret_ref)
|
||||
|
||||
return resp, secret_id
|
||||
|
||||
|
||||
def create_secret_consumer(app, secret_id=None, service=None,
|
||||
resource_type=None, resource_id=None,
|
||||
expect_errors=False, headers=None):
|
||||
request = {
|
||||
"service": service,
|
||||
"resource_type": resource_type,
|
||||
"resource_id": resource_id,
|
||||
}
|
||||
request = {k: v for k, v in request.items() if v is not None}
|
||||
|
||||
resp = app.post_json(
|
||||
"/secrets/{}/consumers/".format(secret_id),
|
||||
request,
|
||||
expect_errors=expect_errors,
|
||||
headers=headers
|
||||
)
|
||||
|
||||
consumers = None
|
||||
if resp.status_int == 200:
|
||||
consumers = resp.json.get('consumers', '')
|
||||
|
||||
return resp, consumers
|
||||
|
@ -52,14 +52,17 @@ def get_barbican_env(external_project_id):
|
||||
|
||||
|
||||
def create_secret(id_ref="id", name="name",
|
||||
algorithm=None, bit_length=None,
|
||||
mode=None, encrypted_datum=None, content_type=None):
|
||||
algorithm=None, bit_length=None, mode=None,
|
||||
encrypted_datum=None, content_type=None, project_id=None):
|
||||
"""Generate a Secret entity instance."""
|
||||
info = {'id': id_ref,
|
||||
info = {
|
||||
'id': id_ref,
|
||||
'name': name,
|
||||
'algorithm': algorithm,
|
||||
'bit_length': bit_length,
|
||||
'mode': mode}
|
||||
'mode': mode,
|
||||
'project_id': project_id,
|
||||
}
|
||||
secret = models.Secret(info)
|
||||
secret.id = id_ref
|
||||
if encrypted_datum:
|
||||
@ -109,7 +112,7 @@ def create_container(id_ref, project_id=None, external_project_id=None):
|
||||
return container
|
||||
|
||||
|
||||
def create_consumer(container_id, project_id, id_ref):
|
||||
def create_container_consumer(container_id, project_id, id_ref):
|
||||
"""Generate a ContainerConsumerMetadatum entity instance."""
|
||||
data = {
|
||||
'name': 'test name',
|
||||
@ -122,6 +125,19 @@ def create_consumer(container_id, project_id, id_ref):
|
||||
return consumer
|
||||
|
||||
|
||||
def create_secret_consumer(secret_id, project_id, id_ref):
|
||||
"""Generate a SecretConsumerMetadatum entity instance."""
|
||||
consumer = models.SecretConsumerMetadatum(
|
||||
secret_id,
|
||||
project_id,
|
||||
"service",
|
||||
"resource_type",
|
||||
"resource_id",
|
||||
)
|
||||
consumer.id = id_ref
|
||||
return consumer
|
||||
|
||||
|
||||
class SecretAllowAllMimeTypesDecoratorTest(utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
@ -719,10 +735,10 @@ class TestingJsonSanitization(utils.BaseTestCase):
|
||||
.endswith(' '), "whitespace should be gone")
|
||||
|
||||
|
||||
class WhenCreatingConsumersUsingConsumersResource(FunctionalTest):
|
||||
class WhenCreatingContainerConsumersUsingResource(FunctionalTest):
|
||||
def setUp(self):
|
||||
super(
|
||||
WhenCreatingConsumersUsingConsumersResource, self
|
||||
WhenCreatingContainerConsumersUsingResource, self
|
||||
).setUp()
|
||||
self.app = webtest.TestApp(app.build_wsgi_app(self.root))
|
||||
self.app.extra_environ = get_barbican_env(self.external_project_id)
|
||||
@ -832,11 +848,11 @@ class WhenCreatingConsumersUsingConsumersResource(FunctionalTest):
|
||||
self.assertEqual(404, resp.status_int)
|
||||
|
||||
|
||||
class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest):
|
||||
class WhenGettingOrDeletingContainerConsumersUsingResource(FunctionalTest):
|
||||
|
||||
def setUp(self):
|
||||
super(
|
||||
WhenGettingOrDeletingConsumersUsingConsumerResource, self
|
||||
WhenGettingOrDeletingContainerConsumersUsingResource, self
|
||||
).setUp()
|
||||
self.app = webtest.TestApp(app.build_wsgi_app(self.root))
|
||||
self.app.extra_environ = get_barbican_env(self.external_project_id)
|
||||
@ -871,10 +887,10 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest):
|
||||
external_project_id=self.external_project_id)
|
||||
|
||||
# Set up mocked consumers
|
||||
self.consumer = create_consumer(
|
||||
self.consumer = create_container_consumer(
|
||||
self.container.id, self.project_internal_id,
|
||||
id_ref=utils.generate_test_valid_uuid())
|
||||
self.consumer2 = create_consumer(
|
||||
self.consumer2 = create_container_consumer(
|
||||
self.container.id, self.project_internal_id,
|
||||
id_ref=utils.generate_test_valid_uuid())
|
||||
|
||||
@ -1019,10 +1035,10 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest):
|
||||
)
|
||||
|
||||
|
||||
class WhenPerformingUnallowedOperationsOnConsumers(FunctionalTest):
|
||||
class WhenPerformingUnallowedOperationsOnContainerConsumers(FunctionalTest):
|
||||
def setUp(self):
|
||||
super(
|
||||
WhenPerformingUnallowedOperationsOnConsumers, self
|
||||
WhenPerformingUnallowedOperationsOnContainerConsumers, self
|
||||
).setUp()
|
||||
self.app = webtest.TestApp(app.build_wsgi_app(self.root))
|
||||
self.app.extra_environ = get_barbican_env(self.external_project_id)
|
||||
@ -1078,10 +1094,10 @@ class WhenPerformingUnallowedOperationsOnConsumers(FunctionalTest):
|
||||
external_project_id=self.external_project_id)
|
||||
|
||||
# Set up mocked container consumers
|
||||
self.consumer = create_consumer(
|
||||
self.consumer = create_container_consumer(
|
||||
self.container.id, self.project_internal_id,
|
||||
id_ref=utils.generate_test_valid_uuid())
|
||||
self.consumer2 = create_consumer(
|
||||
self.consumer2 = create_container_consumer(
|
||||
self.container.id, self.project_internal_id,
|
||||
id_ref=utils.generate_test_valid_uuid())
|
||||
|
||||
@ -1145,11 +1161,11 @@ class WhenPerformingUnallowedOperationsOnConsumers(FunctionalTest):
|
||||
self.assertEqual(405, resp.status_int)
|
||||
|
||||
|
||||
class WhenOwnershipMismatch(FunctionalTest):
|
||||
class WhenOwnershipMismatchForContainerConsumer(FunctionalTest):
|
||||
|
||||
def setUp(self):
|
||||
super(
|
||||
WhenOwnershipMismatch, self
|
||||
WhenOwnershipMismatchForContainerConsumer, self
|
||||
).setUp()
|
||||
self.app = webtest.TestApp(app.build_wsgi_app(self.root))
|
||||
self.app.extra_environ = get_barbican_env(self.external_project_id)
|
||||
@ -1183,10 +1199,10 @@ class WhenOwnershipMismatch(FunctionalTest):
|
||||
external_project_id='differentProjectId')
|
||||
|
||||
# Set up mocked consumers
|
||||
self.consumer = create_consumer(self.container.id,
|
||||
self.consumer = create_container_consumer(self.container.id,
|
||||
self.project_internal_id,
|
||||
id_ref='id2')
|
||||
self.consumer2 = create_consumer(self.container.id,
|
||||
self.consumer2 = create_container_consumer(self.container.id,
|
||||
self.project_internal_id,
|
||||
id_ref='id3')
|
||||
|
||||
@ -1215,3 +1231,462 @@ class WhenOwnershipMismatch(FunctionalTest):
|
||||
'/containers/{0}/consumers/'.format(self.container.id),
|
||||
self.consumer_ref, expect_errors=True)
|
||||
self.assertEqual(403, resp.status_int)
|
||||
|
||||
|
||||
class WhenCreatingSecretConsumersUsingResource(FunctionalTest):
|
||||
def setUp(self):
|
||||
super(
|
||||
WhenCreatingSecretConsumersUsingResource, self
|
||||
).setUp()
|
||||
self.app = webtest.TestApp(app.build_wsgi_app(self.root))
|
||||
self.app.extra_environ = get_barbican_env(self.external_project_id)
|
||||
|
||||
@property
|
||||
def root(self):
|
||||
self._init()
|
||||
|
||||
class RootController(object):
|
||||
secrets = controllers.secrets.SecretsController()
|
||||
|
||||
return RootController()
|
||||
|
||||
def _init(self):
|
||||
self.external_project_id = 'keystoneid1234'
|
||||
self.project_internal_id = 'projectid1234'
|
||||
|
||||
# Set up mocked project
|
||||
self.project = models.Project()
|
||||
self.project.id = self.project_internal_id
|
||||
self.project.external_id = self.external_project_id
|
||||
|
||||
# Set up mocked secret
|
||||
self.secret = models.Secret()
|
||||
self.secret.id = utils.generate_test_valid_uuid()
|
||||
self.secret.project = self.project
|
||||
self.secret.project_id = self.project_internal_id
|
||||
|
||||
# Set up consumer ref
|
||||
self.consumer_ref = {
|
||||
"service": "service",
|
||||
"resource_type": "resource_type",
|
||||
"resource_id": "resource_id",
|
||||
}
|
||||
|
||||
# Set up mocked project repo
|
||||
self.project_repo = mock.MagicMock()
|
||||
self.project_repo.get.return_value = self.project
|
||||
self.setup_project_repository_mock(self.project_repo)
|
||||
|
||||
# Set up mocked quota enforcer
|
||||
self.quota_patch = mock.patch(
|
||||
'barbican.common.quota.QuotaEnforcer.enforce', return_value=None)
|
||||
self.quota_patch.start()
|
||||
self.addCleanup(self.quota_patch.stop)
|
||||
|
||||
# Set up mocked secret repo
|
||||
self.secret_repo = mock.MagicMock()
|
||||
self.secret_repo.get_secret_by_id.return_value = self.secret
|
||||
self.setup_secret_repository_mock(self.secret_repo)
|
||||
|
||||
# Set up mocked secret meta repo
|
||||
self.secret_meta_repo = mock.MagicMock()
|
||||
self.secret_meta_repo.get_metadata_for_secret.return_value = None
|
||||
self.setup_secret_meta_repository_mock(self.secret_meta_repo)
|
||||
|
||||
# Set up mocked secret consumer repo
|
||||
self.consumer_repo = mock.MagicMock()
|
||||
self.consumer_repo.create_from.return_value = None
|
||||
self.setup_secret_consumer_repository_mock(self.consumer_repo)
|
||||
|
||||
def test_should_add_new_consumer(self):
|
||||
resp = self.app.post_json(
|
||||
'/secrets/{0}/consumers/'.format(self.secret.id),
|
||||
self.consumer_ref
|
||||
)
|
||||
self.assertEqual(200, resp.status_int)
|
||||
self.assertNotIn(self.external_project_id, resp.headers['Location'])
|
||||
|
||||
args, kwargs = self.consumer_repo.create_or_update_from.call_args
|
||||
consumer = args[0]
|
||||
self.assertIsInstance(consumer, models.SecretConsumerMetadatum)
|
||||
|
||||
def test_should_fail_consumer_bad_json(self):
|
||||
resp = self.app.post(
|
||||
'/secrets/{0}/consumers/'.format(self.secret.id),
|
||||
'',
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(415, resp.status_int)
|
||||
|
||||
def test_should_404_when_secret_ref_doesnt_exist(self):
|
||||
self.secret_repo.get_secret_by_id.return_value = None
|
||||
resp = self.app.post_json(
|
||||
'/secrets/{0}/consumers/'.format('bad_id'),
|
||||
self.consumer_ref, expect_errors=True
|
||||
)
|
||||
self.assertEqual(404, resp.status_int)
|
||||
|
||||
|
||||
class WhenGettingOrDeletingSecretConsumersUsingResource(FunctionalTest):
|
||||
|
||||
def setUp(self):
|
||||
super(
|
||||
WhenGettingOrDeletingSecretConsumersUsingResource, self
|
||||
).setUp()
|
||||
self.app = webtest.TestApp(app.build_wsgi_app(self.root))
|
||||
self.app.extra_environ = get_barbican_env(self.external_project_id)
|
||||
|
||||
@property
|
||||
def root(self):
|
||||
self._init()
|
||||
|
||||
class RootController(object):
|
||||
secrets = controllers.secrets.SecretsController()
|
||||
|
||||
return RootController()
|
||||
|
||||
def _init(self):
|
||||
self.external_project_id = 'keystoneid1234'
|
||||
self.project_internal_id = 'projectid1234'
|
||||
|
||||
# Set up mocked project
|
||||
self.project = models.Project()
|
||||
self.project.id = self.project_internal_id
|
||||
self.project.external_id = self.external_project_id
|
||||
|
||||
# Set up mocked secret
|
||||
self.secret = models.Secret()
|
||||
self.secret.id = utils.generate_test_valid_uuid()
|
||||
self.secret.project = self.project
|
||||
self.secret.project_id = self.project_internal_id
|
||||
|
||||
# Set up mocked consumers
|
||||
self.consumer = create_secret_consumer(
|
||||
self.secret.id, self.project_internal_id,
|
||||
id_ref=utils.generate_test_valid_uuid())
|
||||
self.consumer2 = create_secret_consumer(
|
||||
self.secret.id, self.project_internal_id,
|
||||
id_ref=utils.generate_test_valid_uuid())
|
||||
|
||||
self.consumer_ref = {
|
||||
"service": self.consumer.service,
|
||||
"resource_type": self.consumer.resource_type,
|
||||
"resource_id": self.consumer.resource_type,
|
||||
}
|
||||
|
||||
# Set up mocked project repo
|
||||
self.project_repo = mock.MagicMock()
|
||||
self.project_repo.get.return_value = self.project
|
||||
self.setup_project_repository_mock(self.project_repo)
|
||||
|
||||
# Set up mocked secret repo
|
||||
self.secret_repo = mock.MagicMock()
|
||||
self.secret_repo.get_secret_by_id.return_value = self.secret
|
||||
self.setup_secret_repository_mock(self.secret_repo)
|
||||
|
||||
# Set up mocked secret meta repo
|
||||
self.secret_meta_repo = mock.MagicMock()
|
||||
self.secret_meta_repo.get_metadata_for_secret.return_value = None
|
||||
self.setup_secret_meta_repository_mock(self.secret_meta_repo)
|
||||
|
||||
# Set up mocked secret consumer repo
|
||||
self.consumer_repo = mock.MagicMock()
|
||||
self.consumer_repo.get_by_values.return_value = self.consumer
|
||||
self.consumer_repo.delete_entity_by_id.return_value = None
|
||||
self.setup_secret_consumer_repository_mock(self.consumer_repo)
|
||||
|
||||
def test_should_get_consumer(self):
|
||||
ret_val = ([self.consumer], 0, 0, 1)
|
||||
self.consumer_repo.get_by_secret_id.return_value = ret_val
|
||||
|
||||
resp = self.app.get('/secrets/{0}/consumers/'.format(
|
||||
self.secret.id
|
||||
))
|
||||
self.assertEqual(200, resp.status_int)
|
||||
|
||||
self.consumer_repo.get_by_secret_id.assert_called_once_with(
|
||||
self.secret.id,
|
||||
limit_arg=None,
|
||||
offset_arg=0,
|
||||
suppress_exception=True
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
self.consumer.service,
|
||||
resp.json["consumers"][0]["service"],
|
||||
)
|
||||
self.assertEqual(
|
||||
self.consumer.resource_type,
|
||||
resp.json["consumers"][0]["resource_type"]
|
||||
)
|
||||
self.assertEqual(
|
||||
self.consumer.resource_id,
|
||||
resp.json["consumers"][0]["resource_id"]
|
||||
)
|
||||
|
||||
def test_should_404_when_secret_ref_doesnt_exist(self):
|
||||
self.secret_repo.get_secret_by_id.return_value = None
|
||||
resp = self.app.get('/secrets/{0}/consumers/'.format(
|
||||
'bad_id'
|
||||
), expect_errors=True)
|
||||
self.assertEqual(404, resp.status_int)
|
||||
|
||||
def test_should_get_consumer_by_id(self):
|
||||
self.consumer_repo.get.return_value = self.consumer
|
||||
resp = self.app.get('/secrets/{0}/consumers/{1}/'.format(
|
||||
self.secret.id, self.consumer.id
|
||||
))
|
||||
self.assertEqual(200, resp.status_int)
|
||||
|
||||
def test_should_404_with_bad_consumer_id(self):
|
||||
self.consumer_repo.get.return_value = None
|
||||
resp = self.app.get('/secrets/{0}/consumers/{1}/'.format(
|
||||
self.secret.id, 'bad_id'
|
||||
), expect_errors=True)
|
||||
self.assertEqual(404, resp.status_int)
|
||||
|
||||
def test_should_get_no_consumers(self):
|
||||
self.consumer_repo.get_by_secret_id.return_value = ([], 0, 0, 0)
|
||||
resp = self.app.get('/secrets/{0}/consumers/'.format(
|
||||
self.secret.id
|
||||
))
|
||||
self.assertEqual(200, resp.status_int)
|
||||
|
||||
def test_should_delete_consumer(self):
|
||||
self.app.delete_json('/secrets/{0}/consumers/'.format(
|
||||
self.secret.id
|
||||
), self.consumer_ref)
|
||||
|
||||
self.consumer_repo.delete_entity_by_id.assert_called_once_with(
|
||||
self.consumer.id, self.external_project_id)
|
||||
|
||||
def test_should_fail_deleting_consumer_bad_json(self):
|
||||
resp = self.app.delete(
|
||||
'/secrets/{0}/consumers/'.format(self.secret.id),
|
||||
'',
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(415, resp.status_int)
|
||||
|
||||
def test_should_404_on_delete_when_consumer_not_found(self):
|
||||
old_return = self.consumer_repo.get_by_values.return_value
|
||||
self.consumer_repo.get_by_values.return_value = None
|
||||
resp = self.app.delete_json('/secrets/{0}/consumers/'.format(
|
||||
self.secret.id
|
||||
), self.consumer_ref, expect_errors=True)
|
||||
self.consumer_repo.get_by_values.return_value = old_return
|
||||
self.assertEqual(404, resp.status_int)
|
||||
# Error response should have json content type
|
||||
self.assertEqual("application/json", resp.content_type)
|
||||
|
||||
def test_should_404_on_delete_when_consumer_not_found_later(self):
|
||||
self.consumer_repo.delete_entity_by_id.side_effect = excep.NotFound()
|
||||
resp = self.app.delete_json('/secrets/{0}/consumers/'.format(
|
||||
self.secret.id
|
||||
), self.consumer_ref, expect_errors=True)
|
||||
self.consumer_repo.delete_entity_by_id.side_effect = None
|
||||
self.assertEqual(404, resp.status_int)
|
||||
# Error response should have json content type
|
||||
self.assertEqual("application/json", resp.content_type)
|
||||
|
||||
def test_should_delete_consumers_on_secret_delete(self):
|
||||
consumers = [self.consumer, self.consumer2]
|
||||
ret_val = (consumers, 0, 0, 1)
|
||||
self.consumer_repo.get_by_secret_id.return_value = ret_val
|
||||
|
||||
resp = self.app.delete(
|
||||
'/secrets/{0}/'.format(self.secret.id)
|
||||
)
|
||||
self.assertEqual(204, resp.status_int)
|
||||
|
||||
# Verify consumers were deleted
|
||||
calls = []
|
||||
for consumer in consumers:
|
||||
calls.append(mock.call(consumer.id, self.external_project_id))
|
||||
self.consumer_repo.delete_entity_by_id.assert_has_calls(
|
||||
calls, any_order=True
|
||||
)
|
||||
|
||||
def test_should_pass_on_secret_delete_with_missing_consumers(self):
|
||||
consumers = [self.consumer, self.consumer2]
|
||||
ret_val = (consumers, 0, 0, 1)
|
||||
self.consumer_repo.get_by_secret_id.return_value = ret_val
|
||||
self.consumer_repo.delete_entity_by_id.side_effect = excep.NotFound
|
||||
|
||||
resp = self.app.delete(
|
||||
'/secrets/{0}/'.format(self.secret.id)
|
||||
)
|
||||
self.assertEqual(204, resp.status_int)
|
||||
|
||||
# Verify consumers were deleted
|
||||
calls = []
|
||||
for consumer in consumers:
|
||||
calls.append(mock.call(consumer.id, self.external_project_id))
|
||||
self.consumer_repo.delete_entity_by_id.assert_has_calls(
|
||||
calls, any_order=True
|
||||
)
|
||||
|
||||
|
||||
class WhenPerformingUnallowedOperationsOnSecretConsumers(FunctionalTest):
|
||||
def setUp(self):
|
||||
super(
|
||||
WhenPerformingUnallowedOperationsOnSecretConsumers, self
|
||||
).setUp()
|
||||
self.app = webtest.TestApp(app.build_wsgi_app(self.root))
|
||||
self.app.extra_environ = get_barbican_env(self.external_project_id)
|
||||
|
||||
@property
|
||||
def root(self):
|
||||
self._init()
|
||||
|
||||
class RootController(object):
|
||||
secrets = controllers.secrets.SecretsController()
|
||||
|
||||
return RootController()
|
||||
|
||||
def _init(self):
|
||||
self.external_project_id = 'keystoneid1234'
|
||||
self.project_internal_id = 'projectid1234'
|
||||
|
||||
# Set up mocked project
|
||||
self.project = models.Project()
|
||||
self.project.id = self.project_internal_id
|
||||
self.project.external_id = self.external_project_id
|
||||
|
||||
# Set up mocked secret
|
||||
self.secret = models.Secret()
|
||||
self.secret.id = utils.generate_test_valid_uuid()
|
||||
self.secret.project_id = self.project_internal_id
|
||||
|
||||
# Set up mocked secret consumers
|
||||
self.consumer = create_secret_consumer(
|
||||
self.secret.id, self.project_internal_id,
|
||||
id_ref=utils.generate_test_valid_uuid())
|
||||
self.consumer_ref = {
|
||||
"service": self.consumer.service,
|
||||
"resource_type": self.consumer.resource_type,
|
||||
"resource_id": self.consumer.resource_type,
|
||||
}
|
||||
|
||||
# Set up mocked project repo
|
||||
self.project_repo = mock.MagicMock()
|
||||
self.project_repo.get.return_value = self.project
|
||||
self.setup_project_repository_mock(self.project_repo)
|
||||
|
||||
# Set up secret repo
|
||||
self.secret_repo = mock.MagicMock()
|
||||
self.secret_repo.get_secret_by_id.return_value = self.secret
|
||||
self.setup_secret_repository_mock(self.secret_repo)
|
||||
|
||||
# Set up secret consumer repo
|
||||
self.consumer_repo = mock.MagicMock()
|
||||
self.consumer_repo.get_by_values.return_value = self.consumer
|
||||
self.consumer_repo.delete_entity_by_id.return_value = None
|
||||
self.setup_secret_consumer_repository_mock(self.consumer_repo)
|
||||
|
||||
def test_should_not_allow_put_on_consumers(self):
|
||||
ret_val = ([self.consumer], 0, 0, 1)
|
||||
self.consumer_repo.get_by_secret_id.return_value = ret_val
|
||||
|
||||
resp = self.app.put_json(
|
||||
'/secrets/{0}/consumers/'.format(self.secret.id),
|
||||
self.consumer_ref,
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(405, resp.status_int)
|
||||
|
||||
def test_should_not_allow_post_on_consumer_by_id(self):
|
||||
self.consumer_repo.get.return_value = self.consumer
|
||||
resp = self.app.post_json(
|
||||
'/secrets/{0}/consumers/{1}/'.format(self.secret.id,
|
||||
self.consumer.id),
|
||||
self.consumer_ref,
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(405, resp.status_int)
|
||||
|
||||
def test_should_not_allow_put_on_consumer_by_id(self):
|
||||
self.consumer_repo.get.return_value = self.consumer
|
||||
resp = self.app.put_json(
|
||||
'/secrets/{0}/consumers/{1}/'.format(self.secret.id,
|
||||
self.consumer.id),
|
||||
self.consumer_ref,
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(405, resp.status_int)
|
||||
|
||||
def test_should_not_allow_delete_on_consumer_by_id(self):
|
||||
self.consumer_repo.get.return_value = self.consumer
|
||||
resp = self.app.delete(
|
||||
'/secrets/{0}/consumers/{1}/'.format(self.secret.id,
|
||||
self.consumer.id),
|
||||
expect_errors=True
|
||||
)
|
||||
self.assertEqual(405, resp.status_int)
|
||||
|
||||
|
||||
class WhenOwnershipMismatchForSecretConsumer(FunctionalTest):
|
||||
|
||||
def setUp(self):
|
||||
super(
|
||||
WhenOwnershipMismatchForSecretConsumer, self
|
||||
).setUp()
|
||||
self.app = webtest.TestApp(app.build_wsgi_app(self.root))
|
||||
self.app.extra_environ = get_barbican_env(self.external_project_id)
|
||||
|
||||
@property
|
||||
def root(self):
|
||||
self._init()
|
||||
|
||||
class RootController(object):
|
||||
secrets = controllers.secrets.SecretsController()
|
||||
return RootController()
|
||||
|
||||
def _init(self):
|
||||
self.external_project_id = 'keystoneid1234'
|
||||
self.project_internal_id = 'projectid1234'
|
||||
|
||||
# Set up mocked project
|
||||
self.project = models.Project()
|
||||
self.project.id = self.project_internal_id
|
||||
self.project.external_id = self.external_project_id
|
||||
|
||||
# Set up mocked secret
|
||||
self.secret = models.Secret()
|
||||
self.secret.id = utils.generate_test_valid_uuid()
|
||||
self.secret.project = models.Project()
|
||||
self.secret.project.external_id = "differentProjectId"
|
||||
|
||||
# Set up mocked consumer
|
||||
self.consumer = create_secret_consumer(self.secret.id,
|
||||
self.project_internal_id,
|
||||
id_ref='consumerid1234')
|
||||
|
||||
self.consumer_ref = {
|
||||
"service": self.consumer.service,
|
||||
"resource_type": self.consumer.resource_type,
|
||||
"resource_id": self.consumer.resource_type,
|
||||
}
|
||||
|
||||
# Set up mocked project repo
|
||||
self.project_repo = mock.MagicMock()
|
||||
self.project_repo.get.return_value = self.project
|
||||
self.setup_project_repository_mock(self.project_repo)
|
||||
|
||||
# Set up mocked secret repo
|
||||
self.secret_repo = mock.MagicMock()
|
||||
self.secret_repo.get.return_value = self.secret
|
||||
self.secret_repo.get_secret_by_id.return_value = self.secret
|
||||
self.setup_secret_repository_mock(self.secret_repo)
|
||||
|
||||
# Set up mocked secret consumer repo
|
||||
self.consumer_repo = mock.MagicMock()
|
||||
self.consumer_repo.get_by_values.return_value = self.consumer
|
||||
self.consumer_repo.delete_entity_by_id.return_value = None
|
||||
self.setup_secret_consumer_repository_mock(self.consumer_repo)
|
||||
|
||||
def test_consumer_check_ownership_mismatch(self):
|
||||
resp = self.app.delete_json(
|
||||
'/secrets/{0}/consumers/'.format(self.secret.id),
|
||||
self.consumer_ref, expect_errors=True)
|
||||
self.assertEqual(403, resp.status_int)
|
||||
|
@ -116,6 +116,14 @@ class PreferredSecretStoreResource(TestableResource):
|
||||
controller_cls = secretstores.PreferredSecretStoreController
|
||||
|
||||
|
||||
class SecretConsumersResource(TestableResource):
|
||||
controller_cls = consumers.SecretConsumersController
|
||||
|
||||
|
||||
class SecretConsumerResource(TestableResource):
|
||||
controller_cls = consumers.SecretConsumerController
|
||||
|
||||
|
||||
class BaseTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin):
|
||||
|
||||
def setUp(self):
|
||||
@ -1275,3 +1283,99 @@ class WhenTestingPreferredSecretStoreResource(BaseTestCase):
|
||||
|
||||
def _invoke_on_post(self):
|
||||
self.resource.on_post(self.req, self.resp)
|
||||
|
||||
|
||||
class WhenTestingSecretConsumersResource(BaseTestCase):
|
||||
"""RBAC tests for barbican.api.resources.SecretConsumersResource"""
|
||||
def setUp(self):
|
||||
super(WhenTestingSecretConsumersResource, self).setUp()
|
||||
|
||||
self.external_project_id = '12345project'
|
||||
self.secret_id = '12345secret'
|
||||
|
||||
# Force an error on GET calls that pass RBAC, as we are not testing
|
||||
# such flows in this test module.
|
||||
self.consumer_repo = mock.MagicMock()
|
||||
get_by_secret_id = mock.MagicMock(return_value=None,
|
||||
side_effect=self
|
||||
._generate_get_error())
|
||||
self.consumer_repo.get_by_secret_id = get_by_secret_id
|
||||
|
||||
self.setup_project_repository_mock()
|
||||
self.setup_secret_consumer_repository_mock(self.consumer_repo)
|
||||
self.setup_secret_repository_mock()
|
||||
|
||||
self.resource = SecretConsumersResource(secret_id=self.secret_id)
|
||||
|
||||
def test_rules_should_be_loaded(self):
|
||||
self.assertIsNotNone(self.policy_enforcer.rules)
|
||||
|
||||
def test_should_pass_create_consumer(self):
|
||||
self._assert_pass_rbac(['admin'], self._invoke_on_post,
|
||||
content_type='application/json')
|
||||
|
||||
def test_should_raise_create_consumer(self):
|
||||
self._assert_fail_rbac([None, 'audit', 'observer', 'creator', 'bogus'],
|
||||
self._invoke_on_post,
|
||||
content_type='application/json')
|
||||
|
||||
def test_should_pass_delete_consumer(self):
|
||||
self._assert_pass_rbac(['admin'], self._invoke_on_delete,
|
||||
content_type='application/json')
|
||||
|
||||
def test_should_raise_delete_consumer(self):
|
||||
self._assert_fail_rbac([None, 'audit', 'observer', 'creator', 'bogus'],
|
||||
self._invoke_on_delete)
|
||||
|
||||
def test_should_pass_get_consumers(self):
|
||||
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
|
||||
self._invoke_on_get,
|
||||
content_type='application/json')
|
||||
|
||||
def test_should_raise_get_consumers(self):
|
||||
self._assert_fail_rbac([None, 'bogus'],
|
||||
self._invoke_on_get,
|
||||
content_type='application/json')
|
||||
|
||||
def _invoke_on_post(self):
|
||||
self.resource.on_post(self.req, self.resp)
|
||||
|
||||
def _invoke_on_delete(self):
|
||||
self.resource.on_delete(self.req, self.resp)
|
||||
|
||||
def _invoke_on_get(self):
|
||||
self.resource.on_get(self.req, self.resp)
|
||||
|
||||
|
||||
class WhenTestingSecretConsumerResource(BaseTestCase):
|
||||
"""RBAC tests for barbican.api.resources.SecretConsumerResource"""
|
||||
def setUp(self):
|
||||
super(WhenTestingSecretConsumerResource, self).setUp()
|
||||
|
||||
self.external_project_id = '12345project'
|
||||
self.consumer_id = '12345consumer'
|
||||
|
||||
# Force an error on GET calls that pass RBAC, as we are not testing
|
||||
# such flows in this test module.
|
||||
self.consumer_repo = mock.MagicMock()
|
||||
fail_method = mock.MagicMock(return_value=None,
|
||||
side_effect=self._generate_get_error())
|
||||
self.consumer_repo.get = fail_method
|
||||
|
||||
self.setup_project_repository_mock()
|
||||
self.setup_secret_consumer_repository_mock(self.consumer_repo)
|
||||
self.resource = SecretConsumerResource(consumer_id=self.consumer_id)
|
||||
|
||||
def test_rules_should_be_loaded(self):
|
||||
self.assertIsNotNone(self.policy_enforcer.rules)
|
||||
|
||||
def test_should_pass_get_consumer(self):
|
||||
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
|
||||
self._invoke_on_get)
|
||||
|
||||
def test_should_raise_get_consumer(self):
|
||||
self._assert_fail_rbac([None, 'bogus'],
|
||||
self._invoke_on_get)
|
||||
|
||||
def _invoke_on_get(self):
|
||||
self.resource.on_get(self.req, self.resp)
|
||||
|
@ -141,6 +141,20 @@ class MockModelRepositoryMixin(object):
|
||||
mock_repo_obj=mock_container_consumer_repo,
|
||||
patcher_obj=self.mock_container_consumer_repo_patcher)
|
||||
|
||||
def setup_secret_consumer_repository_mock(
|
||||
self, mock_secret_consumer_repo=mock.MagicMock()):
|
||||
"""Mocks the secret consumer repository factory function
|
||||
|
||||
:param mock_secret_consumer_repo: The pre-configured mock
|
||||
secret consumer repo to be
|
||||
returned.
|
||||
"""
|
||||
self.mock_secret_consumer_repo_patcher = None
|
||||
self._setup_repository_mock(
|
||||
repo_factory='get_secret_consumer_repository',
|
||||
mock_repo_obj=mock_secret_consumer_repo,
|
||||
patcher_obj=self.mock_secret_consumer_repo_patcher)
|
||||
|
||||
def setup_container_repository_mock(self,
|
||||
mock_container_repo=mock.MagicMock()):
|
||||
"""Mocks the container repository factory function
|
||||
|
@ -23,7 +23,7 @@ class SecretModel(base_models.BaseModel):
|
||||
secret_ref=None, bit_length=None, mode=None, secret_type=None,
|
||||
payload_content_type=None, payload=None, content_types=None,
|
||||
payload_content_encoding=None, status=None, updated=None,
|
||||
created=None, creator_id=None, metadata=None):
|
||||
created=None, creator_id=None, metadata=None, consumers=None):
|
||||
super(SecretModel, self).__init__()
|
||||
|
||||
self.name = name
|
||||
@ -42,3 +42,4 @@ class SecretModel(base_models.BaseModel):
|
||||
self.created = created
|
||||
self.creator_id = creator_id
|
||||
self.metadata = metadata
|
||||
self.consumers = consumers
|
||||
|
Loading…
Reference in New Issue
Block a user