Smoke tests for secrets in Barbican Functional Tests

Also updated secret models and behaviors for a more usable api.
Modified client to create models using updated model API.

Change-Id: I414f4869e1013caec0a2e6c69e4e176aba4e43e7
This commit is contained in:
Thomas Dinkjian
2014-10-22 15:02:07 -05:00
parent 12bed77f89
commit d559253f9b
5 changed files with 255 additions and 164 deletions

View File

@@ -13,11 +13,11 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from functionaltests.api.v1.behaviors.base_behaviors import BaseBehaviors
from functionaltests.api.v1.behaviors import base_behaviors
from functionaltests.api.v1.models import secret_models
class SecretBehaviors(BaseBehaviors):
class SecretBehaviors(base_behaviors.BaseBehaviors):
def create_secret(self, model):
"""Create a secret from the data in the model.
@@ -32,32 +32,58 @@ class SecretBehaviors(BaseBehaviors):
secret_ref = returned_data.get('secret_ref')
if secret_ref:
self.created_entities.append(secret_ref)
return resp, secret_ref
def update_secret_payload(self, secret_ref, payload, content_type):
def update_secret_payload(self, secret_ref, payload, payload_content_type,
payload_content_encoding):
"""Updates a secret's payload data
:param secret_ref: HATEOS ref of the secret to be deleted
:return: A request response object
"""
headers = {'Content-Type': content_type}
headers = {'Content-Type': payload_content_type,
'Content-Encoding': payload_content_encoding}
return self.client.put(secret_ref, data=payload, extra_headers=headers)
def get_secret(self, secret_ref, payload_content_type,
payload_content_encoding=None):
headers = {'Accept': payload_content_type,
'Accept-Encoding': payload_content_encoding}
return self.client.get(secret_ref, extra_headers=headers)
def get_secret_metadata(self, secret_ref):
"""Retrieves a secret's metadata
:param secret_ref: HATEOS ref of the secret to be deleted
:param secret_ref: HATEOS ref of the secret to be retrieved
:return: A request response object
"""
return self.client.get(
secret_ref, response_model_type=secret_models.SecretModel)
def get_secrets(self, limit=10, offset=0):
"""Handles getting a list of secrets
:param limit: limits number of returned secrets
:param offset: represents how many records to skip before retrieving
the list
"""
resp = self.client.get('secrets', params={'limit': limit,
'offset': offset})
secrets_list = resp.json()
secrets, next_ref, prev_ref = self.client.get_list_of_models(
secrets_list, secret_models.SecretModel)
return resp, secrets, next_ref, prev_ref
def delete_secret(self, secret_ref, extra_headers=None):
"""Delete a secret.
:param secret_ref: HATEOS ref of the secret to be deleted
:param extra_headers: Optional HTTP headers to add to the request
:return: A request response object
:return A request response object
"""
resp = self.client.delete(secret_ref, extra_headers=extra_headers)
self.created_entities.remove(secret_ref)

View File

@@ -1,154 +0,0 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import testtools
from barbican.tests import utils
from functionaltests.api import base
from functionaltests.api.v1.behaviors import secret_behaviors
from functionaltests.api.v1.models import secret_models
one_phase_create_data = {
"name": "AES key",
"expiration": "2018-02-28T19:14:44.180394",
"algorithm": "aes",
"bit_length": 256,
"mode": "cbc",
"payload": "gF6+lLoF3ohA9aPRpt+6bQ==",
"payload_content_type": "application/octet-stream",
"payload_content_encoding": "base64",
}
two_phase_create_data = {
"name": "AES key",
"expiration": "2018-02-28T19:14:44.180394",
"algorithm": "aes",
"bit_length": 256,
"mode": "cbc",
"payload_content_encoding": "base64",
}
@utils.parameterized_test_case
class SecretsTestCase(base.TestCase):
def setUp(self):
super(SecretsTestCase, self).setUp()
self.behaviors = secret_behaviors.SecretBehaviors(self.client)
self.one_phase_secret_model = secret_models.SecretModel(
**one_phase_create_data)
self.two_phase_secret_model = secret_models.SecretModel(
**two_phase_create_data)
def tearDown(self):
self.behaviors.delete_all_created_secrets()
super(SecretsTestCase, self).tearDown()
def test_create_secret_single_phase(self):
"""Covers single phase secret creation.
Verify that a secret gets created with the correct http
response code and a secret reference.
"""
resp, secret_ref = self.behaviors.create_secret(
self.one_phase_secret_model)
self.assertEqual(resp.status_code, 201)
self.assertIsNotNone(secret_ref)
def test_get_created_secret_metadata(self):
"""Covers retrieval of a created secret's metadata."""
create_model = self.one_phase_secret_model
resp, secret_ref = self.behaviors.create_secret(create_model)
self.assertEqual(resp.status_code, 201)
self.assertIsNotNone(secret_ref)
get_resp = self.behaviors.get_secret_metadata(secret_ref)
get_model = get_resp.model
self.assertEqual(get_resp.status_code, 200)
self.assertEqual(get_model.name, create_model.name)
self.assertEqual(get_model.algorithm, create_model.algorithm)
self.assertEqual(get_model.bit_length, create_model.bit_length)
self.assertEqual(get_model.mode, create_model.mode)
@testtools.skip('Skip until we can fix two-step creation')
def test_create_secret_two_phase(self):
"""Covers two phase secret creation.
Verify that a secret gets created with the correct http
response code and a secret reference.
"""
# Phase 1
resp, secret_ref = self.behaviors.create_secret(
self.two_phase_secret_model)
self.assertEqual(resp.status_code, 201)
self.assertIsNotNone(secret_ref)
# Phase 2
update_resp = self.behaviors.update_secret_payload(
secret_ref, 'YmFt', 'text/plain')
self.assertEqual(update_resp.status_code, 204)
def test_delete_secret_with_accept_application_json(self):
"""Covers Launchpad Bug #1326481."""
create_model = self.one_phase_secret_model
resp, secret_ref = self.behaviors.create_secret(create_model)
self.assertEqual(resp.status_code, 201)
headers = {'Accept': 'application/json'}
resp = self.behaviors.delete_secret(secret_ref, extra_headers=headers)
self.assertEqual(resp.status_code, 204)
@utils.parameterized_dataset({
'str_type': ['not-an-int'],
'empty': [''],
'blank': [' '],
'negative_maxint': [-sys.maxint],
'negative_one': [-1],
'zero': [0]
})
def test_creating_secret_w_invalid_bit_length(self, bit_length):
"""Covers cases of creating a secret with invalid bit lengths."""
create_model = self.one_phase_secret_model
create_model.override_values(bit_length=bit_length)
resp, secret_ref = self.behaviors.create_secret(create_model)
self.assertEqual(resp.status_code, 400)
@utils.parameterized_dataset({
'name': {'name': 'a' * 256},
'algorithm': {'algorithm': 'a' * 256},
'mode': {'mode': 'a' * 256},
'expiration': {'expiration': 'a' * 256},
'content_type': {'payload_content_type': 'a' * 256},
'content_encoding': {'payload_content_encoding': 'a' * 256}
})
def test_create_secret_with_oversized_string_in(self, **kwargs):
"""Covers negative cases for an oversized string values."""
create_model = self.one_phase_secret_model
create_model.override_values(**kwargs)
resp, secret_ref = self.behaviors.create_secret(create_model)
self.assertEqual(resp.status_code, 400)

