Add policy to allow owner to force delete volumes

The default in cinder is to only allow the admin to
force delete a volume; this change allows the
admin_or_owner to force delete a volume.

This change was previously authored by Chris MacNaughton in change
I703a21b68186059a63f0d06d88cd2528e821f3d3
and then reverted in change
I77f9351da8516e5af40fea57400101e6dd16b528.

This version of the change gates the policy override on the OpenStack
release, applying it only on Queens or later.

Change-Id: I35599bae8a94724869a36c555ebfc6bf94384bd4
Co-Authored-By: Chris MacNaughton <chris.macnaughton@canonical.com>
Closes-Bug: #1782008
This commit is contained in:
Chris MacNaughton 2018-07-17 10:06:52 +02:00 committed by David Ames
parent 834cde35ec
commit 26c0dec5f3
3 changed files with 158 additions and 0 deletions

View File

@ -157,6 +157,7 @@ class CinderCharmError(Exception):
# Directory holding all cinder configuration files managed by the charm.
CINDER_CONF_DIR = "/etc/cinder"
# Main cinder service configuration file.
CINDER_CONF = '%s/cinder.conf' % CINDER_CONF_DIR
# Paste pipeline configuration for the cinder API service.
CINDER_API_CONF = '%s/api-paste.ini' % CINDER_CONF_DIR
# Policy overrides (e.g. allowing the owner to force-delete volumes).
CINDER_POLICY_JSON = '%s/policy.json' % CINDER_CONF_DIR
# System-wide ceph client configuration path.
CEPH_CONF = '/etc/ceph/ceph.conf'
HAPROXY_CONF = '/etc/haproxy/haproxy.cfg'
@ -227,6 +228,10 @@ BASE_RESOURCE_MAP = OrderedDict([
'contexts': [context.IdentityServiceContext()],
'services': ['cinder-api'],
}),
(CINDER_POLICY_JSON, {
'contexts': [],
'services': ['cinder-api']
}),
(ceph_config_file(), {
'contexts': [context.CephContext()],
'services': ['cinder-volume']
@ -269,6 +274,7 @@ def resource_map(release=None):
hook execution.
"""
resource_map = deepcopy(BASE_RESOURCE_MAP)
release = release or os_release('cinder-common', base='icehouse')
if relation_ids('backup-backend'):
resource_map[CINDER_CONF]['services'].append('cinder-backup')
resource_map[ceph_config_file()]['services'].append('cinder-backup')
@ -322,6 +328,9 @@ def resource_map(release=None):
'services': ['apache2']
}
if release and CompareOpenStackReleases(release) < 'queens':
resource_map.pop(CINDER_POLICY_JSON)
return resource_map

View File

@ -0,0 +1,3 @@
{
"volume_extension:volume_admin_actions:force_delete": "rule:admin_or_owner"
}

View File

@ -26,6 +26,8 @@ from charmhelpers.contrib.openstack.amulet.utils import (
# ERROR
)
import keystoneclient
# Use DEBUG to turn on debug logging
u = OpenStackAmuletUtils(DEBUG)
@ -154,6 +156,22 @@ class CinderBasicDeployment(OpenStackAmuletDeployment):
else:
api_version = 1
self.cinder = u.authenticate_cinder_admin(self.keystone, api_version)
if self._get_openstack_release() >= self.xenial_queens:
self.create_users_v3()
self.keystone_non_admin = u.authenticate_keystone(
self.keystone_sentry.info['public-address'],
user_domain_name=self.demo_domain,
username=self.demo_user_v3,
password='password',
api_version=self.keystone_api_version,
project_domain_name=self.demo_domain,
project_name=self.demo_project,
)
else:
self.create_users_v2()
self.keystone_non_admin = u.authenticate_keystone_user(
self.keystone, user=self.demo_user,
password='password', tenant=self.demo_tenant)
force_v1_client = False
if self._get_openstack_release() == self.trusty_icehouse:
@ -168,6 +186,118 @@ class CinderBasicDeployment(OpenStackAmuletDeployment):
self.keystone,
force_v1_client=force_v1_client)
self.cinder_non_admin = u.authenticate_cinder_admin(
self.keystone_non_admin, api_version)
def create_users_v2(self):
    """Ensure the keystone v2 demo tenant, role and user exist.

    Records the demo names on the instance for later authentication.
    The tenant, role and user are only created when the demo tenant is
    absent; if it already exists, nothing is changed.
    """
    self.demo_tenant = 'demoTenant'
    self.demo_role = 'demoRole'
    self.demo_user = 'demoUser'
    self.keystone_api_version = 2
    # Guard clause: an existing demo tenant means setup already ran.
    if u.tenant_exists(self.keystone, self.demo_tenant):
        return
    new_tenant = self.keystone.tenants.create(
        tenant_name=self.demo_tenant,
        description='demo tenant',
        enabled=True)
    self.keystone.roles.create(name=self.demo_role)
    self.keystone.users.create(name=self.demo_user,
                               password='password',
                               tenant_id=new_tenant.id,
                               email='demo@demo.com')
def create_users_v3(self):
    """Ensure demo keystone v3 resources exist for the tests.

    Idempotently (find-before-create) sets up: a demo domain, a demo
    project inside it, the bespoke demo role plus the standard
    'Member' and 'Admin' roles, a non-admin demo user holding
    'Member' on the demo project, and a domain-admin user holding
    'Admin' on the demo domain.  All names used are stored on the
    instance for later authentication.
    """
    # Create a demo tenant/role/user
    self.demo_project = 'demoProject'
    self.demo_user_v3 = 'demoUserV3'
    self.demo_role = 'demoRoleV3'
    self.demo_domain_admin = 'demoDomainAdminV3'
    self.demo_domain = 'demoDomain'
    self.keystone_api_version = 3
    # EAFP: reuse the domain when present, create it otherwise.
    try:
        domain = self.keystone.domains.find(name=self.demo_domain)
    except keystoneclient.exceptions.NotFound:
        domain = self.keystone.domains.create(
            self.demo_domain,
            description='Demo Domain',
            enabled=True
        )
    try:
        self.keystone.projects.find(name=self.demo_project)
    except keystoneclient.exceptions.NotFound:
        self.keystone.projects.create(
            self.demo_project,
            domain,
            description='Demo Project',
            enabled=True,
        )
    try:
        self.keystone.roles.find(name=self.demo_role)
    except keystoneclient.exceptions.NotFound:
        self.keystone.roles.create(name=self.demo_role)
    try:
        self.keystone.roles.find(name='Member')
    except keystoneclient.exceptions.NotFound:
        self.keystone.roles.create(name='Member')
    # Non-admin demo user: granted 'Member' scoped to the demo project.
    if not self.find_keystone_v3_user(self.keystone,
                                      self.demo_user_v3,
                                      self.demo_domain):
        user = self.keystone.users.create(
            self.demo_user_v3,
            domain=domain.id,
            project=self.demo_project,
            password='password',
            email='demov3@demo.com',
            description='Demo',
            enabled=True)
        role = self.keystone.roles.find(name='Member')
        # NOTE(review): this debug text reports a domain grant, but the
        # grant below is actually project-scoped -- confirm intent.
        u.log.debug("self.keystone.roles.grant('{}', user='{}', "
                    "domain='{}')".format(role.id, user.id, domain.id))
        self.keystone.roles.grant(
            role.id,
            user=user.id,
            project=self.keystone.projects.find(name=self.demo_project).id)
    try:
        self.keystone.roles.find(name='Admin')
    except keystoneclient.exceptions.NotFound:
        self.keystone.roles.create(name='Admin')
    # Domain-admin user: granted 'Admin' scoped to the demo domain.
    if not self.find_keystone_v3_user(self.keystone,
                                      self.demo_domain_admin,
                                      self.demo_domain):
        user = self.keystone.users.create(
            self.demo_domain_admin,
            domain=domain.id,
            project=self.demo_project,
            password='password',
            email='demoadminv3@demo.com',
            description='Demo Admin',
            enabled=True)
        role = self.keystone.roles.find(name='Admin')
        u.log.debug("self.keystone.roles.grant('{}', user='{}', "
                    "domain='{}')".format(role.id, user.id, domain.id))
        self.keystone.roles.grant(
            role.id,
            user=user.id,
            domain=domain.id)
def find_keystone_v3_user(self, client, username, domain):
    """Look up a user by name within a keystone v3 domain.

    The comparison is case-insensitive.  Returns the matching user
    object, or None when no user of that name exists in the domain.
    """
    domain_id = client.domains.find(name=domain).id
    wanted = username.lower()
    for candidate in client.users.list(domain=domain_id):
        if candidate.name.lower() == wanted:
            return candidate
    return None
def _extend_cinder_volume(self, vol_id, new_size=2):
"""Extend an existing cinder volume size.
@ -743,6 +873,22 @@ class CinderBasicDeployment(OpenStackAmuletDeployment):
u.log.debug('Deleting volume {}...'.format(vol.id))
u.delete_resource(self.cinder.volumes, vol.id, msg="cinder volume")
def test_404_admin_force_delete_volume(self):
    """Force delete a cinder volume as the admin user."""
    u.log.debug('Creating, checking and deleting cinder volume...')
    volume = u.create_cinder_volume(self.cinder)
    volume.force_delete()
def test_405_non_admin_force_delete_volume(self):
    """Force delete a cinder volume as a non-admin (owner) user.

    The charm only installs the policy override allowing owners to
    force delete on Queens or later, so earlier releases are skipped.
    """
    release = self._get_openstack_release()
    if release < self.xenial_queens:
        u.log.info('Skipping test, {} < queens'.format(release))
        return
    u.log.debug('Creating, checking and deleting cinder volume...')
    volume = u.create_cinder_volume(self.cinder_non_admin)
    volume.force_delete()
def test_900_restart_on_config_change(self):
"""Verify that the specified services are restarted when the
config is changed."""