This patch increases the test coverage of the following middlewares: - list_endpoints - crypto - crossdomain Change-Id: I3dec85f61da07bd110bf42220d5ba46e11833a90
		
			
				
	
	
		
			329 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			329 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
#  Copyright (c) 2018 OpenStack Foundation
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 | 
						|
# implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
 | 
						|
import logging
 | 
						|
import mock
 | 
						|
import os
 | 
						|
import unittest
 | 
						|
from tempfile import mkdtemp
 | 
						|
from textwrap import dedent
 | 
						|
from shutil import rmtree
 | 
						|
import sys
 | 
						|
sys.modules['kmip'] = mock.Mock()
 | 
						|
sys.modules['kmip.pie'] = mock.Mock()
 | 
						|
sys.modules['kmip.pie.client'] = mock.Mock()
 | 
						|
 | 
						|
from swift.common.middleware.crypto import kmip_keymaster
 | 
						|
 | 
						|
 | 
						|
KMIP_CLIENT_CLASS = \
 | 
						|
    'swift.common.middleware.crypto.kmip_keymaster.ProxyKmipClient'
 | 
						|
 | 
						|
 | 
						|
class MockProxyKmipClient(object):
 | 
						|
    def __init__(self, secrets, calls, kwargs):
 | 
						|
        calls.append(('__init__', kwargs))
 | 
						|
        self.secrets = secrets
 | 
						|
        self.calls = calls
 | 
						|
 | 
						|
    def get(self, uid):
 | 
						|
        self.calls.append(('get', uid))
 | 
						|
        return self.secrets[uid]
 | 
						|
 | 
						|
    def __enter__(self):
 | 
						|
        return self
 | 
						|
 | 
						|
    def __exit__(self, exc_type, exc_value, traceback):
 | 
						|
        pass
 | 
						|
 | 
						|
 | 
						|
def create_secret(algorithm_name, length, value):
 | 
						|
    algorithm = mock.MagicMock()
 | 
						|
    algorithm.name = algorithm_name
 | 
						|
    secret = mock.MagicMock(cryptographic_algorithm=algorithm,
 | 
						|
                            cryptographic_length=length,
 | 
						|
                            value=value)
 | 
						|
    return secret
 | 
						|
 | 
						|
 | 
						|
def create_mock_client(secrets, calls):
 | 
						|
    def mock_client(*args, **kwargs):
 | 
						|
        if args:
 | 
						|
            raise Exception('unexpected args provided: %r' % (args,))
 | 
						|
        return MockProxyKmipClient(secrets, calls, kwargs)
 | 
						|
    return mock_client
 | 
						|
 | 
						|
 | 
						|
class InMemoryHandler(logging.Handler):
 | 
						|
    def __init__(self):
 | 
						|
        self.messages = []
 | 
						|
        super(InMemoryHandler, self).__init__()
 | 
						|
 | 
						|
    def handle(self, record):
 | 
						|
        self.messages.append(record.msg)
 | 
						|
 | 
						|
 | 
						|
