Adding operation policy enforcement to the KMIP server engine

This change adds enforcement of KMIP operation policies to the
server engine, specifically to the Get and Destroy operations.
Explicit object ownership is enforced as a result, with ownership
now a tracked internal server property of managed objects. Tests
for this new functionality are included.
This commit is contained in:
Peter Hamilton 2016-10-14 12:59:12 -04:00
parent 843c75d830
commit e0b0a5c7bf
3 changed files with 408 additions and 6 deletions

View File

@ -57,6 +57,7 @@ class ManagedObject(sql.Base):
String(50),
default='default'
)
_owner = Column('owner', String(50), default=None)
__mapper_args__ = {
'polymorphic_identity': 'ManagedObject',
@ -78,6 +79,7 @@ class ManagedObject(sql.Base):
self.names = list()
self.operation_policy_name = None
self._object_type = None
self._owner = None
# All remaining attributes are not considered part of the public API
# and are subject to change.

View File

@ -42,6 +42,8 @@ from kmip.core.messages.payloads import register
from kmip.core import misc
from kmip.core.policy import policies
from kmip.pie import factory
from kmip.pie import objects
from kmip.pie import sqltypes
@ -116,6 +118,12 @@ class KmipEngine(object):
}
self._attribute_policy = policy.AttributePolicy(self._protocol_version)
self._operation_policies = policies
self._client_identity = None
def _get_enum_string(self, e):
return ''.join([x.capitalize() for x in e.name.split('_')])
def _kmip_version_supported(supported):
def decorator(function):
@ -156,12 +164,16 @@ class KmipEngine(object):
)
def _verify_credential(self, request_credential, connection_credential):
# TODO (peterhamilton) Add authentication support
# TODO (peterhamilton) Improve authentication support
# 1. If present, verify user ID of connection_credential is valid user.
# 2. If present, verify request_credential is valid credential.
# 3. If both present, verify that they are compliant with each other.
# 4. If neither present, set server to only allow Query operations.
pass
# For now, simply use the connection_credential as received. It was
# obtained from a valid client certificate, so consider it a trusted
# form of client identity.
self._client_identity = connection_credential
@_synchronize
def process_request(self, request, credential=None):
@ -177,14 +189,15 @@ class KmipEngine(object):
Args:
request (RequestMessage): The request message containing the batch
items to be processed.
credential (Credential): A credential containing any identifying
information about the client obtained from the client
certificate. Optional, defaults to None.
credential (string): Identifying information about the client
obtained from the client certificate. Optional, defaults to
None.
Returns:
ResponseMessage: The response containing all of the results from
the request batch items.
"""
self._client_identity = None
header = request.request_header
# Process the protocol version
@ -621,6 +634,55 @@ class KmipEngine(object):
"The {0} attribute is unsupported.".format(attribute_name)
)
def _is_allowed_by_operation_policy(
self,
operation_policy,
session_identity,
object_owner,
object_type,
operation
):
policy_set = self._operation_policies.get(operation_policy)
if not policy_set:
self._logger.warning(
"The '{0}' policy does not exist.".format(operation_policy)
)
return False
object_policy = policy_set.get(object_type)
if not object_policy:
self._logger.warning(
"The '{0}' policy does not apply to {1} objects.".format(
operation_policy,
self._get_enum_string(object_type)
)
)
return False
operation_object_policy = object_policy.get(operation)
if not operation_object_policy:
self._logger.warning(
"The '{0}' policy does not apply to {1} operations on {2} "
"objects.".format(
operation_policy,
self._get_enum_string(operation),
self._get_enum_string(object_type)
)
)
return False
if operation_object_policy == enums.Policy.ALLOW_ALL:
return True
elif operation_object_policy == enums.Policy.ALLOW_OWNER:
if session_identity == object_owner:
return True
else:
return False
elif operation_object_policy == enums.Policy.DISALLOW_ALL:
return False
else:
return False
def _process_operation(self, operation, payload):
if operation == enums.Operation.CREATE:
return self._process_create(payload)
@ -710,6 +772,7 @@ class KmipEngine(object):
)
# TODO (peterhamilton) Set additional server-only attributes.
managed_object._owner = self._client_identity
self._data_session.add(managed_object)
@ -876,6 +939,8 @@ class KmipEngine(object):
)
# TODO (peterhamilton) Set additional server-only attributes.
public_key._owner = self._client_identity
private_key._owner = self._client_identity
self._data_session.add(public_key)
self._data_session.add(private_key)
@ -949,6 +1014,7 @@ class KmipEngine(object):
)
# TODO (peterhamilton) Set additional server-only attributes.
managed_object._owner = self._client_identity
self._data_session.add(managed_object)
@ -1004,6 +1070,20 @@ class KmipEngine(object):
object_type.unique_identifier == unique_identifier
).one()
# Determine if the request should be carried out under the object's
# operation policy. If not, feign ignorance of the object.
is_allowed = self._is_allowed_by_operation_policy(
managed_object.operation_policy_name,
self._client_identity,
managed_object._owner,
managed_object._object_type,
enums.Operation.GET
)
if not is_allowed:
raise exceptions.ItemNotFound(
"Could not locate object: {0}".format(unique_identifier)
)
if key_format_type:
if not hasattr(managed_object, 'key_format_type'):
raise exceptions.KeyFormatTypeNotSupported(
@ -1047,12 +1127,30 @@ class KmipEngine(object):
else:
unique_identifier = self._id_placeholder
self._get_object_type(unique_identifier)
object_type = self._get_object_type(unique_identifier)
# TODO (peterhamilton) Process attributes to see if destroy possible
# 1. Check object state. If invalid, error out.
# 2. Check object deactivation date. If invalid, error out.
managed_object = self._data_session.query(object_type).filter(
object_type.unique_identifier == unique_identifier
).one()
# Determine if the request should be carried out under the object's
# operation policy. If not, feign ignorance of the object.
is_allowed = self._is_allowed_by_operation_policy(
managed_object.operation_policy_name,
self._client_identity,
managed_object._owner,
managed_object._object_type,
enums.Operation.DESTROY
)
if not is_allowed:
raise exceptions.ItemNotFound(
"Could not locate object: {0}".format(unique_identifier)
)
self._logger.info(
"Destroying an object with ID: {0}".format(unique_identifier)
)

View File

@ -1468,6 +1468,244 @@ class TestKmipEngine(testtools.TestCase):
*args
)
def test_is_allowed_by_operation_policy(self):
"""
Test that an allowed operation is correctly allowed by the operation
policy.
"""
e = engine.KmipEngine()
e._operation_policies = {
'test': {
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_OWNER
}
}
}
is_allowed = e._is_allowed_by_operation_policy(
'test',
'test',
'test',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertTrue(is_allowed)
def test_is_allowed_by_operation_policy_blocked(self):
"""
Test that an unallowed operation is correctly blocked by the operation
policy.
"""
e = engine.KmipEngine()
e._operation_policies = {
'test': {
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_OWNER
}
}
}
is_allowed = e._is_allowed_by_operation_policy(
'test',
'random',
'test',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertFalse(is_allowed)
def test_is_allowed_by_operation_public(self):
"""
Test that a public operation is allowed by the operation policy.
"""
e = engine.KmipEngine()
e._operation_policies = {
'test': {
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_ALL
}
}
}
is_allowed = e._is_allowed_by_operation_policy(
'test',
'test',
'test',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertTrue(is_allowed)
is_allowed = e._is_allowed_by_operation_policy(
'test',
'random',
'test',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertTrue(is_allowed)
def test_is_allowed_by_operation_block_all(self):
"""
Test that a blocked operation is blocked by the operation policy.
"""
e = engine.KmipEngine()
e._operation_policies = {
'test': {
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.DISALLOW_ALL
}
}
}
is_allowed = e._is_allowed_by_operation_policy(
'test',
'test',
'test',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertFalse(is_allowed)
is_allowed = e._is_allowed_by_operation_policy(
'test',
'random',
'test',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertFalse(is_allowed)
def test_is_allowed_by_operation_safety_check(self):
"""
Test that an unknown operation is blocked by the operation policy.
"""
e = engine.KmipEngine()
e._operation_policies = {
'test': {
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: 'unknown value'
}
}
}
is_allowed = e._is_allowed_by_operation_policy(
'test',
'test',
'test',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertFalse(is_allowed)
is_allowed = e._is_allowed_by_operation_policy(
'test',
'random',
'test',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertFalse(is_allowed)
def test_is_allowed_by_operation_policy_nonexistent_policy(self):
"""
Test that a check with a non-existent policy yields a logging warning
and a blocked operation.
"""
e = engine.KmipEngine()
e._logger = mock.MagicMock()
policy = 'nonexistent-policy'
is_allowed = e._is_allowed_by_operation_policy(
policy,
'test',
'test',
enums.ObjectType.SYMMETRIC_KEY,
enums.Operation.GET
)
self.assertFalse(is_allowed)
e._logger.warning.assert_called_once_with(
"The '{0}' policy does not exist.".format(policy)
)
def test_is_allowed_by_operation_policy_not_object_applicable(self):
"""
Test that a check for an object with a non-applicable policy yields
a logging warning and a blocked operation.
"""
e = engine.KmipEngine()
e._logger = mock.MagicMock()
e._operation_policies = {
'test': {
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_OWNER
}
}
}
policy = 'test'
object_type = enums.ObjectType.PRIVATE_KEY
is_allowed = e._is_allowed_by_operation_policy(
policy,
'test',
'test',
object_type,
enums.Operation.GET
)
self.assertFalse(is_allowed)
e._logger.warning.assert_called_once_with(
"The '{0}' policy does not apply to {1} objects.".format(
policy,
e._get_enum_string(object_type)
)
)
def test_is_allowed_by_operation_policy_not_applicable(self):
"""
Test that a check with a non-applicable policy yields a logging
warning and a blocked operation.
"""
e = engine.KmipEngine()
e._logger = mock.MagicMock()
e._operation_policies = {
'test': {
enums.ObjectType.SYMMETRIC_KEY: {
enums.Operation.GET: enums.Policy.ALLOW_OWNER
}
}
}
policy = 'test'
object_type = enums.ObjectType.SYMMETRIC_KEY
operation = enums.Operation.CREATE
is_allowed = e._is_allowed_by_operation_policy(
policy,
'test',
'test',
object_type,
operation
)
self.assertFalse(is_allowed)
e._logger.warning.assert_called_once_with(
"The '{0}' policy does not apply to {1} operations on {2} "
"objects.".format(
policy,
e._get_enum_string(operation),
e._get_enum_string(object_type)
)
)
def test_create(self):
"""
Test that a Create request can be processed correctly.
@ -2797,6 +3035,38 @@ class TestKmipEngine(testtools.TestCase):
"Processing operation: Get"
)
def test_get_not_allowed_by_policy(self):
"""
Test that an unallowed request is handled correctly by Get.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._client_identity = 'test'
obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
obj_a._owner = 'admin'
e._data_session.add(obj_a)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
id_a = str(obj_a.unique_identifier)
payload = get.GetRequestPayload(
unique_identifier=attributes.UniqueIdentifier(id_a)
)
# Test by specifying the ID of the object to get.
args = [payload]
self.assertRaisesRegex(
exceptions.ItemNotFound,
"Could not locate object: {0}".format(id_a),
e._process_get,
*args
)
def test_destroy(self):
"""
Test that a Destroy request can be processed correctly.
@ -2869,6 +3139,38 @@ class TestKmipEngine(testtools.TestCase):
e._data_session.commit()
def test_destroy_not_allowed_by_policy(self):
"""
Test that an unallowed request is handled correctly by Destroy.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._client_identity = 'test'
obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
obj_a._owner = 'admin'
e._data_session.add(obj_a)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
id_a = str(obj_a.unique_identifier)
payload = destroy.DestroyRequestPayload(
unique_identifier=attributes.UniqueIdentifier(id_a)
)
# Test by specifying the ID of the object to destroy.
args = [payload]
self.assertRaisesRegex(
exceptions.ItemNotFound,
"Could not locate object: {0}".format(id_a),
e._process_destroy,
*args
)
def test_query(self):
"""
Test that a Query request can be processed correctly, for different