2012-01-31 02:43:26 +00:00
|
|
|
# Copyright 2012 Nebula Inc
|
|
|
|
#
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
|
|
# not use this file except in compliance with the License. You may obtain
|
|
|
|
# a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
|
|
# License for the specific language governing permissions and limitations
|
|
|
|
# under the License.
|
|
|
|
|
2017-11-22 14:21:03 +00:00
|
|
|
import yaml
|
2016-03-15 13:38:17 +00:00
|
|
|
|
2017-11-22 14:21:03 +00:00
|
|
|
from django import template
|
|
|
|
from django.template import loader
|
2017-03-17 19:15:23 +00:00
|
|
|
from django.test.utils import override_settings
|
2017-12-12 04:30:33 +00:00
|
|
|
from django.urls import reverse
|
2012-10-04 22:43:40 +00:00
|
|
|
|
|
|
|
from openstack_dashboard import api
|
|
|
|
from openstack_dashboard.test import helpers as test
|
2012-01-31 02:43:26 +00:00
|
|
|
|
|
|
|
|
2017-01-25 11:21:24 +00:00
|
|
|
INDEX_URL = reverse('horizon:project:api_access:index')
|
|
|
|
API_URL = "horizon:project:api_access"
|
2014-06-12 14:41:39 +00:00
|
|
|
EC2_URL = reverse(API_URL + ":ec2")
|
|
|
|
OPENRC_URL = reverse(API_URL + ":openrc")
|
|
|
|
CREDS_URL = reverse(API_URL + ":view_credentials")
|
2015-12-02 13:50:53 +00:00
|
|
|
RECREATE_CREDS_URL = reverse(API_URL + ":recreate_credentials")
|
2012-01-31 02:43:26 +00:00
|
|
|
|
|
|
|
|
2013-02-09 22:34:22 +00:00
|
|
|
class APIAccessTests(test.TestCase):
|
2018-01-07 20:21:49 +00:00
|
|
|
@test.create_mocks({api.keystone: ('create_ec2_credentials',
|
|
|
|
'list_ec2_credentials')})
|
2012-01-31 02:43:26 +00:00
|
|
|
def test_ec2_download_view(self):
|
|
|
|
creds = self.ec2.first()
|
2018-01-07 20:21:49 +00:00
|
|
|
self.mock_list_ec2_credentials.return_value = []
|
|
|
|
self.mock_create_ec2_credentials.return_value = creds
|
2012-01-31 02:43:26 +00:00
|
|
|
|
2013-02-09 22:34:22 +00:00
|
|
|
res = self.client.get(EC2_URL)
|
2012-01-31 02:43:26 +00:00
|
|
|
self.assertEqual(res.status_code, 200)
|
|
|
|
self.assertEqual(res['content-type'], 'application/zip')
|
2014-06-12 14:41:39 +00:00
|
|
|
|
2018-01-07 20:21:49 +00:00
|
|
|
self.mock_list_ec2_credentials.assert_called_once_with(
|
|
|
|
test.IsHttpRequest(), self.user.id)
|
|
|
|
self.mock_create_ec2_credentials.assert_called_once_with(
|
|
|
|
test.IsHttpRequest(), self.user.id, self.tenant.id)
|
|
|
|
|
2015-05-29 17:10:12 +00:00
|
|
|
@override_settings(OPENSTACK_API_VERSIONS={"identity": 3})
|
2014-06-12 14:41:39 +00:00
|
|
|
def test_openrc_credentials(self):
|
|
|
|
res = self.client.get(OPENRC_URL)
|
|
|
|
self.assertEqual(res.status_code, 200)
|
2017-01-25 11:21:24 +00:00
|
|
|
openrc = 'project/api_access/openrc.sh.template'
|
2014-06-12 14:41:39 +00:00
|
|
|
self.assertTemplateUsed(res, openrc)
|
|
|
|
name = 'export OS_USERNAME="{}"'.format(self.request.user.username)
|
2015-05-29 17:10:12 +00:00
|
|
|
p_id = 'export OS_PROJECT_ID={}'.format(self.request.user.tenant_id)
|
|
|
|
domain = 'export OS_USER_DOMAIN_NAME="{}"'.format(
|
|
|
|
self.request.user.user_domain_name)
|
2015-08-28 15:42:53 +00:00
|
|
|
self.assertIn(name.encode('utf-8'), res.content)
|
2015-05-29 17:10:12 +00:00
|
|
|
self.assertIn(p_id.encode('utf-8'), res.content)
|
|
|
|
self.assertIn(domain.encode('utf-8'), res.content)
|
2014-06-12 14:41:39 +00:00
|
|
|
|
2018-01-07 20:21:49 +00:00
|
|
|
@test.create_mocks({api.keystone: ('list_ec2_credentials',)})
|
2014-06-12 14:41:39 +00:00
|
|
|
def test_credential_api(self):
|
|
|
|
certs = self.ec2.list()
|
2018-01-07 20:21:49 +00:00
|
|
|
self.mock_list_ec2_credentials.return_value = certs
|
2014-06-12 14:41:39 +00:00
|
|
|
|
|
|
|
res = self.client.get(CREDS_URL)
|
2018-01-07 20:21:49 +00:00
|
|
|
|
2014-06-12 14:41:39 +00:00
|
|
|
self.assertEqual(res.status_code, 200)
|
2017-01-25 11:21:24 +00:00
|
|
|
credentials = 'project/api_access/credentials.html'
|
2014-06-12 14:41:39 +00:00
|
|
|
self.assertTemplateUsed(res, credentials)
|
|
|
|
self.assertEqual(self.user.id, res.context['openrc_creds']['user'].id)
|
|
|
|
self.assertEqual(certs[0].access,
|
|
|
|
res.context['ec2_creds']['ec2_access_key'])
|
2018-01-07 20:21:49 +00:00
|
|
|
self.mock_list_ec2_credentials.assert_called_once_with(
|
|
|
|
test.IsHttpRequest(), self.user.id)
|
2015-12-02 13:50:53 +00:00
|
|
|
|
2018-01-07 20:21:49 +00:00
|
|
|
@test.create_mocks({api.keystone: ('create_ec2_credentials',
|
|
|
|
'list_ec2_credentials',
|
|
|
|
'delete_user_ec2_credentials')})
|
2015-12-02 13:50:53 +00:00
|
|
|
def _test_recreate_user_credentials(self, exists_credentials=True):
|
|
|
|
old_creds = self.ec2.list() if exists_credentials else []
|
|
|
|
new_creds = self.ec2.first()
|
2018-01-07 20:21:49 +00:00
|
|
|
self.mock_list_ec2_credentials.return_value = old_creds
|
2015-12-02 13:50:53 +00:00
|
|
|
if exists_credentials:
|
2018-01-07 20:21:49 +00:00
|
|
|
self.mock_delete_user_ec2_credentials.return_value = []
|
|
|
|
self.mock_create_ec2_credentials.return_value = new_creds
|
2015-12-02 13:50:53 +00:00
|
|
|
|
|
|
|
res_get = self.client.get(RECREATE_CREDS_URL)
|
|
|
|
self.assertEqual(res_get.status_code, 200)
|
|
|
|
credentials = \
|
2017-01-25 11:21:24 +00:00
|
|
|
'project/api_access/recreate_credentials.html'
|
2015-12-02 13:50:53 +00:00
|
|
|
self.assertTemplateUsed(res_get, credentials)
|
|
|
|
|
|
|
|
res_post = self.client.post(RECREATE_CREDS_URL)
|
|
|
|
self.assertNoFormErrors(res_post)
|
|
|
|
self.assertRedirectsNoFollow(res_post, INDEX_URL)
|
|
|
|
|
2018-01-07 20:21:49 +00:00
|
|
|
self.mock_list_ec2_credentials.assert_called_once_with(
|
|
|
|
test.IsHttpRequest(), self.user.id)
|
|
|
|
if exists_credentials:
|
|
|
|
self.mock_delete_user_ec2_credentials.assert_called_once_with(
|
|
|
|
test.IsHttpRequest(), self.user.id, old_creds[0].access)
|
|
|
|
else:
|
|
|
|
self.mock_delete_user_ec2_credentials.assert_not_called()
|
|
|
|
self.mock_create_ec2_credentials.assert_called_once_with(
|
|
|
|
test.IsHttpRequest(), self.user.id, self.tenant.id)
|
|
|
|
|
2015-12-02 13:50:53 +00:00
|
|
|
def test_recreate_user_credentials(self):
|
|
|
|
self._test_recreate_user_credentials()
|
|
|
|
|
|
|
|
def test_recreate_user_credentials_with_no_existing_creds(self):
|
|
|
|
self._test_recreate_user_credentials(exists_credentials=False)
|
2016-03-15 13:38:17 +00:00
|
|
|
|
|
|
|
|
|
|
|
class ASCIITenantNameRCTests(test.TestCase):
|
|
|
|
TENANT_NAME = 'tenant'
|
|
|
|
|
|
|
|
def _setup_user(self, **kwargs):
|
2020-09-15 05:08:17 +00:00
|
|
|
super()._setup_user(tenant_name=self.TENANT_NAME)
|
2016-03-15 13:38:17 +00:00
|
|
|
|
|
|
|
@override_settings(OPENSTACK_API_VERSIONS={"identity": 3})
|
|
|
|
def test_openrc_credentials_filename(self):
|
|
|
|
expected = 'attachment; filename="%s-openrc.sh"' % self.TENANT_NAME
|
|
|
|
res = self.client.get(OPENRC_URL)
|
|
|
|
|
|
|
|
self.assertEqual(res.status_code, 200)
|
|
|
|
self.assertEqual(expected, res['content-disposition'])
|
|
|
|
|
|
|
|
|
|
|
|
class UnicodeTenantNameRCTests(test.TestCase):
|
2021-01-05 07:22:42 +00:00
|
|
|
TENANT_NAME = '\u043f\u0440\u043e\u0435\u043a\u0442'
|
2016-03-15 13:38:17 +00:00
|
|
|
|
|
|
|
def _setup_user(self, **kwargs):
|
2020-09-15 05:08:17 +00:00
|
|
|
super()._setup_user(tenant_name=self.TENANT_NAME)
|
2016-03-15 13:38:17 +00:00
|
|
|
|
|
|
|
@override_settings(OPENSTACK_API_VERSIONS={"identity": 3})
|
|
|
|
def test_openrc_credentials_filename(self):
|
|
|
|
expected = ('attachment; filename="%s-openrc.sh"' %
|
|
|
|
self.TENANT_NAME).encode('utf-8')
|
|
|
|
res = self.client.get(OPENRC_URL)
|
|
|
|
|
|
|
|
self.assertEqual(res.status_code, 200)
|
|
|
|
|
|
|
|
result_content_disposition = res['content-disposition']
|
|
|
|
|
2020-01-09 14:49:18 +00:00
|
|
|
result_content_disposition = result_content_disposition.\
|
|
|
|
encode('latin-1')
|
2016-03-15 13:38:17 +00:00
|
|
|
self.assertEqual(expected,
|
|
|
|
result_content_disposition)
|
2017-11-22 14:21:03 +00:00
|
|
|
|
|
|
|
|
|
|
|
class FakeUser(object):
|
|
|
|
username = "cool user"
|
|
|
|
|
|
|
|
|
|
|
|
class TemplateRenderTest(test.TestCase):
|
|
|
|
"""Tests for templates render."""
|
|
|
|
|
|
|
|
def test_openrc_html_escape(self):
|
|
|
|
context = {
|
|
|
|
"user": FakeUser(),
|
|
|
|
"tenant_id": "some-cool-id",
|
|
|
|
"auth_url": "http://tests.com",
|
|
|
|
"tenant_name": "ENG Perf R&D"}
|
|
|
|
out = loader.render_to_string(
|
|
|
|
'project/api_access/openrc.sh.template',
|
|
|
|
context,
|
|
|
|
template.Context(context))
|
|
|
|
|
|
|
|
self.assertNotIn("&", out)
|
|
|
|
self.assertIn("ENG Perf R&D", out)
|
|
|
|
|
|
|
|
def test_openrc_html_evil_shell_escape(self):
|
|
|
|
context = {
|
|
|
|
"user": FakeUser(),
|
|
|
|
"tenant_id": "some-cool-id",
|
|
|
|
"auth_url": "http://tests.com",
|
|
|
|
"tenant_name": 'o"; sudo rm -rf /'}
|
|
|
|
out = loader.render_to_string(
|
|
|
|
'project/api_access/openrc.sh.template',
|
|
|
|
context,
|
|
|
|
template.Context(context))
|
|
|
|
|
|
|
|
self.assertNotIn('o"', out)
|
|
|
|
self.assertIn('\"', out)
|
|
|
|
|
|
|
|
def test_openrc_html_evil_shell_backslash_escape(self):
|
|
|
|
context = {
|
|
|
|
"user": FakeUser(),
|
|
|
|
"tenant_id": "some-cool-id",
|
|
|
|
"auth_url": "http://tests.com",
|
|
|
|
"tenant_name": 'o\"; sudo rm -rf /'}
|
|
|
|
out = loader.render_to_string(
|
|
|
|
'project/api_access/openrc.sh.template',
|
|
|
|
context,
|
|
|
|
template.Context(context))
|
|
|
|
|
|
|
|
self.assertNotIn('o\"', out)
|
|
|
|
self.assertNotIn('o"', out)
|
|
|
|
self.assertIn('\\"', out)
|
|
|
|
|
|
|
|
def test_openrc_set_region(self):
|
|
|
|
context = {
|
|
|
|
"user": FakeUser(),
|
|
|
|
"tenant_id": "some-cool-id",
|
|
|
|
"auth_url": "http://tests.com",
|
|
|
|
"tenant_name": "Tenant",
|
|
|
|
"region": "Colorado"}
|
|
|
|
out = loader.render_to_string(
|
|
|
|
'project/api_access/openrc.sh.template',
|
|
|
|
context,
|
|
|
|
template.Context(context))
|
|
|
|
|
|
|
|
self.assertIn("OS_REGION_NAME=\"Colorado\"", out)
|
|
|
|
|
|
|
|
def test_openrc_region_not_set(self):
|
|
|
|
context = {
|
|
|
|
"user": FakeUser(),
|
|
|
|
"tenant_id": "some-cool-id",
|
|
|
|
"auth_url": "http://tests.com",
|
|
|
|
"tenant_name": "Tenant"}
|
|
|
|
out = loader.render_to_string(
|
|
|
|
'project/api_access/openrc.sh.template',
|
|
|
|
context,
|
|
|
|
template.Context(context))
|
|
|
|
|
|
|
|
self.assertIn("OS_REGION_NAME=\"\"", out)
|
|
|
|
|
|
|
|
def test_clouds_yaml_set_region(self):
|
|
|
|
context = {
|
|
|
|
"cloud_name": "openstack",
|
|
|
|
"user": FakeUser(),
|
|
|
|
"tenant_id": "some-cool-id",
|
|
|
|
"auth_url": "http://example.com",
|
|
|
|
"tenant_name": "Tenant",
|
|
|
|
"region": "Colorado"}
|
2018-02-13 08:09:27 +00:00
|
|
|
out = yaml.safe_load(loader.render_to_string(
|
2017-11-22 14:21:03 +00:00
|
|
|
'project/api_access/clouds.yaml.template',
|
|
|
|
context,
|
|
|
|
template.Context(context)))
|
|
|
|
|
|
|
|
self.assertIn('clouds', out)
|
|
|
|
self.assertIn('openstack', out['clouds'])
|
|
|
|
self.assertNotIn('profile', out['clouds']['openstack'])
|
|
|
|
self.assertEqual(
|
|
|
|
"http://example.com",
|
|
|
|
out['clouds']['openstack']['auth']['auth_url'])
|
|
|
|
self.assertEqual("Colorado", out['clouds']['openstack']['region_name'])
|
|
|
|
self.assertNotIn('regions', out['clouds']['openstack'])
|
|
|
|
|
|
|
|
def test_clouds_yaml_region_not_set(self):
|
|
|
|
context = {
|
|
|
|
"cloud_name": "openstack",
|
|
|
|
"user": FakeUser(),
|
|
|
|
"tenant_id": "some-cool-id",
|
|
|
|
"auth_url": "http://example.com",
|
|
|
|
"tenant_name": "Tenant"}
|
2018-02-13 08:09:27 +00:00
|
|
|
out = yaml.safe_load(loader.render_to_string(
|
2017-11-22 14:21:03 +00:00
|
|
|
'project/api_access/clouds.yaml.template',
|
|
|
|
context,
|
|
|
|
template.Context(context)))
|
|
|
|
|
|
|
|
self.assertIn('clouds', out)
|
|
|
|
self.assertIn('openstack', out['clouds'])
|
|
|
|
self.assertNotIn('profile', out['clouds']['openstack'])
|
|
|
|
self.assertEqual(
|
|
|
|
"http://example.com",
|
|
|
|
out['clouds']['openstack']['auth']['auth_url'])
|
|
|
|
self.assertNotIn('region_name', out['clouds']['openstack'])
|
|
|
|
self.assertNotIn('regions', out['clouds']['openstack'])
|
|
|
|
|
|
|
|
def test_clouds_yaml_regions(self):
|
|
|
|
regions = ['region1', 'region2']
|
|
|
|
context = {
|
|
|
|
"cloud_name": "openstack",
|
|
|
|
"user": FakeUser(),
|
|
|
|
"tenant_id": "some-cool-id",
|
|
|
|
"auth_url": "http://example.com",
|
|
|
|
"tenant_name": "Tenant",
|
|
|
|
"regions": regions}
|
2018-02-13 08:09:27 +00:00
|
|
|
out = yaml.safe_load(loader.render_to_string(
|
2017-11-22 14:21:03 +00:00
|
|
|
'project/api_access/clouds.yaml.template',
|
|
|
|
context,
|
|
|
|
template.Context(context)))
|
|
|
|
|
|
|
|
self.assertIn('clouds', out)
|
|
|
|
self.assertIn('openstack', out['clouds'])
|
|
|
|
self.assertNotIn('profile', out['clouds']['openstack'])
|
|
|
|
self.assertEqual(
|
|
|
|
"http://example.com",
|
|
|
|
out['clouds']['openstack']['auth']['auth_url'])
|
|
|
|
self.assertNotIn('region_name', out['clouds']['openstack'])
|
|
|
|
self.assertIn('regions', out['clouds']['openstack'])
|
|
|
|
self.assertEqual(regions, out['clouds']['openstack']['regions'])
|
|
|
|
|
|
|
|
def test_clouds_yaml_profile(self):
|
|
|
|
regions = ['region1', 'region2']
|
|
|
|
context = {
|
|
|
|
"cloud_name": "openstack",
|
|
|
|
"user": FakeUser(),
|
|
|
|
"profile": "example",
|
|
|
|
"tenant_id": "some-cool-id",
|
|
|
|
"auth_url": "http://example.com",
|
|
|
|
"tenant_name": "Tenant",
|
|
|
|
"regions": regions}
|
2018-02-13 08:09:27 +00:00
|
|
|
out = yaml.safe_load(loader.render_to_string(
|
2017-11-22 14:21:03 +00:00
|
|
|
'project/api_access/clouds.yaml.template',
|
|
|
|
context,
|
|
|
|
template.Context(context)))
|
|
|
|
|
|
|
|
self.assertIn('clouds', out)
|
|
|
|
self.assertIn('openstack', out['clouds'])
|
|
|
|
self.assertIn('profile', out['clouds']['openstack'])
|
|
|
|
self.assertEqual('example', out['clouds']['openstack']['profile'])
|
|
|
|
self.assertNotIn('auth_url', out['clouds']['openstack']['auth'])
|
|
|
|
self.assertNotIn('region_name', out['clouds']['openstack'])
|
|
|
|
self.assertNotIn('regions', out['clouds']['openstack'])
|