Keystone resource plugin for Group

Adds resource plugin for Keystone group

Change-Id: Ib04a2c389a59a3ef9db11c2c3e7fef731a6196fe
Implements: blueprint keystone-resources
This commit is contained in:
Kanagaraj Manickam
2015-03-05 09:22:22 +05:30
parent ebdaaae518
commit 610539045b
4 changed files with 1195 additions and 0 deletions

View File

@@ -0,0 +1,182 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import role_assignments
from heat.common.i18n import _
from heat.engine import constraints
from heat.engine import properties
from heat.engine import support
class KeystoneGroup(role_assignments.KeystoneRoleAssignment):
'''
Heat Template Resource for Keystone Group.
heat_template_version: 2013-05-23
description: Sample Keystone Group template
parameters:
group_name:
type: string
description: Keystone group name
group_description:
type: string
description: Keystone group description
group_domain:
type: string
description: Keystone group domain name
group_role:
type: string
description: role
group_role_domain:
type: string
description: group role domain
group_role_project:
type: string
description: group role project
resources:
admin_group:
type: OS::Keystone::Group
properties:
name: {get_param: group_name}
domain: {get_param: group_domain}
description: {get_param: group_description}
roles:
- role: {get_param: group_role}
domain: {get_param: group_role_domain}
project: {get_param: group_role_project}
'''
support_status = support.SupportStatus(
version='2015.1',
message=_('Supported versions: keystone v3'))
PROPERTIES = (
NAME, DOMAIN, DESCRIPTION
) = (
'name', 'domain', 'description'
)
properties_schema = {
NAME: properties.Schema(
properties.Schema.STRING,
_('Name of keystone group.'),
update_allowed=True
),
DOMAIN: properties.Schema(
properties.Schema.STRING,
_('Name or id of keystone domain.'),
default='default',
update_allowed=True,
constraints=[constraints.CustomConstraint('keystone.domain')]
),
DESCRIPTION: properties.Schema(
properties.Schema.STRING,
_('Description of keystone group.'),
default='',
update_allowed=True
)
}
(properties_schema.
update(role_assignments.KeystoneRoleAssignment.properties_schema))
def _create_group(self,
group_name,
description,
domain):
domain = (self.client_plugin('keystone').
get_domain_id(domain))
return self.keystone().client.groups.create(
name=group_name,
domain=domain,
description=description)
def _delete_group(self, group_id):
return self.keystone().client.groups.delete(group_id)
def _update_group(self,
group_id,
domain,
new_name=None,
new_description=None):
values = dict()
if new_name is not None:
values['name'] = new_name
if new_description is not None:
values['description'] = new_description
if len(values) == 0:
return
values['group'] = group_id
domain = (self.client_plugin('keystone').
get_domain_id(domain))
values['domain_id'] = domain
return self.keystone().client.groups.update(**values)
def handle_create(self):
group_name = (self.properties.get(self.NAME) or
self.physical_resource_name())
description = self.properties.get(self.DESCRIPTION)
domain = self.properties.get(self.DOMAIN)
group = self._create_group(
group_name=group_name,
description=description,
domain=domain
)
self.resource_id_set(group.id)
super(KeystoneGroup, self).handle_create(user_id=None,
group_id=group.id)
def handle_update(self, json_snippet=None, tmpl_diff=None, prop_diff=None):
name = prop_diff.get(self.NAME) or self.physical_resource_name()
description = prop_diff.get(self.DESCRIPTION)
domain = (prop_diff.get(self.DOMAIN) or
self._stored_properties_data.get(self.DOMAIN))
self._update_group(
group_id=self.resource_id,
new_name=name,
new_description=description,
domain=domain
)
super(KeystoneGroup, self).handle_update(user_id=None,
group_id=self.resource_id,
prop_diff=prop_diff)
def handle_delete(self):
if self.resource_id is not None:
try:
super(KeystoneGroup, self).handle_delete(
user_id=None,
group_id=self.resource_id)
self._delete_group(group_id=self.resource_id)
except Exception as ex:
self.client_plugin('keystone').ignore_not_found(ex)
def resource_mapping():
return {
'OS::Keystone::Group': KeystoneGroup
}

