Merge "removes the nova-volume code from nova"

This commit is contained in:
Jenkins
2012-10-29 01:57:13 +00:00
committed by Gerrit Code Review
23 changed files with 5 additions and 6016 deletions

View File

@@ -70,8 +70,8 @@ if __name__ == '__main__':
except (Exception, SystemExit):
LOG.exception(_('Failed to load %s') % mod.__name__)
for binary in ['nova-compute', 'nova-volume',
'nova-network', 'nova-scheduler', 'nova-cert']:
for binary in ['nova-compute', 'nova-network', 'nova-scheduler',
'nova-cert']:
try:
launcher.launch_server(service.Service.create(binary=binary))
except (Exception, SystemExit):

View File

@@ -1,46 +0,0 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Starter script for Nova OS API."""
import eventlet
eventlet.monkey_patch(os=False)
import os
import sys
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(
sys.argv[0]), os.pardir, os.pardir))
if os.path.exists(os.path.join(possible_topdir, "nova", "__init__.py")):
sys.path.insert(0, possible_topdir)
from nova import flags
from nova.openstack.common import log as logging
from nova import service
from nova import utils
if __name__ == '__main__':
flags.parse_args(sys.argv)
logging.setup("nova")
utils.monkey_patch()
server = service.WSGIService('osapi_volume')
service.serve(server, workers=server.workers)
service.wait()

View File

@@ -777,33 +777,6 @@ class VersionCommands(object):
self.list()
class VolumeCommands(object):
"""Methods for dealing with a cloud in an odd state"""
@args('--volume', dest='volume_id', metavar='<volume id>',
help='Volume ID')
def reattach(self, volume_id):
"""Re-attach a volume that has previously been attached
to an instance. Typically called after a compute host
has been rebooted."""
if 'cinder' in FLAGS.volume_api_class:
print(_("\"nova-manage volume reattach\" only valid "
"when using nova-volume service"))
sys.exit(1)
ctxt = context.get_admin_context()
volume = db.volume_get(ctxt, param2id(volume_id))
if not volume['instance_id']:
print _("volume is not attached to an instance")
return
instance = db.instance_get(ctxt, volume['instance_id'])
rpcapi = compute_rpcapi.ComputeAPI()
rpcapi.attach_volume(ctxt, instance, volume['id'],
volume['mountpoint'])
class InstanceTypeCommands(object):
"""Class for managing instance types / flavors."""
@@ -1207,7 +1180,6 @@ CATEGORIES = [
('sm', StorageManagerCommands),
('version', VersionCommands),
('vm', VmCommands),
('volume', VolumeCommands),
('vpn', VpnCommands),
]

View File

@@ -1,50 +0,0 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Starter script for Nova Volume."""
import eventlet
eventlet.monkey_patch()
import os
import sys
# If ../nova/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
sys.path.insert(0, possible_topdir)
from nova import flags
from nova.openstack.common import log as logging
from nova import service
from nova import utils
if __name__ == '__main__':
flags.parse_args(sys.argv)
FLAGS = flags.FLAGS
logging.setup("nova")
utils.monkey_patch()
server = service.Service.create(binary='nova-volume',
topic=FLAGS.volume_topic)
service.serve(server)
service.wait()

View File

@@ -1,81 +0,0 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Cron script to generate usage notifications for volumes existing during
the audit period.
Together with the notifications generated by volumes
create/delete/resize, over that time period, this allows an external
system consuming usage notification feeds to calculate volume usage
for each tenant.
Time periods are specified as 'hour', 'month', 'day' or 'year'
hour = previous hour. If run at 9:07am, will generate usage for 8-9am.
month = previous month. If the script is run April 1, it will generate
usages for March 1 through March 31.
day = previous day. if run on July 4th, it generates usages for July 3rd.
year = previous year. If run on Jan 1, it generates usages for
Jan 1 through Dec 31 of the previous year.
"""
import datetime
import gettext
import os
import sys
import time
import traceback
# If ../nova/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'nova', '__init__.py')):
sys.path.insert(0, POSSIBLE_TOPDIR)
gettext.install('nova', unicode=1)
from nova import context
from nova import db
from nova import exception
from nova import flags
from nova.openstack.common import log as logging
from nova.openstack.common import rpc
from nova import utils
from nova.volume import utils as volume_utils
FLAGS = flags.FLAGS
if __name__ == '__main__':
admin_context = context.get_admin_context()
flags.FLAGS(sys.argv)
logging.setup("nova")
begin, end = utils.last_completed_audit_period()
print _("Starting volume usage audit")
print _("Creating usages for %s until %s") % (str(begin), str(end))
volumes = db.volume_get_active_by_window(admin_context,
begin,
end)
print _("Found %d volumes") % len(volumes)
for volume_ref in volumes:
try:
volume_utils.notify_usage_exists(
admin_context, volume_ref)
except Exception, e:
print traceback.format_exc(e)
print _("Volume usage audit completed")

View File

@@ -159,9 +159,6 @@ global_opts = [
cfg.StrOpt('scheduler_topic',
default='scheduler',
help='the topic scheduler nodes listen on'),
cfg.StrOpt('volume_topic',
default='volume',
help='the topic volume nodes listen on'),
cfg.StrOpt('network_topic',
default='network',
help='the topic network nodes listen on'),
@@ -169,7 +166,7 @@ global_opts = [
default=True,
help='whether to rate limit the api'),
cfg.ListOpt('enabled_apis',
default=['ec2', 'osapi_compute', 'osapi_volume', 'metadata'],
default=['ec2', 'osapi_compute', 'metadata'],
help='a list of APIs to enable by default'),
cfg.StrOpt('ec2_host',
default='$my_ip',
@@ -197,16 +194,6 @@ global_opts = [
'nova.api.openstack.compute.contrib.standard_extensions'
],
help='osapi compute extension to load'),
cfg.ListOpt('osapi_volume_ext_list',
default=[],
help='Specify list of extensions to load when using osapi_'
'volume_extension option with nova.api.openstack.'
'volume.contrib.select_extensions'),
cfg.MultiStrOpt('osapi_volume_extension',
default=[
'nova.api.openstack.volume.contrib.standard_extensions'
],
help='osapi volume extension to load'),
cfg.StrOpt('osapi_path',
default='/v1.1/',
help='the path prefix used to call the openstack api server'),
@@ -281,9 +268,6 @@ global_opts = [
cfg.StrOpt('network_manager',
default='nova.network.manager.VlanManager',
help='full class name for the Manager for network'),
cfg.StrOpt('volume_manager',
default='nova.volume.manager.VolumeManager',
help='full class name for the Manager for volume'),
cfg.StrOpt('scheduler_manager',
default='nova.scheduler.manager.SchedulerManager',
help='full class name for the Manager for scheduler'),
@@ -382,7 +366,7 @@ global_opts = [
default='nova.network.api.API',
help='The full class name of the network API class to use'),
cfg.StrOpt('volume_api_class',
default='nova.volume.api.API',
default='nova.volume.cinder.API',
help='The full class name of the volume API class to use'),
cfg.StrOpt('security_group_handler',
default='nova.network.sg.NullSecurityGroupHandler',

View File

@@ -26,7 +26,6 @@ flags.DECLARE('iscsi_num_targets', 'nova.volume.driver')
flags.DECLARE('network_size', 'nova.network.manager')
flags.DECLARE('num_networks', 'nova.network.manager')
flags.DECLARE('policy_file', 'nova.policy')
flags.DECLARE('volume_driver', 'nova.volume.manager')
def set_defaults(conf):
@@ -44,7 +43,6 @@ def set_defaults(conf):
conf.set_default('sqlite_synchronous', False)
conf.set_default('use_ipv6', True)
conf.set_default('verbose', True)
conf.set_default('volume_driver', 'nova.volume.driver.FakeISCSIDriver')
conf.set_default('api_paste_config', '$state_path/etc/nova/api-paste.ini')
conf.set_default('rpc_response_timeout', 5)
conf.set_default('rpc_cast_timeout', 5)

View File

@@ -1,121 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os.path
import shutil
import string
import tempfile
from nova import test
from nova.volume import iscsi
class TargetAdminTestCase(object):
def setUp(self):
self.cmds = []
self.tid = 1
self.target_name = 'iqn.2011-09.org.foo.bar:blaa'
self.lun = 10
self.path = '/foo'
self.vol_id = 'blaa'
self.script_template = None
self.stubs.Set(os.path, 'isfile', lambda _: True)
self.stubs.Set(os, 'unlink', lambda _: '')
self.stubs.Set(iscsi.TgtAdm, '_get_target', self.fake_get_target)
def fake_get_target(obj, iqn):
return 1
def get_script_params(self):
return {'tid': self.tid,
'target_name': self.target_name,
'lun': self.lun,
'path': self.path}
def get_script(self):
return self.script_template % self.get_script_params()
def fake_execute(self, *cmd, **kwargs):
self.cmds.append(string.join(cmd))
return "", None
def clear_cmds(self):
cmds = []
def verify_cmds(self, cmds):
self.assertEqual(len(cmds), len(self.cmds))
for a, b in zip(cmds, self.cmds):
self.assertEqual(a, b)
def verify(self):
script = self.get_script()
cmds = []
for line in script.split('\n'):
if not line.strip():
continue
cmds.append(line)
self.verify_cmds(cmds)
def run_commands(self):
tgtadm = iscsi.get_target_admin()
tgtadm.set_execute(self.fake_execute)
tgtadm.create_iscsi_target(self.target_name, self.tid,
self.lun, self.path)
tgtadm.show_target(self.tid, iqn=self.target_name)
tgtadm.remove_iscsi_target(self.tid, self.lun, self.vol_id)
def test_target_admin(self):
self.clear_cmds()
self.run_commands()
self.verify()
class TgtAdmTestCase(test.TestCase, TargetAdminTestCase):
def setUp(self):
super(TgtAdmTestCase, self).setUp()
TargetAdminTestCase.setUp(self)
self.persist_tempdir = tempfile.mkdtemp()
self.flags(iscsi_helper='tgtadm')
self.flags(volumes_dir=self.persist_tempdir)
self.script_template = "\n".join([
'tgt-admin --update iqn.2011-09.org.foo.bar:blaa',
'tgt-admin --delete iqn.2010-10.org.openstack:volume-blaa'])
def tearDown(self):
try:
shutil.rmtree(self.persist_tempdir)
except OSError:
pass
super(TgtAdmTestCase, self).tearDown()
class IetAdmTestCase(test.TestCase, TargetAdminTestCase):
def setUp(self):
super(IetAdmTestCase, self).setUp()
TargetAdminTestCase.setUp(self)
self.flags(iscsi_helper='ietadm')
self.script_template = "\n".join([
'ietadm --op new --tid=%(tid)s --params Name=%(target_name)s',
'ietadm --op new --tid=%(tid)s --lun=%(lun)s '
'--params Path=%(path)s,Type=fileio',
'ietadm --op show --tid=%(tid)s',
'ietadm --op delete --tid=%(tid)s --lun=%(lun)s',
'ietadm --op delete --tid=%(tid)s'])

View File

@@ -155,7 +155,6 @@ class LibvirtVolumeTestCase(test.TestCase):
}
def test_libvirt_volume_driver_serial(self):
vol_driver = volume_driver.VolumeDriver()
libvirt_driver = volume.LibvirtVolumeDriver(self.fake_conn)
name = 'volume-00000001'
vol = {'id': 1, 'name': name}

File diff suppressed because it is too large Load Diff

View File

@@ -1,234 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 NetApp, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Unit tests for the NetApp-specific NFS driver module (netapp_nfs)"""
from nova import context
from nova import exception
from nova import test
from nova.volume import netapp
from nova.volume import netapp_nfs
from nova.volume import nfs
from mox import IgnoreArg
from mox import IsA
from mox import MockObject
import mox
import suds
import types
class FakeVolume(object):
def __init__(self, size=0):
self.size = size
self.id = hash(self)
self.name = None
def __getitem__(self, key):
return self.__dict__[key]
class FakeSnapshot(object):
def __init__(self, volume_size=0):
self.volume_name = None
self.name = None
self.volume_id = None
self.volume_size = volume_size
self.user_id = None
self.status = None
def __getitem__(self, key):
return self.__dict__[key]
class FakeResponce(object):
def __init__(self, status):
"""
:param status: Either 'failed' or 'passed'
"""
self.Status = status
if status == 'failed':
self.Reason = 'Sample error'
class NetappNfsDriverTestCase(test.TestCase):
"""Test case for NetApp specific NFS clone driver"""
def setUp(self):
self._driver = netapp_nfs.NetAppNFSDriver()
super(NetappNfsDriverTestCase, self).setUp()
def test_check_for_setup_error(self):
mox = self.mox
drv = self._driver
# check exception raises when flags are not set
self.assertRaises(exception.NovaException,
drv.check_for_setup_error)
# set required flags
self.flags(netapp_wsdl_url='val',
netapp_login='val',
netapp_password='val',
netapp_server_hostname='val',
netapp_server_port='val')
mox.StubOutWithMock(nfs.NfsDriver, 'check_for_setup_error')
nfs.NfsDriver.check_for_setup_error()
mox.ReplayAll()
drv.check_for_setup_error()
def test_do_setup(self):
mox = self.mox
drv = self._driver
mox.StubOutWithMock(drv, 'check_for_setup_error')
mox.StubOutWithMock(netapp_nfs.NetAppNFSDriver, '_get_client')
drv.check_for_setup_error()
netapp_nfs.NetAppNFSDriver._get_client()
mox.ReplayAll()
drv.do_setup(IsA(context.RequestContext))
def test_create_snapshot(self):
"""Test snapshot can be created and deleted"""
mox = self.mox
drv = self._driver
mox.StubOutWithMock(drv, '_clone_volume')
drv._clone_volume(IgnoreArg(), IgnoreArg(), IgnoreArg())
mox.ReplayAll()
drv.create_snapshot(FakeSnapshot())
def test_create_volume_from_snapshot(self):
"""Tests volume creation from snapshot"""
drv = self._driver
mox = self.mox
volume = FakeVolume(1)
snapshot = FakeSnapshot(2)
self.assertRaises(exception.NovaException,
drv.create_volume_from_snapshot,
volume,
snapshot)
snapshot = FakeSnapshot(1)
location = '127.0.0.1:/nfs'
expected_result = {'provider_location': location}
mox.StubOutWithMock(drv, '_clone_volume')
mox.StubOutWithMock(drv, '_get_volume_location')
drv._clone_volume(IgnoreArg(), IgnoreArg(), IgnoreArg())
drv._get_volume_location(IgnoreArg()).AndReturn(location)
mox.ReplayAll()
loc = drv.create_volume_from_snapshot(volume, snapshot)
self.assertEquals(loc, expected_result)
def _prepare_delete_snapshot_mock(self, snapshot_exists):
drv = self._driver
mox = self.mox
mox.StubOutWithMock(drv, '_get_provider_location')
mox.StubOutWithMock(drv, '_volume_not_present')
if snapshot_exists:
mox.StubOutWithMock(drv, '_execute')
mox.StubOutWithMock(drv, '_get_volume_path')
drv._get_provider_location(IgnoreArg())
drv._volume_not_present(IgnoreArg(), IgnoreArg())\
.AndReturn(not snapshot_exists)
if snapshot_exists:
drv._get_volume_path(IgnoreArg(), IgnoreArg())
drv._execute('rm', None, run_as_root=True)
mox.ReplayAll()
return mox
def test_delete_existing_snapshot(self):
drv = self._driver
self._prepare_delete_snapshot_mock(True)
drv.delete_snapshot(FakeSnapshot())
def test_delete_missing_snapshot(self):
drv = self._driver
self._prepare_delete_snapshot_mock(False)
drv.delete_snapshot(FakeSnapshot())
def _prepare_clone_mock(self, status):
drv = self._driver
mox = self.mox
volume = FakeVolume()
setattr(volume, 'provider_location', '127.0.0.1:/nfs')
drv._client = MockObject(suds.client.Client)
drv._client.factory = MockObject(suds.client.Factory)
drv._client.service = MockObject(suds.client.ServiceSelector)
# ApiProxy() method is generated by ServiceSelector at runtime from the
# XML, so mocking is impossible.
setattr(drv._client.service,
'ApiProxy',
types.MethodType(lambda *args, **kwargs: FakeResponce(status),
suds.client.ServiceSelector))
mox.StubOutWithMock(drv, '_get_host_id')
mox.StubOutWithMock(drv, '_get_full_export_path')
drv._get_host_id(IgnoreArg()).AndReturn('10')
drv._get_full_export_path(IgnoreArg(), IgnoreArg()).AndReturn('/nfs')
return mox
def test_successfull_clone_volume(self):
drv = self._driver
mox = self._prepare_clone_mock('passed')
mox.ReplayAll()
volume_name = 'volume_name'
clone_name = 'clone_name'
volume_id = volume_name + str(hash(volume_name))
drv._clone_volume(volume_name, clone_name, volume_id)
def test_failed_clone_volume(self):
drv = self._driver
mox = self._prepare_clone_mock('failed')
mox.ReplayAll()
volume_name = 'volume_name'
clone_name = 'clone_name'
volume_id = volume_name + str(hash(volume_name))
self.assertRaises(exception.NovaException,
drv._clone_volume,
volume_name, clone_name, volume_id)

