EMC Isilon Driver Support For CIFS Read-Only Share

Added support for read-only shares to the CIFS protocol in the Isilon
driver.

Change-Id: I0dacb5538071af31cd07a187fead1b34e19db58d
Implements: blueprint emc-isilon-driver-readonly-share
This commit is contained in:
Shaun Edwards 2016-02-18 14:07:14 -08:00
parent 220aa7a618
commit 5b6d52487d
5 changed files with 431 additions and 34 deletions

View File

@ -82,7 +82,7 @@ Mapping of share drivers and share access rules support
+----------------------------------------+--------------+----------------+------------+--------------+----------------+------------+
| EMC VNX | NFS (J) | CIFS (J) | \- | NFS (L) | CIFS (L) | \- |
+----------------------------------------+--------------+----------------+------------+--------------+----------------+------------+
| EMC Isilon | NFS,CIFS (K) | \- | \- | NFS (M) | \- | \- |
| EMC Isilon | NFS,CIFS (K) | CIFS (M) | \- | NFS (M) | CIFS (M) | \- |
+----------------------------------------+--------------+----------------+------------+--------------+----------------+------------+
| Red Hat GlusterFS | NFS (J) | \- | \- | \- | \- | \- |
+----------------------------------------+--------------+----------------+------------+--------------+----------------+------------+

View File

