Provide the error code in the Windows exception messages

This patch adds a type of exception for the Windows code, which interpolates
the given exception message with the last Windows API error, retrieved with
ctypes.GetLastError. This is useful in the case where we have only the log
at our disposal for debugging and some API method failed with reasons unknown.
Since we can't replicate what the user does everytime, having some additional
clue why an API failed could improve our bug detection workflow.

Change-Id: I364324ad5a8529b5363be3a7c6dc03ca52eb637c
This commit is contained in:
Claudiu Popa 2015-03-16 13:52:58 +02:00
parent 0ed49b14cc
commit 0b40f6368b
10 changed files with 263 additions and 106 deletions

View File

@ -12,6 +12,21 @@
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
class CloudbaseInitException(Exception):
pass
class WindowsCloudbaseInitException(CloudbaseInitException):
def __init__(self, msg="%r", error_code=None):
if error_code is None:
error_code = ctypes.GetLastError()
description = ctypes.FormatError(error_code)
try:
formatted_msg = msg % description
except TypeError:
formatted_msg = msg
super(WindowsCloudbaseInitException, self).__init__(formatted_msg)

View File

@ -313,7 +313,7 @@ class WindowsUtils(base.BaseOSUtils):
ret_val = advapi32.InitiateSystemShutdownW(0, "Cloudbase-Init reboot",
0, True, True)
if not ret_val:
raise exception.CloudbaseInitException("Reboot failed")
raise exception.WindowsCloudbaseInitException("Reboot failed: %r")
def _get_user_wmi_object(self, username):
conn = wmi.WMI(moniker='//./root/cimv2')
@ -390,7 +390,8 @@ class WindowsUtils(base.BaseOSUtils):
0, six.text_type(username), sid, ctypes.byref(cbSid), domainName,
ctypes.byref(cchReferencedDomainName), ctypes.byref(sidNameUse))
if not ret_val:
raise exception.CloudbaseInitException("Cannot get user SID")
raise exception.WindowsCloudbaseInitException(
"Cannot get user SID: %r")
return (sid, domainName.value)
@ -430,7 +431,8 @@ class WindowsUtils(base.BaseOSUtils):
six.text_type(password), 2, 0,
ctypes.byref(token))
if not ret_val:
raise exception.CloudbaseInitException("User logon failed")
raise exception.WindowsCloudbaseInitException(
"User logon failed: %r")
if load_profile:
pi = Win32_PROFILEINFO()
@ -439,8 +441,8 @@ class WindowsUtils(base.BaseOSUtils):
ret_val = userenv.LoadUserProfileW(token, ctypes.byref(pi))
if not ret_val:
kernel32.CloseHandle(token)
raise exception.CloudbaseInitException(
"Cannot load user profile")
raise exception.WindowsCloudbaseInitException(
"Cannot load user profile: %r")
return token
@ -465,7 +467,8 @@ class WindowsUtils(base.BaseOSUtils):
self.ComputerNamePhysicalDnsHostname,
six.text_type(new_host_name))
if not ret_val:
raise exception.CloudbaseInitException("Cannot set host name")
raise exception.WindowsCloudbaseInitException(
"Cannot set host name: %r")
return True
def get_network_adapters(self):
@ -823,8 +826,8 @@ class WindowsUtils(base.BaseOSUtils):
buf = ctypes.create_unicode_buffer(buf_size + 1)
buf_len = kernel32.GetLogicalDriveStringsW(buf_size, buf)
if not buf_len:
raise exception.CloudbaseInitException(
"GetLogicalDriveStringsW failed")
raise exception.WindowsCloudbaseInitException(
"GetLogicalDriveStringsW failed: %r")
return self._split_str_buf_list(buf, buf_len)
@ -865,8 +868,8 @@ class WindowsUtils(base.BaseOSUtils):
ctypes.byref(required_size), None):
if (kernel32.GetLastError() !=
self.ERROR_INSUFFICIENT_BUFFER):
raise exception.CloudbaseInitException(
"SetupDiGetDeviceInterfaceDetailW failed")
raise exception.WindowsCloudbaseInitException(
"SetupDiGetDeviceInterfaceDetailW failed: %r")
pdidd = ctypes.cast(
msvcrt.malloc(ctypes.c_size_t(required_size.value)),
@ -884,8 +887,8 @@ class WindowsUtils(base.BaseOSUtils):
if not setupapi.SetupDiGetDeviceInterfaceDetailW(
handle_disks, ctypes.byref(did), pdidd,
required_size, None, None):
raise exception.CloudbaseInitException(
"SetupDiGetDeviceInterfaceDetailW failed")
raise exception.WindowsCloudbaseInitException(
"SetupDiGetDeviceInterfaceDetailW failed: %r")
device_path = ctypes.cast(
pdidd.contents.DevicePath, wintypes.LPWSTR).value
@ -904,8 +907,8 @@ class WindowsUtils(base.BaseOSUtils):
handle_disk, self.IOCTL_STORAGE_GET_DEVICE_NUMBER,
None, 0, ctypes.byref(sdn), ctypes.sizeof(sdn),
ctypes.byref(b), None):
raise exception.CloudbaseInitException(
'DeviceIoControl failed')
raise exception.WindowsCloudbaseInitException(
'DeviceIoControl failed: %r')
physical_disks.append(
r"\\.\PHYSICALDRIVE%d" % sdn.DeviceNumber)

