Blackify openstack.identity

Black used with the '-l 79 -S' flags.

A future change will ignore this commit in git-blame history by adding a
'git-blame-ignore-revs' file.

Change-Id: I2faa1f6c0a41946b3672bafc430b53a09611aea4
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane 2023-05-03 12:11:13 +01:00
parent f526b990f3
commit 542ddaa1ad
46 changed files with 552 additions and 470 deletions

View File

@ -18,7 +18,6 @@ from openstack import proxy
class Proxy(proxy.Proxy):
def extensions(self):
"""Retrieve a generator of extensions
@ -78,8 +77,9 @@ class Proxy(proxy.Proxy):
attempting to find a nonexistent resource.
:returns: One :class:`~openstack.identity.v2.role.Role` or None
"""
return self._find(_role.Role, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_role.Role, name_or_id, ignore_missing=ignore_missing
)
def get_role(self, role):
"""Get a single role
@ -155,8 +155,9 @@ class Proxy(proxy.Proxy):
attempting to find a nonexistent resource.
:returns: One :class:`~openstack.identity.v2.tenant.Tenant` or None
"""
return self._find(_tenant.Tenant, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_tenant.Tenant, name_or_id, ignore_missing=ignore_missing
)
def get_tenant(self, tenant):
"""Get a single tenant
@ -232,8 +233,9 @@ class Proxy(proxy.Proxy):
attempting to find a nonexistent resource.
:returns: One :class:`~openstack.identity.v2.user.User` or None
"""
return self._find(_user.User, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_user.User, name_or_id, ignore_missing=ignore_missing
)
def get_user(self, user):
"""Get a single user

View File

@ -48,8 +48,7 @@ class Extension(resource.Resource):
if base_path is None:
base_path = cls.base_path
resp = session.get(base_path,
params=params)
resp = session.get(base_path, params=params)
resp = resp.json()
for data in resp[cls.resources_key]['values']:
yield cls.existing(**data)

View File

@ -11,8 +11,9 @@
# under the License.
import openstack.exceptions as exception
from openstack.identity.v3 import application_credential as \
_application_credential
from openstack.identity.v3 import (
application_credential as _application_credential,
)
from openstack.identity.v3 import credential as _credential
from openstack.identity.v3 import domain as _domain
from openstack.identity.v3 import endpoint as _endpoint
@ -27,18 +28,24 @@ from openstack.identity.v3 import region as _region
from openstack.identity.v3 import registered_limit as _registered_limit
from openstack.identity.v3 import role as _role
from openstack.identity.v3 import role_assignment as _role_assignment
from openstack.identity.v3 import role_domain_group_assignment \
as _role_domain_group_assignment
from openstack.identity.v3 import role_domain_user_assignment \
as _role_domain_user_assignment
from openstack.identity.v3 import role_project_group_assignment \
as _role_project_group_assignment
from openstack.identity.v3 import role_project_user_assignment \
as _role_project_user_assignment
from openstack.identity.v3 import role_system_group_assignment \
as _role_system_group_assignment
from openstack.identity.v3 import role_system_user_assignment \
as _role_system_user_assignment
from openstack.identity.v3 import (
role_domain_group_assignment as _role_domain_group_assignment,
)
from openstack.identity.v3 import (
role_domain_user_assignment as _role_domain_user_assignment,
)
from openstack.identity.v3 import (
role_project_group_assignment as _role_project_group_assignment,
)
from openstack.identity.v3 import (
role_project_user_assignment as _role_project_user_assignment,
)
from openstack.identity.v3 import (
role_system_group_assignment as _role_system_group_assignment,
)
from openstack.identity.v3 import (
role_system_user_assignment as _role_system_user_assignment,
)
from openstack.identity.v3 import service as _service
from openstack.identity.v3 import system as _system
from openstack.identity.v3 import trust as _trust
@ -49,8 +56,7 @@ from openstack import utils
class Proxy(proxy.Proxy):
_resource_registry = {
"application_credential":
_application_credential.ApplicationCredential,
"application_credential": _application_credential.ApplicationCredential, # noqa: E501
"credential": _credential.Credential,
"domain": _domain.Domain,
"endpoint": _endpoint.Endpoint,
@ -65,18 +71,12 @@ class Proxy(proxy.Proxy):
"registered_limit": _registered_limit.RegisteredLimit,
"role": _role.Role,
"role_assignment": _role_assignment.RoleAssignment,
"role_domain_group_assignment":
_role_domain_group_assignment.RoleDomainGroupAssignment,
"role_domain_user_assignment":
_role_domain_user_assignment.RoleDomainUserAssignment,
"role_project_group_assignment":
_role_project_group_assignment.RoleProjectGroupAssignment,
"role_project_user_assignment":
_role_project_user_assignment.RoleProjectUserAssignment,
"role_system_group_assignment":
_role_system_group_assignment.RoleSystemGroupAssignment,
"role_system_user_assignment":
_role_system_user_assignment.RoleSystemUserAssignment,
"role_domain_group_assignment": _role_domain_group_assignment.RoleDomainGroupAssignment, # noqa: E501
"role_domain_user_assignment": _role_domain_user_assignment.RoleDomainUserAssignment, # noqa: E501
"role_project_group_assignment": _role_project_group_assignment.RoleProjectGroupAssignment, # noqa: E501
"role_project_user_assignment": _role_project_user_assignment.RoleProjectUserAssignment, # noqa: E501
"role_system_group_assignment": _role_system_group_assignment.RoleSystemGroupAssignment, # noqa: E501
"role_system_user_assignment": _role_system_user_assignment.RoleSystemUserAssignment, # noqa: E501
"service": _service.Service,
"system": _system.System,
"trust": _trust.Trust,
@ -108,8 +108,9 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
self._delete(_credential.Credential, credential,
ignore_missing=ignore_missing)
self._delete(
_credential.Credential, credential, ignore_missing=ignore_missing
)
def find_credential(self, name_or_id, ignore_missing=True):
"""Find a single credential
@ -123,8 +124,9 @@ class Proxy(proxy.Proxy):
:returns: One :class:`~openstack.identity.v3.credential.Credential`
or None
"""
return self._find(_credential.Credential, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_credential.Credential, name_or_id, ignore_missing=ignore_missing
)
def get_credential(self, credential):
"""Get a single credential
@ -201,8 +203,9 @@ class Proxy(proxy.Proxy):
attempting to find a nonexistent resource.
:returns: One :class:`~openstack.identity.v3.domain.Domain` or None
"""
return self._find(_domain.Domain, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_domain.Domain, name_or_id, ignore_missing=ignore_missing
)
def get_domain(self, domain):
"""Get a single domain
@ -266,8 +269,9 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
self._delete(_endpoint.Endpoint, endpoint,
ignore_missing=ignore_missing)
self._delete(
_endpoint.Endpoint, endpoint, ignore_missing=ignore_missing
)
def find_endpoint(self, name_or_id, ignore_missing=True):
"""Find a single endpoint
@ -280,8 +284,9 @@ class Proxy(proxy.Proxy):
attempting to find a nonexistent resource.
:returns: One :class:`~openstack.identity.v3.endpoint.Endpoint` or None
"""
return self._find(_endpoint.Endpoint, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_endpoint.Endpoint, name_or_id, ignore_missing=ignore_missing
)
def get_endpoint(self, endpoint):
"""Get a single endpoint
@ -453,8 +458,7 @@ class Proxy(proxy.Proxy):
:return: List of :class:`~openstack.identity.v3.user.User`
"""
group = self._get_resource(_group.Group, group)
base_path = utils.urljoin(
group.base_path, group.id, 'users')
base_path = utils.urljoin(group.base_path, group.id, 'users')
users = self._list(_user.User, base_path=base_path, **attrs)
return users
@ -496,8 +500,9 @@ class Proxy(proxy.Proxy):
attempting to find a nonexistent resource.
:returns: One :class:`~openstack.identity.v3.policy.Policy` or None
"""
return self._find(_policy.Policy, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_policy.Policy, name_or_id, ignore_missing=ignore_missing
)
def get_policy(self, policy):
"""Get a single policy
@ -671,8 +676,9 @@ class Proxy(proxy.Proxy):
attempting to find a nonexistent resource.
:returns: One :class:`~openstack.identity.v3.service.Service` or None
"""
return self._find(_service.Service, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_service.Service, name_or_id, ignore_missing=ignore_missing
)
def get_service(self, service):
"""Get a single service
@ -831,8 +837,9 @@ class Proxy(proxy.Proxy):
attempting to find a nonexistent resource.
:returns: One :class:`~openstack.identity.v3.trust.Trust` or None
"""
return self._find(_trust.Trust, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_trust.Trust, name_or_id, ignore_missing=ignore_missing
)
def get_trust(self, trust):
"""Get a single trust
@ -896,8 +903,9 @@ class Proxy(proxy.Proxy):
attempting to find a nonexistent region.
:returns: One :class:`~openstack.identity.v3.region.Region` or None
"""
return self._find(_region.Region, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_region.Region, name_or_id, ignore_missing=ignore_missing
)
def get_region(self, region):
"""Get a single region
@ -1017,8 +1025,9 @@ class Proxy(proxy.Proxy):
"""
return self._update(_role.Role, role, **attrs)
def role_assignments_filter(self, domain=None, project=None, system=None,
group=None, user=None):
def role_assignments_filter(
self, domain=None, project=None, system=None, group=None, user=None
):
"""Retrieve a generator of roles assigned to user/group
:param domain: Either the ID of a domain or a
@ -1038,19 +1047,23 @@ class Proxy(proxy.Proxy):
"""
if domain and project and system:
raise exception.InvalidRequest(
'Only one of domain, project, or system can be specified')
'Only one of domain, project, or system can be specified'
)
if domain is None and project is None and system is None:
raise exception.InvalidRequest(
'Either domain, project, or system should be specified')
'Either domain, project, or system should be specified'
)
if group and user:
raise exception.InvalidRequest(
'Only one of group or user can be specified')
'Only one of group or user can be specified'
)
if group is None and user is None:
raise exception.InvalidRequest(
'Either group or user should be specified')
'Either group or user should be specified'
)
if domain:
domain = self._get_resource(_domain.Domain, domain)
@ -1058,36 +1071,48 @@ class Proxy(proxy.Proxy):
group = self._get_resource(_group.Group, group)
return self._list(
_role_domain_group_assignment.RoleDomainGroupAssignment,
domain_id=domain.id, group_id=group.id)
domain_id=domain.id,
group_id=group.id,
)
else:
user = self._get_resource(_user.User, user)
return self._list(
_role_domain_user_assignment.RoleDomainUserAssignment,
domain_id=domain.id, user_id=user.id)
domain_id=domain.id,
user_id=user.id,
)
elif project:
project = self._get_resource(_project.Project, project)
if group:
group = self._get_resource(_group.Group, group)
return self._list(
_role_project_group_assignment.RoleProjectGroupAssignment,
project_id=project.id, group_id=group.id)
project_id=project.id,
group_id=group.id,
)
else:
user = self._get_resource(_user.User, user)
return self._list(
_role_project_user_assignment.RoleProjectUserAssignment,
project_id=project.id, user_id=user.id)
project_id=project.id,
user_id=user.id,
)
else:
system = self._get_resource(_project.System, system)
if group:
group = self._get_resource(_group.Group, group)
return self._list(
_role_system_group_assignment.RoleSystemGroupAssignment,
system_id=system.id, group_id=group.id)
system_id=system.id,
group_id=group.id,
)
else:
user = self._get_resource(_user.User, user)
return self._list(
_role_system_user_assignment.RoleSystemUserAssignment,
system_id=system.id, user_id=user.id)
system_id=system.id,
user_id=user.id,
)
def role_assignments(self, **query):
"""Retrieve a generator of role assignments
@ -1154,8 +1179,9 @@ class Proxy(proxy.Proxy):
:rtype: :class:
`~openstack.identity.v3.registered_limit.RegisteredLimit`
"""
return self._update(_registered_limit.RegisteredLimit,
registered_limit, **attrs)
return self._update(
_registered_limit.RegisteredLimit, registered_limit, **attrs
)
def delete_registered_limit(self, registered_limit, ignore_missing=True):
"""Delete a registered_limit
@ -1172,8 +1198,11 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
self._delete(_registered_limit.RegisteredLimit, registered_limit,
ignore_missing=ignore_missing)
self._delete(
_registered_limit.RegisteredLimit,
registered_limit,
ignore_missing=ignore_missing,
)
def limits(self, **query):
"""Retrieve a generator of limits
@ -1222,8 +1251,7 @@ class Proxy(proxy.Proxy):
:returns: The updated limit.
:rtype: :class:`~openstack.identity.v3.limit.Limit`
"""
return self._update(_limit.Limit,
limit, **attrs)
return self._update(_limit.Limit, limit, **attrs)
def delete_limit(self, limit, ignore_missing=True):
"""Delete a limit
@ -1237,8 +1265,7 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
self._delete(limit.Limit, limit,
ignore_missing=ignore_missing)
self._delete(limit.Limit, limit, ignore_missing=ignore_missing)
def assign_domain_role_to_user(self, domain, user, role):
"""Assign role to user on a domain
@ -1542,8 +1569,11 @@ class Proxy(proxy.Proxy):
:class:`~openstack.identity.v3.application_credential.ApplicationCredential`
"""
user = self._get_resource(_user.User, user)
return self._list(_application_credential.ApplicationCredential,
user_id=user.id, **query)
return self._list(
_application_credential.ApplicationCredential,
user_id=user.id,
**query,
)
def get_application_credential(self, user, application_credential):
"""Get a single application credential
@ -1562,9 +1592,11 @@ class Proxy(proxy.Proxy):
resource can be found.
"""
user = self._get_resource(_user.User, user)
return self._get(_application_credential.ApplicationCredential,
application_credential,
user_id=user.id)
return self._get(
_application_credential.ApplicationCredential,
application_credential,
user_id=user.id,
)
def create_application_credential(self, user, name, **attrs):
"""Create a new application credential from attributes
@ -1584,9 +1616,12 @@ class Proxy(proxy.Proxy):
"""
user = self._get_resource(_user.User, user)
return self._create(_application_credential.ApplicationCredential,
name=name,
user_id=user.id, **attrs)
return self._create(
_application_credential.ApplicationCredential,
name=name,
user_id=user.id,
**attrs,
)
def find_application_credential(
self,
@ -1619,8 +1654,9 @@ class Proxy(proxy.Proxy):
**query,
)
def delete_application_credential(self, user, application_credential,
ignore_missing=True):
def delete_application_credential(
self, user, application_credential, ignore_missing=True
):
"""Delete an application credential
:param user: Either the ID of a user or a
@ -1638,10 +1674,12 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
user = self._get_resource(_user.User, user)
self._delete(_application_credential.ApplicationCredential,
application_credential,
user_id=user.id,
ignore_missing=ignore_missing)
self._delete(
_application_credential.ApplicationCredential,
application_credential,
user_id=user.id,
ignore_missing=ignore_missing,
)
def create_federation_protocol(self, idp_id, **attrs):
"""Create a new federation protocol from attributes
@ -1663,11 +1701,13 @@ class Proxy(proxy.Proxy):
idp_cls = _identity_provider.IdentityProvider
if isinstance(idp_id, idp_cls):
idp_id = idp_id.id
return self._create(_federation_protocol.FederationProtocol,
idp_id=idp_id, **attrs)
return self._create(
_federation_protocol.FederationProtocol, idp_id=idp_id, **attrs
)
def delete_federation_protocol(self, idp_id, protocol,
ignore_missing=True):
def delete_federation_protocol(
self, idp_id, protocol, ignore_missing=True
):
"""Delete a federation protocol
:param idp_id: The ID of the identity provider or a
@ -1693,8 +1733,9 @@ class Proxy(proxy.Proxy):
idp_cls = _identity_provider.IdentityProvider
if isinstance(idp_id, idp_cls):
idp_id = idp_id.id
self._delete(cls, protocol,
ignore_missing=ignore_missing, idp_id=idp_id)
self._delete(
cls, protocol, ignore_missing=ignore_missing, idp_id=idp_id
)
def find_federation_protocol(self, idp_id, protocol, ignore_missing=True):
"""Find a single federation protocol
@ -1714,8 +1755,12 @@ class Proxy(proxy.Proxy):
idp_cls = _identity_provider.IdentityProvider
if isinstance(idp_id, idp_cls):
idp_id = idp_id.id
return self._find(_federation_protocol.FederationProtocol, protocol,
ignore_missing=ignore_missing, idp_id=idp_id)
return self._find(
_federation_protocol.FederationProtocol,
protocol,
ignore_missing=ignore_missing,
idp_id=idp_id,
)
def get_federation_protocol(self, idp_id, protocol):
"""Get a single federation protocol
@ -1759,8 +1804,9 @@ class Proxy(proxy.Proxy):
idp_cls = _identity_provider.IdentityProvider
if isinstance(idp_id, idp_cls):
idp_id = idp_id.id
return self._list(_federation_protocol.FederationProtocol,
idp_id=idp_id, **query)
return self._list(
_federation_protocol.FederationProtocol, idp_id=idp_id, **query
)
def update_federation_protocol(self, idp_id, protocol, **attrs):
"""Update a federation protocol
@ -1827,8 +1873,9 @@ class Proxy(proxy.Proxy):
attempting to find a nonexistent resource.
:returns: One :class:`~openstack.identity.v3.mapping.Mapping` or None
"""
return self._find(_mapping.Mapping, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_mapping.Mapping, name_or_id, ignore_missing=ignore_missing
)
def get_mapping(self, mapping):
"""Get a single mapping
@ -1894,8 +1941,11 @@ class Proxy(proxy.Proxy):
:returns: ``None``
"""
self._delete(_identity_provider.IdentityProvider, identity_provider,
ignore_missing=ignore_missing)
self._delete(
_identity_provider.IdentityProvider,
identity_provider,
ignore_missing=ignore_missing,
)
def find_identity_provider(self, name_or_id, ignore_missing=True):
"""Find a single identity provider
@ -1910,8 +1960,11 @@ class Proxy(proxy.Proxy):
:rtype:
:class:`~openstack.identity.v3.identity_provider.IdentityProvider`
"""
return self._find(_identity_provider.IdentityProvider, name_or_id,
ignore_missing=ignore_missing)
return self._find(
_identity_provider.IdentityProvider,
name_or_id,
ignore_missing=ignore_missing,
)
def get_identity_provider(self, identity_provider):
"""Get a single mapping
@ -1926,8 +1979,9 @@ class Proxy(proxy.Proxy):
:raises: :class:`~openstack.exceptions.ResourceNotFound`
when no resource can be found.
"""
return self._get(_identity_provider.IdentityProvider,
identity_provider)
return self._get(
_identity_provider.IdentityProvider, identity_provider
)
def identity_providers(self, **query):
"""Retrieve a generator of identity providers
@ -1954,5 +2008,6 @@ class Proxy(proxy.Proxy):
:rtype:
:class:`~openstack.identity.v3.identity_provider.IdentityProvider`
"""
return self._update(_identity_provider.IdentityProvider,
identity_provider, **attrs)
return self._update(
_identity_provider.IdentityProvider, identity_provider, **attrs
)

