# Copyright 2017 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from unittest.mock import patch

# NOTE(review): ssl_utils is deliberately imported after
# unit_tests.test_utils, as in the original ordering -- confirm whether the
# unit_tests package performs path/mocking set-up that ssl_utils relies on.
from unit_tests.test_utils import CharmTestCase

import ssl_utils

# Names inside ssl_utils that CharmTestCase.setUp is asked to patch; the
# tests below reach the resulting mock as ``self.config``.
TO_PATCH = [
    'config',
]

# PEM encoded CA certificate used as the charm's ``ssl_ca`` option value.
TEST_CA = """-----BEGIN CERTIFICATE-----
MIIDbTCCAlWgAwIBAgIURtdGGKKjckiLPLue8Wn/sCS5u+QwDQYJKoZIhvcNAQEL
BQAwPTE7MDkGA1UEAxMyVmF1bHQgUm9vdCBDZXJ0aWZpY2F0ZSBBdXRob3JpdHkg
KGNoYXJtLXBraS1sb2NhbCkwIBgPMDAwMTAxMDEwMDAwMDBaFw0xODExMjQxMzQx
MjdaMD0xOzA5BgNVBAMTMlZhdWx0IFJvb3QgQ2VydGlmaWNhdGUgQXV0aG9yaXR5
IChjaGFybS1wa2ktbG9jYWwpMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKC
AQEAwUEg8XFO2GzI19aNAfH8KeBsLvpYTX4nNREEGLMkl7qfqO+rcwNmN/60UxSu
Hbsqfjv6B6kWD6dd1/OvveYjxqPA97OqO5LOUE43ojzUkxai5GeF5fvu3QGIR7iZ
a9PEDFjFKeCdwyKLoIHNdXw1TM0sQmWM7sSiMhCfrpeZEe+En+KZQugo+BiLrhKA
yZTIkEP5+6r/Nrxfkx2/Kklrq8LOyLfH91LbmJEVEKQNloCYphZYwB7n9GPvKlGv
pvPuJc7wEkmtCMp0dNjo3MZ0ij1SIN6Ntx8DqhPJ8QKvNDogVmeEGpQFBcrzfkol
LMXPBpX2Qx6dPqLGHCbWQDnvewIDAQABo2MwYTAOBgNVHQ8BAf8EBAMCAQYwDwYD
VR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUc1rh2BEHSQJ0qxhPTDQKRJg2AGEwHwYD
VR0jBBgwFoAUc1rh2BEHSQJ0qxhPTDQKRJg2AGEwDQYJKoZIhvcNAQELBQADggEB
ABZvreticW5UuoQS7NAVICCvh5FwgrkC5tnHX3p8TOhMIpJTgrKhedJZKzLc254g
/jAsb7q775IcMOhS2vFJSQd6rV0cMNCdFjk0sTTe01OXoJj2fN3MMbEEGfs6crwk
TKiXEJ9XYc04Ul4b8XJ0d5hYejr5IF9leJ2JJMiGTJFGU1Oi8Lctj7qyX0nlo+x5
Xhj8BbsJsbUGoA+bXvCOO88voyOZoRGCg1JFztbpgIAV6k64DJ7xp9tNDhZJj0Uo
2MDrWbfUYFWMiD5L0d5MjeX7aGIPhJsMund1zFHr1ho64OdCJ1zDmtk4UYzZ0deE
5nLA3FXh+snaEpmpl7X9Xus=
-----END CERTIFICATE-----"""

