feat: Delete SSL Cert endpoint

REQUEST:
    DELETE /ssl_certificate/{domain_name}
RESPONSE:
    202 ACCEPTED

Change-Id: I7ca25c0012a27c66e312ae8470614ec03f8e2c24
This commit is contained in:
Sriram Madapusi Vasudevan 2015-10-15 12:15:07 -04:00 committed by tonytan4ever
parent 964d7cf43a
commit 5a5e5fc979
12 changed files with 232 additions and 10 deletions

1
.gitignore vendored
View File

@ -6,6 +6,7 @@
# Packages
*.egg
*.eggs
*.egg-info
dist
build

View File

@ -0,0 +1,40 @@
# Copyright (c) 2015 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from taskflow.patterns import graph_flow
from taskflow.patterns import linear_flow
from taskflow import retry
from poppy.distributed_task.taskflow.task import delete_ssl_certificate_tasks
from poppy.openstack.common import log
LOG = log.getLogger(__name__)
conf = cfg.CONF
conf(project='poppy', prog='poppy', args=[])
def delete_ssl_certificate():
flow = graph_flow.Flow('Deleting poppy ssl certificate').add(
linear_flow.Flow("Deleting poppy ssl certificate",
retry=retry.Times(5)).add(
delete_ssl_certificate_tasks.DeleteProviderSSLCertificateTask()
),
delete_ssl_certificate_tasks.SendNotificationTask(),
delete_ssl_certificate_tasks.DeleteStorageSSLCertificateTask()
)
return flow

View File

@ -0,0 +1,71 @@
# Copyright (c) 2015 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from taskflow import task
from poppy.distributed_task.utils import memoized_controllers
from poppy.openstack.common import log
LOG = log.getLogger(__name__)
conf = cfg.CONF
conf(project='poppy', prog='poppy', args=[])
class DeleteProviderSSLCertificateTask(task.Task):
default_provides = "responders"
def execute(self, providers_list_json, cert_obj_json):
# Note(tonytan4ever): For right now there is no
# way to code the process of deleting a certificate object
# from Akamai
responders = []
return responders
class SendNotificationTask(task.Task):
def execute(self, project_id, responders, domain_name, cert_type):
service_controller = memoized_controllers.task_controllers('poppy')
notification_content = (
"Project ID: %s, Domain Name: %s, Cert type: %s" %
(project_id, domain_name, cert_type))
for n_driver in service_controller._driver.notification:
service_controller.notification_wrapper.send(
n_driver,
n_driver.obj.notification_subject,
notification_content)
return
class DeleteStorageSSLCertificateTask(task.Task):
def execute(self, project_id, domain_name, cert_type):
service_controller, self.storage_controller = \
memoized_controllers.task_controllers('poppy', 'storage')
self.storage_controller.delete_cert(project_id, domain_name, cert_type)
def revert(self, *args, **kwargs):
try:
if getattr(self, 'storage_controller') \
and self.storage_controller._driver.session:
self.storage_controller._driver.close_connection()
LOG.info('Cassandra session being shutdown')
except AttributeError:
LOG.info('Cassandra session already shutdown')

View File

@ -271,6 +271,7 @@ class DefaultServicesController(base.ServicesController):
service_new_json['service_id'] = service_old.service_id
service_new = service.Service.init_from_dict(project_id,
service_new_json)
store = str(uuid.uuid4()).replace('-', '_')
service_new.provider_details = service_old.provider_details
@ -348,6 +349,7 @@ class DefaultServicesController(base.ServicesController):
providers[0].provider_id.title()]
)
domain.cert_info = new_cert_obj
if hasattr(self, store):
delattr(self, store)

View File

@ -16,6 +16,7 @@
import json
from poppy.distributed_task.taskflow.flow import create_ssl_certificate
from poppy.distributed_task.taskflow.flow import delete_ssl_certificate
from poppy.manager import base
@ -55,3 +56,15 @@ class DefaultSSLCertificateController(base.SSLCertificateController):
create_ssl_certificate.create_ssl_certificate,
**kwargs)
return kwargs
def delete_ssl_certificate(self, project_id, domain_name,
cert_type):
kwargs = {
'project_id': project_id,
'domain_name': domain_name,
'cert_type': cert_type
}
self.distributed_task_controller.submit_task(
delete_ssl_certificate.delete_ssl_certificate,
**kwargs)
return kwargs

View File

@ -36,7 +36,7 @@ MAIL_NOTIFICATION_OPTIONS = [
cfg.ListOpt('recipients',
help='A list of emails addresses to receive notification '),
cfg.StrOpt('notification_subject',
default='Poppy SSL Certificate Provisioned',
default='Poppy SSL Certificate Provisioned/Deleted',
help='The subject of the email notification ')
]

View File

