Initial change to add secret consumers

This adds consumers to the objects. Unit tests are
also covered.

Co-Authored-By: Ade Lee <alee@redhat.com>
Co-Authored-By: Mauricio Harley <mharley@redhat.com>
Change-Id: I598209e30d8f0e4515292b1f8c9a89aa952bac4e
This commit is contained in:
Grzegorz Grasza 2023-01-20 10:25:05 +01:00
parent fb6b880eaf
commit fe10397ac0
14 changed files with 169 additions and 40 deletions

View File

@ -57,6 +57,7 @@ class Key(managed_object.ManagedObject):
dict_fields["algorithm"] = self.algorithm
dict_fields["bit_length"] = self.bit_length
dict_fields["consumers"] = self.consumers
return dict_fields
@ -78,6 +79,7 @@ class Key(managed_object.ManagedObject):
name=dict_fields["name"],
created=dict_fields["created"],
id=id,
consumers=dict_fields["consumers"]
)
except KeyError as e:
raise exception.InvalidManagedObjectDictError(field=str(e))

View File

@ -29,12 +29,13 @@ from castellan.common import exception
class ManagedObject(object, metaclass=abc.ABCMeta):
"""Base class to represent all managed objects."""
def __init__(self, name=None, created=None, id=None):
def __init__(self, name=None, created=None, id=None, consumers=[]):
"""Managed Object
:param name: the name of the managed object.
:param created: the time a managed object was created.
:param id: the ID of the object, generated after storing the object.
:param consumers: the list of object's consumers.
"""
self._name = name
@ -46,6 +47,7 @@ class ManagedObject(object, metaclass=abc.ABCMeta):
type(created))
self._id = id
self._consumers = consumers
@property
def id(self):
@ -73,6 +75,14 @@ class ManagedObject(object, metaclass=abc.ABCMeta):
"""
return self._created
@property
def consumers(self):
"""Returns the list of consumers for this object.
Returns the object's consumers or [] if the object does not have any.
"""
return self._consumers
@property
@abc.abstractmethod
def format(self):
@ -111,7 +121,8 @@ class ManagedObject(object, metaclass=abc.ABCMeta):
pass
@classmethod
def from_dict(cls, dict_fields, id=None, metadata_only=False):
def from_dict(cls, dict_fields, id=None, metadata_only=False,
consumers=[]):
"""Returns an instance of this class based on a dict object.
:param dict_fields: The dictionary containing all necessary params
@ -119,6 +130,7 @@ class ManagedObject(object, metaclass=abc.ABCMeta):
:param id: The optional param 'id' to be passed to the constructor.
:param metadata_only: A switch to create an instance with metadata
only, without the secret itself.
:param consumers: A list with object's consumers.
"""
try:
value = None
@ -134,6 +146,7 @@ class ManagedObject(object, metaclass=abc.ABCMeta):
name=dict_fields["name"],
created=dict_fields["created"],
id=id,
consumers=consumers
)
except KeyError as e:
raise exception.InvalidManagedObjectDictError(field=str(e))
@ -159,4 +172,5 @@ class ManagedObject(object, metaclass=abc.ABCMeta):
"name": self.name,
"created": self.created,
"value": value,
"consumers": self.consumers
}

View File

@ -25,13 +25,14 @@ from castellan.common.objects import managed_object
class OpaqueData(managed_object.ManagedObject):
"""This class represents opaque data."""
def __init__(self, data, name=None, created=None, id=None):
def __init__(self, data, name=None, created=None, id=None, consumers=[]):
"""Create a new OpaqueData object.
Expected type for data is a bytestring.
"""
self._data = data
super().__init__(name=name, created=created, id=id)
super().__init__(name=name, created=created, id=id,
consumers=consumers)
@classmethod
def managed_type(cls):

View File

@ -25,13 +25,15 @@ from castellan.common.objects import managed_object
class Passphrase(managed_object.ManagedObject):
"""This class represents a passphrase."""
def __init__(self, passphrase, name=None, created=None, id=None):
def __init__(self, passphrase, name=None, created=None, id=None,
consumers=[]):
"""Create a new Passphrase object.
The expected type for the passphrase is a bytestring.
"""
self._passphrase = passphrase
super().__init__(name=name, created=created, id=id)
super().__init__(name=name, created=created, id=id,
consumers=consumers)
@classmethod
def managed_type(cls):

View File

@ -26,7 +26,7 @@ class PrivateKey(key.Key):
"""This class represents private keys."""
def __init__(self, algorithm, bit_length, key,
name=None, created=None, id=None):
name=None, created=None, id=None, consumers=[]):
"""Create a new PrivateKey object.
The arguments specify the algorithm and bit length for the asymmetric
@ -35,7 +35,8 @@ class PrivateKey(key.Key):
self._alg = algorithm
self._bit_length = bit_length
self._key = key
super().__init__(name=name, created=created, id=id)
super().__init__(name=name, created=created, id=id,
consumers=consumers)
@classmethod
def managed_type(cls):