View File

@@ -0,0 +1,318 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from heat.common import exception
from heat.common.i18n import _
from heat.engine import constraints
from heat.engine import properties
from heat.engine import resource
from heat.engine import support
class KeystoneRoleAssignment(resource.Resource):
'''
Keystone Role assignment class implements role assignments between
user/groups and project/domain.
heat_template_version: 2013-05-23
parameters:
... Group or User parameters
group_role:
type: string
description: role
group_role_domain:
type: string
description: group role domain
group_role_project:
type: string
description: group role project
resources:
admin_group:
type: OS::Keystone::Group OR OS::Keystone::User
properties:
... Group or User properties
roles:
- role: {get_param: group_role}
domain: {get_param: group_role_domain}
- role: {get_param: group_role}
project: {get_param: group_role_project}
'''
support_status = support.SupportStatus(
version='2015.1',
message=_('Supported versions: keystone v3'))
PROPERTIES = (
ROLES
) = (
'roles'
)
_ROLES_MAPPING_PROPERTIES = (
ROLE, DOMAIN, PROJECT
) = (
'role', 'domain', 'project'
)
properties_schema = {
ROLES: properties.Schema(
properties.Schema.LIST,
_('List of role assignments.'),
schema=properties.Schema(
properties.Schema.MAP,
_('Map between role with either project or domain.'),
schema={
ROLE: properties.Schema(
properties.Schema.STRING,
_('Keystone role'),
required=True,
constraints=([constraints.
CustomConstraint('keystone.role')])
),
PROJECT: properties.Schema(
properties.Schema.STRING,
_('Keystone project'),
constraints=([constraints.
CustomConstraint('keystone.project')])
),
DOMAIN: properties.Schema(
properties.Schema.STRING,
_('Keystone domain'),
constraints=([constraints.
CustomConstraint('keystone.domain')])
),
}
),
update_allowed=True
)
}
def _add_role_assignments_to_group(self, group_id, role_assignments):
for role_assignment in self._normalize_to_id(role_assignments):
if role_assignment.get(self.PROJECT) is not None:
self.keystone().client.roles.grant(
role=role_assignment.get(self.ROLE),
project=role_assignment.get(self.PROJECT),
group=group_id
)
elif role_assignment.get(self.DOMAIN) is not None:
self.keystone().client.roles.grant(
role=role_assignment.get(self.ROLE),
domain=role_assignment.get(self.DOMAIN),
group=group_id
)
def _add_role_assignments_to_user(self, user_id, role_assignments):
for role_assignment in self._normalize_to_id(role_assignments):
if role_assignment.get(self.PROJECT) is not None:
self.keystone().client.roles.grant(
role=role_assignment.get(self.ROLE),
project=role_assignment.get(self.PROJECT),
user=user_id
)
elif role_assignment.get(self.DOMAIN) is not None:
self.keystone().client.roles.grant(
role=role_assignment.get(self.ROLE),
domain=role_assignment.get(self.DOMAIN),
user=user_id
)
def _remove_role_assignments_from_group(self, group_id, role_assignments):
for role_assignment in self._normalize_to_id(role_assignments):
if role_assignment.get(self.PROJECT) is not None:
self.keystone().client.roles.revoke(
role=role_assignment.get(self.ROLE),
project=role_assignment.get(self.PROJECT),
group=group_id
)
elif role_assignment.get(self.DOMAIN) is not None:
self.keystone().client.roles.revoke(
role=role_assignment.get(self.ROLE),
domain=role_assignment.get(self.DOMAIN),
group=group_id
)
def _remove_role_assignments_from_user(self, user_id, role_assignments):
for role_assignment in self._normalize_to_id(role_assignments):
if role_assignment.get(self.PROJECT) is not None:
self.keystone().client.roles.revoke(
role=role_assignment.get(self.ROLE),
project=role_assignment.get(self.PROJECT),
user=user_id
)
elif role_assignment.get(self.DOMAIN) is not None:
self.keystone().client.roles.revoke(
role=role_assignment.get(self.ROLE),
domain=role_assignment.get(self.DOMAIN),
user=user_id
)
def _normalize_to_id(self, role_assignment_prps):
role_assignments = []
if role_assignment_prps is None:
return role_assignments
for role_assignment in role_assignment_prps:
role = role_assignment.get(self.ROLE)
project = role_assignment.get(self.PROJECT)
domain = role_assignment.get(self.DOMAIN)
role_assignments.append({
self.ROLE: (self.client_plugin('keystone').
get_role_id(role)),
self.PROJECT: (self.client_plugin('keystone').
get_project_id(project)) if project else None,
self.DOMAIN: (self.client_plugin('keystone').
get_domain_id(domain)) if domain else None
})
return role_assignments
@staticmethod
def _find_diff(updated_prps, stored_prps):
updated_role_project_assignments = []
updated_role_domain_assignments = []
# Split the properties into two set of role assignments
# (project, domain) from updated properties
for role_assignment in updated_prps or []:
if role_assignment.get(KeystoneRoleAssignment.PROJECT) is not None:
updated_role_project_assignments.append(
'%s:%s' % (
role_assignment[KeystoneRoleAssignment.ROLE],
role_assignment[KeystoneRoleAssignment.PROJECT]))
elif (role_assignment.get(KeystoneRoleAssignment.DOMAIN)
is not None):
updated_role_domain_assignments.append(
'%s:%s' % (role_assignment[KeystoneRoleAssignment.ROLE],
role_assignment[KeystoneRoleAssignment.DOMAIN]))
stored_role_project_assignments = []
stored_role_domain_assignments = []
# Split the properties into two set of role assignments
# (project, domain) from updated properties
for role_assignment in (stored_prps or []):
if role_assignment.get(KeystoneRoleAssignment.PROJECT) is not None:
stored_role_project_assignments.append(
'%s:%s' % (
role_assignment[KeystoneRoleAssignment.ROLE],
role_assignment[KeystoneRoleAssignment.PROJECT]))
elif (role_assignment.get(KeystoneRoleAssignment.DOMAIN)
is not None):
stored_role_domain_assignments.append(
'%s:%s' % (role_assignment[KeystoneRoleAssignment.ROLE],
role_assignment[KeystoneRoleAssignment.DOMAIN]))
new_role_assignments = []
removed_role_assignments = []
# NOTE: finding the diff of list of strings is easier by using 'set'
# so properties are converted to string in above sections
# New items
for item in (set(updated_role_project_assignments) -
set(stored_role_project_assignments)):
new_role_assignments.append(
{KeystoneRoleAssignment.ROLE: item[:item.find(':')],
KeystoneRoleAssignment.PROJECT: item[item.find(':') + 1:]}
)
for item in (set(updated_role_domain_assignments) -
set(stored_role_domain_assignments)):
new_role_assignments.append(
{KeystoneRoleAssignment.ROLE: item[:item.find(':')],
KeystoneRoleAssignment.DOMAIN: item[item.find(':') + 1:]}
)
# Old items
for item in (set(stored_role_project_assignments) -
set(updated_role_project_assignments)):
removed_role_assignments.append(
{KeystoneRoleAssignment.ROLE: item[:item.find(':')],
KeystoneRoleAssignment.PROJECT: item[item.find(':') + 1:]}
)
for item in (set(stored_role_domain_assignments) -
set(updated_role_domain_assignments)):
removed_role_assignments.append(
{KeystoneRoleAssignment.ROLE: item[:item.find(':')],
KeystoneRoleAssignment.DOMAIN: item[item.find(':') + 1:]}
)
return new_role_assignments, removed_role_assignments
def handle_create(self, user_id=None, group_id=None):
if self.properties.get(self.ROLES) is not None:
if user_id is not None:
self._add_role_assignments_to_user(
user_id,
self.properties.get(self.ROLES))
elif group_id is not None:
self._add_role_assignments_to_group(
group_id,
self.properties.get(self.ROLES))
def handle_update(self, user_id=None, group_id=None, prop_diff=None):
(new_role_assignments,
removed_role_assignments) = KeystoneRoleAssignment._find_diff(
prop_diff.get(self.ROLES),
self._stored_properties_data.get(self.ROLES))
if len(new_role_assignments) > 0:
if user_id is not None:
self._add_role_assignments_to_user(
user_id,
new_role_assignments)
elif group_id is not None:
self._add_role_assignments_to_group(
group_id,
new_role_assignments)
if len(removed_role_assignments) > 0:
if user_id is not None:
self._remove_role_assignments_from_user(
user_id,
removed_role_assignments)
elif group_id is not None:
self._remove_role_assignments_from_group(
group_id,
removed_role_assignments)
def handle_delete(self, user_id=None, group_id=None):
if self._stored_properties_data.get(self.ROLES) is not None:
if user_id is not None:
self._remove_role_assignments_from_user(
user_id,
(self._stored_properties_data.
get(self.ROLES)))
elif group_id is not None:
self._remove_role_assignments_from_group(
group_id,
(self._stored_properties_data.
get(self.ROLES)))
def validate(self):
super(KeystoneRoleAssignment, self).validate()
if self.properties.get(self.ROLES) is not None:
for role_assignment in self.properties.get(self.ROLES):
project = role_assignment.get(self.PROJECT)
domain = role_assignment.get(self.DOMAIN)
if project is not None and domain is not None:
raise exception.ResourcePropertyConflict(self.PROJECT,
self.DOMAIN)
if project is None and domain is None:
msg = _('Either project or domain must be specified for'
' role %s') % role_assignment.get(self.ROLE)
raise exception.StackValidationFailed(message=msg)

