Fixes for Running Functional Tests with Python 3.5

Fixing the standard things when making code py27/p35 compatible.

Also, removing the logging of the passed value of an HTTP
header.  If the value could not be encoded to log, then there
are Tracebacks that showed up with Python 3.5.  Since the value
can be passed by a user, it should either be scrubbed before logging
or not logged, to prevent possible content injection in the log
stream.

Change-Id: I8df1553acb6c7e5f75a1b50f024dc032ca982a93
This commit is contained in:
Dave McCowan 2017-04-04 22:36:43 -04:00
parent 7d8074fddb
commit 2909fda016
15 changed files with 99 additions and 82 deletions

View File

@ -132,14 +132,10 @@ def _do_enforce_content_types(pecan_req, valid_content_types):
types passed in by our caller.
"""
if pecan_req.content_type not in valid_content_types:
content_type = pecan_req.content_type
if isinstance(content_type, bytes):
content_type = content_type.decode('utf-8')
m = u._(
"Unexpected content type: {type}. Expected content types "
"Unexpected content type. Expected content types "
"are: {expected}"
).format(
type=content_type,
expected=valid_content_types
)
pecan.abort(415, m)

View File

@ -257,8 +257,7 @@ class NewSecretValidator(ValidatorBase):
mime_types.is_supported(content_type),
schema_name,
u._("payload_content_type is not one of {supported}"
).format(supplied=content_type,
supported=mime_types.SUPPORTED),
).format(supported=mime_types.SUPPORTED),
"payload_content_type")
return json_data
@ -316,9 +315,8 @@ class NewSecretValidator(ValidatorBase):
self._assert_validity(
mime_types.is_supported(content_type),
schema_name,
u._("payload_content_type {supplied} is not one of {supported}"
).format(supplied=content_type,
supported=mime_types.SUPPORTED),
u._("payload_content_type is not one of {supported}"
).format(supported=mime_types.SUPPORTED),
"payload_content_type")
self._assert_validity(

View File

@ -71,7 +71,7 @@ def create_csr_signed_with_wrong_key():
def create_bad_csr():
"""Generate a CSR that will not parse."""
return "Bad PKCS10 Data"
return b"Bad PKCS10 Data"
def create_csr_with_bad_subject_dn():

View File

@ -226,7 +226,7 @@ def get_encrypted_private_key_pem():
of the private_encrypted.pk8 file.
"""
return """-----BEGIN ENCRYPTED PRIVATE KEY-----
return b"""-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIE6TAbBgkqhkiG9w0BBQMwDgQIssadeQrYhhACAggABIIEyDNw3SV2b19yy4Q/
kTbtJ/p2X2zKDqr7GgLeAowqqhcMfvprI7G8C0XtwxkR4SjMZUXNcmOwQB2kNKtK
ZilCz6pSx81iUj4s1fU460XkhkIeV+F7aB2PsTG1oDfPCuzKFjT6EuSE6lFUH89r
@ -268,7 +268,7 @@ def get_passphrase_txt():
of the passphrase.txt file.
"""
return """password"""
return b"""password"""
def get_csr_pem():
@ -283,7 +283,7 @@ def get_csr_pem():
of the csr.pem file.
"""
return """-----BEGIN CERTIFICATE REQUEST-----
return b"""-----BEGIN CERTIFICATE REQUEST-----
MIICWzCCAUMCAQAwFjEUMBIGA1UEAwwLZXhhbXBsZS5jb20wggEiMA0GCSqGSIb3
DQEBAQUAA4IBDwAwggEKAoIBAQCza2VoDXmBUMmwjFu9F6MM5q/AZ1WjnWA2YNdN
y237TrGN/nobDDv8FBBpUPmHNZ04H1LyxFcP8ReFrcIXpifsReu2lAWaqRPxovu5

View File

