Add methods for handling file ACLs

This change adds methods that can be used for granting/revoking
file or folder permissions, as well as copying ACLs.

Co-Authored-By: Lucian Petrut <lpetrut@cloudbasesolutions.com>

Change-Id: I01efbc312d8ea81c3a4e1fd82cccc695c7472b5f
This commit is contained in:
Simona Iuliana Toader 2016-09-26 10:11:39 -07:00 committed by Lucian Petrut
parent b9d5d0b18b
commit 91febc2021
10 changed files with 549 additions and 1 deletions

View File

@ -172,3 +172,25 @@ CLUSTER_GROUP_PENDING = 4
EXPORT_CONFIG_SNAPSHOTS_ALL = 0
EXPORT_CONFIG_NO_SNAPSHOTS = 1
EXPORT_CONFIG_ONE_SNAPSHOT = 2
# ACE inheritance flags
ACE_OBJECT_INHERIT = 0x1
ACE_CONTAINER_INHERIT = 0x2
ACE_NO_PROPAGATE_INHERIT = 0x4
ACE_INHERIT_ONLY = 0x8
ACE_INHERITED = 0x10
# ACE access masks
ACE_GENERIC_READ = 0x80000000
ACE_GENERIC_WRITE = 0x40000000
ACE_GENERIC_EXECUTE = 0x20000000
ACE_GENERIC_ALL = 0x10000000
# ACE access modes
ACE_NOT_USED_ACCESS = 0
ACE_GRANT_ACCESS = 1
ACE_SET_ACCESS = 2
ACE_DENY_ACCESS = 3
ACE_REVOKE_ACCESS = 4
ACE_SET_AUDIT_SUCCESS = 5
ACE_SET_AUDIT_FAILURE = 6

View File

View File

@ -0,0 +1,26 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from oslotest import base
class OsWinBaseFunctionalTestCase(base.BaseTestCase):
def setUp(self):
super(OsWinBaseFunctionalTestCase, self).setUp()
if not os.name == 'nt':
raise self.skipException("os-win functional tests can only "
"be run on Windows.")

View File

@ -0,0 +1,81 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import re
import tempfile
from os_win import _utils
from os_win import constants
from os_win.tests.functional import test_base
from os_win import utilsfactory
class PathUtilsTestCase(test_base.OsWinBaseFunctionalTestCase):
def setUp(self):
super(PathUtilsTestCase, self).setUp()
self._pathutils = utilsfactory.get_pathutils()
def _get_raw_icacls_info(self, path):
return _utils.execute("icacls.exe", path)[0]
def _assert_contains_ace(self, path, access_to, access_flags):
raw_out = self._get_raw_icacls_info(path)
# The flags will be matched regardless of
# other flags and their order.
escaped_access_flags = access_flags.replace(
"(", "(?=.*\(").replace(")", r"\))")
pattern = "%s:%s.*" % (access_to, escaped_access_flags)
match = re.findall(pattern, raw_out,
flags=re.IGNORECASE | re.MULTILINE)
if not match:
fail_msg = ("The file does not contain the expected ACL rules. "
"Raw icacls output: %s. Expected access rule: %s")
expected_rule = ":".join([access_to, access_flags])
self.fail(fail_msg % (raw_out, expected_rule))
def test_acls(self):
tmp_suffix = 'oswin-func-test'
tmp_dir = tempfile.mkdtemp(suffix=tmp_suffix)
self.addCleanup(self._pathutils.rmtree, tmp_dir)
tmp_file_paths = []
for idx in range(2):
tmp_file_path = os.path.join(tmp_dir,
'tmp_file_%s' % idx)
with open(tmp_file_path, 'w') as f:
f.write('test')
tmp_file_paths.append(tmp_file_path)
trustee = "NULL SID"
self._pathutils.add_acl_rule(
path=tmp_dir,
trustee_name=trustee,
access_rights=constants.ACE_GENERIC_READ,
access_mode=constants.ACE_GRANT_ACCESS,
inheritance_flags=(constants.ACE_OBJECT_INHERIT |
constants.ACE_CONTAINER_INHERIT))
self._pathutils.add_acl_rule(
path=tmp_file_paths[0],
trustee_name=trustee,
access_rights=constants.ACE_GENERIC_WRITE,
access_mode=constants.ACE_GRANT_ACCESS)
self._pathutils.copy_acls(tmp_file_paths[0], tmp_file_paths[1])
self._assert_contains_ace(tmp_dir, trustee, "(OI)(CI).*(GR)")
for path in tmp_file_paths:
self._assert_contains_ace(path, trustee, ("(W,Rc)"))