View File

@ -15,7 +15,6 @@
import importlib
import os
import unittest
try:
import unittest.mock as mock
@ -26,11 +25,12 @@ import six
from cloudbaseinit import exception
from cloudbaseinit.tests import fake
from cloudbaseinit.tests import testutils
CONF = cfg.CONF
class TestWindowsUtils(unittest.TestCase):
class TestWindowsUtils(testutils.CloudbaseInitTestBase):
'''Tests for the windows utils class.'''
_CONFIG_NAME = 'FakeConfig'
@ -101,14 +101,16 @@ class TestWindowsUtils(unittest.TestCase):
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
'._enable_shutdown_privilege')
def _test_reboot(self, mock_enable_shutdown_privilege, ret_value):
def _test_reboot(self, mock_enable_shutdown_privilege, ret_value,
expected_ret_value=None):
advapi32 = self._windll_mock.advapi32
advapi32.InitiateSystemShutdownW = mock.MagicMock(
return_value=ret_value)
if not ret_value:
self.assertRaises(exception.CloudbaseInitException,
self._winutils.reboot)
with self.assert_raises_windows_message(
"Reboot failed: %r", expected_ret_value):
self._winutils.reboot()
else:
self._winutils.reboot()
@ -121,7 +123,7 @@ class TestWindowsUtils(unittest.TestCase):
self._test_reboot(ret_value=True)
def test_reboot_failed(self):
self._test_reboot(ret_value=None)
self._test_reboot(ret_value=None, expected_ret_value=100)
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
'._sanitize_wmi_input')
@ -256,7 +258,7 @@ class TestWindowsUtils(unittest.TestCase):
def test_set_password_expiration_no_object(self):
self._test_set_user_password_expiration(fake_obj=None)
def _test_get_user_sid_and_domain(self, ret_val):
def _test_get_user_sid_and_domain(self, ret_val, last_error=None):
cbSid = mock.Mock()
sid = mock.Mock()
size = 1024
@ -272,10 +274,11 @@ class TestWindowsUtils(unittest.TestCase):
advapi32.LookupAccountNameW.return_value = ret_val
if ret_val is None:
self.assertRaises(
exception.CloudbaseInitException,
self._winutils._get_user_sid_and_domain,
self._USERNAME)
with self.assert_raises_windows_message(
"Cannot get user SID: %r",
last_error):
self._winutils._get_user_sid_and_domain(
self._USERNAME)
else:
response = self._winutils._get_user_sid_and_domain(self._USERNAME)
@ -291,7 +294,7 @@ class TestWindowsUtils(unittest.TestCase):
self._test_get_user_sid_and_domain(ret_val=fake_obj)
def test_get_user_sid_and_domain_no_return_value(self):
self._test_get_user_sid_and_domain(ret_val=None)
self._test_get_user_sid_and_domain(ret_val=None, last_error=100)
@mock.patch('cloudbaseinit.osutils.windows'
'.Win32_LOCALGROUP_MEMBERS_INFO_3')
@ -372,8 +375,8 @@ class TestWindowsUtils(unittest.TestCase):
@mock.patch('cloudbaseinit.osutils.windows.Win32_PROFILEINFO')
def _test_create_user_logon_session(self, mock_Win32_PROFILEINFO, logon,
loaduser,
load_profile=True):
loaduser, load_profile=True,
last_error=None):
self._wintypes_mock.HANDLE = mock.MagicMock()
pi = self.windows_utils.Win32_PROFILEINFO()
advapi32 = self._windll_mock.advapi32
@ -383,20 +386,20 @@ class TestWindowsUtils(unittest.TestCase):
advapi32.LogonUserW.return_value = logon
if not logon:
self.assertRaises(
exception.CloudbaseInitException,
self._winutils.create_user_logon_session,
self._USERNAME, self._PASSWORD, domain='.',
load_profile=load_profile)
with self.assert_raises_windows_message(
"User logon failed: %r", last_error):
self._winutils.create_user_logon_session(
self._USERNAME, self._PASSWORD, domain='.',
load_profile=load_profile)
elif load_profile and not loaduser:
userenv.LoadUserProfileW.return_value = None
kernel32.CloseHandle.return_value = None
self.assertRaises(exception.CloudbaseInitException,
self._winutils.create_user_logon_session,
self._USERNAME, self._PASSWORD, domain='.',
load_profile=load_profile)
with self.assert_raises_windows_message(
"Cannot load user profile: %r", last_error):
self._winutils.create_user_logon_session(
self._USERNAME, self._PASSWORD, domain='.',
load_profile=load_profile)
userenv.LoadUserProfileW.assert_called_with(
self._wintypes_mock.HANDLE.return_value,
@ -427,28 +430,38 @@ class TestWindowsUtils(unittest.TestCase):
self.assertTrue(response is not None)
def test_create_user_logon_session_fail_load_false(self):
self._test_create_user_logon_session(0, 0, True)
self._test_create_user_logon_session(logon=0, loaduser=0,
load_profile=True,
last_error=100)
def test_create_user_logon_session_fail_load_true(self):
self._test_create_user_logon_session(0, 0, False)
self._test_create_user_logon_session(logon=0, loaduser=0,
load_profile=False,
last_error=100)
def test_create_user_logon_session_load_true(self):
m = mock.Mock()
n = mock.Mock()
self._test_create_user_logon_session(m, n, True)
self._test_create_user_logon_session(logon=m, loaduser=n,
load_profile=True)
def test_create_user_logon_session_load_false(self):
m = mock.Mock()
n = mock.Mock()
self._test_create_user_logon_session(m, n, False)
self._test_create_user_logon_session(logon=m, loaduser=n,
load_profile=False)
def test_create_user_logon_session_no_load_true(self):
m = mock.Mock()
self._test_create_user_logon_session(m, None, True)
self._test_create_user_logon_session(logon=m, loaduser=None,
load_profile=True,
last_error=100)
def test_create_user_logon_session_no_load_false(self):
m = mock.Mock()
self._test_create_user_logon_session(m, None, False)
self._test_create_user_logon_session(logon=m, loaduser=None,
load_profile=False,
last_error=100)
def test_close_user_logon_session(self):
token = mock.Mock()
@ -459,12 +472,14 @@ class TestWindowsUtils(unittest.TestCase):
self._windll_mock.kernel32.CloseHandle.assert_called_with(token)
@mock.patch('ctypes.windll.kernel32.SetComputerNameExW')
def _test_set_host_name(self, mock_SetComputerNameExW, ret_value):
def _test_set_host_name(self, mock_SetComputerNameExW, ret_value,
last_error=None):
mock_SetComputerNameExW.return_value = ret_value
if not ret_value:
self.assertRaises(exception.CloudbaseInitException,
self._winutils.set_host_name, 'fake name')
with self.assert_raises_windows_message(
"Cannot set host name: %r", last_error):
self._winutils.set_host_name('fake name')
else:
self.assertTrue(self._winutils.set_host_name('fake name'))
@ -476,7 +491,7 @@ class TestWindowsUtils(unittest.TestCase):
self._test_set_host_name(ret_value='fake response')
def test_set_host_exception(self):
self._test_set_host_name(ret_value=None)
self._test_set_host_name(ret_value=None, last_error=100)
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
'.get_user_sid')
@ -1013,7 +1028,7 @@ class TestWindowsUtils(unittest.TestCase):
mock_generate_random_password.assert_called_once_with(length)
self.assertEqual('Passw0rd', response)
def _test_get_logical_drives(self, buf_length):
def _test_get_logical_drives(self, buf_length, last_error=None):
mock_buf = mock.MagicMock()
mock_buf.__getitem__.side_effect = ['1', '\x00']
mock_get_drives = self._windll_mock.kernel32.GetLogicalDriveStringsW
@ -1022,9 +1037,9 @@ class TestWindowsUtils(unittest.TestCase):
mock_get_drives.return_value = buf_length
if buf_length is None:
self.assertRaises(exception.CloudbaseInitException,
self._winutils._get_logical_drives)
with self.assert_raises_windows_message(
"GetLogicalDriveStringsW failed: %r", last_error):
self._winutils._get_logical_drives()
else:
response = self._winutils._get_logical_drives()
@ -1033,7 +1048,7 @@ class TestWindowsUtils(unittest.TestCase):
self.assertEqual(['1'], response)
def test_get_logical_drives_exception(self):
self._test_get_logical_drives(buf_length=None)
self._test_get_logical_drives(buf_length=None, last_error=100)
def test_get_logical_drives(self):
self._test_get_logical_drives(buf_length=2)
@ -1056,7 +1071,8 @@ class TestWindowsUtils(unittest.TestCase):
@mock.patch('cloudbaseinit.osutils.windows.Win32_STORAGE_DEVICE_NUMBER')
def _test_get_physical_disks(self, mock_sdn, mock_setupapi, mock_kernel32,
mock_msvcrt, handle_disks, last_error,
interface_detail, disk_handle, io_control):
interface_detail, disk_handle, io_control,
last_error_code=None):
sizeof_calls = [
mock.call(
@ -1095,9 +1111,18 @@ class TestWindowsUtils(unittest.TestCase):
interface_detail) or (
disk_handle == self._winutils.INVALID_HANDLE_VALUE) or (
not io_control):
self.assertRaises(exception.CloudbaseInitException,
self._winutils.get_physical_disks)
if not io_control:
with self.assert_raises_windows_message(
"DeviceIoControl failed: %r", last_error_code):
self._winutils.get_physical_disks()
elif not interface_detail:
with self.assert_raises_windows_message(
"SetupDiGetDeviceInterfaceDetailW failed: %r",
last_error_code):
self._winutils.get_physical_disks()
else:
self.assertRaises(exception.CloudbaseInitException,
self._winutils.get_physical_disks)
else:
response = self._winutils.get_physical_disks()
@ -1163,6 +1188,7 @@ class TestWindowsUtils(unittest.TestCase):
self._test_get_physical_disks(
handle_disks=mock_handle_disks,
last_error='other', interface_detail=None,
last_error_code=100,
disk_handle=mock_disk_handle, io_control=True)
def test_get_physical_disks_invalid_disk_handle(self):
@ -1180,6 +1206,7 @@ class TestWindowsUtils(unittest.TestCase):
handle_disks=mock_handle_disks,
last_error=self._winutils.ERROR_INSUFFICIENT_BUFFER,
interface_detail='fake interface detail',
last_error_code=100,
disk_handle=mock_disk_handle, io_control=False)
def test_get_physical_disks_handle_disks_invalid(self):

