Merge "Fixes for Running Functional Tests with Python 3.5"
This commit is contained in:
commit
7f9839707a
@ -132,14 +132,10 @@ def _do_enforce_content_types(pecan_req, valid_content_types):
|
||||
types passed in by our caller.
|
||||
"""
|
||||
if pecan_req.content_type not in valid_content_types:
|
||||
content_type = pecan_req.content_type
|
||||
if isinstance(content_type, bytes):
|
||||
content_type = content_type.decode('utf-8')
|
||||
m = u._(
|
||||
"Unexpected content type: {type}. Expected content types "
|
||||
"Unexpected content type. Expected content types "
|
||||
"are: {expected}"
|
||||
).format(
|
||||
type=content_type,
|
||||
expected=valid_content_types
|
||||
)
|
||||
pecan.abort(415, m)
|
||||
|
@ -257,8 +257,7 @@ class NewSecretValidator(ValidatorBase):
|
||||
mime_types.is_supported(content_type),
|
||||
schema_name,
|
||||
u._("payload_content_type is not one of {supported}"
|
||||
).format(supplied=content_type,
|
||||
supported=mime_types.SUPPORTED),
|
||||
).format(supported=mime_types.SUPPORTED),
|
||||
"payload_content_type")
|
||||
|
||||
return json_data
|
||||
@ -316,9 +315,8 @@ class NewSecretValidator(ValidatorBase):
|
||||
self._assert_validity(
|
||||
mime_types.is_supported(content_type),
|
||||
schema_name,
|
||||
u._("payload_content_type {supplied} is not one of {supported}"
|
||||
).format(supplied=content_type,
|
||||
supported=mime_types.SUPPORTED),
|
||||
u._("payload_content_type is not one of {supported}"
|
||||
).format(supported=mime_types.SUPPORTED),
|
||||
"payload_content_type")
|
||||
|
||||
self._assert_validity(
|
||||
|
@ -71,7 +71,7 @@ def create_csr_signed_with_wrong_key():
|
||||
|
||||
def create_bad_csr():
|
||||
"""Generate a CSR that will not parse."""
|
||||
return "Bad PKCS10 Data"
|
||||
return b"Bad PKCS10 Data"
|
||||
|
||||
|
||||
def create_csr_with_bad_subject_dn():
|
||||
|
@ -226,7 +226,7 @@ def get_encrypted_private_key_pem():
|
||||
of the private_encrypted.pk8 file.
|
||||
"""
|
||||
|
||||
return """-----BEGIN ENCRYPTED PRIVATE KEY-----
|
||||
return b"""-----BEGIN ENCRYPTED PRIVATE KEY-----
|
||||
MIIE6TAbBgkqhkiG9w0BBQMwDgQIssadeQrYhhACAggABIIEyDNw3SV2b19yy4Q/
|
||||
kTbtJ/p2X2zKDqr7GgLeAowqqhcMfvprI7G8C0XtwxkR4SjMZUXNcmOwQB2kNKtK
|
||||
ZilCz6pSx81iUj4s1fU460XkhkIeV+F7aB2PsTG1oDfPCuzKFjT6EuSE6lFUH89r
|
||||
@ -268,7 +268,7 @@ def get_passphrase_txt():
|
||||
of the passphrase.txt file.
|
||||
"""
|
||||
|
||||
return """password"""
|
||||
return b"""password"""
|
||||
|
||||
|
||||
def get_csr_pem():
|
||||
@ -283,7 +283,7 @@ def get_csr_pem():
|
||||
of the csr.pem file.
|
||||
"""
|
||||
|
||||
return """-----BEGIN CERTIFICATE REQUEST-----
|
||||
return b"""-----BEGIN CERTIFICATE REQUEST-----
|
||||
MIICWzCCAUMCAQAwFjEUMBIGA1UEAwwLZXhhbXBsZS5jb20wggEiMA0GCSqGSIb3
|
||||
DQEBAQUAA4IBDwAwggEKAoIBAQCza2VoDXmBUMmwjFu9F6MM5q/AZ1WjnWA2YNdN
|
||||
y237TrGN/nobDDv8FBBpUPmHNZ04H1LyxFcP8ReFrcIXpifsReu2lAWaqRPxovu5
|
||||
|
@ -37,8 +37,8 @@ conf_multiple_backends_enabled = CONF.keymanager.\
|
||||
|
||||
class TestCase(oslotest.BaseTestCase):
|
||||
max_payload_size = 10000
|
||||
max_sized_payload = 'a' * max_payload_size
|
||||
oversized_payload = 'a' * (max_payload_size + 1)
|
||||
max_sized_payload = b'a' * max_payload_size
|
||||
oversized_payload = b'a' * (max_payload_size + 1)
|
||||
max_field_size = 255
|
||||
max_sized_field = 'a' * max_field_size
|
||||
oversized_field = 'a' * (max_field_size + 1)
|
||||
|
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
import base64
|
||||
import copy
|
||||
import json
|
||||
from oslo_serialization import jsonutils
|
||||
import time
|
||||
|
||||
from OpenSSL import crypto
|
||||
@ -281,7 +281,7 @@ class CertificatesTestCase(base.TestCase):
|
||||
order_resp.model.sub_status_message)
|
||||
|
||||
def confirm_error_message(self, resp, message):
|
||||
resp_dict = json.loads(resp.content)
|
||||
resp_dict = jsonutils.loads(resp.content)
|
||||
self.assertEqual(message, resp_dict['description'])
|
||||
|
||||
@testtools.testcase.attr('positive')
|
||||
@ -426,7 +426,7 @@ class CertificatesTestCase(base.TestCase):
|
||||
create_resp, order_ref = self.behaviors.create_order(test_model)
|
||||
self.assertEqual(400, create_resp.status_code)
|
||||
self.assertIsNone(order_ref)
|
||||
error_description = json.loads(create_resp.content)['description']
|
||||
error_description = jsonutils.loads(create_resp.content)['description']
|
||||
self.assertIn("Invalid PKCS10 Data", error_description)
|
||||
|
||||
@testtools.testcase.attr('negative')
|
||||
|
@ -71,7 +71,7 @@ create_container_rsa_data = {
|
||||
accepted_str_values = {
|
||||
'alphanumeric': ['a2j3j6ll9'],
|
||||
'punctuation': ['~!@#$%^&*()_+`-={}[]|:;<>,.?'],
|
||||
'len_255': [str(bytearray().zfill(255))],
|
||||
'len_255': [base.TestCase.max_sized_field],
|
||||
'uuid': ['54262d9d-4bc7-4821-8df0-dc2ca8e112bb'],
|
||||
'empty': ['']
|
||||
}
|
||||
|
@ -12,7 +12,7 @@
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import json
|
||||
from oslo_serialization import jsonutils
|
||||
import sys
|
||||
import time
|
||||
|
||||
@ -295,7 +295,7 @@ class OrdersTestCase(base.TestCase):
|
||||
resp, order_ref = self.behaviors.create_order(test_model)
|
||||
|
||||
# Make sure we actually get a message back
|
||||
error_msg = json.loads(resp.content).get('title')
|
||||
error_msg = jsonutils.loads(resp.content).get('title')
|
||||
|
||||
self.assertEqual(400, resp.status_code)
|
||||
self.assertIsNotNone(error_msg)
|
||||
@ -322,7 +322,7 @@ class OrdersTestCase(base.TestCase):
|
||||
self.assertIsNotNone(order_ref)
|
||||
|
||||
@utils.parameterized_dataset({
|
||||
'negative_maxint': [-sys.maxint],
|
||||
'negative_maxint': [-sys.maxsize],
|
||||
'negative_7': [-7],
|
||||
'negative_1': [-1],
|
||||
'0': [0],
|
||||
|
@ -12,7 +12,7 @@
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import json
|
||||
from oslo_serialization import jsonutils
|
||||
from testtools import testcase
|
||||
import uuid
|
||||
|
||||
@ -105,7 +105,8 @@ class SecretMetadataTestCase(base.TestCase):
|
||||
|
||||
get_resp = self.behaviors.get_metadata(secret_ref)
|
||||
self.assertEqual(200, get_resp.status_code)
|
||||
self.assertEqual(json.loads(get_resp.content), self.valid_metadata)
|
||||
self.assertEqual(jsonutils.loads(get_resp.content),
|
||||
self.valid_metadata)
|
||||
|
||||
@testcase.attr('negative')
|
||||
def test_secret_metadata_get_no_secret(self):
|
||||
@ -166,7 +167,8 @@ class SecretMetadataTestCase(base.TestCase):
|
||||
get_resp = self.behaviors.get_metadatum(secret_ref,
|
||||
self.valid_metadatum_key)
|
||||
self.assertEqual(200, get_resp.status_code)
|
||||
self.assertEqual(json.loads(get_resp.content), self.valid_metadatum)
|
||||
self.assertEqual(jsonutils.loads(get_resp.content),
|
||||
self.valid_metadatum)
|
||||
|
||||
@testcase.attr('negative')
|
||||
def test_secret_metadatum_get_wrong_key(self):
|
||||
|
@ -13,9 +13,9 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
import json
|
||||
from oslo_serialization import base64 as oslo_base64
|
||||
from oslo_serialization import jsonutils
|
||||
import six
|
||||
import sys
|
||||
import testtools
|
||||
import time
|
||||
@ -40,7 +40,7 @@ admin_b = CONF.rbac_users.admin_b
|
||||
|
||||
def get_pem_content(pem):
|
||||
b64_content = translations.get_pem_components(pem)[1]
|
||||
return base64.b64decode(b64_content)
|
||||
return oslo_base64.decode_as_bytes(b64_content)
|
||||
|
||||
|
||||
def get_private_key_req():
|
||||
@ -50,7 +50,7 @@ def get_private_key_req():
|
||||
'algorithm': 'rsa',
|
||||
'bit_length': 2048,
|
||||
'secret_type': 'private',
|
||||
'payload': base64.b64encode(keys.get_private_key_pem())}
|
||||
'payload': oslo_base64.encode_as_bytes(keys.get_private_key_pem())}
|
||||
|
||||
|
||||
def get_public_key_req():
|
||||
@ -60,7 +60,7 @@ def get_public_key_req():
|
||||
'algorithm': 'rsa',
|
||||
'bit_length': 2048,
|
||||
'secret_type': 'public',
|
||||
'payload': base64.b64encode(keys.get_public_key_pem())}
|
||||
'payload': oslo_base64.encode_as_bytes(keys.get_public_key_pem())}
|
||||
|
||||
|
||||
def get_certificate_req():
|
||||
@ -70,7 +70,7 @@ def get_certificate_req():
|
||||
'algorithm': 'rsa',
|
||||
'bit_length': 2048,
|
||||
'secret_type': 'certificate',
|
||||
'payload': base64.b64encode(keys.get_certificate_pem())}
|
||||
'payload': oslo_base64.encode_as_bytes(keys.get_certificate_pem())}
|
||||
|
||||
|
||||
def get_passphrase_req():
|
||||
@ -94,7 +94,7 @@ def get_default_data():
|
||||
|
||||
|
||||
def get_default_payload():
|
||||
return "AQIDBAUGBwgBAgMEBQYHCAECAwQFBgcIAQIDBAUGBwg="
|
||||
return b"AQIDBAUGBwgBAgMEBQYHCAECAwQFBgcIAQIDBAUGBwg="
|
||||
|
||||
|
||||
@utils.parameterized_test_case
|
||||
@ -222,8 +222,8 @@ class SecretsTestCase(base.TestCase):
|
||||
payload_content_type='',
|
||||
omit_headers=['Accept'])
|
||||
self.assertEqual(200, get_resp.status_code)
|
||||
self.assertIn(test_model.payload,
|
||||
binascii.b2a_base64(get_resp.content))
|
||||
self.assertEqual(test_model.payload,
|
||||
oslo_base64.encode_as_bytes(get_resp.content))
|
||||
|
||||
@testcase.attr('negative')
|
||||
def test_secret_delete_doesnt_exist(self):
|
||||
@ -293,7 +293,7 @@ class SecretsTestCase(base.TestCase):
|
||||
"""
|
||||
test_model = secret_models.SecretModel(
|
||||
**self.default_secret_create_data)
|
||||
test_model.payload = str(self.oversized_payload)
|
||||
test_model.payload = self.oversized_payload
|
||||
|
||||
resp, secret_ref = self.behaviors.create_secret(test_model)
|
||||
self.assertEqual(413, resp.status_code)
|
||||
@ -395,11 +395,11 @@ class SecretsTestCase(base.TestCase):
|
||||
|
||||
Launchpad bug #1315498.
|
||||
"""
|
||||
oversized_payload = bytearray().zfill(self.max_payload_size + 1)
|
||||
oversized_payload = bytearray(self.oversized_payload)
|
||||
|
||||
# put a value in the middle of the data that does not have a UTF-8
|
||||
# code point. Using // to be python3-friendly.
|
||||
oversized_payload[self.max_payload_size // 2] = b'\xb0'
|
||||
# code point. Using // and 176 to be python3-friendly.
|
||||
oversized_payload[self.max_payload_size // 2] = 176 # 0xb0
|
||||
|
||||
test_model = secret_models.SecretModel(
|
||||
**self.default_secret_create_two_phase_data)
|
||||
@ -411,7 +411,7 @@ class SecretsTestCase(base.TestCase):
|
||||
secret_ref=secret_ref,
|
||||
payload_content_type='application/octet-stream',
|
||||
payload_content_encoding='base64',
|
||||
payload=str(oversized_payload))
|
||||
payload=oversized_payload)
|
||||
self.assertEqual(413, put_resp.status_code)
|
||||
|
||||
@testcase.attr('negative')
|
||||
@ -486,7 +486,7 @@ class SecretsTestCase(base.TestCase):
|
||||
**self.default_secret_create_two_phase_data)
|
||||
test_model.payload_content_encoding = 'base64'
|
||||
test_model.payload_content_type = 'application/octet-stream'
|
||||
test_model.payload = base64.b64encode('abcdef')
|
||||
test_model.payload = oslo_base64.encode_as_bytes('abcdef')
|
||||
|
||||
resp, secret_ref = self.behaviors.create_secret(test_model)
|
||||
self.assertEqual(201, resp.status_code)
|
||||
@ -496,8 +496,8 @@ class SecretsTestCase(base.TestCase):
|
||||
payload_content_type=test_model.payload_content_type,
|
||||
payload_content_encoding=test_model.payload_content_encoding)
|
||||
self.assertEqual(200, get_resp.status_code)
|
||||
self.assertIn(test_model.payload,
|
||||
binascii.b2a_base64(get_resp.content))
|
||||
self.assertEqual(test_model.payload,
|
||||
oslo_base64.encode_as_bytes(get_resp.content))
|
||||
|
||||
@testcase.attr('negative')
|
||||
def test_secret_create_defaults_bad_content_type_check_message(self):
|
||||
@ -511,11 +511,11 @@ class SecretsTestCase(base.TestCase):
|
||||
# first, ensure that the return code is 400
|
||||
self.assertEqual(400, resp.status_code)
|
||||
|
||||
resp_dict = json.loads(resp.content)
|
||||
resp_dict = jsonutils.loads(resp.content)
|
||||
|
||||
self.assertIn(
|
||||
"Provided object does not match schema 'Secret': "
|
||||
"payload_content_type plain-text is not one of ['text/plain', "
|
||||
"payload_content_type is not one of ['text/plain', "
|
||||
"'text/plain;charset=utf-8', 'text/plain; charset=utf-8', "
|
||||
"'application/octet-stream'", resp_dict['description'])
|
||||
self.assertIn("Bad Request", resp_dict['title'])
|
||||
@ -638,11 +638,11 @@ class SecretsTestCase(base.TestCase):
|
||||
@testcase.attr('positive')
|
||||
def test_secret_create_with_valid_bit_length(self, bit_length):
|
||||
"""Covers cases of creating secrets with valid bit lengths."""
|
||||
byte_length = bit_length / 8
|
||||
byte_length = bit_length // 8
|
||||
secret = bytearray(byte_length)
|
||||
for x in range(0, byte_length):
|
||||
secret[x] = x
|
||||
secret64 = base64.b64encode(secret)
|
||||
secret64 = oslo_base64.encode_as_bytes(secret)
|
||||
|
||||
test_model = secret_models.SecretModel(
|
||||
**self.default_secret_create_data)
|
||||
@ -656,7 +656,7 @@ class SecretsTestCase(base.TestCase):
|
||||
'str_type': ['not-an-int'],
|
||||
'empty': [''],
|
||||
'blank': [' '],
|
||||
'negative_maxint': [-sys.maxint],
|
||||
'negative_maxint': [-sys.maxsize],
|
||||
'negative_one': [-1],
|
||||
'zero': [0]
|
||||
})
|
||||
@ -735,10 +735,10 @@ class SecretsTestCase(base.TestCase):
|
||||
self.assertEqual(200, get_resp.status_code)
|
||||
|
||||
if payload_content_encoding == 'base64':
|
||||
self.assertIn(test_model.payload,
|
||||
binascii.b2a_base64(get_resp.content))
|
||||
self.assertEqual(test_model.payload,
|
||||
oslo_base64.encode_as_bytes(get_resp.content))
|
||||
else:
|
||||
self.assertIn(test_model.payload, get_resp.content)
|
||||
self.assertEqual(test_model.payload, get_resp.content)
|
||||
|
||||
@utils.parameterized_dataset({
|
||||
'text_content_type_none_encoding': {
|
||||
@ -776,10 +776,10 @@ class SecretsTestCase(base.TestCase):
|
||||
self.assertEqual(200, get_resp.status_code)
|
||||
|
||||
if payload_content_encoding == 'base64':
|
||||
self.assertIn(test_model.payload,
|
||||
binascii.b2a_base64(get_resp.content))
|
||||
self.assertEqual(test_model.payload,
|
||||
oslo_base64.encode_as_bytes(get_resp.content))
|
||||
else:
|
||||
self.assertIn(test_model.payload, get_resp.content)
|
||||
self.assertEqual(test_model.payload, get_resp.content)
|
||||
|
||||
@utils.parameterized_dataset({
|
||||
'empty_content_type_and_encoding': {
|
||||
@ -881,8 +881,8 @@ class SecretsTestCase(base.TestCase):
|
||||
'array': [['boom']],
|
||||
'int': [123],
|
||||
'none': [None],
|
||||
'bad_character': [unichr(0x0080)],
|
||||
'bad_characters': [unichr(0x1111) + unichr(0xffff)]
|
||||
'bad_character': [six.unichr(0x0080)],
|
||||
'bad_characters': [six.unichr(0x1111) + six.unichr(0xffff)]
|
||||
})
|
||||
@testcase.attr('negative')
|
||||
def test_secret_create_defaults_invalid_payload(self, payload):
|
||||
@ -994,7 +994,7 @@ class SecretsTestCase(base.TestCase):
|
||||
|
||||
@utils.parameterized_dataset({
|
||||
'symmetric': ['symmetric',
|
||||
base64.b64decode(
|
||||
oslo_base64.decode_as_bytes(
|
||||
get_default_payload()),
|
||||
get_default_data()],
|
||||
'private': ['private',
|
||||
@ -1007,7 +1007,7 @@ class SecretsTestCase(base.TestCase):
|
||||
keys.get_certificate_pem(),
|
||||
get_certificate_req()],
|
||||
'passphrase': ['passphrase',
|
||||
'mysecretpassphrase',
|
||||
b'mysecretpassphrase',
|
||||
get_passphrase_req()]
|
||||
})
|
||||
@testcase.attr('positive')
|
||||
@ -1184,7 +1184,7 @@ class SecretsUnauthedTestCase(base.TestCase):
|
||||
|
||||
stored_auth = self.client._auth[
|
||||
self.client._default_user_name].stored_auth
|
||||
project_id = stored_auth.values()[0]['project_id']
|
||||
project_id = list(stored_auth.values())[0]['project_id']
|
||||
self.project_id_header = {
|
||||
'X-Project-Id': project_id
|
||||
}
|
||||
@ -1536,7 +1536,7 @@ class SecretsMultipleBackendTestCase(base.TestCase):
|
||||
'symmetric_type_preferred_store': [
|
||||
admin_a,
|
||||
'symmetric',
|
||||
base64.b64decode(get_default_payload()),
|
||||
oslo_base64.decode_as_bytes(get_default_payload()),
|
||||
get_default_data()
|
||||
],
|
||||
'private_type_preferred_store': [
|
||||
@ -1566,7 +1566,7 @@ class SecretsMultipleBackendTestCase(base.TestCase):
|
||||
'symmetric_type_no_preferred_store': [
|
||||
admin_b,
|
||||
'symmetric',
|
||||
base64.b64decode(get_default_payload()),
|
||||
oslo_base64.decode_as_bytes(get_default_payload()),
|
||||
get_default_data()
|
||||
],
|
||||
'private_type_no_preferred_store': [
|
||||
@ -1590,7 +1590,7 @@ class SecretsMultipleBackendTestCase(base.TestCase):
|
||||
'passphrase_type_no_preferred_store': [
|
||||
admin_b,
|
||||
'passphrase',
|
||||
'mysecretpassphrase',
|
||||
b'mysecretpassphrase',
|
||||
get_passphrase_req()
|
||||
],
|
||||
})
|
||||
|
@ -13,8 +13,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
"""
|
||||
import json
|
||||
import logging
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -34,8 +34,7 @@ class BaseModel(object):
|
||||
|
||||
:return: A string of JSON containing the fields in this object
|
||||
"""
|
||||
|
||||
return json.dumps(self.obj_to_dict())
|
||||
return jsonutils.dump_as_bytes(self.obj_to_dict())
|
||||
|
||||
def obj_to_dict(self):
|
||||
"""Create a dict of the values for this model object.
|
||||
@ -75,7 +74,7 @@ class BaseModel(object):
|
||||
:return: a secret object
|
||||
"""
|
||||
try:
|
||||
json_dict = json.loads(serialized_str)
|
||||
json_dict = jsonutils.loads(serialized_str)
|
||||
return cls.dict_to_obj(json_dict)
|
||||
except TypeError as e:
|
||||
LOG.error('Couldn\'t deserialize input: %s\n Because: %s',
|
||||
|
@ -27,7 +27,7 @@ create_secret_defaults_data = {
|
||||
"algorithm": "aes",
|
||||
"bit_length": 256,
|
||||
"mode": "cbc",
|
||||
"payload": "gF6+lLoF3ohA9aPRpt+6bQ==",
|
||||
"payload": b"gF6+lLoF3ohA9aPRpt+6bQ==",
|
||||
"payload_content_type": "application/octet-stream",
|
||||
"payload_content_encoding": "base64",
|
||||
}
|
||||
@ -155,7 +155,7 @@ class ContainersTestCase(base.TestCase):
|
||||
@utils.parameterized_dataset({
|
||||
'alphanumeric': ['a2j3j6ll9'],
|
||||
'punctuation': ['~!@#$%^&*()_+`-={}[]|:;<>,.?'],
|
||||
'len_255': [str(bytearray().zfill(255))],
|
||||
'len_255': [base.TestCase.max_sized_field],
|
||||
'uuid': ['54262d9d-4bc7-4821-8df0-dc2ca8e112bb'],
|
||||
'empty': ['']
|
||||
})
|
||||
|
@ -12,9 +12,9 @@
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import binascii
|
||||
import copy
|
||||
|
||||
from oslo_serialization import base64 as oslo_base64
|
||||
from testtools import testcase
|
||||
|
||||
from barbican.tests import utils
|
||||
@ -28,7 +28,7 @@ default_secret_create_data = {
|
||||
"algorithm": "aes",
|
||||
"bit_length": 256,
|
||||
"mode": "cbc",
|
||||
"payload": "gF6+lLoF3ohA9aPRpt+6bQ==",
|
||||
"payload": b"gF6+lLoF3ohA9aPRpt+6bQ==",
|
||||
"payload_content_type": "application/octet-stream",
|
||||
"payload_content_encoding": "base64",
|
||||
}
|
||||
@ -54,7 +54,7 @@ default_secret_create_emptystrings_data = {
|
||||
"algorithm": '',
|
||||
"bit_length": '',
|
||||
"mode": '',
|
||||
"payload": '',
|
||||
"payload": b'',
|
||||
"payload_content_type": '',
|
||||
"payload_content_encoding": '',
|
||||
}
|
||||
@ -102,13 +102,13 @@ class SecretsTestCase(base.TestCase):
|
||||
resp = self.behaviors.get_secret_metadata(secret_ref)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
self.assertEqual("ACTIVE", resp.model.status)
|
||||
self.assertGreater(resp.model.secret_ref, 0)
|
||||
self.assertTrue(resp.model.secret_ref)
|
||||
|
||||
@utils.parameterized_dataset({
|
||||
'alphanumeric': ['1f34ds'],
|
||||
'punctuation': ['~!@#$%^&*()_+`-={}[]|:;<>,.?'],
|
||||
'uuid': ['54262d9d-4bc7-4821-8df0-dc2ca8e112bb'],
|
||||
'len_255': [str(bytearray().zfill(255))],
|
||||
'len_255': [base.TestCase.max_sized_field],
|
||||
'empty': [''],
|
||||
'null': [None]
|
||||
})
|
||||
@ -175,8 +175,8 @@ class SecretsTestCase(base.TestCase):
|
||||
get_resp = self.behaviors.get_secret(secret_ref,
|
||||
test_model.payload_content_type)
|
||||
self.assertEqual(200, get_resp.status_code)
|
||||
self.assertIn(test_model.payload,
|
||||
binascii.b2a_base64(get_resp.content))
|
||||
self.assertEqual(test_model.payload,
|
||||
oslo_base64.encode_as_bytes(get_resp.content))
|
||||
|
||||
@testcase.attr('positive')
|
||||
def test_secret_update_two_phase(self):
|
||||
@ -189,7 +189,7 @@ class SecretsTestCase(base.TestCase):
|
||||
self.assertEqual(201, resp.status_code)
|
||||
|
||||
# Update
|
||||
payload = "gF6+lLoF3ohA9aPRpt+6bQ=="
|
||||
payload = b"gF6+lLoF3ohA9aPRpt+6bQ=="
|
||||
payload_content_type = "application/octet-stream"
|
||||
payload_content_encoding = "base64"
|
||||
|
||||
@ -204,8 +204,8 @@ class SecretsTestCase(base.TestCase):
|
||||
secret_ref=secret_ref,
|
||||
payload_content_type=payload_content_type)
|
||||
self.assertEqual(200, sec_resp.status_code)
|
||||
self.assertIn('gF6+lLoF3ohA9aPRpt+6bQ==',
|
||||
binascii.b2a_base64(sec_resp.content))
|
||||
self.assertEqual(b'gF6+lLoF3ohA9aPRpt+6bQ==',
|
||||
oslo_base64.encode_as_bytes(sec_resp.content))
|
||||
|
||||
@testcase.attr('positive')
|
||||
def test_secrets_get_multiple_secrets(self):
|
||||
|
@ -16,7 +16,9 @@ limitations under the License.
|
||||
import logging
|
||||
import os
|
||||
|
||||
from oslo_serialization import base64
|
||||
import requests
|
||||
import six
|
||||
from six.moves import urllib
|
||||
from tempest.lib.common.utils import test_utils
|
||||
|
||||
@ -86,6 +88,17 @@ class BarbicanClient(object):
|
||||
retval.append(username)
|
||||
return retval
|
||||
|
||||
def _attempt_ascii(self, text):
|
||||
"""Attempt to decode to ascii, works with py27 and py35
|
||||
|
||||
Throw an encode or decode exception is text can not be
|
||||
presented in ascii.
|
||||
"""
|
||||
if isinstance(text, six.text_type):
|
||||
return text.encode('ascii')
|
||||
else:
|
||||
return text.decode('ascii')
|
||||
|
||||
def _attempt_to_stringify_content(self, content, content_tag):
|
||||
if content is None:
|
||||
return content
|
||||
@ -93,12 +106,12 @@ class BarbicanClient(object):
|
||||
# NOTE(jaosorior): The content is decoded as ascii since the
|
||||
# logging module has problems with utf-8 strings and will end up
|
||||
# trying to decode this as ascii.
|
||||
return content.decode('ascii')
|
||||
except UnicodeDecodeError:
|
||||
return self._attempt_ascii(content)
|
||||
except (UnicodeDecodeError, UnicodeEncodeError):
|
||||
# NOTE(jaosorior): Since we are using base64 as default and this is
|
||||
# only for logging (in order to debug); Lets not put too much
|
||||
# effort in this and just use encoded string.
|
||||
return content.encode('base64')
|
||||
return base64.encode_as_text(content)
|
||||
|
||||
def stringify_request(self, request_kwargs, response):
|
||||
format_kwargs = {
|
||||
|
9
tox.ini
9
tox.ini
@ -86,6 +86,15 @@ commands =
|
||||
/bin/bash {toxinidir}/functionaltests/pretty_tox.sh '{posargs}'
|
||||
passenv = KMIP_PLUGIN_ENABLED
|
||||
|
||||
[testenv:py35functional]
|
||||
basepython = python3
|
||||
deps = -r{toxinidir}/test-requirements.txt
|
||||
setenv = OS_TEST_PATH={toxinidir}/functionaltests
|
||||
commands =
|
||||
/usr/bin/find . -type f -name "*.py[c|o]" -delete
|
||||
/bin/bash {toxinidir}/functionaltests/pretty_tox.sh '{posargs}'
|
||||
passenv = KMIP_PLUGIN_ENABLED
|
||||
|
||||
[testenv:cmd]
|
||||
# This tox env is purely to make local test development easier
|
||||
# Note: This requires local running instances of Barbican and Keystone
|
||||
|
Loading…
Reference in New Issue
Block a user