Add system scope support to context switcher

Change-Id: Idd2ec7ae6e978a358b4b3639e86cadae06c90976
This commit is contained in:
Radomir Dopieralski 2021-11-02 17:05:46 +01:00
parent f9bab3fe19
commit 34a0159d1a
12 changed files with 252 additions and 14 deletions

View File

@ -233,3 +233,28 @@ class BasePlugin(object, metaclass=abc.ABCMeta):
unscoped_auth_ref.user_id, _name)
break
return domain_auth, domain_auth_ref
def get_system_scoped_auth(self, unscoped_auth, unscoped_auth_ref,
system_scope):
"""Get the system scoped keystone auth and access info
This function returns a system scoped keystone token plugin
and AccessInfo object.
:param unscoped_auth: keystone auth plugin
:param unscoped_auth_ref: keystoneclient.access.AccessInfo` or None.
:param system_scope: system that we should try to scope to
:return: keystone token auth plugin, AccessInfo object
"""
session = utils.get_session()
auth_url = unscoped_auth.auth_url
system_auth = None
system_auth_ref = None
token = unscoped_auth_ref.auth_token
system_auth = utils.get_token_auth_plugin(
auth_url,
token,
system_scope=system_scope)
system_auth_ref = system_auth.get_access(session)
return system_auth, system_auth_ref

View File

@ -1382,6 +1382,54 @@ class OpenStackAuthTests(test.TestCase):
def test_switch_region_with_next(self, next=None):
self.test_switch_region(next='/next_url')
@mock.patch.object(v3_auth.Token, 'get_access')
@mock.patch.object(password.PasswordPlugin, 'list_projects')
@mock.patch.object(v3_auth.Password, 'get_access')
def test_switch_system_scope(self, mock_get_access, mock_project_list,
mock_get_access_token,
next=None):
projects = []
user = self.data.user
scoped = self.data.unscoped_access_info
form_data = self.get_form_data(user)
mock_get_access.return_value = self.data.unscoped_access_info
mock_get_access_token.return_value = scoped
mock_project_list.return_value = projects
url = reverse('login')
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
response = self.client.post(url, form_data)
self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
self.assertFalse(self.client.session['token'].system_scoped)
url = reverse('switch_system_scope')
if next:
form_data.update({auth.REDIRECT_FIELD_NAME: next})
response = self.client.get(url, form_data)
if next:
expected_url = next
self.assertEqual(response['location'], expected_url)
else:
self.assertRedirects(response, settings.LOGIN_REDIRECT_URL)
self.assertNotEqual(False, self.client.session['token'].system_scoped)
mock_get_access.assert_called_once_with(IsA(session.Session))
mock_get_access_token.assert_called_with(IsA(session.Session))
mock_project_list.assert_called_once_with(
IsA(session.Session),
IsA(v3_auth.Password),
self.data.unscoped_access_info)
class OpenStackAuthTestsPublicURL(OpenStackAuthTests):
interface = 'publicURL'

View File

@ -30,6 +30,9 @@ urlpatterns = [
url(r'^switch_keystone_provider/(?P<keystone_provider>[^/]+)/$',
views.switch_keystone_provider,
name='switch_keystone_provider'),
url(r'^switch_system_scope/$',
views.switch_system_scope,
name='switch_system_scope'),
]
if utils.allow_expired_passowrd_change():

View File

