Switch to testtools and make barbican compatible with Python2.6

Change-Id: I1e08612d6c9d43808d1f1db166ea1a807c63aae8
This commit is contained in:
Donald Stufft 2014-03-12 23:15:11 -04:00
parent 4d4d534a0c
commit a5d2705e90
13 changed files with 724 additions and 449 deletions

View File

@ -21,12 +21,13 @@ resource classes. For RBAC tests of these classes, see the
import base64
import json
import unittest
import urllib
import falcon
import mock
import testtools
from barbican.api import strip_whitespace
from barbican.api import resources as res
from barbican.common import exception as excep
@ -100,8 +101,10 @@ def create_container(id_ref):
return container
class WhenTestingVersionResource(unittest.TestCase):
class WhenTestingVersionResource(testtools.TestCase):
def setUp(self):
super(WhenTestingVersionResource, self).setUp()
self.req = mock.MagicMock()
self.resp = mock.MagicMock()
self.resource = res.VersionResource()
@ -119,10 +122,8 @@ class WhenTestingVersionResource(unittest.TestCase):
self.assertEqual('current', parsed_body['v1'])
class BaseSecretsResource(unittest.TestCase):
class BaseSecretsResource(testtools.TestCase):
"""Base test class for the Secrets resource."""
def setUp(self):
pass
def _init(self, payload=b'not-encrypted',
payload_content_type='text/plain',
@ -305,9 +306,11 @@ class BaseSecretsResource(unittest.TestCase):
self.payload_content_encoding
self.stream.read.return_value = json.dumps(self.secret_req)
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp, self.keystone_id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_post,
self.req, self.resp, self.keystone_id
)
self.assertEqual(falcon.HTTP_413, exception.status)
def _test_should_fail_due_to_empty_payload(self):
@ -323,9 +326,11 @@ class BaseSecretsResource(unittest.TestCase):
self.payload_content_encoding
self.stream.read.return_value = json.dumps(self.secret_req)
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp, self.keystone_id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_post,
self.req, self.resp, self.keystone_id
)
self.assertEqual(falcon.HTTP_400, exception.status)
@ -364,9 +369,11 @@ class WhenCreatingPlainTextSecretsUsingSecretsResource(BaseSecretsResource):
'payload': self.payload}
self.stream.read.return_value = json.dumps(self.secret_req)
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp, self.keystone_id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_post,
self.req, self.resp, self.keystone_id,
)
self.assertEqual(falcon.HTTP_400, exception.status)
def test_create_secret_content_type_text_plain(self):
@ -424,9 +431,11 @@ class WhenCreatingPlainTextSecretsUsingSecretsResource(BaseSecretsResource):
self.secret_req = {'payload_content_type':
'text/plain'}
self.stream.read.return_value = json.dumps(self.secret_req)
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp, self.keystone_id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_post,
self.req, self.resp, self.keystone_id,
)
self.assertEqual(falcon.HTTP_400, exception.status)
self.secret_req = {'payload_content_type':
@ -440,6 +449,7 @@ class WhenCreatingPlainTextSecretsUsingSecretsResource(BaseSecretsResource):
class WhenCreatingBinarySecretsUsingSecretsResource(BaseSecretsResource):
def setUp(self):
super(WhenCreatingBinarySecretsUsingSecretsResource, self).setUp()
self._init(payload="...lOtfqHaUUpe6NqLABgquYQ==",
payload_content_type='application/octet-stream',
payload_content_encoding='base64')
@ -475,9 +485,11 @@ class WhenCreatingBinarySecretsUsingSecretsResource(BaseSecretsResource):
'payload_content_type': 'application/octet-stream'}
)
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp, self.keystone_id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_post,
self.req, self.resp, self.keystone_id
)
self.assertEqual(falcon.HTTP_400, exception.status)
def test_create_secret_fails_with_binary_payload_bad_encoding(self):
@ -491,9 +503,11 @@ class WhenCreatingBinarySecretsUsingSecretsResource(BaseSecretsResource):
'payload_content_encoding': 'bogus64'}
)
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp, self.keystone_id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_post,
self.req, self.resp, self.keystone_id
)
self.assertEqual(falcon.HTTP_400, exception.status)
def test_create_secret_fails_with_binary_payload_no_content_type(self):
@ -506,9 +520,11 @@ class WhenCreatingBinarySecretsUsingSecretsResource(BaseSecretsResource):
'payload_content_encoding': 'base64'}
)
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp, self.keystone_id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_post,
self.req, self.resp, self.keystone_id
)
self.assertEqual(falcon.HTTP_400, exception.status)
def test_create_secret_fails_with_bad_payload(self):
@ -522,14 +538,18 @@ class WhenCreatingBinarySecretsUsingSecretsResource(BaseSecretsResource):
'payload_content_encoding': 'base64'}
)
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp, self.keystone_id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_post,
self.req, self.resp, self.keystone_id
)
self.assertEqual(falcon.HTTP_400, exception.status)
class WhenGettingSecretsListUsingSecretsResource(unittest.TestCase):
class WhenGettingSecretsListUsingSecretsResource(testtools.TestCase):
def setUp(self):
super(WhenGettingSecretsListUsingSecretsResource, self).setUp()
self.tenant_id = 'tenant1234'
self.keystone_id = 'keystone1234'
self.name = 'name 1234 !@#$%^&*()_+=-{}[];:<>,./?'
@ -677,8 +697,13 @@ class WhenGettingSecretsListUsingSecretsResource(unittest.TestCase):
return '/v1/{0}/secrets'.format(keystone_id)
class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
class WhenGettingPuttingOrDeletingSecretUsingSecretResource(
testtools.TestCase):
def setUp(self):
super(
WhenGettingPuttingOrDeletingSecretUsingSecretResource,
self,
).setUp()
self.tenant_id = 'tenantid1234'
self.keystone_id = 'keystone1234'
self.name = 'name1234'
@ -815,30 +840,32 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
def test_should_throw_exception_for_get_when_secret_not_found(self):
self.secret_repo.get.return_value = None
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_get(self.req, self.resp, self.keystone_id,
self.secret.id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_get,
self.req, self.resp, self.keystone_id, self.secret.id,
)
self.assertEqual(falcon.HTTP_404, exception.status)
def test_should_throw_exception_for_get_when_accept_not_supported(self):
self.req.accept = 'bogusaccept'
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_get(self.req, self.resp, self.keystone_id,
self.secret.id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_get,
self.req, self.resp, self.keystone_id, self.secret.id,
)
self.assertEqual(falcon.HTTP_406, exception.status)
def test_should_throw_exception_for_get_when_datum_not_available(self):
self.req.accept = 'text/plain'
self.secret.encrypted_data = []
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_get(self.req, self.resp, self.keystone_id,
self.secret.id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_get,
self.req, self.resp, self.keystone_id, self.secret.id,
)
self.assertEqual(falcon.HTTP_404, exception.status)
def test_should_put_secret_as_plain(self):
@ -887,10 +914,11 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
# mock Content-Encoding header
self.req.get_header.return_value = 'bogusencoding'
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_put(self.req, self.resp, self.keystone_id,
self.secret.id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_put,
self.req, self.resp, self.keystone_id, self.secret.id,
)
self.assertEqual(falcon.HTTP_400, exception.status)
def test_should_fail_put_secret_as_json(self):
@ -898,10 +926,11 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
self.req.content_type = 'application/json'
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_put(self.req, self.resp, self.keystone_id,
self.secret.id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_put,
self.req, self.resp, self.keystone_id, self.secret.id,
)
self.assertEqual(falcon.HTTP_415, exception.status)
def test_should_fail_put_secret_not_found(self):
@ -910,10 +939,11 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
# Force error, due to secret not found.
self.secret_repo.get.return_value = None
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_put(self.req, self.resp, self.keystone_id,
self.secret.id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_put,
self.req, self.resp, self.keystone_id, self.secret.id,
)
self.assertEqual(falcon.HTTP_404, exception.status)
def test_should_fail_put_secret_no_payload(self):
@ -922,10 +952,11 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
# Force error due to no data passed in the request.
self.stream.read.return_value = None
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_put(self.req, self.resp, self.keystone_id,
self.secret.id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_put,
self.req, self.resp, self.keystone_id, self.secret.id,
)
self.assertEqual(falcon.HTTP_400, exception.status)
def test_should_fail_put_secret_with_existing_datum(self):
@ -934,11 +965,11 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
# Force error due to secret already having data
self.secret.encrypted_data = [self.datum]
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_put(self.req, self.resp, self.keystone_id,
self.secret.id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_put,
self.req, self.resp, self.keystone_id, self.secret.id,
)
self.assertEqual(falcon.HTTP_409, exception.status)
def test_should_fail_due_to_empty_payload(self):
@ -946,10 +977,11 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
self.stream.read.return_value = ''
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_put(self.req, self.resp, self.keystone_id,
self.secret.id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_put,
self.req, self.resp, self.keystone_id, self.secret.id,
)
self.assertEqual(falcon.HTTP_400, exception.status)
def test_should_fail_due_to_plain_text_too_large(self):
@ -959,10 +991,11 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
2 * validators.DEFAULT_MAX_SECRET_BYTES)])
self.stream.read.return_value = big_text
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_put(self.req, self.resp, self.keystone_id,
self.secret.id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_put,
self.req, self.resp, self.keystone_id, self.secret.id,
)
self.assertEqual(falcon.HTTP_413, exception.status)
def test_should_delete_secret(self):
@ -977,10 +1010,11 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
self.secret_repo.delete_entity_by_id.side_effect = excep.NotFound(
"Test not found exception")
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_delete(self.req, self.resp, self.keystone_id,
self.secret.id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_delete,
self.req, self.resp, self.keystone_id, self.secret.id,
)
self.assertEqual(falcon.HTTP_404, exception.status)
def _setup_for_puts(self):
@ -997,8 +1031,10 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(unittest.TestCase):
self.req.stream = self.stream
class WhenCreatingOrdersUsingOrdersResource(unittest.TestCase):
class WhenCreatingOrdersUsingOrdersResource(testtools.TestCase):
def setUp(self):
super(WhenCreatingOrdersUsingOrdersResource, self).setUp()
self.secret_name = 'name'
self.secret_payload_content_type = 'application/octet-stream'
self.secret_algorithm = "aes"
@ -1055,24 +1091,28 @@ class WhenCreatingOrdersUsingOrdersResource(unittest.TestCase):
def test_should_fail_add_new_order_no_secret(self):
self.stream.read.return_value = '{}'
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp,
self.tenant_keystone_id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_post,
self.req, self.resp, self.tenant_keystone_id,
)
self.assertEqual(falcon.HTTP_400, exception.status)
def test_should_fail_add_new_order_bad_json(self):
self.stream.read.return_value = ''
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp,
self.tenant_keystone_id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_post,
self.req, self.resp, self.tenant_keystone_id,
)
self.assertEqual(falcon.HTTP_400, exception.status)
class WhenGettingOrdersListUsingOrdersResource(unittest.TestCase):
class WhenGettingOrdersListUsingOrdersResource(testtools.TestCase):
def setUp(self):
super(WhenGettingOrdersListUsingOrdersResource, self).setUp()
self.tenant_id = 'tenant1234'
self.keystone_id = 'keystoneid1234'
self.name = 'name1234'
@ -1172,8 +1212,9 @@ class WhenGettingOrdersListUsingOrdersResource(unittest.TestCase):
return '/v1/{0}/orders'.format(self.keystone_id)
class WhenGettingOrDeletingOrderUsingOrderResource(unittest.TestCase):
class WhenGettingOrDeletingOrderUsingOrderResource(testtools.TestCase):
def setUp(self):
super(WhenGettingOrDeletingOrderUsingOrderResource, self).setUp()
self.tenant_keystone_id = 'keystoneid1234'
self.requestor = 'requestor1234'
@ -1208,27 +1249,30 @@ class WhenGettingOrDeletingOrderUsingOrderResource(unittest.TestCase):
def test_should_throw_exception_for_get_when_order_not_found(self):
self.order_repo.get.return_value = None
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_get(self.req, self.resp, self.tenant_keystone_id,
self.order.id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_get,
self.req, self.resp, self.tenant_keystone_id, self.order.id,
)
self.assertEqual(falcon.HTTP_404, exception.status)
def test_should_throw_exception_for_delete_when_order_not_found(self):
self.order_repo.delete_entity_by_id.side_effect = excep.NotFound(
"Test not found exception")
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_delete(self.req, self.resp,
self.tenant_keystone_id,
self.order.id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_delete,
self.req, self.resp, self.tenant_keystone_id, self.order.id,
)
self.assertEqual(falcon.HTTP_404, exception.status)
class WhenAddingNavigationHrefs(unittest.TestCase):
class WhenAddingNavigationHrefs(testtools.TestCase):
def setUp(self):
super(WhenAddingNavigationHrefs, self).setUp()
self.resource_name = 'orders'
self.keystone_id = '12345'
self.num_elements = 100
@ -1274,8 +1318,12 @@ class WhenAddingNavigationHrefs(unittest.TestCase):
self.assertNotIn('next', data_with_hrefs)
class WhenCreatingVerificationsUsingVerificationsResource(unittest.TestCase):
class WhenCreatingVerificationsUsingVerificationsResource(testtools.TestCase):
def setUp(self):
super(
WhenCreatingVerificationsUsingVerificationsResource,
self,
).setUp()
self.resource_type = 'image'
self.resource_ref = 'http://www.images.com/v1/images/12345'
self.resource_action = 'vm_attach'
@ -1332,10 +1380,11 @@ class WhenCreatingVerificationsUsingVerificationsResource(unittest.TestCase):
self.json = json.dumps(self.verify_req)
self.stream.read.return_value = self.json
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp,
self.tenant_keystone_id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_post,
self.req, self.resp, self.tenant_keystone_id,
)
self.assertEqual(falcon.HTTP_400, exception.status)
def test_should_fail_verification_unsupported_resource_type(self):
@ -1343,24 +1392,30 @@ class WhenCreatingVerificationsUsingVerificationsResource(unittest.TestCase):
self.json = json.dumps(self.verify_req)
self.stream.read.return_value = self.json
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp,
self.tenant_keystone_id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_post,
self.req, self.resp, self.tenant_keystone_id,
)
self.assertEqual(falcon.HTTP_400, exception.status)
def test_should_fail_verification_bad_json(self):
self.stream.read.return_value = ''
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp,
self.tenant_keystone_id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_post,
self.req, self.resp, self.tenant_keystone_id,
)
self.assertEqual(falcon.HTTP_400, exception.status)
class WhenGettingOrDeletingVerificationUsingVerifyResource(unittest.TestCase):
class WhenGettingOrDeletingVerificationUsingVerifyResource(testtools.TestCase):
def setUp(self):
super(
WhenGettingOrDeletingVerificationUsingVerifyResource,
self,
).setUp()
self.tenant_keystone_id = 'keystoneid1234'
self.requestor = 'requestor1234'
@ -1395,21 +1450,22 @@ class WhenGettingOrDeletingVerificationUsingVerifyResource(unittest.TestCase):
def test_should_throw_exception_for_get_when_verify_not_found(self):
self.verify_repo.get.return_value = None
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_get(self.req, self.resp, self.tenant_keystone_id,
self.verification.id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_get,
self.req, self.resp, self.tenant_keystone_id, self.verification.id,
)
self.assertEqual(falcon.HTTP_404, exception.status)
def test_should_throw_exception_for_delete_when_verify_not_found(self):
self.verify_repo.delete_entity_by_id.side_effect = excep.NotFound(
"Test not found exception")
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_delete(self.req, self.resp,
self.tenant_keystone_id,
self.verification.id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_delete,
self.req, self.resp, self.tenant_keystone_id, self.verification.id,
)
self.assertEqual(falcon.HTTP_404, exception.status)
def _create_verification(self, id="id",
@ -1427,8 +1483,10 @@ class WhenGettingOrDeletingVerificationUsingVerifyResource(unittest.TestCase):
return verification
class WhenGettingVerificationsListUsingResource(unittest.TestCase):
class WhenGettingVerificationsListUsingResource(testtools.TestCase):
def setUp(self):
super(WhenGettingVerificationsListUsingResource, self).setUp()
self.tenant_id = 'tenant1234'
self.keystone_id = 'keystoneid1234'
@ -1517,7 +1575,7 @@ class WhenGettingVerificationsListUsingResource(unittest.TestCase):
return '/v1/{0}/verifications'.format(self.keystone_id)
class TestingJsonSanitization(unittest.TestCase):
class TestingJsonSanitization(testtools.TestCase):
def test_json_sanitization_without_array(self):
json_without_array = {"name": "name", "algorithm": "AES",
@ -1555,8 +1613,10 @@ class TestingJsonSanitization(unittest.TestCase):
.endswith(' '), "whitespace should be gone")
class WhenCreatingContainersUsingContainersResource(unittest.TestCase):
class WhenCreatingContainersUsingContainersResource(testtools.TestCase):
def setUp(self):
super(WhenCreatingContainersUsingContainersResource, self).setUp()
self.name = 'test container name'
self.type = 'generic'
self.secret_refs = [
@ -1619,23 +1679,31 @@ class WhenCreatingContainersUsingContainersResource(unittest.TestCase):
def test_should_fail_container_bad_json(self):
self.stream.read.return_value = ''
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp,
self.tenant_keystone_id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_post,
self.req, self.resp, self.tenant_keystone_id,
)
self.assertEqual(falcon.HTTP_400, exception.status)
def test_should_throw_exception_when_secret_ref_doesnt_exist(self):
self.secret_repo.get.return_value = None
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_post(self.req, self.resp, self.tenant_keystone_id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_post,
self.req, self.resp, self.tenant_keystone_id,
)
self.assertEqual(falcon.HTTP_404, exception.status)
class WhenGettingOrDeletingContainerUsingContainerResource(unittest.TestCase):
class WhenGettingOrDeletingContainerUsingContainerResource(testtools.TestCase):
def setUp(self):
super(
WhenGettingOrDeletingContainerUsingContainerResource,
self,
).setUp()
self.tenant_keystone_id = 'keystoneid1234'
self.tenant_internal_id = 'tenantid1234'
@ -1678,26 +1746,29 @@ class WhenGettingOrDeletingContainerUsingContainerResource(unittest.TestCase):
def test_should_throw_exception_for_get_when_container_not_found(self):
self.container_repo.get.return_value = None
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_get(self.req, self.resp, self.tenant_keystone_id,
self.container.id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_get,
self.req, self.resp, self.tenant_keystone_id, self.container.id,
)
self.assertEqual(falcon.HTTP_404, exception.status)
def test_should_throw_exception_for_delete_when_container_not_found(self):
self.container_repo.delete_entity_by_id.side_effect = excep.NotFound(
"Test not found exception")
with self.assertRaises(falcon.HTTPError) as cm:
self.resource.on_delete(self.req, self.resp,
self.tenant_keystone_id,
self.container.id)
exception = cm.exception
exception = self.assertRaises(
falcon.HTTPError,
self.resource.on_delete,
self.req, self.resp, self.tenant_keystone_id, self.container.id,
)
self.assertEqual(falcon.HTTP_404, exception.status)
class WhenGettingContainersListUsingResource(unittest.TestCase):
class WhenGettingContainersListUsingResource(testtools.TestCase):
def setUp(self):
super(WhenGettingContainersListUsingResource, self).setUp()
self.tenant_id = 'tenant1234'
self.keystone_id = 'keystoneid1234'

View File

@ -20,7 +20,8 @@ For typical-flow business logic tests of these classes, see the
"""
import os
import unittest
import testtools
import falcon
import mock
@ -40,9 +41,10 @@ TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
ENFORCER = policy.Enforcer()
class BaseTestCase(unittest.TestCase):
class BaseTestCase(testtools.TestCase):
def setUp(self):
super(BaseTestCase, self).setUp()
CONF(args=['--config-dir', TEST_VAR_DIR])
self.policy_enforcer = ENFORCER
self.policy_enforcer.load_rules(True)
@ -79,9 +81,8 @@ class BaseTestCase(unittest.TestCase):
def _assert_post_rbac_exception(self, exception, role):
"""Assert that we received the expected RBAC-passed exception."""
fail_msg = "Expected RBAC pass on role '{0}'".format(role)
self.assertEqual(falcon.HTTP_500, exception.status, msg=fail_msg)
self.assertEquals('Read Error', exception.title, msg=fail_msg)
self.assertEqual(falcon.HTTP_500, exception.status)
self.assertEquals('Read Error', exception.title)
def _generate_get_error(self):
"""Falcon exception generator to throw from early-exit mocks.
@ -110,9 +111,8 @@ class BaseTestCase(unittest.TestCase):
# Force an exception early past the RBAC passing.
self.req.stream = self._generate_stream_for_exit()
with self.assertRaises(falcon.HTTPError) as cm:
method_under_test()
self._assert_post_rbac_exception(cm.exception, role)
exception = self.assertRaises(falcon.HTTPError, method_under_test)
self._assert_post_rbac_exception(exception, role)
self.setUp() # Need to re-setup
@ -128,13 +128,8 @@ class BaseTestCase(unittest.TestCase):
self.req = self._generate_req(roles=[role] if role else [],
accept=accept)
with self.assertRaises(falcon.HTTPError) as cm:
method_under_test()
exception = cm.exception
self.assertEqual(falcon.HTTP_403, exception.status,
msg="Expected RBAC fail for role '{0}'".format(
role))
exception = self.assertRaises(falcon.HTTPError, method_under_test)
self.assertEqual(falcon.HTTP_403, exception.status)
self.setUp() # Need to re-setup

View File

@ -12,17 +12,17 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import mock
import testtools
from barbican.common import utils
class WhenTestingAcceptEncodingGetter(unittest.TestCase):
class WhenTestingAcceptEncodingGetter(testtools.TestCase):
def setUp(self):
super(WhenTestingAcceptEncodingGetter, self).setUp()
self.req = mock.Mock()
def test_parses_accept_encoding_header(self):

View File

@ -15,6 +15,9 @@
import unittest
import datetime
import testtools
from barbican.common import exception as excep
from barbican.common import validators
@ -27,9 +30,11 @@ def suite():
return suite
class WhenTestingSecretValidator(unittest.TestCase):
class WhenTestingSecretValidator(testtools.TestCase):
def setUp(self):
super(WhenTestingSecretValidator, self).setUp()
self.name = 'name'
self.payload = b'not-encrypted'
self.payload_content_type = 'text/plain'
@ -123,50 +128,62 @@ class WhenTestingSecretValidator(unittest.TestCase):
def test_should_fail_numeric_name(self):
self.secret_req['name'] = 123
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.secret_req)
self.assertEqual('name', e.exception.invalid_property)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.secret_req,
)
self.assertEqual('name', exception.invalid_property)
def test_should_fail_negative_bit_length(self):
self.secret_req['bit_length'] = -23
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.secret_req)
self.assertEqual('bit_length', e.exception.invalid_property)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.secret_req,
)
self.assertEqual('bit_length', exception.invalid_property)
def test_should_fail_non_integer_bit_length(self):
self.secret_req['bit_length'] = "23"
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.secret_req)
self.assertEqual('bit_length', e.exception.invalid_property)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.secret_req,
)
self.assertEqual('bit_length', exception.invalid_property)
def test_validation_should_fail_with_empty_payload(self):
self.secret_req['payload'] = ' '
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.secret_req)
self.assertEqual('payload', e.exception.invalid_property)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.secret_req,
)
self.assertEqual('payload', exception.invalid_property)
def test_should_fail_already_expired(self):
self.secret_req['expiration'] = '2004-02-28T19:14:44.180394'
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.secret_req)
self.assertEqual('expiration', e.exception.invalid_property)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.secret_req,
)
self.assertEqual('expiration', exception.invalid_property)
def test_should_fail_expiration_nonsense(self):
self.secret_req['expiration'] = 'nonsense'
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.secret_req)
self.assertEqual('expiration', e.exception.invalid_property)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.secret_req,
)
self.assertEqual('expiration', exception.invalid_property)
def test_should_fail_all_nulls(self):
self.secret_req = {'name': None,
@ -174,8 +191,11 @@ class WhenTestingSecretValidator(unittest.TestCase):
'bit_length': None,
'mode': None}
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.secret_req)
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.secret_req,
)
def test_should_fail_all_empties(self):
self.secret_req = {'name': '',
@ -183,14 +203,20 @@ class WhenTestingSecretValidator(unittest.TestCase):
'bit_length': '',
'mode': ''}
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.secret_req)
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.secret_req,
)
def test_should_fail_no_payload_content_type(self):
del self.secret_req['payload_content_type']
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.secret_req)
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.secret_req,
)
def test_should_fail_with_message_w_bad_payload_content_type(self):
self.secret_req['payload_content_type'] = 'plain/text'
@ -215,27 +241,39 @@ class WhenTestingSecretValidator(unittest.TestCase):
def test_should_fail_with_mixed_case_wrong_payload_content_type(self):
self.secret_req['payload_content_type'] = 'TeXT/PlaneS'
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.secret_req)
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.secret_req,
)
def test_should_fail_with_upper_case_wrong_payload_content_type(self):
self.secret_req['payload_content_type'] = 'TEXT/PLANE'
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.secret_req)
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.secret_req,
)
def test_should_fail_with_plain_text_and_encoding(self):
self.secret_req['payload_content_encoding'] = 'base64'
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.secret_req)
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.secret_req,
)
def test_should_fail_with_wrong_encoding(self):
self.secret_req['payload_content_type'] = 'application/octet-stream'
self.secret_req['payload_content_encoding'] = 'unsupported'
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.secret_req)
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.secret_req,
)
def test_should_validate_with_wrong_encoding(self):
self.secret_req['payload_content_type'] = 'application/octet-stream'
@ -244,9 +282,11 @@ class WhenTestingSecretValidator(unittest.TestCase):
self.validator.validate(self.secret_req)
class WhenTestingOrderValidator(unittest.TestCase):
class WhenTestingOrderValidator(testtools.TestCase):
def setUp(self):
super(WhenTestingOrderValidator, self).setUp()
self.name = 'name'
self.secret_algorithm = 'aes'
self.secret_bit_length = 128
@ -299,72 +339,99 @@ class WhenTestingOrderValidator(unittest.TestCase):
def test_should_fail_numeric_name(self):
self.secret_req['name'] = 123
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.order_req)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.order_req,
)
self.assertEqual('name', e.exception.invalid_property)
self.assertEqual('name', exception.invalid_property)
def test_should_fail_bad_mode(self):
self.secret_req['mode'] = 'badmode'
with self.assertRaises(excep.UnsupportedField) as e:
self.validator.validate(self.order_req)
exception = self.assertRaises(
excep.UnsupportedField,
self.validator.validate,
self.order_req,
)
self.assertEqual('mode', e.exception.invalid_field)
self.assertEqual('mode', exception.invalid_field)
def test_should_fail_negative_bit_length(self):
self.secret_req['bit_length'] = -23
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.order_req)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.order_req,
)
self.assertEqual('bit_length', e.exception.invalid_property)
self.assertEqual('bit_length', exception.invalid_property)
def test_should_fail_non_integer_bit_length(self):
self.secret_req['bit_length'] = "23"
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.order_req)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.order_req,
)
self.assertEqual('bit_length', e.exception.invalid_property)
self.assertEqual('bit_length', exception.invalid_property)
def test_should_fail_non_multiple_eight_bit_length(self):
self.secret_req['bit_length'] = 129
with self.assertRaises(excep.UnsupportedField) as e:
self.validator.validate(self.order_req)
exception = self.assertRaises(
excep.UnsupportedField,
self.validator.validate,
self.order_req,
)
self.assertEqual('bit_length', e.exception.invalid_field)
self.assertEqual('bit_length', exception.invalid_field)
def test_should_fail_secret_not_order_schema_provided(self):
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.secret_req)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.secret_req,
)
self.assertEqual('secret', e.exception.invalid_property)
self.assertEqual('secret', exception.invalid_property)
def test_should_fail_payload_provided(self):
self.secret_req['payload'] = ' '
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.order_req)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.order_req,
)
self.assertTrue('payload' in e.exception.invalid_property)
self.assertTrue('payload' in exception.invalid_property)
def test_should_fail_already_expired(self):
self.secret_req['expiration'] = '2004-02-28T19:14:44.180394'
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.order_req)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.order_req,
)
self.assertEqual('expiration', e.exception.invalid_property)
self.assertEqual('expiration', exception.invalid_property)
def test_should_fail_expiration_nonsense(self):
self.secret_req['expiration'] = 'nonsense'
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.order_req)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.order_req,
)
self.assertEqual('expiration', e.exception.invalid_property)
self.assertEqual('expiration', exception.invalid_property)
def test_should_fail_all_nulls(self):
self.secret_req = {'name': None,
@ -373,8 +440,11 @@ class WhenTestingOrderValidator(unittest.TestCase):
'mode': None}
self.order_req = {'secret': self.secret_req}
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.order_req)
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.order_req,
)
def test_should_fail_all_empties(self):
self.secret_req = {'name': '',
@ -383,41 +453,58 @@ class WhenTestingOrderValidator(unittest.TestCase):
'mode': ''}
self.order_req = {'secret': self.secret_req}
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.order_req)
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.order_req,
)
def test_should_fail_no_payload_content_type(self):
del self.secret_req['payload_content_type']
with self.assertRaises(excep.UnsupportedField):
self.validator.validate(self.order_req)
self.assertRaises(
excep.UnsupportedField,
self.validator.validate,
self.order_req,
)
def test_should_fail_unsupported_payload_content_type(self):
self.secret_req['payload_content_type'] = 'text/plain'
with self.assertRaises(excep.UnsupportedField):
self.validator.validate(self.order_req)
self.assertRaises(
excep.UnsupportedField,
self.validator.validate,
self.order_req,
)
def test_should_fail_empty_mode(self):
del self.secret_req['mode']
del self.secret_req['mode']
with self.assertRaises(excep.UnsupportedField) as e:
self.validator.validate(self.order_req)
exception = self.assertRaises(
excep.UnsupportedField,
self.validator.validate,
self.order_req,
)
self.assertEqual('mode', e.exception.invalid_field)
self.assertEqual('mode', exception.invalid_field)
def test_should_fail_empty_algorithm(self):
del self.secret_req['algorithm']
with self.assertRaises(excep.UnsupportedField) as e:
self.validator.validate(self.order_req)
exception = self.assertRaises(
excep.UnsupportedField,
self.validator.validate,
self.order_req,
)
self.assertEqual('algorithm', e.exception.invalid_field)
self.assertEqual('algorithm', exception.invalid_field)
class WhenTestingContainerValidator(unittest.TestCase):
class WhenTestingContainerValidator(testtools.TestCase):
def setUp(self):
super(WhenTestingContainerValidator, self).setUp()
self.name = 'name'
self.type = 'generic'
self.secret_refs = [
@ -451,8 +538,11 @@ class WhenTestingContainerValidator(unittest.TestCase):
def test_should_fail_no_type(self):
del self.container_req['type']
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.container_req)
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.container_req,
)
#TODO: (hgedikli) figure out why invalid_property is null here
#self.assertEqual('type', e.exception.invalid_property)
@ -460,26 +550,35 @@ class WhenTestingContainerValidator(unittest.TestCase):
def test_should_fail_empty_type(self):
self.container_req['type'] = ''
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.container_req)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.container_req,
)
self.assertEqual('type', e.exception.invalid_property)
self.assertEqual('type', exception.invalid_property)
def test_should_fail_not_supported_type(self):
self.container_req['type'] = 'testtype'
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.container_req)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.container_req,
)
self.assertEqual('type', e.exception.invalid_property)
self.assertEqual('type', exception.invalid_property)
def test_should_fail_numeric_name(self):
self.container_req['name'] = 123
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.container_req)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.container_req,
)
self.assertEqual('name', e.exception.invalid_property)
self.assertEqual('name', exception.invalid_property)
def test_should_fail_all_nulls(self):
self.container_req = {'name': None,
@ -487,16 +586,22 @@ class WhenTestingContainerValidator(unittest.TestCase):
'bit_length': None,
'secret_refs': None}
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.container_req)
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.container_req,
)
def test_should_fail_all_empties(self):
self.container_req = {'name': '',
'type': '',
'secret_refs': []}
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.container_req)
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.container_req,
)
def test_should_validate_empty_secret_refs(self):
self.container_req['secret_refs'] = []
@ -505,34 +610,48 @@ class WhenTestingContainerValidator(unittest.TestCase):
def test_should_fail_no_secret_ref_in_secret_refs(self):
del self.container_req['secret_refs'][0]['secret_ref']
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.container_req)
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.container_req,
)
def test_should_fail_empty_secret_ref_in_secret_refs(self):
self.container_req['secret_refs'][0]['secret_ref'] = ''
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.container_req)
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.container_req,
)
def test_should_fail_numeric_secret_ref_in_secret_refs(self):
self.container_req['secret_refs'][0]['secret_ref'] = 123
with self.assertRaises(excep.InvalidObject):
self.validator.validate(self.container_req)
self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.container_req,
)
def test_should_fail_duplicate_names_in_secret_refs(self):
self.container_req['secret_refs'].append(
self.container_req['secret_refs'][0])
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.container_req)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.container_req,
)
self.assertEqual('secret_refs', e.exception.invalid_property)
self.assertEqual('secret_refs', exception.invalid_property)
class WhenTestingRSAContainerValidator(unittest.TestCase):
class WhenTestingRSAContainerValidator(testtools.TestCase):
def setUp(self):
super(WhenTestingRSAContainerValidator, self).setUp()
self.name = 'name'
self.type = 'rsa'
self.secret_refs = [
@ -559,26 +678,35 @@ class WhenTestingRSAContainerValidator(unittest.TestCase):
def test_should_fail_no_names_in_secret_refs(self):
del self.container_req['secret_refs'][0]['name']
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.container_req)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.container_req,
)
self.assertEqual('secret_refs', e.exception.invalid_property)
self.assertEqual('secret_refs', exception.invalid_property)
def test_should_fail_empty_names_in_secret_refs(self):
self.container_req['secret_refs'][0]['name'] = ''
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.container_req)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.container_req,
)
self.assertEqual('secret_refs', e.exception.invalid_property)
self.assertEqual('secret_refs', exception.invalid_property)
def test_should_fail_unsupported_names_in_secret_refs(self):
self.container_req['secret_refs'][0]['name'] = 'testttt'
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.container_req)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.container_req,
)
self.assertEqual('secret_refs', e.exception.invalid_property)
self.assertEqual('secret_refs', exception.invalid_property)
def test_should_fail_more_than_3_secret_refs(self):
new_secret_ref = {
@ -587,10 +715,13 @@ class WhenTestingRSAContainerValidator(unittest.TestCase):
}
self.container_req['secret_refs'].append(new_secret_ref)
with self.assertRaises(excep.InvalidObject) as e:
self.validator.validate(self.container_req)
exception = self.assertRaises(
excep.InvalidObject,
self.validator.validate,
self.container_req,
)
self.assertEqual('secret_refs', e.exception.invalid_property)
self.assertEqual('secret_refs', exception.invalid_property)
if __name__ == '__main__':