View File

@ -0,0 +1,35 @@
# Copyright 2015 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import unittest
from cloudbaseinit import exception
from cloudbaseinit.tests import testutils
WINDOWS = os.name == "nt"
@unittest.skipUnless(WINDOWS, "This requires the Windows platform.")
class TestException(testutils.CloudbaseInitTestBase):
def test_windows_exception_no_error_code_given(self):
with self.assert_raises_windows_message("Test %r", error_code=100):
raise exception.WindowsCloudbaseInitException("Test %r")
def test_windows_exception_error_code_given(self):
with self.assert_raises_windows_message("Test %r", error_code=100):
raise exception.WindowsCloudbaseInitException("Test %r",
error_code=100)

View File

@ -18,9 +18,16 @@ import logging as base_logging
import os
import shutil
import tempfile
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from oslo.config import cfg
from cloudbaseinit import exception
from cloudbaseinit.openstack.common import log as logging
@ -30,6 +37,7 @@ __all__ = (
'create_tempfile',
'create_tempdir',
'LogSnatcher',
'CloudbaseInitTestBase',
'ConfPatcher',
)
@ -145,3 +153,48 @@ class ConfPatcher(object):
def __exit__(self, exc_type, exc_val, exc_tb):
self._conf.set_override(self._key, self._original_value)
class CloudbaseInitTestBase(unittest.TestCase):
"""A test base class, which provides a couple of useful methods."""
@contextlib.contextmanager
def assert_raises_windows_message(
self, expected_msg, error_code,
exc=exception.WindowsCloudbaseInitException):
"""Helper method for testing raised error messages
This assert method is similar to :meth:`~assertRaises`, but
it can only be used as a context manager. It will check that the
block of the with statement raises an exception of type :class:`exc`,
having as message the result of the interpolation between
`expected_msg` and a formatted string, obtained through
`ctypes.FormatError(error_code)`.
"""
# Can't use the decorator form, since it will not be properly set
# after the function passes control with the `yield` (so the
# with statement block will have the original value, not the
# mocked one).
with self.assertRaises(exc) as cm:
with mock.patch('cloudbaseinit.exception.'
'ctypes.FormatError',
create=True) as mock_format_error:
with mock.patch('cloudbaseinit.exception.ctypes.'
'GetLastError',
create=True) as mock_get_last_error:
mock_format_error.return_value = "description"
yield
if mock_get_last_error.called:
# This can be called when the error code is not given,
# but we don't have control over that, so test that
# it's actually called only once.
mock_get_last_error.assert_called_once_with()
mock_format_error.assert_called_once_with(
mock_get_last_error.return_value)
else:
mock_format_error.assert_called_once_with(error_code)
expected_msg = expected_msg % mock_format_error.return_value
self.assertEqual(expected_msg, cm.exception.args[0])

