Added the force parameter to consumer removal and the corresponding CLI commands

When deleting a secret that has consumers, the
--force parameter must be specified in the CLI.

Change-Id: I49d19ac843d5c805fd7f533d07a3a719ce9a1104
This commit is contained in:
Mauricio Harley 2022-11-24 10:50:27 +01:00 committed by Andre Aranha
parent 3ffa1600af
commit 7f6b3cf790
10 changed files with 387 additions and 52 deletions

View File

@ -28,10 +28,16 @@ class DeleteSecret(command.Command):
def get_parser(self, prog_name):
parser = super(DeleteSecret, self).get_parser(prog_name)
parser.add_argument('URI', help='The URI reference for the secret')
parser.add_argument('--force', '-f',
default=False,
help='if specified, forces the '
'deletion of secrets that have consumers.',
action='store_true')
return parser
def take_action(self, args):
self.app.client_manager.key_manager.secrets.delete(args.URI)
self.app.client_manager.key_manager.secrets.delete(
args.URI, args.force)
class GetSecret(show.ShowOne):
@ -198,3 +204,51 @@ class StoreSecret(show.ShowOne):
secret_type=args.secret_type)
entity.store()
return entity._get_formatted_entity()
class CreateConsumer(command.Command):
"""Create a consumer for a secret."""
def get_parser(self, prog_name):
parser = super(CreateConsumer, self).get_parser(prog_name)
parser.add_argument('URI', help='The URI reference for the secret')
parser.add_argument('--service-type-name', '-s', required=True,
help='the service that will consume the secret')
parser.add_argument('--resource-type', '-t', required=True,
help='the type of resource that will consume '
'the secret')
parser.add_argument('--resource-id', '-i', required=True,
help='the id of the resource that will consume '
'the secret')
return parser
def take_action(self, args):
self.app.client_manager.key_manager.secrets.register_consumer(
args.URI,
args.service_type_name,
args.resource_type,
args.resource_id)
class DeleteConsumer(command.Command):
"""Delete a consumer from a secret."""
def get_parser(self, prog_name):
parser = super(DeleteConsumer, self).get_parser(prog_name)
parser.add_argument('URI', help='The URI reference for the secret')
parser.add_argument('--service-type-name', '-s', required=True,
help='the service that is consuming the secret')
parser.add_argument('--resource-type', '-t', required=True,
help='the type of resource that is consuming '
'the secret')
parser.add_argument('--resource-id', '-i', required=True,
help='the id of the resource that is consuming '
'the secret')
return parser
def take_action(self, args):
self.app.client_manager.key_manager.secrets.remove_consumer(
args.URI,
args.service_type_name,
args.resource_type,
args.resource_id)

View File

@ -110,3 +110,28 @@ def get_server_supported_versions(min_version, max_version):
if min_version and max_version:
return get_custom_current_response(min_version, max_version)
return STABLE_RESPONSE
def mock_get_secret_for_client(client, consumers=[]):
api_get_return = {
'created': '2022-11-25T15:17:56',
'updated': '2022-11-25T15:17:56',
'status': 'ACTIVE',
'name': 'Dummy secret',
'secret_type': 'opaque',
'expiration': None,
'algorithm': None,
'bit_length': None,
'mode': None,
'creator_id': '8ddfdbc4d92440369569af0589a20fa4',
'consumers': consumers or [],
'content_types': {'default': 'text/plain'},
'secret_ref': 'http://192.168.1.23/key-manager/v1/'
'secrets/d46cfe10-c8ba-452f-a82f-a06834e45604'
}
client.client.get = mock.MagicMock()
client.client.get.return_value = api_get_return
def mock_delete_secret_for_responses(responses, entity_href):
responses.delete(entity_href, status_code=204)

View File

