diff --git a/os_win/_utils.py b/os_win/_utils.py index e8f82a3c..f813d833 100644 --- a/os_win/_utils.py +++ b/os_win/_utils.py @@ -92,7 +92,8 @@ def get_wrapped_function(function): def retry_decorator(max_retry_count=5, timeout=None, inc_sleep_time=1, - max_sleep_time=1, exceptions=(), error_codes=()): + max_sleep_time=1, exceptions=(), error_codes=(), + pass_retry_context=False): """Retries invoking the decorated method in case of expected exceptions. :param max_retry_count: The maximum number of retries performed. If 0, no @@ -109,6 +110,13 @@ def retry_decorator(max_retry_count=5, timeout=None, inc_sleep_time=1, for example in case of Win32Exception. If this argument is not passed, retries will be performed for any of the expected exceptions. + :param pass_retry_context: Convenient way of letting a method aware of + this decorator prevent a retry from being + performed. The decorated method must accept an + argument called 'retry_context', which will + include a dict containing the 'prevent_retry' + field. If this field is set, no further retries + will be performed. 
""" if isinstance(error_codes, six.integer_types): @@ -120,6 +128,10 @@ def retry_decorator(max_retry_count=5, timeout=None, inc_sleep_time=1, sleep_time = 0 time_start = time.time() + retry_context = dict(prevent_retry=False) + if pass_retry_context: + kwargs['retry_context'] = retry_context + while True: try: return f(*args, **kwargs) @@ -137,6 +149,7 @@ def retry_decorator(max_retry_count=5, timeout=None, inc_sleep_time=1, else 'undefined') should_retry = ( + not retry_context['prevent_retry'] and expected_err_code and tries_left and (time_left == 'undefined' or diff --git a/os_win/constants.py b/os_win/constants.py index 861dd039..9acdbff3 100644 --- a/os_win/constants.py +++ b/os_win/constants.py @@ -161,3 +161,10 @@ DNS_ZONE_TRANSFER_ALLOWED_ANY_HOST = 0 DNS_ZONE_TRANSFER_ALLOWED_NAME_SERVERS = 1 DNS_ZONE_TRANSFER_ALLOWED_SECONDARY_SERVERS = 2 DNS_ZONE_TRANSFER_NOT_ALLOWED = 3 + +CLUSTER_GROUP_STATE_UNKNOWN = -1 +CLUSTER_GROUP_ONLINE = 0 +CLUSTER_GROUP_OFFLINE = 1 +CLUSTER_GROUP_FAILED = 2 +CLUSTER_GROUP_PARTIAL_ONLINE = 3 +CLUSTER_GROUP_PENDING = 4 diff --git a/os_win/exceptions.py b/os_win/exceptions.py index 8f59459e..5f641482 100644 --- a/os_win/exceptions.py +++ b/os_win/exceptions.py @@ -172,3 +172,18 @@ class DNSZoneAlreadyExists(DNSException): class JobTerminateFailed(HyperVException): msg_fmt = _("Could not terminate the requested job(s).") + + +class ClusterException(OSWinException): + pass + + +class ClusterWin32Exception(ClusterException, Win32Exception): + pass + + +class InvalidClusterGroupState(ClusterException): + msg_fmt = _("The cluster group %(group_name)s is in an invalid state. " + "Expected state %(expected_state)s. Expected owner node: " + "%(expected_node)s. Current group state: %(group_state)s. 
" + "Current owner node: %(owner_node)s.") diff --git a/os_win/tests/test_utils.py b/os_win/tests/test_utils.py index 273337e4..b2e31afe 100644 --- a/os_win/tests/test_utils.py +++ b/os_win/tests/test_utils.py @@ -159,6 +159,32 @@ class UtilsTestCase(base.BaseTestCase): self._test_retry_decorator_no_retry( expected_exceptions=(IOError, AttributeError)) + @mock.patch('time.sleep') + def test_retry_decorator_explicitly_avoid_retry(self, mock_sleep): + # Tests the case when there is a function aware of the retry + # decorator and explicitly requests that no retry should be + # performed. + + def func_side_effect(fake_arg, retry_context): + self.assertEqual(mock.sentinel.arg, fake_arg) + self.assertEqual(retry_context, dict(prevent_retry=False)) + + retry_context['prevent_retry'] = True + raise exceptions.Win32Exception(message='fake_exc', + error_code=1) + + fake_func, mock_side_effect = ( + self._get_fake_func_with_retry_decorator( + exceptions=exceptions.Win32Exception, + side_effect=func_side_effect, + pass_retry_context=True)) + + self.assertRaises(exceptions.Win32Exception, + fake_func, mock.sentinel.arg) + + self.assertEqual(1, mock_side_effect.call_count) + self.assertFalse(mock_sleep.called) + @mock.patch('socket.getaddrinfo') def test_get_ips(self, mock_getaddrinfo): ips = ['1.2.3.4', '5.6.7.8'] diff --git a/os_win/tests/utils/compute/test_clusapi_utils.py b/os_win/tests/utils/compute/test_clusapi_utils.py new file mode 100644 index 00000000..da9d7d6c --- /dev/null +++ b/os_win/tests/utils/compute/test_clusapi_utils.py @@ -0,0 +1,269 @@ +# Copyright 2016 Cloudbase Solutions Srl +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import ctypes + +import ddt +import mock + +from os_win import constants +from os_win import exceptions +from os_win.tests import test_base +from os_win.utils.compute import _clusapi_utils + + +@ddt.ddt +class ClusApiUtilsTestCase(test_base.OsWinBaseTestCase): + def setUp(self): + super(ClusApiUtilsTestCase, self).setUp() + + self._clusapi = mock.patch.object( + _clusapi_utils, 'clusapi', create=True).start() + + self._clusapi_utils = _clusapi_utils.ClusApiUtils() + + self._run_patcher = mock.patch.object(self._clusapi_utils, + '_run_and_check_output') + self._mock_run = self._run_patcher.start() + + def _mock_ctypes(self): + self._ctypes = mock.Mock() + # This is used in order to easily make assertions on the variables + # passed by reference. 
+ self._ctypes.byref = lambda x: (x, "byref") + self._ctypes.c_wchar_p = lambda x: (x, 'c_wchar_p') + self._ctypes.sizeof = lambda x: (x, 'sizeof') + + mock.patch.object(_clusapi_utils, 'ctypes', self._ctypes).start() + + def test_run_and_check_output(self): + self._clusapi_utils._win32utils = mock.Mock() + self._clusapi_utils._run_and_check_output = ( + self._run_patcher.temp_original) + + mock_win32utils_run_and_check_output = ( + self._clusapi_utils._win32utils.run_and_check_output) + + ret_val = self._clusapi_utils._run_and_check_output( + mock.sentinel.func, + mock.sentinel.arg, + fake_kwarg=mock.sentinel.kwarg) + + mock_win32utils_run_and_check_output.assert_called_once_with( + mock.sentinel.func, + mock.sentinel.arg, + fake_kwarg=mock.sentinel.kwarg, + failure_exc=exceptions.ClusterWin32Exception) + self.assertEqual(mock_win32utils_run_and_check_output.return_value, + ret_val) + + def test_dword_align(self): + self.assertEqual(8, self._clusapi_utils._dword_align(5)) + self.assertEqual(4, self._clusapi_utils._dword_align(4)) + + def test_get_clusprop_value_struct(self): + val_type = ctypes.c_ubyte * 3 + expected_padding_sz = 1 + + clusprop_val_struct = self._clusapi_utils._get_clusprop_value_struct( + val_type) + + expected_fields = [('syntax', _clusapi_utils.DWORD), + ('length', _clusapi_utils.DWORD), + ('value', val_type), + ('_padding', ctypes.c_ubyte * expected_padding_sz)] + self.assertEqual(expected_fields, clusprop_val_struct._fields_) + + def test_get_property_list_entry(self): + fake_prop_name = 'fake prop name' + fake_prop_syntax = 1 + fake_prop_val = (ctypes.c_wchar * 10)() + fake_prop_val.value = 'fake prop' + + entry = self._clusapi_utils.get_property_list_entry( + name=fake_prop_name, + syntax=fake_prop_syntax, + value=fake_prop_val) + + self.assertEqual(_clusapi_utils.CLUSPROP_SYNTAX_NAME, + entry.name.syntax) + self.assertEqual(fake_prop_name, + entry.name.value) + self.assertEqual( + ctypes.sizeof(ctypes.c_wchar) * (len(fake_prop_name) + 1), 
+ entry.name.length) + + self.assertEqual(fake_prop_syntax, + entry.value.syntax) + self.assertEqual(bytearray(fake_prop_val), + bytearray(entry.value.value)) + self.assertEqual( + ctypes.sizeof(fake_prop_val), + entry.value.length) + + self.assertEqual(_clusapi_utils.CLUSPROP_SYNTAX_ENDMARK, + entry._endmark) + + def test_get_property_list(self): + entry_0 = self._clusapi_utils.get_property_list_entry( + name='fake prop name', + syntax=1, + value=ctypes.c_uint(2)) + entry_1 = self._clusapi_utils.get_property_list_entry( + name='fake prop name', + syntax=2, + value=ctypes.c_ubyte(5)) + + prop_list = self._clusapi_utils.get_property_list( + [entry_0, entry_1]) + + self.assertEqual(2, prop_list.count) + self.assertEqual(bytearray(entry_0) + bytearray(entry_1), + prop_list.entries_buff) + + @ddt.data('fake cluster name', None) + def test_open_cluster(self, cluster_name): + self._mock_ctypes() + + handle = self._clusapi_utils.open_cluster(cluster_name) + + expected_handle_arg = ( + self._ctypes.c_wchar_p(cluster_name) + if cluster_name else None) + self._mock_run.assert_called_once_with( + self._clusapi.OpenCluster, + expected_handle_arg, + **self._clusapi_utils._open_handle_check_flags) + + self.assertEqual(self._mock_run.return_value, handle) + + def test_open_cluster_group(self): + self._mock_ctypes() + + handle = self._clusapi_utils.open_cluster_group( + mock.sentinel.cluster_handle, + mock.sentinel.group_name) + + self._mock_run.assert_called_once_with( + self._clusapi.OpenClusterGroup, + mock.sentinel.cluster_handle, + self._ctypes.c_wchar_p(mock.sentinel.group_name), + **self._clusapi_utils._open_handle_check_flags) + + self.assertEqual(self._mock_run.return_value, handle) + + def test_open_cluster_node(self): + self._mock_ctypes() + + handle = self._clusapi_utils.open_cluster_node( + mock.sentinel.cluster_handle, + mock.sentinel.node_name) + + self._mock_run.assert_called_once_with( + self._clusapi.OpenClusterNode, + mock.sentinel.cluster_handle, + 
self._ctypes.c_wchar_p(mock.sentinel.node_name), + **self._clusapi_utils._open_handle_check_flags) + + self.assertEqual(self._mock_run.return_value, handle) + + def test_close_cluster(self): + self._clusapi_utils.close_cluster(mock.sentinel.handle) + self._clusapi.CloseCluster.assert_called_once_with( + mock.sentinel.handle) + + def test_close_cluster_group(self): + self._clusapi_utils.close_cluster_group(mock.sentinel.handle) + self._clusapi.CloseClusterGroup.assert_called_once_with( + mock.sentinel.handle) + + def test_close_cluster_node(self): + self._clusapi_utils.close_cluster_node(mock.sentinel.handle) + self._clusapi.CloseClusterNode.assert_called_once_with( + mock.sentinel.handle) + + def test_cancel_cluster_group_operation(self): + self._clusapi_utils.cancel_cluster_group_operation( + mock.sentinel.group_handle) + + self._mock_run.assert_called_once_with( + self._clusapi.CancelClusterGroupOperation, + mock.sentinel.group_handle, + 0, + ignored_error_codes=[_clusapi_utils.ERROR_IO_PENDING]) + + @ddt.data(mock.sentinel.prop_list, None) + def test_move_cluster_group(self, prop_list): + self._mock_ctypes() + + expected_prop_list_arg = ( + self._ctypes.byref(prop_list) if prop_list else None) + expected_prop_list_sz = ( + self._ctypes.sizeof(prop_list) if prop_list else 0) + + self._clusapi_utils.move_cluster_group( + mock.sentinel.group_handle, + mock.sentinel.dest_node_handle, + mock.sentinel.move_flags, + prop_list) + + self._mock_run.assert_called_once_with( + self._clusapi.MoveClusterGroupEx, + mock.sentinel.group_handle, + mock.sentinel.dest_node_handle, + mock.sentinel.move_flags, + expected_prop_list_arg, + expected_prop_list_sz, + ignored_error_codes=[_clusapi_utils.ERROR_IO_PENDING]) + + def test_get_cluster_group_state(self): + owner_node = 'fake owner node' + + def fake_get_state(inst, + group_handle, node_name_buff, node_name_len, + error_ret_vals, error_on_nonzero_ret_val, + ret_val_is_err_code): + self.assertEqual(mock.sentinel.group_handle, 
group_handle) + + # Those arguments would not normally get to the ClusApi + # function, instead being used by the helper invoking + # it and catching errors. For convenience, we validate + # those arguments at this point. + self.assertEqual([constants.CLUSTER_GROUP_STATE_UNKNOWN], + error_ret_vals) + self.assertFalse(error_on_nonzero_ret_val) + self.assertFalse(ret_val_is_err_code) + + node_name_len_arg = ctypes.cast( + node_name_len, + ctypes.POINTER(_clusapi_utils.DWORD)).contents + self.assertEqual(self._clusapi_utils._MAX_NODE_NAME, + node_name_len_arg.value) + + node_name_arg = ctypes.cast( + node_name_buff, + ctypes.POINTER( + ctypes.c_wchar * + self._clusapi_utils._MAX_NODE_NAME)).contents + node_name_arg.value = owner_node + return mock.sentinel.group_state + + self._mock_run.side_effect = fake_get_state + + state_info = self._clusapi_utils.get_cluster_group_state( + mock.sentinel.group_handle) + expected_state_info = dict(state=mock.sentinel.group_state, + owner_node=owner_node) + self.assertDictEqual(expected_state_info, state_info) diff --git a/os_win/tests/utils/compute/test_clusterutils.py b/os_win/tests/utils/compute/test_clusterutils.py index 8bbe945e..d078606a 100644 --- a/os_win/tests/utils/compute/test_clusterutils.py +++ b/os_win/tests/utils/compute/test_clusterutils.py @@ -13,13 +13,17 @@ # License for the specific language governing permissions and limitations # under the License. 
+import ddt import mock +from os_win import constants from os_win import exceptions from os_win.tests import test_base +from os_win.utils.compute import _clusapi_utils from os_win.utils.compute import clusterutils +@ddt.ddt class ClusterUtilsTestCase(test_base.OsWinBaseTestCase): """Unit tests for the Hyper-V ClusterUtilsBase class.""" @@ -34,6 +38,8 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase): self._clusterutils = clusterutils.ClusterUtils() self._clusterutils._conn_cluster = mock.MagicMock() self._clusterutils._cluster = mock.MagicMock() + self._clusterutils._clusapi_utils = mock.Mock() + self._clusapi = self._clusterutils._clusapi_utils def test_init_hyperv_conn(self): fake_cluster_name = "fake_cluster" @@ -277,24 +283,138 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase): @mock.patch.object(clusterutils.ClusterUtils, '_migrate_vm') def test_live_migrate_vm(self, mock_migrate_vm): self._clusterutils.live_migrate_vm(self._FAKE_VM_NAME, - self._FAKE_HOST) + self._FAKE_HOST, + mock.sentinel.timeout) + + exp_valid_transition_states = [constants.CLUSTER_GROUP_PENDING] mock_migrate_vm.assert_called_once_with( self._FAKE_VM_NAME, self._FAKE_HOST, + self._clusterutils._LIVE_MIGRATION_TYPE, + constants.CLUSTER_GROUP_ONLINE, + exp_valid_transition_states, + mock.sentinel.timeout) + + @mock.patch.object(_clusapi_utils, 'DWORD') + @mock.patch.object(clusterutils.ClusterUtils, + '_wait_for_cluster_group_state') + @ddt.data(None, exceptions.ClusterException) + def test_migrate_vm(self, raised_exc, mock_wait_group, mock_dword): + mock_wait_group.side_effect = raised_exc + + migrate_args = (self._FAKE_VM_NAME, + self._FAKE_HOST, + self._clusterutils._LIVE_MIGRATION_TYPE, + constants.CLUSTER_GROUP_ONLINE, + mock.sentinel.valid_transition_states, + mock.sentinel.timeout) + + if raised_exc: + self.assertRaises(raised_exc, + self._clusterutils._migrate_vm, + *migrate_args) + else: + self._clusterutils._migrate_vm(*migrate_args) + + 
mock_dword.assert_called_once_with( self._clusterutils._LIVE_MIGRATION_TYPE) - @mock.patch.object(clusterutils.ClusterUtils, '_lookup_vm_group_check') - def test_migrate_vm(self, mock_lookup_vm_group_check): - vm_group = mock.MagicMock() - mock_lookup_vm_group_check.return_value = vm_group + self._clusapi.get_property_list_entry.assert_has_calls( + [mock.call(prop_name, + _clusapi_utils.CLUSPROP_SYNTAX_LIST_VALUE_DWORD, + mock_dword.return_value) + for prop_name in (_clusapi_utils.CLUSPROP_NAME_VM, + _clusapi_utils.CLUSPROP_NAME_VM_CONFIG)]) - self._clusterutils._migrate_vm( - self._FAKE_VM_NAME, self._FAKE_HOST, - self._clusterutils._LIVE_MIGRATION_TYPE) + expected_prop_entries = [ + self._clusapi.get_property_list_entry.return_value] * 2 + self._clusapi.get_property_list.assert_called_once_with( + expected_prop_entries) - vm_group.MoveToNewNodeParams.assert_called_once_with( - self._clusterutils._IGNORE_LOCKED, + expected_migrate_flags = ( + _clusapi_utils.CLUSAPI_GROUP_MOVE_RETURN_TO_SOURCE_NODE_ON_ERROR | + _clusapi_utils.CLUSAPI_GROUP_MOVE_QUEUE_ENABLED | + _clusapi_utils.CLUSAPI_GROUP_MOVE_HIGH_PRIORITY_START) + + exp_clus_h = self._clusapi.open_cluster.return_value + exp_clus_node_h = self._clusapi.open_cluster_node.return_value + exp_clus_group_h = self._clusapi.open_cluster_group.return_value + + self._clusapi.open_cluster.assert_called_once_with() + self._clusapi.open_cluster_group.assert_called_once_with( + exp_clus_h, self._FAKE_VM_NAME) + self._clusapi.open_cluster_node.assert_called_once_with( + exp_clus_h, self._FAKE_HOST) + + self._clusapi.move_cluster_group.assert_called_once_with( + exp_clus_group_h, exp_clus_node_h, expected_migrate_flags, + self._clusapi.get_property_list.return_value) + + mock_wait_group.assert_called_once_with( + self._FAKE_VM_NAME, exp_clus_group_h, + constants.CLUSTER_GROUP_ONLINE, self._FAKE_HOST, - [self._clusterutils._LIVE_MIGRATION_TYPE]) + mock.sentinel.valid_transition_states, + mock.sentinel.timeout) + + 
self._clusapi.close_cluster_group.assert_called_once_with( + exp_clus_group_h) + self._clusapi.close_cluster_node.assert_called_once_with( + exp_clus_node_h) + self._clusapi.close_cluster.assert_called_once_with(exp_clus_h) + + @mock.patch.object(clusterutils._utils, 'time') + def test_wait_for_clus_group_state_failed(self, mock_time): + desired_host = self._FAKE_HOST + desired_state = constants.CLUSTER_GROUP_ONLINE + valid_transition_states = [constants.CLUSTER_GROUP_PENDING] + + group_states = [dict(owner_node='other node', + state=desired_state), + dict(owner_node=desired_host, + state=constants.CLUSTER_GROUP_PENDING), + dict(owner_node=desired_host, + state=constants.CLUSTER_GROUP_FAILED)] + self._clusapi.get_cluster_group_state.side_effect = group_states + + # We don't want a timeout to be raised. We expect the tested + # function to force breaking the retry loop when the cluster + # group gets into a 'failed' state. + # + # As a precautionary measure, we're still forcing a timeout at + # some point, to avoid an infinite loop if something goes wrong.
+ mock_time.time.side_effect = [0] * 10 + [100] + + self.assertRaises(exceptions.InvalidClusterGroupState, + self._clusterutils._wait_for_cluster_group_state, + mock.sentinel.group_name, + mock.sentinel.group_handle, + desired_state, + desired_host, + valid_transition_states, + timeout=10) + + self._clusapi.get_cluster_group_state.assert_has_calls( + [mock.call(mock.sentinel.group_handle)] * 3) + + @mock.patch.object(clusterutils._utils, 'time') + def test_wait_for_clus_group_state_success(self, mock_time): + desired_host = self._FAKE_HOST + desired_state = constants.CLUSTER_GROUP_ONLINE + + group_state = dict(owner_node=desired_host.upper(), + state=desired_state) + self._clusapi.get_cluster_group_state.return_value = group_state + + self._clusterutils._wait_for_cluster_group_state( + mock.sentinel.group_name, + mock.sentinel.group_handle, + desired_state, + desired_host, + [], + timeout=10) + + self._clusapi.get_cluster_group_state.assert_called_once_with( + mock.sentinel.group_handle) @mock.patch.object(clusterutils, 'tpool') @mock.patch.object(clusterutils, 'patcher') diff --git a/os_win/utils/compute/_clusapi_utils.py b/os_win/utils/compute/_clusapi_utils.py new file mode 100644 index 00000000..527499cf --- /dev/null +++ b/os_win/utils/compute/_clusapi_utils.py @@ -0,0 +1,197 @@ +# Copyright 2016 Cloudbase Solutions Srl +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import ctypes +import sys + +if sys.platform == 'win32': + clusapi = ctypes.windll.clusapi + +from os_win import constants +from os_win import exceptions +from os_win.utils import win32utils + +DWORD = ctypes.c_ulong + +CLUSPROP_SYNTAX_NAME = 262147 +CLUSPROP_SYNTAX_ENDMARK = 0 +CLUSPROP_SYNTAX_LIST_VALUE_DWORD = 65538 + +CLUSAPI_GROUP_MOVE_RETURN_TO_SOURCE_NODE_ON_ERROR = 2 +CLUSAPI_GROUP_MOVE_QUEUE_ENABLED = 4 +CLUSAPI_GROUP_MOVE_HIGH_PRIORITY_START = 8 + +ERROR_IO_PENDING = 997 + +CLUSPROP_NAME_VM = 'Virtual Machine' +CLUSPROP_NAME_VM_CONFIG = 'Virtual Machine Configuration' + + +class ClusApiUtils(object): + _MAX_NODE_NAME = 255 + + _open_handle_check_flags = dict(ret_val_is_err_code=False, + error_on_nonzero_ret_val=False, + error_ret_vals=[0, None]) + + def __init__(self): + self._win32utils = win32utils.Win32Utils() + + def _run_and_check_output(self, *args, **kwargs): + kwargs['failure_exc'] = exceptions.ClusterWin32Exception + return self._win32utils.run_and_check_output(*args, **kwargs) + + def _dword_align(self, value): + return (value + 3) & ~3 + + def _get_clusprop_value_struct(self, val_type): + def _get_padding(): + # The cluster property entries must be 4B aligned. + val_sz = ctypes.sizeof(val_type) + return self._dword_align(val_sz) - val_sz + + # For convenience, as opposed to the homonymous ClusAPI + # structure, we add the actual value as well. + class CLUSPROP_VALUE(ctypes.Structure): + _fields_ = [('syntax', DWORD), + ('length', DWORD), + ('value', val_type), + ('_padding', ctypes.c_ubyte * _get_padding())] + return CLUSPROP_VALUE + + def get_property_list_entry(self, name, syntax, value): + # The value argument must have a ctypes type. 
+ name_len = len(name) + 1 + val_sz = ctypes.sizeof(value) + + class CLUSPROP_LIST_ENTRY(ctypes.Structure): + _fields_ = [ + ('name', self._get_clusprop_value_struct( + val_type=ctypes.c_wchar * name_len)), + ('value', self._get_clusprop_value_struct( + val_type=ctypes.c_ubyte * val_sz)), + ('_endmark', DWORD) + ] + + entry = CLUSPROP_LIST_ENTRY() + entry.name.syntax = CLUSPROP_SYNTAX_NAME + entry.name.length = name_len * ctypes.sizeof(ctypes.c_wchar) + entry.name.value = name + + entry.value.syntax = syntax + entry.value.length = val_sz + entry.value.value[0:val_sz] = bytearray(value) + + entry._endmark = CLUSPROP_SYNTAX_ENDMARK + + return entry + + def get_property_list(self, property_entries): + prop_entries_sz = sum([ctypes.sizeof(entry) + for entry in property_entries]) + + class CLUSPROP_LIST(ctypes.Structure): + _fields_ = [('count', DWORD), + ('entries_buff', ctypes.c_ubyte * prop_entries_sz)] + + prop_list = CLUSPROP_LIST(count=len(property_entries)) + + pos = 0 + for prop_entry in property_entries: + prop_entry_sz = ctypes.sizeof(prop_entry) + prop_list.entries_buff[pos:prop_entry_sz + pos] = bytearray( + prop_entry) + pos += prop_entry_sz + + return prop_list + + def open_cluster(self, cluster_name=None): + """Returns a handle for the requested cluster. + + :param cluster_name: (Optional) specifies the name of the cluster + to be opened. If None, the cluster that the + local node belongs to will be opened. 
+ """ + p_clus_name = ctypes.c_wchar_p(cluster_name) if cluster_name else None + handle = self._run_and_check_output(clusapi.OpenCluster, + p_clus_name, + **self._open_handle_check_flags) + return handle + + def open_cluster_group(self, cluster_handle, group_name): + handle = self._run_and_check_output(clusapi.OpenClusterGroup, + cluster_handle, + ctypes.c_wchar_p(group_name), + **self._open_handle_check_flags) + return handle + + def open_cluster_node(self, cluster_handle, node_name): + handle = self._run_and_check_output(clusapi.OpenClusterNode, + cluster_handle, + ctypes.c_wchar_p(node_name), + **self._open_handle_check_flags) + return handle + + def close_cluster(self, cluster_handle): + # This function will always return 'True'. Closing the cluster + # handle will also invalidate handles opened using it. + clusapi.CloseCluster(cluster_handle) + + def close_cluster_group(self, group_handle): + # TODO(lpetrut): The following functions can fail, in which case + # 'False' will be returned. We may want to handle this situation. + clusapi.CloseClusterGroup(group_handle) + + def close_cluster_node(self, node_handle): + clusapi.CloseClusterNode(node_handle) + + def cancel_cluster_group_operation(self, group_handle): + """Requests a pending move operation to be canceled.""" + # This only applies to move operations requested by + # MoveClusterGroup(Ex), thus it will not apply to fail overs. 
+ self._run_and_check_output( + clusapi.CancelClusterGroupOperation, + group_handle, + 0, # cancel flags (reserved for future use by MS) + ignored_error_codes=[ERROR_IO_PENDING]) + + def move_cluster_group(self, group_handle, destination_node_handle, + move_flags, property_list): + prop_list_p = ctypes.byref(property_list) if property_list else None + prop_list_sz = ctypes.sizeof(property_list) if property_list else 0 + + self._run_and_check_output(clusapi.MoveClusterGroupEx, + group_handle, + destination_node_handle, + move_flags, + prop_list_p, + prop_list_sz, + ignored_error_codes=[ERROR_IO_PENDING]) + + def get_cluster_group_state(self, group_handle): + node_name_len = DWORD(self._MAX_NODE_NAME) + node_name_buff = (ctypes.c_wchar * node_name_len.value)() + + group_state = self._run_and_check_output( + clusapi.GetClusterGroupState, + group_handle, + ctypes.byref(node_name_buff), + ctypes.byref(node_name_len), + error_ret_vals=[constants.CLUSTER_GROUP_STATE_UNKNOWN], + error_on_nonzero_ret_val=False, + ret_val_is_err_code=False) + + return {'state': group_state, + 'owner_node': node_name_buff.value} diff --git a/os_win/utils/compute/clusterutils.py b/os_win/utils/compute/clusterutils.py index 92dcb68d..0bf7d17b 100644 --- a/os_win/utils/compute/clusterutils.py +++ b/os_win/utils/compute/clusterutils.py @@ -25,8 +25,11 @@ from eventlet import tpool from oslo_log import log as logging from os_win._i18n import _, _LE +from os_win import _utils +from os_win import constants from os_win import exceptions from os_win.utils import baseutils +from os_win.utils.compute import _clusapi_utils LOG = logging.getLogger(__name__) @@ -55,6 +58,7 @@ class ClusterUtils(baseutils.BaseUtils): def __init__(self, host='.'): self._instance_name_regex = re.compile('Virtual Machine (.*)') + self._clusapi_utils = _clusapi_utils.ClusApiUtils() if sys.platform == 'win32': self._init_hyperv_conn(host) @@ -178,20 +182,90 @@ class ClusterUtils(baseutils.BaseUtils): def vm_exists(self, 
vm_name): return self._lookup_vm(vm_name) is not None - def live_migrate_vm(self, vm_name, new_host): - self._migrate_vm(vm_name, new_host, self._LIVE_MIGRATION_TYPE) + def live_migrate_vm(self, vm_name, new_host, timeout=None): + valid_transition_states = [constants.CLUSTER_GROUP_PENDING] + self._migrate_vm(vm_name, new_host, self._LIVE_MIGRATION_TYPE, + constants.CLUSTER_GROUP_ONLINE, + valid_transition_states, + timeout) + + def _migrate_vm(self, vm_name, new_host, migration_type, + exp_state_after_migr, valid_transition_states, + timeout): + syntax = _clusapi_utils.CLUSPROP_SYNTAX_LIST_VALUE_DWORD + migr_type = _clusapi_utils.DWORD(migration_type) + + prop_entries = [ + self._clusapi_utils.get_property_list_entry( + _clusapi_utils.CLUSPROP_NAME_VM, syntax, migr_type), + self._clusapi_utils.get_property_list_entry( + _clusapi_utils.CLUSPROP_NAME_VM_CONFIG, syntax, migr_type) + ] + prop_list = self._clusapi_utils.get_property_list(prop_entries) + + flags = ( + _clusapi_utils.CLUSAPI_GROUP_MOVE_RETURN_TO_SOURCE_NODE_ON_ERROR | + _clusapi_utils.CLUSAPI_GROUP_MOVE_QUEUE_ENABLED | + _clusapi_utils.CLUSAPI_GROUP_MOVE_HIGH_PRIORITY_START) + + cluster_handle = None + group_handle = None + dest_node_handle = None - def _migrate_vm(self, vm_name, new_host, migration_type): - vm_group = self._lookup_vm_group_check(vm_name) try: - vm_group.MoveToNewNodeParams(self._IGNORE_LOCKED, new_host, - [migration_type]) - except Exception as e: - LOG.error(_LE('Exception during cluster live migration of ' - '%(vm_name)s to %(host)s: %(exception)s'), - {'vm_name': vm_name, - 'host': new_host, - 'exception': e}) + cluster_handle = self._clusapi_utils.open_cluster() + group_handle = self._clusapi_utils.open_cluster_group( + cluster_handle, vm_name) + dest_node_handle = self._clusapi_utils.open_cluster_node( + cluster_handle, new_host) + + self._clusapi_utils.move_cluster_group(group_handle, + dest_node_handle, + flags, + prop_list) + self._wait_for_cluster_group_state(vm_name, + 
group_handle, + exp_state_after_migr, + new_host, + valid_transition_states, + timeout) + finally: + if group_handle: + self._clusapi_utils.close_cluster_group(group_handle) + if dest_node_handle: + self._clusapi_utils.close_cluster_node(dest_node_handle) + if cluster_handle: + self._clusapi_utils.close_cluster(cluster_handle) + + def _wait_for_cluster_group_state(self, group_name, group_handle, + desired_state, desired_node, + valid_transition_states, timeout): + @_utils.retry_decorator(max_retry_count=None, + timeout=timeout, + exceptions=exceptions.InvalidClusterGroupState, + pass_retry_context=True) + def _ensure_group_state(retry_context): + state_info = self._clusapi_utils.get_cluster_group_state( + group_handle) + owner_node = state_info['owner_node'] + group_state = state_info['state'] + + reached_desired_state = desired_state == group_state + reached_desired_node = desired_node.lower() == owner_node.lower() + + if not (reached_desired_state and reached_desired_node): + valid_states = [desired_state] + valid_transition_states + valid_state = group_state in valid_states + retry_context['prevent_retry'] = not valid_state + + raise exceptions.InvalidClusterGroupState( + group_name=group_name, + expected_state=desired_state, + expected_node=desired_node, + group_state=group_state, + owner_node=owner_node) + + _ensure_group_state() def monitor_vm_failover(self, callback): """Creates a monitor to check for new WMI MSCluster_Resource