Merge "Switch auth to keystoneauth"

This commit is contained in:
Zuul
2026-02-10 17:25:47 +00:00
committed by Gerrit Code Review
5 changed files with 209 additions and 170 deletions

View File

@@ -82,22 +82,32 @@ class ClientTest(utils.TestCase):
self.assertIsNotNone(c.client)
self.assertIsNone(c.keystone_client)
@mock.patch.object(client.Client, '_get_keystone_client', mock.Mock())
def test_valid_region_name_v1(self):
@mock.patch.object(client.Client, '_get_keystone_auth_and_session')
def test_valid_region_name_v1(self, mock_get_auth):
self.mock_object(client.httpclient, 'HTTPClient')
kc = client.Client._get_keystone_client.return_value
kc.service_catalog = mock.Mock()
kc.service_catalog.get_endpoints = mock.Mock(return_value=self.catalog)
self.mock_object(client.adapter, 'LegacyJsonAdapter')
# Mock the auth and session returned by _get_keystone_auth_and_session
mock_auth = mock.Mock()
mock_session = mock.Mock()
mock_get_auth.return_value = (mock_auth, mock_session)
# Mock the adapter to return token and endpoint
mocked_adapter = client.adapter.LegacyJsonAdapter.return_value
mocked_adapter.session.get_token.return_value = 'fake_token'
mocked_adapter.session.get_endpoint.return_value = 'http://1.2.3.4'
mocked_adapter.auth = mock_auth
c = client.Client(
api_version=manilaclient.API_DEPRECATED_VERSION,
service_type="share",
region_name='TestRegion',
)
self.assertTrue(client.Client._get_keystone_client.called)
kc.service_catalog.get_endpoints.assert_called_with('share')
self.assertTrue(mock_get_auth.called)
client.httpclient.HTTPClient.assert_called_with(
'http://1.2.3.4',
mock.ANY,
'fake_token',
'python-manilaclient',
insecure=False,
cacert=None,
@@ -109,43 +119,67 @@ class ClientTest(utils.TestCase):
)
self.assertIsNotNone(c.client)
@mock.patch.object(client.Client, '_get_keystone_client', mock.Mock())
def test_nonexistent_region_name(self):
kc = client.Client._get_keystone_client.return_value
kc.service_catalog = mock.Mock()
kc.service_catalog.get_endpoints = mock.Mock(return_value=self.catalog)
@mock.patch.object(client.Client, '_get_keystone_auth_and_session')
def test_nonexistent_region_name(self, mock_get_auth):
self.mock_object(client.adapter, 'LegacyJsonAdapter')
# Mock the auth and session returned by _get_keystone_auth_and_session
mock_auth = mock.Mock()
mock_session = mock.Mock()
mock_get_auth.return_value = (mock_auth, mock_session)
# Mock the adapter to return token but no endpoint (None)
mocked_adapter = client.adapter.LegacyJsonAdapter.return_value
mocked_adapter.session.get_token.return_value = 'fake_token'
mocked_adapter.session.get_endpoint.return_value = None
mocked_adapter.auth = mock_auth
self.assertRaises(
RuntimeError,
client.Client,
api_version=manilaclient.API_MAX_VERSION,
region_name='FakeRegion',
)
self.assertTrue(client.Client._get_keystone_client.called)
kc.service_catalog.get_endpoints.assert_called_with('sharev2')
self.assertTrue(mock_get_auth.called)
mocked_adapter.session.get_endpoint.assert_called_with(
mock_auth,
interface='publicURL',
service_type='sharev2',
region_name='FakeRegion',
)
@mock.patch.object(client.Client, '_get_keystone_client', mock.Mock())
def test_regions_with_same_name(self):
@mock.patch.object(client.Client, '_get_keystone_auth_and_session')
def test_regions_with_same_name(self, mock_get_auth):
self.mock_object(client.httpclient, 'HTTPClient')
catalog = {
'sharev2': [
{'region': 'FirstRegion', 'publicURL': 'http://1.2.3.4'},
{'region': 'secondregion', 'publicURL': 'http://1.1.1.1'},
{'region': 'SecondRegion', 'publicURL': 'http://2.2.2.2'},
],
}
kc = client.Client._get_keystone_client.return_value
kc.service_catalog = mock.Mock()
kc.service_catalog.get_endpoints = mock.Mock(return_value=catalog)
self.mock_object(client.adapter, 'LegacyJsonAdapter')
# Mock the auth and session returned by _get_keystone_auth_and_session
mock_auth = mock.Mock()
mock_session = mock.Mock()
mock_get_auth.return_value = (mock_auth, mock_session)
# Mock the adapter to return token and endpoint
mocked_adapter = client.adapter.LegacyJsonAdapter.return_value
mocked_adapter.session.get_token.return_value = 'fake_token'
mocked_adapter.session.get_endpoint.return_value = 'http://2.2.2.2'
mocked_adapter.auth = mock_auth
c = client.Client(
api_version=manilaclient.API_MIN_VERSION,
service_type='sharev2',
region_name='SecondRegion',
)
self.assertTrue(client.Client._get_keystone_client.called)
kc.service_catalog.get_endpoints.assert_called_with('sharev2')
self.assertTrue(mock_get_auth.called)
mocked_adapter.session.get_endpoint.assert_called_with(
mock_auth,
interface='publicURL',
service_type='sharev2',
region_name='SecondRegion',
)
client.httpclient.HTTPClient.assert_called_with(
'http://2.2.2.2',
mock.ANY,
'fake_token',
'python-manilaclient',
insecure=False,
cacert=None,
@@ -219,68 +253,29 @@ class ClientTest(utils.TestCase):
return None
self.mock_object(client.httpclient, 'HTTPClient')
self.mock_object(client.ks_client, 'Client')
self.mock_object(client.identity.v3, 'Password')
self.mock_object(client.adapter, 'LegacyJsonAdapter')
self.mock_object(client.session.discover, 'Discover')
self.mock_object(client.session, 'Session')
client_args = self._get_client_args(**kwargs)
client_args['api_version'] = manilaclient.API_MIN_VERSION
self.auth_url = client_args['auth_url']
catalog = {
'share': [
{
'region': 'SecondRegion',
'region_id': 'SecondRegion',
'url': 'http://4.4.4.4',
'interface': 'public',
},
],
'sharev2': [
{
'region': 'FirstRegion',
'interface': 'public',
'region_id': 'SecondRegion',
'url': 'http://1.1.1.1',
},
{
'region': 'secondregion',
'interface': 'public',
'region_id': 'SecondRegion',
'url': 'http://2.2.2.2',
},
{
'region': 'SecondRegion',
'interface': 'internal',
'region_id': 'SecondRegion',
'url': 'http://3.3.3.1',
},
{
'region': 'SecondRegion',
'interface': 'public',
'region_id': 'SecondRegion',
'url': 'http://3.3.3.3',
},
{
'region': 'SecondRegion',
'interface': 'admin',
'region_id': 'SecondRegion',
'url': 'http://3.3.3.2',
},
],
}
client.session.discover.Discover.return_value.url_for.side_effect = (
fake_url_for
)
client.ks_client.Client.return_value.auth_token.return_value = (
'fake_token'
)
mocked_ks_client = client.ks_client.Client.return_value
mocked_ks_client.service_catalog.get_endpoints.return_value = catalog
# Mock the adapter to return token and endpoint
mocked_adapter = client.adapter.LegacyJsonAdapter.return_value
mocked_adapter.session.get_token.return_value = 'fake_token'
mocked_adapter.session.get_endpoint.return_value = 'http://3.3.3.3'
mocked_adapter.auth = client.identity.v3.Password.return_value
client.Client(**client_args)
client.httpclient.HTTPClient.assert_called_with(
'http://3.3.3.3',
mock.ANY,
'fake_token',
'python-manilaclient',
insecure=False,
cacert=None,
@@ -291,9 +286,8 @@ class ClientTest(utils.TestCase):
api_version=manilaclient.API_MIN_VERSION,
)
client.ks_client.Client.assert_called_with(
session=mock.ANY,
version=(3, 0),
# Verify identity.v3.Password was called with correct credentials
client.identity.v3.Password.assert_called_with(
auth_url='url_v3.0',
username=client_args['username'],
password=client_args.get('password'),
@@ -306,18 +300,35 @@ class ClientTest(utils.TestCase):
project_name=client_args['project_name'],
project_domain_name=client_args['project_domain_name'],
project_domain_id=client_args['project_domain_id'],
)
# Verify LegacyJsonAdapter was created
client.adapter.LegacyJsonAdapter.assert_called_with(
session=mock.ANY,
auth=mock.ANY,
interface=client_args['endpoint_type'],
service_type=client_args['service_type'],
service_name=None,
region_name=client_args['region_name'],
)
# Verify session.get_token() was called
mocked_adapter.session.get_token.assert_called_with(mock.ANY)
# Verify session.get_endpoint() was called
mocked_adapter.session.get_endpoint.assert_called_with(
mock.ANY,
interface=client_args['endpoint_type'],
service_type=client_args['service_type'],
region_name=client_args['region_name'],
)
mocked_ks_client.service_catalog.get_endpoints.assert_called_with(
client_args['service_type']
)
mocked_ks_client.authenticate.assert_called_with()
@mock.patch.object(client.ks_client, 'Client', mock.Mock())
@mock.patch.object(client.session.discover, 'Discover', mock.Mock())
@mock.patch.object(client.session, 'Session', mock.Mock())
def test_client_init_no_session_no_auth_token_endpoint_not_found(self):
self.mock_object(client.httpclient, 'HTTPClient')
self.mock_object(client.identity.v3, 'Password')
self.mock_object(client.adapter, 'LegacyJsonAdapter')
client_args = self._get_client_args(
auth_urli='fake_url',
password='foo_password',
@@ -325,7 +336,6 @@ class ClientTest(utils.TestCase):
)
discover = client.session.discover.Discover
discover.return_value.url_for.return_value = None
mocked_ks_client = client.ks_client.Client.return_value
self.assertRaises(
exceptions.CommandError, client.Client, **client_args
@@ -334,6 +344,5 @@ class ClientTest(utils.TestCase):
self.assertTrue(client.session.Session.called)
self.assertTrue(client.session.discover.Discover.called)
self.assertFalse(client.httpclient.HTTPClient.called)
self.assertFalse(client.ks_client.Client.called)
self.assertFalse(mocked_ks_client.service_catalog.get_endpoints.called)
self.assertFalse(mocked_ks_client.authenticate.called)
self.assertFalse(client.identity.v3.Password.called)
self.assertFalse(client.adapter.LegacyJsonAdapter.called)

View File

@@ -13,8 +13,8 @@
from debtcollector import removals
from keystoneauth1 import adapter
from keystoneauth1 import identity
from keystoneauth1 import session
from keystoneclient import client as ks_client
import manilaclient
from manilaclient.common import constants
@@ -50,7 +50,7 @@ class Client:
Or, alternatively, you can create a client instance using the
keystoneauth1.session API::
>>> from keystoneclient.auth.identity import v3
>>> from keystoneauth1.identity import v3
>>> from keystoneauth1 import session
>>> from manilaclient import client
>>> auth = v3.Password(auth_url=AUTH_URL,
@@ -68,6 +68,24 @@ class Client:
...
"""
@removals.removed_kwarg(
'use_keyring',
message='This parameter is no longer supported and has no effect.',
version='5.8.0',
removal_version='6.0.0',
)
@removals.removed_kwarg(
'force_new_token',
message='This parameter is no longer supported and has no effect.',
version='5.8.0',
removal_version='6.0.0',
)
@removals.removed_kwarg(
'cached_token_lifetime',
message='This parameter is no longer supported and has no effect.',
version='5.8.0',
removal_version='6.0.0',
)
def __init__(
self,
username=None,
@@ -124,10 +142,6 @@ class Client:
self.cert = cert
self.insecure = insecure
self.use_keyring = use_keyring
self.force_new_token = force_new_token
self.cached_token_lifetime = cached_token_lifetime
if input_auth_token and not service_catalog_url:
msg = (
"For token-based authentication you should "
@@ -144,6 +158,7 @@ class Client:
# if token is provided.
if not input_auth_token:
if session:
# Modern path - session provided by caller (e.g., OSC plugin)
self.keystone_client = adapter.LegacyJsonAdapter(
session=session,
auth=auth,
@@ -153,36 +168,30 @@ class Client:
region_name=region_name,
)
input_auth_token = self.keystone_client.session.get_token(auth)
else:
self.keystone_client = self._get_keystone_client()
input_auth_token = self.keystone_client.auth_token
# Legacy path - create auth plugin and session ourselves
auth, ks_session = self._get_keystone_auth_and_session()
self.keystone_client = adapter.LegacyJsonAdapter(
session=ks_session,
auth=auth,
interface=endpoint_type,
service_type=service_type,
service_name=service_name,
region_name=region_name,
)
input_auth_token = self.keystone_client.session.get_token(auth)
if not input_auth_token:
raise RuntimeError("Not Authorized")
if session and not service_catalog_url:
if not service_catalog_url:
# Use keystoneauth1 session endpoint discovery
service_catalog_url = self.keystone_client.session.get_endpoint(
auth, interface=endpoint_type, service_type=service_type
self.keystone_client.auth,
interface=endpoint_type,
service_type=service_type,
region_name=region_name,
)
elif not service_catalog_url:
catalog = self.keystone_client.service_catalog.get_endpoints(
service_type
)
for catalog_entry in catalog.get(service_type, []):
if catalog_entry.get("interface") == (
endpoint_type.lower().split("url")[0]
) or catalog_entry.get(endpoint_type):
if region_name and not region_name == (
catalog_entry.get(
"region", catalog_entry.get("region_id")
)
):
continue
service_catalog_url = catalog_entry.get(
"url", catalog_entry.get(endpoint_type)
)
break
if not service_catalog_url:
raise RuntimeError("Could not find Manila endpoint in catalog")
@@ -227,17 +236,21 @@ class Client:
if extension.manager_class:
setattr(self, extension.name, extension.manager_class(self))
def _get_keystone_client(self):
# First create a Keystone session
def _get_keystone_auth_and_session(self):
"""Create keystoneauth1 auth plugin and session for authentication.
Returns:
tuple: (auth_plugin, session) for use with keystoneauth1
"""
# Create session with SSL settings
if self.insecure:
verify = False
else:
verify = self.cacert or True
ks_session = session.Session(verify=verify, cert=self.cert)
# Discover the supported keystone versions using the given url
# Discover Keystone v3 endpoint
ks_discover = session.discover.Discover(ks_session, self.auth_url)
auth_url = ks_discover.url_for('v3.0')
if not auth_url:
raise exceptions.CommandError(
@@ -245,9 +258,8 @@ class Client:
'with using the given auth_url.'
)
keystone_client = ks_client.Client(
session=ks_session,
version=(3, 0),
# Create v3 Password auth plugin
auth = identity.v3.Password(
auth_url=auth_url,
username=self.username,
password=self.password,
@@ -258,8 +270,6 @@ class Client:
project_name=self.project_name,
project_domain_name=self.project_domain_name,
project_domain_id=self.project_domain_id,
region_name=self.region_name,
)
keystone_client.authenticate()
return keystone_client
return auth, ks_session

View File

@@ -10,9 +10,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from debtcollector import removals
from keystoneauth1 import adapter
from keystoneauth1 import identity
from keystoneauth1 import session
from keystoneclient import client as ks_client
import manilaclient
from manilaclient.common import constants
@@ -64,7 +65,7 @@ class Client:
Or, alternatively, you can create a client instance using the
keystoneauth1.session API::
>>> from keystoneclient.auth.identity import v3
>>> from keystoneauth1.identity import v3
>>> from keystoneauth1 import session
>>> from manilaclient import client
>>> auth = v3.Password(auth_url=AUTH_URL,
@@ -82,6 +83,24 @@ class Client:
...
"""
@removals.removed_kwarg(
'use_keyring',
message='This parameter is no longer supported and has no effect.',
version='5.8.0',
removal_version='6.0.0',
)
@removals.removed_kwarg(
'force_new_token',
message='This parameter is no longer supported and has no effect.',
version='5.8.0',
removal_version='6.0.0',
)
@removals.removed_kwarg(
'cached_token_lifetime',
message='This parameter is no longer supported and has no effect.',
version='5.8.0',
removal_version='6.0.0',
)
def __init__(
self,
username=None,
@@ -138,10 +157,6 @@ class Client:
self.cert = cert
self.insecure = insecure
self.use_keyring = use_keyring
self.force_new_token = force_new_token
self.cached_token_lifetime = cached_token_lifetime
if input_auth_token and not service_catalog_url:
msg = (
"For token-based authentication you should "
@@ -158,6 +173,7 @@ class Client:
# if token is provided.
if not input_auth_token:
if session:
# Modern path - session provided by caller (e.g., OSC plugin)
self.keystone_client = adapter.LegacyJsonAdapter(
session=session,
auth=auth,
@@ -167,39 +183,30 @@ class Client:
region_name=region_name,
)
input_auth_token = self.keystone_client.session.get_token(auth)
else:
self.keystone_client = self._get_keystone_client()
input_auth_token = self.keystone_client.auth_token
# Legacy path - create auth plugin and session ourselves
auth, ks_session = self._get_keystone_auth_and_session()
self.keystone_client = adapter.LegacyJsonAdapter(
session=ks_session,
auth=auth,
interface=endpoint_type,
service_type=service_type,
service_name=service_name,
region_name=region_name,
)
input_auth_token = self.keystone_client.session.get_token(auth)
if not input_auth_token:
raise RuntimeError("Not Authorized")
if session and not service_catalog_url:
if not service_catalog_url:
# Use keystoneauth1 session endpoint discovery
service_catalog_url = self.keystone_client.session.get_endpoint(
auth,
self.keystone_client.auth,
interface=endpoint_type,
service_type=service_type,
region_name=region_name,
)
elif not service_catalog_url:
catalog = self.keystone_client.service_catalog.get_endpoints(
service_type
)
for catalog_entry in catalog.get(service_type, []):
if catalog_entry.get("interface") == (
endpoint_type.lower().split("url")[0]
) or catalog_entry.get(endpoint_type):
if region_name and not region_name == (
catalog_entry.get(
"region", catalog_entry.get("region_id")
)
):
continue
service_catalog_url = catalog_entry.get(
"url", catalog_entry.get(endpoint_type)
)
break
if not service_catalog_url:
raise RuntimeError("Could not find Manila endpoint in catalog")
@@ -292,17 +299,21 @@ class Client:
if extension.manager_class:
setattr(self, extension.name, extension.manager_class(self))
def _get_keystone_client(self):
# First create a Keystone session
def _get_keystone_auth_and_session(self):
"""Create keystoneauth1 auth plugin and session for authentication.
Returns:
tuple: (auth_plugin, session) for use with keystoneauth1
"""
# Create session with SSL settings
if self.insecure:
verify = False
else:
verify = self.cacert or True
ks_session = session.Session(verify=verify, cert=self.cert)
# Discover the supported keystone versions using the given url
# Discover Keystone v3 endpoint
ks_discover = session.discover.Discover(ks_session, self.auth_url)
auth_url = ks_discover.url_for('v3.0')
if not auth_url:
raise exceptions.CommandError(
@@ -310,9 +321,8 @@ class Client:
'with using the given auth_url.'
)
keystone_client = ks_client.Client(
session=ks_session,
version=(3, 0),
# Create v3 Password auth plugin
auth = identity.v3.Password(
auth_url=auth_url,
username=self.username,
password=self.password,
@@ -323,8 +333,6 @@ class Client:
project_name=self.project_name,
project_domain_name=self.project_domain_name,
project_domain_id=self.project_domain_id,
region_name=self.region_name,
)
keystone_client.authenticate()
return keystone_client
return auth, ks_session

View File

@@ -0,0 +1,12 @@
---
deprecations:
- |
Legacy keystoneclient options "use_keyring", "force_new_token"
and "cached_token_lifetime" were dropped. Using these options
will have no effect with this release and will cause an error
in a future release.
security:
- |
The use of the keystoneclient library has been replaced with keystoneauth.
The latter library is better maintained and has more auth options that are
now usable with manilaclient.

View File

@@ -11,5 +11,5 @@ oslo.utils>=3.33.0 # Apache-2.0
PrettyTable>=0.7.1 # BSD
requests>=2.14.2 # Apache-2.0
osc-lib>=3.2.0 # Apache-2.0
python-keystoneclient>=3.8.0 # Apache-2.0
keystoneauth1>=3.0.0 # Apache-2.0
debtcollector>=1.2.0 # Apache-2.0