@ -0,0 +1,129 @@
# Copyright 2022 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from barbicanclient import client
from barbicanclient.tests import test_client
from barbicanclient.tests.utils import mock_delete_secret_for_responses
from barbicanclient.tests.utils import mock_get_secret_for_client
from barbicanclient.tests.v1.test_secrets import SecretData
from barbicanclient.v1 import secrets
from oslo_serialization import jsonutils
class WhenTestingConsumers(test_client.BaseEntityResource):
def setUp(self):
self._setUp('secrets')
self.secret = SecretData()
self.client_v1_0 = client.Client(
endpoint=self.endpoint, project_id=self.project_id,
microversion='1.0')
self.manager = self.client.secrets
self.manager_v1_0 = self.client_v1_0.secrets
self.consumers_post_resource = self.entity_href + '/consumers/'
self.consumers_delete_resource = self.entity_href + '/consumers'
def test_register_consumer_fails_with_lower_microversion(self):
self.assertRaises(
NotImplementedError, self.manager_v1_0.register_consumer,
self.entity_href, self.secret.consumer.get('service'),
self.secret.consumer.get('resource_type'),
self.secret.consumer.get('resource_id'))
def _register_consumer(self):
data = self.secret.get_dict(
self.entity_href, consumers=[self.secret.consumer])
self.responses.post(self.entity_href + '/consumers/', json=data)
return self.manager.register_consumer(
self.entity_href, self.secret.consumer.get('service'),
self.secret.consumer.get('resource_type'),
self.secret.consumer.get('resource_id'))
def test_should_register_consumer_with_correct_microversion(self):
self._register_consumer()
def test_should_register_consumer_and_return_secret(self):
self.assertIsInstance(self._register_consumer(), secrets.Secret)
def test_should_register_consumer_with_correct_secret_href(self):
secret = self._register_consumer()
self.assertEqual(self.entity_href, secret.secret_ref)
def test_should_register_consumer_with_correct_url(self):
self._register_consumer()
self.assertEqual(
self.consumers_post_resource, self.responses.last_request.url)
def test_should_register_consumer_with_consumer(self):
secret = self._register_consumer()
self.assertEqual([self.secret.consumer], secret.consumers)
def test_remove_consumer_fails_with_lower_microversion(self):
self.assertRaises(
NotImplementedError, self.manager_v1_0.remove_consumer,
self.entity_href, self.secret.consumer.get('service'),
self.secret.consumer.get('resource_type'),
self.secret.consumer.get('resource_id'))
def _remove_consumer(self):
self.responses.delete(self.entity_href + '/consumers', status_code=204)
self.manager.remove_consumer(
self.entity_href, self.secret.consumer.get('service'),
self.secret.consumer.get('resource_type'),
self.secret.consumer.get('resource_id'))
def test_should_remove_consumer_with_correct_microversion(self):
self._remove_consumer()
def test_should_remove_consumer_with_correct_url(self):
self._remove_consumer()
self.assertEqual(
self.consumers_delete_resource, self.responses.last_request.url)
def test_should_remove_consumer_with_correct_consumer(self):
self._remove_consumer()
self.assertEqual(
self.consumers_delete_resource, self.responses.last_request.url)
body = jsonutils.loads(self.responses.last_request.text)
self.assertEqual(self.secret.consumer, body)
def _delete_from_manager(self, secret_ref, force=False, consumers=[]):
mock_get_secret_for_client(self.client, consumers=consumers)
mock_delete_secret_for_responses(self.responses, self.entity_href)
self.manager.delete(secret_ref=secret_ref, force=force)
def _delete_from_manager_with_consumers(self, secret_ref, force=False):
consumers = [{'service': 'service_test',
'resource_type': 'type_test',
'resource_id': 'id_test'}]
self._delete_from_manager(secret_ref, force=force, consumers=consumers)
def test_delete_from_manager_fails_with_consumers_without_force(self):
self.assertRaises(
ValueError,
self._delete_from_manager_with_consumers, self.entity_href,
force=False)
def test_should_delete_from_manager_with_consumers_and_force(self):
self._delete_from_manager_with_consumers(self.entity_href, force=True)
def test_should_delete_from_manager_without_consumers_and_force(self):
self._delete_from_manager(self.entity_href, force=True)

View File

