Allow fetching by UUID, and respect interface

When passing a UUID to the client, use the Barbican endpoint from
the service catalog to fetch the entity. When passing an href, strip
everything before the UUID and use it the same as a passed UUID.

This allows for service usage when secrets are created with a public
endpoint but must be retrieved from an internal or admin endpoint,
and is probably how all usage should have worked to begin with.

Change-Id: I90778a2eeefc4cfe42b0e2a48ba09036e3e6d83d
Story: 2003197
Task: 23353
This commit is contained in:
Adam Harwell 2018-08-02 08:53:32 +09:00
parent 1a8cc8c1eb
commit 6651c8ffce
14 changed files with 366 additions and 96 deletions

View File

@ -15,9 +15,13 @@
"""
Base utilities to build API operation managers.
"""
import logging
import uuid
LOG = logging.getLogger(__name__)
def filter_null_keys(dictionary):
return dict(((k, v) for k, v in dictionary.items() if v is not None))
@ -30,19 +34,26 @@ def censored_copy(data_dict, censor_keys):
data_dict.items()}
def validate_ref(ref, entity):
"""Verifies that there is a real uuid at the end of the uri
def validate_ref_and_return_uuid(ref, entity):
"""Verifies that there is a real uuid (possibly at the end of a uri)
:return: Returns True for easier testing
:return: The uuid.UUID object
:raises ValueError: If it cannot correctly parse the uuid in the ref.
"""
try:
_, entity_uuid = ref.rstrip('/').rsplit('/', 1)
uuid.UUID(entity_uuid)
# This works for a ref *or* a UUID, since we just pick the last piece
ref_pieces = ref.rstrip('/').rsplit('/', 1)
return uuid.UUID(ref_pieces[-1])
except Exception:
raise ValueError('{0} incorrectly specified.'.format(entity))
return True
def calculate_uuid_ref(ref, entity):
entity_uuid = validate_ref_and_return_uuid(
ref, entity.capitalize().rstrip('s'))
entity_ref = "{entity}/{uuid}".format(entity=entity, uuid=entity_uuid)
LOG.info("Calculated %s uuid ref: %s", entity.capitalize(), entity_ref)
return entity_ref
class ImmutableException(Exception):

View File

@ -14,6 +14,7 @@
# limitations under the License.
import testtools
import uuid
import barbicanclient
from barbicanclient import base
@ -23,12 +24,21 @@ from barbicanclient import version
class TestValidateRef(testtools.TestCase):
def test_valid_ref(self):
ref = 'http://localhost/ff2ca003-5ebb-4b61-8a17-3f9c54ef6356'
self.assertTrue(base.validate_ref(ref, 'Thing'))
secret_uuid = uuid.uuid4()
ref = 'http://localhost/' + str(secret_uuid)
self.assertEqual(secret_uuid,
base.validate_ref_and_return_uuid(ref, 'Thing'))
def test_valid_uuid(self):
secret_uuid = uuid.uuid4()
self.assertEqual(secret_uuid,
base.validate_ref_and_return_uuid(str(secret_uuid),
'Thing'))
def test_invalid_uuid(self):
ref = 'http://localhost/not_a_uuid'
self.assertRaises(ValueError, base.validate_ref, ref, 'Thing')
self.assertRaises(ValueError, base.validate_ref_and_return_uuid, ref,
'Thing')
def test_censored_copy(self):
d1 = {'a': '1', 'password': 'my_password', 'payload': 'my_key',

View File

@ -25,12 +25,13 @@ class ACLTestCase(test_client.BaseEntityResource):
def setUp(self):
self._setUp('acl', entity_id='d9f95d61-8863-49d3-a045-5c2cb77130b5')
self.secret_ref = (self.endpoint +
'/secrets/8a3108ec-88fc-4f5c-86eb-f37b8ae8358e')
self.secret_uuid = '8a3108ec-88fc-4f5c-86eb-f37b8ae8358e'
self.secret_ref = (self.endpoint + '/v1/secrets/' + self.secret_uuid)
self.secret_acl_ref = '{0}/acl'.format(self.secret_ref)
self.container_ref = (self.endpoint + '/containers/'
'83c302c7-86fe-4f07-a277-c4962f121f19')
self.container_uuid = '83c302c7-86fe-4f07-a277-c4962f121f19'
self.container_ref = (self.endpoint + '/v1/containers/' +
self.container_uuid)
self.container_acl_ref = '{0}/acl'.format(self.container_ref)
self.manager = self.client.acls
@ -54,16 +55,22 @@ class ACLTestCase(test_client.BaseEntityResource):
class WhenTestingACLManager(ACLTestCase):
def test_should_get_secret_acl(self):
def test_should_get_secret_acl(self, entity_ref=None):
entity_ref = entity_ref or self.secret_ref
self.responses.get(self.secret_acl_ref,
json=self.get_acl_response_data())
api_resp = self.manager.get(entity_ref=self.secret_ref)
api_resp = self.manager.get(entity_ref=entity_ref)
self.assertEqual(self.secret_acl_ref,
self.responses.last_request.url)
self.assertFalse(api_resp.get('read').project_access)
self.assertEqual('read', api_resp.get('read').operation_type)
self.assertEqual(self.secret_acl_ref, api_resp.get('read').acl_ref)
self.assertIn(api_resp.get('read').acl_ref_relative,
self.secret_acl_ref)
def test_should_get_secret_acl_using_stripped_uuid(self):
bad_href = "http://badsite.com/secrets/" + self.secret_uuid
self.test_should_get_secret_acl(bad_href)
def test_should_get_secret_acl_with_extra_trailing_slashes(self):
self.responses.get(requests_mock.ANY,
@ -73,16 +80,22 @@ class WhenTestingACLManager(ACLTestCase):
self.assertEqual(self.secret_acl_ref,
self.responses.last_request.url)
def test_should_get_container_acl(self):
def test_should_get_container_acl(self, entity_ref=None):
entity_ref = entity_ref or self.container_ref
self.responses.get(self.container_acl_ref,
json=self.get_acl_response_data())
api_resp = self.manager.get(entity_ref=self.container_ref)
api_resp = self.manager.get(entity_ref=entity_ref)
self.assertEqual(self.container_acl_ref,
self.responses.last_request.url)
self.assertFalse(api_resp.get('read').project_access)
self.assertEqual('read', api_resp.get('read').operation_type)
self.assertEqual(self.container_acl_ref, api_resp.get('read').acl_ref)
self.assertIn(api_resp.get('read').acl_ref_relative,
self.container_acl_ref)
def test_should_get_container_acl_using_stripped_uuid(self):
bad_href = "http://badsite.com/containers/" + self.container_uuid
self.test_should_get_container_acl(bad_href)
def test_should_get_container_acl_with_trailing_slashes(self):
self.responses.get(requests_mock.ANY,
@ -122,20 +135,26 @@ class WhenTestingACLManager(ACLTestCase):
read_acl_via_get = entity.get('read')
self.assertEqual(read_acl, read_acl_via_get)
def test_should_create_acl_with_users(self):
entity = self.manager.create(entity_ref=self.container_ref + '///',
def test_should_create_acl_with_users(self, entity_ref=None):
entity_ref = entity_ref or self.container_ref
entity = self.manager.create(entity_ref=entity_ref + '///',
users=self.users2, project_access=False)
self.assertIsInstance(entity, acls.ContainerACL)
# entity ref is kept same as provided input.
self.assertEqual(self.container_ref + '///', entity.entity_ref)
self.assertEqual(entity_ref + '///', entity.entity_ref)
read_acl = entity.read
self.assertFalse(read_acl.project_access)
self.assertEqual(self.users2, read_acl.users)
self.assertEqual(acls.DEFAULT_OPERATION_TYPE, read_acl.operation_type)
# acl ref removes extra trailing slashes if there
self.assertIn(self.container_ref, read_acl.acl_ref,
self.assertIn(entity_ref, read_acl.acl_ref,
'ACL ref has additional /acl')
self.assertIn(read_acl.acl_ref_relative, self.container_acl_ref)
def test_should_create_acl_with_users_stripped_uuid(self):
bad_href = "http://badsite.com/containers/" + self.container_uuid
self.test_should_create_acl_with_users(bad_href)
def test_should_create_acl_with_no_users(self):
entity = self.manager.create(entity_ref=self.container_ref, users=[])
@ -163,18 +182,23 @@ class WhenTestingACLManager(ACLTestCase):
class WhenTestingACLEntity(ACLTestCase):
def test_should_submit_acl_with_users_project_access_set(self):
def test_should_submit_acl_with_users_project_access_set(self, href=None):
href = href or self.secret_ref
data = {'acl_ref': self.secret_acl_ref}
# register put acl URI with expected acl ref in response
self.responses.put(self.secret_acl_ref, json=data)
entity = self.manager.create(entity_ref=self.secret_ref + '///',
entity = self.manager.create(entity_ref=href + '///',
users=self.users1, project_access=True)
api_resp = entity.submit()
self.assertEqual(self.secret_acl_ref, api_resp)
self.assertEqual(self.secret_acl_ref,
self.responses.last_request.url)
def test_should_submit_acl_with_users_project_access_stripped_uuid(self):
bad_href = "http://badsite.com/secrets/" + self.secret_uuid
self.test_should_submit_acl_with_users_project_access_set(bad_href)
def test_should_submit_acl_with_project_access_set_but_no_users(self):
data = {'acl_ref': self.secret_acl_ref}
# register put acl URI with expected acl ref in response
@ -368,10 +392,11 @@ class WhenTestingACLEntity(ACLTestCase):
self.assertIsNone(data[4]) # updated
self.assertEqual(self.container_acl_ref, data[5])
def test_should_secret_acl_remove(self):
def test_should_secret_acl_remove(self, entity_ref=None):
entity_ref = entity_ref or self.secret_ref
self.responses.delete(self.secret_acl_ref)
entity = self.manager.create(entity_ref=self.secret_ref,
entity = self.manager.create(entity_ref=entity_ref,
users=self.users2)
api_resp = entity.remove()
@ -391,14 +416,23 @@ class WhenTestingACLEntity(ACLTestCase):
self.responses.delete(self.container_acl_ref)
def test_should_container_acl_remove(self):
def test_should_secret_acl_remove_stripped_uuid(self):
bad_href = "http://badsite.com/secrets/" + self.secret_uuid
self.test_should_secret_acl_remove(bad_href)
def test_should_container_acl_remove(self, entity_ref=None):
entity_ref = entity_ref or self.container_ref
self.responses.delete(self.container_acl_ref)
entity = self.manager.create(entity_ref=self.container_ref)
entity = self.manager.create(entity_ref=entity_ref)
entity.remove()
self.assertEqual(self.container_acl_ref,
self.responses.last_request.url)
def test_should_container_acl_remove_stripped_uuid(self):
bad_href = "http://badsite.com/containers/" + self.container_uuid
self.test_should_container_acl_remove(bad_href)
def test_should_fail_acl_remove_invalid_uri(self):
# secret_acl URI expected and not secret acl URI
entity = self.manager.create(entity_ref=self.secret_acl_ref)

View File

@ -55,13 +55,15 @@ class WhenTestingCAs(test_client.BaseEntityResource):
self.ca = CAData()
self.manager = self.client.cas
def test_should_get_lazy(self):
data = self.ca.get_dict(self.entity_href)
def test_should_get_lazy(self, ca_ref=None):
ca_ref = ca_ref or self.entity_href
data = self.ca.get_dict(ca_ref)
m = self.responses.get(self.entity_href, json=data)
ca = self.manager.get(ca_ref=self.entity_href)
ca = self.manager.get(ca_ref=ca_ref)
self.assertIsInstance(ca, cas.CA)
self.assertEqual(self.entity_href, ca._ca_ref)
self.assertEqual(ca_ref, ca._ca_ref)
# Verify GET wasn't called yet
self.assertFalse(m.called)
@ -72,6 +74,13 @@ class WhenTestingCAs(test_client.BaseEntityResource):
# Verify the correct URL was used to make the GET call
self.assertEqual(self.entity_href, m.last_request.url)
def test_should_get_lazy_using_stripped_uuid(self):
bad_href = "http://badsite.com/" + self.entity_id
self.test_should_get_lazy(bad_href)
def test_should_get_lazy_using_only_uuid(self):
self.test_should_get_lazy(self.entity_id)
def test_should_get_lazy_in_meta(self):
data = self.ca.get_dict(self.entity_href)
m = self.responses.get(self.entity_href, json=data)

View File

@ -367,13 +367,15 @@ class WhenTestingContainers(test_client.BaseEntityResource):
except AttributeError:
pass
def test_should_get_generic_container(self):
data = self.container.get_dict(self.entity_href)
def test_should_get_generic_container(self, container_ref=None):
container_ref = container_ref or self.entity_href
data = self.container.get_dict(container_ref)
self.responses.get(self.entity_href, json=data)
container = self.manager.get(container_ref=self.entity_href)
container = self.manager.get(container_ref=container_ref)
self.assertIsInstance(container, containers.Container)
self.assertEqual(self.entity_href, container.container_ref)
self.assertEqual(container_ref, container.container_ref)
# Verify the correct URL was used to make the call.
self.assertEqual(self.entity_href, self.responses.last_request.url)
@ -414,21 +416,39 @@ class WhenTestingContainers(test_client.BaseEntityResource):
self.assertIsNotNone(container.public_key)
self.assertIsNotNone(container.private_key_passphrase)
def test_should_delete_from_manager(self):
def test_should_get_generic_container_using_stripped_uuid(self):
bad_href = "http://badsite.com/" + self.entity_id
self.test_should_get_generic_container(bad_href)
def test_should_get_generic_container_using_only_uuid(self):
self.test_should_get_generic_container(self.entity_id)
def test_should_delete_from_manager(self, container_ref=None):
container_ref = container_ref or self.entity_href
self.responses.delete(self.entity_href, status_code=204)
self.manager.delete(container_ref=self.entity_href)
self.manager.delete(container_ref=container_ref)
# Verify the correct URL was used to make the call.
self.assertEqual(self.entity_href, self.responses.last_request.url)
def test_should_delete_from_object(self):
data = self.container.get_dict(self.entity_href)
def test_should_delete_from_manager_using_stripped_uuid(self):
bad_href = "http://badsite.com/" + self.entity_id
self.test_should_delete_from_manager(bad_href)
def test_should_delete_from_manager_using_only_uuid(self):
self.test_should_delete_from_manager(self.entity_id)
def test_should_delete_from_object(self, container_ref=None):
container_ref = container_ref or self.entity_href
data = self.container.get_dict(container_ref)
m = self.responses.get(self.entity_href, json=data)
n = self.responses.delete(self.entity_href, status_code=204)
container = self.manager.get(container_ref=self.entity_href)
self.assertIsNotNone(container.container_ref)
container = self.manager.get(container_ref=container_ref)
self.assertEqual(container_ref, container.container_ref)
container.delete()
@ -439,6 +459,13 @@ class WhenTestingContainers(test_client.BaseEntityResource):
# Verify that the Container no longer has a container_ref
self.assertIsNone(container.container_ref)
def test_should_delete_from_object_using_stripped_uuid(self):
bad_href = "http://badsite.com/" + self.entity_id
self.test_should_delete_from_object(bad_href)
def test_should_delete_from_object_using_only_uuid(self):
self.test_should_delete_from_object(self.entity_id)
def test_should_store_after_delete_from_object(self):
data = self.container.get_dict(self.entity_href)
self.responses.get(self.entity_href, json=data)

View File

@ -198,6 +198,34 @@ class WhenTestingKeyOrders(OrdersTestCase):
except AttributeError:
pass
def test_should_delete_from_object(self, order_ref=None):
order_ref = order_ref or self.entity_href
data = {'order_ref': order_ref}
self.responses.post(self.entity_base + '/', json=data)
self.responses.delete(self.entity_href, status_code=204)
order = self.manager.create_key(
name='name',
algorithm='algorithm',
payload_content_type='payload_content_type'
)
order_href = order.submit()
self.assertEqual(order_ref, order_href)
order.delete()
# Verify the correct URL was used to make the call.
self.assertEqual(self.entity_href, self.responses.last_request.url)
def test_should_delete_from_object_using_stripped_uuid(self):
bad_href = "http://badsite.com/" + self.entity_id
self.test_should_delete_from_object(bad_href)
def test_should_delete_from_object_using_only_uuid(self):
self.test_should_delete_from_object(self.entity_id)
class WhenTestingAsymmetricOrders(OrdersTestCase):
@ -265,16 +293,25 @@ class WhenTestingAsymmetricOrders(OrdersTestCase):
class WhenTestingOrderManager(OrdersTestCase):
def test_should_get(self):
def test_should_get(self, order_ref=None):
order_ref = order_ref or self.entity_href
self.responses.get(self.entity_href, text=self.key_order_data)
order = self.manager.get(order_ref=self.entity_href)
order = self.manager.get(order_ref=order_ref)
self.assertIsInstance(order, orders.KeyOrder)
self.assertEqual(self.entity_href, order.order_ref)
# Verify the correct URL was used to make the call.
self.assertEqual(self.entity_href, self.responses.last_request.url)
def test_should_get_using_stripped_uuid(self):
bad_href = "http://badsite.com/" + self.entity_id
self.test_should_get(bad_href)
def test_should_get_using_only_uuid(self):
self.test_should_get(self.entity_id)
def test_should_get_invalid_meta(self):
self.responses.get(self.entity_href, text=self.key_order_invalid_data)
@ -300,14 +337,22 @@ class WhenTestingOrderManager(OrdersTestCase):
self.assertEqual(['10'], self.responses.last_request.qs['limit'])
self.assertEqual(['5'], self.responses.last_request.qs['offset'])
def test_should_delete(self):
def test_should_delete(self, order_ref=None):
order_ref = order_ref or self.entity_href
self.responses.delete(self.entity_href, status_code=204)
self.manager.delete(order_ref=self.entity_href)
self.manager.delete(order_ref=order_ref)
# Verify the correct URL was used to make the call.
self.assertEqual(self.entity_href, self.responses.last_request.url)
def test_should_delete_using_stripped_uuid(self):
bad_href = "http://badsite.com/" + self.entity_id
self.test_should_delete(bad_href)
def test_should_delete_using_only_uuid(self):
self.test_should_delete(self.entity_id)
def test_should_fail_delete_no_href(self):
self.assertRaises(ValueError, self.manager.delete, None)
@ -330,16 +375,25 @@ class WhenTestingOrderManager(OrdersTestCase):
class WhenTestingCertificateOrders(OrdersTestCase):
def test_get(self):
def test_get(self, order_ref=None):
order_ref = order_ref or self.entity_href
self.responses.get(self.entity_href, text=self.cert_order_data)
order = self.manager.get(order_ref=self.entity_href)
order = self.manager.get(order_ref=order_ref)
self.assertIsInstance(order, orders.CertificateOrder)
self.assertEqual(self.entity_href, order.order_ref)
# Verify the correct URL was used to make the call.
self.assertEqual(self.entity_href, self.responses.last_request.url)
def test_get_using_stripped_uuid(self):
bad_href = "http://badsite.com/" + self.entity_id
self.test_get(bad_href)
def test_get_using_only_uuid(self):
self.test_get(self.entity_id)
def test_repr(self):
order_args = self._get_order_args(self.cert_order_data)
order_obj = orders.CertificateOrder(api=None, **order_args)

View File

@ -219,13 +219,15 @@ class WhenTestingSecrets(test_client.BaseEntityResource):
except AttributeError:
pass
def test_should_get_lazy(self):
data = self.secret.get_dict(self.entity_href)
def test_should_get_lazy(self, secret_ref=None):
secret_ref = secret_ref or self.entity_href
data = self.secret.get_dict(secret_ref)
m = self.responses.get(self.entity_href, json=data)
secret = self.manager.get(secret_ref=self.entity_href)
secret = self.manager.get(secret_ref=secret_ref)
self.assertIsInstance(secret, secrets.Secret)
self.assertEqual(self.entity_href, secret.secret_ref)
self.assertEqual(secret_ref, secret.secret_ref)
# Verify GET wasn't called yet
self.assertFalse(m.called)
@ -236,6 +238,13 @@ class WhenTestingSecrets(test_client.BaseEntityResource):
# Verify the correct URL was used to make the GET call
self.assertEqual(self.entity_href, m.last_request.url)
def test_should_get_lazy_using_stripped_uuid(self):
bad_href = "http://badsite.com/" + self.entity_id
self.test_should_get_lazy(bad_href)
def test_should_get_lazy_using_only_uuid(self):
self.test_should_get_lazy(self.entity_id)
def test_should_get_acls_lazy(self):
data = self.secret.get_dict(self.entity_href)
m = self.responses.get(self.entity_href, json=data)
@ -388,22 +397,81 @@ class WhenTestingSecrets(test_client.BaseEntityResource):
self.assertEqual(self.entity_payload_href,
decryption_response.last_request.url)
def test_should_delete(self):
def test_should_delete_from_manager(self, secret_ref=None):
secret_ref = secret_ref or self.entity_href
self.responses.delete(self.entity_href, status_code=204)
self.manager.delete(secret_ref=self.entity_href)
self.manager.delete(secret_ref=secret_ref)
# Verify the correct URL was used to make the call.
self.assertEqual(self.entity_href, self.responses.last_request.url)
def test_should_update(self):
data = {'secret_ref': self.entity_href}
def test_should_delete_from_manager_using_stripped_uuid(self):
bad_href = "http://badsite.com/" + self.entity_id
self.test_should_delete_from_manager(bad_href)
def test_should_delete_from_manager_using_only_uuid(self):
self.test_should_delete_from_manager(self.entity_id)
def test_should_delete_from_object(self, secref_ref=None):
secref_ref = secref_ref or self.entity_href
data = {'secret_ref': secref_ref}
self.responses.post(self.entity_base + '/', json=data)
secret = self.manager.create()
secret.payload = None
secret.store()
# Verify the secret has the correct ref for testing deletes
self.assertEqual(secref_ref, secret.secret_ref)
self.responses.delete(self.entity_href, status_code=204)
secret.delete()
# Verify the correct URL was used to make the call.
self.assertEqual(self.entity_href, self.responses.last_request.url)
def test_should_delete_from_object_using_stripped_uuid(self):
bad_href = "http://badsite.com/" + self.entity_id
self.test_should_delete_from_object(bad_href)
def test_should_delete_from_object_using_only_uuid(self):
self.test_should_delete_from_object(self.entity_id)
def test_should_update_from_manager(self, secret_ref=None):
# This literal will have type(unicode) in Python 2, but will have
# type(str) in Python 3. It is six.text_type in both cases.
text_payload = u'time for an ice cold \U0001f37a'
secret_ref = secret_ref or self.entity_href
self.responses.put(self.entity_href, status_code=204)
self.manager.update(secret_ref=secret_ref, payload=text_payload)
# Verify the correct URL was used to make the call.
self.assertEqual(self.entity_href, self.responses.last_request.url)
def test_should_update_from_manager_using_stripped_uuid(self):
bad_href = "http://badsite.com/" + self.entity_id
self.test_should_update_from_manager(bad_href)
def test_should_update_from_manager_using_only_uuid(self):
self.test_should_update_from_manager(self.entity_id)
def test_should_update_from_object(self, secref_ref=None):
secref_ref = secref_ref or self.entity_href
data = {'secret_ref': secref_ref}
self.responses.post(self.entity_base + '/', json=data)
secret = self.manager.create()
secret.payload = None
secret.store()
# Verify the secret has the correct ref for testing updates
self.assertEqual(secref_ref, secret.secret_ref)
# This literal will have type(unicode) in Python 2, but will have
# type(str) in Python 3. It is six.text_type in both cases.
text_payload = u'time for an ice cold \U0001f37a'
@ -419,6 +487,13 @@ class WhenTestingSecrets(test_client.BaseEntityResource):
# Verify that the data has been updated
self.assertEqual(text_payload, secret.payload)
def test_should_update_from_object_using_stripped_uuid(self):
bad_href = "http://badsite.com/" + self.entity_id
self.test_should_update_from_object(bad_href)
def test_should_update_from_object_using_only_uuid(self):
self.test_should_update_from_object(self.entity_id)
def test_should_get_list(self):
secret_resp = self.secret.get_dict(self.entity_href)

View File

@ -85,10 +85,18 @@ class _PerOperationACL(ACLFormatter):
def acl_ref(self):
return ACL.get_acl_ref_from_entity_ref(self.entity_ref)
@property
def acl_ref_relative(self):
return self._parent_acl.acl_ref_relative
@property
def entity_ref(self):
return self._entity_ref
@property
def entity_uuid(self):
return self._parent_acl.entity_uuid
@property
def project_access(self):
"""Flag indicating project access behavior is enabled or not"""
@ -203,6 +211,12 @@ class ACL(object):
"""Entity URI reference."""
return self._entity_ref
@property
def entity_uuid(self):
"""Entity UUID"""
return str(base.validate_ref_and_return_uuid(
self._entity_ref, self._acl_type))
@property
def operation_acls(self):
"""List of operation specific ACL settings."""
@ -212,6 +226,11 @@ class ACL(object):
def acl_ref(self):
return ACL.get_acl_ref_from_entity_ref(self.entity_ref)
@property
def acl_ref_relative(self):
return ACL.get_acl_ref_from_entity_ref_relative(
self.entity_uuid, self._parent_entity_path)
def add_operation_acl(self, users=None, project_access=None,
operation_type=None, created=None,
updated=None,):
@ -301,7 +320,7 @@ class ACL(object):
acl_data['users'] = per_op_acl.users
acl_dict[op_type] = acl_data
response = self._api.put(self.acl_ref, json=acl_dict)
response = self._api.put(self.acl_ref_relative, json=acl_dict)
return response.json().get('acl_ref')
@ -314,7 +333,7 @@ class ACL(object):
self.validate_input_ref()
LOG.debug('Removing ACL for {0} for href: {1}'
.format(self.acl_type, self.entity_ref))
self._api.delete(self.acl_ref)
self._api.delete(self.acl_ref_relative)
def load_acls_data(self):
"""Loads ACL entity from Barbican server using its acl_ref
@ -328,7 +347,7 @@ class ACL(object):
:raises barbicanclient.exceptions.HTTPServerError: 5xx Responses
"""
response = self._api.get(self.acl_ref)
response = self._api.get(self.acl_ref_relative)
del self.operation_acls[:] # clearing list for all of its references
for op_type in response:
@ -354,7 +373,7 @@ class ACL(object):
else:
raise ValueError('{0} URI is not specified.'.format(res_title))
base.validate_ref(self.entity_ref, ref_type)
base.validate_ref_and_return_uuid(self.entity_ref, ref_type)
return ref_type
@staticmethod
@ -364,6 +383,14 @@ class ACL(object):
entity_ref = entity_ref.rstrip('/')
return '{0}/{1}'.format(entity_ref, ACL._resource_name)
@staticmethod
def get_acl_ref_from_entity_ref_relative(entity_ref, entity_type):
# Utility for converting entity ref to acl ref
if entity_ref:
entity_ref = entity_ref.rstrip('/')
return '{0}/{1}/{2}'.format(entity_type, entity_ref,
ACL._resource_name)
@staticmethod
def identify_ref_type(entity_ref):
# Utility for identifying ACL type from given entity URI.

View File

@ -171,7 +171,8 @@ class CA(CAFormatter):
def _fill_lazy_properties(self):
if self._ca_ref and not self._plugin_name:
result = self._api.get(self._ca_ref)
uuid_ref = base.calculate_uuid_ref(self._ca_ref, self._entity)
result = self._api.get(uuid_ref)
self._fill_from_data(
meta=result.get('meta'),
expiration=result.get('expiration'),
@ -205,7 +206,7 @@ class CAManager(base.BaseEntityManager):
:raises barbicanclient.exceptions.HTTPServerError: 5xx Responses
"""
LOG.debug("Getting ca - CA href: {0}".format(ca_ref))
base.validate_ref(ca_ref, 'CA')
base.validate_ref_and_return_uuid(ca_ref, 'CA')
return CA(
api=self._api,
ca_ref=ca_ref

View File

@ -211,7 +211,9 @@ class Container(ContainerFormatter):
def delete(self):
"""Delete container from Barbican"""
if self._container_ref:
self._api.delete(self._container_ref)
uuid_ref = base.calculate_uuid_ref(self._container_ref,
self._entity)
self._api.delete(uuid_ref)
self._container_ref = None
self._status = None
self._created = None
@ -235,9 +237,10 @@ class Container(ContainerFormatter):
raise AttributeError("container_ref not set, cannot reload data.")
LOG.debug('Getting container - Container href: {0}'
.format(self._container_ref))
base.validate_ref(self._container_ref, 'Container')
uuid_ref = base.calculate_uuid_ref(self._container_ref,
self._entity)
try:
response = self._api.get(self._container_ref)
response = self._api.get(uuid_ref)
except AttributeError:
raise LookupError('Container {0} could not be found.'
.format(self._container_ref))
@ -530,14 +533,14 @@ class ContainerManager(base.BaseEntityManager):
def get(self, container_ref):
"""Retrieve an existing Container from Barbican
:param str container_ref: Full HATEOAS reference to a Container
:param container_ref: Full HATEOAS reference to a Container, or a UUID
:returns: Container object or a subclass of the appropriate type
"""
LOG.debug('Getting container - Container href: {0}'
.format(container_ref))
base.validate_ref(container_ref, 'Container')
uuid_ref = base.calculate_uuid_ref(container_ref, self._entity)
try:
response = self._api.get(container_ref)
response = self._api.get(uuid_ref)
except AttributeError:
raise LookupError('Container {0} could not be found.'
.format(container_ref))
@ -688,14 +691,15 @@ class ContainerManager(base.BaseEntityManager):
def delete(self, container_ref):
"""Delete a Container from Barbican
:param container_ref: Full HATEOAS reference to a Container
:param container_ref: Full HATEOAS reference to a Container, or a UUID
:raises barbicanclient.exceptions.HTTPAuthError: 401 Responses
:raises barbicanclient.exceptions.HTTPClientError: 4xx Responses
:raises barbicanclient.exceptions.HTTPServerError: 5xx Responses
"""
if not container_ref:
raise ValueError('container_ref is required.')
self._api.delete(container_ref)
uuid_ref = base.calculate_uuid_ref(container_ref, self._entity)
self._api.delete(uuid_ref)
def list(self, limit=10, offset=0, name=None, type=None):
"""List containers for the project.
@ -728,7 +732,7 @@ class ContainerManager(base.BaseEntityManager):
def register_consumer(self, container_ref, name, url):
"""Add a consumer to the container
:param container_ref: Full HATEOAS reference to a Container
:param container_ref: Full HATEOAS reference to a Container, or a UUID
:param name: Name of the consuming service
:param url: URL of the consuming resource
:returns: A container object per the get() method
@ -738,8 +742,9 @@ class ContainerManager(base.BaseEntityManager):
"""
LOG.debug('Creating consumer registration for container '
'{0} as {1}: {2}'.format(container_ref, name, url))
href = '{0}/{1}/consumers'.format(self._entity,
container_ref.split('/')[-1])
container_uuid = base.validate_ref_and_return_uuid(
container_ref, 'Container')
href = '{0}/{1}/consumers'.format(self._entity, container_uuid)
consumer_dict = dict()
consumer_dict['name'] = name
consumer_dict['URL'] = url
@ -750,7 +755,7 @@ class ContainerManager(base.BaseEntityManager):
def remove_consumer(self, container_ref, name, url):
"""Remove a consumer from the container
:param container_ref: Full HATEOAS reference to a Container
:param container_ref: Full HATEOAS reference to a Container, or a UUID
:param name: Name of the previously consuming service
:param url: URL of the previously consuming resource
:raises barbicanclient.exceptions.HTTPAuthError: 401 Responses
@ -759,8 +764,9 @@ class ContainerManager(base.BaseEntityManager):
"""
LOG.debug('Deleting consumer registration for container '
'{0} as {1}: {2}'.format(container_ref, name, url))
href = '{0}/{1}/consumers'.format(self._entity,
container_ref.split('/')[-1])
container_uuid = base.validate_ref_and_return_uuid(
container_ref, 'Container')
href = '{0}/{1}/consumers'.format(self._entity, container_uuid)
consumer_dict = {
'name': name,
'URL': url

View File

@ -241,7 +241,8 @@ class Order(object):
def delete(self):
"""Deletes the Order from Barbican"""
if self._order_ref:
self._api.delete(self._order_ref)
uuid_ref = base.calculate_uuid_ref(self._order_ref, self._entity)
self._api.delete(uuid_ref)
self._order_ref = None
else:
raise LookupError("Order is not yet stored.")
@ -388,16 +389,16 @@ class OrderManager(base.BaseEntityManager):
def get(self, order_ref):
"""Retrieve an existing Order from Barbican
:param order_ref: Full HATEOAS reference to an Order
:param order_ref: Full HATEOAS reference to an Order, or a UUID
:returns: An instance of the appropriate subtype of Order
:raises barbicanclient.exceptions.HTTPAuthError: 401 Responses
:raises barbicanclient.exceptions.HTTPClientError: 4xx Responses
:raises barbicanclient.exceptions.HTTPServerError: 5xx Responses
"""
LOG.debug("Getting order - Order href: {0}".format(order_ref))
base.validate_ref(order_ref, 'Order')
uuid_ref = base.calculate_uuid_ref(order_ref, self._entity)
try:
response = self._api.get(order_ref)
response = self._api.get(uuid_ref)
except AttributeError:
raise LookupError(
'Order {0} could not be found.'.format(order_ref)
@ -518,11 +519,12 @@ class OrderManager(base.BaseEntityManager):
def delete(self, order_ref):
"""Delete an Order from Barbican
:param order_ref: The href for the order
:param order_ref: Full HATEOAS reference to an Order, or a UUID
"""
if not order_ref:
raise ValueError('order_ref is required.')
self._api.delete(order_ref)
uuid_ref = base.calculate_uuid_ref(order_ref, self._entity)
self._api.delete(uuid_ref)
def list(self, limit=10, offset=0):
"""List Orders for the project

View File

@ -357,14 +357,16 @@ class Secret(SecretFormatter):
else:
raise exceptions.PayloadException("Invalid Payload Type")
self._api.put(self._secret_ref,
uuid_ref = base.calculate_uuid_ref(self._secret_ref, self._entity)
self._api.put(uuid_ref,
headers=headers,
data=self.payload)
def delete(self):
"""Deletes the Secret from Barbican"""
if self._secret_ref:
self._api.delete(self._secret_ref)
uuid_ref = base.calculate_uuid_ref(self._secret_ref, self._entity)
self._api.delete(uuid_ref)
self._secret_ref = None
else:
raise LookupError("Secret is not yet stored.")
@ -411,7 +413,8 @@ class Secret(SecretFormatter):
def _fill_lazy_properties(self):
if self._secret_ref and not self._name:
result = self._api.get(self._secret_ref)
uuid_ref = base.calculate_uuid_ref(self._secret_ref, self._entity)
result = self._api.get(uuid_ref)
self._fill_from_data(
name=result.get('name'),
expiration=result.get('expiration'),
@ -444,7 +447,7 @@ class SecretManager(base.BaseEntityManager):
def get(self, secret_ref, payload_content_type=None):
"""Retrieve an existing Secret from Barbican
:param str secret_ref: Full HATEOAS reference to a Secret
:param str secret_ref: Full HATEOAS reference to a Secret, or a UUID
:param str payload_content_type: DEPRECATED: Content type to use for
payload decryption. Setting this can lead to unexpected results.
See Launchpad Bug #1419166.
@ -455,7 +458,7 @@ class SecretManager(base.BaseEntityManager):
:raises barbicanclient.exceptions.HTTPServerError: 5xx Responses
"""
LOG.debug("Getting secret - Secret href: {0}".format(secret_ref))
base.validate_ref(secret_ref, 'Secret')
base.validate_ref_and_return_uuid(secret_ref, 'Secret')
return Secret(
api=self._api,
payload_content_type=payload_content_type,
@ -463,16 +466,16 @@ class SecretManager(base.BaseEntityManager):
)
def update(self, secret_ref, payload=None):
"""Update an existing Secret from Barbican
"""Update an existing Secret in Barbican
:param str secret_ref: Full HATEOAS reference to a Secret
:param str secret_ref: Full HATEOAS reference to a Secret, or a UUID
:param str payload: New payload to add to secret
:raises barbicanclient.exceptions.HTTPAuthError: 401 Responses
:raises barbicanclient.exceptions.HTTPClientError: 4xx Responses
:raises barbicanclient.exceptions.HTTPServerError: 5xx Responses
"""
base.validate_ref(secret_ref, 'Secret')
base.validate_ref_and_return_uuid(secret_ref, 'Secret')
if not secret_ref:
raise ValueError('secret_ref is required.')
@ -483,7 +486,8 @@ class SecretManager(base.BaseEntityManager):
else:
raise exceptions.PayloadException("Invalid Payload Type")
self._api.put(secret_ref,
uuid_ref = base.calculate_uuid_ref(secret_ref, self._entity)
self._api.put(uuid_ref,
headers=headers,
data=payload)
@ -524,15 +528,16 @@ class SecretManager(base.BaseEntityManager):
def delete(self, secret_ref):
"""Delete a Secret from Barbican
:param secret_ref: The href for the secret to be deleted
:param secret_ref: Full HATEOAS reference to a Secret, or a UUID
:raises barbicanclient.exceptions.HTTPAuthError: 401 Responses
:raises barbicanclient.exceptions.HTTPClientError: 4xx Responses
:raises barbicanclient.exceptions.HTTPServerError: 5xx Responses
"""
base.validate_ref(secret_ref, 'Secret')
base.validate_ref_and_return_uuid(secret_ref, 'Secret')
if not secret_ref:
raise ValueError('secret_ref is required.')
self._api.delete(secret_ref)
uuid_ref = base.calculate_uuid_ref(secret_ref, self._entity)
self._api.delete(uuid_ref)
def list(self, limit=10, offset=0, name=None, algorithm=None, mode=None,
bits=0, secret_type=None, created=None, updated=None,

View File

@ -0,0 +1,8 @@
---
features:
- |
Lookups (for all Read/Update/Delete actions) are now performed using only
the UUID of the entity. For backward compatability, full HATEOS refs may
be used, but everything before the UUID will be stripped and the service
catalog entry for Barbican will be substituted. This should have no impact
on accessing existing secrets with any version of Barbican.

View File

@ -22,6 +22,7 @@ commands =
coverage html -d cover
coverage xml -o cover/coverage.xml
coverage report -m
whitelist_externals = rm
[testenv:debug]
basepython = python3