Remove six from common module

This is part of the steps to remove usage of six library, which is no
longer needed since python 2 support was removed.

Change-Id: I14ebd809b39079d06a8ecc8f747b6bb80d550acb
This commit is contained in:
Takashi Kajinami 2024-02-17 11:57:06 +09:00
parent 91f181e3ad
commit 627b9a8260
8 changed files with 27 additions and 44 deletions

View File

@ -17,13 +17,10 @@ Certificate manager API
""" """
import abc import abc
import six
from magnum.common.x509 import operations from magnum.common.x509 import operations
@six.add_metaclass(abc.ABCMeta) class Cert(object, metaclass=abc.ABCMeta):
class Cert(object):
"""Base class to represent all certificates.""" """Base class to represent all certificates."""
@abc.abstractmethod @abc.abstractmethod
@ -52,8 +49,7 @@ class Cert(object):
pass pass
@six.add_metaclass(abc.ABCMeta) class CertManager(object, metaclass=abc.ABCMeta):
class CertManager(object):
"""Base Cert Manager Interface """Base Cert Manager Interface
A Cert Manager is responsible for managing certificates for TLS. A Cert Manager is responsible for managing certificates for TLS.

View File

@ -14,7 +14,6 @@
from magnum.common.cert_manager import cert_manager from magnum.common.cert_manager import cert_manager
from magnum import objects from magnum import objects
import six
class Cert(cert_manager.Cert): class Cert(cert_manager.Cert):
@ -59,9 +58,9 @@ class CertManager(cert_manager.CertManager):
:returns: the UUID of the stored cert :returns: the UUID of the stored cert
""" """
if six.PY3 and isinstance(certificate, six.binary_type): if isinstance(certificate, bytes):
certificate = certificate.decode() certificate = certificate.decode()
if six.PY3 and isinstance(private_key, six.binary_type): if isinstance(private_key, bytes):
private_key = private_key.decode() private_key = private_key.decode()
x509keypair = {'certificate': certificate, 'private_key': private_key, x509keypair = {'certificate': certificate, 'private_key': private_key,
'private_key_passphrase': private_key_passphrase, 'private_key_passphrase': private_key_passphrase,

View File

@ -24,7 +24,6 @@ import sys
from keystoneclient import exceptions as keystone_exceptions from keystoneclient import exceptions as keystone_exceptions
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
import six
import magnum.conf import magnum.conf
from magnum.i18n import _ from magnum.i18n import _
@ -104,9 +103,7 @@ class MagnumException(Exception):
super(MagnumException, self).__init__(self.message) super(MagnumException, self).__init__(self.message)
def __str__(self): def __str__(self):
if six.PY3: return self.message
return self.message
return self.message.encode('utf-8')
def __unicode__(self): def __unicode__(self):
return self.message return self.message
@ -115,7 +112,7 @@ class MagnumException(Exception):
if self.__class__.__name__.endswith('_Remote'): if self.__class__.__name__.endswith('_Remote'):
return self.args[0] return self.args[0]
else: else:
return six.text_type(self) return str(self)
class ObjectNotFound(MagnumException): class ObjectNotFound(MagnumException):

View File

@ -19,8 +19,6 @@ The IDs each comprise 12 (lower-case) alphanumeric characters.
import base64 import base64
import uuid import uuid
import six
from magnum.i18n import _ from magnum.i18n import _
@ -30,7 +28,7 @@ def _to_byte_string(value, num_bits):
Padding is added at the end (i.e. after the least-significant bit) if Padding is added at the end (i.e. after the least-significant bit) if
required. required.
""" """
shifts = six.moves.xrange(num_bits - 8, -8, -8) shifts = range(num_bits - 8, -8, -8)
byte_at = lambda off: ((value >> off # noqa: E731 byte_at = lambda off: ((value >> off # noqa: E731
if off >= 0 else value << -off) & 0xff) if off >= 0 else value << -off) & 0xff)
return ''.join(chr(byte_at(offset)) for offset in shifts) return ''.join(chr(byte_at(offset)) for offset in shifts)
@ -41,7 +39,7 @@ def get_id(source_uuid):
The supplied UUID must be a version 4 UUID object. The supplied UUID must be a version 4 UUID object.
""" """
if isinstance(source_uuid, six.string_types): if isinstance(source_uuid, str):
source_uuid = uuid.UUID(source_uuid) source_uuid = uuid.UUID(source_uuid)
if source_uuid.version != 4: if source_uuid.version != 4:
raise ValueError(_('Invalid UUID version (%d)') % source_uuid.version) raise ValueError(_('Invalid UUID version (%d)') % source_uuid.version)
@ -50,12 +48,9 @@ def get_id(source_uuid):
# (see RFC4122, Section 4.4) # (see RFC4122, Section 4.4)
random_bytes = _to_byte_string(source_uuid.time, 60) random_bytes = _to_byte_string(source_uuid.time, 60)
# The first 12 bytes (= 60 bits) of base32-encoded output is our data # The first 12 bytes (= 60 bits) of base32-encoded output is our data
encoded = base64.b32encode(six.b(random_bytes))[:12] encoded = base64.b32encode(random_bytes.encode('latin-1'))[:12]
if six.PY3: return encoded.lower().decode('utf-8')
return encoded.lower().decode('utf-8')
else:
return encoded.lower()
def generate_id(): def generate_id():

View File

