diff --git a/manila/exception.py b/manila/exception.py index b2e7c0c322..a446f2af55 100644 --- a/manila/exception.py +++ b/manila/exception.py @@ -497,3 +497,7 @@ class SSHException(ManilaException): class SopAPIError(Invalid): message = _("%(err)s") + + +class HDFSException(ManilaException): + message = _("HDFS exception occurred!") diff --git a/manila/opts.py b/manila/opts.py index b6d785751b..c81e21d675 100644 --- a/manila/opts.py +++ b/manila/opts.py @@ -52,6 +52,7 @@ import manila.share.drivers.emc.driver import manila.share.drivers.generic import manila.share.drivers.glusterfs import manila.share.drivers.glusterfs_native +import manila.share.drivers.hdfs.hdfs_native import manila.share.drivers.hds.sop import manila.share.drivers.hp.hp_3par_driver import manila.share.drivers.huawei.huawei_nas @@ -109,6 +110,7 @@ _global_opt_lists = [ manila.share.drivers.generic.share_opts, manila.share.drivers.glusterfs.GlusterfsManilaShare_opts, manila.share.drivers.glusterfs_native.glusterfs_native_manila_share_opts, + manila.share.drivers.hdfs.hdfs_native.hdfs_native_share_opts, manila.share.drivers.hds.sop.hdssop_share_opts, manila.share.drivers.hp.hp_3par_driver.HP3PAR_OPTS, manila.share.drivers.huawei.huawei_nas.huawei_opts, diff --git a/manila/share/api.py b/manila/share/api.py index 38d023e90b..5939c0364c 100644 --- a/manila/share/api.py +++ b/manila/share/api.py @@ -108,7 +108,7 @@ class API(base.Base): # TODO(rushiagr): Find a suitable place to keep all the allowed # share types so that it becomes easier to add one - if share_proto.lower() not in ['nfs', 'cifs', 'glusterfs']: + if share_proto.lower() not in ['nfs', 'cifs', 'glusterfs', 'hdfs']: msg = (_("Invalid share type provided: %s") % share_proto) raise exception.InvalidInput(reason=msg) diff --git a/manila/share/drivers/hdfs/__init__.py b/manila/share/drivers/hdfs/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/manila/share/drivers/hdfs/hdfs_native.py b/manila/share/drivers/hdfs/hdfs_native.py new file mode 100644 index 0000000000..e5bed8217a --- /dev/null +++ b/manila/share/drivers/hdfs/hdfs_native.py @@ -0,0 +1,443 @@ +# Copyright (c) 2015 Intel, Corp. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""HDFS native protocol (hdfs) driver for manila shares. + +Manila share is a directory in HDFS. And this share does not use +service VM instance (share server). The instance directly talks +to the the HDFS cluster. + +The initial version only supports single namenode and flat network. + +Configuration Requirements: + To enable access control, HDFS file system must have ACLs enabled. +""" + +import math +import os +import pipes +import socket + +from oslo_concurrency import processutils +from oslo_config import cfg +from oslo_utils import units +import six + +from manila import exception +from manila.i18n import _ +from manila.openstack.common import log as logging +from manila.share import driver +from manila import utils + +LOG = logging.getLogger(__name__) + +hdfs_native_share_opts = [ + cfg.StrOpt('hdfs_namenode_ip', + default=None, + help='The IP of the HDFS namenode.'), + cfg.IntOpt('hdfs_namenode_port', + default=9000, + help='The port of HDFS namenode service.'), + cfg.IntOpt('hdfs_ssh_port', + default=22, + help='HDFS namenode SSH port.'), + cfg.StrOpt('hdfs_ssh_name', + default=None, + help='HDFS namenode ssh login name.'), + cfg.StrOpt('hdfs_ssh_pw', + default=None, + help='HDFS namenode SSH login password, ' + 'This parameter is not necessary, if ' + '\'hdfs_ssh_private_key\' is configured.'), + cfg.StrOpt('hdfs_ssh_private_key', + default=None, + help='Path to HDFS namenode SSH private ' + 'key for login.'), +] + +CONF = cfg.CONF +CONF.register_opts(hdfs_native_share_opts) + + +class HDFSNativeShareDriver(driver.ExecuteMixin, driver.ShareDriver): + """HDFS Share Driver. + + Executes commands relating to shares. + API version history: + + 1.0 - Initial Version + """ + + def __init__(self, db, *args, **kwargs): + super(HDFSNativeShareDriver, self).__init__(False, *args, **kwargs) + self.db = db + self.configuration.append_config_values(hdfs_native_share_opts) + self.backend_name = self.configuration.safe_get( + 'share_backend_name') or 'HDFS-Native' + self.ssh_connections = {} + self._hdfs_execute = None + self._hdfs_bin = None + self._hdfs_base_path = None + + def do_setup(self, context): + """Do initialization while the share driver starts.""" + super(HDFSNativeShareDriver, self).do_setup(context) + host = self.configuration.hdfs_namenode_ip + local_hosts = socket.gethostbyname_ex(socket.gethostname())[2] + if host in local_hosts: + self._hdfs_execute = self._hdfs_local_execute + else: + self._hdfs_execute = self._hdfs_remote_execute + + self._hdfs_bin = self._get_hdfs_bin_path() + self._hdfs_base_path = ( + 'hdfs://' + self.configuration.hdfs_namenode_ip + ':' + + six.text_type(self.configuration.hdfs_namenode_port)) + + def _hdfs_local_execute(self, *cmd, **kwargs): + if 'run_as_root' not in kwargs: + kwargs.update({'run_as_root': True}) + + return utils.execute(*cmd, **kwargs) + + def _hdfs_remote_execute(self, *cmd, **kwargs): + host = self.configuration.hdfs_namenode_ip + check_exit_code = kwargs.pop('check_exit_code', False) + + return self._run_ssh(host, cmd, check_exit_code) + + def _run_ssh(self, host, cmd_list, check_exit_code=False): + command = ' '.join(pipes.quote(cmd_arg) for cmd_arg in cmd_list) + connection = self.ssh_connections.get(host) + if not connection: + hdfs_ssh_name = self.configuration.hdfs_ssh_name + password = self.configuration.hdfs_ssh_pw + privatekey = self.configuration.hdfs_ssh_private_key + hdfs_ssh_port = self.configuration.hdfs_ssh_port + ssh_conn_timeout = self.configuration.ssh_conn_timeout + min_size = self.configuration.ssh_min_pool_conn + max_size = self.configuration.ssh_max_pool_conn + + ssh_pool = utils.SSHPool(host, + hdfs_ssh_port, + ssh_conn_timeout, + hdfs_ssh_name, + password=password, + privatekey=privatekey, + min_size=min_size, + max_size=max_size) + ssh = ssh_pool.create() + self.ssh_connections[host] = (ssh_pool, ssh) + else: + ssh_pool, ssh = connection + + if not ssh.get_transport().is_active(): + ssh_pool.remove(ssh) + ssh = ssh_pool.create() + self.ssh_connections[host] = (ssh_pool, ssh) + + try: + return processutils.ssh_execute( + ssh, + command, + check_exit_code=check_exit_code) + except Exception as e: + msg = (_('Error running SSH command: %(cmd)s. ' + 'Error: %(excmsg)s.') % + {'cmd': command, 'excmsg': six.text_type(e)}) + LOG.error(msg) + raise exception.HDFSException(msg) + + def _get_hdfs_bin_path(self): + try: + (out, __) = self._hdfs_execute('locate', '/bin/hdfs') + except exception.ProcessExecutionError as e: + msg = (_('Can not get the execution path of hdfs. ' + 'Error: %(excmsg)s.') % + {'excmsg': six.text_type(e)}) + LOG.error(msg) + raise exception.HDFSException(msg) + + lines = out.splitlines() + if lines and lines[0].endswith('/bin/hdfs'): + return lines[0] + else: + msg = _('Can not get the execution path of hdfs.') + LOG.error(msg) + raise exception.HDFSException(msg) + + def _create_share(self, share): + """Creates a share.""" + if share['share_proto'].lower() != 'hdfs': + msg = _('Only HDFS protocol supported!') + LOG.error(msg) + raise exception.HDFSException(msg) + + share_dir = '/' + share['name'] + sizestr = six.text_type(share['size']) + 'g' + + try: + self._hdfs_execute(self._hdfs_bin, 'dfs', + '-mkdir', share_dir) + except exception.ProcessExecutionError as e: + msg = (_('Failed to create directory in hdfs for the ' + 'share %(sharename)s. Error: %(excmsg)s.') % + {'sharename': share['name'], + 'excmsg': six.text_type(e)}) + LOG.error(msg) + raise exception.HDFSException(msg) + + try: + self._hdfs_execute(self._hdfs_bin, 'dfsadmin', + '-setSpaceQuota', sizestr, share_dir) + except exception.ProcessExecutionError as e: + msg = (_('Failed to set space quota for the ' + 'share %(sharename)s. Error: %(excmsg)s.') % + {'sharename': share['name'], + 'excmsg': six.text_type(e)}) + LOG.error(msg) + raise exception.HDFSException(msg) + + try: + self._hdfs_execute(self._hdfs_bin, 'dfsadmin', + '-allowSnapshot', share_dir) + except exception.ProcessExecutionError as e: + msg = (_('Failed to allow snapshot for the ' + 'share %(sharename)s. Error: %(excmsg)s.') % + {'sharename': share['name'], + 'excmsg': six.text_type(e)}) + LOG.error(msg) + raise exception.HDFSException(msg) + + def _get_share_path(self, share): + """Return share path on storage provider.""" + return os.path.join(self._hdfs_base_path, share['name']) + + def _get_snapshot_path(self, snapshot): + """Return snapshot path on storage provider.""" + snapshot_dir = '.snapshot' + return os.path.join('/', snapshot['share_name'], + snapshot_dir, snapshot['name']) + + def get_network_allocations_number(self): + return 0 + + def create_share(self, context, share, share_server=None): + """Create a HDFS directory which acted as a share.""" + self._create_share(share) + return self._get_share_path(share) + + def create_share_from_snapshot(self, context, share, snapshot, + share_server=None): + """Creates a snapshot.""" + self._create_share(share) + share_path = '/' + share['name'] + snapshot_path = self._get_snapshot_path(snapshot) + + cmd = [self._hdfs_bin, 'dfs', '-cp', '-r', + snapshot_path, share_path] + try: + self._hdfs_execute(*cmd) + except exception.ProcessExecutionError as e: + msg = (_('Failed to create share %(sharename)s from ' + 'snapshot %(snapshotname)s. Error: %(excmsg)s.') % + {'sharename': share['name'], + 'snapshotname': snapshot['name'], + 'excmsg': six.text_type(e)}) + LOG.error(msg) + raise exception.HDFSException(msg) + + return self._get_share_path(share) + + def create_snapshot(self, context, snapshot, share_server=None): + """Creates a snapshot.""" + share_dir = '/' + snapshot['share_name'] + snapshot_name = snapshot['name'] + + cmd = [self._hdfs_bin, 'dfs', '-createSnapshot', + share_dir, snapshot_name] + try: + self._hdfs_execute(*cmd) + except exception.ProcessExecutionError as e: + msg = (_('Failed to create snapshot %(snapshotname)s for ' + 'the share %(sharename)s. Error: %(excmsg)s.') % + {'snapshotname': snapshot_name, + 'sharename': snapshot['share_name'], + 'excmsg': six.text_type(e)}) + LOG.error(msg) + raise exception.HDFSException(msg) + + def delete_share(self, context, share, share_server=None): + """Deletes share storage.""" + share_dir = '/' + share['name'] + + cmd = [self._hdfs_bin, 'dfs', '-rm', '-r', share_dir] + try: + self._hdfs_execute(*cmd) + except exception.ProcessExecutionError as e: + msg = (_('Failed to delete share %(sharename)s. ' + 'Error: %(excmsg)s.') % + {'sharename': share['name'], + 'excmsg': six.text_type(e)}) + LOG.error(msg) + raise exception.HDFSException(msg) + + def delete_snapshot(self, context, snapshot, share_server=None): + """Deletes a snapshot.""" + share_dir = '/' + snapshot['share_name'] + + cmd = [self._hdfs_bin, 'dfs', '-deleteSnapshot', + share_dir, snapshot['name']] + try: + self._hdfs_execute(*cmd) + except exception.ProcessExecutionError as e: + msg = (_('Failed to delete snapshot %(snapshotname)s. ' + 'Error: %(excmsg)s.') % + {'snapshotname': snapshot['name'], + 'excmsg': six.text_type(e)}) + LOG.error(msg) + raise exception.HDFSException(msg) + + def ensure_share(self, context, share, share_server=None): + """Ensure the storage are exported.""" + + def allow_access(self, context, share, access, share_server=None): + """Allows access to the share for a given user.""" + if access['access_type'] != 'user': + msg = _("Only 'user' access type allowed!") + LOG.error(msg) + raise exception.InvalidShareAccess(msg) + + # Note(jun): For directories in HDFS, the x permission is + # required to access a child of the directory. + if access['access_level'] == 'rw': + access_level = 'rwx' + elif access['access_level'] == 'ro': + access_level = 'r-x' + else: + msg = (_('The access level %(accesslevel)s was unsupported.') % + {'accesslevel': access['access_level']}) + LOG.error(msg) + raise exception.InvalidShareAccess(msg) + + share_dir = '/' + share['name'] + user_access = ':'.join([access['access_type'], + access['access_to'], + access_level]) + + cmd = [self._hdfs_bin, 'dfs', '-setfacl', '-m', '-R', + user_access, share_dir] + try: + (__, out) = self._hdfs_execute(*cmd, check_exit_code=True) + except exception.ProcessExecutionError as e: + msg = (_('Failed to set ACL of share %(sharename)s for ' + 'user: %(username)s' + 'Error: %(excmsg)s.') % + {'sharename': share['name'], + 'username': access['access_to'], + 'excmsg': six.text_type(e)}) + LOG.error(msg) + raise exception.HDFSException(msg) + + def deny_access(self, context, share, access, share_server=None): + """Denies the access to the share for a given user.""" + share_dir = '/' + share['name'] + access_name = ':'.join([access['access_type'], access['access_to']]) + + cmd = [self._hdfs_bin, 'dfs', '-setfacl', '-x', '-R', + access_name, share_dir] + try: + (__, out) = self._hdfs_execute(*cmd, check_exit_code=True) + except exception.ProcessExecutionError as e: + msg = (_('Failed to deny ACL of share %(sharename)s for ' + 'user: %(username)s' + 'Error: %(excmsg)s.') % + {'sharename': share['name'], + 'username': access['access_to'], + 'excmsg': six.text_type(e)}) + LOG.error(msg) + raise exception.HDFSException(msg) + + def _check_hdfs_state(self): + try: + (out, __) = self._hdfs_execute(self._hdfs_bin, 'fsck', '/') + except exception.ProcessExecutionError as e: + msg = _('Failed to check the utility of hdfs.') + LOG.error(msg) + raise exception.HDFSException(msg) + lines = out.splitlines() + try: + hdfs_state = lines[1].split()[1] + except (IndexError, ValueError) as e: + msg = (_('Failed to check hdfs state. Error: %(excmsg)s.') % + {'excmsg': six.text_type(e)}) + LOG.error(msg) + raise exception.HDFSException(msg) + if hdfs_state.upper() != 'HEALTHY': + return False + return True + + def check_for_setup_error(self): + """Return an error if the prerequisites are met.""" + if not self.configuration.hdfs_namenode_ip: + msg = _('Not specify the hdfs cluster yet! ' + 'Add the ip of hdfs namenode in the ' + 'hdfs_namenode_ip configuration parameter.') + LOG.error(msg) + raise exception.HDFSException(msg) + + if not self._check_hdfs_state(): + msg = _('HDFS is not in healthy state.') + LOG.error(msg) + raise exception.HDFSException(msg) + + def _get_available_capacity(self): + """Calculate available space on path.""" + try: + (out, __) = self._hdfs_execute(self._hdfs_bin, 'dfsadmin', + '-report') + except exception.ProcessExecutionError as e: + msg = (_('Failed to check available capacity for hdfs.' + 'Error: %(excmsg)s.') % + {'excmsg': six.text_type(e)}) + LOG.error(msg) + raise exception.HDFSException(msg) + + lines = out.splitlines() + try: + total = int(lines[1].split()[2]) + free = int(lines[2].split()[2]) + except (IndexError, ValueError) as e: + msg = (_('Failed to get hdfs capacity info. ' + 'Error: %(excmsg)s.') % + {'excmsg': six.text_type(e)}) + LOG.error(msg) + raise exception.HDFSException(msg) + return total, free + + def _update_share_stats(self): + """Retrieves stats info of share directories group.""" + + data = dict(share_backend_name=self.backend_name, + storage_protocol='HDFS', + reserved_percentage=self.configuration. + reserved_share_percentage) + + total, free = self._get_available_capacity() + + data['total_capacity_gb'] = math.ceil(total / units.Gi) + data['free_capacity_gb'] = math.ceil(free / units.Gi) + + super(HDFSNativeShareDriver, self)._update_share_stats(data) diff --git a/manila/tests/fake_share.py b/manila/tests/fake_share.py index da6cefb29e..2f4a52964f 100644 --- a/manila/tests/fake_share.py +++ b/manila/tests/fake_share.py @@ -50,6 +50,7 @@ def fake_access(**kwargs): 'id': 'fakeaccid', 'access_type': 'ip', 'access_to': '10.0.0.1', + 'access_level': 'rw', 'state': 'active', } access.update(kwargs) diff --git a/manila/tests/share/drivers/hdfs/__init__.py b/manila/tests/share/drivers/hdfs/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/manila/tests/share/drivers/hdfs/test_hdfs_native.py b/manila/tests/share/drivers/hdfs/test_hdfs_native.py new file mode 100644 index 0000000000..296bf286af --- /dev/null +++ b/manila/tests/share/drivers/hdfs/test_hdfs_native.py @@ -0,0 +1,425 @@ +# Copyright 2015 Intel, Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Unit tests for HDFS native protocol driver module.""" + +import socket + +import mock +from oslo_concurrency import processutils +from oslo_config import cfg +import six + +from manila import context +from manila import exception +import manila.share.configuration as config +import manila.share.drivers.hdfs.hdfs_native as hdfs_native +from manila import test +from manila.tests import fake_share +from manila import utils + + +CONF = cfg.CONF + + +class HDFSNativeShareDriverTestCase(test.TestCase): + """Tests HDFSNativeShareDriver.""" + + def setUp(self): + super(HDFSNativeShareDriverTestCase, self).setUp() + self._context = context.get_admin_context() + self._hdfs_execute = mock.Mock(return_value=('', '')) + self.local_ip = '192.168.1.1' + + CONF.set_default('driver_handles_share_servers', False) + CONF.set_default('hdfs_namenode_ip', self.local_ip) + CONF.set_default('hdfs_ssh_name', 'fake_sshname') + CONF.set_default('hdfs_ssh_pw', 'fake_sshpw') + CONF.set_default('hdfs_ssh_private_key', 'fake_sshkey') + + self.fake_conf = config.Configuration(None) + self._db = mock.Mock() + self._driver = hdfs_native.HDFSNativeShareDriver( + self._db, + execute=self._hdfs_execute, + configuration=self.fake_conf) + self._driver._hdfs_bin = 'fake_hdfs_bin' + self.share = fake_share.fake_share(share_proto='HDFS') + self.snapshot = fake_share.fake_snapshot(share_proto='HDFS') + self.access = fake_share.fake_access(access_type='user') + self.fakesharepath = 'hdfs://1.2.3.4:5/share-0' + self.fakesnapshotpath = '/share-0/.snapshot/snapshot-0' + + socket.gethostname = mock.Mock(return_value='testserver') + socket.gethostbyname_ex = mock.Mock(return_value=( + 'localhost', + ['localhost.localdomain', 'testserver'], + ['127.0.0.1', self.local_ip])) + + def test_do_setup(self): + self._driver._get_hdfs_bin_path = mock.Mock() + self._driver.do_setup(self._context) + self._driver._get_hdfs_bin_path.assert_called_once_with() + + def test_create_share(self): + self._driver._create_share = mock.Mock() + self._driver._get_share_path = mock.Mock( + return_value=self.fakesharepath) + result = self._driver.create_share(self._context, self.share, + share_server=None) + self._driver._create_share.assert_called_once_with(self.share) + self._driver._get_share_path.assert_called_once_with(self.share) + + self.assertEqual(self.fakesharepath, result) + + def test_create_share_unsupported_proto(self): + self._driver._get_share_path = mock.Mock() + self.assertRaises(exception.HDFSException, + self._driver.create_share, + self._context, + fake_share.fake_share(), + share_server=None) + self.assertFalse(self._driver._get_share_path.called) + + def test__create_share(self): + share_dir = '/' + self.share['name'] + sizestr = six.text_type(self.share['size']) + 'g' + self._driver._hdfs_execute = mock.Mock(return_value=True) + self._driver._create_share(self.share) + self._driver._hdfs_execute.assert_any_call( + 'fake_hdfs_bin', 'dfs', '-mkdir', share_dir) + self._driver._hdfs_execute.assert_any_call( + 'fake_hdfs_bin', 'dfsadmin', '-setSpaceQuota', sizestr, share_dir) + self._driver._hdfs_execute.assert_any_call( + 'fake_hdfs_bin', 'dfsadmin', '-allowSnapshot', share_dir) + + def test__create_share_exception(self): + share_dir = '/' + self.share['name'] + self._driver._hdfs_execute = mock.Mock( + side_effect=exception.ProcessExecutionError) + self.assertRaises(exception.HDFSException, + self._driver._create_share, self.share) + self._driver._hdfs_execute.assert_called_once_with( + 'fake_hdfs_bin', 'dfs', '-mkdir', share_dir) + + def test_create_share_from_snapshot(self): + self._driver._hdfs_execute = mock.Mock(return_value=True) + self._driver._create_share = mock.Mock(return_value=True) + self._driver._get_share_path = mock.Mock(return_value=self. + fakesharepath) + self._driver._get_snapshot_path = mock.Mock(return_value=self. + fakesnapshotpath) + result = self._driver.create_share_from_snapshot(self._context, + self.share, + self.snapshot, + share_server=None) + self._driver._create_share.assert_called_once_with(self.share) + self._driver._get_snapshot_path.assert_called_once_with( + self.snapshot) + self._driver._hdfs_execute.assert_called_once_with( + 'fake_hdfs_bin', 'dfs', '-cp', '-r', + self.fakesnapshotpath, '/' + self.share['name']) + self._driver._get_share_path.assert_called_once_with(self.share) + self.assertEqual(self.fakesharepath, result) + + def test_create_share_from_snapshot_exception(self): + self._driver._create_share = mock.Mock(return_value=True) + self._driver._get_snapshot_path = mock.Mock(return_value=self. + fakesnapshotpath) + self._driver._get_share_path = mock.Mock(return_value=self. + fakesharepath) + self._driver._hdfs_execute = mock.Mock( + side_effect=exception.ProcessExecutionError) + self.assertRaises(exception.HDFSException, + self._driver.create_share_from_snapshot, + self._context, self.share, + self.snapshot, share_server=None) + self._driver._create_share.assert_called_once_with(self.share) + self._driver._get_snapshot_path.assert_called_once_with(self.snapshot) + self._driver._hdfs_execute.assert_called_once_with( + 'fake_hdfs_bin', 'dfs', '-cp', '-r', + self.fakesnapshotpath, '/' + self.share['name']) + self.assertFalse(self._driver._get_share_path.called) + + def test_create_snapshot(self): + self._driver._hdfs_execute = mock.Mock(return_value=True) + self._driver.create_snapshot(self._context, self.snapshot, + share_server=None) + self._driver._hdfs_execute.assert_called_once_with( + 'fake_hdfs_bin', 'dfs', '-createSnapshot', + '/' + self.snapshot['share_name'], self.snapshot['name']) + + def test_create_snapshot_exception(self): + self._driver._hdfs_execute = mock.Mock( + side_effect=exception.ProcessExecutionError) + self.assertRaises(exception.HDFSException, + self._driver.create_snapshot, self._context, + self.snapshot, share_server=None) + self._driver._hdfs_execute.assert_called_once_with( + 'fake_hdfs_bin', 'dfs', '-createSnapshot', + '/' + self.snapshot['share_name'], self.snapshot['name']) + + def test_delete_share(self): + self._driver._hdfs_execute = mock.Mock(return_value=True) + self._driver.delete_share(self._context, + self.share, + share_server=None) + self._driver._hdfs_execute.assert_called_once_with( + 'fake_hdfs_bin', 'dfs', '-rm', '-r', + '/' + self.share['name']) + + def test_delete_share_exception(self): + self._driver._hdfs_execute = mock.Mock( + side_effect=exception.ProcessExecutionError) + self.assertRaises(exception.HDFSException, + self._driver.delete_share, + self._context, + self.share, + share_server=None) + self._driver._hdfs_execute.assert_called_once_with( + 'fake_hdfs_bin', 'dfs', '-rm', '-r', + '/' + self.share['name']) + + def test_delete_snapshot(self): + self._driver._hdfs_execute = mock.Mock(return_value=True) + self._driver.delete_snapshot(self._context, + self.snapshot, + share_server=None) + self._driver._hdfs_execute.assert_called_once_with( + 'fake_hdfs_bin', 'dfs', '-deleteSnapshot', + '/' + self.snapshot['share_name'], self.snapshot['name']) + + def test_delete_snapshot_exception(self): + self._driver._hdfs_execute = mock.Mock( + side_effect=exception.ProcessExecutionError) + self.assertRaises(exception.HDFSException, + self._driver.delete_snapshot, + self._context, + self.snapshot, + share_server=None) + self._driver._hdfs_execute.assert_called_once_with( + 'fake_hdfs_bin', 'dfs', '-deleteSnapshot', + '/' + self.snapshot['share_name'], self.snapshot['name']) + + def test_allow_access(self): + self._driver._hdfs_execute = mock.Mock( + return_value=['', '']) + share_dir = '/' + self.share['name'] + user_access = ':'.join([self.access['access_type'], + self.access['access_to'], + 'rwx']) + cmd = ['fake_hdfs_bin', 'dfs', '-setfacl', '-m', '-R', + user_access, share_dir] + self._driver.allow_access(self._context, self.share, self.access, + share_server=None) + self._driver._hdfs_execute.assert_called_once_with( + *cmd, check_exit_code=True) + + def test_allow_access_invalid_access_type(self): + self.assertRaises(exception.InvalidShareAccess, + self._driver.allow_access, + self._context, + self.share, + fake_share.fake_access( + access_type='invalid_access_type'), + share_server=None) + + def test_allow_access_invalid_access_level(self): + self.assertRaises(exception.InvalidShareAccess, + self._driver.allow_access, + self._context, + self.share, + fake_share.fake_access( + access_level='invalid_access_level'), + share_server=None) + + def test_allow_access_exception(self): + self._driver._hdfs_execute = mock.Mock( + side_effect=exception.ProcessExecutionError) + share_dir = '/' + self.share['name'] + user_access = ':'.join([self.access['access_type'], + self.access['access_to'], + 'rwx']) + cmd = ['fake_hdfs_bin', 'dfs', '-setfacl', '-m', '-R', + user_access, share_dir] + self.assertRaises(exception.HDFSException, + self._driver.allow_access, + self._context, + self.share, + self.access, + share_server=None) + self._driver._hdfs_execute.assert_called_once_with( + *cmd, check_exit_code=True) + + def test_deny_access(self): + self._driver._hdfs_execute = mock.Mock(return_value=['', '']) + share_dir = '/' + self.share['name'] + access_name = ':'.join([self.access['access_type'], + self.access['access_to']]) + cmd = ['fake_hdfs_bin', 'dfs', '-setfacl', '-x', '-R', + access_name, share_dir] + self._driver.deny_access(self._context, + self.share, + self.access, + share_server=None) + self._driver._hdfs_execute.assert_called_once_with( + *cmd, check_exit_code=True) + + def test_deny_access_exception(self): + self._driver._hdfs_execute = mock.Mock( + side_effect=exception.ProcessExecutionError) + share_dir = '/' + self.share['name'] + access_name = ':'.join([self.access['access_type'], + self.access['access_to']]) + cmd = ['fake_hdfs_bin', 'dfs', '-setfacl', '-x', '-R', + access_name, share_dir] + self.assertRaises(exception.HDFSException, + self._driver.deny_access, + self._context, + self.share, + self.access, + share_server=None) + self._driver._hdfs_execute.assert_called_once_with( + *cmd, check_exit_code=True) + + def test__check_hdfs_state_healthy(self): + fake_out = "fakeinfo\n...Status: HEALTHY" + self._driver._hdfs_execute = mock.Mock(return_value=(fake_out, '')) + result = self._driver._check_hdfs_state() + self._driver._hdfs_execute.assert_called_once_with( + 'fake_hdfs_bin', 'fsck', '/') + self.assertEqual(True, result) + + def test__check_hdfs_state_down(self): + fake_out = "fakeinfo\n...Status: DOWN" + self._driver._hdfs_execute = mock.Mock(return_value=(fake_out, '')) + result = self._driver._check_hdfs_state() + self._driver._hdfs_execute.assert_called_once_with( + 'fake_hdfs_bin', 'fsck', '/') + self.assertEqual(False, result) + + def test__check_hdfs_state_exception(self): + self._driver._hdfs_execute = mock.Mock( + side_effect=exception.ProcessExecutionError) + self.assertRaises(exception.HDFSException, + self._driver._check_hdfs_state) + self._driver._hdfs_execute.assert_called_once_with( + 'fake_hdfs_bin', 'fsck', '/') + + def test__get_available_capacity(self): + fake_out = 'Configured Capacity: 2.4\n' + \ + 'Total Capacity: 2\n' + \ + 'DFS free: 1' + self._driver._hdfs_execute = mock.Mock(return_value=(fake_out, '')) + total, free = self._driver._get_available_capacity() + self._driver._hdfs_execute.assert_called_once_with( + 'fake_hdfs_bin', 'dfsadmin', '-report') + self.assertEqual(2, total) + self.assertEqual(1, free) + + def test__get_available_capacity_exception(self): + self._driver._hdfs_execute = mock.Mock( + side_effect=exception.ProcessExecutionError) + self.assertRaises(exception.HDFSException, + self._driver._get_available_capacity) + self._driver._hdfs_execute.assert_called_once_with( + 'fake_hdfs_bin', 'dfsadmin', '-report') + + def test_get_share_stats_refresh_false(self): + self._driver._stats = {'fake_key': 'fake_value'} + result = self._driver.get_share_stats(False) + self.assertEqual(self._driver._stats, result) + + def test_get_share_stats_refresh_true(self): + self._driver._get_available_capacity = mock.Mock( + return_value=(11111.0, 12345.0)) + result = self._driver.get_share_stats(True) + expected_keys = [ + 'QoS_support', 'driver_version', 'share_backend_name', + 'free_capacity_gb', 'total_capacity_gb', + 'driver_handles_share_servers', + 'reserved_percentage', 'vendor_name', 'storage_protocol', + ] + for key in expected_keys: + self.assertIn(key, result) + self.assertEqual('HDFS', result['storage_protocol']) + self._driver._get_available_capacity.assert_called_once_with() + + def test__hdfs_local_execute(self): + cmd = 'testcmd' + self.mock_object(utils, 'execute', mock.Mock(return_value=True)) + self._driver._hdfs_local_execute(cmd) + utils.execute.assert_called_once_with(cmd, run_as_root=True) + + def test__hdfs_remote_execute(self): + self._driver._run_ssh = mock.Mock(return_value=True) + cmd = 'testcmd' + self._driver._hdfs_remote_execute(cmd, check_exit_code=True) + self._driver._run_ssh.assert_called_once_with( + self.local_ip, tuple([cmd]), True) + + def test__run_ssh(self): + ssh_output = 'fake_ssh_output' + cmd_list = ['fake', 'cmd'] + ssh = mock.Mock() + ssh.get_transport = mock.Mock() + ssh.get_transport().is_active = mock.Mock(return_value=True) + ssh_pool = mock.Mock() + ssh_pool.create = mock.Mock(return_value=ssh) + self.mock_object(utils, 'SSHPool', mock.Mock(return_value=ssh_pool)) + self.mock_object(processutils, 'ssh_execute', + mock.Mock(return_value=ssh_output)) + result = self._driver._run_ssh(self.local_ip, cmd_list) + utils.SSHPool.assert_called_once_with( + self._driver.configuration.hdfs_namenode_ip, + self._driver.configuration.hdfs_ssh_port, + self._driver.configuration.ssh_conn_timeout, + self._driver.configuration.hdfs_ssh_name, + password=self._driver.configuration.hdfs_ssh_pw, + privatekey=self._driver.configuration.hdfs_ssh_private_key, + min_size=self._driver.configuration.ssh_min_pool_conn, + max_size=self._driver.configuration.ssh_max_pool_conn) + ssh_pool.create.assert_called_once_with() + ssh.get_transport().is_active.assert_called_once_with() + processutils.ssh_execute.assert_called_once_with( + ssh, 'fake cmd', check_exit_code=False) + self.assertEqual(ssh_output, result) + + def test__run_ssh_exception(self): + cmd_list = ['fake', 'cmd'] + ssh = mock.Mock() + ssh.get_transport = mock.Mock() + ssh.get_transport().is_active = mock.Mock(return_value=True) + ssh_pool = mock.Mock() + ssh_pool.create = mock.Mock(return_value=ssh) + self.mock_object(utils, 'SSHPool', mock.Mock(return_value=ssh_pool)) + self.mock_object(processutils, 'ssh_execute', + mock.Mock(side_effect=Exception)) + self.assertRaises(exception.HDFSException, + self._driver._run_ssh, + self.local_ip, + cmd_list) + utils.SSHPool.assert_called_once_with( + self._driver.configuration.hdfs_namenode_ip, + self._driver.configuration.hdfs_ssh_port, + self._driver.configuration.ssh_conn_timeout, + self._driver.configuration.hdfs_ssh_name, + password=self._driver.configuration.hdfs_ssh_pw, + privatekey=self._driver.configuration.hdfs_ssh_private_key, + min_size=self._driver.configuration.ssh_min_pool_conn, + max_size=self._driver.configuration.ssh_max_pool_conn) + ssh_pool.create.assert_called_once_with() + ssh.get_transport().is_active.assert_called_once_with() + processutils.ssh_execute.assert_called_once_with( + ssh, 'fake cmd', check_exit_code=False)