StarlingX Installation/Update/Patching/Backup/Restore
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

100 lines
3.2 KiB

# -*- encoding: utf-8 -*-
# Copyright © 2012 New Dream Network, LLC (DreamHost)
# Author: Doug Hellmann <> # noqa: H105
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Copyright (c) 2013-2022 Wind River Systems, Inc.
from oslo_config import cfg
from pecan import hooks
from sysinv.common import context
from sysinv.common import utils
from sysinv.openstack.common import policy
from webob import exc
class ConfigHook(hooks.PecanHook):
"""Attach the config object to the request so controllers can get to it."""
def before(self, state):
state.request.cfg = cfg.CONF
class ContextHook(hooks.PecanHook):
"""Configures a request context and attaches it to the request.
The following HTTP request headers are used:
X-User-Id or X-User:
Used for context.user_id.
X-Tenant-Id or X-Tenant:
Used for context.tenant.
Used for context.auth_token.
Used for setting context.is_admin flag to either True or False.
The flag is set to True, if X-Roles contains either an administrator
or admin substring. Otherwise it is set to False.
def __init__(self, public_api_routes):
self.public_api_routes = public_api_routes
super(ContextHook, self).__init__()
def before(self, state):
user_id = state.request.headers.get('X-User-Id')
user_id = state.request.headers.get('X-User', user_id)
tenant = state.request.headers.get('X-Tenant-Id')
tenant = state.request.headers.get('X-Tenant', tenant)
domain_id = state.request.headers.get('X-User-Domain-Id')
domain_name = state.request.headers.get('X-User-Domain-Name')
auth_token = state.request.headers.get('X-Auth-Token', None)
creds = {'roles': state.request.headers.get('X-Roles', '').split(',')}
is_admin = policy.check('admin', state.request.headers, creds)
path = utils.safe_rstrip(state.request.path, '/')
is_public_api = path in self.public_api_routes
state.request.context = context.RequestContext(
class AdminAuthHook(hooks.PecanHook):
"""Verify that the user has admin rights.
Checks whether the request context is an admin context and
rejects the request otherwise.
def before(self, state):
ctx = state.request.context
is_admin_api = policy.check('admin_api', {}, ctx.to_dict())
if not is_admin_api and not ctx.is_public_api:
raise exc.HTTPForbidden()