From 4a3769e113d75f91d6b177b50f2a2d98b36281b1 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Wed, 9 Nov 2016 18:22:32 -0500 Subject: [PATCH] Adding dynamic operation policy loading to the KMIP server This change adds support for dynamic operation policy loading. The server config file now supports a 'policy_path' option that points to a filesystem directory. Each file in the directory should contain a JSON policy object. The KMIP server will scan this directory and attempt to load all valid policies it finds. The results of this process will be logged. --- kmip/core/policy.py | 62 +++++++ kmip/services/server/config.py | 26 ++- kmip/services/server/engine.py | 72 +++++++- kmip/services/server/server.py | 34 +++- kmip/tests/unit/core/test_policy.py | 130 +++++++++++++ .../tests/unit/services/server/test_config.py | 52 ++++++ .../tests/unit/services/server/test_engine.py | 174 ++++++++++++++++++ .../tests/unit/services/server/test_server.py | 51 +++-- 8 files changed, 578 insertions(+), 23 deletions(-) create mode 100644 kmip/tests/unit/core/test_policy.py diff --git a/kmip/core/policy.py b/kmip/core/policy.py index 80af186..4be2c0e 100644 --- a/kmip/core/policy.py +++ b/kmip/core/policy.py @@ -13,9 +13,71 @@ # License for the specific language governing permissions and limitations # under the License. +import json +import six + from kmip.core import enums +def read_policy_from_file(path): + with open(path, 'r') as f: + try: + policy_blob = json.loads(f.read()) + except Exception as e: + raise ValueError( + "An error occurred while attempting to parse the JSON " + "file. {0}".format(e) + ) + + policies = dict() + + for name, object_policies in six.iteritems(policy_blob): + processed_object_policies = dict() + + for object_type, operation_policies in six.iteritems(object_policies): + processed_operation_policies = dict() + + for operation, permission in six.iteritems(operation_policies): + + try: + enum_operation = enums.Operation[operation] + except Exception: + raise ValueError( + "'{0}' is not a valid Operation value.".format( + operation + ) + ) + try: + enum_policy = enums.Policy[permission] + except Exception: + raise ValueError( + "'{0}' is not a valid Policy value.".format( + permission + ) + ) + + processed_operation_policies.update([ + (enum_operation, enum_policy) + ]) + + try: + enum_type = enums.ObjectType[object_type] + except Exception: + raise ValueError( + "'{0}' is not a valid ObjectType value.".format( + object_type + ) + ) + + processed_object_policies.update([ + (enum_type, processed_operation_policies) + ]) + + policies.update([(name, processed_object_policies)]) + + return policies + + policies = { 'default': { enums.ObjectType.CERTIFICATE: { diff --git a/kmip/services/server/config.py b/kmip/services/server/config.py index 0997aad..4687999 100644 --- a/kmip/services/server/config.py +++ b/kmip/services/server/config.py @@ -41,7 +41,8 @@ class KmipServerConfig(object): 'certificate_path', 'key_path', 'ca_path', - 'auth_suite' + 'auth_suite', + 'policy_path' ] def set_setting(self, setting, value): @@ -75,8 +76,10 @@ class KmipServerConfig(object): self._set_key_path(value) elif setting == 'ca_path': self._set_ca_path(value) - else: + elif setting == 'auth_suite': self._set_auth_suite(value) + else: + self._set_policy_path(value) def load_settings(self, path): """ @@ -141,6 +144,8 @@ class KmipServerConfig(object): self._set_ca_path(parser.get('server', 'ca_path')) if parser.has_option('server', 'auth_suite'): self._set_auth_suite(parser.get('server', 'auth_suite')) + if parser.has_option('server', 'policy_path'): + self._set_policy_path(parser.get('server', 'policy_path')) def _set_hostname(self, value): if isinstance(value, six.string_types): @@ -224,3 +229,20 @@ class KmipServerConfig(object): ) else: self.settings['auth_suite'] = value + + def _set_policy_path(self, value): + if value is None: + self.settings['policy_path'] = None + elif isinstance(value, six.string_types): + if os.path.exists(value): + self.settings['policy_path'] = value + else: + raise exceptions.ConfigurationError( + "The policy path value, if specified, must be a valid " + "string path to a filesystem directory." + ) + else: + raise exceptions.ConfigurationError( + "The policy path, if specified, must be a valid string path " + "to a filesystem directory." + ) diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 98eca1a..dc4e113 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -13,7 +13,9 @@ # License for the specific language governing permissions and limitations # under the License. +import copy import logging +import os import six import sqlalchemy @@ -42,7 +44,7 @@ from kmip.core.messages.payloads import register from kmip.core import misc -from kmip.core.policy import policies +from kmip.core import policy as operation_policy from kmip.pie import factory from kmip.pie import objects @@ -77,9 +79,14 @@ class KmipEngine(object): * Cryptographic usage mask enforcement per object type """ - def __init__(self): + def __init__(self, policy_path=None): """ Create a KmipEngine. + + Args: + policy_path (string): The path to the filesystem directory + containing PyKMIP server operation policy JSON files. + Optional, defaults to None. """ self._logger = logging.getLogger('kmip.server.engine') @@ -118,10 +125,69 @@ class KmipEngine(object): } self._attribute_policy = policy.AttributePolicy(self._protocol_version) - self._operation_policies = policies + self._operation_policies = copy.deepcopy(operation_policy.policies) + self._load_operation_policies(policy_path) self._client_identity = None + def _load_operation_policies(self, policy_path): + if (policy_path is None) or (not os.path.isdir(policy_path)): + self._logger.warning( + "The specified operation policy directory ({0}) is not " + "valid. No user-defined policies will be loaded".format( + policy_path + ) + ) + return dict() + else: + self._logger.info( + "Loading user-defined operation policy files from: {0}".format( + policy_path + ) + ) + + for filename in os.listdir(policy_path): + file_path = os.path.join(policy_path, filename) + if os.path.isfile(file_path): + self._logger.info( + "Loading user_defined operation policies " + "from file: {0}".format(file_path) + ) + + try: + policies = operation_policy.read_policy_from_file( + file_path + ) + except ValueError as e: + self._logger.error( + "A failure occurred while loading policies." + ) + self._logger.exception(e) + continue + + reserved_policies = ['default', 'public'] + for policy_name in six.iterkeys(policies): + if policy_name in reserved_policies: + self._logger.warning( + "Loaded policy '{0}' overwrites a reserved " + "policy and will be thrown out.".format( + policy_name + ) + ) + elif policy_name in six.iterkeys( + self._operation_policies + ): + self._logger.warning( + "Loaded policy '{0}' overwrites a " + "preexisting policy and will be thrown " + "out.".format(policy_name) + ) + else: + self._operation_policies.update([( + policy_name, + policies.get(policy_name) + )]) + def _get_enum_string(self, e): return ''.join([x.capitalize() for x in e.name.split('_')]) diff --git a/kmip/services/server/server.py b/kmip/services/server/server.py index cf81bcd..32b6ea9 100644 --- a/kmip/services/server/server.py +++ b/kmip/services/server/server.py @@ -50,7 +50,9 @@ class KmipServer(object): ca_path=None, auth_suite=None, config_path='/etc/pykmip/server.conf', - log_path='/var/log/pykmip/server.log'): + log_path='/var/log/pykmip/server.log', + policy_path='/etc/pykmip/policies' + ): """ Create a KmipServer. @@ -91,6 +93,9 @@ class KmipServer(object): log_path (string): The path to the base server log file (e.g., '/var/log/pykmip/server.log'). Optional, defaults to '/var/log/pykmip/server.log'. + policy_path (string): The path to the filesystem directory + containing PyKMIP server operation policy JSON files. + Optional, defaults to '/etc/pykmip/policies'. """ self._logger = logging.getLogger('kmip.server') self._setup_logging(log_path) @@ -103,7 +108,8 @@ class KmipServer(object): certificate_path, key_path, ca_path, - auth_suite + auth_suite, + policy_path ) if self.config.settings.get('auth_suite') == 'TLS1.2': @@ -111,7 +117,9 @@ class KmipServer(object): else: self.auth_suite = auth.BasicAuthenticationSuite() - self._engine = engine.KmipEngine() + self._engine = engine.KmipEngine( + self.config.settings.get('policy_path') + ) self._session_id = 1 self._is_serving = False @@ -144,7 +152,9 @@ class KmipServer(object): certificate_path=None, key_path=None, ca_path=None, - auth_suite=None): + auth_suite=None, + policy_path=None + ): if path: self.config.load_settings(path) @@ -160,6 +170,8 @@ class KmipServer(object): self.config.set_setting('ca_path', ca_path) if auth_suite: self.config.set_setting('auth_suite', auth_suite) + if policy_path: + self.config.set_setting('policy_path', policy_path) def start(self): """ @@ -447,6 +459,18 @@ def build_argument_parser(): "A string representing a path to a log file. Defaults to None." ), ) + parser.add_option( + "-o", + "--policy_path", + action="store", + type="str", + default=None, + dest="policy_path", + help=( + "A string representing a path to the operation policy filesystem " + "directory. Optional, defaults to None." + ), + ) return parser @@ -473,6 +497,8 @@ def main(args=None): kwargs['config_path'] = opts.config_path if opts.log_path: kwargs['log_path'] = opts.log_path + if opts.policy_path: + kwargs['policy_path'] = opts.policy_path # Create and start the server. s = KmipServer(**kwargs) diff --git a/kmip/tests/unit/core/test_policy.py b/kmip/tests/unit/core/test_policy.py new file mode 100644 index 0000000..1c32c97 --- /dev/null +++ b/kmip/tests/unit/core/test_policy.py @@ -0,0 +1,130 @@ +# Copyright (c) 2016 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import shutil +import tempfile +import testtools + +from kmip.core import enums +from kmip.core import policy + + +class TestPolicy(testtools.TestCase): + + def setUp(self): + super(TestPolicy, self).setUp() + + self.temp_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.temp_dir) + + def tearDown(self): + super(TestPolicy, self).tearDown() + + def test_read_policy_from_file(self): + policy_file = tempfile.NamedTemporaryFile( + dir=self.temp_dir, + delete=False + ) + with open(policy_file.name, 'w') as f: + f.write( + '{"test": {"CERTIFICATE": {"LOCATE": "ALLOW_ALL"}}}' + ) + + policies = policy.read_policy_from_file(policy_file.name) + + self.assertEqual(1, len(policies)) + self.assertIn('test', policies.keys()) + + test_policy = { + enums.ObjectType.CERTIFICATE: { + enums.Operation.LOCATE: enums.Policy.ALLOW_ALL + } + } + + self.assertEqual(test_policy, policies.get('test')) + + def test_read_policy_from_file_empty(self): + policy_file = tempfile.NamedTemporaryFile( + dir=self.temp_dir, + delete=False + ) + with open(policy_file.name, 'w') as f: + f.write('') + + args = (policy_file.name, ) + regex = "An error occurred while attempting to parse the JSON file." + self.assertRaisesRegexp( + ValueError, + regex, + policy.read_policy_from_file, + *args + ) + + def test_read_policy_from_file_bad_object_type(self): + policy_file = tempfile.NamedTemporaryFile( + dir=self.temp_dir, + delete=False + ) + with open(policy_file.name, 'w') as f: + f.write( + '{"test": {"INVALID": {"LOCATE": "ALLOW_ALL"}}}' + ) + + args = (policy_file.name, ) + regex = "'INVALID' is not a valid ObjectType value." + self.assertRaisesRegexp( + ValueError, + regex, + policy.read_policy_from_file, + *args + ) + + def test_read_policy_from_file_bad_operation(self): + policy_file = tempfile.NamedTemporaryFile( + dir=self.temp_dir, + delete=False + ) + with open(policy_file.name, 'w') as f: + f.write( + '{"test": {"CERTIFICATE": {"INVALID": "ALLOW_ALL"}}}' + ) + + args = (policy_file.name, ) + regex = "'INVALID' is not a valid Operation value." + self.assertRaisesRegexp( + ValueError, + regex, + policy.read_policy_from_file, + *args + ) + + def test_read_policy_from_file_bad_permission(self): + policy_file = tempfile.NamedTemporaryFile( + dir=self.temp_dir, + delete=False + ) + with open(policy_file.name, 'w') as f: + f.write( + '{"test": {"CERTIFICATE": {"LOCATE": "INVALID"}}}' + ) + + args = (policy_file.name, ) + regex = "'INVALID' is not a valid Policy value." + self.assertRaisesRegexp( + ValueError, + regex, + policy.read_policy_from_file, + *args + ) diff --git a/kmip/tests/unit/services/server/test_config.py b/kmip/tests/unit/services/server/test_config.py index 1081985..19bfb54 100644 --- a/kmip/tests/unit/services/server/test_config.py +++ b/kmip/tests/unit/services/server/test_config.py @@ -53,6 +53,7 @@ class TestKmipServerConfig(testtools.TestCase): c._set_hostname = mock.MagicMock() c._set_key_path = mock.MagicMock() c._set_port = mock.MagicMock() + c._set_policy_path = mock.MagicMock() # Test the right error is generated when setting an unsupported # setting. @@ -88,6 +89,9 @@ class TestKmipServerConfig(testtools.TestCase): c.set_setting('auth_suite', 'Basic') c._set_auth_suite.assert_called_once_with('Basic') + c.set_setting('policy_path', '/etc/pykmip/policies') + c._set_policy_path.assert_called_once_with('/etc/pykmip/policies') + def test_load_settings(self): """ Test that the right calls are made and the right errors generated when @@ -139,6 +143,7 @@ class TestKmipServerConfig(testtools.TestCase): c._set_hostname = mock.MagicMock() c._set_key_path = mock.MagicMock() c._set_port = mock.MagicMock() + c._set_policy_path = mock.MagicMock() # Test that the right calls are made when correctly parsing settings. parser = configparser.SafeConfigParser() @@ -149,6 +154,7 @@ class TestKmipServerConfig(testtools.TestCase): parser.set('server', 'key_path', '/test/path/server.key') parser.set('server', 'ca_path', '/test/path/ca.crt') parser.set('server', 'auth_suite', 'Basic') + parser.set('server', 'policy_path', '/test/path/policies') c._parse_settings(parser) @@ -160,6 +166,7 @@ class TestKmipServerConfig(testtools.TestCase): c._set_key_path.assert_called_once_with('/test/path/server.key') c._set_ca_path.assert_called_once_with('/test/path/ca.crt') c._set_auth_suite.assert_called_once_with('Basic') + c._set_policy_path.assert_called_once_with('/test/path/policies') # Test that a ConfigurationError is generated when the expected # section is missing. @@ -478,3 +485,48 @@ class TestKmipServerConfig(testtools.TestCase): *args ) self.assertNotEqual('invalid', c.settings.get('auth_suite')) + + def test_set_policy_path(self): + """ + Test that the policy_path configuration property can be set correctly. + """ + c = config.KmipServerConfig() + c._logger = mock.MagicMock() + + self.assertNotIn('policy_path', c.settings.keys()) + + # Test that the setting is set correctly with a valid value. + with mock.patch('os.path.exists') as os_mock: + os_mock.return_value = True + c._set_policy_path('/test/path/policies') + + self.assertIn('policy_path', c.settings.keys()) + self.assertEqual( + '/test/path/policies', + c.settings.get('policy_path') + ) + + c._set_policy_path(None) + self.assertIn('policy_path', c.settings.keys()) + self.assertEqual(None, c.settings.get('policy_path')) + + # Test that a ConfigurationError is generated when setting the wrong + # value. + c._logger.reset_mock() + args = (0, ) + self.assertRaises( + exceptions.ConfigurationError, + c._set_policy_path, + *args + ) + self.assertNotEqual(0, c.settings.get('policy_path')) + + args = ('/test/path/policies', ) + with mock.patch('os.path.exists') as os_mock: + os_mock.return_value = False + self.assertRaises( + exceptions.ConfigurationError, + c._set_policy_path, + *args + ) + self.assertNotEqual(0, c.settings.get('policy_path')) diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index c8e9154..8cb06d1 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -14,10 +14,12 @@ # under the License. import mock +import shutil import sqlalchemy from sqlalchemy.orm import exc +import tempfile import testtools import time @@ -74,6 +76,9 @@ class TestKmipEngine(testtools.TestCase): bind=self.engine ) + self.temp_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.temp_dir) + def tearDown(self): super(TestKmipEngine, self).tearDown() @@ -133,6 +138,175 @@ class TestKmipEngine(testtools.TestCase): """ engine.KmipEngine() + def test_load_operation_policies(self): + """ + Test that the KmipEngine can correctly load operation policies. + """ + e = engine.KmipEngine() + e._logger = mock.MagicMock() + + policy_file = tempfile.NamedTemporaryFile( + dir=self.temp_dir + ) + with open(policy_file.name, 'w') as f: + f.write( + '{"test": {"CERTIFICATE": {"LOCATE": "ALLOW_ALL"}}}' + ) + + self.assertEqual(2, len(e._operation_policies)) + + e._load_operation_policies(self.temp_dir) + e._logger.info.assert_any_call( + "Loading user-defined operation policy files from: {0}".format( + self.temp_dir + ) + ) + e._logger.info.assert_any_call( + "Loading user_defined operation policies from file: {0}".format( + policy_file.name + ) + ) + + self.assertEqual(3, len(e._operation_policies)) + self.assertIn('test', e._operation_policies.keys()) + + test_policy = { + enums.ObjectType.CERTIFICATE: { + enums.Operation.LOCATE: enums.Policy.ALLOW_ALL + } + } + + self.assertEqual(test_policy, e._operation_policies.get('test')) + + def test_load_operation_policies_with_file_read_error(self): + """ + Test that the KmipEngine can correctly handle load errors. + """ + e = engine.KmipEngine() + e._logger = mock.MagicMock() + + policy_file = tempfile.NamedTemporaryFile( + dir=self.temp_dir + ) + with open(policy_file.name, 'w') as f: + f.write( + '{"test": {"INVALID": {"LOCATE": "ALLOW_ALL"}}}' + ) + + self.assertEqual(2, len(e._operation_policies)) + + e._load_operation_policies(self.temp_dir) + e._logger.info.assert_any_call( + "Loading user-defined operation policy files from: {0}".format( + self.temp_dir + ) + ) + e._logger.info.assert_any_call( + "Loading user_defined operation policies from file: {0}".format( + policy_file.name + ) + ) + e._logger.error.assert_called_once_with( + "A failure occurred while loading policies." + ) + e._logger.exception.assert_called_once() + + self.assertEqual(2, len(e._operation_policies)) + + def test_load_operation_policies_with_reserved(self): + """ + Test that the KmipEngine can correctly load operation policies, even + when a policy attempts to overwrite a reserved one. + """ + e = engine.KmipEngine() + e._logger = mock.MagicMock() + + policy_file = tempfile.NamedTemporaryFile( + dir=self.temp_dir + ) + with open(policy_file.name, 'w') as f: + f.write( + '{"public": {"CERTIFICATE": {"LOCATE": "ALLOW_ALL"}}}' + ) + + self.assertEqual(2, len(e._operation_policies)) + + e._load_operation_policies(self.temp_dir) + e._logger.info.assert_any_call( + "Loading user-defined operation policy files from: {0}".format( + self.temp_dir + ) + ) + e._logger.info.assert_any_call( + "Loading user_defined operation policies from file: {0}".format( + policy_file.name + ) + ) + e._logger.warning.assert_called_once_with( + "Loaded policy 'public' overwrites a reserved policy and will " + "be thrown out." + ) + + self.assertEqual(2, len(e._operation_policies)) + + def test_load_operation_policies_with_duplicate(self): + """ + Test that the KmipEngine can correctly load operation policies, even + when a policy is defined multiple times. + """ + e = engine.KmipEngine() + e._logger = mock.MagicMock() + + policy_file_a = tempfile.NamedTemporaryFile( + dir=self.temp_dir + ) + with open(policy_file_a.name, 'w') as f: + f.write( + '{"test": {"CERTIFICATE": {"LOCATE": "ALLOW_ALL"}}}' + ) + + policy_file_b = tempfile.NamedTemporaryFile( + dir=self.temp_dir + ) + with open(policy_file_b.name, 'w') as f: + f.write( + '{"test": {"CERTIFICATE": {"LOCATE": "ALLOW_ALL"}}}' + ) + + self.assertEqual(2, len(e._operation_policies)) + + e._load_operation_policies(self.temp_dir) + e._logger.info.assert_any_call( + "Loading user-defined operation policy files from: {0}".format( + self.temp_dir + ) + ) + e._logger.info.assert_any_call( + "Loading user_defined operation policies from file: {0}".format( + policy_file_a.name + ) + ) + e._logger.info.assert_any_call( + "Loading user_defined operation policies from file: {0}".format( + policy_file_b.name + ) + ) + e._logger.warning.assert_called_once_with( + "Loaded policy 'test' overwrites a preexisting policy and will " + "be thrown out." + ) + + self.assertEqual(3, len(e._operation_policies)) + self.assertIn('test', e._operation_policies.keys()) + + test_policy = { + enums.ObjectType.CERTIFICATE: { + enums.Operation.LOCATE: enums.Policy.ALLOW_ALL + } + } + + self.assertEqual(test_policy, e._operation_policies.get('test')) + def test_version_operation_match(self): """ Test that a valid response is generated when trying to invoke an diff --git a/kmip/tests/unit/services/server/test_server.py b/kmip/tests/unit/services/server/test_server.py index 5ad251f..8545c4e 100644 --- a/kmip/tests/unit/services/server/test_server.py +++ b/kmip/tests/unit/services/server/test_server.py @@ -91,13 +91,17 @@ class TestKmipServer(testtools.TestCase): self.assertTrue(s._logger.addHandler.called) s._logger.setLevel.assert_called_once_with(logging.INFO) + @mock.patch('kmip.services.server.engine.KmipEngine') @mock.patch('kmip.services.auth.TLS12AuthenticationSuite') @mock.patch('kmip.services.server.server.KmipServer._setup_logging') - def test_setup_configuration(self, logging_mock, auth_mock): + def test_setup_configuration(self, logging_mock, auth_mock, engine_mock): """ Test that the server setup configuration works without error. """ - s = server.KmipServer(config_path=None) + s = server.KmipServer( + config_path=None, + policy_path=None + ) s.config = mock.MagicMock() # Test the right calls are made when reinvoking config setup @@ -108,7 +112,8 @@ class TestKmipServer(testtools.TestCase): '/etc/pykmip/certs/server.crt', '/etc/pykmip/certs/server.key', '/etc/pykmip/certs/ca.crt', - 'Basic' + 'Basic', + '/etc/pykmip/policies' ) s.config.load_settings.assert_called_with('/etc/pykmip/server.conf') @@ -127,14 +132,23 @@ class TestKmipServer(testtools.TestCase): '/etc/pykmip/certs/ca.crt' ) s.config.set_setting.assert_any_call('auth_suite', 'Basic') + s.config.set_setting.assert_any_call( + 'policy_path', + '/etc/pykmip/policies' + ) # Test that an attempt is made to instantiate the TLS 1.2 auth suite - s = server.KmipServer(auth_suite='TLS1.2', config_path=None) + s = server.KmipServer( + auth_suite='TLS1.2', + config_path=None, + policy_path=None + ) self.assertEqual('TLS1.2', s.config.settings.get('auth_suite')) self.assertIsNotNone(s.auth_suite) + @mock.patch('kmip.services.server.engine.KmipEngine') @mock.patch('kmip.services.server.server.KmipServer._setup_logging') - def test_start(self, logging_mock): + def test_start(self, logging_mock, engine_mock): """ Test that starting the KmipServer either runs as expected or generates the expected error. @@ -145,7 +159,8 @@ class TestKmipServer(testtools.TestCase): s = server.KmipServer( hostname='127.0.0.1', port=5696, - config_path=None + config_path=None, + policy_path=None ) s._logger = mock.MagicMock() @@ -205,8 +220,9 @@ class TestKmipServer(testtools.TestCase): ) s._logger.exception.assert_called_once_with(test_exception) + @mock.patch('kmip.services.server.engine.KmipEngine') @mock.patch('kmip.services.server.server.KmipServer._setup_logging') - def test_stop(self, logging_mock): + def test_stop(self, logging_mock, engine_mock): """ Test that the right calls and log messages are triggered while cleaning up the server and any remaining sessions. @@ -214,7 +230,8 @@ class TestKmipServer(testtools.TestCase): s = server.KmipServer( hostname='127.0.0.1', port=5696, - config_path=None + config_path=None, + policy_path=None ) s._logger = mock.MagicMock() s._socket = mock.MagicMock() @@ -321,8 +338,9 @@ class TestKmipServer(testtools.TestCase): s._socket.close.assert_called_once_with() s._logger.exception(test_exception) + @mock.patch('kmip.services.server.engine.KmipEngine') @mock.patch('kmip.services.server.server.KmipServer._setup_logging') - def test_serve(self, logging_mock): + def test_serve(self, logging_mock, engine_mock): """ Test that the right calls and log messages are triggered while serving connections. @@ -330,7 +348,8 @@ class TestKmipServer(testtools.TestCase): s = server.KmipServer( hostname='127.0.0.1', port=5696, - config_path=None + config_path=None, + policy_path=None ) s._is_serving = True s._logger = mock.MagicMock() @@ -400,8 +419,9 @@ class TestKmipServer(testtools.TestCase): handler(None, None) self.assertFalse(s._is_serving) + @mock.patch('kmip.services.server.engine.KmipEngine') @mock.patch('kmip.services.server.server.KmipServer._setup_logging') - def test_setup_connection_handler(self, logging_mock): + def test_setup_connection_handler(self, logging_mock, engine_mock): """ Test that a KmipSession can be successfully created and spun off from the KmipServer. @@ -409,7 +429,8 @@ class TestKmipServer(testtools.TestCase): s = server.KmipServer( hostname='127.0.0.1', port=5696, - config_path=None + config_path=None, + policy_path=None ) s._logger = mock.MagicMock() @@ -455,8 +476,9 @@ class TestKmipServer(testtools.TestCase): self.assertEqual(3, s._session_id) + @mock.patch('kmip.services.server.engine.KmipEngine') @mock.patch('kmip.services.server.server.KmipServer._setup_logging') - def test_as_context_manager(self, logging_mock): + def test_as_context_manager(self, logging_mock, engine_mock): """ Test that the right methods are called when the KmipServer is used as a context manager. @@ -464,7 +486,8 @@ class TestKmipServer(testtools.TestCase): s = server.KmipServer( hostname='127.0.0.1', port=5696, - config_path=None + config_path=None, + policy_path=None ) s._logger = mock.MagicMock() s.start = mock.MagicMock()