View File

@@ -1,278 +0,0 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2011 Nexenta Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for OpenStack Nova volume driver
"""
import base64
import urllib2
import nova.flags
import nova.test
from nova.volume import nexenta
from nova.volume.nexenta import jsonrpc
from nova.volume.nexenta import volume
FLAGS = nova.flags.FLAGS
class TestNexentaDriver(nova.test.TestCase):
TEST_VOLUME_NAME = 'volume1'
TEST_VOLUME_NAME2 = 'volume2'
TEST_SNAPSHOT_NAME = 'snapshot1'
TEST_VOLUME_REF = {
'name': TEST_VOLUME_NAME,
'size': 1,
}
TEST_VOLUME_REF2 = {
'name': TEST_VOLUME_NAME2,
'size': 1,
}
TEST_SNAPSHOT_REF = {
'name': TEST_SNAPSHOT_NAME,
'volume_name': TEST_VOLUME_NAME,
}
def setUp(self):
super(TestNexentaDriver, self).setUp()
self.flags(
nexenta_host='1.1.1.1',
nexenta_volume='nova',
nexenta_target_prefix='iqn:',
nexenta_target_group_prefix='nova/',
nexenta_blocksize='8K',
nexenta_sparse=True,
)
self.nms_mock = self.mox.CreateMockAnything()
for mod in ['volume', 'zvol', 'iscsitarget',
'stmf', 'scsidisk', 'snapshot']:
setattr(self.nms_mock, mod, self.mox.CreateMockAnything())
self.stubs.Set(jsonrpc, 'NexentaJSONProxy',
lambda *_, **__: self.nms_mock)
self.drv = volume.NexentaDriver()
self.drv.do_setup({})
def test_setup_error(self):
self.nms_mock.volume.object_exists('nova').AndReturn(True)
self.mox.ReplayAll()
self.drv.check_for_setup_error()
def test_setup_error_fail(self):
self.nms_mock.volume.object_exists('nova').AndReturn(False)
self.mox.ReplayAll()
self.assertRaises(LookupError, self.drv.check_for_setup_error)
def test_local_path(self):
self.assertRaises(NotImplementedError, self.drv.local_path, '')
def test_create_volume(self):
self.nms_mock.zvol.create('nova/volume1', '1G', '8K', True)
self.mox.ReplayAll()
self.drv.create_volume(self.TEST_VOLUME_REF)
def test_delete_volume(self):
self.nms_mock.zvol.destroy('nova/volume1', '')
self.mox.ReplayAll()
self.drv.delete_volume(self.TEST_VOLUME_REF)
def test_create_snapshot(self):
self.nms_mock.zvol.create_snapshot('nova/volume1', 'snapshot1', '')
self.mox.ReplayAll()
self.drv.create_snapshot(self.TEST_SNAPSHOT_REF)
def test_create_volume_from_snapshot(self):
self.nms_mock.zvol.clone('nova/volume1@snapshot1', 'nova/volume2')
self.mox.ReplayAll()
self.drv.create_volume_from_snapshot(self.TEST_VOLUME_REF2,
self.TEST_SNAPSHOT_REF)
def test_delete_snapshot(self):
self.nms_mock.snapshot.destroy('nova/volume1@snapshot1', '')
self.mox.ReplayAll()
self.drv.delete_snapshot(self.TEST_SNAPSHOT_REF)
_CREATE_EXPORT_METHODS = [
('iscsitarget', 'create_target', ({'target_name': 'iqn:volume1'},),
u'Unable to create iscsi target\n'
u' iSCSI target iqn.1986-03.com.sun:02:nova-volume1 already'
u' configured\n'
u' itadm create-target failed with error 17\n',
),
('stmf', 'create_targetgroup', ('nova/volume1',),
u'Unable to create targetgroup: stmfadm: nova/volume1:'
u' already exists\n',
),
('stmf', 'add_targetgroup_member', ('nova/volume1', 'iqn:volume1'),
u'Unable to add member to targetgroup: stmfadm:'
u' iqn.1986-03.com.sun:02:nova-volume1: already exists\n',
),
('scsidisk', 'create_lu', ('nova/volume1', {}),
u"Unable to create lu with zvol 'nova/volume1':\n"
u" sbdadm: filename /dev/zvol/rdsk/nova/volume1: in use\n",
),
('scsidisk', 'add_lun_mapping_entry', ('nova/volume1', {
'target_group': 'nova/volume1', 'lun': '0'}),
u"Unable to add view to zvol 'nova/volume1' (LUNs in use: ):\n"
u" stmfadm: view entry exists\n",
),
]
def _stub_export_method(self, module, method, args, error, fail=False):
m = getattr(self.nms_mock, module)
m = getattr(m, method)
mock = m(*args)
if fail:
mock.AndRaise(nexenta.NexentaException(error))
def _stub_all_export_methods(self, fail=False):
for params in self._CREATE_EXPORT_METHODS:
self._stub_export_method(*params, fail=fail)
def test_create_export(self):
self._stub_all_export_methods()
self.mox.ReplayAll()
retval = self.drv.create_export({}, self.TEST_VOLUME_REF)
self.assertEquals(retval,
{'provider_location':
'%s:%s,1 %s%s' % (FLAGS.nexenta_host,
FLAGS.nexenta_iscsi_target_portal_port,
FLAGS.nexenta_target_prefix,
self.TEST_VOLUME_NAME)})
def __get_test(i):
def _test_create_export_fail(self):
for params in self._CREATE_EXPORT_METHODS[:i]:
self._stub_export_method(*params)
self._stub_export_method(*self._CREATE_EXPORT_METHODS[i],
fail=True)
self.mox.ReplayAll()
self.assertRaises(nexenta.NexentaException,
self.drv.create_export, {}, self.TEST_VOLUME_REF)
return _test_create_export_fail
for i in range(len(_CREATE_EXPORT_METHODS)):
locals()['test_create_export_fail_%d' % i] = __get_test(i)
def test_ensure_export(self):
self._stub_all_export_methods(fail=True)
self.mox.ReplayAll()
self.drv.ensure_export({}, self.TEST_VOLUME_REF)
def test_remove_export(self):
self.nms_mock.scsidisk.delete_lu('nova/volume1')
self.nms_mock.stmf.destroy_targetgroup('nova/volume1')
self.nms_mock.iscsitarget.delete_target('iqn:volume1')
self.mox.ReplayAll()
self.drv.remove_export({}, self.TEST_VOLUME_REF)
def test_remove_export_fail_0(self):
self.nms_mock.scsidisk.delete_lu('nova/volume1')
self.nms_mock.stmf.destroy_targetgroup('nova/volume1').AndRaise(
nexenta.NexentaException())
self.nms_mock.iscsitarget.delete_target('iqn:volume1')
self.mox.ReplayAll()
self.drv.remove_export({}, self.TEST_VOLUME_REF)
def test_remove_export_fail_1(self):
self.nms_mock.scsidisk.delete_lu('nova/volume1')
self.nms_mock.stmf.destroy_targetgroup('nova/volume1')
self.nms_mock.iscsitarget.delete_target('iqn:volume1').AndRaise(
nexenta.NexentaException())
self.mox.ReplayAll()
self.drv.remove_export({}, self.TEST_VOLUME_REF)
class TestNexentaJSONRPC(nova.test.TestCase):
URL = 'http://example.com/'
URL_S = 'https://example.com/'
USER = 'user'
PASSWORD = 'password'
HEADERS = {'Authorization': 'Basic %s' % (base64.b64encode(
':'.join((USER, PASSWORD))),),
'Content-Type': 'application/json'}
REQUEST = 'the request'
def setUp(self):
super(TestNexentaJSONRPC, self).setUp()
self.proxy = jsonrpc.NexentaJSONProxy(
self.URL, self.USER, self.PASSWORD, auto=True)
self.mox.StubOutWithMock(urllib2, 'Request', True)
self.mox.StubOutWithMock(urllib2, 'urlopen')
self.resp_mock = self.mox.CreateMockAnything()
self.resp_info_mock = self.mox.CreateMockAnything()
self.resp_mock.info().AndReturn(self.resp_info_mock)
urllib2.urlopen(self.REQUEST).AndReturn(self.resp_mock)
def test_call(self):
urllib2.Request(self.URL,
'{"object": null, "params": ["arg1", "arg2"], "method": null}',
self.HEADERS).AndReturn(self.REQUEST)
self.resp_info_mock.status = ''
self.resp_mock.read().AndReturn(
'{"error": null, "result": "the result"}')
self.mox.ReplayAll()
result = self.proxy('arg1', 'arg2')
self.assertEquals("the result", result)
def test_call_deep(self):
urllib2.Request(self.URL,
'{"object": "obj1.subobj", "params": ["arg1", "arg2"],'
' "method": "meth"}',
self.HEADERS).AndReturn(self.REQUEST)
self.resp_info_mock.status = ''
self.resp_mock.read().AndReturn(
'{"error": null, "result": "the result"}')
self.mox.ReplayAll()
result = self.proxy.obj1.subobj.meth('arg1', 'arg2')
self.assertEquals("the result", result)
def test_call_auto(self):
urllib2.Request(self.URL,
'{"object": null, "params": ["arg1", "arg2"], "method": null}',
self.HEADERS).AndReturn(self.REQUEST)
urllib2.Request(self.URL_S,
'{"object": null, "params": ["arg1", "arg2"], "method": null}',
self.HEADERS).AndReturn(self.REQUEST)
self.resp_info_mock.status = 'EOF in headers'
self.resp_mock.read().AndReturn(
'{"error": null, "result": "the result"}')
urllib2.urlopen(self.REQUEST).AndReturn(self.resp_mock)
self.mox.ReplayAll()
result = self.proxy('arg1', 'arg2')
self.assertEquals("the result", result)
def test_call_error(self):
urllib2.Request(self.URL,
'{"object": null, "params": ["arg1", "arg2"], "method": null}',
self.HEADERS).AndReturn(self.REQUEST)
self.resp_info_mock.status = ''
self.resp_mock.read().AndReturn(
'{"error": {"message": "the error"}, "result": "the result"}')
self.mox.ReplayAll()
self.assertRaises(jsonrpc.NexentaJSONException,
self.proxy, 'arg1', 'arg2')
def test_call_fail(self):
urllib2.Request(self.URL,
'{"object": null, "params": ["arg1", "arg2"], "method": null}',
self.HEADERS).AndReturn(self.REQUEST)
self.resp_info_mock.status = 'EOF in headers'
self.proxy.auto = False
self.mox.ReplayAll()
self.assertRaises(jsonrpc.NexentaJSONException,
self.proxy, 'arg1', 'arg2')

View File

@@ -1,569 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 NetApp, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Unit tests for the NFS driver module"""
import __builtin__
import errno
import os
import mox as mox_lib
from mox import IgnoreArg
from mox import IsA
from mox import stubout
from nova import context
from nova import exception
from nova.exception import ProcessExecutionError
from nova import test
from nova.volume import nfs
class DumbVolume(object):
fields = {}
def __setitem__(self, key, value):
self.fields[key] = value
def __getitem__(self, item):
return self.fields[item]
class NfsDriverTestCase(test.TestCase):
"""Test case for NFS driver"""
TEST_NFS_EXPORT1 = 'nfs-host1:/export'
TEST_NFS_EXPORT2 = 'nfs-host2:/export'
TEST_SIZE_IN_GB = 1
TEST_MNT_POINT = '/mnt/nfs'
TEST_MNT_POINT_BASE = '/mnt/test'
TEST_LOCAL_PATH = '/mnt/nfs/volume-123'
TEST_FILE_NAME = 'test.txt'
TEST_SHARES_CONFIG_FILE = '/etc/cinder/test-shares.conf'
ONE_GB_IN_BYTES = 1024 * 1024 * 1024
def setUp(self):
self._driver = nfs.NfsDriver()
super(NfsDriverTestCase, self).setUp()
def stub_out_not_replaying(self, obj, attr_name):
attr_to_replace = getattr(obj, attr_name)
stub = mox_lib.MockObject(attr_to_replace)
self.stubs.Set(obj, attr_name, stub)
def test_path_exists_should_return_true(self):
"""_path_exists should return True if stat returns 0"""
mox = self.mox
drv = self._driver
mox.StubOutWithMock(drv, '_execute')
drv._execute('stat', self.TEST_FILE_NAME, run_as_root=True)
mox.ReplayAll()
self.assertTrue(drv._path_exists(self.TEST_FILE_NAME))
def test_path_exists_should_return_false(self):
"""_path_exists should return True if stat doesn't return 0"""
mox = self.mox
drv = self._driver
mox.StubOutWithMock(drv, '_execute')
drv._execute('stat', self.TEST_FILE_NAME, run_as_root=True).\
AndRaise(ProcessExecutionError(
stderr="stat: cannot stat `test.txt': No such file or directory"))
mox.ReplayAll()
self.assertFalse(drv._path_exists(self.TEST_FILE_NAME))
def test_local_path(self):
"""local_path common use case"""
self.flags(nfs_mount_point_base=self.TEST_MNT_POINT_BASE)
drv = self._driver
volume = DumbVolume()
volume['provider_location'] = self.TEST_NFS_EXPORT1
volume['name'] = 'volume-123'
self.assertEqual(
'/mnt/test/2f4f60214cf43c595666dd815f0360a4/volume-123',
drv.local_path(volume))
def test_mount_nfs_should_mount_correctly(self):
"""_mount_nfs common case usage"""
mox = self.mox
drv = self._driver
mox.StubOutWithMock(drv, '_path_exists')
drv._path_exists(self.TEST_MNT_POINT).AndReturn(True)
mox.StubOutWithMock(drv, '_execute')
drv._execute('mount', '-t', 'nfs', self.TEST_NFS_EXPORT1,
self.TEST_MNT_POINT, run_as_root=True)
mox.ReplayAll()
drv._mount_nfs(self.TEST_NFS_EXPORT1, self.TEST_MNT_POINT)
def test_mount_nfs_should_suppress_already_mounted_error(self):
"""_mount_nfs should suppress already mounted error if ensure=True
"""
mox = self.mox
drv = self._driver
mox.StubOutWithMock(drv, '_path_exists')
drv._path_exists(self.TEST_MNT_POINT).AndReturn(True)
mox.StubOutWithMock(drv, '_execute')
drv._execute('mount', '-t', 'nfs', self.TEST_NFS_EXPORT1,
self.TEST_MNT_POINT, run_as_root=True).\
AndRaise(ProcessExecutionError(
stderr='is busy or already mounted'))
mox.ReplayAll()
drv._mount_nfs(self.TEST_NFS_EXPORT1, self.TEST_MNT_POINT, ensure=True)
def test_mount_nfs_should_reraise_already_mounted_error(self):
"""_mount_nfs should not suppress already mounted error if ensure=False
"""
mox = self.mox
drv = self._driver
mox.StubOutWithMock(drv, '_path_exists')
drv._path_exists(self.TEST_MNT_POINT).AndReturn(True)
mox.StubOutWithMock(drv, '_execute')
drv._execute('mount', '-t', 'nfs', self.TEST_NFS_EXPORT1,
self.TEST_MNT_POINT, run_as_root=True).\
AndRaise(ProcessExecutionError(stderr='is busy or already mounted'))
mox.ReplayAll()
self.assertRaises(ProcessExecutionError, drv._mount_nfs,
self.TEST_NFS_EXPORT1, self.TEST_MNT_POINT,
ensure=False)
def test_mount_nfs_should_create_mountpoint_if_not_yet(self):
"""_mount_nfs should create mountpoint if it doesn't exist"""
mox = self.mox
drv = self._driver
mox.StubOutWithMock(drv, '_path_exists')
drv._path_exists(self.TEST_MNT_POINT).AndReturn(False)
mox.StubOutWithMock(drv, '_execute')
drv._execute('mkdir', '-p', self.TEST_MNT_POINT)
drv._execute(*([IgnoreArg()] * 5), run_as_root=IgnoreArg())
mox.ReplayAll()
drv._mount_nfs(self.TEST_NFS_EXPORT1, self.TEST_MNT_POINT)
def test_mount_nfs_should_not_create_mountpoint_if_already(self):
"""_mount_nfs should not create mountpoint if it already exists"""
mox = self.mox
drv = self._driver
mox.StubOutWithMock(drv, '_path_exists')
drv._path_exists(self.TEST_MNT_POINT).AndReturn(True)
mox.StubOutWithMock(drv, '_execute')
drv._execute(*([IgnoreArg()] * 5), run_as_root=IgnoreArg())
mox.ReplayAll()
drv._mount_nfs(self.TEST_NFS_EXPORT1, self.TEST_MNT_POINT)
def test_get_hash_str(self):
"""_get_hash_str should calculation correct value"""
drv = self._driver
self.assertEqual('2f4f60214cf43c595666dd815f0360a4',
drv._get_hash_str(self.TEST_NFS_EXPORT1))
def test_get_mount_point_for_share(self):
"""_get_mount_point_for_share should calculate correct value"""
drv = self._driver
self.flags(nfs_mount_point_base=self.TEST_MNT_POINT_BASE)
self.assertEqual('/mnt/test/2f4f60214cf43c595666dd815f0360a4',
drv._get_mount_point_for_share(self.TEST_NFS_EXPORT1))
def test_get_available_capacity_with_df(self):
"""_get_available_capacity should calculate correct value"""
mox = self.mox
drv = self._driver
df_avail = 1490560
df_head = 'Filesystem 1K-blocks Used Available Use% Mounted on\n'
df_data = 'nfs-host:/export 2620544 996864 %d 41%% /mnt' % df_avail
df_output = df_head + df_data
self.flags(nfs_disk_util='df')
mox.StubOutWithMock(drv, '_get_mount_point_for_share')
drv._get_mount_point_for_share(self.TEST_NFS_EXPORT1).\
AndReturn(self.TEST_MNT_POINT)
mox.StubOutWithMock(drv, '_execute')
drv._execute('df', '-P', '-B', '1', self.TEST_MNT_POINT,
run_as_root=True).AndReturn((df_output, None))
mox.ReplayAll()
self.assertEquals(df_avail,
drv._get_available_capacity(self.TEST_NFS_EXPORT1))
def test_get_available_capacity_with_du(self):
"""_get_available_capacity should calculate correct value"""
mox = self.mox
drv = self._driver
self.flags(nfs_disk_util='du')
df_total_size = 2620544
df_used_size = 996864
df_avail_size = 1490560
df_title = 'Filesystem 1-blocks Used Available Use% Mounted on\n'
df_mnt_data = 'nfs-host:/export %d %d %d 41%% /mnt' % (df_total_size,
df_used_size,
df_avail_size)
df_output = df_title + df_mnt_data
du_used = 490560
du_output = '%d /mnt' % du_used
mox.StubOutWithMock(drv, '_get_mount_point_for_share')
drv._get_mount_point_for_share(self.TEST_NFS_EXPORT1).\
AndReturn(self.TEST_MNT_POINT)
mox.StubOutWithMock(drv, '_execute')
drv._execute('df', '-P', '-B', '1', self.TEST_MNT_POINT,
run_as_root=True).\
AndReturn((df_output, None))
drv._execute('du', '-sb', '--apparent-size',
'--exclude', '*snapshot*',
self.TEST_MNT_POINT,
run_as_root=True).AndReturn((du_output, None))
mox.ReplayAll()
self.assertEquals(df_total_size - du_used,
drv._get_available_capacity(self.TEST_NFS_EXPORT1))
def test_load_shares_config(self):
mox = self.mox
drv = self._driver
self.flags(nfs_shares_config=self.TEST_SHARES_CONFIG_FILE)
mox.StubOutWithMock(__builtin__, 'open')
config_data = []
config_data.append(self.TEST_NFS_EXPORT1)
config_data.append('#' + self.TEST_NFS_EXPORT2)
config_data.append('')
__builtin__.open(self.TEST_SHARES_CONFIG_FILE).AndReturn(config_data)
mox.ReplayAll()
shares = drv._load_shares_config()
self.assertEqual([self.TEST_NFS_EXPORT1], shares)
def test_ensure_share_mounted(self):
"""_ensure_share_mounted simple use case"""
mox = self.mox
drv = self._driver
mox.StubOutWithMock(drv, '_get_mount_point_for_share')
drv._get_mount_point_for_share(self.TEST_NFS_EXPORT1).\
AndReturn(self.TEST_MNT_POINT)
mox.StubOutWithMock(drv, '_mount_nfs')
drv._mount_nfs(self.TEST_NFS_EXPORT1, self.TEST_MNT_POINT, ensure=True)
mox.ReplayAll()
drv._ensure_share_mounted(self.TEST_NFS_EXPORT1)
def test_ensure_shares_mounted_should_save_mounting_successfully(self):
"""_ensure_shares_mounted should save share if mounted with success"""
mox = self.mox
drv = self._driver
mox.StubOutWithMock(drv, '_load_shares_config')
drv._load_shares_config().AndReturn([self.TEST_NFS_EXPORT1])
mox.StubOutWithMock(drv, '_ensure_share_mounted')
drv._ensure_share_mounted(self.TEST_NFS_EXPORT1)
mox.ReplayAll()
drv._ensure_shares_mounted()
self.assertEqual(1, len(drv._mounted_shares))
self.assertEqual(self.TEST_NFS_EXPORT1, drv._mounted_shares[0])
def test_ensure_shares_mounted_should_not_save_mounting_with_error(self):
"""_ensure_shares_mounted should not save share if failed to mount"""
mox = self.mox
drv = self._driver
mox.StubOutWithMock(drv, '_load_shares_config')
drv._load_shares_config().AndReturn([self.TEST_NFS_EXPORT1])
mox.StubOutWithMock(drv, '_ensure_share_mounted')
drv._ensure_share_mounted(self.TEST_NFS_EXPORT1).AndRaise(Exception())
mox.ReplayAll()
drv._ensure_shares_mounted()
self.assertEqual(0, len(drv._mounted_shares))
def test_setup_should_throw_error_if_shares_config_not_configured(self):
"""do_setup should throw error if shares config is not configured """
drv = self._driver
self.flags(nfs_shares_config=self.TEST_SHARES_CONFIG_FILE)
self.assertRaises(exception.NfsException,
drv.do_setup, IsA(context.RequestContext))
def test_setup_should_throw_exception_if_nfs_client_is_not_installed(self):
"""do_setup should throw error if nfs client is not installed """
mox = self.mox
drv = self._driver
self.flags(nfs_shares_config=self.TEST_SHARES_CONFIG_FILE)
mox.StubOutWithMock(os.path, 'exists')
os.path.exists(self.TEST_SHARES_CONFIG_FILE).AndReturn(True)
mox.StubOutWithMock(drv, '_execute')
drv._execute('mount.nfs', check_exit_code=False).\
AndRaise(OSError(errno.ENOENT, 'No such file or directory'))
mox.ReplayAll()
self.assertRaises(exception.NfsException,
drv.do_setup, IsA(context.RequestContext))
def test_find_share_should_throw_error_if_there_is_no_mounted_shares(self):
"""_find_share should throw error if there is no mounted shares"""
drv = self._driver
drv._mounted_shares = []
self.assertRaises(exception.NotFound, drv._find_share,
self.TEST_SIZE_IN_GB)
def test_find_share(self):
"""_find_share simple use case"""
mox = self.mox
drv = self._driver
drv._mounted_shares = [self.TEST_NFS_EXPORT1, self.TEST_NFS_EXPORT2]
mox.StubOutWithMock(drv, '_get_available_capacity')
drv._get_available_capacity(self.TEST_NFS_EXPORT1).\
AndReturn(2 * self.ONE_GB_IN_BYTES)
drv._get_available_capacity(self.TEST_NFS_EXPORT2).\
AndReturn(3 * self.ONE_GB_IN_BYTES)
mox.ReplayAll()
self.assertEqual(self.TEST_NFS_EXPORT2,
drv._find_share(self.TEST_SIZE_IN_GB))
def test_find_share_should_throw_error_if_there_is_no_enough_place(self):
"""_find_share should throw error if there is no share to host vol"""
mox = self.mox
drv = self._driver
drv._mounted_shares = [self.TEST_NFS_EXPORT1, self.TEST_NFS_EXPORT2]
mox.StubOutWithMock(drv, '_get_available_capacity')
drv._get_available_capacity(self.TEST_NFS_EXPORT1).\
AndReturn(0)
drv._get_available_capacity(self.TEST_NFS_EXPORT2).\
AndReturn(0)
mox.ReplayAll()
self.assertRaises(exception.NfsNoSuitableShareFound, drv._find_share,
self.TEST_SIZE_IN_GB)
def _simple_volume(self):
volume = DumbVolume()
volume['provider_location'] = '127.0.0.1:/mnt'
volume['name'] = 'volume_name'
volume['size'] = 10
return volume
def test_create_sparsed_volume(self):
mox = self.mox
drv = self._driver
volume = self._simple_volume()
self.flags(nfs_sparsed_volumes=True)
mox.StubOutWithMock(drv, '_create_sparsed_file')
mox.StubOutWithMock(drv, '_set_rw_permissions_for_all')
drv._create_sparsed_file(IgnoreArg(), IgnoreArg())
drv._set_rw_permissions_for_all(IgnoreArg())
mox.ReplayAll()
drv._do_create_volume(volume)
def test_create_nonsparsed_volume(self):
mox = self.mox
drv = self._driver
volume = self._simple_volume()
self.flags(nfs_sparsed_volumes=False)
mox.StubOutWithMock(drv, '_create_regular_file')
mox.StubOutWithMock(drv, '_set_rw_permissions_for_all')
drv._create_regular_file(IgnoreArg(), IgnoreArg())
drv._set_rw_permissions_for_all(IgnoreArg())
mox.ReplayAll()
drv._do_create_volume(volume)
def test_create_volume_should_ensure_nfs_mounted(self):
"""create_volume should ensure shares provided in config are mounted"""
mox = self.mox
drv = self._driver
self.stub_out_not_replaying(nfs, 'LOG')
self.stub_out_not_replaying(drv, '_find_share')
self.stub_out_not_replaying(drv, '_do_create_volume')
mox.StubOutWithMock(drv, '_ensure_shares_mounted')
drv._ensure_shares_mounted()
mox.ReplayAll()
volume = DumbVolume()
volume['size'] = self.TEST_SIZE_IN_GB
drv.create_volume(volume)
def test_create_volume_should_return_provider_location(self):
"""create_volume should return provider_location with found share """
mox = self.mox
drv = self._driver
self.stub_out_not_replaying(nfs, 'LOG')
self.stub_out_not_replaying(drv, '_ensure_shares_mounted')
self.stub_out_not_replaying(drv, '_do_create_volume')
mox.StubOutWithMock(drv, '_find_share')
drv._find_share(self.TEST_SIZE_IN_GB).AndReturn(self.TEST_NFS_EXPORT1)
mox.ReplayAll()
volume = DumbVolume()
volume['size'] = self.TEST_SIZE_IN_GB
result = drv.create_volume(volume)
self.assertEqual(self.TEST_NFS_EXPORT1, result['provider_location'])
def test_delete_volume(self):
"""delete_volume simple test case"""
mox = self.mox
drv = self._driver
self.stub_out_not_replaying(drv, '_ensure_share_mounted')
volume = DumbVolume()
volume['name'] = 'volume-123'
volume['provider_location'] = self.TEST_NFS_EXPORT1
mox.StubOutWithMock(drv, 'local_path')
drv.local_path(volume).AndReturn(self.TEST_LOCAL_PATH)
mox.StubOutWithMock(drv, '_path_exists')
drv._path_exists(self.TEST_LOCAL_PATH).AndReturn(True)
mox.StubOutWithMock(drv, '_execute')
drv._execute('rm', '-f', self.TEST_LOCAL_PATH, run_as_root=True)
mox.ReplayAll()
drv.delete_volume(volume)
def test_delete_should_ensure_share_mounted(self):
"""delete_volume should ensure that corresponding share is mounted"""
mox = self.mox
drv = self._driver
self.stub_out_not_replaying(drv, '_execute')
volume = DumbVolume()
volume['name'] = 'volume-123'
volume['provider_location'] = self.TEST_NFS_EXPORT1
mox.StubOutWithMock(drv, '_ensure_share_mounted')
drv._ensure_share_mounted(self.TEST_NFS_EXPORT1)
mox.ReplayAll()
drv.delete_volume(volume)
def test_delete_should_not_delete_if_provider_location_not_provided(self):
"""delete_volume shouldn't try to delete if provider_location missed"""
mox = self.mox
drv = self._driver
self.stub_out_not_replaying(drv, '_ensure_share_mounted')
volume = DumbVolume()
volume['name'] = 'volume-123'
volume['provider_location'] = None
mox.StubOutWithMock(drv, '_execute')
mox.ReplayAll()
drv.delete_volume(volume)
def test_delete_should_not_delete_if_there_is_no_file(self):
"""delete_volume should not try to delete if file missed"""
mox = self.mox
drv = self._driver
self.stub_out_not_replaying(drv, '_ensure_share_mounted')
volume = DumbVolume()
volume['name'] = 'volume-123'
volume['provider_location'] = self.TEST_NFS_EXPORT1
mox.StubOutWithMock(drv, 'local_path')
drv.local_path(volume).AndReturn(self.TEST_LOCAL_PATH)
mox.StubOutWithMock(drv, '_path_exists')
drv._path_exists(self.TEST_LOCAL_PATH).AndReturn(False)
mox.StubOutWithMock(drv, '_execute')
mox.ReplayAll()
drv.delete_volume(volume)

