diff --git a/barbican/tests/api/test_resources.py b/barbican/tests/api/test_resources.py index 581d5b878..9071a9236 100644 --- a/barbican/tests/api/test_resources.py +++ b/barbican/tests/api/test_resources.py @@ -21,9 +21,9 @@ resource classes. For RBAC tests of these classes, see the import base64 import logging +import mimetypes import urllib -import mimetypes import mock import pecan import testtools @@ -44,9 +44,9 @@ LOG = logging.getLogger(__name__) def get_barbican_env(keystone_id): - """Create and return a barbican.context for use with - the RBAC decorator by injecting the provided - keystone_id + """Create and return a barbican.context for use with the RBAC decorator + + Injects the provided keystone_id. """ kwargs = {'roles': None, 'user': None, @@ -221,8 +221,8 @@ class BaseSecretsResource(FunctionalTest): if payload_content_type: self.secret_req['payload_content_type'] = payload_content_type if payload_content_encoding: - self.secret_req['payload_content_encoding'] = \ - payload_content_encoding + self.secret_req['payload_content_encoding'] = ( + payload_content_encoding) self.keystone_id = 'keystone1234' self.tenant_entity_id = 'tid1234' @@ -283,16 +283,18 @@ class BaseSecretsResource(FunctionalTest): expiration_raw = expiration_raw[:-6].replace('12', '17', 1) expiration_tz = timeutils.parse_isotime(expiration_raw.strip()) expected['expiration'] = timeutils.normalize_time(expiration_tz) - mock_store_secret\ - .assert_called_once_with( - self.secret_req.get('payload'), - self.secret_req.get('payload_content_type', - 'application/octet-stream'), - self.secret_req.get('payload_content_encoding'), - expected, None, self.tenant, mock.ANY, - transport_key_needed=False, - transport_key_id=None - ) + mock_store_secret.assert_called_once_with( + self.secret_req.get('payload'), + self.secret_req.get('payload_content_type', + 'application/octet-stream'), + self.secret_req.get('payload_content_encoding'), + expected, + None, + self.tenant, + mock.ANY, + transport_key_needed=False, + transport_key_id=None + ) @mock.patch('barbican.plugin.resources.store_secret') def _test_should_add_new_secret_one_step(self, mock_store_secret, @@ -312,18 +314,18 @@ class BaseSecretsResource(FunctionalTest): expected = dict(self.secret_req) expected['expiration'] = None - mock_store_secret\ - .assert_called_once_with( - self.secret_req.get('payload'), - self.secret_req.get('payload_content_type', - 'application/octet-stream'), - self.secret_req.get('payload_content_encoding'), - expected, None, - self.tenant if check_tenant_id else mock.ANY, - mock.ANY, - transport_key_needed=False, - transport_key_id=None - ) + mock_store_secret.assert_called_once_with( + self.secret_req.get('payload'), + self.secret_req.get('payload_content_type', + 'application/octet-stream'), + self.secret_req.get('payload_content_encoding'), + expected, + None, + self.tenant if check_tenant_id else mock.ANY, + mock.ANY, + transport_key_needed=False, + transport_key_id=None + ) @mock.patch('barbican.plugin.resources.store_secret') def _test_should_add_new_secret_one_step_with_tkey_id( @@ -341,18 +343,18 @@ class BaseSecretsResource(FunctionalTest): expected = dict(self.secret_req) expected['expiration'] = None - mock_store_secret\ - .assert_called_once_with( - self.secret_req.get('payload'), - self.secret_req.get('payload_content_type', - 'application/octet-stream'), - self.secret_req.get('payload_content_encoding'), - expected, None, - self.tenant if check_tenant_id else mock.ANY, - mock.ANY, - transport_key_needed=False, - transport_key_id=self.transport_key_id - ) + mock_store_secret.assert_called_once_with( + self.secret_req.get('payload'), + self.secret_req.get('payload_content_type', + 'application/octet-stream'), + self.secret_req.get('payload_content_encoding'), + expected, + None, + self.tenant if check_tenant_id else mock.ANY, + mock.ANY, + transport_key_needed=False, + transport_key_id=self.transport_key_id + ) def _test_should_add_new_secret_if_tenant_does_not_exist(self): self.tenant_repo.get.return_value = None @@ -418,9 +420,10 @@ class BaseSecretsResource(FunctionalTest): 'mode': self.secret_mode, 'payload': big_text, 'payload_content_type': self.payload_content_type} - if self.payload_content_encoding: - self.secret_req['payload_content_encoding'] = \ - self.payload_content_encoding + + payload_encoding = self.payload_content_encoding + if payload_encoding: + self.secret_req['payload_content_encoding'] = payload_encoding self.app.post_json('/secrets/', self.secret_req) def _test_should_raise_due_to_payload_too_large(self): @@ -434,9 +437,10 @@ class BaseSecretsResource(FunctionalTest): 'mode': self.secret_mode, 'payload': big_text, 'payload_content_type': self.payload_content_type} - if self.payload_content_encoding: - self.secret_req['payload_content_encoding'] = \ - self.payload_content_encoding + + payload_encoding = self.payload_content_encoding + if payload_encoding: + self.secret_req['payload_content_encoding'] = payload_encoding resp = self.app.post_json( '/secrets/', @@ -451,11 +455,13 @@ class BaseSecretsResource(FunctionalTest): 'bit_length': self.secret_bit_length, 'mode': self.secret_mode, 'payload': ''} - if self.payload_content_type: - self.secret_req['payload_content_type'] = self.payload_content_type - if self.payload_content_encoding: - self.secret_req['payload_content_encoding'] = \ - self.payload_content_encoding + + payload_type = self.payload_content_type + payload_encoding = self.payload_content_encoding + if payload_type: + self.secret_req['payload_content_type'] = payload_type + if payload_encoding: + self.secret_req['payload_content_encoding'] = payload_encoding resp = self.app.post_json( '/secrets/', @@ -509,7 +515,7 @@ class WhenCreatingPlainTextSecretsUsingSecretsResource(BaseSecretsResource): ) self.assertEqual(resp.status_int, 400) -#TODO(jwood) These tests are integration-style unit tests of the REST -> Pecan +# TODO(jwood) These tests are integration-style unit tests of the REST -> Pecan # resources, which are more painful to test now that we have a two-tier # plugin lookup framework. These tests should instead be re-located to # unit tests of the secret_store.py and store_crypto.py modules. A separate @@ -779,14 +785,16 @@ class WhenGettingSecretsListUsingSecretsResource(FunctionalTest): ) # Verify that the name is unquoted correctly in the # secrets.on_get function prior to searching the repo. - self.secret_repo.get_by_create_date \ - .assert_called_once_with(self.keystone_id, - offset_arg=u'{0}'.format(self.offset), - limit_arg=u'{0}'.format(self.limit), - suppress_exception=True, - name=self.name, - alg=None, mode=None, - bits=0) + self.secret_repo.get_by_create_date.assert_called_once_with( + self.keystone_id, + offset_arg=u'{0}'.format(self.offset), + limit_arg=u'{0}'.format(self.limit), + suppress_exception=True, + name=self.name, + alg=None, + mode=None, + bits=0 + ) self.assertIn('secrets', resp.namespace) secrets = resp.namespace['secrets'] @@ -799,13 +807,16 @@ class WhenGettingSecretsListUsingSecretsResource(FunctionalTest): dict((k, v) for k, v in self.params.items() if v is not None) ) - self.secret_repo.get_by_create_date \ - .assert_called_once_with(self.keystone_id, - offset_arg=u'{0}'.format(self.offset), - limit_arg=u'{0}'.format(self.limit), - suppress_exception=True, - name='', alg=None, mode=None, - bits=0) + self.secret_repo.get_by_create_date.assert_called_once_with( + self.keystone_id, + offset_arg=u'{0}'.format(self.offset), + limit_arg=u'{0}'.format(self.limit), + suppress_exception=True, + name='', + alg=None, + mode=None, + bits=0 + ) self.assertTrue('previous' in resp.namespace) self.assertTrue('next' in resp.namespace) @@ -840,13 +851,16 @@ class WhenGettingSecretsListUsingSecretsResource(FunctionalTest): dict((k, v) for k, v in self.params.items() if v is not None) ) - self.secret_repo.get_by_create_date \ - .assert_called_once_with(self.keystone_id, - offset_arg=u'{0}'.format(self.offset), - limit_arg=u'{0}'.format(self.limit), - suppress_exception=True, - name='', alg=None, mode=None, - bits=0) + self.secret_repo.get_by_create_date.assert_called_once_with( + self.keystone_id, + offset_arg=u'{0}'.format(self.offset), + limit_arg=u'{0}'.format(self.limit), + suppress_exception=True, + name='', + alg=None, + mode=None, + bits=0 + ) self.assertFalse('previous' in resp.namespace) self.assertFalse('next' in resp.namespace) @@ -949,10 +963,10 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest): '/secrets/{0}/'.format(self.secret.id), headers={'Accept': 'application/json', 'Accept-Encoding': 'gzip'} ) - self.secret_repo \ - .get.assert_called_once_with(entity_id=self.secret.id, - keystone_id=self.keystone_id, - suppress_exception=True) + self.secret_repo.get.assert_called_once_with( + entity_id=self.secret.id, + keystone_id=self.keystone_id, + suppress_exception=True) self.assertEqual(resp.status_int, 200) self.assertNotIn('content_encodings', resp.namespace) @@ -971,19 +985,20 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest): headers={'Accept': 'text/plain'} ) - self.secret_repo \ - .get.assert_called_once_with(entity_id=self.secret.id, - keystone_id=self.keystone_id, - suppress_exception=True) + self.secret_repo.get.assert_called_once_with( + entity_id=self.secret.id, + keystone_id=self.keystone_id, + suppress_exception=True) self.assertEqual(resp.status_int, 200) self.assertEqual(resp.body, data) - mock_get_secret\ - .assert_called_once_with('text/plain', - self.secret, - self.tenant, - None, - None) + mock_get_secret.assert_called_once_with( + 'text/plain', + self.secret, + self.tenant, + None, + None + ) @mock.patch('barbican.plugin.resources.get_secret') def test_should_get_secret_as_plain_with_twsk(self, mock_get_secret): @@ -997,19 +1012,20 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest): headers={'Accept': 'text/plain'} ) - self.secret_repo \ - .get.assert_called_once_with(entity_id=self.secret.id, - keystone_id=self.keystone_id, - suppress_exception=True) + self.secret_repo.get.assert_called_once_with( + entity_id=self.secret.id, + keystone_id=self.keystone_id, + suppress_exception=True) self.assertEqual(resp.status_int, 200) self.assertEqual(resp.body, data) - mock_get_secret\ - .assert_called_once_with('text/plain', - self.secret, - self.tenant, - twsk, - self.transport_key_model.transport_key) + mock_get_secret.assert_called_once_with( + 'text/plain', + self.secret, + self.tenant, + twsk, + self.transport_key_model.transport_key + ) @mock.patch('barbican.plugin.resources.get_secret') def test_should_throw_exception_for_get_when_twsk_but_no_tkey_id( @@ -1025,10 +1041,10 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest): expect_errors=True ) - self.secret_repo \ - .get.assert_called_once_with(entity_id=self.secret.id, - keystone_id=self.keystone_id, - suppress_exception=True) + self.secret_repo.get.assert_called_once_with( + entity_id=self.secret.id, + keystone_id=self.keystone_id, + suppress_exception=True) self.assertEqual(resp.status_int, 400) @mock.patch('barbican.plugin.resources.get_transport_key_id_for_retrieval') @@ -1042,10 +1058,10 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest): headers={'Accept': 'application/json', 'Accept-Encoding': 'gzip'} ) - self.secret_repo \ - .get.assert_called_once_with(entity_id=self.secret.id, - keystone_id=self.keystone_id, - suppress_exception=True) + self.secret_repo.get.assert_called_once_with( + entity_id=self.secret.id, + keystone_id=self.keystone_id, + suppress_exception=True) self.assertEqual(resp.status_int, 200) @@ -1067,10 +1083,10 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest): headers={'Accept': 'application/json', 'Accept-Encoding': 'gzip'} ) - self.secret_repo \ - .get.assert_called_once_with(entity_id=self.secret.id, - keystone_id=self.keystone_id, - suppress_exception=True) + self.secret_repo.get.assert_called_once_with( + entity_id=self.secret.id, + keystone_id=self.keystone_id, + suppress_exception=True) self.assertEqual(resp.status_int, 200) @@ -1103,12 +1119,13 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest): self.assertEqual(resp.body, data) - mock_get_secret\ - .assert_called_once_with('application/octet-stream', - self.secret, - self.tenant, - None, - None) + mock_get_secret.assert_called_once_with( + 'application/octet-stream', + self.secret, + self.tenant, + None, + None + ) def test_should_throw_exception_for_get_when_secret_not_found(self): self.secret_repo.get.return_value = None @@ -1140,11 +1157,15 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest): self.assertEqual(resp.status_int, 204) - mock_store_secret\ - .assert_called_once_with('plain text', 'text/plain', None, - self.secret.to_dict_fields(), - self.secret, self.tenant, mock.ANY, - transport_key_id=None) + mock_store_secret.assert_called_once_with( + 'plain text', + 'text/plain', None, + self.secret.to_dict_fields(), + self.secret, + self.tenant, + mock.ANY, + transport_key_id=None + ) @mock.patch('barbican.plugin.resources.store_secret') def test_should_put_secret_as_plain_with_tkey_id(self, mock_store_secret): @@ -1159,11 +1180,15 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest): self.assertEqual(resp.status_int, 204) - mock_store_secret\ - .assert_called_once_with('plain text', 'text/plain', None, - self.secret.to_dict_fields(), - self.secret, self.tenant, mock.ANY, - transport_key_id=self.transport_key_id) + mock_store_secret.assert_called_once_with( + 'plain text', + 'text/plain', None, + self.secret.to_dict_fields(), + self.secret, + self.tenant, + mock.ANY, + transport_key_id=self.transport_key_id + ) @mock.patch('barbican.plugin.resources.store_secret') def test_should_put_secret_as_binary(self, mock_store_secret): @@ -1180,12 +1205,16 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest): self.assertEqual(resp.status_int, 204) - mock_store_secret\ - .assert_called_once_with('plain text', 'application/octet-stream', - None, - self.secret.to_dict_fields(), - self.secret, self.tenant, mock.ANY, - transport_key_id=None) + mock_store_secret.assert_called_once_with( + 'plain text', + 'application/octet-stream', + None, + self.secret.to_dict_fields(), + self.secret, + self.tenant, + mock.ANY, + transport_key_id=None + ) @mock.patch('barbican.plugin.resources.store_secret') def test_should_put_secret_as_binary_with_tkey_id(self, mock_store_secret): @@ -1203,12 +1232,16 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest): self.assertEqual(resp.status_int, 204) - mock_store_secret\ - .assert_called_once_with('plain text', 'application/octet-stream', - None, - self.secret.to_dict_fields(), - self.secret, self.tenant, mock.ANY, - transport_key_id=self.transport_key_id) + mock_store_secret.assert_called_once_with( + 'plain text', + 'application/octet-stream', + None, + self.secret.to_dict_fields(), + self.secret, + self.tenant, + mock.ANY, + transport_key_id=self.transport_key_id + ) @mock.patch('barbican.plugin.resources.store_secret') def test_should_put_encoded_secret_as_binary(self, mock_store_secret): @@ -1226,11 +1259,15 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest): self.assertEqual(resp.status_int, 204) - mock_store_secret\ - .assert_called_once_with(payload, 'application/octet-stream', - 'base64', self.secret.to_dict_fields(), - self.secret, self.tenant, mock.ANY, - transport_key_id=None) + mock_store_secret.assert_called_once_with( + payload, + 'application/octet-stream', + 'base64', self.secret.to_dict_fields(), + self.secret, + self.tenant, + mock.ANY, + transport_key_id=None + ) def test_should_raise_to_put_secret_with_unsupported_encoding(self): self.secret.encrypted_data = [] @@ -1329,8 +1366,9 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest): '/secrets/{0}/'.format(self.secret.id) ) - mock_delete_secret\ - .assert_called_once_with(self.secret, self.keystone_id, mock.ANY) + mock_delete_secret.assert_called_once_with(self.secret, + self.keystone_id, + mock.ANY) def test_should_throw_exception_for_delete_when_secret_not_found(self): self.secret_repo.get.return_value = None @@ -1340,7 +1378,7 @@ class WhenGettingPuttingOrDeletingSecretUsingSecretResource(FunctionalTest): expect_errors=True ) self.assertEqual(resp.status_int, 404) - #Error response should have json content type + # Error response should have json content type self.assertEqual(resp.content_type, "application/json") @@ -1389,8 +1427,7 @@ class WhenCreatingOrdersUsingOrdersResource(FunctionalTest): self.order_req = { 'secret': { 'name': self.secret_name, - 'payload_content_type': - self.secret_payload_content_type, + 'payload_content_type': self.secret_payload_content_type, 'algorithm': self.secret_algorithm, 'bit_length': self.secret_bit_length, 'mode': self.secret_mode @@ -1404,9 +1441,8 @@ class WhenCreatingOrdersUsingOrdersResource(FunctionalTest): ) self.assertEqual(resp.status_int, 202) - self.queue_resource.process_order \ - .assert_called_once_with(order_id=None, - keystone_id=self.tenant_keystone_id) + self.queue_resource.process_order.assert_called_once_with( + order_id=None, keystone_id=self.tenant_keystone_id) args, kwargs = self.order_repo.create_from.call_args order = args[0] @@ -1434,8 +1470,7 @@ class WhenCreatingOrdersUsingOrdersResource(FunctionalTest): self.unsupported_req = { 'secret': { 'name': self.secret_name, - 'payload_content_type': - self.secret_payload_content_type, + 'payload_content_type': self.secret_payload_content_type, 'algorithm': "not supported", 'bit_length': self.secret_bit_length, 'mode': self.secret_mode @@ -1517,11 +1552,12 @@ class WhenGettingOrdersListUsingOrdersResource(FunctionalTest): def test_should_get_list_orders(self): resp = self.app.get('/orders/', self.params) - self.order_repo.get_by_create_date \ - .assert_called_once_with(self.keystone_id, - offset_arg=u'{0}'.format(self.offset), - limit_arg=u'{0}'.format(self.limit), - suppress_exception=True) + self.order_repo.get_by_create_date.assert_called_once_with( + self.keystone_id, + offset_arg=u'{0}'.format(self.offset), + limit_arg=u'{0}'.format(self.limit), + suppress_exception=True + ) self.assertTrue('previous' in resp.namespace) self.assertTrue('next' in resp.namespace) @@ -1549,11 +1585,12 @@ class WhenGettingOrdersListUsingOrdersResource(FunctionalTest): resp = self.app.get('/orders/', self.params) - self.order_repo.get_by_create_date \ - .assert_called_once_with(self.keystone_id, - offset_arg=u'{0}'.format(self.offset), - limit_arg=u'{0}'.format(self.limit), - suppress_exception=True) + self.order_repo.get_by_create_date.assert_called_once_with( + self.keystone_id, + offset_arg=u'{0}'.format(self.offset), + limit_arg=u'{0}'.format(self.limit), + suppress_exception=True + ) self.assertFalse('previous' in resp.namespace) self.assertFalse('next' in resp.namespace) @@ -1603,16 +1640,15 @@ class WhenGettingOrDeletingOrderUsingOrderResource(FunctionalTest): def test_should_get_order(self): self.app.get('/orders/{0}/'.format(self.order.id)) - self.order_repo.get \ - .assert_called_once_with(entity_id=self.order.id, - keystone_id=self.tenant_keystone_id, - suppress_exception=True) + self.order_repo.get.assert_called_once_with( + entity_id=self.order.id, + keystone_id=self.tenant_keystone_id, + suppress_exception=True) def test_should_delete_order(self): self.app.delete('/orders/{0}/'.format(self.order.id)) - self.order_repo.delete_entity_by_id \ - .assert_called_once_with(entity_id=self.order.id, - keystone_id=self.tenant_keystone_id) + self.order_repo.delete_entity_by_id.assert_called_once_with( + entity_id=self.order.id, keystone_id=self.tenant_keystone_id) def test_should_throw_exception_for_get_when_order_not_found(self): self.order_repo.get.return_value = None @@ -1630,7 +1666,7 @@ class WhenGettingOrDeletingOrderUsingOrderResource(FunctionalTest): expect_errors=True ) self.assertEqual(resp.status_int, 404) - #Error response should have json content type + # Error response should have json content type self.assertEqual(resp.content_type, "application/json") @@ -1687,9 +1723,8 @@ class WhenCreatingTypeOrdersUsingOrdersResource(FunctionalTest): ) self.assertEqual(resp.status_int, 202) - self.queue_resource.process_type_order \ - .assert_called_once_with(order_id=None, - keystone_id=self.tenant_keystone_id) + self.queue_resource.process_type_order.assert_called_once_with( + order_id=None, keystone_id=self.tenant_keystone_id) args, kwargs = self.order_repo.create_from.call_args order = args[0] @@ -1940,19 +1975,18 @@ class WhenGettingOrDeletingContainerUsingContainerResource(FunctionalTest): self.container.id )) - self.container_repo.get \ - .assert_called_once_with(entity_id=self.container.id, - keystone_id=self.tenant_keystone_id, - suppress_exception=True) + self.container_repo.get.assert_called_once_with( + entity_id=self.container.id, + keystone_id=self.tenant_keystone_id, + suppress_exception=True) def test_should_delete_container(self): self.app.delete('/containers/{0}/'.format( self.container.id )) - self.container_repo.delete_entity_by_id \ - .assert_called_once_with(entity_id=self.container.id, - keystone_id=self.tenant_keystone_id) + self.container_repo.delete_entity_by_id.assert_called_once_with( + entity_id=self.container.id, keystone_id=self.tenant_keystone_id) def test_should_throw_exception_for_get_when_container_not_found(self): self.container_repo.get.return_value = None @@ -1969,7 +2003,7 @@ class WhenGettingOrDeletingContainerUsingContainerResource(FunctionalTest): self.container.id ), expect_errors=True) self.assertEqual(resp.status_int, 404) - #Error response should have json content type + # Error response should have json content type self.assertEqual(resp.content_type, "application/json") @@ -2127,17 +2161,20 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest): self.secret_repo = mock.MagicMock() def test_should_get_consumer(self): - self.consumer_repo.get_by_container_id.return_value = \ - ([self.consumer], 0, 0, 1) + ret_val = ([self.consumer], 0, 0, 1) + self.consumer_repo.get_by_container_id.return_value = ret_val + resp = self.app.get('/containers/{0}/consumers/'.format( self.container.id )) self.assertEqual(resp.status_int, 200) - self.consumer_repo.get_by_container_id \ - .assert_called_once_with(self.container.id, - limit_arg=None, offset_arg=0, - suppress_exception=True) + self.consumer_repo.get_by_container_id.assert_called_once_with( + self.container.id, + limit_arg=None, + offset_arg=0, + suppress_exception=True + ) self.assertEqual(self.consumer.name, resp.json['consumers'][0]['name']) self.assertEqual(self.consumer.URL, resp.json['consumers'][0]['URL']) @@ -2165,8 +2202,7 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest): self.assertEqual(resp.status_int, 404) def test_should_get_no_consumers(self): - self.consumer_repo.get_by_container_id.return_value = \ - ([], 0, 0, 0) + self.consumer_repo.get_by_container_id.return_value = ([], 0, 0, 0) resp = self.app.get('/containers/{0}/consumers/'.format( self.container.id )) @@ -2177,9 +2213,8 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest): self.container.id ), self.consumer_ref) - self.consumer_repo.delete_entity_by_id \ - .assert_called_once_with(self.consumer.id, - self.tenant_keystone_id) + self.consumer_repo.delete_entity_by_id.assert_called_once_with( + self.consumer.id, self.tenant_keystone_id) def test_should_fail_deleting_consumer_bad_json(self): resp = self.app.delete( @@ -2197,7 +2232,7 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest): ), self.consumer_ref, expect_errors=True) self.consumer_repo.get_by_values.return_value = old_return self.assertEqual(resp.status_int, 404) - #Error response should have json content type + # Error response should have json content type self.assertEqual(resp.content_type, "application/json") def test_should_404_on_delete_when_consumer_not_found_later(self): @@ -2207,7 +2242,7 @@ class WhenGettingOrDeletingConsumersUsingConsumerResource(FunctionalTest): ), self.consumer_ref, expect_errors=True) self.consumer_repo.delete_entity_by_id.side_effect = None self.assertEqual(resp.status_int, 404) - #Error response should have json content type + # Error response should have json content type self.assertEqual(resp.content_type, "application/json") @@ -2262,11 +2297,12 @@ class WhenGettingContainersListUsingResource(FunctionalTest): self.params ) - self.container_repo.get_by_create_date \ - .assert_called_once_with(self.keystone_id, - offset_arg=u'{0}'.format(self.offset), - limit_arg=u'{0}'.format(self.limit), - suppress_exception=True) + self.container_repo.get_by_create_date.assert_called_once_with( + self.keystone_id, + offset_arg=u'{0}'.format(self.offset), + limit_arg=u'{0}'.format(self.limit), + suppress_exception=True + ) self.assertTrue('previous' in resp.namespace) self.assertTrue('next' in resp.namespace) @@ -2300,11 +2336,12 @@ class WhenGettingContainersListUsingResource(FunctionalTest): self.params ) - self.container_repo.get_by_create_date \ - .assert_called_once_with(self.keystone_id, - offset_arg=u'{0}'.format(self.offset), - limit_arg=u'{0}'.format(self.limit), - suppress_exception=True) + self.container_repo.get_by_create_date.assert_called_once_with( + self.keystone_id, + offset_arg=u'{0}'.format(self.offset), + limit_arg=u'{0}'.format(self.limit), + suppress_exception=True + ) self.assertFalse('previous' in resp.namespace) self.assertFalse('next' in resp.namespace) @@ -2313,7 +2350,6 @@ class WhenGettingContainersListUsingResource(FunctionalTest): if limit_arg: offset = int(offset_arg) limit = int(limit_arg) - return '/containers' \ - '?limit={0}&offset={1}'.format(limit, offset) + return '/containers?limit={0}&offset={1}'.format(limit, offset) else: return '/containers' diff --git a/barbican/tests/api/test_resources_policy.py b/barbican/tests/api/test_resources_policy.py index 73600c2b0..94c7dbf75 100644 --- a/barbican/tests/api/test_resources_policy.py +++ b/barbican/tests/api/test_resources_policy.py @@ -21,10 +21,9 @@ For typical-flow business logic tests of these classes, see the import os -import testtools - import mock from oslo.config import cfg +import testtools from webob import exc from barbican.api.controllers import consumers diff --git a/barbican/tests/api/test_transport_keys_resource.py b/barbican/tests/api/test_transport_keys_resource.py index 33c8c9974..51f25ccc5 100644 --- a/barbican/tests/api/test_transport_keys_resource.py +++ b/barbican/tests/api/test_transport_keys_resource.py @@ -110,8 +110,8 @@ class WhenGettingTransKeysListUsingTransportKeysResource(FunctionalTest): self._init() class RootController(object): - transport_keys = controllers.transportkeys.\ - TransportKeysController(self.repo) + transport_keys = controllers.transportkeys.TransportKeysController( + self.repo) return RootController() @@ -146,11 +146,12 @@ class WhenGettingTransKeysListUsingTransportKeysResource(FunctionalTest): resp = self.app.get('/transport_keys/', self.params) - self.repo.get_by_create_date \ - .assert_called_once_with(plugin_name=None, - offset_arg=u'{0}'.format(self.offset), - limit_arg=u'{0}'.format(self.limit), - suppress_exception=True) + self.repo.get_by_create_date.assert_called_once_with( + plugin_name=None, + offset_arg=u'{0}'.format(self.offset), + limit_arg=u'{0}'.format(self.limit), + suppress_exception=True + ) self.assertTrue('previous' in resp.namespace) self.assertTrue('next' in resp.namespace) @@ -180,11 +181,12 @@ class WhenGettingTransKeysListUsingTransportKeysResource(FunctionalTest): resp = self.app.get('/transport_keys/', self.params) - self.repo.get_by_create_date \ - .assert_called_once_with(plugin_name=None, - offset_arg=u'{0}'.format(self.offset), - limit_arg=u'{0}'.format(self.limit), - suppress_exception=True) + self.repo.get_by_create_date.assert_called_once_with( + plugin_name=None, + offset_arg=u'{0}'.format(self.offset), + limit_arg=u'{0}'.format(self.limit), + suppress_exception=True + ) self.assertFalse('previous' in resp.namespace) self.assertFalse('next' in resp.namespace) @@ -212,8 +214,8 @@ class WhenCreatingTransKeysListUsingTransportKeysResource(FunctionalTest): self._init() class RootController(object): - transport_keys = controllers.transportkeys.\ - TransportKeysController(self.repo) + transport_keys = controllers.transportkeys.TransportKeysController( + self.repo) return RootController() @@ -278,8 +280,8 @@ class WhenGettingOrDeletingTransKeyUsingTransportKeyResource(FunctionalTest): self._init() class RootController(object): - transport_keys = controllers.transportkeys.\ - TransportKeysController(self.repo) + transport_keys = controllers.transportkeys.TransportKeysController( + self.repo) return RootController() @@ -311,9 +313,8 @@ class WhenGettingOrDeletingTransKeyUsingTransportKeyResource(FunctionalTest): def test_should_delete_transport_key(self): self.app.delete('/transport_keys/{0}/'.format(self.tkey.id)) - self.repo.delete_entity_by_id \ - .assert_called_once_with(entity_id=self.tkey.id, - keystone_id=self.tenant_keystone_id) + self.repo.delete_entity_by_id.assert_called_once_with( + entity_id=self.tkey.id, keystone_id=self.tenant_keystone_id) def test_should_throw_exception_for_delete_when_trans_key_not_found(self): self.repo.delete_entity_by_id.side_effect = excep.NotFound( diff --git a/functionaltests/api/test_versions.py b/functionaltests/api/test_versions.py index da3582c5f..044c78844 100644 --- a/functionaltests/api/test_versions.py +++ b/functionaltests/api/test_versions.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import json + import testtools from functionaltests.api import base @@ -22,8 +23,7 @@ class VersionDiscoveryTestCase(base.TestCase): @testtools.skipIf(True, 'Skip until blueprint fix-version-api is complete') def test_get_root_discovers_v1(self): - """Covers retrieving version data for Barbican. - """ + """Covers retrieving version data for Barbican.""" resp, body = self.client.get(' ') body = json.loads(body) diff --git a/functionaltests/api/v1/test_consumers.py b/functionaltests/api/v1/test_consumers.py index f33dfefa9..cf5fb5159 100644 --- a/functionaltests/api/v1/test_consumers.py +++ b/functionaltests/api/v1/test_consumers.py @@ -68,16 +68,20 @@ class ConsumersTestCase(base.TestCase): # Set up two secrets secret_json_data = json.dumps(create_secret_data) resp, body = self.client.post( - '/secrets', secret_json_data, headers={ - 'content-type': 'application/json'}) + '/secrets', + secret_json_data, + headers={'content-type': 'application/json'} + ) self.assertEqual(resp.status, 201) returned_data = json.loads(body) secret_ref_1 = returned_data['secret_ref'] self.assertIsNotNone(secret_ref_1) resp, body = self.client.post( - '/secrets', secret_json_data, headers={ - 'content-type': 'application/json'}) + '/secrets', + secret_json_data, + headers={'content-type': 'application/json'} + ) self.assertEqual(resp.status, 201) returned_data = json.loads(body) @@ -99,8 +103,10 @@ class ConsumersTestCase(base.TestCase): self.container_id = container_ref.split('/')[-1] def test_create_consumer(self): - """Covers consumer creation. All of the data needed to - create the consumer is provided in a single POST. + """Covers consumer creation. + + All of the data needed to create the consumer is provided in a + single POST. """ json_data = json.dumps(create_consumer_data) resp, body = self.client.post( @@ -114,12 +120,14 @@ class ConsumersTestCase(base.TestCase): self.assertIn(create_consumer_data, consumer_data) def test_delete_consumer(self): - """Covers consumer deletion. A consumer is first created, and then - the consumer is deleted and verified to no longer exist. + """Covers consumer deletion. + + A consumer is first created, and then the consumer is deleted and + verified to no longer exist. """ json_data = json.dumps(create_consumer_data_for_delete) - #Register the consumer once + # Register the consumer once resp, body = self.client.post( '/containers/{0}/consumers'.format(self.container_id), json_data, headers={'content-type': 'application/json'}) @@ -130,7 +138,7 @@ class ConsumersTestCase(base.TestCase): self.assertIsNotNone(consumer_data) self.assertIn(create_consumer_data_for_delete, consumer_data) - #Delete the consumer + # Delete the consumer resp, body = self.client.delete( '/containers/{0}/consumers'.format(self.container_id), body=json_data, headers={'content-type': 'application/json'}) @@ -142,12 +150,10 @@ class ConsumersTestCase(base.TestCase): self.assertNotIn(create_consumer_data_for_delete, consumer_data) def test_recreate_consumer(self): - """Covers consumer deletion. A consumer is first created, and then - the consumer is deleted and verified to no longer exist. - """ + """Covers consumer recreation.""" json_data = json.dumps(create_consumer_data_for_recreate) - #Register the consumer once + # Register the consumer once resp, body = self.client.post( '/containers/{0}/consumers'.format(self.container_id), json_data, headers={'content-type': 'application/json'}) @@ -158,7 +164,7 @@ class ConsumersTestCase(base.TestCase): self.assertIsNotNone(consumer_data) self.assertIn(create_consumer_data_for_recreate, consumer_data) - #Delete the consumer + # Delete the consumer resp, body = self.client.delete( '/containers/{0}/consumers'.format(self.container_id), body=json_data, headers={'content-type': 'application/json'}) @@ -169,7 +175,7 @@ class ConsumersTestCase(base.TestCase): self.assertIsNotNone(consumer_data) self.assertNotIn(create_consumer_data_for_recreate, consumer_data) - #Register the consumer again + # Register the consumer again resp, body = self.client.post( '/containers/{0}/consumers'.format(self.container_id), json_data, headers={'content-type': 'application/json'}) @@ -181,12 +187,10 @@ class ConsumersTestCase(base.TestCase): self.assertIn(create_consumer_data_for_recreate, consumer_data) def test_create_consumer_is_idempotent(self): - """Covers consumer deletion. A consumer is first created, and then - the consumer is deleted and verified to no longer exist. - """ + """Covers checking that create consumer is idempotent.""" json_data = json.dumps(create_consumer_data_for_idempotency) - #Register the consumer once + # Register the consumer once resp, body = self.client.post( '/containers/{0}/consumers'.format(self.container_id), json_data, headers={'content-type': 'application/json'}) @@ -197,7 +201,7 @@ class ConsumersTestCase(base.TestCase): self.assertIsNotNone(consumer_data) self.assertIn(create_consumer_data_for_idempotency, consumer_data) - #Register the consumer again, without deleting it first + # Register the consumer again, without deleting it first resp, body = self.client.post( '/containers/{0}/consumers'.format(self.container_id), json_data, headers={'content-type': 'application/json'}) diff --git a/functionaltests/api/v1/test_containers.py b/functionaltests/api/v1/test_containers.py index b38c6ee6a..e0d914305 100644 --- a/functionaltests/api/v1/test_containers.py +++ b/functionaltests/api/v1/test_containers.py @@ -15,9 +15,10 @@ import json import re -from functionaltests.api import base from tempest import exceptions +from functionaltests.api import base + create_secret_data = { "name": "AES key", "expiration": "2018-02-28T19:14:44.180394", @@ -48,8 +49,10 @@ class ContainersTestCase(base.TestCase): def _create_a_secret(self): secret_json_data = json.dumps(create_secret_data) resp, body = self.client.post( - '/secrets', secret_json_data, headers={ - 'content-type': 'application/json'}) + '/secrets', + secret_json_data, + headers={'content-type': 'application/json'} + ) self.assertEqual(resp.status, 201) returned_data = json.loads(body) @@ -113,15 +116,19 @@ class ContainersTestCase(base.TestCase): self.secret_id_2 = secret_ref_2.split('/')[-1] def test_create_container(self): - """Covers container creation. All of the data needed to - create the container is provided in a single POST. + """Covers container creation. + + All of the data needed to create the container is provided in a + single POST. """ container_ref = self._create_a_container() self.assertIsNotNone(container_ref) def test_delete_container(self): - """Covers container deletion. A container is first created, and then - the container is deleted and verified to no longer exist. + """Covers container deletion. + + A container is first created, and then the container is deleted + and verified to no longer exist. """ # Create the container container_ref = self._create_a_container() @@ -142,8 +149,9 @@ class ContainersTestCase(base.TestCase): self._get_a_secret(self.secret_id_2) def test_get_container(self): - """Covers container retrieval. A container is first created, and then - the container is retrieved. + """Covers container retrieval. + + A container is first created, and then the container is retrieved. """ # Create a container container_ref = self._create_a_container() @@ -153,7 +161,9 @@ class ContainersTestCase(base.TestCase): self._get_a_container(container_id) def test_list_containers(self): - """Covers listing containers. A container is first created, and then + """Covers listing containers. + + A container is first created, and then we check the container list to make sure it has a non-zero total and has a `containers` element. """ @@ -166,7 +176,9 @@ class ContainersTestCase(base.TestCase): self.assertIsNotNone(list_data.get('containers')) def test_containers_secret_refs_correctly_formatted(self): - """Create a container (so we are guaranteed to have at least one), then + """Correctly formated secret refs in a container + + Create a container (so we are guaranteed to have at least one), then retrieve that container and check the secret_ref formatting to make sure "secret_ref" attributes contain proper HATEOAS URIs. Then do a container list and verify the same for each container in the list. diff --git a/functionaltests/api/v1/test_orders.py b/functionaltests/api/v1/test_orders.py index ac30f565b..a82e720db 100644 --- a/functionaltests/api/v1/test_orders.py +++ b/functionaltests/api/v1/test_orders.py @@ -31,8 +31,10 @@ create_order_data = { class OrdersTestCase(base.TestCase): def test_create_order(self): - """Covers order creation. All of the data needed to - create the order is provided in a single POST. + """Covers order creation. + + All of the data needed to create the order is provided in a + single POST. """ json_data = json.dumps(create_order_data) resp, body = self.client.post( diff --git a/functionaltests/api/v1/test_secrets.py b/functionaltests/api/v1/test_secrets.py index 0b310f58e..cc1cab7ad 100644 --- a/functionaltests/api/v1/test_secrets.py +++ b/functionaltests/api/v1/test_secrets.py @@ -47,8 +47,10 @@ two_phase_payload_data = { class SecretsTestCase(base.TestCase): def test_create_secret_single_phase(self): - """Covers single phase secret creation. All of the data needed to - create the secret, including payload, is provided in a single POST. + """Covers single phase secret creation. + + All of the data needed to create the secret, including payload, + is provided in a single POST. """ json_data = json.dumps(one_phase_create_data) resp, body = self.client.post( @@ -61,9 +63,11 @@ class SecretsTestCase(base.TestCase): self.assertIsNotNone(secret_ref) def test_create_secret_two_phase(self): - """Covers two phase secret creation. The first call, a POST, provides - the metadata about the secret - everything except the payload. A - subsequent call (PUT) provides the payload. + """Covers two phase secret creation. + + The first call, a POST, provides the metadata about the + secret - everything except the payload. A subsequent call (PUT) + provides the payload. """ # phase 1 - POST secret without payload json_data = json.dumps(two_phase_create_data)