View File

@ -26,7 +26,7 @@ class PublicKey(key.Key):
"""This class represents public keys."""
def __init__(self, algorithm, bit_length, key,
name=None, created=None, id=None):
name=None, created=None, id=None, consumers=[]):
"""Create a new PublicKey object.
The arguments specify the algorithm and bit length for the asymmetric
@ -36,7 +36,8 @@ class PublicKey(key.Key):
self._alg = algorithm
self._bit_length = bit_length
self._key = key
super().__init__(name=name, created=created, id=id)
super().__init__(name=name, created=created, id=id,
consumers=consumers)
@classmethod
def managed_type(cls):

View File

@ -26,7 +26,7 @@ class SymmetricKey(key.Key):
"""This class represents symmetric keys."""
def __init__(self, algorithm, bit_length, key,
name=None, created=None, id=None):
name=None, created=None, id=None, consumers=[]):
"""Create a new SymmetricKey object.
The arguments specify the algorithm and bit length for the symmetric
@ -35,7 +35,8 @@ class SymmetricKey(key.Key):
self._alg = algorithm
self._bit_length = bit_length
self._key = key
super().__init__(name=name, created=created, id=id)
super().__init__(name=name, created=created, id=id,
consumers=consumers)
@classmethod
def managed_type(cls):

View File

@ -25,13 +25,14 @@ from castellan.common.objects import certificate
class X509(certificate.Certificate):
"""This class represents X.509 certificates."""
def __init__(self, data, name=None, created=None, id=None):
def __init__(self, data, name=None, created=None, id=None, consumers=[]):
"""Create a new X509 object.
The data should be in a bytestring.
"""
self._data = data
super().__init__(name=name, created=created, id=id)
super().__init__(name=name, created=created, id=id,
consumers=consumers)
@classmethod
def managed_type(cls):

View File

@ -26,12 +26,16 @@ class OpaqueDataTestCase(base.TestCase):
def _create_data(self):
return opaque_data.OpaqueData(self.data,
self.name,
self.created)
self.created,
consumers=self.consumers)
def setUp(self):
self.data = bytes(b"secret opaque data")
self.name = 'my opaque'
self.created = 1448088699
self.consumers = [{'service': 'service_test',
'resource_type': 'type_test',
'resource_id': 'id_test'}]
self.opaque_data = self._create_data()
super(OpaqueDataTestCase, self).setUp()
@ -55,11 +59,15 @@ class OpaqueDataTestCase(base.TestCase):
def test_get_created(self):
self.assertEqual(self.created, self.opaque_data.created)
def test_get_consumers(self):
self.assertEqual(self.consumers, self.opaque_data.consumers)
def test_get_created_none(self):
created = None
data = opaque_data.OpaqueData(self.data,
self.name,
created)
created,
consumers=self.consumers)
self.assertEqual(created, data.created)

View File

@ -26,12 +26,16 @@ class PassphraseTestCase(base.TestCase):
def _create_passphrase(self):
return passphrase.Passphrase(self.passphrase_data,
self.name,
self.created)
self.created,
consumers=self.consumers)
def setUp(self):
self.passphrase_data = bytes(b"secret passphrase")
self.name = 'my phrase'
self.created = 1448088699
self.consumers = [{'service': 'service_test',
'resource_type': 'type_test',
'resource_id': 'id_test'}]
self.passphrase = self._create_passphrase()
super(PassphraseTestCase, self).setUp()
@ -55,11 +59,15 @@ class PassphraseTestCase(base.TestCase):
def test_get_created(self):
self.assertEqual(self.created, self.passphrase.created)
def test_get_consumers(self):
self.assertEqual(self.consumers, self.passphrase.consumers)
def test_get_created_none(self):
created = None
phrase = passphrase.Passphrase(self.passphrase_data,
self.name,
created)
created,
consumers=self.consumers)
self.assertEqual(created, phrase.created)

View File

