From 1ff2cb912eb7035281c1d7ae8abd1af6e7560e4d Mon Sep 17 00:00:00 2001 From: Alessandro Pilotti Date: Wed, 15 Feb 2017 11:18:28 +0200 Subject: [PATCH] Adds KMS and AVMA licensing automation Improves the licensing plugin by adding support for KMS and AVMA automated product key assignment and KMS host settings. Change-Id: I361391172a5b857c64c074c7d816322cde13d38c Co-Authored-By: Stefan Caraiman Implements: blueprint improve-licensing-plugin --- cloudbaseinit/conf/default.py | 13 + cloudbaseinit/constant.py | 3 + cloudbaseinit/metadata/services/base.py | 6 + cloudbaseinit/osutils/base.py | 3 + cloudbaseinit/osutils/windows.py | 21 ++ cloudbaseinit/plugins/windows/licensing.py | 84 +++++-- .../tests/plugins/windows/test_licensing.py | 168 ++++++++----- .../tests/utils/windows/test_licensing.py | 231 ++++++++++++++++++ cloudbaseinit/utils/windows/licensing.py | 175 +++++++++++++ cloudbaseinit/utils/windows/productkeys.py | 119 +++++++++ 10 files changed, 737 insertions(+), 86 deletions(-) create mode 100644 cloudbaseinit/tests/utils/windows/test_licensing.py create mode 100644 cloudbaseinit/utils/windows/licensing.py create mode 100644 cloudbaseinit/utils/windows/productkeys.py diff --git a/cloudbaseinit/conf/default.py b/cloudbaseinit/conf/default.py index 70a2c7e9..771ceae3 100644 --- a/cloudbaseinit/conf/default.py +++ b/cloudbaseinit/conf/default.py @@ -69,6 +69,19 @@ class GlobalOptions(conf_base.Options): cfg.BoolOpt( 'activate_windows', default=False, help='Activates Windows automatically'), + cfg.BoolOpt( + 'set_kms_product_key', default=False, + help='Sets the KMS product key for this operating system'), + cfg.BoolOpt( + 'set_avma_product_key', default=False, + help='Sets the AVMA product key for this operating system'), + cfg.StrOpt( + 'kms_host', default=None, + help='The KMS host address in form [:], ' + 'e.g: "kmshost:1688"'), + cfg.BoolOpt( + 'log_licensing_info', default=True, + help='Logs the operating system licensing information'), cfg.BoolOpt( 'winrm_enable_basic_auth', default=True, help='Enables basic authentication for the WinRM ' diff --git a/cloudbaseinit/constant.py b/cloudbaseinit/constant.py index 9f21e23c..6fe93a67 100644 --- a/cloudbaseinit/constant.py +++ b/cloudbaseinit/constant.py @@ -35,3 +35,6 @@ ALWAYS_CHANGE = 'always' NEVER_CHANGE = 'no' LOGON_PASSWORD_CHANGE_OPTIONS = [CLEAR_TEXT_INJECTED_ONLY, NEVER_CHANGE, ALWAYS_CHANGE] + +VOL_ACT_KMS = "KMS" +VOL_ACT_AVMA = "AVMA" diff --git a/cloudbaseinit/metadata/services/base.py b/cloudbaseinit/metadata/services/base.py index 601a6d04..f9f8deb2 100644 --- a/cloudbaseinit/metadata/services/base.py +++ b/cloudbaseinit/metadata/services/base.py @@ -190,6 +190,12 @@ class BaseMetadataService(object): def post_rdp_cert_thumbprint(self, thumbprint): pass + def get_kms_host(self): + pass + + def get_use_avma_licensing(self): + pass + class BaseHTTPMetadataService(BaseMetadataService): diff --git a/cloudbaseinit/osutils/base.py b/cloudbaseinit/osutils/base.py index ce41ee1f..11304ae9 100644 --- a/cloudbaseinit/osutils/base.py +++ b/cloudbaseinit/osutils/base.py @@ -98,6 +98,9 @@ class BaseOSUtils(object): metric): raise NotImplementedError() + def get_os_version(self): + raise NotImplementedError() + def check_os_version(self, major, minor, build=0): raise NotImplementedError() diff --git a/cloudbaseinit/osutils/windows.py b/cloudbaseinit/osutils/windows.py index 490115c5..66d5e123 100644 --- a/cloudbaseinit/osutils/windows.py +++ b/cloudbaseinit/osutils/windows.py @@ -199,6 +199,10 @@ msvcrt.malloc.restype = ctypes.c_void_p msvcrt.free.argtypes = [ctypes.c_void_p] msvcrt.free.restype = None +ntdll.RtlGetVersion.argtypes = [ + ctypes.POINTER(Win32_OSVERSIONINFOEX_W)] +ntdll.RtlGetVersion.restype = wintypes.DWORD + ntdll.RtlVerifyVersionInfo.argtypes = [ ctypes.POINTER(Win32_OSVERSIONINFOEX_W), wintypes.DWORD, wintypes.ULARGE_INTEGER] @@ -1021,6 +1025,23 @@ class WindowsUtils(base.BaseOSUtils): raise exception.CloudbaseInitException( 'Unable to add route: %s' % err) + def get_os_version(self): + vi = Win32_OSVERSIONINFOEX_W() + vi.dwOSVersionInfoSize = ctypes.sizeof(Win32_OSVERSIONINFOEX_W) + ret_val = ntdll.RtlGetVersion(ctypes.byref(vi)) + if ret_val: + raise exception.WindowsCloudbaseInitException( + "RtlGetVersion failed with error: %s" % ret_val) + return {"major_version": vi.dwMajorVersion, + "minor_version": vi.dwMinorVersion, + "build_number": vi.dwBuildNumber, + "platform_id": vi.dwPlatformId, + "csd_version": vi.szCSDVersion, + "service_pack_major": vi.wServicePackMajor, + "service_pack_minor": vi.wServicePackMinor, + "suite_mask": vi.wSuiteMask, + "product_type": vi.wProductType} + def check_os_version(self, major, minor, build=0): vi = Win32_OSVERSIONINFOEX_W() vi.dwOSVersionInfoSize = ctypes.sizeof(Win32_OSVERSIONINFOEX_W) diff --git a/cloudbaseinit/plugins/windows/licensing.py b/cloudbaseinit/plugins/windows/licensing.py index 7ee5e4a8..b06ed2a4 100644 --- a/cloudbaseinit/plugins/windows/licensing.py +++ b/cloudbaseinit/plugins/windows/licensing.py @@ -12,15 +12,13 @@ # License for the specific language governing permissions and limitations # under the License. -import os - from oslo_log import log as oslo_logging from cloudbaseinit import conf as cloudbaseinit_conf -from cloudbaseinit import exception +from cloudbaseinit import constant from cloudbaseinit.osutils import factory as osutils_factory from cloudbaseinit.plugins.common import base - +from cloudbaseinit.utils.windows import licensing CONF = cloudbaseinit_conf.CONF LOG = oslo_logging.getLogger(__name__) @@ -28,27 +26,55 @@ LOG = oslo_logging.getLogger(__name__) class WindowsLicensingPlugin(base.BasePlugin): - def _run_slmgr(self, osutils, args): - if osutils.check_sysnative_dir_exists(): - cscript_dir = osutils.get_sysnative_dir() + def _set_product_key(self, service, manager): + if not CONF.set_kms_product_key and not CONF.set_avma_product_key: + return + + description, license_family, is_current = manager.get_kms_product() + if is_current: + LOG.info('Product "%s" is already the current one, no need to set ' + 'a product key', description) else: - cscript_dir = osutils.get_system32_dir() + use_avma = service.get_use_avma_licensing() + if use_avma is None: + use_avma = CONF.set_avma_product_key + LOG.debug("Use AVMA: %s", use_avma) - # Not SYSNATIVE, as it is already executed by a x64 process - slmgr_dir = osutils.get_system32_dir() + product_key = None + if use_avma: + product_key = manager.get_volume_activation_product_key( + license_family, constant.VOL_ACT_AVMA) + if not product_key: + LOG.error("AVMA product key not found for this OS") - cscript_path = os.path.join(cscript_dir, "cscript.exe") - slmgr_path = os.path.join(slmgr_dir, "slmgr.vbs") + if not product_key and CONF.set_kms_product_key: + product_key = manager.get_volume_activation_product_key( + license_family, constant.VOL_ACT_KMS) + if not product_key: + LOG.error("KMS product key not found for this OS") - (out, err, exit_code) = osutils.execute_process( - [cscript_path, slmgr_path] + args, shell=False, decode_output=True) + if product_key: + LOG.info("Setting product key: %s", product_key) + manager.set_product_key(product_key) - if exit_code: - raise exception.CloudbaseInitException( - 'slmgr.vbs failed with error code %(exit_code)s.\n' - 'Output: %(out)s\nError: %(err)s' % {'exit_code': exit_code, - 'out': out, 'err': err}) - return out + def _set_kms_host(self, service, manager): + kms_host = service.get_kms_host() or CONF.kms_host + if kms_host: + LOG.info("Setting KMS host: %s", kms_host) + manager.set_kms_host(*kms_host.split(':')) + + def _activate_windows(self, service, manager): + if CONF.activate_windows: + # note(alexpilotti): KMS clients activate themselves + # so this could be skipped if a KMS host is set + LOG.info("Activating Windows") + activation_result = manager.activate_windows() + LOG.debug("Activation result:\n%s" % activation_result) + + def _log_licensing_info(self, manager): + if CONF.log_licensing_info: + license_info = manager.get_licensing_info() + LOG.info('Microsoft Windows license info:\n%s' % license_info) def execute(self, service, shared_data): osutils = osutils_factory.get_os_utils() @@ -57,12 +83,18 @@ class WindowsLicensingPlugin(base.BasePlugin): LOG.info("Licensing info and activation are not available on " "Nano Server") else: - license_info = self._run_slmgr(osutils, ['/dlv']) - LOG.info('Microsoft Windows license info:\n%s' % license_info) + manager = licensing.get_licensing_manager() - if CONF.activate_windows: - LOG.info("Activating Windows") - activation_result = self._run_slmgr(osutils, ['/ato']) - LOG.debug("Activation result:\n%s" % activation_result) + eval_end_date = manager.is_eval() + if eval_end_date: + LOG.info("Evaluation license, skipping activation. " + "Evaluation end date: %s", eval_end_date) + else: + self._set_product_key(service, manager) + self._set_kms_host(service, manager) + self._activate_windows(service, manager) + manager.refresh_status() + + self._log_licensing_info(manager) return base.PLUGIN_EXECUTION_DONE, False diff --git a/cloudbaseinit/tests/plugins/windows/test_licensing.py b/cloudbaseinit/tests/plugins/windows/test_licensing.py index 414e53e6..1fd49239 100644 --- a/cloudbaseinit/tests/plugins/windows/test_licensing.py +++ b/cloudbaseinit/tests/plugins/windows/test_licensing.py @@ -12,7 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. -import os +import importlib import unittest try: @@ -20,90 +20,138 @@ try: except ImportError: import mock -from cloudbaseinit import exception from cloudbaseinit.plugins.common import base -from cloudbaseinit.plugins.windows import licensing from cloudbaseinit.tests import testutils +MODPATH = "cloudbaseinit.plugins.windows.licensing" + + class WindowsLicensingPluginTests(unittest.TestCase): def setUp(self): + self._wmi_mock = mock.MagicMock() + self._module_patcher = mock.patch.dict( + 'sys.modules', { + 'wmi': self._wmi_mock}) + self.snatcher = testutils.LogSnatcher(MODPATH) + self._module_patcher.start() + licensing = importlib.import_module(MODPATH) self._licensing = licensing.WindowsLicensingPlugin() - def _test_run_slmgr(self, sysnative, exit_code): - mock_osutils = mock.MagicMock() - get_system32_dir_calls = [mock.call()] - cscript_path = os.path.join('cscrypt path', "cscript.exe") - slmgr_path = os.path.join('slmgr path', "slmgr.vbs") + def tearDown(self): + self._module_patcher.stop() - mock_osutils.check_sysnative_dir_exists.return_value = sysnative - mock_osutils.get_sysnative_dir.return_value = 'cscrypt path' - if not sysnative: - mock_osutils.get_system32_dir.side_effect = ['cscrypt path', - 'slmgr path'] + @testutils.ConfPatcher('set_kms_product_key', True) + @testutils.ConfPatcher('set_avma_product_key', True) + def _test_set_product_key(self, description=None, + license_family=None, is_current=None): + mock_service = mock.Mock() + mock_manager = mock.Mock() + fake_key = mock.sentinel.key + mock_service.get_use_avma_licensing.return_value = None + mock_manager.get_kms_product.return_value = (description, + license_family, + is_current) + mock_manager.get_volume_activation_product_key.return_value = fake_key + with self.snatcher: + self._licensing._set_product_key(mock_service, mock_manager) + mock_manager.get_kms_product.assert_called_once_with() + if is_current: + expected_logs = ['Product "%s" is already the current one, ' + 'no need to set a product key' % description] + self.assertEqual(self.snatcher.output, expected_logs) + return else: - mock_osutils.get_system32_dir.return_value = 'slmgr path' - mock_osutils.execute_process.return_value = ('fake output', None, - exit_code) + mock_service.get_use_avma_licensing.assert_called_once_with() + (mock_manager.get_volume_activation_product_key. + assert_called_once_with(None, 'AVMA')) + mock_manager.set_product_key.assert_called_once_with(fake_key) - if exit_code: - self.assertRaises(exception.CloudbaseInitException, - self._licensing._run_slmgr, - mock_osutils, ['fake args']) - else: - response = self._licensing._run_slmgr(osutils=mock_osutils, - args=['fake args']) - self.assertEqual('fake output', response) + def test_set_product_key(self): + self._test_set_product_key() - mock_osutils.check_sysnative_dir_exists.assert_called_once_with() - if sysnative: - mock_osutils.get_sysnative_dir.assert_called_once_with() - else: - get_system32_dir_calls.append(mock.call()) + def test_set_product_key_is_current(self): + self._test_set_product_key(is_current=True) - mock_osutils.execute_process.assert_called_once_with( - [cscript_path, slmgr_path, 'fake args'], - shell=False, decode_output=True) - self.assertEqual(get_system32_dir_calls, - mock_osutils.get_system32_dir.call_args_list) + def test_set_kms_host(self): + mock_service = mock.Mock() + mock_manager = mock.Mock() + mock_host = "127.0.0.1:1688" + expected_host_call = mock_host.split(':') + mock_service.get_kms_host.return_value = mock_host + expected_logs = ["Setting KMS host: %s" % mock_host] + with self.snatcher: + self._licensing._set_kms_host(mock_service, mock_manager) + self.assertEqual(self.snatcher.output, expected_logs) + mock_manager.set_kms_host.assert_called_once_with(*expected_host_call) - def test_run_slmgr_sysnative(self): - self._test_run_slmgr(sysnative=True, exit_code=None) - - def test_run_slmgr_not_sysnative(self): - self._test_run_slmgr(sysnative=False, exit_code=None) - - def test_run_slmgr_exit_code(self): - self._test_run_slmgr(sysnative=True, exit_code='fake exit code') + def test_activate_windows(self): + activate_result = mock.Mock() + mock_service = mock.Mock() + mock_manager = mock.Mock() + mock_manager.activate_windows.return_value = activate_result + expected_logs = [ + "Activating Windows", + "Activation result:\n%s" % activate_result] + with testutils.ConfPatcher('activate_windows', True): + with self.snatcher: + self._licensing._activate_windows(mock_service, mock_manager) + self.assertEqual(self.snatcher.output, expected_logs) + mock_manager.activate_windows.assert_called_once_with() + @mock.patch(MODPATH + ".WindowsLicensingPlugin._activate_windows") + @mock.patch(MODPATH + ".WindowsLicensingPlugin._set_kms_host") + @mock.patch(MODPATH + ".WindowsLicensingPlugin._set_product_key") @mock.patch('cloudbaseinit.osutils.factory.get_os_utils') - @mock.patch('cloudbaseinit.plugins.windows.licensing' - '.WindowsLicensingPlugin._run_slmgr') - def _test_execute(self, mock_run_slmgr, mock_get_os_utils, - activate_windows=None, nano=False): + @mock.patch('cloudbaseinit.utils.windows.licensing.get_licensing_manager') + def _test_execute(self, mock_get_licensing_manager, + mock_get_os_utils, + mock_set_product_key, + mock_set_kms_host, + mock_activate_windows, + nano=False, is_eval=True): + mock_service = mock.Mock() + mock_manager = mock.Mock() + mock_get_licensing_manager.return_value = mock_manager mock_osutils = mock.MagicMock() mock_osutils.is_nano_server.return_value = nano - run_slmgr_calls = [mock.call(mock_osutils, ['/dlv'])] mock_get_os_utils.return_value = mock_osutils - - with testutils.ConfPatcher('activate_windows', activate_windows): - response = self._licensing.execute(service=None, shared_data=None) + mock_manager.is_eval.return_value = is_eval + mock_manager.get_licensing_info.return_value = "fake" + expected_logs = [] + with self.snatcher: + response = self._licensing.execute(service=mock_service, + shared_data=None) mock_get_os_utils.assert_called_once_with() if nano: + expected_logs = ["Licensing info and activation are " + "not available on Nano Server"] + self.assertEqual(self.snatcher.output, expected_logs) return # no activation available - if activate_windows: - run_slmgr_calls.append(mock.call(mock_osutils, ['/ato'])) + else: + if not is_eval: + mock_set_product_key.assert_called_once_with(mock_service, + mock_manager) + mock_set_kms_host.assert_called_once_with(mock_service, + mock_manager) + mock_activate_windows.assert_called_once_with(mock_service, + mock_manager) + else: + expected_logs.append("Evaluation license, skipping activation" + ". Evaluation end date: %s" % is_eval) + expected_logs.append('Microsoft Windows license info:\nfake') + mock_manager.get_licensing_info.assert_called_once_with() - self.assertEqual(run_slmgr_calls, mock_run_slmgr.call_args_list) self.assertEqual((base.PLUGIN_EXECUTION_DONE, False), response) + self.assertEqual(self.snatcher.output, expected_logs) - def test_execute_activate_windows_true(self): - self._test_execute(activate_windows=True) - - def test_execute_activate_windows_false(self): - self._test_execute(activate_windows=False) - - def test_execute_activate_windows_nano(self): + def test_execute_nano(self): self._test_execute(nano=True) + + def test_execute_is_evaluated(self): + self._test_execute() + + def test_execute(self): + self._test_execute(is_eval=False) diff --git a/cloudbaseinit/tests/utils/windows/test_licensing.py b/cloudbaseinit/tests/utils/windows/test_licensing.py new file mode 100644 index 00000000..3d7d4ee7 --- /dev/null +++ b/cloudbaseinit/tests/utils/windows/test_licensing.py @@ -0,0 +1,231 @@ +# Copyright (c) 2017 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import importlib +import unittest + +try: + import unittest.mock as mock +except ImportError: + import mock + +from cloudbaseinit import exception +from cloudbaseinit.tests import testutils + + +MODPATH = "cloudbaseinit.utils.windows.licensing" + + +class LicensingTest(unittest.TestCase): + + def setUp(self): + self._wmi_mock = mock.MagicMock() + self._module_patcher = mock.patch.dict( + 'sys.modules', { + 'wmi': self._wmi_mock}) + self.snatcher = testutils.LogSnatcher(MODPATH) + self._module_patcher.start() + self.licensing = importlib.import_module(MODPATH) + self._licensing = self.licensing.LicensingManager() + self._licensing_v2 = self.licensing.LicensingManagerV2() + + def tearDown(self): + self._module_patcher.stop() + + @mock.patch('cloudbaseinit.osutils.factory.get_os_utils') + def _test_run_slmgr(self, mock_get_os_utils, ret_val=0, + sysnative=True): + mock_args = [mock.sentinel.args] + mock_outval = b"fake-out" + mock_cscriptdir = r"fake\cscript\dir" + mock_osutils = mock.Mock() + mock_osutils.get_sysnative_dir.return_value = mock_cscriptdir + mock_osutils.get_system32_dir.return_value = mock_cscriptdir + mock_get_os_utils.return_value = mock_osutils + mock_osutils.get_system32_dir.return_value = r"fakedir" + mock_osutils.check_sysnative_dir_exists.return_value = sysnative + mock_osutils.execute_process.return_value = ( + mock_outval, mock.sentinel.err, ret_val) + + if ret_val: + self.assertRaises(exception.CloudbaseInitException, + self._licensing._run_slmgr, mock_args) + else: + res_out = self._licensing._run_slmgr(mock_args) + self.assertEqual(res_out, "fake-out") + self.assertEqual(mock_osutils.execute_process.call_count, 1) + + def test_run_slmgr_sys_native(self): + self._test_run_slmgr() + + def test_run_slmgr_system32(self): + self._test_run_slmgr(sysnative=False) + + def test_run_slmgr_fail(self): + self._test_run_slmgr(ret_val=1) + + @mock.patch(MODPATH + ".LicensingManager._run_slmgr") + def test_get_licensing_info(self, mock_run_slmgr): + mock_out = mock.sentinel.out_val + mock_run_slmgr.return_value = mock_out + res = self._licensing.get_licensing_info() + mock_run_slmgr.assert_called_once_with(['/dlv']) + self.assertEqual(res, mock_out) + + @mock.patch(MODPATH + ".LicensingManager._run_slmgr") + def test_activate_windows(self, mock_run_slmgr): + mock_out = mock.sentinel.out_val + mock_run_slmgr.return_value = mock_out + res = self._licensing.activate_windows() + mock_run_slmgr.assert_called_once_with(['/ato']) + self.assertEqual(res, mock_out) + + @mock.patch(MODPATH + ".LicensingManager._run_slmgr") + def test_set_kms_host(self, mock_run_slmgr): + mock_out = mock.sentinel.out_val + mock_kms = mock.sentinel.kms_host + mock_run_slmgr.return_value = mock_out + res = self._licensing.set_kms_host(mock_kms) + expected_host = "%s:%s" % (mock_kms, self.licensing.DEFAULT_KMS_PORT) + mock_run_slmgr.assert_called_once_with(['/skms', expected_host]) + self.assertEqual(res, mock_out) + + @mock.patch(MODPATH + ".LicensingManager._run_slmgr") + def test_set_kms_auto_discovery(self, mock_run_slmgr): + mock_out = mock.sentinel.out_val + mock_run_slmgr.return_value = mock_out + res = self._licensing.set_kms_auto_discovery() + mock_run_slmgr.assert_called_once_with(['/ckms']) + self.assertEqual(res, mock_out) + + @mock.patch(MODPATH + ".LicensingManager._run_slmgr") + def test_set_product_key(self, mock_run_slmgr): + mock_out = mock.sentinel.out_val + mock_product_key = mock.sentinel.product_key + mock_run_slmgr.return_value = mock_out + res = self._licensing.set_product_key(mock_product_key) + mock_run_slmgr.assert_called_once_with(['/ipk', mock_product_key]) + self.assertEqual(res, mock_out) + + def test_is_eval_v1(self): + with self.assertRaises(NotImplementedError): + self._licensing.is_eval() + + def test_get_kms_product_v1(self): + with self.assertRaises(NotImplementedError): + self._licensing.get_kms_product() + + def test_get_volume_activation_product_key_v1(self): + with self.assertRaises(NotImplementedError): + self._licensing.get_volume_activation_product_key('fake') + + def test_get_service(self): + mock_result = mock.Mock() + conn = self._wmi_mock.WMI + conn.SoftwareLicensingService.return_value = [mock_result] + self._licensing_v2._get_service() + self.assertIsNotNone(self._licensing_v2._service) + + @mock.patch(MODPATH + '.LicensingManagerV2._get_service') + def test_set_product_key_v2(self, mock_get_service): + mock_product_key = mock.Mock() + mock_service = mock.Mock() + mock_get_service.return_value = mock_service + self._licensing_v2.set_product_key(mock_product_key) + mock_get_service.assert_called_once_with() + mock_service.InstallProductKey.assert_called_once_with( + mock_product_key) + + @mock.patch(MODPATH + '.LicensingManagerV2._get_service') + def test_set_kms_auto_discovery_v2(self, mock_get_service): + mock_service = mock.Mock() + mock_get_service.return_value = mock_service + self._licensing_v2.set_kms_auto_discovery() + mock_get_service.assert_called_once_with() + mock_service.ClearKeyManagementServiceMachine.assert_called_once_with() + mock_service.ClearKeyManagementServicePort.assert_called_once_with() + + @mock.patch(MODPATH + '.LicensingManagerV2._get_service') + def test_set_kms_host_v2(self, mock_get_service): + mock_service = mock.Mock() + mock_host = mock.sentinel.host + mock_port = mock.sentinel.port + mock_get_service.return_value = mock_service + self._licensing_v2.set_kms_host(mock_host, mock_port) + mock_get_service.assert_called_once_with() + mock_service.SetKeyManagementServiceMachine.assert_called_once_with( + mock_host) + mock_service.SetKeyManagementServicePort.assert_called_once_with( + mock_port) + + @mock.patch(MODPATH + '.LicensingManagerV2._get_service') + def test_refresh_status_v2(self, mock_get_service): + mock_service = mock.Mock() + mock_get_service.return_value = mock_service + self._licensing_v2.refresh_status() + mock_get_service.assert_called_once_with() + mock_service.RefreshLicenseStatus.assert_called_once_with() + + def test_is_current_product(self): + mock_product = mock.Mock() + mock_product.PartialProductKey = "fake-key" + res = self._licensing_v2._is_current_product(mock_product) + self.assertTrue(res) + + def test_get_products(self): + mock_result = mock.Mock() + conn = self._wmi_mock.WMI + conn.query.return_value = mock_result + res = self._licensing_v2._get_products() + self.assertEqual(res, self._licensing_v2._products) + + @mock.patch(MODPATH + ".LicensingManagerV2._get_products") + def test_is_eval(self, mock_get_products): + mock_product = mock.Mock() + mock_product.ApplicationId = self.licensing.WINDOWS_APP_ID + mock_product.Description = u"TIMEBASED_EVAL" + mock_product.EvaluationEndDate = "fake" + mock_get_products.return_value = [mock_product] + res = self._licensing_v2.is_eval() + self.assertEqual(res, "fake") + + @mock.patch(MODPATH + ".LicensingManagerV2._get_products") + def _test_get_kms_product(self, mock_get_products, products=()): + mock_get_products.return_value = products + if not products: + self.assertRaises(exception.ItemNotFoundException, + self._licensing_v2.get_kms_product) + return + res = self._licensing_v2.get_kms_product() + self.assertIsNotNone(res) + + def test_get_kms_product_no_keys(self): + self._test_get_kms_product() + + def test_get_kms_product(self): + mock_product = mock.Mock() + mock_product.ApplicationId = self.licensing.WINDOWS_APP_ID + mock_product.Description = u"VOLUME_KMSCLIENT" + self._test_get_kms_product(products=[mock_product]) + + @mock.patch('cloudbaseinit.osutils.factory.get_os_utils') + def test_get_volume_activation_product_key(self, mock_get_os_utils): + mock_os_version = {'major_version': 10, 'minor_version': 0} + expected_key = "WC2BQ-8NRM3-FDDYY-2BFGV-KHKQY" + mock_osutils = mock.Mock() + mock_get_os_utils.return_value = mock_osutils + mock_osutils.get_os_version.return_value = mock_os_version + res = self._licensing_v2.get_volume_activation_product_key( + license_family="ServerStandard") + self.assertEqual(res, expected_key) diff --git a/cloudbaseinit/utils/windows/licensing.py b/cloudbaseinit/utils/windows/licensing.py new file mode 100644 index 00000000..8bd9d3bf --- /dev/null +++ b/cloudbaseinit/utils/windows/licensing.py @@ -0,0 +1,175 @@ +# Copyright 2014 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + +import wmi + +from oslo_log import log as oslo_logging + +from cloudbaseinit import constant +from cloudbaseinit import exception +from cloudbaseinit.osutils import factory as osutils_factory +from cloudbaseinit.utils.windows import productkeys + +LOG = oslo_logging.getLogger(__name__) + +WINDOWS_APP_ID = "55c92734-d682-4d71-983e-d6ec3f16059f" +DEFAULT_KMS_PORT = 1688 + + +def get_licensing_manager(): + osutils = osutils_factory.get_os_utils() + if osutils.check_os_version(6, 1): + return LicensingManagerV2() + else: + return LicensingManager() + + +class LicensingManager(object): + @staticmethod + def _run_slmgr(args): + osutils = osutils_factory.get_os_utils() + + if osutils.check_sysnative_dir_exists(): + cscript_dir = osutils.get_sysnative_dir() + else: + cscript_dir = osutils.get_system32_dir() + + # Not SYSNATIVE, as it is already executed by a x64 process + slmgr_dir = osutils.get_system32_dir() + + cscript_path = os.path.join(cscript_dir, "cscript.exe") + slmgr_path = os.path.join(slmgr_dir, "slmgr.vbs") + + (out, err, exit_code) = osutils.execute_process( + [cscript_path, slmgr_path] + args, shell=False, decode_output=True) + + if exit_code: + raise exception.CloudbaseInitException( + 'slmgr.vbs failed with error code %(exit_code)s.\n' + 'Output: %(out)s\nError: %(err)s' % {'exit_code': exit_code, + 'out': out, 'err': err}) + return out.decode(errors='replace') + + def get_licensing_info(self): + return self._run_slmgr(['/dlv']) + + def activate_windows(self): + return self._run_slmgr(['/ato']) + + def set_kms_host(self, host, port=DEFAULT_KMS_PORT): + kms_host = "%s:%s" % (host, port) + return self._run_slmgr(['/skms', kms_host]) + + def set_kms_auto_discovery(self): + return self._run_slmgr(['/ckms']) + + def set_product_key(self, product_key): + return self._run_slmgr(['/ipk', product_key]) + + def is_eval(self): + raise NotImplementedError() + + def get_kms_product(self): + raise NotImplementedError() + + def get_volume_activation_product_key(self, license_family, + vol_act_type=constant.VOL_ACT_KMS): + raise NotImplementedError() + + def refresh_status(self): + pass + + +class LicensingManagerV2(LicensingManager): + def __init__(self): + self._products = None + self._service = None + + def _get_service(self): + if not self._service: + conn = wmi.WMI(moniker='//./root/cimv2') + self._service = conn.SoftwareLicensingService()[0] + return self._service + + def set_product_key(self, product_key): + service = self._get_service() + service.InstallProductKey(product_key) + + def set_kms_auto_discovery(self): + service = self._get_service() + service.ClearKeyManagementServiceMachine() + service.ClearKeyManagementServicePort() + + def set_kms_host(self, host, port=DEFAULT_KMS_PORT): + service = self._get_service() + service.SetKeyManagementServiceMachine(host) + service.SetKeyManagementServicePort(port) + + def refresh_status(self): + service = self._get_service() + service.RefreshLicenseStatus() + + def _get_products(self): + if not self._products: + conn = wmi.WMI(moniker='//./root/cimv2') + self._products = conn.query( + 'SELECT ID, ApplicationId, PartialProductKey, ' + 'LicenseFamily, LicenseIsAddon, Description, ' + 'EvaluationEndDate, Name FROM ' + 'SoftwareLicensingProduct WHERE ' + 'LicenseIsAddon=False') + return self._products + + @staticmethod + def _is_current_product(product): + return bool(product.PartialProductKey) + + def is_eval(self): + def _is_eval(product): + return (u"TIMEBASED_EVAL" in product.Description or + product.EvaluationEndDate != u"16010101000000.000000-000") + + for product in self._get_products(): + app_id = product.ApplicationId.lower() + if (app_id == WINDOWS_APP_ID and _is_eval(product) and + self._is_current_product(product)): + return product.EvaluationEndDate + + def get_kms_product(self): + def _is_kms_client(product): + # note(alexpilotti): could check for + # KeyManagementServiceProductKeyID + return u"VOLUME_KMSCLIENT" in product.Description + + for product in self._get_products(): + app_id = product.ApplicationId.lower() + if app_id == WINDOWS_APP_ID and _is_kms_client(product): + return (product.Description, product.LicenseFamily, + self._is_current_product(product)) + + raise exception.ItemNotFoundException("KMS client product not found") + + def get_volume_activation_product_key(self, license_family, + vol_act_type=constant.VOL_ACT_KMS): + osutils = osutils_factory.get_os_utils() + os_version = osutils.get_os_version() + os_major = os_version["major_version"] + os_minor = os_version["minor_version"] + + product_keys_map = productkeys.SKU_TO_PRODUCT_KEY_MAP.get( + (os_major, os_minor, vol_act_type), {}) + + return product_keys_map.get(license_family) diff --git a/cloudbaseinit/utils/windows/productkeys.py b/cloudbaseinit/utils/windows/productkeys.py new file mode 100644 index 00000000..2206c6fd --- /dev/null +++ b/cloudbaseinit/utils/windows/productkeys.py @@ -0,0 +1,119 @@ +# Copyright 2017 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from cloudbaseinit import constant + +SKU_TO_PRODUCT_KEY_MAP = { + # KMS: https://technet.microsoft.com/en-US/jj612867.aspx + # AVMA: https://technet.microsoft.com/en-us/library/dn303421.aspx + + (6, 1, constant.VOL_ACT_KMS): { + # Windows 7, Windows server 2008 R2 + "Business": "FJ82H-XT6CR-J8D7P-XQJJ2-GPDD4", + "BusinessN": "MRPKT-YTG23-K7D7T-X2JMM-QY7MG", + "BusinessE": "W82YF-2Q76Y-63HXB-FGJG9-GF7QX", + "Enterprise": "33PXH-7Y6KF-2VJC9-XBBR8-HVTHH", + "EnterpriseN": "YDRBP-3D83W-TY26F-D46B2-XCKRJ", + "EnterpriseE": "C29WB-22CC8-VJ326-GHFJW-H9DH4", + "ServerComputeCluster": "FKJQ8-TMCVP-FRMR7-4WR42-3JCD7", + "ServerDatacenter": "74YFP-3QFB3-KQT8W-PMXWJ-7M648", + "ServerEnterprise": "489J6-VHDMP-X63PK-3K798-CPX3Y", + "ServerEnterpriseIA64": "GT63C-RJFQ3-4GMB6-BRFB9-CB83V", + "ServerStandard": "YC6KT-GKW9T-YTKYR-T4X34-R7VHC", + "ServerWeb": "6TPJF-RBVHG-WBW2R-86QPH-6RTM4", + }, + + (6, 2, constant.VOL_ACT_KMS): { + # Windows 8, Windows server 2012 + "Core": "BN3D2-R7TKB-3YPBD-8DRP2-27GG4", + "CoreARM": "DXHJF-N9KQX-MFPVR-GHGQK-Y7RKV", + "CoreCountrySpecific": "4K36P-JN4VD-GDC6V-KDT89-DYFKP", + "CoreN": "8N2M2-HWPGY-7PGT9-HGDD8-GVGGY", + "CoreSingleLanguage": "2WN2H-YGCQR-KFX6K-CD6TF-84YXQ", + "Enterprise": "32JNW-9KQ84-P47T8-D8GGY-CWCK7", + "EnterpriseN": "JMNMF-RHW7P-DMY6X-RF3DR-X2BQT", + "Professional": "NG4HW-VH26C-733KW-K6F98-J8CK4", + "ProfessionalN": "XCVCF-2NXM9-723PB-MHCB7-2RYQQ", + "ProfessionalWMC": "GNBB8-YVD74-QJHX6-27H4K-8QHDG", + "ServerDatacenter": "48HP8-DN98B-MYWDG-T2DCC-8W83P", + "ServerDatacenterCore": "48HP8-DN98B-MYWDG-T2DCC-8W83P", + "ServerMultiPointStandard": "HM7DN-YVMH3-46JC3-XYTG7-CYQJJ", + "ServerMultiPointPremium": "XNH6W-2V9GX-RGJ4K-Y8X6F-QGJ2G", + "ServerStandard": "XC9B7-NBPP2-83J2H-RHMBY-92BT4", + "ServerStandardCore": "XC9B7-NBPP2-83J2H-RHMBY-92BT4", + }, + + (6, 3, constant.VOL_ACT_KMS): { + # Windows 8.1, Windows server 2012 R2 + "CoreARM": "XYTND-K6QKT-K2MRH-66RTM-43JKP", + "ServerStandard": "D2N9P-3P6X9-2R39C-7RTCD-MDVJX", + "ServerCloudStorageCore": "3NPTF-33KPT-GGBPR-YX76B-39KDD", + "ServerCloudStorage": "3NPTF-33KPT-GGBPR-YX76B-39KDD", + "EmbeddedIndustryA": "VHXM3-NR6FT-RY6RT-CK882-KW2CJ", + "CoreN": "7B9N3-D94CG-YTVHR-QBPX3-RJP64", + "CoreSingleLanguage": "BB6NG-PQ82V-VRDPW-8XVD2-V8P66", + "ServerDatacenterCore": "W3GGN-FT8W3-Y4M27-J84CP-Q3VJ9", + "Professional": "GCRJD-8NW9H-F2CDX-CCM8D-9D6T9", + "ServerSolutionCore": "KNC87-3J2TX-XB4WP-VCPJV-M4FWM", + "ServerSolution": "KNC87-3J2TX-XB4WP-VCPJV-M4FWM", + "EmbeddedIndustryE": "FNFKF-PWTVT-9RC8H-32HB2-JB34X", + "ProfessionalN": "HMCNV-VVBFX-7HMBH-CTY9B-B4FXY", + "EmbeddedIndustry": "NMMPB-38DD4-R2823-62W8D-VXKJB", + "CoreCountrySpecific": "NCTT7-2RGK8-WMHRF-RY7YQ-JTXG3", + "ProfessionalWMC": "789NJ-TQK6T-6XTH8-J39CJ-J8D3P", + "ServerDatacenter": "W3GGN-FT8W3-Y4M27-J84CP-Q3VJ9", + "ServerStandardCore": "D2N9P-3P6X9-2R39C-7RTCD-MDVJX", + "Enterprise": "MHF9N-XY6XB-WVXMC-BTDCT-MKKG7", + "Core": "M9Q9P-WNJJT-6PXPY-DWX8H-6XWKK", + "EnterpriseN": "TT4HM-HN7YT-62K67-RGRQJ-JFFXW", + }, + + (6, 3, constant.VOL_ACT_AVMA): { + # Windows server 2012 R2 + "ServerSolutionCore": "K2XGM-NMBT3-2R6Q8-WF2FK-P36R2", + "ServerDatacenterCore": "Y4TGP-NPTV9-HTC2H-7MGQ3-DV4TW", + "ServerStandardCore": "DBGBW-NPF86-BJVTX-K3WKJ-MTB6V", + "ServerSolution": "K2XGM-NMBT3-2R6Q8-WF2FK-P36R2", + "ServerDatacenter": "Y4TGP-NPTV9-HTC2H-7MGQ3-DV4TW", + "ServerStandard": "DBGBW-NPF86-BJVTX-K3WKJ-MTB6V", + }, + + (10, 0, constant.VOL_ACT_KMS): { + # Windows 10, Windows Server 2016 + "Professional": "W269N-WFGWX-YVC9B-4J6C9-T83GX", + "ProfessionalN": "MH37W-N47XK-V7XM9-C7227-GCQG9", + "Enterprise": "NPPR9-FWDCX-D2C8J-H872K-2YT43", + "EnterpriseN": "DPH2V-TTNVB-4X9Q3-TJR4H-KHJW4", + "ServerDatacenterCore": "CB7KF-BWN84-R7R2Y-793K2-8XDDG", + "ServerDatacenter": "CB7KF-BWN84-R7R2Y-793K2-8XDDG", + "ServerStandardCore": "WC2BQ-8NRM3-FDDYY-2BFGV-KHKQY", + "ServerStandard": "WC2BQ-8NRM3-FDDYY-2BFGV-KHKQY", + "ServerSolutionCore": "JCKRF-N37P4-C2D82-9YXRT-4M63B", + "ServerSolution": "JCKRF-N37P4-C2D82-9YXRT-4M63B", + "ServerCloudStorage": "QN4C6-GBJD2-FB422-GHWJK-GJG2R", + "ServerCloudStorageCore": "QN4C6-GBJD2-FB422-GHWJK-GJG2R", + "ServerAzureCor": "VP34G-4NPPG-79JTQ-864T4-R3MQX", + "ServerAzureCorCore": "VP34G-4NPPG-79JTQ-864T4-R3MQX", + }, + + (10, 0, constant.VOL_ACT_AVMA): { + # Windows server 2016 + "ServerSolutionCore": "B4YNW-62DX9-W8V6M-82649-MHBKQ", + "ServerDatacenterCore": "TMJ3Y-NTRTM-FJYXT-T22BY-CWG3J", + "ServerStandardCore": "C3RCX-M6NRP-6CXC9-TW2F2-4RHYD", + "ServerSolution": "B4YNW-62DX9-W8V6M-82649-MHBKQ", + "ServerDatacenter": "TMJ3Y-NTRTM-FJYXT-T22BY-CWG3J", + "ServerStandard": "C3RCX-M6NRP-6CXC9-TW2F2-4RHYD", + }, +}