View File

@ -0,0 +1,124 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from os_win.tests import test_base
from os_win.utils import _acl_utils
@ddt.ddt
class ACLUtilsTestCase(test_base.OsWinBaseTestCase):
def setUp(self):
super(ACLUtilsTestCase, self).setUp()
self._setup_lib_mocks()
self._acl_utils = _acl_utils.ACLUtils()
self._acl_utils._win32_utils = mock.Mock()
self._mock_run = self._acl_utils._win32_utils.run_and_check_output
def _setup_lib_mocks(self):
self._ctypes = mock.Mock()
self._ctypes.c_wchar_p = lambda x: (x, "c_wchar_p")
self._ctypes.c_uint = lambda x: (x, 'c_uint')
self._ctypes.c_ulong = lambda x: (x, 'c_ulong')
mock.patch.multiple(_acl_utils,
ctypes=self._ctypes,
advapi=mock.DEFAULT,
kernel32=mock.DEFAULT,
create=True).start()
def test_get_void_pp(self):
pp_void = self._acl_utils._get_void_pp()
self.assertEqual(pp_void, self._ctypes.pointer.return_value)
self._ctypes.pointer.assert_called_once_with(
self._ctypes.c_void_p.return_value)
self._ctypes.c_void_p.assert_called_once_with()
@ddt.data(
{'security_info_flags':
(_acl_utils.OWNER_SECURITY_INFORMATION |
_acl_utils.GROUP_SECURITY_INFORMATION |
_acl_utils.DACL_SECURITY_INFORMATION),
'expected_info': ['pp_sid_owner', 'pp_sid_group',
'pp_dacl', 'pp_sec_desc']},
{'security_info_flags': _acl_utils.SACL_SECURITY_INFORMATION,
'expected_info': ['pp_sacl', 'pp_sec_desc']})
@ddt.unpack
@mock.patch.object(_acl_utils.ACLUtils, '_get_void_pp')
def test_get_named_security_info(self, mock_get_void_pp,
security_info_flags,
expected_info):
sec_info = self._acl_utils.get_named_security_info(
mock.sentinel.obj_name,
mock.sentinel.obj_type,
security_info_flags)
self.assertEqual(set(expected_info), set(sec_info.keys()))
for field in expected_info:
self.assertEqual(sec_info[field],
mock_get_void_pp.return_value)
self._mock_run.assert_called_once_with(
_acl_utils.advapi.GetNamedSecurityInfoW,
self._ctypes.c_wchar_p(mock.sentinel.obj_name),
self._ctypes.c_uint(mock.sentinel.obj_type),
self._ctypes.c_uint(security_info_flags),
sec_info.get('pp_sid_owner'),
sec_info.get('pp_sid_group'),
sec_info.get('pp_dacl'),
sec_info.get('pp_sacl'),
sec_info['pp_sec_desc'])
@mock.patch.object(_acl_utils.ACLUtils, '_get_void_pp')
def test_set_entries_in_acl(self, mock_get_void_pp):
new_acl = mock_get_void_pp.return_value
returned_acl = self._acl_utils.set_entries_in_acl(
mock.sentinel.entry_count,
mock.sentinel.entry_list,
mock.sentinel.old_acl)
self.assertEqual(new_acl, returned_acl)
self._mock_run.assert_called_once_with(
_acl_utils.advapi.SetEntriesInAclW,
self._ctypes.c_ulong(mock.sentinel.entry_count),
mock.sentinel.entry_list,
mock.sentinel.old_acl,
new_acl)
mock_get_void_pp.assert_called_once_with()
def test_set_named_security_info(self):
self._acl_utils.set_named_security_info(
mock.sentinel.obj_name,
mock.sentinel.obj_type,
mock.sentinel.security_info_flags,
mock.sentinel.p_sid_owner,
mock.sentinel.p_sid_group,
mock.sentinel.p_dacl,
mock.sentinel.p_sacl)
self._mock_run.assert_called_once_with(
_acl_utils.advapi.SetNamedSecurityInfoW,
self._ctypes.c_wchar_p(mock.sentinel.obj_name),
self._ctypes.c_uint(mock.sentinel.obj_type),
self._ctypes.c_uint(mock.sentinel.security_info_flags),
mock.sentinel.p_sid_owner,
mock.sentinel.p_sid_group,
mock.sentinel.p_dacl,
mock.sentinel.p_sacl)

View File

