Adding ACL db repository changes (Part 2)

Added repositories for SecretACL and ContainerACL entities.

There is no repository logic added for ACL user tables as its data
is accessed via relationship defined in respective ACL tables.
Currently ACL logic does not require separate APIs for these entities.

Depends-On: Iffe63e865fd98eb9393a9d87f7921909dc48e4e1

Change-Id: I88aaa3ee686a501062bf46974847fc1f3ea844bf
Implements: blueprint add-per-secret-policy
This commit is contained in:
Arun Kant
2015-03-25 09:28:43 -07:00
parent 5881083be7
commit 315daccb48
4 changed files with 792 additions and 0 deletions

View File

@@ -27,6 +27,7 @@ import uuid
from oslo_config import cfg
from oslo_log import log
import sqlalchemy
from sqlalchemy import func as sa_func
from sqlalchemy import or_
import sqlalchemy.orm as sa_orm
@@ -48,6 +49,7 @@ sa_logger = None
# Singleton repository references, instantiated via get_xxxx_repository()
# functions below. Please keep this list in alphabetical order.
_CA_REPOSITORY = None
_CONTAINER_ACL_REPOSITORY = None
_CONTAINER_CONSUMER_REPOSITORY = None
_CONTAINER_REPOSITORY = None
_CONTAINER_SECRET_REPOSITORY = None
@@ -61,6 +63,7 @@ _PREFERRED_CA_REPOSITORY = None
_PROJECT_REPOSITORY = None
_PROJECT_CA_REPOSITORY = None
_PROJECT_SECRET_REPOSITORY = None
_SECRET_ACL_REPOSITORY = None
_SECRET_META_REPOSITORY = None
_SECRET_REPOSITORY = None
_TRANSPORT_KEY_REPOSITORY = None
@@ -669,6 +672,32 @@ class SecretRepo(BaseRepo):
query = query.filter(models.ProjectSecret.project_id == project_id)
return query
def get_secret_by_id(self, entity_id, suppress_exception=False,
session=None):
"""Gets secret by its entity id without project id check."""
session = self.get_session(session)
try:
utcnow = timeutils.utcnow()
# Note(john-wood-w): SQLAlchemy requires '== None' below,
# not 'is None'.
expiration_filter = or_(models.Secret.expiration == None,
models.Secret.expiration > utcnow)
query = session.query(models.Secret)
query = query.filter_by(id=entity_id, deleted=False)
query = query.filter(expiration_filter)
entity = query.one()
except sa_orm.exc.NoResultFound:
entity = None
if not suppress_exception:
LOG.exception(u._LE("Problem getting secret %s"),
entity_id)
raise exception.NotFound(u._(
"No secret found with secret-ID {id}").format(
entity_name=self._do_entity_name(),
id=entity_id))
return entity
class EncryptedDatumRepo(BaseRepo):
"""Repository for the EncryptedDatum entity
@@ -1140,6 +1169,25 @@ class ContainerRepo(BaseRepo):
return session.query(models.Container).filter_by(
deleted=False).filter_by(project_id=project_id)
def get_container_by_id(self, entity_id, suppress_exception=False,
session=None):
"""Gets container by its entity id without project id check."""
session = self.get_session(session)
try:
query = session.query(models.Container)
query = query.filter_by(id=entity_id, deleted=False)
entity = query.one()
except sa_orm.exc.NoResultFound:
entity = None
if not suppress_exception:
LOG.exception(u._LE("Problem getting container %s"),
entity_id)
raise exception.NotFound(u._(
"No container found with container-ID {id}").format(
entity_name=self._do_entity_name(),
id=entity_id))
return entity
class ContainerSecretRepo(BaseRepo):
"""Repository for the ContainerSecret entity."""
@@ -1625,12 +1673,206 @@ class PreferredCertificateAuthorityRepo(BaseRepo):
project_id=project_id).filter_by(deleted=False)
class SecretACLRepo(BaseRepo):
"""Repository for the SecretACL entity.
There is no need for SecretACLUserRepo as none of logic access
SecretACLUser (ACL user data) directly. Its always derived from
SecretACL relationship.
SecretACL and SecretACLUser data is not soft delete. So there is no need
to have deleted=False filter in queries.
"""
def _do_entity_name(self):
"""Sub-class hook: return entity name, such as for debugging."""
return "SecretACL"
def _do_build_get_query(self, entity_id, external_project_id, session):
"""Sub-class hook: build a retrieve query."""
query = session.query(models.SecretACL)
query = query.filter_by(id=entity_id)
return query
def _do_validate(self, values):
"""Sub-class hook: validate values."""
pass
def get_by_secret_id(self, secret_id, session=None):
"""Return list of secret ACLs by secret id."""
session = self.get_session(session)
query = session.query(models.SecretACL)
query = query.filter_by(secret_id=secret_id)
return query.all()
def create_or_replace_from(self, secret, secret_acl, user_ids=None,
session=None):
session = self.get_session(session)
secret.updated_at = timeutils.utcnow()
secret.secret_acls.append(secret_acl)
secret.save(session=session)
self._create_or_replace_acl_users(secret_acl, user_ids,
session=session)
def _create_or_replace_acl_users(self, secret_acl, user_ids, session=None):
"""Creates or updates secret acl user based on input user_ids list.
user_ids is expected to be list of ids (enforced by schema validation).
Input user ids should have complete list of acl users. It does not
apply partial update of user ids.
If user_ids is None, no change is made in acl user data.
If user_ids list is not None, then following change is made.
For existing acl users, just update timestamp if user_id is present in
input user ids list. Otherwise, remove existing acl user entries.
Then add the remainining input user ids as new acl user db entries.
"""
if user_ids is None:
return
user_ids = set(user_ids)
now = timeutils.utcnow()
session = self.get_session(session)
secret_acl.updated_at = now
for acl_user in secret_acl.acl_users:
if acl_user.user_id in user_ids: # input user_id already exists
acl_user.updated_at = now
user_ids.remove(acl_user.user_id)
else:
acl_user.delete(session)
for user_id in user_ids:
acl_user = models.SecretACLUser(secret_acl.id, user_id)
secret_acl.acl_users.append(acl_user)
secret_acl.save(session=session)
def get_count(self, secret_id, session=None):
"""Gets count of existing secret ACL(s) for a given secret."""
session = self.get_session(session)
query = session.query(sa_func.count(models.SecretACL.id))
return query.scalar()
def delete_acls_for_secret(self, secret, session=None):
session = self.get_session(session)
for entity in secret.secret_acls:
entity.delete(session=session)
class ContainerACLRepo(BaseRepo):
"""Repository for the ContainerACL entity.
There is no need for ContainerACLUserRepo as none of logic access
ContainerACLUser (ACL user data) directly. Its always derived from
ContainerACL relationship.
ContainerACL and ContainerACLUser data is not soft delete. So there is no
need to have deleted=False filter in queries.
"""
def _do_entity_name(self):
"""Sub-class hook: return entity name, such as for debugging."""
return "ContainerACL"
def _do_build_get_query(self, entity_id, external_project_id, session):
"""Sub-class hook: build a retrieve query."""
query = session.query(models.ContainerACL)
query = query.filter_by(id=entity_id)
return query
def _do_validate(self, values):
"""Sub-class hook: validate values."""
pass
def get_by_container_id(self, container_id, session=None):
"""Return list of container ACLs by container id."""
session = self.get_session(session)
query = session.query(models.ContainerACL)
query = query.filter_by(container_id=container_id)
return query.all()
def create_or_replace_from(self, container, container_acl,
user_ids=None, session=None):
session = self.get_session(session)
container.updated_at = timeutils.utcnow()
container.container_acls.append(container_acl)
container.save(session=session)
self._create_or_replace_acl_users(container_acl, user_ids, session)
def _create_or_replace_acl_users(self, container_acl, user_ids,
session=None):
"""Creates or updates container acl user based on input user_ids list.
user_ids is expected to be list of ids (enforced by schema validation).
Input user ids should have complete list of acl users. It does not
apply partial update of user ids.
If user_ids is None, no change is made in acl user data.
If user_ids list is not None, then following change is made.
For existing acl users, just update timestamp if user_id is present in
input user ids list. Otherwise, remove existing acl user entries.
Then add the remainining input user ids as new acl user db entries.
"""
if user_ids is None:
return
user_ids = set(user_ids)
now = timeutils.utcnow()
session = self.get_session(session)
container_acl.updated_at = now
for acl_user in container_acl.acl_users:
if acl_user.user_id in user_ids: # input user_id already exists
acl_user.updated_at = now
user_ids.remove(acl_user.user_id)
else:
acl_user.delete(session)
for user_id in user_ids:
acl_user = models.ContainerACLUser(container_acl.id, user_id)
container_acl.acl_users.append(acl_user)
container_acl.save(session=session)
def get_count(self, secret_id, session=None):
"""Gets count of existing container ACL(s) for a given container."""
session = self.get_session(session)
query = session.query(sa_func.count(models.ContainerACL.id))
return query.scalar()
def delete_acls_for_container(self, container, session=None):
session = self.get_session(session)
for entity in container.container_acls:
entity.delete(session=session)
def get_ca_repository():
"""Returns a singleton Secret repository instance."""
global _CA_REPOSITORY
return _get_repository(_CA_REPOSITORY, CertificateAuthorityRepo)
def get_container_acl_repository():
"""Returns a singleton Container ACL repository instance."""
global _CONTAINER_ACL_REPOSITORY
return _get_repository(_CONTAINER_ACL_REPOSITORY, ContainerACLRepo)
def get_container_consumer_repository():
"""Returns a singleton Container Consumer repository instance."""
global _CONTAINER_CONSUMER_REPOSITORY
@@ -1714,6 +1956,12 @@ def get_project_secret_repository():
return _get_repository(_PROJECT_SECRET_REPOSITORY, ProjectSecretRepo)
def get_secret_acl_repository():
"""Returns a singleton Secret ACL repository instance."""
global _SECRET_ACL_REPOSITORY
return _get_repository(_SECRET_ACL_REPOSITORY, SecretACLRepo)
def get_secret_meta_repository():
"""Returns a singleton Secret meta repository instance."""
global _SECRET_META_REPOSITORY

View File

@@ -0,0 +1,494 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from barbican.common import exception
from barbican.model import models
from barbican.model import repositories
from barbican.tests import database_utils
class TestACLMixin(object):
def _assert_acl_users(self, user_ids, acls, acl_id, check_size=True):
"""Checks that all input users are present in matching acl users data.
It also checks if number of acl users are same as input users when
check_size flag is True.
"""
acls_map = self._map_id_to_acl(acls)
acl_users = acls_map[acl_id].to_dict_fields()['users']
if check_size:
self.assertEqual(len(user_ids), len(acl_users))
self.assertTrue(all(user_id in user_ids for user_id in acl_users))
def _map_id_to_acl(self, acls):
"""Provides dictionary of id and acl from acls list."""
m = {}
for acl in acls:
m[acl.id] = acl
return m
class WhenTestingSecretACLRepository(database_utils.RepositoryTestCase,
TestACLMixin):
def setUp(self):
super(WhenTestingSecretACLRepository, self).setUp()
self.acl_repo = repositories.get_secret_acl_repository()
def _create_base_secret(self):
# Setup the secret and needed base relationship
secret_repo = repositories.get_secret_repository()
session = secret_repo.get_session()
secret = secret_repo.create_from(models.Secret(), session=session)
project = models.Project()
project.external_id = "keystone_project_id"
project.save(session=session)
project_secret = models.ProjectSecret()
project_secret.secret_id = secret.id
project_secret.project_id = project.id
project_secret.save(session=session)
session.commit()
return secret
def test_get_by_secret_id(self):
session = self.acl_repo.get_session()
secret = self._create_base_secret()
acls = self.acl_repo.get_by_secret_id(secret.id, session)
self.assertEqual(0, len(acls))
acl1 = self.acl_repo.create_from(models.SecretACL(secret.id, 'read',
True, ['u1', 'u2']),
session)
acls = self.acl_repo.get_by_secret_id(secret.id, session)
self.assertEqual(1, len(acls))
self.assertEqual(acl1.id, acls[0].id)
self.assertEqual('read', acls[0].operation)
self._assert_acl_users(['u2', 'u1'], acls, acl1.id)
def test_get_by_entity_id(self):
session = self.acl_repo.get_session()
secret = self._create_base_secret()
acl1 = self.acl_repo.create_from(models.SecretACL(
secret.id, 'read', True, ['u1', 'u2']), session)
acl = self.acl_repo.get(acl1.id, session)
self.assertIsNotNone(acl)
self.assertEqual(acl1.id, acl.id)
self.assertEqual('read', acl.operation)
self._assert_acl_users(['u1', 'u2'], [acl], acl1.id)
self.acl_repo.delete_entity_by_id(acl1.id, session)
acl = self.acl_repo.get(acl1.id, session, suppress_exception=True)
self.assertIsNone(acl)
def test_should_raise_notfound_exception_get_by_entity_id(self):
self.assertRaises(exception.NotFound, self.acl_repo.get,
"invalid_id", suppress_exception=False)
def test_create_or_replace_from_for_new_acls(self):
"""Check create_or_replace_from and get count call.
It creates new acls with users and make sure that same users
are returned when acls are queries by secret id.
It uses get count to assert expected number of acls for that secret.
"""
session = self.acl_repo.get_session()
secret = self._create_base_secret()
acl1 = self.acl_repo.create_from(models.SecretACL(
secret.id, 'read'), session)
self.acl_repo.create_or_replace_from(
secret, acl1, user_ids=['u1', 'u2'], session=session)
acl2 = self.acl_repo.create_from(models.SecretACL(
secret.id, 'write', True), session)
self.acl_repo.create_or_replace_from(
secret, acl2, user_ids=['u1', 'u2', 'u3'], session=session)
acl3 = self.acl_repo.create_from(models.SecretACL(
secret.id, 'delete'), session)
self.acl_repo.create_or_replace_from(
secret, acl3, user_ids=[], session=session)
acls = self.acl_repo.get_by_secret_id(secret.id, session)
self.assertEqual(3, len(acls))
id_map = self._map_id_to_acl(acls)
self.assertEqual(False, id_map[acl1.id].creator_only)
self.assertEqual(True, id_map[acl2.id].creator_only)
self.assertEqual('read', id_map[acl1.id].operation)
self.assertEqual('write', id_map[acl2.id].operation)
self.assertEqual('delete', id_map[acl3.id].operation)
# order of input users should not matter
self._assert_acl_users(['u1', 'u2'], acls, acl1.id)
self._assert_acl_users(['u2', 'u1'], acls, acl1.id)
self._assert_acl_users(['u2', 'u1', 'u3'], acls, acl2.id)
count = self.acl_repo.get_count(secret.id, session)
self.assertEqual(3, count)
self.assertTrue(len(acls) == count)
def test_create_or_replace_from_with_none_or_blank_users(self):
session = self.acl_repo.get_session()
secret = self._create_base_secret()
acl1 = self.acl_repo.create_from(models.SecretACL(
secret.id, 'read'), session)
self.acl_repo.create_or_replace_from(
secret, acl1, user_ids=None, session=session)
acl2 = self.acl_repo.create_from(models.SecretACL(
secret.id, 'list'), session)
self.acl_repo.create_or_replace_from(
secret, acl1, user_ids=[], session=session)
acls = self.acl_repo.get_by_secret_id(secret.id, session)
id_map = self._map_id_to_acl(acls)
self.assertIsNone(id_map[acl1.id].to_dict_fields().get('users'))
self.assertIsNone(id_map[acl2.id].to_dict_fields().get('users'))
def test_create_or_replace_from_for_existing_acls(self):
"""Check create_or_replace_from and get count call.
It modifies existing acls with users and make sure that updated users
and creator_only flag changes are returned when acls are queries by
secret id. It uses get count to assert expected number of acls for that
secret.
"""
session = self.acl_repo.get_session()
secret = self._create_base_secret()
acl1 = self.acl_repo.create_from(models.SecretACL(
secret.id, 'read'), session)
self.acl_repo.create_or_replace_from(
secret, acl1, user_ids=['u1', 'u2'], session=session)
acl2 = self.acl_repo.create_from(models.SecretACL(
secret.id, 'write'), session)
self.acl_repo.create_or_replace_from(
secret, acl2, user_ids=['u1', 'u2', 'u3'], session=session)
acl3 = self.acl_repo.create_from(models.SecretACL(
secret.id, 'list'), session)
self.acl_repo.create_or_replace_from(
secret, acl3, user_ids=[], session=session)
acls = self.acl_repo.get_by_secret_id(secret.id, session)
self.assertEqual(3, len(acls))
id_map = self._map_id_to_acl(acls)
# replace users in existing acls
id_map[acl1.id].creator_only = True
self.acl_repo.create_or_replace_from(
secret, id_map[acl1.id], user_ids=['u5'], session=session)
self.acl_repo.create_or_replace_from(
secret, id_map[acl2.id], user_ids=['u1', 'u2', 'u3', 'u4'],
session=session)
self.acl_repo.create_or_replace_from(
secret, id_map[acl3.id], user_ids=['u1', 'u2', 'u4'],
session=session)
session.commit() # commit the changes made so far
acls = self.acl_repo.get_by_secret_id(secret.id, session)
id_map = self._map_id_to_acl(acls)
self.assertEqual(3, len(acls))
self.assertEqual(True, id_map[acl1.id].creator_only)
self.assertEqual(False, id_map[acl2.id].creator_only)
self.assertEqual(False, id_map[acl3.id].creator_only)
self._assert_acl_users(['u5'], acls, acl1.id)
self._assert_acl_users(['u1', 'u2', 'u3', 'u4'], acls, acl2.id)
self._assert_acl_users(['u1', 'u2', 'u4'], acls, acl3.id)
def test_delete_single_acl_and_count(self):
session = self.acl_repo.get_session()
secret = self._create_base_secret()
acl1 = self.acl_repo.create_from(models.SecretACL(secret.id, 'read',
None, ['u1', 'u2']),
session)
self.acl_repo.create_or_replace_from(secret, acl1)
acl2 = self.acl_repo.create_from(
models.SecretACL(secret.id, 'write'), session)
self.acl_repo.create_or_replace_from(
secret, acl2, user_ids=['u1', 'u2', 'u3'])
acl3 = self.acl_repo.create_from(models.SecretACL(
secret.id, 'list'), session)
self.acl_repo.create_or_replace_from(secret, acl3,
user_ids=['u1', 'u3'])
count = self.acl_repo.get_count(secret.id)
self.assertTrue(count == 3)
self.acl_repo.delete_entity_by_id(acl2.id, None)
session.commit()
self.assertEqual(2, len(secret.secret_acls))
deleted_acl = self.acl_repo.get(acl2.id, suppress_exception=True)
self.assertIsNone(deleted_acl)
acls = self.acl_repo.get_by_secret_id(secret.id)
self.assertEqual(2, len(acls))
count = self.acl_repo.get_count(secret.id)
self.assertTrue(count == 2)
def test_delete_acls_for_secret(self):
session = self.acl_repo.get_session()
secret = self._create_base_secret()
acl1 = self.acl_repo.create_from(models.SecretACL(
secret.id, 'read'), session)
self.acl_repo.create_or_replace_from(
secret, acl1, user_ids=['u1', 'u2'], session=session)
acl2 = self.acl_repo.create_from(models.SecretACL(
secret.id, 'write'), session)
self.acl_repo.create_or_replace_from(
secret, acl2, user_ids=['u1', 'u2', 'u3'], session=session)
self.acl_repo.delete_acls_for_secret(secret)
acls = self.acl_repo.get_by_secret_id(secret.id)
self.assertEqual(0, len(acls))
class WhenTestingContainerACLRepository(database_utils.RepositoryTestCase,
TestACLMixin):
def setUp(self):
super(WhenTestingContainerACLRepository, self).setUp()
self.acl_repo = repositories.get_container_acl_repository()
def _create_base_container(self):
# Setup the container and needed base relationship
container_repo = repositories.get_container_repository()
session = container_repo.get_session()
project = models.Project()
project.external_id = "keystone_project_id"
project.save(session=session)
container = models.Container()
container.project_id = project.id
container.save(session=session)
session.commit()
return container
def test_get_by_container_id(self):
session = self.acl_repo.get_session()
container = self._create_base_container()
acls = self.acl_repo.get_by_container_id(container.id, session)
self.assertEqual(0, len(acls))
acl1 = self.acl_repo.create_from(models.ContainerACL(container.id,
'read', True,
['u1', 'u2']),
session)
acls = self.acl_repo.get_by_container_id(container.id, session)
self.assertEqual(1, len(acls))
self.assertEqual(acl1.id, acls[0].id)
self.assertEqual('read', acls[0].operation)
self._assert_acl_users(['u1', 'u2'], acls, acl1.id)
def test_get_by_entity_id(self):
session = self.acl_repo.get_session()
container = self._create_base_container()
acl1 = self.acl_repo.create_from(models.ContainerACL(
container.id, 'read', True, ['u1', 'u2']), session)
acl = self.acl_repo.get(acl1.id, session)
self.assertIsNotNone(acl)
self.assertEqual(acl1.id, acl.id)
self.assertEqual('read', acl.operation)
self._assert_acl_users(['u1', 'u2'], [acl], acl1.id)
self.acl_repo.delete_entity_by_id(acl1.id, session)
acl = self.acl_repo.get(acl1.id, session, suppress_exception=True)
self.assertIsNone(acl)
def test_should_raise_notfound_exception_get_by_entity_id(self):
self.assertRaises(exception.NotFound, self.acl_repo.get,
"invalid_id", suppress_exception=False)
def test_create_or_replace_from_for_new_acls(self):
"""Check create_or_replace_from and get count call.
It creates new acls with users and make sure that same users
are returned when acls are queries by secret id.
It uses get count to assert expected number of acls for that secret.
"""
session = self.acl_repo.get_session()
container = self._create_base_container()
acl1 = self.acl_repo.create_from(models.ContainerACL(
container.id, 'read'), session)
self.acl_repo.create_or_replace_from(
container, acl1, user_ids=['u1', 'u2'], session=session)
acl2 = self.acl_repo.create_from(models.ContainerACL(
container.id, 'write', True), session)
self.acl_repo.create_or_replace_from(
container, acl2, user_ids=['u1', 'u2', 'u3'], session=session)
acl3 = self.acl_repo.create_from(models.ContainerACL(
container.id, 'list'), session)
self.acl_repo.create_or_replace_from(
container, acl3, user_ids=[], session=session)
acls = self.acl_repo.get_by_container_id(container.id, session)
self.assertEqual(3, len(acls))
id_map = self._map_id_to_acl(acls)
self.assertEqual(False, id_map[acl1.id].creator_only)
self.assertEqual(True, id_map[acl2.id].creator_only)
self.assertEqual('read', id_map[acl1.id].operation)
self.assertEqual('write', id_map[acl2.id].operation)
self.assertEqual('list', id_map[acl3.id].operation)
# order of input users should not matter
self._assert_acl_users(['u1', 'u2'], acls, acl1.id)
self._assert_acl_users(['u2', 'u1'], acls, acl1.id)
self._assert_acl_users(['u2', 'u1', 'u3'], acls, acl2.id)
count = self.acl_repo.get_count(container.id, session)
self.assertEqual(3, count)
self.assertTrue(len(acls) == count)
def test_create_or_replace_from_with_none_or_blank_users(self):
session = self.acl_repo.get_session()
container = self._create_base_container()
acl1 = self.acl_repo.create_from(models.ContainerACL(
container.id, 'read'), session)
self.acl_repo.create_or_replace_from(
container, acl1, user_ids=None, session=session)
acl2 = self.acl_repo.create_from(models.ContainerACL(
container.id, 'write'), session)
self.acl_repo.create_or_replace_from(
container, acl1, user_ids=[], session=session)
acls = self.acl_repo.get_by_container_id(container.id, session)
id_map = self._map_id_to_acl(acls)
self.assertIsNone(id_map[acl1.id].to_dict_fields().get('users'))
self.assertIsNone(id_map[acl2.id].to_dict_fields().get('users'))
def test_create_or_replace_from_for_existing_acls(self):
"""Check create_or_replace_from and get count call.
It modifies existing acls with users and make sure that updated users
and creator_only flag changes are returned when acls are queries by
secret id. It uses get count to assert expected number of acls for that
secret.
"""
session = self.acl_repo.get_session()
container = self._create_base_container()
acl1 = self.acl_repo.create_from(models.ContainerACL(
container.id, 'read'), session)
self.acl_repo.create_or_replace_from(
container, acl1, user_ids=['u1', 'u2'], session=session)
acl2 = self.acl_repo.create_from(models.ContainerACL(
container.id, 'write'), session)
self.acl_repo.create_or_replace_from(
container, acl2, user_ids=['u1', 'u2', 'u3'], session=session)
acl3 = self.acl_repo.create_from(models.ContainerACL(
container.id, 'list'), session)
self.acl_repo.create_or_replace_from(
container, acl3, user_ids=[], session=session)
acls = self.acl_repo.get_by_container_id(container.id, session)
self.assertEqual(3, len(acls))
id_map = self._map_id_to_acl(acls)
# replace users in existing acls
id_map[acl1.id].creator_only = True
self.acl_repo.create_or_replace_from(
container, id_map[acl1.id], user_ids=['u5'], session=session)
self.acl_repo.create_or_replace_from(
container, id_map[acl2.id], user_ids=['u1', 'u2', 'u3', 'u4'],
session=session)
self.acl_repo.create_or_replace_from(
container, id_map[acl3.id], user_ids=['u1', 'u2', 'u4'],
session=session)
session.commit()
acls = self.acl_repo.get_by_container_id(container.id, session)
id_map = self._map_id_to_acl(acls)
self.assertEqual(3, len(acls))
self.assertEqual(True, id_map[acl1.id].creator_only)
self.assertEqual(False, id_map[acl2.id].creator_only)
self.assertEqual(False, id_map[acl3.id].creator_only)
self._assert_acl_users(['u5'], acls, acl1.id)
self._assert_acl_users(['u1', 'u2', 'u3', 'u4'], acls, acl2.id)
self._assert_acl_users(['u1', 'u2', 'u4'], acls, acl3.id)
def test_delete_single_acl_and_count(self):
session = self.acl_repo.get_session()
container = self._create_base_container()
acl1 = self.acl_repo.create_from(models.ContainerACL(
container.id, 'read'), session)
self.acl_repo.create_or_replace_from(container, acl1,
user_ids=['u1', 'u2'])
acl2 = self.acl_repo.create_from(models.ContainerACL(
container.id, 'write'), session)
self.acl_repo.create_or_replace_from(container, acl2,
user_ids=['u1', 'u2', 'u3'])
acl3 = self.acl_repo.create_from(models.ContainerACL(
container.id, 'list'), session)
self.acl_repo.create_or_replace_from(container, acl3,
user_ids=['u1', 'u3'])
count = self.acl_repo.get_count(container.id)
self.assertTrue(count == 3)
self.acl_repo.delete_entity_by_id(acl2.id, None)
session.commit() # commit the changes made so far
self.assertEqual(2, len(container.container_acls))
deleted_acl = self.acl_repo.get(acl2.id, suppress_exception=True)
self.assertIsNone(deleted_acl)
acls = self.acl_repo.get_by_container_id(container.id)
self.assertEqual(2, len(acls))
count = self.acl_repo.get_count(container.id)
self.assertTrue(count == 2)
def test_delete_acls_for_secret(self):
session = self.acl_repo.get_session()
container = self._create_base_container()
acl1 = self.acl_repo.create_from(models.ContainerACL(
container.id, 'read'), session)
self.acl_repo.create_or_replace_from(
container, acl1, user_ids=['u1', 'u2'], session=session)
acl2 = self.acl_repo.create_from(models.ContainerACL(
container.id, 'write'), session)
self.acl_repo.create_or_replace_from(
container, acl2, user_ids=['u1', 'u2', 'u3'], session=session)
self.acl_repo.delete_acls_for_container(container)
acls = self.acl_repo.get_by_container_id(container.id)
self.assertEqual(0, len(acls))

View File

@@ -11,6 +11,7 @@
# limitations under the License.
from barbican.common import exception
from barbican.model import models
from barbican.model import repositories
from barbican.tests import database_utils
@@ -30,3 +31,27 @@ class WhenTestingContainerRepository(database_utils.RepositoryTestCase):
"my keystone id",
session=session,
suppress_exception=False)
def test_get_container_by_id(self):
session = self.repo.get_session()
project = models.Project()
project.external_id = "my keystone id"
project.save(session=session)
container = models.Container()
container.project_id = project.id
container.save(session=session)
session.commit()
db_container = self.repo.get_container_by_id(container.id)
self.assertIsNotNone(db_container)
def test_should_raise_notfound_exception(self):
self.assertRaises(exception.NotFound, self.repo.get_container_by_id,
"invalid_id", suppress_exception=False)
def test_should_suppress_notfound_exception(self):
self.assertIsNone(self.repo.get_container_by_id(
"invalid_id", suppress_exception=True))

View File

@@ -78,6 +78,31 @@ class WhenTestingSecretRepository(database_utils.RepositoryTestCase):
self.assertEqual(limit, 10)
self.assertEqual(total, 1)
def test_get_secret_by_id(self):
session = self.repo.get_session()
secret = self.repo.create_from(models.Secret(), session=session)
project = models.Project()
project.external_id = "my keystone id"
project.save(session=session)
project_secret = models.ProjectSecret()
project_secret.secret_id = secret.id
project_secret.project_id = project.id
project_secret.save(session=session)
session.commit()
db_secret = self.repo.get_secret_by_id(secret.id)
self.assertIsNotNone(db_secret)
def test_should_raise_notfound_exception(self):
self.assertRaises(exception.NotFound, self.repo.get_secret_by_id,
"invalid_id", suppress_exception=False)
def test_should_suppress_notfound_exception(self):
self.assertIsNone(self.repo.get_secret_by_id("invalid_id",
suppress_exception=True))
@utils.parameterized_dataset(dataset_for_filter_tests)
def test_get_by_create_date_with_filter(
self, secret_1_dict, secret_2_dict, query_dict):