View File

@@ -72,7 +72,6 @@ class APITestCase(test.TestCase):
# Marking out the default extension paths makes this test MUCH faster.
self.flags(osapi_compute_extension=[])
self.flags(osapi_volume_extension=[])
found = False
mgr = computeextensions.ExtensionManager()

View File

@@ -1,161 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Josh Durgin
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import db
from nova import exception
from nova.openstack.common import log as logging
from nova.openstack.common import timeutils
from nova import test
from nova.tests.image import fake as fake_image
from nova.tests.test_volume import DriverTestCase
from nova.volume.driver import RBDDriver
LOG = logging.getLogger(__name__)
class RBDTestCase(test.TestCase):
def setUp(self):
super(RBDTestCase, self).setUp()
def fake_execute(*args):
pass
self.driver = RBDDriver(execute=fake_execute)
def test_good_locations(self):
locations = [
'rbd://fsid/pool/image/snap',
'rbd://%2F/%2F/%2F/%2F',
]
map(self.driver._parse_location, locations)
def test_bad_locations(self):
locations = [
'rbd://image',
'http://path/to/somewhere/else',
'rbd://image/extra',
'rbd://image/',
'rbd://fsid/pool/image/',
'rbd://fsid/pool/image/snap/',
'rbd://///',
]
for loc in locations:
self.assertRaises(exception.ImageUnacceptable,
self.driver._parse_location,
loc)
self.assertFalse(self.driver._is_cloneable(loc))
def test_cloneable(self):
self.stubs.Set(self.driver, '_get_fsid', lambda: 'abc')
location = 'rbd://abc/pool/image/snap'
self.assertTrue(self.driver._is_cloneable(location))
def test_uncloneable_different_fsid(self):
self.stubs.Set(self.driver, '_get_fsid', lambda: 'abc')
location = 'rbd://def/pool/image/snap'
self.assertFalse(self.driver._is_cloneable(location))
def test_uncloneable_unreadable(self):
def fake_exc(*args):
raise exception.ProcessExecutionError()
self.stubs.Set(self.driver, '_get_fsid', lambda: 'abc')
self.stubs.Set(self.driver, '_execute', fake_exc)
location = 'rbd://abc/pool/image/snap'
self.assertFalse(self.driver._is_cloneable(location))
class FakeRBDDriver(RBDDriver):
def _clone(self):
pass
def _resize(self):
pass
class ManagedRBDTestCase(DriverTestCase):
driver_name = "nova.tests.test_rbd.FakeRBDDriver"
def setUp(self):
super(ManagedRBDTestCase, self).setUp()
fake_image.stub_out_image_service(self.stubs)
def _clone_volume_from_image(self, expected_status,
clone_works=True):
"""Try to clone a volume from an image, and check the status
afterwards"""
def fake_clone_image(volume, image_location):
pass
def fake_clone_error(volume, image_location):
raise exception.NovaException()
self.stubs.Set(self.volume.driver, '_is_cloneable', lambda x: True)
if clone_works:
self.stubs.Set(self.volume.driver, 'clone_image', fake_clone_image)
else:
self.stubs.Set(self.volume.driver, 'clone_image', fake_clone_error)
image_id = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
volume_id = 1
# creating volume testdata
db.volume_create(self.context, {'id': volume_id,
'updated_at': timeutils.utcnow(),
'display_description': 'Test Desc',
'size': 20,
'status': 'creating',
'instance_uuid': None,
'host': 'dummy'})
try:
if clone_works:
self.volume.create_volume(self.context,
volume_id,
image_id=image_id)
else:
self.assertRaises(exception.NovaException,
self.volume.create_volume,
self.context,
volume_id,
image_id=image_id)
volume = db.volume_get(self.context, volume_id)
self.assertEqual(volume['status'], expected_status)
finally:
# cleanup
db.volume_destroy(self.context, volume_id)
def test_clone_image_status_available(self):
"""Verify that before cloning, an image is in the available state."""
self._clone_volume_from_image('available', True)
def test_clone_image_status_error(self):
"""Verify that before cloning, an image is in the available state."""
self._clone_volume_from_image('error', False)
def test_clone_success(self):
self.stubs.Set(self.volume.driver, '_is_cloneable', lambda x: True)
self.stubs.Set(self.volume.driver, 'clone_image', lambda a, b: True)
image_id = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
self.assertTrue(self.volume.driver.clone_image({}, image_id))
def test_clone_bad_image_id(self):
self.stubs.Set(self.volume.driver, '_is_cloneable', lambda x: True)
self.assertFalse(self.volume.driver.clone_image({}, None))
def test_clone_uncloneable(self):
self.stubs.Set(self.volume.driver, '_is_cloneable', lambda x: False)
self.assertFalse(self.volume.driver.clone_image({}, 'dne'))