@ -12,13 +12,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
import os
import shutil
import mock
from os_win import constants
from os_win import exceptions
from os_win.tests import test_base
from os_win.utils import _acl_utils
from os_win.utils import pathutils
@ -31,7 +34,9 @@ class PathUtilsTestCase(test_base.OsWinBaseTestCase):
self._pathutils = pathutils.PathUtils()
self._pathutils._win32_utils = mock.Mock()
self._pathutils._acl_utils = mock.Mock()
self._mock_run = self._pathutils._win32_utils.run_and_check_output
self._acl_utils = self._pathutils._acl_utils
def _setup_lib_mocks(self):
self._ctypes = mock.Mock()
@ -39,10 +44,15 @@ class PathUtilsTestCase(test_base.OsWinBaseTestCase):
self._wintypes.BOOL = lambda x: (x, 'BOOL')
self._ctypes.c_wchar_p = lambda x: (x, "c_wchar_p")
self._ctypes.pointer = lambda x: (x, 'pointer')
self._ctypes_patcher = mock.patch.object(
pathutils, 'ctypes', new=self._ctypes)
self._ctypes_patcher.start()
mock.patch.multiple(pathutils,
wintypes=self._wintypes,
ctypes=self._ctypes, kernel32=mock.DEFAULT,
kernel32=mock.DEFAULT,
create=True).start()
@mock.patch.object(pathutils.PathUtils, 'rename')
@ -211,3 +221,86 @@ class PathUtilsTestCase(test_base.OsWinBaseTestCase):
[mock.call(mock.sentinel.src), mock.call(mock.sentinel.dest)])
mock_copytree.assert_called_once_with(mock.sentinel.src,
mock.sentinel.dest)
def test_add_acl_rule(self):
# We raise an expected exception in order to
# easily verify the resource cleanup.
raised_exc = exceptions.OSWinException
self._ctypes_patcher.stop()
fake_trustee = 'FAKEDOMAIN\\FakeUser'
mock_sec_info = dict(pp_sec_desc=mock.Mock(),
pp_dacl=mock.Mock())
self._acl_utils.get_named_security_info.return_value = mock_sec_info
self._acl_utils.set_named_security_info.side_effect = raised_exc
pp_new_dacl = self._acl_utils.set_entries_in_acl.return_value
self.assertRaises(raised_exc,
self._pathutils.add_acl_rule,
path=mock.sentinel.path,
trustee_name=fake_trustee,
access_rights=constants.ACE_GENERIC_READ,
access_mode=constants.ACE_GRANT_ACCESS,
inheritance_flags=constants.ACE_OBJECT_INHERIT)
self._acl_utils.get_named_security_info.assert_called_once_with(
obj_name=mock.sentinel.path,
obj_type=_acl_utils.SE_FILE_OBJECT,
security_info_flags=_acl_utils.DACL_SECURITY_INFORMATION)
self._acl_utils.set_entries_in_acl.assert_called_once_with(
entry_count=1,
p_explicit_entry_list=mock.ANY,
p_old_acl=mock_sec_info['pp_dacl'].contents)
self._acl_utils.set_named_security_info.assert_called_once_with(
obj_name=mock.sentinel.path,
obj_type=_acl_utils.SE_FILE_OBJECT,
security_info_flags=_acl_utils.DACL_SECURITY_INFORMATION,
p_dacl=pp_new_dacl.contents)
p_access = self._acl_utils.set_entries_in_acl.call_args_list[0][1][
'p_explicit_entry_list']
access = ctypes.cast(
p_access,
ctypes.POINTER(_acl_utils.EXPLICIT_ACCESS)).contents
self.assertEqual(constants.ACE_GENERIC_READ,
access.grfAccessPermissions)
self.assertEqual(constants.ACE_GRANT_ACCESS,
access.grfAccessMode)
self.assertEqual(constants.ACE_OBJECT_INHERIT,
access.grfInheritance)
self.assertEqual(_acl_utils.TRUSTEE_IS_NAME,
access.Trustee.TrusteeForm)
self.assertEqual(fake_trustee,
access.Trustee.pstrName)
self._pathutils._win32_utils.local_free.assert_has_calls(
[mock.call(pointer)
for pointer in [mock_sec_info['pp_sec_desc'].contents,
pp_new_dacl.contents]])
def test_copy_acls(self):
raised_exc = exceptions.OSWinException
mock_sec_info = dict(pp_sec_desc=mock.Mock(),
pp_dacl=mock.Mock())
self._acl_utils.get_named_security_info.return_value = mock_sec_info
self._acl_utils.set_named_security_info.side_effect = raised_exc
self.assertRaises(raised_exc,
self._pathutils.copy_acls,
mock.sentinel.src,
mock.sentinel.dest)
self._acl_utils.get_named_security_info.assert_called_once_with(
obj_name=mock.sentinel.src,
obj_type=_acl_utils.SE_FILE_OBJECT,
security_info_flags=_acl_utils.DACL_SECURITY_INFORMATION)
self._acl_utils.set_named_security_info.assert_called_once_with(
obj_name=mock.sentinel.dest,
obj_type=_acl_utils.SE_FILE_OBJECT,
security_info_flags=_acl_utils.DACL_SECURITY_INFORMATION,
p_dacl=mock_sec_info['pp_dacl'].contents)
self._pathutils._win32_utils.local_free.assert_called_once_with(
mock_sec_info['pp_sec_desc'].contents)

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from oslotest import base
@ -22,6 +23,7 @@ from os_win import exceptions
from os_win.utils import win32utils
@ddt.ddt
class Win32UtilsTestCase(base.BaseTestCase):
def setUp(self):
super(Win32UtilsTestCase, self).setUp()
@ -209,3 +211,14 @@ class Win32UtilsTestCase(base.BaseTestCase):
def get_com_error_hresult_missing_excepinfo(self):
ret_val = self._win32_utils.get_com_error_hresult(None)
self.assertIsNone(ret_val)
@ddt.data(0, 1)
@mock.patch.object(win32utils.LOG, 'exception')
def test_local_free(self, ret_val, mock_log_exc):
mock_localfree = win32utils.kernel32.LocalFree
mock_localfree.return_value = ret_val
self._win32_utils.local_free(mock.sentinel.handle)
mock_localfree.assert_any_call(mock.sentinel.handle)
self.assertEqual(bool(ret_val), mock_log_exc.called)