@ -29,7 +29,8 @@ class PrivateKeyTestCase(base.KeyTestCase):
self.bit_length,
self.encoded,
self.name,
self.created)
self.created,
consumers=self.consumers)
def setUp(self):
self.algorithm = 'RSA'
@ -37,6 +38,9 @@ class PrivateKeyTestCase(base.KeyTestCase):
self.encoded = bytes(utils.get_private_key_der())
self.name = 'my key'
self.created = 1448088699
self.consumers = [{'service': 'service_test',
'resource_type': 'type_test',
'resource_id': 'id_test'}]
super(PrivateKeyTestCase, self).setUp()
@ -69,13 +73,17 @@ class PrivateKeyTestCase(base.KeyTestCase):
def test_get_created(self):
self.assertEqual(self.created, self.key.created)
def test_get_consumers(self):
self.assertEqual(self.consumers, self.key.consumers)
def test_get_created_none(self):
created = None
key = private_key.PrivateKey(self.algorithm,
self.bit_length,
self.encoded,
self.name,
created)
created,
consumers=self.consumers)
self.assertEqual(created, key.created)
@ -88,7 +96,8 @@ class PrivateKeyTestCase(base.KeyTestCase):
other_key = private_key.PrivateKey(self.algorithm,
self.bit_length,
self.encoded)
self.encoded,
consumers=self.consumers)
self.assertTrue(self.key == other_key)
self.assertFalse(self.key is other_key)
@ -100,14 +109,16 @@ class PrivateKeyTestCase(base.KeyTestCase):
other_key = private_key.PrivateKey('DSA',
self.bit_length,
self.encoded,
self.name)
self.name,
consumers=self.consumers)
self.assertTrue(self.key != other_key)
def test___ne___bit_length(self):
other_key = private_key.PrivateKey(self.algorithm,
4096,
self.encoded,
self.name)
self.name,
consumers=self.consumers)
self.assertTrue(self.key != other_key)
def test___ne___encoded(self):
@ -115,9 +126,22 @@ class PrivateKeyTestCase(base.KeyTestCase):
other_key = private_key.PrivateKey(self.algorithm,
self.bit_length,
different_encoded,
self.name)
self.name,
consumers=self.consumers)
self.assertTrue(self.key != other_key)
def test___ne___consumers(self):
different_consumers = [{'service': 'other_service',
'resource_type': 'other_type',
'resource_id': 'other_id'}]
other_key = private_key.PrivateKey(self.algorithm,
self.bit_length,
self.encoded,
self.name,
consumers=different_consumers)
self.assertTrue(self.key is not other_key)
def test_to_and_from_dict(self):
other = objects.from_dict(self.key.to_dict())
self.assertEqual(self.key, other)

View File

@ -29,7 +29,8 @@ class PublicKeyTestCase(base.KeyTestCase):
self.bit_length,
self.encoded,
self.name,
self.created)
self.created,
consumers=self.consumers)
def setUp(self):
self.algorithm = 'RSA'
@ -37,6 +38,9 @@ class PublicKeyTestCase(base.KeyTestCase):
self.encoded = bytes(utils.get_public_key_der())
self.name = 'my key'
self.created = 1448088699
self.consumers = [{'service': 'service_test',
'resource_type': 'type_test',
'resource_id': 'id_test'}]
super(PublicKeyTestCase, self).setUp()
@ -48,7 +52,8 @@ class PublicKeyTestCase(base.KeyTestCase):
self.bit_length,
None,
self.name,
self.created)
self.created,
consumers=self.consumers)
self.assertTrue(k.is_metadata_only())
def test_get_algorithm(self):
@ -69,13 +74,17 @@ class PublicKeyTestCase(base.KeyTestCase):
def test_get_created(self):
self.assertEqual(self.created, self.key.created)
def test_get_consumers(self):
self.assertEqual(self.consumers, self.key.consumers)
def test_get_created_none(self):
created = None
key = public_key.PublicKey(self.algorithm,
self.bit_length,
self.encoded,
self.name,
created)
created,
consumers=self.consumers)
self.assertEqual(created, key.created)
@ -88,7 +97,8 @@ class PublicKeyTestCase(base.KeyTestCase):
other_key = public_key.PublicKey(self.algorithm,
self.bit_length,
self.encoded)
self.encoded,
consumers=self.consumers)
self.assertTrue(self.key == other_key)
self.assertFalse(self.key is other_key)
@ -100,14 +110,16 @@ class PublicKeyTestCase(base.KeyTestCase):
other_key = public_key.PublicKey('DSA',
self.bit_length,
self.encoded,
self.name)
self.name,
consumers=self.consumers)
self.assertTrue(self.key != other_key)
def test___ne___bit_length(self):
other_key = public_key.PublicKey(self.algorithm,
4096,
self.encoded,
self.name)
self.name,
consumers=self.consumers)
self.assertTrue(self.key != other_key)
def test___ne___encoded(self):
@ -115,9 +127,21 @@ class PublicKeyTestCase(base.KeyTestCase):
other_key = public_key.PublicKey(self.algorithm,
self.bit_length,
different_encoded,
self.name)
self.name,
consumers=self.consumers)
self.assertTrue(self.key != other_key)
def test___ne___consumers(self):
different_consumers = [{'service': 'other_service',
'resource_type': 'other_type',
'resource_id': 'other_id'}]
other_key = public_key.PublicKey(self.algorithm,
self.bit_length,
self.encoded,
self.name,
consumers=different_consumers)
self.assertTrue(self.key is not other_key)
def test_to_and_from_dict(self):
other = objects.from_dict(self.key.to_dict())
self.assertEqual(self.key, other)

