Adds support for authenticating via ec2 signatures

* modifies credentials to store tenant id
* adds credentials backend api
* cleans up a bunch of whitespace errors
* adds new /ec2token endpoint for new logic
* fixes unit/base.py test case
* adds unittests for ec2_authn
* includes compatibility for clients that ignore ports

Change-Id: I1bd1e549cc74cbb708059b07b2928bf09c8ba1ca
This commit is contained in:
Vishvananda Ishaya 2011-08-02 12:40:22 -07:00
parent bd34551cf3
commit dea6aff927
13 changed files with 617 additions and 52 deletions

View File

@ -18,7 +18,7 @@ log_file = keystone.log
backends = keystone.backends.sqlalchemy
#For LDAP support, add: ,keystone.backends.ldap
# Dictionary Maps every service to a header.Missing services would get header
# Dictionary Maps every service to a header.Missing services would get header
# X_(SERVICE_NAME) Key => Service Name, Value => Header Name
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
@ -49,8 +49,9 @@ keystone-service-admin-role = KeystoneServiceAdmin
# server. Any valid SQLAlchemy connection string is fine.
# See: http://bit.ly/ideIpI
sql_connection = sqlite:///keystone.db
backend_entities = ['UserRoleAssociation', 'Endpoints',
'Role', 'Tenant', 'User', 'Credentials', 'EndpointTemplates', 'Token','Service']
backend_entities = ['UserRoleAssociation', 'Endpoints', 'Role', 'Tenant',
'User', 'Credentials', 'EndpointTemplates', 'Token',
'Service']
# Period in seconds after which SQLAlchemy should reestablish its connection
# to the database.

View File

@ -67,6 +67,9 @@ class BaseUserAPI(object):
def get_by_tenant(self, id, tenant_id):
raise NotImplementedError
def get_by_access(self, access):
raise NotImplementedError
def get_group_by_tenant(self, id):
raise NotImplementedError
@ -280,6 +283,17 @@ class BaseEndpointTemplateAPI(object):
class BaseServiceAPI:
def get_all(self):
raise NotImplementedError
def get_page(self, marker, limit):
raise NotImplementedError
def get_page_markers(self, marker, limit):
raise NotImplementedError
class BaseCredentialsAPI(object):
def create(self, values):
raise NotImplementedError
@ -289,11 +303,13 @@ class BaseServiceAPI:
def get_all(self):
raise NotImplementedError
def get_page(self, marker, limit):
def get_by_access(self, access):
raise NotImplementedError
def get_page_markers(self, marker, limit):
def delete(self, id):
raise NotImplementedError
#API
#TODO(Yogi) Refactor all API to separate classes specific to models.
ENDPOINT_TEMPLATE = BaseEndpointTemplateAPI()
@ -304,6 +320,7 @@ TENANT = BaseTenantAPI()
TOKEN = BaseTokenAPI()
USER = BaseUserAPI()
SERVICE = BaseServiceAPI()
CREDENTIALS = BaseCredentialsAPI()
# Function to dynamically set module references.
@ -332,3 +349,6 @@ def set_value(variable_name, value):
elif variable_name == 'service':
global SERVICE
SERVICE = value
elif variable_name == 'credentials':
global CREDENTIALS
CREDENTIALS = value

View File

@ -0,0 +1,51 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.backends.sqlalchemy import get_session, models
from keystone.backends.api import BaseCredentialsAPI
class CredentialsAPI(BaseCredentialsAPI):
def create(self, values):
credentials_ref = models.Credentials()
credentials_ref.update(values)
credentials_ref.save()
return credentials_ref
def get(self, id, session=None):
if not session:
session = get_session()
result = session.query(models.Group).filter_by(id=id).first()
return result
def get_by_access(self, access, session=None):
if not session:
session = get_session()
result = session.query(models.Credentials).\
filter_by(type="EC2", key=access).first()
return result
def delete(self, id, session=None):
if not session:
session = get_session()
with session.begin():
group_ref = self.get(id, session)
session.delete(group_ref)
def get():
return CredentialsAPI()

View File