View File

@ -27,7 +27,8 @@ class Credential(resource.Resource):
commit_method = 'PATCH'
_query_mapping = resource.QueryParameters(
'type', 'user_id',
'type',
'user_id',
)
# Properties

View File

@ -50,54 +50,72 @@ class Domain(resource.Resource):
def assign_role_to_user(self, session, user, role):
"""Assign role to user on domain"""
url = utils.urljoin(self.base_path, self.id, 'users',
user.id, 'roles', role.id)
resp = session.put(url,)
url = utils.urljoin(
self.base_path, self.id, 'users', user.id, 'roles', role.id
)
resp = session.put(
url,
)
if resp.status_code == 204:
return True
return False
def validate_user_has_role(self, session, user, role):
"""Validates that a user has a role on a domain"""
url = utils.urljoin(self.base_path, self.id, 'users',
user.id, 'roles', role.id)
resp = session.head(url,)
url = utils.urljoin(
self.base_path, self.id, 'users', user.id, 'roles', role.id
)
resp = session.head(
url,
)
if resp.status_code == 204:
return True
return False
def unassign_role_from_user(self, session, user, role):
"""Unassigns a role from a user on a domain"""
url = utils.urljoin(self.base_path, self.id, 'users',
user.id, 'roles', role.id)
resp = session.delete(url,)
url = utils.urljoin(
self.base_path, self.id, 'users', user.id, 'roles', role.id
)
resp = session.delete(
url,
)
if resp.status_code == 204:
return True
return False
def assign_role_to_group(self, session, group, role):
"""Assign role to group on domain"""
url = utils.urljoin(self.base_path, self.id, 'groups',
group.id, 'roles', role.id)
resp = session.put(url,)
url = utils.urljoin(
self.base_path, self.id, 'groups', group.id, 'roles', role.id
)
resp = session.put(
url,
)
if resp.status_code == 204:
return True
return False
def validate_group_has_role(self, session, group, role):
"""Validates that a group has a role on a domain"""
url = utils.urljoin(self.base_path, self.id, 'groups',
group.id, 'roles', role.id)
resp = session.head(url,)
url = utils.urljoin(
self.base_path, self.id, 'groups', group.id, 'roles', role.id
)
resp = session.head(
url,
)
if resp.status_code == 204:
return True
return False
def unassign_role_from_group(self, session, group, role):
"""Unassigns a role from a group on a domain"""
url = utils.urljoin(self.base_path, self.id, 'groups',
group.id, 'roles', role.id)
resp = session.delete(url,)
url = utils.urljoin(
self.base_path, self.id, 'groups', group.id, 'roles', role.id
)
resp = session.delete(
url,
)
if resp.status_code == 204:
return True
return False