View File

@@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from functionaltests.api.v1.models.base_models import BaseModel

View File

@@ -0,0 +1,194 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import binascii
from testtools import testcase
from functionaltests.api import base
from functionaltests.api.v1.behaviors import secret_behaviors
from functionaltests.api.v1.models import secret_models
secret_create_defaults_data = {
"name": "AES key",
"expiration": "2018-02-28T19:14:44.180394",
"algorithm": "aes",
"bit_length": 256,
"mode": "cbc",
"payload": "gF6+lLoF3ohA9aPRpt+6bQ==",
"payload_content_type": "application/octet-stream",
"payload_content_encoding": "base64",
}
# Any field with None will be created in the model with None as the value
# but will be omitted in the final request to the server.
#
# secret_create_nones_data is effectively an empty json request to the server.
secret_create_nones_data = {
"name": None,
"expiration": None,
"algorithm": None,
"bit_length": None,
"mode": None,
"payload": None,
"payload_content_type": None,
"payload_content_encoding": None,
}
secret_create_emptystrings_data = {
"name": '',
"expiration": '',
"algorithm": '',
"bit_length": '',
"mode": '',
"payload": '',
"payload_content_type": '',
"payload_content_encoding": '',
}
secret_create_two_phase_data = {
"name": "AES key",
"expiration": "2018-02-28T19:14:44.180394",
"algorithm": "aes",
"bit_length": 256,
"mode": "cbc",
}
class SecretsTestCase(base.TestCase):
def setUp(self):
super(SecretsTestCase, self).setUp()
self.behaviors = secret_behaviors.SecretBehaviors(self.client)
def tearDown(self):
self.behaviors.delete_all_created_secrets()
super(SecretsTestCase, self).tearDown()
@testcase.attr('positive')
def test_secret_create_defaults_no_expiration(self):
"""Covers creating a secret without an expiration."""
test_model = secret_models.SecretModel(**secret_create_defaults_data)
overrides = {"expiration": None}
test_model.override_values(**overrides)
resp, secret_ref = self.behaviors.create_secret(test_model)
self.assertEqual(resp.status_code, 201)
resp = self.behaviors.get_secret_metadata(secret_ref)
self.assertEqual(resp.model.status, "ACTIVE")
self.assertGreater(resp.model.secret_ref, 0)
@testcase.attr('positive')
def test_secret_get_defaults_metadata(self):
"""Covers getting and checking a secret's metadata."""
test_model = secret_models.SecretModel(**secret_create_defaults_data)
resp, secret_ref = self.behaviors.create_secret(test_model)
self.assertEqual(resp.status_code, 201)
resp = self.behaviors.get_secret_metadata(secret_ref)
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.model.status, "ACTIVE")
self.assertEqual(resp.model.name, test_model.name)
self.assertEqual(resp.model.mode, test_model.mode)
self.assertEqual(resp.model.algorithm, test_model.algorithm)
self.assertEqual(resp.model.bit_length, test_model.bit_length)
@testcase.attr('positive')
def test_secret_create_defaults(self):
"""Covers single phase secret creation.
Verify that a secret gets created with the correct http
response code and a secret reference.
"""
test_model = secret_models.SecretModel(**secret_create_defaults_data)
resp, secret_ref = self.behaviors.create_secret(test_model)
self.assertEqual(resp.status_code, 201)
self.assertIsNotNone(secret_ref)
@testcase.attr('positive')
def test_secret_delete_defaults(self):
"""Covers deleting a secret."""
test_model = secret_models.SecretModel(**secret_create_defaults_data)
resp, secret_ref = self.behaviors.create_secret(test_model)
self.assertEqual(resp.status_code, 201)
del_resp = self.behaviors.delete_secret(secret_ref)
self.assertEqual(del_resp.status_code, 204)
@testcase.attr('positive')
def test_secret_get_defaults(self):
"""Covers getting a secret's payload data."""
test_model = secret_models.SecretModel(**secret_create_defaults_data)
resp, secret_ref = self.behaviors.create_secret(test_model)
self.assertEqual(resp.status_code, 201)
get_resp = self.behaviors.get_secret(secret_ref,
test_model.payload_content_type)
self.assertEqual(get_resp.status_code, 200)
self.assertIn(test_model.payload,
binascii.b2a_base64(get_resp.content))
@testcase.attr('positive')
def test_secret_update_defaults_two_phase(self):
"""Covers updating a secret's payload data."""
# Create
test_model = secret_models.SecretModel(**secret_create_two_phase_data)
resp, secret_ref = self.behaviors.create_secret(test_model)
self.assertEqual(resp.status_code, 201)
# Update
payload = "gF6+lLoF3ohA9aPRpt+6bQ=="
payload_content_type = "application/octet-stream"
payload_content_encoding = "base64"
update_resp = self.behaviors.update_secret_payload(
secret_ref, payload=payload,
payload_content_type=payload_content_type,
payload_content_encoding=payload_content_encoding)
self.assertEqual(update_resp.status_code, 204)
# Get/Check Updated
sec_resp = self.behaviors.get_secret(
secret_ref=secret_ref,
payload_content_type=payload_content_type)
self.assertEqual(sec_resp.status_code, 200)
self.assertIn('gF6+lLoF3ohA9aPRpt+6bQ==',
binascii.b2a_base64(sec_resp.content))
@testcase.attr('positive')
def test_secrets_get_defaults_multiple_secrets(self):
"""Covers getting a list of secrets.
Creates 11 secrets then returns a list of 2 secrets
"""
test_model = secret_models.SecretModel(**secret_create_defaults_data)
limit = 2
offset = 0
for i in range(0, 11):
self.behaviors.create_secret(test_model)
resp, secrets_list, next_ref, prev_ref = self.behaviors.get_secrets(
limit=limit, offset=offset)
self.assertEqual(resp.status_code, 200)
self.assertEqual(len(secrets_list), limit)
self.assertIsNone(prev_ref)
self.assertIsNotNone(next_ref)