# Base64 encoding of TEST_CA (newline separated, no trailing newline).
B64_TEST_CA = """LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURiVENDQWxXZ0F3SUJBZ0lVUnRkR0dLS2pja2lMUEx1ZThXbi9zQ1M1dStRd0RRWUpLb1pJaHZjTkFRRUwKQlFBd1BURTdNRGtHQTFVRUF4TXlWbUYxYkhRZ1VtOXZkQ0JEWlhKMGFXWnBZMkYwWlNCQmRYUm9iM0pwZEhrZwpLR05vWVhKdExYQnJhUzFzYjJOaGJDa3dJQmdQTURBd01UQXhNREV3TURBd01EQmFGdzB4T0RFeE1qUXhNelF4Ck1qZGFNRDB4T3pBNUJnTlZCQU1UTWxaaGRXeDBJRkp2YjNRZ1EyVnlkR2xtYVdOaGRHVWdRWFYwYUc5eWFYUjUKSUNoamFHRnliUzF3YTJrdGJHOWpZV3dwTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQwpBUUVBd1VFZzhYRk8yR3pJMTlhTkFmSDhLZUJzTHZwWVRYNG5OUkVFR0xNa2w3cWZxTytyY3dObU4vNjBVeFN1Ckhic3FmanY2QjZrV0Q2ZGQxL092dmVZanhxUEE5N09xTzVMT1VFNDNvanpVa3hhaTVHZUY1ZnZ1M1FHSVI3aVoKYTlQRURGakZLZUNkd3lLTG9JSE5kWHcxVE0wc1FtV003c1NpTWhDZnJwZVpFZStFbitLWlF1Z28rQmlMcmhLQQp5WlRJa0VQNSs2ci9Ocnhma3gyL0trbHJxOExPeUxmSDkxTGJtSkVWRUtRTmxvQ1lwaFpZd0I3bjlHUHZLbEd2CnB2UHVKYzd3RWttdENNcDBkTmpvM01aMGlqMVNJTjZOdHg4RHFoUEo4UUt2TkRvZ1ZtZUVHcFFGQmNyemZrb2wKTE1YUEJwWDJReDZkUHFMR0hDYldRRG52ZXdJREFRQUJvMk13WVRBT0JnTlZIUThCQWY4RUJBTUNBUVl3RHdZRApWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVWMxcmgyQkVIU1FKMHF4aFBURFFLUkpnMkFHRXdId1lEClZSMGpCQmd3Rm9BVWMxcmgyQkVIU1FKMHF4aFBURFFLUkpnMkFHRXdEUVlKS29aSWh2Y05BUUVMQlFBRGdnRUIKQUJadnJldGljVzVVdW9RUzdOQVZJQ0N2aDVGd2dya0M1dG5IWDNwOFRPaE1JcEpUZ3JLaGVkSlpLekxjMjU0ZwovakFzYjdxNzc1SWNNT2hTMnZGSlNRZDZyVjBjTU5DZEZqazBzVFRlMDFPWG9KajJmTjNNTWJFRUdmczZjcndrClRLaVhFSjlYWWMwNFVsNGI4WEowZDVoWWVqcjVJRjlsZUoySkpNaUdUSkZHVTFPaThMY3RqN3F5WDBubG8reDUKWGhqOEJic0pzYlVHb0ErYlh2Q09PODh2b3lPWm9SR0NnMUpGenRicGdJQVY2azY0REo3eHA5dE5EaFpKajBVbwoyTURyV2JmVVlGV01pRDVMMGQ1TWplWDdhR0lQaEpzTXVuZDF6RkhyMWhvNjRPZENKMXpEbXRrNFVZelowZGVFCjVuTEEzRlhoK3NuYUVwbXBsN1g5WHVzPQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0t"""  # noqa: E501