@ -12,11 +12,11 @@
# under the License. # under the License.
"""Utility for fetching a resource (e.g. a manifest) from a URL.""" """Utility for fetching a resource (e.g. a manifest) from a URL."""
import urllib
from oslo_log import log as logging from oslo_log import log as logging
import requests import requests
from requests import exceptions from requests import exceptions
from six.moves import urllib
from magnum.common import exception from magnum.common import exception
import magnum.conf import magnum.conf

View File

@ -29,7 +29,6 @@ import netaddr
from oslo_concurrency import processutils from oslo_concurrency import processutils
from oslo_log import log as logging from oslo_log import log as logging
from oslo_utils import netutils from oslo_utils import netutils
import six
from magnum.common import exception from magnum.common import exception
import magnum.conf import magnum.conf
@ -164,7 +163,7 @@ def safe_rstrip(value, chars=None):
:return: Stripped value. :return: Stripped value.
""" """
if not isinstance(value, six.string_types): if not isinstance(value, str):
LOG.warning("Failed to remove trailing character. " LOG.warning("Failed to remove trailing character. "
"Returning original object. " "Returning original object. "
"Supplied object is not a string: %s,", value) "Supplied object is not a string: %s,", value)

View File

@ -13,7 +13,6 @@
# under the License. # under the License.
import datetime import datetime
import six
import uuid import uuid
from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import rsa
@ -104,10 +103,10 @@ def _generate_certificate(issuer_name, subject_name, extensions,
organization_name=None, ca_key=None, organization_name=None, ca_key=None,
encryption_password=None, ca_key_password=None): encryption_password=None, ca_key_password=None):
if not isinstance(subject_name, six.text_type): if not isinstance(subject_name, str):
subject_name = six.text_type(subject_name.decode('utf-8')) subject_name = subject_name.decode('utf-8')
if organization_name and not isinstance(organization_name, six.text_type): if organization_name and not isinstance(organization_name, str):
organization_name = six.text_type(organization_name.decode('utf-8')) organization_name = organization_name.decode('utf-8')
private_key = rsa.generate_private_key( private_key = rsa.generate_private_key(
public_exponent=65537, public_exponent=65537,
@ -132,8 +131,8 @@ def _generate_certificate(issuer_name, subject_name, extensions,
csr = csr.sign(private_key, hashes.SHA256()) csr = csr.sign(private_key, hashes.SHA256())
if six.PY3 and isinstance(encryption_password, six.text_type): if isinstance(encryption_password, str):
encryption_password = encryption_password.encode() encryption_password = encryption_password.encode('latin-1')
if encryption_password: if encryption_password:
encryption_algorithm = serialization.BestAvailableEncryption( encryption_algorithm = serialization.BestAvailableEncryption(
@ -161,10 +160,10 @@ def _generate_certificate(issuer_name, subject_name, extensions,
def _load_pem_private_key(ca_key, ca_key_password=None): def _load_pem_private_key(ca_key, ca_key_password=None):
if not isinstance(ca_key, rsa.RSAPrivateKey): if not isinstance(ca_key, rsa.RSAPrivateKey):
if isinstance(ca_key, six.text_type): if isinstance(ca_key, str):
ca_key = six.b(str(ca_key)) ca_key = ca_key.encode('latin-1')
if isinstance(ca_key_password, six.text_type): if isinstance(ca_key_password, str):
ca_key_password = six.b(str(ca_key_password)) ca_key_password = ca_key_password.encode('latin-1')
ca_key = serialization.load_pem_private_key( ca_key = serialization.load_pem_private_key(
ca_key, ca_key,
@ -188,11 +187,11 @@ def sign(csr, issuer_name, ca_key, ca_key_password=None,
ca_key = _load_pem_private_key(ca_key, ca_key_password) ca_key = _load_pem_private_key(ca_key, ca_key_password)
if not isinstance(issuer_name, six.text_type): if not isinstance(issuer_name, str):
issuer_name = six.text_type(issuer_name.decode('utf-8')) issuer_name = issuer_name.decode('utf-8')
if isinstance(csr, six.text_type): if isinstance(csr, str):
csr = six.b(str(csr)) csr = csr.encode('latin-1')
if not isinstance(csr, x509.CertificateSigningRequest): if not isinstance(csr, x509.CertificateSigningRequest):
try: try:
csr = x509.load_pem_x509_csr(csr) csr = x509.load_pem_x509_csr(csr)

View File

@ -208,8 +208,7 @@ class TestX509(base.BaseTestCase):
self.assertIsInstance(private_key, rsa.RSAPrivateKey) self.assertIsInstance(private_key, rsa.RSAPrivateKey)
@mock.patch('cryptography.x509.load_pem_x509_csr') @mock.patch('cryptography.x509.load_pem_x509_csr')
@mock.patch('six.b') def test_sign_with_unicode_csr(self, mock_load_pem):
def test_sign_with_unicode_csr(self, mock_six, mock_load_pem):
ca_key = self._generate_private_key() ca_key = self._generate_private_key()
private_key = self._generate_private_key() private_key = self._generate_private_key()
csr_obj = self._build_csr(private_key) csr_obj = self._build_csr(private_key)
@ -219,7 +218,6 @@ class TestX509(base.BaseTestCase):
mock_load_pem.return_value = csr_obj mock_load_pem.return_value = csr_obj
operations.sign(csr, self.issuer_name, ca_key, operations.sign(csr, self.issuer_name, ca_key,
skip_validation=True) skip_validation=True)
mock_six.assert_called_once_with(csr)
@mock.patch('cryptography.x509.load_pem_x509_csr') @mock.patch('cryptography.x509.load_pem_x509_csr')
def test_sign_empty_chars(self, mock_load_pem): def test_sign_empty_chars(self, mock_load_pem):