@ -17,7 +17,6 @@
Isilon specific NAS backend plugin.
"""
import os
from oslo_config import cfg
from oslo_log import log
from oslo_utils import units
@ -181,21 +180,10 @@ class IsilonStorageConnection(base.StorageConnection):
def allow_access(self, context, share, access, share_server):
"""Allow access to the share."""
# TODO(sedwards): Look into supporting ro/rw access to shares
if access['access_type'] != 'ip':
message = _('Only ip access type allowed.')
LOG.error(message)
raise exception.ShareBackendException(msg=message)
access_ip = access['access_to']
if share['share_proto'] == 'NFS':
export_path = self._get_container_path(share)
self._nfs_allow_access(
access_ip, export_path, access['access_level'])
self._nfs_allow_access(share, access)
elif share['share_proto'] == 'CIFS':
self._cifs_allow_access(
access_ip, share, access['access_level'])
self._cifs_allow_access(share, access)
else:
message = _(
'Unsupported share protocol: %s. Only "NFS" and '
@ -204,9 +192,18 @@ class IsilonStorageConnection(base.StorageConnection):
LOG.error(message)
raise exception.InvalidShare(reason=message)
def _nfs_allow_access(self, access_ip, export_path, access_level):
def _nfs_allow_access(self, share, access):
"""Allow access to nfs share."""
access_type = access['access_type']
if access_type != 'ip':
message = _('Only "ip" access type allowed for the NFS'
'protocol.')
LOG.error(message)
raise exception.InvalidShareAccess(reason=message)
export_path = self._get_container_path(share)
access_ip = access['access_to']
access_level = access['access_level']
share_id = self._isilon_api.lookup_nfs_export(export_path)
share_access_group = 'clients'
@ -227,9 +224,22 @@ class IsilonStorageConnection(base.StorageConnection):
resp = self._isilon_api.request('PUT', url, data=export_params)
resp.raise_for_status()
def _cifs_allow_access(self, ip, share, access_level):
"""Allow access to cifs share."""
def _cifs_allow_access(self, share, access):
access_type = access['access_type']
access_to = access['access_to']
access_level = access['access_level']
if access_type == 'ip':
access_ip = access['access_to']
self._cifs_allow_access_ip(access_ip, share, access_level)
elif access_type == 'user':
self._cifs_allow_access_user(access_to, share, access_level)
else:
message = _('Only "ip" and "user" access types allowed for '
'CIFS protocol.')
LOG.error(message)
raise exception.InvalidShareAccess(reason=message)
def _cifs_allow_access_ip(self, ip, share, access_level):
if access_level == const.ACCESS_LEVEL_RO:
message = _('Only RW Access allowed for CIFS Protocol when using '
'the "ip" access type.')
@ -247,22 +257,34 @@ class IsilonStorageConnection(base.StorageConnection):
r = self._isilon_api.request('PUT', url, data=data)
r.raise_for_status()
def _cifs_allow_access_user(self, user, share, access_level):
if access_level == const.ACCESS_LEVEL_RW:
smb_permission = isilon_api.SmbPermission.rw
elif access_level == const.ACCESS_LEVEL_RO:
smb_permission = isilon_api.SmbPermission.ro
else:
message = _('Only "RW" and "RO" access levels are supported.')
LOG.error(message)
raise exception.InvalidShareAccess(reason=message)
self._isilon_api.smb_permissions_add(share['name'], user,
smb_permission)
def deny_access(self, context, share, access, share_server):
"""Deny access to the share."""
if share['share_proto'] == 'NFS':
self._nfs_deny_access(share, access)
elif share['share_proto'] == 'CIFS':
self._cifs_deny_access(share, access)
def _nfs_deny_access(self, share, access):
"""Deny access to nfs share."""
if access['access_type'] != 'ip':
return
denied_ip = access['access_to']
access_level = access['access_level']
if share['share_proto'] == 'NFS':
self._nfs_deny_access(denied_ip, share, access_level)
elif share['share_proto'] == 'CIFS':
self._cifs_deny_access(denied_ip, share)
def _nfs_deny_access(self, denied_ip, share, access_level):
"""Deny access to nfs share."""
share_access_group = 'clients'
if access_level == const.ACCESS_LEVEL_RO:
share_access_group = 'read_only_clients'
@ -305,7 +327,20 @@ class IsilonStorageConnection(base.StorageConnection):
return export
def _cifs_deny_access(self, denied_ip, share):
def _cifs_deny_access(self, share, access):
access_type = access['access_type']
if access_type == 'ip':
self._cifs_deny_access_ip(access['access_to'], share)
elif access_type == 'user':
self._cifs_deny_access_user(share, access)
else:
message = _('Access type for CIFS deny access request was '
'"%(access_type)s". Only "user" and "ip" access types '
'are supported for CIFS protocol access.') % {
'access_type': access_type}
LOG.warning(message)
def _cifs_deny_access_ip(self, denied_ip, share):
"""Deny access to cifs share."""
share_json = self._isilon_api.lookup_smb_share(share['name'])
@ -319,6 +354,10 @@ class IsilonStorageConnection(base.StorageConnection):
resp = self._isilon_api.request('PUT', url, data=share_params)
resp.raise_for_status()
def _cifs_deny_access_user(self, share, access):
self._isilon_api.smb_permissions_remove(share['name'], access[
'access_to'])
def check_for_setup_error(self):
"""Check for setup error."""

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from enum import Enum
from oslo_serialization import jsonutils
import requests
import six
@ -164,7 +165,7 @@ class IsilonApi(object):
otherwise
"""
data = {}
data = {'permissions': []}
data['name'] = share_name
data['path'] = share_path
url = self.host_url + '/platform/1/protocols/smb/shares'
@ -264,9 +265,73 @@ class IsilonApi(object):
quota_id = quota_json['id']
self.quota_modify_size(quota_id, size)
def smb_permissions_add(self, share_name, user, smb_permission):
smb_share = self.lookup_smb_share(share_name)
permissions = smb_share['permissions']
# lookup given user string
user_json = self.auth_lookup_user(user)
auth_mappings = user_json['mapping']
if len(auth_mappings) > 1:
message = (_('More than one mapping found for user "%(user)s".')
% {'user': user})
raise exception.ShareBackendException(msg=message)
user_sid = auth_mappings[0]['user']['sid']
new_permission = {
'permission': smb_permission.value,
'permission_type': 'allow',
'trustee': user_sid
}
url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
self.host_url, share_name)
new_permissions = list(permissions)
new_permissions.append(new_permission)
data = {'permissions': new_permissions}
r = self.request('PUT', url, data=data)
r.raise_for_status()
def smb_permissions_remove(self, share_name, user):
smb_share = self.lookup_smb_share(share_name)
permissions = smb_share['permissions']
# find the perm to remove
perm_to_remove = None
for perm in list(permissions):
if perm['trustee']['name'] == user:
perm_to_remove = perm
if perm_to_remove is not None:
permissions.remove(perm)
else:
message = _('Attempting to remove permission for user "%(user)s", '
'but this user was not found in the share\'s '
'(%(share)s) permissions list.') % {'user': user,
'share': smb_share}
raise exception.ShareBackendException(msg=message)
self.request('PUT', '{0}/platform/1/protocols/smb/shares/{1}'.format(
self.host_url, share_name), data={'permissions': permissions})
def auth_lookup_user(self, user_string):
url = '{0}/platform/1/auth/mapping/users/lookup'.format(self.host_url)
r = self.request('GET', url, params={"user": user_string})
if r.status_code == 404:
raise exception.ShareBackendException(msg='user not found')
elif r.status_code != 200:
r.raise_for_status()
return r.json()
def request(self, method, url, headers=None, data=None, params=None):
if data is not None:
data = jsonutils.dumps(data)
r = self.session.request(method, url, headers=headers, data=data,
verify=self.verify_ssl_cert, params=params)
return r
class SmbPermission(Enum):
full = 'full'
rw = 'change'
ro = 'read'

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from oslo_log import log
from oslo_utils import units
@ -21,11 +22,13 @@ import six
from manila.common import constants as const
from manila import exception
from manila.share.drivers.emc.plugins.isilon import isilon
from manila.share.drivers.emc.plugins.isilon.isilon_api import SmbPermission
from manila import test
LOG = log.getLogger(__name__)
@ddt.ddt
class IsilonTest(test.TestCase):
"""Integration test for the Isilon Manila driver."""
@ -227,7 +230,7 @@ class IsilonTest(test.TestCase):
self._mock_isilon_api.request.assert_called_once_with(
'PUT', expected_url, data=expected_data)
def test_deny_access_invalid_access_type(self):
def test_deny_access_nfs_invalid_access_type(self):
share = {'name': self.SHARE_NAME, 'share_proto': 'NFS'}
access = {'access_type': 'foo_access_type', 'access_to': '10.0.0.1'}
@ -235,6 +238,14 @@ class IsilonTest(test.TestCase):
self.storage_connection.deny_access(
self.mock_context, share, access, None)
def test_deny_access_cifs_invalid_access_type(self):
share = {'name': self.SHARE_NAME, 'share_proto': 'CIFS'}
access = {'access_type': 'foo_access_type', 'access_to': '10.0.0.1'}
# This operation should return silently
self.storage_connection.deny_access(self.mock_context, share, access,
None)
def test_deny_access_invalid_share_protocol(self):
share = {'name': self.SHARE_NAME, 'share_proto': 'FOO'}
access = {'access_type': 'ip', 'access_to': '10.0.0.1',
@ -268,6 +279,18 @@ class IsilonTest(test.TestCase):
self.storage_connection.deny_access,
self.mock_context, share, access, None)
def test_deny_access_nfs_share_does_not_contain_required_key(self):
share = {'name': self.SHARE_NAME, 'share_proto': 'NFS'}
access = {
'access_type': 'ip',
'access_to': '10.0.0.1',
'access_level': const.ACCESS_LEVEL_RW,
}
self._mock_isilon_api.get_nfs_export.return_value = {}
self.assertRaises(exception.ShareBackendException,
self.storage_connection.deny_access,
self.mock_context, share, access, None)
def test_allow_access_multiple_ip_nfs(self):
"""Verifies adding an IP to a whitelist with pre-existing ips.
@ -374,6 +397,63 @@ class IsilonTest(test.TestCase):
self._mock_isilon_api.request.assert_called_once_with(
'PUT', expected_url, data=expected_data)
@ddt.data(
('foo', const.ACCESS_LEVEL_RW, SmbPermission.rw),
('testuser', const.ACCESS_LEVEL_RO, SmbPermission.ro),
)
def test_allow_access_with_cifs_user(self, data):
# setup
share_name = self.SHARE_NAME
user, access_level, expected_smb_perm = data
share = {'name': share_name, 'share_proto': 'CIFS'}
access = {'access_type': 'user',
'access_to': user,
'access_level': access_level}
self.storage_connection.allow_access(self.mock_context, share,
access, None)
self._mock_isilon_api.smb_permissions_add.assert_called_once_with(
share_name, user, expected_smb_perm)
def test_allow_access_with_cifs_user_invalid_access_level(self):
share = {'name': self.SHARE_NAME, 'share_proto': 'CIFS'}
access = {
'access_type': 'user',
'access_to': 'foo',
'access_level': 'everything',
}
self.assertRaises(exception.InvalidShareAccess,
self.storage_connection.allow_access,
self.mock_context, share, access, None)
def test_allow_access_with_cifs_invalid_access_type(self):
share_name = self.SHARE_NAME
share = {'name': share_name, 'share_proto': 'CIFS'}
access = {'access_type': 'fooaccesstype',
'access_to': 'testuser',
'access_level': const.ACCESS_LEVEL_RW}
self.assertRaises(exception.InvalidShareAccess,
self.storage_connection.allow_access,
self.mock_context, share, access, None)
def test_deny_access_with_cifs_user(self):
share_name = self.SHARE_NAME
user_to_remove = 'testuser'
share = {'name': share_name, 'share_proto': 'CIFS'}
access = {'access_type': 'user',
'access_to': user_to_remove,
'access_level': const.ACCESS_LEVEL_RW}
self.assertFalse(self._mock_isilon_api.smb_permissions_remove.called)
self.storage_connection.deny_access(self.mock_context, share, access,
None)
self._mock_isilon_api.smb_permissions_remove.assert_called_with(
share_name, user_to_remove)
def test_allow_access_invalid_access_type(self):
# setup
share_name = self.SHARE_NAME
@ -383,7 +463,7 @@ class IsilonTest(test.TestCase):
# verify method under test throws the expected exception
self.assertRaises(
exception.ShareBackendException,
exception.InvalidShareAccess,
self.storage_connection.allow_access,
self.mock_context, share, access, None)

View File

@ -19,6 +19,7 @@ import requests
import requests_mock
import six
from manila import exception
from manila.share.drivers.emc.plugins.isilon import isilon_api
from manila import test
@ -360,10 +361,11 @@ class IsilonApiTest(test.TestCase):
self.assertEqual(expected_return_value, r)
self.assertEqual(1, len(m.request_history))
expected_request_data = json.loads(
'{{"name": "{0}", "path": "{1}"}}'.format(
share_name, share_path)
)
expected_request_data = {
'name': share_name,
'path': share_path,
'permissions': []
}
self.assertEqual(expected_request_data,
json.loads(m.request_history[0].body))
@ -612,6 +614,217 @@ class IsilonApiTest(test.TestCase):
)
self.assertEqual(400, e.response.status_code)
@ddt.data(
('foouser', isilon_api.SmbPermission.rw),
('testuser', isilon_api.SmbPermission.ro),
)
def test_smb_permission_add(self, data):
user, smb_permission = data
share_name = 'testshare'
with requests_mock.mock() as m:
papi_share_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
self._mock_url, share_name)
share_data = {
'shares': [
{'permissions': []}
]
}
m.get(papi_share_url, status_code=200, json=share_data)
auth_url = '{0}/platform/1/auth/mapping/users/lookup?user={1}' \
''.format(self._mock_url, user)
example_sid = 'SID:S-1-5-21'
sid_json = {
'id': example_sid,
'name': user,
'type': 'user'
}
auth_json = {'mapping': [
{'user': {'sid': sid_json}}
]}
m.get(auth_url, status_code=200, json=auth_json)
m.put(papi_share_url)
self.isilon_api.smb_permissions_add(share_name, user,
smb_permission)
perms_put_request = m.request_history[2]
expected_perm_request_json = {
'permissions': [
{'permission': smb_permission.value,
'permission_type': 'allow',
'trustee': sid_json
}
]
}
self.assertEqual(expected_perm_request_json,
json.loads(perms_put_request.body))
@requests_mock.mock()
def test_smb_permission_add_with_multiple_users_found(self, m):
user = 'foouser'
smb_permission = isilon_api.SmbPermission.rw
share_name = 'testshare'
papi_share_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
self._mock_url, share_name)
share_data = {
'shares': [
{'permissions': []}
]
}
m.get(papi_share_url, status_code=200, json=share_data)
auth_url = '{0}/platform/1/auth/mapping/users/lookup?user={1}' \
''.format(self._mock_url, user)
example_sid = 'SID:S-1-5-21'
sid_json = {
'id': example_sid,
'name': user,
'type': 'user'
}
auth_json = {'mapping': [
{'user': {'sid': sid_json}},
{'user': {'sid': sid_json}},
]}
m.get(auth_url, status_code=200, json=auth_json)
m.put(papi_share_url)
self.assertRaises(exception.ShareBackendException,
self.isilon_api.smb_permissions_add,
share_name, user, smb_permission)
@requests_mock.mock()
def test_smb_permission_remove(self, m):
share_name = 'testshare'
user = 'testuser'
share_data = {
'permissions': [{
'permission': 'change',
'permission_type': 'allow',
'trustee': {
'id': 'SID:S-1-5-21',
'name': user,
'type': 'user',
}
}]
}
papi_share_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
self._mock_url, share_name)
m.get(papi_share_url, status_code=200, json={'shares': [share_data]})
num_existing_perms = len(self.isilon_api.lookup_smb_share(share_name))
self.assertEqual(1, num_existing_perms)
m.put(papi_share_url)
self.isilon_api.smb_permissions_remove(share_name, user)
smb_put_request = m.request_history[2]
expected_body = {'permissions': []}
expected_body = json.dumps(expected_body)
self.assertEqual(expected_body, smb_put_request.body)
@requests_mock.mock()
def test_smb_permission_remove_with_multiple_existing_perms(self, m):
share_name = 'testshare'
user = 'testuser'
foouser_perms = {
'permission': 'change',
'permission_type': 'allow',
'trustee': {
'id': 'SID:S-1-5-21',
'name': 'foouser',
'type': 'user',
}
}
user_perms = {
'permission': 'change',
'permission_type': 'allow',
'trustee': {
'id': 'SID:S-1-5-22',
'name': user,
'type': 'user',
}
}
share_data = {
'permissions': [
foouser_perms,
user_perms,
]
}
papi_share_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
self._mock_url, share_name)
m.get(papi_share_url, status_code=200, json={'shares': [share_data]})
num_existing_perms = len(self.isilon_api.lookup_smb_share(
share_name)['permissions'])
self.assertEqual(2, num_existing_perms)
m.put(papi_share_url)
self.isilon_api.smb_permissions_remove(share_name, user)
smb_put_request = m.request_history[2]
expected_body = {'permissions': [foouser_perms]}
expected_body = json.dumps(expected_body)
self.assertEqual(json.loads(expected_body),
json.loads(smb_put_request.body))
@requests_mock.mock()
def test_smb_permission_remove_with_empty_perms_list(self, m):
share_name = 'testshare'
user = 'testuser'
share_data = {'permissions': []}
papi_share_url = '{0}/platform/1/protocols/smb/shares/{1}'.format(
self._mock_url, share_name)
m.get(papi_share_url, status_code=200, json={'shares': [share_data]})
m.put(papi_share_url)
self.assertRaises(exception.ShareBackendException,
self.isilon_api.smb_permissions_remove,
share_name, user)
@requests_mock.mock()
def test_auth_lookup_user(self, m):
user = 'foo'
auth_url = '{0}/platform/1/auth/mapping/users/lookup?user={1}'.format(
self._mock_url, user)
example_sid = 'SID:S-1-5-21'
sid_json = {
'id': example_sid,
'name': user,
'type': 'user'
}
auth_json = {
'mapping': [
{'user': {'sid': sid_json}}
]
}
m.get(auth_url, status_code=200, json=auth_json)
returned_auth_json = self.isilon_api.auth_lookup_user(user)
self.assertEqual(auth_json, returned_auth_json)
@requests_mock.mock()
def test_auth_lookup_user_with_nonexistent_user(self, m):
user = 'nonexistent'
auth_url = '{0}/platform/1/auth/mapping/users/lookup?user={1}'.format(
self._mock_url, user)
m.get(auth_url, status_code=404)
self.assertRaises(exception.ShareBackendException,
self.isilon_api.auth_lookup_user, user)
@requests_mock.mock()
def test_auth_lookup_user_with_backend_error(self, m):
user = 'foo'
auth_url = '{0}/platform/1/auth/mapping/users/lookup?user={1}'.format(
self._mock_url, user)
m.get(auth_url, status_code=400)
self.assertRaises(requests.exceptions.HTTPError,
self.isilon_api.auth_lookup_user, user)
def _add_create_directory_response(self, m, path, is_recursive):
url = '{0}/namespace{1}?recursive={2}'.format(
self._mock_url, path, six.text_type(is_recursive))