@ -21,6 +21,7 @@ from oslo_utils import timeutils
from barbicanclient import base
from barbicanclient import exceptions
from barbicanclient.tests import test_client
from barbicanclient.tests.utils import mock_get_secret_for_client
from barbicanclient.v1 import acls
from barbicanclient.v1 import secrets
@ -409,22 +410,26 @@ class WhenTestingSecrets(test_client.BaseEntityResource):
bad_href = "http://badsite.com/" + self.entity_id
self.test_should_decrypt(bad_href)
def test_should_delete_from_manager(self, secret_ref=None):
secret_ref = secret_ref or self.entity_href
def _mock_delete_secret(self):
self.responses.delete(self.entity_href, status_code=204)
self.manager.delete(secret_ref=secret_ref)
def _delete_from_manager(self, secret_ref, force=False):
mock_get_secret_for_client(self.client)
self._mock_delete_secret()
self.manager.delete(secret_ref=secret_ref, force=force)
# Verify the correct URL was used to make the call.
self.assertEqual(self.entity_href, self.responses.last_request.url)
def test_should_delete_from_manager(self):
self._delete_from_manager(self.entity_href)
def test_should_delete_from_manager_using_stripped_uuid(self):
bad_href = "http://badsite.com/" + self.entity_id
self.test_should_delete_from_manager(bad_href)
self._delete_from_manager(secret_ref=bad_href)
def test_should_delete_from_manager_using_only_uuid(self):
self.test_should_delete_from_manager(self.entity_id)
self._delete_from_manager(secret_ref=self.entity_id)
def test_should_delete_from_object(self, secref_ref=None):
secref_ref = secref_ref or self.entity_href
@ -567,39 +572,6 @@ class WhenTestingSecrets(test_client.BaseEntityResource):
def test_should_fail_delete_no_href(self):
self.assertRaises(ValueError, self.manager.delete, None)
def test_should_register_consumer(self):
data = self.secret.get_dict(self.entity_href,
consumers=[self.secret.consumer])
self.responses.post(self.entity_href + '/consumers/', json=data)
secret = self.manager.register_consumer(
self.entity_href, self.secret.consumer.get('service'),
self.secret.consumer.get('resource_type'),
self.secret.consumer.get('resource_id')
)
self.assertIsInstance(secret, secrets.Secret)
self.assertEqual(self.entity_href, secret.secret_ref)
body = jsonutils.loads(self.responses.last_request.text)
self.assertEqual(self.consumers_post_resource,
self.responses.last_request.url)
self.assertEqual(self.secret.consumer, body)
self.assertEqual([self.secret.consumer], secret.consumers)
def test_should_remove_consumer(self):
self.responses.delete(self.entity_href + '/consumers', status_code=204)
self.manager.remove_consumer(
self.entity_href, self.secret.consumer.get('service'),
self.secret.consumer.get('resource_type'),
self.secret.consumer.get('resource_id')
)
body = jsonutils.loads(self.responses.last_request.text)
self.assertEqual(self.consumers_delete_resource,
self.responses.last_request.url)
self.assertEqual(self.secret.consumer, body)
def test_should_get_total(self):
self.responses.get(self.entity_base, json={'total': 1})
total = self.manager.total()

View File

