AuthN support for Quantum

Adds authentication support for Quantum.  Generates a context object
and stuffs it into the 'quantum.context' variable in the WSGI environment.
This will be used in conjunction with authZ, later.

Partially implements blueprint authorization-support-for-quantum.

Change-Id: I8af171c2f11a08db5ee41e609d60ad203548650d
This commit is contained in:
Kevin L. Mitchell 2012-05-30 18:10:46 -05:00 committed by Jason Kölker
parent 76a8f573c1
commit a6e8169df6
4 changed files with 263 additions and 9 deletions

View File

@ -29,27 +29,26 @@ use = egg:Paste#urlmap
# To enable Keystone integration comment out the
# following line and uncomment the next one
pipeline = extensions quantumapiapp_v1_0
# pipeline = authtoken extensions quantumapiapp_v1_0
# pipeline = authtoken keystonecontext extensions quantumapiapp_v1_0
[pipeline:quantumapi_v1_1]
# By default, authentication is disabled.
# To enable Keystone integration comment out the
# following line and uncomment the next one
pipeline = extensions quantumapiapp_v1_1
# pipeline = authtoken extensions quantumapiapp_v1_1
# pipeline = authtoken keystonecontext extensions quantumapiapp_v1_1
[filter:keystonecontext]
paste.filter_factory = quantum.auth:QuantumKeystoneContext.factory
[filter:authtoken]
paste.filter_factory = keystone.middleware.auth_token:filter_factory
auth_host = 127.0.0.1
auth_port = 35357
auth_protocol = http
# auth_uri = http://127.0.0.1:5000/
admin_tenant_name = service
admin_user = nova
admin_password = sp
# admin_token = 9a82c95a-99e9-4c3a-b5ee-199f6ba7ff04
# memcache_servers = 127.0.0.1:11211
# token_cache_time = 300
admin_tenant_name = %SERVICE_TENANT_NAME%
admin_user = %SERVICE_USER%
admin_password = %SERVICE_PASSWORD%
[filter:extensions]
paste.filter_factory = quantum.extensions.extensions:plugin_aware_extension_middleware_factory

52
quantum/auth.py Normal file
View File

@ -0,0 +1,52 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import webob.dec
import webob.exc
from quantum import context
from quantum import wsgi
LOG = logging.getLogger(__name__)
class QuantumKeystoneContext(wsgi.Middleware):
"""Make a request context from keystone headers."""
@webob.dec.wsgify
def __call__(self, req):
# Determine the user ID
user_id = req.headers.get('X_USER_ID', req.headers.get('X_USER'))
if not user_id:
LOG.debug("Neither X_USER_ID nor X_USER found in request")
return webob.exc.HTTPUnauthorized()
# Determine the tenant
tenant_id = req.headers.get('X_TENANT_ID', req.headers.get('X_TENANT'))
# Suck out the roles
roles = [r.strip() for r in req.headers.get('X_ROLE', '').split(',')]
# Create a context with the authentication data
ctx = context.Context(user_id, tenant_id, roles=roles)
# Inject the context...
req.environ['quantum.context'] = ctx
return self.application

113
quantum/context.py Normal file
View File