121
os_win/utils/_acl_utils.py Normal file
View File

@ -0,0 +1,121 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
import sys
from os_win.utils import win32utils
if sys.platform == 'win32':
advapi = ctypes.windll.AdvApi32
OWNER_SECURITY_INFORMATION = 0x00000001
GROUP_SECURITY_INFORMATION = 0x00000002
DACL_SECURITY_INFORMATION = 0x00000004
SACL_SECURITY_INFORMATION = 0x00000008
# Trustee form constants
TRUSTEE_IS_NAME = 1
# Indicates a file or directory object.
SE_FILE_OBJECT = 1
class TRUSTEE(ctypes.Structure):
_fields_ = [('pMultipleTrustee', ctypes.c_void_p),
('MultipleTrusteeOperation', ctypes.c_uint),
('TrusteeForm', ctypes.c_uint),
('TrusteeType', ctypes.c_uint),
('pstrName', ctypes.c_wchar_p)]
class EXPLICIT_ACCESS(ctypes.Structure):
_fields_ = [('grfAccessPermissions', ctypes.c_ulong),
('grfAccessMode', ctypes.c_uint),
('grfInheritance', ctypes.c_ulong),
('Trustee', TRUSTEE)]
class ACLUtils(object):
def __init__(self):
self._win32_utils = win32utils.Win32Utils()
@staticmethod
def _get_void_pp():
return ctypes.pointer(ctypes.c_void_p())
def get_named_security_info(self, obj_name, obj_type, security_info_flags):
"""Retrieve object security information.
:param security_info_flags: specifies which informations will
be retrieved.
:param ret_val: dict, containing pointers to the requested structures.
Note that the returned security descriptor will have
to be freed using LocalFree.
Some requested information may not be present, in
which case the according pointers will be NULL.
"""
sec_info = {}
if security_info_flags & OWNER_SECURITY_INFORMATION:
sec_info['pp_sid_owner'] = self._get_void_pp()
if security_info_flags & GROUP_SECURITY_INFORMATION:
sec_info['pp_sid_group'] = self._get_void_pp()
if security_info_flags & DACL_SECURITY_INFORMATION:
sec_info['pp_dacl'] = self._get_void_pp()
if security_info_flags & SACL_SECURITY_INFORMATION:
sec_info['pp_sacl'] = self._get_void_pp()
sec_info['pp_sec_desc'] = self._get_void_pp()
self._win32_utils.run_and_check_output(
advapi.GetNamedSecurityInfoW,
ctypes.c_wchar_p(obj_name),
ctypes.c_uint(obj_type),
ctypes.c_uint(security_info_flags),
sec_info.get('pp_sid_owner'),
sec_info.get('pp_sid_group'),
sec_info.get('pp_dacl'),
sec_info.get('pp_sacl'),
sec_info['pp_sec_desc'])
return sec_info
def set_entries_in_acl(self, entry_count, p_explicit_entry_list,
p_old_acl):
"""Merge new ACEs into an existing ACL, returing a new ACL."""
pp_new_acl = self._get_void_pp()
self._win32_utils.run_and_check_output(
advapi.SetEntriesInAclW,
ctypes.c_ulong(entry_count),
p_explicit_entry_list,
p_old_acl,
pp_new_acl)
return pp_new_acl
def set_named_security_info(self, obj_name, obj_type, security_info_flags,
p_sid_owner=None, p_sid_group=None,
p_dacl=None, p_sacl=None):
self._win32_utils.run_and_check_output(
advapi.SetNamedSecurityInfoW,
ctypes.c_wchar_p(obj_name),
ctypes.c_uint(obj_type),
ctypes.c_uint(security_info_flags),
p_sid_owner,
p_sid_group,
p_dacl,
p_sacl)

