From f3de712905b7e784d5cfa061cbf9a27215a43391 Mon Sep 17 00:00:00 2001 From: Lucian Petrut Date: Fri, 26 Aug 2016 13:18:08 +0300 Subject: [PATCH] Fix clustered vm live migration At the moment, in order to migrate a clustered VM, we move the corresponding cluster resource group using the MoveToNewNodeParams WMI method. This method accepts an undocumented 'Parameters' argument, which is then passed to the underlying MoveClusterGroupEx clusapi.dll function. While the latter expects a property list as described at the following link, we were passing the desired migration type directly. https://msdn.microsoft.com/en-us/library/aa371809(v=vs.85).aspx This worked on Windows Server 2012R2 but is causing issues on WS 2016. Resource groups end up hanging in a 'Pending' state. This change addresses this issue by using clusapi.dll functions directly, as the corresponding WMI methods are not working, throwing a generic error even though the VMs are migrated properly. Closes-Bug: #1618425 Change-Id: Idfcd3505cbbf2754addeba4f1ebeb880f3b9a56b --- os_win/_utils.py | 15 +- os_win/constants.py | 7 + os_win/exceptions.py | 15 + os_win/tests/test_utils.py | 26 ++ .../tests/utils/compute/test_clusapi_utils.py | 269 ++++++++++++++++++ .../tests/utils/compute/test_clusterutils.py | 142 ++++++++- os_win/utils/compute/_clusapi_utils.py | 197 +++++++++++++ os_win/utils/compute/clusterutils.py | 98 ++++++- 8 files changed, 745 insertions(+), 24 deletions(-) create mode 100644 os_win/tests/utils/compute/test_clusapi_utils.py create mode 100644 os_win/utils/compute/_clusapi_utils.py diff --git a/os_win/_utils.py b/os_win/_utils.py index e8f82a3c..f813d833 100644 --- a/os_win/_utils.py +++ b/os_win/_utils.py @@ -92,7 +92,8 @@ def get_wrapped_function(function): def retry_decorator(max_retry_count=5, timeout=None, inc_sleep_time=1, - max_sleep_time=1, exceptions=(), error_codes=()): + max_sleep_time=1, exceptions=(), error_codes=(), + pass_retry_context=False): """Retries invoking the decorated method in 
case of expected exceptions. :param max_retry_count: The maximum number of retries performed. If 0, no @@ -109,6 +110,13 @@ def retry_decorator(max_retry_count=5, timeout=None, inc_sleep_time=1, for example in case of Win32Exception. If this argument is not passed, retries will be performed for any of the expected exceptions. + :param pass_retry_context: Convenient way of letting a method aware of + this decorator prevent a retry from being + performed. The decorated method must accept an + argument called 'retry_context', which will + include a dict containing the 'prevent_retry' + field. If this field is set, no further retries + will be performed. """ if isinstance(error_codes, six.integer_types): @@ -120,6 +128,10 @@ def retry_decorator(max_retry_count=5, timeout=None, inc_sleep_time=1, sleep_time = 0 time_start = time.time() + retry_context = dict(prevent_retry=False) + if pass_retry_context: + kwargs['retry_context'] = retry_context + while True: try: return f(*args, **kwargs) @@ -137,6 +149,7 @@ def retry_decorator(max_retry_count=5, timeout=None, inc_sleep_time=1, else 'undefined') should_retry = ( + not retry_context['prevent_retry'] and expected_err_code and tries_left and (time_left == 'undefined' or diff --git a/os_win/constants.py b/os_win/constants.py index 861dd039..9acdbff3 100644 --- a/os_win/constants.py +++ b/os_win/constants.py @@ -161,3 +161,10 @@ DNS_ZONE_TRANSFER_ALLOWED_ANY_HOST = 0 DNS_ZONE_TRANSFER_ALLOWED_NAME_SERVERS = 1 DNS_ZONE_TRANSFER_ALLOWED_SECONDARY_SERVERS = 2 DNS_ZONE_TRANSFER_NOT_ALLOWED = 3 + +CLUSTER_GROUP_STATE_UNKNOWN = -1 +CLUSTER_GROUP_ONLINE = 0 +CLUSTER_GROUP_OFFLINE = 1 +CLUSTER_GROUP_FAILED = 2 +CLUSTER_GROUP_PARTIAL_ONLINE = 3 +CLUSTER_GROUP_PENDING = 4 diff --git a/os_win/exceptions.py b/os_win/exceptions.py index 8f59459e..5f641482 100644 --- a/os_win/exceptions.py +++ b/os_win/exceptions.py @@ -172,3 +172,18 @@ class DNSZoneAlreadyExists(DNSException): class JobTerminateFailed(HyperVException): msg_fmt = _("Could 
not terminate the requested job(s).") + + +class ClusterException(OSWinException): + pass + + +class ClusterWin32Exception(ClusterException, Win32Exception): + pass + + +class InvalidClusterGroupState(ClusterException): + msg_fmt = _("The cluster group %(group_name)s is in an invalid state. " + "Expected state %(expected_state)s. Expected owner node: " + "%(expected_node)s. Current group state: %(group_state)s. " + "Current owner node: %(owner_node)s.") diff --git a/os_win/tests/test_utils.py b/os_win/tests/test_utils.py index 273337e4..b2e31afe 100644 --- a/os_win/tests/test_utils.py +++ b/os_win/tests/test_utils.py @@ -159,6 +159,32 @@ class UtilsTestCase(base.BaseTestCase): self._test_retry_decorator_no_retry( expected_exceptions=(IOError, AttributeError)) + @mock.patch('time.sleep') + def test_retry_decorator_explicitly_avoid_retry(self, mock_sleep): + # Tests the case when there is a function aware of the retry + # decorator and explicitly requests that no retry should be + # performed. 
+ + def func_side_effect(fake_arg, retry_context): + self.assertEqual(mock.sentinel.arg, fake_arg) + self.assertEqual(retry_context, dict(prevent_retry=False)) + + retry_context['prevent_retry'] = True + raise exceptions.Win32Exception(message='fake_exc', + error_code=1) + + fake_func, mock_side_effect = ( + self._get_fake_func_with_retry_decorator( + exceptions=exceptions.Win32Exception, + side_effect=func_side_effect, + pass_retry_context=True)) + + self.assertRaises(exceptions.Win32Exception, + fake_func, mock.sentinel.arg) + + self.assertEqual(1, mock_side_effect.call_count) + self.assertFalse(mock_sleep.called) + @mock.patch('socket.getaddrinfo') def test_get_ips(self, mock_getaddrinfo): ips = ['1.2.3.4', '5.6.7.8'] diff --git a/os_win/tests/utils/compute/test_clusapi_utils.py b/os_win/tests/utils/compute/test_clusapi_utils.py new file mode 100644 index 00000000..da9d7d6c --- /dev/null +++ b/os_win/tests/utils/compute/test_clusapi_utils.py @@ -0,0 +1,269 @@ +# Copyright 2016 Cloudbase Solutions Srl +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import ctypes + +import ddt +import mock + +from os_win import constants +from os_win import exceptions +from os_win.tests import test_base +from os_win.utils.compute import _clusapi_utils + + +@ddt.ddt +class ClusApiUtilsTestCase(test_base.OsWinBaseTestCase): + def setUp(self): + super(ClusApiUtilsTestCase, self).setUp() + + self._clusapi = mock.patch.object( + _clusapi_utils, 'clusapi', create=True).start() + + self._clusapi_utils = _clusapi_utils.ClusApiUtils() + + self._run_patcher = mock.patch.object(self._clusapi_utils, + '_run_and_check_output') + self._mock_run = self._run_patcher.start() + + def _mock_ctypes(self): + self._ctypes = mock.Mock() + # This is used in order to easily make assertions on the variables + # passed by reference. + self._ctypes.byref = lambda x: (x, "byref") + self._ctypes.c_wchar_p = lambda x: (x, 'c_wchar_p') + self._ctypes.sizeof = lambda x: (x, 'sizeof') + + mock.patch.object(_clusapi_utils, 'ctypes', self._ctypes).start() + + def test_run_and_check_output(self): + self._clusapi_utils._win32utils = mock.Mock() + self._clusapi_utils._run_and_check_output = ( + self._run_patcher.temp_original) + + mock_win32utils_run_and_check_output = ( + self._clusapi_utils._win32utils.run_and_check_output) + + ret_val = self._clusapi_utils._run_and_check_output( + mock.sentinel.func, + mock.sentinel.arg, + fake_kwarg=mock.sentinel.kwarg) + + mock_win32utils_run_and_check_output.assert_called_once_with( + mock.sentinel.func, + mock.sentinel.arg, + fake_kwarg=mock.sentinel.kwarg, + failure_exc=exceptions.ClusterWin32Exception) + self.assertEqual(mock_win32utils_run_and_check_output.return_value, + ret_val) + + def test_dword_align(self): + self.assertEqual(8, self._clusapi_utils._dword_align(5)) + self.assertEqual(4, self._clusapi_utils._dword_align(4)) + + def test_get_clusprop_value_struct(self): + val_type = ctypes.c_ubyte * 3 + expected_padding_sz = 1 + + clusprop_val_struct = self._clusapi_utils._get_clusprop_value_struct( + val_type) + + 
expected_fields = [('syntax', _clusapi_utils.DWORD), + ('length', _clusapi_utils.DWORD), + ('value', val_type), + ('_padding', ctypes.c_ubyte * expected_padding_sz)] + self.assertEqual(expected_fields, clusprop_val_struct._fields_) + + def test_get_property_list_entry(self): + fake_prop_name = 'fake prop name' + fake_prop_syntax = 1 + fake_prop_val = (ctypes.c_wchar * 10)() + fake_prop_val.value = 'fake prop' + + entry = self._clusapi_utils.get_property_list_entry( + name=fake_prop_name, + syntax=fake_prop_syntax, + value=fake_prop_val) + + self.assertEqual(_clusapi_utils.CLUSPROP_SYNTAX_NAME, + entry.name.syntax) + self.assertEqual(fake_prop_name, + entry.name.value) + self.assertEqual( + ctypes.sizeof(ctypes.c_wchar) * (len(fake_prop_name) + 1), + entry.name.length) + + self.assertEqual(fake_prop_syntax, + entry.value.syntax) + self.assertEqual(bytearray(fake_prop_val), + bytearray(entry.value.value)) + self.assertEqual( + ctypes.sizeof(fake_prop_val), + entry.value.length) + + self.assertEqual(_clusapi_utils.CLUSPROP_SYNTAX_ENDMARK, + entry._endmark) + + def test_get_property_list(self): + entry_0 = self._clusapi_utils.get_property_list_entry( + name='fake prop name', + syntax=1, + value=ctypes.c_uint(2)) + entry_1 = self._clusapi_utils.get_property_list_entry( + name='fake prop name', + syntax=2, + value=ctypes.c_ubyte(5)) + + prop_list = self._clusapi_utils.get_property_list( + [entry_0, entry_1]) + + self.assertEqual(2, prop_list.count) + self.assertEqual(bytearray(entry_0) + bytearray(entry_1), + prop_list.entries_buff) + + @ddt.data('fake cluster name', None) + def test_open_cluster(self, cluster_name): + self._mock_ctypes() + + handle = self._clusapi_utils.open_cluster(cluster_name) + + expected_handle_arg = ( + self._ctypes.c_wchar_p(cluster_name) + if cluster_name else None) + self._mock_run.assert_called_once_with( + self._clusapi.OpenCluster, + expected_handle_arg, + **self._clusapi_utils._open_handle_check_flags) + + 
self.assertEqual(self._mock_run.return_value, handle) + + def test_open_cluster_group(self): + self._mock_ctypes() + + handle = self._clusapi_utils.open_cluster_group( + mock.sentinel.cluster_handle, + mock.sentinel.group_name) + + self._mock_run.assert_called_once_with( + self._clusapi.OpenClusterGroup, + mock.sentinel.cluster_handle, + self._ctypes.c_wchar_p(mock.sentinel.group_name), + **self._clusapi_utils._open_handle_check_flags) + + self.assertEqual(self._mock_run.return_value, handle) + + def test_open_cluster_node(self): + self._mock_ctypes() + + handle = self._clusapi_utils.open_cluster_node( + mock.sentinel.cluster_handle, + mock.sentinel.node_name) + + self._mock_run.assert_called_once_with( + self._clusapi.OpenClusterNode, + mock.sentinel.cluster_handle, + self._ctypes.c_wchar_p(mock.sentinel.node_name), + **self._clusapi_utils._open_handle_check_flags) + + self.assertEqual(self._mock_run.return_value, handle) + + def test_close_cluster(self): + self._clusapi_utils.close_cluster(mock.sentinel.handle) + self._clusapi.CloseCluster.assert_called_once_with( + mock.sentinel.handle) + + def test_close_cluster_group(self): + self._clusapi_utils.close_cluster_group(mock.sentinel.handle) + self._clusapi.CloseClusterGroup.assert_called_once_with( + mock.sentinel.handle) + + def test_close_cluster_node(self): + self._clusapi_utils.close_cluster_node(mock.sentinel.handle) + self._clusapi.CloseClusterNode.assert_called_once_with( + mock.sentinel.handle) + + def test_cancel_cluster_group_operation(self): + self._clusapi_utils.cancel_cluster_group_operation( + mock.sentinel.group_handle) + + self._mock_run.assert_called_once_with( + self._clusapi.CancelClusterGroupOperation, + mock.sentinel.group_handle, + 0, + ignored_error_codes=[_clusapi_utils.ERROR_IO_PENDING]) + + @ddt.data(mock.sentinel.prop_list, None) + def test_move_cluster_group(self, prop_list): + self._mock_ctypes() + + expected_prop_list_arg = ( + self._ctypes.byref(prop_list) if prop_list else None) + 
expected_prop_list_sz = ( + self._ctypes.sizeof(prop_list) if prop_list else 0) + + self._clusapi_utils.move_cluster_group( + mock.sentinel.group_handle, + mock.sentinel.dest_node_handle, + mock.sentinel.move_flags, + prop_list) + + self._mock_run.assert_called_once_with( + self._clusapi.MoveClusterGroupEx, + mock.sentinel.group_handle, + mock.sentinel.dest_node_handle, + mock.sentinel.move_flags, + expected_prop_list_arg, + expected_prop_list_sz, + ignored_error_codes=[_clusapi_utils.ERROR_IO_PENDING]) + + def test_get_cluster_group_state(self): + owner_node = 'fake owner node' + + def fake_get_state(inst, + group_handle, node_name_buff, node_name_len, + error_ret_vals, error_on_nonzero_ret_val, + ret_val_is_err_code): + self.assertEqual(mock.sentinel.group_handle, group_handle) + + # Those arguments would not normally get to the ClusApi + # function, instead being used by the helper invoking + # it and catching errors. For convenience, we validate + # those arguments at this point. + self.assertEqual([constants.CLUSTER_GROUP_STATE_UNKNOWN], + error_ret_vals) + self.assertFalse(error_on_nonzero_ret_val) + self.assertFalse(ret_val_is_err_code) + + node_name_len_arg = ctypes.cast( + node_name_len, + ctypes.POINTER(_clusapi_utils.DWORD)).contents + self.assertEqual(self._clusapi_utils._MAX_NODE_NAME, + node_name_len_arg.value) + + node_name_arg = ctypes.cast( + node_name_buff, + ctypes.POINTER( + ctypes.c_wchar * + self._clusapi_utils._MAX_NODE_NAME)).contents + node_name_arg.value = owner_node + return mock.sentinel.group_state + + self._mock_run.side_effect = fake_get_state + + state_info = self._clusapi_utils.get_cluster_group_state( + mock.sentinel.group_handle) + expected_state_info = dict(state=mock.sentinel.group_state, + owner_node=owner_node) + self.assertDictEqual(expected_state_info, state_info) diff --git a/os_win/tests/utils/compute/test_clusterutils.py b/os_win/tests/utils/compute/test_clusterutils.py index 8bbe945e..d078606a 100644 --- 
a/os_win/tests/utils/compute/test_clusterutils.py +++ b/os_win/tests/utils/compute/test_clusterutils.py @@ -13,13 +13,17 @@ # License for the specific language governing permissions and limitations # under the License. +import ddt import mock +from os_win import constants from os_win import exceptions from os_win.tests import test_base +from os_win.utils.compute import _clusapi_utils from os_win.utils.compute import clusterutils +@ddt.ddt class ClusterUtilsTestCase(test_base.OsWinBaseTestCase): """Unit tests for the Hyper-V ClusterUtilsBase class.""" @@ -34,6 +38,8 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase): self._clusterutils = clusterutils.ClusterUtils() self._clusterutils._conn_cluster = mock.MagicMock() self._clusterutils._cluster = mock.MagicMock() + self._clusterutils._clusapi_utils = mock.Mock() + self._clusapi = self._clusterutils._clusapi_utils def test_init_hyperv_conn(self): fake_cluster_name = "fake_cluster" @@ -277,24 +283,138 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase): @mock.patch.object(clusterutils.ClusterUtils, '_migrate_vm') def test_live_migrate_vm(self, mock_migrate_vm): self._clusterutils.live_migrate_vm(self._FAKE_VM_NAME, - self._FAKE_HOST) + self._FAKE_HOST, + mock.sentinel.timeout) + + exp_valid_transition_states = [constants.CLUSTER_GROUP_PENDING] mock_migrate_vm.assert_called_once_with( self._FAKE_VM_NAME, self._FAKE_HOST, + self._clusterutils._LIVE_MIGRATION_TYPE, + constants.CLUSTER_GROUP_ONLINE, + exp_valid_transition_states, + mock.sentinel.timeout) + + @mock.patch.object(_clusapi_utils, 'DWORD') + @mock.patch.object(clusterutils.ClusterUtils, + '_wait_for_cluster_group_state') + @ddt.data(None, exceptions.ClusterException) + def test_migrate_vm(self, raised_exc, mock_wait_group, mock_dword): + mock_wait_group.side_effect = raised_exc + + migrate_args = (self._FAKE_VM_NAME, + self._FAKE_HOST, + self._clusterutils._LIVE_MIGRATION_TYPE, + constants.CLUSTER_GROUP_ONLINE, + 
mock.sentinel.valid_transition_states, + mock.sentinel.timeout) + + if raised_exc: + self.assertRaises(raised_exc, + self._clusterutils._migrate_vm, + *migrate_args) + else: + self._clusterutils._migrate_vm(*migrate_args) + + mock_dword.assert_called_once_with( self._clusterutils._LIVE_MIGRATION_TYPE) - @mock.patch.object(clusterutils.ClusterUtils, '_lookup_vm_group_check') - def test_migrate_vm(self, mock_lookup_vm_group_check): - vm_group = mock.MagicMock() - mock_lookup_vm_group_check.return_value = vm_group + self._clusapi.get_property_list_entry.assert_has_calls( + [mock.call(prop_name, + _clusapi_utils.CLUSPROP_SYNTAX_LIST_VALUE_DWORD, + mock_dword.return_value) + for prop_name in (_clusapi_utils.CLUSPROP_NAME_VM, + _clusapi_utils.CLUSPROP_NAME_VM_CONFIG)]) - self._clusterutils._migrate_vm( - self._FAKE_VM_NAME, self._FAKE_HOST, - self._clusterutils._LIVE_MIGRATION_TYPE) + expected_prop_entries = [ + self._clusapi.get_property_list_entry.return_value] * 2 + self._clusapi.get_property_list.assert_called_once_with( + expected_prop_entries) - vm_group.MoveToNewNodeParams.assert_called_once_with( - self._clusterutils._IGNORE_LOCKED, + expected_migrate_flags = ( + _clusapi_utils.CLUSAPI_GROUP_MOVE_RETURN_TO_SOURCE_NODE_ON_ERROR | + _clusapi_utils.CLUSAPI_GROUP_MOVE_QUEUE_ENABLED | + _clusapi_utils.CLUSAPI_GROUP_MOVE_HIGH_PRIORITY_START) + + exp_clus_h = self._clusapi.open_cluster.return_value + exp_clus_node_h = self._clusapi.open_cluster_node.return_value + exp_clus_group_h = self._clusapi.open_cluster_group.return_value + + self._clusapi.open_cluster.assert_called_once_with() + self._clusapi.open_cluster_group.assert_called_once_with( + exp_clus_h, self._FAKE_VM_NAME) + self._clusapi.open_cluster_node.assert_called_once_with( + exp_clus_h, self._FAKE_HOST) + + self._clusapi.move_cluster_group.assert_called_once_with( + exp_clus_group_h, exp_clus_node_h, expected_migrate_flags, + self._clusapi.get_property_list.return_value) + + 
mock_wait_group.assert_called_once_with( + self._FAKE_VM_NAME, exp_clus_group_h, + constants.CLUSTER_GROUP_ONLINE, self._FAKE_HOST, - [self._clusterutils._LIVE_MIGRATION_TYPE]) + mock.sentinel.valid_transition_states, + mock.sentinel.timeout) + + self._clusapi.close_cluster_group.assert_called_once_with( + exp_clus_group_h) + self._clusapi.close_cluster_node.assert_called_once_with( + exp_clus_node_h) + self._clusapi.close_cluster.assert_called_once_with(exp_clus_h) + + @mock.patch.object(clusterutils._utils, 'time') + def test_wait_for_clus_group_state_failed(self, mock_time): + desired_host = self._FAKE_HOST + desired_state = constants.CLUSTER_GROUP_ONLINE + valid_transition_states = [constants.CLUSTER_GROUP_PENDING] + + group_states = [dict(owner_node='other node', + state=desired_state), + dict(owner_node=desired_host, + state=constants.CLUSTER_GROUP_PENDING), + dict(owner_node=desired_host, + state=constants.CLUSTER_GROUP_FAILED)] + self._clusapi.get_cluster_group_state.side_effect = group_states + + # We don't want a timeout to be raised. We expect the tested + # function to force breaking the retry loop when the cluster + # group gets into a 'failed' state. + # + # As a precautionary measure, we're still forcing a timeout at + # some point, to avoid an infinite loop if something goes wrong. 
+ mock_time.time.side_effect = [0] * 10 + [100] + + self.assertRaises(exceptions.InvalidClusterGroupState, + self._clusterutils._wait_for_cluster_group_state, + mock.sentinel.group_name, + mock.sentinel.group_handle, + desired_state, + desired_host, + valid_transition_states, + timeout=10) + + self._clusapi.get_cluster_group_state.assert_has_calls( + [mock.call(mock.sentinel.group_handle)] * 3) + + @mock.patch.object(clusterutils._utils, 'time') + def test_wait_for_clus_group_state_success(self, mock_time): + desired_host = self._FAKE_HOST + desired_state = constants.CLUSTER_GROUP_ONLINE + + group_state = dict(owner_node=desired_host.upper(), + state=desired_state) + self._clusapi.get_cluster_group_state.return_value = group_state + + self._clusterutils._wait_for_cluster_group_state( + mock.sentinel.group_name, + mock.sentinel.group_handle, + desired_state, + desired_host, + [], + timeout=10) + + self._clusapi.get_cluster_group_state.assert_called_once_with( + mock.sentinel.group_handle) @mock.patch.object(clusterutils, 'tpool') @mock.patch.object(clusterutils, 'patcher') diff --git a/os_win/utils/compute/_clusapi_utils.py b/os_win/utils/compute/_clusapi_utils.py new file mode 100644 index 00000000..527499cf --- /dev/null +++ b/os_win/utils/compute/_clusapi_utils.py @@ -0,0 +1,197 @@ +# Copyright 2016 Cloudbase Solutions Srl +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import ctypes +import sys + +if sys.platform == 'win32': + clusapi = ctypes.windll.clusapi + +from os_win import constants +from os_win import exceptions +from os_win.utils import win32utils + +DWORD = ctypes.c_ulong + +CLUSPROP_SYNTAX_NAME = 262147 +CLUSPROP_SYNTAX_ENDMARK = 0 +CLUSPROP_SYNTAX_LIST_VALUE_DWORD = 65538 + +CLUSAPI_GROUP_MOVE_RETURN_TO_SOURCE_NODE_ON_ERROR = 2 +CLUSAPI_GROUP_MOVE_QUEUE_ENABLED = 4 +CLUSAPI_GROUP_MOVE_HIGH_PRIORITY_START = 8 + +ERROR_IO_PENDING = 997 + +CLUSPROP_NAME_VM = 'Virtual Machine' +CLUSPROP_NAME_VM_CONFIG = 'Virtual Machine Configuration' + + +class ClusApiUtils(object): + _MAX_NODE_NAME = 255 + + _open_handle_check_flags = dict(ret_val_is_err_code=False, + error_on_nonzero_ret_val=False, + error_ret_vals=[0, None]) + + def __init__(self): + self._win32utils = win32utils.Win32Utils() + + def _run_and_check_output(self, *args, **kwargs): + kwargs['failure_exc'] = exceptions.ClusterWin32Exception + return self._win32utils.run_and_check_output(*args, **kwargs) + + def _dword_align(self, value): + return (value + 3) & ~3 + + def _get_clusprop_value_struct(self, val_type): + def _get_padding(): + # The cluster property entries must be 4B aligned. + val_sz = ctypes.sizeof(val_type) + return self._dword_align(val_sz) - val_sz + + # For convenience, as opposed to the homonymous ClusAPI + # structure, we add the actual value as well. + class CLUSPROP_VALUE(ctypes.Structure): + _fields_ = [('syntax', DWORD), + ('length', DWORD), + ('value', val_type), + ('_padding', ctypes.c_ubyte * _get_padding())] + return CLUSPROP_VALUE + + def get_property_list_entry(self, name, syntax, value): + # The value argument must have a ctypes type. 
+ name_len = len(name) + 1 + val_sz = ctypes.sizeof(value) + + class CLUSPROP_LIST_ENTRY(ctypes.Structure): + _fields_ = [ + ('name', self._get_clusprop_value_struct( + val_type=ctypes.c_wchar * name_len)), + ('value', self._get_clusprop_value_struct( + val_type=ctypes.c_ubyte * val_sz)), + ('_endmark', DWORD) + ] + + entry = CLUSPROP_LIST_ENTRY() + entry.name.syntax = CLUSPROP_SYNTAX_NAME + entry.name.length = name_len * ctypes.sizeof(ctypes.c_wchar) + entry.name.value = name + + entry.value.syntax = syntax + entry.value.length = val_sz + entry.value.value[0:val_sz] = bytearray(value) + + entry._endmark = CLUSPROP_SYNTAX_ENDMARK + + return entry + + def get_property_list(self, property_entries): + prop_entries_sz = sum([ctypes.sizeof(entry) + for entry in property_entries]) + + class CLUSPROP_LIST(ctypes.Structure): + _fields_ = [('count', DWORD), + ('entries_buff', ctypes.c_ubyte * prop_entries_sz)] + + prop_list = CLUSPROP_LIST(count=len(property_entries)) + + pos = 0 + for prop_entry in property_entries: + prop_entry_sz = ctypes.sizeof(prop_entry) + prop_list.entries_buff[pos:prop_entry_sz + pos] = bytearray( + prop_entry) + pos += prop_entry_sz + + return prop_list + + def open_cluster(self, cluster_name=None): + """Returns a handle for the requested cluster. + + :param cluster_name: (Optional) specifies the name of the cluster + to be opened. If None, the cluster that the + local node belongs to will be opened. 
+ """ + p_clus_name = ctypes.c_wchar_p(cluster_name) if cluster_name else None + handle = self._run_and_check_output(clusapi.OpenCluster, + p_clus_name, + **self._open_handle_check_flags) + return handle + + def open_cluster_group(self, cluster_handle, group_name): + handle = self._run_and_check_output(clusapi.OpenClusterGroup, + cluster_handle, + ctypes.c_wchar_p(group_name), + **self._open_handle_check_flags) + return handle + + def open_cluster_node(self, cluster_handle, node_name): + handle = self._run_and_check_output(clusapi.OpenClusterNode, + cluster_handle, + ctypes.c_wchar_p(node_name), + **self._open_handle_check_flags) + return handle + + def close_cluster(self, cluster_handle): + # This function will always return 'True'. Closing the cluster + # handle will also invalidate handles opened using it. + clusapi.CloseCluster(cluster_handle) + + def close_cluster_group(self, group_handle): + # TODO(lpetrut): The following functions can fail, in which case + # 'False' will be returned. We may want to handle this situation. + clusapi.CloseClusterGroup(group_handle) + + def close_cluster_node(self, node_handle): + clusapi.CloseClusterNode(node_handle) + + def cancel_cluster_group_operation(self, group_handle): + """Requests a pending move operation to be canceled.""" + # This only applies to move operations requested by + # MoveClusterGroup(Ex), thus it will not apply to fail overs. 
+ self._run_and_check_output( + clusapi.CancelClusterGroupOperation, + group_handle, + 0, # cancel flags (reserved for future use by MS) + ignored_error_codes=[ERROR_IO_PENDING]) + + def move_cluster_group(self, group_handle, destination_node_handle, + move_flags, property_list): + prop_list_p = ctypes.byref(property_list) if property_list else None + prop_list_sz = ctypes.sizeof(property_list) if property_list else 0 + + self._run_and_check_output(clusapi.MoveClusterGroupEx, + group_handle, + destination_node_handle, + move_flags, + prop_list_p, + prop_list_sz, + ignored_error_codes=[ERROR_IO_PENDING]) + + def get_cluster_group_state(self, group_handle): + node_name_len = DWORD(self._MAX_NODE_NAME) + node_name_buff = (ctypes.c_wchar * node_name_len.value)() + + group_state = self._run_and_check_output( + clusapi.GetClusterGroupState, + group_handle, + ctypes.byref(node_name_buff), + ctypes.byref(node_name_len), + error_ret_vals=[constants.CLUSTER_GROUP_STATE_UNKNOWN], + error_on_nonzero_ret_val=False, + ret_val_is_err_code=False) + + return {'state': group_state, + 'owner_node': node_name_buff.value} diff --git a/os_win/utils/compute/clusterutils.py b/os_win/utils/compute/clusterutils.py index 92dcb68d..0bf7d17b 100644 --- a/os_win/utils/compute/clusterutils.py +++ b/os_win/utils/compute/clusterutils.py @@ -25,8 +25,11 @@ from eventlet import tpool from oslo_log import log as logging from os_win._i18n import _, _LE +from os_win import _utils +from os_win import constants from os_win import exceptions from os_win.utils import baseutils +from os_win.utils.compute import _clusapi_utils LOG = logging.getLogger(__name__) @@ -55,6 +58,7 @@ class ClusterUtils(baseutils.BaseUtils): def __init__(self, host='.'): self._instance_name_regex = re.compile('Virtual Machine (.*)') + self._clusapi_utils = _clusapi_utils.ClusApiUtils() if sys.platform == 'win32': self._init_hyperv_conn(host) @@ -178,20 +182,90 @@ class ClusterUtils(baseutils.BaseUtils): def vm_exists(self, 
vm_name): return self._lookup_vm(vm_name) is not None - def live_migrate_vm(self, vm_name, new_host): - self._migrate_vm(vm_name, new_host, self._LIVE_MIGRATION_TYPE) + def live_migrate_vm(self, vm_name, new_host, timeout=None): + valid_transition_states = [constants.CLUSTER_GROUP_PENDING] + self._migrate_vm(vm_name, new_host, self._LIVE_MIGRATION_TYPE, + constants.CLUSTER_GROUP_ONLINE, + valid_transition_states, + timeout) + + def _migrate_vm(self, vm_name, new_host, migration_type, + exp_state_after_migr, valid_transition_states, + timeout): + syntax = _clusapi_utils.CLUSPROP_SYNTAX_LIST_VALUE_DWORD + migr_type = _clusapi_utils.DWORD(migration_type) + + prop_entries = [ + self._clusapi_utils.get_property_list_entry( + _clusapi_utils.CLUSPROP_NAME_VM, syntax, migr_type), + self._clusapi_utils.get_property_list_entry( + _clusapi_utils.CLUSPROP_NAME_VM_CONFIG, syntax, migr_type) + ] + prop_list = self._clusapi_utils.get_property_list(prop_entries) + + flags = ( + _clusapi_utils.CLUSAPI_GROUP_MOVE_RETURN_TO_SOURCE_NODE_ON_ERROR | + _clusapi_utils.CLUSAPI_GROUP_MOVE_QUEUE_ENABLED | + _clusapi_utils.CLUSAPI_GROUP_MOVE_HIGH_PRIORITY_START) + + cluster_handle = None + group_handle = None + dest_node_handle = None - def _migrate_vm(self, vm_name, new_host, migration_type): - vm_group = self._lookup_vm_group_check(vm_name) try: - vm_group.MoveToNewNodeParams(self._IGNORE_LOCKED, new_host, - [migration_type]) - except Exception as e: - LOG.error(_LE('Exception during cluster live migration of ' - '%(vm_name)s to %(host)s: %(exception)s'), - {'vm_name': vm_name, - 'host': new_host, - 'exception': e}) + cluster_handle = self._clusapi_utils.open_cluster() + group_handle = self._clusapi_utils.open_cluster_group( + cluster_handle, vm_name) + dest_node_handle = self._clusapi_utils.open_cluster_node( + cluster_handle, new_host) + + self._clusapi_utils.move_cluster_group(group_handle, + dest_node_handle, + flags, + prop_list) + self._wait_for_cluster_group_state(vm_name, + 
group_handle, + exp_state_after_migr, + new_host, + valid_transition_states, + timeout) + finally: + if group_handle: + self._clusapi_utils.close_cluster_group(group_handle) + if dest_node_handle: + self._clusapi_utils.close_cluster_node(dest_node_handle) + if cluster_handle: + self._clusapi_utils.close_cluster(cluster_handle) + + def _wait_for_cluster_group_state(self, group_name, group_handle, + desired_state, desired_node, + valid_transition_states, timeout): + @_utils.retry_decorator(max_retry_count=None, + timeout=timeout, + exceptions=exceptions.InvalidClusterGroupState, + pass_retry_context=True) + def _ensure_group_state(retry_context): + state_info = self._clusapi_utils.get_cluster_group_state( + group_handle) + owner_node = state_info['owner_node'] + group_state = state_info['state'] + + reached_desired_state = desired_state == group_state + reached_desired_node = desired_node.lower() == owner_node.lower() + + if not (reached_desired_state and reached_desired_node): + valid_states = [desired_state] + valid_transition_states + valid_state = group_state in valid_states + retry_context['prevent_retry'] = not valid_state + + raise exceptions.InvalidClusterGroupState( + group_name=group_name, + expected_state=desired_state, + expected_node=desired_node, + group_state=group_state, + owner_node=owner_node) + + _ensure_group_state() def monitor_vm_failover(self, callback): """Creates a monitor to check for new WMI MSCluster_Resource