Tim Burke a38efb6031 Add v1password keystoneauth plugin
This lets us use Keystone sessions against endpoints like swauth and
tempauth with code like:

    import keystoneauth1.loading
    import keystoneauth1.session
    import swiftclient

    loader = keystoneauth1.loading.get_plugin_loader('v1password')
    auth_plugin = loader.load_from_options(
        auth_url='http://saio:8080/auth/v1.0',
        username='test:tester',
        password='testing')
    keystone_session = keystoneauth1.session.Session(auth_plugin)

    conn = swiftclient.Connection(session=keystone_session)

The plugin includes an optional project_name option, which may be used
to override the swift account from the storage url that was returned.
Additionally, it includes enough infrastructure to support some commands
in python-openstackclient>=3.0:

    export OS_AUTH_TYPE=v1password
    export OS_AUTH_URL=http://saio:8080/auth/v1.0
    export OS_PROJECT_NAME=AUTH_test2
    export OS_USERNAME=test:tester
    export OS_PASSWORD=testing

    openstack token issue
    openstack catalog list
    openstack catalog show object-store
    openstack object store account show
    openstack container list
    openstack container create <container>
    openstack container save <container>
    openstack container show <container>
    openstack container delete <container>
    openstack object list <container>
    openstack object create <container> <file>
    openstack object save <container> <object>
    opsentack object show <container> <object>
    openstack object delete <container> <object>

Change-Id: Ia963dc44415f72a6518227e86d9528a987e07491
2016-10-24 01:52:37 +02:00

247 lines
10 KiB
Python

