Merge "Check configured san cert cnames for domain name."
This commit is contained in:
commit
a891d47775
|
@ -19,6 +19,7 @@ import json
|
|||
|
||||
from oslo_log import log
|
||||
|
||||
from poppy.provider.akamai import utils
|
||||
from poppy.provider import base
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
@ -55,6 +56,24 @@ class CertificateController(base.CertificateBase):
|
|||
def create_certificate(self, cert_obj, enqueue=True):
|
||||
if cert_obj.cert_type == 'san':
|
||||
try:
|
||||
found, found_cert = (
|
||||
self._check_domain_already_exists_on_san_certs(
|
||||
cert_obj.domain_name
|
||||
)
|
||||
)
|
||||
if found is True:
|
||||
return self.responder.ssl_certificate_provisioned(None, {
|
||||
'status': 'failed',
|
||||
'san cert': None,
|
||||
'created_at': str(datetime.datetime.now()),
|
||||
'action': (
|
||||
'Domain {0} already exists '
|
||||
'on san cert {1}.'.format(
|
||||
cert_obj.domain_name, found_cert
|
||||
)
|
||||
)
|
||||
})
|
||||
|
||||
if enqueue:
|
||||
self.mod_san_queue.enqueue_mod_san_request(
|
||||
json.dumps(cert_obj.to_dict()))
|
||||
|
@ -235,3 +254,17 @@ class CertificateController(base.CertificateBase):
|
|||
cert_obj.cert_type
|
||||
)
|
||||
})
|
||||
|
||||
def _check_domain_already_exists_on_san_certs(self, domain_name):
|
||||
"""Check all configured san certs for domain."""
|
||||
|
||||
found = False
|
||||
found_cert = None
|
||||
for san_cert_name in self.san_cert_cnames:
|
||||
sans = utils.get_sans_by_host(san_cert_name)
|
||||
if domain_name in sans:
|
||||
found = True
|
||||
found_cert = san_cert_name
|
||||
break
|
||||
|
||||
return found, found_cert
|
||||
|
|
|
@ -74,6 +74,39 @@ def get_ssl_number_of_hosts(remote_host):
|
|||
return result
|
||||
|
||||
|
||||
def get_sans_by_host(remote_host):
|
||||
"""Get Subject Alternative Names for a (SAN) Cert."""
|
||||
|
||||
for ssl_version in ssl_versions:
|
||||
try:
|
||||
cert = ssl.get_server_certificate(
|
||||
(remote_host, 443),
|
||||
ssl_version=ssl_version
|
||||
)
|
||||
except ssl.SSLError:
|
||||
# This exception m
|
||||
continue
|
||||
|
||||
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
|
||||
|
||||
sans = []
|
||||
for idx in range(0, x509.get_extension_count()):
|
||||
extension = x509.get_extension(idx)
|
||||
if extension.get_short_name() == 'subjectAltName':
|
||||
sans = [
|
||||
san.replace('DNS:', '').strip() for san in
|
||||
str(extension).split(',')
|
||||
]
|
||||
break
|
||||
|
||||
# accumulate all sans across multiple versions
|
||||
result = sans
|
||||
break
|
||||
else:
|
||||
raise ValueError('Get remote host certificate info failed...')
|
||||
return result
|
||||
|
||||
|
||||
def connect_to_zookeeper_storage_backend(conf):
|
||||
"""Connect to a zookeeper cluster"""
|
||||
storage_backend_hosts = ','.join(['%s:%s' % (
|
||||
|
|
|
@ -33,6 +33,14 @@ class TestCertificates(base.TestCase):
|
|||
self.san_cert_cnames = [str(x) for x in range(7)]
|
||||
self.driver.san_cert_cnames = self.san_cert_cnames
|
||||
|
||||
background_job_controller_patcher = mock.patch(
|
||||
'poppy.provider.akamai.utils.get_sans_by_host'
|
||||
)
|
||||
self.mock_get_sans_by_host = background_job_controller_patcher.start()
|
||||
self.addCleanup(background_job_controller_patcher.stop)
|
||||
|
||||
self.mock_get_sans_by_host.return_value = []
|
||||
|
||||
self.controller = certificates.CertificateController(self.driver)
|
||||
|
||||
@ddt.data(("SPS Request Complete", ""),
|
||||
|
@ -355,3 +363,32 @@ class TestCertificates(base.TestCase):
|
|||
mod_san_q.enqueue_mod_san_request.assert_called_once_with(
|
||||
json.dumps(ssl_certificate.load_from_json(data).to_dict())
|
||||
)
|
||||
|
||||
def test_cert_create_domain_exists_on_san(self):
|
||||
|
||||
data = {
|
||||
"cert_type": "san",
|
||||
"domain_name": "www.abc.com",
|
||||
"flavor_id": "premium"
|
||||
}
|
||||
|
||||
self.mock_get_sans_by_host.return_value = [
|
||||
data["domain_name"]
|
||||
]
|
||||
|
||||
controller = certificates.CertificateController(self.driver)
|
||||
|
||||
responder = controller.create_certificate(
|
||||
ssl_certificate.load_from_json(data),
|
||||
True
|
||||
)
|
||||
|
||||
self.assertIsNone(responder['Akamai']['cert_domain'])
|
||||
self.assertEqual(
|
||||
'failed',
|
||||
responder['Akamai']['extra_info']['status']
|
||||
)
|
||||
self.assertEqual(
|
||||
'Domain www.abc.com already exists on san cert 0.',
|
||||
responder['Akamai']['extra_info']['action']
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue