From f2d6ae73344a75c8e19a47326977d6ea66db4664 Mon Sep 17 00:00:00 2001 From: jfwood Date: Fri, 22 Nov 2013 00:31:27 -0600 Subject: [PATCH] Add unit tests for secrets. --- barbicanclient/client.py | 4 +- barbicanclient/test/test_client.py | 193 ++++++++++++++++++++++++++++- barbicanclient/verifications.py | 2 +- 3 files changed, 190 insertions(+), 9 deletions(-) diff --git a/barbicanclient/client.py b/barbicanclient/client.py index 414c4252..0912a038 100644 --- a/barbicanclient/client.py +++ b/barbicanclient/client.py @@ -117,12 +117,12 @@ class Client(object): return resp.json() def _check_status_code(self, resp): - status = resp.status_code + status = resp.status_code if resp else None LOG.debug('Response status {0}'.format(status)) if status == 401: LOG.error('Auth error: {0}'.format(self._get_error_message(resp))) raise HTTPAuthError('{0}'.format(self._get_error_message(resp))) - if status >= 500: + if not status or status >= 500: LOG.error('5xx Server error: {0}'.format( self._get_error_message(resp) )) diff --git a/barbicanclient/test/test_client.py b/barbicanclient/test/test_client.py index f3298de3..f1afeed9 100644 --- a/barbicanclient/test/test_client.py +++ b/barbicanclient/test/test_client.py @@ -19,7 +19,9 @@ import mock import unittest2 as unittest from barbicanclient import client -from barbicanclient.common import auth +from barbicanclient import secrets +from barbicanclient.openstack.common import timeutils +from barbicanclient.openstack.common import jsonutils class FakeAuth(object): @@ -30,6 +32,36 @@ class FakeAuth(object): self.tenant_id = tenant_id +class FakeResp(object): + def __init__(self, status_code, response_dict): + self.status_code = status_code + self.response_dict = response_dict + + def json(self): + resp = self.response_dict + resp['title'] = 'some title here' + return resp + + +class SecretData(object): + def __init__(self): + self.name = 'Self destruction sequence' + self.payload = 'the magic words are squeamish ossifrage' + self.content = 'text/plain' + self.algorithm = 'AES' + self.created = str(timeutils.utcnow()) + + self.secret_dict = {'name': self.name, + 'status': 'ACTIVE', + 'algorithm': self.algorithm, + 'created': self.created} + + def get_dict(self, secret_ref): + sdict = self.secret_dict + sdict['secret_ref'] = secret_ref + return sdict + + class WhenTestingClient(unittest.TestCase): def setUp(self): self.auth_endpoint = 'https://localhost:5000/v2.0/' @@ -93,13 +125,162 @@ class WhenTestingClient(unittest.TestCase): c._check_status_code(resp) -class WhenTestingSecretsResource(unittest.TestCase): - def setUp(self): +class BaseEntityResource(unittest.TestCase): + def _setUp(self, entity): self.endpoint = 'https://localhost:9311/v1/' - self.tenant_id = 'tenant_id' + self.tenant_id = '1234567' + + self.entity = entity + self.entity_base = self.endpoint + self.tenant_id + "/" + self.entity + "/" + self.entity_href = self.entity_base + '1234' self.session = mock.MagicMock() - self.session.read.return_value = self.json - def test_should_create_secret + self.client = client.Client(session=self.session, + endpoint=self.endpoint, + tenant_id=self.tenant_id) + +class WhenTestingSecretsResourcePost(BaseEntityResource): + + def setUp(self): + self._setUp('secrets') + + self.secret = SecretData() + + def test_should_create(self): + self.session.post.return_value = FakeResp(200, {'secret_ref': + self.entity_href}) + + secret_href = self.client.secrets\ + .store(name=self.secret.name, + payload=self.secret.payload, + payload_content_type=self.secret.content) + + self.assertEqual(self.entity_href, secret_href) + + # Verify the correct URL was used to make the call. + args, kwargs = self.session.post.call_args + url = args[0] + self.assertEqual(self.entity_base, url) + + # Verify that correct information was sent in the call. + data = jsonutils.loads(kwargs['data']) + self.assertEqual(self.secret.name, data['name']) + self.assertEqual(self.secret.payload, data['payload']) + + def test_should_fail_create_as_500(self): + self.session.post.return_value = FakeResp(500, {'bogus': 'ditto'}) + + with self.assertRaises(client.HTTPServerError) as cm: + self.client.secrets.\ + store(name=self.secret.name, + payload=self.secret.payload, + payload_content_type=self.secret.content) + + def test_should_fail_create_as_401(self): + self.session.post.return_value = FakeResp(401, {'bogus': 'ditto'}) + + with self.assertRaises(client.HTTPAuthError): + self.client.secrets.store(name=self.secret.name, + payload=self.secret.payload, + payload_content_type= + self.secret.content) + + def test_should_fail_create_as_403(self): + self.session.post.return_value = FakeResp(403, {'bogus': 'ditto'}) + + with self.assertRaises(client.HTTPClientError): + self.client.secrets.store(name=self.secret.name, + payload=self.secret.payload, + payload_content_type= + self.secret.content) + + +class WhenTestingSecretsResourceGet(BaseEntityResource): + + def setUp(self): + self._setUp('secrets') + + self.secret_name = 'Self destruction sequence' + self.secret_payload = 'the magic words are squeamish ossifrage' + self.secret_content = 'text/plain' + self.algorithm = 'AES' + self.created = str(timeutils.utcnow()) + + self.secret = {'secret_ref': self.entity_href, + 'name': self.secret_name, + 'status': 'ACTIVE', + 'algorithm': self.algorithm, + 'created': self.created} + + def test_should_get(self): + self.session.get.return_value = FakeResp(200, self.secret) + + secret = self.client.secrets.get(secret_ref=self.entity_href) + self.assertIsInstance(secret, secrets.Secret) + self.assertEqual(self.entity_href, secret.secret_ref) + + # Verify the correct URL was used to make the call. + args, kwargs = self.session.get.call_args + url = args[0] + self.assertEqual(self.entity_href, url) + + # Verify that correct information was sent in the call. + self.assertIsNone(kwargs['params']) + + +# class WhenTestingVerificationsResourcePost(BaseEntityResource): +# +# def setUp(self): +# self._setUp('verifications') +# +# self.resource_type = 'image' +# self.resource_ref = 'https://localhost:9311/v1/images/1234567' +# self.resource_action = 'vm_attach' +# self.impersonation_allowed = True +# +# def test_should_create(self): +# self.session.post.return_value = FakeResp(200, {'verification_ref': +# self.entity_href}) +# +# verif_href = self.client\ +# .verifications.create(resource_type=self.resource_type, +# resource_ref=self.resource_ref, +# resource_action=self.resource_action) +# +# self.assertEqual(self.entity_href, verif_href) +# +# # Verify the correct URL was used to make the call. +# args, kwargs = self.session.post.call_args +# url = args[0] +# self.assertEqual(self.entity_base, url) +# +# # Verify that correct information was sent in the call. +# data = jsonutils.loads(kwargs['data']) +# self.assertEqual(self.resource_type, data['resource_type']) +# self.assertEqual(self.resource_action, data['resource_action']) +# +# +# class WhenTestingVerificationsResourceGet(BaseEntityResource): +# +# def setUp(self): +# self._setUp('verifications') +# +# self.secret = SecretData() +# +# def test_should_get(self): +# self.session.get.return_value = FakeResp(200, +# self.secret.get_dict()) +# +# secret = self.client.secrets.get(secret_ref=self.entity_href) +# self.assertIsInstance(secret, secrets.Secret) +# self.assertEqual(self.entity_href, secret.secret_ref) +# +# # Verify the correct URL was used to make the call. +# args, kwargs = self.session.get.call_args +# url = args[0] +# self.assertEqual(self.entity_href, url) +# +# # Verify that correct information was sent in the call. +# self.assertIsNone(kwargs['params']) diff --git a/barbicanclient/verifications.py b/barbicanclient/verifications.py index 656a54f8..566640bb 100644 --- a/barbicanclient/verifications.py +++ b/barbicanclient/verifications.py @@ -81,7 +81,7 @@ class VerificationManager(base.BaseEntityManager): resource_type=None, resource_ref=None, resource_action=None, - impersonation_allowed=None): + impersonation_allowed=False): """ Creates a new Verification in Barbican