View File

@ -13,7 +13,6 @@
# under the License.
import importlib
import unittest
try:
import unittest.mock as mock
@ -21,9 +20,10 @@ except ImportError:
import mock
from cloudbaseinit import exception as cbinit_exception
from cloudbaseinit.tests import testutils
class WindowsPhysicalDiskUtilsTests(unittest.TestCase):
class WindowsPhysicalDiskUtilsTests(testutils.CloudbaseInitTestBase):
def setUp(self):
self._ctypes_mock = mock.MagicMock()
@ -100,7 +100,8 @@ class WindowsPhysicalDiskUtilsTests(unittest.TestCase):
@mock.patch('cloudbaseinit.utils.windows.physical_disk'
'.Win32_DiskGeometry')
def _test_get_geometry(self, mock_Win32_DiskGeometry, _geom, ret_val):
def _test_get_geometry(self, mock_Win32_DiskGeometry, _geom, ret_val,
last_error=None):
mock_DeviceIoControl = self.physical_disk.kernel32.DeviceIoControl
expect_byref = [mock.call(mock_Win32_DiskGeometry.return_value),
mock.call(
@ -110,8 +111,9 @@ class WindowsPhysicalDiskUtilsTests(unittest.TestCase):
self.physical_disk.kernel32.DeviceIoControl.return_value = ret_val
if not ret_val:
self.assertRaises(cbinit_exception.CloudbaseInitException,
self._phys_disk_class.get_geometry)
with self.assert_raises_windows_message(
"Cannot get disk geometry: %r", last_error):
self._phys_disk_class.get_geometry()
elif _geom:
response = self._phys_disk_class.get_geometry()
self.assertEqual(_geom, response)
@ -142,10 +144,12 @@ class WindowsPhysicalDiskUtilsTests(unittest.TestCase):
def test_get_geometry_no_geom(self):
self._test_get_geometry(_geom=None,
ret_val=mock.sentinel.ret_val)
ret_val=mock.sentinel.ret_val,
last_error=100)
def test_get_geometry_no_geom_exception(self):
self._test_get_geometry(_geom=None, ret_val=None)
self._test_get_geometry(_geom=None, ret_val=None,
last_error=100)
def _test_seek(self, exception):
expect_DWORD = [mock.call(0), mock.call(1)]
@ -174,13 +178,14 @@ class WindowsPhysicalDiskUtilsTests(unittest.TestCase):
def test_seek_exception(self):
self._test_seek(exception=True)
def _test_read(self, ret_val):
def _test_read(self, ret_val, last_error=None):
bytes_to_read = mock.sentinel.bytes_to_read
self.physical_disk.kernel32.ReadFile.return_value = ret_val
if not ret_val:
self.assertRaises(cbinit_exception.CloudbaseInitException,
self._phys_disk_class.read, bytes_to_read)
with self.assert_raises_windows_message(
"Read exception: %r", last_error):
self._phys_disk_class.read(bytes_to_read)
else:
response = self._phys_disk_class.read(bytes_to_read)
@ -204,4 +209,4 @@ class WindowsPhysicalDiskUtilsTests(unittest.TestCase):
self._test_read(ret_val=mock.sentinel.ret_val)
def test_read_exception(self):
self._test_read(ret_val=None)
self._test_read(ret_val=None, last_error=100)

View File

@ -13,17 +13,16 @@
# under the License.
import importlib
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from cloudbaseinit import exception
from cloudbaseinit.tests import testutils
class WindowsVirtualDiskUtilsTests(unittest.TestCase):
class WindowsVirtualDiskUtilsTests(testutils.CloudbaseInitTestBase):
def setUp(self):
self._ctypes_mock = mock.MagicMock()
@ -73,9 +72,10 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase):
self._vdisk_class._handle = handle
if ret_val:
self.assertRaises(exception.CloudbaseInitException,
self._vdisk_class.open)
with self.assert_raises_windows_message(
"Cannot open virtual disk: %r",
ret_val):
self._vdisk_class.open()
else:
self._vdisk_class.open()
if handle:
@ -95,7 +95,7 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase):
self._test_open(handle=None, ret_val=None)
def test_open_exception(self):
self._test_open(handle=None, ret_val=mock.sentinel.error_value)
self._test_open(handle=None, ret_val=100)
def test_open_handle_exists(self):
self._test_open(handle=None, ret_val=None)
@ -106,8 +106,10 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase):
virtdisk.AttachVirtualDisk.return_value = ret_val
if ret_val:
self.assertRaises(exception.CloudbaseInitException,
self._vdisk_class.attach)
with self.assert_raises_windows_message(
"Cannot attach virtual disk: %r",
ret_val):
self._vdisk_class.attach()
else:
self._vdisk_class.attach()
@ -119,7 +121,7 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase):
self._test_attach(ret_val=None)
def test_attach_exception(self):
self._test_attach(ret_val=mock.sentinel.error_value)
self._test_attach(ret_val=100)
def _test_detach(self, ret_val):
virtdisk = self._ctypes_mock.windll.virtdisk
@ -127,8 +129,9 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase):
virtdisk.DetachVirtualDisk.return_value = ret_val
if ret_val:
self.assertRaises(exception.CloudbaseInitException,
self._vdisk_class.detach)
with self.assert_raises_windows_message(
"Cannot detach virtual disk: %r", ret_val):
self._vdisk_class.detach()
else:
self._vdisk_class.detach()
@ -140,7 +143,7 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase):
self._test_detach(ret_val=None)
def test_detach_exception(self):
self._test_detach(ret_val=mock.sentinel.error_value)
self._test_detach(ret_val=100)
def _test_get_physical_path(self, ret_val):
virtdisk = self._ctypes_mock.windll.virtdisk
@ -150,8 +153,9 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase):
buf = self._ctypes_mock.create_unicode_buffer.return_value
if ret_val:
self.assertRaises(exception.CloudbaseInitException,
self._vdisk_class.get_physical_path)
with self.assert_raises_windows_message(
"Cannot get virtual disk physical path: %r", ret_val):
self._vdisk_class.get_physical_path()
else:
response = self._vdisk_class.get_physical_path()
self.assertEqual(buf.value, response)
@ -163,14 +167,23 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase):
buf)
virtdisk.GetVirtualDiskPhysicalPath.assert_called_once_with(
self._vdisk_class._handle, self._ctypes_mock.byref.return_value)
self._vdisk_class._handle,
self._ctypes_mock.byref.return_value,
self._ctypes_mock.create_unicode_buffer.return_value)
self._ctypes_mock.byref.assert_called_once_with(
self._ctypes_mock.wintypes.DWORD.return_value)
self._ctypes_mock.create_unicode_buffer.assert_called_once_with(1024)
def test_get_physical_path(self):
self._test_get_physical_path(ret_val=None)
def test_get_physical_path_fails(self):
self._test_get_physical_path(ret_val=100)
@mock.patch('cloudbaseinit.utils.windows.virtual_disk'
'.VirtualDisk.get_physical_path')
def _test_get_cdrom_drive_mount_point(self, mock_get_physical_path,
buf_len, ret_val):
buf_len, ret_val, last_error=None):
buf = self._ctypes_mock.create_unicode_buffer.return_value
kernel32 = self.virtual_disk.kernel32
kernel32.GetLogicalDriveStringsW.return_value = buf_len
@ -186,11 +199,13 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase):
expected_create_unicode_buffer = [mock.call(2048)]
if not buf_len:
self.assertRaises(exception.CloudbaseInitException,
self._vdisk_class.get_cdrom_drive_mount_point)
with self.assert_raises_windows_message(
"Cannot enumerate logical devices: %r", last_error):
self._vdisk_class.get_cdrom_drive_mount_point()
elif not ret_val:
self.assertRaises(exception.CloudbaseInitException,
self._vdisk_class.get_cdrom_drive_mount_point)
with self.assert_raises_windows_message(
"Cannot query NT device: %r", last_error):
self._vdisk_class.get_cdrom_drive_mount_point()
expected_create_unicode_buffer.append(mock.call(2048))
expected_sizeof.append(mock.call(self._ctypes_mock.wintypes.WCHAR))
@ -230,10 +245,12 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase):
kernel32.GetLogicalDriveStringsW.assert_called_once_with(1, buf)
def test_get_cdrom_drive_mount_point_exception_buf_len(self):
self._test_get_cdrom_drive_mount_point(buf_len=0, ret_val=1)
self._test_get_cdrom_drive_mount_point(buf_len=0, ret_val=1,
last_error=100)
def test_get_cdrom_drive_mount_point_exception_query(self):
self._test_get_cdrom_drive_mount_point(buf_len=1, ret_val=0)
self._test_get_cdrom_drive_mount_point(buf_len=1, ret_val=0,
last_error=100)
def test_get_cdrom_drive_mount_point(self):
self._test_get_cdrom_drive_mount_point(buf_len=1, ret_val=1)

