Support the hints mechanism in list_credentials()

This fix implements the hints mechanism, considering filters as hints,
so the particular backend implementation has an option to process or
ignore it. Since the EC2 credentials code calls list_credentials() with
user_id as a param, a separate method list_credentials_for_user has been
introduced to provide the compatibility while support the standard hints
mechanism in list_credentials().

This fix doesn't plug hints into the controller, it prepares the way for
implementation of the bp filter-credentials-by-user to support filtering
credentials by user in a follow on patch.

Closes-Bug: #1353511

Change-Id: Ibcf59aa45a8fc7e5cc66fd4edb91ae8fdc641d93
This commit is contained in:
Alexey Miroshkin 2014-08-09 18:57:25 +04:00
parent 6b56182468
commit 8aa322236a
5 changed files with 87 additions and 10 deletions

View File

@ -182,8 +182,8 @@ class Ec2ControllerCommon(object):
"""
self.identity_api.get_user(user_id)
credential_refs = self.credential_api.list_credentials(
user_id=user_id)
credential_refs = self.credential_api.list_credentials_for_user(
user_id)
return {'credentials':
[self._convert_v3_to_ec2_credential(credential)
for credential in credential_refs]}

View File

@ -41,12 +41,18 @@ class Credential(credential.Driver):
session.add(ref)
return ref.to_dict()
def list_credentials(self, **filters):
@sql.truncated
def list_credentials(self, hints):
session = sql.get_session()
credentials = session.query(CredentialModel)
credentials = sql.filter_limit_query(CredentialModel,
credentials, hints)
return [s.to_dict() for s in credentials]
def list_credentials_for_user(self, user_id):
session = sql.get_session()
query = session.query(CredentialModel)
if 'user_id' in filters:
query = query.filter_by(user_id=filters.get('user_id'))
refs = query.all()
refs = query.filter_by(user_id=user_id).all()
return [ref.to_dict() for ref in refs]
def _get_credential(self, session, credential_id):

View File

@ -19,6 +19,7 @@ import abc
import six
from keystone.common import dependency
from keystone.common import driver_hints
from keystone.common import manager
from keystone import config
from keystone import exception
@ -42,6 +43,10 @@ class Manager(manager.Manager):
def __init__(self):
super(Manager, self).__init__(CONF.credential.driver)
@manager.response_truncated
def list_credentials(self, hints=None):
return self.driver.list_credentials(hints or driver_hints.Hints())
@six.add_metaclass(abc.ABCMeta)
class Driver(object):
@ -57,8 +62,23 @@ class Driver(object):
raise exception.NotImplemented() # pragma: no cover
@abc.abstractmethod
def list_credentials(self, **filters):
"""List all credentials in the system applying filters.
def list_credentials(self, hints):
"""List all credentials.
:param hints: contains the list of filters yet to be satisfied.
Any filters satisfied here will be removed so that
the caller will know if any filters remain.
:returns: a list of credential_refs or an empty list.
"""
raise exception.NotImplemented() # pragma: no cover
@abc.abstractmethod
def list_credentials_for_user(self, user_id):
"""List credentials for a user.
:param user_id: ID of a user to filter credentials by.
:returns: a list of credential_refs or an empty list.

View File

@ -23,6 +23,7 @@ import sqlalchemy
from sqlalchemy import exc
from testtools import matchers
from keystone.common import driver_hints
from keystone.common import sql
from keystone import config
from keystone import exception
@ -555,3 +556,52 @@ class SqlModuleInitialization(tests.TestCase):
sql.initialize()
set_defaults.assert_called_with(CONF,
connection='sqlite:///keystone.db')
class SqlCredential(SqlTests):
def _create_credential_with_user_id(self, user_id=uuid.uuid4().hex):
credential_id = uuid.uuid4().hex
new_credential = {
'id': credential_id,
'user_id': user_id,
'project_id': uuid.uuid4().hex,
'blob': uuid.uuid4().hex,
'type': uuid.uuid4().hex,
'extra': uuid.uuid4().hex
}
self.credential_api.create_credential(credential_id, new_credential)
return new_credential
def _validateCredentialList(self, retrieved_credentials,
expected_credentials):
self.assertEqual(len(retrieved_credentials), len(expected_credentials))
retrived_ids = [c['id'] for c in retrieved_credentials]
for cred in expected_credentials:
self.assertIn(cred['id'], retrived_ids)
def setUp(self):
super(SqlCredential, self).setUp()
self.credentials = []
for _ in range(3):
self.credentials.append(
self._create_credential_with_user_id())
self.user_credentials = []
for _ in range(3):
cred = self._create_credential_with_user_id(self.user_foo['id'])
self.user_credentials.append(cred)
self.credentials.append(cred)
def test_list_credentials(self):
credentials = self.credential_api.list_credentials()
self._validateCredentialList(credentials, self.credentials)
# test filtering using hints
hints = driver_hints.Hints()
hints.add_filter('user_id', self.user_foo['id'])
credentials = self.credential_api.list_credentials(hints)
self._validateCredentialList(credentials, self.user_credentials)
def test_list_credentials_for_user(self):
credentials = self.credential_api.list_credentials_for_user(
self.user_foo['id'])
self._validateCredentialList(credentials, self.user_credentials)

View File

@ -372,8 +372,9 @@ class TestCredentialEc2(CredentialBaseTestCase):
"""Test ec2 credential deletion."""
ec2_cred = self._get_ec2_cred()
uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])
cred_from_credential_api = self.credential_api.list_credentials(
user_id=self.user_id)
cred_from_credential_api = (
self.credential_api
.list_credentials_for_user(self.user_id))
self.assertEqual(1, len(cred_from_credential_api))
self.delete(uri)
self.assertRaises(exception.CredentialNotFound,