Merge "Fix Dogtag plugin and tests to match current secret_store API"
This commit is contained in:
@@ -65,7 +65,6 @@ class DogtagPlugin(sstore.SecretStoreBase):
|
||||
# metadata constants
|
||||
KEY_ID = "key_id"
|
||||
SECRET_TYPE = "secret_type"
|
||||
SECRET_FORMAT = "secret_format"
|
||||
SECRET_KEYSPEC = "secret_keyspec"
|
||||
|
||||
def __init__(self, conf=CONF):
|
||||
@@ -126,7 +125,7 @@ class DogtagPlugin(sstore.SecretStoreBase):
|
||||
self.keyclient.set_transport_cert(
|
||||
DogtagPlugin.TRANSPORT_NICK)
|
||||
|
||||
def store_secret(self, secret_dto):
|
||||
def store_secret(self, secret_dto, context):
|
||||
"""Store a secret in the DRM
|
||||
|
||||
This will likely require another parameter which includes the wrapped
|
||||
@@ -149,11 +148,10 @@ class DogtagPlugin(sstore.SecretStoreBase):
|
||||
key_algorithm=None,
|
||||
key_size=None)
|
||||
return {DogtagPlugin.SECRET_TYPE: secret_dto.type,
|
||||
DogtagPlugin.SECRET_FORMAT: secret_dto.format,
|
||||
DogtagPlugin.SECRET_KEYSPEC: secret_dto.key_spec,
|
||||
DogtagPlugin.KEY_ID: response.get_key_id()}
|
||||
|
||||
def get_secret(self, secret_metadata):
|
||||
def get_secret(self, secret_metadata, context):
|
||||
"""Retrieve a secret from the DRM
|
||||
|
||||
The secret_metadata is simply the dict returned by a store_secret() or
|
||||
@@ -175,10 +173,13 @@ class DogtagPlugin(sstore.SecretStoreBase):
|
||||
key_id = secret_metadata[DogtagPlugin.KEY_ID]
|
||||
|
||||
recovered_key = self.keyclient.retrieve_key(key_id)
|
||||
|
||||
# TODO(alee) remove final field when content_type is removed
|
||||
# from secret_dto
|
||||
ret = sstore.SecretDTO(secret_metadata[DogtagPlugin.SECRET_TYPE],
|
||||
secret_metadata[DogtagPlugin.SECRET_FORMAT],
|
||||
recovered_key,
|
||||
secret_metadata[DogtagPlugin.SECRET_KEYSPEC])
|
||||
secret_metadata[DogtagPlugin.SECRET_KEYSPEC],
|
||||
None)
|
||||
|
||||
return ret
|
||||
|
||||
@@ -190,7 +191,7 @@ class DogtagPlugin(sstore.SecretStoreBase):
|
||||
"""
|
||||
pass
|
||||
|
||||
def generate_symmetric_key(self, key_spec):
|
||||
def generate_symmetric_key(self, key_spec, context):
|
||||
"""Generate a symmetric key
|
||||
|
||||
This calls generate_symmetric_key() on the DRM passing in the
|
||||
@@ -215,11 +216,10 @@ class DogtagPlugin(sstore.SecretStoreBase):
|
||||
key_spec.bit_length,
|
||||
usages)
|
||||
return {DogtagPlugin.SECRET_KEYSPEC: key_spec,
|
||||
DogtagPlugin.SECRET_FORMAT: sstore.KeyFormat.RAW,
|
||||
DogtagPlugin.SECRET_TYPE: sstore.SecretType.SYMMETRIC,
|
||||
DogtagPlugin.KEY_ID: response.get_key_id()}
|
||||
|
||||
def generate_asymmetric_key(self, key_spec):
|
||||
def generate_asymmetric_key(self, key_spec, context):
|
||||
"""Generate an asymmetric key."""
|
||||
raise NotImplementedError(
|
||||
"Feature not yet implemented by dogtag plugin")
|
||||
|
||||
@@ -58,7 +58,8 @@ class WhenTestingDogtagPlugin(testtools.TestCase):
|
||||
if not imports_ok:
|
||||
self.skipTest("Dogtag imports not available")
|
||||
key_spec = sstore.KeySpec(sstore.KeyAlgorithm.AES, 128)
|
||||
self.plugin.generate_symmetric_key(key_spec)
|
||||
context = mock.MagicMock()
|
||||
self.plugin.generate_symmetric_key(key_spec, context)
|
||||
|
||||
self.keyclient_mock.generate_symmetric_key.assert_called_once_with(
|
||||
mock.ANY,
|
||||
@@ -70,10 +71,12 @@ class WhenTestingDogtagPlugin(testtools.TestCase):
|
||||
if not imports_ok:
|
||||
self.skipTest("Dogtag imports not available")
|
||||
key_spec = sstore.KeySpec(sstore.KeyAlgorithm.EC, 192)
|
||||
context = mock.MagicMock()
|
||||
self.assertRaises(
|
||||
dogtag_import.DogtagPluginAlgorithmException,
|
||||
self.plugin.generate_symmetric_key,
|
||||
key_spec
|
||||
key_spec,
|
||||
context
|
||||
)
|
||||
|
||||
def test_raises_error_with_no_pem_path(self):
|
||||
@@ -113,11 +116,14 @@ class WhenTestingDogtagPlugin(testtools.TestCase):
|
||||
if not imports_ok:
|
||||
self.skipTest("Dogtag imports not available")
|
||||
payload = 'encrypt me!!'
|
||||
key_spec = mock.MagicMock()
|
||||
content_type = mock.MagicMock()
|
||||
context = mock.MagicMock()
|
||||
secret_dto = sstore.SecretDTO(sstore.SecretType.SYMMETRIC,
|
||||
sstore.KeyFormat.RAW,
|
||||
payload,
|
||||
mock.MagicMock())
|
||||
self.plugin.store_secret(secret_dto)
|
||||
key_spec,
|
||||
content_type)
|
||||
self.plugin.store_secret(secret_dto, context)
|
||||
self.keyclient_mock.archive_key.assert_called_once_with(
|
||||
mock.ANY,
|
||||
"passPhrase",
|
||||
@@ -128,14 +134,15 @@ class WhenTestingDogtagPlugin(testtools.TestCase):
|
||||
def test_get_secret(self):
|
||||
if not imports_ok:
|
||||
self.skipTest("Dogtag imports not available")
|
||||
key_spec = mock.MagicMock()
|
||||
context = mock.MagicMock()
|
||||
secret_metadata = {
|
||||
dogtag_import.DogtagPlugin.SECRET_FORMAT: sstore.KeyFormat.RAW,
|
||||
dogtag_import.DogtagPlugin.SECRET_TYPE:
|
||||
sstore.SecretType.SYMMETRIC,
|
||||
dogtag_import.DogtagPlugin.SECRET_KEYSPEC: mock.MagicMock(),
|
||||
dogtag_import.DogtagPlugin.SECRET_KEYSPEC: key_spec,
|
||||
dogtag_import.DogtagPlugin.KEY_ID: 'key1'
|
||||
}
|
||||
self.plugin.get_secret(secret_metadata)
|
||||
self.plugin.get_secret(secret_metadata, context)
|
||||
|
||||
self.keyclient_mock.retrieve_key.assert_called_once_with('key1')
|
||||
|
||||
|
||||
Reference in New Issue
Block a user