# Copyright 2016 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
import datetime
import json
import mock
import unittest
from keystoneauth1 import plugin
from keystoneauth1 import loading
from keystoneauth1 import exceptions
from swiftclient import authv1
class TestDataNoAccount(object):
options = dict(
auth_url='http://saio:8080/auth/v1.0',
username='test:tester',
password='testing')
storage_url = 'http://saio:8080/v1/AUTH_test'
expected_endpoint = storage_url
token = 'token'
class TestDataWithAccount(object):
options = dict(
auth_url='http://saio:8080/auth/v1.0',
username='test2:tester2',
project_name='SOME_other_account',
password='testing2')
storage_url = 'http://saio:8080/v1/AUTH_test2'
expected_endpoint = 'http://saio:8080/v1/SOME_other_account'
token = 'other_token'
class TestPluginLoading(TestDataNoAccount, unittest.TestCase):
def test_can_load(self):
loader = loading.get_plugin_loader('v1password')
self.assertIsInstance(loader, authv1.PasswordLoader)
auth_plugin = loader.load_from_options(**self.options)
self.assertIsInstance(auth_plugin, authv1.PasswordPlugin)
self.assertEqual(self.options['auth_url'], auth_plugin.auth_url)
self.assertEqual(self.options['username'], auth_plugin.user)
self.assertEqual(self.options.get('project_name'), auth_plugin.account)
self.assertEqual(self.options['password'], auth_plugin.key)
def test_get_state(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
self.assertIsNone(auth_plugin.get_auth_state())
with mock.patch('swiftclient.authv1.time.time', return_value=1234.56):
auth_plugin.auth_ref = authv1.AccessInfoV1(
self.options['auth_url'],
self.storage_url,
self.options.get('project_name'),
self.options['username'],
self.token,
60)
expected = json.dumps({
'auth_url': self.options['auth_url'],
'username': self.options['username'],
'account': self.options.get('project_name'),
'issued': 1234.56,
'storage_url': self.storage_url,
'auth_token': self.token,
'expires': 1234.56 + 60,
}, sort_keys=True)
self.assertEqual(expected, auth_plugin.auth_ref.get_state())
self.assertEqual(expected, auth_plugin.get_auth_state())
def test_set_state(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
self.assertIsNone(auth_plugin.auth_ref)
auth_plugin.auth_ref = object()
auth_plugin.set_auth_state(None)
self.assertIsNone(auth_plugin.get_auth_state())
state = json.dumps({
'auth_url': self.options['auth_url'],
'username': self.options['username'],
'account': self.options.get('project_name'),
'issued': 1234.56,
'storage_url': self.storage_url,
'auth_token': self.token,
'expires': None,
}, sort_keys=True)
auth_plugin.set_auth_state(state)
self.assertIsInstance(auth_plugin.auth_ref, authv1.AccessInfoV1)
self.assertEqual(self.options['username'],
auth_plugin.auth_ref.username)
self.assertEqual(self.options['auth_url'],
auth_plugin.auth_ref.auth_url)
self.assertEqual(self.storage_url, auth_plugin.auth_ref.storage_url)
self.assertEqual(self.options.get('project_name'), auth_plugin.account)
self.assertEqual(self.token, auth_plugin.auth_ref.auth_token)
self.assertEqual(1234.56, auth_plugin.auth_ref._issued)
self.assertIs(datetime.datetime, type(auth_plugin.auth_ref.issued))
self.assertIsNone(auth_plugin.auth_ref._expires)
self.assertIsNone(auth_plugin.auth_ref.expires)
class TestPluginLoadingWithAccount(TestDataWithAccount, TestPluginLoading):
pass
class TestPlugin(TestDataNoAccount, unittest.TestCase):
def setUp(self):
self.mock_session = mock.MagicMock()
self.mock_response = self.mock_session.get.return_value
self.mock_response.status_code = 200
self.mock_response.headers = {
'X-Auth-Token': self.token,
'X-Storage-Url': self.storage_url,
}
def test_get_access(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
with mock.patch('swiftclient.authv1.time.time', return_value=1234.56):
access = auth_plugin.get_access(self.mock_session)
self.assertEqual(self.mock_session.get.mock_calls, [mock.call(
self.options['auth_url'], authenticated=False, log=False, headers={
'X-Auth-User': self.options['username'],
'X-Auth-Key': self.options['password'],
})])
self.assertEqual(self.options['username'], access.username)
# `openstack token issue` requires a user_id property
self.assertEqual(self.options['username'], access.user_id)
self.assertEqual(self.storage_url, access.storage_url)
self.assertEqual(self.token, access.auth_token)
self.assertEqual(1234.56, access._issued)
self.assertIs(datetime.datetime, type(auth_plugin.auth_ref.issued))
self.assertIsNone(access.expires)
# `openstack catalog list/show` require a catalog property
catalog = access.service_catalog.catalog
self.assertEqual('swift', catalog[0].get('name'))
self.assertEqual('object-store', catalog[0].get('type'))
self.assertIn('endpoints', catalog[0])
self.assertIn(self.storage_url, [
e.get('publicURL') for e in catalog[0]['endpoints']])
def test_get_access_with_expiry(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
self.mock_response.headers['X-Auth-Token-Expires'] = '78.9'
with mock.patch('swiftclient.authv1.time.time',
return_value=1234.56) as mock_time:
access = auth_plugin.get_access(self.mock_session)
self.assertEqual(1234.56 + 78.9, access._expires)
self.assertIs(datetime.datetime,
type(auth_plugin.auth_ref.expires))
self.assertIs(True, access.will_expire_soon(90))
self.assertIs(False, access.will_expire_soon(60))
self.assertEqual(3, len(mock_time.mock_calls))
def test_get_access_bad_expiry(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
self.mock_response.headers['X-Auth-Token-Expires'] = 'foo'
access = auth_plugin.get_access(self.mock_session)
self.assertEqual(None, access.expires)
self.assertIs(False, access.will_expire_soon(60))
self.assertIs(False, access.will_expire_soon(1e20))
def test_get_access_bad_status(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
self.mock_response.status_code = 401
self.assertRaises(exceptions.InvalidResponse,
auth_plugin.get_access, self.mock_session)
def test_get_access_missing_token(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
self.mock_response.headers.pop('X-Auth-Token')
self.assertRaises(exceptions.InvalidResponse,
auth_plugin.get_access, self.mock_session)
def test_get_access_accepts_storage_token(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
self.mock_response.headers.pop('X-Auth-Token')
self.mock_response.headers['X-Storage-Token'] = 'yet another token'
access = auth_plugin.get_access(self.mock_session)
self.assertEqual('yet another token', access.auth_token)
def test_get_access_missing_url(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
self.mock_response.headers.pop('X-Storage-Url')
self.assertRaises(exceptions.InvalidResponse,
auth_plugin.get_access, self.mock_session)
def test_get_endpoint(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
object_store_endpoint = auth_plugin.get_endpoint(
self.mock_session, service_type='object-store')
self.assertEqual(object_store_endpoint, self.expected_endpoint)
auth_endpoint = auth_plugin.get_endpoint(
self.mock_session, interface=plugin.AUTH_INTERFACE)
self.assertEqual(auth_endpoint, self.options['auth_url'])
with self.assertRaises(exceptions.EndpointNotFound) as exc_mgr:
auth_plugin.get_endpoint(self.mock_session)
self.assertEqual('public endpoint for None service not found',
str(exc_mgr.exception))
with self.assertRaises(exceptions.EndpointNotFound) as exc_mgr:
auth_plugin.get_endpoint(
self.mock_session, service_type='identity', region_name='DFW')
self.assertEqual(
'public endpoint for identity service in DFW region not found',
str(exc_mgr.exception))
with self.assertRaises(exceptions.EndpointNotFound) as exc_mgr:
auth_plugin.get_endpoint(
self.mock_session, service_type='image', service_name='glance')
self.assertEqual(
'public endpoint for image service named glance not found',
str(exc_mgr.exception))
with self.assertRaises(exceptions.EndpointNotFound) as exc_mgr:
auth_plugin.get_endpoint(
self.mock_session, service_type='compute', service_name='nova',
region_name='IAD')
self.assertEqual('public endpoint for compute service named nova in '
'IAD region not found', str(exc_mgr.exception))
class TestPluginWithAccount(TestDataWithAccount, TestPlugin):
pass