View File

@ -27,7 +27,9 @@ class Endpoint(resource.Resource):
commit_method = 'PATCH'
_query_mapping = resource.QueryParameters(
'interface', 'region_id', 'service_id',
'interface',
'region_id',
'service_id',
)
# Properties

View File

@ -29,7 +29,8 @@ class Group(resource.Resource):
commit_method = 'PATCH'
_query_mapping = resource.QueryParameters(
'domain_id', 'name',
'domain_id',
'name',
)
# Properties
@ -45,23 +46,26 @@ class Group(resource.Resource):
def add_user(self, session, user):
"""Add user to the group"""
url = utils.urljoin(
self.base_path, self.id, 'users', user.id)
resp = session.put(url,)
url = utils.urljoin(self.base_path, self.id, 'users', user.id)
resp = session.put(
url,
)
exceptions.raise_from_response(resp)
def remove_user(self, session, user):
"""Remove user from the group"""
url = utils.urljoin(
self.base_path, self.id, 'users', user.id)
resp = session.delete(url,)
url = utils.urljoin(self.base_path, self.id, 'users', user.id)
resp = session.delete(
url,
)
exceptions.raise_from_response(resp)
def check_user(self, session, user):
"""Check whether user belongs to group"""
url = utils.urljoin(
self.base_path, self.id, 'users', user.id)
resp = session.head(url,)
url = utils.urljoin(self.base_path, self.id, 'users', user.id)
resp = session.head(
url,
)
if resp.status_code == 404:
# If we recieve 404 - treat this as False,
# rather then returning exception

