neutron/neutron/context.py
Salvatore Orlando 734e77365b Remove get_admin_roles and associated logic
get_admin_roles was introduced so that contexts generated from
within plugins could be used for policy checks. This was the case
up to the Havana release as several plugins invoked the policy
engine directly to authorize requests.

This was incorrect behaviour and has now been fixed, meaning
that get_admin_roles is no longer needed and can be safely removed.
This will result in a leaner and more reliable codebase. Indeed the
function being removed here was the cause of several bugs where the
policy engine was initialized too early in the server bootstrap
process.
While this patch removes the feature it does not remove the
load_admin_roles parameter from context.get_admin_context. Doing so
will break other projects such as neutron-lbaas. The parameter is
deprecated by this patch and an appropriate warning emitted.

As a consequence, neutron will no longer perform policy checks
when context.is_admin=True. This flag is instead set either when
a context is explicitly created for granting admin privileges, or
when Neutron is operating in noauth mode. In the latter case every
request is treated by neutron as an admin request, and get_admin_roles
is simply ensuring the appropriate roles get pushed into the context
so that the policy engine will grant admin rights to the request.
This behaviour is probably just a waste of resources; also it is not
adding anything from a security perspective.

On the other hand not performing checks when context.is_admin is
True should not pose a security threat either in noauth mode or
with the keystone middleware. In the former case the software keeps
operating assuming admin rights for every request, whereas in the
latter case the keystone middleware will always supply a context
with the appropriate roles, and there is no way for an attacker
to trick keystonemiddleware into generating a context for which
is_admin=True.

Finally, this patch also does some non-trivial changes in test_l3.py
as some tests were mocking context.to_dict ignoring the is_admin flag.

Closes-Bug: #1446021

Change-Id: I8a5c02712a0b43f3e36a4f14620ebbd73fbfb03f
2015-06-09 11:12:47 +02:00

162 lines
5.1 KiB
Python

