Remove six use from cloudbaseinit

Change-Id: Ia4b4add954ee5192dbf437b415d8e698c0a271b8
This commit is contained in:
Jakub Kulik 2024-01-25 08:48:39 +00:00 committed by Jakub Kulík
parent 07cae6e8c7
commit a40f5a1508
58 changed files with 106 additions and 181 deletions

View File

@ -14,11 +14,8 @@
import abc import abc
import six
class Options(object, metaclass=abc.ABCMeta):
@six.add_metaclass(abc.ABCMeta)
class Options(object):
"""Contact class for all the collections of config options.""" """Contact class for all the collections of config options."""

View File

@ -13,13 +13,13 @@
# under the License. # under the License.
import contextlib import contextlib
import io
import os import os
import socket import socket
import time import time
from xml.etree import ElementTree from xml.etree import ElementTree
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
import six
import untangle import untangle
from cloudbaseinit import conf as cloudbaseinit_conf from cloudbaseinit import conf as cloudbaseinit_conf
@ -115,13 +115,13 @@ class AzureService(base.BaseHTTPMetadataService):
path, data_xml, headers=all_headers)) path, data_xml, headers=all_headers))
if parse_xml: if parse_xml:
return untangle.parse(six.StringIO(encoding.get_as_string(data))) return untangle.parse(io.StringIO(encoding.get_as_string(data)))
else: else:
return data return data
@staticmethod @staticmethod
def _encode_xml(xml_root): def _encode_xml(xml_root):
bio = six.BytesIO() bio = io.BytesIO()
ElementTree.ElementTree(xml_root).write( ElementTree.ElementTree(xml_root).write(
bio, encoding='utf-8', xml_declaration=True) bio, encoding='utf-8', xml_declaration=True)
return bio.getvalue() return bio.getvalue()

View File

@ -21,7 +21,6 @@ import time
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
import requests import requests
import six
from cloudbaseinit import conf as cloudbaseinit_conf from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import exception from cloudbaseinit import exception
@ -35,8 +34,7 @@ class NotExistingMetadataException(Exception):
pass pass
@six.add_metaclass(abc.ABCMeta) class BaseMetadataService(object, metaclass=abc.ABCMeta):
class BaseMetadataService(object):
_GZIP_MAGIC_NUMBER = b'\x1f\x8b' _GZIP_MAGIC_NUMBER = b'\x1f\x8b'
def __init__(self): def __init__(self):

View File

@ -13,11 +13,11 @@
# under the License. # under the License.
import contextlib import contextlib
import http.client
import posixpath import posixpath
import urllib
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
from six.moves import http_client
from six.moves import urllib
from cloudbaseinit import conf as cloudbaseinit_conf from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit.metadata.services import base from cloudbaseinit.metadata.services import base
@ -131,12 +131,12 @@ class CloudStack(base.BaseHTTPMetadataService):
def _password_client(self, body=None, headers=None, decode=True): def _password_client(self, body=None, headers=None, decode=True):
"""Client for the Password Server.""" """Client for the Password Server."""
port = CONF.cloudstack.password_server_port port = CONF.cloudstack.password_server_port
with contextlib.closing(http_client.HTTPConnection( with contextlib.closing(http.client.HTTPConnection(
self._metadata_host, port, timeout=TIMEOUT)) as connection: self._metadata_host, port, timeout=TIMEOUT)) as connection:
try: try:
connection.request("GET", "/", body=body, headers=headers) connection.request("GET", "/", body=body, headers=headers)
response = connection.getresponse() response = connection.getresponse()
except http_client.HTTPException as exc: except http.client.HTTPException as exc:
LOG.error("Request failed: %s", exc) LOG.error("Request failed: %s", exc)
raise raise
@ -145,7 +145,7 @@ class CloudStack(base.BaseHTTPMetadataService):
content = encoding.get_as_string(content) content = encoding.get_as_string(content)
if response.status != 200: if response.status != 200:
raise http_client.HTTPException( raise http.client.HTTPException(
"%(status)s %(reason)s - %(message)r", "%(status)s %(reason)s - %(message)r",
{"status": response.status, "reason": response.reason, {"status": response.status, "reason": response.reason,
"message": content}) "message": content})

View File

@ -12,8 +12,9 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from urllib import error
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
from six.moves.urllib import error
from cloudbaseinit import conf as cloudbaseinit_conf from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit.metadata.services import base from cloudbaseinit.metadata.services import base

View File

@ -21,7 +21,6 @@ import socket
import struct import struct
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
import six
from cloudbaseinit.metadata.services import base from cloudbaseinit.metadata.services import base
from cloudbaseinit.models import network as network_model from cloudbaseinit.models import network as network_model
@ -108,8 +107,7 @@ class OpenNebulaService(base.BaseMetadataService):
address_chunks = address.split(".") address_chunks = address.split(".")
gateway_chunks = gateway.split(".") gateway_chunks = gateway.split(".")
netmask_chunks = [] netmask_chunks = []
for achunk, gchunk in six.moves.zip( for achunk, gchunk in zip(address_chunks, gateway_chunks):
address_chunks, gateway_chunks):
if achunk == gchunk: if achunk == gchunk:
nchunk = "255" nchunk = "255"
else: else:

View File

@ -15,11 +15,8 @@
import abc import abc
import tempfile import tempfile
import six
class BaseConfigDriveManager(object, metaclass=abc.ABCMeta):
@six.add_metaclass(abc.ABCMeta)
class BaseConfigDriveManager(object):
def __init__(self): def __init__(self):
self.target_path = tempfile.mkdtemp() self.target_path = tempfile.mkdtemp()

View File

@ -15,12 +15,12 @@
import json import json
import requests import requests
from urllib import error
from cloudbaseinit import conf as cloudbaseinit_conf from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import exception from cloudbaseinit import exception
from cloudbaseinit.metadata.services import base from cloudbaseinit.metadata.services import base
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
from six.moves.urllib import error
CONF = cloudbaseinit_conf.CONF CONF = cloudbaseinit_conf.CONF
LOG = oslo_logging.getLogger(__name__) LOG = oslo_logging.getLogger(__name__)

View File