@ -0,0 +1,113 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Context: context for security/db session."""
import copy
import logging
from datetime import datetime
from quantum.db import api as db_api
LOG = logging.getLogger(__name__)
class Context(object):
"""Security context and request information.
Represents the user taking a given action within the system.
"""
def __init__(self, user_id, tenant_id, is_admin=None, read_deleted="no",
roles=None, timestamp=None, **kwargs):
"""
:param read_deleted: 'no' indicates deleted records are hidden, 'yes'
indicates deleted records are visible, 'only' indicates that
*only* deleted records are visible.
"""
if kwargs:
LOG.warn(_('Arguments dropped when creating context: %s') %
str(kwargs))
self.user_id = user_id
self.tenant_id = tenant_id
self.roles = roles or []
self.is_admin = is_admin
if self.is_admin is None:
self.is_admin = 'admin' in [x.lower() for x in self.roles]
elif self.is_admin and 'admin' not in [x.lower() for x in self.roles]:
self.roles.append('admin')
self.read_deleted = read_deleted
if not timestamp:
timestamp = datetime.utcnow()
self.timestamp = timestamp
self._session = None
def _get_read_deleted(self):
return self._read_deleted
def _set_read_deleted(self, read_deleted):
if read_deleted not in ('no', 'yes', 'only'):
raise ValueError(_("read_deleted can only be one of 'no', "
"'yes' or 'only', not %r") % read_deleted)
self._read_deleted = read_deleted
def _del_read_deleted(self):
del self._read_deleted
read_deleted = property(_get_read_deleted, _set_read_deleted,
_del_read_deleted)
@property
def session(self):
if self._session is None:
self._session = db_api.get_session()
return self._session
def to_dict(self):
return {'user_id': self.user_id,
'tenant_id': self.tenant_id,
'is_admin': self.is_admin,
'read_deleted': self.read_deleted,
'roles': self.roles,
'timestamp': str(self.timestamp)}
@classmethod
def from_dict(cls, values):
return cls(**values)
def elevated(self, read_deleted=None):
"""Return a version of this context with admin flag set."""
context = copy.copy(self)
context.is_admin = True
if 'admin' not in [x.lower() for x in context.roles]:
context.roles.append('admin')
if read_deleted is not None:
context.read_deleted = read_deleted
return context
def get_admin_context(read_deleted="no"):
return Context(user_id=None,
tenant_id=None,
is_admin=True,
read_deleted=read_deleted)

View File

@ -0,0 +1,90 @@
import unittest
import webob
from quantum import auth
class QuantumKeystoneContextTestCase(unittest.TestCase):
def setUp(self):
super(QuantumKeystoneContextTestCase, self).setUp()
@webob.dec.wsgify
def fake_app(req):
self.context = req.environ['quantum.context']
return webob.Response()
self.context = None
self.middleware = auth.QuantumKeystoneContext(fake_app)
self.request = webob.Request.blank('/')
self.request.headers['X_AUTH_TOKEN'] = 'testauthtoken'
def test_no_user_no_user_id(self):
self.request.headers['X_TENANT_ID'] = 'testtenantid'
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '401 Unauthorized')
def test_with_user(self):
self.request.headers['X_TENANT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '200 OK')
self.assertEqual(self.context.user_id, 'testuserid')
def test_with_user_id(self):
self.request.headers['X_TENANT_ID'] = 'testtenantid'
self.request.headers['X_USER'] = 'testuser'
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '200 OK')
self.assertEqual(self.context.user_id, 'testuser')
def test_user_id_trumps_user(self):
self.request.headers['X_TENANT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
self.request.headers['X_USER'] = 'testuser'
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '200 OK')
self.assertEqual(self.context.user_id, 'testuserid')
def test_with_tenant_id(self):
self.request.headers['X_TENANT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'test_user_id'
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '200 OK')
self.assertEqual(self.context.tenant_id, 'testtenantid')
def test_with_tenant(self):
self.request.headers['X_TENANT'] = 'testtenant'
self.request.headers['X_USER_ID'] = 'test_user_id'
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '200 OK')
self.assertEqual(self.context.tenant_id, 'testtenant')
def test_tenant_id_trumps_tenant(self):
self.request.headers['X_TENANT_ID'] = 'testtenantid'
self.request.headers['X_TENANT'] = 'testtenant'
self.request.headers['X_USER_ID'] = 'testuserid'
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '200 OK')
self.assertEqual(self.context.tenant_id, 'testtenantid')
def test_roles_no_admin(self):
self.request.headers['X_TENANT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
self.request.headers['X_ROLE'] = 'role1, role2 , role3,role4,role5'
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '200 OK')
self.assertEqual(self.context.roles, ['role1', 'role2', 'role3',
'role4', 'role5'])
self.assertEqual(self.context.is_admin, False)
def test_roles_with_admin(self):
self.request.headers['X_TENANT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
self.request.headers['X_ROLE'] = ('role1, role2 , role3,role4,role5,'
'AdMiN')
response = self.request.get_response(self.middleware)
self.assertEqual(response.status, '200 OK')
self.assertEqual(self.context.roles, ['role1', 'role2', 'role3',
'role4', 'role5', 'AdMiN'])
self.assertEqual(self.context.is_admin, True)