View File

@@ -1,208 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import exception
from nova.openstack.common import log as logging
from nova import test
from nova.volume.solidfire import SolidFire
LOG = logging.getLogger(__name__)
class SolidFireVolumeTestCase(test.TestCase):
def fake_issue_api_request(obj, method, params):
if method is 'GetClusterInfo':
LOG.info('Called Fake GetClusterInfo...')
results = {'result': {'clusterInfo':
{'name': 'fake-cluster',
'mvip': '1.1.1.1',
'svip': '1.1.1.1',
'uniqueID': 'unqid',
'repCount': 2,
'attributes': {}}}}
return results
elif method is 'AddAccount':
LOG.info('Called Fake AddAccount...')
return {'result': {'accountID': 25}, 'id': 1}
elif method is 'GetAccountByName':
LOG.info('Called Fake GetAccountByName...')
results = {'result': {'account':
{'accountID': 25,
'username': params['username'],
'status': 'active',
'initiatorSecret': '123456789012',
'targetSecret': '123456789012',
'attributes': {},
'volumes': [6, 7, 20]}},
"id": 1}
return results
elif method is 'CreateVolume':
LOG.info('Called Fake CreateVolume...')
return {'result': {'volumeID': 5}, 'id': 1}
elif method is 'DeleteVolume':
LOG.info('Called Fake DeleteVolume...')
return {'result': {}, 'id': 1}
elif method is 'ListVolumesForAccount':
test_name = 'OS-VOLID-a720b3c0-d1f0-11e1-9b23-0800200c9a66'
LOG.info('Called Fake ListVolumesForAccount...')
result = {'result': {
'volumes': [{'volumeID': 5,
'name': test_name,
'accountID': 25,
'sliceCount': 1,
'totalSize': 1048576 * 1024,
'enable512e': True,
'access': "readWrite",
'status': "active",
'attributes':None,
'qos': None,
'iqn': test_name}]}}
return result
else:
LOG.error('Crap, unimplemented API call in Fake:%s' % method)
def fake_issue_api_request_no_volume(obj, method, params):
if method is 'ListVolumesForAccount':
LOG.info('Called Fake ListVolumesForAccount...')
return {'result': {'volumes': []}}
else:
return obj.fake_issue_api_request(method, params)
def fake_issue_api_request_fails(obj, method, params):
return {'error': {'code': 000,
'name': 'DummyError',
'message': 'This is a fake error response'},
'id': 1}
def fake_volume_get(obj, key, default=None):
return {'qos': 'fast'}
def test_create_volume(self):
self.stubs.Set(SolidFire, '_issue_api_request',
self.fake_issue_api_request)
testvol = {'project_id': 'testprjid',
'name': 'testvol',
'size': 1,
'id': 'a720b3c0-d1f0-11e1-9b23-0800200c9a66'}
sfv = SolidFire()
model_update = sfv.create_volume(testvol)
def test_create_volume_with_qos(self):
preset_qos = {}
preset_qos['qos'] = 'fast'
self.stubs.Set(SolidFire, '_issue_api_request',
self.fake_issue_api_request)
testvol = {'project_id': 'testprjid',
'name': 'testvol',
'size': 1,
'id': 'a720b3c0-d1f0-11e1-9b23-0800200c9a66',
'metadata': [preset_qos]}
sfv = SolidFire()
model_update = sfv.create_volume(testvol)
def test_create_volume_fails(self):
self.stubs.Set(SolidFire, '_issue_api_request',
self.fake_issue_api_request_fails)
testvol = {'project_id': 'testprjid',
'name': 'testvol',
'size': 1,
'id': 'a720b3c0-d1f0-11e1-9b23-0800200c9a66'}
sfv = SolidFire()
self.assertRaises(exception.SolidFireAPIDataException,
sfv.create_volume, testvol)
def test_create_sfaccount(self):
sfv = SolidFire()
self.stubs.Set(SolidFire, '_issue_api_request',
self.fake_issue_api_request)
account = sfv._create_sfaccount('project-id')
self.assertNotEqual(account, None)
def test_create_sfaccount_fails(self):
sfv = SolidFire()
self.stubs.Set(SolidFire, '_issue_api_request',
self.fake_issue_api_request_fails)
account = sfv._create_sfaccount('project-id')
self.assertEqual(account, None)
def test_get_sfaccount_by_name(self):
sfv = SolidFire()
self.stubs.Set(SolidFire, '_issue_api_request',
self.fake_issue_api_request)
account = sfv._get_sfaccount_by_name('some-name')
self.assertNotEqual(account, None)
def test_get_sfaccount_by_name_fails(self):
sfv = SolidFire()
self.stubs.Set(SolidFire, '_issue_api_request',
self.fake_issue_api_request_fails)
account = sfv._get_sfaccount_by_name('some-name')
self.assertEqual(account, None)
def test_delete_volume(self):
self.stubs.Set(SolidFire, '_issue_api_request',
self.fake_issue_api_request)
testvol = {'project_id': 'testprjid',
'name': 'test_volume',
'size': 1,
'id': 'a720b3c0-d1f0-11e1-9b23-0800200c9a66'}
sfv = SolidFire()
model_update = sfv.delete_volume(testvol)
def test_delete_volume_fails_no_volume(self):
self.stubs.Set(SolidFire, '_issue_api_request',
self.fake_issue_api_request_no_volume)
testvol = {'project_id': 'testprjid',
'name': 'no-name',
'size': 1,
'id': 'a720b3c0-d1f0-11e1-9b23-0800200c9a66'}
sfv = SolidFire()
self.assertRaises(exception.VolumeNotFound,
sfv.delete_volume, testvol)
def test_delete_volume_fails_account_lookup(self):
self.stubs.Set(SolidFire, '_issue_api_request',
self.fake_issue_api_request_fails)
testvol = {'project_id': 'testprjid',
'name': 'no-name',
'size': 1,
'id': 'a720b3c0-d1f0-11e1-9b23-0800200c9a66'}
sfv = SolidFire()
self.assertRaises(exception.SfAccountNotFound,
sfv.delete_volume,
testvol)
def test_get_cluster_info(self):
self.stubs.Set(SolidFire, '_issue_api_request',
self.fake_issue_api_request)
sfv = SolidFire()
sfv._get_cluster_info()
def test_get_cluster_info_fail(self):
self.stubs.Set(SolidFire, '_issue_api_request',
self.fake_issue_api_request_fails)
sfv = SolidFire()
self.assertRaises(exception.SolidFireAPIException,
sfv._get_cluster_info)