View File

@ -28,7 +28,8 @@ class Limit(resource.Resource):
commit_jsonpatch = True
_query_mapping = resource.QueryParameters(
'service_id', 'region_id', 'resource_name', 'project_id')
'service_id', 'region_id', 'resource_name', 'project_id'
)
# Properties
#: User-facing description of the registered_limit. *Type: string*

View File

@ -64,54 +64,72 @@ class Project(resource.Resource, tag.TagMixin):
def assign_role_to_user(self, session, user, role):
"""Assign role to user on project"""
url = utils.urljoin(self.base_path, self.id, 'users',
user.id, 'roles', role.id)
resp = session.put(url,)
url = utils.urljoin(
self.base_path, self.id, 'users', user.id, 'roles', role.id
)
resp = session.put(
url,
)
if resp.status_code == 204:
return True
return False
def validate_user_has_role(self, session, user, role):
"""Validates that a user has a role on a project"""
url = utils.urljoin(self.base_path, self.id, 'users',
user.id, 'roles', role.id)
resp = session.head(url,)
url = utils.urljoin(
self.base_path, self.id, 'users', user.id, 'roles', role.id
)
resp = session.head(
url,
)
if resp.status_code == 204:
return True
return False
def unassign_role_from_user(self, session, user, role):
"""Unassigns a role from a user on a project"""
url = utils.urljoin(self.base_path, self.id, 'users',
user.id, 'roles', role.id)
resp = session.delete(url,)
url = utils.urljoin(
self.base_path, self.id, 'users', user.id, 'roles', role.id
)
resp = session.delete(
url,
)
if resp.status_code == 204:
return True
return False
def assign_role_to_group(self, session, group, role):
"""Assign role to group on project"""
url = utils.urljoin(self.base_path, self.id, 'groups',
group.id, 'roles', role.id)
resp = session.put(url,)
url = utils.urljoin(
self.base_path, self.id, 'groups', group.id, 'roles', role.id
)
resp = session.put(
url,
)
if resp.status_code == 204:
return True
return False
def validate_group_has_role(self, session, group, role):
"""Validates that a group has a role on a project"""
url = utils.urljoin(self.base_path, self.id, 'groups',
group.id, 'roles', role.id)
resp = session.head(url,)
url = utils.urljoin(
self.base_path, self.id, 'groups', group.id, 'roles', role.id
)
resp = session.head(
url,
)
if resp.status_code == 204:
return True
return False
def unassign_role_from_group(self, session, group, role):
"""Unassigns a role from a group on a project"""
url = utils.urljoin(self.base_path, self.id, 'groups',
group.id, 'roles', role.id)
resp = session.delete(url,)
url = utils.urljoin(
self.base_path, self.id, 'groups', group.id, 'roles', role.id
)
resp = session.delete(
url,
)
if resp.status_code == 204:
return True
return False

