Update auth_token middleware to support creds.

Updates to the auth_token middleware to support admin_user and
admin_password in addition to the existing admin_token. If an
admin_token isn't specified then a call to obtain the admin_token
is made. If an admin token expires the username and password can
also be used to obtain a fresh token.

Also, added a test for case for middleware where token isn't
specified.

Fixes LP Bug #923573.

Change-Id: I643efec310cbb9a175607cc17f0c077f261b1d6d
This commit is contained in:
Dan Prince 2012-02-01 15:17:26 -05:00
parent d2e6f63fe1
commit f76477c7b1
4 changed files with 78 additions and 24 deletions

View File

@ -101,7 +101,7 @@ from dateutil import parser
import errno
import eventlet
from eventlet import wsgi
from httplib import HTTPException
import httplib
import json
# memcache is imported in __init__ if memcache caching is configured
import logging
@ -194,6 +194,8 @@ class AuthProtocol(object):
# Credentials used to verify this component with the Auth service since
# validating tokens is a privileged call
self.admin_token = conf.get('admin_token')
self.admin_user = conf.get('admin_user', None)
self.admin_password = conf.get('admin_password', None)
# Certificate file and key file used to authenticate with Keystone
# server
self.cert_file = conf.get('certfile', None)
@ -265,7 +267,7 @@ class AuthProtocol(object):
# the keystone server
try:
self.osksvalidate = self._supports_osksvalidate()
except (HTTPException, StandardError):
except (httplib.HTTPException, StandardError):
pass
#Prep headers to forward request to local or remote downstream service
@ -427,7 +429,38 @@ class AuthProtocol(object):
return HTTPUnauthorized()(env,
start_response)
def _verify_claims(self, env, claims):
def _build_token_uri(self):
return '/v2.0/tokens/%s' % self.service_id_querystring
def _get_admin_auth_token(self, username, password):
"""
This function gets an admin auth token to be used by this service to
validate a user's token. Validate_token is a priviledged call so
it needs to be authenticated by a service that is calling it
"""
headers = {
"Content-type": "application/json",
"Accept": "application/json"}
params = {
"auth": {
"passwordCredentials": {
"username": username,
"password": password,
}
}
}
if self.auth_protocol == "http":
conn = httplib.HTTPConnection(self.auth_host, self.auth_port)
else:
conn = httplib.HTTPSConnection(self.auth_host, self.auth_port,
cert_file=self.cert_file)
conn.request("POST", self._build_token_uri(), json.dumps(params),
headers=headers)
response = conn.getresponse()
data = response.read()
return data
def _verify_claims(self, env, claims, retry=True):
"""Verify claims and extract identity information, if applicable."""
cached_claims = self._cache_get(env, claims)
@ -444,10 +477,10 @@ class AuthProtocol(object):
# Step 1: We need to auth with the keystone service, so get an
# admin token
#TODO(ziad): Need to properly implement this, where to store creds
# for now using token from ini
#auth = self.get_admin_auth_token("admin", "secrete", "1")
#admin_token = json.loads(auth)["auth"]["token"]["id"]
if not self.admin_token:
auth = self._get_admin_auth_token(self.admin_user,
self.admin_password)
self.admin_token = json.loads(auth)["access"]["token"]["id"]
# Step 2: validate the user's token with the auth service
# since this is a priviledged op,m we need to auth ourselves
@ -467,11 +500,6 @@ class AuthProtocol(object):
logger.debug("Connecting to %s://%s:%s to check claims" % (
self.auth_protocol, self.auth_host, self.auth_port))
##TODO(ziad):we need to figure out how to auth to keystone
#since validate_token is a priviledged call
#Khaled's version uses creds to get a token
# "X-Auth-Token": admin_token}
# we're using a test token from the ini file for now
try:
conn = http_connect(self.auth_host, self.auth_port, 'GET',
path,
@ -504,9 +532,13 @@ class AuthProtocol(object):
datetime.strftime(time.time(),
EXPIRE_TIME_FORMAT)},
valid=False)
# Keystone rejected claim
logger.debug("Failing the validation")
raise ValidationFailed()
if retry:
self.admin_token = None
return self._verify_claims(env, claims, False)
else:
# Keystone rejected claim
logger.debug("Failing the validation")
raise ValidationFailed()
token_info = json.loads(data)
@ -642,7 +674,7 @@ class AuthProtocol(object):
logger.debug("Falling back to core API behavior (using tokens in "
"URL)")
return False
except HTTPException as exc:
except httplib.HTTPException as exc:
logger.exception("Error trying to detect extensions.")
logger.debug("Falling back to core API behavior (using tokens in "
"URL)")