@ -20,12 +20,11 @@ import re
import struct import struct
import subprocess import subprocess
import time import time
import winreg
import netaddr import netaddr
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
import pywintypes import pywintypes
import six
from six.moves import winreg
from tzlocal import windows_tz from tzlocal import windows_tz
import win32api import win32api
from win32com import client from win32com import client
@ -602,7 +601,7 @@ class WindowsUtils(base.BaseOSUtils):
sidNameUse = wintypes.DWORD() sidNameUse = wintypes.DWORD()
ret_val = advapi32.LookupAccountNameW( ret_val = advapi32.LookupAccountNameW(
0, six.text_type(username), sid, ctypes.byref(cbSid), domainName, 0, str(username), sid, ctypes.byref(cbSid), domainName,
ctypes.byref(cchReferencedDomainName), ctypes.byref(sidNameUse)) ctypes.byref(cchReferencedDomainName), ctypes.byref(sidNameUse))
if not ret_val: if not ret_val:
raise exception.WindowsCloudbaseInitException( raise exception.WindowsCloudbaseInitException(
@ -613,9 +612,9 @@ class WindowsUtils(base.BaseOSUtils):
def add_user_to_local_group(self, username, groupname): def add_user_to_local_group(self, username, groupname):
lmi = Win32_LOCALGROUP_MEMBERS_INFO_3() lmi = Win32_LOCALGROUP_MEMBERS_INFO_3()
lmi.lgrmi3_domainandname = six.text_type(username) lmi.lgrmi3_domainandname = str(username)
ret_val = netapi32.NetLocalGroupAddMembers(0, six.text_type(groupname), ret_val = netapi32.NetLocalGroupAddMembers(0, str(groupname),
3, ctypes.pointer(lmi), 1) 3, ctypes.pointer(lmi), 1)
if ret_val == self.NERR_GroupNotFound: if ret_val == self.NERR_GroupNotFound:
@ -652,9 +651,9 @@ class WindowsUtils(base.BaseOSUtils):
{"username": username, "domain": domain}) {"username": username, "domain": domain})
token = wintypes.HANDLE() token = wintypes.HANDLE()
ret_val = advapi32.LogonUserW(six.text_type(username), ret_val = advapi32.LogonUserW(str(username),
six.text_type(domain), str(domain),
six.text_type(password), str(password),
logon_type, logon_type,
self.LOGON32_PROVIDER_DEFAULT, self.LOGON32_PROVIDER_DEFAULT,
ctypes.byref(token)) ctypes.byref(token))
@ -665,7 +664,7 @@ class WindowsUtils(base.BaseOSUtils):
if load_profile: if load_profile:
pi = Win32_PROFILEINFO() pi = Win32_PROFILEINFO()
pi.dwSize = ctypes.sizeof(Win32_PROFILEINFO) pi.dwSize = ctypes.sizeof(Win32_PROFILEINFO)
pi.lpUserName = six.text_type(username) pi.lpUserName = str(username)
ret_val = userenv.LoadUserProfileW(token, ctypes.byref(pi)) ret_val = userenv.LoadUserProfileW(token, ctypes.byref(pi))
if not ret_val: if not ret_val:
kernel32.CloseHandle(token) kernel32.CloseHandle(token)
@ -756,8 +755,7 @@ class WindowsUtils(base.BaseOSUtils):
def set_host_name(self, new_host_name): def set_host_name(self, new_host_name):
ret_val = kernel32.SetComputerNameExW( ret_val = kernel32.SetComputerNameExW(
self.ComputerNamePhysicalDnsHostname, self.ComputerNamePhysicalDnsHostname, str(new_host_name))
six.text_type(new_host_name))
if not ret_val: if not ret_val:
raise exception.WindowsCloudbaseInitException( raise exception.WindowsCloudbaseInitException(
"Cannot set host name: %r") "Cannot set host name: %r")
@ -1394,7 +1392,7 @@ class WindowsUtils(base.BaseOSUtils):
def get_volume_label(self, drive): def get_volume_label(self, drive):
max_label_size = 261 max_label_size = 261
label = ctypes.create_unicode_buffer(max_label_size) label = ctypes.create_unicode_buffer(max_label_size)
ret_val = kernel32.GetVolumeInformationW(six.text_type(drive), label, ret_val = kernel32.GetVolumeInformationW(str(drive), label,
max_label_size, 0, 0, 0, 0, 0) max_label_size, 0, 0, 0, 0, 0)
if ret_val: if ret_val:
return label.value return label.value
@ -1404,8 +1402,7 @@ class WindowsUtils(base.BaseOSUtils):
volume_name = ctypes.create_unicode_buffer(max_volume_name_len) volume_name = ctypes.create_unicode_buffer(max_volume_name_len)
if not kernel32.GetVolumeNameForVolumeMountPointW( if not kernel32.GetVolumeNameForVolumeMountPointW(
six.text_type(mount_point), volume_name, str(mount_point), volume_name, max_volume_name_len):
max_volume_name_len):
if kernel32.GetLastError() in [self.ERROR_INVALID_NAME, if kernel32.GetLastError() in [self.ERROR_INVALID_NAME,
self.ERROR_PATH_NOT_FOUND]: self.ERROR_PATH_NOT_FOUND]:
raise exception.ItemNotFoundException( raise exception.ItemNotFoundException(

View File

@ -15,7 +15,6 @@
import abc import abc
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
import six
from cloudbaseinit import conf as cloudbaseinit_conf from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit.osutils import factory as osutils_factory from cloudbaseinit.osutils import factory as osutils_factory
@ -26,8 +25,7 @@ CONF = cloudbaseinit_conf.CONF
LOG = oslo_logging.getLogger(__name__) LOG = oslo_logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta) class BaseCreateUserPlugin(base.BasePlugin, metaclass=abc.ABCMeta):
class BaseCreateUserPlugin(base.BasePlugin):
"""This is a base class for creating or modifying an user.""" """This is a base class for creating or modifying an user."""
@abc.abstractmethod @abc.abstractmethod

View File

@ -14,11 +14,8 @@
import abc import abc
import six
class BaseUserDataPlugin(object, metaclass=abc.ABCMeta):
@six.add_metaclass(abc.ABCMeta)
class BaseUserDataPlugin(object):
def __init__(self, mime_type): def __init__(self, mime_type):
self._mime_type = mime_type self._mime_type = mime_type

View File

@ -14,11 +14,8 @@
import abc import abc
import six
class BaseCloudConfigPlugin(object, metaclass=abc.ABCMeta):
@six.add_metaclass(abc.ABCMeta)
class BaseCloudConfigPlugin(object):
"""Base plugin class for cloud-config plugins.""" """Base plugin class for cloud-config plugins."""
@abc.abstractmethod @abc.abstractmethod

View File

@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import six
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
from cloudbaseinit import conf as cloudbaseinit_conf from cloudbaseinit import conf as cloudbaseinit_conf
@ -45,7 +43,7 @@ class GroupsPlugin(base.BaseCloudConfigPlugin):
group_name = None group_name = None
group_users = [] group_users = []
if isinstance(item, six.string_types): if isinstance(item, str):
group_name = item group_name = item
elif isinstance(item, dict): elif isinstance(item, dict):
try: try:

View File

@ -13,7 +13,6 @@
# under the License. # under the License.
import os import os
import six
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
@ -49,7 +48,7 @@ class RunCmdPlugin(base.BaseCloudConfigPlugin):
entries = 0 entries = 0
for command in commands: for command in commands:
if isinstance(command, six.string_types): if isinstance(command, str):
script_content += command script_content += command
elif isinstance(command, (list, tuple)): elif isinstance(command, (list, tuple)):
subcommand_content = [] subcommand_content = []

View File

@ -15,7 +15,6 @@
import datetime import datetime
import os import os
import pytz import pytz
import six
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
@ -41,11 +40,11 @@ class UsersPlugin(base.BaseCloudConfigPlugin):
groups = data.get('groups', None) groups = data.get('groups', None)
primary_group = data.get('primary_group', None) primary_group = data.get('primary_group', None)
user_groups = [] user_groups = []
if isinstance(groups, six.string_types): if isinstance(groups, str):
user_groups.extend(groups.split(', ')) user_groups.extend(groups.split(', '))
elif isinstance(groups, (list, tuple)): elif isinstance(groups, (list, tuple)):
user_groups.extend(groups) user_groups.extend(groups)
if isinstance(primary_group, six.string_types): if isinstance(primary_group, str):
user_groups.extend(primary_group.split(', ')) user_groups.extend(primary_group.split(', '))
elif isinstance(primary_group, (list, tuple)): elif isinstance(primary_group, (list, tuple)):
user_groups.extend(primary_group) user_groups.extend(primary_group)
@ -68,7 +67,7 @@ class UsersPlugin(base.BaseCloudConfigPlugin):
expiredate = data.get('expiredate', None) expiredate = data.get('expiredate', None)
expire_interval = None expire_interval = None
if isinstance(expiredate, six.string_types): if isinstance(expiredate, str):
year, month, day = map(int, expiredate.split('-')) year, month, day = map(int, expiredate.split('-'))
expiredate = datetime.datetime(year=year, month=month, day=day, expiredate = datetime.datetime(year=year, month=month, day=day,
tzinfo=pytz.utc) tzinfo=pytz.utc)

View File

@ -18,7 +18,6 @@ import io
import os import os
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
import six
from cloudbaseinit import exception from cloudbaseinit import exception
from cloudbaseinit.plugins.common.userdataplugins.cloudconfigplugins import ( from cloudbaseinit.plugins.common.userdataplugins.cloudconfigplugins import (
@ -59,7 +58,7 @@ def _convert_permissions(permissions):
def _process_content(content, encoding): def _process_content(content, encoding):
"""Decode the content taking into consideration the encoding.""" """Decode the content taking into consideration the encoding."""
result = content result = content
if six.PY3 and not isinstance(result, six.binary_type): if not isinstance(result, bytes):
# At this point, content will be string, which is wrong for Python 3. # At this point, content will be string, which is wrong for Python 3.
result = result.encode() result = result.encode()

View File

@ -15,8 +15,6 @@
import os import os
import six
from cloudbaseinit import conf as cloudbaseinit_conf from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit.plugins.common.userdataplugins import base from cloudbaseinit.plugins.common.userdataplugins import base
from cloudbaseinit.plugins.common import userdatautils from cloudbaseinit.plugins.common import userdatautils
@ -47,6 +45,6 @@ class HeatPlugin(base.BaseUserDataPlugin):
# Normalize the payload to bytes, since `execute_user_data_script` # Normalize the payload to bytes, since `execute_user_data_script`
# operates on bytes and `get_payload` returns a string on # operates on bytes and `get_payload` returns a string on
# Python 3. # Python 3.
if isinstance(payload, six.text_type): if isinstance(payload, str):
payload = payload.encode() payload = payload.encode()
return userdatautils.execute_user_data_script(payload) return userdatautils.execute_user_data_script(payload)

View File

@ -15,10 +15,10 @@
import datetime import datetime
import os import os
import shutil import shutil
import winreg
import zipfile import zipfile
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
from six.moves import winreg
from cloudbaseinit import conf as cloudbaseinit_conf from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import exception from cloudbaseinit import exception

View File

@ -151,7 +151,7 @@ class TestWindowsConfigDriveManager(unittest.TestCase):
def test_get_iso_file_size(self): def test_get_iso_file_size(self):
self._test_get_iso_file_size() self._test_get_iso_file_size()
@mock.patch("six.moves.builtins.open", new=OPEN) @mock.patch("builtins.open", new=OPEN)
def test_write_iso_file(self): def test_write_iso_file(self):
file_path = "fake\\path" file_path = "fake\\path"
file_size = 100 * 512 file_size = 100 * 512
@ -293,7 +293,7 @@ class TestWindowsConfigDriveManager(unittest.TestCase):
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.' 'WindowsConfigDriveManager.'
'_extract_iso_from_devices') '_extract_iso_from_devices')
@mock.patch("six.moves.builtins.map") @mock.patch("builtins.map")
def test_get_config_drive_from_raw_hdd(self, mock_map, def test_get_config_drive_from_raw_hdd(self, mock_map,
mock_extract_iso_from_devices): mock_extract_iso_from_devices):
Disk = self.conf_module.disk.Disk Disk = self.conf_module.disk.Disk

View File

@ -37,14 +37,12 @@ class AzureServiceTest(unittest.TestCase):
self._mock_untangle = mock.MagicMock() self._mock_untangle = mock.MagicMock()
self._mock_ctypes = mock.MagicMock() self._mock_ctypes = mock.MagicMock()
self._mock_wintypes = mock.MagicMock() self._mock_wintypes = mock.MagicMock()
self._moves_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict( self._module_patcher = mock.patch.dict(
'sys.modules', 'sys.modules',
{'untangle': self._mock_untangle, {'untangle': self._mock_untangle,
'ctypes': self._mock_ctypes, 'ctypes': self._mock_ctypes,
'ctypes.wintypes': self._mock_wintypes, 'ctypes.wintypes': self._mock_wintypes,
'six.moves': self._moves_mock
}) })
self._module_patcher.start() self._module_patcher.start()
self._azureservice_module = importlib.import_module( self._azureservice_module = importlib.import_module(

View File

@ -117,7 +117,7 @@ class TestBaseConfigDriveService(unittest.TestCase):
@mock.patch('os.path.join') @mock.patch('os.path.join')
def test_get_data(self, mock_join, mock_normpath): def test_get_data(self, mock_join, mock_normpath):
fake_path = os.path.join('fake', 'path') fake_path = os.path.join('fake', 'path')
with mock.patch('six.moves.builtins.open', with mock.patch('builtins.open',
mock.mock_open(read_data='fake data'), create=True): mock.mock_open(read_data='fake data'), create=True):
response = self._config_drive._get_data(fake_path) response = self._config_drive._get_data(fake_path)
self.assertEqual('fake data', response) self.assertEqual('fake data', response)

View File

@ -15,12 +15,12 @@
import functools import functools
import socket import socket
import unittest import unittest
import urllib
try: try:
import unittest.mock as mock import unittest.mock as mock
except ImportError: except ImportError:
import mock import mock
from six.moves import urllib
from cloudbaseinit import conf as cloudbaseinit_conf from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit.metadata.services import base from cloudbaseinit.metadata.services import base

View File

@ -14,12 +14,12 @@
import os import os
import unittest import unittest
from urllib import error
try: try:
import unittest.mock as mock import unittest.mock as mock
except ImportError: except ImportError:
import mock import mock
from six.moves.urllib import error
from cloudbaseinit import conf as cloudbaseinit_conf from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit.metadata.services import base from cloudbaseinit.metadata.services import base

View File

@ -213,7 +213,7 @@ class TestNoCloudConfigDriveService(unittest.TestCase):
@mock.patch('os.path.join') @mock.patch('os.path.join')
def test_get_data(self, mock_join, mock_normpath): def test_get_data(self, mock_join, mock_normpath):
fake_path = os.path.join('fake', 'path') fake_path = os.path.join('fake', 'path')
with mock.patch('six.moves.builtins.open', with mock.patch('builtins.open',
mock.mock_open(read_data='fake data'), create=True): mock.mock_open(read_data='fake data'), create=True):
response = self._config_drive._get_data(fake_path) response = self._config_drive._get_data(fake_path)
self.assertEqual('fake data', response) self.assertEqual('fake data', response)

View File

@ -126,7 +126,7 @@ class _TestOpenNebulaService(unittest.TestCase):
self._service = opennebulaservice.OpenNebulaService() self._service = opennebulaservice.OpenNebulaService()
@mock.patch("six.moves.builtins.open", new=OPEN) @mock.patch("builtins.open", new=OPEN)
class TestOpenNebulaService(_TestOpenNebulaService): class TestOpenNebulaService(_TestOpenNebulaService):
@classmethod @classmethod

View File

@ -37,14 +37,12 @@ class OvfServiceTest(unittest.TestCase):
self._mock_untangle = mock.MagicMock() self._mock_untangle = mock.MagicMock()
self._mock_ctypes = mock.MagicMock() self._mock_ctypes = mock.MagicMock()
self._mock_wintypes = mock.MagicMock() self._mock_wintypes = mock.MagicMock()
self._moves_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict( self._module_patcher = mock.patch.dict(
'sys.modules', 'sys.modules',
{'untangle': self._mock_untangle, {'untangle': self._mock_untangle,
'ctypes': self._mock_ctypes, 'ctypes': self._mock_ctypes,
'ctypes.wintypes': self._mock_wintypes, 'ctypes.wintypes': self._mock_wintypes,
'six.moves': self._moves_mock
}) })
self._module_patcher.start() self._module_patcher.start()
self._ovfservice_module = importlib.import_module( self._ovfservice_module = importlib.import_module(

View File

@ -14,14 +14,13 @@
import importlib import importlib
import unittest import unittest
from urllib import error
try: try:
import unittest.mock as mock import unittest.mock as mock
except ImportError: except ImportError:
import mock import mock
from six.moves.urllib import error
from cloudbaseinit import conf as cloudbaseinit_conf from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import exception from cloudbaseinit import exception
from cloudbaseinit.tests import testutils from cloudbaseinit.tests import testutils

View File

@ -24,7 +24,6 @@ try:
import unittest.mock as mock import unittest.mock as mock
except ImportError: except ImportError:
import mock import mock
import six
from cloudbaseinit import conf as cloudbaseinit_conf from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import exception from cloudbaseinit import exception
@ -66,7 +65,7 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
self._mi_mock = mock.MagicMock() self._mi_mock = mock.MagicMock()
self._wmi_mock = mock.MagicMock() self._wmi_mock = mock.MagicMock()
self._wmi_mock.x_wmi = WMIError self._wmi_mock.x_wmi = WMIError
self._moves_mock = mock.MagicMock() self._winreg_mock = mock.MagicMock()
self._xmlrpc_client_mock = mock.MagicMock() self._xmlrpc_client_mock = mock.MagicMock()
self._ctypes_mock = mock.MagicMock() self._ctypes_mock = mock.MagicMock()
self._tzlocal_mock = mock.Mock() self._tzlocal_mock = mock.Mock()
@ -86,12 +85,12 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
'winerror': self._winerror_mock, 'winerror': self._winerror_mock,
'mi': self._mi_mock, 'mi': self._mi_mock,
'wmi': self._wmi_mock, 'wmi': self._wmi_mock,
'six.moves': self._moves_mock, 'xmlrpc.client': self._xmlrpc_client_mock,
'six.moves.xmlrpc_client': self._xmlrpc_client_mock,
'ctypes': self._ctypes_mock, 'ctypes': self._ctypes_mock,
'pywintypes': self._pywintypes_mock, 'pywintypes': self._pywintypes_mock,
'tzlocal': self._tzlocal_mock, 'tzlocal': self._tzlocal_mock,
'winioctlcon': mock.MagicMock()}) 'winioctlcon': mock.MagicMock(),
'winreg': self._winreg_mock})
_module_patcher.start() _module_patcher.start()
self.addCleanup(_module_patcher.stop) self.addCleanup(_module_patcher.stop)
@ -100,7 +99,6 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
with mock.patch("cloudbaseinit.utils.windows.disk.GUID"): with mock.patch("cloudbaseinit.utils.windows.disk.GUID"):
self.windows_utils = importlib.import_module(module_path) self.windows_utils = importlib.import_module(module_path)
self._winreg_mock = self._moves_mock.winreg
self._windll_mock = self._ctypes_mock.windll self._windll_mock = self._ctypes_mock.windll
self._wintypes_mock = self._ctypes_mock.wintypes self._wintypes_mock = self._ctypes_mock.wintypes
self._client_mock = self._win32com_mock.client self._client_mock = self._win32com_mock.client
@ -322,7 +320,7 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
response = self._winutils._get_user_sid_and_domain(self._USERNAME) response = self._winutils._get_user_sid_and_domain(self._USERNAME)
advapi32.LookupAccountNameW.assert_called_with( advapi32.LookupAccountNameW.assert_called_with(
0, six.text_type(self._USERNAME), sid, 0, str(self._USERNAME), sid,
self._ctypes_mock.byref(cbSid), domainName, self._ctypes_mock.byref(cbSid), domainName,
self._ctypes_mock.byref(cchReferencedDomainName), self._ctypes_mock.byref(cchReferencedDomainName),
self._ctypes_mock.byref(sidNameUse)) self._ctypes_mock.byref(sidNameUse))
@ -369,12 +367,11 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
group_name) group_name)
netapi32.NetLocalGroupAddMembers.assert_called_with( netapi32.NetLocalGroupAddMembers.assert_called_with(
0, six.text_type(group_name), 3, 0, str(group_name), 3,
self._ctypes_mock.pointer.return_value, 1) self._ctypes_mock.pointer.return_value, 1)
self._ctypes_mock.pointer.assert_called_once_with(lmi) self._ctypes_mock.pointer.assert_called_once_with(lmi)
self.assertEqual(lmi.lgrmi3_domainandname, self.assertEqual(lmi.lgrmi3_domainandname, str(self._USERNAME))
six.text_type(self._USERNAME))
def test_add_user_to_local_group_no_error(self): def test_add_user_to_local_group_no_error(self):
self._test_add_user_to_local_group(ret_value=0) self._test_add_user_to_local_group(ret_value=0)
@ -567,7 +564,7 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
mock_SetComputerNameExW.assert_called_with( mock_SetComputerNameExW.assert_called_with(
self._winutils.ComputerNamePhysicalDnsHostname, self._winutils.ComputerNamePhysicalDnsHostname,
six.text_type('fake name')) str('fake name'))
def test_set_host_name(self): def test_set_host_name(self):
self._test_set_host_name(ret_value='fake response') self._test_set_host_name(ret_value='fake response')