@ -37,8 +37,8 @@ conf_multiple_backends_enabled = CONF.keymanager.\
class TestCase(oslotest.BaseTestCase):
max_payload_size = 10000
max_sized_payload = 'a' * max_payload_size
oversized_payload = 'a' * (max_payload_size + 1)
max_sized_payload = b'a' * max_payload_size
oversized_payload = b'a' * (max_payload_size + 1)
max_field_size = 255
max_sized_field = 'a' * max_field_size
oversized_field = 'a' * (max_field_size + 1)

View File

@ -14,7 +14,7 @@
# limitations under the License.
import base64
import copy
import json
from oslo_serialization import jsonutils
import time
from OpenSSL import crypto
@ -281,7 +281,7 @@ class CertificatesTestCase(base.TestCase):
order_resp.model.sub_status_message)
def confirm_error_message(self, resp, message):
resp_dict = json.loads(resp.content)
resp_dict = jsonutils.loads(resp.content)
self.assertEqual(message, resp_dict['description'])
@testtools.testcase.attr('positive')
@ -426,7 +426,7 @@ class CertificatesTestCase(base.TestCase):
create_resp, order_ref = self.behaviors.create_order(test_model)
self.assertEqual(400, create_resp.status_code)
self.assertIsNone(order_ref)
error_description = json.loads(create_resp.content)['description']
error_description = jsonutils.loads(create_resp.content)['description']
self.assertIn("Invalid PKCS10 Data", error_description)
@testtools.testcase.attr('negative')

View File