File diff suppressed because it is too large Load Diff

View File

@@ -1,931 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests for Volume Code.
"""
import cStringIO
import datetime
import mox
import os
import shutil
import tempfile
from nova import context
from nova import db
from nova import exception
from nova import flags
from nova.openstack.common import importutils
from nova.openstack.common.notifier import api as notifier_api
from nova.openstack.common.notifier import test_notifier
from nova.openstack.common import rpc
import nova.policy
from nova import quota
from nova import test
from nova.tests.image import fake as fake_image
import nova.volume.api
from nova.volume import iscsi
QUOTAS = quota.QUOTAS
FLAGS = flags.FLAGS
class VolumeTestCase(test.TestCase):
"""Test Case for volumes."""
def setUp(self):
super(VolumeTestCase, self).setUp()
self.compute = importutils.import_object(FLAGS.compute_manager)
vol_tmpdir = tempfile.mkdtemp()
self.flags(compute_driver='nova.virt.fake.FakeDriver',
volumes_dir=vol_tmpdir,
notification_driver=[test_notifier.__name__])
self.stubs.Set(iscsi.TgtAdm, '_get_target', self.fake_get_target)
self.volume = importutils.import_object(FLAGS.volume_manager)
self.context = context.get_admin_context()
instance = db.instance_create(self.context, {})
self.instance_id = instance['id']
self.instance_uuid = instance['uuid']
test_notifier.NOTIFICATIONS = []
fake_image.stub_out_image_service(self.stubs)
def tearDown(self):
try:
shutil.rmtree(FLAGS.volumes_dir)
except OSError:
pass
db.instance_destroy(self.context, self.instance_uuid)
notifier_api._reset_drivers()
super(VolumeTestCase, self).tearDown()
def fake_get_target(obj, iqn):
return 1
@staticmethod
def _create_volume(size=0, snapshot_id=None, image_id=None, metadata=None):
"""Create a volume object."""
vol = {}
vol['size'] = size
vol['snapshot_id'] = snapshot_id
vol['image_id'] = image_id
vol['user_id'] = 'fake'
vol['project_id'] = 'fake'
vol['availability_zone'] = FLAGS.storage_availability_zone
vol['status'] = "creating"
vol['attach_status'] = "detached"
if metadata is not None:
vol['metadata'] = metadata
return db.volume_create(context.get_admin_context(), vol)
def test_ec2_uuid_mapping(self):
ec2_vol = db.ec2_volume_create(context.get_admin_context(),
'aaaaaaaa-bbbb-bbbb-bbbb-aaaaaaaaaaaa', 5)
self.assertEqual(5, ec2_vol['id'])
self.assertEqual('aaaaaaaa-bbbb-bbbb-bbbb-aaaaaaaaaaaa',
db.get_volume_uuid_by_ec2_id(context.get_admin_context(), 5))
ec2_vol = db.ec2_volume_create(context.get_admin_context(),
'aaaaaaaa-bbbb-bbbb-bbbb-aaaaaaaaaaaa', 1)
self.assertEqual(1, ec2_vol['id'])
ec2_vol = db.ec2_volume_create(context.get_admin_context(),
'aaaaaaaa-bbbb-bbbb-bbbb-aaaaaaaaazzz')
self.assertEqual(6, ec2_vol['id'])
def test_create_delete_volume(self):
"""Test volume can be created and deleted."""
# Need to stub out reserve, commit, and rollback
def fake_reserve(context, expire=None, **deltas):
return ["RESERVATION"]
def fake_commit(context, reservations):
pass
def fake_rollback(context, reservations):
pass
self.stubs.Set(QUOTAS, "reserve", fake_reserve)
self.stubs.Set(QUOTAS, "commit", fake_commit)
self.stubs.Set(QUOTAS, "rollback", fake_rollback)
volume = self._create_volume()
volume_id = volume['id']
self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
self.volume.create_volume(self.context, volume_id)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
self.assertEqual(volume_id, db.volume_get(context.get_admin_context(),
volume_id).id)
self.volume.delete_volume(self.context, volume_id)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 4)
self.assertRaises(exception.NotFound,
db.volume_get,
self.context,
volume_id)
def test_create_delete_volume_with_metadata(self):
"""Test volume can be created and deleted."""
test_meta = {'fake_key': 'fake_value'}
volume = self._create_volume('0', None, metadata=test_meta)
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
result_meta = {
volume.volume_metadata[0].key: volume.volume_metadata[0].value}
self.assertEqual(result_meta, test_meta)
self.volume.delete_volume(self.context, volume_id)
self.assertRaises(exception.NotFound,
db.volume_get,
self.context,
volume_id)
def _do_test_create_over_quota(self, resource, expected):
"""Test volume creation over quota."""
def fake_reserve(context, **deltas):
kwargs = dict(overs=[resource],
quotas=dict(gigabytes=1000, volumes=10),
usages=dict(gigabytes=dict(reserved=1, in_use=999),
volumes=dict(reserved=1, in_use=9)))
raise exception.OverQuota(**kwargs)
def fake_commit(context, reservations):
self.fail('should not commit over quota')
self.stubs.Set(QUOTAS, 'reserve', fake_reserve)
self.stubs.Set(QUOTAS, 'commit', fake_commit)
volume_api = nova.volume.api.API()
self.assertRaises(expected,
volume_api.create,
self.context,
2,
'name',
'description')
def test_create_volumes_over_quota(self):
self._do_test_create_over_quota('volumes',
exception.VolumeLimitExceeded)
def test_create_gigabytes_over_quota(self):
self._do_test_create_over_quota('gigabytes',
exception.VolumeSizeTooLarge)
def test_delete_busy_volume(self):
"""Test volume survives deletion if driver reports it as busy."""
volume = self._create_volume()
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
self.mox.StubOutWithMock(self.volume.driver, 'delete_volume')
self.volume.driver.delete_volume(mox.IgnoreArg()).AndRaise(
exception.VolumeIsBusy)
self.mox.ReplayAll()
res = self.volume.delete_volume(self.context, volume_id)
self.assertEqual(True, res)
volume_ref = db.volume_get(context.get_admin_context(), volume_id)
self.assertEqual(volume_id, volume_ref.id)
self.assertEqual("available", volume_ref.status)
self.mox.UnsetStubs()
self.volume.delete_volume(self.context, volume_id)
def test_create_volume_from_snapshot(self):
"""Test volume can be created from a snapshot."""
volume_src = self._create_volume()
self.volume.create_volume(self.context, volume_src['id'])
snapshot_id = self._create_snapshot(volume_src['id'])
self.volume.create_snapshot(self.context, volume_src['id'],
snapshot_id)
volume_dst = self._create_volume(0, snapshot_id)
self.volume.create_volume(self.context, volume_dst['id'], snapshot_id)
self.assertEqual(volume_dst['id'],
db.volume_get(
context.get_admin_context(),
volume_dst['id']).id)
self.assertEqual(snapshot_id, db.volume_get(
context.get_admin_context(),
volume_dst['id']).snapshot_id)
self.volume.delete_volume(self.context, volume_dst['id'])
self.volume.delete_snapshot(self.context, snapshot_id)
self.volume.delete_volume(self.context, volume_src['id'])
def test_too_big_volume(self):
"""Ensure failure if a too large of a volume is requested."""
# FIXME(vish): validation needs to move into the data layer in
# volume_create
return True
try:
volume = self._create_volume('1001')
self.volume.create_volume(self.context, volume)
self.fail("Should have thrown TypeError")
except TypeError:
pass
def test_run_attach_detach_volume(self):
"""Make sure volume can be attached and detached from instance."""
inst = {}
inst['image_id'] = 1
inst['reservation_id'] = 'r-fakeres'
inst['launch_time'] = '10'
inst['user_id'] = 'fake'
inst['project_id'] = 'fake'
inst['instance_type_id'] = '2' # m1.tiny
inst['ami_launch_index'] = 0
instance = db.instance_create(self.context, {})
instance_id = instance['id']
instance_uuid = instance['uuid']
mountpoint = "/dev/sdf"
volume = self._create_volume()
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
if FLAGS.fake_tests:
db.volume_attached(self.context, volume_id, instance_uuid,
mountpoint)
else:
self.compute.attach_volume(self.context,
instance_uuid,
volume_id,
mountpoint)
vol = db.volume_get(context.get_admin_context(), volume_id)
self.assertEqual(vol['status'], "in-use")
self.assertEqual(vol['attach_status'], "attached")
self.assertEqual(vol['mountpoint'], mountpoint)
self.assertEqual(vol['instance_uuid'], instance_uuid)
self.assertNotEqual(vol['attach_time'], None)
self.assertRaises(exception.VolumeAttached,
self.volume.delete_volume,
self.context,
volume_id)
if FLAGS.fake_tests:
db.volume_detached(self.context, volume_id)
else:
self.compute.detach_volume(self.context,
instance_uuid,
volume_id)
vol = db.volume_get(self.context, volume_id)
self.assertEqual(vol['status'], "available")
self.assertEqual(vol['attach_time'], None)
self.volume.delete_volume(self.context, volume_id)
self.assertRaises(exception.VolumeNotFound,
db.volume_get,
self.context,
volume_id)
db.instance_destroy(self.context, instance_uuid)
def test_concurrent_volumes_get_different_targets(self):
"""Ensure multiple concurrent volumes get different targets."""
volume_ids = []
targets = []
def _check(volume_id):
"""Make sure targets aren't duplicated."""
volume_ids.append(volume_id)
admin_context = context.get_admin_context()
iscsi_target = db.volume_get_iscsi_target_num(admin_context,
volume_id)
self.assert_(iscsi_target not in targets)
targets.append(iscsi_target)
total_slots = FLAGS.iscsi_num_targets
for _index in xrange(total_slots):
self._create_volume()
for volume_id in volume_ids:
self.volume.delete_volume(self.context, volume_id)
def test_multi_node(self):
# TODO(termie): Figure out how to test with two nodes,
# each of them having a different FLAG for storage_node
# This will allow us to test cross-node interactions
pass
@staticmethod
def _create_snapshot(volume_id, size='0'):
"""Create a snapshot object."""
snap = {}
snap['volume_size'] = size
snap['user_id'] = 'fake'
snap['project_id'] = 'fake'
snap['volume_id'] = volume_id
snap['status'] = "creating"
return db.snapshot_create(context.get_admin_context(), snap)['id']
def test_create_delete_snapshot(self):
"""Test snapshot can be created and deleted."""
volume = self._create_volume()
self.volume.create_volume(self.context, volume['id'])
snapshot_id = self._create_snapshot(volume['id'])
self.volume.create_snapshot(self.context, volume['id'], snapshot_id)
self.assertEqual(snapshot_id,
db.snapshot_get(context.get_admin_context(),
snapshot_id).id)
self.volume.delete_snapshot(self.context, snapshot_id)
self.assertRaises(exception.NotFound,
db.snapshot_get,
self.context,
snapshot_id)
self.volume.delete_volume(self.context, volume['id'])
def test_cant_delete_volume_in_use(self):
"""Test volume can't be deleted in invalid stats."""
# create a volume and assign to host
volume = self._create_volume()
self.volume.create_volume(self.context, volume['id'])
volume['status'] = 'in-use'
volume['host'] = 'fakehost'
volume_api = nova.volume.api.API()
# 'in-use' status raises InvalidVolume
self.assertRaises(exception.InvalidVolume,
volume_api.delete,
self.context,
volume)
# clean up
self.volume.delete_volume(self.context, volume['id'])
def test_force_delete_volume(self):
"""Test volume can be forced to delete."""
# create a volume and assign to host
volume = self._create_volume()
self.volume.create_volume(self.context, volume['id'])
volume['status'] = 'error_deleting'
volume['host'] = 'fakehost'
volume_api = nova.volume.api.API()
# 'error_deleting' volumes can't be deleted
self.assertRaises(exception.InvalidVolume,
volume_api.delete,
self.context,
volume)
# delete with force
volume_api.delete(self.context, volume, force=True)
# status is deleting
volume = db.volume_get(context.get_admin_context(), volume['id'])
self.assertEquals(volume['status'], 'deleting')
# clean up
self.volume.delete_volume(self.context, volume['id'])
def test_cant_delete_volume_with_snapshots(self):
"""Test snapshot can be created and deleted."""
volume = self._create_volume()
self.volume.create_volume(self.context, volume['id'])
snapshot_id = self._create_snapshot(volume['id'])
self.volume.create_snapshot(self.context, volume['id'], snapshot_id)
self.assertEqual(snapshot_id,
db.snapshot_get(context.get_admin_context(),
snapshot_id).id)
volume['status'] = 'available'
volume['host'] = 'fakehost'
volume_api = nova.volume.api.API()
self.assertRaises(exception.InvalidVolume,
volume_api.delete,
self.context,
volume)
self.volume.delete_snapshot(self.context, snapshot_id)
self.volume.delete_volume(self.context, volume['id'])
def test_can_delete_errored_snapshot(self):
"""Test snapshot can be created and deleted."""
volume = self._create_volume()
self.volume.create_volume(self.context, volume['id'])
snapshot_id = self._create_snapshot(volume['id'])
self.volume.create_snapshot(self.context, volume['id'], snapshot_id)
snapshot = db.snapshot_get(context.get_admin_context(),
snapshot_id)
volume_api = nova.volume.api.API()
snapshot['status'] = 'badstatus'
self.assertRaises(exception.InvalidVolume,
volume_api.delete_snapshot,
self.context,
snapshot)
snapshot['status'] = 'error'
self.volume.delete_snapshot(self.context, snapshot_id)
self.volume.delete_volume(self.context, volume['id'])
def test_create_snapshot_force(self):
"""Test snapshot in use can be created forcibly."""
def fake_cast(ctxt, topic, msg):
pass
self.stubs.Set(rpc, 'cast', fake_cast)
volume = self._create_volume()
self.volume.create_volume(self.context, volume['id'])
db.volume_attached(self.context, volume['id'], self.instance_uuid,
'/dev/sda1')
volume_api = nova.volume.api.API()
volume = volume_api.get(self.context, volume['id'])
self.assertRaises(exception.InvalidVolume,
volume_api.create_snapshot,
self.context, volume,
'fake_name', 'fake_description')
snapshot_ref = volume_api.create_snapshot_force(self.context,
volume,
'fake_name',
'fake_description')
db.snapshot_destroy(self.context, snapshot_ref['id'])
db.volume_destroy(self.context, volume['id'])
def test_delete_busy_snapshot(self):
"""Test snapshot can be created and deleted."""
volume = self._create_volume()
volume_id = volume['id']
self.volume.create_volume(self.context, volume_id)
snapshot_id = self._create_snapshot(volume_id)
self.volume.create_snapshot(self.context, volume_id, snapshot_id)
self.mox.StubOutWithMock(self.volume.driver, 'delete_snapshot')
self.volume.driver.delete_snapshot(mox.IgnoreArg()).AndRaise(
exception.SnapshotIsBusy)
self.mox.ReplayAll()
self.volume.delete_snapshot(self.context, snapshot_id)
snapshot_ref = db.snapshot_get(self.context, snapshot_id)
self.assertEqual(snapshot_id, snapshot_ref.id)
self.assertEqual("available", snapshot_ref.status)
self.mox.UnsetStubs()
self.volume.delete_snapshot(self.context, snapshot_id)
self.volume.delete_volume(self.context, volume_id)
def test_create_volume_usage_notification(self):
"""Ensure create volume generates appropriate usage notification"""
volume = self._create_volume()
volume_id = volume['id']
self.assertEquals(len(test_notifier.NOTIFICATIONS), 0)
self.volume.create_volume(self.context, volume_id)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 2)
msg = test_notifier.NOTIFICATIONS[0]
self.assertEquals(msg['event_type'], 'volume.create.start')
payload = msg['payload']
self.assertEquals(payload['status'], 'creating')
msg = test_notifier.NOTIFICATIONS[1]
self.assertEquals(msg['priority'], 'INFO')
self.assertEquals(msg['event_type'], 'volume.create.end')
payload = msg['payload']
self.assertEquals(payload['tenant_id'], volume['project_id'])
self.assertEquals(payload['user_id'], volume['user_id'])
self.assertEquals(payload['volume_id'], volume['id'])
self.assertEquals(payload['status'], 'available')
self.assertEquals(payload['size'], volume['size'])
self.assertTrue('display_name' in payload)
self.assertTrue('snapshot_id' in payload)
self.assertTrue('launched_at' in payload)
self.assertTrue('created_at' in payload)
self.volume.delete_volume(self.context, volume_id)
def _do_test_create_volume_with_size(self, size):
def fake_reserve(context, expire=None, **deltas):
return ["RESERVATION"]
def fake_commit(context, reservations):
pass
def fake_rollback(context, reservations):
pass
self.stubs.Set(QUOTAS, "reserve", fake_reserve)
self.stubs.Set(QUOTAS, "commit", fake_commit)
self.stubs.Set(QUOTAS, "rollback", fake_rollback)
volume_api = nova.volume.api.API()
volume = volume_api.create(self.context,
size,
'name',
'description')
self.assertEquals(volume['size'], int(size))
def test_create_volume_int_size(self):
"""Test volume creation with int size."""
self._do_test_create_volume_with_size(2)
def test_create_volume_string_size(self):
"""Test volume creation with string size."""
self._do_test_create_volume_with_size('2')
def test_create_volume_with_bad_size(self):
def fake_reserve(context, expire=None, **deltas):
return ["RESERVATION"]
def fake_commit(context, reservations):
pass
def fake_rollback(context, reservations):
pass
self.stubs.Set(QUOTAS, "reserve", fake_reserve)
self.stubs.Set(QUOTAS, "commit", fake_commit)
self.stubs.Set(QUOTAS, "rollback", fake_rollback)
volume_api = nova.volume.api.API()
self.assertRaises(exception.InvalidInput,
volume_api.create,
self.context,
'2Gb',
'name',
'description')
def test_begin_roll_detaching_volume(self):
"""Test begin_detaching and roll_detaching functions."""
volume = self._create_volume()
volume_api = nova.volume.api.API()
volume_api.begin_detaching(self.context, volume)
volume = db.volume_get(self.context, volume['id'])
self.assertEqual(volume['status'], "detaching")
volume_api.roll_detaching(self.context, volume)
volume = db.volume_get(self.context, volume['id'])
self.assertEqual(volume['status'], "in-use")
def _create_volume_from_image(self, expected_status,
fakeout_copy_image_to_volume=False):
"""Call copy image to volume, Test the status of volume after calling
copying image to volume."""
def fake_local_path(volume):
return dst_path
def fake_copy_image_to_volume(context, volume, image_id):
pass
dst_fd, dst_path = tempfile.mkstemp()
os.close(dst_fd)
self.stubs.Set(self.volume.driver, 'local_path', fake_local_path)
if fakeout_copy_image_to_volume:
self.stubs.Set(self.volume, '_copy_image_to_volume',
fake_copy_image_to_volume)
image_id = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
volume_id = 1
# creating volume testdata
db.volume_create(self.context, {'id': volume_id,
'updated_at': datetime.datetime(1, 1, 1, 1, 1, 1),
'display_description': 'Test Desc',
'size': 20,
'status': 'creating',
'instance_uuid': None,
'host': 'dummy'})
try:
self.volume.create_volume(self.context,
volume_id,
image_id=image_id)
volume = db.volume_get(self.context, volume_id)
self.assertEqual(volume['status'], expected_status)
finally:
# cleanup
db.volume_destroy(self.context, volume_id)
os.unlink(dst_path)
def test_create_volume_from_image_status_downloading(self):
"""Verify that before copying image to volume, it is in downloading
state."""
self._create_volume_from_image('downloading', True)
def test_create_volume_from_image_status_available(self):
"""Verify that before copying image to volume, it is in available
state."""
self._create_volume_from_image('available')
def test_create_volume_from_image_exception(self):
"""Verify that create volume from image, the volume status is
'downloading'."""
dst_fd, dst_path = tempfile.mkstemp()
os.close(dst_fd)
self.stubs.Set(self.volume.driver, 'local_path', lambda x: dst_path)
image_id = 'aaaaaaaa-0000-0000-0000-000000000000'
# creating volume testdata
volume_id = 1
db.volume_create(self.context, {'id': volume_id,
'updated_at': datetime.datetime(1, 1, 1, 1, 1, 1),
'display_description': 'Test Desc',
'size': 20,
'status': 'creating',
'host': 'dummy'})
self.assertRaises(exception.ImageNotFound,
self.volume.create_volume,
self.context,
volume_id,
None,
image_id)
volume = db.volume_get(self.context, volume_id)
self.assertEqual(volume['status'], "error")
# cleanup
db.volume_destroy(self.context, volume_id)
os.unlink(dst_path)
def test_copy_volume_to_image_status_available(self):
dst_fd, dst_path = tempfile.mkstemp()
os.close(dst_fd)
def fake_local_path(volume):
return dst_path
self.stubs.Set(self.volume.driver, 'local_path', fake_local_path)
image_id = '70a599e0-31e7-49b7-b260-868f441e862b'
# creating volume testdata
volume_id = 1
db.volume_create(self.context, {'id': volume_id,
'updated_at': datetime.datetime(1, 1, 1, 1, 1, 1),
'display_description': 'Test Desc',
'size': 20,
'status': 'uploading',
'instance_uuid': None,
'host': 'dummy'})
try:
# start test
self.volume.copy_volume_to_image(self.context,
volume_id,
image_id)
volume = db.volume_get(self.context, volume_id)
self.assertEqual(volume['status'], 'available')
finally:
# cleanup
db.volume_destroy(self.context, volume_id)
os.unlink(dst_path)
def test_copy_volume_to_image_status_use(self):
dst_fd, dst_path = tempfile.mkstemp()
os.close(dst_fd)
def fake_local_path(volume):
return dst_path
self.stubs.Set(self.volume.driver, 'local_path', fake_local_path)
#image_id = '70a599e0-31e7-49b7-b260-868f441e862b'
image_id = 'a440c04b-79fa-479c-bed1-0b816eaec379'
# creating volume testdata
volume_id = 1
db.volume_create(self.context,
{'id': volume_id,
'updated_at': datetime.datetime(1, 1, 1, 1, 1, 1),
'display_description': 'Test Desc',
'size': 20,
'status': 'uploading',
'instance_uuid':
'b21f957d-a72f-4b93-b5a5-45b1161abb02',
'host': 'dummy'})
try:
# start test
self.volume.copy_volume_to_image(self.context,
volume_id,
image_id)
volume = db.volume_get(self.context, volume_id)
self.assertEqual(volume['status'], 'in-use')
finally:
# cleanup
db.volume_destroy(self.context, volume_id)
os.unlink(dst_path)
def test_copy_volume_to_image_exception(self):
dst_fd, dst_path = tempfile.mkstemp()
os.close(dst_fd)
def fake_local_path(volume):
return dst_path
self.stubs.Set(self.volume.driver, 'local_path', fake_local_path)
image_id = 'aaaaaaaa-0000-0000-0000-000000000000'
# creating volume testdata
volume_id = 1
db.volume_create(self.context, {'id': volume_id,
'updated_at': datetime.datetime(1, 1, 1, 1, 1, 1),
'display_description': 'Test Desc',
'size': 20,
'status': 'in-use',
'host': 'dummy'})
try:
# start test
self.assertRaises(exception.ImageNotFound,
self.volume.copy_volume_to_image,
self.context,
volume_id,
image_id)
volume = db.volume_get(self.context, volume_id)
self.assertEqual(volume['status'], 'available')
finally:
# cleanup
db.volume_destroy(self.context, volume_id)
os.unlink(dst_path)
def test_create_volume_from_exact_sized_image(self):
"""Verify that an image which is exactly the same size as the
volume, will work correctly."""
class _FakeImageService:
def __init__(self, db_driver=None, image_service=None):
pass
def show(self, context, image_id):
return {'size': 2 * 1024 * 1024 * 1024}
image_id = '70a599e0-31e7-49b7-b260-868f441e862b'
try:
volume_id = None
volume_api = nova.volume.api.API(
image_service=_FakeImageService())
volume = volume_api.create(self.context, 2, 'name', 'description',
image_id=1)
volume_id = volume['id']
self.assertEqual(volume['status'], 'creating')
finally:
# cleanup
db.volume_destroy(self.context, volume_id)
def test_create_volume_from_oversized_image(self):
"""Verify that an image which is too big will fail correctly."""
class _FakeImageService:
def __init__(self, db_driver=None, image_service=None):
pass
def show(self, context, image_id):
return {'size': 2 * 1024 * 1024 * 1024 + 1}
image_id = '70a599e0-31e7-49b7-b260-868f441e862b'
volume_api = nova.volume.api.API(image_service=_FakeImageService())
self.assertRaises(exception.InvalidInput,
volume_api.create,
self.context, 2,
'name', 'description', image_id=1)
class DriverTestCase(test.TestCase):
"""Base Test class for Drivers."""
driver_name = "nova.volume.driver.FakeBaseDriver"
def setUp(self):
super(DriverTestCase, self).setUp()
vol_tmpdir = tempfile.mkdtemp()
self.flags(volume_driver=self.driver_name,
volumes_dir=vol_tmpdir)
self.volume = importutils.import_object(FLAGS.volume_manager)
self.context = context.get_admin_context()
self.output = ""
self.stubs.Set(iscsi.TgtAdm, '_get_target', self.fake_get_target)
def _fake_execute(_command, *_args, **_kwargs):
"""Fake _execute."""
return self.output, None
self.volume.driver.set_execute(_fake_execute)
instance = db.instance_create(self.context, {})
self.instance_id = instance['id']
self.instance_uuid = instance['uuid']
def tearDown(self):
try:
shutil.rmtree(FLAGS.volumes_dir)
except OSError:
pass
super(DriverTestCase, self).tearDown()
def fake_get_target(obj, iqn):
return 1
def _attach_volume(self):
"""Attach volumes to an instance."""
return []
def _detach_volume(self, volume_id_list):
"""Detach volumes from an instance."""
for volume_id in volume_id_list:
db.volume_detached(self.context, volume_id)
self.volume.delete_volume(self.context, volume_id)
class VolumeDriverTestCase(DriverTestCase):
"""Test case for VolumeDriver"""
driver_name = "nova.volume.driver.VolumeDriver"
def test_delete_busy_volume(self):
"""Test deleting a busy volume."""
self.stubs.Set(self.volume.driver, '_volume_not_present',
lambda x: False)
self.stubs.Set(self.volume.driver, '_delete_volume',
lambda x, y: False)
# Want DriverTestCase._fake_execute to return 'o' so that
# volume.driver.delete_volume() raises the VolumeIsBusy exception.
self.output = 'o'
self.assertRaises(exception.VolumeIsBusy,
self.volume.driver.delete_volume,
{'name': 'test1', 'size': 1024})
# when DriverTestCase._fake_execute returns something other than
# 'o' volume.driver.delete_volume() does not raise an exception.
self.output = 'x'
self.volume.driver.delete_volume({'name': 'test1', 'size': 1024})
class ISCSITestCase(DriverTestCase):
"""Test Case for ISCSIDriver"""
driver_name = "nova.volume.driver.ISCSIDriver"
def _attach_volume(self):
"""Attach volumes to an instance. """
volume_id_list = []
for index in xrange(3):
vol = {}
vol['size'] = 0
vol_ref = db.volume_create(self.context, vol)
self.volume.create_volume(self.context, vol_ref['id'])
vol_ref = db.volume_get(self.context, vol_ref['id'])
# each volume has a different mountpoint
mountpoint = "/dev/sd" + chr((ord('b') + index))
db.volume_attached(self.context, vol_ref['id'], self.instance_uuid,
mountpoint)
volume_id_list.append(vol_ref['id'])
return volume_id_list
def test_check_for_export_with_no_volume(self):
self.volume.check_for_export(self.context, self.instance_id)
class VolumePolicyTestCase(test.TestCase):
def setUp(self):
super(VolumePolicyTestCase, self).setUp()
nova.policy.reset()
nova.policy.init()
self.context = context.get_admin_context()
def tearDown(self):
super(VolumePolicyTestCase, self).tearDown()
nova.policy.reset()
def _set_rules(self, rules):
nova.common.policy.set_brain(nova.common.policy.HttpBrain(rules))
def test_check_policy(self):
self.mox.StubOutWithMock(nova.policy, 'enforce')
target = {
'project_id': self.context.project_id,
'user_id': self.context.user_id,
}
nova.policy.enforce(self.context, 'volume:attach', target)
self.mox.ReplayAll()
nova.volume.api.check_policy(self.context, 'attach')
def test_check_policy_with_target(self):
self.mox.StubOutWithMock(nova.policy, 'enforce')
target = {
'project_id': self.context.project_id,
'user_id': self.context.user_id,
'id': 2,
}
nova.policy.enforce(self.context, 'volume:attach', target)
self.mox.ReplayAll()
nova.volume.api.check_policy(self.context, 'attach', {'id': 2})