View File

@ -18,7 +18,6 @@ try:
import unittest.mock as mock import unittest.mock as mock
except ImportError: except ImportError:
import mock import mock
import six
from cloudbaseinit.plugins.common import base from cloudbaseinit.plugins.common import base
from cloudbaseinit.plugins.common import mtu from cloudbaseinit.plugins.common import mtu
@ -79,7 +78,7 @@ class MTUPluginTests(unittest.TestCase):
self.assertFalse(mock_get_os_utils().set_network_mtu.called) self.assertFalse(mock_get_os_utils().set_network_mtu.called)
def test_execute_success(self, mock_get_os_utils): def test_execute_success(self, mock_get_os_utils):
dhcp_options = {dhcp.OPTION_MTU: six.b("\x00\x04")} dhcp_options = {dhcp.OPTION_MTU: "\x00\x04".encode("latin-1")}
self._test_execute(mock_get_os_utils, self._test_execute(mock_get_os_utils,
dhcp_options=dhcp_options) dhcp_options=dhcp_options)

View File

@ -35,17 +35,16 @@ class AzureGuestAgentPluginTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.mock_wmi = mock.MagicMock() self.mock_wmi = mock.MagicMock()
self._moves_mock = mock.MagicMock() self._winreg_mock = mock.MagicMock()
patcher = mock.patch.dict( patcher = mock.patch.dict(
"sys.modules", "sys.modules",
{ {
"wmi": self.mock_wmi, "wmi": self.mock_wmi,
"six.moves": self._moves_mock "winreg": self._winreg_mock
} }
) )
patcher.start() patcher.start()
self.addCleanup(patcher.stop) self.addCleanup(patcher.stop)
self._winreg_mock = self._moves_mock.winreg
self._azureguestagent = importlib.import_module(MODPATH) self._azureguestagent = importlib.import_module(MODPATH)
self._azureagentplugin = self._azureguestagent.AzureGuestAgentPlugin() self._azureagentplugin = self._azureguestagent.AzureGuestAgentPlugin()
self.snatcher = testutils.LogSnatcher(MODPATH) self.snatcher = testutils.LogSnatcher(MODPATH)

