manila/manila/tests/share/drivers/hdfs/test_hdfs_native.py

424 lines
19 KiB
Python

# Copyright 2015 Intel, Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Unit tests for HDFS native protocol driver module."""
import socket
import mock
from oslo_concurrency import processutils
from oslo_config import cfg
import six
from manila import context
from manila import exception
import manila.share.configuration as config
import manila.share.drivers.hdfs.hdfs_native as hdfs_native
from manila import test
from manila.tests import fake_share
from manila import utils
CONF = cfg.CONF
class HDFSNativeShareDriverTestCase(test.TestCase):
"""Tests HDFSNativeShareDriver."""
def setUp(self):
super(HDFSNativeShareDriverTestCase, self).setUp()
self._context = context.get_admin_context()
self._hdfs_execute = mock.Mock(return_value=('', ''))
self.local_ip = '192.168.1.1'
CONF.set_default('driver_handles_share_servers', False)
CONF.set_default('hdfs_namenode_ip', self.local_ip)
CONF.set_default('hdfs_ssh_name', 'fake_sshname')
CONF.set_default('hdfs_ssh_pw', 'fake_sshpw')
CONF.set_default('hdfs_ssh_private_key', 'fake_sshkey')
self.fake_conf = config.Configuration(None)
self._driver = hdfs_native.HDFSNativeShareDriver(
execute=self._hdfs_execute,
configuration=self.fake_conf)
self._driver._hdfs_bin = 'fake_hdfs_bin'
self.share = fake_share.fake_share(share_proto='HDFS')
self.snapshot = fake_share.fake_snapshot(share_proto='HDFS')
self.access = fake_share.fake_access(access_type='user')
self.fakesharepath = 'hdfs://1.2.3.4:5/share-0'
self.fakesnapshotpath = '/share-0/.snapshot/snapshot-0'
socket.gethostname = mock.Mock(return_value='testserver')
socket.gethostbyname_ex = mock.Mock(return_value=(
'localhost',
['localhost.localdomain', 'testserver'],
['127.0.0.1', self.local_ip]))
def test_do_setup(self):
self._driver._get_hdfs_bin_path = mock.Mock()
self._driver.do_setup(self._context)
self._driver._get_hdfs_bin_path.assert_called_once_with()
def test_create_share(self):
self._driver._create_share = mock.Mock()
self._driver._get_share_path = mock.Mock(
return_value=self.fakesharepath)
result = self._driver.create_share(self._context, self.share,
share_server=None)
self._driver._create_share.assert_called_once_with(self.share)
self._driver._get_share_path.assert_called_once_with(self.share)
self.assertEqual(self.fakesharepath, result)
def test_create_share_unsupported_proto(self):
self._driver._get_share_path = mock.Mock()
self.assertRaises(exception.HDFSException,
self._driver.create_share,
self._context,
fake_share.fake_share(),
share_server=None)
self.assertFalse(self._driver._get_share_path.called)
def test__create_share(self):
share_dir = '/' + self.share['name']
sizestr = six.text_type(self.share['size']) + 'g'
self._driver._hdfs_execute = mock.Mock(return_value=True)
self._driver._create_share(self.share)
self._driver._hdfs_execute.assert_any_call(
'fake_hdfs_bin', 'dfs', '-mkdir', share_dir)
self._driver._hdfs_execute.assert_any_call(
'fake_hdfs_bin', 'dfsadmin', '-setSpaceQuota', sizestr, share_dir)
self._driver._hdfs_execute.assert_any_call(
'fake_hdfs_bin', 'dfsadmin', '-allowSnapshot', share_dir)
def test__create_share_exception(self):
share_dir = '/' + self.share['name']
self._driver._hdfs_execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.HDFSException,
self._driver._create_share, self.share)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfs', '-mkdir', share_dir)
def test_create_share_from_snapshot(self):
self._driver._hdfs_execute = mock.Mock(return_value=True)
self._driver._create_share = mock.Mock(return_value=True)
self._driver._get_share_path = mock.Mock(return_value=self.
fakesharepath)
self._driver._get_snapshot_path = mock.Mock(return_value=self.
fakesnapshotpath)
result = self._driver.create_share_from_snapshot(self._context,
self.share,
self.snapshot,
share_server=None)
self._driver._create_share.assert_called_once_with(self.share)
self._driver._get_snapshot_path.assert_called_once_with(
self.snapshot)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfs', '-cp', '-r',
self.fakesnapshotpath, '/' + self.share['name'])
self._driver._get_share_path.assert_called_once_with(self.share)
self.assertEqual(self.fakesharepath, result)
def test_create_share_from_snapshot_exception(self):
self._driver._create_share = mock.Mock(return_value=True)
self._driver._get_snapshot_path = mock.Mock(return_value=self.
fakesnapshotpath)
self._driver._get_share_path = mock.Mock(return_value=self.
fakesharepath)
self._driver._hdfs_execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.HDFSException,
self._driver.create_share_from_snapshot,
self._context, self.share,
self.snapshot, share_server=None)
self._driver._create_share.assert_called_once_with(self.share)
self._driver._get_snapshot_path.assert_called_once_with(self.snapshot)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfs', '-cp', '-r',
self.fakesnapshotpath, '/' + self.share['name'])
self.assertFalse(self._driver._get_share_path.called)
def test_create_snapshot(self):
self._driver._hdfs_execute = mock.Mock(return_value=True)
self._driver.create_snapshot(self._context, self.snapshot,
share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfs', '-createSnapshot',
'/' + self.snapshot['share_name'], self.snapshot['name'])
def test_create_snapshot_exception(self):
self._driver._hdfs_execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.HDFSException,
self._driver.create_snapshot, self._context,
self.snapshot, share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfs', '-createSnapshot',
'/' + self.snapshot['share_name'], self.snapshot['name'])
def test_delete_share(self):
self._driver._hdfs_execute = mock.Mock(return_value=True)
self._driver.delete_share(self._context,
self.share,
share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfs', '-rm', '-r',
'/' + self.share['name'])
def test_delete_share_exception(self):
self._driver._hdfs_execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.HDFSException,
self._driver.delete_share,
self._context,
self.share,
share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfs', '-rm', '-r',
'/' + self.share['name'])
def test_delete_snapshot(self):
self._driver._hdfs_execute = mock.Mock(return_value=True)
self._driver.delete_snapshot(self._context,
self.snapshot,
share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfs', '-deleteSnapshot',
'/' + self.snapshot['share_name'], self.snapshot['name'])
def test_delete_snapshot_exception(self):
self._driver._hdfs_execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.HDFSException,
self._driver.delete_snapshot,
self._context,
self.snapshot,
share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfs', '-deleteSnapshot',
'/' + self.snapshot['share_name'], self.snapshot['name'])
def test_allow_access(self):
self._driver._hdfs_execute = mock.Mock(
return_value=['', ''])
share_dir = '/' + self.share['name']
user_access = ':'.join([self.access['access_type'],
self.access['access_to'],
'rwx'])
cmd = ['fake_hdfs_bin', 'dfs', '-setfacl', '-m', '-R',
user_access, share_dir]
self._driver.allow_access(self._context, self.share, self.access,
share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
*cmd, check_exit_code=True)
def test_allow_access_invalid_access_type(self):
self.assertRaises(exception.InvalidShareAccess,
self._driver.allow_access,
self._context,
self.share,
fake_share.fake_access(
access_type='invalid_access_type'),
share_server=None)
def test_allow_access_invalid_access_level(self):
self.assertRaises(exception.InvalidShareAccess,
self._driver.allow_access,
self._context,
self.share,
fake_share.fake_access(
access_level='invalid_access_level'),
share_server=None)
def test_allow_access_exception(self):
self._driver._hdfs_execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
share_dir = '/' + self.share['name']
user_access = ':'.join([self.access['access_type'],
self.access['access_to'],
'rwx'])
cmd = ['fake_hdfs_bin', 'dfs', '-setfacl', '-m', '-R',
user_access, share_dir]
self.assertRaises(exception.HDFSException,
self._driver.allow_access,
self._context,
self.share,
self.access,
share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
*cmd, check_exit_code=True)
def test_deny_access(self):
self._driver._hdfs_execute = mock.Mock(return_value=['', ''])
share_dir = '/' + self.share['name']
access_name = ':'.join([self.access['access_type'],
self.access['access_to']])
cmd = ['fake_hdfs_bin', 'dfs', '-setfacl', '-x', '-R',
access_name, share_dir]
self._driver.deny_access(self._context,
self.share,
self.access,
share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
*cmd, check_exit_code=True)
def test_deny_access_exception(self):
self._driver._hdfs_execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
share_dir = '/' + self.share['name']
access_name = ':'.join([self.access['access_type'],
self.access['access_to']])
cmd = ['fake_hdfs_bin', 'dfs', '-setfacl', '-x', '-R',
access_name, share_dir]
self.assertRaises(exception.HDFSException,
self._driver.deny_access,
self._context,
self.share,
self.access,
share_server=None)
self._driver._hdfs_execute.assert_called_once_with(
*cmd, check_exit_code=True)
def test__check_hdfs_state_healthy(self):
fake_out = "fakeinfo\n...Status: HEALTHY"
self._driver._hdfs_execute = mock.Mock(return_value=(fake_out, ''))
result = self._driver._check_hdfs_state()
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'fsck', '/')
self.assertEqual(True, result)
def test__check_hdfs_state_down(self):
fake_out = "fakeinfo\n...Status: DOWN"
self._driver._hdfs_execute = mock.Mock(return_value=(fake_out, ''))
result = self._driver._check_hdfs_state()
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'fsck', '/')
self.assertEqual(False, result)
def test__check_hdfs_state_exception(self):
self._driver._hdfs_execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.HDFSException,
self._driver._check_hdfs_state)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'fsck', '/')
def test__get_available_capacity(self):
fake_out = 'Configured Capacity: 2.4\n' + \
'Total Capacity: 2\n' + \
'DFS free: 1'
self._driver._hdfs_execute = mock.Mock(return_value=(fake_out, ''))
total, free = self._driver._get_available_capacity()
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfsadmin', '-report')
self.assertEqual(2, total)
self.assertEqual(1, free)
def test__get_available_capacity_exception(self):
self._driver._hdfs_execute = mock.Mock(
side_effect=exception.ProcessExecutionError)
self.assertRaises(exception.HDFSException,
self._driver._get_available_capacity)
self._driver._hdfs_execute.assert_called_once_with(
'fake_hdfs_bin', 'dfsadmin', '-report')
def test_get_share_stats_refresh_false(self):
self._driver._stats = {'fake_key': 'fake_value'}
result = self._driver.get_share_stats(False)
self.assertEqual(self._driver._stats, result)
def test_get_share_stats_refresh_true(self):
self._driver._get_available_capacity = mock.Mock(
return_value=(11111.0, 12345.0))
result = self._driver.get_share_stats(True)
expected_keys = [
'QoS_support', 'driver_version', 'share_backend_name',
'free_capacity_gb', 'total_capacity_gb',
'driver_handles_share_servers',
'reserved_percentage', 'vendor_name', 'storage_protocol',
]
for key in expected_keys:
self.assertIn(key, result)
self.assertEqual('HDFS', result['storage_protocol'])
self._driver._get_available_capacity.assert_called_once_with()
def test__hdfs_local_execute(self):
cmd = 'testcmd'
self.mock_object(utils, 'execute', mock.Mock(return_value=True))
self._driver._hdfs_local_execute(cmd)
utils.execute.assert_called_once_with(cmd, run_as_root=False)
def test__hdfs_remote_execute(self):
self._driver._run_ssh = mock.Mock(return_value=True)
cmd = 'testcmd'
self._driver._hdfs_remote_execute(cmd, check_exit_code=True)
self._driver._run_ssh.assert_called_once_with(
self.local_ip, tuple([cmd]), True)
def test__run_ssh(self):
ssh_output = 'fake_ssh_output'
cmd_list = ['fake', 'cmd']
ssh = mock.Mock()
ssh.get_transport = mock.Mock()
ssh.get_transport().is_active = mock.Mock(return_value=True)
ssh_pool = mock.Mock()
ssh_pool.create = mock.Mock(return_value=ssh)
self.mock_object(utils, 'SSHPool', mock.Mock(return_value=ssh_pool))
self.mock_object(processutils, 'ssh_execute',
mock.Mock(return_value=ssh_output))
result = self._driver._run_ssh(self.local_ip, cmd_list)
utils.SSHPool.assert_called_once_with(
self._driver.configuration.hdfs_namenode_ip,
self._driver.configuration.hdfs_ssh_port,
self._driver.configuration.ssh_conn_timeout,
self._driver.configuration.hdfs_ssh_name,
password=self._driver.configuration.hdfs_ssh_pw,
privatekey=self._driver.configuration.hdfs_ssh_private_key,
min_size=self._driver.configuration.ssh_min_pool_conn,
max_size=self._driver.configuration.ssh_max_pool_conn)
ssh_pool.create.assert_called_once_with()
ssh.get_transport().is_active.assert_called_once_with()
processutils.ssh_execute.assert_called_once_with(
ssh, 'fake cmd', check_exit_code=False)
self.assertEqual(ssh_output, result)
def test__run_ssh_exception(self):
cmd_list = ['fake', 'cmd']
ssh = mock.Mock()
ssh.get_transport = mock.Mock()
ssh.get_transport().is_active = mock.Mock(return_value=True)
ssh_pool = mock.Mock()
ssh_pool.create = mock.Mock(return_value=ssh)
self.mock_object(utils, 'SSHPool', mock.Mock(return_value=ssh_pool))
self.mock_object(processutils, 'ssh_execute',
mock.Mock(side_effect=Exception))
self.assertRaises(exception.HDFSException,
self._driver._run_ssh,
self.local_ip,
cmd_list)
utils.SSHPool.assert_called_once_with(
self._driver.configuration.hdfs_namenode_ip,
self._driver.configuration.hdfs_ssh_port,
self._driver.configuration.ssh_conn_timeout,
self._driver.configuration.hdfs_ssh_name,
password=self._driver.configuration.hdfs_ssh_pw,
privatekey=self._driver.configuration.hdfs_ssh_private_key,
min_size=self._driver.configuration.ssh_min_pool_conn,
max_size=self._driver.configuration.ssh_max_pool_conn)
ssh_pool.create.assert_called_once_with()
ssh.get_transport().is_active.assert_called_once_with()
processutils.ssh_execute.assert_called_once_with(
ssh, 'fake cmd', check_exit_code=False)