Add certificates controller to provider drivers

Refactors the provider drivers to separate out the certificate
creation logic into its own controller.

Change-Id: I0e8bcdd5de1a3686faea212095d69da47d41fd3f
This commit is contained in:
Isaac Mungai 2016-04-04 15:29:30 -04:00
parent 84b5128ffb
commit 1bfcb41e04
25 changed files with 793 additions and 510 deletions

View File

@ -76,11 +76,15 @@ class ProviderWrapper(object):
purge_url)
def create_certificate(self, ext, cert_obj, enqueue):
"""Create a provider
"""Create a certificate
:param ext
:param service_obj
:param cert_obj
:param enqueue
:returns: ext.obj.service_controller.create(service_obj)
"""
return ext.obj.service_controller.create_certificate(cert_obj, enqueue)
return ext.obj.certificate_controller.create_certificate(
cert_obj,
enqueue
)

View File

@ -0,0 +1,223 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
from oslo_log import log
from poppy.provider import base
LOG = log.getLogger(__name__)
class CertificateController(base.CertificateBase):
@property
def mod_san_queue(self):
return self.driver.mod_san_queue
@property
def san_cert_cnames(self):
return self.driver.san_cert_cnames
@property
def cert_info_storage(self):
return self.driver.cert_info_storage
@property
def san_mapping_queue(self):
return self.driver.san_mapping_queue
@property
def sps_api_client(self):
return self.driver.akamai_sps_api_client
def __init__(self, driver):
super(CertificateController, self).__init__(driver)
self.driver = driver
self.sps_api_base_url = self.driver.akamai_sps_api_base_url
def create_certificate(self, cert_obj, enqueue=True):
if cert_obj.cert_type == 'san':
try:
if enqueue:
self.mod_san_queue.enqueue_mod_san_request(
json.dumps(cert_obj.to_dict()))
return self.responder.ssl_certificate_provisioned(None, {
'status': 'create_in_progress',
'san cert': None,
# Add logging so it is easier for testing
'created_at': str(datetime.datetime.now()),
'action': (
'San cert request for {0} has been '
'enqueued.'.format(cert_obj.domain_name)
)
})
for san_cert_name in self.san_cert_cnames:
enabled = (
self.cert_info_storage.get_enabled_status(
san_cert_name
)
)
if not enabled:
continue
last_sps_id = (
self.cert_info_storage.get_cert_last_spsid(
san_cert_name
)
)
if last_sps_id not in [None, ""]:
LOG.info('Latest spsId for {0} is: {1}'.format(
san_cert_name,
last_sps_id)
)
resp = self.sps_api_client.get(
self.sps_api_base_url.format(spsId=last_sps_id),
)
if resp.status_code != 200:
raise RuntimeError(
'SPS API Request Failed. '
'Exception: {0}'.format(resp.text)
)
sps_request_info = json.loads(resp.text)[
'requestList'][0]
status = sps_request_info['status']
work_flow_progress = (
sps_request_info['workflowProgress']
)
if status == 'edge host already created or pending':
if work_flow_progress is not None and \
'error' in work_flow_progress.lower():
LOG.info("SPS Pending with Error: {0}".format(
work_flow_progress))
continue
else:
pass
elif status == 'CPS cancelled':
pass
elif status != 'SPS Request Complete':
LOG.info("SPS Not completed for {0}...".format(
san_cert_name))
continue
# issue modify san_cert sps request
cert_info = self.cert_info_storage.get_cert_info(
san_cert_name)
cert_info['add.sans'] = cert_obj.domain_name
string_post_data = '&'.join(
['%s=%s' % (k, v) for (k, v) in cert_info.items()])
LOG.info(
'Post modSan request with request data: {0}'.format(
string_post_data
)
)
resp = self.sps_api_client.post(
self.sps_api_base_url.format(spsId=""),
data=string_post_data.encode('utf-8')
)
if resp.status_code != 202:
raise RuntimeError(
'SPS Request failed. '
'Exception: {0}'.format(resp.text)
)
else:
resp_dict = json.loads(resp.text)
LOG.info(
'modSan request submitted. Response: {0}'.format(
str(resp_dict)
)
)
this_sps_id = resp_dict['spsId']
# get last item in results array and use its jobID
results = resp_dict['Results']['data']
this_job_id = results[0]['results']['jobID']
self.cert_info_storage.save_cert_last_ids(
san_cert_name,
this_sps_id,
this_job_id
)
self.san_mapping_queue.enqueue_san_mapping(
json.dumps({
'san_cert_domain': san_cert_name,
'domain_name': cert_obj.domain_name,
})
)
return self.responder.ssl_certificate_provisioned(
san_cert_name, {
'status': 'create_in_progress',
'san cert': san_cert_name,
'akamai_spsId': this_sps_id,
'created_at': str(datetime.datetime.now()),
'action': 'Waiting for customer domain '
'validation for {0}'.format(
cert_obj.domain_name)
})
else:
self.mod_san_queue.enqueue_mod_san_request(
json.dumps(cert_obj.to_dict()))
return self.responder.ssl_certificate_provisioned(None, {
'status': 'create_in_progress',
'san cert': None,
# Add logging so it is easier for testing
'created_at': str(datetime.datetime.now()),
'action': 'No available san cert for {0} right now,'
' or no san cert info available. Support:'
'Please write down the domain and keep an'
' eye on next available freed-up SAN certs.'
' More provisioning might be needed'.format(
cert_obj.domain_name)
})
except Exception as e:
LOG.exception(
"Error {0} during certificate creation for {1} "
"sending the request sent back to the queue.".format(
e, cert_obj.domain_name
)
)
try:
self.mod_san_queue.enqueue_mod_san_request(
json.dumps(cert_obj.to_dict()))
return self.responder.ssl_certificate_provisioned(None, {
'status': 'create_in_progress',
'san cert': None,
# Add logging so it is easier for testing
'created_at': str(datetime.datetime.now()),
'action': (
'San cert request for {0} has been '
'enqueued.'.format(cert_obj.domain_name)
)
})
except Exception as exc:
LOG.exception("Unable to enqueue {0}, Error: {1}".format(
cert_obj.domain_name,
exc
))
return self.responder.ssl_certificate_provisioned(None, {
'status': 'failed',
'san cert': None,
'created_at': str(datetime.datetime.now()),
'action': 'Waiting for action... Provision '
'san cert failed for {0} failed.'.format(
cert_obj.domain_name)
})
else:
return self.responder.ssl_certificate_provisioned(None, {
'status': 'failed',
'reason': "Cert type : {0} hasn't been implemented".format(
cert_obj.cert_type
)
})

View File

@ -22,6 +22,8 @@ Field Mappings:
updated and documented in each controller class.
"""
from poppy.provider.akamai import certificates
from poppy.provider.akamai import services
CertificateController = certificates.CertificateController
ServiceController = services.ServiceController

