Merge "[part1] Remove six"

This commit is contained in:
Zuul 2021-02-25 17:09:59 +00:00 committed by Gerrit Code Review
commit 35bc9bef6a
17 changed files with 36 additions and 49 deletions

View File

@ -16,7 +16,6 @@
import time
from oslo_log import log
import six
from tempest.api.compute import base
from tempest.common import compute
@ -241,7 +240,7 @@ class AttachInterfacesTestJSON(AttachInterfacesTestBase):
except lib_exc.BadRequest as e:
msg = ('Multiple possible networks found, use a Network ID to be '
'more specific.')
if not CONF.compute.fixed_network_name and six.text_type(e) == msg:
if not CONF.compute.fixed_network_name and str(e) == msg:
raise
else:
ifs.append(iface)
@ -450,7 +449,7 @@ class AttachInterfacesV270Test(AttachInterfacesTestBase):
except lib_exc.BadRequest as e:
msg = ('Multiple possible networks found, use a Network ID to be '
'more specific.')
if not CONF.compute.fixed_network_name and six.text_type(e) == msg:
if not CONF.compute.fixed_network_name and str(e) == msg:
raise
else:
# just to check the response schema

View File

@ -69,7 +69,7 @@ class NoVNCConsoleTestJSON(base.BaseV2ComputeTest):
resp = urllib3.PoolManager().request('GET', vnc_url)
# Make sure that the GET request was accepted by the novncproxy
self.assertEqual(resp.status, 200, 'Got a Bad HTTP Response on the '
'initial call: ' + six.text_type(resp.status))
'initial call: ' + str(resp.status))
# Do some basic validation to make sure it is an expected HTML document
resp_data = resp.data.decode()
# This is needed in the case of example: <html lang="en">
@ -165,11 +165,11 @@ class NoVNCConsoleTestJSON(base.BaseV2ComputeTest):
self._websocket.response.startswith(b'HTTP/1.1 101 Switching '
b'Protocols'),
'Incorrect HTTP return status code: {}'.format(
six.text_type(self._websocket.response)
str(self._websocket.response)
)
)
_required_header = 'upgrade: websocket'
_response = six.text_type(self._websocket.response).lower()
_response = str(self._websocket.response).lower()
self.assertIn(
_required_header,
_response,

View File

@ -13,8 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import six
from tempest.api.identity import base
from tempest import config
from tempest.lib.common.utils import data_utils
@ -70,8 +68,8 @@ class TokensV3TestJSON(base.BaseIdentityV3AdminTest):
orig_expires_at = token_auth['token']['expires_at']
orig_user = token_auth['token']['user']
self.assertIsInstance(token_auth['token']['expires_at'], six.text_type)
self.assertIsInstance(token_auth['token']['issued_at'], six.text_type)
self.assertIsInstance(token_auth['token']['expires_at'], str)
self.assertIsInstance(token_auth['token']['issued_at'], str)
self.assertEqual(['password'], token_auth['token']['methods'])
self.assertEqual(user['id'], token_auth['token']['user']['id'])
self.assertEqual(user['name'], token_auth['token']['user']['name'])
@ -91,7 +89,7 @@ class TokensV3TestJSON(base.BaseIdentityV3AdminTest):
self.assertEqual(orig_expires_at, token_auth['token']['expires_at'],
'Expiration time should match original token')
self.assertIsInstance(token_auth['token']['issued_at'], six.text_type)
self.assertIsInstance(token_auth['token']['issued_at'], str)
self.assertEqual(set(['password', 'token']),
set(token_auth['token']['methods']))
self.assertEqual(orig_user, token_auth['token']['user'],

View File

@ -16,7 +16,6 @@
import ipaddress
import netaddr
import six
import testtools
from tempest.api.network import base_security_groups as sec_base
@ -234,15 +233,15 @@ class PortsTestJSON(sec_base.BaseSecGroupTest):
# Get two IP addresses
ip_address_1 = None
ip_address_2 = None
ip_network = ipaddress.ip_network(six.text_type(subnet['cidr']))
ip_network = ipaddress.ip_network(str(subnet['cidr']))
for ip in ip_network:
if ip == ip_network.network_address:
continue
if ip_address_1 is None:
ip_address_1 = six.text_type(ip)
ip_address_1 = str(ip)
else:
ip_address_2 = ip_address_1
ip_address_1 = six.text_type(ip)
ip_address_1 = str(ip)
# Make sure these two IP addresses have different substring
if ip_address_1[:-1] != ip_address_2[:-1]:
break

View File

@ -62,7 +62,7 @@ class AccountTest(base.BaseObjectTest):
self.assertIsNotNone(container_list)
for container_name in self.containers:
self.assertIn(six.text_type(container_name).encode('utf-8'),
self.assertIn(str(container_name).encode('utf-8'),
container_list)
@decorators.idempotent_id('884ec421-fbad-4fcc-916b-0580f2699565')

View File

@ -341,7 +341,7 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
tenant_id=tenant_id,
enable_dhcp=self.network_resources['dhcp'],
ip_version=(ipaddress.ip_network(
six.text_type(subnet_cidr)).version))
str(subnet_cidr)).version))
else:
resp_body = self.subnets_admin_client.\
create_subnet(network_id=network_id,
@ -349,7 +349,7 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
name=subnet_name,
tenant_id=tenant_id,
ip_version=(ipaddress.ip_network(
six.text_type(subnet_cidr)).version))
str(subnet_cidr)).version))
break
except lib_exc.BadRequest as e:
if 'overlaps with another subnet' not in str(e):