@ -136,9 +136,9 @@ class UserAPI(BaseUserAPI):
return user
# Find user through grants to this tenant
user_tenant = session.query(models.UserRoleAssociation).filter_by(\
tenant_id=tenant_id, user_id=id).first()
if user_tenant:
result = session.query(models.UserRoleAssociation).\
filter_by(tenant_id=tenant_id, user_id=id).first()
if result:
return self.get(id, session)
else:
return None

View File

@ -134,12 +134,15 @@ class User(Base, KeystoneBase):
enabled = Column(Integer)
tenant_id = Column(String(255), ForeignKey('tenants.id'))
roles = relationship(UserRoleAssociation, cascade="all")
credentials = relationship('Credentials', backref='user', cascade="all")
class Credentials(Base, KeystoneBase):
__tablename__ = 'credentials'
user_id = Column(String(255), ForeignKey('users.id'), primary_key=True)
__api__ = 'credentials'
id = Column(Integer, primary_key=True, autoincrement=True)
user_id = Column(String(255), ForeignKey('users.id'))
tenant_id = Column(String(255), ForeignKey('tenants.id'), nullable=True)
type = Column(String(20)) # ('Password','APIKey','EC2')
key = Column(String(255))
secret = Column(String(255))

View File

@ -1,6 +1,6 @@
from keystone import utils
from keystone.common import wsgi
from keystone.logic.types.auth import PasswordCredentials
from keystone.logic.types import auth
import keystone.config as config
@ -16,9 +16,18 @@ class AuthController(wsgi.Controller):
self.request = req
creds = utils.get_normalized_request_content(
PasswordCredentials, req)
auth.PasswordCredentials, req)
return utils.send_result(200, req, config.SERVICE.authenticate(creds))
@utils.wrap_error
def authenticate_ec2(self, req):
self.request = req
creds = utils.get_normalized_request_content(
auth.Ec2Credentials, req)
return utils.send_result(200, req,
config.SERVICE.authenticate_ec2(creds))
@utils.wrap_error
def validate_token(self, req, token_id):
belongs_to = req.GET["belongsTo"] if "belongsTo" in req.GET else None

View File

@ -17,6 +17,7 @@ from datetime import datetime, timedelta
import uuid
from keystone.logic.types import auth, atom
from keystone.logic.signer import Signer
import keystone.backends as backends
import keystone.backends.api as api
import keystone.backends.models as models
@ -42,40 +43,71 @@ class IdentityService(object):
if not isinstance(credentials, auth.PasswordCredentials):
raise fault.BadRequestFault("Expecting Password Credentials!")
if not credentials.tenant_id:
duser = api.USER.get(credentials.username)
def validate(duser):
hashed_pass = utils.get_hashed_password(credentials.password)
return duser.password == hashed_pass
return self._authenticate(validate,
credentials.username,
credentials.tenant_id)
def authenticate_ec2(self, credentials):
# Check credentials
if not isinstance(credentials, auth.Ec2Credentials):
raise fault.BadRequestFault("Expecting Ec2 Credentials!")
creds = api.CREDENTIALS.get_by_access(credentials.access)
if not creds:
raise fault.UnauthorizedFault("No credentials found for %s"
% credentials.access)
def validate(duser):
signer = Signer(creds.secret)
signature = signer.generate(credentials)
if signature == credentials.signature:
return True
# NOTE(vish): Some libraries don't use the port when signing
# requests, so try again without port.
if ':' in credentials.host:
hostname, _sep, port = credentials.partition(':')
credentials.host = hostname
signature = signer.generate(credentials)
return signature == credentials.signature
return False
return self._authenticate(validate, creds.user_id, creds.tenant_id)
def _authenticate(self, validate, user_id, tenant_id=None):
if not tenant_id:
duser = api.USER.get(user_id)
if duser == None:
raise fault.UnauthorizedFault("Unauthorized")
else:
duser = api.USER.get_by_tenant(credentials.username,
credentials.tenant_id)
duser = api.USER.get_by_tenant(user_id, tenant_id)
if duser == None:
raise fault.UnauthorizedFault("Unauthorized on this tenant")
if not duser.enabled:
raise fault.UserDisabledFault("Your account has been disabled")
if duser.password != utils.get_hashed_password(credentials.password):
raise fault.UnauthorizedFault("Unauthorized")
try:
if not validate(duser):
raise fault.UnauthorizedFault("Unauthorized")
except Exception as exc:
raise fault.UnauthorizedFault("Unable to validate: %s" % exc)
#
# Look for an existing token, or create one,
# TODO: Handle tenant/token search
#
if not credentials.tenant_id:
dtoken = api.TOKEN.get_for_user(duser.id)
else:
dtoken = api.TOKEN.get_for_user_by_tenant(duser.id,
credentials.tenant_id)
tenant_id = credentials.tenant_id or duser.tenant_id
user_id = duser.id
tenant_id = tenant_id or duser.tenant_id
dtoken = api.TOKEN.get_for_user_by_tenant(user_id, tenant_id)
if not dtoken or dtoken.expires < datetime.now():
# Create new token
dtoken = models.Token()
dtoken.id = str(uuid.uuid4())
dtoken.user_id = duser.id
if credentials.tenant_id:
dtoken.tenant_id = credentials.tenant_id
dtoken.user_id = user_id
dtoken.tenant_id = tenant_id
dtoken.expires = datetime.now() + timedelta(days=1)
api.TOKEN.create(dtoken)
#if tenant_id is passed in the call that tenant_id is passed else