View File

@ -127,9 +127,9 @@ class AuthProtocol(object):
logger.debug("Authentication Service:%s", self.auth_location)
# Credentials used to verify this component with the Auth service
# since validating tokens is a privileged call
self.admin_user = conf.get('auth_admin_user')
self.admin_password = conf.get('auth_admin_password')
self.admin_token = conf.get('auth_admin_token')
self.admin_user = conf.get('admin_user')
self.admin_password = conf.get('admin_password')
self.admin_token = conf.get('admin_token')
# bind to one or more service instances
service_ids = conf.get('service_ids')
self.serviceId_qs = ''

View File

@ -1,6 +1,8 @@
import unittest2 as unittest
import uuid
import keystone.common.exception
import keystone.backends.api as db_api
from keystone.test.functional import common
from keystone.test import client as client_tests
@ -19,6 +21,26 @@ class TestAuthTokenMiddleware(common.MiddlewareTestCase):
super(TestAuthTokenMiddleware, self).setUp(auth_token)
class TestAuthTokenMiddlewareWithNoAdminToken(common.MiddlewareTestCase):
"""
Tests for Keystone WSGI middleware: Auth Token
"""
def setUp(self):
settings = {'delay_auth_decision': '0',
'auth_host': client_tests.TEST_TARGET_SERVER_ADMIN_ADDRESS,
'auth_port': client_tests.TEST_TARGET_SERVER_ADMIN_PORT,
'auth_protocol':
client_tests.TEST_TARGET_SERVER_ADMIN_PROTOCOL,
'auth_uri': ('%s://%s:%s/' % \
(client_tests.TEST_TARGET_SERVER_SERVICE_PROTOCOL,
client_tests.TEST_TARGET_SERVER_SERVICE_ADDRESS,
client_tests.TEST_TARGET_SERVER_SERVICE_PORT)),
'admin_user': self.admin_username,
'admin_password': self.admin_password}
super(TestAuthTokenMiddlewareWithNoAdminToken, self).setUp(auth_token,
settings)
#
# Glance
#
@ -65,9 +87,9 @@ class TestQuantumMiddleware(common.MiddlewareTestCase):
client_tests.TEST_TARGET_SERVER_SERVICE_ADDRESS,
client_tests.TEST_TARGET_SERVER_SERVICE_PORT)),
'auth_version': '2.0',
'auth_admin_token': self.admin_token,
'auth_admin_user': self.admin_username,
'auth_admin_password': self.admin_password}
'admin_token': self.admin_token,
'admin_user': self.admin_username,
'admin_password': self.admin_password}
super(TestQuantumMiddleware, self).setUp(quantum_auth_token, settings)

View File

@ -1596,8 +1596,8 @@ class MiddlewareTestCase(FunctionalTestCase):
client_tests.TEST_TARGET_SERVER_SERVICE_ADDRESS,
client_tests.TEST_TARGET_SERVER_SERVICE_PORT)),
'admin_token': self.admin_token,
'auth_admin_user': self.admin_username,
'auth_admin_password': self.admin_password}
'admin_user': self.admin_username,
'admin_password': self.admin_password}
cert_file = isSsl()
if cert_file:
settings['certfile'] = cert_file