@ -61,6 +61,7 @@ def create_user_from_token(request, token, endpoint, services_region=None):
project_name=token.project['name'],
domain_id=token.domain['id'],
domain_name=token.domain['name'],
system_scoped=token.system_scoped,
enabled=True,
service_catalog=token.serviceCatalog,
roles=token.roles,
@ -117,6 +118,10 @@ class Token(object):
self.roles = [{'name': role} for role in auth_ref.role_names]
self.serviceCatalog = auth_ref.service_catalog.catalog
# System scope
# Only keystone API V3 has it.
self.system_scoped = getattr(auth_ref, 'system_scoped', False)
class User(models.AbstractBaseUser, models.AnonymousUser):
"""A User class with some extra special sauce for Keystone.
@ -200,7 +205,8 @@ class User(models.AbstractBaseUser, models.AnonymousUser):
services_region=None, user_domain_id=None,
user_domain_name=None, domain_id=None, domain_name=None,
project_id=None, project_name=None, is_federated=False,
unscoped_token=None, password=None, password_expires_at=None):
unscoped_token=None, password=None, password_expires_at=None,
system_scoped=False):
self.id = id
self.pk = id
self.token = token
@ -212,6 +218,7 @@ class User(models.AbstractBaseUser, models.AnonymousUser):
self.domain_name = domain_name
self.project_id = project_id or tenant_id
self.project_name = project_name or tenant_name
self.system_scoped = system_scoped
self.service_catalog = service_catalog
self._services_region = (
services_region or
@ -223,6 +230,7 @@ class User(models.AbstractBaseUser, models.AnonymousUser):
self._authorized_tenants = authorized_tenants
self.is_federated = is_federated
self.password_expires_at = password_expires_at
self._is_system_user = None
# Unscoped token is used for listing user's project that works
# for both federated and keystone user.
@ -330,6 +338,22 @@ class User(models.AbstractBaseUser, models.AnonymousUser):
regions.append(region)
return regions
@property
def is_system_user(self):
"""Check if the user has access to the system scope."""
if self._is_system_user is not None:
return self._is_system_user
try:
self._is_system_user = utils.get_system_access(
user_id=self.id,
auth_url=self.endpoint,
token=self.unscoped_token,
is_federated=self.is_federated)
except (keystone_exceptions.ClientException,
keystone_exceptions.AuthorizationFailure):
LOG.exception('Unable to retrieve systems list.')
return self._is_system_user
def save(self, *args, **kwargs):
# Presume we can't write to Keystone.
pass

View File

@ -20,6 +20,7 @@ from django.conf import settings
from django.contrib import auth
from django.contrib.auth import models
from django.utils import timezone
from keystoneauth1 import exceptions as keystone_exceptions
from keystoneauth1.identity import v3 as v3_auth
from keystoneauth1 import session
from keystoneauth1 import token_endpoint
@ -294,7 +295,13 @@ def clean_up_auth_url(auth_url):
scheme, netloc, re.sub(r'/auth.*', '', path), '', ''))
def get_token_auth_plugin(auth_url, token, project_id=None, domain_name=None):
def get_token_auth_plugin(auth_url, token, project_id=None, domain_name=None,
system_scope=None):
if system_scope:
return v3_auth.Token(auth_url=auth_url,
token=token,
system_scope=system_scope,
reauthenticate=False)
if domain_name:
return v3_auth.Token(auth_url=auth_url,
token=token,
@ -322,6 +329,25 @@ def get_project_list(*args, **kwargs):
return projects
def get_system_access(user_id, auth_url, token, is_federated):
session = get_session()
auth_url, _ = fix_auth_url_version_prefix(auth_url)
auth = token_endpoint.Token(auth_url, token)
client = get_keystone_client().Client(session=session, auth=auth)
# Old versions of keystoneclient don't have auth.system endpoint yet.
auth_system = getattr(client.auth, 'system', None)
if auth_system is not None:
return 'all' in auth_system()
# Fall back to trying to get the system scope token.
try:
auth = get_token_auth_plugin(auth_url=auth_url, token=token,
system_scope='all')
auth.get_access(session)
except keystone_exceptions.ClientException:
return False
return True
def default_services_region(service_catalog, request=None,
ks_endpoint=None):
"""Return the default service region.

View File

