diff --git a/barbicanclient/secrets.py b/barbicanclient/secrets.py index f4dda80e..81ba1d7e 100644 --- a/barbicanclient/secrets.py +++ b/barbicanclient/secrets.py @@ -139,6 +139,12 @@ class SecretManager(base.BaseEntityManager): raise ValueError('secret_ref is required.') if not content_type: secret = self.get(secret_ref) + if secret.content_types is None: + raise ValueError('Secret has no encrypted data to decrypt.') + if 'default' not in secret.content_types: + raise ValueError("Must specify decrypt content-type as " + "secret does not specify a 'default' " + "content-type.") content_type = secret.content_types['default'] headers = {'Accept': content_type} return self.api.get_raw(secret_ref, headers) diff --git a/barbicanclient/test/test_client.py b/barbicanclient/test/test_client.py index f1afeed9..0fb878a9 100644 --- a/barbicanclient/test/test_client.py +++ b/barbicanclient/test/test_client.py @@ -33,20 +33,27 @@ class FakeAuth(object): class FakeResp(object): - def __init__(self, status_code, response_dict): + def __init__(self, status_code, response_dict=None, content=None): self.status_code = status_code self.response_dict = response_dict + self.content = content def json(self): + if self.response_dict is None: + return None resp = self.response_dict resp['title'] = 'some title here' return resp + def content(self): + return self.content + class SecretData(object): def __init__(self): self.name = 'Self destruction sequence' self.payload = 'the magic words are squeamish ossifrage' + self.payload_content_type = 'text/plain' self.content = 'text/plain' self.algorithm = 'AES' self.created = str(timeutils.utcnow()) @@ -56,13 +63,15 @@ class SecretData(object): 'algorithm': self.algorithm, 'created': self.created} - def get_dict(self, secret_ref): - sdict = self.secret_dict - sdict['secret_ref'] = secret_ref - return sdict + def get_dict(self, secret_ref, content_types_dict=None): + secret_dict = self.secret_dict + secret_dict['secret_ref'] = secret_ref + if content_types_dict: + secret_dict['content_types'] = content_types_dict + return secret_dict -class WhenTestingClient(unittest.TestCase): +class WhenTestingClientInit(unittest.TestCase): def setUp(self): self.auth_endpoint = 'https://localhost:5000/v2.0/' self.auth_token = 'fake_auth_token' @@ -125,6 +134,86 @@ class WhenTestingClient(unittest.TestCase): c._check_status_code(resp) +class WhenTestingClientWithSession(unittest.TestCase): + def setUp(self): + self.endpoint = 'https://localhost:9311/v1/' + self.tenant_id = '1234567' + + self.entity = 'dummy-entity' + self.entity_base = self.endpoint + self.tenant_id + "/" + self.entity + "/" + self.entity_href = self.entity_base + '1234' + + self.entity_name = 'name' + self.entity_dict = {'name': self.entity_name} + + self.session = mock.MagicMock() + + self.client = client.Client(session=self.session, + endpoint=self.endpoint, + tenant_id=self.tenant_id) + + def test_should_post(self): + self.session.post.return_value = FakeResp(200, {'entity_ref': + self.entity_href}) + + resp_dict = self.client.post(self.entity, self.entity_dict) + + self.assertEqual(self.entity_href, resp_dict['entity_ref']) + + # Verify the correct URL was used to make the call. + args, kwargs = self.session.post.call_args + url = args[0] + self.assertEqual(self.entity_base, url) + + # Verify that correct information was sent in the call. + data = jsonutils.loads(kwargs['data']) + self.assertEqual(self.entity_name, data['name']) + + def test_should_get(self): + self.session.get.return_value = FakeResp(200, {'name': + self.entity_name}) + + resp_dict = self.client.get(self.entity_href) + + self.assertEqual(self.entity_name, resp_dict['name']) + + # Verify the correct URL was used to make the call. + args, kwargs = self.session.get.call_args + url = args[0] + self.assertEqual(self.entity_href, url) + + # Verify that correct information was sent in the call. + headers = kwargs['headers'] + self.assertEqual('application/json', headers['Accept']) + + def test_should_get_raw(self): + self.session.get.return_value = FakeResp(200, content='content') + + headers = {'Accept': 'application/octet-stream'} + content = self.client.get_raw(self.entity_href, headers) + + self.assertEqual('content', content) + + # Verify the correct URL was used to make the call. + args, kwargs = self.session.get.call_args + url = args[0] + self.assertEqual(self.entity_href, url) + + # Verify that correct information was sent in the call. + headers = kwargs['headers'] + self.assertEqual('application/octet-stream', headers['Accept']) + + def test_should_delete(self): + self.session.delete.return_value = FakeResp(200) + + self.client.delete(self.entity_href) + + # Verify the correct URL was used to make the call. + args, kwargs = self.session.delete.call_args + url = args[0] + self.assertEqual(self.entity_href, url) + + class BaseEntityResource(unittest.TestCase): def _setUp(self, entity): self.endpoint = 'https://localhost:9311/v1/' @@ -134,100 +223,122 @@ class BaseEntityResource(unittest.TestCase): self.entity_base = self.endpoint + self.tenant_id + "/" + self.entity + "/" self.entity_href = self.entity_base + '1234' - self.session = mock.MagicMock() - - self.client = client.Client(session=self.session, - endpoint=self.endpoint, - tenant_id=self.tenant_id) - - -class WhenTestingSecretsResourcePost(BaseEntityResource): - - def setUp(self): - self._setUp('secrets') - - self.secret = SecretData() - - def test_should_create(self): - self.session.post.return_value = FakeResp(200, {'secret_ref': - self.entity_href}) - - secret_href = self.client.secrets\ - .store(name=self.secret.name, - payload=self.secret.payload, - payload_content_type=self.secret.content) - - self.assertEqual(self.entity_href, secret_href) - - # Verify the correct URL was used to make the call. - args, kwargs = self.session.post.call_args - url = args[0] - self.assertEqual(self.entity_base, url) - - # Verify that correct information was sent in the call. - data = jsonutils.loads(kwargs['data']) - self.assertEqual(self.secret.name, data['name']) - self.assertEqual(self.secret.payload, data['payload']) - - def test_should_fail_create_as_500(self): - self.session.post.return_value = FakeResp(500, {'bogus': 'ditto'}) - - with self.assertRaises(client.HTTPServerError) as cm: - self.client.secrets.\ - store(name=self.secret.name, - payload=self.secret.payload, - payload_content_type=self.secret.content) - - def test_should_fail_create_as_401(self): - self.session.post.return_value = FakeResp(401, {'bogus': 'ditto'}) - - with self.assertRaises(client.HTTPAuthError): - self.client.secrets.store(name=self.secret.name, - payload=self.secret.payload, - payload_content_type= - self.secret.content) - - def test_should_fail_create_as_403(self): - self.session.post.return_value = FakeResp(403, {'bogus': 'ditto'}) - - with self.assertRaises(client.HTTPClientError): - self.client.secrets.store(name=self.secret.name, - payload=self.secret.payload, - payload_content_type= - self.secret.content) - - -class WhenTestingSecretsResourceGet(BaseEntityResource): - - def setUp(self): - self._setUp('secrets') - - self.secret_name = 'Self destruction sequence' - self.secret_payload = 'the magic words are squeamish ossifrage' - self.secret_content = 'text/plain' - self.algorithm = 'AES' - self.created = str(timeutils.utcnow()) - - self.secret = {'secret_ref': self.entity_href, - 'name': self.secret_name, - 'status': 'ACTIVE', - 'algorithm': self.algorithm, - 'created': self.created} - - def test_should_get(self): - self.session.get.return_value = FakeResp(200, self.secret) - - secret = self.client.secrets.get(secret_ref=self.entity_href) - self.assertIsInstance(secret, secrets.Secret) - self.assertEqual(self.entity_href, secret.secret_ref) - - # Verify the correct URL was used to make the call. - args, kwargs = self.session.get.call_args - url = args[0] - self.assertEqual(self.entity_href, url) - - # Verify that correct information was sent in the call. - self.assertIsNone(kwargs['params']) + self.api = mock.MagicMock() +# +# +# class WhenTestingSecretsManager(BaseEntityResource): +# +# def setUp(self): +# self._setUp('secrets') +# +# self.secret = SecretData() +# +# self.manager = secrets.SecretManager(self.api) +# +# def test_should_store(self): +# self.api.post.return_value = {'secret_ref': self.entity_href} +# +# secret_href = self.manager\ +# .store(name=self.secret.name, +# payload=self.secret.payload, +# payload_content_type=self.secret.content) +# +# self.assertEqual(self.entity_href, secret_href) +# +# # Verify the correct URL was used to make the call. +# args, kwargs = self.api.post.call_args +# entity_resp = args[0] +# self.assertEqual(self.entity, entity_resp) +# +# # Verify that correct information was sent in the call. +# secret_resp = args[1] +# self.assertEqual(self.secret.name, secret_resp['name']) +# self.assertEqual(self.secret.payload, secret_resp['payload']) +# self.assertEqual(self.secret.payload_content_type, +# secret_resp['payload_content_type']) +# +# def test_should_get(self): +# self.api.get.return_value = self.secret.get_dict(self.entity_href) +# +# secret = self.manager.get(secret_ref=self.entity_href) +# self.assertIsInstance(secret, secrets.Secret) +# self.assertEqual(self.entity_href, secret.secret_ref) +# +# # Verify the correct URL was used to make the call. +# args, kwargs = self.api.get.call_args +# url = args[0] +# self.assertEqual(self.entity_href, url) +# +# def test_should_decrypt_with_content_type(self): +# decrypted = 'decrypted text here' +# self.api.get_raw.return_value = decrypted +# +# secret = self.manager.decrypt(secret_ref=self.entity_href, +# content_type='application/octet-stream') +# self.assertEqual(decrypted, secret) +# +# # Verify the correct URL was used to make the call. +# args, kwargs = self.api.get_raw.call_args +# url = args[0] +# self.assertEqual(self.entity_href, url) +# +# # Verify that correct information was sent in the call. +# headers = args[1] +# self.assertEqual('application/octet-stream', headers['Accept']) +# +# def test_should_decrypt_without_content_type(self): +# content_types_dict = {'default': 'application/octet-stream'} +# self.api.get.return_value = self.secret.get_dict(self.entity_href, +# content_types_dict) +# decrypted = 'decrypted text here' +# self.api.get_raw.return_value = decrypted +# +# secret = self.manager.decrypt(secret_ref=self.entity_href) +# self.assertEqual(decrypted, secret) +# +# # Verify the correct URL was used to make the call. +# args, kwargs = self.api.get.call_args +# url = args[0] +# self.assertEqual(self.entity_href, url) +# +# # Verify the correct URL was used to make the call. +# args, kwargs = self.api.get_raw.call_args +# url = args[0] +# self.assertEqual(self.entity_href, url) +# +# # Verify that correct information was sent in the call. +# headers = args[1] +# self.assertEqual('application/octet-stream', headers['Accept']) +# +# def test_should_delete(self): +# self.manager.delete(secret_ref=self.entity_href) +# +# # Verify the correct URL was used to make the call. +# args, kwargs = self.api.delete.call_args +# url = args[0] +# self.assertEqual(self.entity_href, url) +# +# def test_should_fail_get_no_href(self): +# with self.assertRaises(ValueError): +# self.manager.get(None) +# +# def test_should_fail_decrypt_no_content_types(self): +# self.api.get.return_value = self.secret.get_dict(self.entity_href) +# +# with self.assertRaises(ValueError): +# self.manager.decrypt(secret_ref=self.entity_href) +# +# def test_should_fail_decrypt_no_default_content_type(self): +# content_types_dict = {'no-default': 'application/octet-stream'} +# self.api.get.return_value = self.secret.get_dict(self.entity_href, +# content_types_dict) +# +# with self.assertRaises(ValueError): +# self.manager.decrypt(secret_ref=self.entity_href) +# +# def test_should_fail_delete_no_href(self): +# with self.assertRaises(ValueError): +# self.manager.delete(None) # class WhenTestingVerificationsResourcePost(BaseEntityResource): diff --git a/barbicanclient/test/test_client_orders.py b/barbicanclient/test/test_client_orders.py new file mode 100644 index 00000000..4518dd9c --- /dev/null +++ b/barbicanclient/test/test_client_orders.py @@ -0,0 +1,96 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from barbicanclient import orders +from barbicanclient.openstack.common import timeutils +from barbicanclient.test import test_client +from barbicanclient.test import test_client_secrets as test_secrets + + +class OrderData(object): + def __init__(self): + self.created = str(timeutils.utcnow()) + + self.secret = test_secrets.SecretData() + self.order_dict = {'created': self.created, + 'secret': self.secret.get_dict()} + + def get_dict(self, order_ref, secret_ref=None): + order = self.order_dict + order['order_ref'] = order_ref + if secret_ref: + order['secret_ref'] = secret_ref + return order + + +class WhenTestingOrdersManager(test_client.BaseEntityResource): + + def setUp(self): + self._setUp('orders') + + self.order = OrderData() + + self.manager = orders.OrderManager(self.api) + + def test_should_create(self): + self.api.post.return_value = {'order_ref': self.entity_href} + + order_href = self.manager\ + .create(name=self.order.secret.name, + algorithm=self.order.secret.algorithm, + payload_content_type=self.order.secret.content) + + self.assertEqual(self.entity_href, order_href) + + # Verify the correct URL was used to make the call. + args, kwargs = self.api.post.call_args + entity_resp = args[0] + self.assertEqual(self.entity, entity_resp) + + # Verify that correct information was sent in the call. + order_req = args[1] + self.assertEqual(self.order.secret.name, order_req['secret']['name']) + self.assertEqual(self.order.secret.algorithm, + order_req['secret']['algorithm']) + self.assertEqual(self.order.secret.payload_content_type, + order_req['secret']['payload_content_type']) + + def test_should_get(self): + self.api.get.return_value = self.order.get_dict(self.entity_href) + + order = self.manager.get(order_ref=self.entity_href) + self.assertIsInstance(order, orders.Order) + self.assertEqual(self.entity_href, order.order_ref) + + # Verify the correct URL was used to make the call. + args, kwargs = self.api.get.call_args + url = args[0] + self.assertEqual(self.entity_href, url) + + def test_should_delete(self): + self.manager.delete(order_ref=self.entity_href) + + # Verify the correct URL was used to make the call. + args, kwargs = self.api.delete.call_args + url = args[0] + self.assertEqual(self.entity_href, url) + + def test_should_fail_get_no_href(self): + with self.assertRaises(ValueError): + self.manager.get(None) + + def test_should_fail_delete_no_href(self): + with self.assertRaises(ValueError): + self.manager.delete(None) diff --git a/barbicanclient/test/test_client_secrets.py b/barbicanclient/test/test_client_secrets.py new file mode 100644 index 00000000..9d9e1aa5 --- /dev/null +++ b/barbicanclient/test/test_client_secrets.py @@ -0,0 +1,156 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from barbicanclient.test import test_client +from barbicanclient import secrets +from barbicanclient.openstack.common import timeutils + + +class SecretData(object): + def __init__(self): + self.name = 'Self destruction sequence' + self.payload = 'the magic words are squeamish ossifrage' + self.payload_content_type = 'text/plain' + self.content = 'text/plain' + self.algorithm = 'AES' + self.created = str(timeutils.utcnow()) + + self.secret_dict = {'name': self.name, + 'status': 'ACTIVE', + 'algorithm': self.algorithm, + 'created': self.created} + + def get_dict(self, secret_ref=None, content_types_dict=None): + secret = self.secret_dict + if secret_ref: + secret['secret_ref'] = secret_ref + if content_types_dict: + secret['content_types'] = content_types_dict + return secret + + +class WhenTestingSecretsManager(test_client.BaseEntityResource): + + def setUp(self): + self._setUp('secrets') + + self.secret = SecretData() + + self.manager = secrets.SecretManager(self.api) + + def test_should_store(self): + self.api.post.return_value = {'secret_ref': self.entity_href} + + secret_href = self.manager\ + .store(name=self.secret.name, + payload=self.secret.payload, + payload_content_type=self.secret.content) + + self.assertEqual(self.entity_href, secret_href) + + # Verify the correct URL was used to make the call. + args, kwargs = self.api.post.call_args + entity_resp = args[0] + self.assertEqual(self.entity, entity_resp) + + # Verify that correct information was sent in the call. + secret_req = args[1] + self.assertEqual(self.secret.name, secret_req['name']) + self.assertEqual(self.secret.payload, secret_req['payload']) + self.assertEqual(self.secret.payload_content_type, + secret_req['payload_content_type']) + + def test_should_get(self): + self.api.get.return_value = self.secret.get_dict(self.entity_href) + + secret = self.manager.get(secret_ref=self.entity_href) + self.assertIsInstance(secret, secrets.Secret) + self.assertEqual(self.entity_href, secret.secret_ref) + + # Verify the correct URL was used to make the call. + args, kwargs = self.api.get.call_args + url = args[0] + self.assertEqual(self.entity_href, url) + + def test_should_decrypt_with_content_type(self): + decrypted = 'decrypted text here' + self.api.get_raw.return_value = decrypted + + secret = self.manager.decrypt(secret_ref=self.entity_href, + content_type='application/octet-stream') + self.assertEqual(decrypted, secret) + + # Verify the correct URL was used to make the call. + args, kwargs = self.api.get_raw.call_args + url = args[0] + self.assertEqual(self.entity_href, url) + + # Verify that correct information was sent in the call. + headers = args[1] + self.assertEqual('application/octet-stream', headers['Accept']) + + def test_should_decrypt_without_content_type(self): + content_types_dict = {'default': 'application/octet-stream'} + self.api.get.return_value = self.secret.get_dict(self.entity_href, + content_types_dict) + decrypted = 'decrypted text here' + self.api.get_raw.return_value = decrypted + + secret = self.manager.decrypt(secret_ref=self.entity_href) + self.assertEqual(decrypted, secret) + + # Verify the correct URL was used to make the call. + args, kwargs = self.api.get.call_args + url = args[0] + self.assertEqual(self.entity_href, url) + + # Verify the correct URL was used to make the call. + args, kwargs = self.api.get_raw.call_args + url = args[0] + self.assertEqual(self.entity_href, url) + + # Verify that correct information was sent in the call. + headers = args[1] + self.assertEqual('application/octet-stream', headers['Accept']) + + def test_should_delete(self): + self.manager.delete(secret_ref=self.entity_href) + + # Verify the correct URL was used to make the call. + args, kwargs = self.api.delete.call_args + url = args[0] + self.assertEqual(self.entity_href, url) + + def test_should_fail_get_no_href(self): + with self.assertRaises(ValueError): + self.manager.get(None) + + def test_should_fail_decrypt_no_content_types(self): + self.api.get.return_value = self.secret.get_dict(self.entity_href) + + with self.assertRaises(ValueError): + self.manager.decrypt(secret_ref=self.entity_href) + + def test_should_fail_decrypt_no_default_content_type(self): + content_types_dict = {'no-default': 'application/octet-stream'} + self.api.get.return_value = self.secret.get_dict(self.entity_href, + content_types_dict) + + with self.assertRaises(ValueError): + self.manager.decrypt(secret_ref=self.entity_href) + + def test_should_fail_delete_no_href(self): + with self.assertRaises(ValueError): + self.manager.delete(None) diff --git a/barbicanclient/test/test_client_verifications.py b/barbicanclient/test/test_client_verifications.py new file mode 100644 index 00000000..c72120f9 --- /dev/null +++ b/barbicanclient/test/test_client_verifications.py @@ -0,0 +1,102 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from barbicanclient import verifications as verify +from barbicanclient.openstack.common import timeutils +from barbicanclient.test import test_client +from barbicanclient.test import test_client_secrets as test_secrets + + +class VerificationData(object): + def __init__(self): + self.created = str(timeutils.utcnow()) + + self.resource_type = 'image' + self.resource_ref = 'http://www.image.com/v1/images/1234' + self.resource_action = 'vm_attach' + self.impersonation_allowed = True + + self.verification_dict = {'created': self.created, + 'resource_type': self.resource_type, + 'resource_ref': self.resource_ref, + 'resource_action': self.resource_action, + 'impersonation_allowed': + self.impersonation_allowed} + + def get_dict(self, verification_ref): + verify = self.verification_dict + verify['verification_ref'] = verification_ref + return verify + + +class WhenTestingVerificationsManager(test_client.BaseEntityResource): + + def setUp(self): + self._setUp('verifications') + + self.verify = VerificationData() + + self.manager = verify.VerificationManager(self.api) + + def test_should_create(self): + self.api.post.return_value = {'verification_ref': self.entity_href} + + order_href = self.manager\ + .create(resource_type=self.verify.resource_type, + resource_ref=self.verify.resource_ref, + resource_action=self.verify.resource_action) + + self.assertEqual(self.entity_href, order_href) + + # Verify the correct URL was used to make the call. + args, kwargs = self.api.post.call_args + entity_resp = args[0] + self.assertEqual(self.entity, entity_resp) + + # Verify that correct information was sent in the call. + verify_req = args[1] + self.assertEqual(self.verify.resource_type, verify_req['resource_type']) + self.assertEqual(self.verify.resource_action, + verify_req['resource_action']) + self.assertEqual(self.verify.resource_ref, + verify_req['resource_ref']) + + # def test_should_get(self): + # self.api.get.return_value = self.order.get_dict(self.entity_href) + # + # order = self.manager.get(order_ref=self.entity_href) + # self.assertIsInstance(order, orders.Order) + # self.assertEqual(self.entity_href, order.order_ref) + # + # # Verify the correct URL was used to make the call. + # args, kwargs = self.api.get.call_args + # url = args[0] + # self.assertEqual(self.entity_href, url) + # + # def test_should_delete(self): + # self.manager.delete(order_ref=self.entity_href) + # + # # Verify the correct URL was used to make the call. + # args, kwargs = self.api.delete.call_args + # url = args[0] + # self.assertEqual(self.entity_href, url) + # + # def test_should_fail_get_no_href(self): + # with self.assertRaises(ValueError): + # self.manager.get(None) + # + # def test_should_fail_delete_no_href(self): + # with self.assertRaises(ValueError): + # self.manager.delete(None)