Add unit tests for secrets.

This commit is contained in:
jfwood
2013-11-22 00:31:27 -06:00
parent dd67d98462
commit f2d6ae7334
3 changed files with 190 additions and 9 deletions

View File

@@ -117,12 +117,12 @@ class Client(object):
return resp.json()
def _check_status_code(self, resp):
status = resp.status_code
status = resp.status_code if resp else None
LOG.debug('Response status {0}'.format(status))
if status == 401:
LOG.error('Auth error: {0}'.format(self._get_error_message(resp)))
raise HTTPAuthError('{0}'.format(self._get_error_message(resp)))
if status >= 500:
if not status or status >= 500:
LOG.error('5xx Server error: {0}'.format(
self._get_error_message(resp)
))

View File

@@ -19,7 +19,9 @@ import mock
import unittest2 as unittest
from barbicanclient import client
from barbicanclient.common import auth
from barbicanclient import secrets
from barbicanclient.openstack.common import timeutils
from barbicanclient.openstack.common import jsonutils
class FakeAuth(object):
@@ -30,6 +32,36 @@ class FakeAuth(object):
self.tenant_id = tenant_id
class FakeResp(object):
def __init__(self, status_code, response_dict):
self.status_code = status_code
self.response_dict = response_dict
def json(self):
resp = self.response_dict
resp['title'] = 'some title here'
return resp
class SecretData(object):
def __init__(self):
self.name = 'Self destruction sequence'
self.payload = 'the magic words are squeamish ossifrage'
self.content = 'text/plain'
self.algorithm = 'AES'
self.created = str(timeutils.utcnow())
self.secret_dict = {'name': self.name,
'status': 'ACTIVE',
'algorithm': self.algorithm,
'created': self.created}
def get_dict(self, secret_ref):
sdict = self.secret_dict
sdict['secret_ref'] = secret_ref
return sdict
class WhenTestingClient(unittest.TestCase):
def setUp(self):
self.auth_endpoint = 'https://localhost:5000/v2.0/'
@@ -93,13 +125,162 @@ class WhenTestingClient(unittest.TestCase):
c._check_status_code(resp)
class WhenTestingSecretsResource(unittest.TestCase):
def setUp(self):
class BaseEntityResource(unittest.TestCase):
def _setUp(self, entity):
self.endpoint = 'https://localhost:9311/v1/'
self.tenant_id = 'tenant_id'
self.tenant_id = '1234567'
self.entity = entity
self.entity_base = self.endpoint + self.tenant_id + "/" + self.entity + "/"
self.entity_href = self.entity_base + '1234'
self.session = mock.MagicMock()
self.session.read.return_value = self.json
def test_should_create_secret
self.client = client.Client(session=self.session,
endpoint=self.endpoint,
tenant_id=self.tenant_id)
class WhenTestingSecretsResourcePost(BaseEntityResource):
def setUp(self):
self._setUp('secrets')
self.secret = SecretData()
def test_should_create(self):
self.session.post.return_value = FakeResp(200, {'secret_ref':
self.entity_href})
secret_href = self.client.secrets\
.store(name=self.secret.name,
payload=self.secret.payload,
payload_content_type=self.secret.content)
self.assertEqual(self.entity_href, secret_href)
# Verify the correct URL was used to make the call.
args, kwargs = self.session.post.call_args
url = args[0]
self.assertEqual(self.entity_base, url)
# Verify that correct information was sent in the call.
data = jsonutils.loads(kwargs['data'])
self.assertEqual(self.secret.name, data['name'])
self.assertEqual(self.secret.payload, data['payload'])
def test_should_fail_create_as_500(self):
self.session.post.return_value = FakeResp(500, {'bogus': 'ditto'})
with self.assertRaises(client.HTTPServerError) as cm:
self.client.secrets.\
store(name=self.secret.name,
payload=self.secret.payload,
payload_content_type=self.secret.content)
def test_should_fail_create_as_401(self):
self.session.post.return_value = FakeResp(401, {'bogus': 'ditto'})
with self.assertRaises(client.HTTPAuthError):
self.client.secrets.store(name=self.secret.name,
payload=self.secret.payload,
payload_content_type=
self.secret.content)
def test_should_fail_create_as_403(self):
self.session.post.return_value = FakeResp(403, {'bogus': 'ditto'})
with self.assertRaises(client.HTTPClientError):
self.client.secrets.store(name=self.secret.name,
payload=self.secret.payload,
payload_content_type=
self.secret.content)
class WhenTestingSecretsResourceGet(BaseEntityResource):
def setUp(self):
self._setUp('secrets')
self.secret_name = 'Self destruction sequence'
self.secret_payload = 'the magic words are squeamish ossifrage'
self.secret_content = 'text/plain'
self.algorithm = 'AES'
self.created = str(timeutils.utcnow())
self.secret = {'secret_ref': self.entity_href,
'name': self.secret_name,
'status': 'ACTIVE',
'algorithm': self.algorithm,
'created': self.created}
def test_should_get(self):
self.session.get.return_value = FakeResp(200, self.secret)
secret = self.client.secrets.get(secret_ref=self.entity_href)
self.assertIsInstance(secret, secrets.Secret)
self.assertEqual(self.entity_href, secret.secret_ref)
# Verify the correct URL was used to make the call.
args, kwargs = self.session.get.call_args
url = args[0]
self.assertEqual(self.entity_href, url)
# Verify that correct information was sent in the call.
self.assertIsNone(kwargs['params'])
# class WhenTestingVerificationsResourcePost(BaseEntityResource):
#
# def setUp(self):
# self._setUp('verifications')
#
# self.resource_type = 'image'
# self.resource_ref = 'https://localhost:9311/v1/images/1234567'
# self.resource_action = 'vm_attach'
# self.impersonation_allowed = True
#
# def test_should_create(self):
# self.session.post.return_value = FakeResp(200, {'verification_ref':
# self.entity_href})
#
# verif_href = self.client\
# .verifications.create(resource_type=self.resource_type,
# resource_ref=self.resource_ref,
# resource_action=self.resource_action)
#
# self.assertEqual(self.entity_href, verif_href)
#
# # Verify the correct URL was used to make the call.
# args, kwargs = self.session.post.call_args
# url = args[0]
# self.assertEqual(self.entity_base, url)
#
# # Verify that correct information was sent in the call.
# data = jsonutils.loads(kwargs['data'])
# self.assertEqual(self.resource_type, data['resource_type'])
# self.assertEqual(self.resource_action, data['resource_action'])
#
#
# class WhenTestingVerificationsResourceGet(BaseEntityResource):
#
# def setUp(self):
# self._setUp('verifications')
#
# self.secret = SecretData()
#
# def test_should_get(self):
# self.session.get.return_value = FakeResp(200,
# self.secret.get_dict())
#
# secret = self.client.secrets.get(secret_ref=self.entity_href)
# self.assertIsInstance(secret, secrets.Secret)
# self.assertEqual(self.entity_href, secret.secret_ref)
#
# # Verify the correct URL was used to make the call.
# args, kwargs = self.session.get.call_args
# url = args[0]
# self.assertEqual(self.entity_href, url)
#
# # Verify that correct information was sent in the call.
# self.assertIsNone(kwargs['params'])

View File

@@ -81,7 +81,7 @@ class VerificationManager(base.BaseEntityManager):
resource_type=None,
resource_ref=None,
resource_action=None,
impersonation_allowed=None):
impersonation_allowed=False):
"""
Creates a new Verification in Barbican