View File

@ -99,7 +99,7 @@ def get_adapter_addresses():
if ret_val:
raise exception.CloudbaseInitException(
"GetAdaptersAddresses failed")
"GetAdaptersAddresses failed: %r" % ret_val)
p_curr_addr = p_addr
while p_curr_addr:

View File

@ -84,8 +84,8 @@ class PhysicalDisk(object):
ctypes.byref(bytes_returned),
0)
if not ret_val:
raise exception.CloudbaseInitException(
"Cannot get disk geometry")
raise exception.WindowsCloudbaseInitException(
"Cannot get disk geometry: %r")
self._geom = geom
return self._geom
@ -105,5 +105,6 @@ class PhysicalDisk(object):
ret_val = kernel32.ReadFile(self._handle, buf, bytes_to_read,
ctypes.byref(bytes_read), 0)
if not ret_val:
raise exception.CloudbaseInitException("Read exception")
raise exception.WindowsCloudbaseInitException(
"Read exception: %r")
return (buf, bytes_read.value)

View File

@ -84,22 +84,23 @@ class VirtualDisk(object):
self.OPEN_VIRTUAL_DISK_FLAG_NONE, 0,
ctypes.byref(handle))
if ret_val:
raise exception.CloudbaseInitException("Cannot open virtual disk")
raise exception.WindowsCloudbaseInitException(
"Cannot open virtual disk: %r", ret_val)
self._handle = handle
def attach(self):
ret_val = virtdisk.AttachVirtualDisk(
self._handle, 0, self.ATTACH_VIRTUAL_DISK_FLAG_READ_ONLY, 0, 0, 0)
if ret_val:
raise exception.CloudbaseInitException(
"Cannot attach virtual disk")
raise exception.WindowsCloudbaseInitException(
"Cannot attach virtual disk: %r", ret_val)
def detach(self):
ret_val = virtdisk.DetachVirtualDisk(
self._handle, self.DETACH_VIRTUAL_DISK_FLAG_NONE, 0)
if ret_val:
raise exception.CloudbaseInitException(
"Cannot detach virtual disk")
raise exception.WindowsCloudbaseInitException(
"Cannot detach virtual disk: %r", ret_val)
def get_physical_path(self):
buf = ctypes.create_unicode_buffer(1024)
@ -108,8 +109,8 @@ class VirtualDisk(object):
ctypes.byref(bufLen),
buf)
if ret_val:
raise exception.CloudbaseInitException(
"Cannot get virtual disk physical path")
raise exception.WindowsCloudbaseInitException(
"Cannot get virtual disk physical path: %r", ret_val)
return buf.value
def get_cdrom_drive_mount_point(self):
@ -120,8 +121,8 @@ class VirtualDisk(object):
buf_len = kernel32.GetLogicalDriveStringsW(
ctypes.sizeof(buf) / ctypes.sizeof(wintypes.WCHAR), buf)
if not buf_len:
raise exception.CloudbaseInitException(
"Cannot enumerate logical devices")
raise exception.WindowsCloudbaseInitException(
"Cannot enumerate logical devices: %r")
cdrom_dev = self.get_physical_path().rsplit('\\')[-1].upper()
@ -135,8 +136,8 @@ class VirtualDisk(object):
ctypes.sizeof(dev) /
ctypes.sizeof(wintypes.WCHAR))
if not ret_val:
raise exception.CloudbaseInitException(
"Cannot query NT device")
raise exception.WindowsCloudbaseInitException(
"Cannot query NT device: %r")
if dev.value.rsplit('\\')[-1].upper() == cdrom_dev:
mount_point = curr_drive