@ -400,6 +400,55 @@ def switch_keystone_provider(request, keystone_provider=None,
return response
# TODO(stephenfin): Migrate to CBV
@login_required
def switch_system_scope(request, redirect_field_name=auth.REDIRECT_FIELD_NAME):
"""Switches an authenticated user from one system to another."""
LOG.debug('Switching to system scope for user "%s".', request.user.username)
endpoint, __ = utils.fix_auth_url_version_prefix(request.user.endpoint)
session = utils.get_session()
# Keystone can be configured to prevent exchanging a scoped token for
# another token. Always use the unscoped token for requesting a
# scoped token.
unscoped_token = request.user.unscoped_token
auth = utils.get_token_auth_plugin(auth_url=endpoint,
token=unscoped_token,
system_scope='all')
try:
auth_ref = auth.get_access(session)
except keystone_exceptions.ClientException:
msg = (
_('System switch failed for user "%(username)s".') %
{'username': request.user.username})
messages.error(request, msg)
auth_ref = None
LOG.exception('An error occurred while switching sessions.')
else:
msg = 'System switch successful for user "%(username)s".' % \
{'username': request.user.username}
LOG.info(msg)
# Ensure the user-originating redirection url is safe.
# Taken from django.contrib.auth.views.login()
redirect_to = request.GET.get(redirect_field_name, '')
if not http.is_safe_url(url=redirect_to,
allowed_hosts=[request.get_host()]):
redirect_to = settings.LOGIN_REDIRECT_URL
if auth_ref:
user = auth_user.create_user_from_token(
request,
auth_user.Token(auth_ref, unscoped_token=unscoped_token),
endpoint)
auth_user.set_session_from_user(request, user)
message = _('Switch to system scope successful.')
messages.success(request, message)
response = shortcuts.redirect(redirect_to)
return response
class PasswordView(edit_views.FormView):
"""Changes user's password when it's expired or otherwise inaccessible."""
template_name = 'auth/password.html'

View File

@ -1,12 +1,19 @@
{% load i18n %}
<span class="fa fa-list-alt"></span>
<span class="context-overview">
{% if domain_supported %}
<span class="context-domain">{{ domain_name }}</span>
<span class="fa fa-circle context-delimiter"></span>
{% endif %}
{% if project_name %}
<span class="context-project">{{ project_name }}</span>
{% endif %}
{% if multi_region %}
<span class="fa fa-circle context-delimiter"></span>
<span class="context-region">{{ region_name }}</span>
{% endif %}
{% if system_scoped %}
<span class="context-system">{% trans "system scope" %}</span>
{% endif %}
</span>

View File

@ -0,0 +1,13 @@
{% load i18n %}
<ul class="dropdown-menu">
<li class="dropdown-header">{% trans "Systems:" %}</li>
<li>
<a class="{% if system_scoped %}dropdown-selected{% endif %}"
href="{% url 'switch_system_scope' %}{% if page_url %}?next={{ page_url }}{% endif %}"
target="_self">
<span class="fa fa-check dropdown-selected-icon"></span>
<span class="dropdown-title">{% trans "system scope" %}</span>
</a>
</li>
</ul>

View File

@ -29,5 +29,13 @@
{% show_region_list %}
</li>
{% endif %}
{% is_system_user as system_user %}
{% if system_user %}
<li>
{% show_system_list %}
</li>
{% endif %}
</ul>
</li>

View File

@ -41,6 +41,15 @@ def is_multidomain():
return is_multidomain_supported()
@register.simple_tag(takes_context=True)
def is_system_user(context):
try:
request = context['request']
except KeyError:
return False
return request.user.is_system_user
@register.inclusion_tag('context_selection/_overview.html',
takes_context=True)
def show_overview(context):
@ -55,6 +64,7 @@ def show_overview(context):
'project_name': project_name or request.user.project_name,
'multi_region': is_multi_region_configured(request),
'region_name': request.user.services_region,
'system_scoped': request.user.system_scoped,
'request': request}
return context
@ -102,6 +112,20 @@ def show_region_list(context):
return context
@register.inclusion_tag('context_selection/_system_list.html',
takes_context=True)
def show_system_list(context):
if 'request' not in context:
return {}
request = context['request']
panel = request.horizon.get('panel')
context = {
'system_scoped': request.user.system_scoped,
'page_url': panel.get_absolute_url() if panel else None,
}
return context
@register.inclusion_tag('context_selection/_anti_clickjack.html',
takes_context=True)
def iframe_embed_settings(context):

View File

@ -261,18 +261,22 @@ class TestCase(horizon_helpers.TestCase):
authorized_tenants=None, enabled=True, domain_id=None,
user_domain_name=None):
def get_user(request):
return user.User(id=id,
token=token,
user=username,
domain_id=domain_id,
user_domain_name=user_domain_name,
tenant_id=tenant_id,
tenant_name=tenant_name,
service_catalog=service_catalog,
roles=roles,
enabled=enabled,
authorized_tenants=authorized_tenants,
endpoint=settings.OPENSTACK_KEYSTONE_URL)
ret = user.User(
id=id,
token=token,
user=username,
domain_id=domain_id,
user_domain_name=user_domain_name,
tenant_id=tenant_id,
tenant_name=tenant_name,
service_catalog=service_catalog,
roles=roles,
enabled=enabled,
authorized_tenants=authorized_tenants,
endpoint=settings.OPENSTACK_KEYSTONE_URL,
)
ret._is_system_user = False
return ret
utils.get_user = get_user
def assertRedirectsNoFollow(self, response, expected_url):

View File

@ -0,0 +1,7 @@
---
features:
- |
A new entry has been added to the context switcher menu, visible only
when the current user has access to the system scope. This entry, labeled
"system scope", allows to switch to a system-scope token, so that operations
that require this kind of token can be performed.