View File

@ -28,7 +28,8 @@ class RegisteredLimit(resource.Resource):
commit_jsonpatch = True
_query_mapping = resource.QueryParameters(
'service_id', 'region_id', 'resource_name')
'service_id', 'region_id', 'resource_name'
)
# Properties
#: User-facing description of the registered_limit. *Type: string*

View File

@ -26,8 +26,7 @@ class Role(resource.Resource):
allow_list = True
commit_method = 'PATCH'
_query_mapping = resource.QueryParameters(
'name', 'domain_id')
_query_mapping = resource.QueryParameters('name', 'domain_id')
# Properties
#: Unique role name, within the owning domain. *Type: string*

View File

@ -22,10 +22,19 @@ class RoleAssignment(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'group_id', 'role_id', 'scope_domain_id', 'scope_project_id',
'user_id', 'effective', 'include_names', 'include_subtree',
role_id='role.id', user_id='user.id', group_id='group.id',
scope_project_id='scope.project.id', scope_domain_id='scope.domain.id',
'group_id',
'role_id',
'scope_domain_id',
'scope_project_id',
'user_id',
'effective',
'include_names',
'include_subtree',
role_id='role.id',
user_id='user.id',
group_id='group.id',
scope_project_id='scope.project.id',
scope_domain_id='scope.domain.id',
scope_system='scope.system',
)