136
keystone/logic/signer.py Normal file
View File

@ -0,0 +1,136 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# PORTIONS OF THIS FILE ARE FROM:
# http://code.google.com/p/boto
# Copyright (c) 2006-2009 Mitch Garnaat http://garnaat.org/
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
"""
Utility class for parsing signed AMI manifests.
"""
import base64
import hashlib
import hmac
import logging
import urllib
LOG = logging.getLogger('keystone.signer')
class Signer(object):
"""Hacked up code from boto/connection.py"""
def __init__(self, secret_key):
secret_key = secret_key.encode()
self.hmac = hmac.new(secret_key, digestmod=hashlib.sha1)
if hashlib.sha256:
self.hmac_256 = hmac.new(secret_key, digestmod=hashlib.sha256)
def generate(self, credentials):
"""Generate auth string according to what SignatureVersion is given."""
if credentials.params['SignatureVersion'] == '0':
return self._calc_signature_0(credentials.params)
if credentials.params['SignatureVersion'] == '1':
return self._calc_signature_1(credentials.params)
if credentials.params['SignatureVersion'] == '2':
return self._calc_signature_2(credentials.params,
credentials.verb,
credentials.host,
credentials.path)
raise Exception('Unknown Signature Version: %s' %
credentials.params['SignatureVersion'])
@staticmethod
def _get_utf8_value(value):
"""Get the UTF8-encoded version of a value."""
if not isinstance(value, str) and not isinstance(value, unicode):
value = str(value)
if isinstance(value, unicode):
return value.encode('utf-8')
else:
return value
def _calc_signature_0(self, params):
"""Generate AWS signature version 0 string."""
s = params['Action'] + params['Timestamp']
self.hmac.update(s)
return base64.b64encode(self.hmac.digest())
def _calc_signature_1(self, params):
"""Generate AWS signature version 1 string."""
keys = params.keys()
keys.sort(cmp=lambda x, y: cmp(x.lower(), y.lower()))
for key in keys:
self.hmac.update(key)
val = self._get_utf8_value(params[key])
self.hmac.update(val)
return base64.b64encode(self.hmac.digest())
def _calc_signature_2(self, params, verb, server_string, path):
"""Generate AWS signature version 2 string."""
LOG.debug('using _calc_signature_2')
string_to_sign = '%s\n%s\n%s\n' % (verb, server_string, path)
if self.hmac_256:
current_hmac = self.hmac_256
params['SignatureMethod'] = 'HmacSHA256'
else:
current_hmac = self.hmac
params['SignatureMethod'] = 'HmacSHA1'
keys = params.keys()
keys.sort()
pairs = []
for key in keys:
val = self._get_utf8_value(params[key])
val = urllib.quote(val, safe='-_~')
pairs.append(urllib.quote(key, safe='') + '=' + val)
qs = '&'.join(pairs)
LOG.debug('query string: %s', qs)
string_to_sign += qs
LOG.debug('string_to_sign: %s', string_to_sign)
current_hmac.update(string_to_sign)
b64 = base64.b64encode(current_hmac.digest())
LOG.debug('len(b64)=%d', len(b64))
LOG.debug('base64 encoded digest: %s', b64)
return b64
if __name__ == '__main__':
print Signer('foo').generate({'SignatureMethod': 'HmacSHA256',
'SignatureVersion': '2'},
'get', 'server', '/foo')