# Copyright 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Context: context for security/db session."""
import copy
import datetime
from debtcollector import removals
from oslo_context import context as oslo_context
from oslo_log import log as logging
from neutron.db import api as db_api
from neutron import policy
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
class ContextBase(oslo_context.RequestContext):
    """Request context holding identity, role and admin information.

    Describes the user (and tenant) on whose behalf an action is being
    performed in the system.
    """

    def __init__(self, user_id, tenant_id, is_admin=None, read_deleted="no",
                 roles=None, timestamp=None, request_id=None, tenant_name=None,
                 user_name=None, overwrite=True, auth_token=None, **kwargs):
        """Build the context.

        :param user_id: passed to the parent class as ``user``.
        :param tenant_id: passed to the parent class as ``tenant``.
        :param is_admin: admin flag; when left as None it is resolved with
            ``policy.check_is_admin`` at the end of initialization.
        :param read_deleted: 'no' hides deleted records, 'yes' shows them,
            'only' shows *only* deleted records. Any other value raises
            ValueError.
        :param roles: list of role names; a falsy value becomes ``[]``.
        :param timestamp: creation time; a falsy value is replaced by the
            current UTC time.
        :param request_id: passed through to the parent class.
        :param tenant_name: stored as-is on the context.
        :param user_name: stored as-is on the context.
        :param overwrite: passed through to the parent class. Set to False
            so that the greenthread-local copy of the context is not
            overwritten.
        :param auth_token: passed through to the parent class.
        :param kwargs: extra arguments that may be present (for instance
            from older rpc messages); they are accepted and ignored.
        """
        super(ContextBase, self).__init__(auth_token=auth_token,
                                          user=user_id,
                                          tenant=tenant_id,
                                          is_admin=is_admin,
                                          request_id=request_id,
                                          overwrite=overwrite)
        self.user_name = user_name
        self.tenant_name = tenant_name
        # Goes through the validating ``read_deleted`` property setter.
        self.read_deleted = read_deleted
        self.timestamp = timestamp or datetime.datetime.utcnow()
        self._session = None
        self.roles = roles or []
        # NOTE: is_advsvc is computed *before* a None is_admin is resolved
        # below, so for is_admin=None it depends only on the advsvc policy
        # check. Keep this statement order.
        self.is_advsvc = self.is_admin or policy.check_is_advsvc(self)
        if self.is_admin is None:
            self.is_admin = policy.check_is_admin(self)

    @property
    def project_id(self):
        """Read-only alias of ``self.tenant``."""
        return self.tenant

    @property
    def tenant_id(self):
        """Alias of ``self.tenant``."""
        return self.tenant

    @tenant_id.setter
    def tenant_id(self, tenant_id):
        self.tenant = tenant_id

    @property
    def user_id(self):
        """Alias of ``self.user``."""
        return self.user

    @user_id.setter
    def user_id(self, user_id):
        self.user = user_id

    def _get_read_deleted(self):
        """Return the stored read_deleted mode."""
        return self._read_deleted

    def _set_read_deleted(self, read_deleted):
        """Store the read_deleted mode, rejecting unknown values.

        :raises ValueError: if the value is not 'no', 'yes' or 'only'.
        """
        if read_deleted in ('no', 'yes', 'only'):
            self._read_deleted = read_deleted
            return
        raise ValueError(_("read_deleted can only be one of 'no', "
                           "'yes' or 'only', not %r") % read_deleted)

    def _del_read_deleted(self):
        """Remove the stored read_deleted mode."""
        del self._read_deleted

    read_deleted = property(_get_read_deleted, _set_read_deleted,
                            _del_read_deleted)

    def to_dict(self):
        """Return the parent's dict form extended with this class's fields.

        The timestamp is rendered with ``str()``; ``project_name`` mirrors
        ``tenant_name``.
        """
        serialized = super(ContextBase, self).to_dict()
        serialized.update(
            user_id=self.user_id,
            tenant_id=self.tenant_id,
            project_id=self.project_id,
            read_deleted=self.read_deleted,
            roles=self.roles,
            timestamp=str(self.timestamp),
            tenant_name=self.tenant_name,
            project_name=self.tenant_name,
            user_name=self.user_name,
        )
        return serialized

    @classmethod
    def from_dict(cls, values):
        """Create a context by passing ``values`` as keyword arguments."""
        return cls(**values)

    def elevated(self, read_deleted=None):
        """Return a shallow copy of this context with the admin flag set.

        The copy gets an "admin" role unless one is already present
        (compared case-insensitively). When ``read_deleted`` is given, it
        overrides the copied value.
        """
        admin_ctx = copy.copy(self)
        admin_ctx.is_admin = True
        lowered_roles = [role.lower() for role in admin_ctx.roles]
        if 'admin' not in lowered_roles:
            # Build a new list: the shallow copy still shares the original
            # roles list, which must not be mutated.
            admin_ctx.roles = admin_ctx.roles + ["admin"]
        if read_deleted is not None:
            admin_ctx.read_deleted = read_deleted
        return admin_ctx
class Context(ContextBase):
    """ContextBase variant that exposes a lazily created DB session."""

    @property
    def session(self):
        """Return the cached session, fetching one on first access.

        The session comes from ``db_api.get_session()`` and is stored in
        ``self._session`` so later accesses reuse it.
        """
        if self._session is not None:
            return self._session
        self._session = db_api.get_session()
        return self._session
@removals.removed_kwarg('load_admin_roles')
def get_admin_context(read_deleted="no", load_admin_roles=True):
    """Return a ``Context`` with is_admin=True and no user or tenant.

    :param read_deleted: read_deleted mode handed to the new context.
    :param load_admin_roles: accepted for backward compatibility only; it
        is not used here and is marked as removed by the decorator.
    """
    admin_context = Context(user_id=None,
                            tenant_id=None,
                            is_admin=True,
                            read_deleted=read_deleted,
                            overwrite=False)
    return admin_context
def get_admin_context_without_session(read_deleted="no"):
    """Return a ``ContextBase`` with is_admin=True and no user or tenant.

    Unlike ``get_admin_context`` this builds the base class, and leaves
    ``overwrite`` at its default.

    :param read_deleted: read_deleted mode handed to the new context.
    """
    admin_context = ContextBase(user_id=None,
                                tenant_id=None,
                                is_admin=True,
                                read_deleted=read_deleted)
    return admin_context