@ -536,10 +536,11 @@ class SecretManager(base.BaseEntityManager):
algorithm=algorithm, bit_length=bit_length, mode=mode,
secret_type=secret_type, expiration=expiration)
def delete(self, secret_ref):
def delete(self, secret_ref, force=False):
"""Delete a Secret from Barbican
:param secret_ref: Full HATEOAS reference to a Secret, or a UUID
:param force: When true, forces the deletion of secrets with consumers
:raises barbicanclient.exceptions.HTTPAuthError: 401 Responses
:raises barbicanclient.exceptions.HTTPClientError: 4xx Responses
:raises barbicanclient.exceptions.HTTPServerError: 5xx Responses
@ -547,8 +548,16 @@ class SecretManager(base.BaseEntityManager):
base.validate_ref_and_return_uuid(secret_ref, 'Secret')
if not secret_ref:
raise ValueError('secret_ref is required.')
secret_object = self.get(secret_ref=secret_ref)
uuid_ref = base.calculate_uuid_ref(secret_ref, self._entity)
# If secret has no consumers OR
# if secret has consumers but force==True, then delete it.
if not secret_object.consumers or force:
self._api.delete(uuid_ref)
else:
raise ValueError(
"Secret has consumers! Remove them first or use the force "
"parameter to delete it.")
def list(self, limit=10, offset=0, name=None, algorithm=None, mode=None,
bits=0, secret_type=None, created=None, updated=None,
@ -617,6 +626,12 @@ class SecretManager(base.BaseEntityManager):
for s in response.get('secrets', [])
]
def _enforce_microversion(self):
if self._api.microversion == "1.0":
raise NotImplementedError(
"Server does not support secret consumers. Minimum "
"key-manager microversion required: 1.1")
def register_consumer(self, secret_ref, service, resource_type,
resource_id):
"""Add a consumer to the secret
@ -635,10 +650,7 @@ class SecretManager(base.BaseEntityManager):
'{0} of service {1} for resource type {2}'
'with resource id {3}'.format(secret_ref, service,
resource_type, resource_id))
if self._api.microversion == (1, 0):
raise NotImplementedError(
"Server does not support secret consumers. Minimum "
"key-manager microversion required: 1.1")
self._enforce_microversion()
secret_uuid = base.validate_ref_and_return_uuid(
secret_ref, 'Secret')
href = '{0}/{1}/consumers'.format(self._entity, secret_uuid)
@ -666,10 +678,7 @@ class SecretManager(base.BaseEntityManager):
'{0} of service {1} for resource type {2}'
'with resource id {3}'.format(secret_ref, service,
resource_type, resource_id))
if self._api.microversion == (1, 0):
raise NotImplementedError(
"Server does not support secret consumers. Minimum "
"key-manager microversion required: 1.1")
self._enforce_microversion()
secret_uuid = base.validate_ref_and_return_uuid(
secret_ref, 'secret')
href = '{0}/{1}/consumers'.format(self._entity, secret_uuid)

View File

@ -0,0 +1,48 @@
# Copyright 2022 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from functionaltests.cli.v1.behaviors import base_behaviors
class ConsumerBehaviors(base_behaviors.BaseBehaviors):
def __init__(self):
super(ConsumerBehaviors, self).__init__()
self.LOG = logging.getLogger(type(self).__name__)
def register_consumer(self, secret_href, service, resource_type,
resource_id):
argv = ['secret', 'consumer', 'create']
self.add_auth_and_endpoint(argv)
argv.extend(['--service-type-name', service])
argv.extend(['--resource-type', resource_type])
argv.extend(['--resource-id', resource_id])
argv.extend([secret_href])
stdout, stderr = self.issue_barbican_command(argv)
def remove_consumer(self, secret_href, service, resource_type,
resource_id):
argv = ['secret', 'consumer', 'delete']
self.add_auth_and_endpoint(argv)
argv.extend(['--service-type-name', service])
argv.extend(['--resource-type', resource_type])
argv.extend(['--resource-id', resource_id])
argv.extend([secret_href])
stdout, stderr = self.issue_barbican_command(argv)

View File

