Merge "Adding ACL check when new stored key order is submitted"
This commit is contained in:
commit
0bf1268d77
|
@ -27,6 +27,8 @@ from barbican.model import repositories as repo
|
|||
|
||||
LOG = utils.getLogger(__name__)
|
||||
|
||||
CONTAINER_GET = 'container:get'
|
||||
|
||||
|
||||
def container_not_found():
|
||||
"""Throw exception indicating container not found."""
|
||||
|
@ -59,7 +61,7 @@ class ContainerController(controllers.ACLMixin):
|
|||
|
||||
@index.when(method='GET', template='json')
|
||||
@controllers.handle_exceptions(u._('Container retrieval'))
|
||||
@controllers.enforce_rbac('container:get')
|
||||
@controllers.enforce_rbac(CONTAINER_GET)
|
||||
def on_get(self, external_project_id):
|
||||
dict_fields = self.container.to_dict_fields()
|
||||
|
||||
|
|
|
@ -206,7 +206,7 @@ class OrdersController(controllers.ACLMixin):
|
|||
container_ref = order_meta.get('container_ref')
|
||||
validators.validate_stored_key_rsa_container(
|
||||
external_project_id,
|
||||
container_ref)
|
||||
container_ref, pecan.request)
|
||||
|
||||
new_order = models.Order()
|
||||
new_order.meta = body.get('meta')
|
||||
|
|
|
@ -22,6 +22,7 @@ from OpenSSL import crypto
|
|||
from oslo_config import cfg
|
||||
import six
|
||||
|
||||
from barbican.api import controllers
|
||||
from barbican.common import exception
|
||||
from barbican.common import hrefs
|
||||
from barbican.common import utils
|
||||
|
@ -88,7 +89,7 @@ def validate_ca_id(project_id, order_meta):
|
|||
project_id=project_id)
|
||||
|
||||
|
||||
def validate_stored_key_rsa_container(project_id, container_ref):
|
||||
def validate_stored_key_rsa_container(project_id, container_ref, req):
|
||||
try:
|
||||
container_id = hrefs.get_container_id_from_ref(container_ref)
|
||||
except Exception:
|
||||
|
@ -98,9 +99,9 @@ def validate_stored_key_rsa_container(project_id, container_ref):
|
|||
raise exception.InvalidContainer(reason=reason)
|
||||
|
||||
container_repo = repo.get_container_repository()
|
||||
container = container_repo.get(container_id,
|
||||
external_project_id=project_id,
|
||||
suppress_exception=True)
|
||||
|
||||
container = container_repo.get_container_by_id(entity_id=container_id,
|
||||
suppress_exception=True)
|
||||
if not container:
|
||||
reason = u._("Container Not Found")
|
||||
raise exception.InvalidContainer(reason=reason)
|
||||
|
@ -109,10 +110,11 @@ def validate_stored_key_rsa_container(project_id, container_ref):
|
|||
reason = u._("Container Wrong Type")
|
||||
raise exception.InvalidContainer(reason=reason)
|
||||
|
||||
# TODO(dave) Validation should be done to determine if the
|
||||
# requester of the certificate has permissions to access the
|
||||
# keys in this container. This can be done after the ACL patch
|
||||
# has landed.
|
||||
ctxt = controllers._get_barbican_context(req)
|
||||
inst = controllers.containers.ContainerController(container)
|
||||
controllers._do_enforce_rbac(inst, req,
|
||||
controllers.containers.CONTAINER_GET,
|
||||
ctxt)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
|
|
|
@ -21,8 +21,11 @@ import testtools
|
|||
from barbican.common import resources
|
||||
from barbican.model import models
|
||||
from barbican.model import repositories
|
||||
from barbican.tests.api.controllers import test_acls
|
||||
from barbican.tests.api import test_resources_policy as test_policy
|
||||
from barbican.tests import utils
|
||||
|
||||
|
||||
order_repo = repositories.get_order_repository()
|
||||
project_repo = repositories.get_project_repository()
|
||||
ca_repo = repositories.get_ca_repository()
|
||||
|
@ -427,12 +430,14 @@ class WhenCreatingCertificateOrders(utils.BarbicanAPIBaseTestCase):
|
|||
self.assertEqual(403, create_resp.status_int)
|
||||
|
||||
|
||||
class WhenCreatingStoredKeyOrders(utils.BarbicanAPIBaseTestCase):
|
||||
class WhenCreatingStoredKeyOrders(utils.BarbicanAPIBaseTestCase,
|
||||
test_policy.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(WhenCreatingStoredKeyOrders, self).setUp()
|
||||
|
||||
# Make sure we have a project
|
||||
self.project = resources.get_or_create_project(self.project_id)
|
||||
self.creator_user_id = 'creatorUserId'
|
||||
|
||||
def test_can_create_new_stored_key_order(self):
|
||||
container_name = 'rsa container name'
|
||||
|
@ -459,6 +464,175 @@ class WhenCreatingStoredKeyOrders(utils.BarbicanAPIBaseTestCase):
|
|||
order = order_repo.get(order_uuid, self.project_id)
|
||||
self.assertIsInstance(order, models.Order)
|
||||
|
||||
def _setup_acl_order_context_and_create_order(
|
||||
self, add_acls=False, read_creator_only=False, order_roles=None,
|
||||
order_user=None, expect_errors=False):
|
||||
"""Helper method to setup acls, order context and return created order.
|
||||
|
||||
Create order uses actual oslo policy enforcer instead of being None.
|
||||
Create ACLs for container if 'add_acls' is True.
|
||||
Make container private when 'read_creator_only' is True.
|
||||
"""
|
||||
|
||||
container_name = 'rsa container name'
|
||||
container_type = 'rsa'
|
||||
secret_refs = []
|
||||
self.app.extra_environ = {
|
||||
'barbican.context': self._build_context(self.project_id,
|
||||
user=self.creator_user_id)
|
||||
}
|
||||
_, container_id = create_container(
|
||||
self.app,
|
||||
name=container_name,
|
||||
container_type=container_type,
|
||||
secret_refs=secret_refs
|
||||
)
|
||||
|
||||
if add_acls:
|
||||
_, _ = test_acls.manage_acls(
|
||||
self.app, 'containers', container_id,
|
||||
read_user_ids=['u1', 'u3', 'u4'],
|
||||
read_creator_only=read_creator_only,
|
||||
is_update=False)
|
||||
|
||||
self.app.extra_environ = {
|
||||
'barbican.context': self._build_context(
|
||||
self.project_id, roles=order_roles, user=order_user,
|
||||
is_admin=False, policy_enforcer=self.policy_enforcer)
|
||||
}
|
||||
|
||||
stored_key_meta = {
|
||||
'request_type': 'stored-key',
|
||||
'subject_dn': 'cn=barbican-server,o=example.com',
|
||||
'container_ref': 'https://localhost/v1/containers/' + container_id
|
||||
}
|
||||
return create_order(
|
||||
self.app,
|
||||
order_type='certificate',
|
||||
meta=stored_key_meta,
|
||||
expect_errors=expect_errors
|
||||
)
|
||||
|
||||
def test_can_create_new_stored_key_order_no_acls_and_policy_check(self):
|
||||
"""Create stored key order with actual policy enforcement logic.
|
||||
|
||||
Order can be created as long as order project and user roles are
|
||||
allowed in policy. In the test, user requesting order has container
|
||||
project and has 'creator' role. Order should be created regardless
|
||||
of what user id is.
|
||||
"""
|
||||
|
||||
create_resp, order_id = self._setup_acl_order_context_and_create_order(
|
||||
add_acls=False, read_creator_only=False, order_roles=['creator'],
|
||||
order_user='anyUserId', expect_errors=False)
|
||||
|
||||
self.assertEqual(202, create_resp.status_int)
|
||||
|
||||
order = order_repo.get(order_id, self.project_id)
|
||||
self.assertIsInstance(order, models.Order)
|
||||
self.assertEqual('anyUserId', order.creator_id)
|
||||
|
||||
def test_should_fail_for_user_observer_role_no_acls_and_policy_check(self):
|
||||
"""Should not allow create order when user doesn't have necessary role.
|
||||
|
||||
Order can be created as long as order project and user roles are
|
||||
allowed in policy. In the test, user requesting order has container
|
||||
project but has 'observer' role. Create order should fail as expected
|
||||
role is 'admin' or 'creator'.
|
||||
"""
|
||||
|
||||
create_resp, _ = self._setup_acl_order_context_and_create_order(
|
||||
add_acls=False, read_creator_only=False, order_roles=['observer'],
|
||||
order_user='anyUserId', expect_errors=True)
|
||||
self.assertEqual(403, create_resp.status_int)
|
||||
|
||||
def test_can_create_order_with_private_container_and_creator_user(self):
|
||||
"""Create order using private container with creator user.
|
||||
|
||||
Container has been marked private via ACLs. Still creator of container
|
||||
should be able to create stored key order using that container
|
||||
successfully.
|
||||
"""
|
||||
create_resp, order_id = self._setup_acl_order_context_and_create_order(
|
||||
add_acls=True, read_creator_only=True, order_roles=['creator'],
|
||||
order_user=self.creator_user_id, expect_errors=False)
|
||||
|
||||
self.assertEqual(202, create_resp.status_int)
|
||||
|
||||
order = order_repo.get(order_id, self.project_id)
|
||||
self.assertIsInstance(order, models.Order)
|
||||
self.assertEqual(self.creator_user_id, order.creator_id)
|
||||
|
||||
def test_can_create_order_with_private_container_and_acl_user(self):
|
||||
"""Create order using private container with acl user.
|
||||
|
||||
Container has been marked private via ACLs. So *generally* project user
|
||||
should not be able to create stored key order using that container.
|
||||
But here it can create order as that user is defined in read ACL user
|
||||
list. Here project user means user which has 'creator' role in the
|
||||
container project. Order project is same as container.
|
||||
"""
|
||||
|
||||
create_resp, order_id = self._setup_acl_order_context_and_create_order(
|
||||
add_acls=True, read_creator_only=True, order_roles=['creator'],
|
||||
order_user='u3', expect_errors=False)
|
||||
self.assertEqual(202, create_resp.status_int)
|
||||
|
||||
order = order_repo.get(order_id, self.project_id)
|
||||
self.assertIsInstance(order, models.Order)
|
||||
self.assertEqual('u3', order.creator_id)
|
||||
|
||||
def test_should_raise_with_private_container_and_project_user(self):
|
||||
"""Create order should fail using private container for project user.
|
||||
|
||||
Container has been marked private via ACLs. So project user should not
|
||||
be able to create stored key order using that container. Here project
|
||||
user means user which has 'creator' role in the container project.
|
||||
Order project is same as container. If container was not marked
|
||||
private, this user would have been able to create order. See next test.
|
||||
"""
|
||||
|
||||
create_resp, _ = self._setup_acl_order_context_and_create_order(
|
||||
add_acls=True, read_creator_only=True, order_roles=['creator'],
|
||||
order_user='anyProjectUser', expect_errors=True)
|
||||
|
||||
self.assertEqual(403, create_resp.status_int)
|
||||
|
||||
def test_can_create_order_with_non_private_acls_and_project_user(self):
|
||||
"""Create order using non-private container with project user.
|
||||
|
||||
Container has not been marked private via ACLs. So project user should
|
||||
be able to create stored key order using that container successfully.
|
||||
Here project user means user which has 'creator' role in the container
|
||||
project. Order project is same as container.
|
||||
"""
|
||||
create_resp, order_id = self._setup_acl_order_context_and_create_order(
|
||||
add_acls=True, read_creator_only=False, order_roles=['creator'],
|
||||
order_user='anyProjectUser', expect_errors=False)
|
||||
|
||||
self.assertEqual(202, create_resp.status_int)
|
||||
|
||||
order = order_repo.get(order_id, self.project_id)
|
||||
self.assertIsInstance(order, models.Order)
|
||||
self.assertEqual('anyProjectUser', order.creator_id)
|
||||
|
||||
def test_can_create_order_with_non_private_acls_and_creator_user(self):
|
||||
"""Create order using non-private container with creator user.
|
||||
|
||||
Container has not been marked private via ACLs. So user who created
|
||||
container should be able to create stored key order using that
|
||||
container successfully. Order project is same as container.
|
||||
"""
|
||||
create_resp, order_id = self._setup_acl_order_context_and_create_order(
|
||||
add_acls=True, read_creator_only=False, order_roles=['creator'],
|
||||
order_user=self.creator_user_id, expect_errors=False)
|
||||
|
||||
self.assertEqual(202, create_resp.status_int)
|
||||
|
||||
order = order_repo.get(order_id, self.project_id)
|
||||
self.assertIsInstance(order, models.Order)
|
||||
self.assertEqual(self.creator_user_id, order.creator_id)
|
||||
|
||||
def test_should_raise_with_bad_container_ref(self):
|
||||
stored_key_meta = {
|
||||
'request_type': 'stored-key',
|
||||
|
|
|
@ -36,14 +36,15 @@ class BarbicanAPIBaseTestCase(oslotest.BaseTestCase):
|
|||
"""Base TestCase for all tests needing to interact with a Barbican app."""
|
||||
root_controller = None
|
||||
|
||||
def _build_context(self, project_id):
|
||||
def _build_context(self, project_id, roles=None, user=None, is_admin=True,
|
||||
policy_enforcer=None):
|
||||
context = barbican.context.RequestContext(
|
||||
roles=None,
|
||||
user=None,
|
||||
roles=roles,
|
||||
user=user,
|
||||
project=project_id,
|
||||
is_admin=True
|
||||
is_admin=is_admin
|
||||
)
|
||||
context.policy_enforcer = None
|
||||
context.policy_enforcer = policy_enforcer
|
||||
return context
|
||||
|
||||
def setUp(self):
|
||||
|
|
Loading…
Reference in New Issue