View File

@ -22,54 +22,66 @@ class System(resource.Resource):
def assign_role_to_user(self, session, user, role):
"""Assign role to user on system"""
url = utils.urljoin(self.base_path, 'users', user.id,
'roles', role.id)
resp = session.put(url,)
url = utils.urljoin(self.base_path, 'users', user.id, 'roles', role.id)
resp = session.put(
url,
)
if resp.status_code == 204:
return True
return False
def validate_user_has_role(self, session, user, role):
"""Validates that a user has a role on a system"""
url = utils.urljoin(self.base_path, 'users', user.id,
'roles', role.id)
resp = session.head(url,)
url = utils.urljoin(self.base_path, 'users', user.id, 'roles', role.id)
resp = session.head(
url,
)
if resp.status_code == 204:
return True
return False
def unassign_role_from_user(self, session, user, role):
"""Unassigns a role from a user on a system"""
url = utils.urljoin(self.base_path, 'users', user.id,
'roles', role.id)
resp = session.delete(url,)
url = utils.urljoin(self.base_path, 'users', user.id, 'roles', role.id)
resp = session.delete(
url,
)
if resp.status_code == 204:
return True
return False
def assign_role_to_group(self, session, group, role):
"""Assign role to group on system"""
url = utils.urljoin(self.base_path, 'groups', group.id,
'roles', role.id)
resp = session.put(url,)
url = utils.urljoin(
self.base_path, 'groups', group.id, 'roles', role.id
)
resp = session.put(
url,
)
if resp.status_code == 204:
return True
return False
def validate_group_has_role(self, session, group, role):
"""Validates that a group has a role on a system"""
url = utils.urljoin(self.base_path, 'groups', group.id,
'roles', role.id)
resp = session.head(url,)
url = utils.urljoin(
self.base_path, 'groups', group.id, 'roles', role.id
)
resp = session.head(
url,
)
if resp.status_code == 204:
return True
return False
def unassign_role_from_group(self, session, group, role):
"""Unassigns a role from a group on a system"""
url = utils.urljoin(self.base_path, 'groups', group.id,
'roles', role.id)
resp = session.delete(url,)
url = utils.urljoin(
self.base_path, 'groups', group.id, 'roles', role.id
)
resp = session.delete(
url,
)
if resp.status_code == 204:
return True
return False

