229 lines
10 KiB
Python
229 lines
10 KiB
Python
# Copyright 2017 Canonical Ltd
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
|
|
from unit_tests.test_utils import CharmTestCase
|
|
from unittest.mock import patch
|
|
|
|
import ssl_utils
|
|
|
|
TO_PATCH = [
|
|
'config',
|
|
]
|
|
|
|
TEST_CA = """-----BEGIN CERTIFICATE-----
|
|
MIIDbTCCAlWgAwIBAgIURtdGGKKjckiLPLue8Wn/sCS5u+QwDQYJKoZIhvcNAQEL
|
|
BQAwPTE7MDkGA1UEAxMyVmF1bHQgUm9vdCBDZXJ0aWZpY2F0ZSBBdXRob3JpdHkg
|
|
KGNoYXJtLXBraS1sb2NhbCkwIBgPMDAwMTAxMDEwMDAwMDBaFw0xODExMjQxMzQx
|
|
MjdaMD0xOzA5BgNVBAMTMlZhdWx0IFJvb3QgQ2VydGlmaWNhdGUgQXV0aG9yaXR5
|
|
IChjaGFybS1wa2ktbG9jYWwpMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKC
|
|
AQEAwUEg8XFO2GzI19aNAfH8KeBsLvpYTX4nNREEGLMkl7qfqO+rcwNmN/60UxSu
|
|
Hbsqfjv6B6kWD6dd1/OvveYjxqPA97OqO5LOUE43ojzUkxai5GeF5fvu3QGIR7iZ
|
|
a9PEDFjFKeCdwyKLoIHNdXw1TM0sQmWM7sSiMhCfrpeZEe+En+KZQugo+BiLrhKA
|
|
yZTIkEP5+6r/Nrxfkx2/Kklrq8LOyLfH91LbmJEVEKQNloCYphZYwB7n9GPvKlGv
|
|
pvPuJc7wEkmtCMp0dNjo3MZ0ij1SIN6Ntx8DqhPJ8QKvNDogVmeEGpQFBcrzfkol
|
|
LMXPBpX2Qx6dPqLGHCbWQDnvewIDAQABo2MwYTAOBgNVHQ8BAf8EBAMCAQYwDwYD
|
|
VR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUc1rh2BEHSQJ0qxhPTDQKRJg2AGEwHwYD
|
|
VR0jBBgwFoAUc1rh2BEHSQJ0qxhPTDQKRJg2AGEwDQYJKoZIhvcNAQELBQADggEB
|
|
ABZvreticW5UuoQS7NAVICCvh5FwgrkC5tnHX3p8TOhMIpJTgrKhedJZKzLc254g
|
|
/jAsb7q775IcMOhS2vFJSQd6rV0cMNCdFjk0sTTe01OXoJj2fN3MMbEEGfs6crwk
|
|
TKiXEJ9XYc04Ul4b8XJ0d5hYejr5IF9leJ2JJMiGTJFGU1Oi8Lctj7qyX0nlo+x5
|
|
Xhj8BbsJsbUGoA+bXvCOO88voyOZoRGCg1JFztbpgIAV6k64DJ7xp9tNDhZJj0Uo
|
|
2MDrWbfUYFWMiD5L0d5MjeX7aGIPhJsMund1zFHr1ho64OdCJ1zDmtk4UYzZ0deE
|
|
5nLA3FXh+snaEpmpl7X9Xus=
|
|
-----END CERTIFICATE-----"""
|
|
|
|
B64_TEST_CA = """LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURiVENDQWxXZ0F3SUJBZ0lVUnRkR0dLS2pja2lMUEx1ZThXbi9zQ1M1dStRd0RRWUpLb1pJaHZjTkFRRUwKQlFBd1BURTdNRGtHQTFVRUF4TXlWbUYxYkhRZ1VtOXZkQ0JEWlhKMGFXWnBZMkYwWlNCQmRYUm9iM0pwZEhrZwpLR05vWVhKdExYQnJhUzFzYjJOaGJDa3dJQmdQTURBd01UQXhNREV3TURBd01EQmFGdzB4T0RFeE1qUXhNelF4Ck1qZGFNRDB4T3pBNUJnTlZCQU1UTWxaaGRXeDBJRkp2YjNRZ1EyVnlkR2xtYVdOaGRHVWdRWFYwYUc5eWFYUjUKSUNoamFHRnliUzF3YTJrdGJHOWpZV3dwTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQwpBUUVBd1VFZzhYRk8yR3pJMTlhTkFmSDhLZUJzTHZwWVRYNG5OUkVFR0xNa2w3cWZxTytyY3dObU4vNjBVeFN1Ckhic3FmanY2QjZrV0Q2ZGQxL092dmVZanhxUEE5N09xTzVMT1VFNDNvanpVa3hhaTVHZUY1ZnZ1M1FHSVI3aVoKYTlQRURGakZLZUNkd3lLTG9JSE5kWHcxVE0wc1FtV003c1NpTWhDZnJwZVpFZStFbitLWlF1Z28rQmlMcmhLQQp5WlRJa0VQNSs2ci9Ocnhma3gyL0trbHJxOExPeUxmSDkxTGJtSkVWRUtRTmxvQ1lwaFpZd0I3bjlHUHZLbEd2CnB2UHVKYzd3RWttdENNcDBkTmpvM01aMGlqMVNJTjZOdHg4RHFoUEo4UUt2TkRvZ1ZtZUVHcFFGQmNyemZrb2wKTE1YUEJwWDJReDZkUHFMR0hDYldRRG52ZXdJREFRQUJvMk13WVRBT0JnTlZIUThCQWY4RUJBTUNBUVl3RHdZRApWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVWMxcmgyQkVIU1FKMHF4aFBURFFLUkpnMkFHRXdId1lEClZSMGpCQmd3Rm9BVWMxcmgyQkVIU1FKMHF4aFBURFFLUkpnMkFHRXdEUVlKS29aSWh2Y05BUUVMQlFBRGdnRUIKQUJadnJldGljVzVVdW9RUzdOQVZJQ0N2aDVGd2dya0M1dG5IWDNwOFRPaE1JcEpUZ3JLaGVkSlpLekxjMjU0ZwovakFzYjdxNzc1SWNNT2hTMnZGSlNRZDZyVjBjTU5DZEZqazBzVFRlMDFPWG9KajJmTjNNTWJFRUdmczZjcndrClRLaVhFSjlYWWMwNFVsNGI4WEowZDVoWWVqcjVJRjlsZUoySkpNaUdUSkZHVTFPaThMY3RqN3F5WDBubG8reDUKWGhqOEJic0pzYlVHb0ErYlh2Q09PODh2b3lPWm9SR0NnMUpGenRicGdJQVY2azY0REo3eHA5dE5EaFpKajBVbwoyTURyV2JmVVlGV01pRDVMMGQ1TWplWDdhR0lQaEpzTXVuZDF6RkhyMWhvNjRPZENKMXpEbXRrNFVZelowZGVFCjVuTEEzRlhoK3NuYUVwbXBsN1g5WHVzPQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0t""" # noqa: E501
|
|
|
|
|
|
class TestSSLUtils(CharmTestCase):
|
|
|
|
def setUp(self):
|
|
super(TestSSLUtils, self).setUp(ssl_utils, TO_PATCH)
|
|
|
|
@patch('ssl_utils.get_hostname')
|
|
@patch('ssl_utils.get_relation_ip')
|
|
def test_get_unit_amqp_endpoint_data(self, get_relation_ip, get_hostname):
|
|
self.config.return_value = '10.0.0.0/24'
|
|
get_relation_ip.return_value = '10.0.0.10'
|
|
get_hostname.return_value = 'myhost'
|
|
self.assertEqual(
|
|
ssl_utils.get_unit_amqp_endpoint_data(),
|
|
('10.0.0.10', 'myhost'))
|
|
get_relation_ip.assert_called_once_with(
|
|
'amqp',
|
|
cidr_network='10.0.0.0/24')
|
|
get_hostname.assert_called_once_with('10.0.0.10')
|
|
|
|
@patch('ssl_utils.ch_cert_utils.get_bundle_for_cn')
|
|
@patch('ssl_utils.get_unit_amqp_endpoint_data')
|
|
def test_get_relation_cert_data(self, get_unit_amqp_endpoint_data,
|
|
get_bundle_for_cn):
|
|
get_unit_amqp_endpoint_data.return_value = ('10.0.0.10',
|
|
'juju-345.lcd')
|
|
get_bundle_for_cn.return_value = {
|
|
'ca': 'vaultca',
|
|
'cert': 'vaultcert',
|
|
'key': 'vaultkey'}
|
|
self.assertEqual(
|
|
ssl_utils.get_relation_cert_data(),
|
|
{'ca': 'vaultca', 'cert': 'vaultcert', 'key': 'vaultkey'})
|
|
get_bundle_for_cn.assert_called_once_with('juju-345.lcd')
|
|
|
|
@patch('ssl_utils.get_relation_cert_data')
|
|
def test_get_ssl_mode_off(self, get_relation_cert_data):
|
|
get_relation_cert_data.return_value = {}
|
|
test_config = {
|
|
'ssl': 'off',
|
|
'ssl_enabled': False,
|
|
'ssl_on': False,
|
|
'ssl_key': None,
|
|
'ssl_cert': None}
|
|
self.config.side_effect = lambda x: test_config[x]
|
|
self.assertEqual(
|
|
ssl_utils.get_ssl_mode(),
|
|
('off', False))
|
|
|
|
@patch('ssl_utils.get_relation_cert_data')
|
|
def test_get_ssl_enabled_true(self, get_relation_cert_data):
|
|
get_relation_cert_data.return_value = {}
|
|
test_config = {
|
|
'ssl': 'off',
|
|
'ssl_enabled': True,
|
|
'ssl_on': False,
|
|
'ssl_key': None,
|
|
'ssl_cert': None}
|
|
self.config.side_effect = lambda x: test_config[x]
|
|
self.assertEqual(
|
|
ssl_utils.get_ssl_mode(),
|
|
('on', False))
|
|
|
|
@patch('ssl_utils.get_relation_cert_data')
|
|
def test_get_ssl_enabled_false(self, get_relation_cert_data):
|
|
get_relation_cert_data.return_value = {}
|
|
test_config = {
|
|
'ssl': 'on',
|
|
'ssl_enabled': False,
|
|
'ssl_on': False,
|
|
'ssl_key': None,
|
|
'ssl_cert': None}
|
|
self.config.side_effect = lambda x: test_config[x]
|
|
self.assertEqual(
|
|
ssl_utils.get_ssl_mode(),
|
|
('on', False))
|
|
|
|
@patch('ssl_utils.get_relation_cert_data')
|
|
def test_get_ssl_enabled_external_ca(self, get_relation_cert_data):
|
|
get_relation_cert_data.return_value = {}
|
|
test_config = {
|
|
'ssl': 'on',
|
|
'ssl_enabled': False,
|
|
'ssl_on': False,
|
|
'ssl_key': 'key1',
|
|
'ssl_cert': 'cert1'}
|
|
self.config.side_effect = lambda x: test_config[x]
|
|
self.assertEqual(
|
|
ssl_utils.get_ssl_mode(),
|
|
('on', True))
|
|
|
|
@patch('ssl_utils.get_relation_cert_data')
|
|
def test_get_ssl_enabled_relation_certs(self, get_relation_cert_data):
|
|
get_relation_cert_data.return_value = {
|
|
'cert': 'vaultcert',
|
|
'key': 'vaultkey',
|
|
'ca': 'vaultca'}
|
|
self.assertEqual(
|
|
ssl_utils.get_ssl_mode(),
|
|
('certs-relation', True))
|
|
|
|
@patch('ssl_utils.get_relation_cert_data')
|
|
@patch('ssl_utils.get_ssl_mode')
|
|
def test_get_ssl_mode_ssl_off(self, get_ssl_mode, get_relation_cert_data):
|
|
get_ssl_mode.return_value = ('off', False)
|
|
get_relation_cert_data.return_value = {}
|
|
relation_data = {}
|
|
ssl_utils.configure_client_ssl(relation_data)
|
|
self.assertEqual(relation_data, {})
|
|
|
|
@patch('ssl_utils.ServiceCA')
|
|
@patch('ssl_utils.get_ssl_mode')
|
|
def test_get_ssl_mode_ssl_on_no_ca(self, get_ssl_mode, ServiceCA):
|
|
ServiceCA.get_ca().get_ca_bundle.return_value = 'cert1'
|
|
get_ssl_mode.return_value = ('on', False)
|
|
test_config = {
|
|
'ssl_port': '9090'}
|
|
self.config.side_effect = lambda x: test_config[x]
|
|
relation_data = {}
|
|
ssl_utils.configure_client_ssl(relation_data)
|
|
self.assertEqual(
|
|
relation_data,
|
|
{'ssl_port': '9090', 'ssl_ca': 'Y2VydDE='})
|
|
|
|
@patch('ssl_utils.get_ssl_mode')
|
|
def test_get_ssl_mode_ssl_on_ext_ca(self, get_ssl_mode):
|
|
get_ssl_mode.return_value = ('on', True)
|
|
test_config = {
|
|
'ssl_port': '9090',
|
|
'ssl_ca': TEST_CA}
|
|
self.config.side_effect = lambda x: test_config[x]
|
|
relation_data = {}
|
|
ssl_utils.configure_client_ssl(relation_data)
|
|
self.maxDiff = None
|
|
self.assertEqual(
|
|
relation_data,
|
|
{'ssl_port': '9090', 'ssl_ca': B64_TEST_CA})
|
|
|
|
@patch('ssl_utils.get_ssl_mode')
|
|
def test_get_ssl_mode_ssl_on_ext_ca_b64(self, get_ssl_mode):
|
|
get_ssl_mode.return_value = ('on', True)
|
|
test_config = {
|
|
'ssl_port': '9090',
|
|
'ssl_ca': 'ZXh0X2Nh'}
|
|
self.config.side_effect = lambda x: test_config[x]
|
|
relation_data = {}
|
|
ssl_utils.configure_client_ssl(relation_data)
|
|
self.assertEqual(
|
|
relation_data,
|
|
{'ssl_port': '9090', 'ssl_ca': 'ZXh0X2Nh'})
|
|
|
|
@patch('ssl_utils.local_unit')
|
|
@patch('ssl_utils.relation_ids')
|
|
@patch('ssl_utils.relation_get')
|
|
@patch('ssl_utils.configure_client_ssl')
|
|
@patch('ssl_utils.relation_set')
|
|
def test_reconfigure_client_ssl_no_ssl(self, relation_set,
|
|
configure_client_ssl, relation_get,
|
|
relation_ids, local_unit):
|
|
relation_ids.return_value = ['rel1']
|
|
relation_get.return_value = {'ssl_key': 'aa'}
|
|
ssl_utils.reconfigure_client_ssl(ssl_enabled=False)
|
|
relation_set.assert_called_with(
|
|
relation_id='rel1',
|
|
ssl_ca='',
|
|
ssl_cert='',
|
|
ssl_key='',
|
|
ssl_port='')
|
|
|
|
@patch('ssl_utils.local_unit')
|
|
@patch('ssl_utils.relation_ids')
|
|
@patch('ssl_utils.relation_get')
|
|
@patch('ssl_utils.configure_client_ssl')
|
|
@patch('ssl_utils.relation_set')
|
|
def test_reconfigure_client_ssl(self, relation_set, configure_client_ssl,
|
|
relation_get, relation_ids, local_unit):
|
|
relation_ids.return_value = ['rel1']
|
|
relation_get.return_value = {}
|
|
ssl_utils.reconfigure_client_ssl(ssl_enabled=True)
|
|
configure_client_ssl.assert_called_with({})
|