View File

@ -15,7 +15,7 @@
import base64
import mock
import unittest
import testtools
from barbican.crypto import extension_manager as em
from barbican.crypto import mime_types as mt
@ -41,9 +41,10 @@ class TestSupportsCryptoPlugin(CryptoPluginBase):
return False
class WhenTestingNormalizeBeforeEncryptionForBinary(unittest.TestCase):
class WhenTestingNormalizeBeforeEncryptionForBinary(testtools.TestCase):
def setUp(self):
super(WhenTestingNormalizeBeforeEncryptionForBinary, self).setUp()
self.unencrypted = 'AAAAAAAA'
self.content_type = 'application/octet-stream'
self.content_encoding = 'base64'
@ -69,42 +70,46 @@ class WhenTestingNormalizeBeforeEncryptionForBinary(unittest.TestCase):
def test_encrypt_fail_binary_unknown_encoding(self):
self.content_encoding = 'gzip'
with self.assertRaises(em.CryptoContentEncodingNotSupportedException)\
as cm:
unenc, content = em. \
normalize_before_encryption(self.unencrypted,
self.content_type,
self.content_encoding,
self.enforce_text_only)
ex = cm.exception
ex = self.assertRaises(
em.CryptoContentEncodingNotSupportedException,
em.normalize_before_encryption,
self.unencrypted,
self.content_type,
self.content_encoding,
self.enforce_text_only,
)
self.assertEqual(self.content_encoding, ex.content_encoding)
def test_encrypt_fail_binary_force_text_based_no_encoding(self):
self.content_encoding = None
self.enforce_text_only = True
with self.assertRaises(em.CryptoContentEncodingMustBeBase64):
unenc, content = em. \
normalize_before_encryption(self.unencrypted,
self.content_type,
self.content_encoding,
self.enforce_text_only)
self.assertRaises(
em.CryptoContentEncodingMustBeBase64,
em.normalize_before_encryption,
self.unencrypted,
self.content_type,
self.content_encoding,
self.enforce_text_only,
)
def test_encrypt_fail_unknown_content_type(self):
self.content_type = 'bogus'
with self.assertRaises(em.CryptoContentTypeNotSupportedException)\
as cm:
unenc, content = em \
.normalize_before_encryption(self.unencrypted,
self.content_type,
self.content_encoding,
self.enforce_text_only)
ex = cm.exception
ex = self.assertRaises(
em.CryptoContentTypeNotSupportedException,
em.normalize_before_encryption,
self.unencrypted,
self.content_type,
self.content_encoding,
self.enforce_text_only,
)
self.assertEqual(self.content_type, ex.content_type)
class WhenTestingNormalizeBeforeEncryptionForText(unittest.TestCase):
class WhenTestingNormalizeBeforeEncryptionForText(testtools.TestCase):
def setUp(self):
super(WhenTestingNormalizeBeforeEncryptionForText, self).setUp()
self.unencrypted = 'AAAAAAAA'
self.content_type = 'text/plain'
self.content_encoding = 'base64'
@ -130,41 +135,51 @@ class WhenTestingNormalizeBeforeEncryptionForText(unittest.TestCase):
def test_raises_on_bogus_content_type(self):
content_type = 'text/plain; charset=ISO-8859-1'
with self.assertRaises(em.CryptoContentTypeNotSupportedException):
unenc, content = em.normalize_before_encryption(
self.unencrypted,
content_type,
self.content_encoding,
self.enforce_text_only
)
self.assertRaises(
em.CryptoContentTypeNotSupportedException,
em.normalize_before_encryption,
self.unencrypted,
content_type,
self.content_encoding,
self.enforce_text_only
)
def test_raises_on_no_payload(self):
content_type = 'text/plain; charset=ISO-8859-1'
with self.assertRaises(em.CryptoNoPayloadProvidedException):
unenc, content = em.normalize_before_encryption(
None,
content_type,
self.content_encoding,
self.enforce_text_only
)
self.assertRaises(
em.CryptoNoPayloadProvidedException,
em.normalize_before_encryption,
None,
content_type,
self.content_encoding,
self.enforce_text_only
)
class WhenTestingAnalyzeBeforeDecryption(unittest.TestCase):
class WhenTestingAnalyzeBeforeDecryption(testtools.TestCase):
def setUp(self):
super(WhenTestingAnalyzeBeforeDecryption, self).setUp()
self.content_type = 'application/octet-stream'
def test_decrypt_fail_bogus_content_type(self):
self.content_type = 'bogus'
with self.assertRaises(em.CryptoAcceptNotSupportedException) as cm:
em.analyze_before_decryption(self.content_type)
ex = cm.exception
ex = self.assertRaises(
em.CryptoAcceptNotSupportedException,
em.analyze_before_decryption,
self.content_type,
)
self.assertEqual(self.content_type, ex.accept)
class WhenTestingDenormalizeAfterDecryption(unittest.TestCase):
class WhenTestingDenormalizeAfterDecryption(testtools.TestCase):
def setUp(self):
super(WhenTestingDenormalizeAfterDecryption, self).setUp()
self.unencrypted = 'AAAAAAAA'
self.content_type = 'application/octet-stream'
@ -181,21 +196,28 @@ class WhenTestingDenormalizeAfterDecryption(unittest.TestCase):
def test_decrypt_fail_unknown_content_type(self):
self.content_type = 'bogus'
with self.assertRaises(em.CryptoGeneralException):
em.denormalize_after_decryption(self.unencrypted,
self.content_type)
self.assertRaises(
em.CryptoGeneralException,
em.denormalize_after_decryption,
self.unencrypted,
self.content_type,
)
def test_decrypt_fail_binary_as_plain(self):
self.unencrypted = '\xff'
self.content_type = 'text/plain'
with self.assertRaises(em.CryptoAcceptNotSupportedException):
em.denormalize_after_decryption(self.unencrypted,
self.content_type)
self.assertRaises(
em.CryptoAcceptNotSupportedException,
em.denormalize_after_decryption,
self.unencrypted,
self.content_type,
)
class WhenTestingCryptoExtensionManager(unittest.TestCase):
class WhenTestingCryptoExtensionManager(testtools.TestCase):
def setUp(self):
super(WhenTestingCryptoExtensionManager, self).setUp()
self.manager = em.CryptoExtensionManager()
def test_create_supported_algorithm(self):
@ -206,45 +228,51 @@ class WhenTestingCryptoExtensionManager(unittest.TestCase):
self.assertEqual(skg, self.manager._determine_type('des'))
def test_create_unsupported_algorithm(self):
with self.assertRaises(em.CryptoAlgorithmNotSupportedException):
self.manager._determine_type('faux_alg')
self.assertRaises(
em.CryptoAlgorithmNotSupportedException,
self.manager._determine_type,
'faux_alg',
)
def test_encrypt_no_plugin_found(self):
self.manager.extensions = []
with self.assertRaises(em.CryptoPluginNotFound):
self.manager.encrypt(
'payload',
'content_type',
'content_encoding',
mock.MagicMock(),
mock.MagicMock(),
mock.MagicMock()
)
self.assertRaises(
em.CryptoPluginNotFound,
self.manager.encrypt,
'payload',
'content_type',
'content_encoding',
mock.MagicMock(),
mock.MagicMock(),
mock.MagicMock(),
)
def test_encrypt_no_supported_plugin(self):
plugin = TestSupportsCryptoPlugin()
plugin_mock = mock.MagicMock(obj=plugin)
self.manager.extensions = [plugin_mock]
with self.assertRaises(em.CryptoSupportedPluginNotFound):
self.manager.encrypt(
'payload',
'content_type',
'content_encoding',
mock.MagicMock(),
mock.MagicMock(),
mock.MagicMock()
)
self.assertRaises(
em.CryptoSupportedPluginNotFound,
self.manager.encrypt,
'payload',
'content_type',
'content_encoding',
mock.MagicMock(),
mock.MagicMock(),
mock.MagicMock(),
)
def test_decrypt_no_plugin_found(self):
""" Passing mocks here causes CryptoPluginNotFound because the mock
won't match any of the available plugins
"""
with self.assertRaises(em.CryptoPluginNotFound):
self.manager.decrypt(
'text/plain',
mock.MagicMock(),
mock.MagicMock()
)
self.assertRaises(
em.CryptoPluginNotFound,
self.manager.decrypt,
'text/plain',
mock.MagicMock(),
mock.MagicMock(),
)
def test_decrypt_no_supported_plugin_found(self):
""" Similar to test_decrypt_no_plugin_found, but in this case
@ -255,46 +283,50 @@ class WhenTestingCryptoExtensionManager(unittest.TestCase):
fake_datum = mock.MagicMock()
fake_datum.kek_meta_tenant = mock.MagicMock()
fake_secret.encrypted_data = [fake_datum]
with self.assertRaises(em.CryptoPluginNotFound):
self.manager.decrypt(
'text/plain',
fake_secret,
mock.MagicMock()
)
self.assertRaises(
em.CryptoPluginNotFound,
self.manager.decrypt,
'text/plain',
fake_secret,
mock.MagicMock(),
)
def test_generate_data_encryption_key_no_plugin_found(self):
self.manager.extensions = []
with self.assertRaises(em.CryptoPluginNotFound):
self.manager.generate_data_encryption_key(
mock.MagicMock(),
mock.MagicMock(),
mock.MagicMock(),
mock.MagicMock()
)
self.assertRaises(
em.CryptoPluginNotFound,
self.manager.generate_data_encryption_key,
mock.MagicMock(),
mock.MagicMock(),
mock.MagicMock(),
mock.MagicMock(),
)
def test_generate_data_encryption_key_no_supported_plugin(self):
plugin = TestSupportsCryptoPlugin()
plugin_mock = mock.MagicMock(obj=plugin)
self.manager.extensions = [plugin_mock]
with self.assertRaises(em.CryptoSupportedPluginNotFound):
self.manager.generate_data_encryption_key(
mock.MagicMock(algorithm='AES'),
mock.MagicMock(),
mock.MagicMock(),
mock.MagicMock()
)
self.assertRaises(
em.CryptoSupportedPluginNotFound,
self.manager.generate_data_encryption_key,
mock.MagicMock(algorithm='AES'),
mock.MagicMock(),
mock.MagicMock(),
mock.MagicMock(),
)
def test_find_or_create_kek_objects_bind_returns_none(self):
plugin = TestSupportsCryptoPlugin()
kek_repo = mock.MagicMock(name='kek_repo')
bind_completed = mock.MagicMock(bind_completed=False)
kek_repo.find_or_create_kek_datum.return_value = bind_completed
with self.assertRaises(em.CryptoKEKBindingException):
self.manager._find_or_create_kek_objects(
plugin,
mock.MagicMock(),
kek_repo
)
self.assertRaises(
em.CryptoKEKBindingException,
self.manager._find_or_create_kek_objects,
plugin,
mock.MagicMock(),
kek_repo,
)
def test_find_or_create_kek_objects_saves_to_repo(self):
kek_repo = mock.MagicMock(name='kek_repo')

View File

@ -13,16 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import testtools
from barbican.crypto import mime_types
from barbican.model import models
class WhenTestingIsBase64ProcessingNeeded(unittest.TestCase):
def setUp(self):
pass
class WhenTestingIsBase64ProcessingNeeded(testtools.TestCase):
def test_is_base64_needed(self):
r = mime_types.is_base64_processing_needed('application/octet-stream',
@ -51,7 +48,7 @@ class WhenTestingIsBase64ProcessingNeeded(unittest.TestCase):
self.assertFalse(r)
class WhenTestingIsBase64ProcessingSupported(unittest.TestCase):
class WhenTestingIsBase64ProcessingSupported(testtools.TestCase):
def test_is_base64_supported_application_octet_stream(self):
r = mime_types.is_base64_encoding_supported('application/octet-stream')
@ -65,9 +62,11 @@ class WhenTestingIsBase64ProcessingSupported(unittest.TestCase):
self.assertFalse(r)
class WhenTestingAugmentFieldsWithContentTypes(unittest.TestCase):
class WhenTestingAugmentFieldsWithContentTypes(testtools.TestCase):
def setUp(self):
super(WhenTestingAugmentFieldsWithContentTypes, self).setUp()
self.secret = models.Secret({})
self.secret.secret_id = "secret#1"
self.datum = models.EncryptedDatum(self.secret)
@ -114,7 +113,7 @@ class WhenTestingAugmentFieldsWithContentTypes(unittest.TestCase):
self.assertEqual(self.datum.content_type, content_types['default'])
class WhenTestingNormalizationOfMIMETypes(unittest.TestCase):
class WhenTestingNormalizationOfMIMETypes(testtools.TestCase):
def test_plain_text_normalization(self):
mimes = ['text/plain',

View File

@ -14,15 +14,18 @@
# limitations under the License.
import mock
import unittest
import testtools
from barbican.crypto import p11_crypto
from barbican.crypto import plugin as plugin_import
class WhenTestingP11CryptoPlugin(unittest.TestCase):
class WhenTestingP11CryptoPlugin(testtools.TestCase):
def setUp(self):
super(WhenTestingP11CryptoPlugin, self).setUp()
self.p11_mock = mock.MagicMock(CKR_OK=0, CKF_RW_SESSION='RW',
name='PyKCS11 mock')
self.patcher = mock.patch('barbican.crypto.p11_crypto.PyKCS11',
@ -36,6 +39,7 @@ class WhenTestingP11CryptoPlugin(unittest.TestCase):
self.session = self.pkcs11.openSession()
def tearDown(self):
super(WhenTestingP11CryptoPlugin, self).tearDown()
self.patcher.stop()
def test_create_calls_generate_random(self):
@ -54,18 +58,22 @@ class WhenTestingP11CryptoPlugin(unittest.TestCase):
def test_create_errors_when_rand_length_is_not_as_requested(self):
self.session.generateRandom.return_value = [1, 2, 3, 4, 5, 6, 7]
with self.assertRaises(p11_crypto.P11CryptoPluginException):
self.plugin.create(
192,
plugin_import.PluginSupportTypes.SYMMETRIC_KEY_GENERATION,
"AES"
)
self.assertRaises(
p11_crypto.P11CryptoPluginException,
self.plugin.create,
192,
plugin_import.PluginSupportTypes.SYMMETRIC_KEY_GENERATION,
"AES",
)
def test_raises_error_with_no_library_path(self):
m = mock.MagicMock()
m.p11_crypto_plugin = mock.MagicMock(library_path=None)
with self.assertRaises(ValueError):
p11_crypto.P11CryptoPlugin(m)
self.assertRaises(
ValueError,
p11_crypto.P11CryptoPlugin,
m,
)
def test_raises_error_with_bad_library_path(self):
m = mock.MagicMock()
@ -73,8 +81,11 @@ class WhenTestingP11CryptoPlugin(unittest.TestCase):
m.p11_crypto_plugin = mock.MagicMock(library_path="/dev/null")
# TODO: Really raises PyKCS11.PyKCS11Error
with self.assertRaises(Exception):
p11_crypto.P11CryptoPlugin(m)
self.assertRaises(
Exception,
p11_crypto.P11CryptoPlugin,
m,
)
def test_init_builds_sessions_and_login(self):
self.pkcs11.openSession.assert_any_call(1)
@ -83,8 +94,11 @@ class WhenTestingP11CryptoPlugin(unittest.TestCase):
def test_get_key_by_label_with_two_keys(self):
self.session.findObjects.return_value = ['key1', 'key2']
with self.assertRaises(p11_crypto.P11CryptoPluginKeyException):
self.plugin._get_key_by_label('mylabel')
self.assertRaises(
p11_crypto.P11CryptoPluginKeyException,
self.plugin._get_key_by_label,
'mylabel',
)
def test_get_key_by_label_with_one_key(self):
key = 'key1'
@ -108,8 +122,10 @@ class WhenTestingP11CryptoPlugin(unittest.TestCase):
def test_generate_iv_with_invalid_response_size(self):
self.session.generateRandom.return_value = [1, 2, 3, 4, 5, 6, 7]
with self.assertRaises(p11_crypto.P11CryptoPluginException):
self.plugin._generate_iv()
self.assertRaises(
p11_crypto.P11CryptoPluginException,
self.plugin._generate_iv,
)
def test_build_gcm_params(self):
class GCM_Mock(object):
@ -182,11 +198,11 @@ class WhenTestingP11CryptoPlugin(unittest.TestCase):
gk = self.pkcs11.lib.C_Generate_Key
# this is a way to test to make sure methods are NOT called
self.assertItemsEqual([], gk.call_args_list)
self.assertEqual([], gk.call_args_list)
t = self.session._template2ckattrlist
self.assertItemsEqual([], t.call_args_list)
self.assertEqual([], t.call_args_list)
m = self.p11_mock.LowLevel.CK_MECHANISM
self.assertItemsEqual([], m.call_args_list)
self.assertEqual([], m.call_args_list)
def test_supports_encrypt_decrypt(self):
self.assertTrue(

View File

@ -15,7 +15,8 @@
from Crypto import Random
from mock import MagicMock
import unittest
import testtools
from barbican.crypto import plugin
@ -49,9 +50,10 @@ class TestCryptoPlugin(plugin.CryptoPluginBase):
return False
class WhenTestingSimpleCryptoPlugin(unittest.TestCase):
class WhenTestingSimpleCryptoPlugin(testtools.TestCase):
def setUp(self):
super(WhenTestingSimpleCryptoPlugin, self).setUp()
self.plugin = plugin.SimpleCryptoPlugin()
def test_pad_binary_string(self):
@ -84,8 +86,13 @@ class WhenTestingSimpleCryptoPlugin(unittest.TestCase):
unencrypted = u'unicode_beer\U0001F37A'
secret = MagicMock()
secret.mime_type = 'text/plain'
with self.assertRaises(ValueError):
self.plugin.encrypt(unencrypted, MagicMock(), MagicMock())
self.assertRaises(
ValueError,
self.plugin.encrypt,
unencrypted,
MagicMock(),
MagicMock(),
)
def test_byte_string_encryption(self):
unencrypted = b'some_secret'

View File

@ -13,13 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import testtools
from barbican.model import models
class WhenCreatingNewSecret(unittest.TestCase):
class WhenCreatingNewSecret(testtools.TestCase):
def setUp(self):
super(WhenCreatingNewSecret, self).setUp()
self.parsed_secret = {'name': 'name',
'algorithm': 'algorithm',
'bit_length': 512,
@ -36,8 +37,9 @@ class WhenCreatingNewSecret(unittest.TestCase):
self.assertEqual(secret.mode, self.parsed_secret['mode'])
class WhenCreatingNewContainer(unittest.TestCase):
class WhenCreatingNewContainer(testtools.TestCase):
def setUp(self):
super(WhenCreatingNewContainer, self).setUp()
self.parsed_container = {'name': 'name',
'type': 'generic',
'secret_refs': [

View File

@ -13,17 +13,17 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import unittest
import testtools
from oslo.config import cfg
from barbican.model.repositories import clean_paging_values
class WhenCleaningRepositoryPagingParameters(unittest.TestCase):
class WhenCleaningRepositoryPagingParameters(testtools.TestCase):
@classmethod
def setUpClass(cls):
cls.CONF = cfg.CONF
def setUp(self):
super(WhenCleaningRepositoryPagingParameters, self).setUp()
self.CONF = cfg.CONF
def test_parameters_not_assigned(self):
""" The cleaner should use defaults when params are not specified"""

View File

@ -13,10 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import falcon
import mock
import testtools
from barbican.crypto import extension_manager as em
from barbican.model import models
@ -25,9 +24,11 @@ from barbican.openstack.common import timeutils
from barbican.tasks import resources
class WhenBeginningOrder(unittest.TestCase):
class WhenBeginningOrder(testtools.TestCase):
def setUp(self):
super(WhenBeginningOrder, self).setUp()
self.requestor = 'requestor1234'
self.order = models.Order()
self.order.id = "id1"
@ -119,8 +120,12 @@ class WhenBeginningOrder(unittest.TestCase):
self.order_repo.get = mock.MagicMock(return_value=None,
side_effect=ValueError())
with self.assertRaises(ValueError):
self.resource.process(self.order.id, self.keystone_id)
self.assertRaises(
ValueError,
self.resource.process,
self.order.id,
self.keystone_id,
)
# Order state doesn't change because can't retrieve it to change it.
self.assertEqual(models.States.PENDING, self.order.status)
@ -130,8 +135,12 @@ class WhenBeginningOrder(unittest.TestCase):
self.tenant_repo.get = mock.MagicMock(return_value=None,
side_effect=ValueError())
with self.assertRaises(ValueError):
self.resource.process(self.order.id, self.keystone_id)
self.assertRaises(
ValueError,
self.resource.process,
self.order.id,
self.keystone_id,
)
self.assertEqual(models.States.ERROR, self.order.status)
self.assertEqual(falcon.HTTP_500, self.order.error_status_code)
@ -143,8 +152,12 @@ class WhenBeginningOrder(unittest.TestCase):
self.order_repo.save = mock.MagicMock(return_value=None,
side_effect=ValueError())
with self.assertRaises(ValueError):
self.resource.process(self.order.id, self.keystone_id)
self.assertRaises(
ValueError,
self.resource.process,
self.order.id,
self.keystone_id,
)
def test_should_fail_during_error_report_fail(self):
# Force an error during the error-report handling after
@ -160,13 +173,19 @@ class WhenBeginningOrder(unittest.TestCase):
# Should see the original exception (TypeError) instead of the
# secondary one (ValueError).
with self.assertRaises(TypeError):
self.resource.process(self.order.id, self.keystone_id)
self.assertRaises(
TypeError,
self.resource.process,
self.order.id,
self.keystone_id,
)
class WhenPerformingVerification(unittest.TestCase):
class WhenPerformingVerification(testtools.TestCase):
def setUp(self):
super(WhenPerformingVerification, self).setUp()
self.verif = models.Verification()
self.verif.id = "id1"
@ -214,8 +233,12 @@ class WhenPerformingVerification(unittest.TestCase):
self.verif_repo.get = mock.MagicMock(return_value=None,
side_effect=ValueError())
with self.assertRaises(ValueError):
self.resource.process(self.verif.id, self.keystone_id)
self.assertRaises(
ValueError,
self.resource.process,
self.verif.id,
self.keystone_id,
)
# Verification state doesn't change because can't retrieve
# it to change it.
@ -226,5 +249,9 @@ class WhenPerformingVerification(unittest.TestCase):
self.verif_repo.save = mock.MagicMock(return_value=None,
side_effect=ValueError())
with self.assertRaises(ValueError):
self.resource.process(self.verif.id, self.keystone_id)
self.assertRaises(
ValueError,
self.resource.process,
self.verif.id,
self.keystone_id,
)

View File

@ -12,11 +12,10 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import testtools
class BaseTestCase(unittest.TestCase):
class BaseTestCase(testtools.TestCase):
def setUp(self):
super(BaseTestCase, self).setUp()
self.order_id = 'order1234'

10
tox.ini
View File

@ -1,16 +1,12 @@
[tox]
envlist = pep8,py27
envlist = pep8,py26,py27
[testenv]
setenv =
VIRTUAL_ENV={envdir}
PYTHONPATH = {toxinidir}/etc/barbican
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
[testenv:py27]
commands = python setup.py testr --coverage
commands =
python setup.py testr --coverage
coverage combine
coverage report -m