View File

@ -26,7 +26,8 @@ class Trust(resource.Resource):
allow_list = True
_query_mapping = resource.QueryParameters(
'trustor_user_id', 'trustee_user_id')
'trustor_user_id', 'trustee_user_id'
)
# Properties
#: A boolean indicating whether the trust can be issued by the trustee as

View File

@ -32,8 +32,7 @@ class Version(resource.Resource):
if base_path is None:
base_path = cls.base_path
resp = session.get(base_path,
params=params)
resp = session.get(base_path, params=params)
resp = resp.json()
for data in resp[cls.resources_key]['values']:
yield cls.existing(**data)

View File

@ -15,7 +15,6 @@ from openstack.tests.functional import base
class TestApplicationCredentials(base.BaseFunctionalTest):
def setUp(self):
super(TestApplicationCredentials, self).setUp()
self.user_id = self.operator_cloud.current_user_id
@ -24,8 +23,11 @@ class TestApplicationCredentials(base.BaseFunctionalTest):
app_creds = self.conn.identity.create_application_credential(
user=self.user_id, name='app_cred'
)
self.addCleanup(self.conn.identity.delete_application_credential,
self.user_id, app_creds['id'])
self.addCleanup(
self.conn.identity.delete_application_credential,
self.user_id,
app_creds['id'],
)
return app_creds
def test_create_application_credentials(self):
@ -61,8 +63,9 @@ class TestApplicationCredentials(base.BaseFunctionalTest):
self.conn.identity.delete_application_credential(
user=self.user_id, application_credential=app_creds['id']
)
self.assertRaises(exceptions.NotFoundException,
self.conn.identity.get_application_credential,
user=self.user_id,
application_credential=app_creds['id']
)
self.assertRaises(
exceptions.NotFoundException,
self.conn.identity.get_application_credential,
user=self.user_id,
application_credential=app_creds['id'],
)