@ -46,7 +46,7 @@ class SecretBehaviors(base_behaviors.BaseBehaviors):
:param secret_href the href to the secret to delete
"""
argv = ['secret', 'delete']
argv = ['secret', 'delete', '--force']
self.add_auth_and_endpoint(argv)
argv.extend([secret_href])

View File

@ -288,6 +288,83 @@ class SecretsTestCase(base.TestCase):
)
self.assertIn('remove_consumer() missing', str(e))
@testcase.attr('positive')
def test_secret_delete_without_consumers_no_force(self):
new_secret = self.barbicanclient.secrets.create(
**secret_create_defaults_data)
secret_ref = self.cleanup.add_entity(new_secret)
self.barbicanclient.secrets.delete(secret_ref, force=False)
resp = self.barbicanclient.secrets.get(secret_ref)
self.assertRaises(exceptions.HTTPClientError, getattr, resp, "name")
self.cleanup.delete_entity(secret_ref)
@testcase.attr('positive')
def test_secret_delete_without_consumers_with_force(self):
new_secret = self.barbicanclient.secrets.create(
**secret_create_defaults_data)
secret_ref = self.cleanup.add_entity(new_secret)
self.barbicanclient.secrets.delete(secret_ref, force=True)
resp = self.barbicanclient.secrets.get(secret_ref)
self.assertRaises(exceptions.HTTPClientError, getattr, resp, "name")
self.cleanup.delete_entity(secret_ref)
@testcase.attr('negative')
def test_secret_delete_with_consumers_no_force(self):
"""Deleting a secret with consumers.
Tries to delete a secret with consumers, but
without providing the 'force' parameter.
"""
new_secret = self.barbicanclient.secrets.create(
**secret_create_defaults_data)
secret_ref = self.cleanup.add_entity(new_secret)
self.assertIsNotNone(secret_ref)
secret = self.barbicanclient.secrets.register_consumer(
secret_ref,
service="service1",
resource_type="type1",
resource_id="id1"
)
self.assertEqual(secret_ref, secret.secret_ref)
e = self.assertRaises(ValueError, self.barbicanclient.secrets.delete,
secret.secret_ref)
self.assertIn("Secret has consumers! Remove them first or use the "
"force parameter to delete it.", str(e))
@testcase.attr('positive')
def test_secret_delete_with_consumers_with_force(self):
"""Deleting a secret with consumers.
Tries to delete a secret with consumers,
making the 'force' parameter equals True.
"""
new_secret = self.barbicanclient.secrets.create(
**secret_create_defaults_data)
secret_ref = self.cleanup.add_entity(new_secret)
self.assertIsNotNone(secret_ref)
secret = self.barbicanclient.secrets.register_consumer(
secret_ref,
service="service1",
resource_type="type1",
resource_id="id1"
)
self.assertEqual(secret_ref, secret.secret_ref)
self.barbicanclient.secrets.delete(secret.secret_ref, True)
resp = self.barbicanclient.secrets.get(secret_ref)
self.assertRaises(exceptions.HTTPClientError, getattr, resp, "name")
self.cleanup.delete_entity(secret_ref)
@testcase.attr('negative')
def test_secret_delete_doesnt_exist(self):
"""Deletes a non-existent secret.

View File

@ -58,6 +58,24 @@ class CleanUp(object):
self.created_entities[entity_type].append(entity_ref)
return entity_ref
def delete_entity(self, entity):
"""Deletes an entity from Barbican
Used for testing. Individually deletes an entity.
"""
entity_type = entity.lower()
if 'acl' in entity_type:
entity_type = 'acl'
elif 'secret' in entity_type:
entity_type = 'secret'
elif 'container' in entity_type:
entity_type = 'container'
else:
entity_type = 'order'
self.created_entities[entity_type].remove(entity)
def _delete_all_containers(self):
"""Helper method to delete all containers used for testing"""
for container_ref in self.created_entities['container']:
@ -66,7 +84,7 @@ class CleanUp(object):
def _delete_all_secrets(self):
"""Helper method to delete all secrets used for testing"""
for secret_ref in self.created_entities['secret']:
self.barbicanclient.secrets.delete(secret_ref)
self.barbicanclient.secrets.delete(secret_ref, True)
def _delete_all_acls(self):
"""Helper method to delete all acls used for testing"""

View File

@ -49,6 +49,9 @@ openstack.key_manager.v1 =
secret_container_list = barbicanclient.barbican_cli.v1.containers:ListContainer
secret_container_create = barbicanclient.barbican_cli.v1.containers:CreateContainer
secret_consumer_create = barbicanclient.barbican_cli.v1.secrets:CreateConsumer
secret_consumer_delete = barbicanclient.barbican_cli.v1.secrets:DeleteConsumer
ca_get = barbicanclient.barbican_cli.v1.cas:GetCA
ca_list = barbicanclient.barbican_cli.v1.cas:ListCA