Updating API unit and functional tests to new hacking standards

Global requirements has updating the version of hacking to a minimum of
0.9.2 and it has brought a whole slew of changes.

Change-Id: Ic294fbf95b01ca8186a4035b1365ce660bc4cee8
This commit is contained in:
John Vrbanac 2014-09-01 23:57:12 -05:00
parent 0494ff46f2
commit fc49c999a5
8 changed files with 327 additions and 269 deletions

View File

@ -21,9 +21,9 @@ resource classes. For RBAC tests of these classes, see the
import base64
import logging
import mimetypes
import urllib
import mimetypes
import mock
import pecan
import testtools
@ -44,9 +44,9 @@ LOG = logging.getLogger(__name__)
def get_barbican_env(keystone_id):
"""Create and return a barbican.context for use with
the RBAC decorator by injecting the provided
keystone_id
"""Create and return a barbican.context for use with the RBAC decorator
Injects the provided keystone_id.
"""
kwargs = {'roles': None,
'user': None,
@ -221,8 +221,8 @@ class BaseSecretsResource(FunctionalTest):
if payload_content_type:
self.secret_req['payload_content_type'] = payload_content_type
if payload_content_encoding:
self.secret_req['payload_content_encoding'] = \
payload_content_encoding
self.secret_req['payload_content_encoding'] = (
payload_content_encoding)
self.keystone_id = 'keystone1234'
self.tenant_entity_id = 'tid1234'
@ -283,16 +283,18 @@ class BaseSecretsResource(FunctionalTest):
expiration_raw = expiration_raw[:-6].replace('12', '17', 1)
expiration_tz = timeutils.parse_isotime(expiration_raw.strip())
expected['expiration'] = timeutils.normalize_time(expiration_tz)
mock_store_secret\
.assert_called_once_with(
self.secret_req.get('payload'),
self.secret_req.get('payload_content_type',
'application/octet-stream'),
self.secret_req.get('payload_content_encoding'),
expected, None, self.tenant, mock.ANY,
transport_key_needed=False,
transport_key_id=None
)
mock_store_secret.assert_called_once_with(
self.secret_req.get('payload'),
self.secret_req.get('payload_content_type',
'application/octet-stream'),
self.secret_req.get('payload_content_encoding'),
expected,
None,
self.tenant,
mock.ANY,
transport_key_needed=False,
transport_key_id=None
)
@mock.patch('barbican.plugin.resources.store_secret')
def _test_should_add_new_secret_one_step(self, mock_store_secret,
@ -312,18 +314,18 @@ class BaseSecretsResource(FunctionalTest):
expected = dict(self.secret_req)
expected['expiration'] = None
mock_store_secret\
.assert_called_once_with(
self.secret_req.get('payload'),
self.secret_req.get('payload_content_type',
'application/octet-stream'),
self.secret_req.get('payload_content_encoding'),
expected, None,
self.tenant if check_tenant_id else mock.ANY,
mock.ANY,
transport_key_needed=False,
transport_key_id=None
)
mock_store_secret.assert_called_once_with(
self.secret_req.get('payload'),
self.secret_req.get('payload_content_type',
'application/octet-stream'),
self.secret_req.get('payload_content_encoding'),
expected,
None,
self.tenant if check_tenant_id else mock.ANY,
mock.ANY,
transport_key_needed=False,
transport_key_id=None
)
@mock.patch('barbican.plugin.resources.store_secret')
def _test_should_add_new_secret_one_step_with_tkey_id(
@ -341,18 +343,18 @@ class BaseSecretsResource(FunctionalTest):
expected = dict(self.secret_req)
expected['expiration'] = None
mock_store_secret\
.assert_called_once_with(
self.secret_req.get('payload'),
self.secret_req.get('payload_content_type',
'application/octet-stream'),
self.secret_req.get('payload_content_encoding'),
expected, None,
self.tenant if check_tenant_id else mock.ANY,
mock.ANY,
transport_key_needed=False,
transport_key_id=self.transport_key_id
)
mock_store_secret.assert_called_once_with(
self.secret_req.get('payload'),
self.secret_req.get('payload_content_type',
'application/octet-stream'),
self.secret_req.get('payload_content_encoding'),
expected,
None,
self.tenant if check_tenant_id else mock.ANY,
mock.ANY,
transport_key_needed=False,
transport_key_id=self.transport_key_id
)
def _test_should_add_new_secret_if_tenant_does_not_exist(self):
self.tenant_repo.get.return_value = None
@ -418,9 +420,10 @@ class BaseSecretsResource(FunctionalTest):
'mode': self.secret_mode,
'payload': big_text,
'payload_content_type': self.payload_content_type}
if self.payload_content_encoding:
self.secret_req['payload_content_encoding'] = \
self.payload_content_encoding
payload_encoding = self.payload_content_encoding
if payload_encoding:
self.secret_req['payload_content_encoding'] = payload_encoding
self.app.post_json('/secrets/', self.secret_req)
def _test_should_raise_due_to_payload_too_large(self):
@ -434,9 +437,10 @@ class BaseSecretsResource(FunctionalTest):
'mode': self.secret_mode,
'payload': big_text,
'payload_content_type': self.payload_content_type}
if self.payload_content_encoding:
self.secret_req['payload_content_encoding'] = \
self.payload_content_encoding
payload_encoding = self.payload_content_encoding
if payload_encoding:
self.secret_req['payload_content_encoding'] = payload_encoding
resp = self.app.post_json(
'/secrets/',
@ -451,11 +455,13 @@ class BaseSecretsResource(FunctionalTest):
'bit_length': self.secret_bit_length,
'mode': self.secret_mode,
'payload': ''}
if self.payload_content_type:
self.secret_req['payload_content_type'] = self.payload_content_type
if self.payload_content_encoding:
self.secret_req['payload_content_encoding'] = \
self.payload_content_encoding
payload_type = self.payload_content_type
payload_encoding = self.payload_content_encoding
if payload_type:
self.secret_req['payload_content_type'] = payload_type
if payload_encoding:
self.secret_req['payload_content_encoding'] = payload_encoding
resp = self.app.post_json(
'/secrets/',
@ -509,7 +515,7 @@ class WhenCreatingPlainTextSecretsUsingSecretsResource(BaseSecretsResource):
)
self.assertEqual(resp.status_int, 400)
#TODO(jwood) These tests are integration-style unit tests of the REST -> Pecan
# TODO(jwood) These tests are integration-style unit tests of the REST -> Pecan
# resources, which are more painful to test now that we have a two-tier
# plugin lookup framework. These tests should instead be re-located to
# unit tests of the secret_store.py and store_crypto.py modules. A separate
@ -779,14 +785,16 @@ class WhenGettingSecretsListUsingSecretsResource(FunctionalTest):
)
# Verify that the name is unquoted correctly in the
# secrets.on_get function prior to searching the repo.
self.secret_repo.get_by_create_date \
.assert_called_once_with(self.keystone_id,
offset_arg=u'{0}'.format(self.offset),
limit_arg=u'{0}'.format(self.limit),
suppress_exception=True,
name=self.name,
alg=None, mode=None,
bits=0)
self.secret_repo.get_by_create_date.assert_called_once_with(
self.keystone_id,
offset_arg=u'{0}'.format(self.offset),
limit_arg=u'{0}'.format(self.limit),
suppress_exception=True,
name=self.name,
alg=None,
mode=None,
bits=0
)
self.assertIn('secrets', resp.namespace)
secrets = resp.namespace['secrets']
@ -799,13 +807,16 @@ class WhenGettingSecretsListUsingSecretsResource(FunctionalTest):
dict((k, v) for k, v in self.params.items() if v is not None)
)
self.secret_repo.get_by_create_date \
.assert_called_once_with(self.keystone_id,
offset_arg=u'{0}'.format(self.offset),
limit_arg=u'{0}'.format(self.limit),
suppress_exception=True,
name='', alg=None, mode=None,
bits=0)
self.secret_repo.get_by_create_date.assert_called_once_with(
self.keystone_id,
offset_arg=u'{0}'.format(self.offset),
limit_arg=u'{0}'.format(self.limit),
suppress_exception=True,
name='',
alg=None,
mode=None,
bits=0
)
self.assertTrue('previous' in resp.namespace)
self.assertTrue('next' in resp.namespace)
@ -840,13 +851,16 @@ class WhenGettingSecretsListUsingSecretsResource(FunctionalTest):
dict((k, v) for k, v in self.params.items() if v is not None)
)
self.secret_repo.get_by_create_date \
.assert_called_once_with(self.keystone_id,
offset_arg=u'{0}'.format(self.offset),
limit_arg=u'{0}'.format(self.limit),
suppress_exception=True,
name='', alg=None, mode=None,
bits=0)
self.secret_repo.get_by_create_date.assert_called_once_with(
self.keystone_id,
offset_arg=u'{0}'.format(self.offset),
limit_arg=u'{0}'.format(self.limit),
suppress_exception=True,
name='',
alg=None,
mode=None,
bits=0
)
self.assertFalse('previous' in resp.namespace)
self.assertFalse('next' in resp.namespace)
@ -949,10 +963,10 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
'/secrets/{0}/'.format(self.secret.id),
headers={'Accept': 'application/json', 'Accept-Encoding': 'gzip'}
)
self.secret_repo \
.get.assert_called_once_with(entity_id=self.secret.id,
keystone_id=self.keystone_id,
suppress_exception=True)
self.secret_repo.get.assert_called_once_with(
entity_id=self.secret.id,
keystone_id=self.keystone_id,
suppress_exception=True)
self.assertEqual(resp.status_int, 200)
self.assertNotIn('content_encodings', resp.namespace)
@ -971,19 +985,20 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
headers={'Accept': 'text/plain'}
)
self.secret_repo \
.get.assert_called_once_with(entity_id=self.secret.id,
keystone_id=self.keystone_id,
suppress_exception=True)
self.secret_repo.get.assert_called_once_with(
entity_id=self.secret.id,
keystone_id=self.keystone_id,
suppress_exception=True)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, data)
mock_get_secret\
.assert_called_once_with('text/plain',
self.secret,
self.tenant,
None,
None)
mock_get_secret.assert_called_once_with(
'text/plain',
self.secret,
self.tenant,
None,
None
)
@mock.patch('barbican.plugin.resources.get_secret')
def test_should_get_secret_as_plain_with_twsk(self, mock_get_secret):
@ -997,19 +1012,20 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
headers={'Accept': 'text/plain'}
)
self.secret_repo \
.get.assert_called_once_with(entity_id=self.secret.id,
keystone_id=self.keystone_id,
suppress_exception=True)
self.secret_repo.get.assert_called_once_with(
entity_id=self.secret.id,
keystone_id=self.keystone_id,
suppress_exception=True)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, data)
mock_get_secret\
.assert_called_once_with('text/plain',
self.secret,
self.tenant,
twsk,
self.transport_key_model.transport_key)
mock_get_secret.assert_called_once_with(
'text/plain',
self.secret,
self.tenant,
twsk,
self.transport_key_model.transport_key
)
@mock.patch('barbican.plugin.resources.get_secret')
def test_should_throw_exception_for_get_when_twsk_but_no_tkey_id(
@ -1025,10 +1041,10 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
expect_errors=True
)
self.secret_repo \
.get.assert_called_once_with(entity_id=self.secret.id,
keystone_id=self.keystone_id,
suppress_exception=True)
self.secret_repo.get.assert_called_once_with(
entity_id=self.secret.id,
keystone_id=self.keystone_id,
suppress_exception=True)
self.assertEqual(resp.status_int, 400)
@mock.patch('barbican.plugin.resources.get_transport_key_id_for_retrieval')
@ -1042,10 +1058,10 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
headers={'Accept': 'application/json', 'Accept-Encoding': 'gzip'}
)
self.secret_repo \
.get.assert_called_once_with(entity_id=self.secret.id,
keystone_id=self.keystone_id,
suppress_exception=True)
self.secret_repo.get.assert_called_once_with(
entity_id=self.secret.id,
keystone_id=self.keystone_id,
suppress_exception=True)
self.assertEqual(resp.status_int, 200)
@ -1067,10 +1083,10 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
headers={'Accept': 'application/json', 'Accept-Encoding': 'gzip'}
)
self.secret_repo \
.get.assert_called_once_with(entity_id=self.secret.id,
keystone_id=self.keystone_id,
suppress_exception=True)
self.secret_repo.get.assert_called_once_with(
entity_id=self.secret.id,
keystone_id=self.keystone_id,
suppress_exception=True)
self.assertEqual(resp.status_int, 200)
@ -1103,12 +1119,13 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.assertEqual(resp.body, data)
mock_get_secret\
.assert_called_once_with('application/octet-stream',
self.secret,
self.tenant,
None,
None)
mock_get_secret.assert_called_once_with(
'application/octet-stream',
self.secret,
self.tenant,
None,
None
)
def test_should_throw_exception_for_get_when_secret_not_found(self):
self.secret_repo.get.return_value = None
@ -1140,11 +1157,15 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.assertEqual(resp.status_int, 204)
mock_store_secret\
.assert_called_once_with('plain text', 'text/plain', None,
self.secret.to_dict_fields(),
self.secret, self.tenant, mock.ANY,
transport_key_id=None)
mock_store_secret.assert_called_once_with(
'plain text',
'text/plain', None,
self.secret.to_dict_fields(),
self.secret,
self.tenant,
mock.ANY,
transport_key_id=None
)
@mock.patch('barbican.plugin.resources.store_secret')
def test_should_put_secret_as_plain_with_tkey_id(self, mock_store_secret):
@ -1159,11 +1180,15 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.assertEqual(resp.status_int, 204)
mock_store_secret\
.assert_called_once_with('plain text', 'text/plain', None,
self.secret.to_dict_fields(),
self.secret, self.tenant, mock.ANY,
transport_key_id=self.transport_key_id)
mock_store_secret.assert_called_once_with(
'plain text',
'text/plain', None,
self.secret.to_dict_fields(),
self.secret,
self.tenant,
mock.ANY,
transport_key_id=self.transport_key_id
)
@mock.patch('barbican.plugin.resources.store_secret')
def test_should_put_secret_as_binary(self, mock_store_secret):
@ -1180,12 +1205,16 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.assertEqual(resp.status_int, 204)
mock_store_secret\
.assert_called_once_with('plain text', 'application/octet-stream',
None,
self.secret.to_dict_fields(),
self.secret, self.tenant, mock.ANY,
transport_key_id=None)
mock_store_secret.assert_called_once_with(
'plain text',
'application/octet-stream',
None,
self.secret.to_dict_fields(),
self.secret,
self.tenant,
mock.ANY,
transport_key_id=None
)
@mock.patch('barbican.plugin.resources.store_secret')
def test_should_put_secret_as_binary_with_tkey_id(self, mock_store_secret):
@ -1203,12 +1232,16 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.assertEqual(resp.status_int, 204)
mock_store_secret\
.assert_called_once_with('plain text', 'application/octet-stream',
None,
self.secret.to_dict_fields(),
self.secret, self.tenant, mock.ANY,
transport_key_id=self.transport_key_id)
mock_store_secret.assert_called_once_with(
'plain text',
'application/octet-stream',
None,
self.secret.to_dict_fields(),
self.secret,
self.tenant,
mock.ANY,
transport_key_id=self.transport_key_id
)
@mock.patch('barbican.plugin.resources.store_secret')
def test_should_put_encoded_secret_as_binary(self, mock_store_secret):
@ -1226,11 +1259,15 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
self.assertEqual(resp.status_int, 204)
mock_store_secret\
.assert_called_once_with(payload, 'application/octet-stream',
'base64', self.secret.to_dict_fields(),
self.secret, self.tenant, mock.ANY,
transport_key_id=None)
mock_store_secret.assert_called_once_with(
payload,
'application/octet-stream',
'base64', self.secret.to_dict_fields(),
self.secret,
self.tenant,
mock.ANY,
transport_key_id=None
)
def test_should_raise_to_put_secret_with_unsupported_encoding(self):
self.secret.encrypted_data = []
@ -1329,8 +1366,9 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
'/secrets/{0}/'.format(self.secret.id)
)
mock_delete_secret\
.assert_called_once_with(self.secret, self.keystone_id, mock.ANY)
mock_delete_secret.assert_called_once_with(self.secret,
self.keystone_id,
mock.ANY)
def test_should_throw_exception_for_delete_when_secret_not_found(self):
self.secret_repo.get.return_value = None
@ -1340,7 +1378,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest):
expect_errors=True
)
self.assertEqual(resp.status_int, 404)
#Error response should have json content type
# Error response should have json content type
self.assertEqual(resp.content_type, "application/json")
@ -1389,8 +1427,7 @@ class WhenCreatingOrdersUsingOrdersResource(FunctionalTest):
self.order_req = {
'secret': {
'name': self.secret_name,
'payload_content_type':
self.secret_payload_content_type,
'payload_content_type': self.secret_payload_content_type,
'algorithm': self.secret_algorithm,
'bit_length': self.secret_bit_length,
'mode': self.secret_mode
@ -1404,9 +1441,8 @@ class WhenCreatingOrdersUsingOrdersResource(FunctionalTest):
)
self.assertEqual(resp.status_int, 202)
self.queue_resource.process_order \
.assert_called_once_with(order_id=None,
keystone_id=self.tenant_keystone_id)
self.queue_resource.process_order.assert_called_once_with(
order_id=None, keystone_id=self.tenant_keystone_id)
args, kwargs = self.order_repo.create_from.call_args
order = args[0]
@ -1434,8 +1470,7 @@ class WhenCreatingOrdersUsingOrdersResource(FunctionalTest):
self.unsupported_req = {
'secret': {
'name': self.secret_name,
'payload_content_type':
self.secret_payload_content_type,
'payload_content_type': self.secret_payload_content_type,
'algorithm': "not supported",
'bit_length': self.secret_bit_length,
'mode': self.secret_mode
@ -1517,11 +1552,12 @@ class WhenGettingOrdersListUsingOrdersResource(FunctionalTest):
def test_should_get_list_orders(self):
resp = self.app.get('/orders/', self.params)
self.order_repo.get_by_create_date \
.assert_called_once_with(self.keystone_id,
offset_arg=u'{0}'.format(self.offset),
limit_arg=u'{0}'.format(self.limit),
suppress_exception=True)
self.order_repo.get_by_create_date.assert_called_once_with(
self.keystone_id,
offset_arg=u'{0}'.format(self.offset),
limit_arg=u'{0}'.format(self.limit),
suppress_exception=True
)
self.assertTrue('previous' in resp.namespace)
self.assertTrue('next' in resp.namespace)
@ -1549,11 +1585,12 @@ class WhenGettingOrdersListUsingOrdersResource(FunctionalTest):
resp = self.app.get('/orders/', self.params)
self.order_repo.get_by_create_date \
.assert_called_once_with(self.keystone_id,
offset_arg=u'{0}'.format(self.offset),
limit_arg=u'{0}'.format(self.limit),
suppress_exception=True)
self.order_repo.get_by_create_date.assert_called_once_with(
self.keystone_id,
offset_arg=u'{0}'.format(self.offset),
limit_arg=u'{0}'.format(self.limit),
suppress_exception=True
)
self.assertFalse('previous' in resp.namespace)
self.assertFalse('next' in resp.namespace)
@ -1603,16 +1640,15 @@ class WhenGettingOrDeletingOrderUsingOrderResource(FunctionalTest):
def test_should_get_order(self):
self.app.get('/orders/{0}/'.format(self.order.id))
self.order_repo.get \
.assert_called_once_with(entity_id=self.order.id,
keystone_id=self.tenant_keystone_id,
suppress_exception=True)
self.order_repo.get.assert_called_once_with(
entity_id=self.order.id,
keystone_id=self.tenant_keystone_id,
suppress_exception=True)
def test_should_delete_order(self):
self.app.delete('/orders/{0}/'.format(self.order.id))
self.order_repo.delete_entity_by_id \
.assert_called_once_with(entity_id=self.order.id,
keystone_id=self.tenant_keystone_id)
self.order_repo.delete_entity_by_id.assert_called_once_with(
entity_id=self.order.id, keystone_id=self.tenant_keystone_id)
def test_should_throw_exception_for_get_when_order_not_found(self):
self.order_repo.get.return_value = None
@ -1630,7 +1666,7 @@ class WhenGettingOrDeletingOrderUsingOrderResource(FunctionalTest):
expect_errors=True
)
self.assertEqual(resp.status_int, 404)
#Error response should have json content type
# Error response should have json content type
self.assertEqual(resp.content_type, "application/json")
@ -1687,9 +1723,8 @@ class WhenCreatingTypeOrdersUsingOrdersResource(FunctionalTest):
)
self.assertEqual(resp.status_int, 202)
self.queue_resource.process_type_order \
.assert_called_once_with(order_id=None,
keystone_id=self.tenant_keystone_id)
self.queue_resource.process_type_order.assert_called_once_with(
order_id=None, keystone_id=self.tenant_keystone_id)
args, kwargs = self.order_repo.create_from.call_args
order = args[0]
@ -1940,19 +1975,18 @@ class WhenGettingOrDeletingContainerUsingContainerResource(FunctionalTest):
self.container.id
))
self.container_repo.get \
.assert_called_once_with(entity_id=self.container.id,
keystone_id=self.tenant_keystone_id,
suppress_exception=True)
self.container_repo.get.assert_called_once_with(
entity_id=self.container.id,
keystone_id=self.tenant_keystone_id,
suppress_exception=True)
def test_should_delete_container(self):
self.app.delete('/containers/{0}/'.format(
self.container.id
))
self.container_repo.delete_entity_by_id \
.assert_called_once_with(entity_id=self.container.id,
keystone_id=self.tenant_keystone_id)
self.container_repo.delete_entity_by_id.assert_called_once_with(
entity_id=self.container.id, keystone_id=self.tenant_keystone_id)
def test_should_throw_exception_for_get_when_container_not_found(self):
self.container_repo.get.return_value = None
@ -1969,7 +2003,7 @@ class WhenGettingOrDeletingContainerUsingContainerResource(FunctionalTest):
self.container.id
), expect_errors=True)
self.assertEqual(resp.status_int, 404)
#Error response should have json content type
# Error response should have json content type
self.assertEqual(resp.content_type, "application/json")
@ -2127,17 +2161,20 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest):
self.secret_repo = mock.MagicMock()
def test_should_get_consumer(self):
self.consumer_repo.get_by_container_id.return_value = \
([self.consumer], 0, 0, 1)
ret_val = ([self.consumer], 0, 0, 1)
self.consumer_repo.get_by_container_id.return_value = ret_val
resp = self.app.get('/containers/{0}/consumers/'.format(
self.container.id
))
self.assertEqual(resp.status_int, 200)
self.consumer_repo.get_by_container_id \
.assert_called_once_with(self.container.id,
limit_arg=None, offset_arg=0,
suppress_exception=True)
self.consumer_repo.get_by_container_id.assert_called_once_with(
self.container.id,
limit_arg=None,
offset_arg=0,
suppress_exception=True
)
self.assertEqual(self.consumer.name, resp.json['consumers'][0]['name'])
self.assertEqual(self.consumer.URL, resp.json['consumers'][0]['URL'])
@ -2165,8 +2202,7 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest):
self.assertEqual(resp.status_int, 404)
def test_should_get_no_consumers(self):
self.consumer_repo.get_by_container_id.return_value = \
([], 0, 0, 0)
self.consumer_repo.get_by_container_id.return_value = ([], 0, 0, 0)
resp = self.app.get('/containers/{0}/consumers/'.format(
self.container.id
))
@ -2177,9 +2213,8 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest):
self.container.id
), self.consumer_ref)
self.consumer_repo.delete_entity_by_id \
.assert_called_once_with(self.consumer.id,
self.tenant_keystone_id)
self.consumer_repo.delete_entity_by_id.assert_called_once_with(
self.consumer.id, self.tenant_keystone_id)
def test_should_fail_deleting_consumer_bad_json(self):
resp = self.app.delete(
@ -2197,7 +2232,7 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest):
), self.consumer_ref, expect_errors=True)
self.consumer_repo.get_by_values.return_value = old_return
self.assertEqual(resp.status_int, 404)
#Error response should have json content type
# Error response should have json content type
self.assertEqual(resp.content_type, "application/json")
def test_should_404_on_delete_when_consumer_not_found_later(self):
@ -2207,7 +2242,7 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest):
), self.consumer_ref, expect_errors=True)
self.consumer_repo.delete_entity_by_id.side_effect = None
self.assertEqual(resp.status_int, 404)
#Error response should have json content type
# Error response should have json content type
self.assertEqual(resp.content_type, "application/json")
@ -2262,11 +2297,12 @@ class WhenGettingContainersListUsingResource(FunctionalTest):
self.params
)
self.container_repo.get_by_create_date \
.assert_called_once_with(self.keystone_id,
offset_arg=u'{0}'.format(self.offset),
limit_arg=u'{0}'.format(self.limit),
suppress_exception=True)
self.container_repo.get_by_create_date.assert_called_once_with(
self.keystone_id,
offset_arg=u'{0}'.format(self.offset),
limit_arg=u'{0}'.format(self.limit),
suppress_exception=True
)
self.assertTrue('previous' in resp.namespace)
self.assertTrue('next' in resp.namespace)
@ -2300,11 +2336,12 @@ class WhenGettingContainersListUsingResource(FunctionalTest):
self.params
)
self.container_repo.get_by_create_date \
.assert_called_once_with(self.keystone_id,
offset_arg=u'{0}'.format(self.offset),
limit_arg=u'{0}'.format(self.limit),
suppress_exception=True)
self.container_repo.get_by_create_date.assert_called_once_with(
self.keystone_id,
offset_arg=u'{0}'.format(self.offset),
limit_arg=u'{0}'.format(self.limit),
suppress_exception=True
)
self.assertFalse('previous' in resp.namespace)
self.assertFalse('next' in resp.namespace)
@ -2313,7 +2350,6 @@ class WhenGettingContainersListUsingResource(FunctionalTest):
if limit_arg:
offset = int(offset_arg)
limit = int(limit_arg)
return '/containers' \
'?limit={0}&offset={1}'.format(limit, offset)
return '/containers?limit={0}&offset={1}'.format(limit, offset)
else:
return '/containers'

View File

@ -21,10 +21,9 @@ For typical-flow business logic tests of these classes, see the
import os
import testtools
import mock
from oslo.config import cfg
import testtools
from webob import exc
from barbican.api.controllers import consumers

View File

@ -110,8 +110,8 @@ class WhenGettingTransKeysListUsingTransportKeysResource(FunctionalTest):
self._init()
class RootController(object):
transport_keys = controllers.transportkeys.\
TransportKeysController(self.repo)
transport_keys = controllers.transportkeys.TransportKeysController(
self.repo)
return RootController()
@ -146,11 +146,12 @@ class WhenGettingTransKeysListUsingTransportKeysResource(FunctionalTest):
resp = self.app.get('/transport_keys/',
self.params)
self.repo.get_by_create_date \
.assert_called_once_with(plugin_name=None,
offset_arg=u'{0}'.format(self.offset),
limit_arg=u'{0}'.format(self.limit),
suppress_exception=True)
self.repo.get_by_create_date.assert_called_once_with(
plugin_name=None,
offset_arg=u'{0}'.format(self.offset),
limit_arg=u'{0}'.format(self.limit),
suppress_exception=True
)
self.assertTrue('previous' in resp.namespace)
self.assertTrue('next' in resp.namespace)
@ -180,11 +181,12 @@ class WhenGettingTransKeysListUsingTransportKeysResource(FunctionalTest):
resp = self.app.get('/transport_keys/',
self.params)
self.repo.get_by_create_date \
.assert_called_once_with(plugin_name=None,
offset_arg=u'{0}'.format(self.offset),
limit_arg=u'{0}'.format(self.limit),
suppress_exception=True)
self.repo.get_by_create_date.assert_called_once_with(
plugin_name=None,
offset_arg=u'{0}'.format(self.offset),
limit_arg=u'{0}'.format(self.limit),
suppress_exception=True
)
self.assertFalse('previous' in resp.namespace)
self.assertFalse('next' in resp.namespace)
@ -212,8 +214,8 @@ class WhenCreatingTransKeysListUsingTransportKeysResource(FunctionalTest):
self._init()
class RootController(object):
transport_keys = controllers.transportkeys.\
TransportKeysController(self.repo)
transport_keys = controllers.transportkeys.TransportKeysController(
self.repo)
return RootController()
@ -278,8 +280,8 @@ class WhenGettingOrDeletingTransKeyUsingTransportKeyResource(FunctionalTest):
self._init()
class RootController(object):
transport_keys = controllers.transportkeys.\
TransportKeysController(self.repo)
transport_keys = controllers.transportkeys.TransportKeysController(
self.repo)
return RootController()
@ -311,9 +313,8 @@ class WhenGettingOrDeletingTransKeyUsingTransportKeyResource(FunctionalTest):
def test_should_delete_transport_key(self):
self.app.delete('/transport_keys/{0}/'.format(self.tkey.id))
self.repo.delete_entity_by_id \
.assert_called_once_with(entity_id=self.tkey.id,
keystone_id=self.tenant_keystone_id)
self.repo.delete_entity_by_id.assert_called_once_with(
entity_id=self.tkey.id, keystone_id=self.tenant_keystone_id)
def test_should_throw_exception_for_delete_when_trans_key_not_found(self):
self.repo.delete_entity_by_id.side_effect = excep.NotFound(

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import testtools
from functionaltests.api import base
@ -22,8 +23,7 @@ class VersionDiscoveryTestCase(base.TestCase):
@testtools.skipIf(True, 'Skip until blueprint fix-version-api is complete')
def test_get_root_discovers_v1(self):
"""Covers retrieving version data for Barbican.
"""
"""Covers retrieving version data for Barbican."""
resp, body = self.client.get(' ')
body = json.loads(body)

View File

@ -68,16 +68,20 @@ class ConsumersTestCase(base.TestCase):
# Set up two secrets
secret_json_data = json.dumps(create_secret_data)
resp, body = self.client.post(
'/secrets', secret_json_data, headers={
'content-type': 'application/json'})
'/secrets',
secret_json_data,
headers={'content-type': 'application/json'}
)
self.assertEqual(resp.status, 201)
returned_data = json.loads(body)
secret_ref_1 = returned_data['secret_ref']
self.assertIsNotNone(secret_ref_1)
resp, body = self.client.post(
'/secrets', secret_json_data, headers={
'content-type': 'application/json'})
'/secrets',
secret_json_data,
headers={'content-type': 'application/json'}
)
self.assertEqual(resp.status, 201)
returned_data = json.loads(body)
@ -99,8 +103,10 @@ class ConsumersTestCase(base.TestCase):
self.container_id = container_ref.split('/')[-1]
def test_create_consumer(self):
"""Covers consumer creation. All of the data needed to
create the consumer is provided in a single POST.
"""Covers consumer creation.
All of the data needed to create the consumer is provided in a
single POST.
"""
json_data = json.dumps(create_consumer_data)
resp, body = self.client.post(
@ -114,12 +120,14 @@ class ConsumersTestCase(base.TestCase):
self.assertIn(create_consumer_data, consumer_data)
def test_delete_consumer(self):
"""Covers consumer deletion. A consumer is first created, and then
the consumer is deleted and verified to no longer exist.
"""Covers consumer deletion.
A consumer is first created, and then the consumer is deleted and
verified to no longer exist.
"""
json_data = json.dumps(create_consumer_data_for_delete)
#Register the consumer once
# Register the consumer once
resp, body = self.client.post(
'/containers/{0}/consumers'.format(self.container_id),
json_data, headers={'content-type': 'application/json'})
@ -130,7 +138,7 @@ class ConsumersTestCase(base.TestCase):
self.assertIsNotNone(consumer_data)
self.assertIn(create_consumer_data_for_delete, consumer_data)
#Delete the consumer
# Delete the consumer
resp, body = self.client.delete(
'/containers/{0}/consumers'.format(self.container_id),
body=json_data, headers={'content-type': 'application/json'})
@ -142,12 +150,10 @@ class ConsumersTestCase(base.TestCase):
self.assertNotIn(create_consumer_data_for_delete, consumer_data)
def test_recreate_consumer(self):
"""Covers consumer deletion. A consumer is first created, and then
the consumer is deleted and verified to no longer exist.
"""
"""Covers consumer recreation."""
json_data = json.dumps(create_consumer_data_for_recreate)
#Register the consumer once
# Register the consumer once
resp, body = self.client.post(
'/containers/{0}/consumers'.format(self.container_id),
json_data, headers={'content-type': 'application/json'})
@ -158,7 +164,7 @@ class ConsumersTestCase(base.TestCase):
self.assertIsNotNone(consumer_data)
self.assertIn(create_consumer_data_for_recreate, consumer_data)
#Delete the consumer
# Delete the consumer
resp, body = self.client.delete(
'/containers/{0}/consumers'.format(self.container_id),
body=json_data, headers={'content-type': 'application/json'})
@ -169,7 +175,7 @@ class ConsumersTestCase(base.TestCase):
self.assertIsNotNone(consumer_data)
self.assertNotIn(create_consumer_data_for_recreate, consumer_data)
#Register the consumer again
# Register the consumer again
resp, body = self.client.post(
'/containers/{0}/consumers'.format(self.container_id),
json_data, headers={'content-type': 'application/json'})
@ -181,12 +187,10 @@ class ConsumersTestCase(base.TestCase):
self.assertIn(create_consumer_data_for_recreate, consumer_data)
def test_create_consumer_is_idempotent(self):
"""Covers consumer deletion. A consumer is first created, and then
the consumer is deleted and verified to no longer exist.
"""
"""Covers checking that create consumer is idempotent."""
json_data = json.dumps(create_consumer_data_for_idempotency)
#Register the consumer once
# Register the consumer once
resp, body = self.client.post(
'/containers/{0}/consumers'.format(self.container_id),
json_data, headers={'content-type': 'application/json'})
@ -197,7 +201,7 @@ class ConsumersTestCase(base.TestCase):
self.assertIsNotNone(consumer_data)
self.assertIn(create_consumer_data_for_idempotency, consumer_data)
#Register the consumer again, without deleting it first
# Register the consumer again, without deleting it first
resp, body = self.client.post(
'/containers/{0}/consumers'.format(self.container_id),
json_data, headers={'content-type': 'application/json'})

View File

@ -15,9 +15,10 @@
import json
import re
from functionaltests.api import base
from tempest import exceptions
from functionaltests.api import base
create_secret_data = {
"name": "AES key",
"expiration": "2018-02-28T19:14:44.180394",
@ -48,8 +49,10 @@ class ContainersTestCase(base.TestCase):
def _create_a_secret(self):
secret_json_data = json.dumps(create_secret_data)
resp, body = self.client.post(
'/secrets', secret_json_data, headers={
'content-type': 'application/json'})
'/secrets',
secret_json_data,
headers={'content-type': 'application/json'}
)
self.assertEqual(resp.status, 201)
returned_data = json.loads(body)
@ -113,15 +116,19 @@ class ContainersTestCase(base.TestCase):
self.secret_id_2 = secret_ref_2.split('/')[-1]
def test_create_container(self):
"""Covers container creation. All of the data needed to
create the container is provided in a single POST.
"""Covers container creation.
All of the data needed to create the container is provided in a
single POST.
"""
container_ref = self._create_a_container()
self.assertIsNotNone(container_ref)
def test_delete_container(self):
"""Covers container deletion. A container is first created, and then
the container is deleted and verified to no longer exist.
"""Covers container deletion.
A container is first created, and then the container is deleted
and verified to no longer exist.
"""
# Create the container
container_ref = self._create_a_container()
@ -142,8 +149,9 @@ class ContainersTestCase(base.TestCase):
self._get_a_secret(self.secret_id_2)
def test_get_container(self):
"""Covers container retrieval. A container is first created, and then
the container is retrieved.
"""Covers container retrieval.
A container is first created, and then the container is retrieved.
"""
# Create a container
container_ref = self._create_a_container()
@ -153,7 +161,9 @@ class ContainersTestCase(base.TestCase):
self._get_a_container(container_id)
def test_list_containers(self):
"""Covers listing containers. A container is first created, and then
"""Covers listing containers.
A container is first created, and then
we check the container list to make sure it has a non-zero total and
has a `containers` element.
"""
@ -166,7 +176,9 @@ class ContainersTestCase(base.TestCase):
self.assertIsNotNone(list_data.get('containers'))
def test_containers_secret_refs_correctly_formatted(self):
"""Create a container (so we are guaranteed to have at least one), then
"""Correctly formated secret refs in a container
Create a container (so we are guaranteed to have at least one), then
retrieve that container and check the secret_ref formatting to make
sure "secret_ref" attributes contain proper HATEOAS URIs. Then do a
container list and verify the same for each container in the list.

View File

@ -31,8 +31,10 @@ create_order_data = {
class OrdersTestCase(base.TestCase):
def test_create_order(self):
"""Covers order creation. All of the data needed to
create the order is provided in a single POST.
"""Covers order creation.
All of the data needed to create the order is provided in a
single POST.
"""
json_data = json.dumps(create_order_data)
resp, body = self.client.post(

View File

@ -47,8 +47,10 @@ two_phase_payload_data = {
class SecretsTestCase(base.TestCase):
def test_create_secret_single_phase(self):
"""Covers single phase secret creation. All of the data needed to
create the secret, including payload, is provided in a single POST.
"""Covers single phase secret creation.
All of the data needed to create the secret, including payload,
is provided in a single POST.
"""
json_data = json.dumps(one_phase_create_data)
resp, body = self.client.post(
@ -61,9 +63,11 @@ class SecretsTestCase(base.TestCase):
self.assertIsNotNone(secret_ref)
def test_create_secret_two_phase(self):
"""Covers two phase secret creation. The first call, a POST, provides
the metadata about the secret - everything except the payload. A
subsequent call (PUT) provides the payload.
"""Covers two phase secret creation.
The first call, a POST, provides the metadata about the
secret - everything except the payload. A subsequent call (PUT)
provides the payload.
"""
# phase 1 - POST secret without payload
json_data = json.dumps(two_phase_create_data)