Merge "pki: Add pki_utility unit tests"

This commit is contained in:
Zuul 2019-03-19 18:47:30 +00:00 committed by Gerrit Code Review
commit f35c3c1616
2 changed files with 175 additions and 0 deletions

View File

View File

@ -0,0 +1,175 @@
# Copyright 2019 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import time
import click
import mock
import pytest
from pegleg import config
from pegleg.engine.catalog import pki_utility
from pegleg.engine.common import managed_document
CERT_HEADER = '-----BEGIN CERTIFICATE-----\n'
CERT_KEY_HEADER = '-----BEGIN RSA PRIVATE KEY-----\n'
PUBLIC_KEY_HEADER = '-----BEGIN PUBLIC KEY-----\n'
PRIVATE_KEY_HEADER = '-----BEGIN RSA PRIVATE KEY-----\n'
PEGLEG_MANAGED_DOC_SCHEMA = 'pegleg/PeglegManagedDocument/v1'
CA_SCHEMA = 'deckhand/CertificateAuthority/v1'
CA_KEY_SCHEMA = 'deckhand/CertificateAuthorityKey/v1'
CERT_SCHEMA = 'deckhand/Certificate/v1'
CERT_KEY_SCHEMA = 'deckhand/CertificateKey/v1'
PUBLIC_KEY_SCHEMA = 'deckhand/PublicKey/v1'
PRIVATE_KEY_SCHEMA = 'deckhand/PrivateKey/v1'
@pytest.mark.skipif(
not pki_utility.PKIUtility.cfssl_exists(),
reason='cfssl must be installed to execute these tests')
class TestPKIUtility(object):
@classmethod
def setup_class(cls):
mock.patch.object(
managed_document,
'_get_repo_url_and_rev',
new=lambda: ('fake://github.com/nothing.git', 'master')).start()
def test_generate_ca(self):
pki_obj = pki_utility.PKIUtility()
ca_cert_wrapper, ca_key_wrapper = pki_obj.generate_ca(
self.__class__.__name__)
assert 'pegleg/PeglegManagedDocument/v1' == ca_cert_wrapper['schema']
assert 'pegleg/PeglegManagedDocument/v1' == ca_key_wrapper['schema']
ca_cert = ca_cert_wrapper['data']['managedDocument']
assert isinstance(ca_cert, dict), ca_cert
ca_key = ca_key_wrapper['data']['managedDocument']
assert isinstance(ca_key, dict), ca_key
assert isinstance(ca_cert, dict), ca_cert
assert CA_SCHEMA in ca_cert['schema']
assert CERT_HEADER in ca_cert['data']
assert isinstance(ca_key, dict), ca_key
assert CA_KEY_SCHEMA in ca_key['schema']
assert CERT_KEY_HEADER in ca_key['data']
def test_generate_keypair(self):
pki_obj = pki_utility.PKIUtility()
pub_key_wrapper, priv_key_wrapper = pki_obj.generate_keypair(
self.__class__.__name__)
assert 'pegleg/PeglegManagedDocument/v1' == pub_key_wrapper['schema']
assert 'pegleg/PeglegManagedDocument/v1' == priv_key_wrapper['schema']
pub_key = pub_key_wrapper['data']['managedDocument']
assert isinstance(pub_key, dict), pub_key
priv_key = priv_key_wrapper['data']['managedDocument']
assert isinstance(pub_key, dict), priv_key
assert isinstance(pub_key, dict), pub_key
assert PUBLIC_KEY_SCHEMA in pub_key['schema']
assert PUBLIC_KEY_HEADER in pub_key['data']
assert isinstance(priv_key, dict), priv_key
assert PRIVATE_KEY_SCHEMA in priv_key['schema']
assert PRIVATE_KEY_HEADER in priv_key['data']
def test_generate_certificate(self):
pki_obj = pki_utility.PKIUtility()
ca_cert_wrapper, ca_key_wrapper = pki_obj.generate_ca(
self.__class__.__name__)
ca_cert = ca_cert_wrapper['data']['managedDocument']
ca_key = ca_key_wrapper['data']['managedDocument']
cert_wrapper, cert_key_wrapper = pki_obj.generate_certificate(
name=self.__class__.__name__,
ca_cert=ca_cert['data'],
ca_key=ca_key['data'],
cn='admin')
assert 'pegleg/PeglegManagedDocument/v1' == cert_wrapper['schema']
assert 'pegleg/PeglegManagedDocument/v1' == cert_key_wrapper['schema']
cert = cert_wrapper['data']['managedDocument']
assert isinstance(cert, dict), cert
cert_key = cert_key_wrapper['data']['managedDocument']
assert isinstance(cert_key, dict), cert_key
assert isinstance(cert, dict), cert
assert CERT_SCHEMA in cert['schema']
assert CERT_HEADER in cert['data']
assert isinstance(cert_key, dict), cert_key
assert CERT_KEY_SCHEMA in cert_key['schema']
assert CERT_KEY_HEADER in cert_key['data']
def test_check_expiry_is_expired_false(self):
"""Check that ``check_expiry`` returns False if cert isn't expired."""
pki_obj = pki_utility.PKIUtility()
ca_config = json.loads(pki_obj.ca_config)
ca_config['signing']['default']['expiry'] = '1h'
m_callable = mock.PropertyMock(return_value=json.dumps(ca_config))
with mock.patch.object(
pki_utility.PKIUtility, 'ca_config', new_callable=m_callable):
ca_cert_wrapper, ca_key_wrapper = pki_obj.generate_ca(
self.__class__.__name__)
ca_cert = ca_cert_wrapper['data']['managedDocument']
ca_key = ca_key_wrapper['data']['managedDocument']
cert_wrapper, _ = pki_obj.generate_certificate(
name=self.__class__.__name__,
ca_cert=ca_cert['data'],
ca_key=ca_key['data'],
cn='admin')
cert = cert_wrapper['data']['managedDocument']
# Validate that the cert hasn't expired.
is_expired = pki_obj.check_expiry(cert=cert['data'])
assert not is_expired
def test_check_expiry_is_expired_true(self):
"""Check that ``check_expiry`` returns True is cert is expired.
Second values are used to demonstrate precision down to the second.
"""
pki_obj = pki_utility.PKIUtility()
ca_config = json.loads(pki_obj.ca_config)
ca_config['signing']['default']['expiry'] = '1s'
m_callable = mock.PropertyMock(return_value=json.dumps(ca_config))
with mock.patch.object(
pki_utility.PKIUtility, 'ca_config', new_callable=m_callable):
ca_cert_wrapper, ca_key_wrapper = pki_obj.generate_ca(
self.__class__.__name__)
ca_cert = ca_cert_wrapper['data']['managedDocument']
ca_key = ca_key_wrapper['data']['managedDocument']
cert_wrapper, _ = pki_obj.generate_certificate(
name=self.__class__.__name__,
ca_cert=ca_cert['data'],
ca_key=ca_key['data'],
cn='admin')
cert = cert_wrapper['data']['managedDocument']
time.sleep(2)
# Validate that the cert has expired.
is_expired = pki_obj.check_expiry(cert=cert['data'])
assert is_expired