View File

@ -15,7 +15,6 @@
import jsonschema
from oslo_serialization import base64
from oslo_utils import timeutils
import six
# JSON Schema validator and format checker used for JSON Schema validation
JSONSCHEMA_VALIDATOR = jsonschema.Draft4Validator
@ -43,7 +42,7 @@ def _validate_datetime_format(instance):
@jsonschema.FormatChecker.cls_checks('base64')
def _validate_base64_format(instance):
try:
if isinstance(instance, six.text_type):
if isinstance(instance, str):
instance = instance.encode('utf-8')
base64.decode_as_bytes(instance)
except TypeError:

View File

@ -17,7 +17,6 @@ import os
from oslo_concurrency import lockutils
from oslo_log import log as logging
import six
import yaml
from tempest.lib import auth
@ -138,7 +137,7 @@ class PreProvisionedCredentialProvider(cred_provider.CredentialProvider):
temp_hash = hashlib.md5()
account_for_hash = dict((k, v) for (k, v) in account.items()
if k in cls.HASH_CRED_FIELDS)
temp_hash.update(six.text_type(account_for_hash).encode('utf-8'))
temp_hash.update(str(account_for_hash).encode('utf-8'))
temp_hash_key = temp_hash.hexdigest()
hash_dict['creds'][temp_hash_key] = account
for role in roles:
@ -391,7 +390,7 @@ class PreProvisionedCredentialProvider(cred_provider.CredentialProvider):
def get_creds_by_roles(self, roles, force_new=False):
roles = list(set(roles))
exist_creds = self._creds.get(six.text_type(roles).encode(
exist_creds = self._creds.get(str(roles).encode(
'utf-8'), None)
# The force kwarg is used to allocate an additional set of creds with
# the same role list. The index used for the previously allocation
@ -401,11 +400,11 @@ class PreProvisionedCredentialProvider(cred_provider.CredentialProvider):
elif exist_creds and force_new:
# NOTE(andreaf) In py3.x encode returns bytes, and b'' is bytes
# In py2.7 encode returns strings, and b'' is still string
new_index = six.text_type(roles).encode('utf-8') + b'-' + \
six.text_type(len(self._creds)).encode('utf-8')
new_index = str(roles).encode('utf-8') + b'-' + \
str(len(self._creds)).encode('utf-8')
self._creds[new_index] = exist_creds
net_creds = self._get_creds(roles=roles)
self._creds[six.text_type(roles).encode('utf-8')] = net_creds
self._creds[str(roles).encode('utf-8')] = net_creds
return net_creds
def clear_creds(self):

View File

@ -418,7 +418,7 @@ class RestClient(object):
def _safe_body(self, body, maxlen=4096):
# convert a structure into a string safely
try:
text = six.text_type(body)
text = str(body)
except UnicodeDecodeError:
# if this isn't actually text, return marker that
return "<BinaryData: removed>"

View File

@ -33,7 +33,7 @@ class OAUTHTokenClient(rest_client.RestClient):
def _escape(self, s):
"""Escape a unicode string in an OAuth-compatible fashion."""
safe = b'~'
s = s.encode('utf-8') if isinstance(s, six.text_type) else s
s = s.encode('utf-8') if isinstance(s, str) else s
s = urlparse.quote(s, safe)
if isinstance(s, six.binary_type):
s = s.decode('utf-8')
@ -47,8 +47,8 @@ class OAUTHTokenClient(rest_client.RestClient):
verifier=None,
http_method='GET'):
"""Generate OAUTH params along with signature."""
timestamp = six.text_type(int(time.time()))
nonce = six.text_type(random.getrandbits(64)) + timestamp
timestamp = str(int(time.time()))
nonce = str(random.getrandbits(64)) + timestamp
oauth_params = [
('oauth_nonce', nonce),
('oauth_timestamp', timestamp),

View File

@ -15,7 +15,6 @@
from unittest import mock
from oslo_utils import uuidutils
import six
from tempest.api.compute import base as compute_base
from tempest.common import waiters
@ -128,9 +127,9 @@ class TestBaseV2ComputeTest(base.TestCase):
mock.sentinel.server_id, wait_until='active')
# make our assertions
if fault:
self.assertIn(fault, six.text_type(ex))
self.assertIn(fault, str(ex))
else:
self.assertNotIn(fault, six.text_type(ex))
self.assertNotIn(fault, str(ex))
if compute_base.BaseV2ComputeTest.is_requested_microversion_compatible(
'2.35'):
status = 'ACTIVE'

View File

@ -21,7 +21,6 @@ import tempfile
from unittest import mock
import fixtures
import six
from tempest.cmd import run
from tempest.cmd import workspace
@ -153,12 +152,12 @@ class TestRunReturnCode(base.TestCase):
subprocess.call(['stestr', 'init'])
out, err = self.assertRunExit(['tempest', 'run', '-l'], 0)
tests = out.split()
tests = sorted([six.text_type(x.rstrip()) for x in tests if x])
tests = sorted([str(x.rstrip()) for x in tests if x])
result = [
six.text_type('tests.test_failing.FakeTestClass.test_pass'),
six.text_type('tests.test_failing.FakeTestClass.test_pass_list'),
six.text_type('tests.test_passing.FakeTestClass.test_pass'),
six.text_type('tests.test_passing.FakeTestClass.test_pass_list'),
str('tests.test_failing.FakeTestClass.test_pass'),
str('tests.test_failing.FakeTestClass.test_pass_list'),
str('tests.test_passing.FakeTestClass.test_pass'),
str('tests.test_passing.FakeTestClass.test_pass_list'),
]
# NOTE(mtreinish): on python 3 the subprocess prints b'' around
# stdout.

View File

@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import six
import testtools
from tempest.lib.common import api_version_utils
@ -31,7 +30,7 @@ class TestVersionSkipLogic(base.TestCase):
cfg_max_version)
except testtools.TestCase.skipException as e:
if not expected_skip:
raise testtools.TestCase.failureException(six.text_type(e))
raise testtools.TestCase.failureException(str(e))
def test_version_min_in_range(self):
self._test_version('2.2', '2.10', '2.1', '2.7')