View File

@ -25,7 +25,6 @@ EXAMPLE = {
class TestVersion(base.TestCase):
def test_basic(self):
sot = version.Version()
self.assertEqual('version', sot.resource_key)

View File

@ -27,7 +27,6 @@ EXAMPLE = {
class TestExtension(base.TestCase):
def test_basic(self):
sot = extension.Extension()
self.assertEqual('extension', sot.resource_key)

View File

@ -24,7 +24,6 @@ EXAMPLE = {
class TestRole(base.TestCase):
def test_basic(self):
sot = role.Role()
self.assertEqual('role', sot.resource_key)

View File

@ -24,7 +24,6 @@ EXAMPLE = {
class TestTenant(base.TestCase):
def test_basic(self):
sot = tenant.Tenant()
self.assertEqual('tenant', sot.resource_key)

View File

@ -24,7 +24,6 @@ EXAMPLE = {
class TestUser(base.TestCase):
def test_basic(self):
sot = user.User()
self.assertEqual('user', sot.resource_key)

View File

@ -15,29 +15,26 @@ from openstack.tests.unit import base
EXAMPLE = {
"user": {
"id": "8ac43bb0926245cead88676a96c750d3"},
"user": {"id": "8ac43bb0926245cead88676a96c750d3"},
"name": 'monitoring',
"secret": 'rEaqvJka48mpv',
"roles": [
{"name": "Reader"}
],
"roles": [{"name": "Reader"}],
"expires_at": '2018-02-27T18:30:59Z',
"description": "Application credential for monitoring",
"unrestricted": "False",
"project_id": "3",
"links": {"self": "http://example.com/v3/application_credential_1"}
"links": {"self": "http://example.com/v3/application_credential_1"},
}
class TestApplicationCredential(base.TestCase):
def test_basic(self):
sot = application_credential.ApplicationCredential()
self.assertEqual('application_credential', sot.resource_key)
self.assertEqual('application_credentials', sot.resources_key)
self.assertEqual('/users/%(user_id)s/application_credentials',
sot.base_path)
self.assertEqual(
'/users/%(user_id)s/application_credentials', sot.base_path
)
self.assertTrue(sot.allow_create)
self.assertTrue(sot.allow_fetch)
self.assertTrue(sot.allow_commit)

View File

@ -25,7 +25,6 @@ EXAMPLE = {
class TestCredential(base.TestCase):
def test_basic(self):
sot = credential.Credential()
self.assertEqual('credential', sot.resource_key)
@ -45,7 +44,8 @@ class TestCredential(base.TestCase):
'limit': 'limit',
'marker': 'marker',
},
sot._query_mapping._mapping)
sot._query_mapping._mapping,
)
def test_make_it(self):
sot = credential.Credential(**EXAMPLE)

View File

@ -32,7 +32,6 @@ EXAMPLE = {
class TestDomain(base.TestCase):
def setUp(self):
super(TestDomain, self).setUp()
self.sess = mock.Mock(spec=adapter.Adapter)
@ -67,7 +66,8 @@ class TestDomain(base.TestCase):
'limit': 'limit',
'marker': 'marker',
},
sot._query_mapping._mapping)
sot._query_mapping._mapping,
)
def test_make_it(self):
sot = domain.Domain(**EXAMPLE)
@ -84,12 +84,11 @@ class TestDomain(base.TestCase):
self.assertTrue(
sot.assign_role_to_user(
self.sess,
user.User(id='1'),
role.Role(id='2')))