@ -215,6 +215,11 @@ CQL_SEARCH_CERT_BY_DOMAIN = '''
WHERE domain_name = %(domain_name)s
'''
CQL_DELETE_CERT = '''
DELETE FROM certificate_info
WHERE domain_name = %(domain_name)s
'''
CQL_UPDATE_SERVICE = CQL_CREATE_SERVICE
CQL_GET_PROVIDER_DETAILS = '''
@ -537,6 +542,43 @@ class ServicesController(base.ServicesController):
else:
return None
def delete_cert(self, project_id, domain_name, cert_type):
"""delete_cert
Delete a certificate.
:param project_id
:param domain_name
:param cert_type
:raises ValueError
"""
args = {
'domain_name': domain_name.lower()
}
stmt = query.SimpleStatement(
CQL_SEARCH_CERT_BY_DOMAIN,
consistency_level=self._driver.consistency_level)
results = self.session.execute(stmt, args)
if results:
for r in results:
r_project_id = str(r.get('project_id'))
r_cert_type = str(r.get('cert_type'))
if r_project_id == str(project_id) and \
r_cert_type == str(cert_type):
args = {
'domain_name': str(r.get('domain_name'))
}
stmt = query.SimpleStatement(
CQL_DELETE_CERT,
consistency_level=self._driver.consistency_level)
self.session.execute(stmt, args)
else:
raise ValueError("No certificate found for: %, type: %s" %
(domain_name, cert_type))
def create(self, project_id, service_obj):
"""create.

View File

@ -166,6 +166,10 @@ class ServicesController(base.ServicesController):
def create_cert(self, project_id, cert_obj):
pass
def delete_cert(self, project_id, domain_name, cert_type):
if "non_exist" in domain_name:
raise ValueError("No certs on this domain")
@staticmethod
def format_result(result):
service_id = result.get('service_id')

View File

@ -61,3 +61,25 @@ class SSLCertificateController(base.Controller, hooks.HookController):
'Reason: %s' % str(e))
return pecan.Response(None, 202)
@pecan.expose('json')
@decorators.validate(
domain_name=rule.Rule(
helpers.is_valid_domain_by_name(),
helpers.abort_with_message)
)
def delete(self, domain_name):
# For now we only support 'san' cert type
cert_type = pecan.request.GET.get('cert_type', 'san')
certificate_controller = \
self._driver.manager.ssl_certificate_controller
try:
certificate_controller.delete_ssl_certificate(
self.project_id, domain_name, cert_type
)
except ValueError as e:
pecan.abort(400, detail='Delete ssl certificate failed. '
'Reason: %s' % str(e))
return pecan.Response(None, 202)

View File

@ -33,7 +33,7 @@ class TestCreateSSLCertificate(base.TestBase):
flavor_id = test_data.get('flavor_id') or self.flavor_id
project_id = self.client.project_id
if test_data.get("missing_flavor_id", False):
flavor_id = None
self.flavor_id = None
resp = self.client.create_ssl_certificate(
cert_type=cert_type,
@ -50,13 +50,13 @@ class TestCreateSSLCertificate(base.TestBase):
self.skipTest('Create ssl certificate needs to'
' be run when commanded')
cert_type = test_data.get('cert_type')
self.cert_type = test_data.get('cert_type')
rand_string = self.generate_random_string()
domain_name = rand_string + test_data.get('domain_name')
flavor_id = test_data.get('flavor_id') or self.flavor_id
project_id = self.client.project_id
resp = self.client.create_ssl_certificate(
cert_type=cert_type,
cert_type=self.cert_type,
domain_name=domain_name,
flavor_id=flavor_id,
project_id=project_id
@ -64,10 +64,9 @@ class TestCreateSSLCertificate(base.TestBase):
self.assertEqual(resp.status_code, 202)
def tearDown(self):
# @todo(malini): Add delete cert when the endpoint is ready.
# self.client.delete_ssl_certificate(
# cert_type=cert_type,
# domain_name=domain_name,
# flavor_id=flavor_id
# )
self.client.delete_ssl_certificate(
cert_type=self.cert_type,
domain_name=self.domain_name,
flavor_id=self.flavor_id
)
super(TestCreateSSLCertificate, self).tearDown()

View File

@ -453,3 +453,17 @@ class PoppyClient(client.AutoMarshallingHTTPClient):
return self.request('POST', url, request_entity=requests_object,
requestslib_kwargs=requestslib_kwargs)
def delete_ssl_certificate(self, cert_type=None,
domain_name=None, flavor_id=None,
requestslib_kwargs=None,):
"""Deletes SSL Certificate
:return: Response Object containing response code 202
GET
ssl_certificate
"""
url = '{0}/ssl_certificate/{1}'.format(self.url, domain_name)
return self.request('DELETE', url,
requestslib_kwargs=requestslib_kwargs)

View File

@ -89,5 +89,19 @@ class SSLCertificateControllerTest(base.FunctionalTest):
expect_errors=True)
self.assertEqual(400, response.status_code)
def test_delete_cert(self):
# create with errorenous data: invalid json data
response = self.app.delete('/v1.0/ssl_certificate/blog.test.com',
headers={'X-Project-ID': self.project_id}
)
self.assertEqual(202, response.status_code)
def test_delete_cert_non_exist(self):
# create with errorenous data: invalid json data
response = self.app.delete('/v1.0/ssl_certificate/blog.non_exist.com',
headers={'X-Project-ID': self.project_id},
expect_errors=True)
self.assertEqual(400, response.status_code)
def tearDown(self):
super(SSLCertificateControllerTest, self).tearDown()