Add `force` flag to get-csr

As bug/1947265 notes running the get-csr actions can result in the
CA being wiped from the leader DB. This change attempts to make
it more clear to the user that this action be destructive.

* Deprecate the `get-csr` action and replace it with
  `regenerate-intermediate-ca`. They are functionally equivalent but
  the new name makes it clearer that the CA may be destroyed.

* Adds `force` option to the action. The force action must be used
  if a CA already exists.

* The functional test of rerunning the `regenerate-intermediate-ca`
  action is now included in the vault tests so no need to run the
  tests twice now.

Func-Test-PR: https://github.com/openstack-charmers/zaza-openstack-tests/pull/974
Change-Id: Ie01dd7ec0e9134689518b37b5d70c8dd5a556241
Closes-Bug: #1947265
This commit is contained in:
Liam Young 2022-02-14 17:26:28 +00:00 committed by Felipe Reyes
parent 5f689e8f76
commit 457a51377d
5 changed files with 127 additions and 9 deletions

View File

@ -9,7 +9,9 @@ authorize-charm:
refresh-secrets:
description: Refresh secret_id's and re-issue retrieval tokens for secrets endpoints
get-csr:
description: Get intermediate CA csr
description: >-
Get intermediate CA csr (DEPRECATED Please use regenerate-intermediate-ca).
WARNING Current certificates will be invalidated and will be recreated after the CSR is signed and uploaded.
properties:
# Depending on the configuration of CA that will sign the CSRs it
# may be necessary to ensure these fields match the CA
@ -37,6 +39,47 @@ get-csr:
type: string
description: >-
The L (Locality) values in the subject field of the CSR.
force:
type: boolean
default: False
description: >-
Requesting a new CSR and remove the existing intermediate CA
regenerate-intermediate-ca:
description: >-
Create a new intermediate CA and return a csr for it
WARNING Current certificates will be invalidated and will be recreated after the CSR is signed and uploaded.
properties:
# Depending on the configuration of CA that will sign the CSRs it
# may be necessary to ensure these fields match the CA
country:
type: string
description: >-
The C (Country) values in the subject field of the CSR
province:
type: string
description: >-
The ST (Province) values in the subject field of the CSR.
organization:
type: string
description: >-
The O (Organization) values in the subject field of the CSR.
organizational-unit:
type: string
description: >-
The OU (OrganizationalUnit) values in the subject field of the CSR.
common-name:
type: string
description: >-
The CN (Common Name) values in the subject field of the CSR.
locality:
type: string
description: >-
The L (Locality) values in the subject field of the CSR.
force:
type: boolean
default: False
description: >-
Requesting a new CSR and remove the existing intermediate CA
upload-signed-csr:
description: Upload a signed csr to vault
properties:

View File

