added a test, need to get it working now
This commit is contained in:
parent
a328b99178
commit
03b75a5e69
|
@ -0,0 +1,30 @@
|
|||
class DictKvs(dict):
|
||||
def set(self, key, value):
|
||||
return self[key] = value
|
||||
|
||||
|
||||
class KvsIdentity(object):
|
||||
def __init__(self, db=None):
|
||||
if db is None:
|
||||
db = DictKvs()
|
||||
self.db = db
|
||||
|
||||
# Public Interface
|
||||
def tenants_for_token(self, token_id):
|
||||
token = self.db.get('token-%s' % token_id)
|
||||
user = self.db.get('user-%s' % token['user'])
|
||||
o = []
|
||||
for tenant_id in user['tenants']:
|
||||
o.append(self.db.get('tenant-%s' % tenant_id))
|
||||
|
||||
return o
|
||||
|
||||
# Private CRUD for testing
|
||||
def _create_user(self, id, user):
|
||||
self.db.set('user-%s' % id, user)
|
||||
|
||||
def _create_tenant(self, id, tenant):
|
||||
self.db.set('tenant-%s' % id, tenant)
|
||||
|
||||
def _create_token(self, id, token):
|
||||
self.db.set('token-%s' % id, token)
|
|
@ -19,7 +19,6 @@ class Manager(object):
|
|||
def __init__(self):
|
||||
self.driver = utils.import_object(FLAGS.identity_driver)
|
||||
|
||||
|
||||
def authenticate(self, context, **kwargs):
|
||||
"""Passthru authentication to the identity driver.
|
||||
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
|
||||
|
||||
class Token(dict):
|
||||
def __init__(self, id=None, user=None, tenant=None, *args, **kw):
|
||||
super(Token, self).__init__(id=id, user=user, tenant=tenant, *args, **kw)
|
||||
|
||||
|
||||
class User(dict):
|
||||
def __init__(self, id=None, tenants=None, *args, **kw):
|
||||
if tenants is None:
|
||||
tenants = []
|
||||
super(User, self).__init__(id=id, tenants=[], *args, **kw)
|
||||
|
||||
|
||||
class Tenant(dict):
|
||||
def __init__(self, id=None, *args, **kw):
|
||||
super(Tenant, self).__init__(id=id, *args, **kw)
|
|
@ -0,0 +1,20 @@
|
|||
import os
|
||||
import unittest
|
||||
|
||||
|
||||
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
|
||||
VENDOR = os.path.join(ROOTDIR, 'vendor')
|
||||
|
||||
|
||||
class TestCase(unittest.TestCase):
|
||||
def assertDictEquals(self, expected, actual):
|
||||
for k in expected:
|
||||
self.assertTrue(k in actual,
|
||||
"Expected key %s not in %s." % (k, actual))
|
||||
self.assertEquals(expected[k], actual[k],
|
||||
"Expected value for %s to be '%s', not '%s'."
|
||||
% (k, expected[k], actual[k]))
|
||||
for k in actual:
|
||||
self.assertTrue(k in expected,
|
||||
"Unexpected key %s in %s." % (k, actual))
|
||||
|
|
@ -18,6 +18,7 @@
|
|||
# under the License.
|
||||
|
||||
import logging
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
|
||||
|
@ -40,3 +41,41 @@ def import_object(import_str):
|
|||
except ImportError:
|
||||
cls = import_class(import_str)
|
||||
return cls()
|
||||
|
||||
# From python 2.7
|
||||
def check_output(*popenargs, **kwargs):
|
||||
r"""Run command with arguments and return its output as a byte string.
|
||||
|
||||
If the exit code was non-zero it raises a CalledProcessError. The
|
||||
CalledProcessError object will have the return code in the returncode
|
||||
attribute and output in the output attribute.
|
||||
|
||||
The arguments are the same as for the Popen constructor. Example:
|
||||
|
||||
>>> check_output(["ls", "-l", "/dev/null"])
|
||||
'crw-rw-rw- 1 root root 1, 3 Oct 18 2007 /dev/null\n'
|
||||
|
||||
The stdout argument is not allowed as it is used internally.
|
||||
To capture standard error in the result, use stderr=STDOUT.
|
||||
|
||||
>>> check_output(["/bin/sh", "-c",
|
||||
... "ls -l non_existent_file ; exit 0"],
|
||||
... stderr=STDOUT)
|
||||
'ls: non_existent_file: No such file or directory\n'
|
||||
"""
|
||||
if 'stdout' in kwargs:
|
||||
raise ValueError('stdout argument not allowed, it will be overridden.')
|
||||
logging.debug(' '.join(popenargs[0]))
|
||||
process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
|
||||
output, unused_err = process.communicate()
|
||||
retcode = process.poll()
|
||||
if retcode:
|
||||
cmd = kwargs.get("args")
|
||||
if cmd is None:
|
||||
cmd = popenargs[0]
|
||||
raise subprocess.CalledProcessError(retcode, cmd)
|
||||
return output
|
||||
|
||||
|
||||
def git(*args):
|
||||
return check_output(['git'] + list(args))
|
||||
|
|
|
@ -0,0 +1,90 @@
|
|||
import copy
|
||||
import os
|
||||
import json
|
||||
|
||||
from keystonelight import models
|
||||
from keystonelight import test
|
||||
from keystonelight import utils
|
||||
|
||||
|
||||
IDENTITY_API_REPO = 'git://github.com/openstack/identity-api.git'
|
||||
|
||||
|
||||
SAMPLE_DIR = 'openstack-identity-api/src/docbkx/samples'
|
||||
|
||||
|
||||
cd = os.chdir
|
||||
|
||||
|
||||
def checkout_samples(rev):
|
||||
"""Make sure we have a checkout of the API docs."""
|
||||
revdir = os.path.join(test.VENDOR, 'identity-api-%s' % rev)
|
||||
|
||||
if not os.path.exists(revdir):
|
||||
utils.git('clone', IDENTITY_API_REPO, revdir)
|
||||
|
||||
cd(revdir)
|
||||
utils.git('pull')
|
||||
utils.git('checkout', rev)
|
||||
return revdir
|
||||
|
||||
|
||||
class CompatTestCase(test.TestCase):
|
||||
def setUp(self):
|
||||
super(CompatTestCase, self).setUp()
|
||||
|
||||
self.auth_creds = json.load(open(
|
||||
os.path.join(self.sampledir, 'auth_credentials.json')))
|
||||
self.auth_creds_notenant = copy.deepcopy(self.auth_creds)
|
||||
self.auth_creds_notenant['auth'].pop('tenantName', None)
|
||||
|
||||
self.tenants_for_token = json.load(open(
|
||||
os.path.join(self.sampledir, 'tenants.json')))
|
||||
|
||||
# For the tenants for token call
|
||||
self.user_foo = self.backend._create_user(
|
||||
'foo',
|
||||
models.User(id='foo', tenants=['1234', '3456']))
|
||||
self.tenant_1234 = self.backend._create_tenant(
|
||||
'1234',
|
||||
models.Tenant(id='1234',
|
||||
name='ACME Corp',
|
||||
description='A description...',
|
||||
enabled=True))
|
||||
self.tenant_3456 = self.backend._create_tenant(
|
||||
'3456',
|
||||
models.Tenant(id='3456',
|
||||
name='Iron Works',
|
||||
description='A description...',
|
||||
enabled=True))
|
||||
|
||||
self.token_foo_unscoped = self.backend._create_token(
|
||||
'foo_unscoped',
|
||||
models.Token(id='foo_unscoped',
|
||||
user='foo'))
|
||||
self.token_foo_scoped = self.backend._create_token(
|
||||
'foo_scoped',
|
||||
models.Token(id='foo_unscoped',
|
||||
user='foo',
|
||||
tenant='1234'))
|
||||
|
||||
|
||||
class HeadCompatTestCase(CompatTestCase):
|
||||
def setUp(self):
|
||||
revdir = checkout_samples('HEAD')
|
||||
self.sampledir = os.path.join(revdir, SAMPLE_DIR)
|
||||
super(HeadCompatTestCase, self).setUp()
|
||||
|
||||
def test_tenants_for_token_unscoped(self):
|
||||
# get_tenants_for_token
|
||||
client = self.api.client(token=self.token_foo_unscoped['id'])
|
||||
resp = client.get('/v2.0/tenants')
|
||||
data = json.loads(resp.body)
|
||||
self.assertDictEquals(self.tenants_for_token, data)
|
||||
|
||||
def test_tenants_for_token_scoped(self):
|
||||
# get_tenants_for_token
|
||||
client = self.api.client(token=self.token_foo_scoped['id'])
|
||||
resp = client.get('/v2.0/tenants')
|
||||
data = json.loads(resp.body)
|
||||
self.assertDictEquals(self.tenants_for_token, data)
|
Loading…
Reference in New Issue