View File

@@ -0,0 +1,306 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from heat.engine import constraints
from heat.engine import properties
from heat.engine import resource
from heat.engine import stack
from heat.engine import template
from heat.tests import common
from heat.tests import utils
from ..resources.group import KeystoneGroup # noqa
from ..resources.group import resource_mapping # noqa
keystone_group_template = {
'heat_template_version': '2013-05-23',
'resources': {
'test_group': {
'type': 'OS::Keystone::Group',
'properties': {
'name': 'test_group_1',
'description': 'Test group',
'domain': 'default'
}
}
}
}
RESOURCE_TYPE = 'OS::Keystone::Group'
class KeystoneGroupTest(common.HeatTestCase):
def setUp(self):
super(KeystoneGroupTest, self).setUp()
self.ctx = utils.dummy_context()
# For unit testing purpose. Register resource provider explicitly.
resource._register_class(RESOURCE_TYPE, KeystoneGroup)
self.stack = stack.Stack(
self.ctx, 'test_stack_keystone',
template.Template(keystone_group_template)
)
self.test_group = self.stack['test_group']
# Mock client
self.keystoneclient = mock.MagicMock()
self.test_group.keystone = mock.MagicMock()
self.test_group.keystone.return_value = self.keystoneclient
self.groups = self.keystoneclient.client.groups
# Mock client plugin
def _side_effect(value):
return value
self.keystone_client_plugin = mock.MagicMock()
(self.keystone_client_plugin.get_domain_id.
side_effect) = _side_effect
self.test_group.client_plugin = mock.MagicMock()
(self.test_group.client_plugin.
return_value) = self.keystone_client_plugin
def _get_mock_group(self):
value = mock.MagicMock()
group_id = '477e8273-60a7-4c41-b683-fdb0bc7cd151'
value.id = group_id
return value
def test_resource_mapping(self):
mapping = resource_mapping()
self.assertEqual(1, len(mapping))
self.assertEqual(KeystoneGroup, mapping[RESOURCE_TYPE])
self.assertIsInstance(self.test_group, KeystoneGroup)
def test_properties_title(self):
property_title_map = {
KeystoneGroup.NAME: 'name',
KeystoneGroup.DESCRIPTION: 'description',
KeystoneGroup.DOMAIN: 'domain'
}
for actual_title, expected_title in property_title_map.items():
self.assertEqual(
expected_title,
actual_title,
'KeystoneGroup PROPERTIES(%s) title modified.' %
actual_title)
def test_property_name_validate_schema(self):
schema = KeystoneGroup.properties_schema[KeystoneGroup.NAME]
self.assertEqual(
True,
schema.update_allowed,
'update_allowed for property %s is modified' %
KeystoneGroup.NAME)
self.assertEqual(properties.Schema.STRING,
schema.type,
'type for property %s is modified' %
KeystoneGroup.NAME)
self.assertEqual('Name of keystone group.',
schema.description,
'description for property %s is modified' %
KeystoneGroup.NAME)
def test_property_description_validate_schema(self):
schema = KeystoneGroup.properties_schema[KeystoneGroup.DESCRIPTION]
self.assertEqual(
True,
schema.update_allowed,
'update_allowed for property %s is modified' %
KeystoneGroup.DESCRIPTION)
self.assertEqual(properties.Schema.STRING,
schema.type,
'type for property %s is modified' %
KeystoneGroup.DESCRIPTION)
self.assertEqual('Description of keystone group.',
schema.description,
'description for property %s is modified' %
KeystoneGroup.DESCRIPTION)
self.assertEqual(
'',
schema.default,
'default for property %s is modified' %
KeystoneGroup.DESCRIPTION)
def test_property_domain_validate_schema(self):
schema = KeystoneGroup.properties_schema[KeystoneGroup.DOMAIN]
self.assertEqual(
True,
schema.update_allowed,
'update_allowed for property %s is modified' %
KeystoneGroup.DOMAIN)
self.assertEqual(properties.Schema.STRING,
schema.type,
'type for property %s is modified' %
KeystoneGroup.DOMAIN)
self.assertEqual('Name or id of keystone domain.',
schema.description,
'description for property %s is modified' %
KeystoneGroup.DOMAIN)
self.assertEqual([constraints.CustomConstraint('keystone.domain')],
schema.constraints,
'constrains for property %s is modified' %
KeystoneGroup.DOMAIN)
self.assertEqual(
'default',
schema.default,
'default for property %s is modified' %
KeystoneGroup.DOMAIN)
def _get_property_schema_value_default(self, name):
schema = KeystoneGroup.properties_schema[name]
return schema.default
def test_group_handle_create(self):
mock_group = self._get_mock_group()
self.groups.create.return_value = mock_group
# validate the properties
self.assertEqual(
'test_group_1',
self.test_group.properties.get(KeystoneGroup.NAME))
self.assertEqual(
'Test group',
self.test_group.properties.get(KeystoneGroup.DESCRIPTION))
self.assertEqual(
'default',
self.test_group.properties.get(KeystoneGroup.DOMAIN))
self.test_group.handle_create()
# validate group creation
self.groups.create.assert_called_once_with(
name='test_group_1',
description='Test group',
domain='default')
# validate physical resource id
self.assertEqual(mock_group.id, self.test_group.resource_id)
def test_group_handle_create_default(self):
values = {
KeystoneGroup.NAME: None,
KeystoneGroup.DESCRIPTION:
(self._get_property_schema_value_default(
KeystoneGroup.DESCRIPTION)),
KeystoneGroup.DOMAIN:
(self._get_property_schema_value_default(
KeystoneGroup.DOMAIN)),
KeystoneGroup.ROLES: None
}
def _side_effect(key):
return values[key]
mock_group = self._get_mock_group()
self.groups.create.return_value = mock_group
self.test_group.properties = mock.MagicMock()
self.test_group.properties.get.side_effect = _side_effect
self.test_group.physical_resource_name = mock.MagicMock()
self.test_group.physical_resource_name.return_value = 'foo'
# validate the properties
self.assertEqual(
None,
self.test_group.properties.get(KeystoneGroup.NAME))
self.assertEqual(
'',
self.test_group.properties.get(KeystoneGroup.DESCRIPTION))
self.assertEqual(
'default',
self.test_group.properties.get(KeystoneGroup.DOMAIN))
self.test_group.handle_create()
# validate group creation
self.groups.create.assert_called_once_with(
name='foo',
description='',
domain='default')
def test_group_handle_update(self):
self.test_group.resource_id = '477e8273-60a7-4c41-b683-fdb0bc7cd151'
self.test_group._stored_properties_data = dict(roles=None)
prop_diff = {KeystoneGroup.NAME: 'test_group_1_updated',
KeystoneGroup.DESCRIPTION: 'Test Group updated',
KeystoneGroup.DOMAIN: 'test_domain'}
self.test_group.handle_update(json_snippet=None,
tmpl_diff=None,
prop_diff=prop_diff)
self.groups.update.assert_called_once_with(
group=self.test_group.resource_id,
name=prop_diff[KeystoneGroup.NAME],
description=prop_diff[KeystoneGroup.DESCRIPTION],
domain_id='test_domain'
)
def test_group_handle_update_default(self):
self.test_group.resource_id = '477e8273-60a7-4c41-b683-fdb0bc7cd151'
self.test_group._stored_properties_data = dict(domain='default')
self.test_group.physical_resource_name = mock.MagicMock()
self.test_group.physical_resource_name.return_value = 'foo'
prop_diff = {KeystoneGroup.DESCRIPTION: 'Test Project updated'}
self.test_group.handle_update(json_snippet=None,
tmpl_diff=None,
prop_diff=prop_diff)
# validate default name to physical resource name and
# domain is set from stored properties used during creation.
self.groups.update.assert_called_once_with(
group=self.test_group.resource_id,
name='foo',
description=prop_diff[KeystoneGroup.DESCRIPTION],
domain_id='default'
)
def test_group_handle_delete(self):
self.test_group.resource_id = '477e8273-60a7-4c41-b683-fdb0bc7cd151'
self.test_group._stored_properties_data = dict(roles=None)
self.groups.delete.return_value = None
self.assertIsNone(self.test_group.handle_delete())
self.groups.delete.assert_called_once_with(
self.test_group.resource_id
)
def test_group_handle_delete_resource_id_is_none(self):
self.resource_id = None
self.assertIsNone(self.test_group.handle_delete())
def test_group_handle_delete_not_found(self):
self.test_group._stored_properties_data = dict(roles=None)
exc = self.keystoneclient.NotFound
self.groups.delete.side_effect = exc
self.assertIsNone(self.test_group.handle_delete())