class TestSSLUtils(CharmTestCase):
    """Unit tests for the helpers exposed by the ``ssl_utils`` module.

    Covers ``get_unit_amqp_endpoint_data``, ``get_relation_cert_data``,
    ``get_ssl_mode``, ``configure_client_ssl`` and
    ``reconfigure_client_ssl``; every collaborator is replaced by a mock.
    """

    def setUp(self):
        # Delegate to CharmTestCase so the names listed in TO_PATCH are
        # patched on ssl_utils for the duration of each test.
        super().setUp(ssl_utils, TO_PATCH)

    def _stub_config(self, values):
        # Answer every ``config(<key>)`` lookup from the *values* mapping;
        # a key missing from the mapping raises KeyError.
        self.config.side_effect = lambda key: values[key]

    @patch('ssl_utils.get_hostname')
    @patch('ssl_utils.get_relation_ip')
    def test_get_unit_amqp_endpoint_data(self, mock_get_relation_ip,
                                         mock_get_hostname):
        # The endpoint data is the (ip, hostname) pair: the ip comes from
        # the 'amqp' relation restricted to the configured network and the
        # hostname is looked up from that ip.
        network = '10.0.0.0/24'
        address = '10.0.0.10'
        self.config.return_value = network
        mock_get_relation_ip.return_value = address
        mock_get_hostname.return_value = 'myhost'

        endpoint = ssl_utils.get_unit_amqp_endpoint_data()

        self.assertEqual(endpoint, (address, 'myhost'))
        mock_get_relation_ip.assert_called_once_with(
            'amqp', cidr_network=network)
        mock_get_hostname.assert_called_once_with(address)

    @patch('ssl_utils.ch_cert_utils.get_bundle_for_cn')
    @patch('ssl_utils.get_unit_amqp_endpoint_data')
    def test_get_relation_cert_data(self, mock_endpoint_data,
                                    mock_get_bundle_for_cn):
        # The certificate bundle is requested for the unit's hostname and
        # handed back unchanged.
        bundle = {
            'ca': 'vaultca',
            'cert': 'vaultcert',
            'key': 'vaultkey'}
        mock_endpoint_data.return_value = ('10.0.0.10', 'juju-345.lcd')
        mock_get_bundle_for_cn.return_value = dict(bundle)

        self.assertEqual(ssl_utils.get_relation_cert_data(), bundle)
        mock_get_bundle_for_cn.assert_called_once_with('juju-345.lcd')

    @patch('ssl_utils.get_relation_cert_data')
    def test_get_ssl_mode_off(self, mock_relation_cert_data):
        # Nothing enabled in config and no relation certificates.
        mock_relation_cert_data.return_value = {}
        self._stub_config({
            'ssl': 'off',
            'ssl_enabled': False,
            'ssl_on': False,
            'ssl_key': None,
            'ssl_cert': None})

        self.assertEqual(ssl_utils.get_ssl_mode(), ('off', False))

    @patch('ssl_utils.get_relation_cert_data')
    def test_get_ssl_enabled_true(self, mock_relation_cert_data):
        # 'ssl_enabled' alone switches the reported mode to 'on'.
        mock_relation_cert_data.return_value = {}
        self._stub_config({
            'ssl': 'off',
            'ssl_enabled': True,
            'ssl_on': False,
            'ssl_key': None,
            'ssl_cert': None})

        self.assertEqual(ssl_utils.get_ssl_mode(), ('on', False))

    @patch('ssl_utils.get_relation_cert_data')
    def test_get_ssl_enabled_false(self, mock_relation_cert_data):
        # 'ssl' set to 'on' is reported as 'on' even though 'ssl_enabled'
        # is False.
        mock_relation_cert_data.return_value = {}
        self._stub_config({
            'ssl': 'on',
            'ssl_enabled': False,
            'ssl_on': False,
            'ssl_key': None,
            'ssl_cert': None})

        self.assertEqual(ssl_utils.get_ssl_mode(), ('on', False))

    @patch('ssl_utils.get_relation_cert_data')
    def test_get_ssl_enabled_external_ca(self, mock_relation_cert_data):
        # With both 'ssl_key' and 'ssl_cert' configured the second element
        # of the result becomes True.
        mock_relation_cert_data.return_value = {}
        self._stub_config({
            'ssl': 'on',
            'ssl_enabled': False,
            'ssl_on': False,
            'ssl_key': 'key1',
            'ssl_cert': 'cert1'})

        self.assertEqual(ssl_utils.get_ssl_mode(), ('on', True))

    @patch('ssl_utils.get_relation_cert_data')
    def test_get_ssl_enabled_relation_certs(self, mock_relation_cert_data):
        # Certificates supplied over the relation yield the
        # 'certs-relation' mode; no config side effect is installed here.
        mock_relation_cert_data.return_value = {
            'cert': 'vaultcert',
            'key': 'vaultkey',
            'ca': 'vaultca'}

        self.assertEqual(
            ssl_utils.get_ssl_mode(), ('certs-relation', True))

    @patch('ssl_utils.get_relation_cert_data')
    @patch('ssl_utils.get_ssl_mode')
    def test_get_ssl_mode_ssl_off(self, mock_get_ssl_mode,
                                  mock_relation_cert_data):
        # configure_client_ssl must leave the relation data untouched when
        # the ssl mode is 'off'.
        mock_get_ssl_mode.return_value = ('off', False)
        mock_relation_cert_data.return_value = {}
        rel_data = {}

        ssl_utils.configure_client_ssl(rel_data)

        self.assertEqual(rel_data, {})

    @patch('ssl_utils.ServiceCA')
    @patch('ssl_utils.get_ssl_mode')
    def test_get_ssl_mode_ssl_on_no_ca(self, mock_get_ssl_mode,
                                       mock_service_ca):
        # Mode 'on' with the second element False: the CA bundle comes
        # from ServiceCA and is published base64 encoded
        # ('Y2VydDE=' is base64 for 'cert1').
        mock_service_ca.get_ca().get_ca_bundle.return_value = 'cert1'
        mock_get_ssl_mode.return_value = ('on', False)
        self._stub_config({'ssl_port': '9090'})
        rel_data = {}

        ssl_utils.configure_client_ssl(rel_data)

        expected = {'ssl_port': '9090', 'ssl_ca': 'Y2VydDE='}
        self.assertEqual(rel_data, expected)

    @patch('ssl_utils.get_ssl_mode')
    def test_get_ssl_mode_ssl_on_ext_ca(self, mock_get_ssl_mode):
        # A PEM 'ssl_ca' option is published in its base64 encoded form.
        # The encoded certificate is long, so lift the diff size limit to
        # keep a failure readable.
        self.maxDiff = None
        mock_get_ssl_mode.return_value = ('on', True)
        self._stub_config({
            'ssl_port': '9090',
            'ssl_ca': TEST_CA})
        rel_data = {}

        ssl_utils.configure_client_ssl(rel_data)

        expected = {'ssl_port': '9090', 'ssl_ca': B64_TEST_CA}
        self.assertEqual(rel_data, expected)

    @patch('ssl_utils.get_ssl_mode')
    def test_get_ssl_mode_ssl_on_ext_ca_b64(self, mock_get_ssl_mode):
        # An 'ssl_ca' option that is already base64 ('ZXh0X2Nh' decodes to
        # 'ext_ca') is published as-is rather than encoded a second time.
        encoded_ca = 'ZXh0X2Nh'
        mock_get_ssl_mode.return_value = ('on', True)
        self._stub_config({
            'ssl_port': '9090',
            'ssl_ca': encoded_ca})
        rel_data = {}

        ssl_utils.configure_client_ssl(rel_data)

        expected = {'ssl_port': '9090', 'ssl_ca': 'ZXh0X2Nh'}
        self.assertEqual(rel_data, expected)

    @patch('ssl_utils.local_unit')
    @patch('ssl_utils.relation_ids')
    @patch('ssl_utils.relation_get')
    @patch('ssl_utils.configure_client_ssl')
    @patch('ssl_utils.relation_set')
    def test_reconfigure_client_ssl_no_ssl(self, mock_relation_set,
                                           mock_configure_client_ssl,
                                           mock_relation_get,
                                           mock_relation_ids,
                                           mock_local_unit):
        # Disabling ssl while the relation still carries ssl settings must
        # blank out all of the ssl keys on that relation.
        mock_relation_ids.return_value = ['rel1']
        mock_relation_get.return_value = {'ssl_key': 'aa'}

        ssl_utils.reconfigure_client_ssl(ssl_enabled=False)

        blanked = {
            'ssl_ca': '',
            'ssl_cert': '',
            'ssl_key': '',
            'ssl_port': ''}
        mock_relation_set.assert_called_with(relation_id='rel1', **blanked)

    @patch('ssl_utils.local_unit')
    @patch('ssl_utils.relation_ids')
    @patch('ssl_utils.relation_get')
    @patch('ssl_utils.configure_client_ssl')
    @patch('ssl_utils.relation_set')
    def test_reconfigure_client_ssl(self, mock_relation_set,
                                    mock_configure_client_ssl,
                                    mock_relation_get,
                                    mock_relation_ids,
                                    mock_local_unit):
        # Enabling ssl with no ssl settings on the relation yet hands an
        # empty dict to configure_client_ssl.
        mock_relation_ids.return_value = ['rel1']
        mock_relation_get.return_value = {}

        ssl_utils.reconfigure_client_ssl(ssl_enabled=True)

        mock_configure_client_ssl.assert_called_with({})