placement/placement/auth.py

116 lines
3.6 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystonemiddleware import auth_token
from oslo_log import log as logging
from oslo_middleware import request_id
import webob.dec
import webob.exc
from placement import context
LOG = logging.getLogger(__name__)
class Middleware(object):
def __init__(self, application, **kwargs):
self.application = application
# NOTE(cdent): Only to be used in tests where auth is being faked. This
# middleware can be used to mimic keystonemiddleware auth_token middleware,
# which is important for building API protection tests without an external
# dependency on keystone.
class NoAuthMiddleware(Middleware):
"""Require a token if one isn't present."""
def __init__(self, application):
self.application = application
@webob.dec.wsgify
def __call__(self, req):
if req.environ['PATH_INFO'] == '/':
return self.application
if 'X-Auth-Token' not in req.headers:
return webob.exc.HTTPUnauthorized()
token = req.headers['X-Auth-Token']
user_id, _sep, project_id = token.partition(':')
project_id = project_id or user_id
# Real keystone expands and flattens roles to include their implied
# roles, e.g. admin implies member and reader, so tests should include
# this flattened list also
if 'HTTP_X_ROLES' in req.environ.keys():
roles = req.headers['X_ROLES'].split(',')
elif user_id == 'admin':
roles = ['admin']
else:
roles = []
req.headers['X_USER_ID'] = user_id
if not req.headers.get('OPENSTACK_SYSTEM_SCOPE'):
req.headers['X_TENANT_ID'] = project_id
req.headers['X_ROLES'] = ','.join(roles)
return self.application
class PlacementKeystoneContext(Middleware):
"""Make a request context from keystone headers."""
@webob.dec.wsgify
def __call__(self, req):
req_id = req.environ.get(request_id.ENV_REQUEST_ID)
ctx = context.RequestContext.from_environ(
req.environ, request_id=req_id)
if ctx.user_id is None and req.environ['PATH_INFO'] not in ['/', '']:
LOG.debug("Neither X_USER_ID nor X_USER found in request")
return webob.exc.HTTPUnauthorized()
req.environ['placement.context'] = ctx
return self.application
class PlacementAuthProtocol(auth_token.AuthProtocol):
"""A wrapper on Keystone auth_token middleware.
Does not perform verification of authentication tokens
for root in the API.
"""
def __init__(self, app, conf):
self._placement_app = app
super(PlacementAuthProtocol, self).__init__(app, conf)
def __call__(self, environ, start_response):
if environ['PATH_INFO'] in ['/', '']:
return self._placement_app(environ, start_response)
return super(PlacementAuthProtocol, self).__call__(
environ, start_response)
def filter_factory(global_conf, **local_conf):
conf = global_conf.copy()
conf.update(local_conf)
def auth_filter(app):
return PlacementAuthProtocol(app, conf)
return auth_filter