View File

@@ -1,167 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 Zadara Storage Inc.
# Copyright (c) 2011 OpenStack LLC.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for volume types code
"""
import time
from nova import context
from nova.db.sqlalchemy import models
from nova.db.sqlalchemy import session as sql_session
from nova import exception
from nova import flags
from nova.openstack.common import log as logging
from nova import test
from nova.volume import volume_types
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
class VolumeTypeTestCase(test.TestCase):
"""Test cases for volume type code"""
def setUp(self):
super(VolumeTypeTestCase, self).setUp()
self.ctxt = context.get_admin_context()
self.vol_type1_name = str(int(time.time()))
self.vol_type1_specs = dict(
type="physical drive",
drive_type="SAS",
size="300",
rpm="7200",
visible="True")
def test_volume_type_create_then_destroy(self):
"""Ensure volume types can be created and deleted"""
prev_all_vtypes = volume_types.get_all_types(self.ctxt)
volume_types.create(self.ctxt,
self.vol_type1_name,
self.vol_type1_specs)
new = volume_types.get_volume_type_by_name(self.ctxt,
self.vol_type1_name)
LOG.info(_("Given data: %s"), self.vol_type1_specs)
LOG.info(_("Result data: %s"), new)
for k, v in self.vol_type1_specs.iteritems():
self.assertEqual(v, new['extra_specs'][k],
'one of fields does not match')
new_all_vtypes = volume_types.get_all_types(self.ctxt)
self.assertEqual(len(prev_all_vtypes) + 1,
len(new_all_vtypes),
'drive type was not created')
volume_types.destroy(self.ctxt, self.vol_type1_name)
new_all_vtypes = volume_types.get_all_types(self.ctxt)
self.assertEqual(prev_all_vtypes,
new_all_vtypes,
'drive type was not deleted')
def test_get_all_volume_types(self):
"""Ensures that all volume types can be retrieved"""
session = sql_session.get_session()
total_volume_types = session.query(models.VolumeTypes).count()
vol_types = volume_types.get_all_types(self.ctxt)
self.assertEqual(total_volume_types, len(vol_types))
def test_non_existent_vol_type_shouldnt_delete(self):
"""Ensures that volume type creation fails with invalid args"""
self.assertRaises(exception.VolumeTypeNotFoundByName,
volume_types.destroy, self.ctxt, "sfsfsdfdfs")
def test_repeated_vol_types_shouldnt_raise(self):
"""Ensures that volume duplicates don't raise"""
new_name = self.vol_type1_name + "dup"
volume_types.create(self.ctxt, new_name)
volume_types.destroy(self.ctxt, new_name)
volume_types.create(self.ctxt, new_name)
def test_invalid_volume_types_params(self):
"""Ensures that volume type creation fails with invalid args"""
self.assertRaises(exception.InvalidVolumeType,
volume_types.destroy, self.ctxt, None)
self.assertRaises(exception.InvalidVolumeType,
volume_types.get_volume_type, self.ctxt, None)
self.assertRaises(exception.InvalidVolumeType,
volume_types.get_volume_type_by_name,
self.ctxt, None)
def test_volume_type_get_by_id_and_name(self):
"""Ensure volume types get returns same entry"""
volume_types.create(self.ctxt,
self.vol_type1_name,
self.vol_type1_specs)
new = volume_types.get_volume_type_by_name(self.ctxt,
self.vol_type1_name)
new2 = volume_types.get_volume_type(self.ctxt, new['id'])
self.assertEqual(new, new2)
def test_volume_type_search_by_extra_spec(self):
"""Ensure volume types get by extra spec returns correct type"""
volume_types.create(self.ctxt, "type1", {"key1": "val1",
"key2": "val2"})
volume_types.create(self.ctxt, "type2", {"key2": "val2",
"key3": "val3"})
volume_types.create(self.ctxt, "type3", {"key3": "another_value",
"key4": "val4"})
vol_types = volume_types.get_all_types(self.ctxt,
search_opts={'extra_specs': {"key1": "val1"}})
LOG.info("vol_types: %s" % vol_types)
self.assertEqual(len(vol_types), 1)
self.assertTrue("type1" in vol_types.keys())
self.assertEqual(vol_types['type1']['extra_specs'],
{"key1": "val1", "key2": "val2"})
vol_types = volume_types.get_all_types(self.ctxt,
search_opts={'extra_specs': {"key2": "val2"}})
LOG.info("vol_types: %s" % vol_types)
self.assertEqual(len(vol_types), 2)
self.assertTrue("type1" in vol_types.keys())
self.assertTrue("type2" in vol_types.keys())
vol_types = volume_types.get_all_types(self.ctxt,
search_opts={'extra_specs': {"key3": "val3"}})
LOG.info("vol_types: %s" % vol_types)
self.assertEqual(len(vol_types), 1)
self.assertTrue("type2" in vol_types.keys())
def test_volume_type_search_by_extra_spec_multiple(self):
"""Ensure volume types get by extra spec returns correct type"""
volume_types.create(self.ctxt, "type1", {"key1": "val1",
"key2": "val2",
"key3": "val3"})
volume_types.create(self.ctxt, "type2", {"key2": "val2",
"key3": "val3"})
volume_types.create(self.ctxt, "type3", {"key1": "val1",
"key3": "val3",
"key4": "val4"})
vol_types = volume_types.get_all_types(self.ctxt,
search_opts={'extra_specs': {"key1": "val1",
"key3": "val3"}})
LOG.info("vol_types: %s" % vol_types)
self.assertEqual(len(vol_types), 2)
self.assertTrue("type1" in vol_types.keys())
self.assertTrue("type3" in vol_types.keys())
self.assertEqual(vol_types['type1']['extra_specs'],
{"key1": "val1", "key2": "val2", "key3": "val3"})
self.assertEqual(vol_types['type3']['extra_specs'],
{"key1": "val1", "key3": "val3", "key4": "val4"})