View File

@ -17,7 +17,6 @@ import os
import shutil
from unittest import mock
import six
import testtools
import fixtures
@ -109,7 +108,7 @@ class TestPreProvisionedCredentials(base.TestCase):
hash = hashlib.md5()
account_for_hash = dict((k, v) for (k, v) in account.items()
if k in hash_fields)
hash.update(six.text_type(account_for_hash).encode('utf-8'))
hash.update(str(account_for_hash).encode('utf-8'))
temp_hash = hash.hexdigest()
hash_list.append(temp_hash)
return hash_list

View File

@ -274,7 +274,7 @@ class TestSshClient(base.TestCase):
client = ssh.Client('localhost', 'root', timeout=2)
exc = self.assertRaises(exceptions.SSHExecCommandFailed,
client.exec_command, "test")
self.assertIn('R' + self._utf8_string, six.text_type(exc))
self.assertIn('R' + self._utf8_string, str(exc))
def test_exec_command_no_select(self):
gsc_mock = self.patch('tempest.lib.common.ssh.Client.'

View File

@ -16,8 +16,6 @@ import os
import re
import subprocess
import six
from tempest.tests import base
@ -32,7 +30,7 @@ class TestTestList(base.TestCase):
self.assertEqual(0, p.returncode,
"test discovery failed, one or more files cause an "
"error on import %s" % ids)
ids = six.text_type(ids).split('\n')
ids = str(ids).split('\n')
for test_id in ids:
if re.match(r'(\w+\.){3}\w+', test_id):
if not test_id.startswith('tempest.'):

View File

@ -13,7 +13,6 @@
# under the License.
from oslo_config import cfg
import six
import testtools
from tempest.api.compute import base as compute_base
@ -75,7 +74,7 @@ class TestMicroversionsTestsClass(base.TestCase):
self.assertRaises(testtools.TestCase.skipException,
test_class.skip_checks)
except testtools.TestCase.skipException as e:
raise testtools.TestCase.failureException(six.text_type(e))
raise testtools.TestCase.failureException(str(e))
def test_config_version_none_none(self):
expected_pass_tests = [VersionTestNoneTolatest, VersionTestNoneTo2_2]