python-swiftclient/test/unit/test_authv1.py

256 lines
10 KiB
Python

# Copyright 2016 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
import datetime
import json
import unittest
from unittest import mock
try:
from keystoneauth1 import plugin
from keystoneauth1 import loading
from keystoneauth1 import exceptions
from swiftclient import authv1
except ImportError:
plugin = loading = exceptions = authv1 = None
class TestDataNoAccount:
options = dict(
auth_url='http://saio:8080/auth/v1.0',
username='test:tester',
password='testing')
storage_url = 'http://saio:8080/v1/AUTH_test'
expected_endpoint = storage_url
token = 'token'
class TestDataWithAccount:
options = dict(
auth_url='http://saio:8080/auth/v1.0',
username='test2:tester2',
project_name='SOME_other_account',
password='testing2')
storage_url = 'http://saio:8080/v1/AUTH_test2'
expected_endpoint = 'http://saio:8080/v1/SOME_other_account'
token = 'other_token'
class TestPluginLoading(TestDataNoAccount, unittest.TestCase):
def setUp(self):
if authv1 is None:
raise unittest.SkipTest('keystoneauth1 is not available')
def test_can_load(self):
loader = loading.get_plugin_loader('v1password')
self.assertIsInstance(loader, authv1.PasswordLoader)
auth_plugin = loader.load_from_options(**self.options)
self.assertIsInstance(auth_plugin, authv1.PasswordPlugin)
self.assertEqual(self.options['auth_url'], auth_plugin.auth_url)
self.assertEqual(self.options['username'], auth_plugin.user)
self.assertEqual(self.options.get('project_name'), auth_plugin.account)
self.assertEqual(self.options['password'], auth_plugin.key)
def test_get_state(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
self.assertIsNone(auth_plugin.get_auth_state())
with mock.patch('swiftclient.authv1.time.time', return_value=1234.56):
auth_plugin.auth_ref = authv1.AccessInfoV1(
self.options['auth_url'],
self.storage_url,
self.options.get('project_name'),
self.options['username'],
self.token,
60)
expected = json.dumps({
'auth_url': self.options['auth_url'],
'username': self.options['username'],
'account': self.options.get('project_name'),
'issued': 1234.56,
'storage_url': self.storage_url,
'auth_token': self.token,
'expires': 1234.56 + 60,
}, sort_keys=True)
self.assertEqual(expected, auth_plugin.auth_ref.get_state())
self.assertEqual(expected, auth_plugin.get_auth_state())
def test_set_state(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
self.assertIsNone(auth_plugin.auth_ref)
auth_plugin.auth_ref = object()
auth_plugin.set_auth_state(None)
self.assertIsNone(auth_plugin.get_auth_state())
state = json.dumps({
'auth_url': self.options['auth_url'],
'username': self.options['username'],
'account': self.options.get('project_name'),
'issued': 1234.56,
'storage_url': self.storage_url,
'auth_token': self.token,
'expires': None,
}, sort_keys=True)
auth_plugin.set_auth_state(state)
self.assertIsInstance(auth_plugin.auth_ref, authv1.AccessInfoV1)
self.assertEqual(self.options['username'],
auth_plugin.auth_ref.username)
self.assertEqual(self.options['auth_url'],
auth_plugin.auth_ref.auth_url)
self.assertEqual(self.storage_url, auth_plugin.auth_ref.storage_url)
self.assertEqual(self.options.get('project_name'), auth_plugin.account)
self.assertEqual(self.token, auth_plugin.auth_ref.auth_token)
self.assertEqual(1234.56, auth_plugin.auth_ref._issued)
self.assertIs(datetime.datetime, type(auth_plugin.auth_ref.issued))
self.assertIsNone(auth_plugin.auth_ref._expires)
self.assertIsNone(auth_plugin.auth_ref.expires)
class TestPluginLoadingWithAccount(TestDataWithAccount, TestPluginLoading):
pass
class TestPlugin(TestDataNoAccount, unittest.TestCase):
def setUp(self):
if authv1 is None:
raise unittest.SkipTest('keystoneauth1 is not available')
self.mock_session = mock.MagicMock()
self.mock_response = self.mock_session.get.return_value
self.mock_response.status_code = 200
self.mock_response.headers = {
'X-Auth-Token': self.token,
'X-Storage-Url': self.storage_url,
}
def test_get_access(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
with mock.patch('swiftclient.authv1.time.time', return_value=1234.56):
access = auth_plugin.get_access(self.mock_session)
self.assertEqual(self.mock_session.get.mock_calls, [mock.call(
self.options['auth_url'], authenticated=False, log=False, headers={
'X-Auth-User': self.options['username'],
'X-Auth-Key': self.options['password'],
})])
self.assertEqual(self.options['username'], access.username)
# `openstack token issue` requires a user_id property
self.assertEqual(self.options['username'], access.user_id)
self.assertEqual(self.storage_url, access.storage_url)
self.assertEqual(self.token, access.auth_token)
self.assertEqual(1234.56, access._issued)
self.assertIs(datetime.datetime, type(auth_plugin.auth_ref.issued))
self.assertIsNone(access.expires)
# `openstack catalog list/show` require a catalog property
catalog = access.service_catalog.catalog
self.assertEqual('swift', catalog[0].get('name'))
self.assertEqual('object-store', catalog[0].get('type'))
self.assertIn('endpoints', catalog[0])
self.assertIn(self.storage_url, [
e.get('publicURL') for e in catalog[0]['endpoints']])
def test_get_access_with_expiry(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
self.mock_response.headers['X-Auth-Token-Expires'] = '78.9'
with mock.patch('swiftclient.authv1.time.time',
return_value=1234.56) as mock_time:
access = auth_plugin.get_access(self.mock_session)
self.assertEqual(1234.56 + 78.9, access._expires)
self.assertIs(datetime.datetime,
type(auth_plugin.auth_ref.expires))
self.assertIs(True, access.will_expire_soon(90))
self.assertIs(False, access.will_expire_soon(60))
self.assertEqual(3, len(mock_time.mock_calls))
def test_get_access_bad_expiry(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
self.mock_response.headers['X-Auth-Token-Expires'] = 'foo'
access = auth_plugin.get_access(self.mock_session)
self.assertIsNone(access.expires)
self.assertIs(False, access.will_expire_soon(60))
self.assertIs(False, access.will_expire_soon(1e20))
def test_get_access_bad_status(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
self.mock_response.status_code = 401
self.assertRaises(exceptions.InvalidResponse,
auth_plugin.get_access, self.mock_session)
def test_get_access_missing_token(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
self.mock_response.headers.pop('X-Auth-Token')
self.assertRaises(exceptions.InvalidResponse,
auth_plugin.get_access, self.mock_session)
def test_get_access_accepts_storage_token(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
self.mock_response.headers.pop('X-Auth-Token')
self.mock_response.headers['X-Storage-Token'] = 'yet another token'
access = auth_plugin.get_access(self.mock_session)
self.assertEqual('yet another token', access.auth_token)
def test_get_access_missing_url(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
self.mock_response.headers.pop('X-Storage-Url')
self.assertRaises(exceptions.InvalidResponse,
auth_plugin.get_access, self.mock_session)
def test_get_endpoint(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
object_store_endpoint = auth_plugin.get_endpoint(
self.mock_session, service_type='object-store')
self.assertEqual(object_store_endpoint, self.expected_endpoint)
auth_endpoint = auth_plugin.get_endpoint(
self.mock_session, interface=plugin.AUTH_INTERFACE)
self.assertEqual(auth_endpoint, self.options['auth_url'])
with self.assertRaises(exceptions.EndpointNotFound) as exc_mgr:
auth_plugin.get_endpoint(self.mock_session)
self.assertEqual('public endpoint for None service not found',
str(exc_mgr.exception))
with self.assertRaises(exceptions.EndpointNotFound) as exc_mgr:
auth_plugin.get_endpoint(
self.mock_session, service_type='identity', region_name='DFW')
self.assertEqual(
'public endpoint for identity service in DFW region not found',
str(exc_mgr.exception))
with self.assertRaises(exceptions.EndpointNotFound) as exc_mgr:
auth_plugin.get_endpoint(
self.mock_session, service_type='image', service_name='glance')
self.assertEqual(
'public endpoint for image service named glance not found',
str(exc_mgr.exception))
with self.assertRaises(exceptions.EndpointNotFound) as exc_mgr:
auth_plugin.get_endpoint(
self.mock_session, service_type='compute', service_name='nova',
region_name='IAD')
self.assertEqual('public endpoint for compute service named nova in '
'IAD region not found', str(exc_mgr.exception))
class TestPluginWithAccount(TestDataWithAccount, TestPlugin):
pass