Add support for HDFS native protocol driver

Add basic support for hdfs native protocol exports.

A manila share in this driver is a sub-directory in the HDFS root
directory. Instances talk directly to the HDFS backend with the 'hdfs'
protocol. Access to a share is granted through the user-based access
type, which maps onto HDFS ACLs to support access control for multiple
users and groups (see the sketch after the change summary below).

Implements blueprint hdfs-driver

Change-Id: Id9bd391d829e11d332b03c2245691ea42fc30bcc
Author: Sun Jun
Date:   2015-01-30 11:21:40 +08:00
parent 75da4bc5b5
commit 310dc5523a
8 changed files with 876 additions and 1 deletion
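
The user-based access mapping described in the message above can be
sketched as follows. This is an illustration only, not part of the
change itself; the helper name and the user and share names are made
up. It mirrors what the driver's allow_access() below does before
calling 'hdfs dfs -setfacl':

    def access_to_acl_spec(access_type, access_to, access_level):
        """Map a manila access rule to an HDFS ACL spec string."""
        # On HDFS directories the 'x' bit is needed to reach children,
        # so both levels include it.
        level = {'rw': 'rwx', 'ro': 'r-x'}[access_level]
        return ':'.join([access_type, access_to, level])

    # access_to_acl_spec('user', 'alice', 'rw') -> 'user:alice:rwx',
    # applied on the backend as:
    #     hdfs dfs -setfacl -m -R user:alice:rwx /share-name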


@@ -497,3 +497,7 @@ class SSHException(ManilaException):
 class SopAPIError(Invalid):
     message = _("%(err)s")
+
+
+class HDFSException(ManilaException):
+    message = _("HDFS exception occurred!")


@@ -52,6 +52,7 @@ import manila.share.drivers.emc.driver
 import manila.share.drivers.generic
 import manila.share.drivers.glusterfs
 import manila.share.drivers.glusterfs_native
+import manila.share.drivers.hdfs.hdfs_native
 import manila.share.drivers.hds.sop
 import manila.share.drivers.hp.hp_3par_driver
 import manila.share.drivers.huawei.huawei_nas
@@ -109,6 +110,7 @@ _global_opt_lists = [
     manila.share.drivers.generic.share_opts,
     manila.share.drivers.glusterfs.GlusterfsManilaShare_opts,
     manila.share.drivers.glusterfs_native.glusterfs_native_manila_share_opts,
+    manila.share.drivers.hdfs.hdfs_native.hdfs_native_share_opts,
     manila.share.drivers.hds.sop.hdssop_share_opts,
     manila.share.drivers.hp.hp_3par_driver.HP3PAR_OPTS,
     manila.share.drivers.huawei.huawei_nas.huawei_opts,


@@ -108,7 +108,7 @@ class API(base.Base):
         # TODO(rushiagr): Find a suitable place to keep all the allowed
         #     share types so that it becomes easier to add one
-        if share_proto.lower() not in ['nfs', 'cifs', 'glusterfs']:
+        if share_proto.lower() not in ['nfs', 'cifs', 'glusterfs', 'hdfs']:
             msg = (_("Invalid share type provided: %s") % share_proto)
             raise exception.InvalidInput(reason=msg)


@@ -0,0 +1,443 @@
# Copyright (c) 2015 Intel, Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""HDFS native protocol (hdfs) driver for manila shares.
Manila share is a directory in HDFS. And this share does not use
service VM instance (share server). The instance directly talks
to the the HDFS cluster.
The initial version only supports single namenode and flat network.
Configuration Requirements:
To enable access control, HDFS file system must have ACLs enabled.
"""
import math
import os
import pipes
import socket
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_utils import units
import six
from manila import exception
from manila.i18n import _
from manila.openstack.common import log as logging
from manila.share import driver
from manila import utils
LOG = logging.getLogger(__name__)
hdfs_native_share_opts = [
cfg.StrOpt('hdfs_namenode_ip',
default=None,
help='The IP of the HDFS namenode.'),
cfg.IntOpt('hdfs_namenode_port',
default=9000,
help='The port of HDFS namenode service.'),
cfg.IntOpt('hdfs_ssh_port',
default=22,
help='HDFS namenode SSH port.'),
cfg.StrOpt('hdfs_ssh_name',
default=None,
help='HDFS namenode ssh login name.'),
cfg.StrOpt('hdfs_ssh_pw',
default=None,
help='HDFS namenode SSH login password. '
'This parameter is not necessary if '
'\'hdfs_ssh_private_key\' is configured.'),
cfg.StrOpt('hdfs_ssh_private_key',
default=None,
help='Path to HDFS namenode SSH private '
'key for login.'),
]
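# A sample backend section in manila.conf wiring these options up
# (illustrative values, not defaults):
#
#     [hdfs1]
#     share_driver = manila.share.drivers.hdfs.hdfs_native.HDFSNativeShareDriver
#     hdfs_namenode_ip = 192.168.1.10
#     hdfs_namenode_port = 9000
#     hdfs_ssh_name = hdfs
#     hdfs_ssh_private_key = /home/hdfs/.ssh/id_rsa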
CONF = cfg.CONF
CONF.register_opts(hdfs_native_share_opts)
class HDFSNativeShareDriver(driver.ExecuteMixin, driver.ShareDriver):
"""HDFS Share Driver.
Executes commands relating to shares.
API version history:
1.0 - Initial Version
"""
def __init__(self, db, *args, **kwargs):
super(HDFSNativeShareDriver, self).__init__(False, *args, **kwargs)
self.db = db
self.configuration.append_config_values(hdfs_native_share_opts)
self.backend_name = self.configuration.safe_get(
'share_backend_name') or 'HDFS-Native'
self.ssh_connections = {}
self._hdfs_execute = None
self._hdfs_bin = None
self._hdfs_base_path = None
def do_setup(self, context):
"""Do initialization while the share driver starts."""
super(HDFSNativeShareDriver, self).do_setup(context)
host = self.configuration.hdfs_namenode_ip
local_hosts = socket.gethostbyname_ex(socket.gethostname())[2]
if host in local_hosts:
self._hdfs_execute = self._hdfs_local_execute
else:
self._hdfs_execute = self._hdfs_remote_execute
self._hdfs_bin = self._get_hdfs_bin_path()
self._hdfs_base_path = (
'hdfs://' + self.configuration.hdfs_namenode_ip + ':'
+ six.text_type(self.configuration.hdfs_namenode_port))
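# For example, with hdfs_namenode_ip = '192.168.1.10' (hypothetical)
# and the default port, _hdfs_base_path becomes
# 'hdfs://192.168.1.10:9000'.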
def _hdfs_local_execute(self, *cmd, **kwargs):
if 'run_as_root' not in kwargs:
kwargs.update({'run_as_root': True})
return utils.execute(*cmd, **kwargs)
def _hdfs_remote_execute(self, *cmd, **kwargs):
host = self.configuration.hdfs_namenode_ip
check_exit_code = kwargs.pop('check_exit_code', False)
return self._run_ssh(host, cmd, check_exit_code)
def _run_ssh(self, host, cmd_list, check_exit_code=False):
command = ' '.join(pipes.quote(cmd_arg) for cmd_arg in cmd_list)
connection = self.ssh_connections.get(host)
if not connection:
hdfs_ssh_name = self.configuration.hdfs_ssh_name
password = self.configuration.hdfs_ssh_pw
privatekey = self.configuration.hdfs_ssh_private_key
hdfs_ssh_port = self.configuration.hdfs_ssh_port
ssh_conn_timeout = self.configuration.ssh_conn_timeout
min_size = self.configuration.ssh_min_pool_conn
max_size = self.configuration.ssh_max_pool_conn
ssh_pool = utils.SSHPool(host,
hdfs_ssh_port,
ssh_conn_timeout,
hdfs_ssh_name,
password=password,
privatekey=privatekey,
min_size=min_size,
max_size=max_size)
ssh = ssh_pool.create()
self.ssh_connections[host] = (ssh_pool, ssh)
else:
ssh_pool, ssh = connection
if not ssh.get_transport().is_active():
ssh_pool.remove(ssh)
ssh = ssh_pool.create()
self.ssh_connections[host] = (ssh_pool, ssh)
try:
return processutils.ssh_execute(
ssh,
command,
check_exit_code=check_exit_code)
except Exception as e:
msg = (_('Error running SSH command: %(cmd)s. '
'Error: %(excmsg)s.') %
{'cmd': command, 'excmsg': six.text_type(e)})
LOG.error(msg)
raise exception.HDFSException(msg)
def _get_hdfs_bin_path(self):
try:
(out, __) = self._hdfs_execute('locate', '/bin/hdfs')
except exception.ProcessExecutionError as e:
msg = (_('Cannot get the execution path of hdfs. '
'Error: %(excmsg)s.') %
{'excmsg': six.text_type(e)})
LOG.error(msg)
raise exception.HDFSException(msg)
lines = out.splitlines()
if lines and lines[0].endswith('/bin/hdfs'):
return lines[0]
else:
msg = _('Cannot get the execution path of hdfs.')
LOG.error(msg)
raise exception.HDFSException(msg)
def _create_share(self, share):
"""Creates a share."""
if share['share_proto'].lower() != 'hdfs':
msg = _('Only HDFS protocol supported!')
LOG.error(msg)
raise exception.HDFSException(msg)
share_dir = '/' + share['name']
sizestr = six.text_type(share['size']) + 'g'
try:
self._hdfs_execute(self._hdfs_bin, 'dfs',
'-mkdir', share_dir)
except exception.ProcessExecutionError as e:
msg = (_('Failed to create directory in hdfs for the '
'share %(sharename)s. Error: %(excmsg)s.') %
{'sharename': share['name'],
'excmsg': six.text_type(e)})
LOG.error(msg)
raise exception.HDFSException(msg)
try:
self._hdfs_execute(self._hdfs_bin, 'dfsadmin',
'-setSpaceQuota', sizestr, share_dir)
except exception.ProcessExecutionError as e:
msg = (_('Failed to set space quota for the '
'share %(sharename)s. Error: %(excmsg)s.') %
{'sharename': share['name'],
'excmsg': six.text_type(e)})
LOG.error(msg)
raise exception.HDFSException(msg)
try:
self._hdfs_execute(self._hdfs_bin, 'dfsadmin',
'-allowSnapshot', share_dir)
except exception.ProcessExecutionError as e:
msg = (_('Failed to allow snapshot for the '
'share %(sharename)s. Error: %(excmsg)s.') %
{'sharename': share['name'],
'excmsg': six.text_type(e)})
LOG.error(msg)
raise exception.HDFSException(msg)
def _get_share_path(self, share):
"""Return share path on storage provider."""
return os.path.join(self._hdfs_base_path, share['name'])
def _get_snapshot_path(self, snapshot):
"""Return snapshot path on storage provider."""
snapshot_dir = '.snapshot'
return os.path.join('/', snapshot['share_name'],
snapshot_dir, snapshot['name'])
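# For example, snapshot 'snap-1' of share 'share-foo' (hypothetical
# names) resolves to '/share-foo/.snapshot/snap-1'; '.snapshot' is the
# fixed directory HDFS uses for directory snapshots.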
def get_network_allocations_number(self):
return 0
def create_share(self, context, share, share_server=None):
"""Create a HDFS directory which acted as a share."""
self._create_share(share)
return self._get_share_path(share)
def create_share_from_snapshot(self, context, share, snapshot,
share_server=None):
"""Creates a snapshot."""
self._create_share(share)
share_path = '/' + share['name']
snapshot_path = self._get_snapshot_path(snapshot)
cmd = [self._hdfs_bin, 'dfs', '-cp', '-r',
snapshot_path, share_path]
try:
self._hdfs_execute(*cmd)
except exception.ProcessExecutionError as e:
msg = (_('Failed to create share %(sharename)s from '
'snapshot %(snapshotname)s. Error: %(excmsg)s.') %
{'sharename': share['name'],
'snapshotname': snapshot['name'],
'excmsg': six.text_type(e)})
LOG.error(msg)
raise exception.HDFSException(msg)
return self._get_share_path(share)
def create_snapshot(self, context, snapshot, share_server=None):
"""Creates a snapshot."""
share_dir = '/' + snapshot['share_name']
snapshot_name = snapshot['name']
cmd = [self._hdfs_bin, 'dfs', '-createSnapshot',
share_dir, snapshot_name]
try:
self._hdfs_execute(*cmd)
except exception.ProcessExecutionError as e:
msg = (_('Failed to create snapshot %(snapshotname)s for '
'the share %(sharename)s. Error: %(excmsg)s.') %
{'snapshotname': snapshot_name,
'sharename': snapshot['share_name'],
'excmsg': six.text_type(e)})
LOG.error(msg)
raise exception.HDFSException(msg)
def delete_share(self, context, share, share_server=None):
"""Deletes share storage."""
share_dir = '/' + share['name']
cmd = [self._hdfs_bin, 'dfs', '-rm', '-r', share_dir]
try:
self._hdfs_execute(*cmd)
except exception.ProcessExecutionError as e:
msg = (_('Failed to delete share %(sharename)s. '
'Error: %(excmsg)s.') %
{'sharename': share['name'],
'excmsg': six.text_type(e)})
LOG.error(msg)
raise exception.HDFSException(msg)
def delete_snapshot(self, context, snapshot, share_server=None):
"""Deletes a snapshot."""
share_dir = '/' + snapshot['share_name']
cmd = [self._hdfs_bin, 'dfs', '-deleteSnapshot',
share_dir, snapshot['name']]
try:
self._hdfs_execute(*cmd)
except exception.ProcessExecutionError as e:
msg = (_('Failed to delete snapshot %(snapshotname)s. '
'Error: %(excmsg)s.') %
{'snapshotname': snapshot['name'],
'excmsg': six.text_type(e)})
LOG.error(msg)
raise exception.HDFSException(msg)
def ensure_share(self, context, share, share_server=None):
"""Ensure the storage are exported."""
def allow_access(self, context, share, access, share_server=None):
"""Allows access to the share for a given user."""
if access['access_type'] != 'user':
msg = _("Only 'user' access type allowed!")
LOG.error(msg)
raise exception.InvalidShareAccess(msg)
# Note(jun): For directories in HDFS, the x permission is
# required to access a child of the directory.
if access['access_level'] == 'rw':
access_level = 'rwx'
elif access['access_level'] == 'ro':
access_level = 'r-x'
else:
msg = (_('The access level %(accesslevel)s is unsupported.') %
{'accesslevel': access['access_level']})
LOG.error(msg)
raise exception.InvalidShareAccess(msg)
share_dir = '/' + share['name']
user_access = ':'.join([access['access_type'],
access['access_to'],
access_level])
cmd = [self._hdfs_bin, 'dfs', '-setfacl', '-m', '-R',
user_access, share_dir]
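# For a hypothetical rule granting user 'alice' 'rw' on share
# 'share-foo', this runs:
#     hdfs dfs -setfacl -m -R user:alice:rwx /share-foo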
try:
(__, out) = self._hdfs_execute(*cmd, check_exit_code=True)
except exception.ProcessExecutionError as e:
msg = (_('Failed to set the ACL of share %(sharename)s for '
'user %(username)s. Error: %(excmsg)s.') %
{'sharename': share['name'],
'username': access['access_to'],
'excmsg': six.text_type(e)})
LOG.error(msg)
raise exception.HDFSException(msg)
def deny_access(self, context, share, access, share_server=None):
"""Denies the access to the share for a given user."""
share_dir = '/' + share['name']
access_name = ':'.join([access['access_type'], access['access_to']])
cmd = [self._hdfs_bin, 'dfs', '-setfacl', '-x', '-R',
access_name, share_dir]
try:
(__, out) = self._hdfs_execute(*cmd, check_exit_code=True)
except exception.ProcessExecutionError as e:
msg = (_('Failed to remove the ACL of share %(sharename)s for '
'user %(username)s. Error: %(excmsg)s.') %
{'sharename': share['name'],
'username': access['access_to'],
'excmsg': six.text_type(e)})
LOG.error(msg)
raise exception.HDFSException(msg)
def _check_hdfs_state(self):
try:
(out, __) = self._hdfs_execute(self._hdfs_bin, 'fsck', '/')
except exception.ProcessExecutionError as e:
msg = (_('Failed to check the health of hdfs. '
'Error: %(excmsg)s.') %
{'excmsg': six.text_type(e)})
LOG.error(msg)
raise exception.HDFSException(msg)
lines = out.splitlines()
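# The second line of the fsck output is assumed to carry the overall
# state, e.g. 'Status: HEALTHY' (see the fake output in the unit
# tests below).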
try:
hdfs_state = lines[1].split()[1]
except (IndexError, ValueError) as e:
msg = (_('Failed to check hdfs state. Error: %(excmsg)s.') %
{'excmsg': six.text_type(e)})
LOG.error(msg)
raise exception.HDFSException(msg)
if hdfs_state.upper() != 'HEALTHY':
return False
return True
def check_for_setup_error(self):
"""Return an error if the prerequisites are met."""
if not self.configuration.hdfs_namenode_ip:
msg = _('The hdfs cluster is not specified. Set the IP of the '
'hdfs namenode in the hdfs_namenode_ip configuration '
'parameter.')
LOG.error(msg)
raise exception.HDFSException(msg)
if not self._check_hdfs_state():
msg = _('HDFS is not in healthy state.')
LOG.error(msg)
raise exception.HDFSException(msg)
def _get_available_capacity(self):
"""Calculate available space on path."""
try:
(out, __) = self._hdfs_execute(self._hdfs_bin, 'dfsadmin',
'-report')
except exception.ProcessExecutionError as e:
msg = (_('Failed to check available capacity for hdfs. '
'Error: %(excmsg)s.') %
{'excmsg': six.text_type(e)})
LOG.error(msg)
raise exception.HDFSException(msg)
lines = out.splitlines()
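# The byte counts are assumed to sit in the third field of the second
# and third report lines, e.g. (as faked in the unit tests below):
#     Total Capacity: <bytes>
#     DFS free: <bytes>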
try:
total = int(lines[1].split()[2])
free = int(lines[2].split()[2])
except (IndexError, ValueError) as e:
msg = (_('Failed to get hdfs capacity info. '
'Error: %(excmsg)s.') %
{'excmsg': six.text_type(e)})
LOG.error(msg)
raise exception.HDFSException(msg)
return total, free
def _update_share_stats(self):
"""Retrieves stats info of share directories group."""
data = dict(share_backend_name=self.backend_name,
storage_protocol='HDFS',
reserved_percentage=self.configuration.
reserved_share_percentage)
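# units.Gi is 2 ** 30, so the byte counts below are reported as
# rounded-up GiB.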
total, free = self._get_available_capacity()
data['total_capacity_gb'] = math.ceil(total / units.Gi)
data['free_capacity_gb'] = math.ceil(free / units.Gi)
super(HDFSNativeShareDriver, self)._update_share_stats(data)


@@ -50,6 +50,7 @@ def fake_access(**kwargs):
         'id': 'fakeaccid',
         'access_type': 'ip',
         'access_to': '10.0.0.1',
+        'access_level': 'rw',
         'state': 'active',
     }
     access.update(kwargs)


@@ -0,0 +1,425 @@
# Copyright 2015 Intel, Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Unit tests for HDFS native protocol driver module."""
import socket
import mock
from oslo_concurrency import processutils
from oslo_config import cfg
import six
from manila import context
from manila import exception
import manila.share.configuration as config
import manila.share.drivers.hdfs.hdfs_native as hdfs_native
from manila import test
from manila.tests import fake_share
from manila import utils
CONF = cfg.CONF
class HDFSNativeShareDriverTestCase(test.TestCase):
"""Tests HDFSNativeShareDriver."""
def setUp(self):
super(HDFSNativeShareDriverTestCase, self).setUp()
self._context = context.get_admin_context()
self._hdfs_execute = mock.Mock(return_value=('', ''))
self.local_ip = '192.168.1.1'
CONF.set_default('driver_handles_share_servers', False)
CONF.set_default('hdfs_namenode_ip', self.local_ip)
CONF.set_default('hdfs_ssh_name', 'fake_sshname')
CONF.set_default('hdfs_ssh_pw', 'fake_sshpw')
CONF.set_default('hdfs_ssh_private_key', 'fake_sshkey')
self.fake_conf = config.Configuration(None)
self._db = mock.Mock()
self._driver = hdfs_native.HDFSNativeShareDriver(
self._db,
execute=self._hdfs_execute,
configuration=self.fake_conf)
self._driver._hdfs_bin = 'fake_hdfs_bin'
self.share = fake_share.fake_share(share_proto='HDFS')
self.snapshot = fake_share.fake_snapshot(share_proto='HDFS')
self.access = fake_share.fake_access(access_type='user')
self.fakesharepath = 'hdfs://1.2.3.4:5/share-0'
self.fakesnapshotpath = '/share-0/.snapshot/snapshot-0'
socket.gethostname = mock.Mock(return_value='testserver')
socket.gethostbyname_ex = mock.Mock(return_value=(
'localhost',
['localhost.localdomain', 'testserver'],
['127.0.0.1', self.local_ip]))
def test_do_setup(self):
self._driver._get_hdfs_bin_path = mock.Mock()
self._driver.do_setup(self._context)
self._driver._get_hdfs_bin_path.assert_called_once_with()
def test_create_share(self):
self._driver._create_share = mock.Mock()
self._driver._get_share_path = mock.Mock(
return_value=self.fakesharepath)
result = self._driver.create_share(self._context, self.share,
share_server=None)
self._driver._create_share.assert_called_once_with(self.share)
self._driver._get_share_path.assert_called_once_with(self.share)
self.assertEqual(self.fakesharepath, result)
def test_create_share_unsupported_proto(self):
self._driver._get_share_path = mock.Mock()
self.assertRaises(exception.HDFSException,
self._driver.create_share,
self._context,
fake_share.fake_share(),
share_server=None)
self.assertFalse(self._driver._get_share_path.called)
def test__create_share(self):
share_dir = '/' + self.share['name']
sizestr = six.text_type(self.share['size']) + 'g'
self._driver._hdfs_execute = mock.Mock(return_value=True)
self._driver._create_share(self.share)
self._driver._hdfs_execute.assert_any_call(
'fake_hdfs_bin', 'dfs', '-mkdir', share_dir)
self._driver._hdfs_execute.assert_any_call(
'fake_hdfs_bin', 'dfsadmin', '-setSpaceQuota', sizestr, share_dir)
self._driver._hdfs_execute.assert_any_call(
'fake_hdfs_bin', 'dfsadmin', '-allowSnapshot', share_dir)
def test__create_share_exception(self):
share_dir = '/' + self.share['name']
self._driver._hdfs_execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.HDFSException,
self._driver._create_share, self.share)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfs', '-mkdir', share_dir)
def test_create_share_from_snapshot(self):
self._driver._hdfs_execute = mock.Mock(return_value=True)
self._driver._create_share = mock.Mock(return_value=True)
self._driver._get_share_path = mock.Mock(return_value=self.
fakesharepath)
self._driver._get_snapshot_path = mock.Mock(return_value=self.
fakesnapshotpath)
result = self._driver.create_share_from_snapshot(self._context,
self.share,
self.snapshot,
share_server=None)
self._driver._create_share.assert_called_once_with(self.share)
self._driver._get_snapshot_path.assert_called_once_with(
self.snapshot)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfs', '-cp', '-r',
self.fakesnapshotpath, '/' + self.share['name'])
self._driver._get_share_path.assert_called_once_with(self.share)
self.assertEqual(self.fakesharepath, result)
def test_create_share_from_snapshot_exception(self):
self._driver._create_share = mock.Mock(return_value=True)
self._driver._get_snapshot_path = mock.Mock(return_value=self.
fakesnapshotpath)
self._driver._get_share_path = mock.Mock(return_value=self.
fakesharepath)
self._driver._hdfs_execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.HDFSException,
self._driver.create_share_from_snapshot,
self._context, self.share,
self.snapshot, share_server=None)
self._driver._create_share.assert_called_once_with(self.share)
self._driver._get_snapshot_path.assert_called_once_with(self.snapshot)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfs', '-cp', '-r',
self.fakesnapshotpath, '/' + self.share['name'])
self.assertFalse(self._driver._get_share_path.called)
def test_create_snapshot(self):
self._driver._hdfs_execute = mock.Mock(return_value=True)
self._driver.create_snapshot(self._context, self.snapshot,
share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfs', '-createSnapshot',
'/' + self.snapshot['share_name'], self.snapshot['name'])
def test_create_snapshot_exception(self):
self._driver._hdfs_execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.HDFSException,
self._driver.create_snapshot, self._context,
self.snapshot, share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfs', '-createSnapshot',
'/' + self.snapshot['share_name'], self.snapshot['name'])
def test_delete_share(self):
self._driver._hdfs_execute = mock.Mock(return_value=True)
self._driver.delete_share(self._context,
self.share,
share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfs', '-rm', '-r',
'/' + self.share['name'])
def test_delete_share_exception(self):
self._driver._hdfs_execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.HDFSException,
self._driver.delete_share,
self._context,
self.share,
share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfs', '-rm', '-r',
'/' + self.share['name'])
def test_delete_snapshot(self):
self._driver._hdfs_execute = mock.Mock(return_value=True)
self._driver.delete_snapshot(self._context,
self.snapshot,
share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfs', '-deleteSnapshot',
'/' + self.snapshot['share_name'], self.snapshot['name'])
def test_delete_snapshot_exception(self):
self._driver._hdfs_execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.HDFSException,
self._driver.delete_snapshot,
self._context,
self.snapshot,
share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfs', '-deleteSnapshot',
'/' + self.snapshot['share_name'], self.snapshot['name'])
def test_allow_access(self):
self._driver._hdfs_execute = mock.Mock(
return_value=['', ''])
share_dir = '/' + self.share['name']
user_access = ':'.join([self.access['access_type'],
self.access['access_to'],
'rwx'])
cmd = ['fake_hdfs_bin', 'dfs', '-setfacl', '-m', '-R',
user_access, share_dir]
self._driver.allow_access(self._context, self.share, self.access,
share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
*cmd, check_exit_code=True)
def test_allow_access_invalid_access_type(self):
self.assertRaises(exception.InvalidShareAccess,
self._driver.allow_access,
self._context,
self.share,
fake_share.fake_access(
access_type='invalid_access_type'),
share_server=None)
def test_allow_access_invalid_access_level(self):
self.assertRaises(exception.InvalidShareAccess,
self._driver.allow_access,
self._context,
self.share,
fake_share.fake_access(
access_level='invalid_access_level'),
share_server=None)
def test_allow_access_exception(self):
self._driver._hdfs_execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
share_dir = '/' + self.share['name']
user_access = ':'.join([self.access['access_type'],
self.access['access_to'],
'rwx'])
cmd = ['fake_hdfs_bin', 'dfs', '-setfacl', '-m', '-R',
user_access, share_dir]
self.assertRaises(exception.HDFSException,
self._driver.allow_access,
self._context,
self.share,
self.access,
share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
*cmd, check_exit_code=True)
def test_deny_access(self):
self._driver._hdfs_execute = mock.Mock(return_value=['', ''])
share_dir = '/' + self.share['name']
access_name = ':'.join([self.access['access_type'],
self.access['access_to']])
cmd = ['fake_hdfs_bin', 'dfs', '-setfacl', '-x', '-R',
access_name, share_dir]
self._driver.deny_access(self._context,
self.share,
self.access,
share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
*cmd, check_exit_code=True)
def test_deny_access_exception(self):
self._driver._hdfs_execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
share_dir = '/' + self.share['name']
access_name = ':'.join([self.access['access_type'],
self.access['access_to']])
cmd = ['fake_hdfs_bin', 'dfs', '-setfacl', '-x', '-R',
access_name, share_dir]
self.assertRaises(exception.HDFSException,
self._driver.deny_access,
self._context,
self.share,
self.access,
share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
*cmd, check_exit_code=True)
def test__check_hdfs_state_healthy(self):
fake_out = "fakeinfo\n...Status: HEALTHY"
self._driver._hdfs_execute = mock.Mock(return_value=(fake_out, ''))
result = self._driver._check_hdfs_state()
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'fsck', '/')
self.assertEqual(True, result)
def test__check_hdfs_state_down(self):
fake_out = "fakeinfo\n...Status: DOWN"
self._driver._hdfs_execute = mock.Mock(return_value=(fake_out, ''))
result = self._driver._check_hdfs_state()
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'fsck', '/')
self.assertEqual(False, result)
def test__check_hdfs_state_exception(self):
self._driver._hdfs_execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.HDFSException,
self._driver._check_hdfs_state)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'fsck', '/')
def test__get_available_capacity(self):
fake_out = 'Configured Capacity: 2.4\n' + \
'Total Capacity: 2\n' + \
'DFS free: 1'
self._driver._hdfs_execute = mock.Mock(return_value=(fake_out, ''))
total, free = self._driver._get_available_capacity()
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfsadmin', '-report')
self.assertEqual(2, total)
self.assertEqual(1, free)
def test__get_available_capacity_exception(self):
self._driver._hdfs_execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.HDFSException,
self._driver._get_available_capacity)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfsadmin', '-report')
def test_get_share_stats_refresh_false(self):
self._driver._stats = {'fake_key': 'fake_value'}
result = self._driver.get_share_stats(False)
self.assertEqual(self._driver._stats, result)
def test_get_share_stats_refresh_true(self):
self._driver._get_available_capacity = mock.Mock(
return_value=(11111.0, 12345.0))
result = self._driver.get_share_stats(True)
expected_keys = [
'QoS_support', 'driver_version', 'share_backend_name',
'free_capacity_gb', 'total_capacity_gb',
'driver_handles_share_servers',
'reserved_percentage', 'vendor_name', 'storage_protocol',
]
for key in expected_keys:
self.assertIn(key, result)
self.assertEqual('HDFS', result['storage_protocol'])
self._driver._get_available_capacity.assert_called_once_with()
def test__hdfs_local_execute(self):
cmd = 'testcmd'
self.mock_object(utils, 'execute', mock.Mock(return_value=True))
self._driver._hdfs_local_execute(cmd)
utils.execute.assert_called_once_with(cmd, run_as_root=True)
def test__hdfs_remote_execute(self):
self._driver._run_ssh = mock.Mock(return_value=True)
cmd = 'testcmd'
self._driver._hdfs_remote_execute(cmd, check_exit_code=True)
self._driver._run_ssh.assert_called_once_with(
self.local_ip, tuple([cmd]), True)
def test__run_ssh(self):
ssh_output = 'fake_ssh_output'
cmd_list = ['fake', 'cmd']
ssh = mock.Mock()
ssh.get_transport = mock.Mock()
ssh.get_transport().is_active = mock.Mock(return_value=True)
ssh_pool = mock.Mock()
ssh_pool.create = mock.Mock(return_value=ssh)
self.mock_object(utils, 'SSHPool', mock.Mock(return_value=ssh_pool))
self.mock_object(processutils, 'ssh_execute',
mock.Mock(return_value=ssh_output))
result = self._driver._run_ssh(self.local_ip, cmd_list)
utils.SSHPool.assert_called_once_with(
self._driver.configuration.hdfs_namenode_ip,
self._driver.configuration.hdfs_ssh_port,
self._driver.configuration.ssh_conn_timeout,
self._driver.configuration.hdfs_ssh_name,
password=self._driver.configuration.hdfs_ssh_pw,
privatekey=self._driver.configuration.hdfs_ssh_private_key,
min_size=self._driver.configuration.ssh_min_pool_conn,
max_size=self._driver.configuration.ssh_max_pool_conn)
ssh_pool.create.assert_called_once_with()
ssh.get_transport().is_active.assert_called_once_with()
processutils.ssh_execute.assert_called_once_with(
ssh, 'fake cmd', check_exit_code=False)
self.assertEqual(ssh_output, result)
def test__run_ssh_exception(self):
cmd_list = ['fake', 'cmd']
ssh = mock.Mock()
ssh.get_transport = mock.Mock()
ssh.get_transport().is_active = mock.Mock(return_value=True)
ssh_pool = mock.Mock()
ssh_pool.create = mock.Mock(return_value=ssh)
self.mock_object(utils, 'SSHPool', mock.Mock(return_value=ssh_pool))
self.mock_object(processutils, 'ssh_execute',
mock.Mock(side_effect=Exception))
self.assertRaises(exception.HDFSException,
self._driver._run_ssh,
self.local_ip,
cmd_list)
utils.SSHPool.assert_called_once_with(
self._driver.configuration.hdfs_namenode_ip,
self._driver.configuration.hdfs_ssh_port,
self._driver.configuration.ssh_conn_timeout,
self._driver.configuration.hdfs_ssh_name,
password=self._driver.configuration.hdfs_ssh_pw,
privatekey=self._driver.configuration.hdfs_ssh_private_key,
min_size=self._driver.configuration.ssh_min_pool_conn,
max_size=self._driver.configuration.ssh_max_pool_conn)
ssh_pool.create.assert_called_once_with()
ssh.get_transport().is_active.assert_called_once_with()
processutils.ssh_execute.assert_called_once_with(
ssh, 'fake cmd', check_exit_code=False)