View File

@ -79,6 +79,107 @@ class PasswordCredentials(object):
str(e))
class Ec2Credentials(object):
"""Credentials based on username, access_key, signature and data.
@type access: str
@param access: Access key for user in the form of access:project.
@type signature: str
@param signature: Signature of the request.
@type params: dictionary of str
@param params: Web paramaters used for the signature.
@type verb: str
@param verb: Web request verb ('GET' or 'POST').
@type host: str
@param host: Web request host string (including port).
@type path: str
@param path: Web request path.
"""
def __init__(self, access, signature, verb,
host, path, params):
self.access = access
self.signature = signature
self.verb = verb
self.host = host
self.path = path
self.params = params
@staticmethod
def from_xml(xml_str):
try:
dom = etree.Element("root")
dom.append(etree.fromstring(xml_str))
root = dom.find("{http://docs.openstack.org/identity/api/v2.0}"
"ec2Credentials")
if root == None:
raise fault.BadRequestFault("Expecting ec2Credentials")
access = root.get("access")
if access == None:
raise fault.BadRequestFault("Expecting an access key")
signature = root.get("signature")
if signature == None:
raise fault.BadRequestFault("Expecting a signature")
verb = root.get("verb")
if verb == None:
raise fault.BadRequestFault("Expecting a verb")
host = root.get("host")
if host == None:
raise fault.BadRequestFault("Expecting a host")
path = root.get("path")
if path == None:
raise fault.BadRequestFault("Expecting a path")
# TODO(vish): parse xml params
params = {}
return Ec2Credentials(access, signature, verb, host, path, params)
except etree.LxmlError as e:
raise fault.BadRequestFault("Cannot parse password credentials",
str(e))
@staticmethod
def from_json(json_str):
try:
obj = json.loads(json_str)
if not "ec2Credentials" in obj:
raise fault.BadRequestFault("Expecting ec2Credentials")
cred = obj["ec2Credentials"]
# Check that fields are valid
invalid = [key for key in cred if key not in\
['username', 'access', 'signature', 'params',
'verb', 'host', 'path']]
if invalid != []:
raise fault.BadRequestFault("Invalid attribute(s): %s"
% invalid)
if not "access" in cred:
raise fault.BadRequestFault("Expecting an access key")
access = cred["access"]
if not "signature" in cred:
raise fault.BadRequestFault("Expecting a signature")
signature = cred["signature"]
if not "verb" in cred:
raise fault.BadRequestFault("Expecting a verb")
verb = cred["verb"]
if not "host" in cred:
raise fault.BadRequestFault("Expecting a host")
host = cred["host"]
if not "path" in cred:
raise fault.BadRequestFault("Expecting a path")
path = cred["path"]
if not "params" in cred:
raise fault.BadRequestFault("Expecting params")
params = cred["params"]
return Ec2Credentials(access, signature, verb, host, path, params)
except (ValueError, TypeError) as e:
raise fault.BadRequestFault("Cannot parse password credentials",
str(e))
class Token(object):
"""An auth token."""

View File

