Add secret consumers Python API

This adds the Python API for secret consumers,
similar to the container consumers API.
Also adding functional tests for both secret
and container consumers.

Co-Authored-By: Grzegorz Grasza <xek@redhat.com>
Change-Id: I024db27d12ea33713bda0273d8748577cc89a38d
This commit is contained in:
Mauricio Harley 2022-09-05 17:17:50 +02:00 committed by Grzegorz Grasza
parent 35599e2b98
commit 3ffa1600af
4 changed files with 300 additions and 1 deletions

View File

@ -32,18 +32,24 @@ class SecretData(object):
self.payload_content_type = 'text/plain'
self.algorithm = 'AES'
self.created = str(timeutils.utcnow())
self.consumer = {'service': 'service_test',
'resource_type': 'type_test',
'resource_id': 'id_test'}
self.secret_dict = {'name': self.name,
'status': 'ACTIVE',
'algorithm': self.algorithm,
'created': self.created}
def get_dict(self, secret_ref=None, content_types_dict=None):
def get_dict(self, secret_ref=None, content_types_dict=None,
consumers=None):
secret = self.secret_dict
if secret_ref:
secret['secret_ref'] = secret_ref
if content_types_dict:
secret['content_types'] = content_types_dict
if consumers:
secret['consumers'] = consumers
return secret
@ -55,6 +61,9 @@ class WhenTestingSecrets(test_client.BaseEntityResource):
self.secret = SecretData()
self.manager = self.client.secrets
self.consumers_post_resource = self.entity_href + '/consumers/'
self.consumers_delete_resource = self.entity_href + '/consumers'
def test_should_entity_str(self):
secret_obj = self.manager.create(name=self.secret.name)
self.assertIn(self.secret.name, str(secret_obj))
@ -558,6 +567,39 @@ class WhenTestingSecrets(test_client.BaseEntityResource):
def test_should_fail_delete_no_href(self):
self.assertRaises(ValueError, self.manager.delete, None)
def test_should_register_consumer(self):
data = self.secret.get_dict(self.entity_href,
consumers=[self.secret.consumer])
self.responses.post(self.entity_href + '/consumers/', json=data)
secret = self.manager.register_consumer(
self.entity_href, self.secret.consumer.get('service'),
self.secret.consumer.get('resource_type'),
self.secret.consumer.get('resource_id')
)
self.assertIsInstance(secret, secrets.Secret)
self.assertEqual(self.entity_href, secret.secret_ref)
body = jsonutils.loads(self.responses.last_request.text)
self.assertEqual(self.consumers_post_resource,
self.responses.last_request.url)
self.assertEqual(self.secret.consumer, body)
self.assertEqual([self.secret.consumer], secret.consumers)
def test_should_remove_consumer(self):
self.responses.delete(self.entity_href + '/consumers', status_code=204)
self.manager.remove_consumer(
self.entity_href, self.secret.consumer.get('service'),
self.secret.consumer.get('resource_type'),
self.secret.consumer.get('resource_id')
)
body = jsonutils.loads(self.responses.last_request.text)
self.assertEqual(self.consumers_delete_resource,
self.responses.last_request.url)
self.assertEqual(self.secret.consumer, body)
def test_should_get_total(self):
self.responses.get(self.entity_base, json={'total': 1})
total = self.manager.total()

View File

