diff --git a/os_win/constants.py b/os_win/constants.py index 3b1216d..25273f2 100644 --- a/os_win/constants.py +++ b/os_win/constants.py @@ -172,3 +172,25 @@ CLUSTER_GROUP_PENDING = 4 EXPORT_CONFIG_SNAPSHOTS_ALL = 0 EXPORT_CONFIG_NO_SNAPSHOTS = 1 EXPORT_CONFIG_ONE_SNAPSHOT = 2 + +# ACE inheritance flags +ACE_OBJECT_INHERIT = 0x1 +ACE_CONTAINER_INHERIT = 0x2 +ACE_NO_PROPAGATE_INHERIT = 0x4 +ACE_INHERIT_ONLY = 0x8 +ACE_INHERITED = 0x10 + +# ACE access masks +ACE_GENERIC_READ = 0x80000000 +ACE_GENERIC_WRITE = 0x40000000 +ACE_GENERIC_EXECUTE = 0x20000000 +ACE_GENERIC_ALL = 0x10000000 + +# ACE access modes +ACE_NOT_USED_ACCESS = 0 +ACE_GRANT_ACCESS = 1 +ACE_SET_ACCESS = 2 +ACE_DENY_ACCESS = 3 +ACE_REVOKE_ACCESS = 4 +ACE_SET_AUDIT_SUCCESS = 5 +ACE_SET_AUDIT_FAILURE = 6 diff --git a/os_win/tests/functional/__init__.py b/os_win/tests/functional/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/os_win/tests/functional/test_base.py b/os_win/tests/functional/test_base.py new file mode 100644 index 0000000..c497d9a --- /dev/null +++ b/os_win/tests/functional/test_base.py @@ -0,0 +1,26 @@ +# Copyright 2016 Cloudbase Solutions Srl +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + +from oslotest import base + + +class OsWinBaseFunctionalTestCase(base.BaseTestCase): + def setUp(self): + super(OsWinBaseFunctionalTestCase, self).setUp() + if not os.name == 'nt': + raise self.skipException("os-win functional tests can only " + "be run on Windows.") diff --git a/os_win/tests/functional/test_pathutils.py b/os_win/tests/functional/test_pathutils.py new file mode 100644 index 0000000..18d4e23 --- /dev/null +++ b/os_win/tests/functional/test_pathutils.py @@ -0,0 +1,81 @@ +# Copyright 2016 Cloudbase Solutions Srl +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import re +import tempfile + +from os_win import _utils +from os_win import constants +from os_win.tests.functional import test_base +from os_win import utilsfactory + + +class PathUtilsTestCase(test_base.OsWinBaseFunctionalTestCase): + def setUp(self): + super(PathUtilsTestCase, self).setUp() + self._pathutils = utilsfactory.get_pathutils() + + def _get_raw_icacls_info(self, path): + return _utils.execute("icacls.exe", path)[0] + + def _assert_contains_ace(self, path, access_to, access_flags): + raw_out = self._get_raw_icacls_info(path) + + # The flags will be matched regardless of + # other flags and their order. + escaped_access_flags = access_flags.replace( + "(", "(?=.*\(").replace(")", r"\))") + pattern = "%s:%s.*" % (access_to, escaped_access_flags) + + match = re.findall(pattern, raw_out, + flags=re.IGNORECASE | re.MULTILINE) + if not match: + fail_msg = ("The file does not contain the expected ACL rules. " + "Raw icacls output: %s. Expected access rule: %s") + expected_rule = ":".join([access_to, access_flags]) + self.fail(fail_msg % (raw_out, expected_rule)) + + def test_acls(self): + tmp_suffix = 'oswin-func-test' + tmp_dir = tempfile.mkdtemp(suffix=tmp_suffix) + self.addCleanup(self._pathutils.rmtree, tmp_dir) + + tmp_file_paths = [] + for idx in range(2): + tmp_file_path = os.path.join(tmp_dir, + 'tmp_file_%s' % idx) + with open(tmp_file_path, 'w') as f: + f.write('test') + tmp_file_paths.append(tmp_file_path) + + trustee = "NULL SID" + self._pathutils.add_acl_rule( + path=tmp_dir, + trustee_name=trustee, + access_rights=constants.ACE_GENERIC_READ, + access_mode=constants.ACE_GRANT_ACCESS, + inheritance_flags=(constants.ACE_OBJECT_INHERIT | + constants.ACE_CONTAINER_INHERIT)) + self._pathutils.add_acl_rule( + path=tmp_file_paths[0], + trustee_name=trustee, + access_rights=constants.ACE_GENERIC_WRITE, + access_mode=constants.ACE_GRANT_ACCESS) + self._pathutils.copy_acls(tmp_file_paths[0], tmp_file_paths[1]) + + self._assert_contains_ace(tmp_dir, trustee, "(OI)(CI).*(GR)") + for path in tmp_file_paths: + self._assert_contains_ace(path, trustee, ("(W,Rc)")) diff --git a/os_win/tests/utils/test_aclutils.py b/os_win/tests/utils/test_aclutils.py new file mode 100644 index 0000000..f93aa41 --- /dev/null +++ b/os_win/tests/utils/test_aclutils.py @@ -0,0 +1,124 @@ +# Copyright 2016 Cloudbase Solutions Srl +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import ddt +import mock + +from os_win.tests import test_base +from os_win.utils import _acl_utils + + +@ddt.ddt +class ACLUtilsTestCase(test_base.OsWinBaseTestCase): + def setUp(self): + super(ACLUtilsTestCase, self).setUp() + self._setup_lib_mocks() + + self._acl_utils = _acl_utils.ACLUtils() + self._acl_utils._win32_utils = mock.Mock() + self._mock_run = self._acl_utils._win32_utils.run_and_check_output + + def _setup_lib_mocks(self): + self._ctypes = mock.Mock() + self._ctypes.c_wchar_p = lambda x: (x, "c_wchar_p") + self._ctypes.c_uint = lambda x: (x, 'c_uint') + self._ctypes.c_ulong = lambda x: (x, 'c_ulong') + + mock.patch.multiple(_acl_utils, + ctypes=self._ctypes, + advapi=mock.DEFAULT, + kernel32=mock.DEFAULT, + create=True).start() + + def test_get_void_pp(self): + pp_void = self._acl_utils._get_void_pp() + + self.assertEqual(pp_void, self._ctypes.pointer.return_value) + self._ctypes.pointer.assert_called_once_with( + self._ctypes.c_void_p.return_value) + self._ctypes.c_void_p.assert_called_once_with() + + @ddt.data( + {'security_info_flags': + (_acl_utils.OWNER_SECURITY_INFORMATION | + _acl_utils.GROUP_SECURITY_INFORMATION | + _acl_utils.DACL_SECURITY_INFORMATION), + 'expected_info': ['pp_sid_owner', 'pp_sid_group', + 'pp_dacl', 'pp_sec_desc']}, + {'security_info_flags': _acl_utils.SACL_SECURITY_INFORMATION, + 'expected_info': ['pp_sacl', 'pp_sec_desc']}) + @ddt.unpack + @mock.patch.object(_acl_utils.ACLUtils, '_get_void_pp') + def test_get_named_security_info(self, mock_get_void_pp, + security_info_flags, + expected_info): + sec_info = self._acl_utils.get_named_security_info( + mock.sentinel.obj_name, + mock.sentinel.obj_type, + security_info_flags) + + self.assertEqual(set(expected_info), set(sec_info.keys())) + for field in expected_info: + self.assertEqual(sec_info[field], + mock_get_void_pp.return_value) + + self._mock_run.assert_called_once_with( + _acl_utils.advapi.GetNamedSecurityInfoW, + self._ctypes.c_wchar_p(mock.sentinel.obj_name), + self._ctypes.c_uint(mock.sentinel.obj_type), + self._ctypes.c_uint(security_info_flags), + sec_info.get('pp_sid_owner'), + sec_info.get('pp_sid_group'), + sec_info.get('pp_dacl'), + sec_info.get('pp_sacl'), + sec_info['pp_sec_desc']) + + @mock.patch.object(_acl_utils.ACLUtils, '_get_void_pp') + def test_set_entries_in_acl(self, mock_get_void_pp): + new_acl = mock_get_void_pp.return_value + + returned_acl = self._acl_utils.set_entries_in_acl( + mock.sentinel.entry_count, + mock.sentinel.entry_list, + mock.sentinel.old_acl) + + self.assertEqual(new_acl, returned_acl) + self._mock_run.assert_called_once_with( + _acl_utils.advapi.SetEntriesInAclW, + self._ctypes.c_ulong(mock.sentinel.entry_count), + mock.sentinel.entry_list, + mock.sentinel.old_acl, + new_acl) + mock_get_void_pp.assert_called_once_with() + + def test_set_named_security_info(self): + self._acl_utils.set_named_security_info( + mock.sentinel.obj_name, + mock.sentinel.obj_type, + mock.sentinel.security_info_flags, + mock.sentinel.p_sid_owner, + mock.sentinel.p_sid_group, + mock.sentinel.p_dacl, + mock.sentinel.p_sacl) + + self._mock_run.assert_called_once_with( + _acl_utils.advapi.SetNamedSecurityInfoW, + self._ctypes.c_wchar_p(mock.sentinel.obj_name), + self._ctypes.c_uint(mock.sentinel.obj_type), + self._ctypes.c_uint(mock.sentinel.security_info_flags), + mock.sentinel.p_sid_owner, + mock.sentinel.p_sid_group, + mock.sentinel.p_dacl, + mock.sentinel.p_sacl) diff --git a/os_win/tests/utils/test_pathutils.py b/os_win/tests/utils/test_pathutils.py index 7bef22d..aa5cf1b 100644 --- a/os_win/tests/utils/test_pathutils.py +++ b/os_win/tests/utils/test_pathutils.py @@ -12,13 +12,16 @@ # License for the specific language governing permissions and limitations # under the License. +import ctypes import os import shutil import mock +from os_win import constants from os_win import exceptions from os_win.tests import test_base +from os_win.utils import _acl_utils from os_win.utils import pathutils @@ -31,7 +34,9 @@ class PathUtilsTestCase(test_base.OsWinBaseTestCase): self._pathutils = pathutils.PathUtils() self._pathutils._win32_utils = mock.Mock() + self._pathutils._acl_utils = mock.Mock() self._mock_run = self._pathutils._win32_utils.run_and_check_output + self._acl_utils = self._pathutils._acl_utils def _setup_lib_mocks(self): self._ctypes = mock.Mock() @@ -39,10 +44,15 @@ class PathUtilsTestCase(test_base.OsWinBaseTestCase): self._wintypes.BOOL = lambda x: (x, 'BOOL') self._ctypes.c_wchar_p = lambda x: (x, "c_wchar_p") + self._ctypes.pointer = lambda x: (x, 'pointer') + + self._ctypes_patcher = mock.patch.object( + pathutils, 'ctypes', new=self._ctypes) + self._ctypes_patcher.start() mock.patch.multiple(pathutils, wintypes=self._wintypes, - ctypes=self._ctypes, kernel32=mock.DEFAULT, + kernel32=mock.DEFAULT, create=True).start() @mock.patch.object(pathutils.PathUtils, 'rename') @@ -211,3 +221,86 @@ class PathUtilsTestCase(test_base.OsWinBaseTestCase): [mock.call(mock.sentinel.src), mock.call(mock.sentinel.dest)]) mock_copytree.assert_called_once_with(mock.sentinel.src, mock.sentinel.dest) + + def test_add_acl_rule(self): + # We raise an expected exception in order to + # easily verify the resource cleanup. + raised_exc = exceptions.OSWinException + self._ctypes_patcher.stop() + + fake_trustee = 'FAKEDOMAIN\\FakeUser' + mock_sec_info = dict(pp_sec_desc=mock.Mock(), + pp_dacl=mock.Mock()) + self._acl_utils.get_named_security_info.return_value = mock_sec_info + self._acl_utils.set_named_security_info.side_effect = raised_exc + pp_new_dacl = self._acl_utils.set_entries_in_acl.return_value + + self.assertRaises(raised_exc, + self._pathutils.add_acl_rule, + path=mock.sentinel.path, + trustee_name=fake_trustee, + access_rights=constants.ACE_GENERIC_READ, + access_mode=constants.ACE_GRANT_ACCESS, + inheritance_flags=constants.ACE_OBJECT_INHERIT) + + self._acl_utils.get_named_security_info.assert_called_once_with( + obj_name=mock.sentinel.path, + obj_type=_acl_utils.SE_FILE_OBJECT, + security_info_flags=_acl_utils.DACL_SECURITY_INFORMATION) + self._acl_utils.set_entries_in_acl.assert_called_once_with( + entry_count=1, + p_explicit_entry_list=mock.ANY, + p_old_acl=mock_sec_info['pp_dacl'].contents) + self._acl_utils.set_named_security_info.assert_called_once_with( + obj_name=mock.sentinel.path, + obj_type=_acl_utils.SE_FILE_OBJECT, + security_info_flags=_acl_utils.DACL_SECURITY_INFORMATION, + p_dacl=pp_new_dacl.contents) + + p_access = self._acl_utils.set_entries_in_acl.call_args_list[0][1][ + 'p_explicit_entry_list'] + access = ctypes.cast( + p_access, + ctypes.POINTER(_acl_utils.EXPLICIT_ACCESS)).contents + + self.assertEqual(constants.ACE_GENERIC_READ, + access.grfAccessPermissions) + self.assertEqual(constants.ACE_GRANT_ACCESS, + access.grfAccessMode) + self.assertEqual(constants.ACE_OBJECT_INHERIT, + access.grfInheritance) + self.assertEqual(_acl_utils.TRUSTEE_IS_NAME, + access.Trustee.TrusteeForm) + self.assertEqual(fake_trustee, + access.Trustee.pstrName) + + self._pathutils._win32_utils.local_free.assert_has_calls( + [mock.call(pointer) + for pointer in [mock_sec_info['pp_sec_desc'].contents, + pp_new_dacl.contents]]) + + def test_copy_acls(self): + raised_exc = exceptions.OSWinException + + mock_sec_info = dict(pp_sec_desc=mock.Mock(), + pp_dacl=mock.Mock()) + self._acl_utils.get_named_security_info.return_value = mock_sec_info + self._acl_utils.set_named_security_info.side_effect = raised_exc + + self.assertRaises(raised_exc, + self._pathutils.copy_acls, + mock.sentinel.src, + mock.sentinel.dest) + + self._acl_utils.get_named_security_info.assert_called_once_with( + obj_name=mock.sentinel.src, + obj_type=_acl_utils.SE_FILE_OBJECT, + security_info_flags=_acl_utils.DACL_SECURITY_INFORMATION) + self._acl_utils.set_named_security_info.assert_called_once_with( + obj_name=mock.sentinel.dest, + obj_type=_acl_utils.SE_FILE_OBJECT, + security_info_flags=_acl_utils.DACL_SECURITY_INFORMATION, + p_dacl=mock_sec_info['pp_dacl'].contents) + + self._pathutils._win32_utils.local_free.assert_called_once_with( + mock_sec_info['pp_sec_desc'].contents) diff --git a/os_win/tests/utils/test_win32utils.py b/os_win/tests/utils/test_win32utils.py index 2ad8cae..79c2984 100644 --- a/os_win/tests/utils/test_win32utils.py +++ b/os_win/tests/utils/test_win32utils.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import ddt import mock from oslotest import base @@ -22,6 +23,7 @@ from os_win import exceptions from os_win.utils import win32utils +@ddt.ddt class Win32UtilsTestCase(base.BaseTestCase): def setUp(self): super(Win32UtilsTestCase, self).setUp() @@ -209,3 +211,14 @@ class Win32UtilsTestCase(base.BaseTestCase): def get_com_error_hresult_missing_excepinfo(self): ret_val = self._win32_utils.get_com_error_hresult(None) self.assertIsNone(ret_val) + + @ddt.data(0, 1) + @mock.patch.object(win32utils.LOG, 'exception') + def test_local_free(self, ret_val, mock_log_exc): + mock_localfree = win32utils.kernel32.LocalFree + mock_localfree.return_value = ret_val + + self._win32_utils.local_free(mock.sentinel.handle) + + mock_localfree.assert_any_call(mock.sentinel.handle) + self.assertEqual(bool(ret_val), mock_log_exc.called) diff --git a/os_win/utils/_acl_utils.py b/os_win/utils/_acl_utils.py new file mode 100644 index 0000000..81e405c --- /dev/null +++ b/os_win/utils/_acl_utils.py @@ -0,0 +1,121 @@ +# Copyright 2016 Cloudbase Solutions Srl +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import ctypes +import sys + +from os_win.utils import win32utils + +if sys.platform == 'win32': + advapi = ctypes.windll.AdvApi32 + + +OWNER_SECURITY_INFORMATION = 0x00000001 +GROUP_SECURITY_INFORMATION = 0x00000002 +DACL_SECURITY_INFORMATION = 0x00000004 +SACL_SECURITY_INFORMATION = 0x00000008 + +# Trustee form constants +TRUSTEE_IS_NAME = 1 + +# Indicates a file or directory object. +SE_FILE_OBJECT = 1 + + +class TRUSTEE(ctypes.Structure): + _fields_ = [('pMultipleTrustee', ctypes.c_void_p), + ('MultipleTrusteeOperation', ctypes.c_uint), + ('TrusteeForm', ctypes.c_uint), + ('TrusteeType', ctypes.c_uint), + ('pstrName', ctypes.c_wchar_p)] + + +class EXPLICIT_ACCESS(ctypes.Structure): + _fields_ = [('grfAccessPermissions', ctypes.c_ulong), + ('grfAccessMode', ctypes.c_uint), + ('grfInheritance', ctypes.c_ulong), + ('Trustee', TRUSTEE)] + + +class ACLUtils(object): + def __init__(self): + self._win32_utils = win32utils.Win32Utils() + + @staticmethod + def _get_void_pp(): + return ctypes.pointer(ctypes.c_void_p()) + + def get_named_security_info(self, obj_name, obj_type, security_info_flags): + """Retrieve object security information. + + :param security_info_flags: specifies which informations will + be retrieved. + :param ret_val: dict, containing pointers to the requested structures. + Note that the returned security descriptor will have + to be freed using LocalFree. + Some requested information may not be present, in + which case the according pointers will be NULL. + """ + sec_info = {} + + if security_info_flags & OWNER_SECURITY_INFORMATION: + sec_info['pp_sid_owner'] = self._get_void_pp() + if security_info_flags & GROUP_SECURITY_INFORMATION: + sec_info['pp_sid_group'] = self._get_void_pp() + if security_info_flags & DACL_SECURITY_INFORMATION: + sec_info['pp_dacl'] = self._get_void_pp() + if security_info_flags & SACL_SECURITY_INFORMATION: + sec_info['pp_sacl'] = self._get_void_pp() + sec_info['pp_sec_desc'] = self._get_void_pp() + + self._win32_utils.run_and_check_output( + advapi.GetNamedSecurityInfoW, + ctypes.c_wchar_p(obj_name), + ctypes.c_uint(obj_type), + ctypes.c_uint(security_info_flags), + sec_info.get('pp_sid_owner'), + sec_info.get('pp_sid_group'), + sec_info.get('pp_dacl'), + sec_info.get('pp_sacl'), + sec_info['pp_sec_desc']) + + return sec_info + + def set_entries_in_acl(self, entry_count, p_explicit_entry_list, + p_old_acl): + """Merge new ACEs into an existing ACL, returing a new ACL.""" + pp_new_acl = self._get_void_pp() + + self._win32_utils.run_and_check_output( + advapi.SetEntriesInAclW, + ctypes.c_ulong(entry_count), + p_explicit_entry_list, + p_old_acl, + pp_new_acl) + + return pp_new_acl + + def set_named_security_info(self, obj_name, obj_type, security_info_flags, + p_sid_owner=None, p_sid_group=None, + p_dacl=None, p_sacl=None): + self._win32_utils.run_and_check_output( + advapi.SetNamedSecurityInfoW, + ctypes.c_wchar_p(obj_name), + ctypes.c_uint(obj_type), + ctypes.c_uint(security_info_flags), + p_sid_owner, + p_sid_group, + p_dacl, + p_sacl) diff --git a/os_win/utils/pathutils.py b/os_win/utils/pathutils.py index 0ffdf60..44f31d5 100644 --- a/os_win/utils/pathutils.py +++ b/os_win/utils/pathutils.py @@ -26,6 +26,7 @@ import six from os_win._i18n import _ from os_win import exceptions +from os_win.utils import _acl_utils from os_win.utils import win32utils if sys.platform == 'win32': @@ -42,6 +43,7 @@ class PathUtils(object): def __init__(self): self._win32_utils = win32utils.Win32Utils() + self._acl_utils = _acl_utils.ACLUtils() def open(self, path, mode): """Wrapper on __builtin__.open used to simplify unit testing.""" @@ -170,3 +172,61 @@ class PathUtils(object): finally: if tmp_file_path: fileutils.delete_if_exists(tmp_file_path) + + def add_acl_rule(self, path, trustee_name, + access_rights, access_mode, + inheritance_flags=0): + """Adds the requested access rule to a file or object. + + Can be used for granting/revoking access. + """ + p_to_free = [] + + try: + sec_info = self._acl_utils.get_named_security_info( + obj_name=path, + obj_type=_acl_utils.SE_FILE_OBJECT, + security_info_flags=_acl_utils.DACL_SECURITY_INFORMATION) + p_to_free.append(sec_info['pp_sec_desc'].contents) + + access = _acl_utils.EXPLICIT_ACCESS() + access.grfAccessPermissions = access_rights + access.grfAccessMode = access_mode + access.grfInheritance = inheritance_flags + access.Trustee.TrusteeForm = _acl_utils.TRUSTEE_IS_NAME + access.Trustee.pstrName = ctypes.c_wchar_p(trustee_name) + + pp_new_dacl = self._acl_utils.set_entries_in_acl( + entry_count=1, + p_explicit_entry_list=ctypes.pointer(access), + p_old_acl=sec_info['pp_dacl'].contents) + p_to_free.append(pp_new_dacl.contents) + + self._acl_utils.set_named_security_info( + obj_name=path, + obj_type=_acl_utils.SE_FILE_OBJECT, + security_info_flags=_acl_utils.DACL_SECURITY_INFORMATION, + p_dacl=pp_new_dacl.contents) + finally: + for p in p_to_free: + self._win32_utils.local_free(p) + + def copy_acls(self, source_path, dest_path): + p_to_free = [] + + try: + sec_info_flags = _acl_utils.DACL_SECURITY_INFORMATION + sec_info = self._acl_utils.get_named_security_info( + obj_name=source_path, + obj_type=_acl_utils.SE_FILE_OBJECT, + security_info_flags=sec_info_flags) + p_to_free.append(sec_info['pp_sec_desc'].contents) + + self._acl_utils.set_named_security_info( + obj_name=dest_path, + obj_type=_acl_utils.SE_FILE_OBJECT, + security_info_flags=sec_info_flags, + p_dacl=sec_info['pp_dacl'].contents) + finally: + for p in p_to_free: + self._win32_utils.local_free(p) diff --git a/os_win/utils/win32utils.py b/os_win/utils/win32utils.py index 38c7ced..eef6c5f 100644 --- a/os_win/utils/win32utils.py +++ b/os_win/utils/win32utils.py @@ -19,6 +19,7 @@ import sys from oslo_log import log as logging +from os_win._i18n import _LE from os_win import _utils from os_win import exceptions @@ -128,3 +129,10 @@ class Win32Utils(object): return ctypes.c_uint(com_error.excepinfo[5]).value except Exception: LOG.debug("Unable to retrieve COM error hresult: %s", com_error) + + def local_free(self, handle): + try: + self._run_and_check_output(kernel32.LocalFree, handle) + except exceptions.Win32Exception: + LOG.exception(_LE("Could not deallocate memory. " + "There could be a memory leak."))