@ -94,9 +94,10 @@ class AuthProtocol(object):
else:
# If the user isn't authenticated, we reject the request and
# return 401 indicating we need Basic Auth credentials.
return HTTPUnauthorized("Authentication required",
ret = HTTPUnauthorized("Authentication required",
[('WWW-Authenticate',
'Basic realm="Use guest/guest"')])(env, start_response)
'Basic realm="Use guest/guest"')])
return ret(env, start_response)
else:
# Claims were provided - validate them
import base64
@ -107,9 +108,10 @@ class AuthProtocol(object):
#Claims were rejected
if not self.delay_auth_decision:
# Reject request (or ask for valid claims)
return HTTPUnauthorized("Authentication required",
[('WWW-Authenticate',
'Basic realm="Use guest/guest"')])(env, start_response)
ret = HTTPUnauthorized("Authentication required",
[('WWW-Authenticate',
'Basic realm="Use guest/guest"')])
return ret(env, start_response)
else:
# Claims are valid, forward request
_decorate_request_headers("X_IDENTITY_STATUS", "Invalid",

View File

@ -22,6 +22,9 @@ class ServiceApi(wsgi.Router):
mapper.connect("/tokens", controller=auth_controller,
action="authenticate",
conditions=dict(method=["POST"]))
mapper.connect("/v2.0/ec2tokens", controller=auth_controller,
action="authenticate_ec2",
conditions=dict(method=["POST"]))
# Tenant Operations
tenant_controller = TenantController(options)

View File

@ -21,6 +21,10 @@ import functools
import httplib
import logging
import pprint
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__),
'..', '..', '..', '..', '..', 'keystone')))
import unittest
from lxml import etree, objectify
@ -45,14 +49,6 @@ class ServiceAPITest(unittest.TestCase):
"""
api_class = server.ServiceApi
"""
Dict of configuration options to pass to the API controller
"""
# using an in-memory db
options = {'sql_connection': 'sqlite:///',
'verbose': False,
'debug': False}
"""
Set of dicts of tenant attributes we start each test case with
"""
@ -88,10 +84,29 @@ class ServiceAPITest(unittest.TestCase):
"""
api_version = '2.0'
"""
Dict of configuration options to pass to the API controller
"""
options = {
'backends': "keystone.backends.sqlalchemy",
'keystone.backends.sqlalchemy': {
'sql_connection': 'sqlite://', # in-memory db
'verbose': False,
'debug': False,
'backend_entities':
"['UserRoleAssociation', 'Endpoints', 'Role', 'Tenant', "
"'Tenant', 'User', 'Credentials', 'EndpointTemplates', "
"'Token', 'Service']",
},
'keystone-admin-role': 'Admin',
'keystone-service-admin-role': 'KeystoneServiceAdmin',
}
def setUp(self):
self.api = self.api_class(self.options)
self.expires = datetime.datetime.utcnow()
dt = datetime
self.expires = dt.datetime.utcnow() + dt.timedelta(days=1)
self.clear_all_data()
# Create all our base tenants
@ -101,10 +116,11 @@ class ServiceAPITest(unittest.TestCase):
# Create the user we will authenticate with
self.auth_user = self.fixture_create_user(**self.auth_user_attrs)
self.auth_token = self.fixture_create_token(
id=self.auth_token_id,
user_id=self.auth_user['id'],
tenant_id=self.auth_user['tenant_id'],
expires=self.expires,
token_id=self.auth_token_id)
)
self.add_verify_status_helpers()
@ -119,8 +135,19 @@ class ServiceAPITest(unittest.TestCase):
"""
db.unregister_models()
logger.debug("Cleared all data from database")
#TODO: You can't register models without passing in options
db.register_models(options=None)
opts = self.options
db.register_models(options=opts['keystone.backends.sqlalchemy'])
def fixture_create_credentials(self, **kwargs):
"""
Creates a tenant fixture.
:params **kwargs: Attributes of the tenant to create
"""
values = kwargs.copy()
credentials = db_api.CREDENTIALS.create(values)
logger.debug("Created credentials fixture %s", credentials['id'])
return credentials
def fixture_create_tenant(self, **kwargs):
"""
@ -129,7 +156,7 @@ class ServiceAPITest(unittest.TestCase):
:params **kwargs: Attributes of the tenant to create
"""
values = kwargs.copy()
tenant = db_api.tenant.create(values)
tenant = db_api.TENANT.create(values)
logger.debug("Created tenant fixture %s", values['id'])
return tenant
@ -143,11 +170,11 @@ class ServiceAPITest(unittest.TestCase):
values = kwargs.copy()
tenant_id = values.get('tenant_id')
if tenant_id:
if not db_api.tenant.get(tenant_id):
db_api.tenant.create({'id': tenant_id,
if not db_api.TENANT.get(tenant_id):
db_api.TENANT.create({'id': tenant_id,
'enabled': True,
'desc': tenant_id})
user = db_api.user.create(values)
user = db_api.USER.create(values)
logger.debug("Created user fixture %s", values['id'])
return user
@ -158,8 +185,8 @@ class ServiceAPITest(unittest.TestCase):
:params **kwargs: Attributes of the token to create
"""
values = kwargs.copy()
token = db_api.token.create(values)
logger.debug("Created token fixture %s", values['token_id'])
token = db_api.TOKEN.create(values)
logger.debug("Created token fixture %s", values['id'])
return token
def get_request(self, method, url, headers=None):

