Improve authentication plugins management.
The current auth plugin system lacks some functionality to be used with other methods that might require additional configuration options or that do not require a user to pass some options that are now compulsory (for example, X.509 authentication needs to get a certificate file, and does not need either a username or a password). This commit extends the current system to handle these extra features, while remaining compatible with older plugins. DocImpact: We should documment how to implement additional authentication plugins, such as BasicAuth, X509, etc. Implements: blueprint authentication-plugins Change-Id: I7b0ef4981efba8160dea94bf852dba7e2e4068f5
This commit is contained in:
parent
593adf229a
commit
abd75f24b1
141
novaclient/auth_plugin.py
Normal file
141
novaclient/auth_plugin.py
Normal file
@ -0,0 +1,141 @@
|
||||
# Copyright 2013 OpenStack Foundation
|
||||
# Copyright 2013 Spanish National Research Council.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import pkg_resources
|
||||
|
||||
from novaclient import exceptions
|
||||
from novaclient import utils
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
_discovered_plugins = {}
|
||||
|
||||
|
||||
def discover_auth_systems():
|
||||
"""Discover the available auth-systems.
|
||||
|
||||
This won't take into account the old style auth-systems.
|
||||
"""
|
||||
ep_name = 'openstack.client.auth_plugin'
|
||||
for ep in pkg_resources.iter_entry_points(ep_name):
|
||||
try:
|
||||
auth_plugin = ep.load()
|
||||
except (ImportError, pkg_resources.UnknownExtra, AttributeError) as e:
|
||||
logger.debug("ERROR: Cannot load auth plugin %s" % ep.name)
|
||||
logger.debug(e, exc_info=1)
|
||||
else:
|
||||
_discovered_plugins[ep.name] = auth_plugin
|
||||
|
||||
|
||||
def load_auth_system_opts(parser):
|
||||
"""Load options needed by the available auth-systems into a parser.
|
||||
|
||||
This function will try to populate the parser with options from the
|
||||
available plugins.
|
||||
"""
|
||||
for name, auth_plugin in _discovered_plugins.iteritems():
|
||||
add_opts_fn = getattr(auth_plugin, "add_opts", None)
|
||||
if add_opts_fn:
|
||||
group = parser.add_argument_group("Auth-system '%s' options" %
|
||||
name)
|
||||
add_opts_fn(group)
|
||||
|
||||
|
||||
def load_plugin(auth_system):
|
||||
if auth_system in _discovered_plugins:
|
||||
return _discovered_plugins[auth_system]()
|
||||
|
||||
# NOTE(aloga): If we arrive here, the plugin will be an old-style one,
|
||||
# so we have to create a fake AuthPlugin for it.
|
||||
return DeprecatedAuthPlugin(auth_system)
|
||||
|
||||
|
||||
class BaseAuthPlugin(object):
|
||||
"""Base class for authentication plugins.
|
||||
|
||||
An authentication plugin needs to override at least the authenticate
|
||||
method to be a valid plugin.
|
||||
"""
|
||||
def __init__(self):
|
||||
self.opts = {}
|
||||
|
||||
def get_auth_url(self):
|
||||
"""Return the auth url for the plugin (if any)."""
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def add_opts(parser):
|
||||
"""Populate and return the parser with the options for this plugin.
|
||||
|
||||
If the plugin does not need any options, it should return the same
|
||||
parser untouched.
|
||||
"""
|
||||
return parser
|
||||
|
||||
def parse_opts(self, args):
|
||||
"""Parse the actual auth-system options if any.
|
||||
|
||||
This method is expected to populate the attribute self.opts with a
|
||||
dict containing the options and values needed to make authentication.
|
||||
If the dict is empty, the client should assume that it needs the same
|
||||
options as the 'keystone' auth system (i.e. os_username and
|
||||
os_password).
|
||||
|
||||
Returns the self.opts dict.
|
||||
"""
|
||||
return self.opts
|
||||
|
||||
def authenticate(self, cls, auth_url):
|
||||
"""Authenticate using plugin defined method."""
|
||||
raise exceptions.AuthSystemNotFound(self.auth_system)
|
||||
|
||||
|
||||
class DeprecatedAuthPlugin(object):
|
||||
"""Class to mimic the AuthPlugin class for deprecated auth systems.
|
||||
|
||||
Old auth systems only define two entry points: openstack.client.auth_url
|
||||
and openstack.client.authenticate. This class will load those entry points
|
||||
into a class similar to a valid AuthPlugin.
|
||||
"""
|
||||
def __init__(self, auth_system):
|
||||
self.auth_system = auth_system
|
||||
|
||||
def authenticate(cls, auth_url):
|
||||
raise exceptions.AuthSystemNotFound(self.auth_system)
|
||||
|
||||
self.opts = {}
|
||||
|
||||
self.get_auth_url = lambda: None
|
||||
self.authenticate = authenticate
|
||||
|
||||
self._load_endpoints()
|
||||
|
||||
def _load_endpoints(self):
|
||||
ep_name = 'openstack.client.auth_url'
|
||||
fn = utils._load_entry_point(ep_name, name=self.auth_system)
|
||||
if fn:
|
||||
self.get_auth_url = fn
|
||||
|
||||
ep_name = 'openstack.client.authenticate'
|
||||
fn = utils._load_entry_point(ep_name, name=self.auth_system)
|
||||
if fn:
|
||||
self.authenticate = fn
|
||||
|
||||
def parse_opts(self, args):
|
||||
return self.opts
|
@ -31,15 +31,6 @@ from novaclient import service_catalog
|
||||
from novaclient import utils
|
||||
|
||||
|
||||
def get_auth_system_url(auth_system):
|
||||
"""Load plugin-based auth_url"""
|
||||
ep_name = 'openstack.client.auth_url'
|
||||
for ep in pkg_resources.iter_entry_points(ep_name):
|
||||
if ep.name == auth_system:
|
||||
return ep.load()()
|
||||
raise exceptions.AuthSystemNotFound(auth_system)
|
||||
|
||||
|
||||
class HTTPClient(object):
|
||||
|
||||
USER_AGENT = 'python-novaclient'
|
||||
@ -52,12 +43,17 @@ class HTTPClient(object):
|
||||
timings=False, bypass_url=None,
|
||||
os_cache=False, no_cache=True,
|
||||
http_log_debug=False, auth_system='keystone',
|
||||
auth_plugin=None,
|
||||
cacert=None):
|
||||
self.user = user
|
||||
self.password = password
|
||||
self.projectid = projectid
|
||||
|
||||
if auth_system and auth_system != 'keystone' and not auth_plugin:
|
||||
raise exceptions.AuthSystemNotFound(auth_system)
|
||||
|
||||
if not auth_url and auth_system and auth_system != 'keystone':
|
||||
auth_url = get_auth_system_url(auth_system)
|
||||
auth_url = auth_plugin.get_auth_url()
|
||||
if not auth_url:
|
||||
raise exceptions.EndpointNotFound()
|
||||
self.auth_url = auth_url.rstrip('/')
|
||||
@ -94,6 +90,7 @@ class HTTPClient(object):
|
||||
self.verify_cert = True
|
||||
|
||||
self.auth_system = auth_system
|
||||
self.auth_plugin = auth_plugin
|
||||
|
||||
self._logger = logging.getLogger(__name__)
|
||||
if self.http_log_debug:
|
||||
@ -392,12 +389,7 @@ class HTTPClient(object):
|
||||
raise exceptions.from_response(resp, body, url)
|
||||
|
||||
def _plugin_auth(self, auth_url):
|
||||
"""Load plugin-based authentication"""
|
||||
ep_name = 'openstack.client.authenticate'
|
||||
for ep in pkg_resources.iter_entry_points(ep_name):
|
||||
if ep.name == self.auth_system:
|
||||
return ep.load()(self, auth_url)
|
||||
raise exceptions.AuthSystemNotFound(self.auth_system)
|
||||
self.auth_plugin.authenticate(self, auth_url)
|
||||
|
||||
def _v2_auth(self, url):
|
||||
"""Authenticate against a v2.0 auth service."""
|
||||
@ -414,7 +406,7 @@ class HTTPClient(object):
|
||||
|
||||
self._authenticate(url, body)
|
||||
|
||||
def _authenticate(self, url, body):
|
||||
def _authenticate(self, url, body, **kwargs):
|
||||
"""Authenticate and extract the service catalog."""
|
||||
token_url = url + "/tokens"
|
||||
|
||||
@ -423,7 +415,8 @@ class HTTPClient(object):
|
||||
token_url,
|
||||
"POST",
|
||||
body=body,
|
||||
allow_redirects=True)
|
||||
allow_redirects=True,
|
||||
**kwargs)
|
||||
|
||||
return self._extract_service_catalog(url, resp, body)
|
||||
|
||||
|
@ -46,6 +46,7 @@ except ImportError:
|
||||
pass
|
||||
|
||||
import novaclient
|
||||
import novaclient.auth_plugin
|
||||
from novaclient import client
|
||||
from novaclient import exceptions as exc
|
||||
import novaclient.extension
|
||||
@ -398,6 +399,9 @@ class OpenStackComputeShell(object):
|
||||
parser.add_argument('--bypass_url',
|
||||
help=argparse.SUPPRESS)
|
||||
|
||||
# The auth-system-plugins might require some extra options
|
||||
novaclient.auth_plugin.load_auth_system_opts(parser)
|
||||
|
||||
return parser
|
||||
|
||||
def get_subcommand_parser(self, version):
|
||||
@ -514,11 +518,15 @@ class OpenStackComputeShell(object):
|
||||
format=streamformat)
|
||||
|
||||
def main(self, argv):
|
||||
|
||||
# Parse args once to find version and debug settings
|
||||
parser = self.get_base_parser()
|
||||
(options, args) = parser.parse_known_args(argv)
|
||||
self.setup_debugging(options.debug)
|
||||
|
||||
# Discover available auth plugins
|
||||
novaclient.auth_plugin.discover_auth_systems()
|
||||
|
||||
# build available subcommands based on version
|
||||
self.extensions = self._discover_extensions(
|
||||
options.os_compute_api_version)
|
||||
@ -566,6 +574,11 @@ class OpenStackComputeShell(object):
|
||||
args.bypass_url, args.os_cache,
|
||||
args.os_cacert, args.timeout)
|
||||
|
||||
if os_auth_system and os_auth_system != "keystone":
|
||||
auth_plugin = novaclient.auth_plugin.load_plugin(os_auth_system)
|
||||
else:
|
||||
auth_plugin = None
|
||||
|
||||
# Fetched and set later as needed
|
||||
os_password = None
|
||||
|
||||
@ -579,12 +592,16 @@ class OpenStackComputeShell(object):
|
||||
#FIXME(usrleon): Here should be restrict for project id same as
|
||||
# for os_username or os_password but for compatibility it is not.
|
||||
if not utils.isunauthenticated(args.func):
|
||||
if not os_username:
|
||||
if not username:
|
||||
raise exc.CommandError("You must provide a username "
|
||||
"via either --os-username or env[OS_USERNAME]")
|
||||
else:
|
||||
os_username = username
|
||||
if auth_plugin:
|
||||
auth_plugin.parse_opts(args)
|
||||
|
||||
if not auth_plugin or not auth_plugin.opts:
|
||||
if not os_username:
|
||||
if not username:
|
||||
raise exc.CommandError("You must provide a username "
|
||||
"via either --os-username or env[OS_USERNAME]")
|
||||
else:
|
||||
os_username = username
|
||||
|
||||
if not os_tenant_name:
|
||||
if not projectid:
|
||||
@ -597,8 +614,7 @@ class OpenStackComputeShell(object):
|
||||
if not os_auth_url:
|
||||
if not url:
|
||||
if os_auth_system and os_auth_system != 'keystone':
|
||||
os_auth_url = \
|
||||
client.get_auth_system_url(os_auth_system)
|
||||
os_auth_url = auth_plugin.get_auth_url()
|
||||
else:
|
||||
os_auth_url = url
|
||||
|
||||
@ -627,6 +643,7 @@ class OpenStackComputeShell(object):
|
||||
region_name=os_region_name, endpoint_type=endpoint_type,
|
||||
extensions=self.extensions, service_type=service_type,
|
||||
service_name=service_name, auth_system=os_auth_system,
|
||||
auth_plugin=auth_plugin,
|
||||
volume_service_name=volume_service_name,
|
||||
timings=args.timings, bypass_url=bypass_url,
|
||||
os_cache=os_cache, http_log_debug=options.debug,
|
||||
@ -636,7 +653,12 @@ class OpenStackComputeShell(object):
|
||||
# identifying keyring key can come from the underlying client
|
||||
if not utils.isunauthenticated(args.func):
|
||||
helper = SecretsHelper(args, self.cs.client)
|
||||
use_pw = True
|
||||
if (auth_plugin and auth_plugin.opts and
|
||||
"os_password" not in auth_plugin.opts):
|
||||
use_pw = False
|
||||
else:
|
||||
use_pw = True
|
||||
|
||||
tenant_id, auth_token, management_url = (helper.tenant_id,
|
||||
helper.auth_token,
|
||||
helper.management_url)
|
||||
|
@ -1,4 +1,5 @@
|
||||
import os
|
||||
import pkg_resources
|
||||
import re
|
||||
import sys
|
||||
import textwrap
|
||||
@ -369,3 +370,12 @@ def check_uuid_like(val):
|
||||
raise exceptions.CommandError(
|
||||
"error: Invalid tenant-id %s supplied"
|
||||
% val)
|
||||
|
||||
|
||||
def _load_entry_point(ep_name, name=None):
|
||||
"""Try to load the entry point ep_name that matches name."""
|
||||
for ep in pkg_resources.iter_entry_points(ep_name, name=name):
|
||||
try:
|
||||
return ep.load()
|
||||
except (ImportError, pkg_resources.UnknownExtra, AttributeError):
|
||||
continue
|
||||
|
@ -74,6 +74,7 @@ class Client(object):
|
||||
volume_service_name=None, timings=False,
|
||||
bypass_url=None, os_cache=False, no_cache=True,
|
||||
http_log_debug=False, auth_system='keystone',
|
||||
auth_plugin=None,
|
||||
cacert=None):
|
||||
# FIXME(comstud): Rename the api_key argument above when we
|
||||
# know it's not being used as keyword argument
|
||||
@ -132,6 +133,7 @@ class Client(object):
|
||||
insecure=insecure,
|
||||
timeout=timeout,
|
||||
auth_system=auth_system,
|
||||
auth_plugin=auth_plugin,
|
||||
proxy_token=proxy_token,
|
||||
proxy_tenant_id=proxy_tenant_id,
|
||||
region_name=region_name,
|
||||
|
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import argparse
|
||||
import mock
|
||||
import pkg_resources
|
||||
import requests
|
||||
@ -22,6 +23,7 @@ try:
|
||||
except ImportError:
|
||||
import simplejson as json
|
||||
|
||||
from novaclient import auth_plugin
|
||||
from novaclient import exceptions
|
||||
from novaclient.v1_1 import client
|
||||
from tests import utils
|
||||
@ -71,7 +73,7 @@ def requested_headers(cs):
|
||||
}
|
||||
|
||||
|
||||
class AuthPluginTest(utils.TestCase):
|
||||
class DeprecatedAuthPluginTest(utils.TestCase):
|
||||
def test_auth_system_success(self):
|
||||
class MockEntrypoint(pkg_resources.EntryPoint):
|
||||
def load(self):
|
||||
@ -80,9 +82,11 @@ class AuthPluginTest(utils.TestCase):
|
||||
def authenticate(self, cls, auth_url):
|
||||
cls._authenticate(auth_url, {"fake": "me"})
|
||||
|
||||
def mock_iter_entry_points(_type):
|
||||
def mock_iter_entry_points(_type, name):
|
||||
if _type == 'openstack.client.authenticate':
|
||||
return [MockEntrypoint("fake", "fake", ["fake"])]
|
||||
else:
|
||||
return []
|
||||
|
||||
mock_request = mock_http_request()
|
||||
|
||||
@ -90,8 +94,10 @@ class AuthPluginTest(utils.TestCase):
|
||||
mock_iter_entry_points)
|
||||
@mock.patch.object(requests, "request", mock_request)
|
||||
def test_auth_call():
|
||||
plugin = auth_plugin.DeprecatedAuthPlugin("fake")
|
||||
cs = client.Client("username", "password", "project_id",
|
||||
"auth_url/v2.0", auth_system="fake")
|
||||
"auth_url/v2.0", auth_system="fake",
|
||||
auth_plugin=plugin)
|
||||
cs.client.authenticate()
|
||||
|
||||
headers = requested_headers(cs)
|
||||
@ -108,7 +114,7 @@ class AuthPluginTest(utils.TestCase):
|
||||
test_auth_call()
|
||||
|
||||
def test_auth_system_not_exists(self):
|
||||
def mock_iter_entry_points(_t):
|
||||
def mock_iter_entry_points(_t, name=None):
|
||||
return [pkg_resources.EntryPoint("fake", "fake", ["fake"])]
|
||||
|
||||
mock_request = mock_http_request()
|
||||
@ -117,8 +123,11 @@ class AuthPluginTest(utils.TestCase):
|
||||
mock_iter_entry_points)
|
||||
@mock.patch.object(requests, "request", mock_request)
|
||||
def test_auth_call():
|
||||
auth_plugin.discover_auth_systems()
|
||||
plugin = auth_plugin.DeprecatedAuthPlugin("notexists")
|
||||
cs = client.Client("username", "password", "project_id",
|
||||
"auth_url/v2.0", auth_system="notexists")
|
||||
"auth_url/v2.0", auth_system="notexists",
|
||||
auth_plugin=plugin)
|
||||
self.assertRaises(exceptions.AuthSystemNotFound,
|
||||
cs.client.authenticate)
|
||||
|
||||
@ -139,29 +148,35 @@ class AuthPluginTest(utils.TestCase):
|
||||
def authenticate(self, cls, auth_url):
|
||||
cls._authenticate(auth_url, {"fake": "me"})
|
||||
|
||||
def mock_iter_entry_points(_type):
|
||||
def mock_iter_entry_points(_type, name):
|
||||
if _type == 'openstack.client.auth_url':
|
||||
return [MockAuthUrlEntrypoint("fakewithauthurl",
|
||||
"fakewithauthurl.plugin",
|
||||
"fakewithauthurl",
|
||||
["auth_url"])]
|
||||
elif _type == 'openstack.client.authenticate':
|
||||
return [MockAuthenticateEntrypoint("fakewithauthurl",
|
||||
"fakewithauthurl.plugin",
|
||||
["auth_url"])]
|
||||
"fakewithauthurl",
|
||||
["authenticate"])]
|
||||
else:
|
||||
return []
|
||||
|
||||
mock_request = mock_http_request()
|
||||
|
||||
@mock.patch.object(pkg_resources, "iter_entry_points",
|
||||
mock_iter_entry_points)
|
||||
@mock.patch.object(requests, "request", mock_request)
|
||||
def test_auth_call():
|
||||
plugin = auth_plugin.DeprecatedAuthPlugin("fakewithauthurl")
|
||||
cs = client.Client("username", "password", "project_id",
|
||||
auth_system="fakewithauthurl")
|
||||
auth_system="fakewithauthurl",
|
||||
auth_plugin=plugin)
|
||||
cs.client.authenticate()
|
||||
self.assertEquals(cs.client.auth_url, "http://faked/v2.0")
|
||||
|
||||
test_auth_call()
|
||||
|
||||
def test_auth_system_raises_exception_when_missing_auth_url(self):
|
||||
@mock.patch.object(pkg_resources, "iter_entry_points")
|
||||
def test_client_raises_exc_without_auth_url(self, mock_iter_entry_points):
|
||||
class MockAuthUrlEntrypoint(pkg_resources.EntryPoint):
|
||||
def load(self):
|
||||
return self.auth_url
|
||||
@ -169,17 +184,163 @@ class AuthPluginTest(utils.TestCase):
|
||||
def auth_url(self):
|
||||
return None
|
||||
|
||||
def mock_iter_entry_points(_type):
|
||||
return [MockAuthUrlEntrypoint("fakewithauthurl",
|
||||
"fakewithauthurl.plugin",
|
||||
["auth_url"])]
|
||||
mock_iter_entry_points.side_effect = lambda _t, name: [
|
||||
MockAuthUrlEntrypoint("fakewithauthurl",
|
||||
"fakewithauthurl",
|
||||
["auth_url"])]
|
||||
|
||||
@mock.patch.object(pkg_resources, "iter_entry_points",
|
||||
mock_iter_entry_points)
|
||||
def test_auth_call():
|
||||
self.assertRaises(
|
||||
plugin = auth_plugin.DeprecatedAuthPlugin("fakewithauthurl")
|
||||
self.assertRaises(
|
||||
exceptions.EndpointNotFound,
|
||||
client.Client, "username", "password", "project_id",
|
||||
auth_system="fakewithauthurl", auth_plugin=plugin)
|
||||
|
||||
|
||||
class AuthPluginTest(utils.TestCase):
|
||||
@mock.patch.object(requests, "request")
|
||||
@mock.patch.object(pkg_resources, "iter_entry_points")
|
||||
def test_auth_system_success(self, mock_iter_entry_points, mock_request):
|
||||
"""Test that we can authenticate using the auth system."""
|
||||
class MockEntrypoint(pkg_resources.EntryPoint):
|
||||
def load(self):
|
||||
return FakePlugin
|
||||
|
||||
class FakePlugin(auth_plugin.BaseAuthPlugin):
|
||||
def authenticate(self, cls, auth_url):
|
||||
cls._authenticate(auth_url, {"fake": "me"})
|
||||
|
||||
mock_iter_entry_points.side_effect = lambda _t: [
|
||||
MockEntrypoint("fake", "fake", ["FakePlugin"])]
|
||||
|
||||
mock_request.side_effect = mock_http_request()
|
||||
|
||||
auth_plugin.discover_auth_systems()
|
||||
plugin = auth_plugin.load_plugin("fake")
|
||||
cs = client.Client("username", "password", "project_id",
|
||||
"auth_url/v2.0", auth_system="fake",
|
||||
auth_plugin=plugin)
|
||||
cs.client.authenticate()
|
||||
|
||||
headers = requested_headers(cs)
|
||||
token_url = cs.client.auth_url + "/tokens"
|
||||
|
||||
mock_request.assert_called_with(
|
||||
"POST",
|
||||
token_url,
|
||||
headers=headers,
|
||||
data='{"fake": "me"}',
|
||||
allow_redirects=True,
|
||||
**self.TEST_REQUEST_BASE)
|
||||
|
||||
@mock.patch.object(pkg_resources, "iter_entry_points")
|
||||
def test_discover_auth_system_options(self, mock_iter_entry_points):
|
||||
"""Test that we can load the auth system options."""
|
||||
class FakePlugin(auth_plugin.BaseAuthPlugin):
|
||||
@staticmethod
|
||||
def add_opts(parser):
|
||||
parser.add_argument('--auth_system_opt',
|
||||
default=False,
|
||||
action='store_true',
|
||||
help="Fake option")
|
||||
return parser
|
||||
|
||||
class MockEntrypoint(pkg_resources.EntryPoint):
|
||||
def load(self):
|
||||
return FakePlugin
|
||||
|
||||
mock_iter_entry_points.side_effect = lambda _t: [
|
||||
MockEntrypoint("fake", "fake", ["FakePlugin"])]
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
auth_plugin.discover_auth_systems()
|
||||
auth_plugin.load_auth_system_opts(parser)
|
||||
opts, args = parser.parse_known_args(['--auth_system_opt'])
|
||||
|
||||
self.assertTrue(opts.auth_system_opt)
|
||||
|
||||
@mock.patch.object(pkg_resources, "iter_entry_points")
|
||||
def test_parse_auth_system_options(self, mock_iter_entry_points):
|
||||
"""Test that we can parse the auth system options."""
|
||||
class MockEntrypoint(pkg_resources.EntryPoint):
|
||||
def load(self):
|
||||
return FakePlugin
|
||||
|
||||
class FakePlugin(auth_plugin.BaseAuthPlugin):
|
||||
def __init__(self):
|
||||
self.opts = {"fake_argument": True}
|
||||
|
||||
def parse_opts(self, args):
|
||||
return self.opts
|
||||
|
||||
mock_iter_entry_points.side_effect = lambda _t: [
|
||||
MockEntrypoint("fake", "fake", ["FakePlugin"])]
|
||||
|
||||
auth_plugin.discover_auth_systems()
|
||||
plugin = auth_plugin.load_plugin("fake")
|
||||
|
||||
plugin.parse_opts([])
|
||||
self.assertIn("fake_argument", plugin.opts)
|
||||
|
||||
@mock.patch.object(pkg_resources, "iter_entry_points")
|
||||
def test_auth_system_defining_url(self, mock_iter_entry_points):
|
||||
"""Test the auth_system defining an url."""
|
||||
class MockEntrypoint(pkg_resources.EntryPoint):
|
||||
def load(self):
|
||||
return FakePlugin
|
||||
|
||||
class FakePlugin(auth_plugin.BaseAuthPlugin):
|
||||
def get_auth_url(self):
|
||||
return "http://faked/v2.0"
|
||||
|
||||
mock_iter_entry_points.side_effect = lambda _t: [
|
||||
MockEntrypoint("fake", "fake", ["FakePlugin"])]
|
||||
|
||||
auth_plugin.discover_auth_systems()
|
||||
plugin = auth_plugin.load_plugin("fake")
|
||||
|
||||
cs = client.Client("username", "password", "project_id",
|
||||
auth_system="fakewithauthurl",
|
||||
auth_plugin=plugin)
|
||||
self.assertEquals(cs.client.auth_url, "http://faked/v2.0")
|
||||
|
||||
@mock.patch.object(pkg_resources, "iter_entry_points")
|
||||
def test_exception_if_no_authenticate(self, mock_iter_entry_points):
|
||||
"""Test that no authenticate raises a proper exception."""
|
||||
class MockEntrypoint(pkg_resources.EntryPoint):
|
||||
def load(self):
|
||||
return FakePlugin
|
||||
|
||||
class FakePlugin(auth_plugin.BaseAuthPlugin):
|
||||
pass
|
||||
|
||||
mock_iter_entry_points.side_effect = lambda _t: [
|
||||
MockEntrypoint("fake", "fake", ["FakePlugin"])]
|
||||
|
||||
auth_plugin.discover_auth_systems()
|
||||
plugin = auth_plugin.load_plugin("fake")
|
||||
|
||||
self.assertRaises(
|
||||
exceptions.EndpointNotFound,
|
||||
client.Client, "username", "password", "project_id",
|
||||
auth_system="fakewithauthurl")
|
||||
auth_system="fake", auth_plugin=plugin)
|
||||
|
||||
test_auth_call()
|
||||
@mock.patch.object(pkg_resources, "iter_entry_points")
|
||||
def test_exception_if_no_url(self, mock_iter_entry_points):
|
||||
"""Test that no auth_url at all raises exception."""
|
||||
class MockEntrypoint(pkg_resources.EntryPoint):
|
||||
def load(self):
|
||||
return FakePlugin
|
||||
|
||||
class FakePlugin(auth_plugin.BaseAuthPlugin):
|
||||
pass
|
||||
|
||||
mock_iter_entry_points.side_effect = lambda _t: [
|
||||
MockEntrypoint("fake", "fake", ["FakePlugin"])]
|
||||
|
||||
auth_plugin.discover_auth_systems()
|
||||
plugin = auth_plugin.load_plugin("fake")
|
||||
|
||||
self.assertRaises(
|
||||
exceptions.EndpointNotFound,
|
||||
client.Client, "username", "password", "project_id",
|
||||
auth_system="fake", auth_plugin=plugin)
|
||||
|
Loading…
Reference in New Issue
Block a user