View File

@ -284,5 +284,10 @@ class CDNProvider(base.Driver):
@property
def service_controller(self):
"""Returns the driver's hostname controller."""
"""Returns the driver's service controller."""
return controllers.ServiceController(self)
@property
def certificate_controller(self):
"""Returns the driver's certificate controller."""
return controllers.CertificateController(self)

View File

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import traceback
@ -42,30 +41,10 @@ class ServiceController(base.ServiceBase):
def ccu_api_client(self):
return self.driver.ccu_api_client
@property
def sps_api_client(self):
return self.driver.akamai_sps_api_client
@property
def subcustomer_api_client(self):
return self.driver.akamai_sub_customer_api_client
@property
def cert_info_storage(self):
return self.driver.cert_info_storage
@property
def mod_san_queue(self):
return self.driver.mod_san_queue
@property
def san_mapping_queue(self):
return self.driver.san_mapping_queue
@property
def san_cert_cnames(self):
return self.driver.san_cert_cnames
def __init__(self, driver):
super(ServiceController, self).__init__(driver)
@ -74,7 +53,6 @@ class ServiceController(base.ServiceBase):
self.akamai_subcustomer_api_base_url = \
self.driver.akamai_subcustomer_api_base_url
self.ccu_api_base_url = self.driver.akamai_ccu_api_base_url
self.sps_api_base_url = self.driver.akamai_sps_api_base_url
self.request_header = {'Content-type': 'application/json',
'Accept': 'text/plain'}
self.san_cert_hostname_limit = self.driver.san_cert_hostname_limit
@ -575,163 +553,6 @@ class ServiceController(base.ServiceBase):
format(provider_service_id))
return self.responder.failed(str(e))
def create_certificate(self, cert_obj, enqueue=True):
if cert_obj.cert_type == 'san':
try:
if enqueue:
self.mod_san_queue.enqueue_mod_san_request(
json.dumps(cert_obj.to_dict()))
return self.responder.ssl_certificate_provisioned(None, {
'status': 'create_in_progress',
'san cert': None,
# Add logging so it is easier for testing
'created_at': str(datetime.datetime.now()),
'action': (
'San cert request for {0} has been '
'enqueued.'.format(cert_obj.domain_name)
)
})
for san_cert_name in self.san_cert_cnames:
enabled = (
self.cert_info_storage.get_enabled_status(
san_cert_name
)
)
if not enabled:
continue
lastSpsId = (
self.cert_info_storage.get_cert_last_spsid(
san_cert_name
)
)
if lastSpsId not in [None, ""]:
LOG.info('Latest spsId for %s is: %s' % (san_cert_name,
lastSpsId))
resp = self.sps_api_client.get(
self.sps_api_base_url.format(spsId=lastSpsId),
)
if resp.status_code != 200:
raise RuntimeError('SPS API Request Failed'
'Exception: %s' % resp.text)
sps_request_info = json.loads(resp.text)[
'requestList'][0]
status = sps_request_info['status']
workFlowProgress = sps_request_info['workflowProgress']
if status == 'edge host already created or pending':
if workFlowProgress is not None and \
'error' in workFlowProgress.lower():
LOG.info("SPS Pending with Error: {0}".format(
workFlowProgress))
continue
else:
pass
elif status == 'CPS cancelled':
pass
elif status != 'SPS Request Complete':
LOG.info("SPS Not completed for %s..." %
san_cert_name)
continue
# issue modify san_cert sps request
cert_info = self.cert_info_storage.get_cert_info(
san_cert_name)
cert_info['add.sans'] = cert_obj.domain_name
string_post_data = '&'.join(
['%s=%s' % (k, v) for (k, v) in cert_info.items()])
LOG.info('Post modSan request with request data: %s' %
string_post_data)
resp = self.sps_api_client.post(
self.sps_api_base_url.format(spsId=""),
data=string_post_data.encode('utf-8')
)
if resp.status_code != 202:
raise RuntimeError('SPS Request failed.'
'Exception: %s' % resp.text)
else:
resp_dict = json.loads(resp.text)
LOG.info('modSan request submitted. Response: %s' %
str(resp_dict))
this_sps_id = resp_dict['spsId']
# get last item in results array and use its jobID
results = resp_dict['Results']['data']
this_job_id = results[0]['results']['jobID']
self.cert_info_storage.save_cert_last_ids(
san_cert_name,
this_sps_id,
this_job_id
)
self.san_mapping_queue.enqueue_san_mapping(
json.dumps({
'san_cert_domain': san_cert_name,
'domain_name': cert_obj.domain_name,
})
)
return self.responder.ssl_certificate_provisioned(
san_cert_name, {
'status': 'create_in_progress',
'san cert': san_cert_name,
'akamai_spsId': this_sps_id,
'created_at': str(datetime.datetime.now()),
'action': 'Waiting for customer domain '
'validation for %s' %
(cert_obj.domain_name)
})
else:
self.mod_san_queue.enqueue_mod_san_request(
json.dumps(cert_obj.to_dict()))
return self.responder.ssl_certificate_provisioned(None, {
'status': 'create_in_progress',
'san cert': None,
# Add logging so it is easier for testing
'created_at': str(datetime.datetime.now()),
'action': 'No available san cert for %s right now,'
' or no san cert info available. Support:'
'Please write down the domain and keep an'
' eye on next available freed-up SAN certs.'
' More provisioning might be needed' %
(cert_obj.domain_name)
})
except Exception as e:
LOG.exception(
"Error {0} during certificate creation for {1} "
"sending the request sent back to the queue.".format(
e, cert_obj.domain_name
)
)
try:
self.mod_san_queue.enqueue_mod_san_request(
json.dumps(cert_obj.to_dict()))
return self.responder.ssl_certificate_provisioned(None, {
'status': 'create_in_progress',
'san cert': None,
# Add logging so it is easier for testing
'created_at': str(datetime.datetime.now()),
'action': (
'San cert request for {0} has been '
'enqueued.'.format(cert_obj.domain_name)
)
})
except Exception as exc:
LOG.exception("Unable to enqueue {0}, Error: {1}".format(
cert_obj.domain_name,
exc
))
return self.responder.ssl_certificate_provisioned(None, {
'status': 'failed',
'san cert': None,
'created_at': str(datetime.datetime.now()),
'action': 'Waiting for action... Provision '
'san cert failed for {0} failed.'.format(
cert_obj.domain_name)
})
else:
return self.responder.ssl_certificate_provisioned(None, {
'status': 'failed',
'reason': "Cert type : {0} hasn't been implemented".format(
cert_obj.cert_type
)
})
def get_subcustomer_id(self, project_id, domain):
# subcustomer_id now just set for project_id
return ''.join([str(project_id)])

