manila/manila/tests/share/drivers/test_generic.py

1841 lines
78 KiB
Python

# Copyright (c) 2014 NetApp, Inc.
# Copyright (c) 2015 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Unit tests for the Generic driver module."""
import os
import ddt
import mock
from oslo_concurrency import processutils
from oslo_config import cfg
from manila.common import constants as const
from manila import compute
from manila import context
from manila import exception
import manila.share.configuration
from manila.share.drivers import generic
from manila.share import share_types
from manila import test
from manila.tests import fake_compute
from manila.tests import fake_service_instance
from manila.tests import fake_share
from manila.tests import fake_utils
from manila.tests import fake_volume
from manila import utils
from manila import volume
CONF = cfg.CONF
def get_fake_manage_share():
return {
'id': 'fake',
'share_proto': 'NFS',
'share_type_id': 'fake',
'export_locations': [
{'path': '10.0.0.1:/foo/fake/path'},
{'path': '11.0.0.1:/bar/fake/path'},
],
}
@ddt.ddt
class GenericShareDriverTestCase(test.TestCase):
"""Tests GenericShareDriver."""
def setUp(self):
super(GenericShareDriverTestCase, self).setUp()
self._context = context.get_admin_context()
self._execute = mock.Mock(return_value=('', ''))
self._helper_cifs = mock.Mock()
self._helper_nfs = mock.Mock()
CONF.set_default('driver_handles_share_servers', True)
self.fake_conf = manila.share.configuration.Configuration(None)
self.fake_private_storage = mock.Mock()
self.mock_object(self.fake_private_storage, 'get',
mock.Mock(return_value=None))
with mock.patch.object(
generic.service_instance,
'ServiceInstanceManager',
fake_service_instance.FakeServiceInstanceManager):
self._driver = generic.GenericShareDriver(
private_storage=self.fake_private_storage,
execute=self._execute, configuration=self.fake_conf)
self._driver.service_tenant_id = 'service tenant id'
self._driver.service_network_id = 'service network id'
self._driver.compute_api = fake_compute.API()
self._driver.volume_api = fake_volume.API()
self._driver.share_networks_locks = {}
self._driver.get_service_instance = mock.Mock()
self._driver.share_networks_servers = {}
self._driver.admin_context = self._context
self.fake_sn = {"id": "fake_sn_id"}
self.fake_net_info = {
"id": "fake_srv_id",
"share_network_id": "fake_sn_id"
}
fsim = fake_service_instance.FakeServiceInstanceManager()
sim = mock.Mock(return_value=fsim)
self._driver.instance_manager = sim
self._driver.service_instance_manager = sim
self.fake_server = sim._create_service_instance(
context="fake", instance_name="fake",
share_network_id=self.fake_sn["id"], old_server_ip="fake")
self.mock_object(utils, 'synchronized',
mock.Mock(return_value=lambda f: f))
self.mock_object(generic.os.path, 'exists',
mock.Mock(return_value=True))
self._driver._helpers = {
'CIFS': self._helper_cifs,
'NFS': self._helper_nfs,
}
self.share = fake_share.fake_share(share_proto='NFS')
self.server = {
'instance_id': 'fake_instance_id',
'ip': 'fake_ip',
'username': 'fake_username',
'password': 'fake_password',
'pk_path': 'fake_pk_path',
'backend_details': {
'ip': '1.2.3.4',
'instance_id': 'fake'
}
}
self.access = fake_share.fake_access()
self.snapshot = fake_share.fake_snapshot()
def test_do_setup(self):
self.mock_object(volume, 'API')
self.mock_object(compute, 'API')
self.mock_object(self._driver, '_setup_helpers')
self._driver.do_setup(self._context)
volume.API.assert_called_once_with()
compute.API.assert_called_once_with()
self._driver._setup_helpers.assert_called_once_with()
def test_setup_helpers(self):
self._driver._helpers = {}
CONF.set_default('share_helpers', ['NFS=fakenfs'])
self.mock_object(generic.importutils, 'import_class',
mock.Mock(return_value=self._helper_nfs))
self._driver._setup_helpers()
generic.importutils.import_class.assert_has_calls([
mock.call('fakenfs')
])
self._helper_nfs.assert_called_once_with(
self._execute,
self._driver._ssh_exec,
self.fake_conf
)
self.assertEqual(len(self._driver._helpers), 1)
def test_setup_helpers_no_helpers(self):
self._driver._helpers = {}
CONF.set_default('share_helpers', [])
self.assertRaises(exception.ManilaException,
self._driver._setup_helpers)
def test_create_share(self):
volume = 'fake_volume'
volume2 = 'fake_volume2'
self._helper_nfs.create_export.return_value = 'fakelocation'
self.mock_object(self._driver, '_allocate_container',
mock.Mock(return_value=volume))
self.mock_object(self._driver, '_attach_volume',
mock.Mock(return_value=volume2))
self.mock_object(self._driver, '_format_device')
self.mock_object(self._driver, '_mount_device')
result = self._driver.create_share(
self._context, self.share, share_server=self.server)
self.assertEqual(result, 'fakelocation')
self._driver._allocate_container.assert_called_once_with(
self._driver.admin_context, self.share)
self._driver._attach_volume.assert_called_once_with(
self._driver.admin_context, self.share,
self.server['backend_details']['instance_id'],
volume)
self._driver._format_device.assert_called_once_with(
self.server['backend_details'], volume2)
self._driver._mount_device.assert_called_once_with(
self.share, self.server['backend_details'], volume2)
def test_create_share_exception(self):
share = fake_share.fake_share(share_network_id=None)
self.assertRaises(exception.ManilaException, self._driver.create_share,
self._context, share)
def test_create_share_invalid_helper(self):
self._driver._helpers = {'CIFS': self._helper_cifs}
self.assertRaises(exception.InvalidShare, self._driver.create_share,
self._context, self.share, share_server=self.server)
def test_format_device(self):
volume = {'mountpoint': 'fake_mount_point'}
self.mock_object(self._driver, '_ssh_exec',
mock.Mock(return_value=('', '')))
self._driver._format_device(self.server, volume)
self._driver._ssh_exec.assert_called_once_with(
self.server,
['sudo', 'mkfs.%s' % self.fake_conf.share_volume_fstype,
volume['mountpoint']])
def test_mount_device_not_present(self):
server = {'instance_id': 'fake_server_id'}
mount_path = self._driver._get_mount_path(self.share)
volume = {'mountpoint': 'fake_mount_point'}
self.mock_object(self._driver, '_is_device_mounted',
mock.Mock(return_value=False))
self.mock_object(self._driver, '_sync_mount_temp_and_perm_files')
self.mock_object(self._driver, '_ssh_exec',
mock.Mock(return_value=('', '')))
self._driver._mount_device(self.share, server, volume)
self._driver._is_device_mounted.assert_called_once_with(
mount_path, server, volume)
self._driver._sync_mount_temp_and_perm_files.assert_called_once_with(
server)
self._driver._ssh_exec.assert_called_once_with(
server,
['sudo mkdir -p', mount_path,
'&&', 'sudo mount', volume['mountpoint'], mount_path,
'&& sudo chmod 777', mount_path],
)
def test_mount_device_present(self):
mount_path = '/fake/mount/path'
volume = {'mountpoint': 'fake_mount_point'}
self.mock_object(self._driver, '_is_device_mounted',
mock.Mock(return_value=True))
self.mock_object(self._driver, '_get_mount_path',
mock.Mock(return_value=mount_path))
self.mock_object(generic.LOG, 'warning')
self._driver._mount_device(self.share, self.server, volume)
self._driver._get_mount_path.assert_called_once_with(self.share)
self._driver._is_device_mounted.assert_called_once_with(
mount_path, self.server, volume)
generic.LOG.warning.assert_called_once_with(mock.ANY, mock.ANY)
def test_mount_device_exception_raised(self):
volume = {'mountpoint': 'fake_mount_point'}
self.mock_object(
self._driver, '_is_device_mounted',
mock.Mock(side_effect=exception.ProcessExecutionError))
self.assertRaises(
exception.ShareBackendException,
self._driver._mount_device,
self.share,
self.server,
volume,
)
self._driver._is_device_mounted.assert_called_once_with(
self._driver._get_mount_path(self.share), self.server, volume)
def test_unmount_device_present(self):
mount_path = '/fake/mount/path'
self.mock_object(self._driver, '_is_device_mounted',
mock.Mock(return_value=True))
self.mock_object(self._driver, '_sync_mount_temp_and_perm_files')
self.mock_object(self._driver, '_get_mount_path',
mock.Mock(return_value=mount_path))
self.mock_object(self._driver, '_ssh_exec',
mock.Mock(return_value=('', '')))
self._driver._unmount_device(self.share, self.server)
self._driver._get_mount_path.assert_called_once_with(self.share)
self._driver._is_device_mounted.assert_called_once_with(
mount_path, self.server)
self._driver._sync_mount_temp_and_perm_files.assert_called_once_with(
self.server)
self._driver._ssh_exec.assert_called_once_with(
self.server,
['sudo umount', mount_path, '&& sudo rmdir', mount_path],
)
def test_unmount_device_not_present(self):
mount_path = '/fake/mount/path'
self.mock_object(self._driver, '_is_device_mounted',
mock.Mock(return_value=False))
self.mock_object(self._driver, '_get_mount_path',
mock.Mock(return_value=mount_path))
self.mock_object(generic.LOG, 'warning')
self._driver._unmount_device(self.share, self.server)
self._driver._get_mount_path.assert_called_once_with(self.share)
self._driver._is_device_mounted.assert_called_once_with(
mount_path, self.server)
generic.LOG.warning.assert_called_once_with(mock.ANY, mock.ANY)
def test_is_device_mounted_true(self):
volume = {'mountpoint': 'fake_mount_point', 'id': 'fake_id'}
mount_path = '/fake/mount/path'
mounts = "%(dev)s on %(path)s" % {'dev': volume['mountpoint'],
'path': mount_path}
self.mock_object(self._driver, '_ssh_exec',
mock.Mock(return_value=(mounts, '')))
result = self._driver._is_device_mounted(
mount_path, self.server, volume)
self._driver._ssh_exec.assert_called_once_with(
self.server, ['sudo', 'mount'])
self.assertEqual(result, True)
def test_is_device_mounted_true_no_volume_provided(self):
mount_path = '/fake/mount/path'
mounts = "/fake/dev/path on %(path)s type fake" % {'path': mount_path}
self.mock_object(self._driver, '_ssh_exec',
mock.Mock(return_value=(mounts, '')))
result = self._driver._is_device_mounted(mount_path, self.server)
self._driver._ssh_exec.assert_called_once_with(
self.server, ['sudo', 'mount'])
self.assertEqual(result, True)
def test_is_device_mounted_false(self):
mount_path = '/fake/mount/path'
volume = {'mountpoint': 'fake_mount_point', 'id': 'fake_id'}
mounts = "%(dev)s on %(path)s" % {'dev': '/fake',
'path': mount_path}
self.mock_object(self._driver, '_ssh_exec',
mock.Mock(return_value=(mounts, '')))
result = self._driver._is_device_mounted(
mount_path, self.server, volume)
self._driver._ssh_exec.assert_called_once_with(
self.server, ['sudo', 'mount'])
self.assertEqual(result, False)
def test_is_device_mounted_false_no_volume_provided(self):
mount_path = '/fake/mount/path'
mounts = "%(path)s" % {'path': 'fake'}
self.mock_object(self._driver, '_ssh_exec',
mock.Mock(return_value=(mounts, '')))
self.mock_object(self._driver, '_get_mount_path',
mock.Mock(return_value=mount_path))
result = self._driver._is_device_mounted(mount_path, self.server)
self._driver._ssh_exec.assert_called_once_with(
self.server, ['sudo', 'mount'])
self.assertEqual(result, False)
def test_sync_mount_temp_and_perm_files(self):
self.mock_object(self._driver, '_ssh_exec')
self._driver._sync_mount_temp_and_perm_files(self.server)
self._driver._ssh_exec.has_calls(
mock.call(
self.server,
['sudo', 'cp', const.MOUNT_FILE_TEMP, const.MOUNT_FILE]),
mock.call(self.server, ['sudo', 'mount', '-a']))
def test_sync_mount_temp_and_perm_files_raise_error_on_copy(self):
self.mock_object(
self._driver, '_ssh_exec',
mock.Mock(side_effect=exception.ProcessExecutionError))
self.assertRaises(
exception.ShareBackendException,
self._driver._sync_mount_temp_and_perm_files,
self.server
)
self._driver._ssh_exec.assert_called_once_with(
self.server,
['sudo', 'cp', const.MOUNT_FILE_TEMP, const.MOUNT_FILE])
def test_sync_mount_temp_and_perm_files_raise_error_on_mount(self):
def raise_error_on_mount(*args, **kwargs):
if args[1][1] == 'cp':
raise exception.ProcessExecutionError()
self.mock_object(self._driver, '_ssh_exec',
mock.Mock(side_effect=raise_error_on_mount))
self.assertRaises(
exception.ShareBackendException,
self._driver._sync_mount_temp_and_perm_files,
self.server
)
self._driver._ssh_exec.has_calls(
mock.call(
self.server,
['sudo', 'cp', const.MOUNT_FILE_TEMP, const.MOUNT_FILE]),
mock.call(self.server, ['sudo', 'mount', '-a']))
def test_get_mount_path(self):
result = self._driver._get_mount_path(self.share)
self.assertEqual(result, os.path.join(CONF.share_mount_path,
self.share['name']))
def test_attach_volume_not_attached(self):
available_volume = fake_volume.FakeVolume()
attached_volume = fake_volume.FakeVolume(status='in-use')
self.mock_object(self._driver.compute_api, 'instance_volume_attach')
self.mock_object(self._driver.volume_api, 'get',
mock.Mock(return_value=attached_volume))
result = self._driver._attach_volume(self._context, self.share,
'fake_inst_id', available_volume)
self._driver.compute_api.instance_volume_attach.\
assert_called_once_with(self._context, 'fake_inst_id',
available_volume['id'])
self._driver.volume_api.get.assert_called_once_with(
self._context, attached_volume['id'])
self.assertEqual(result, attached_volume)
def test_attach_volume_attached_correct(self):
fake_server = fake_compute.FakeServer()
attached_volume = fake_volume.FakeVolume(status='in-use')
self.mock_object(self._driver.compute_api, 'instance_volumes_list',
mock.Mock(return_value=[attached_volume]))
result = self._driver._attach_volume(self._context, self.share,
fake_server, attached_volume)
self.assertEqual(result, attached_volume)
def test_attach_volume_attached_incorrect(self):
fake_server = fake_compute.FakeServer()
attached_volume = fake_volume.FakeVolume(status='in-use')
anoter_volume = fake_volume.FakeVolume(id='fake_id2', status='in-use')
self.mock_object(self._driver.compute_api, 'instance_volumes_list',
mock.Mock(return_value=[anoter_volume]))
self.assertRaises(exception.ManilaException,
self._driver._attach_volume, self._context,
self.share, fake_server, attached_volume)
def test_attach_volume_failed_attach(self):
fake_server = fake_compute.FakeServer()
available_volume = fake_volume.FakeVolume()
self.mock_object(self._driver.compute_api, 'instance_volume_attach',
mock.Mock(side_effect=exception.ManilaException))
self.assertRaises(exception.ManilaException,
self._driver._attach_volume,
self._context, self.share, fake_server,
available_volume)
def test_attach_volume_error(self):
fake_server = fake_compute.FakeServer()
available_volume = fake_volume.FakeVolume()
error_volume = fake_volume.FakeVolume(status='error')
self.mock_object(self._driver.compute_api, 'instance_volume_attach')
self.mock_object(self._driver.volume_api, 'get',
mock.Mock(return_value=error_volume))
self.assertRaises(exception.ManilaException,
self._driver._attach_volume,
self._context, self.share,
fake_server, available_volume)
def test_get_volume(self):
volume = fake_volume.FakeVolume(
name=CONF.volume_name_template % self.share['id'])
self.mock_object(self._driver.volume_api, 'get_all',
mock.Mock(return_value=[volume]))
result = self._driver._get_volume(self._context, self.share['id'])
self.assertEqual(result, volume)
self._driver.volume_api.get_all.assert_called_once_with(
self._context, {'all_tenants': True, 'name': volume['name']})
def test_get_volume_with_private_data(self):
volume = fake_volume.FakeVolume()
self.mock_object(self._driver.volume_api, 'get',
mock.Mock(return_value=volume))
self.mock_object(self.fake_private_storage, 'get',
mock.Mock(return_value=volume['id']))
result = self._driver._get_volume(self._context, self.share['id'])
self.assertEqual(result, volume)
self._driver.volume_api.get.assert_called_once_with(
self._context, volume['id'])
self.fake_private_storage.get.assert_called_once_with(
self.share['id'], 'volume_id'
)
def test_get_volume_none(self):
vol_name = (
self._driver.configuration.volume_name_template % self.share['id'])
self.mock_object(self._driver.volume_api, 'get_all',
mock.Mock(return_value=[]))
result = self._driver._get_volume(self._context, self.share['id'])
self.assertEqual(result, None)
self._driver.volume_api.get_all.assert_called_once_with(
self._context, {'all_tenants': True, 'name': vol_name})
def test_get_volume_error(self):
volume = fake_volume.FakeVolume(
name=CONF.volume_name_template % self.share['id'])
self.mock_object(self._driver.volume_api, 'get_all',
mock.Mock(return_value=[volume, volume]))
self.assertRaises(exception.ManilaException,
self._driver._get_volume,
self._context, self.share['id'])
self._driver.volume_api.get_all.assert_called_once_with(
self._context, {'all_tenants': True, 'name': volume['name']})
def test_get_volume_snapshot(self):
volume_snapshot = fake_volume.FakeVolumeSnapshot(
name=self._driver.configuration.volume_snapshot_name_template %
self.snapshot['id'])
self.mock_object(self._driver.volume_api, 'get_all_snapshots',
mock.Mock(return_value=[volume_snapshot]))
result = self._driver._get_volume_snapshot(self._context,
self.snapshot['id'])
self.assertEqual(result, volume_snapshot)
self._driver.volume_api.get_all_snapshots.assert_called_once_with(
self._context, {'name': volume_snapshot['name']})
def test_get_volume_snapshot_with_private_data(self):
volume_snapshot = fake_volume.FakeVolumeSnapshot()
self.mock_object(self._driver.volume_api, 'get_snapshot',
mock.Mock(return_value=volume_snapshot))
self.mock_object(self.fake_private_storage, 'get',
mock.Mock(return_value=volume_snapshot['id']))
result = self._driver._get_volume_snapshot(self._context,
self.snapshot['id'])
self.assertEqual(result, volume_snapshot)
self._driver.volume_api.get_snapshot.assert_called_once_with(
self._context, volume_snapshot['id'])
self.fake_private_storage.get.assert_called_once_with(
self.snapshot['id'], 'volume_snapshot_id'
)
def test_get_volume_snapshot_none(self):
snap_name = (
self._driver.configuration.volume_snapshot_name_template %
self.share['id'])
self.mock_object(self._driver.volume_api, 'get_all_snapshots',
mock.Mock(return_value=[]))
result = self._driver._get_volume_snapshot(self._context,
self.share['id'])
self.assertEqual(result, None)
self._driver.volume_api.get_all_snapshots.assert_called_once_with(
self._context, {'name': snap_name})
def test_get_volume_snapshot_error(self):
volume_snapshot = fake_volume.FakeVolumeSnapshot(
name=self._driver.configuration.volume_snapshot_name_template %
self.snapshot['id'])
self.mock_object(
self._driver.volume_api, 'get_all_snapshots',
mock.Mock(return_value=[volume_snapshot, volume_snapshot]))
self.assertRaises(
exception.ManilaException, self._driver._get_volume_snapshot,
self._context, self.snapshot['id'])
self._driver.volume_api.get_all_snapshots.assert_called_once_with(
self._context, {'name': volume_snapshot['name']})
def test_detach_volume(self):
available_volume = fake_volume.FakeVolume()
attached_volume = fake_volume.FakeVolume(status='in-use')
self.mock_object(self._driver, '_get_volume',
mock.Mock(return_value=attached_volume))
self.mock_object(self._driver.compute_api, 'instance_volumes_list',
mock.Mock(return_value=[attached_volume]))
self.mock_object(self._driver.compute_api, 'instance_volume_detach')
self.mock_object(self._driver.volume_api, 'get',
mock.Mock(return_value=available_volume))
self._driver._detach_volume(self._context, self.share,
self.server['backend_details'])
self._driver.compute_api.instance_volume_detach.\
assert_called_once_with(
self._context,
self.server['backend_details']['instance_id'],
available_volume['id'])
self._driver.volume_api.get.assert_called_once_with(
self._context, available_volume['id'])
def test_detach_volume_detached(self):
available_volume = fake_volume.FakeVolume()
attached_volume = fake_volume.FakeVolume(status='in-use')
self.mock_object(self._driver, '_get_volume',
mock.Mock(return_value=attached_volume))
self.mock_object(self._driver.compute_api, 'instance_volumes_list',
mock.Mock(return_value=[]))
self.mock_object(self._driver.volume_api, 'get',
mock.Mock(return_value=available_volume))
self.mock_object(self._driver.compute_api, 'instance_volume_detach')
self._driver._detach_volume(self._context, self.share,
self.server['backend_details'])
self.assertFalse(self._driver.volume_api.get.called)
self.assertFalse(
self._driver.compute_api.instance_volume_detach.called)
def test_allocate_container(self):
fake_vol = fake_volume.FakeVolume()
self.fake_conf.cinder_volume_type = 'fake_volume_type'
self.mock_object(self._driver.volume_api, 'create',
mock.Mock(return_value=fake_vol))
result = self._driver._allocate_container(self._context, self.share)
self.assertEqual(result, fake_vol)
self._driver.volume_api.create.assert_called_once_with(
self._context,
self.share['size'],
CONF.volume_name_template % self.share['id'],
'',
snapshot=None,
volume_type='fake_volume_type')
def test_allocate_container_with_snaphot(self):
fake_vol = fake_volume.FakeVolume()
fake_vol_snap = fake_volume.FakeVolumeSnapshot()
self.mock_object(self._driver, '_get_volume_snapshot',
mock.Mock(return_value=fake_vol_snap))
self.mock_object(self._driver.volume_api, 'create',
mock.Mock(return_value=fake_vol))
result = self._driver._allocate_container(self._context,
self.share,
self.snapshot)
self.assertEqual(result, fake_vol)
self._driver.volume_api.create.assert_called_once_with(
self._context,
self.share['size'],
CONF.volume_name_template % self.share['id'],
'',
snapshot=fake_vol_snap,
volume_type=None)
def test_allocate_container_error(self):
fake_vol = fake_volume.FakeVolume(status='error')
self.mock_object(self._driver.volume_api, 'create',
mock.Mock(return_value=fake_vol))
self.assertRaises(exception.ManilaException,
self._driver._allocate_container,
self._context,
self.share)
def test_wait_for_available_volume(self):
fake_volume = {'status': 'creating', 'id': 'fake'}
fake_available_volume = {'status': 'available', 'id': 'fake'}
self.mock_object(self._driver.volume_api, 'get',
mock.Mock(return_value=fake_available_volume))
actual_result = self._driver._wait_for_available_volume(
fake_volume, 5, "error", "timeout")
self.assertEqual(fake_available_volume, actual_result)
self._driver.volume_api.get.assert_called_once_with(
mock.ANY, fake_volume['id'])
@ddt.data(mock.Mock(return_value={'status': 'creating', 'id': 'fake'}),
mock.Mock(return_value={'status': 'error', 'id': 'fake'}))
def test_wait_for_available_volume_invalid(self, volume_get_mock):
fake_volume = {'status': 'creating', 'id': 'fake'}
self.mock_object(self._driver.volume_api, 'get', volume_get_mock)
self.assertRaises(
exception.ManilaException,
self._driver._wait_for_available_volume,
fake_volume, 1, "error", "timeout"
)
def test_deallocate_container(self):
fake_vol = fake_volume.FakeVolume()
self.mock_object(self._driver, '_get_volume',
mock.Mock(return_value=fake_vol))
self.mock_object(self._driver.volume_api, 'delete')
self.mock_object(self._driver.volume_api, 'get', mock.Mock(
side_effect=exception.VolumeNotFound(volume_id=fake_vol['id'])))
self._driver._deallocate_container(self._context, self.share)
self._driver._get_volume.assert_called_once_with(
self._context, self.share['id'])
self._driver.volume_api.delete.assert_called_once_with(
self._context, fake_vol['id'])
self._driver.volume_api.get.assert_called_once_with(
self._context, fake_vol['id'])
def test_create_share_from_snapshot(self):
vol1 = 'fake_vol1'
vol2 = 'fake_vol2'
self._helper_nfs.create_export.return_value = 'fakelocation'
self.mock_object(self._driver, '_allocate_container',
mock.Mock(return_value=vol1))
self.mock_object(self._driver, '_attach_volume',
mock.Mock(return_value=vol2))
self.mock_object(self._driver, '_mount_device')
result = self._driver.create_share_from_snapshot(
self._context,
self.share,
self.snapshot,
share_server=self.server)
self.assertEqual(result, 'fakelocation')
self._driver._allocate_container.assert_called_once_with(
self._driver.admin_context, self.share, self.snapshot)
self._driver._attach_volume.assert_called_once_with(
self._driver.admin_context, self.share,
self.server['backend_details']['instance_id'], vol1)
self._driver._mount_device.assert_called_once_with(
self.share, self.server['backend_details'], vol2)
self._helper_nfs.create_export.assert_called_once_with(
self.server['backend_details'], self.share['name'])
def test_create_share_from_snapshot_invalid_helper(self):
self._driver._helpers = {'CIFS': self._helper_cifs}
self.assertRaises(exception.InvalidShare,
self._driver.create_share_from_snapshot,
self._context, self.share, self.snapshot,
share_server=self.server)
def test_delete_share_no_share_servers_handling(self):
self.mock_object(self._driver, '_deallocate_container')
self.mock_object(
self._driver.service_instance_manager,
'get_common_server', mock.Mock(return_value=self.server))
self.mock_object(
self._driver.service_instance_manager,
'ensure_service_instance', mock.Mock(return_value=False))
CONF.set_default('driver_handles_share_servers', False)
self._driver.delete_share(self._context, self.share)
self._driver.service_instance_manager.get_common_server.\
assert_called_once_with()
self._driver._deallocate_container.assert_called_once_with(
self._driver.admin_context, self.share)
self._driver.service_instance_manager.ensure_service_instance.\
assert_called_once_with(
self._context, self.server['backend_details'])
def test_delete_share(self):
self.mock_object(self._driver, '_unmount_device')
self.mock_object(self._driver, '_detach_volume')
self.mock_object(self._driver, '_deallocate_container')
self._driver.delete_share(
self._context, self.share, share_server=self.server)
self._helper_nfs.remove_export.assert_called_once_with(
self.server['backend_details'], self.share['name'])
self._driver._unmount_device.assert_called_once_with(
self.share, self.server['backend_details'])
self._driver._detach_volume.assert_called_once_with(
self._driver.admin_context, self.share,
self.server['backend_details'])
self._driver._deallocate_container.assert_called_once_with(
self._driver.admin_context, self.share)
self._driver.service_instance_manager.ensure_service_instance.\
assert_called_once_with(
self._context, self.server['backend_details'])
def test_delete_share_without_share_server(self):
self.mock_object(self._driver, '_unmount_device')
self.mock_object(self._driver, '_detach_volume')
self.mock_object(self._driver, '_deallocate_container')
self._driver.delete_share(
self._context, self.share, share_server=None)
self.assertFalse(self._helper_nfs.remove_export.called)
self.assertFalse(self._driver._unmount_device.called)
self.assertFalse(self._driver._detach_volume.called)
self._driver._deallocate_container.assert_called_once_with(
self._driver.admin_context, self.share)
def test_delete_share_without_server_backend_details(self):
self.mock_object(self._driver, '_unmount_device')
self.mock_object(self._driver, '_detach_volume')
self.mock_object(self._driver, '_deallocate_container')
fake_share_server = {
'instance_id': 'fake_instance_id',
'ip': 'fake_ip',
'username': 'fake_username',
'password': 'fake_password',
'pk_path': 'fake_pk_path',
'backend_details': {}
}
self._driver.delete_share(
self._context, self.share, share_server=fake_share_server)
self.assertFalse(self._helper_nfs.remove_export.called)
self.assertFalse(self._driver._unmount_device.called)
self.assertFalse(self._driver._detach_volume.called)
self._driver._deallocate_container.assert_called_once_with(
self._driver.admin_context, self.share)
def test_delete_share_without_server_availability(self):
self.mock_object(self._driver, '_unmount_device')
self.mock_object(self._driver, '_detach_volume')
self.mock_object(self._driver, '_deallocate_container')
self.mock_object(
self._driver.service_instance_manager,
'ensure_service_instance', mock.Mock(return_value=False))
self._driver.delete_share(
self._context, self.share, share_server=self.server)
self.assertFalse(self._helper_nfs.remove_export.called)
self.assertFalse(self._driver._unmount_device.called)
self.assertFalse(self._driver._detach_volume.called)
self._driver._deallocate_container.assert_called_once_with(
self._driver.admin_context, self.share)
self._driver.service_instance_manager.ensure_service_instance.\
assert_called_once_with(
self._context, self.server['backend_details'])
def test_delete_share_invalid_helper(self):
self._driver._helpers = {'CIFS': self._helper_cifs}
self.assertRaises(exception.InvalidShare,
self._driver.delete_share,
self._context, self.share, share_server=self.server)
def test_create_snapshot(self):
fake_vol = fake_volume.FakeVolume()
fake_vol_snap = fake_volume.FakeVolumeSnapshot(share_id=fake_vol['id'])
self.mock_object(self._driver, '_get_volume',
mock.Mock(return_value=fake_vol))
self.mock_object(self._driver.volume_api, 'create_snapshot_force',
mock.Mock(return_value=fake_vol_snap))
self._driver.create_snapshot(self._context, fake_vol_snap,
share_server=self.server)
self._driver._get_volume.assert_called_once_with(
self._driver.admin_context, fake_vol_snap['share_id'])
self._driver.volume_api.create_snapshot_force.assert_called_once_with(
self._context,
fake_vol['id'],
CONF.volume_snapshot_name_template % fake_vol_snap['id'],
''
)
def test_delete_snapshot(self):
fake_vol_snap = fake_volume.FakeVolumeSnapshot()
fake_vol_snap2 = {'id': 'fake_vol_snap2'}
self.mock_object(self._driver, '_get_volume_snapshot',
mock.Mock(return_value=fake_vol_snap2))
self.mock_object(self._driver.volume_api, 'delete_snapshot')
self.mock_object(
self._driver.volume_api, 'get_snapshot',
mock.Mock(side_effect=exception.VolumeSnapshotNotFound(
snapshot_id=fake_vol_snap['id'])))
self._driver.delete_snapshot(self._context, fake_vol_snap,
share_server=self.server)
self._driver._get_volume_snapshot.assert_called_once_with(
self._driver.admin_context, fake_vol_snap['id'])
self._driver.volume_api.delete_snapshot.assert_called_once_with(
self._driver.admin_context, fake_vol_snap2['id'])
self._driver.volume_api.get_snapshot.assert_called_once_with(
self._driver.admin_context, fake_vol_snap2['id'])
def test_ensure_share(self):
vol1 = 'fake_vol1'
vol2 = 'fake_vol2'
self._helper_nfs.create_export.return_value = 'fakelocation'
self.mock_object(self._driver, '_get_volume',
mock.Mock(return_value=vol1))
self.mock_object(self._driver, '_attach_volume',
mock.Mock(return_value=vol2))
self.mock_object(self._driver, '_mount_device')
self._driver.ensure_share(
self._context, self.share, share_server=self.server)
self._driver._get_volume.assert_called_once_with(
self._context, self.share['id'])
self._driver._attach_volume.assert_called_once_with(
self._context, self.share,
self.server['backend_details']['instance_id'], vol1)
self._driver._mount_device.assert_called_once_with(
self.share, self.server['backend_details'], vol2)
self._helper_nfs.create_export.assert_called_once_with(
self.server['backend_details'], self.share['name'], recreate=True)
def test_ensure_share_volume_is_absent(self):
self.mock_object(
self._driver, '_get_volume', mock.Mock(return_value=None))
self.mock_object(self._driver, '_attach_volume')
self._driver.ensure_share(
self._context, self.share, share_server=self.server)
self._driver._get_volume.assert_called_once_with(
self._context, self.share['id'])
self.assertFalse(self._driver._attach_volume.called)
def test_ensure_share_invalid_helper(self):
self._driver._helpers = {'CIFS': self._helper_cifs}
self.assertRaises(exception.InvalidShare, self._driver.ensure_share,
self._context, self.share, share_server=self.server)
@ddt.data(const.ACCESS_LEVEL_RW, const.ACCESS_LEVEL_RO)
def test_allow_access(self, access_level):
access = {
'access_type': 'ip',
'access_to': 'fake_dest',
'access_level': access_level,
}
self._driver.allow_access(
self._context, self.share, access, share_server=self.server)
self._driver._helpers[self.share['share_proto']].\
allow_access.assert_called_once_with(
self.server['backend_details'], self.share['name'],
access['access_type'], access['access_level'],
access['access_to'])
def test_allow_access_unsupported(self):
access = {
'access_type': 'ip',
'access_to': 'fake_dest',
'access_level': 'fakefoobar',
}
self.assertRaises(
exception.InvalidShareAccessLevel,
self._driver.allow_access,
self._context, self.share, access, share_server=self.server)
def test_deny_access(self):
access = 'fake_access'
self._driver.deny_access(
self._context, self.share, access, share_server=self.server)
self._driver._helpers[
self.share['share_proto']].deny_access.assert_called_once_with(
self.server['backend_details'], self.share['name'], access)
@ddt.data(fake_share.fake_share(),
fake_share.fake_share(share_proto='NFSBOGUS'),
fake_share.fake_share(share_proto='CIFSBOGUS'))
def test__get_helper_with_wrong_proto(self, share):
self.assertRaises(exception.InvalidShare,
self._driver._get_helper, share)
def test__setup_server(self):
sim = self._driver.instance_manager
net_info = {
'server_id': 'fake',
'neutron_net_id': 'fake-net-id',
'neutron_subnet_id': 'fake-subnet-id',
}
self._driver.setup_server(net_info)
sim.set_up_service_instance.assert_called_once_with(
self._context, net_info)
def test__setup_server_revert(self):
def raise_exception(*args, **kwargs):
raise exception.ServiceInstanceException
net_info = {'server_id': 'fake',
'neutron_net_id': 'fake-net-id',
'neutron_subnet_id': 'fake-subnet-id'}
self.mock_object(self._driver.service_instance_manager,
'set_up_service_instance',
mock.Mock(side_effect=raise_exception))
self.assertRaises(exception.ServiceInstanceException,
self._driver.setup_server,
net_info)
def test__teardown_server(self):
server_details = {
'instance_id': 'fake_instance_id',
'subnet_id': 'fake_subnet_id',
'router_id': 'fake_router_id',
}
self._driver.teardown_server(server_details)
self._driver.service_instance_manager.delete_service_instance.\
assert_called_once_with(
self._driver.admin_context, server_details)
def test_ssh_exec_connection_not_exist(self):
ssh_output = 'fake_ssh_output'
cmd = ['fake', 'command']
ssh = mock.Mock()
ssh.get_transport = mock.Mock()
ssh.get_transport().is_active = mock.Mock(return_value=True)
ssh_pool = mock.Mock()
ssh_pool.create = mock.Mock(return_value=ssh)
self.mock_object(utils, 'SSHPool', mock.Mock(return_value=ssh_pool))
self.mock_object(processutils, 'ssh_execute',
mock.Mock(return_value=ssh_output))
self._driver.ssh_connections = {}
result = self._driver._ssh_exec(self.server, cmd)
utils.SSHPool.assert_called_once_with(
self.server['ip'], 22, None, self.server['username'],
self.server['password'], self.server['pk_path'], max_size=1)
ssh_pool.create.assert_called_once_with()
processutils.ssh_execute.assert_called_once_with(ssh, 'fake command')
ssh.get_transport().is_active.assert_called_once_with()
self.assertEqual(
self._driver.ssh_connections,
{self.server['instance_id']: (ssh_pool, ssh)}
)
self.assertEqual(ssh_output, result)
def test_ssh_exec_connection_exist(self):
ssh_output = 'fake_ssh_output'
cmd = ['fake', 'command']
ssh = mock.Mock()
ssh.get_transport = mock.Mock()
ssh.get_transport().is_active = mock.Mock(side_effect=lambda: True)
ssh_pool = mock.Mock()
self.mock_object(processutils, 'ssh_execute',
mock.Mock(return_value=ssh_output))
self._driver.ssh_connections = {
self.server['instance_id']: (ssh_pool, ssh)
}
result = self._driver._ssh_exec(self.server, cmd)
processutils.ssh_execute.assert_called_once_with(ssh, 'fake command')
ssh.get_transport().is_active.assert_called_once_with()
self.assertEqual(
self._driver.ssh_connections,
{self.server['instance_id']: (ssh_pool, ssh)}
)
self.assertEqual(ssh_output, result)
def test_ssh_exec_connection_recreation(self):
ssh_output = 'fake_ssh_output'
cmd = ['fake', 'command']
ssh = mock.Mock()
ssh.get_transport = mock.Mock()
ssh.get_transport().is_active = mock.Mock(side_effect=lambda: False)
ssh_pool = mock.Mock()
ssh_pool.create = mock.Mock(side_effect=lambda: ssh)
ssh_pool.remove = mock.Mock()
self.mock_object(processutils, 'ssh_execute',
mock.Mock(return_value=ssh_output))
self._driver.ssh_connections = {
self.server['instance_id']: (ssh_pool, ssh)
}
result = self._driver._ssh_exec(self.server, cmd)
processutils.ssh_execute.assert_called_once_with(ssh, 'fake command')
ssh.get_transport().is_active.assert_called_once_with()
ssh_pool.create.assert_called_once_with()
ssh_pool.remove.assert_called_once_with(ssh)
self.assertEqual(
self._driver.ssh_connections,
{self.server['instance_id']: (ssh_pool, ssh)}
)
self.assertEqual(ssh_output, result)
def test_get_share_stats_refresh_false(self):
self._driver._stats = {'fake_key': 'fake_value'}
result = self._driver.get_share_stats(False)
self.assertEqual(self._driver._stats, result)
def test_get_share_stats_refresh_true(self):
fake_stats = {'fake_key': 'fake_value'}
self._driver._stats = fake_stats
expected_keys = [
'QoS_support', 'driver_version', 'share_backend_name',
'free_capacity_gb', 'total_capacity_gb',
'driver_handles_share_servers',
'reserved_percentage', 'vendor_name', 'storage_protocol',
]
result = self._driver.get_share_stats(True)
self.assertNotEqual(fake_stats, result)
for key in expected_keys:
self.assertIn(key, result)
self.assertEqual(True, result['driver_handles_share_servers'])
self.assertEqual('Open Source', result['vendor_name'])
def test_manage_invalid_driver_mode(self):
CONF.set_default('driver_handles_share_servers', True)
self.assertRaises(exception.InvalidDriverMode,
self._driver.manage_existing, 'fake', {})
def _setup_manage_mocks(self,
get_share_type_extra_specs='False',
is_device_mounted=True,
server_details=None):
CONF.set_default('driver_handles_share_servers', False)
self.mock_object(share_types, 'get_share_type_extra_specs',
mock.Mock(return_value=get_share_type_extra_specs))
self.mock_object(self._driver, '_is_device_mounted',
mock.Mock(return_value=is_device_mounted))
self.mock_object(self._driver, 'service_instance_manager')
server = {'backend_details': server_details}
self.mock_object(self._driver.service_instance_manager,
'get_common_server',
mock.Mock(return_value=server))
def test_manage_invalid_protocol(self):
share = {'share_proto': 'fake_proto'}
self._setup_manage_mocks()
self.assertRaises(exception.InvalidShare,
self._driver.manage_existing, share, {})
def test_manage_share_type_mismatch(self):
share = {'share_proto': 'NFS', 'share_type_id': 'fake'}
self._setup_manage_mocks(get_share_type_extra_specs='True')
self.assertRaises(exception.ManageExistingShareTypeMismatch,
self._driver.manage_existing, share, {})
share_types.get_share_type_extra_specs.assert_called_once_with(
share['share_type_id'],
const.ExtraSpecs.DRIVER_HANDLES_SHARE_SERVERS
)
def test_manage_not_mounted_share(self):
share = get_fake_manage_share()
fake_path = '/foo/bar'
self._setup_manage_mocks(is_device_mounted=False)
self.mock_object(
self._driver._helpers[share['share_proto']],
'get_share_path_by_export_location',
mock.Mock(return_value=fake_path))
self.assertRaises(exception.ManageInvalidShare,
self._driver.manage_existing, share, {})
self.assertEqual(
1,
self._driver.service_instance_manager.get_common_server.call_count)
self._driver._is_device_mounted.assert_called_once_with(
fake_path, None)
self._driver._helpers[share['share_proto']].\
get_share_path_by_export_location.assert_called_once_with(
None, share['export_locations'][0]['path'])
def test_manage_share_not_attached_to_cinder_volume_invalid_size(self):
share = get_fake_manage_share()
server_details = {}
fake_path = '/foo/bar'
self._setup_manage_mocks(server_details=server_details)
self.mock_object(self._driver, '_get_volume',
mock.Mock(return_value=None))
error = exception.ManageInvalidShare(reason="fake")
self.mock_object(
self._driver, '_get_mounted_share_size',
mock.Mock(side_effect=error))
self.mock_object(
self._driver._helpers[share['share_proto']],
'get_share_path_by_export_location',
mock.Mock(return_value=fake_path))
self.assertRaises(exception.ManageInvalidShare,
self._driver.manage_existing, share, {})
self._driver._get_mounted_share_size.assert_called_once_with(
fake_path, server_details)
self._driver._helpers[share['share_proto']].\
get_share_path_by_export_location.assert_called_once_with(
server_details, share['export_locations'][0]['path'])
def test_manage_share_not_attached_to_cinder_volume(self):
share = get_fake_manage_share()
share_size = "fake"
fake_path = '/foo/bar'
fake_exports = ['foo', 'bar']
server_details = {}
self._setup_manage_mocks(server_details=server_details)
self.mock_object(self._driver, '_get_volume')
self.mock_object(self._driver, '_get_mounted_share_size',
mock.Mock(return_value=share_size))
self.mock_object(
self._driver._helpers[share['share_proto']],
'get_share_path_by_export_location',
mock.Mock(return_value=fake_path))
self.mock_object(
self._driver._helpers[share['share_proto']],
'get_exports_for_share',
mock.Mock(return_value=fake_exports))
result = self._driver.manage_existing(share, {})
self.assertEqual(
{'size': share_size, 'export_locations': fake_exports}, result)
self._driver._helpers[share['share_proto']].get_exports_for_share.\
assert_called_once_with(
server_details, share['export_locations'][0]['path'])
self._driver._helpers[share['share_proto']].\
get_share_path_by_export_location.assert_called_once_with(
server_details, share['export_locations'][0]['path'])
self._driver._get_mounted_share_size.assert_called_once_with(
fake_path, server_details)
self.assertFalse(self._driver._get_volume.called)
def test_manage_share_attached_to_cinder_volume_not_found(self):
share = get_fake_manage_share()
server_details = {}
driver_options = {'volume_id': 'fake'}
self._setup_manage_mocks(server_details=server_details)
self.mock_object(
self._driver.volume_api, 'get',
mock.Mock(side_effect=exception.VolumeNotFound(volume_id="fake"))
)
self.assertRaises(exception.ManageInvalidShare,
self._driver.manage_existing, share, driver_options)
self._driver.volume_api.get.assert_called_once_with(
mock.ANY, driver_options['volume_id'])
def test_manage_share_attached_to_cinder_volume_not_mounted_to_srv(self):
share = get_fake_manage_share()
server_details = {'instance_id': 'fake'}
driver_options = {'volume_id': 'fake'}
volume = {'id': 'fake'}
self._setup_manage_mocks(server_details=server_details)
self.mock_object(self._driver.volume_api, 'get',
mock.Mock(return_value=volume))
self.mock_object(self._driver.compute_api, 'instance_volumes_list',
mock.Mock(return_value=[]))
self.assertRaises(exception.ManageInvalidShare,
self._driver.manage_existing, share, driver_options)
self._driver.volume_api.get.assert_called_once_with(
mock.ANY, driver_options['volume_id'])
self._driver.compute_api.instance_volumes_list.assert_called_once_with(
mock.ANY, server_details['instance_id'])
def test_manage_share_attached_to_cinder_volume(self):
share = get_fake_manage_share()
fake_size = 'foobar'
fake_exports = ['foo', 'bar']
server_details = {'instance_id': 'fake'}
driver_options = {'volume_id': 'fake'}
volume = {'id': 'fake', 'name': 'fake_volume_1', 'size': fake_size}
self._setup_manage_mocks(server_details=server_details)
self.mock_object(self._driver.volume_api, 'get',
mock.Mock(return_value=volume))
self._driver.volume_api.update = mock.Mock()
fake_volume = mock.Mock()
fake_volume.id = 'fake'
self.mock_object(self._driver.compute_api, 'instance_volumes_list',
mock.Mock(return_value=[fake_volume]))
self.mock_object(
self._driver._helpers[share['share_proto']],
'get_exports_for_share',
mock.Mock(return_value=fake_exports))
result = self._driver.manage_existing(share, driver_options)
self.assertEqual(
{'size': fake_size, 'export_locations': fake_exports}, result)
self._driver._helpers[share['share_proto']].get_exports_for_share.\
assert_called_once_with(
server_details, share['export_locations'][0]['path'])
expected_volume_update = {
'name': self._driver._get_volume_name(share['id'])
}
self._driver.volume_api.update.assert_called_once_with(
mock.ANY, volume['id'], expected_volume_update)
self.fake_private_storage.update.assert_called_once_with(
share['id'], {'volume_id': volume['id']}
)
def test_get_mounted_share_size(self):
output = ("Filesystem blocks Used Available Capacity Mounted on\n"
"/dev/fake 1G 1G 1G 4% /shares/share-fake")
self.mock_object(self._driver, '_ssh_exec',
mock.Mock(return_value=(output, '')))
actual_result = self._driver._get_mounted_share_size('/fake/path', {})
self.assertEqual(1, actual_result)
@ddt.data("fake\nfake\n", "fake", "fake\n")
def test_get_mounted_share_size_invalid_output(self, output):
self.mock_object(self._driver, '_ssh_exec',
mock.Mock(return_value=(output, '')))
self.assertRaises(exception.ManageInvalidShare,
self._driver._get_mounted_share_size,
'/fake/path', {})
def test_extend_share(self):
fake_volume = "fake"
fake_share = {'id': 'fake'}
new_size = 123
srv_details = self.server['backend_details']
self.mock_object(
self._driver.service_instance_manager,
'get_common_server',
mock.Mock(return_value=self.server)
)
self.mock_object(self._driver, '_unmount_device')
self.mock_object(self._driver, '_detach_volume')
self.mock_object(self._driver, '_extend_volume')
self.mock_object(self._driver, '_attach_volume')
self.mock_object(self._driver, '_mount_device')
self.mock_object(self._driver, '_resize_filesystem')
self.mock_object(
self._driver, '_get_volume',
mock.Mock(return_value=fake_volume)
)
CONF.set_default('driver_handles_share_servers', False)
self._driver.extend_share(fake_share, new_size)
self.assertTrue(
self._driver.service_instance_manager.get_common_server.called)
self._driver._unmount_device.assert_called_once_with(
fake_share, srv_details)
self._driver._detach_volume.assert_called_once_with(
mock.ANY, fake_share, srv_details)
self._driver._get_volume.assert_called_once_with(
mock.ANY, fake_share['id'])
self._driver._extend_volume.assert_called_once_with(
mock.ANY, fake_volume, new_size)
self._driver._attach_volume.assert_called_once_with(
mock.ANY, fake_share, srv_details['instance_id'], mock.ANY)
self.assertTrue(self._driver._resize_filesystem.called)
def test_extend_volume(self):
fake_volume = {'id': 'fake'}
new_size = 123
self.mock_object(self._driver.volume_api, 'extend')
self.mock_object(self._driver, '_wait_for_available_volume')
self._driver._extend_volume(self._context, fake_volume, new_size)
self._driver.volume_api.extend.assert_called_once_with(
self._context, fake_volume['id'], new_size
)
self._driver._wait_for_available_volume.assert_called_once_with(
fake_volume, mock.ANY, msg_timeout=mock.ANY, msg_error=mock.ANY
)
def test_resize_filesystem(self):
fake_server_details = {'fake': 'fake'}
fake_volume = {'mountpoint': '/dev/fake'}
self.mock_object(self._driver, '_ssh_exec')
self._driver._resize_filesystem(fake_server_details, fake_volume)
self._driver._ssh_exec.assert_any_call(
fake_server_details, ['sudo', 'fsck', '-pf', '/dev/fake'])
self._driver._ssh_exec.assert_any_call(
fake_server_details, ['sudo', 'resize2fs', '/dev/fake'])
self.assertEqual(2, self._driver._ssh_exec.call_count)
@ddt.data({'share_servers': [], 'result': None},
{'share_servers': None, 'result': None},
{'share_servers': ['fake'], 'result': 'fake'},
{'share_servers': ['fake', 'test'], 'result': 'fake'})
@ddt.unpack
def tests_choose_share_server_compatible_with_share(self, share_servers,
result):
fake_share = "fake"
actual_result = self._driver.choose_share_server_compatible_with_share(
self._context, share_servers, fake_share
)
self.assertEqual(result, actual_result)
@generic.ensure_server
def fake(driver_instance, context, share_server=None):
return share_server
@ddt.ddt
class GenericDriverEnsureServerTestCase(test.TestCase):
def setUp(self):
super(GenericDriverEnsureServerTestCase, self).setUp()
self._context = context.get_admin_context()
self.server = {'id': 'fake_id', 'backend_details': {'foo': 'bar'}}
self.dhss_false = type(
'Fake', (object,), {'driver_handles_share_servers': False})
self.dhss_true = type(
'Fake', (object,), {'driver_handles_share_servers': True})
def test_share_servers_are_not_handled_server_not_provided(self):
self.dhss_false.service_instance_manager = mock.Mock()
self.dhss_false.service_instance_manager.get_common_server = (
mock.Mock(return_value=self.server))
self.dhss_false.service_instance_manager.ensure_service_instance = (
mock.Mock(return_value=True))
actual = fake(self.dhss_false, self._context)
self.assertEqual(self.server, actual)
self.dhss_false.service_instance_manager.\
get_common_server.assert_called_once_with()
self.dhss_false.service_instance_manager.ensure_service_instance.\
assert_called_once_with(
self._context, self.server['backend_details'])
@ddt.data({'id': 'without_details'},
{'id': 'with_details', 'backend_details': {'foo': 'bar'}})
def test_share_servers_are_not_handled_server_provided(self, server):
self.assertRaises(
exception.ManilaException,
fake, self.dhss_false, self._context, share_server=server)
def test_share_servers_are_handled_server_provided(self):
self.dhss_true.service_instance_manager = mock.Mock()
self.dhss_true.service_instance_manager.ensure_service_instance = (
mock.Mock(return_value=True))
actual = fake(self.dhss_true, self._context, share_server=self.server)
self.assertEqual(self.server, actual)
self.dhss_true.service_instance_manager.ensure_service_instance.\
assert_called_once_with(
self._context, self.server['backend_details'])
def test_share_servers_are_handled_invalid_server_provided(self):
server = {'id': 'without_details'}
self.assertRaises(
exception.ManilaException,
fake, self.dhss_true, self._context, share_server=server)
def test_share_servers_are_handled_server_not_provided(self):
self.assertRaises(
exception.ManilaException, fake, self.dhss_true, self._context)
@ddt.ddt
class NFSHelperTestCase(test.TestCase):
"""Test case for NFS helper of generic driver."""
def setUp(self):
super(NFSHelperTestCase, self).setUp()
fake_utils.stub_out_utils_execute(self)
self.fake_conf = manila.share.configuration.Configuration(None)
self._ssh_exec = mock.Mock(return_value=('', ''))
self._execute = mock.Mock(return_value=('', ''))
self._helper = generic.NFSHelper(self._execute, self._ssh_exec,
self.fake_conf)
ip = '10.254.0.3'
self.server = fake_compute.FakeServer(
ip=ip, public_address=ip, instance_id='fake_instance_id')
self.share_name = 'fake_share_name'
def test_create_export(self):
ret = self._helper.create_export(self.server, self.share_name)
expected_location = ':'.join([self.server['public_address'],
os.path.join(CONF.share_mount_path,
self.share_name)])
self.assertEqual(ret, expected_location)
@ddt.data(const.ACCESS_LEVEL_RW, const.ACCESS_LEVEL_RO)
def test_allow_access(self, data):
self.mock_object(self._helper, '_sync_nfs_temp_and_perm_files')
self._helper.allow_access(
self.server, self.share_name, 'ip', data, '10.0.0.2')
local_path = os.path.join(CONF.share_mount_path, self.share_name)
self._ssh_exec.assert_has_calls([
mock.call(self.server, ['sudo', 'exportfs']),
mock.call(self.server, ['sudo', 'exportfs', '-o',
'%s,no_subtree_check' % data,
':'.join(['10.0.0.2', local_path])])
])
self._helper._sync_nfs_temp_and_perm_files.assert_called_once_with(
self.server)
def test_allow_access_no_ip(self):
self.assertRaises(
exception.InvalidShareAccess,
self._helper.allow_access,
self.server, self.share_name,
'fake_type', 'fake_level', 'fake_rule')
@ddt.data(const.ACCESS_LEVEL_RW, const.ACCESS_LEVEL_RO)
def test_deny_access(self, data):
self.mock_object(self._helper, '_sync_nfs_temp_and_perm_files')
local_path = os.path.join(CONF.share_mount_path, self.share_name)
access = dict(
access_to='10.0.0.2', access_type='ip', access_level=data)
self._helper.deny_access(self.server, self.share_name, access)
export_string = ':'.join(['10.0.0.2', local_path])
expected_exec = ['sudo', 'exportfs', '-u', export_string]
self._ssh_exec.assert_called_once_with(self.server, expected_exec)
self._helper._sync_nfs_temp_and_perm_files.assert_called_once_with(
self.server)
def test_sync_nfs_temp_and_perm_files(self):
self._helper._sync_nfs_temp_and_perm_files(self.server)
self._helper._ssh_exec.assert_called_once_with(self.server, mock.ANY)
@ddt.data('/foo/bar', '5.6.7.8:/bar/quuz', '5.6.7.88:/foo/quuz')
def test_get_exports_for_share(self, export_location):
server = dict(public_address='1.2.3.4')
result = self._helper.get_exports_for_share(server, export_location)
path = export_location.split(':')[-1]
self.assertEqual([':'.join([server['public_address'], path])], result)
@ddt.data(
{'public_address_with_suffix': 'foo'},
{'with_prefix_public_address': 'bar'},
{'with_prefix_public_address_and_with_suffix': 'quuz'}, {})
def test_get_exports_for_share_with_error(self, server):
export_location = '1.2.3.4:/foo/bar'
self.assertRaises(
exception.ManilaException,
self._helper.get_exports_for_share, server, export_location)
@ddt.data('/foo/bar', '5.6.7.8:/foo/bar', '5.6.7.88:fake:/foo/bar')
def test_get_share_path_by_export_location(self, export_location):
result = self._helper.get_share_path_by_export_location(
dict(), export_location)
self.assertEqual('/foo/bar', result)
@ddt.ddt
class CIFSHelperTestCase(test.TestCase):
"""Test case for CIFS helper of generic driver."""
def setUp(self):
super(CIFSHelperTestCase, self).setUp()
self.server_details = {'instance_id': 'fake',
'public_address': '1.2.3.4', }
self.share_name = 'fake_share_name'
self.fake_conf = manila.share.configuration.Configuration(None)
self._ssh_exec = mock.Mock(return_value=('', ''))
self._execute = mock.Mock(return_value=('', ''))
self._helper = generic.CIFSHelper(self._execute, self._ssh_exec,
self.fake_conf)
self.access = dict(
access_level=const.ACCESS_LEVEL_RW,
access_type='ip',
access_to='1.1.1.1')
def test_init_helper(self):
self._helper.init_helper(self.server_details)
self._helper._ssh_exec.assert_called_once_with(
self.server_details,
['sudo', 'net', 'conf', 'list'],
)
def test_create_export_share_does_not_exist(self):
def fake_ssh_exec(*args, **kwargs):
if 'showshare' in args[1]:
raise exception.ProcessExecutionError()
else:
return ('', '')
self.mock_object(self._helper, '_ssh_exec',
mock.Mock(side_effect=fake_ssh_exec))
ret = self._helper.create_export(self.server_details, self.share_name)
expected_location = '\\\\%s\\%s' % (
self.server_details['public_address'], self.share_name)
self.assertEqual(ret, expected_location)
share_path = os.path.join(
self._helper.configuration.share_mount_path,
self.share_name)
self._helper._ssh_exec.assert_has_calls([
mock.call(
self.server_details,
['sudo', 'net', 'conf', 'showshare', self.share_name, ]
),
mock.call(
self.server_details,
[
'sudo', 'net', 'conf', 'addshare', self.share_name,
share_path, 'writeable=y', 'guest_ok=y',
]
),
mock.call(self.server_details, mock.ANY),
])
def test_create_export_share_exist_recreate_true(self):
ret = self._helper.create_export(self.server_details, self.share_name,
recreate=True)
expected_location = '\\\\%s\\%s' % (
self.server_details['public_address'], self.share_name)
self.assertEqual(ret, expected_location)
share_path = os.path.join(
self._helper.configuration.share_mount_path,
self.share_name)
self._helper._ssh_exec.assert_has_calls([
mock.call(
self.server_details,
['sudo', 'net', 'conf', 'showshare', self.share_name, ]
),
mock.call(
self.server_details,
['sudo', 'net', 'conf', 'delshare', self.share_name, ]
),
mock.call(
self.server_details,
[
'sudo', 'net', 'conf', 'addshare', self.share_name,
share_path, 'writeable=y', 'guest_ok=y',
]
),
mock.call(self.server_details, mock.ANY),
])
def test_create_export_share_exist_recreate_false(self):
self.assertRaises(
exception.ShareBackendException,
self._helper.create_export,
self.server_details,
self.share_name,
recreate=False,
)
self._helper._ssh_exec.assert_has_calls([
mock.call(
self.server_details,
['sudo', 'net', 'conf', 'showshare', self.share_name, ]
),
])
def test_remove_export(self):
self._helper.remove_export(self.server_details, self.share_name)
self._helper._ssh_exec.assert_called_once_with(
self.server_details,
['sudo', 'net', 'conf', 'delshare', self.share_name],
)
def test_remove_export_forcibly(self):
delshare_command = ['sudo', 'net', 'conf', 'delshare', self.share_name]
def fake_ssh_exec(*args, **kwargs):
if delshare_command == args[1]:
raise exception.ProcessExecutionError()
else:
return ('', '')
self.mock_object(self._helper, '_ssh_exec',
mock.Mock(side_effect=fake_ssh_exec))
self._helper.remove_export(self.server_details, self.share_name)
self._helper._ssh_exec.assert_has_calls([
mock.call(
self.server_details,
['sudo', 'net', 'conf', 'delshare', self.share_name],
),
mock.call(
self.server_details,
['sudo', 'smbcontrol', 'all', 'close-share', self.share_name],
),
])
def test_allow_access_ip_exist(self):
hosts = [self.access['access_to'], ]
self.mock_object(self._helper, '_get_allow_hosts',
mock.Mock(return_value=hosts))
self.mock_object(self._helper, '_set_allow_hosts')
self.assertRaises(
exception.ShareAccessExists,
self._helper.allow_access,
self.server_details,
self.share_name,
self.access['access_type'],
self.access['access_level'],
self.access['access_to'])
self._helper._get_allow_hosts.assert_called_once_with(
self.server_details, self.share_name)
self._helper._set_allow_hosts.assert_has_calls([])
def test_allow_access_ip_does_not_exist(self):
hosts = []
self.mock_object(self._helper, '_get_allow_hosts',
mock.Mock(return_value=hosts))
self.mock_object(self._helper, '_set_allow_hosts')
self._helper.allow_access(
self.server_details, self.share_name,
self.access['access_type'], self.access['access_level'],
self.access['access_to'])
self._helper._get_allow_hosts.assert_called_once_with(
self.server_details, self.share_name)
self._helper._set_allow_hosts.assert_called_once_with(
self.server_details, hosts, self.share_name)
def test_allow_access_wrong_type(self):
self.assertRaises(
exception.InvalidShareAccess,
self._helper.allow_access,
self.server_details,
self.share_name, 'fake', const.ACCESS_LEVEL_RW, '1.1.1.1')
@ddt.data(const.ACCESS_LEVEL_RO, 'fake')
def test_allow_access_wrong_access_level(self, data):
self.assertRaises(
exception.InvalidShareAccessLevel,
self._helper.allow_access,
self.server_details,
self.share_name, 'ip', data, '1.1.1.1')
@ddt.data(const.ACCESS_LEVEL_RO, 'fake')
def test_deny_access_unsupported_access_level(self, data):
access = dict(access_to='1.1.1.1', access_level=data)
self.mock_object(self._helper, '_get_allow_hosts')
self.mock_object(self._helper, '_set_allow_hosts')
self._helper.deny_access(self.server_details, self.share_name, access)
self.assertFalse(self._helper._get_allow_hosts.called)
self.assertFalse(self._helper._set_allow_hosts.called)
def test_deny_access_list_has_value(self):
hosts = [self.access['access_to'], ]
self.mock_object(self._helper, '_get_allow_hosts',
mock.Mock(return_value=hosts))
self.mock_object(self._helper, '_set_allow_hosts')
self._helper.deny_access(
self.server_details, self.share_name, self.access)
self._helper._get_allow_hosts.assert_called_once_with(
self.server_details, self.share_name)
self._helper._set_allow_hosts.assert_called_once_with(
self.server_details, [], self.share_name)
def test_deny_access_list_does_not_have_value(self):
hosts = []
self.mock_object(self._helper, '_get_allow_hosts',
mock.Mock(return_value=hosts))
self.mock_object(self._helper, '_set_allow_hosts')
self._helper.deny_access(
self.server_details, self.share_name, self.access)
self._helper._get_allow_hosts.assert_called_once_with(
self.server_details, self.share_name)
self._helper._set_allow_hosts.assert_has_calls([])
def test_deny_access_force(self):
self.mock_object(
self._helper,
'_get_allow_hosts',
mock.Mock(side_effect=exception.ProcessExecutionError()),
)
self.mock_object(self._helper, '_set_allow_hosts')
self._helper.deny_access(
self.server_details, self.share_name, self.access, force=True)
self._helper._get_allow_hosts.assert_called_once_with(
self.server_details, self.share_name)
self._helper._set_allow_hosts.assert_has_calls([])
def test_deny_access_not_force(self):
def raise_process_execution_error(*args, **kwargs):
raise exception.ProcessExecutionError()
self.mock_object(self._helper, '_get_allow_hosts',
mock.Mock(side_effect=raise_process_execution_error))
self.mock_object(self._helper, '_set_allow_hosts')
self.assertRaises(
exception.ProcessExecutionError,
self._helper.deny_access,
self.server_details, self.share_name, self.access)
self._helper._get_allow_hosts.assert_called_once_with(
self.server_details, self.share_name)
self._helper._set_allow_hosts.assert_has_calls([])
@ddt.data(
'', '1.2.3.4:/nfs/like/export', '/1.2.3.4/foo', '\\1.2.3.4\\foo',
'//1.2.3.4\\mixed_slashes_and_backslashes_one',
'\\\\1.2.3.4/mixed_slashes_and_backslashes_two')
def test__get_share_group_name_from_export_location(self, export_location):
self.assertRaises(
exception.InvalidShare,
self._helper._get_share_group_name_from_export_location,
export_location)
@ddt.data('//5.6.7.8/foo', '\\\\5.6.7.8\\foo')
def test_get_exports_for_share(self, export_location):
server = dict(public_address='1.2.3.4')
self.mock_object(
self._helper, '_get_share_group_name_from_export_location',
mock.Mock(side_effect=(
self._helper._get_share_group_name_from_export_location)))
result = self._helper.get_exports_for_share(server, export_location)
expected_export_location = ['\\\\%s\\foo' % server['public_address']]
self.assertEqual(expected_export_location, result)
self._helper._get_share_group_name_from_export_location.\
assert_called_once_with(export_location)
@ddt.data(
{'public_address_with_suffix': 'foo'},
{'with_prefix_public_address': 'bar'},
{'with_prefix_public_address_and_with_suffix': 'quuz'}, {})
def test_get_exports_for_share_with_exception(self, server):
export_location = '1.2.3.4:/foo/bar'
self.assertRaises(
exception.ManilaException,
self._helper.get_exports_for_share, server, export_location)
@ddt.data('//5.6.7.8/foo', '\\\\5.6.7.8\\foo')
def test_get_share_path_by_export_location(self, export_location):
fake_path = ' /bar/quuz\n '
fake_server = dict()
self.mock_object(
self._helper, '_ssh_exec',
mock.Mock(return_value=(fake_path, 'fake')))
self.mock_object(
self._helper, '_get_share_group_name_from_export_location',
mock.Mock(side_effect=(
self._helper._get_share_group_name_from_export_location)))
result = self._helper.get_share_path_by_export_location(
fake_server, export_location)
self.assertEqual('/bar/quuz', result)
self._helper._ssh_exec.assert_called_once_with(
fake_server, ['sudo', 'net', 'conf', 'getparm', 'foo', 'path'])
self._helper._get_share_group_name_from_export_location.\
assert_called_once_with(export_location)