@ -616,3 +616,67 @@ class SecretManager(base.BaseEntityManager):
Secret(api=self._api, **s)
for s in response.get('secrets', [])
]
def register_consumer(self, secret_ref, service, resource_type,
resource_id):
"""Add a consumer to the secret
:param secret_ref: Full HATEOAS reference to a secret, or a UUID
:param service: Name of the consuming service
:param resource_type: Type of the consuming resource
:param resource_id: ID of the consuming resource
:returns: A secret object per the get() method
:raises barbicanclient.exceptions.HTTPAuthError: 401 Responses
:raises barbicanclient.exceptions.HTTPClientError: 4xx Responses
:raises barbicanclient.exceptions.HTTPServerError: 5xx Responses
:raises NotImplementedError: When using microversion 1.0
"""
LOG.debug('Creating consumer registration for secret '
'{0} of service {1} for resource type {2}'
'with resource id {3}'.format(secret_ref, service,
resource_type, resource_id))
if self._api.microversion == (1, 0):
raise NotImplementedError(
"Server does not support secret consumers. Minimum "
"key-manager microversion required: 1.1")
secret_uuid = base.validate_ref_and_return_uuid(
secret_ref, 'Secret')
href = '{0}/{1}/consumers'.format(self._entity, secret_uuid)
consumer_dict = dict()
consumer_dict['service'] = service
consumer_dict['resource_type'] = resource_type
consumer_dict['resource_id'] = resource_id
response = self._api.post(href, json=consumer_dict)
return Secret(api=self._api, **response)
def remove_consumer(self, secret_ref, service,
resource_type, resource_id):
"""Remove a consumer from the secret
:param secret_ref: Full HATEOAS reference to a secret, or a UUID
:param service: Name of the previously consuming service
:param resource_type: type of the previously consuming resource
:param resource_id: ID of the previously consuming resource
:raises barbicanclient.exceptions.HTTPAuthError: 401 Responses
:raises barbicanclient.exceptions.HTTPClientError: 4xx Responses
:raises barbicanclient.exceptions.HTTPServerError: 5xx Responses
"""
LOG.debug('Deleting consumer registration for secret '
'{0} of service {1} for resource type {2}'
'with resource id {3}'.format(secret_ref, service,
resource_type, resource_id))
if self._api.microversion == (1, 0):
raise NotImplementedError(
"Server does not support secret consumers. Minimum "
"key-manager microversion required: 1.1")
secret_uuid = base.validate_ref_and_return_uuid(
secret_ref, 'secret')
href = '{0}/{1}/consumers'.format(self._entity, secret_uuid)
consumer_dict = {
'service': service,
'resource_type': resource_type,
'resource_id': resource_id
}
self._api.delete(href, json=consumer_dict)

View File

@ -251,6 +251,44 @@ class GenericContainersTestCase(BaseContainersTestCase):
self.assertIsNotNone(container_entity.acls.read)
self.assertEqual([], container_entity.acls.read.users)
@utils.parameterized_dataset({
'remove_one': [[{'name': 'ab', 'URL': 'http://c.d/e/1'},
{'name': 'ab', 'URL': 'http://c.d/e/2'}],
[{'name': 'ab', 'URL': 'http://c.d/e/1'}]],
'remove_all': [[{'name': 'ab', 'URL': 'http://c.d/e/1'},
{'name': 'ab', 'URL': 'http://c.d/e/2'}],
[{'name': 'ab', 'URL': 'http://c.d/e/1'},
{'name': 'ab', 'URL': 'http://c.d/e/2'}]]
})
@testcase.attr('positive')
def test_container_create_and_registering_removing_consumers(
self,
register_consumers,
remove_consumers):
new_container = self.barbicanclient.containers.create(
**create_container_defaults_data)
container_ref = self.cleanup.add_entity(new_container)
self.assertIsNotNone(container_ref)
for consumer in register_consumers:
container = self.barbicanclient.containers.register_consumer(
container_ref, consumer['name'], consumer['URL'])
self.assertEqual(container_ref, container.container_ref)
self.assertCountEqual(register_consumers, container.consumers)
for consumer in remove_consumers:
self.barbicanclient.containers.remove_consumer(
container_ref, consumer['name'], consumer['URL'])
container = self.barbicanclient.containers.get(container_ref)
removed_urls = set([v['URL'] for v in remove_consumers])
remaining_consumers = [v for v in register_consumers
if v['URL'] not in removed_urls]
self.assertCountEqual(remaining_consumers, container.consumers)
@utils.parameterized_test_case
class RSAContainersTestCase(BaseContainersTestCase):

View File