View File

@ -13,9 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from poppy.provider.base import certificates
from poppy.provider.base import driver
from poppy.provider.base import services
CertificateBase = certificates.CertificatesControllerBase
Driver = driver.ProviderDriverBase
ServiceBase = services.ServicesControllerBase

View File

@ -0,0 +1,41 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
from poppy.provider.base import controller
from poppy.provider.base import responder
@six.add_metaclass(abc.ABCMeta)
class CertificatesControllerBase(controller.ProviderControllerBase):
"""Services Controller Base."""
def __init__(self, driver):
super(CertificatesControllerBase, self).__init__(driver)
self.responder = responder.Responder(driver.provider_name)
@abc.abstractmethod
def create_certificate(self, cert_obj, enqueue=True):
"""Create a certificate.
:param cert_obj
:param enqueue
:raises NotImplementedError
"""
raise NotImplementedError

View File

@ -49,7 +49,7 @@ class ProviderDriverBase(object):
@abc.abstractmethod
def is_alive(self):
"""Check whether the storage is ready.
"""Check if provider is alive and return bool indicating the status.
:raises NotImplementedError
"""
@ -57,7 +57,7 @@ class ProviderDriverBase(object):
@abc.abstractproperty
def provider_name(self):
"""provider name.
"""Provider name.
:raises NotImplementedError
"""
@ -65,7 +65,15 @@ class ProviderDriverBase(object):
@abc.abstractproperty
def service_controller(self):
"""Returns the driver's hostname controller.
"""Returns the driver's service controller.
:raises NotImplementedError
"""
raise NotImplementedError
@abc.abstractproperty
def certificate_controller(self):
"""Returns the driver's certificate controller.
:raises NotImplementedError
"""

View File

@ -100,7 +100,6 @@ class Responder(object):
:param provider_service_id
:param purge_url
:param hard
:returns provider msg{provider service id, purge urls}
"""
provider_response = {

View File

@ -63,6 +63,7 @@ class ServicesControllerBase(controller.ProviderControllerBase):
"""purge.
:param provider_service_id
:param hard
:param purge_url
:raises NotImplementedError
"""
@ -83,7 +84,7 @@ class ServicesControllerBase(controller.ProviderControllerBase):
:param project_id
:param domain_name
:param regions
:param region
:raises NotImplementedError
"""
raise NotImplementedError

View File

@ -0,0 +1,25 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from poppy.provider import base
class CertificateController(base.CertificateBase):
def __init__(self, driver):
super(CertificateController, self).__init__(driver)
def create_certificate(self, cert_obj, enqueue=True):
return NotImplemented

View File

@ -22,6 +22,8 @@ Field Mappings:
updated and documented in each controller class.
"""
from poppy.provider.cloudfront import certificates
from poppy.provider.cloudfront import services
CertificateController = certificates.CertificateController
ServiceController = services.ServiceController

View File

@ -80,3 +80,11 @@ class CDNProvider(base.Driver):
:return service controller
"""
return controllers.ServiceController(self)
@property
def certificate_controller(self):
"""Hook for certificate controller.
:return certificate controller
"""
return controllers.CertificateController(self)

View File

@ -0,0 +1,25 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from poppy.provider import base
class CertificateController(base.CertificateBase):
def __init__(self, driver):
super(CertificateController, self).__init__(driver)
def create_certificate(self, cert_obj, enqueue=True):
return NotImplemented

View File

@ -22,6 +22,8 @@ Field Mappings:
updated and documented in each controller class.
"""
from poppy.provider.fastly import certificates
from poppy.provider.fastly import services
CertificateController = certificates.CertificateController
ServiceController = services.ServiceController

View File

@ -91,3 +91,11 @@ class CDNProvider(base.Driver):
:return service controller
"""
return controllers.ServiceController(self)
@property
def certificate_controller(self):
"""Hook for certificate controller.
:return certificate controller
"""
return controllers.CertificateController(self)

View File

@ -0,0 +1,25 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from poppy.provider import base
class CertificateController(base.CertificateBase):
def __init__(self, driver):
super(CertificateController, self).__init__(driver)
def create_certificate(self, cert_obj, enqueue=True):
return NotImplemented

View File

@ -21,7 +21,8 @@ Field Mappings:
letter of their long name. Fields mapping will be
updated and documented in each controller class.
"""
from poppy.provider.maxcdn import certificates
from poppy.provider.maxcdn import services
CertificateController = certificates.CertificateController
ServiceController = services.ServiceController

View File

@ -84,3 +84,11 @@ class CDNProvider(base.Driver):
:return service controller
"""
return controllers.ServiceController(self)
@property
def certificate_controller(self):
"""Hook for certificate controller.
:return certificate controller
"""
return controllers.CertificateController(self)

View File

@ -0,0 +1,25 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from poppy.provider import base
class CertificateController(base.CertificateBase):
def __init__(self, driver):
super(CertificateController, self).__init__(driver)
def create_certificate(self, cert_obj, enqueue=True):
return None

View File

@ -22,6 +22,8 @@ Field Mappings:
updated and documented in each controller class.
"""
from poppy.provider.mock import certificates
from poppy.provider.mock import services
CertificateController = certificates.CertificateController
ServiceController = services.ServiceController

View File

@ -52,3 +52,11 @@ class CDNProvider(base.Driver):
:return service controller
"""
return controllers.ServiceController(self)
@property
def certificate_controller(self):
"""Hook for certificate controller.
:return certificate controller
"""
return controllers.CertificateController(self)

View File

@ -36,7 +36,8 @@ class BackgroundJobControllerTest(base.FunctionalTest):
]
background_job_controller_patcher = mock.patch(
'poppy.provider.akamai.services.ServiceController.san_cert_cnames',
'poppy.provider.akamai.certificates.'
'CertificateController.san_cert_cnames',
new=san_cert_cnames_caller(),
)
background_job_controller_patcher.start()

View File

@ -0,0 +1,357 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import ddt
import mock
from poppy.provider.akamai import certificates
from poppy.transport.pecan.models.request import ssl_certificate
from tests.unit import base
@ddt.ddt
class TestCertificates(base.TestCase):
def setUp(self):
super(TestCertificates, self).setUp()
self.driver = mock.Mock()
self.driver.provider_name = 'Akamai'
self.san_cert_cnames = [str(x) for x in range(7)]
self.driver.san_cert_cnames = self.san_cert_cnames
self.controller = certificates.CertificateController(self.driver)
@ddt.data(("SPS Request Complete", ""),
("edge host already created or pending", None),
("CPS cancelled", None),
("edge host already created or pending", "Some progress info"))
def test_create_ssl_certificate_happy_path(
self,
sps_status_workFlowProgress_tuple):
sps_status, workFlowProgress = sps_status_workFlowProgress_tuple
self.driver.san_cert_cnames = ["secure.san1.poppycdn.com",
"secure.san2.poppycdn.com"]
controller = certificates.CertificateController(self.driver)
data = {
"cert_type": "san",
"domain_name": "www.abc.com",
"flavor_id": "premium"
}
lastSpsId = (
controller.cert_info_storage.get_cert_last_spsid(
"secure.san1.poppycdn.com"))
controller.cert_info_storage.get_cert_info.return_value = {
'cnameHostname': "secure.san1.poppycdn.com",
'jobId': "secure.san1.poppycdn.com",
'issuer': 1789,
'createType': 'modSan',
'ipVersion': 'ipv4',
'slot-deployment.class': 'esslType'
}
cert_info = controller.cert_info_storage.get_cert_info(
"secure.san1.poppycdn.com")
cert_info['add.sans'] = "www.abc.com"
string_post_cert_info = '&'.join(
['%s=%s' % (k, v) for (k, v) in cert_info.items()])
controller.sps_api_client.get.return_value = mock.Mock(
status_code=200,
# Mock an SPS request
text=json.dumps({
"requestList":
[{"resourceUrl": "/config-secure-provisioning-service/"
"v1/sps-requests/1849",
"parameters": [{
"name": "cnameHostname",
"value": "secure.san1.poppycdn.com"
}, {"name": "createType", "value": "modSan"},
{"name": "csr.cn",
"value": "secure.san3.poppycdn.com"},
{"name": "add.sans",
"value": "www.abc.com"}],
"lastStatusChange": "2015-03-19T21:47:10Z",
"spsId": 1789,
"status": sps_status,
"workflowProgress": workFlowProgress,
"jobId": 44306}]})
)
controller.sps_api_client.post.return_value = mock.Mock(
status_code=202,
text=json.dumps({
"spsId": 1789,
"resourceLocation":
"/config-secure-provisioning-service/v1/sps-requests/1856",
"Results": {
"size": 1,
"data": [{
"text": None,
"results": {
"type": "SUCCESS",
"jobID": 44434}
}]}})
)
controller.create_certificate(
ssl_certificate.load_from_json(data),
False
)
controller.sps_api_client.get.assert_called_once_with(
controller.sps_api_base_url.format(spsId=lastSpsId))
controller.sps_api_client.post.assert_called_once_with(
controller.sps_api_base_url.format(spsId=lastSpsId),
data=string_post_cert_info.encode('utf-8'))
return
@ddt.data(("CPS running", ""),
("edge host already created or pending", "Error in it"))
def test_create_ssl_certificate_negative_path(
self,
sps_status_workFlowProgress_tuple):
sps_status, workFlowProgress = sps_status_workFlowProgress_tuple
self.driver.san_cert_cnames = ["secure.san1.poppycdn.com"]
controller = certificates.CertificateController(self.driver)
data = {
"cert_type": "san",
"domain_name": "www.abc.com",
"flavor_id": "premium"
}
lastSpsId = (
controller.cert_info_storage.get_cert_last_spsid(
"secure.san1.poppycdn.com"))
controller.cert_info_storage.get_cert_info.return_value = {
'cnameHostname': "secure.san1.poppycdn.com",
'jobId': "secure.san1.poppycdn.com",
'issuer': 1789,
'createType': 'modSan',
'ipVersion': 'ipv4',
'slot-deployment.class': 'esslType'
}
cert_info = controller.cert_info_storage.get_cert_info(
"secure.san1.poppycdn.com")
cert_info['add.sans'] = "www.abc.com"
controller.sps_api_client.get.return_value = mock.Mock(
status_code=200,
# Mock an SPS request
text=json.dumps({
"requestList":
[{"resourceUrl": "/config-secure-provisioning-service/"
"v1/sps-requests/1849",
"parameters": [{
"name": "cnameHostname",
"value": "secure.san1.poppycdn.com"
}, {"name": "createType", "value": "modSan"},
{"name": "csr.cn",
"value": "secure.san3.poppycdn.com"},
{"name": "add.sans",
"value": "www.abc.com"}],
"lastStatusChange": "2015-03-19T21:47:10Z",
"spsId": 1789,
"status": sps_status,
"workflowProgress": workFlowProgress,
"jobId": 44306}]})
)
controller.sps_api_client.post.return_value = mock.Mock(
status_code=202,
text=json.dumps({
"spsId": 1789,
"resourceLocation":
"/config-secure-provisioning-service/v1/sps-requests/1856",
"Results": {
"size": 1,
"data": [{
"text": None,
"results": {
"type": "SUCCESS",
"jobID": 44434}
}]}})
)
controller.create_certificate(ssl_certificate.load_from_json(data),
False)
controller.sps_api_client.get.assert_called_once_with(
controller.sps_api_base_url.format(spsId=lastSpsId))
self.assertFalse(controller.sps_api_client.post.called)
return
def test_create_ssl_certificate_cert_type_not_implemented(self):
self.driver.san_cert_cnames = ["secure.san1.poppycdn.com",
"secure.san2.poppycdn.com"]
controller = certificates.CertificateController(self.driver)
ssl_cert = mock.Mock()
type(ssl_cert).cert_type = mock.PropertyMock(
return_value='unsupported-type'
)
responder = controller.create_certificate(
ssl_cert,
False
)
self.assertIsNone(responder['Akamai']['cert_domain'])
self.assertEqual(
'failed',
responder['Akamai']['extra_info']['status']
)
self.assertEqual(
"Cert type : unsupported-type hasn't been implemented",
responder['Akamai']['extra_info']['reason']
)
def test_create_ssl_certificate_sps_api_get_failure(self):
self.driver.san_cert_cnames = ["secure.san1.poppycdn.com",
"secure.san2.poppycdn.com"]
controller = certificates.CertificateController(self.driver)
data = {
"cert_type": "san",
"domain_name": "www.abc.com",
"flavor_id": "premium"
}
lastSpsId = (
controller.cert_info_storage.get_cert_last_spsid(
"secure.san1.poppycdn.com"))
controller.cert_info_storage.get_cert_info.return_value = {
'cnameHostname': "secure.san1.poppycdn.com",
'jobId': "secure.san1.poppycdn.com",
'issuer': 1789,
'createType': 'modSan',
'ipVersion': 'ipv4',
'slot-deployment.class': 'esslType'
}
cert_info = controller.cert_info_storage.get_cert_info(
"secure.san1.poppycdn.com")
cert_info['add.sans'] = "www.abc.com"
controller.sps_api_client.get.return_value = mock.Mock(
status_code=404,
# Mock an SPS request
text='SPS ID NOT FOUND'
)
responder = controller.create_certificate(
ssl_certificate.load_from_json(data),
False
)
controller.sps_api_client.get.assert_called_once_with(
controller.sps_api_base_url.format(spsId=lastSpsId))
self.assertIsNone(responder['Akamai']['cert_domain'])
self.assertEqual(
'create_in_progress',
responder['Akamai']['extra_info']['status']
)
self.assertEqual(
'San cert request for www.abc.com has been enqueued.',
responder['Akamai']['extra_info']['action']
)
mod_san_q = self.driver.mod_san_queue
mod_san_q.enqueue_mod_san_request.assert_called_once_with(
json.dumps(ssl_certificate.load_from_json(data).to_dict())
)
def test_create_ssl_certificate_sps_api_post_failure(self):
self.driver.san_cert_cnames = ["secure.san1.poppycdn.com",
"secure.san2.poppycdn.com"]
controller = certificates.CertificateController(self.driver)
data = {
"cert_type": "san",
"domain_name": "www.abc.com",
"flavor_id": "premium"
}
lastSpsId = (
controller.cert_info_storage.get_cert_last_spsid(
"secure.san1.poppycdn.com"))
controller.cert_info_storage.get_cert_info.return_value = {
'cnameHostname': "secure.san1.poppycdn.com",
'jobId': "secure.san1.poppycdn.com",
'issuer': 1789,
'createType': 'modSan',
'ipVersion': 'ipv4',
'slot-deployment.class': 'esslType'
}
cert_info = controller.cert_info_storage.get_cert_info(
"secure.san1.poppycdn.com")
cert_info['add.sans'] = "www.abc.com"
controller.sps_api_client.get.return_value = mock.Mock(
status_code=200,
# Mock an SPS request
text=json.dumps({
"requestList":
[{"resourceUrl": "/config-secure-provisioning-service/"
"v1/sps-requests/1849",
"parameters": [{
"name": "cnameHostname",
"value": "secure.san1.poppycdn.com"
}, {"name": "createType", "value": "modSan"},
{"name": "csr.cn",
"value": "secure.san3.poppycdn.com"},
{"name": "add.sans",
"value": "www.abc.com"}],
"lastStatusChange": "2015-03-19T21:47:10Z",
"spsId": 1789,
"status": "SPS Request Complete",
"workflowProgress": "",
"jobId": 44306}]})
)
controller.sps_api_client.post.return_value = mock.Mock(
status_code=500,
text='INTERNAL SERVER ERROR'
)
responder = controller.create_certificate(
ssl_certificate.load_from_json(data),
False
)
controller.sps_api_client.get.assert_called_once_with(
controller.sps_api_base_url.format(spsId=lastSpsId))
self.assertIsNone(responder['Akamai']['cert_domain'])
self.assertEqual(
'create_in_progress',
responder['Akamai']['extra_info']['status']
)
self.assertEqual(
'San cert request for www.abc.com has been enqueued.',
responder['Akamai']['extra_info']['action']
)
mod_san_q = self.driver.mod_san_queue
mod_san_q.enqueue_mod_san_request.assert_called_once_with(
json.dumps(ssl_certificate.load_from_json(data).to_dict())
)

View File

@ -30,7 +30,6 @@ from poppy.model.service import Service
from poppy.provider.akamai import geo_zone_code_mapping
from poppy.provider.akamai import services
from poppy.transport.pecan.models.request import service
from poppy.transport.pecan.models.request import ssl_certificate
from tests.unit import base
@ -630,325 +629,6 @@ class TestServices(base.TestCase):
self.assertTrue(restriction_rule_valid)
@ddt.data(("SPS Request Complete", ""),
("edge host already created or pending", None),
("CPS cancelled", None),
("edge host already created or pending", "Some progress info"))
def test_create_ssl_certificate_happy_path(
self,
sps_status_workFlowProgress_tuple):
sps_status, workFlowProgress = sps_status_workFlowProgress_tuple
self.driver.san_cert_cnames = ["secure.san1.poppycdn.com",
"secure.san2.poppycdn.com"]
controller = services.ServiceController(self.driver)
data = {
"cert_type": "san",
"domain_name": "www.abc.com",
"flavor_id": "premium"
}
lastSpsId = (
controller.cert_info_storage.get_cert_last_spsid(
"secure.san1.poppycdn.com"))
controller.cert_info_storage.get_cert_info.return_value = {
'cnameHostname': "secure.san1.poppycdn.com",
'jobId': "secure.san1.poppycdn.com",
'issuer': 1789,
'createType': 'modSan',
'ipVersion': 'ipv4',
'slot-deployment.class': 'esslType'
}
cert_info = controller.cert_info_storage.get_cert_info(
"secure.san1.poppycdn.com")
cert_info['add.sans'] = "www.abc.com"
string_post_cert_info = '&'.join(
['%s=%s' % (k, v) for (k, v) in cert_info.items()])
controller.sps_api_client.get.return_value = mock.Mock(
status_code=200,
# Mock an SPS request
text=json.dumps({
"requestList":
[{"resourceUrl": "/config-secure-provisioning-service/"
"v1/sps-requests/1849",
"parameters": [{
"name": "cnameHostname",
"value": "secure.san1.poppycdn.com"
}, {"name": "createType", "value": "modSan"},
{"name": "csr.cn",
"value": "secure.san3.poppycdn.com"},
{"name": "add.sans",
"value": "www.abc.com"}],
"lastStatusChange": "2015-03-19T21:47:10Z",
"spsId": 1789,
"status": sps_status,
"workflowProgress": workFlowProgress,
"jobId": 44306}]})
)
controller.sps_api_client.post.return_value = mock.Mock(
status_code=202,
text=json.dumps({
"spsId": 1789,
"resourceLocation":
"/config-secure-provisioning-service/v1/sps-requests/1856",
"Results": {
"size": 1,
"data": [{
"text": None,
"results": {
"type": "SUCCESS",
"jobID": 44434}
}]}})
)
controller.create_certificate(ssl_certificate.load_from_json(data),
False)
controller.sps_api_client.get.assert_called_once_with(
controller.sps_api_base_url.format(spsId=lastSpsId))
controller.sps_api_client.post.assert_called_once_with(
controller.sps_api_base_url.format(spsId=lastSpsId),
data=string_post_cert_info.encode('utf-8'))
return
@ddt.data(("CPS running", ""),
("edge host already created or pending", "Error in it"))
def test_create_ssl_certificate_negative_path(
self,
sps_status_workFlowProgress_tuple):
sps_status, workFlowProgress = sps_status_workFlowProgress_tuple
self.driver.san_cert_cnames = ["secure.san1.poppycdn.com"]
controller = services.ServiceController(self.driver)
data = {
"cert_type": "san",
"domain_name": "www.abc.com",
"flavor_id": "premium"
}
lastSpsId = (
controller.cert_info_storage.get_cert_last_spsid(
"secure.san1.poppycdn.com"))
controller.cert_info_storage.get_cert_info.return_value = {
'cnameHostname': "secure.san1.poppycdn.com",
'jobId': "secure.san1.poppycdn.com",
'issuer': 1789,
'createType': 'modSan',
'ipVersion': 'ipv4',
'slot-deployment.class': 'esslType'
}
cert_info = controller.cert_info_storage.get_cert_info(
"secure.san1.poppycdn.com")
cert_info['add.sans'] = "www.abc.com"
controller.sps_api_client.get.return_value = mock.Mock(
status_code=200,
# Mock an SPS request
text=json.dumps({
"requestList":
[{"resourceUrl": "/config-secure-provisioning-service/"
"v1/sps-requests/1849",
"parameters": [{
"name": "cnameHostname",
"value": "secure.san1.poppycdn.com"
}, {"name": "createType", "value": "modSan"},
{"name": "csr.cn",
"value": "secure.san3.poppycdn.com"},
{"name": "add.sans",
"value": "www.abc.com"}],
"lastStatusChange": "2015-03-19T21:47:10Z",
"spsId": 1789,
"status": sps_status,
"workflowProgress": workFlowProgress,
"jobId": 44306}]})
)
controller.sps_api_client.post.return_value = mock.Mock(
status_code=202,
text=json.dumps({
"spsId": 1789,
"resourceLocation":
"/config-secure-provisioning-service/v1/sps-requests/1856",
"Results": {
"size": 1,
"data": [{
"text": None,
"results": {
"type": "SUCCESS",
"jobID": 44434}
}]}})
)
controller.create_certificate(ssl_certificate.load_from_json(data),
False)
controller.sps_api_client.get.assert_called_once_with(
controller.sps_api_base_url.format(spsId=lastSpsId))
self.assertFalse(controller.sps_api_client.post.called)
return
def test_create_ssl_certificate_cert_type_not_implemented(self):
self.driver.san_cert_cnames = ["secure.san1.poppycdn.com",
"secure.san2.poppycdn.com"]
controller = services.ServiceController(self.driver)
ssl_cert = mock.Mock()
type(ssl_cert).cert_type = mock.PropertyMock(
return_value='unsupported-type'
)
responder = controller.create_certificate(
ssl_cert,
False
)
self.assertIsNone(responder['Akamai']['cert_domain'])
self.assertEqual(
'failed',
responder['Akamai']['extra_info']['status']
)
self.assertEqual(
"Cert type : unsupported-type hasn't been implemented",
responder['Akamai']['extra_info']['reason']
)
def test_create_ssl_certificate_sps_api_get_failure(self):
self.driver.san_cert_cnames = ["secure.san1.poppycdn.com",
"secure.san2.poppycdn.com"]
controller = services.ServiceController(self.driver)
data = {
"cert_type": "san",
"domain_name": "www.abc.com",
"flavor_id": "premium"
}
lastSpsId = (
controller.cert_info_storage.get_cert_last_spsid(
"secure.san1.poppycdn.com"))
controller.cert_info_storage.get_cert_info.return_value = {
'cnameHostname': "secure.san1.poppycdn.com",
'jobId': "secure.san1.poppycdn.com",
'issuer': 1789,
'createType': 'modSan',
'ipVersion': 'ipv4',
'slot-deployment.class': 'esslType'
}
cert_info = controller.cert_info_storage.get_cert_info(
"secure.san1.poppycdn.com")
cert_info['add.sans'] = "www.abc.com"
controller.sps_api_client.get.return_value = mock.Mock(
status_code=404,
# Mock an SPS request
text='SPS ID NOT FOUND'
)
responder = controller.create_certificate(
ssl_certificate.load_from_json(data),
False
)
controller.sps_api_client.get.assert_called_once_with(
controller.sps_api_base_url.format(spsId=lastSpsId))
self.assertIsNone(responder['Akamai']['cert_domain'])
self.assertEqual(
'create_in_progress',
responder['Akamai']['extra_info']['status']
)
self.assertEqual(
'San cert request for www.abc.com has been enqueued.',
responder['Akamai']['extra_info']['action']
)
mod_san_q = self.driver.mod_san_queue
mod_san_q.enqueue_mod_san_request.assert_called_once_with(
json.dumps(ssl_certificate.load_from_json(data).to_dict())
)
def test_create_ssl_certificate_sps_api_post_failure(self):
self.driver.san_cert_cnames = ["secure.san1.poppycdn.com",
"secure.san2.poppycdn.com"]
controller = services.ServiceController(self.driver)
data = {
"cert_type": "san",
"domain_name": "www.abc.com",
"flavor_id": "premium"
}
lastSpsId = (
controller.cert_info_storage.get_cert_last_spsid(
"secure.san1.poppycdn.com"))
controller.cert_info_storage.get_cert_info.return_value = {
'cnameHostname': "secure.san1.poppycdn.com",
'jobId': "secure.san1.poppycdn.com",
'issuer': 1789,
'createType': 'modSan',
'ipVersion': 'ipv4',
'slot-deployment.class': 'esslType'
}
cert_info = controller.cert_info_storage.get_cert_info(
"secure.san1.poppycdn.com")
cert_info['add.sans'] = "www.abc.com"
controller.sps_api_client.get.return_value = mock.Mock(
status_code=200,
# Mock an SPS request
text=json.dumps({
"requestList":
[{"resourceUrl": "/config-secure-provisioning-service/"
"v1/sps-requests/1849",
"parameters": [{
"name": "cnameHostname",
"value": "secure.san1.poppycdn.com"
}, {"name": "createType", "value": "modSan"},
{"name": "csr.cn",
"value": "secure.san3.poppycdn.com"},
{"name": "add.sans",
"value": "www.abc.com"}],
"lastStatusChange": "2015-03-19T21:47:10Z",
"spsId": 1789,
"status": "SPS Request Complete",
"workflowProgress": "",
"jobId": 44306}]})
)
controller.sps_api_client.post.return_value = mock.Mock(
status_code=500,
text='INTERNAL SERVER ERROR'
)
responder = controller.create_certificate(
ssl_certificate.load_from_json(data),
False
)
controller.sps_api_client.get.assert_called_once_with(
controller.sps_api_base_url.format(spsId=lastSpsId))
self.assertIsNone(responder['Akamai']['cert_domain'])
self.assertEqual(
'create_in_progress',
responder['Akamai']['extra_info']['status']
)
self.assertEqual(
'San cert request for www.abc.com has been enqueued.',
responder['Akamai']['extra_info']['action']
)
mod_san_q = self.driver.mod_san_queue
mod_san_q.enqueue_mod_san_request.assert_called_once_with(
json.dumps(ssl_certificate.load_from_json(data).to_dict())
)
def test_regions(self):
controller = services.ServiceController(self.driver)
self.assertEqual(controller.driver.regions,