Windows SMB: implement 'update_access' method
This change implements the 'update_access' method within the Windows SMB driver with respect to the semantics specified in the base driver. The 'allow_access' and 'deny_access' methods are now removed. Closes-Bug: #1618556 Change-Id: I2b52c48331a2c94ec4dcf34779cac5d1b73a121b
This commit is contained in:
parent
d6fb9e5064
commit
a3579839f9
@ -18,8 +18,6 @@ import os
|
||||
from oslo_log import log
|
||||
from oslo_utils import units
|
||||
|
||||
from manila.common import constants as const
|
||||
from manila import exception
|
||||
from manila.i18n import _LW
|
||||
from manila.share import driver as base_driver
|
||||
from manila.share.drivers import generic
|
||||
@ -164,22 +162,3 @@ class WindowsSMBDriver(generic.GenericShareDriver):
|
||||
disk_number = self._windows_utils.get_disk_number_by_mount_path(
|
||||
server_details, mount_path)
|
||||
return disk_number is not None
|
||||
|
||||
@generic.ensure_server
|
||||
def allow_access(self, context, share, access, share_server=None):
|
||||
"""Allow access to the share."""
|
||||
|
||||
# NOTE(vponomaryov): use direct verification for case some additional
|
||||
# level is added.
|
||||
access_level = access['access_level']
|
||||
if access_level not in (const.ACCESS_LEVEL_RW, const.ACCESS_LEVEL_RO):
|
||||
raise exception.InvalidShareAccessLevel(level=access_level)
|
||||
self._get_helper(share).allow_access(
|
||||
share_server['backend_details'], share['name'],
|
||||
access['access_type'], access['access_level'], access['access_to'])
|
||||
|
||||
@generic.ensure_server
|
||||
def deny_access(self, context, share, access, share_server=None):
|
||||
"""Deny access to the share."""
|
||||
self._get_helper(share).deny_access(
|
||||
share_server['backend_details'], share['name'], access)
|
||||
|
@ -13,13 +13,14 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import os
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
from manila.common import constants
|
||||
from manila import exception
|
||||
from manila.i18n import _, _LI
|
||||
from manila.i18n import _LI, _LW
|
||||
from manila.share.drivers import helpers
|
||||
from manila.share.drivers.windows import windows_utils
|
||||
|
||||
@ -33,6 +34,27 @@ class WindowsSMBHelper(helpers.NASHelperBase):
|
||||
|
||||
_NULL_SID = "S-1-0-0"
|
||||
|
||||
_WIN_ACL_ALLOW = 0
|
||||
_WIN_ACL_DENY = 1
|
||||
|
||||
_WIN_ACCESS_RIGHT_FULL = 0
|
||||
_WIN_ACCESS_RIGHT_CHANGE = 1
|
||||
_WIN_ACCESS_RIGHT_READ = 2
|
||||
_WIN_ACCESS_RIGHT_CUSTOM = 3
|
||||
|
||||
_ACCESS_LEVEL_CUSTOM = 'custom'
|
||||
|
||||
_WIN_ACL_MAP = {
|
||||
_WIN_ACCESS_RIGHT_CHANGE: constants.ACCESS_LEVEL_RW,
|
||||
_WIN_ACCESS_RIGHT_FULL: constants.ACCESS_LEVEL_RW,
|
||||
_WIN_ACCESS_RIGHT_READ: constants.ACCESS_LEVEL_RO,
|
||||
_WIN_ACCESS_RIGHT_CUSTOM: _ACCESS_LEVEL_CUSTOM,
|
||||
}
|
||||
|
||||
_SUPPORTED_ACCESS_LEVELS = (constants.ACCESS_LEVEL_RO,
|
||||
constants.ACCESS_LEVEL_RW)
|
||||
_SUPPORTED_ACCESS_TYPES = ('user', )
|
||||
|
||||
def __init__(self, remote_execute, configuration):
|
||||
self._remote_exec = remote_execute
|
||||
self.configuration = configuration
|
||||
@ -74,36 +96,149 @@ class WindowsSMBHelper(helpers.NASHelperBase):
|
||||
server, share_path)
|
||||
return volume_path
|
||||
|
||||
def allow_access(self, server, share_name, access_type, access_level,
|
||||
access_to):
|
||||
"""Add access for share."""
|
||||
if access_type != 'user':
|
||||
reason = _('Only user access type allowed.')
|
||||
raise exception.InvalidShareAccess(reason=reason)
|
||||
def _get_acls(self, server, share_name):
|
||||
cmd = ('Get-SmbShareAccess -Name %(share_name)s | '
|
||||
'Select-Object @("Name", "AccountName", '
|
||||
'"AccessControlType", "AccessRight") | '
|
||||
'ConvertTo-JSON -Compress' % {'share_name': share_name})
|
||||
(out, err) = self._remote_exec(server, cmd)
|
||||
|
||||
self._grant_share_access(server, share_name, access_level, access_to)
|
||||
if not out.strip():
|
||||
return []
|
||||
|
||||
raw_acls = json.loads(out)
|
||||
if isinstance(raw_acls, dict):
|
||||
return [raw_acls]
|
||||
return raw_acls
|
||||
|
||||
def get_access_rules(self, server, share_name):
|
||||
raw_acls = self._get_acls(server, share_name)
|
||||
acls = []
|
||||
|
||||
for raw_acl in raw_acls:
|
||||
access_to = raw_acl['AccountName']
|
||||
access_right = raw_acl['AccessRight']
|
||||
access_level = self._WIN_ACL_MAP[access_right]
|
||||
access_allow = raw_acl["AccessControlType"] == self._WIN_ACL_ALLOW
|
||||
|
||||
if not access_allow:
|
||||
if access_to.lower() == 'everyone' and len(raw_acls) == 1:
|
||||
LOG.debug("No access rules are set yet for share %s",
|
||||
share_name)
|
||||
else:
|
||||
LOG.warning(
|
||||
_LW("Found explicit deny ACE rule that was not "
|
||||
"created by Manila and will be ignored: %s"),
|
||||
raw_acl)
|
||||
continue
|
||||
if access_level == self._ACCESS_LEVEL_CUSTOM:
|
||||
LOG.warning(
|
||||
_LW("Found 'custom' ACE rule that will be ignored: %s"),
|
||||
raw_acl)
|
||||
continue
|
||||
elif access_right == self._WIN_ACCESS_RIGHT_FULL:
|
||||
LOG.warning(
|
||||
_LW("Account '%(access_to)s' was given full access "
|
||||
"right on share %(share_name)s. Manila only "
|
||||
"grants 'change' access."),
|
||||
{'access_to': access_to,
|
||||
'share_name': share_name})
|
||||
|
||||
acl = {
|
||||
'access_to': access_to,
|
||||
'access_level': access_level,
|
||||
'access_type': 'user',
|
||||
}
|
||||
acls.append(acl)
|
||||
return acls
|
||||
|
||||
def _grant_share_access(self, server, share_name, access_level, access_to):
|
||||
access_right = self._SHARE_ACCESS_RIGHT_MAP[access_level]
|
||||
cmd = ["Grant-SmbShareAccess", "-Name", share_name,
|
||||
"-AccessRight", access_right,
|
||||
"-AccountName", access_to, "-Force"]
|
||||
"-AccountName", "'%s'" % access_to, "-Force"]
|
||||
self._remote_exec(server, cmd)
|
||||
self._refresh_acl(server, share_name)
|
||||
LOG.info(_LI("Granted %(access_level)s access to '%(access_to)s' "
|
||||
"on share %(share_name)s"),
|
||||
{'access_level': access_level,
|
||||
'access_to': access_to,
|
||||
'share_name': share_name})
|
||||
|
||||
def _refresh_acl(self, server, share_name):
|
||||
cmd = ['Set-SmbPathAcl', '-ShareName', share_name]
|
||||
self._remote_exec(server, cmd)
|
||||
|
||||
def deny_access(self, server, share_name, access, force=False):
|
||||
access_to = access['access_to']
|
||||
self._revoke_share_access(server, share_name, access_to)
|
||||
|
||||
def _revoke_share_access(self, server, share_name, access_to):
|
||||
cmd = ['Revoke-SmbShareAccess', '-Name', share_name,
|
||||
'-AccountName', access_to, '-Force']
|
||||
'-AccountName', '"%s"' % access_to, '-Force']
|
||||
self._remote_exec(server, cmd)
|
||||
self._refresh_acl(server, share_name)
|
||||
LOG.info(_LI("Revoked access to '%(access_to)s' "
|
||||
"on share %(share_name)s"),
|
||||
{'access_to': access_to,
|
||||
'share_name': share_name})
|
||||
|
||||
def update_access(self, server, share_name, access_rules, add_rules,
|
||||
delete_rules):
|
||||
self.validate_access_rules(
|
||||
access_rules + add_rules,
|
||||
self._SUPPORTED_ACCESS_TYPES,
|
||||
self._SUPPORTED_ACCESS_LEVELS)
|
||||
|
||||
if not (add_rules or delete_rules):
|
||||
existing_rules = self.get_access_rules(server, share_name)
|
||||
add_rules, delete_rules = self._get_rule_updates(
|
||||
existing_rules=existing_rules,
|
||||
requested_rules=access_rules)
|
||||
LOG.debug(("Missing rules: %(add_rules)s, "
|
||||
"superfluous rules: %(delete_rules)s"),
|
||||
{'add_rules': add_rules,
|
||||
'delete_rules': delete_rules})
|
||||
|
||||
# Some rules may have changed, so we'll
|
||||
# treat the deleted rules first.
|
||||
for deleted_rule in delete_rules:
|
||||
try:
|
||||
self.validate_access_rules(
|
||||
[deleted_rule],
|
||||
self._SUPPORTED_ACCESS_TYPES,
|
||||
self._SUPPORTED_ACCESS_LEVELS)
|
||||
except (exception.InvalidShareAccess,
|
||||
exception.InvalidShareAccessLevel):
|
||||
# This check will allow invalid rules to be deleted.
|
||||
LOG.warning(_LW(
|
||||
"Unsupported access level %(level)s or access type "
|
||||
"%(type)s, skipping removal of access rule to "
|
||||
"%(to)s.") % {'level': deleted_rule['access_level'],
|
||||
'type': deleted_rule['access_type'],
|
||||
'to': deleted_rule['access_to']})
|
||||
continue
|
||||
self._revoke_share_access(server, share_name,
|
||||
deleted_rule['access_to'])
|
||||
|
||||
for added_rule in add_rules:
|
||||
self._grant_share_access(server, share_name,
|
||||
added_rule['access_level'],
|
||||
added_rule['access_to'])
|
||||
|
||||
def _subtract_access_rules(self, access_rules, subtracted_rules):
|
||||
# Account names are case insensitive on Windows.
|
||||
filter_rules = lambda rules: [
|
||||
{'access_to': access_rule['access_to'].lower(),
|
||||
'access_level': access_rule['access_level'],
|
||||
'access_type': access_rule['access_type']}
|
||||
for access_rule in rules]
|
||||
|
||||
return [rule for rule in filter_rules(access_rules)
|
||||
if rule not in filter_rules(subtracted_rules)]
|
||||
|
||||
def _get_rule_updates(self, existing_rules, requested_rules):
|
||||
added_rules = self._subtract_access_rules(requested_rules,
|
||||
existing_rules)
|
||||
deleted_rules = self._subtract_access_rules(existing_rules,
|
||||
requested_rules)
|
||||
return added_rules, deleted_rules
|
||||
|
||||
def _get_share_name(self, export_location):
|
||||
return self._windows_utils.normalize_path(
|
||||
|
@ -18,8 +18,6 @@ import mock
|
||||
|
||||
import os
|
||||
|
||||
from manila.common import constants as const
|
||||
from manila import exception
|
||||
from manila.share import configuration
|
||||
from manila.share.drivers import generic
|
||||
from manila.share.drivers.windows import service_instance
|
||||
@ -281,41 +279,3 @@ class WindowsSMBDriverTestCase(test.TestCase):
|
||||
mock.sentinel.server,
|
||||
mock.sentinel.mount_path)
|
||||
self.assertEqual(expected_result, is_mounted)
|
||||
|
||||
@ddt.data(const.ACCESS_LEVEL_RW, const.ACCESS_LEVEL_RO)
|
||||
def test_allow_access(self, access_level):
|
||||
access = {
|
||||
'access_type': 'ip',
|
||||
'access_to': 'fake_dest',
|
||||
'access_level': access_level,
|
||||
}
|
||||
self._drv.allow_access(
|
||||
mock.sentinel.context, self._share, access,
|
||||
share_server=self._share_server)
|
||||
|
||||
self._smb_helper.allow_access.assert_called_once_with(
|
||||
self._share_server['backend_details'],
|
||||
self._share['name'],
|
||||
access['access_type'], access['access_level'],
|
||||
access['access_to'])
|
||||
|
||||
def test_allow_access_unsupported(self):
|
||||
access = {
|
||||
'access_type': 'ip',
|
||||
'access_to': 'fake_dest',
|
||||
'access_level': 'fakefoobar',
|
||||
}
|
||||
self.assertRaises(
|
||||
exception.InvalidShareAccessLevel,
|
||||
self._drv.allow_access,
|
||||
mock.sentinel.context, self._share, access,
|
||||
share_server=self._share_server)
|
||||
|
||||
def test_deny_access(self):
|
||||
access = 'fake_access'
|
||||
self._drv.deny_access(
|
||||
mock.sentinel.context, self._share, access,
|
||||
share_server=self._share_server)
|
||||
self._smb_helper.deny_access.assert_called_once_with(
|
||||
self._share_server['backend_details'],
|
||||
self._share['name'], access)
|
||||
|
@ -41,6 +41,12 @@ class WindowsSMBHelperTestCase(test.TestCase):
|
||||
_FAKE_SHARE_LOCATION = os.path.join(
|
||||
configuration.Configuration(None).share_mount_path,
|
||||
_FAKE_SHARE_NAME)
|
||||
_FAKE_ACCOUNT_NAME = 'FakeDomain\\FakeUser'
|
||||
_FAKE_RW_ACC_RULE = {
|
||||
'access_to': _FAKE_ACCOUNT_NAME,
|
||||
'access_level': constants.ACCESS_LEVEL_RW,
|
||||
'access_type': 'user',
|
||||
}
|
||||
|
||||
def setUp(self):
|
||||
self._remote_exec = mock.Mock()
|
||||
@ -103,26 +109,62 @@ class WindowsSMBHelperTestCase(test.TestCase):
|
||||
|
||||
self.assertEqual(mock_get_vol_path.return_value, volume_path)
|
||||
|
||||
@ddt.data('ip', 'user')
|
||||
@ddt.data({'raw_out': '', 'expected': []},
|
||||
{'raw_out': '{"key": "val"}',
|
||||
'expected': [{"key": "val"}]},
|
||||
{'raw_out': '[{"key": "val"}, {"key2": "val2"}]',
|
||||
'expected': [{"key": "val"}, {"key2": "val2"}]})
|
||||
@ddt.unpack
|
||||
def test_get_acls_helper(self, raw_out, expected):
|
||||
self._remote_exec.return_value = (raw_out, mock.sentinel.err)
|
||||
|
||||
rules = self._win_smb_helper._get_acls(mock.sentinel.server,
|
||||
self._FAKE_SHARE_NAME)
|
||||
|
||||
self.assertEqual(expected, rules)
|
||||
expected_cmd = (
|
||||
'Get-SmbShareAccess -Name %s | '
|
||||
'Select-Object @("Name", "AccountName", '
|
||||
'"AccessControlType", "AccessRight") | '
|
||||
'ConvertTo-JSON -Compress') % self._FAKE_SHARE_NAME
|
||||
self._remote_exec.assert_called_once_with(mock.sentinel.server,
|
||||
expected_cmd)
|
||||
|
||||
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
|
||||
'_grant_share_access')
|
||||
def test_allow_access(self, access_type, mock_grant_share_access):
|
||||
mock_args = (mock.sentinel.server, mock.sentinel.share_name,
|
||||
access_type, mock.sentinel.access_level,
|
||||
mock.sentinel.username)
|
||||
'_get_acls')
|
||||
def test_get_access_rules(self, mock_get_acls):
|
||||
helper = self._win_smb_helper
|
||||
valid_acl = {
|
||||
'AccountName': self._FAKE_ACCOUNT_NAME,
|
||||
'AccessRight': helper._WIN_ACCESS_RIGHT_FULL,
|
||||
'AccessControlType': helper._WIN_ACL_ALLOW,
|
||||
}
|
||||
|
||||
if access_type != 'user':
|
||||
self.assertRaises(exception.InvalidShareAccess,
|
||||
self._win_smb_helper.allow_access,
|
||||
*mock_args)
|
||||
else:
|
||||
self._win_smb_helper.allow_access(*mock_args)
|
||||
valid_acls = [valid_acl,
|
||||
dict(valid_acl,
|
||||
AccessRight=helper._WIN_ACCESS_RIGHT_CHANGE),
|
||||
dict(valid_acl,
|
||||
AccessRight=helper._WIN_ACCESS_RIGHT_READ)]
|
||||
# Those are rules that were not added by us and are expected to
|
||||
# be ignored. When encountering such a rule, a warning message
|
||||
# will be logged.
|
||||
ignored_acls = [
|
||||
dict(valid_acl, AccessRight=helper._WIN_ACCESS_RIGHT_CUSTOM),
|
||||
dict(valid_acl, AccessControlType=helper._WIN_ACL_DENY)]
|
||||
|
||||
mock_grant_share_access.assert_called_once_with(
|
||||
mock.sentinel.server,
|
||||
mock.sentinel.share_name,
|
||||
mock.sentinel.access_level,
|
||||
mock.sentinel.username)
|
||||
mock_get_acls.return_value = valid_acls + ignored_acls
|
||||
# There won't be multiple access rules for the same account,
|
||||
# but we'll ignore this fact for the sake of this test.
|
||||
expected_rules = [self._FAKE_RW_ACC_RULE, self._FAKE_RW_ACC_RULE,
|
||||
dict(self._FAKE_RW_ACC_RULE,
|
||||
access_level=constants.ACCESS_LEVEL_RO)]
|
||||
|
||||
rules = helper.get_access_rules(mock.sentinel.server,
|
||||
mock.sentinel.share_name)
|
||||
self.assertEqual(expected_rules, rules)
|
||||
|
||||
mock_get_acls.assert_called_once_with(mock.sentinel.server,
|
||||
mock.sentinel.share_name)
|
||||
|
||||
@mock.patch.object(windows_smb_helper.WindowsSMBHelper, '_refresh_acl')
|
||||
def test_grant_share_access(self, mock_refresh_acl):
|
||||
@ -133,7 +175,7 @@ class WindowsSMBHelperTestCase(test.TestCase):
|
||||
|
||||
cmd = ["Grant-SmbShareAccess", "-Name", mock.sentinel.share_name,
|
||||
"-AccessRight", "Change",
|
||||
"-AccountName", mock.sentinel.username, "-Force"]
|
||||
"-AccountName", "'%s'" % mock.sentinel.username, "-Force"]
|
||||
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
|
||||
mock_refresh_acl.assert_called_once_with(mock.sentinel.server,
|
||||
mock.sentinel.share_name)
|
||||
@ -145,20 +187,6 @@ class WindowsSMBHelperTestCase(test.TestCase):
|
||||
cmd = ['Set-SmbPathAcl', '-ShareName', mock.sentinel.share_name]
|
||||
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
|
||||
|
||||
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
|
||||
'_revoke_share_access')
|
||||
def test_deny_access(self, mock_revoke_share_access):
|
||||
mock_access = {'access_to': mock.sentinel.username}
|
||||
|
||||
self._win_smb_helper.deny_access(mock.sentinel.server,
|
||||
mock.sentinel.share_name,
|
||||
mock_access)
|
||||
|
||||
mock_revoke_share_access.assert_called_once_with(
|
||||
mock.sentinel.server,
|
||||
mock.sentinel.share_name,
|
||||
mock.sentinel.username)
|
||||
|
||||
@mock.patch.object(windows_smb_helper.WindowsSMBHelper, '_refresh_acl')
|
||||
def test_revoke_share_access(self, mock_refresh_acl):
|
||||
self._win_smb_helper._revoke_share_access(mock.sentinel.server,
|
||||
@ -166,11 +194,138 @@ class WindowsSMBHelperTestCase(test.TestCase):
|
||||
mock.sentinel.username)
|
||||
|
||||
cmd = ["Revoke-SmbShareAccess", "-Name", mock.sentinel.share_name,
|
||||
"-AccountName", mock.sentinel.username, "-Force"]
|
||||
"-AccountName", '"%s"' % mock.sentinel.username, "-Force"]
|
||||
self._remote_exec.assert_called_once_with(mock.sentinel.server, cmd)
|
||||
mock_refresh_acl.assert_called_once_with(mock.sentinel.server,
|
||||
mock.sentinel.share_name)
|
||||
|
||||
def test_update_access_invalid_type(self):
|
||||
invalid_access_rule = dict(self._FAKE_RW_ACC_RULE,
|
||||
access_type='ip')
|
||||
self.assertRaises(
|
||||
exception.InvalidShareAccess,
|
||||
self._win_smb_helper.update_access,
|
||||
mock.sentinel.server, mock.sentinel.share_name,
|
||||
[invalid_access_rule], [], [])
|
||||
|
||||
def test_update_access_invalid_level(self):
|
||||
invalid_access_rule = dict(self._FAKE_RW_ACC_RULE,
|
||||
access_level='fake_level')
|
||||
self.assertRaises(
|
||||
exception.InvalidShareAccessLevel,
|
||||
self._win_smb_helper.update_access,
|
||||
mock.sentinel.server, mock.sentinel.share_name,
|
||||
[], [invalid_access_rule], [])
|
||||
|
||||
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
|
||||
'_revoke_share_access')
|
||||
def test_update_access_deleting_invalid_rule(self, mock_revoke):
|
||||
# We want to make sure that we allow deleting invalid rules.
|
||||
invalid_access_rule = dict(self._FAKE_RW_ACC_RULE,
|
||||
access_level='fake_level')
|
||||
delete_rules = [invalid_access_rule, self._FAKE_RW_ACC_RULE]
|
||||
|
||||
self._win_smb_helper.update_access(
|
||||
mock.sentinel.server, mock.sentinel.share_name,
|
||||
[], [], delete_rules)
|
||||
|
||||
mock_revoke.assert_called_once_with(
|
||||
mock.sentinel.server, mock.sentinel.share_name,
|
||||
self._FAKE_RW_ACC_RULE['access_to'])
|
||||
|
||||
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
|
||||
'validate_access_rules')
|
||||
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
|
||||
'get_access_rules')
|
||||
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
|
||||
'_grant_share_access')
|
||||
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
|
||||
'_revoke_share_access')
|
||||
def test_update_access(self, mock_revoke, mock_grant,
|
||||
mock_get_access_rules, mock_validate):
|
||||
added_rules = [mock.MagicMock(), mock.MagicMock()]
|
||||
deleted_rules = [mock.MagicMock(), mock.MagicMock()]
|
||||
|
||||
self._win_smb_helper.update_access(
|
||||
mock.sentinel.server, mock.sentinel.share_name,
|
||||
[], added_rules, deleted_rules)
|
||||
|
||||
mock_revoke.assert_has_calls(
|
||||
[mock.call(mock.sentinel.server, mock.sentinel.share_name,
|
||||
deleted_rule['access_to'])
|
||||
for deleted_rule in deleted_rules])
|
||||
|
||||
mock_grant.assert_has_calls(
|
||||
[mock.call(mock.sentinel.server, mock.sentinel.share_name,
|
||||
added_rule['access_level'], added_rule['access_to'])
|
||||
for added_rule in added_rules])
|
||||
|
||||
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
|
||||
'_get_rule_updates')
|
||||
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
|
||||
'validate_access_rules')
|
||||
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
|
||||
'get_access_rules')
|
||||
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
|
||||
'_grant_share_access')
|
||||
@mock.patch.object(windows_smb_helper.WindowsSMBHelper,
|
||||
'_revoke_share_access')
|
||||
def test_update_access_maintenance(
|
||||
self, mock_revoke, mock_grant,
|
||||
mock_get_access_rules, mock_validate,
|
||||
mock_get_rule_updates):
|
||||
all_rules = mock.MagicMock()
|
||||
added_rules = [mock.MagicMock(), mock.MagicMock()]
|
||||
deleted_rules = [mock.MagicMock(), mock.MagicMock()]
|
||||
|
||||
mock_get_rule_updates.return_value = [
|
||||
added_rules, deleted_rules]
|
||||
|
||||
self._win_smb_helper.update_access(
|
||||
mock.sentinel.server, mock.sentinel.share_name,
|
||||
all_rules, [], [])
|
||||
|
||||
mock_get_access_rules.assert_called_once_with(
|
||||
mock.sentinel.server, mock.sentinel.share_name)
|
||||
mock_get_rule_updates.assert_called_once_with(
|
||||
existing_rules=mock_get_access_rules.return_value,
|
||||
requested_rules=all_rules)
|
||||
mock_revoke.assert_has_calls(
|
||||
[mock.call(mock.sentinel.server, mock.sentinel.share_name,
|
||||
deleted_rule['access_to'])
|
||||
for deleted_rule in deleted_rules])
|
||||
|
||||
mock_grant.assert_has_calls(
|
||||
[mock.call(mock.sentinel.server, mock.sentinel.share_name,
|
||||
added_rule['access_level'], added_rule['access_to'])
|
||||
for added_rule in added_rules])
|
||||
|
||||
def test_get_rule_updates(self):
|
||||
req_rule_0 = self._FAKE_RW_ACC_RULE
|
||||
req_rule_1 = dict(self._FAKE_RW_ACC_RULE,
|
||||
access_to='fake_acc')
|
||||
|
||||
curr_rule_0 = dict(self._FAKE_RW_ACC_RULE,
|
||||
access_to=self._FAKE_RW_ACC_RULE[
|
||||
'access_to'].upper())
|
||||
curr_rule_1 = dict(self._FAKE_RW_ACC_RULE,
|
||||
access_to='fake_acc2')
|
||||
curr_rule_2 = dict(req_rule_1,
|
||||
access_level=constants.ACCESS_LEVEL_RO)
|
||||
|
||||
expected_added_rules = [req_rule_1]
|
||||
expected_deleted_rules = [curr_rule_1, curr_rule_2]
|
||||
|
||||
existing_rules = [curr_rule_0, curr_rule_1, curr_rule_2]
|
||||
requested_rules = [req_rule_0, req_rule_1]
|
||||
|
||||
(added_rules,
|
||||
deleted_rules) = self._win_smb_helper._get_rule_updates(
|
||||
existing_rules, requested_rules)
|
||||
|
||||
self.assertEqual(expected_added_rules, added_rules)
|
||||
self.assertEqual(expected_deleted_rules, deleted_rules)
|
||||
|
||||
def test_get_share_name(self):
|
||||
result = self._win_smb_helper._get_share_name(self._FAKE_SHARE)
|
||||
self.assertEqual(self._FAKE_SHARE_NAME, result)
|
||||
|
Loading…
Reference in New Issue
Block a user