View File

@ -33,12 +33,10 @@ class BootConfigPluginTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.mock_wmi = mock.MagicMock() self.mock_wmi = mock.MagicMock()
self._moves_mock = mock.MagicMock()
patcher = mock.patch.dict( patcher = mock.patch.dict(
"sys.modules", "sys.modules",
{ {
"wmi": self.mock_wmi, "wmi": self.mock_wmi,
"six.moves": self._moves_mock,
'ctypes': mock.MagicMock(), 'ctypes': mock.MagicMock(),
'ctypes.windll': mock.MagicMock(), 'ctypes.windll': mock.MagicMock(),
'ctypes.wintypes': mock.MagicMock(), 'ctypes.wintypes': mock.MagicMock(),

View File

@ -32,12 +32,12 @@ class RDPPluginTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.mock_wmi = mock.MagicMock() self.mock_wmi = mock.MagicMock()
self._moves_mock = mock.MagicMock() self._winreg_mock = mock.MagicMock()
patcher = mock.patch.dict( patcher = mock.patch.dict(
"sys.modules", "sys.modules",
{ {
"wmi": self.mock_wmi, "wmi": self.mock_wmi,
"six.moves": self._moves_mock "winreg": self._winreg_mock
} }
) )
patcher.start() patcher.start()

View File

@ -32,19 +32,17 @@ class ConfigWinRMCertificateAuthPluginTests(unittest.TestCase):
self._ctypes_mock = mock.MagicMock() self._ctypes_mock = mock.MagicMock()
self._win32com_mock = mock.MagicMock() self._win32com_mock = mock.MagicMock()
self._pywintypes_mock = mock.MagicMock() self._pywintypes_mock = mock.MagicMock()
self._moves_mock = mock.MagicMock() self._winreg_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict( self._module_patcher = mock.patch.dict(
'sys.modules', 'sys.modules',
{'ctypes': self._ctypes_mock, {'ctypes': self._ctypes_mock,
'win32com': self._win32com_mock, 'win32com': self._win32com_mock,
'pywintypes': self._pywintypes_mock, 'pywintypes': self._pywintypes_mock,
'six.moves': self._moves_mock}) 'winreg': self._winreg_mock})
self._module_patcher.start() self._module_patcher.start()
self._winreg_mock = self._moves_mock.winreg
self.winrmcert = importlib.import_module( self.winrmcert = importlib.import_module(
'cloudbaseinit.plugins.windows.winrmcertificateauth') 'cloudbaseinit.plugins.windows.winrmcertificateauth')
self._certif_auth = self.winrmcert.ConfigWinRMCertificateAuthPlugin() self._certif_auth = self.winrmcert.ConfigWinRMCertificateAuthPlugin()

View File

@ -33,7 +33,7 @@ class ConfigWinRMListenerPluginTests(unittest.TestCase):
self._mock_wintypes = mock.MagicMock() self._mock_wintypes = mock.MagicMock()
self._mock_pywintypes = mock.MagicMock() self._mock_pywintypes = mock.MagicMock()
self._mock_win32 = mock.MagicMock() self._mock_win32 = mock.MagicMock()
self._moves_mock = mock.MagicMock() self._winreg_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict( self._module_patcher = mock.patch.dict(
'sys.modules', 'sys.modules',
@ -41,10 +41,9 @@ class ConfigWinRMListenerPluginTests(unittest.TestCase):
'ctypes.wintypes': self._mock_wintypes, 'ctypes.wintypes': self._mock_wintypes,
'pywintypes': self._mock_pywintypes, 'pywintypes': self._mock_pywintypes,
'win32com': self._mock_win32, 'win32com': self._mock_win32,
'six.moves': self._moves_mock 'winreg': self._winreg_mock
}) })
self._module_patcher.start() self._module_patcher.start()
self._winreg_mock = self._moves_mock.winreg
winrmlistener = importlib.import_module('cloudbaseinit.plugins.' winrmlistener = importlib.import_module('cloudbaseinit.plugins.'
'windows.winrmlistener') 'windows.winrmlistener')

View File

@ -16,7 +16,6 @@ import importlib
import unittest import unittest
import mock import mock
import six
from cloudbaseinit.tests import testutils from cloudbaseinit.tests import testutils
@ -51,7 +50,7 @@ class TestVersion(unittest.TestCase):
result = self.version._read_url(mock_url) result = self.version._read_url(mock_url)
headers = {'User-Agent': self.version._PRODUCT_NAME} headers = {'User-Agent': self.version._PRODUCT_NAME}
mock_get.assert_called_once_with(mock_url, verify=six.PY3, mock_get.assert_called_once_with(mock_url, verify=True,
headers=headers) headers=headers)
request = mock_get.return_value request = mock_get.return_value
request.raise_for_status.assert_called_once_with() request.raise_for_status.assert_called_once_with()

View File

@ -29,14 +29,14 @@ CONF = cloudbaseinit_conf.CONF
class NetworkUtilsTest(unittest.TestCase): class NetworkUtilsTest(unittest.TestCase):
@mock.patch('six.moves.urllib.request.urlopen') @mock.patch('urllib.request.urlopen')
def test_check_url(self, mock_url_open): def test_check_url(self, mock_url_open):
mock_url_open.return_value = None mock_url_open.return_value = None
self.assertTrue(network.check_url("fake_url")) self.assertTrue(network.check_url("fake_url"))
@mock.patch('sys.platform', new='win32') @mock.patch('sys.platform', new='win32')
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils') @mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
@mock.patch('six.moves.urllib.parse.urlparse') @mock.patch('urllib.parse.urlparse')
def _test_check_metadata_ip_route(self, mock_urlparse, mock_get_os_utils, def _test_check_metadata_ip_route(self, mock_urlparse, mock_get_os_utils,
side_effect): side_effect):
mock_utils = mock.MagicMock() mock_utils = mock.MagicMock()

View File

@ -31,15 +31,14 @@ class TestWSMStorageManager(unittest.TestCase):
def setUp(self): def setUp(self):
self._mock_ctypes = mock.MagicMock() self._mock_ctypes = mock.MagicMock()
self.mock_wmi = mock.MagicMock() self.mock_wmi = mock.MagicMock()
self._moves_mock = mock.MagicMock() self._winreg_mock = mock.MagicMock()
self._winreg_mock = self._moves_mock.winreg
self._kernel32_mock = mock.MagicMock() self._kernel32_mock = mock.MagicMock()
patcher = mock.patch.dict( patcher = mock.patch.dict(
"sys.modules", "sys.modules",
{ {
"wmi": self.mock_wmi, "wmi": self.mock_wmi,
"six.moves": self._moves_mock, "winreg": self._winreg_mock,
"ctypes": self._mock_ctypes, "ctypes": self._mock_ctypes,
"oslo_log": mock.MagicMock() "oslo_log": mock.MagicMock()
} }

View File

@ -27,12 +27,12 @@ class WindowsNetworkUtilsTests(unittest.TestCase):
def setUp(self): def setUp(self):
self._ctypes_mock = mock.MagicMock() self._ctypes_mock = mock.MagicMock()
self._moves_mock = mock.MagicMock() self._winreg_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict( self._module_patcher = mock.patch.dict(
'sys.modules', 'sys.modules',
{'ctypes': self._ctypes_mock, {'ctypes': self._ctypes_mock,
'six.moves': self._moves_mock}) 'winreg': self._winreg_mock})
self._module_patcher.start() self._module_patcher.start()
@ -89,10 +89,10 @@ class WindowsNetworkUtilsTests(unittest.TestCase):
def _test_get_registry_dhcp_server(self, dhcp_server, exception=None): def _test_get_registry_dhcp_server(self, dhcp_server, exception=None):
fake_adapter = mock.sentinel.fake_adapter_name fake_adapter = mock.sentinel.fake_adapter_name
self._moves_mock.winreg.QueryValueEx.return_value = [dhcp_server] self._winreg_mock.QueryValueEx.return_value = [dhcp_server]
if exception: if exception:
self._moves_mock.winreg.QueryValueEx.side_effect = [exception] self._winreg_mock.QueryValueEx.side_effect = [exception]
if exception.errno != 2: if exception.errno != 2:
self.assertRaises(cbinit_exception.CloudbaseInitException, self.assertRaises(cbinit_exception.CloudbaseInitException,
@ -105,14 +105,14 @@ class WindowsNetworkUtilsTests(unittest.TestCase):
else: else:
self.assertEqual(dhcp_server, response) self.assertEqual(dhcp_server, response)
self._moves_mock.winreg.OpenKey.assert_called_once_with( self._winreg_mock.OpenKey.assert_called_once_with(
self._moves_mock.winreg.HKEY_LOCAL_MACHINE, self._winreg_mock.HKEY_LOCAL_MACHINE,
"SYSTEM\\CurrentControlSet\\Services\\Tcpip\\Parameters\\" "SYSTEM\\CurrentControlSet\\Services\\Tcpip\\Parameters\\"
"Interfaces\\%s" % fake_adapter, 0, "Interfaces\\%s" % fake_adapter, 0,
self._moves_mock.winreg.KEY_READ) self._winreg_mock.KEY_READ)
self._moves_mock.winreg.QueryValueEx.assert_called_once_with( self._winreg_mock.QueryValueEx.assert_called_once_with(
self._moves_mock.winreg.OpenKey.return_value.__enter__(), self._winreg_mock.OpenKey.return_value.__enter__(),
"DhcpServer") "DhcpServer")
def test_get_registry_dhcp_server(self): def test_get_registry_dhcp_server(self):

View File

@ -31,12 +31,11 @@ class RdpTest(unittest.TestCase):
def setUp(self): def setUp(self):
self._wmi_mock = mock.MagicMock() self._wmi_mock = mock.MagicMock()
self._moves_mock = mock.MagicMock() self._winreg_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict( self._module_patcher = mock.patch.dict(
'sys.modules', { 'sys.modules', {
'wmi': self._wmi_mock, 'wmi': self._wmi_mock,
'six.moves': self._moves_mock}) 'winreg': self._winreg_mock})
self._winreg_mock = self._moves_mock.winreg
self.snatcher = testutils.LogSnatcher(MODPATH) self.snatcher = testutils.LogSnatcher(MODPATH)
self._module_patcher.start() self._module_patcher.start()
self.rdp = importlib.import_module(MODPATH) self.rdp = importlib.import_module(MODPATH)

View File

@ -26,14 +26,13 @@ from cloudbaseinit.tests import testutils
class WindowsSecurityUtilsTests(unittest.TestCase): class WindowsSecurityUtilsTests(unittest.TestCase):
def setUp(self): def setUp(self):
self._moves_mock = mock.MagicMock() self._winreg_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict( self._module_patcher = mock.patch.dict(
'sys.modules', 'sys.modules',
{'six.moves': self._moves_mock}) {'winreg': self._winreg_mock})
self._module_patcher.start() self._module_patcher.start()
self._winreg_mock = self._moves_mock.winreg
self.security = importlib.import_module( self.security = importlib.import_module(
"cloudbaseinit.utils.windows.security") "cloudbaseinit.utils.windows.security")

View File

@ -32,7 +32,6 @@ class FakeWindowsError(Exception):
class TestTimezone(unittest.TestCase): class TestTimezone(unittest.TestCase):
def setUp(self): def setUp(self):
self._mock_moves = mock.MagicMock()
self._mock_winreg = mock.Mock() self._mock_winreg = mock.Mock()
self._mock_ctypes = mock.Mock() self._mock_ctypes = mock.Mock()
self._mock_win32security = mock.Mock() self._mock_win32security = mock.Mock()
@ -42,11 +41,10 @@ class TestTimezone(unittest.TestCase):
self._module_patcher = mock.patch.dict( self._module_patcher = mock.patch.dict(
'sys.modules', 'sys.modules',
{'ctypes': self._mock_ctypes, {'ctypes': self._mock_ctypes,
'six.moves': self._mock_moves, 'winreg': self._mock_winreg,
'win32process': self._mock_win32process, 'win32process': self._mock_win32process,
'win32security': self._mock_win32security}) 'win32security': self._mock_win32security})
self._module_patcher.start() self._module_patcher.start()
self._mock_moves.winreg = self._mock_winreg
self._timezone_module = importlib.import_module( self._timezone_module = importlib.import_module(
'cloudbaseinit.utils.windows.timezone') 'cloudbaseinit.utils.windows.timezone')
self._timezone_module.WindowsError = FakeWindowsError self._timezone_module.WindowsError = FakeWindowsError

View File

@ -13,7 +13,6 @@
# under the License. # under the License.
import importlib import importlib
import six
import unittest import unittest
try: try:
@ -231,7 +230,7 @@ class CryptoAPICertManagerTests(unittest.TestCase):
mock_CertOpenStore.assert_called_with( mock_CertOpenStore.assert_called_with(
self.x509.cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, self.x509.cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0,
self.x509.cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE, self.x509.cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE,
six.text_type(self.x509.STORE_NAME_MY)) str(self.x509.STORE_NAME_MY))
mock_get_cert_thumprint.assert_called_once_with( mock_get_cert_thumprint.assert_called_once_with(
mock_CertCreateSelfSignCertificate()) mock_CertCreateSelfSignCertificate())
mock_add_system_time_interval.assert_has_calls( mock_add_system_time_interval.assert_has_calls(
@ -396,7 +395,7 @@ class CryptoAPICertManagerTests(unittest.TestCase):
mock_CertOpenStore.assert_called_with( mock_CertOpenStore.assert_called_with(
self.x509.cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, self.x509.cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0,
self.x509.cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE, self.x509.cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE,
six.text_type(self.x509.STORE_NAME_MY)) str(self.x509.STORE_NAME_MY))
mock_CertAddEncodedCertificateToStore.assert_called_with( mock_CertAddEncodedCertificateToStore.assert_called_with(
mock_CertOpenStore(), mock_CertOpenStore(),
@ -510,7 +509,7 @@ class CryptoAPICertManagerTests(unittest.TestCase):
flags = mock.sentinel.current_user flags = mock.sentinel.current_user
mock_cert_open_store.assert_called_once_with( mock_cert_open_store.assert_called_once_with(
mock.sentinel.store_prov_system, 0, 0, flags, mock.sentinel.store_prov_system, 0, 0, flags,
six.text_type(self.x509.STORE_NAME_MY)) str(self.x509.STORE_NAME_MY))
if store_handle: if store_handle:
mock_add_cert_store.assert_called_once_with( mock_add_cert_store.assert_called_once_with(
mock_cert_open_store.return_value, cert_context_p, mock_cert_open_store.return_value, cert_context_p,

View File

@ -16,7 +16,6 @@
import re import re
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
import six
from cloudbaseinit.models import network as network_model from cloudbaseinit.models import network as network_model
@ -109,7 +108,7 @@ def _add_nic(iface, nics):
def parse(data): def parse(data):
"""Parse the received content and obtain network details.""" """Parse the received content and obtain network details."""
if not data or not isinstance(data, six.string_types): if not data or not isinstance(data, str):
LOG.error("Invalid Debian config to parse:\n%s", data) LOG.error("Invalid Debian config to parse:\n%s", data)
return return

View File

@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import six
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
@ -21,7 +19,7 @@ LOG = oslo_logging.getLogger(__name__)
def get_as_string(value): def get_as_string(value):
if value is None or isinstance(value, six.text_type): if value is None or isinstance(value, str):
return value return value
else: else:
try: try:
@ -33,7 +31,7 @@ def get_as_string(value):
def write_file(target_path, data, mode='wb'): def write_file(target_path, data, mode='wb'):
if isinstance(data, six.text_type) and 'b' in mode: if isinstance(data, str) and 'b' in mode:
data = data.encode() data = data.encode()
with open(target_path, mode) as f: with open(target_path, mode) as f:
@ -41,7 +39,4 @@ def write_file(target_path, data, mode='wb'):
def hex_to_bytes(value): def hex_to_bytes(value):
if six.PY2:
return value.decode("hex")
else:
return bytes.fromhex(value) return bytes.fromhex(value)

View File

@ -14,7 +14,6 @@
import logging import logging
import serial import serial
import six
from oslo_log import formatters from oslo_log import formatters
from oslo_log import log from oslo_log import log
@ -29,7 +28,7 @@ def _safe_write(function):
"""Avoid issues related to unicode strings handling.""" """Avoid issues related to unicode strings handling."""
def _wrapper(message): def _wrapper(message):
# Unicode strings are not properly handled by the serial module # Unicode strings are not properly handled by the serial module
if isinstance(message, six.text_type): if isinstance(message, str):
function(message.encode("utf-8")) function(message.encode("utf-8"))
else: else:
function(message) function(message)

View File

@ -18,10 +18,10 @@ import netaddr
import socket import socket
import struct import struct
import sys import sys
from urllib import parse
from urllib import request
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
from six.moves.urllib import parse
from six.moves.urllib import request
from cloudbaseinit.osutils import factory as osutils_factory from cloudbaseinit.osutils import factory as osutils_factory

View File

@ -14,11 +14,8 @@
import abc import abc
import six
class BaseNetworkTeamManager(object, metaclass=abc.ABCMeta):
@six.add_metaclass(abc.ABCMeta)
class BaseNetworkTeamManager(object):
@abc.abstractmethod @abc.abstractmethod
def create_team(self, team_name, mode, load_balancing_algorithm, def create_team(self, team_name, mode, load_balancing_algorithm,
members, mac_address, primary_nic_name=None, members, mac_address, primary_nic_name=None,

View File

@ -14,11 +14,9 @@
import abc import abc
import re import re
import six
@six.add_metaclass(abc.ABCMeta) class BaseTemplateEngine(object, metaclass=abc.ABCMeta):
class BaseTemplateEngine(object):
def __init__(self): def __init__(self):
self._template_matcher = re.compile(r"##\s*template:(.*)", re.I) self._template_matcher = re.compile(r"##\s*template:(.*)", re.I)

View File

@ -20,7 +20,6 @@ from ctypes import wintypes
import random import random
import re import re
import six
import winioctlcon import winioctlcon
from cloudbaseinit import exception from cloudbaseinit import exception
@ -146,8 +145,7 @@ class Win32_DRIVE_LAYOUT_INFORMATION_EX(ctypes.Structure):
] ]
@six.add_metaclass(abc.ABCMeta) class BaseDevice(object, metaclass=abc.ABCMeta):
class BaseDevice(object):
"""Base class for devices like disks and partitions. """Base class for devices like disks and partitions.
It has common methods for getting physical disk geometry, It has common methods for getting physical disk geometry,

View File

@ -13,9 +13,9 @@
# under the License. # under the License.
import ctypes import ctypes
import winreg
from ctypes import wintypes from ctypes import wintypes
from six.moves import winreg
from cloudbaseinit import exception from cloudbaseinit import exception
from cloudbaseinit.utils.windows import iphlpapi from cloudbaseinit.utils.windows import iphlpapi

View File

@ -12,8 +12,9 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import winreg
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
from six.moves import winreg
from cloudbaseinit import exception from cloudbaseinit import exception
from cloudbaseinit.utils.windows import wmi_loader from cloudbaseinit.utils.windows import wmi_loader

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from six.moves import winreg import winreg
class WindowsSecurityUtils(object): class WindowsSecurityUtils(object):

View File

@ -14,16 +14,13 @@
import abc import abc
import six
SAN_POLICY_UNKNOWN = 0 SAN_POLICY_UNKNOWN = 0
SAN_POLICY_ONLINE = 1 SAN_POLICY_ONLINE = 1
SAN_POLICY_OFFLINE_SHARED = 2 SAN_POLICY_OFFLINE_SHARED = 2
SAN_POLICY_OFFLINE = 3 SAN_POLICY_OFFLINE = 3
@six.add_metaclass(abc.ABCMeta) class BaseStorageManager(object, metaclass=abc.ABCMeta):
class BaseStorageManager(object):
@abc.abstractmethod @abc.abstractmethod
def extend_volumes(self, volume_indexes=None): def extend_volumes(self, volume_indexes=None):

View File

@ -13,9 +13,9 @@
# under the License. # under the License.
import ctypes import ctypes
import winreg
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
from six.moves import winreg
from cloudbaseinit import exception from cloudbaseinit import exception
from cloudbaseinit.utils.windows import kernel32 from cloudbaseinit.utils.windows import kernel32

View File

@ -16,8 +16,8 @@ import ctypes
from ctypes import wintypes from ctypes import wintypes
import os import os
import struct import struct
import winreg
from six.moves import winreg
import win32security import win32security
from cloudbaseinit import exception from cloudbaseinit import exception

View File

@ -17,8 +17,6 @@ import ctypes
from ctypes import wintypes from ctypes import wintypes
import uuid import uuid
import six
from cloudbaseinit.utils import encoding from cloudbaseinit.utils import encoding
from cloudbaseinit.utils.windows import cryptoapi from cloudbaseinit.utils.windows import cryptoapi
from cloudbaseinit.utils import x509constants from cloudbaseinit.utils import x509constants
@ -226,8 +224,7 @@ class CryptoAPICertManager(object):
flags = cryptoapi.CERT_SYSTEM_STORE_CURRENT_USER flags = cryptoapi.CERT_SYSTEM_STORE_CURRENT_USER
store_handle = cryptoapi.CertOpenStore( store_handle = cryptoapi.CertOpenStore(
cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, flags, cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, flags, str(store_name))
six.text_type(store_name))
if not store_handle: if not store_handle:
raise cryptoapi.CryptoAPIException() raise cryptoapi.CryptoAPIException()
@ -297,8 +294,7 @@ class CryptoAPICertManager(object):
flags |= cryptoapi.CERT_SYSTEM_STORE_CURRENT_USER flags |= cryptoapi.CERT_SYSTEM_STORE_CURRENT_USER
store_handle = cryptoapi.CertOpenStore( store_handle = cryptoapi.CertOpenStore(
cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, flags, cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, flags, str(store_name))
six.text_type(store_name))
if not store_handle: if not store_handle:
raise cryptoapi.CryptoAPIException() raise cryptoapi.CryptoAPIException()
@ -364,8 +360,7 @@ class CryptoAPICertManager(object):
flags = cryptoapi.CERT_SYSTEM_STORE_CURRENT_USER flags = cryptoapi.CERT_SYSTEM_STORE_CURRENT_USER
store_handle = cryptoapi.CertOpenStore( store_handle = cryptoapi.CertOpenStore(
cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, flags, cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, flags, str(store_name))
six.text_type(store_name))
if not store_handle: if not store_handle:
raise cryptoapi.CryptoAPIException() raise cryptoapi.CryptoAPIException()
@ -500,8 +495,7 @@ class CryptoAPICertManager(object):
flags = cryptoapi.CERT_SYSTEM_STORE_CURRENT_USER flags = cryptoapi.CERT_SYSTEM_STORE_CURRENT_USER
store_handle = cryptoapi.CertOpenStore( store_handle = cryptoapi.CertOpenStore(
cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, flags, cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, flags, str(store_name))
six.text_type(store_name))
if not store_handle: if not store_handle:
raise cryptoapi.CryptoAPIException() raise cryptoapi.CryptoAPIException()

View File

@ -18,7 +18,6 @@ import threading
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
import pbr.version import pbr.version
import requests import requests
import six
_UPDATE_CHECK_URL = 'https://www.cloudbase.it/checkupdates.php?p={0}&v={1}' _UPDATE_CHECK_URL = 'https://www.cloudbase.it/checkupdates.php?p={0}&v={1}'
@ -27,9 +26,7 @@ LOG = oslo_logging.getLogger(__name__)
def _read_url(url): def _read_url(url):
# Disable certificate verification on Python 2 as req = requests.get(url, verify=True,
# requests' CA list is incomplete. Works fine on Python3.
req = requests.get(url, verify=six.PY3,
headers={'User-Agent': _PRODUCT_NAME}) headers={'User-Agent': _PRODUCT_NAME})
req.raise_for_status() req.raise_for_status()
if req.text: if req.text:

View File

@ -4,7 +4,6 @@ netaddr>=0.7.6
pyserial pyserial
oslo.config oslo.config
oslo.log oslo.log
six>=1.7.0
Babel>=1.3 Babel>=1.3
oauthlib oauthlib
netifaces netifaces