Updating the Get unit demo
This change updates the Get unit demo, adding support for: * the additional display of key-related objects * the passing of more arguments to the client (e.g., KeyFormatType) * the retrieval of Public/PrivateKeys
This commit is contained in:
parent
3e11002567
commit
d263302077
@ -15,7 +15,13 @@
|
||||
|
||||
from kmip.core.enums import KeyFormatType
|
||||
|
||||
from kmip.core.keys import ECPrivateKey
|
||||
from kmip.core.keys import OpaqueKey
|
||||
from kmip.core.keys import PKCS1Key
|
||||
from kmip.core.keys import PKCS8Key
|
||||
from kmip.core.keys import RawKey
|
||||
from kmip.core.keys import TransparentSymmetricKey
|
||||
from kmip.core.keys import X509Key
|
||||
|
||||
|
||||
class KeyFactory(object):
|
||||
@ -28,17 +34,17 @@ class KeyFactory(object):
|
||||
if key_format is KeyFormatType.RAW:
|
||||
return self._create_raw_key(value)
|
||||
elif key_format is KeyFormatType.OPAQUE:
|
||||
return self._create_opaque_key(value)
|
||||
return self._create_opaque_key()
|
||||
elif key_format is KeyFormatType.PKCS_1:
|
||||
return self._create_pkcs_1_key(value)
|
||||
return self._create_pkcs_1_key()
|
||||
elif key_format is KeyFormatType.PKCS_8:
|
||||
return self._create_pkcs_8_key(value)
|
||||
return self._create_pkcs_8_key()
|
||||
elif key_format is KeyFormatType.X_509:
|
||||
return self._create_x_509_key(value)
|
||||
return self._create_x_509_key()
|
||||
elif key_format is KeyFormatType.EC_PRIVATE_KEY:
|
||||
return self._create_ec_private_key(value)
|
||||
return self._create_ec_private_key()
|
||||
elif key_format is KeyFormatType.TRANSPARENT_SYMMETRIC_KEY:
|
||||
return self._create_transparent_symmetric_key(value)
|
||||
return self._create_transparent_symmetric_key()
|
||||
elif key_format is KeyFormatType.TRANSPARENT_DSA_PRIVATE_KEY:
|
||||
return self._create_transparent_dsa_private_key(value)
|
||||
elif key_format is KeyFormatType.TRANSPARENT_DSA_PUBLIC_KEY:
|
||||
@ -71,23 +77,23 @@ class KeyFactory(object):
|
||||
data = value.get('bytes')
|
||||
return RawKey(data)
|
||||
|
||||
def _create_opaque_key(self, value):
|
||||
raise NotImplementedError()
|
||||
def _create_opaque_key(self):
|
||||
return OpaqueKey()
|
||||
|
||||
def _create_pkcs_1_key(self, value):
|
||||
raise NotImplementedError()
|
||||
def _create_pkcs_1_key(self):
|
||||
return PKCS1Key()
|
||||
|
||||
def _create_pkcs_8_key(self, value):
|
||||
raise NotImplementedError()
|
||||
def _create_pkcs_8_key(self):
|
||||
return PKCS8Key()
|
||||
|
||||
def _create_x_509_key(self, value):
|
||||
raise NotImplementedError()
|
||||
def _create_x_509_key(self):
|
||||
return X509Key()
|
||||
|
||||
def _create_ec_private_key(self, value):
|
||||
raise NotImplementedError()
|
||||
def _create_ec_private_key(self):
|
||||
return ECPrivateKey()
|
||||
|
||||
def _create_transparent_symmetric_key(self, value):
|
||||
raise NotImplementedError()
|
||||
def _create_transparent_symmetric_key(self):
|
||||
return TransparentSymmetricKey()
|
||||
|
||||
def _create_transparent_dsa_private_key(self, value):
|
||||
raise NotImplementedError()
|
||||
|
@ -28,6 +28,8 @@ from kmip.core.objects import KeyWrappingData
|
||||
from kmip.core.objects import KeyValueStruct
|
||||
from kmip.core.objects import KeyValue
|
||||
|
||||
from kmip.core.secrets import PrivateKey
|
||||
from kmip.core.secrets import PublicKey
|
||||
from kmip.core.secrets import SymmetricKey
|
||||
from kmip.core.secrets import Template
|
||||
|
||||
@ -43,15 +45,15 @@ class SecretFactory(object):
|
||||
self.template_input = self.base_error.format('Template', '{0}', '{1}',
|
||||
'{2}')
|
||||
|
||||
def create_secret(self, secret_type, value=None):
|
||||
def create(self, secret_type, value=None):
|
||||
if secret_type is ObjectType.CERTIFICATE:
|
||||
return self._create_certificate(value)
|
||||
elif secret_type is ObjectType.SYMMETRIC_KEY:
|
||||
return self._create_symmetric_key(value)
|
||||
elif secret_type is ObjectType.PUBLIC_KEY:
|
||||
return self._create_public_key(value)
|
||||
return self._create_public_key()
|
||||
elif secret_type is ObjectType.PRIVATE_KEY:
|
||||
return self._create_private_key(value)
|
||||
return self._create_private_key()
|
||||
elif secret_type is ObjectType.SPLIT_KEY:
|
||||
return self._create_split_key(value)
|
||||
elif secret_type is ObjectType.TEMPLATE:
|
||||
@ -106,11 +108,11 @@ class SecretFactory(object):
|
||||
key_wrap_data)
|
||||
return SymmetricKey(key_block)
|
||||
|
||||
def _create_public_key(self, value):
|
||||
raise NotImplementedError()
|
||||
def _create_public_key(self):
|
||||
return PublicKey()
|
||||
|
||||
def _create_private_key(self, value):
|
||||
raise NotImplementedError()
|
||||
def _create_private_key(self):
|
||||
return PrivateKey()
|
||||
|
||||
def _create_split_key(self, value):
|
||||
raise NotImplementedError()
|
||||
|
@ -130,7 +130,7 @@ class GetResponsePayload(Struct):
|
||||
self.unique_identifier.read(tstream)
|
||||
|
||||
secret_type = self.object_type.enum
|
||||
self.secret = self.secret_factory.create_secret(secret_type)
|
||||
self.secret = self.secret_factory.create(secret_type)
|
||||
self.secret.read(tstream)
|
||||
|
||||
self.is_oversized(tstream)
|
||||
|
@ -52,7 +52,7 @@ class RegisterRequestPayload(Struct):
|
||||
self.template_attribute.read(tstream)
|
||||
|
||||
secret_type = self.object_type.enum
|
||||
secret = self.secret_factory.create_secret(secret_type)
|
||||
secret = self.secret_factory.create(secret_type)
|
||||
|
||||
if self.is_tag_next(secret.tag, tstream):
|
||||
self.secret = secret
|
||||
|
@ -71,7 +71,7 @@ if __name__ == '__main__':
|
||||
secret_features.update([('cryptographic_algorithm', algorithm_value)])
|
||||
secret_features.update([('cryptographic_length', 128)])
|
||||
|
||||
secret = secret_factory.create_secret(object_type, secret_features)
|
||||
secret = secret_factory.create(object_type, secret_features)
|
||||
|
||||
result = client.register(object_type, template_attribute, secret,
|
||||
credential)
|
||||
|
@ -14,12 +14,15 @@
|
||||
# under the License.
|
||||
|
||||
from kmip.core.enums import CredentialType
|
||||
from kmip.core.enums import KeyFormatType as KeyFormatTypeEnum
|
||||
from kmip.core.enums import Operation
|
||||
from kmip.core.enums import ResultStatus
|
||||
|
||||
from kmip.core.factories.attributes import AttributeFactory
|
||||
from kmip.core.factories.credentials import CredentialFactory
|
||||
|
||||
from kmip.core.misc import KeyFormatType
|
||||
|
||||
from kmip.demos import utils
|
||||
|
||||
from kmip.services.kmip_client import KMIPProxy
|
||||
@ -38,12 +41,22 @@ if __name__ == '__main__':
|
||||
password = opts.password
|
||||
config = opts.config
|
||||
uuid = opts.uuid
|
||||
format_type = opts.format
|
||||
|
||||
# Exit early if the UUID is not specified
|
||||
if uuid is None:
|
||||
logging.debug('No UUID provided, exiting early from demo')
|
||||
sys.exit()
|
||||
|
||||
format_type_enum = None
|
||||
if format_type is not None:
|
||||
format_type_enum = getattr(KeyFormatTypeEnum, format_type, None)
|
||||
|
||||
if format_type_enum is None:
|
||||
logging.error(
|
||||
"Invalid key format type specified; exiting early from demo")
|
||||
sys.exit()
|
||||
|
||||
# Build and setup logging and needed factories
|
||||
f_log = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir,
|
||||
'logconfig.ini')
|
||||
@ -63,12 +76,18 @@ if __name__ == '__main__':
|
||||
'Password': password}
|
||||
credential = credential_factory.create_credential(credential_type,
|
||||
credential_value)
|
||||
|
||||
key_format_type = None
|
||||
if format_type_enum is not None:
|
||||
key_format_type = KeyFormatType(format_type_enum)
|
||||
|
||||
# Build the client and connect to the server
|
||||
client = KMIPProxy(config=config)
|
||||
client.open()
|
||||
|
||||
# Retrieve the SYMMETRIC_KEY object
|
||||
result = client.get(uuid, credential)
|
||||
result = client.get(uuid=uuid, key_format_type=key_format_type,
|
||||
credential=credential)
|
||||
client.close()
|
||||
|
||||
# Display operation results
|
||||
@ -79,7 +98,8 @@ if __name__ == '__main__':
|
||||
logger.debug('retrieved object type: {0}'.format(
|
||||
result.object_type.enum))
|
||||
logger.debug('retrieved UUID: {0}'.format(result.uuid.value))
|
||||
logger.debug('retrieved secret: {0}'.format(result.secret))
|
||||
|
||||
utils.log_secret(logger, result.object_type.enum, result.secret)
|
||||
else:
|
||||
logger.debug('get() result reason: {0}'.format(
|
||||
result.result_reason.enum))
|
||||
|
@ -114,7 +114,7 @@ if __name__ == '__main__':
|
||||
secret_features.update([('cryptographic_algorithm', algorithm_enum)])
|
||||
secret_features.update([('cryptographic_length', length)])
|
||||
|
||||
secret = secret_factory.create_secret(object_type, secret_features)
|
||||
secret = secret_factory.create(object_type, secret_features)
|
||||
|
||||
# Register the SYMMETRIC_KEY object
|
||||
result = client.register(object_type, template_attribute, secret,
|
||||
|
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from kmip.core.enums import ObjectType
|
||||
from kmip.core.enums import Operation
|
||||
|
||||
import optparse
|
||||
@ -109,6 +110,15 @@ def build_cli_parser(operation):
|
||||
default=None,
|
||||
dest="uuid",
|
||||
help="UUID of secret to retrieve from the KMIP server")
|
||||
parser.add_option(
|
||||
"-f",
|
||||
"--format",
|
||||
action="store",
|
||||
type="str",
|
||||
default=None,
|
||||
dest="format",
|
||||
help=("Format in which to retrieve the secret. Supported formats "
|
||||
"include: RAW, PKCS_1, PKCS_8, X_509"))
|
||||
elif operation is Operation.LOCATE:
|
||||
parser.add_option(
|
||||
"-n",
|
||||
@ -154,6 +164,10 @@ def log_template_attribute(logger, template_attribute):
|
||||
name = names[i]
|
||||
logger.info('name {0}: {1}'.format(i, name))
|
||||
|
||||
log_attribute_list(attributes)
|
||||
|
||||
|
||||
def log_attribute_list(logger, attributes):
|
||||
logger.info('number of attributes: {0}'.format(len(attributes)))
|
||||
for i in range(len(attributes)):
|
||||
attribute = attributes[i]
|
||||
@ -166,3 +180,75 @@ def log_template_attribute(logger, template_attribute):
|
||||
logger.info(' attribute_index: {0}'.format(attribute_index))
|
||||
logger.info(' attribute_value: {0}'.format(
|
||||
repr(attribute_value)))
|
||||
|
||||
|
||||
def log_secret(logger, secret_type, secret_value):
|
||||
if secret_type is ObjectType.PRIVATE_KEY:
|
||||
log_private_key(logger, secret_value)
|
||||
elif secret_type is ObjectType.PUBLIC_KEY:
|
||||
log_public_key(logger, secret_value)
|
||||
else:
|
||||
logger.info('generic secret: {0}'.format(secret_value))
|
||||
|
||||
|
||||
def log_public_key(logger, public_key):
|
||||
key_block = public_key.key_block
|
||||
|
||||
log_key_block(logger, key_block)
|
||||
|
||||
|
||||
def log_private_key(logger, private_key):
|
||||
key_block = private_key.key_block
|
||||
|
||||
log_key_block(logger, key_block)
|
||||
|
||||
|
||||
def log_key_block(logger, key_block):
|
||||
if key_block is not None:
|
||||
logger.info('key block:')
|
||||
|
||||
key_format_type = key_block.key_format_type
|
||||
key_compression_type = key_block.key_compression_type
|
||||
key_value = key_block.key_value
|
||||
cryptographic_algorithm = key_block.cryptographic_algorithm
|
||||
cryptographic_length = key_block.cryptographic_length
|
||||
key_wrapping_data = key_block.key_wrapping_data
|
||||
|
||||
logger.info('* key format type: {0}'.format(key_format_type))
|
||||
logger.info('* key compression type: {0}'.format(
|
||||
key_compression_type))
|
||||
logger.info('* cryptographic algorithm: {0}'.format(
|
||||
cryptographic_algorithm))
|
||||
logger.info('* cryptographic length: {0}'.format(
|
||||
cryptographic_length))
|
||||
|
||||
log_key_value(logger, key_value)
|
||||
log_key_wrapping_data(logger, key_wrapping_data)
|
||||
else:
|
||||
logger.info('key block: {0}'.format(key_block))
|
||||
|
||||
|
||||
def log_key_value(logger, key_value):
|
||||
if key_value is not None:
|
||||
key_format_type = key_value.key_format_type
|
||||
key_value = key_value.key_value
|
||||
|
||||
logger.info('key format type: {0}'.format(key_format_type))
|
||||
|
||||
if key_value is not None:
|
||||
logger.info('key value:')
|
||||
|
||||
key_material = key_value.key_material
|
||||
attributes = key_value.attributes
|
||||
|
||||
logger.info('key material: {0}'.format(repr(key_material)))
|
||||
|
||||
log_attribute_list(logger, attributes)
|
||||
else:
|
||||
logger.info('key value: {0}'.format(key_value))
|
||||
else:
|
||||
logger.info('key value: {0}'.format(key_value))
|
||||
|
||||
|
||||
def log_key_wrapping_data(logger, key_wrapping_data):
|
||||
logger.info('key wrapping data: {0}'.format(key_wrapping_data))
|
||||
|
@ -140,7 +140,12 @@ class KMIPProxy(KMIP):
|
||||
|
||||
def get(self, uuid=None, key_format_type=None, key_compression_type=None,
|
||||
key_wrapping_specification=None, credential=None):
|
||||
return self._get(unique_identifier=uuid, credential=credential)
|
||||
return self._get(
|
||||
unique_identifier=uuid,
|
||||
key_format_type=key_format_type,
|
||||
key_compression_type=key_compression_type,
|
||||
key_wrapping_specification=key_wrapping_specification,
|
||||
credential=credential)
|
||||
|
||||
def destroy(self, uuid, credential=None):
|
||||
return self._destroy(unique_identifier=uuid,
|
||||
@ -406,7 +411,7 @@ class KMIPProxy(KMIP):
|
||||
if unique_identifier is not None:
|
||||
uuid = attr.UniqueIdentifier(unique_identifier)
|
||||
if key_format_type is not None:
|
||||
kft = get.GetRequestPayload.KeyFormatType(key_format_type)
|
||||
kft = get.GetRequestPayload.KeyFormatType(key_format_type.enum)
|
||||
if key_compression_type is not None:
|
||||
kct = key_compression_type
|
||||
kct = get.GetRequestPayload.KeyCompressionType(kct)
|
||||
|
@ -1405,8 +1405,7 @@ class TestResponseMessage(TestCase):
|
||||
'key_value': {'bytes': key},
|
||||
'cryptographic_algorithm': crypto_algorithm,
|
||||
'cryptographic_length': cryptographic_length}
|
||||
secret = self.secret_factory.create_secret(ObjectType.SYMMETRIC_KEY,
|
||||
value)
|
||||
secret = self.secret_factory.create(ObjectType.SYMMETRIC_KEY, value)
|
||||
resp_pl = get.GetResponsePayload(object_type=object_type,
|
||||
unique_identifier=uniq_id,
|
||||
secret=secret)
|
||||
|
@ -147,7 +147,7 @@ class TestKMIPClientIntegration(TestCase):
|
||||
result = self._create_symmetric_key()
|
||||
uuid = result.uuid.value
|
||||
|
||||
result = self.client.get(uuid, credential)
|
||||
result = self.client.get(uuid=uuid, credential=credential)
|
||||
|
||||
self._check_result_status(result.result_status.enum, ResultStatus,
|
||||
ResultStatus.SUCCESS)
|
||||
@ -172,7 +172,7 @@ class TestKMIPClientIntegration(TestCase):
|
||||
uuid = result.uuid.value
|
||||
|
||||
# Verify the secret was created
|
||||
result = self.client.get(uuid, credential)
|
||||
result = self.client.get(uuid=uuid, credential=credential)
|
||||
|
||||
self._check_result_status(result.result_status.enum, ResultStatus,
|
||||
ResultStatus.SUCCESS)
|
||||
@ -194,7 +194,7 @@ class TestKMIPClientIntegration(TestCase):
|
||||
self._check_uuid(result.uuid.value, str)
|
||||
|
||||
# Verify the secret was destroyed
|
||||
result = self.client.get(uuid, credential)
|
||||
result = self.client.get(uuid=uuid, credential=credential)
|
||||
|
||||
self._check_result_status(result.result_status.enum, ResultStatus,
|
||||
ResultStatus.OPERATION_FAILED)
|
||||
@ -239,8 +239,7 @@ class TestKMIPClientIntegration(TestCase):
|
||||
secret_features.update([('cryptographic_algorithm', algorithm_value)])
|
||||
secret_features.update([('cryptographic_length', 128)])
|
||||
|
||||
secret = self.secret_factory.create_secret(object_type,
|
||||
secret_features)
|
||||
secret = self.secret_factory.create(object_type, secret_features)
|
||||
|
||||
result = self.client.register(object_type, template_attribute, secret,
|
||||
credential)
|
||||
@ -256,7 +255,7 @@ class TestKMIPClientIntegration(TestCase):
|
||||
None]])
|
||||
# Check that the returned key bytes match what was provided
|
||||
uuid = result.uuid.value
|
||||
result = self.client.get(uuid, credential)
|
||||
result = self.client.get(uuid=uuid, credential=credential)
|
||||
|
||||
self._check_result_status(result.result_status.enum, ResultStatus,
|
||||
ResultStatus.SUCCESS)
|
||||
|
Loading…
Reference in New Issue
Block a user