View File

@@ -0,0 +1,389 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from heat.common import exception
from heat.engine import properties
from heat.engine import resource
from heat.engine import stack
from heat.engine import template
from heat.tests import common
from heat.tests import utils
from ..resources.role_assignments import KeystoneRoleAssignment # noqa
RESOURCE_TYPE = 'OS::Keystone::DummyRoleAssignment'
keystone_role_assignment_template = {
'heat_template_version': '2013-05-23',
'resources': {
'test_role_assignment': {
'type': RESOURCE_TYPE,
'properties': {
'roles': [
{
'role': 'role_1',
'project': 'project_1',
},
{
'role': 'role_1',
'domain': 'domain_1'
}
]
}
}
}
}
class KeystoneRoleAssignmentTest(common.HeatTestCase):
def setUp(self):
super(KeystoneRoleAssignmentTest, self).setUp()
self.ctx = utils.dummy_context()
# For unit testing purpose. Register resource provider explicitly.
resource._register_class(RESOURCE_TYPE, KeystoneRoleAssignment)
self.stack = stack.Stack(
self.ctx, 'test_stack_keystone',
template.Template(keystone_role_assignment_template)
)
self.test_role_assignment = self.stack['test_role_assignment']
# Mock client
self.keystoneclient = mock.MagicMock()
self.test_role_assignment.keystone = mock.MagicMock()
self.test_role_assignment.keystone.return_value = self.keystoneclient
self.roles = self.keystoneclient.client.roles
# Mock client plugin
def _side_effect(value):
return value
self.keystone_client_plugin = mock.MagicMock()
(self.keystone_client_plugin.get_domain_id.
side_effect) = _side_effect
(self.keystone_client_plugin.get_role_id.
side_effect) = _side_effect
(self.keystone_client_plugin.get_project_id.
side_effect) = _side_effect
self.test_role_assignment.client_plugin = mock.MagicMock()
(self.test_role_assignment.client_plugin.
return_value) = self.keystone_client_plugin
def test_resource_mapping_not_defined(self):
# this resource is not planned to support in heat, so resource_mapping
# is not to be defined in KeystoneRoleAssignment
try:
from ..resources.role_assignments import resource_mapping # noqa
self.fail("KeystoneRoleAssignment is designed to be exposed as"
"Heat resource")
except Exception:
pass
def test_properties_title(self):
property_title_map = {
KeystoneRoleAssignment.ROLES: 'roles'
}
for actual_title, expected_title in property_title_map.items():
self.assertEqual(
expected_title,
actual_title,
'KeystoneRoleAssignment PROPERTIES(%s) title modified.' %
actual_title)
def test_property_roles_validate_schema(self):
schema = (KeystoneRoleAssignment.
properties_schema[KeystoneRoleAssignment.ROLES])
self.assertEqual(
True,
schema.update_allowed,
'update_allowed for property %s is modified' %
KeystoneRoleAssignment.ROLES)
self.assertEqual(properties.Schema.LIST,
schema.type,
'type for property %s is modified' %
KeystoneRoleAssignment.ROLES)
self.assertEqual('List of role assignments.',
schema.description,
'description for property %s is modified' %
KeystoneRoleAssignment.ROLES)
def test_role_assignment_handle_create_user(self):
# validate the properties
self.assertEqual([
{
'role': 'role_1',
'project': 'project_1',
'domain': None
},
{
'role': 'role_1',
'project': None,
'domain': 'domain_1'
}],
(self.test_role_assignment.properties.
get(KeystoneRoleAssignment.ROLES)))
self.test_role_assignment.handle_create(user_id='user_1',
group_id=None)
# validate role assignment creation
# role-user-domain
self.roles.grant.assert_any_call(
role='role_1',
user='user_1',
domain='domain_1')
# role-user-project
self.roles.grant.assert_any_call(
role='role_1',
user='user_1',
project='project_1')
def test_role_assignment_handle_create_group(self):
# validate the properties
self.assertEqual([
{
'role': 'role_1',
'project': 'project_1',
'domain': None
},
{
'role': 'role_1',
'project': None,
'domain': 'domain_1'
}],
(self.test_role_assignment.properties.
get(KeystoneRoleAssignment.ROLES)))
self.test_role_assignment.handle_create(user_id=None,
group_id='group_1')
# validate role assignment creation
# role-group-domain
self.roles.grant.assert_any_call(
role='role_1',
group='group_1',
domain='domain_1')
# role-group-project
self.roles.grant.assert_any_call(
role='role_1',
group='group_1',
project='project_1')
def test_role_assignment_handle_update_user(self):
self.test_role_assignment._stored_properties_data = {
'roles': [
{
'role': 'role_1',
'project': 'project_1'
},
{
'role': 'role_1',
'domain': 'domain_1'
}
]
}
prop_diff = {
KeystoneRoleAssignment.ROLES: [
{
'role': 'role_2',
'project': 'project_1'
},
{
'role': 'role_2',
'domain': 'domain_1'
}
]
}
self.test_role_assignment.handle_update(
user_id='user_1',
group_id=None,
prop_diff=prop_diff)
# Add role2-project1-domain1
# role-user-domain
self.roles.grant.assert_any_call(
role='role_2',
user='user_1',
domain='domain_1')
# role-user-project
self.roles.grant.assert_any_call(
role='role_2',
user='user_1',
project='project_1')
# Remove role1-project1-domain1
# role-user-domain
self.roles.revoke.assert_any_call(
role='role_1',
user='user_1',
domain='domain_1')
# role-user-project
self.roles.revoke.assert_any_call(
role='role_1',
user='user_1',
project='project_1')
def test_role_assignment_handle_update_group(self):
self.test_role_assignment._stored_properties_data = {
'roles': [
{
'role': 'role_1',
'project': 'project_1'
},
{
'role': 'role_1',
'domain': 'domain_1'
}
]
}
prop_diff = {
KeystoneRoleAssignment.ROLES: [
{
'role': 'role_2',
'project': 'project_1'
},
{
'role': 'role_2',
'domain': 'domain_1'
}
]
}
self.test_role_assignment.handle_update(
user_id=None,
group_id='group_1',
prop_diff=prop_diff)
# Add role2-project1-domain1
# role-group-domain
self.roles.grant.assert_any_call(
role='role_2',
group='group_1',
domain='domain_1')
# role-group-project
self.roles.grant.assert_any_call(
role='role_2',
group='group_1',
project='project_1')
# Remove role1-project1-domain1
# role-group-domain
self.roles.revoke.assert_any_call(
role='role_1',
group='group_1',
domain='domain_1')
# role-group-project
self.roles.revoke.assert_any_call(
role='role_1',
group='group_1',
project='project_1')
def test_role_assignment_handle_delete_user(self):
self.test_role_assignment._stored_properties_data = {
'roles': [
{
'role': 'role_1',
'project': 'project_1'
},
{
'role': 'role_1',
'domain': 'domain_1'
}
]
}
self.assertIsNone(self.test_role_assignment.handle_delete(
user_id='user_1',
group_id=None
))
# Remove role1-project1-domain1
# role-user-domain
self.roles.revoke.assert_any_call(
role='role_1',
user='user_1',
domain='domain_1')
# role-user-project
self.roles.revoke.assert_any_call(
role='role_1',
user='user_1',
project='project_1')
def test_role_assignment_handle_delete_group(self):
self.test_role_assignment._stored_properties_data = {
'roles': [
{
'role': 'role_1',
'project': 'project_1'
},
{
'role': 'role_1',
'domain': 'domain_1'
}
]
}
self.assertIsNone(self.test_role_assignment.handle_delete(
user_id=None,
group_id='group_1'
))
# Remove role1-project1-domain1
# role-group-domain
self.roles.revoke.assert_any_call(
role='role_1',
group='group_1',
domain='domain_1')
# role-group-project
self.roles.revoke.assert_any_call(
role='role_1',
group='group_1',
project='project_1')
def test_validate_1(self):
self.test_role_assignment.properties = mock.MagicMock()
# both project and domain are none
self.test_role_assignment.properties.get.return_value = [
dict(role='role1')]
self.assertRaises(exception.StackValidationFailed,
self.test_role_assignment.validate)
def test_validate_2(self):
self.test_role_assignment.properties = mock.MagicMock()
# both project and domain are not none
self.test_role_assignment.properties.get.return_value = [
dict(role='role1',
project='project1',
domain='domain1')
]
self.assertRaises(exception.ResourcePropertyConflict,
self.test_role_assignment.validate)