View File

@ -28,7 +28,8 @@ class SymmetricKeyTestCase(base.KeyTestCase):
self.bit_length,
self.encoded,
self.name,
self.created)
self.created,
consumers=self.consumers)
def setUp(self):
self.algorithm = 'AES'
@ -36,6 +37,9 @@ class SymmetricKeyTestCase(base.KeyTestCase):
self.bit_length = len(self.encoded) * 8
self.name = 'my key'
self.created = 1448088699
self.consumers = [{'service': 'service_test',
'resource_type': 'type_test',
'resource_id': 'id_test'}]
super(SymmetricKeyTestCase, self).setUp()
@ -47,7 +51,8 @@ class SymmetricKeyTestCase(base.KeyTestCase):
self.bit_length,
None,
self.name,
self.created)
self.created,
consumers=self.consumers)
self.assertTrue(k.is_metadata_only())
def test_get_format(self):
@ -68,13 +73,17 @@ class SymmetricKeyTestCase(base.KeyTestCase):
def test_get_created(self):
self.assertEqual(self.created, self.key.created)
def test_get_consumers(self):
self.assertEqual(self.consumers, self.key.consumers)
def test_get_created_none(self):
created = None
key = sym_key.SymmetricKey(self.algorithm,
self.bit_length,
self.encoded,
self.name,
created)
created,
consumers=self.consumers)
self.assertEqual(created, key.created)
@ -87,7 +96,8 @@ class SymmetricKeyTestCase(base.KeyTestCase):
other_key = sym_key.SymmetricKey(self.algorithm,
self.bit_length,
self.encoded)
self.encoded,
consumers=self.consumers)
self.assertTrue(self.key == other_key)
self.assertFalse(self.key is other_key)
@ -99,14 +109,16 @@ class SymmetricKeyTestCase(base.KeyTestCase):
other_key = sym_key.SymmetricKey('DES',
self.bit_length,
self.encoded,
self.name)
self.name,
consumers=self.consumers)
self.assertTrue(self.key != other_key)
def test___ne___bit_length(self):
other_key = sym_key.SymmetricKey(self.algorithm,
self.bit_length * 2,
self.encoded,
self.name)
self.name,
consumers=self.consumers)
self.assertTrue(self.key != other_key)
def test___ne___encoded(self):
@ -114,9 +126,21 @@ class SymmetricKeyTestCase(base.KeyTestCase):
other_key = sym_key.SymmetricKey(self.algorithm,
self.bit_length,
different_encoded,
self.name)
self.name,
consumers=self.consumers)
self.assertTrue(self.key != other_key)
def test___ne___consumers(self):
different_consumers = [{'service': 'other_service',
'resource_type': 'other_type',
'resource_id': 'other_id'}]
other_key = sym_key.SymmetricKey(self.algorithm,
self.bit_length,
self.encoded,
self.name,
consumers=different_consumers)
self.assertTrue(self.key is not other_key)
def test_to_and_from_dict(self):
other = objects.from_dict(self.key.to_dict())
self.assertEqual(self.key, other)

View File

@ -27,12 +27,16 @@ class X509TestCase(base.CertificateTestCase):
def _create_cert(self):
return x_509.X509(self.data,
self.name,
self.created)
self.created,
consumers=self.consumers)
def setUp(self):
self.data = utils.get_certificate_der()
self.name = 'my cert'
self.created = 1448088699
self.consumers = [{'service': 'service_test',
'resource_type': 'type_test',
'resource_id': 'id_test'}]
super(X509TestCase, self).setUp()
@ -55,11 +59,15 @@ class X509TestCase(base.CertificateTestCase):
def test_get_created(self):
self.assertEqual(self.created, self.cert.created)
def test_get_consumers(self):
self.assertEqual(self.consumers, self.cert.consumers)
def test_get_created_none(self):
created = None
cert = x_509.X509(self.data,
self.name,
created)
created,
consumers=self.consumers)
self.assertEqual(created, cert.created)
@ -82,6 +90,16 @@ class X509TestCase(base.CertificateTestCase):
other_x509 = x_509.X509(b'\x00\x00\x00', self.name)
self.assertTrue(self.cert != other_x509)
def test___ne___consumers(self):
different_consumers = [{'service': 'other_service',
'resource_type': 'other_type',
'resource_id': 'other_id'}]
other_cert = x_509.X509(self.data,
self.name,
self.created,
consumers=different_consumers)
self.assertTrue(self.cert is not other_cert)
def test_to_and_from_dict(self):
other = objects.from_dict(self.cert.to_dict())
self.assertEqual(self.cert, other)