@ -15,6 +15,7 @@
import base64
import json
import hvac
import os
import sys
from traceback import format_exc
@ -64,6 +65,29 @@ def get_intermediate_csrs(*args):
hookenv.action_fail('Please run action on lead unit')
return
action_config = hookenv.action_get() or {}
ca_chain = None
try:
client = vault.get_local_client()
# vault_pki.get_chain will return None if the intermediate CA has
# never been setup. Whereas if it has been invalidated it will
# throw a hvac.exceptions.InternalServerError
ca_chain = client.read(
'{}/cert/ca_chain'.format(vault_pki.CHARM_PKI_MP))
except hvac.exceptions.InternalServerError as e:
# hvac returns this error string if the CA chain is not present.
if 'stored CA information not able to be parsed' in str(e):
ca_chain = None
else:
raise
if ca_chain and not action_config.get('force'):
hookenv.action_fail(
'This action will invalidate this intermediate CA chain until the '
'signed csr is uploaded. During this time no new certificate '
'requests will be processed. If you are sure you want to go ahead '
'with this then please run the action with force=True')
return
csrs = vault_pki.get_csr(
ttl=action_config.get('ttl'),
country=action_config.get('country'),
@ -72,15 +96,25 @@ def get_intermediate_csrs(*args):
province=action_config.get('province'),
organization=action_config.get('organization'),
organizational_unit=action_config.get('organizational-unit'))
# We have to clear both the reactive flag, as well as the leadership
# managed root-ca option, otherwise, we will end up with the flag being
# reset in the reactive handler after this is run.
# The vault_pki.get_csr action is destructive and wipes the existing
# intermediate CA. To flag this to the charm we have to clear both the
# reactive flag, as well as the leadership managed root-ca option,
# otherwise, we will end up with the flag being reset in the reactive
# handler after this is run.
clear_flag('charm.vault.ca.ready')
hookenv.leader_set(
{'root-ca': None})
hookenv.action_set({'output': csrs})
def get_csr(*args):
hookenv.log(
("The get_csr action is deprecated, please use "
"regenerate-intermediate-ca"),
hookenv.WARNING)
return get_intermediate_csrs(args)
def upload_signed_csr(*args):
if not hookenv.is_leader():
hookenv.action_fail('Please run action on lead unit')
@ -272,7 +306,8 @@ def generate_cert(*args):
ACTIONS = {
"authorize-charm": authorize_charm_action,
"refresh-secrets": refresh_secrets,
"get-csr": get_intermediate_csrs,
"get-csr": get_csr,
"regenerate-intermediate-ca": get_intermediate_csrs,
"upload-signed-csr": upload_signed_csr,
"reissue-certificates": reissue_certificates,
"generate-root-ca": generate_root_ca,

View File

@ -0,0 +1 @@
actions.py

View File

@ -35,10 +35,6 @@ target_deploy_status:
tests:
- zaza.openstack.charm_tests.vault.tests.VaultTest
# This second run of the tests is to ensure that Vault can handle updating the
# root CA in Vault with a refreshed CSR and won't end up in a hook-error
# state. (LP: #1866150).
- zaza.openstack.charm_tests.vault.tests.VaultTest
tests_options:
force_deploy:

View File

@ -3,6 +3,9 @@ from unittest.mock import patch
import src.actions.actions as actions
import unit_tests.test_utils
import mock
import hvac
class TestActions(unit_tests.test_utils.CharmTestCase):
@ -11,6 +14,9 @@ class TestActions(unit_tests.test_utils.CharmTestCase):
self.patches = []
self.patch_all()
self.patch_object(actions, 'hookenv', name='mock_hookenv')
self.patch_object(actions.vault, 'get_local_client')
self.client = mock.MagicMock()
self.get_local_client.return_value = self.client
def test_generate_cert_not_leader(self):
"""Test when not leader, action fails"""
@ -66,3 +72,40 @@ class TestActions(unit_tests.test_utils.CharmTestCase):
self.mock_hookenv.action_fail.assert_called_with(
'Vault is not ready (1)'
)
def test_get_intermediate_csrs(self):
self.mock_hookenv.is_leader.return_value = True
self.client.read.return_value = None
actions.get_intermediate_csrs()
self.mock_hookenv.leader_set.assert_called_once_with({'root-ca': None})
self.assertFalse(self.mock_hookenv.action_fail.called)
def test_get_intermediate_csrs_invalid_ca(self):
self.mock_hookenv.is_leader.return_value = True
self.client.read.side_effect = hvac.exceptions.InternalServerError(
errors=['stored CA information not able to be parsed'])
actions.get_intermediate_csrs()
self.mock_hookenv.leader_set.assert_called_once_with({'root-ca': None})
self.assertFalse(self.mock_hookenv.action_fail.called)
def test_get_intermediate_csrs_existing_cert(self):
self.mock_hookenv.is_leader.return_value = True
self.mock_hookenv.action_get.return_value = {'force': False}
self.client.read.return_value = 'a-chain'
actions.get_intermediate_csrs()
self.assertFalse(self.mock_hookenv.leader_set.called)
self.mock_hookenv.action_fail.assert_called()
def test_get_intermediate_csrs_existing_cert_force(self):
self.mock_hookenv.is_leader.return_value = True
self.mock_hookenv.action_get.return_value = {'force': True}
self.client.read.return_value = 'a-chain'
actions.get_intermediate_csrs()
self.mock_hookenv.leader_set.assert_called_once_with({'root-ca': None})
self.assertFalse(self.mock_hookenv.action_fail.called)
def test_get_intermediate_csrs_non_leader(self):
self.mock_hookenv.is_leader.return_value = False
actions.get_intermediate_csrs()
self.assertFalse(self.mock_hookenv.leader_set.called)
self.mock_hookenv.action_fail.assert_called()