@ -71,7 +71,7 @@ create_container_rsa_data = {
accepted_str_values = {
'alphanumeric': ['a2j3j6ll9'],
'punctuation': ['~!@#$%^&*()_+`-={}[]|:;<>,.?'],
'len_255': [str(bytearray().zfill(255))],
'len_255': [base.TestCase.max_sized_field],
'uuid': ['54262d9d-4bc7-4821-8df0-dc2ca8e112bb'],
'empty': ['']
}

View File

@ -12,7 +12,7 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from oslo_serialization import jsonutils
import sys
import time
@ -295,7 +295,7 @@ class OrdersTestCase(base.TestCase):
resp, order_ref = self.behaviors.create_order(test_model)
# Make sure we actually get a message back
error_msg = json.loads(resp.content).get('title')
error_msg = jsonutils.loads(resp.content).get('title')
self.assertEqual(400, resp.status_code)
self.assertIsNotNone(error_msg)
@ -322,7 +322,7 @@ class OrdersTestCase(base.TestCase):
self.assertIsNotNone(order_ref)
@utils.parameterized_dataset({
'negative_maxint': [-sys.maxint],
'negative_maxint': [-sys.maxsize],
'negative_7': [-7],
'negative_1': [-1],
'0': [0],

View File

@ -12,7 +12,7 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from oslo_serialization import jsonutils
from testtools import testcase
import uuid
@ -105,7 +105,8 @@ class SecretMetadataTestCase(base.TestCase):
get_resp = self.behaviors.get_metadata(secret_ref)
self.assertEqual(200, get_resp.status_code)
self.assertEqual(json.loads(get_resp.content), self.valid_metadata)
self.assertEqual(jsonutils.loads(get_resp.content),
self.valid_metadata)
@testcase.attr('negative')
def test_secret_metadata_get_no_secret(self):
@ -166,7 +167,8 @@ class SecretMetadataTestCase(base.TestCase):
get_resp = self.behaviors.get_metadatum(secret_ref,
self.valid_metadatum_key)
self.assertEqual(200, get_resp.status_code)
self.assertEqual(json.loads(get_resp.content), self.valid_metadatum)
self.assertEqual(jsonutils.loads(get_resp.content),
self.valid_metadatum)
@testcase.attr('negative')
def test_secret_metadatum_get_wrong_key(self):

View File

@ -13,9 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import binascii
import json
from oslo_serialization import base64 as oslo_base64
from oslo_serialization import jsonutils
import six
import sys
import testtools
import time
@ -40,7 +40,7 @@ admin_b = CONF.rbac_users.admin_b
def get_pem_content(pem):
b64_content = translations.get_pem_components(pem)[1]
return base64.b64decode(b64_content)
return oslo_base64.decode_as_bytes(b64_content)
def get_private_key_req():
@ -50,7 +50,7 @@ def get_private_key_req():
'algorithm': 'rsa',
'bit_length': 2048,
'secret_type': 'private',
'payload': base64.b64encode(keys.get_private_key_pem())}
'payload': oslo_base64.encode_as_bytes(keys.get_private_key_pem())}
def get_public_key_req():
@ -60,7 +60,7 @@ def get_public_key_req():
'algorithm': 'rsa',
'bit_length': 2048,
'secret_type': 'public',
'payload': base64.b64encode(keys.get_public_key_pem())}
'payload': oslo_base64.encode_as_bytes(keys.get_public_key_pem())}
def get_certificate_req():
@ -70,7 +70,7 @@ def get_certificate_req():
'algorithm': 'rsa',
'bit_length': 2048,
'secret_type': 'certificate',
'payload': base64.b64encode(keys.get_certificate_pem())}
'payload': oslo_base64.encode_as_bytes(keys.get_certificate_pem())}
def get_passphrase_req():
@ -94,7 +94,7 @@ def get_default_data():
def get_default_payload():
return "AQIDBAUGBwgBAgMEBQYHCAECAwQFBgcIAQIDBAUGBwg="
return b"AQIDBAUGBwgBAgMEBQYHCAECAwQFBgcIAQIDBAUGBwg="
@utils.parameterized_test_case
@ -226,8 +226,8 @@ class SecretsTestCase(base.TestCase):
payload_content_type='',
omit_headers=['Accept'])
self.assertEqual(200, get_resp.status_code)
self.assertIn(test_model.payload,
binascii.b2a_base64(get_resp.content))
self.assertEqual(test_model.payload,
oslo_base64.encode_as_bytes(get_resp.content))
@testcase.attr('negative')
def test_secret_delete_doesnt_exist(self):
@ -297,7 +297,7 @@ class SecretsTestCase(base.TestCase):
"""
test_model = secret_models.SecretModel(
**self.default_secret_create_data)
test_model.payload = str(self.oversized_payload)
test_model.payload = self.oversized_payload
resp, secret_ref = self.behaviors.create_secret(test_model)
self.assertEqual(413, resp.status_code)
@ -399,11 +399,11 @@ class SecretsTestCase(base.TestCase):
Launchpad bug #1315498.
"""
oversized_payload = bytearray().zfill(self.max_payload_size + 1)
oversized_payload = bytearray(self.oversized_payload)
# put a value in the middle of the data that does not have a UTF-8
# code point. Using // to be python3-friendly.
oversized_payload[self.max_payload_size // 2] = b'\xb0'
# code point. Using // and 176 to be python3-friendly.
oversized_payload[self.max_payload_size // 2] = 176 # 0xb0
test_model = secret_models.SecretModel(
**self.default_secret_create_two_phase_data)
@ -415,7 +415,7 @@ class SecretsTestCase(base.TestCase):
secret_ref=secret_ref,
payload_content_type='application/octet-stream',
payload_content_encoding='base64',
payload=str(oversized_payload))
payload=oversized_payload)
self.assertEqual(413, put_resp.status_code)
@testcase.attr('negative')
@ -490,7 +490,7 @@ class SecretsTestCase(base.TestCase):
**self.default_secret_create_two_phase_data)
test_model.payload_content_encoding = 'base64'
test_model.payload_content_type = 'application/octet-stream'
test_model.payload = base64.b64encode('abcdef')
test_model.payload = oslo_base64.encode_as_bytes('abcdef')
resp, secret_ref = self.behaviors.create_secret(test_model)
self.assertEqual(201, resp.status_code)
@ -500,8 +500,8 @@ class SecretsTestCase(base.TestCase):
payload_content_type=test_model.payload_content_type,
payload_content_encoding=test_model.payload_content_encoding)
self.assertEqual(200, get_resp.status_code)
self.assertIn(test_model.payload,
binascii.b2a_base64(get_resp.content))
self.assertEqual(test_model.payload,
oslo_base64.encode_as_bytes(get_resp.content))
@testcase.attr('negative')
def test_secret_create_defaults_bad_content_type_check_message(self):
@ -515,11 +515,11 @@ class SecretsTestCase(base.TestCase):
# first, ensure that the return code is 400
self.assertEqual(400, resp.status_code)
resp_dict = json.loads(resp.content)
resp_dict = jsonutils.loads(resp.content)
self.assertIn(
"Provided object does not match schema 'Secret': "
"payload_content_type plain-text is not one of ['text/plain', "
"payload_content_type is not one of ['text/plain', "
"'text/plain;charset=utf-8', 'text/plain; charset=utf-8', "
"'application/octet-stream'", resp_dict['description'])
self.assertIn("Bad Request", resp_dict['title'])
@ -642,11 +642,11 @@ class SecretsTestCase(base.TestCase):
@testcase.attr('positive')
def test_secret_create_with_valid_bit_length(self, bit_length):
"""Covers cases of creating secrets with valid bit lengths."""
byte_length = bit_length / 8
byte_length = bit_length // 8
secret = bytearray(byte_length)
for x in range(0, byte_length):
secret[x] = x
secret64 = base64.b64encode(secret)
secret64 = oslo_base64.encode_as_bytes(secret)
test_model = secret_models.SecretModel(
**self.default_secret_create_data)
@ -660,7 +660,7 @@ class SecretsTestCase(base.TestCase):
'str_type': ['not-an-int'],
'empty': [''],
'blank': [' '],
'negative_maxint': [-sys.maxint],
'negative_maxint': [-sys.maxsize],
'negative_one': [-1],
'zero': [0]
})
@ -739,10 +739,10 @@ class SecretsTestCase(base.TestCase):
self.assertEqual(200, get_resp.status_code)
if payload_content_encoding == 'base64':
self.assertIn(test_model.payload,
binascii.b2a_base64(get_resp.content))
self.assertEqual(test_model.payload,
oslo_base64.encode_as_bytes(get_resp.content))
else:
self.assertIn(test_model.payload, get_resp.content)
self.assertEqual(test_model.payload, get_resp.content)
@utils.parameterized_dataset({
'text_content_type_none_encoding': {
@ -780,10 +780,10 @@ class SecretsTestCase(base.TestCase):
self.assertEqual(200, get_resp.status_code)
if payload_content_encoding == 'base64':
self.assertIn(test_model.payload,
binascii.b2a_base64(get_resp.content))
self.assertEqual(test_model.payload,
oslo_base64.encode_as_bytes(get_resp.content))
else:
self.assertIn(test_model.payload, get_resp.content)
self.assertEqual(test_model.payload, get_resp.content)
@utils.parameterized_dataset({
'empty_content_type_and_encoding': {
@ -885,8 +885,8 @@ class SecretsTestCase(base.TestCase):
'array': [['boom']],
'int': [123],
'none': [None],
'bad_character': [unichr(0x0080)],
'bad_characters': [unichr(0x1111) + unichr(0xffff)]
'bad_character': [six.unichr(0x0080)],
'bad_characters': [six.unichr(0x1111) + six.unichr(0xffff)]
})
@testcase.attr('negative')
def test_secret_create_defaults_invalid_payload(self, payload):
@ -998,7 +998,7 @@ class SecretsTestCase(base.TestCase):
@utils.parameterized_dataset({
'symmetric': ['symmetric',
base64.b64decode(
oslo_base64.decode_as_bytes(
get_default_payload()),
get_default_data()],
'private': ['private',
@ -1011,7 +1011,7 @@ class SecretsTestCase(base.TestCase):
keys.get_certificate_pem(),
get_certificate_req()],
'passphrase': ['passphrase',
'mysecretpassphrase',
b'mysecretpassphrase',
get_passphrase_req()]
})
@testcase.attr('positive')
@ -1180,7 +1180,7 @@ class SecretsUnauthedTestCase(base.TestCase):
stored_auth = self.client._auth[
self.client._default_user_name].stored_auth
project_id = stored_auth.values()[0]['project_id']
project_id = list(stored_auth.values())[0]['project_id']
self.project_id_header = {
'X-Project-Id': project_id
}
@ -1532,7 +1532,7 @@ class SecretsMultipleBackendTestCase(base.TestCase):
'symmetric_type_preferred_store': [
admin_a,
'symmetric',
base64.b64decode(get_default_payload()),
oslo_base64.decode_as_bytes(get_default_payload()),
get_default_data()
],
'private_type_preferred_store': [
@ -1562,7 +1562,7 @@ class SecretsMultipleBackendTestCase(base.TestCase):
'symmetric_type_no_preferred_store': [
admin_b,
'symmetric',
base64.b64decode(get_default_payload()),
oslo_base64.decode_as_bytes(get_default_payload()),
get_default_data()
],
'private_type_no_preferred_store': [
@ -1586,7 +1586,7 @@ class SecretsMultipleBackendTestCase(base.TestCase):
'passphrase_type_no_preferred_store': [
admin_b,
'passphrase',
'mysecretpassphrase',
b'mysecretpassphrase',
get_passphrase_req()
],
})

View File

@ -13,8 +13,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import json
import logging
from oslo_serialization import jsonutils
LOG = logging.getLogger(__name__)
@ -34,8 +34,7 @@ class BaseModel(object):
:return: A string of JSON containing the fields in this object
"""
return json.dumps(self.obj_to_dict())
return jsonutils.dump_as_bytes(self.obj_to_dict())
def obj_to_dict(self):
"""Create a dict of the values for this model object.
@ -75,7 +74,7 @@ class BaseModel(object):
:return: a secret object
"""
try:
json_dict = json.loads(serialized_str)
json_dict = jsonutils.loads(serialized_str)
return cls.dict_to_obj(json_dict)
except TypeError as e:
LOG.error('Couldn\'t deserialize input: %s\n Because: %s',

View File

@ -27,7 +27,7 @@ create_secret_defaults_data = {
"algorithm": "aes",
"bit_length": 256,
"mode": "cbc",
"payload": "gF6+lLoF3ohA9aPRpt+6bQ==",
"payload": b"gF6+lLoF3ohA9aPRpt+6bQ==",
"payload_content_type": "application/octet-stream",
"payload_content_encoding": "base64",
}
@ -155,7 +155,7 @@ class ContainersTestCase(base.TestCase):
@utils.parameterized_dataset({
'alphanumeric': ['a2j3j6ll9'],
'punctuation': ['~!@#$%^&*()_+`-={}[]|:;<>,.?'],
'len_255': [str(bytearray().zfill(255))],
'len_255': [base.TestCase.max_sized_field],
'uuid': ['54262d9d-4bc7-4821-8df0-dc2ca8e112bb'],
'empty': ['']
})

View File

@ -12,9 +12,9 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import binascii
import copy
from oslo_serialization import base64 as oslo_base64
from testtools import testcase
from barbican.tests import utils
@ -28,7 +28,7 @@ default_secret_create_data = {
"algorithm": "aes",
"bit_length": 256,
"mode": "cbc",
"payload": "gF6+lLoF3ohA9aPRpt+6bQ==",
"payload": b"gF6+lLoF3ohA9aPRpt+6bQ==",
"payload_content_type": "application/octet-stream",
"payload_content_encoding": "base64",
}
@ -54,7 +54,7 @@ default_secret_create_emptystrings_data = {
"algorithm": '',
"bit_length": '',
"mode": '',
"payload": '',
"payload": b'',
"payload_content_type": '',
"payload_content_encoding": '',
}
@ -102,13 +102,13 @@ class SecretsTestCase(base.TestCase):
resp = self.behaviors.get_secret_metadata(secret_ref)
self.assertEqual(200, resp.status_code)
self.assertEqual("ACTIVE", resp.model.status)
self.assertGreater(resp.model.secret_ref, 0)
self.assertTrue(resp.model.secret_ref)
@utils.parameterized_dataset({
'alphanumeric': ['1f34ds'],
'punctuation': ['~!@#$%^&*()_+`-={}[]|:;<>,.?'],
'uuid': ['54262d9d-4bc7-4821-8df0-dc2ca8e112bb'],
'len_255': [str(bytearray().zfill(255))],
'len_255': [base.TestCase.max_sized_field],
'empty': [''],
'null': [None]
})
@ -175,8 +175,8 @@ class SecretsTestCase(base.TestCase):
get_resp = self.behaviors.get_secret(secret_ref,
test_model.payload_content_type)
self.assertEqual(200, get_resp.status_code)
self.assertIn(test_model.payload,
binascii.b2a_base64(get_resp.content))
self.assertEqual(test_model.payload,
oslo_base64.encode_as_bytes(get_resp.content))
@testcase.attr('positive')
def test_secret_update_two_phase(self):
@ -189,7 +189,7 @@ class SecretsTestCase(base.TestCase):
self.assertEqual(201, resp.status_code)
# Update
payload = "gF6+lLoF3ohA9aPRpt+6bQ=="
payload = b"gF6+lLoF3ohA9aPRpt+6bQ=="
payload_content_type = "application/octet-stream"
payload_content_encoding = "base64"
@ -204,8 +204,8 @@ class SecretsTestCase(base.TestCase):
secret_ref=secret_ref,
payload_content_type=payload_content_type)
self.assertEqual(200, sec_resp.status_code)
self.assertIn('gF6+lLoF3ohA9aPRpt+6bQ==',
binascii.b2a_base64(sec_resp.content))
self.assertEqual(b'gF6+lLoF3ohA9aPRpt+6bQ==',
oslo_base64.encode_as_bytes(sec_resp.content))
@testcase.attr('positive')
def test_secrets_get_multiple_secrets(self):

View File

@ -16,7 +16,9 @@ limitations under the License.
import logging
import os
from oslo_serialization import base64
import requests
import six
from six.moves import urllib
from tempest.lib.common.utils import test_utils
@ -86,6 +88,17 @@ class BarbicanClient(object):
retval.append(username)
return retval
def _attempt_ascii(self, text):
"""Attempt to decode to ascii, works with py27 and py35
Throw an encode or decode exception is text can not be
presented in ascii.
"""
if isinstance(text, six.text_type):
return text.encode('ascii')
else:
return text.decode('ascii')
def _attempt_to_stringify_content(self, content, content_tag):
if content is None:
return content
@ -93,12 +106,12 @@ class BarbicanClient(object):
# NOTE(jaosorior): The content is decoded as ascii since the
# logging module has problems with utf-8 strings and will end up
# trying to decode this as ascii.
return content.decode('ascii')
except UnicodeDecodeError:
return self._attempt_ascii(content)
except (UnicodeDecodeError, UnicodeEncodeError):
# NOTE(jaosorior): Since we are using base64 as default and this is
# only for logging (in order to debug); Lets not put too much
# effort in this and just use encoded string.
return content.encode('base64')
return base64.encode_as_text(content)
def stringify_request(self, request_kwargs, response):
format_kwargs = {

View File

@ -73,6 +73,15 @@ commands =
/bin/bash {toxinidir}/functionaltests/pretty_tox.sh '{posargs}'
passenv = KMIP_PLUGIN_ENABLED
[testenv:py35functional]
basepython = python3
deps = -r{toxinidir}/test-requirements.txt
setenv = OS_TEST_PATH={toxinidir}/functionaltests
commands =
/usr/bin/find . -type f -name "*.py[c|o]" -delete
/bin/bash {toxinidir}/functionaltests/pretty_tox.sh '{posargs}'
passenv = KMIP_PLUGIN_ENABLED
[testenv:cmd]
# This tox env is purely to make local test development easier
# Note: This requires local running instances of Barbican and Keystone