@ -133,6 +133,161 @@ class SecretsTestCase(base.TestCase):
resp = self.barbicanclient.secrets.get(secret_ref)
self.assertEqual(secret.mode, resp.mode)
@utils.parameterized_dataset({
'remove_one': [[{'service': 'service_test1',
'resource_type': 'type_test1',
'resource_id': 'id_test1'},
{'service': 'service_test2',
'resource_type': 'type_test2',
'resource_id': 'id_test2'}],
[{'service': 'service_test1',
'resource_type': 'type_test1',
'resource_id': 'id_test1'}]],
'remove_all': [[{'service': 'service_test1',
'resource_type': 'type_test1',
'resource_id': 'id_test1'},
{'service': 'service_test2',
'resource_type': 'type_test2',
'resource_id': 'id_test2'}],
[{'service': 'service_test1',
'resource_type': 'type_test1',
'resource_id': 'id_test1'},
{'service': 'service_test2',
'resource_type': 'type_test2',
'resource_id': 'id_test2'}]],
'add_duplicate_remove_one': [[{'service': 'service_test1',
'resource_type': 'type_test1',
'resource_id': 'id_test1'},
{'service': 'service_test1',
'resource_type': 'type_test1',
'resource_id': 'id_test1'},
{'service': 'service_test2',
'resource_type': 'type_test2',
'resource_id': 'id_test2'}],
[{'service': 'service_test1',
'resource_type': 'type_test1',
'resource_id': 'id_test1'}]]
})
@testcase.attr('positive')
def test_secret_create_and_registering_removing_consumers(
self,
register_consumers,
remove_consumers):
"""The following activities are carried:
Create a secret, then register each consumer
in the register_consumers list, then remove each consumer
in the remove_consumers list.
"""
new_secret = self.barbicanclient.secrets.create(
**secret_create_defaults_data)
secret_ref = self.cleanup.add_entity(new_secret)
self.assertIsNotNone(secret_ref)
for consumer in register_consumers:
secret = self.barbicanclient.secrets.register_consumer(
secret_ref, **consumer)
self.assertEqual(secret_ref, secret.secret_ref)
# We expect that duplicate calls to register_consumers don't
# create new consumers even though the API returns HTTP 200 OK
deduplicated_consumers_count = len(set(
[c['resource_id'] for c in register_consumers]))
self.assertEqual(deduplicated_consumers_count,
len(secret.consumers))
for consumer in remove_consumers:
self.barbicanclient.secrets.remove_consumer(
secret_ref, **consumer)
secret = self.barbicanclient.secrets.get(secret_ref)
removed_ids = set([v['resource_id'] for v in remove_consumers])
remaining_consumers = [v for v in register_consumers
if v['resource_id'] not in removed_ids]
self.assertCountEqual(remaining_consumers, secret.consumers)
@utils.parameterized_dataset({
'no_args': [[{}]],
'one_arg_1': [[{'service': 'service1'}]],
'one_arg_2': [[{'resource_type': 'type1'}]],
'one_arg_3': [[{'resource_id': 'id1'}]],
'two_args_1': [[{'service': 'service1',
'resource_type': 'type1'}]],
'two_args_2': [[{'service': 'service1',
'resource_id': 'id1'}]],
'two_args_3': [[{'resource_type': 'type1',
'resource_id': 'id'}]]
})
@testcase.attr('negative')
def test_consumer_register_missing_positional_arguments(
self,
register_consumers):
"""Missing Positional Arguments - Registration
Tries to register a secret consumer without
providing all of the required positional arguments
(service, resource_type, resource_id).
"""
new_secret = self.barbicanclient.secrets.create(
**secret_create_defaults_data)
secret_ref = self.cleanup.add_entity(new_secret)
self.assertIsNotNone(secret_ref)
for consumer in register_consumers:
e = self.assertRaises(
TypeError,
self.barbicanclient.secrets.register_consumer,
secret_ref, **consumer
)
self.assertIn('register_consumer() missing', str(e))
@utils.parameterized_dataset({
'no_args': [[{}]],
'one_arg_1': [[{'service': 'service1'}]],
'one_arg_2': [[{'resource_type': 'type1'}]],
'one_arg_3': [[{'resource_id': 'id1'}]],
'two_args_1': [[{'service': 'service1',
'resource_type': 'type1'}]],
'two_args_2': [[{'service': 'service1',
'resource_id': 'id1'}]],
'two_args_3': [[{'resource_type': 'type1',
'resource_id': 'id'}]]
})
@testcase.attr('negative')
def test_consumer_remove_missing_positional_arguments(
self,
register_consumers):
"""Missing Positional Arguments - Removal
Tries to remove a secret consumer without
providing all of the required positional arguments
(service, resource_type, resource_id).
"""
new_secret = self.barbicanclient.secrets.create(
**secret_create_defaults_data)
secret_ref = self.cleanup.add_entity(new_secret)
self.assertIsNotNone(secret_ref)
secret = self.barbicanclient.secrets.register_consumer(
secret_ref,
service="service1",
resource_type="type1",
resource_id="id1"
)
self.assertEqual(secret_ref, secret.secret_ref)
for consumer in register_consumers:
e = self.assertRaises(
TypeError,
self.barbicanclient.secrets.remove_consumer,
secret_ref, **consumer
)
self.assertIn('remove_consumer() missing', str(e))
@testcase.attr('negative')
def test_secret_delete_doesnt_exist(self):
"""Deletes a non-existent secret.