View File

@ -26,6 +26,7 @@ import six
from os_win._i18n import _
from os_win import exceptions
from os_win.utils import _acl_utils
from os_win.utils import win32utils
if sys.platform == 'win32':
@ -42,6 +43,7 @@ class PathUtils(object):
def __init__(self):
self._win32_utils = win32utils.Win32Utils()
self._acl_utils = _acl_utils.ACLUtils()
def open(self, path, mode):
"""Wrapper on __builtin__.open used to simplify unit testing."""
@ -170,3 +172,61 @@ class PathUtils(object):
finally:
if tmp_file_path:
fileutils.delete_if_exists(tmp_file_path)
def add_acl_rule(self, path, trustee_name,
access_rights, access_mode,
inheritance_flags=0):
"""Adds the requested access rule to a file or object.
Can be used for granting/revoking access.
"""
p_to_free = []
try:
sec_info = self._acl_utils.get_named_security_info(
obj_name=path,
obj_type=_acl_utils.SE_FILE_OBJECT,
security_info_flags=_acl_utils.DACL_SECURITY_INFORMATION)
p_to_free.append(sec_info['pp_sec_desc'].contents)
access = _acl_utils.EXPLICIT_ACCESS()
access.grfAccessPermissions = access_rights
access.grfAccessMode = access_mode
access.grfInheritance = inheritance_flags
access.Trustee.TrusteeForm = _acl_utils.TRUSTEE_IS_NAME
access.Trustee.pstrName = ctypes.c_wchar_p(trustee_name)
pp_new_dacl = self._acl_utils.set_entries_in_acl(
entry_count=1,
p_explicit_entry_list=ctypes.pointer(access),
p_old_acl=sec_info['pp_dacl'].contents)
p_to_free.append(pp_new_dacl.contents)
self._acl_utils.set_named_security_info(
obj_name=path,
obj_type=_acl_utils.SE_FILE_OBJECT,
security_info_flags=_acl_utils.DACL_SECURITY_INFORMATION,
p_dacl=pp_new_dacl.contents)
finally:
for p in p_to_free:
self._win32_utils.local_free(p)
def copy_acls(self, source_path, dest_path):
p_to_free = []
try:
sec_info_flags = _acl_utils.DACL_SECURITY_INFORMATION
sec_info = self._acl_utils.get_named_security_info(
obj_name=source_path,
obj_type=_acl_utils.SE_FILE_OBJECT,
security_info_flags=sec_info_flags)
p_to_free.append(sec_info['pp_sec_desc'].contents)
self._acl_utils.set_named_security_info(
obj_name=dest_path,
obj_type=_acl_utils.SE_FILE_OBJECT,
security_info_flags=sec_info_flags,
p_dacl=sec_info['pp_dacl'].contents)
finally:
for p in p_to_free:
self._win32_utils.local_free(p)

View File

@ -19,6 +19,7 @@ import sys
from oslo_log import log as logging
from os_win._i18n import _LE
from os_win import _utils
from os_win import exceptions
@ -128,3 +129,10 @@ class Win32Utils(object):
return ctypes.c_uint(com_error.excepinfo[5]).value
except Exception:
LOG.debug("Unable to retrieve COM error hresult: %s", com_error)
def local_free(self, handle):
try:
self._run_and_check_output(kernel32.LocalFree, handle)
except exceptions.Win32Exception:
LOG.exception(_LE("Could not deallocate memory. "
"There could be a memory leak."))