Fix use of get_chain

A recent change[1] switched to the newer methods in
hvac 11.2, but unfortunately the semantics between
client.secrets.pki.read_certificate() and client.read() are different,
in that the latter returns None on InvalidPath, whereas the former
allow the exception to bubble up.

This means that for the call sites here, we need to catch InvalidPath,
instead of the TypeError.
The original reason for TypeError was that the function
would end up calling None['key'] if read_certificate failed.

[1]: https://review.opendev.org/c/openstack/charm-vault/+/848205

Change-Id: I46b93457c8a757189802ca2c2cdf31cc9c5a9516
This commit is contained in:
Samuel Walladge 2022-07-14 15:03:49 -04:00
parent ee3271063d
commit 212d2a7dba
4 changed files with 81 additions and 5 deletions

View File

@ -80,10 +80,12 @@ def is_ca_ready(client, name, role):
def get_chain(name=None):
"""Check if CA is ready for use
"""Get the certificate chain
:returns: Whether CA is ready
:rtype: bool
:raises hvac.exceptions.VaultDown: vault is not ready
:raises hvac.exceptions.InvalidPath: certificate chain not found
:returns: certificate chain data
:rtype: str
"""
client = vault.get_local_client()
if not name:
@ -437,7 +439,7 @@ def find_cert_in_cache(request):
"""
try:
ca_chain = get_chain()
except (hvac.exceptions.VaultDown, TypeError):
except (hvac.exceptions.VaultDown, hvac.exceptions.InvalidPath):
# Fetching CA chain may fail
ca_chain = None

View File

@ -1031,7 +1031,10 @@ def sync_cert_from_cache():
try:
# this might fail if we were restarted and need to be unsealed
chain = vault_pki.get_chain()
except (vault.hvac.exceptions.VaultDown, TypeError):
except (
vault.hvac.exceptions.VaultDown,
vault.hvac.exceptions.InvalidPath,
):
pass
except vault.VaultNotReady:
# With Vault not being ready, there's no sense in continuing

View File

@ -83,6 +83,15 @@ class TestLibCharmVaultPKI(unit_tests.test_utils.CharmTestCase):
client_mock.secrets.pki.read_certificate.assert_called_once_with(
'ca_chain', mount_point='my_backend')
@patch.object(vault_pki.vault, 'get_local_client')
def test_get_chain_nonexisting(self, get_local_client):
client_mock = mock.MagicMock()
client_mock.secrets.pki.read_certificate.side_effect = (
hvac.exceptions.InvalidPath)
get_local_client.return_value = client_mock
with self.assertRaises(hvac.exceptions.InvalidPath):
vault_pki.get_chain('my_backend')
@patch.object(vault_pki.vault, 'get_local_client')
def test_get_chain_default_pki(self, get_local_client):
client_mock = mock.MagicMock()
@ -474,6 +483,25 @@ class TestLibCharmVaultPKI(unit_tests.test_utils.CharmTestCase):
pki = vault_pki.get_pki_cache()
self.assertEqual(pki, {})
@patch.object(vault_pki, 'get_pki_cache')
@patch.object(vault_pki, 'get_chain')
@patch.object(vault_pki, 'get_ca')
def test_find_cert_in_cache_err(self, get_ca, get_chain, get_pki_cache):
"""Test getting cert from cache when CA is missing."""
get_ca.return_value = None
get_chain.side_effect = hvac.exceptions.InvalidPath
cert, key = vault_pki.find_cert_in_cache(MagicMock())
# assert that CA cert or chain was retrieved
get_ca.assert_called_once_with()
get_chain.assert_called_once_with()
# assert that function does not proceed due to the missing CA
get_pki_cache.assert_not_called()
self.assertIsNone(cert)
self.assertIsNone(key)
@patch.object(vault_pki, 'get_pki_cache')
@patch.object(vault_pki, 'get_chain')
@patch.object(vault_pki, 'get_ca')

View File

@ -4,6 +4,7 @@ from unittest.mock import patch, call
import json
import charms.reactive
import hvac
# Mock out reactive decorators prior to importing reactive.vault
dec_mock = mock.MagicMock()
@ -1215,6 +1216,48 @@ class TestHandlers(unit_tests.test_utils.CharmTestCase):
certs_in_cache[index][1],
)
@mock.patch.object(handlers, 'vault_pki')
@mock.patch.object(handlers, 'leader_get')
def test_sync_cert_from_cache_err(self, leader_get, vault_pki):
"""Test that it gracefully fails if get_chain doesn't succeed."""
global_client_bundle = {
"certificate": "Global client cert",
"private_key": "Global client key",
}
leader_get.return_value = json.dumps(global_client_bundle)
certs_in_cache = (
("cn1_cert", "cn1_key"),
("cn2_cert", "cn2_key"),
)
vault_pki.find_cert_in_cache.side_effect = certs_in_cache
vault_pki.get_chain.side_effect = hvac.exceptions.InvalidPath
self.is_flag_set.return_value = False
tls = self.endpoint_from_flag.return_value
self.is_flag_set.return_value = True
tls.set_chain.assert_not_called()
tls.all_requests = [mock.Mock(cert_type='cert_type1',
common_name='common_name1',
sans='sans1'),
mock.Mock(cert_type='cert_type2',
common_name='common_name2',
sans='sans2'),
]
handlers.sync_cert_from_cache()
tls.set_client_cert.assert_called_once_with(
global_client_bundle["certificate"],
global_client_bundle["private_key"],
)
for index, request in enumerate(tls.all_requests):
request.set_cert.assert_called_once_with(
certs_in_cache[index][0],
certs_in_cache[index][1],
)
@mock.patch.object(handlers, 'vault_pki')
def test_tune_pki_backend(self, vault_pki):
self.config.return_value = {