identity: Add access rule CRUD support

This patch adds the client support of access rule in
application credentials [1].

[1] https://docs.openstack.org/api-ref/identity/v3/index.html#list-access-rules

Change-Id: I9e1eab6eb3ff6e152b408af2fe6ddc57c8c168b2
This commit is contained in:
Stephen Finucane 2023-05-16 11:03:58 +01:00
parent 692b7f39e3
commit 4faf511f4f
8 changed files with 271 additions and 0 deletions

View File

@ -14,6 +14,7 @@ import openstack.exceptions as exception
from openstack.identity.v3 import (
application_credential as _application_credential,
)
from openstack.identity.v3 import access_rule as _access_rule
from openstack.identity.v3 import credential as _credential
from openstack.identity.v3 import domain as _domain
from openstack.identity.v3 import endpoint as _endpoint
@ -57,6 +58,7 @@ from openstack import utils
class Proxy(proxy.Proxy):
_resource_registry = {
"application_credential": _application_credential.ApplicationCredential, # noqa: E501
"access_rule": _access_rule.AccessRule,
"credential": _credential.Credential,
"domain": _domain.Domain,
"endpoint": _endpoint.Endpoint,
@ -2011,3 +2013,56 @@ class Proxy(proxy.Proxy):
return self._update(
_identity_provider.IdentityProvider, identity_provider, **attrs
)
# ========== Access rules ==========
def access_rules(self, user, **query):
"""Retrieve a generator of access rules
:param user: Either the ID of a user or a :class:`~.user.User`
instance.
:param kwargs query: Optional query parameters to be sent to
limit the resources being returned.
:returns: A generator of access rules instances.
:rtype: :class:`~openstack.identity.v3.access_rule.AccessRule`
"""
user = self._get_resource(_user.User, user)
return self._list(_access_rule.AccessRule, user_id=user.id, **query)
def get_access_rule(self, user, access_rule):
"""Get a single access rule
:param user: Either the ID of a user or a :class:`~.user.User`
instance.
:param access rule: The value can be the ID of an access rule or a
:class:`~.access_rule.AccessRule` instance.
:returns: One :class:`~.access_rule.AccessRule`
:raises: :class:`~openstack.exceptions.ResourceNotFound` when no
resource can be found.
"""
user = self._get_resource(_user.User, user)
return self._get(_access_rule.AccessRule, access_rule, user_id=user.id)
def delete_access_rule(self, user, access_rule, ignore_missing=True):
"""Delete an access rule
:param user: Either the ID of a user or a :class:`~.user.User`
instance.
:param access rule: The value can be either the ID of an
access rule or a :class:`~.access_rule.AccessRule` instance.
:param bool ignore_missing: When set to ``False``
:class:`~openstack.exceptions.ResourceNotFound` will be raised when
the access rule does not exist. When set to ``True``, no exception
will be thrown when attempting to delete a nonexistent access rule.
:returns: ``None``
"""
user = self._get_resource(_user.User, user)
self._delete(
_access_rule.AccessRule,
access_rule,
user_id=user.id,
ignore_missing=ignore_missing,
)

View File

@ -0,0 +1,39 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack import resource
class AccessRule(resource.Resource):
resource_key = 'access_rule'
resources_key = 'access_rules'
base_path = '/users/%(user_id)s/access_rules'
# capabilities
allow_fetch = True
allow_delete = True
allow_list = True
# Properties
#: The links for the access rule resource.
links = resource.Body('links')
#: Method that application credential is permitted to use.
# *Type: string*
method = resource.Body('method')
#: Path that the application credential is permitted to access.
# *Type: string*
path = resource.Body('path')
#: Service type identifier that application credential had access.
# *Type: string*
service = resource.Body('service')
#: User ID using access rule. *Type: string*
user_id = resource.URI('user_id')

View File

@ -47,3 +47,5 @@ class ApplicationCredential(resource.Resource):
unrestricted = resource.Body('unrestricted', type=bool)
#: ID of project. *Type: string*
project_id = resource.Body('project_id')
#: access rules for application credential. *Type: list*
access_rules = resource.Body('access_rules')

View File

@ -0,0 +1,81 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack import exceptions
from openstack.tests.functional import base
class TestAccessRule(base.BaseFunctionalTest):
def setUp(self):
super().setUp()
self.user_id = self.operator_cloud.current_user_id
def _create_application_credential_with_access_rule(self):
"""create application credential with access_rule."""
app_cred = self.conn.identity.create_application_credential(
user=self.user_id,
name='app_cred',
access_rules=[
{
"path": "/v2.0/metrics",
"service": "monitoring",
"method": "GET",
}
],
)
self.addCleanup(
self.conn.identity.delete_application_credential,
self.user_id,
app_cred['id'],
)
return app_cred
def test_get_access_rule(self):
app_cred = self._create_application_credential_with_access_rule()
access_rule_id = app_cred['access_rules'][0]['id']
access_rule = self.conn.identity.get_access_rule(
user=self.user_id, access_rule=access_rule_id
)
self.assertEqual(access_rule['id'], access_rule_id)
self.assertEqual(access_rule['user_id'], self.user_id)
def test_list_access_rules(self):
app_cred = self._create_application_credential_with_access_rule()
access_rule_id = app_cred['access_rules'][0]['id']
access_rules = self.conn.identity.access_rules(user=self.user_id)
self.assertEqual(1, len(list(access_rules)))
for access_rule in access_rules:
self.assertEqual(app_cred['user_id'], self.user_id)
self.assertEqual(access_rule_id, access_rule['id'])
def test_delete_access_rule(self):
app_cred = self._create_application_credential_with_access_rule()
access_rule_id = app_cred['access_rules'][0]['id']
# This is expected to raise an exception since access_rule is still
# in use for app_cred.
self.assertRaises(
exceptions.HttpException,
self.conn.identity.delete_access_rule,
user=self.user_id,
access_rule=access_rule_id,
)
# delete application credential first to delete access rule
self.conn.identity.delete_application_credential(
user=self.user_id, application_credential=app_cred['id']
)
# delete orphaned access rules
self.conn.identity.delete_access_rule(
user=self.user_id, access_rule=access_rule_id
)

View File

@ -0,0 +1,42 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.identity.v3 import access_rule
from openstack.tests.unit import base
EXAMPLE = {
"links": {
"self": "https://example.com/identity/v3/access_rules"
"/07d719df00f349ef8de77d542edf010c"
},
"path": "/v2.1/servers/{server_id}/ips",
"method": "GET",
"service": "compute",
}
class TestAccessRule(base.TestCase):
def test_basic(self):
sot = access_rule.AccessRule()
self.assertEqual('access_rule', sot.resource_key)
self.assertEqual('access_rules', sot.resources_key)
self.assertEqual('/users/%(user_id)s/access_rules', sot.base_path)
self.assertTrue(sot.allow_fetch)
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
def test_make_it(self):
sot = access_rule.AccessRule(**EXAMPLE)
self.assertEqual(EXAMPLE['path'], sot.path)
self.assertEqual(EXAMPLE['method'], sot.method)
self.assertEqual(EXAMPLE['service'], sot.service)
self.assertEqual(EXAMPLE['links'], sot.links)

View File

@ -19,6 +19,9 @@ EXAMPLE = {
"name": 'monitoring',
"secret": 'rEaqvJka48mpv',
"roles": [{"name": "Reader"}],
"access_rules": [
{"path": "/v2.0/metrics", "service": "monitoring", "method": "GET"},
],
"expires_at": '2018-02-27T18:30:59Z',
"description": "Application credential for monitoring",
"unrestricted": "False",
@ -51,3 +54,4 @@ class TestApplicationCredential(base.TestCase):
self.assertEqual(EXAMPLE['project_id'], sot.project_id)
self.assertEqual(EXAMPLE['roles'], sot.roles)
self.assertEqual(EXAMPLE['links'], sot.links)
self.assertEqual(EXAMPLE['access_rules'], sot.access_rules)

View File

@ -13,6 +13,7 @@
import uuid
from openstack.identity.v3 import _proxy
from openstack.identity.v3 import access_rule
from openstack.identity.v3 import credential
from openstack.identity.v3 import domain
from openstack.identity.v3 import endpoint
@ -568,3 +569,45 @@ class TestIdentityProxyRoleAssignments(TestIdentityProxyBase):
self.proxy._get_resource(role.Role, 'rid'),
],
)
class TestAccessRule(TestIdentityProxyBase):
def test_access_rule_delete(self):
self.verify_delete(
self.proxy.delete_access_rule,
access_rule.AccessRule,
False,
method_args=[],
method_kwargs={'user': USER_ID, 'access_rule': 'access_rule'},
expected_args=['access_rule'],
expected_kwargs={'user_id': USER_ID},
)
def test_access_rule_delete_ignore(self):
self.verify_delete(
self.proxy.delete_access_rule,
access_rule.AccessRule,
True,
method_args=[],
method_kwargs={'user': USER_ID, 'access_rule': 'access_rule'},
expected_args=['access_rule'],
expected_kwargs={'user_id': USER_ID},
)
def test_access_rule_get(self):
self.verify_get(
self.proxy.get_access_rule,
access_rule.AccessRule,
method_args=[],
method_kwargs={'user': USER_ID, 'access_rule': 'access_rule'},
expected_args=['access_rule'],
expected_kwargs={'user_id': USER_ID},
)
def test_access_rules(self):
self.verify_list(
self.proxy.access_rules,
access_rule.AccessRule,
method_kwargs={'user': USER_ID},
expected_kwargs={'user_id': USER_ID},
)

View File

@ -0,0 +1,5 @@
---
features:
- |
Added support for `access_rules
<https://docs.openstack.org/keystone/latest/user/application_credentials.html#access-rules>`_.