View File

@@ -1,130 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 Zadara Storage Inc.
# Copyright (c) 2011 OpenStack LLC.
# Copyright 2011 University of Southern California
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for volume types extra specs code
"""
from nova import context
from nova import db
from nova import test
class VolumeTypeExtraSpecsTestCase(test.TestCase):
def setUp(self):
super(VolumeTypeExtraSpecsTestCase, self).setUp()
self.context = context.get_admin_context()
self.vol_type1 = dict(name="TEST: Regular volume test")
self.vol_type1_specs = dict(vol_extra1="value1",
vol_extra2="value2",
vol_extra3=3)
self.vol_type1['extra_specs'] = self.vol_type1_specs
ref = db.volume_type_create(self.context, self.vol_type1)
self.volume_type1_id = ref.id
for k, v in self.vol_type1_specs.iteritems():
self.vol_type1_specs[k] = str(v)
self.vol_type2_noextra = dict(name="TEST: Volume type without extra")
ref = db.volume_type_create(self.context, self.vol_type2_noextra)
self.vol_type2_id = ref.id
def tearDown(self):
# Remove the volume type from the database
db.volume_type_destroy(context.get_admin_context(),
self.vol_type1['name'])
db.volume_type_destroy(context.get_admin_context(),
self.vol_type2_noextra['name'])
super(VolumeTypeExtraSpecsTestCase, self).tearDown()
def test_volume_type_specs_get(self):
expected_specs = self.vol_type1_specs.copy()
actual_specs = db.volume_type_extra_specs_get(
context.get_admin_context(),
self.volume_type1_id)
self.assertEquals(expected_specs, actual_specs)
def test_volume_type_extra_specs_delete(self):
expected_specs = self.vol_type1_specs.copy()
del expected_specs['vol_extra2']
db.volume_type_extra_specs_delete(context.get_admin_context(),
self.volume_type1_id,
'vol_extra2')
actual_specs = db.volume_type_extra_specs_get(
context.get_admin_context(),
self.volume_type1_id)
self.assertEquals(expected_specs, actual_specs)
def test_volume_type_extra_specs_update(self):
expected_specs = self.vol_type1_specs.copy()
expected_specs['vol_extra3'] = "4"
db.volume_type_extra_specs_update_or_create(
context.get_admin_context(),
self.volume_type1_id,
dict(vol_extra3=4))
actual_specs = db.volume_type_extra_specs_get(
context.get_admin_context(),
self.volume_type1_id)
self.assertEquals(expected_specs, actual_specs)
def test_volume_type_extra_specs_create(self):
expected_specs = self.vol_type1_specs.copy()
expected_specs['vol_extra4'] = 'value4'
expected_specs['vol_extra5'] = 'value5'
db.volume_type_extra_specs_update_or_create(
context.get_admin_context(),
self.volume_type1_id,
dict(vol_extra4="value4",
vol_extra5="value5"))
actual_specs = db.volume_type_extra_specs_get(
context.get_admin_context(),
self.volume_type1_id)
self.assertEquals(expected_specs, actual_specs)
def test_volume_type_get_with_extra_specs(self):
volume_type = db.volume_type_get(
context.get_admin_context(),
self.volume_type1_id)
self.assertEquals(volume_type['extra_specs'],
self.vol_type1_specs)
volume_type = db.volume_type_get(
context.get_admin_context(),
self.vol_type2_id)
self.assertEquals(volume_type['extra_specs'], {})
def test_volume_type_get_by_name_with_extra_specs(self):
volume_type = db.volume_type_get_by_name(
context.get_admin_context(),
self.vol_type1['name'])
self.assertEquals(volume_type['extra_specs'],
self.vol_type1_specs)
volume_type = db.volume_type_get_by_name(
context.get_admin_context(),
self.vol_type2_noextra['name'])
self.assertEquals(volume_type['extra_specs'], {})
def test_volume_type_get_all(self):
expected_specs = self.vol_type1_specs.copy()
types = db.volume_type_get_all(context.get_admin_context())
self.assertEquals(
types[self.vol_type1['name']]['extra_specs'], expected_specs)
self.assertEquals(
types[self.vol_type2_noextra['name']]['extra_specs'], {})

View File

@@ -1,91 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests For miscellaneous util methods used with volume."""
from nova import context
from nova import db
from nova import flags
from nova.openstack.common import importutils
from nova.openstack.common import log as logging
from nova.openstack.common.notifier import api as notifier_api
from nova.openstack.common.notifier import test_notifier
from nova import test
from nova.tests import fake_network
from nova.volume import utils as volume_utils
LOG = logging.getLogger(__name__)
FLAGS = flags.FLAGS
class UsageInfoTestCase(test.TestCase):
def setUp(self):
super(UsageInfoTestCase, self).setUp()
self.flags(compute_driver='nova.virt.fake.FakeDriver',
host='fake',
notification_driver=[test_notifier.__name__])
fake_network.set_stub_network_methods(self.stubs)
self.volume = importutils.import_object(FLAGS.volume_manager)
self.user_id = 'fake'
self.project_id = 'fake'
self.snapshot_id = 'fake'
self.volume_size = 0
self.context = context.RequestContext(self.user_id, self.project_id)
test_notifier.NOTIFICATIONS = []
def tearDown(self):
notifier_api._reset_drivers()
super(UsageInfoTestCase, self).tearDown()
def _create_volume(self, params={}):
"""Create a test volume"""
vol = {}
vol['snapshot_id'] = self.snapshot_id
vol['user_id'] = self.user_id
vol['project_id'] = self.project_id
vol['host'] = FLAGS.host
vol['availability_zone'] = FLAGS.storage_availability_zone
vol['status'] = "creating"
vol['attach_status'] = "detached"
vol['size'] = self.volume_size
vol.update(params)
return db.volume_create(self.context, vol)['id']
def test_notify_usage_exists(self):
"""Ensure 'exists' notification generates appropriate usage data."""
volume_id = self._create_volume()
volume = db.volume_get(self.context, volume_id)
volume_utils.notify_usage_exists(self.context, volume)
self.assertEquals(len(test_notifier.NOTIFICATIONS), 1)
msg = test_notifier.NOTIFICATIONS[0]
self.assertEquals(msg['priority'], 'INFO')
self.assertEquals(msg['event_type'], 'volume.exists')
payload = msg['payload']
self.assertEquals(payload['tenant_id'], self.project_id)
self.assertEquals(payload['user_id'], self.user_id)
self.assertEquals(payload['snapshot_id'], self.snapshot_id)
self.assertEquals(payload['volume_id'], volume.id)
self.assertEquals(payload['size'], self.volume_size)
for attr in ('display_name', 'created_at', 'launched_at',
'status', 'audit_period_beginning',
'audit_period_ending'):
self.assertTrue(attr in payload,
msg="Key %s not in payload" % attr)
db.volume_destroy(context.get_admin_context(), volume['id'])