class TestKmipKeymaster(unittest.TestCase):
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        self.tempdir = mkdtemp()
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        rmtree(self.tempdir)
 | 
						|
 | 
						|
    def test_config_in_filter_section(self):
 | 
						|
        conf = {'__file__': '/etc/swift/proxy-server.conf',
 | 
						|
                '__name__': 'kmip_keymaster',
 | 
						|
                'key_id': '1234'}
 | 
						|
        secrets = {'1234': create_secret('AES', 256, b'x' * 32)}
 | 
						|
        calls = []
 | 
						|
        with mock.patch(KMIP_CLIENT_CLASS, create_mock_client(secrets, calls)):
 | 
						|
            km = kmip_keymaster.filter_factory(conf)(None)
 | 
						|
 | 
						|
        self.assertEqual({None: b'x' * 32}, km._root_secrets)
 | 
						|
        self.assertEqual(None, km.active_secret_id)
 | 
						|
        self.assertIsNone(km.keymaster_config_path)
 | 
						|
        self.assertEqual(calls, [
 | 
						|
            ('__init__', {'config_file': '/etc/swift/proxy-server.conf',
 | 
						|
                          'config': 'filter:kmip_keymaster'}),
 | 
						|
            ('get', '1234'),
 | 
						|
        ])
 | 
						|
 | 
						|
    def test_multikey_config_in_filter_section(self):
 | 
						|
        conf = {'__file__': '/etc/swift/proxy-server.conf',
 | 
						|
                '__name__': 'kmip-keymaster',
 | 
						|
                'key_id': '1234',
 | 
						|
                'key_id_xyzzy': 'foobar',
 | 
						|
                'key_id_alt_secret_id': 'foobar',
 | 
						|
                'active_root_secret_id': 'xyzzy'}
 | 
						|
        secrets = {'1234': create_secret('AES', 256, b'x' * 32),
 | 
						|
                   'foobar': create_secret('AES', 256, b'y' * 32)}
 | 
						|
        calls = []
 | 
						|
        with mock.patch(KMIP_CLIENT_CLASS, create_mock_client(secrets, calls)):
 | 
						|
            km = kmip_keymaster.KmipKeyMaster(None, conf)
 | 
						|
 | 
						|
        self.assertEqual({None: b'x' * 32, 'xyzzy': b'y' * 32,
 | 
						|
                          'alt_secret_id': b'y' * 32},
 | 
						|
                         km._root_secrets)
 | 
						|
        self.assertEqual('xyzzy', km.active_secret_id)
 | 
						|
        self.assertIsNone(km.keymaster_config_path)
 | 
						|
        self.assertEqual(calls, [
 | 
						|
            ('__init__', {'config_file': '/etc/swift/proxy-server.conf',
 | 
						|
                          'config': 'filter:kmip-keymaster'}),
 | 
						|
            ('get', '1234'),
 | 
						|
            ('get', 'foobar'),
 | 
						|
        ])
 | 
						|
 | 
						|
    def test_bad_active_key(self):
 | 
						|
        conf = {'__file__': '/etc/swift/proxy-server.conf',
 | 
						|
                '__name__': 'kmip_keymaster',
 | 
						|
                'key_id': '1234',
 | 
						|
                'key_id_xyzzy': 'foobar',
 | 
						|
                'active_root_secret_id': 'unknown'}
 | 
						|
        secrets = {'1234': create_secret('AES', 256, b'x' * 32),
 | 
						|
                   'foobar': create_secret('AES', 256, b'y' * 32)}
 | 
						|
        calls = []
 | 
						|
        with mock.patch(KMIP_CLIENT_CLASS,
 | 
						|
                        create_mock_client(secrets, calls)), \
 | 
						|
                self.assertRaises(ValueError) as raised:
 | 
						|
            kmip_keymaster.KmipKeyMaster(None, conf)
 | 
						|
        self.assertEqual('No secret loaded for active_root_secret_id unknown',
 | 
						|
                         str(raised.exception))
 | 
						|
 | 
						|
    def test_config_in_separate_file(self):
 | 
						|
        km_conf = """
 | 
						|
        [kmip_keymaster]
 | 
						|
        key_id = 4321
 | 
						|
        """
 | 
						|
        km_config_file = os.path.join(self.tempdir, 'km.conf')
 | 
						|
        with open(km_config_file, 'wt') as fd:
 | 
						|
            fd.write(dedent(km_conf))
 | 
						|
 | 
						|
        conf = {'__file__': '/etc/swift/proxy-server.conf',
 | 
						|
                '__name__': 'keymaster-kmip',
 | 
						|
                'keymaster_config_path': km_config_file}
 | 
						|
        secrets = {'4321': create_secret('AES', 256, b'x' * 32)}
 | 
						|
        calls = []
 | 
						|
        with mock.patch(KMIP_CLIENT_CLASS, create_mock_client(secrets, calls)):
 | 
						|
            km = kmip_keymaster.KmipKeyMaster(None, conf)
 | 
						|
        self.assertEqual({None: b'x' * 32}, km._root_secrets)
 | 
						|
        self.assertEqual(None, km.active_secret_id)
 | 
						|
        self.assertEqual(km_config_file, km.keymaster_config_path)
 | 
						|
        self.assertEqual(calls, [
 | 
						|
            ('__init__', {'config_file': km_config_file,
 | 
						|
                          'config': 'kmip_keymaster'}),
 | 
						|
            ('get', '4321')])
 | 
						|
 | 
						|
    def test_multikey_config_in_separate_file(self):
 | 
						|
        km_conf = """
 | 
						|
        [kmip_keymaster]
 | 
						|
        key_id = 4321
 | 
						|
        key_id_secret_id = another id
 | 
						|
        active_root_secret_id = secret_id
 | 
						|
        """
 | 
						|
        km_config_file = os.path.join(self.tempdir, 'km.conf')
 | 
						|
        with open(km_config_file, 'wt') as fd:
 | 
						|
            fd.write(dedent(km_conf))
 | 
						|
 | 
						|
        conf = {'__file__': '/etc/swift/proxy-server.conf',
 | 
						|
                '__name__': 'kmip_keymaster',
 | 
						|
                'keymaster_config_path': km_config_file}
 | 
						|
        secrets = {'4321': create_secret('AES', 256, b'x' * 32),
 | 
						|
                   'another id': create_secret('AES', 256, b'y' * 32)}
 | 
						|
        calls = []
 | 
						|
        with mock.patch(KMIP_CLIENT_CLASS, create_mock_client(secrets, calls)):
 | 
						|
            km = kmip_keymaster.KmipKeyMaster(None, conf)
 | 
						|
        self.assertEqual({None: b'x' * 32, 'secret_id': b'y' * 32},
 | 
						|
                         km._root_secrets)
 | 
						|
        self.assertEqual('secret_id', km.active_secret_id)
 | 
						|
        self.assertEqual(km_config_file, km.keymaster_config_path)
 | 
						|
        self.assertEqual(calls, [
 | 
						|
            ('__init__', {'config_file': km_config_file,
 | 
						|
                          'config': 'kmip_keymaster'}),
 | 
						|
            ('get', '4321'),
 | 
						|
            ('get', 'another id')])
 | 
						|
 | 
						|
    def test_proxy_server_conf_dir(self):
 | 
						|
        proxy_server_conf_dir = os.path.join(self.tempdir, 'proxy_server.d')
 | 
						|
        os.mkdir(proxy_server_conf_dir)
 | 
						|
 | 
						|
        # KmipClient can't read conf from a dir, so check that is caught early
 | 
						|
        conf = {'__file__': proxy_server_conf_dir,
 | 
						|
                '__name__': 'kmip_keymaster',
 | 
						|
                'key_id': '789'}
 | 
						|
        with self.assertRaises(ValueError) as cm:
 | 
						|
            kmip_keymaster.KmipKeyMaster(None, conf)
 | 
						|
        self.assertIn('config cannot be read from conf dir', str(cm.exception))
 | 
						|
 | 
						|
        # ...but a conf file in a conf dir could point back to itself for the
 | 
						|
        # KmipClient config
 | 
						|
        km_config_file = os.path.join(proxy_server_conf_dir, '40.conf')
 | 
						|
        km_conf = """
 | 
						|
        [filter:kmip_keymaster]
 | 
						|
        keymaster_config_file = %s
 | 
						|
 | 
						|
        [kmip_keymaster]
 | 
						|
        key_id = 789
 | 
						|
        """ % km_config_file
 | 
						|
 | 
						|
        with open(km_config_file, 'wt') as fd:
 | 
						|
            fd.write(dedent(km_conf))
 | 
						|
 | 
						|
        conf = {'__file__': proxy_server_conf_dir,
 | 
						|
                '__name__': 'kmip_keymaster',
 | 
						|
                'keymaster_config_path': km_config_file}
 | 
						|
        secrets = {'789': create_secret('AES', 256, b'x' * 32)}
 | 
						|
        calls = []
 | 
						|
        with mock.patch(KMIP_CLIENT_CLASS, create_mock_client(secrets, calls)):
 | 
						|
            km = kmip_keymaster.KmipKeyMaster(None, conf)
 | 
						|
        self.assertEqual({None: b'x' * 32}, km._root_secrets)
 | 
						|
        self.assertEqual(None, km.active_secret_id)
 | 
						|
        self.assertEqual(km_config_file, km.keymaster_config_path)
 | 
						|
        self.assertEqual(calls, [
 | 
						|
            ('__init__', {'config_file': km_config_file,
 | 
						|
                          # NB: no "filter:"
 | 
						|
                          'config': 'kmip_keymaster'}),
 | 
						|
            ('get', '789')])
 | 
						|
 | 
						|
    def test_bad_key_length(self):
 | 
						|
        conf = {'__file__': '/etc/swift/proxy-server.conf',
 | 
						|
                '__name__': 'kmip_keymaster',
 | 
						|
                'key_id': '1234'}
 | 
						|
        secrets = {'1234': create_secret('AES', 128, b'x' * 16)}
 | 
						|
        calls = []
 | 
						|
        with mock.patch(KMIP_CLIENT_CLASS,
 | 
						|
                        create_mock_client(secrets, calls)), \
 | 
						|
                self.assertRaises(ValueError) as cm:
 | 
						|
            kmip_keymaster.KmipKeyMaster(None, conf)
 | 
						|
        self.assertIn('Expected key 1234 to be an AES-256 key',
 | 
						|
                      str(cm.exception))
 | 
						|
        self.assertEqual(calls, [
 | 
						|
            ('__init__', {'config_file': '/etc/swift/proxy-server.conf',
 | 
						|
                          'config': 'filter:kmip_keymaster'}),
 | 
						|
            ('get', '1234')])
 | 
						|
 | 
						|
    def test_bad_key_algorithm(self):
 | 
						|
        conf = {'__file__': '/etc/swift/proxy-server.conf',
 | 
						|
                '__name__': 'kmip_keymaster',
 | 
						|
                'key_id': '1234'}
 | 
						|
        secrets = {'1234': create_secret('notAES', 256, b'x' * 32)}
 | 
						|
        calls = []
 | 
						|
        with mock.patch(KMIP_CLIENT_CLASS,
 | 
						|
                        create_mock_client(secrets, calls)), \
 | 
						|
                self.assertRaises(ValueError) as cm:
 | 
						|
            kmip_keymaster.KmipKeyMaster(None, conf)
 | 
						|
        self.assertIn('Expected key 1234 to be an AES-256 key',
 | 
						|
                      str(cm.exception))
 | 
						|
        self.assertEqual(calls, [
 | 
						|
            ('__init__', {'config_file': '/etc/swift/proxy-server.conf',
 | 
						|
                          'config': 'filter:kmip_keymaster'}),
 | 
						|
            ('get', '1234')])
 | 
						|
 | 
						|
    def test_missing_key_id(self):
 | 
						|
        conf = {'__file__': '/etc/swift/proxy-server.conf',
 | 
						|
                '__name__': 'kmip_keymaster'}
 | 
						|
        secrets = {}
 | 
						|
        calls = []
 | 
						|
        with mock.patch(KMIP_CLIENT_CLASS,
 | 
						|
                        create_mock_client(secrets, calls)), \
 | 
						|
                self.assertRaises(ValueError) as cm:
 | 
						|
            kmip_keymaster.KmipKeyMaster(None, conf)
 | 
						|
        self.assertEqual('No secret loaded for active_root_secret_id None',
 | 
						|
                         str(cm.exception))
 | 
						|
        # We make the client, but never use it
 | 
						|
        self.assertEqual(calls, [
 | 
						|
            ('__init__', {'config_file': '/etc/swift/proxy-server.conf',
 | 
						|
                          'config': 'filter:kmip_keymaster'})])
 | 
						|
 | 
						|
    def test_logger_manipulations(self):
 | 
						|
        root_logger = logging.getLogger()
 | 
						|
        old_level = root_logger.getEffectiveLevel()
 | 
						|
        handler = InMemoryHandler()
 | 
						|
        try:
 | 
						|
            root_logger.setLevel(logging.DEBUG)
 | 
						|
            root_logger.addHandler(handler)
 | 
						|
 | 
						|
            conf = {'__file__': '/etc/swift/proxy-server.conf',
 | 
						|
                    '__name__': 'kmip_keymaster'}
 | 
						|
            secrets = {}
 | 
						|
            calls = []
 | 
						|
            with mock.patch(KMIP_CLIENT_CLASS,
 | 
						|
                            create_mock_client(secrets, calls)), \
 | 
						|
                    self.assertRaises(ValueError):
 | 
						|
                # missing key_id, as above, but that's not the interesting bit
 | 
						|
                kmip_keymaster.KmipKeyMaster(None, conf)
 | 
						|
 | 
						|
            self.assertEqual(handler.messages, [])
 | 
						|
 | 
						|
            logger = logging.getLogger('kmip.services.server.kmip_protocol')
 | 
						|
            logger.debug('Something secret!')
 | 
						|
            logger.info('Something useful')
 | 
						|
            self.assertNotIn('Something secret!', handler.messages)
 | 
						|
            self.assertIn('Something useful', handler.messages)
 | 
						|
 | 
						|
            logger = logging.getLogger('kmip.core.config_helper')
 | 
						|
            logger.debug('Also secret')
 | 
						|
            logger.warning('Also useful')
 | 
						|
            self.assertNotIn('Also secret', handler.messages)
 | 
						|
            self.assertIn('Also useful', handler.messages)
 | 
						|
 | 
						|
            logger = logging.getLogger('kmip')
 | 
						|
            logger.debug('Boring, but not secret')
 | 
						|
            self.assertIn('Boring, but not secret', handler.messages)
 | 
						|
        finally:
 | 
						|
            root_logger.setLevel(old_level)
 | 
						|
            root_logger.removeHandler(handler)
 |