View File

@@ -87,8 +87,31 @@ class BarbicanClient(object):
return self._auth_provider.base_url(filters)
def get_list_of_models(self, item_list, model_type):
"""Takes a list of barbican objects and creates a list of models
:param item_list: the json returned from a barbican GET request for
a list of objects
:param model_type: The model used in the creation of the list of models
:return A list of models and the refs for next and previous lists.
"""
models, next_ref, prev_ref = [], None, None
for item in item_list:
if 'next' == item:
next_ref = item_list.get('next')
elif 'previous' == item:
prev_ref = item_list.get('previous')
elif item in ('secrets', 'orders', 'containers'):
for entity in item_list.get(item):
models.append(model_type(entity))
return models, next_ref, prev_ref
def request(self, method, url, data=None, extra_headers=None,
use_auth=True, response_model_type=None, request_model=None):
use_auth=True, response_model_type=None, request_model=None,
params=None):
"""Prepares and sends http request through Requests."""
if 'http' not in url:
url = os.path.join(self.get_base_url(), url)
@@ -109,7 +132,8 @@ class BarbicanClient(object):
'url': url,
'headers': headers,
'data': data,
'timeout': self.timeout
'timeout': self.timeout,
'params': params
}
if use_auth:
call_kwargs['auth'] = self._auth