Add code to populate CA tables and select plugin based on ca_id

Added code to orders to validate the input if ca_id is provided.
When an order is created, the plugin is selected based on the provided
ca_id, or any project or global preferred ca_ids.

Also added code to populate and update the CA tables based on
calls to the plugin.

And of course, lots and lots of tests.

Change-Id: Icbf1fd3563e1804932eea82d209bd67e5af73edb
Implements: blueprint identify-cas
This commit is contained in:
Ade Lee
2015-03-17 16:57:22 -04:00
parent 169d1af582
commit c435dfbaf5
13 changed files with 620 additions and 6 deletions

View File

@@ -11,10 +11,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import mock
import testtools
from barbican.model import models
from barbican.plugin.interface import certificate_manager as cm
from barbican.tests import utils
class WhenTestingCertificateEventPluginManager(testtools.TestCase):
@@ -99,7 +103,8 @@ class WhenTestingCertificateEventPluginManager(testtools.TestCase):
)
class WhenTestingCertificatePluginManager(testtools.TestCase):
class WhenTestingCertificatePluginManager(utils.BaseTestCase,
utils.MockModelRepositoryMixin):
def setUp(self):
super(WhenTestingCertificatePluginManager, self).setUp()
@@ -111,6 +116,43 @@ class WhenTestingCertificatePluginManager(testtools.TestCase):
cm.CertificateRequestType.CUSTOM_REQUEST]
self.plugin_returned.supported_request_types.return_value = types_list
self.plugin_returned.supports.return_value = True
self.plugin_loaded = mock.MagicMock(obj=self.plugin_returned)
expiration = (datetime.datetime.utcnow() + datetime.timedelta(
days=cm.CA_INFO_DEFAULT_EXPIRATION_DAYS))
ca_info = {
cm.INFO_NAME: "my_ca",
cm.INFO_DESCRIPTION: "Certificate Authority my_ca",
cm.INFO_CA_SIGNING_CERT: "Undefined",
cm.INFO_INTERMEDIATES: "Undefined",
cm.INFO_EXPIRATION: expiration.isoformat()
}
self.plugin_returned.get_ca_info.return_value = {
'plugin_ca_id1': ca_info
}
parsed_ca = {
'plugin_name': self.plugin_name,
'plugin_ca_id': 'plugin_ca_id1',
'name': self.plugin_name,
'description': 'Master CA for default plugin',
'ca_signing_certificate': 'ZZZZZ',
'intermediates': 'YYYYY'
}
self.ca = models.CertificateAuthority(parsed_ca)
self.ca.id = 'ca_id'
self.ca_repo = mock.MagicMock()
self.ca_repo.get_by_create_date.return_value = (
self.ca, 0, 1, 1)
self.ca_repo.create_from.return_value = None
self.ca_repo.get.return_value = self.ca
self.project = models.Project()
self.project.id = '12345'
self.setup_ca_repository_mock(self.ca_repo)
self.plugin_loaded = mock.MagicMock(obj=self.plugin_returned)
self.manager = cm.CertificatePluginManager()
self.manager.extensions = [self.plugin_loaded]
@@ -119,6 +161,18 @@ class WhenTestingCertificatePluginManager(testtools.TestCase):
self.assertEqual(self.plugin_returned,
self.manager.get_plugin_by_name(self.plugin_name))
def test_get_plugin_by_ca_id(self):
self.assertEqual(self.plugin_returned,
self.manager.get_plugin_by_ca_id('ca_id'))
def test_raises_error_with_no_plugin_by_ca_id_found(self):
self.ca_repo.get.return_value = None
self.assertRaises(
cm.CertificatePluginNotFoundForCAID,
self.manager.get_plugin_by_ca_id,
'any-name-here'
)
def test_raises_error_with_no_plugin_by_name_found(self):
self.manager.extensions = []
self.assertRaises(
@@ -154,3 +208,79 @@ class WhenTestingCertificatePluginManager(testtools.TestCase):
self.manager.get_plugin,
self.cert_spec
)
def test_get_plugin_with_ca_to_be_added(self):
self.ca_repo.get_by_create_date.return_value = (
None, 0, 1, 0)
self.assertEqual(self.plugin_returned,
self.manager.get_plugin(self.cert_spec))
def test_refresh_ca_list(self):
utc_now = datetime.datetime.utcnow()
expired_time = utc_now - datetime.timedelta(days=1)
expiration = utc_now + datetime.timedelta(days=1)
ca1_info = {
cm.INFO_NAME: "expired_ca_to_be_modified",
cm.INFO_DESCRIPTION: "expired_ca to be modified",
cm.INFO_CA_SIGNING_CERT: "XXXXXXX-expired-XXXXXX",
cm.INFO_INTERMEDIATES: "YYYYYYY-expired-YYYYYYY",
cm.INFO_EXPIRATION: expired_time.isoformat()
}
ca1_modified_info = {
cm.INFO_NAME: "expired_ca_to_be_modified",
cm.INFO_DESCRIPTION: "expired_ca to be modified",
cm.INFO_CA_SIGNING_CERT: "XXXXXXX-no-longer-expired-XXXXXX",
cm.INFO_INTERMEDIATES: "YYYYYYY-no-longer-expired-YYYYYYY",
cm.INFO_EXPIRATION: expiration.isoformat()
}
ca2_info = {
cm.INFO_NAME: "expired_ca_to_be_deleted",
cm.INFO_DESCRIPTION: "expired ca to be deleted",
cm.INFO_CA_SIGNING_CERT: "XXXX-expired-to-be-deleted-XXXX",
cm.INFO_INTERMEDIATES: "YYYY-expired-to-be-deleted-YYYY",
cm.INFO_EXPIRATION: expired_time.isoformat()
}
ca3_info = {
cm.INFO_NAME: "new-ca-to-be-added",
cm.INFO_DESCRIPTION: "new-ca-to-be-added",
cm.INFO_CA_SIGNING_CERT: "XXXX-to-be-addeed-XXXX",
cm.INFO_INTERMEDIATES: "YYYY-to-be-added-YYYY",
cm.INFO_EXPIRATION: expiration.isoformat()
}
self.plugin_returned.get_ca_info.return_value = {
'plugin_ca_id_ca1': ca1_modified_info,
'plugin_ca_id_ca3': ca3_info
}
parsed_ca1 = dict(ca1_info)
parsed_ca1[cm.PLUGIN_CA_ID] = 'plugin_ca_id_ca1'
parsed_ca1['plugin_name'] = self.plugin_name
ca1 = models.CertificateAuthority(parsed_ca1)
ca1.id = "ca1_id"
parsed_ca2 = dict(ca2_info)
parsed_ca2[cm.PLUGIN_CA_ID] = 'plugin_ca_id_ca2'
parsed_ca2['plugin_name'] = self.plugin_name
ca2 = models.CertificateAuthority(parsed_ca2)
ca2.id = "ca2_id"
side_effect = [(None, 0, 4, 0),
([ca1, ca2], 0, 4, 2)]
self.ca_repo.get_by_create_date.side_effect = side_effect
self.manager.refresh_ca_table()
self.plugin_returned.get_ca_info.assert_called_once()
self.ca_repo.update_entity.assert_called_once_with(
ca1,
ca1_modified_info)
self.ca_repo.delete_entity_by_id.assert_called_once_with(
ca2.id,
None)
self.ca_repo.create_from.assert_called_once()