View File

@@ -170,7 +170,7 @@ class XenAPIVolumeTestCase(stubs.XenAPITestBase):
vol['user_id'] = 'fake'
vol['project_id'] = 'fake'
vol['host'] = 'localhost'
vol['availability_zone'] = FLAGS.storage_availability_zone
vol['availability_zone'] = FLAGS.node_availability_zone
vol['status'] = "creating"
vol['attach_status'] = "detached"
return db.volume_create(self.context, vol)

View File

@@ -1,140 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Citrix Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Test suite for Xen Storage Manager Volume Driver."""
from nova import context
from nova import db
from nova import exception
from nova import flags
from nova.openstack.common import log as logging
from nova.tests.xenapi import stubs
from nova.virt.xenapi import fake as xenapi_fake
from nova.volume import xensm
LOG = logging.getLogger(__name__)
FLAGS = flags.FLAGS
class XenSMTestCase(stubs.XenAPITestBase):
"""Unit tests for Xen Storage Manager Volume operations."""
def _get_sm_backend_params(self):
config_params = ("name_label=testsmbackend "
"server=localhost "
"serverpath=/tmp/nfspath")
params = dict(flavor_id=1,
sr_uuid=None,
sr_type='nfs',
config_params=config_params)
return params
def setUp(self):
super(XenSMTestCase, self).setUp()
self.user_id = 'fake'
self.project_id = 'fake'
self.context = context.RequestContext(self.user_id, self.project_id)
self.flags(compute_driver='xenapi.XenAPIDriver',
xenapi_connection_url='http://test_url',
xenapi_connection_username='test_user',
xenapi_connection_password='test_pass')
stubs.stubout_session(self.stubs, xenapi_fake.SessionBase)
xenapi_fake.reset()
self.driver = xensm.XenSMDriver()
self.driver.db = db
def _setup_step(self, ctxt):
# Create a fake backend conf
params = self._get_sm_backend_params()
beconf = db.sm_backend_conf_create(ctxt,
params)
# Call setup, the one time operation that will create a backend SR
self.driver.do_setup(ctxt)
return beconf
def test_do_setup(self):
ctxt = context.get_admin_context()
beconf = self._setup_step(ctxt)
beconf = db.sm_backend_conf_get(ctxt, beconf['id'])
self.assertIsInstance(beconf['sr_uuid'], basestring)
def _create_volume(self, size=0):
"""Create a volume object."""
vol = {}
vol['size'] = size
vol['user_id'] = 'fake'
vol['project_id'] = 'fake'
vol['host'] = 'localhost'
vol['availability_zone'] = FLAGS.storage_availability_zone
vol['status'] = "creating"
vol['attach_status'] = "detached"
return db.volume_create(self.context, vol)
def test_create_volume(self):
ctxt = context.get_admin_context()
beconf = self._setup_step(ctxt)
volume = self._create_volume()
self.driver.create_volume(volume)
db.sm_volume_get(ctxt, volume['id'])
def test_local_path(self):
ctxt = context.get_admin_context()
volume = self._create_volume()
val = self.driver.local_path(volume)
self.assertIsInstance(val, basestring)
def test_delete_volume(self):
ctxt = context.get_admin_context()
beconf = self._setup_step(ctxt)
volume = self._create_volume()
self.driver.create_volume(volume)
self.driver.delete_volume(volume)
self.assertRaises(exception.NotFound,
db.sm_volume_get,
ctxt,
volume['id'])
def test_delete_volume_raises_notfound(self):
ctxt = context.get_admin_context()
beconf = self._setup_step(ctxt)
self.assertRaises(exception.NotFound,
self.driver.delete_volume,
{'id': "FA15E-1D"})
def _get_expected_conn(self, beconf, vol):
expected = {}
expected['volume_id'] = unicode(vol['id'])
expected['flavor_id'] = beconf['flavor_id']
expected['sr_uuid'] = unicode(beconf['sr_uuid'])
expected['sr_type'] = unicode(beconf['sr_type'])
return expected
def test_initialize_connection(self):
ctxt = context.get_admin_context()
beconf = self._setup_step(ctxt)
beconf = db.sm_backend_conf_get(ctxt, beconf['id'])
volume = self._create_volume()
self.driver.create_volume(volume)
expected = self._get_expected_conn(beconf, volume)
conn = self.driver.initialize_connection(volume, 'fakeConnector')
res = {}
for key in ['volume_id', 'flavor_id', 'sr_uuid', 'sr_type']:
res[key] = conn['data'][key]
self.assertDictMatch(expected, res)
self.assertEqual(set(conn['data']['introduce_sr_keys']),
set([u'sr_type', u'server', u'serverpath']))