gluon/gluon/context.py
Kamal Hussain 44b11447c2 Gluon RBAC using keystone and oslo.policy
Initial code changes for Gluon RBAC
implementation. Includes code for integrating
keystonemiddleware and oslo.policy. RBAC is not
yet enabled on the API flow but it will be done in
the next set of changes.

Change-Id: I89b56bb623a3b3e26f9c6a9705d61f0dad3fe1fc
Implements: blueprint gluon-auth
2016-12-15 10:33:26 -06:00

142 lines
4.3 KiB
Python

# Copyright 2016 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Context: context for security/db session."""
import copy
import datetime
from oslo_context import context as oslo_context
from oslo_db.sqlalchemy import enginefacade
from gluon import policy
class ContextBase(oslo_context.RequestContext):
"""Security context and request information.
Represents the user taking a given action within the system.
"""
def __init__(self, user_id=None, tenant_id=None, is_admin=None,
timestamp=None, tenant_name=None, user_name=None,
is_advsvc=None, **kwargs):
"""Object initialization.
:param overwrite: Set to False to ensure that the greenthread local
copy of the index is not overwritten.
"""
# NOTE(jamielennox): We maintain these arguments in order for tests
# that pass arguments positionally.
kwargs.setdefault('user', user_id)
kwargs.setdefault('tenant', tenant_id)
super(ContextBase, self).__init__(is_admin=is_admin, **kwargs)
self.user_name = user_name
self.tenant_name = tenant_name
if not timestamp:
timestamp = datetime.datetime.utcnow()
self.timestamp = timestamp
self.is_advsvc = is_advsvc
if self.is_advsvc is None:
self.is_advsvc = self.is_admin or policy.check_is_advsvc(self)
if self.is_admin is None:
self.is_admin = policy.check_is_admin(self)
@property
def project_id(self):
return self.tenant
@property
def tenant_id(self):
return self.tenant
@tenant_id.setter
def tenant_id(self, tenant_id):
self.tenant = tenant_id
@property
def user_id(self):
return self.user
@user_id.setter
def user_id(self, user_id):
self.user = user_id
def to_dict(self):
context = super(ContextBase, self).to_dict()
context.update({
'user_id': self.user_id,
'tenant_id': self.tenant_id,
'project_id': self.project_id,
'timestamp': str(self.timestamp),
'tenant_name': self.tenant_name,
'project_name': self.tenant_name,
'user_name': self.user_name,
})
return context
@classmethod
def from_dict(cls, values):
return cls(user_id=values.get('user_id', values.get('user')),
tenant_id=values.get('tenant_id', values.get('project_id')),
is_admin=values.get('is_admin'),
roles=values.get('roles'),
timestamp=values.get('timestamp'),
request_id=values.get('request_id'),
tenant_name=values.get('tenant_name'),
user_name=values.get('user_name'),
auth_token=values.get('auth_token'))
def elevated(self):
"""Return a version of this context with admin flag set."""
context = copy.copy(self)
context.is_admin = True
if 'admin' not in [x.lower() for x in context.roles]:
context.roles = context.roles + ["admin"]
return context
@enginefacade.transaction_context_provider
class ContextBaseWithSession(ContextBase):
pass
class Context(ContextBaseWithSession):
def __init__(self, *args, **kwargs):
super(Context, self).__init__(*args, **kwargs)
self._session = None
@property
def session(self):
return self._session
def get_admin_context():
return Context(user_id=None,
tenant_id=None,
is_admin=True,
overwrite=False)
def get_admin_context_without_session():
return ContextBase(user_id=None,
tenant_id=None,
is_admin=True)