View File

@ -0,0 +1,180 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import unittest
import base
from keystone.test.unit.decorators import jsonify
from keystone.logic import signer
from keystone.logic.types import auth
LOGGER = logging.getLogger('test.unit.test_ec2_authn')
class EC2AuthnMethods(base.ServiceAPITest):
@jsonify
def test_authn_ec2_success_json(self):
"""
Test that good ec2 credentials returns a 200 OK
"""
access = "xpd285.access"
secret = "345fgi.secret"
kwargs = {
"user_id": self.auth_user['id'],
"tenant_id": self.auth_user['tenant_id'],
"type": "EC2",
"key": access,
"secret": secret,
}
self.fixture_create_credentials(**kwargs)
url = "/ec2tokens"
req = self.get_request('POST', url)
params = {
"SignatureVersion": "2",
"one_param": "5",
"two_params": "happy",
}
credentials = {
"access": access,
"verb": "GET",
"params": params,
"host": "some.host.com:8773",
"path": "services/Cloud",
"signature": None,
}
sign = signer.Signer(secret)
obj_creds = auth.Ec2Credentials(**credentials)
credentials['signature'] = sign.generate(obj_creds)
body = {
"ec2Credentials": credentials
}
req.body = json.dumps(body)
self.get_response()
expected = {
u'auth': {
u'serviceCatalog': {},
u'token': {
u'expires': self.expires.strftime("%Y-%m-%dT%H:%M:%S.%f"),
u'id': self.auth_token_id
}
}
}
self.assert_dict_equal(expected, json.loads(self.res.body))
self.status_ok()
@jsonify
def test_authn_ec2_success_json_bad_user(self):
"""
Test that bad credentials returns a 401
"""
access = "xpd285.access"
secret = "345fgi.secret"
kwargs = {
"user_id": 'bad',
"tenant_id": self.auth_user['tenant_id'],
"type": "EC2",
"key": access,
"secret": secret,
}
self.fixture_create_credentials(**kwargs)
url = "/ec2tokens"
req = self.get_request('POST', url)
params = {
"SignatureVersion": "2",
"one_param": "5",
"two_params": "happy",
}
credentials = {
"access": access,
"verb": "GET",
"params": params,
"host": "some.host.com:8773",
"path": "services/Cloud",
"signature": None,
}
sign = signer.Signer(secret)
obj_creds = auth.Ec2Credentials(**credentials)
credentials['signature'] = sign.generate(obj_creds)
body = {
"ec2Credentials": credentials
}
req.body = json.dumps(body)
self.get_response()
expected = {
u'unauthorized': {
u'code': u'401',
u'message': u'Unauthorized on this tenant',
}
}
self.assert_dict_equal(expected, json.loads(self.res.body))
self.assertEqual(self.res.status_int, 401)
@jsonify
def test_authn_ec2_success_json_bad_tenant(self):
"""
Test that bad credentials returns a 401
"""
access = "xpd285.access"
secret = "345fgi.secret"
kwargs = {
"user_id": self.auth_user['id'],
"tenant_id": 'bad',
"type": "EC2",
"key": access,
"secret": secret,
}
self.fixture_create_credentials(**kwargs)
url = "/ec2tokens"
req = self.get_request('POST', url)
params = {
"SignatureVersion": "2",
"one_param": "5",
"two_params": "happy",
}
credentials = {
"access": access,
"verb": "GET",
"params": params,
"host": "some.host.com:8773",
"path": "services/Cloud",
"signature": None,
}
sign = signer.Signer(secret)
obj_creds = auth.Ec2Credentials(**credentials)
credentials['signature'] = sign.generate(obj_creds)
body = {
"ec2Credentials": credentials
}
req.body = json.dumps(body)
self.get_response()
expected = {
u'unauthorized': {
u'code': u'401',
u'message': u'Unauthorized on this tenant',
}
}
self.assert_dict_equal(expected, json.loads(self.res.body))
self.assertEqual(self.res.status_int, 401)
if __name__ == '__main__':
unittest.main()