cinder/cinder/tests/unit/volume/test_volume.py

2953 lines
129 KiB
Python

# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for Volume Code."""
import datetime
import ddt
import time
import uuid
from castellan import key_manager
import enum
import eventlet
import mock
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_utils import imageutils
import six
from taskflow.engines.action_engine import engine
from cinder.api import common
from cinder import context
from cinder import coordination
from cinder import db
from cinder import exception
from cinder import objects
from cinder.objects import fields
import cinder.policy
from cinder import quota
from cinder.tests import fake_driver
from cinder.tests.unit import conf_fixture
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder.tests.unit.keymgr import fake as fake_keymgr
from cinder.tests.unit import utils as tests_utils
from cinder.tests.unit import volume as base
from cinder import utils
import cinder.volume
from cinder.volume import driver
from cinder.volume import manager as vol_manager
from cinder.volume import rpcapi as volume_rpcapi
import cinder.volume.targets.tgt
# Short aliases for quota.QUOTAS and the oslo.config registry.
QUOTAS = quota.QUOTAS
CONF = cfg.CONF

# Passed as the 'provider' when tests create an encrypted volume type.
ENCRYPTION_PROVIDER = 'nova.volume.encryptors.cryptsetup.CryptsetupEncryptor'

# Sample option list holding a single string option.
fake_opt = [
    cfg.StrOpt('fake_opt1', default='fake', help='fake opts'),
]
def create_snapshot(volume_id, size=1, metadata=None, ctxt=None,
                    **kwargs):
    """Create a snapshot object and persist it with ``create()``.

    :param volume_id: ID stored in the snapshot's ``volume_id`` field.
    :param size: value stored in the snapshot's ``volume_size`` field.
    :param metadata: optional metadata dict; any falsy value is stored as
                     an empty dict.
    :param ctxt: context the snapshot is bound to; an admin context is
                 created when omitted.
    :param kwargs: extra fields applied via ``snap.update()`` after the
                   defaults below, so they may override them.
    :returns: the Snapshot object after ``create()`` has been called.
    """
    snap = objects.Snapshot(ctxt or context.get_admin_context())
    snap.volume_size = size
    snap.user_id = fake.USER_ID
    snap.project_id = fake.PROJECT_ID
    snap.volume_id = volume_id
    snap.status = fields.SnapshotStatus.CREATING
    # ``metadata or {}`` can never be None, so it is assigned
    # unconditionally (the former ``is not None`` guard was dead code).
    snap.metadata = metadata or {}
    snap.update(kwargs)
    snap.create()
    return snap
@ddt.ddt
class VolumeTestCase(base.BaseVolumeTestCase):
def setUp(self):
    """Prepare the attributes shared by the tests in this class."""
    super(VolumeTestCase, self).setUp()
    # Replace clear_volume with an autospec'd mock for every test.
    self.patch('cinder.volume.utils.clear_volume', autospec=True)
    self.service_id = 1
    self.expected_status = 'available'
    # Context built from the fake user and project IDs.
    self.user_context = context.RequestContext(
        user_id=fake.USER_ID,
        project_id=fake.PROJECT_ID)
@mock.patch('cinder.objects.service.Service.get_minimum_rpc_version')
@mock.patch('cinder.objects.service.Service.get_minimum_obj_version')
@mock.patch('cinder.rpc.LAST_RPC_VERSIONS', {'cinder-scheduler': '1.3'})
@mock.patch('cinder.rpc.LAST_OBJ_VERSIONS', {'cinder-scheduler': '1.4'})
def test_reset(self, get_min_obj, get_min_rpc):
    """Check that reset() updates the scheduler RPC client version caps."""
    manager = vol_manager.VolumeManager()

    # Before reset() the caps equal the values patched into the
    # LAST_RPC_VERSIONS / LAST_OBJ_VERSIONS dicts above.
    client = manager.scheduler_rpcapi.client
    self.assertEqual('1.3', client.version_cap)
    self.assertEqual('1.4', client.serializer._base.version_cap)

    get_min_obj.return_value = objects.base.OBJ_VERSIONS.get_current()
    manager.reset()

    # Look the rpcapi up again after reset() and compare its caps with
    # what the mocked minimum-version getters return.
    client = manager.scheduler_rpcapi.client
    base_serializer = client.serializer._base
    self.assertEqual(get_min_rpc.return_value, client.version_cap)
    self.assertEqual(get_min_obj.return_value,
                     base_serializer.version_cap)
    self.assertIsNone(base_serializer.manifest)
@mock.patch('oslo_utils.importutils.import_object')
def test_backend_availability_zone(self, mock_import_object):
    """Manager AZ should equal the backend_availability_zone option."""
    # NOTE(smcginnis): This isn't really the best place for this test,
    # but we don't currently have a pure VolumeManager test class. So
    # until we create a good suite for that class, putting here with
    # other tests that use VolumeManager.
    backend_conf = {'backend_availability_zone': 'caerbannog'}

    stub_driver = mock.Mock()
    # Known options resolve to their configured value, others to None.
    stub_driver.configuration.safe_get.side_effect = (
        lambda option: backend_conf.get(option))
    stub_driver.configuration.extra_capabilities = 'null'
    # Whatever gets imported through import_object, return the stub.
    mock_import_object.side_effect = lambda *args, **kwargs: stub_driver

    mgr = vol_manager.VolumeManager(volume_driver=stub_driver)

    self.assertIsNotNone(mgr)
    self.assertEqual(backend_conf['backend_availability_zone'],
                     mgr.availability_zone)
@mock.patch('cinder.volume.manager.VolumeManager._append_volume_stats',
            mock.Mock())
@mock.patch.object(vol_manager.VolumeManager,
                   'update_service_capabilities')
def test_report_filter_goodness_function(self, mock_update):
    """Reported capabilities include the filter and goodness functions."""
    mgr = vol_manager.VolumeManager()
    mgr.driver.set_initialized()
    filter_fn = "myFilterFunction"
    goodness_fn = "myGoodnessFunction"
    # Driver stats merged with both functions, as passed to
    # update_service_capabilities.
    expected = {
        'name': 'cinder-volumes',
        'filter_function': filter_fn,
        'goodness_function': goodness_fn,
    }

    with mock.patch.object(mgr.driver, 'get_volume_stats') as m_stats, \
            mock.patch.object(mgr.driver,
                              'get_goodness_function') as m_goodness, \
            mock.patch.object(mgr.driver,
                              'get_filter_function') as m_filter:
        m_stats.return_value = {'name': 'cinder-volumes'}
        m_filter.return_value = filter_fn
        m_goodness.return_value = goodness_fn

        mgr._report_driver_status(context.get_admin_context())

        self.assertTrue(m_stats.called)
        mock_update.assert_called_once_with(expected)
def test_is_working(self):
    """is_working() follows the driver's ``_initialized`` flag."""
    # The fixture's driver starts out initialized.
    self.assertTrue(self.volume.is_working())

    # Flip the flag and the manager must report that it is not working.
    self.volume.driver._initialized = False
    self.assertFalse(self.volume.is_working())
@mock.patch('cinder.tests.unit.fake_notifier.FakeNotifier._notify')
@mock.patch.object(QUOTAS, 'rollback')
@mock.patch.object(QUOTAS, 'commit')
@mock.patch.object(QUOTAS, 'reserve')
def test_create_driver_not_initialized(self, reserve, commit, rollback,
                                       mock_notify):
    """Creating a volume with an uninitialized driver ends in 'error'.

    The patch decorators are ordered so the injected mocks match the
    parameter names: the decorator closest to ``def`` supplies the first
    mock argument.
    """
    self.volume.driver._initialized = False

    def fake_reserve(context, expire=None, project_id=None, **deltas):
        return ["RESERVATION"]

    def fake_commit_and_rollback(context, reservations, project_id=None):
        pass

    # NOTE(review): the stubs are installed as return values, not as
    # side effects; kept unchanged -- confirm that this is intended.
    reserve.return_value = fake_reserve
    commit.return_value = fake_commit_and_rollback
    rollback.return_value = fake_commit_and_rollback

    volume = tests_utils.create_volume(
        self.context,
        availability_zone=CONF.storage_availability_zone,
        **self.volume_params)

    volume_id = volume['id']
    self.assertIsNone(volume['encryption_key_id'])
    mock_notify.assert_not_called()
    self.assertRaises(exception.DriverNotInitialized,
                      self.volume.create_volume, self.context, volume)

    # The failed create must leave the DB record in 'error'.
    volume = db.volume_get(context.get_admin_context(), volume_id)
    self.assertEqual("error", volume.status)
    db.volume_destroy(context.get_admin_context(), volume_id)
def test_create_driver_not_initialized_rescheduling(self):
    """Uninitialized driver with scheduling info: raise and clean up."""
    self.volume.driver._initialized = False
    delete_mock = self.mock_object(self.volume.driver, 'delete_volume')

    vol = tests_utils.create_volume(
        self.context,
        availability_zone=CONF.storage_availability_zone,
        **self.volume_params)
    vol_id = vol['id']

    request_spec = {'volume_properties': self.volume_params}
    filter_properties = {'retry': {'num_attempts': 1, 'host': []}}
    self.assertRaises(exception.DriverNotInitialized,
                      self.volume.create_volume,
                      self.context, vol,
                      request_spec, filter_properties)

    # NOTE(dulek): Volume should be rescheduled as we passed request_spec
    # and filter_properties, assert that it wasn't counted in
    # allocated_capacity tracking.
    self.assertEqual({}, self.volume.stats['pools'])

    # NOTE(dulek): As we've rescheduled, make sure delete_volume was
    # called.
    self.assertTrue(delete_mock.called)

    db.volume_destroy(context.get_admin_context(), vol_id)
def test_create_non_cinder_exception_rescheduling(self):
    """A non-cinder driver error propagates and is not counted in stats."""
    # Work on a copy so the shared self.volume_params keeps its 'host'.
    params = dict(self.volume_params)
    del params['host']
    volume = tests_utils.create_volume(
        self.context,
        availability_zone=CONF.storage_availability_zone,
        **params)

    volume_id = volume['id']
    with mock.patch.object(self.volume.driver, 'create_volume',
                           side_effect=processutils.ProcessExecutionError):
        self.assertRaises(processutils.ProcessExecutionError,
                          self.volume.create_volume,
                          self.context, volume,
                          {'volume_properties': params},
                          {'retry': {'num_attempts': 1, 'host': []}})
    # NOTE(dulek): Volume should be rescheduled as we passed request_spec
    # and filter_properties, assert that it wasn't counted in
    # allocated_capacity tracking.
    self.assertEqual({}, self.volume.stats['pools'])

    db.volume_destroy(context.get_admin_context(), volume_id)
@mock.patch('cinder.tests.unit.fake_notifier.FakeNotifier._notify')
@mock.patch.object(QUOTAS, 'rollback')
@mock.patch.object(QUOTAS, 'commit')
@mock.patch.object(QUOTAS, 'reserve')
def test_delete_driver_not_initialized(self, reserve, commit, rollback,
                                       mock_notify):
    """Deleting with an uninitialized driver ends in 'error_deleting'."""
    self.volume.driver._initialized = False

    def _reserve_stub(context, expire=None, project_id=None, **deltas):
        return ["RESERVATION"]

    def _noop_stub(context, reservations, project_id=None):
        pass

    # The stub functions themselves become the mocks' return values.
    reserve.return_value = _reserve_stub
    for quota_mock in (commit, rollback):
        quota_mock.return_value = _noop_stub

    vol = tests_utils.create_volume(
        self.context,
        availability_zone=CONF.storage_availability_zone,
        **self.volume_params)

    self.assertIsNone(vol['encryption_key_id'])
    mock_notify.assert_not_called()
    self.assertRaises(exception.DriverNotInitialized,
                      self.volume.delete_volume, self.context, vol)

    # Reload the volume to check the status that was persisted.
    reloaded = objects.Volume.get_by_id(self.context, vol.id)
    self.assertEqual("error_deleting", reloaded.status)
    reloaded.destroy()
@mock.patch('cinder.tests.unit.fake_notifier.FakeNotifier._notify')
@mock.patch('cinder.quota.QUOTAS.rollback', new=mock.Mock())
@mock.patch('cinder.quota.QUOTAS.commit', new=mock.Mock())
@mock.patch('cinder.quota.QUOTAS.reserve', return_value=['RESERVATION'])
def test_create_delete_volume(self, _mock_reserve, mock_notify):
    """Test volume can be created and deleted."""
    volume = tests_utils.create_volume(
        self.context,
        availability_zone=CONF.storage_availability_zone,
        **self.volume_params)
    volume_id = volume['id']

    # Nothing has been notified before the manager is involved.
    mock_notify.assert_not_called()

    self.assertIsNone(volume['encryption_key_id'])

    self.volume.create_volume(self.context, volume)

    self.assert_notify_called(mock_notify,
                              (['INFO', 'volume.create.start'],
                               ['INFO', 'volume.create.end']))

    self.volume.delete_volume(self.context, volume)
    # read_deleted='yes' lets the admin context still see the row.
    vol = db.volume_get(context.get_admin_context(read_deleted='yes'),
                        volume_id)
    # Expected value first, matching the other assertions in this file.
    self.assertEqual('deleted', vol['status'])

    self.assert_notify_called(mock_notify,
                              (['INFO', 'volume.create.start'],
                               ['INFO', 'volume.create.end'],
                               ['INFO', 'volume.delete.start'],
                               ['INFO', 'volume.delete.end']))

    # With the regular test context the deleted volume is not found.
    self.assertRaises(exception.NotFound,
                      db.volume_get,
                      self.context,
                      volume_id)
def test_create_delete_volume_with_metadata(self):
    """Create a volume carrying user metadata, then delete it."""
    expected_meta = {'fake_key': 'fake_value'}
    vol = tests_utils.create_volume(self.context, metadata=expected_meta,
                                    **self.volume_params)
    vol_id = vol['id']

    self.volume.create_volume(self.context, vol)
    self.assertEqual(expected_meta, vol.metadata)

    self.volume.delete_volume(self.context, vol)
    # After deletion the DB lookup must raise NotFound.
    self.assertRaises(exception.NotFound,
                      db.volume_get, self.context, vol_id)
def test_delete_volume_frozen(self):
    """API delete raises InvalidInput for a volume on a frozen service."""
    frozen_service = tests_utils.create_service(self.context,
                                                {'frozen': True})
    vol = tests_utils.create_volume(self.context,
                                    host=frozen_service.host)
    self.assertRaises(exception.InvalidInput,
                      self.volume_api.delete, self.context, vol)
def test_delete_volume_another_cluster_fails(self):
    """Delete a volume in our cluster whose host differs from CONF.host.

    NOTE(review): despite "fails" in the test name, the assertion below
    expects the volume to be gone after delete_volume() -- confirm which
    of the two reflects the intended behavior.
    """
    self.volume.cluster = 'mycluster'
    vol = tests_utils.create_volume(
        self.context,
        status='available',
        size=1,
        host=CONF.host + 'fake',
        cluster_name=self.volume.cluster)

    self.volume.delete_volume(self.context, vol)

    self.assertRaises(exception.NotFound,
                      db.volume_get, self.context, vol.id)
@mock.patch('cinder.db.volume_metadata_update')
def test_create_volume_metadata(self, metadata_update):
    """create_volume_metadata() calls the DB update and returns its value."""
    new_meta = {'fake_key': 'fake_value'}
    metadata_update.return_value = new_meta
    vol = tests_utils.create_volume(self.context, **self.volume_params)

    returned = self.volume_api.create_volume_metadata(
        self.context, vol, new_meta)

    metadata_update.assert_called_once_with(
        self.context, vol.id, new_meta, False,
        common.METADATA_TYPES.user)
    self.assertEqual(new_meta, returned)
@ddt.data('maintenance', 'uploading')
def test_create_volume_metadata_maintenance(self, status):
    """Creating metadata raises InvalidVolume for each given status."""
    new_meta = {'fake_key': 'fake_value'}
    vol = tests_utils.create_volume(self.context, **self.volume_params)
    vol['status'] = status

    self.assertRaises(exception.InvalidVolume,
                      self.volume_api.create_volume_metadata,
                      self.context, vol, new_meta)
def test_update_volume_metadata_with_metatype(self):
    """Test update volume metadata with different metadata type."""
    meta_v1 = {'fake_key1': 'fake_value1'}
    meta_v2 = {'fake_key1': 'fake_value2'}
    bogus_types = enum.Enum('METADATA_TYPES', 'fake_type')
    vol = tests_utils.create_volume(self.context, metadata=meta_v1,
                                    **self.volume_params)
    self.volume.create_volume(self.context, vol)

    # Each step is (metadata to write, metadata type), in order:
    #   1. update user metadata associated with the volume,
    #   2. create image metadata associated with the volume,
    #   3. update image metadata associated with the volume.
    steps = (
        (meta_v2, common.METADATA_TYPES.user),
        (meta_v1, common.METADATA_TYPES.image),
        (meta_v2, common.METADATA_TYPES.image),
    )
    for new_meta, meta_type in steps:
        stored = self.volume_api.update_volume_metadata(
            self.context, vol, new_meta, False, meta_type)
        self.assertEqual(new_meta, stored)

    # update volume metadata with invalid metadata type.
    self.assertRaises(exception.InvalidMetadataType,
                      self.volume_api.update_volume_metadata,
                      self.context, vol, meta_v1, False,
                      bogus_types.fake_type)
def test_update_volume_metadata_maintenance(self):
    """Test update volume metadata in maintenance."""
    initial_meta = {'fake_key1': 'fake_value1'}
    bogus_types = enum.Enum('METADATA_TYPES', 'fake_type')
    vol = tests_utils.create_volume(self.context, metadata=initial_meta,
                                    **self.volume_params)
    vol['status'] = 'maintenance'

    # InvalidVolume is expected even though the metadata type is bogus.
    self.assertRaises(exception.InvalidVolume,
                      self.volume_api.update_volume_metadata,
                      self.context, vol, initial_meta, False,
                      bogus_types.fake_type)
@mock.patch('cinder.db.volume_update')
def test_update_with_ovo(self, volume_update):
    """Test update volume using oslo_versionedobject."""
    vol = tests_utils.create_volume(self.context, **self.volume_params)
    changes = {'display_name': 'foobbar'}

    self.volume_api.update(self.context, vol, changes)

    # The DB layer gets exactly one update and the object reflects it.
    volume_update.assert_called_once_with(self.context, vol.id, changes)
    self.assertEqual('foobbar', vol.display_name)
def test_delete_volume_metadata_with_metatype(self):
    """Test delete volume metadata with different metadata type."""
    two_keys = {'fake_key1': 'fake_value1', 'fake_key2': 'fake_value2'}
    one_key = {'fake_key1': 'fake_value1'}
    bogus_types = enum.Enum('METADATA_TYPES', 'fake_type')
    vol = tests_utils.create_volume(self.context, metadata=two_keys,
                                    **self.volume_params)
    vol_id = vol['id']
    self.volume.create_volume(self.context, vol)

    # delete user metadata associated with the volume.
    self.volume_api.delete_volume_metadata(
        self.context, vol, 'fake_key2', common.METADATA_TYPES.user)
    self.assertEqual(one_key,
                     db.volume_metadata_get(self.context, vol_id))

    # create image metadata associated with the volume.
    stored = self.volume_api.update_volume_metadata(
        self.context, vol, two_keys, False,
        common.METADATA_TYPES.image)
    self.assertEqual(two_keys, stored)

    # delete image metadata associated with the volume.
    self.volume_api.delete_volume_metadata(
        self.context, vol, 'fake_key2', common.METADATA_TYPES.image)

    # Fold the glance metadata rows into a plain key/value dict.
    glance_rows = db.volume_glance_metadata_get(self.context, vol_id)
    remaining = {row['key']: row['value'] for row in glance_rows}
    self.assertEqual(one_key, remaining)

    # delete volume metadata with invalid metadata type.
    self.assertRaises(exception.InvalidMetadataType,
                      self.volume_api.delete_volume_metadata,
                      self.context, vol, 'fake_key1',
                      bogus_types.fake_type)
def test_delete_volume_metadata_maintenance(self):
    """Test delete volume metadata in maintenance."""
    bogus_types = enum.Enum('METADATA_TYPES', 'fake_type')
    initial_meta = {'fake_key1': 'fake_value1',
                    'fake_key2': 'fake_value2'}
    vol = tests_utils.create_volume(self.context, metadata=initial_meta,
                                    **self.volume_params)
    vol['status'] = 'maintenance'

    # InvalidVolume is expected even though the metadata type is bogus.
    self.assertRaises(exception.InvalidVolume,
                      self.volume_api.delete_volume_metadata,
                      self.context, vol, 'fake_key1',
                      bogus_types.fake_type)
def test_accept_transfer_maintenance(self):
    """Test accept transfer in maintenance."""
    initial_meta = {'fake_key1': 'fake_value1',
                    'fake_key2': 'fake_value2'}
    vol = tests_utils.create_volume(self.context, metadata=initial_meta,
                                    **self.volume_params)
    vol['status'] = 'maintenance'

    api = cinder.volume.api.API()
    # The two trailing accept_transfer arguments are passed as None.
    self.assertRaises(exception.InvalidVolume,
                      api.accept_transfer,
                      self.context, vol, None, None)
@mock.patch.object(cinder.volume.api.API, 'list_availability_zones')
def test_create_volume_uses_default_availability_zone(self, mock_list_az):
    """Test setting availability_zone correctly during volume create."""
    mock_list_az.return_value = tuple(
        {'name': az_name, 'available': True}
        for az_name in ('az1', 'az2', 'default-az'))
    api = cinder.volume.api.API()

    # Test backwards compatibility, default_availability_zone not set
    self.override_config('storage_availability_zone', 'az2')
    vol = api.create(self.context, 1, 'name', 'description')
    self.assertEqual('az2', vol['availability_zone'])

    # With default_availability_zone also set, that value is used.
    self.override_config('default_availability_zone', 'default-az')
    vol = api.create(self.context, 1, 'name', 'description')
    self.assertEqual('default-az', vol['availability_zone'])
@mock.patch('cinder.quota.QUOTAS.rollback', new=mock.MagicMock())
@mock.patch('cinder.quota.QUOTAS.commit', new=mock.MagicMock())
@mock.patch('cinder.quota.QUOTAS.reserve', return_value=["RESERVATION"])
def test_create_volume_with_volume_type(self, _mock_reserve):
    """Test volume creation with default volume type."""

    def _create_type(type_name):
        # Register a volume type without extra specs and read it back.
        db.volume_type_create(context.get_admin_context(),
                              {'name': type_name, 'extra_specs': {}})
        return db.volume_type_get_by_name(context.get_admin_context(),
                                          type_name)

    api = cinder.volume.api.API()

    # Create volume with default volume type while default
    # volume type doesn't exist, volume_type_id should be NULL
    vol = api.create(self.context, 1, 'name', 'description')
    self.assertIsNone(vol['volume_type_id'])
    self.assertIsNone(vol['encryption_key_id'])

    # Create default volume type
    default_type = _create_type(conf_fixture.def_vol_type)

    # Create volume with default volume type
    vol = api.create(self.context, 1, 'name', 'description')
    self.assertEqual(default_type.get('id'), vol['volume_type_id'])
    self.assertIsNone(vol['encryption_key_id'])

    # Create volume with specific volume type
    specific_type = _create_type('test')
    vol = api.create(self.context, 1, 'name', 'description',
                     volume_type=specific_type)
    self.assertEqual(specific_type.get('id'), vol['volume_type_id'])
@mock.patch.object(key_manager, 'API', fake_keymgr.fake_api)
def test_create_volume_with_encrypted_volume_type_aes(self):
    """Create a volume of an AES-encrypted type and check its key."""
    admin_ctxt = context.get_admin_context()
    type_id = '61298380-0c12-11e3-bfd6-4b48424183be'
    cipher = 'aes-xts-plain64'
    key_size = 256
    encryption_spec = {
        'control_location': 'front-end',
        'provider': ENCRYPTION_PROVIDER,
        'cipher': cipher,
        'key_size': key_size,
    }

    db.volume_type_create(admin_ctxt, {'id': type_id, 'name': 'LUKS'})
    db.volume_type_encryption_create(admin_ctxt, type_id, encryption_spec)

    api = cinder.volume.api.API()
    luks_type = db.volume_type_get_by_name(admin_ctxt, 'LUKS')
    vol = api.create(self.context, 1, 'name', 'description',
                     volume_type=luks_type)

    # The key size in bits is the encoded key length times eight.
    key = api.key_manager.get(self.context, vol['encryption_key_id'])
    self.assertEqual(key_size, len(key.get_encoded()) * 8)
    self.assertEqual('aes', key.algorithm)

    enc_meta = db.volume_encryption_metadata_get(self.context, vol.id)
    self.assertEqual(luks_type.get('id'), vol['volume_type_id'])
    self.assertEqual(cipher, enc_meta.get('cipher'))
    self.assertEqual(key_size, enc_meta.get('key_size'))
    self.assertIsNotNone(vol['encryption_key_id'])
@mock.patch.object(key_manager, 'API', fake_keymgr.fake_api)
def test_create_volume_with_encrypted_volume_type_blowfish(self):
    """Create a volume of a Blowfish-encrypted type and check its key."""
    admin_ctxt = context.get_admin_context()
    type_id = '61298380-0c12-11e3-bfd6-4b48424183be'
    cipher = 'blowfish-cbc'
    key_size = 32
    encryption_spec = {
        'control_location': 'front-end',
        'provider': ENCRYPTION_PROVIDER,
        'cipher': cipher,
        'key_size': key_size,
    }

    db.volume_type_create(admin_ctxt, {'id': type_id, 'name': 'LUKS'})
    db.volume_type_encryption_create(admin_ctxt, type_id, encryption_spec)

    api = cinder.volume.api.API()
    luks_type = db.volume_type_get_by_name(admin_ctxt, 'LUKS')
    vol = api.create(self.context, 1, 'name', 'description',
                     volume_type=luks_type)

    # Only the algorithm is checked on the key for this cipher.
    key = api.key_manager.get(self.context, vol['encryption_key_id'])
    self.assertEqual('blowfish', key.algorithm)

    enc_meta = db.volume_encryption_metadata_get(self.context, vol.id)
    self.assertEqual(luks_type.get('id'), vol['volume_type_id'])
    self.assertEqual(cipher, enc_meta.get('cipher'))
    self.assertEqual(key_size, enc_meta.get('key_size'))
    self.assertIsNotNone(vol['encryption_key_id'])
def test_create_volume_with_provider_id(self):
    """A provider_id set at creation is still present after create."""
    params = dict(provider_id=fake.PROVIDER_ID, **self.volume_params)
    vol = tests_utils.create_volume(self.context, **params)

    self.volume.create_volume(self.context, vol)

    self.assertEqual(fake.PROVIDER_ID, vol['provider_id'])
def test_create_volume_with_admin_metadata(self):
    """admin_metadata returned by the driver ends up on the volume."""
    driver_update = {'admin_metadata': {'foo': 'bar'}}
    with mock.patch.object(self.volume.driver, 'create_volume',
                           return_value=driver_update):
        # Both calls run with the user context from setUp.
        vol = tests_utils.create_volume(self.user_context)
        self.volume.create_volume(self.user_context, vol)
        self.assertEqual({'foo': 'bar'}, vol['admin_metadata'])
@mock.patch.object(key_manager, 'API', new=fake_keymgr.fake_api)
def test_create_delete_volume_with_encrypted_volume_type(self):
    """Create a volume of an encrypted type and delete it via the API.

    Waiting for the status change is bounded so that a delete which never
    progresses fails the assertion instead of hanging the test run.
    """
    cipher = 'aes-xts-plain64'
    key_size = 256
    db.volume_type_create(self.context,
                          {'id': fake.VOLUME_TYPE_ID, 'name': 'LUKS'})
    db.volume_type_encryption_create(
        self.context, fake.VOLUME_TYPE_ID,
        {'control_location': 'front-end', 'provider': ENCRYPTION_PROVIDER,
         'cipher': cipher, 'key_size': key_size})

    db_vol_type = db.volume_type_get_by_name(self.context, 'LUKS')

    volume = self.volume_api.create(self.context,
                                    1,
                                    'name',
                                    'description',
                                    volume_type=db_vol_type)

    self.assertIsNotNone(volume.get('encryption_key_id', None))
    self.assertEqual(db_vol_type.get('id'), volume['volume_type_id'])

    # Make the volume look available on a host so the API accepts delete.
    volume['host'] = 'fake_host'
    volume['status'] = 'available'
    db.volume_update(self.context, volume['id'], {'status': 'available'})
    self.volume_api.delete(self.context, volume)

    volume = objects.Volume.get_by_id(self.context, volume.id)
    # Must wait for volume_api delete request to process enough to
    # change the volume status; give up after 60 polls (~30 seconds).
    for _attempt in range(60):
        if volume.status != 'available':
            break
        time.sleep(0.5)
        volume.refresh()

    self.assertEqual('deleting', volume['status'])

    db.volume_destroy(self.context, volume['id'])
    self.assertRaises(exception.NotFound,
                      db.volume_get,
                      self.context,
                      volume['id'])
@mock.patch.object(key_manager, 'API', fake_keymgr.fake_api)
def test_delete_encrypted_volume_fail_deleting_key(self):
    """A key-manager failure on delete leaves the volume error_deleting."""
    encryption_spec = {
        'control_location': 'front-end',
        'provider': ENCRYPTION_PROVIDER,
        'cipher': 'aes-xts-plain64',
        'key_size': 256,
    }
    db.volume_type_create(self.context,
                          {'id': fake.VOLUME_TYPE_ID, 'name': 'LUKS'})
    db.volume_type_encryption_create(
        self.context, fake.VOLUME_TYPE_ID, encryption_spec)
    luks_type = db.volume_type_get_by_name(self.context, 'LUKS')

    vol = self.volume_api.create(self.context, 1, 'name', 'description',
                                 volume_type=luks_type)
    vol_id = vol['id']

    # Make the volume look available on a host before deleting it.
    vol['host'] = 'fake_host'
    vol['status'] = 'available'
    db.volume_update(self.context, vol_id, {'status': 'available'})

    # Any exception from key_manager.delete surfaces as InvalidVolume.
    with mock.patch.object(self.volume_api.key_manager, 'delete',
                           side_effect=Exception):
        self.assertRaises(exception.InvalidVolume,
                          self.volume_api.delete,
                          self.context, vol)

    reloaded = objects.Volume.get_by_id(self.context, vol_id)
    self.assertEqual("error_deleting", reloaded.status)
    reloaded.destroy()
def test_delete_busy_volume(self):
    """Test volume survives deletion if driver reports it as busy."""
    vol = tests_utils.create_volume(self.context, **self.volume_params)
    vol_id = vol['id']
    self.volume.create_volume(self.context, vol)

    busy_error = exception.VolumeIsBusy(volume_name='fake')
    with mock.patch.object(self.volume.driver, 'delete_volume',
                           side_effect=busy_error) as driver_delete:
        self.volume.delete_volume(self.context, vol)

        # The record still exists and has gone back to 'available'.
        refreshed = db.volume_get(context.get_admin_context(), vol_id)
        self.assertEqual(vol_id, refreshed.id)
        self.assertEqual("available", refreshed.status)
        driver_delete.assert_called_once_with(vol)
def test_unmanage_encrypted_volume_fails(self):
    """delete_volume(unmanage_only=True) raises for a keyed volume."""
    vol = tests_utils.create_volume(
        self.context,
        encryption_key_id=fake.ENCRYPTION_KEY_ID,
        **self.volume_params)
    self.volume.create_volume(self.context, vol)

    # The unmanage attempt goes through a freshly built manager.
    fresh_manager = vol_manager.VolumeManager()
    self.assertRaises(exception.Invalid,
                      fresh_manager.delete_volume,
                      self.context, vol,
                      unmanage_only=True)

    # Clean up with a regular delete through the fixture's manager.
    self.volume.delete_volume(self.context, vol)
def test_get_volume_different_tenant(self):
    """Test can't get volume of another tenant when viewable_admin_meta."""
    vol = tests_utils.create_volume(self.context, **self.volume_params)
    vol_id = vol['id']
    self.volume.create_volume(self.context, vol)

    # Non-admin context belonging to a different project.
    other_ctxt = context.RequestContext('another_user_id',
                                        'another_project_id',
                                        is_admin=False)
    self.assertNotEqual(other_ctxt.project_id, self.context.project_id)

    api = cinder.volume.api.API()

    # The other tenant cannot see the volume; the owner still can.
    self.assertRaises(exception.VolumeNotFound, api.get,
                      other_ctxt, vol_id, viewable_admin_meta=True)
    self.assertEqual(vol_id, api.get(self.context, vol_id)['id'])

    self.volume.delete_volume(self.context, vol)
def test_get_all_limit_bad_value(self):
    """Test value of 'limit' is numeric and >= 0"""
    api = cinder.volume.api.API()

    # A non-numeric limit and a negative limit are both rejected.
    for bad_limit in ("A", "-1"):
        self.assertRaises(exception.InvalidInput,
                          api.get_all,
                          self.context,
                          limit=bad_limit)
def test_get_all_tenants_volume_list(self):
    """Validate when the volume list for all tenants is returned"""
    volume_api = cinder.volume.api.API()

    with mock.patch.object(volume_api.db,
                           'volume_get_all_by_project') as by_project, \
            mock.patch.object(volume_api.db,
                              'volume_get_all') as get_all:
        db_volume = {'volume_type_id': fake.VOLUME_TYPE_ID,
                     'name': 'fake_name',
                     'host': 'fake_host',
                     'id': fake.VOLUME_ID}

        volume = fake_volume.fake_db_volume(**db_volume)
        by_project.return_value = [volume]
        get_all.return_value = [volume]

        # all_tenants=0: the per-project query is used.
        volume_api.get_all(self.context, filters={'all_tenants': '0'})
        self.assertTrue(by_project.called)
        # Clear the recorded calls (the assigned return_value is kept)
        # so the next assertion only sees the next get_all() call.
        by_project.reset_mock()

        # all_tenants=1 as non-admin: still the per-project query.
        self.context.is_admin = False
        volume_api.get_all(self.context, filters={'all_tenants': '1'})
        self.assertTrue(by_project.called)

        # check for volume list of all tenants
        self.context.is_admin = True
        volume_api.get_all(self.context, filters={'all_tenants': '1'})
        self.assertTrue(get_all.called)
def test_delete_volume_in_error_extending(self):
    """Test volume can be deleted in error_extending status."""
    # create a volume
    vol = tests_utils.create_volume(self.context, **self.volume_params)
    self.volume.create_volume(self.context, vol)

    # delete 'error_extending' volume
    db.volume_update(self.context, vol['id'],
                     {'status': 'error_extending'})
    self.volume.delete_volume(self.context, vol)

    self.assertRaises(exception.NotFound,
                      db.volume_get, self.context, vol['id'])
@mock.patch.object(db.sqlalchemy.api, 'volume_get',
                   side_effect=exception.VolumeNotFound(
                       volume_id='12345678-1234-5678-1234-567812345678'))
def test_delete_volume_not_found(self, mock_get_volume):
    """Test delete volume moves on if the volume does not exist."""
    missing_id = '12345678-1234-5678-1234-567812345678'
    # Built in memory only; the patched volume_get raises VolumeNotFound.
    vol = objects.Volume(self.context, status='available', id=missing_id)

    self.volume.delete_volume(self.context, vol)

    self.assertTrue(mock_get_volume.called)
@mock.patch('cinder.volume.drivers.lvm.LVMVolumeDriver.'
            'create_volume_from_snapshot')
def test_create_volume_from_snapshot(self, mock_create_from_snap):
    """Test volume can be created from a snapshot."""
    # Source volume plus a snapshot of it.
    src_vol = tests_utils.create_volume(self.context,
                                        **self.volume_params)
    self.volume.create_volume(self.context, src_vol)
    snap_id = create_snapshot(src_vol['id'], size=src_vol['size'])['id']
    snap = objects.Snapshot.get_by_id(self.context, snap_id)
    self.volume.create_snapshot(self.context, snap)

    # Destination volume created from that snapshot.
    dst_vol = tests_utils.create_volume(self.context,
                                        snapshot_id=snap_id,
                                        **self.volume_params)
    self.volume.create_volume(self.context, dst_vol)

    # The stored record carries the same ID and the snapshot reference.
    stored = db.volume_get(context.get_admin_context(), dst_vol['id'])
    self.assertEqual(dst_vol['id'], stored.id)
    self.assertEqual(snap_id, stored.snapshot_id)

    # Tear down in reverse order of creation.
    self.volume.delete_volume(self.context, dst_vol)
    self.volume.delete_snapshot(self.context, snap)
    self.volume.delete_volume(self.context, src_vol)
@mock.patch('cinder.volume.flows.api.create_volume.get_flow')
def test_create_volume_from_snapshot_with_types(self, _get_flow):
    """Test volume create from snapshot with types including mismatch."""
    volume_api = cinder.volume.api.API()

    foo_type = fake_volume.fake_volume_type_obj(
        self.context,
        id=fake.VOLUME_TYPE_ID,
        name='foo',
        extra_specs={'volume_backend_name': 'dev_1'})
    # Named 'biz' (not 'foo') so the two types differ in name as well as
    # in ID, as in the matching source-volume test.
    biz_type = fake_volume.fake_volume_type_obj(
        self.context,
        id=fake.VOLUME_TYPE2_ID,
        name='biz',
        extra_specs={'volume_backend_name': 'dev_2'})

    source_vol = fake_volume.fake_volume_obj(
        self.context,
        id=fake.VOLUME_ID,
        status='available',
        volume_size=10,
        volume_type_id=biz_type.id)
    source_vol.volume_type = biz_type
    snapshot = {'id': fake.SNAPSHOT_ID,
                'status': fields.SnapshotStatus.AVAILABLE,
                'volume_size': 10,
                'volume_type_id': biz_type.id}
    snapshot_obj = fake_snapshot.fake_snapshot_obj(self.context,
                                                   **snapshot)
    snapshot_obj.volume = source_vol
    # Make sure the case of specifying a type that
    # doesn't match the snapshots type fails
    self.assertRaises(exception.InvalidInput,
                      volume_api.create,
                      self.context,
                      size=1,
                      name='fake_name',
                      description='fake_desc',
                      volume_type=foo_type,
                      snapshot=snapshot_obj)

    # Make sure that trying to specify a type
    # when the snapshots type is None fails
    snapshot_obj.volume_type_id = None
    snapshot_obj.volume.volume_type_id = None
    snapshot_obj.volume.volume_type = None
    self.assertRaises(exception.InvalidInput,
                      volume_api.create,
                      self.context,
                      size=1,
                      name='fake_name',
                      description='fake_desc',
                      volume_type=foo_type,
                      snapshot=snapshot_obj)

    # With matching types on snapshot and request, create is accepted.
    snapshot_obj.volume_type_id = foo_type.id
    snapshot_obj.volume.volume_type_id = foo_type.id
    snapshot_obj.volume.volume_type = foo_type
    volume_api.create(self.context, size=1, name='fake_name',
                      description='fake_desc', volume_type=foo_type,
                      snapshot=snapshot_obj)
@mock.patch('cinder.volume.flows.api.create_volume.get_flow')
def test_create_volume_from_source_with_types(self, _get_flow):
    """Test volume create from source with types including mismatch."""
    api = cinder.volume.api.API()
    # Keyword arguments shared by every create() call below.
    create_kwargs = {'size': 1,
                     'name': 'fake_name',
                     'description': 'fake_desc'}

    foo_type = fake_volume.fake_volume_type_obj(
        self.context,
        id=fake.VOLUME_TYPE_ID,
        name='foo',
        extra_specs={'volume_backend_name': 'dev_1'})
    biz_type = fake_volume.fake_volume_type_obj(
        self.context,
        id=fake.VOLUME_TYPE2_ID,
        name='biz',
        extra_specs={'volume_backend_name': 'dev_2'})

    src_vol = fake_volume.fake_volume_obj(
        self.context,
        id=fake.VOLUME_ID,
        status='available',
        volume_size=0,
        volume_type_id=biz_type.id)
    src_vol.volume_type = biz_type

    # A requested type that differs from the source's type is rejected.
    self.assertRaises(exception.InvalidInput,
                      api.create,
                      self.context,
                      volume_type=foo_type,
                      source_volume=src_vol,
                      **create_kwargs)

    # Make sure that trying to specify a type
    # when the source type is None fails
    src_vol.volume_type_id = None
    src_vol.volume_type = None
    self.assertRaises(exception.InvalidInput,
                      api.create,
                      self.context,
                      volume_type=foo_type,
                      source_volume=src_vol,
                      **create_kwargs)

    # With the source's own type requested, create is accepted.
    src_vol.volume_type_id = biz_type.id
    src_vol.volume_type = biz_type
    api.create(self.context, volume_type=biz_type,
               source_volume=src_vol, **create_kwargs)
@mock.patch('cinder.volume.flows.api.create_volume.get_flow')
def test_create_volume_from_source_with_same_backend(self, _get_flow):
    """Test volume create from source with type mismatch same backend.

    Both fixture types carry ``volume_backend_name`` 'dev_1'; creating a
    'foo' volume from a 'biz' source volume is expected not to raise.
    """
    api = cinder.volume.api.API()

    def build_type(type_id, type_name, created):
        # A fresh extra_specs dict is built on every call so the two
        # fixtures never share a mutable object.
        return fake_volume.fake_volume_type_obj(
            self.context,
            id=type_id,
            name=type_name,
            qos_specs_id=None,
            deleted=False,
            created_at=created,
            updated_at=None,
            extra_specs={'volume_backend_name': 'dev_1'},
            is_public=True,
            deleted_at=None,
            description=None)

    requested_type = build_type(
        fake.VOLUME_TYPE_ID, 'foo',
        datetime.datetime(2015, 5, 8, 0, 40, 5, 408232))
    origin_type = build_type(
        fake.VOLUME_TYPE2_ID, 'biz',
        datetime.datetime(2015, 5, 8, 0, 20, 5, 408232))

    origin = fake_volume.fake_volume_obj(
        self.context, id=fake.VOLUME_ID, status='available',
        volume_size=10, volume_type_id=origin_type.id)
    origin.volume_type = origin_type

    api.create(self.context, size=1, name='fake_name',
               description='fake_desc', volume_type=requested_type,
               source_volume=origin)
@mock.patch('cinder.volume.flows.api.create_volume.get_flow')
def test_create_from_source_and_snap_only_one_backend(self, _get_flow):
    """Test create from source and snap with type mismatch one backend.

    ``cinder.db.service_get_all`` is patched to report a single service
    and ``list_availability_zones`` to return an empty mapping; both
    create calls (from volume, then from snapshot) must not raise.
    """
    api = cinder.volume.api.API()

    def build_type(type_id, type_name, created, specs):
        return fake_volume.fake_volume_type_obj(
            self.context,
            id=type_id,
            name=type_name,
            qos_specs_id=None,
            deleted=False,
            created_at=created,
            updated_at=None,
            extra_specs=specs,
            is_public=True,
            deleted_at=None,
            description=None)

    requested_type = build_type(
        fake.VOLUME_TYPE_ID, 'foo',
        datetime.datetime(2015, 5, 8, 0, 40, 5, 408232),
        {'some_key': 3})
    origin_type = build_type(
        fake.VOLUME_TYPE2_ID, 'biz',
        datetime.datetime(2015, 5, 8, 0, 20, 5, 408232),
        {'some_other_key': 4})

    origin = fake_volume.fake_volume_obj(
        self.context, id=fake.VOLUME_ID, status='available',
        volume_size=10, volume_type_id=origin_type.id)
    origin.volume_type = origin_type

    snap_fields = {'id': fake.SNAPSHOT_ID,
                   'status': fields.SnapshotStatus.AVAILABLE,
                   'volume_size': 10,
                   'volume_type_id': origin_type['id']}
    snap = fake_snapshot.fake_snapshot_obj(self.context, **snap_fields)
    snap.volume = origin

    request = {'size': 1,
               'name': 'fake_name',
               'description': 'fake_desc',
               'volume_type': requested_type}

    with mock.patch('cinder.db.service_get_all') as services, \
        mock.patch.object(api,
                          'list_availability_zones') as zones:
        services.return_value = [
            {'host': 'foo',
             'uuid': 'a3a593da-7f8d-4bb7-8b4c-f2bc1e0b4824'}]
        zones.return_value = {}

        api.create(self.context, source_volume=origin, **request)
        api.create(self.context, snapshot=snap, **request)
def _test_create_from_source_snapshot_encryptions(
        self, is_snapshot=False):
    """Check create() raises InvalidInput when encryption has changed.

    ``volume_types_encryption_changed`` is patched to return True.

    :param is_snapshot: when True the new volume is requested from the
                        fixture snapshot, otherwise from the fixture
                        source volume.
    """
    api = cinder.volume.api.API()

    requested_type = fake_volume.fake_volume_type_obj(
        self.context, id=fake.VOLUME_TYPE_ID, name='foo',
        extra_specs={'volume_backend_name': 'dev_1'})
    origin_type = fake_volume.fake_volume_type_obj(
        self.context, id=fake.VOLUME_TYPE2_ID, name='biz',
        extra_specs={'volume_backend_name': 'dev_1'})

    origin = fake_volume.fake_volume_obj(
        self.context, id=fake.VOLUME_ID, status='available',
        volume_size=1, volume_type_id=origin_type.id)
    origin.volume_type = origin_type

    snap_fields = {'id': fake.SNAPSHOT_ID,
                   'status': fields.SnapshotStatus.AVAILABLE,
                   'volume_size': 1,
                   'volume_type_id': origin_type['id']}
    snap = fake_snapshot.fake_snapshot_obj(self.context, **snap_fields)
    snap.volume = origin

    # Exactly one of the two origins is handed to create().
    if is_snapshot:
        origin_kwargs = {'source_volume': None, 'snapshot': snap}
    else:
        origin_kwargs = {'source_volume': origin, 'snapshot': None}

    with mock.patch.object(cinder.volume.volume_types,
                           'volume_types_encryption_changed',
                           return_value=True):
        self.assertRaises(exception.InvalidInput,
                          api.create,
                          self.context,
                          size=1,
                          name='fake_name',
                          description='fake_desc',
                          volume_type=requested_type,
                          **origin_kwargs)
def test_create_from_source_encryption_changed(self):
self._test_create_from_source_snapshot_encryptions()
def test_create_from_snapshot_encryption_changed(self):
self._test_create_from_source_snapshot_encryptions(is_snapshot=True)
def _mock_synchronized(self, name, *s_args, **s_kwargs):
    """Return a decorator that records lock/unlock events around a call.

    Each call of the decorated function appends ``'lock-<name>'`` to
    ``self.called`` before running it and ``'unlock-<name>'`` after it
    returns. Extra positional and keyword arguments are accepted and
    ignored.
    """
    def decorator(func):
        def wrapper(*args, **kwargs):
            self.called.append('lock-%s' % (name))
            result = func(*args, **kwargs)
            # Only reached when func returned normally.
            self.called.append('unlock-%s' % (name))
            return result
        return wrapper
    return decorator
def _fake_execute(self, *cmd, **kwargs):
    """Accept any command and options, do nothing and return None."""
    return None
@mock.patch.object(coordination.Coordinator, 'get_lock')
@mock.patch.object(fake_driver.FakeLoggingVolumeDriver,
                   'create_volume_from_snapshot')
def test_create_volume_from_snapshot_check_locks(
        self, mock_lvm_create, mock_lock):
    """Check which locks are requested when cloning from a snapshot.

    ``Coordinator.get_lock`` and the fake driver's
    ``create_volume_from_snapshot`` are mocked. After each manager call
    the most recent ``get_lock`` call is compared with the expected
    ``'<id>-delete_snapshot'`` / ``'<id>-delete_volume'`` lock name.
    """
    orig_flow = engine.ActionEngine.run

    def mock_flow_run(*args, **kwargs):
        # ensure the lock has been taken
        # (snap_id is bound later in the enclosing test, before this
        # replacement is installed)
        mock_lock.assert_called_with('%s-delete_snapshot' % snap_id)
        # now proceed with the flow.
        ret = orig_flow(*args, **kwargs)
        return ret

    # create source volume
    src_vol = tests_utils.create_volume(self.context, **self.volume_params)

    # no lock
    self.volume.create_volume(self.context, src_vol)

    snap_id = create_snapshot(src_vol.id,
                              size=src_vol['size'])['id']
    snapshot_obj = objects.Snapshot.get_by_id(self.context, snap_id)
    # no lock
    self.volume.create_snapshot(self.context, snapshot_obj)

    dst_vol = tests_utils.create_volume(self.context,
                                        snapshot_id=snap_id,
                                        **self.volume_params)
    admin_ctxt = context.get_admin_context()

    # mock the flow runner so we can do some checks
    self.mock_object(engine.ActionEngine, 'run', mock_flow_run)

    # locked
    self.volume.create_volume(self.context, dst_vol,
                              request_spec={'snapshot_id': snap_id})
    mock_lock.assert_called_with('%s-delete_snapshot' % snap_id)
    self.assertEqual(dst_vol.id, db.volume_get(admin_ctxt, dst_vol.id).id)
    self.assertEqual(snap_id,
                     db.volume_get(admin_ctxt, dst_vol.id).snapshot_id)

    # locked
    self.volume.delete_volume(self.context, dst_vol)
    mock_lock.assert_called_with('%s-delete_volume' % dst_vol.id)

    # locked
    self.volume.delete_snapshot(self.context, snapshot_obj)
    mock_lock.assert_called_with('%s-delete_snapshot' % snap_id)

    # locked
    self.volume.delete_volume(self.context, src_vol)
    mock_lock.assert_called_with('%s-delete_volume' % src_vol.id)

    # the mocked driver method must have been used for the clone
    self.assertTrue(mock_lvm_create.called)
@mock.patch.object(coordination.Coordinator, 'get_lock')
def test_create_volume_from_volume_check_locks(self, mock_lock):
    """Check which locks are requested when cloning from a volume.

    ``Coordinator.get_lock`` is mocked. Creating the source volume must
    not request any lock; the clone and the deletions are each followed
    by a check of the most recent ``get_lock`` call against the expected
    ``'<id>-delete_volume'`` lock name.
    """
    # replace utils.execute with a no-op stand-in
    self.mock_object(utils, 'execute', self._fake_execute)

    orig_flow = engine.ActionEngine.run

    def mock_flow_run(*args, **kwargs):
        # ensure the lock has been taken
        # (src_vol_id is bound later in the enclosing test, before this
        # replacement is installed)
        mock_lock.assert_called_with('%s-delete_volume' % src_vol_id)
        # now proceed with the flow.
        ret = orig_flow(*args, **kwargs)
        return ret

    # create source volume
    src_vol = tests_utils.create_volume(self.context, **self.volume_params)
    src_vol_id = src_vol['id']

    # no lock
    self.volume.create_volume(self.context, src_vol)
    self.assertEqual(0, mock_lock.call_count)

    dst_vol = tests_utils.create_volume(self.context,
                                        source_volid=src_vol_id,
                                        **self.volume_params)
    dst_vol_id = dst_vol['id']
    admin_ctxt = context.get_admin_context()

    # mock the flow runner so we can do some checks
    self.mock_object(engine.ActionEngine, 'run', mock_flow_run)

    # locked
    self.volume.create_volume(self.context, dst_vol,
                              request_spec={'source_volid': src_vol_id})
    mock_lock.assert_called_with('%s-delete_volume' % src_vol_id)
    self.assertEqual(dst_vol_id, db.volume_get(admin_ctxt, dst_vol_id).id)
    self.assertEqual(src_vol_id,
                     db.volume_get(admin_ctxt, dst_vol_id).source_volid)

    # locked
    self.volume.delete_volume(self.context, dst_vol)
    mock_lock.assert_called_with('%s-delete_volume' % dst_vol_id)

    # locked
    self.volume.delete_volume(self.context, src_vol)
    mock_lock.assert_called_with('%s-delete_volume' % src_vol_id)
def _raise_metadata_copy_failure(self, method, dst_vol):
    """Make a db metadata-copy call fail and check the volume errors out.

    :param method: name of the ``cinder.db`` function to patch so that
                   it raises ``MetadataCopyFailure``.
    :param dst_vol: volume passed to ``self.volume.create_volume``; it
                    is destroyed in the database before returning.
    """
    # MetadataCopyFailure exception will be raised if DB service is Down
    # while copying the volume glance metadata
    failure = exception.MetadataCopyFailure(
        reason="Because of DB service down.")
    with mock.patch.object(db, method, side_effect=failure):
        self.assertRaises(exception.MetadataCopyFailure,
                          self.volume.create_volume,
                          self.context,
                          dst_vol)

    # The failed create must leave the volume in 'error'.
    stored = db.volume_get(self.context, dst_vol.id)
    self.assertEqual('error', stored['status'])

    # Remove the destination volume again.
    db.volume_destroy(self.context, dst_vol.id)
@mock.patch('cinder.utils.execute')
def test_create_volume_from_volume_with_glance_volume_metadata_none(
        self, mock_execute):
    """Clone a bootable volume that has no glance metadata.

    The clone must end up 'available', and copying glance metadata
    between the two volumes directly must raise
    ``GlanceMetadataNotFound``.
    """
    mock_execute.return_value = None

    # Source volume, marked bootable once created.
    origin = tests_utils.create_volume(self.context, **self.volume_params)
    origin_id = origin['id']
    self.volume.create_volume(self.context, origin)
    db.volume_update(self.context, origin_id, {'bootable': True})

    # Clone of the source volume.
    clone = tests_utils.create_volume(self.context,
                                      source_volid=origin_id,
                                      **self.volume_params)
    self.volume.create_volume(self.context, clone)
    clone_id = clone['id']

    self.assertRaises(exception.GlanceMetadataNotFound,
                      db.volume_glance_metadata_copy_from_volume_to_volume,
                      self.context, origin_id, clone_id)

    # The clone itself must still have been created successfully.
    stored = db.volume_get(self.context, clone_id)
    self.assertEqual('available', stored['status'])

    # Clean up both volumes.
    db.volume_destroy(self.context, origin_id)
    db.volume_destroy(self.context, clone_id)
@mock.patch('cinder.utils.execute')
def test_create_volume_from_volume_raise_metadata_copy_failure(
        self, mock_execute):
    """Clone from a volume while the volume-to-volume metadata copy fails.

    The failure itself is injected and checked by
    ``_raise_metadata_copy_failure``.
    """
    mock_execute.return_value = None

    # Source volume, marked bootable once created.
    origin = tests_utils.create_volume(self.context, **self.volume_params)
    origin_id = origin['id']
    self.volume.create_volume(self.context, origin)
    db.volume_update(self.context, origin_id, {'bootable': True})

    # Destination record for the clone.
    clone = tests_utils.create_volume(self.context,
                                      source_volid=origin_id,
                                      **self.volume_params)
    failing_db_call = 'volume_glance_metadata_copy_from_volume_to_volume'
    self._raise_metadata_copy_failure(failing_db_call, clone)

    # Clean up the source volume.
    db.volume_destroy(self.context, origin_id)
@mock.patch('cinder.utils.execute')
def test_create_volume_from_snapshot_raise_metadata_copy_failure(
        self, mock_execute):
    """Create from a snapshot while the metadata copy to the volume fails.

    The failure itself is injected and checked by
    ``_raise_metadata_copy_failure``.
    """
    mock_execute.return_value = None

    # Source volume, marked bootable once created.
    origin = tests_utils.create_volume(self.context, **self.volume_params)
    origin_id = origin['id']
    self.volume.create_volume(self.context, origin)
    db.volume_update(self.context, origin_id, {'bootable': True})

    # Snapshot of the source volume; it must become available.
    snap_id = create_snapshot(origin_id)['id']
    snap = objects.Snapshot.get_by_id(self.context, snap_id)
    self.volume.create_snapshot(self.context, snap)
    self.assertEqual(fields.SnapshotStatus.AVAILABLE, snap.status)

    # Destination record for the volume built from the snapshot.
    clone = tests_utils.create_volume(self.context,
                                      snapshot_id=snap_id,
                                      **self.volume_params)
    failing_db_call = 'volume_glance_metadata_copy_to_volume'
    self._raise_metadata_copy_failure(failing_db_call, clone)

    # Clean up the snapshot and the source volume.
    snap.destroy()
    db.volume_destroy(self.context, origin_id)
@mock.patch('cinder.utils.execute')
def test_create_volume_from_snapshot_with_glance_volume_metadata_none(
        self, mock_execute):
    """Create from a snapshot of a bootable volume without glance metadata.

    The new volume must end up 'available', and copying glance metadata
    to it directly must raise ``GlanceMetadataNotFound``.
    """
    mock_execute.return_value = None

    # Source volume, marked bootable once created.
    origin = tests_utils.create_volume(self.context, **self.volume_params)
    origin_id = origin['id']
    self.volume.create_volume(self.context, origin)
    db.volume_update(self.context, origin_id, {'bootable': True})
    origin_row = db.volume_get(self.context, origin_id)

    # Snapshot of the source volume; it must become available.
    snap_id = create_snapshot(origin_row['id'])['id']
    snap = objects.Snapshot.get_by_id(self.context, snap_id)
    self.volume.create_snapshot(self.context, snap)
    self.assertEqual(fields.SnapshotStatus.AVAILABLE, snap.status)

    # Volume built from the snapshot.
    clone = tests_utils.create_volume(self.context,
                                      snapshot_id=snap_id,
                                      **self.volume_params)
    self.volume.create_volume(self.context, clone)
    clone_id = clone['id']

    self.assertRaises(exception.GlanceMetadataNotFound,
                      db.volume_glance_metadata_copy_to_volume,
                      self.context, clone_id, snap_id)

    # The new volume itself must still have been created successfully.
    stored = db.volume_get(self.context, clone_id)
    self.assertEqual('available', stored['status'])

    # Clean up the snapshot and both volumes.
    snap.destroy()
    db.volume_destroy(self.context, origin_id)
    db.volume_destroy(self.context, clone_id)
@mock.patch.object(key_manager, 'API', fake_keymgr.fake_api)
def test_create_volume_from_snapshot_with_encryption(self):
    """Test volume can be created from a snapshot of an encrypted volume.

    The volume created from the snapshot must carry an encryption key
    id, and the key stored under it must equal the source volume's key.
    """
    luks_type_id = '61298380-0c12-11e3-bfd6-4b48424183be'
    ctxt = context.get_admin_context()

    # Encrypted volume type used by the source volume.
    db.volume_type_create(ctxt, {'id': luks_type_id, 'name': 'LUKS'})
    db.volume_type_encryption_create(
        ctxt,
        luks_type_id,
        {'control_location': 'front-end',
         'provider': ENCRYPTION_PROVIDER,
         'cipher': 'aes-xts-plain64',
         'key_size': 256})

    api = cinder.volume.api.API()
    luks_type = db.volume_type_get_by_name(context.get_admin_context(),
                                           'LUKS')

    origin = api.create(self.context, 1, 'name', 'description',
                        volume_type=luks_type)
    db.volume_update(self.context, origin['id'],
                     {'host': 'fake_host@fake_backend'})
    origin = objects.Volume.get_by_id(self.context, origin['id'])

    snap = api.create_snapshot_force(self.context, origin,
                                     'name', 'description')
    # status must be available
    snap['status'] = fields.SnapshotStatus.AVAILABLE

    clone = api.create(self.context, 1, 'name', 'description',
                       snapshot=snap)

    stored_id = db.volume_get(context.get_admin_context(),
                              clone['id']).id
    self.assertEqual(clone['id'], stored_id)
    stored_snapshot_id = db.volume_get(context.get_admin_context(),
                                       clone['id']).snapshot_id
    self.assertEqual(snap['id'], stored_snapshot_id)

    # ensure encryption keys match
    self.assertIsNotNone(origin['encryption_key_id'])
    self.assertIsNotNone(clone['encryption_key_id'])

    km = api.key_manager  # must use *same* key manager
    origin_key = km.get(self.context, origin['encryption_key_id'])
    clone_key = km.get(self.context, clone['encryption_key_id'])
    self.assertEqual(origin_key, clone_key)
def test_create_volume_from_encrypted_volume(self):
    """Test volume can be created from an encrypted volume.

    The clone must carry an encryption key id, and the key stored under
    it must equal the source volume's key.
    """
    self.mock_object(key_manager, 'API', fake_keymgr.fake_api)
    luks_type_id = '61298380-0c12-11e3-bfd6-4b48424183be'

    api = cinder.volume.api.API()
    ctxt = context.get_admin_context()

    # Encrypted volume type used by the source volume.
    db.volume_type_create(ctxt, {'id': luks_type_id, 'name': 'LUKS'})
    db.volume_type_encryption_create(
        ctxt,
        luks_type_id,
        {'control_location': 'front-end',
         'provider': ENCRYPTION_PROVIDER,
         'cipher': 'aes-xts-plain64',
         'key_size': 256})

    luks_type = db.volume_type_get_by_name(context.get_admin_context(),
                                           'LUKS')

    origin = api.create(self.context, 1, 'name', 'description',
                        volume_type=luks_type)
    db.volume_update(self.context, origin['id'],
                     {'host': 'fake_host@fake_backend',
                      'status': 'available'})
    origin = objects.Volume.get_by_id(self.context, origin['id'])

    clone = api.create(self.context, 1, 'name', 'description',
                       source_volume=origin)

    stored_id = db.volume_get(context.get_admin_context(),
                              clone['id']).id
    self.assertEqual(clone['id'], stored_id)
    stored_source_id = db.volume_get(context.get_admin_context(),
                                     clone['id']).source_volid
    self.assertEqual(origin['id'], stored_source_id)

    # ensure encryption keys match
    self.assertIsNotNone(origin['encryption_key_id'])
    self.assertIsNotNone(clone['encryption_key_id'])

    shared_km = api.key_manager  # must use *same* key manager
    origin_key = shared_km.get(self.context, origin['encryption_key_id'])
    clone_key = shared_km.get(self.context, clone['encryption_key_id'])
    self.assertEqual(origin_key, clone_key)
def test_delete_invalid_status_fails(self):
    """API delete of a volume with an unknown status raises InvalidVolume."""
    self.volume_params['status'] = 'invalid1234'
    bogus = tests_utils.create_volume(self.context, **self.volume_params)

    api = cinder.volume.api.API()
    self.assertRaises(exception.InvalidVolume,
                      api.delete, self.context, bogus)
def test_create_volume_from_snapshot_fail_bad_size(self):
    """Test volume can't be created from snapshot with bad volume size.

    The fixture snapshot reports ``volume_size`` 10 while a size of 1 is
    requested.
    """
    api = cinder.volume.api.API()

    snap = fake_snapshot.fake_snapshot_obj(
        self.context,
        id=fake.SNAPSHOT_ID,
        status=fields.SnapshotStatus.AVAILABLE,
        volume_size=10)

    request = {'size': 1,
               'name': 'fake_name',
               'description': 'fake_desc',
               'snapshot': snap}
    self.assertRaises(exception.InvalidInput,
                      api.create, self.context, **request)
def test_create_volume_from_snapshot_fail_wrong_az(self):
    """Test volume can't be created from snapshot in a different az.

    Without an explicit zone the new volume lands in the snapshot's
    zone ('az2'); explicitly asking for 'nova' must raise InvalidInput.
    """
    api = cinder.volume.api.API()

    def fake_list_availability_zones(enable_cache=False):
        # Two zones, both reported as available.
        return ({'name': 'nova', 'available': True},
                {'name': 'az2', 'available': True})

    self.mock_object(api,
                     'list_availability_zones',
                     fake_list_availability_zones)

    # Source volume and snapshot living in 'az2'.
    origin = tests_utils.create_volume(self.context,
                                       availability_zone='az2',
                                       **self.volume_params)
    self.volume.create_volume(self.context, origin)
    snap = create_snapshot(origin['id'])
    self.volume.create_snapshot(self.context, snap)

    request = {'size': 1,
               'name': 'fake_name',
               'description': 'fake_desc',
               'snapshot': snap}

    clone = api.create(self.context, **request)
    self.assertEqual('az2', clone['availability_zone'])

    self.assertRaises(exception.InvalidInput,
                      api.create, self.context,
                      availability_zone='nova', **request)
def test_create_volume_with_invalid_exclusive_options(self):
    """Test volume create with multiple exclusive options fails.

    A snapshot, an image and a source volume are all supplied at once.
    """
    api = cinder.volume.api.API()
    conflicting_sources = {'snapshot': fake.SNAPSHOT_ID,
                           'image_id': fake.IMAGE_ID,
                           'source_volume': fake.VOLUME_ID}
    self.assertRaises(exception.InvalidInput,
                      api.create,
                      self.context, 1, 'name', 'description',
                      **conflicting_sources)
def test_reserve_volume_success(self):
    """Reserving an 'available' volume moves it to 'attaching'."""
    vol = tests_utils.create_volume(self.context, status='available')
    api = cinder.volume.api.API()
    api.reserve_volume(self.context, vol)

    stored = db.volume_get(self.context, vol.id)
    self.assertEqual('attaching', stored.status)

    db.volume_destroy(self.context, vol.id)
def test_reserve_volume_in_attaching(self):
self._test_reserve_volume_bad_status('attaching')
def test_reserve_volume_in_maintenance(self):
self._test_reserve_volume_bad_status('maintenance')
def _test_reserve_volume_bad_status(self, status):
    """Check reserve_volume raises InvalidVolume for the given status.

    :param status: status the throw-away volume is created with.
    """
    vol = tests_utils.create_volume(self.context, status=status)
    api = cinder.volume.api.API()
    self.assertRaises(exception.InvalidVolume,
                      api.reserve_volume, self.context, vol)
    db.volume_destroy(self.context, vol.id)
def test_unreserve_volume_success_in_use(self):
    """Unreserving an attached 'attaching' volume leaves it 'in-use'."""
    instance_uuid = six.text_type(uuid.uuid4())
    vol = tests_utils.create_volume(self.context, status='attaching')
    tests_utils.attach_volume(self.context, vol.id, instance_uuid,
                              'attached_host', 'mountpoint', mode='rw')

    api = cinder.volume.api.API()
    api.unreserve_volume(self.context, vol)

    stored = db.volume_get(self.context, vol.id)
    self.assertEqual('in-use', stored.status)
def test_unreserve_volume_success_available(self):
    """Unreserving an unattached 'attaching' volume makes it 'available'."""
    vol = tests_utils.create_volume(self.context, status='attaching')

    api = cinder.volume.api.API()
    api.unreserve_volume(self.context, vol)

    stored = db.volume_get(self.context, vol.id)
    self.assertEqual('available', stored.status)
def test_multi_node(self):
# TODO(termie): Figure out how to test with two nodes,
# each of them having a different FLAG for storage_node
# This will allow us to test cross-node interactions
pass
def test_cannot_delete_volume_in_use(self):
"""Test volume can't be deleted in in-use status."""
self._test_cannot_delete_volume('in-use')
def test_cannot_delete_volume_maintenance(self):
"""Test volume can't be deleted in maintenance status."""
self._test_cannot_delete_volume('maintenance')
def _test_cannot_delete_volume(self, status):
    """Test volume can't be deleted in an invalid status.

    :param status: status the volume is created with; the API delete is
                   expected to raise InvalidVolume for it.
    """
    # Volume assigned to this host, already in the status under test.
    vol = tests_utils.create_volume(self.context, CONF.host,
                                    status=status)

    # The API-level delete must be refused for this status.
    self.assertRaises(exception.InvalidVolume,
                      self.volume_api.delete, self.context, vol)

    # Remove the volume through the manager instead.
    self.volume.delete_volume(self.context, vol)
def test_force_delete_volume(self):
    """Test volume can be forced to delete.

    A plain delete of an 'error_deleting' volume is refused; with
    ``force=True`` it is accepted and the volume becomes 'deleting'.
    """
    self.volume_params['status'] = 'error_deleting'
    vol = tests_utils.create_volume(self.context, **self.volume_params)

    # Without force the delete is rejected.
    self.assertRaises(exception.InvalidVolume,
                      self.volume_api.delete, self.context, vol)

    # With force the delete goes through.
    self.volume_api.delete(self.context, vol, force=True)

    # Re-read the volume to observe the new status.
    vol = objects.Volume.get_by_id(context.get_admin_context(), vol.id)
    self.assertEqual('deleting', vol.status)

    # Remove the volume through the manager.
    self.volume.delete_volume(self.context, vol)
def test_cannot_force_delete_attached_volume(self):
    """Test volume can't be force deleted in attached state."""
    attached = fields.VolumeAttachStatus.ATTACHED
    vol = tests_utils.create_volume(self.context, CONF.host,
                                    status='in-use',
                                    attach_status=attached)

    self.assertRaises(exception.InvalidVolume,
                      self.volume_api.delete,
                      self.context, vol, force=True)

    db.volume_destroy(self.context, vol.id)
def test__revert_to_snapshot_generic_failed(self):
    """Generic revert re-raises a copy failure and drops the temp volume.

    The driver's temp-volume creation and ``delete_volume`` as well as
    the manager's ``_copy_volume_data`` are mocked; the copy raises
    ``VolumeDriverException``.
    """
    vol = tests_utils.create_volume(self.context, status='available')
    snap = tests_utils.create_snapshot(self.context, vol.id)

    with mock.patch.object(
            self.volume.driver,
            '_create_temp_volume_from_snapshot') as create_temp, \
        mock.patch.object(
            self.volume.driver, 'delete_volume') as driver_delete, \
        mock.patch.object(
            self.volume, '_copy_volume_data') as copy_data:
        scratch = tests_utils.create_volume(self.context,
                                            status='available')
        copy_data.side_effect = [exception.VolumeDriverException('error')]
        create_temp.return_value = scratch

        self.assertRaises(exception.VolumeDriverException,
                          self.volume._revert_to_snapshot_generic,
                          self.context, vol, snap)

        # Data was copied from the temp volume, which is then deleted.
        copy_data.assert_called_once_with(self.context, scratch, vol)
        driver_delete.assert_called_once_with(scratch)
def test__revert_to_snapshot_generic(self):
    """Generic revert copies from a temp volume and then deletes it.

    The driver's temp-volume creation and ``delete_volume`` as well as
    the manager's ``_copy_volume_data`` are mocked.
    """
    vol = tests_utils.create_volume(self.context, status='available')
    snap = tests_utils.create_snapshot(self.context, vol.id)

    with mock.patch.object(
            self.volume.driver,
            '_create_temp_volume_from_snapshot') as create_temp, \
        mock.patch.object(
            self.volume.driver, 'delete_volume') as driver_delete, \
        mock.patch.object(
            self.volume, '_copy_volume_data') as copy_data:
        scratch = tests_utils.create_volume(self.context,
                                            status='available')
        create_temp.return_value = scratch

        self.volume._revert_to_snapshot_generic(self.context, vol, snap)

        copy_data.assert_called_once_with(self.context, scratch, vol)
        driver_delete.assert_called_once_with(scratch)
@ddt.data({'driver_error': True},
          {'driver_error': False})
@ddt.unpack
def test__revert_to_snapshot(self, driver_error):
    """Check the driver revert is tried and the generic one is a fallback.

    :param driver_error: when True the mocked driver
                         ``revert_to_snapshot`` raises
                         ``NotImplementedError`` and the generic revert
                         must be called once; otherwise the driver call
                         returns None.
    """
    # All patches are applied through the ``with`` statement; a patcher
    # object that is merely constructed has no effect.
    with mock.patch.object(self.volume.driver,
                           'revert_to_snapshot') as driver_revert, \
        mock.patch.object(self.volume, '_notify_about_volume_usage'), \
        mock.patch.object(self.volume, '_notify_about_snapshot_usage'), \
        mock.patch.object(self.volume,
                          '_revert_to_snapshot_generic') as generic_revert:
        if driver_error:
            driver_revert.side_effect = [NotImplementedError]
        else:
            driver_revert.return_value = None

        self.volume._revert_to_snapshot(self.context, {}, {})

        driver_revert.assert_called_once_with(self.context, {}, {})
        if driver_error:
            generic_revert.assert_called_once_with(self.context, {}, {})
@ddt.data({},
          {'has_snapshot': True},
          {'use_temp_snapshot': True},
          {'use_temp_snapshot': True, 'has_snapshot': True})
@ddt.unpack
def test_revert_to_snapshot(self, has_snapshot=False,
                            use_temp_snapshot=False):
    """Check a successful revert for each temp-snapshot combination.

    :param has_snapshot: whether the mocked ``_create_backup_snapshot``
                         returns a snapshot or None.
    :param use_temp_snapshot: value returned by the mocked driver
                              ``snapshot_revert_use_temp_snapshot``.
    """
    vol = tests_utils.create_volume(self.context,
                                    status='reverting',
                                    project_id='123',
                                    size=2)
    snap = tests_utils.create_snapshot(self.context,
                                       vol['id'],
                                       status='restoring',
                                       volume_size=1)
    backup_snap = {'id': 'fake_snapshot'}

    with mock.patch.object(self.volume,
                           '_revert_to_snapshot') as do_revert, \
        mock.patch.object(self.volume,
                          '_create_backup_snapshot') as make_backup, \
        mock.patch.object(self.volume,
                          'delete_snapshot') as drop_snapshot, \
        mock.patch.object(self.volume.driver,
                          'snapshot_revert_use_temp_snapshot') as \
            wants_temp:
        do_revert.return_value = None
        wants_temp.return_value = use_temp_snapshot
        make_backup.return_value = backup_snap if has_snapshot else None

        self.volume.revert_to_snapshot(self.context, vol, snap)

        do_revert.assert_called_once_with(self.context, vol, snap)

        # A backup snapshot is only taken when the driver asks for one.
        if use_temp_snapshot:
            make_backup.assert_called_once_with(self.context, vol)
        else:
            make_backup.assert_not_called()

        # It is only deleted when it was actually created.
        if use_temp_snapshot and has_snapshot:
            drop_snapshot.assert_called_once_with(
                self.context, backup_snap, handle_quota=False)
        else:
            drop_snapshot.assert_not_called()

        vol.refresh()
        snap.refresh()
        self.assertEqual('available', vol['status'])
        self.assertEqual('available', snap['status'])
        self.assertEqual(2, vol['size'])
def test_revert_to_snapshot_failed(self):
    """A failing revert errors the volume and frees the snapshot.

    The mocked ``_revert_to_snapshot`` raises ``VolumeDriverException``;
    afterwards the volume is 'error', the snapshot 'available', the
    size unchanged, and ``delete_snapshot`` was never called.
    """
    vol = tests_utils.create_volume(self.context,
                                    status='reverting',
                                    project_id='123',
                                    size=2)
    snap = tests_utils.create_snapshot(self.context,
                                       vol['id'],
                                       status='restoring',
                                       volume_size=1)

    with mock.patch.object(self.volume,
                           '_revert_to_snapshot') as do_revert, \
        mock.patch.object(self.volume,
                          '_create_backup_snapshot'), \
        mock.patch.object(self.volume,
                          'delete_snapshot') as drop_snapshot:
        do_revert.side_effect = [exception.VolumeDriverException(
            message='fake_message')]

        self.assertRaises(exception.VolumeDriverException,
                          self.volume.revert_to_snapshot,
                          self.context, vol, snap)

        do_revert.assert_called_once_with(self.context, vol, snap)
        drop_snapshot.assert_not_called()

        vol.refresh()
        snap.refresh()
        self.assertEqual('error', vol['status'])
        self.assertEqual('available', snap['status'])
        self.assertEqual(2, vol['size'])
def test_cannot_delete_volume_with_snapshots(self):
    """Test volume can't be deleted with dependent snapshots."""
    vol = tests_utils.create_volume(self.context, **self.volume_params)
    self.volume.create_volume(self.context, vol)

    snap = create_snapshot(vol['id'], size=vol['size'])
    self.volume.create_snapshot(self.context, snap)
    reloaded = objects.Snapshot.get_by_id(self.context, snap.id)
    self.assertEqual(snap.id, reloaded.id)

    # Give the in-memory volume a deletable status and a host.
    vol['status'] = 'available'
    vol['host'] = 'fakehost'

    api = cinder.volume.api.API()
    self.assertRaises(exception.InvalidVolume,
                      api.delete, self.context, vol)

    # Clean up: snapshot first, then the volume.
    self.volume.delete_snapshot(self.context, snap)
    self.volume.delete_volume(self.context, vol)
def test_can_delete_errored_snapshot(self):
    """Test a snapshot in error status can be deleted through the API."""
    vol = tests_utils.create_volume(self.context, CONF.host)
    snap = create_snapshot(vol.id,
                           size=vol['size'],
                           ctxt=self.context,
                           status=fields.SnapshotStatus.ERROR)

    self.volume_api.delete_snapshot(self.context, snap)
    self.assertEqual(fields.SnapshotStatus.DELETING, snap.status)

    self.volume.delete_volume(self.context, vol)
def test_create_snapshot_set_worker(self):
    """Creating a snapshot calls set_worker once with no arguments."""
    vol = tests_utils.create_volume(self.context)
    snap = create_snapshot(vol.id,
                           size=vol['size'],
                           ctxt=self.context,
                           status=fields.SnapshotStatus.CREATING)

    self.volume.create_snapshot(self.context, snap)

    # NOTE(review): this relies on ``set_worker`` being a mock on the
    # volume object; where it is patched is not visible here - confirm.
    vol.set_worker.assert_called_once_with()
def test_cannot_delete_snapshot_with_bad_status(self):
    """A 'creating' snapshot can't be deleted; an errored one can."""
    vol = tests_utils.create_volume(self.context, CONF.host)
    snap = create_snapshot(vol.id,
                           size=vol['size'],
                           ctxt=self.context,
                           status=fields.SnapshotStatus.CREATING)

    # Deleting while still 'creating' is refused.
    self.assertRaises(exception.InvalidSnapshot,
                      self.volume_api.delete_snapshot,
                      self.context, snap)

    # Once persisted as 'error' the delete is accepted.
    snap.status = fields.SnapshotStatus.ERROR
    snap.save()
    self.volume_api.delete_snapshot(self.context, snap)
    self.assertEqual(fields.SnapshotStatus.DELETING, snap.status)

    self.volume.delete_volume(self.context, vol)
@mock.patch.object(QUOTAS, "rollback")
@mock.patch.object(QUOTAS, "commit")
@mock.patch.object(QUOTAS, "reserve", return_value=["RESERVATION"])
def _do_test_create_volume_with_size(self, size, *_unused_quota_mocks):
    """Create a volume of ``size`` and check the stored size.

    The quota ``reserve``/``commit``/``rollback`` calls are mocked.

    :param size: size passed to the API; the created volume's size must
                 equal ``int(size)``.
    """
    api = cinder.volume.api.API()
    created = api.create(self.context, size, 'name', 'description')
    expected_size = int(size)
    self.assertEqual(expected_size, created['size'])
def test_create_volume_int_size(self):
"""Test volume creation with int size."""
self._do_test_create_volume_with_size(2)
def test_create_volume_string_size(self):
"""Test volume creation with string size."""
self._do_test_create_volume_with_size('2')
@mock.patch.object(QUOTAS, "rollback")
@mock.patch.object(QUOTAS, "commit")
@mock.patch.object(QUOTAS, "reserve", return_value=["RESERVATION"])
def test_create_volume_with_bad_size(self, *_unused_quota_mocks):
    """Test volume creation with a non-numeric size string fails."""
    api = cinder.volume.api.API()
    bad_size = '2Gb'
    self.assertRaises(exception.InvalidInput,
                      api.create,
                      self.context, bad_size, 'name', 'description')
def test_create_volume_with_float_fails(self):
    """Test volume creation with invalid float size."""
    api = cinder.volume.api.API()
    float_size = '1.5'
    self.assertRaises(exception.InvalidInput,
                      api.create,
                      self.context, float_size, 'name', 'description')
def test_create_volume_with_zero_size_fails(self):
    """Test volume creation with a zero size fails."""
    api = cinder.volume.api.API()
    zero_size = '0'
    self.assertRaises(exception.InvalidInput,
                      api.create,
                      self.context, zero_size, 'name', 'description')
def test_begin_detaching_fails_available(self):
    """begin_detaching is refused unless the volume is attached in-use.

    Checked in order: 'available', 'in-use' but detached (both refused),
    'in-use' and attached (accepted), then 'maintenance' (refused).
    """
    api = cinder.volume.api.API()
    vol = tests_utils.create_volume(self.context, status='available')

    def assert_refused():
        self.assertRaises(exception.InvalidVolume, api.begin_detaching,
                          self.context, vol)

    # Volume status is 'available'.
    assert_refused()

    # Should raise an error since not attached
    db.volume_update(self.context, vol.id,
                     {'status': 'in-use',
                      'attach_status':
                          fields.VolumeAttachStatus.DETACHED})
    assert_refused()

    # Ensure when attached no exception raised
    db.volume_update(self.context, vol.id,
                     {'attach_status':
                          fields.VolumeAttachStatus.ATTACHED})
    api.begin_detaching(self.context, vol)

    # A volume under maintenance is refused as well.
    api.update(self.context, vol, {'status': 'maintenance'})
    assert_refused()

    db.volume_destroy(self.context, vol.id)
def test_begin_roll_detaching_volume(self):
    """begin_detaching sets 'detaching'; roll_detaching restores 'in-use'."""
    server_uuid = '12345678-1234-5678-1234-567812345678'
    vol = tests_utils.create_volume(self.context, **self.volume_params)
    attach_values = {'volume_id': vol['id'], 'attached_host': 'fake-host'}
    attach_ref = db.volume_attach(self.context, attach_values)
    db.volume_attached(self.context, attach_ref['id'], server_uuid,
                       'fake-host', 'vdb')
    api = cinder.volume.api.API()

    api.begin_detaching(self.context, vol)
    vol = api.get(self.context, vol['id'])
    self.assertEqual("detaching", vol['status'])

    api.roll_detaching(self.context, vol)
    vol = api.get(self.context, vol['id'])
    self.assertEqual("in-use", vol['status'])
def test_volume_api_update(self):
    """A display_name change made through the volume API reaches the DB."""
    new_name = 'test update name'
    vol = tests_utils.create_volume(self.context, **self.volume_params)
    # Rename through the API layer rather than writing to the DB directly.
    api = cinder.volume.api.API()
    api.update(self.context, vol, {'display_name': new_name})
    # Read the row back with an admin context to see what was stored.
    stored = db.volume_get(context.get_admin_context(), vol['id'])
    self.assertEqual(new_name, stored['display_name'])
def test_volume_api_update_maintenance(self):
    """API update raises InvalidVolume for a volume in 'maintenance'."""
    vol = tests_utils.create_volume(self.context, **self.volume_params)
    # The status is set on the volume object handed to the API.
    vol['status'] = 'maintenance'
    api = cinder.volume.api.API()
    changes = {'display_name': 'test update name'}
    self.assertRaises(exception.InvalidVolume, api.update,
                      self.context, vol, changes)
def test_volume_api_get_list_volumes_image_metadata(self):
    """Image metadata is returned as one key/value dict per volume id."""
    admin_ctxt = context.get_admin_context()
    # (volume id, ordered glance metadata pairs) for each fixture volume.
    fixtures = (
        ('fake1', (('key1', 'value1'), ('key2', 'value2'))),
        ('fake2', (('key3', 'value3'), ('key4', 'value4'))),
    )
    for vol_id, pairs in fixtures:
        db.volume_create(admin_ctxt, {'id': vol_id, 'status': 'available',
                                      'host': 'test',
                                      'provider_location': '',
                                      'size': 1})
        for meta_key, meta_value in pairs:
            db.volume_glance_metadata_create(admin_ctxt, vol_id,
                                             meta_key, meta_value)

    api = cinder.volume.api.API()
    requested_ids = [entry[0] for entry in fixtures]
    results = api.get_list_volumes_image_metadata(admin_ctxt, requested_ids)

    expected = {vol_id: dict(pairs) for vol_id, pairs in fixtures}
    self.assertEqual(expected, results)
@mock.patch.object(QUOTAS, 'limit_check')
@mock.patch.object(QUOTAS, 'reserve')
def test_extend_attached_volume(self, reserve, limit_check):
    """_extend(attached=True) needs 'in-use' and calls the scheduler."""
    vol = tests_utils.create_volume(self.context, size=2,
                                    status='available', host=CONF.host)
    api = cinder.volume.api.API()
    new_size = 3

    # An 'available' volume is rejected when attached=True is requested.
    self.assertRaises(exception.InvalidVolume, api._extend,
                      self.context, vol, new_size, attached=True)

    # With the volume 'in-use' the request is accepted.
    db.volume_update(self.context, vol.id, {'status': 'in-use'})
    vol.refresh()
    reserve.return_value = ["RESERVATION"]
    api._extend(self.context, vol, new_size, attached=True)
    vol.refresh()
    self.assertEqual('extending', vol.status)
    self.assertEqual('in-use', vol.previous_status)
    reserve.assert_called_once_with(self.context, gigabytes=1,
                                    project_id=vol.project_id)

    # Repeat with the scheduler RPC API mocked to inspect the request.
    limit_check.side_effect = None
    reserve.side_effect = None
    db.volume_update(self.context, vol.id, {'status': 'in-use'})
    api.scheduler_rpcapi = mock.MagicMock()
    api.scheduler_rpcapi.extend_volume = mock.MagicMock()
    api._extend(self.context, vol, new_size, attached=True)
    expected_spec = {
        'volume_properties': vol,
        'volume_type': {},
        'volume_id': vol.id,
    }
    api.scheduler_rpcapi.extend_volume.assert_called_once_with(
        self.context, vol, new_size, ["RESERVATION"], expected_spec)

    # clean up
    self.volume.delete_volume(self.context, vol)
@mock.patch.object(QUOTAS, 'limit_check')
@mock.patch.object(QUOTAS, 'reserve')
def test_extend_volume(self, reserve, limit_check):
    """Test volume can be extended at API level."""
    def reset_to_available():
        # Only the DB row is reset; the in-memory object is left alone.
        db.volume_update(self.context, vol.id, {'status': 'available'})

    vol = tests_utils.create_volume(self.context, size=2,
                                    status='in-use', host=CONF.host)
    api = cinder.volume.api.API()

    # An 'in-use' volume is refused by a plain extend request.
    self.assertRaises(exception.InvalidVolume, api._extend,
                      self.context, vol, 3)

    reset_to_available()
    vol.refresh()

    # Shrinking (1) and keeping the current size (2) are both invalid.
    for rejected_size in (1, 2):
        self.assertRaises(exception.InvalidInput, api._extend,
                          self.context, vol, rejected_size)

    # Growing the volume is accepted and reserves the 1 GB difference.
    reserve.return_value = ["RESERVATION"]
    api._extend(self.context, vol, 3)
    vol.refresh()
    self.assertEqual('extending', vol.status)
    self.assertEqual('available', vol.previous_status)
    reserve.assert_called_once_with(self.context, gigabytes=1,
                                    project_id=vol.project_id)

    # OverQuota from reserve -> VolumeSizeExceedsAvailableQuota.
    reset_to_available()
    reserve.side_effect = exception.OverQuota(
        overs=['gigabytes'],
        quotas={'gigabytes': 20},
        usages={'gigabytes': {'reserved': 5, 'in_use': 15}})
    self.assertRaises(exception.VolumeSizeExceedsAvailableQuota,
                      api._extend, self.context, vol, 3)

    # OverQuota from limit_check -> VolumeSizeExceedsLimit.
    reset_to_available()
    limit_check.side_effect = exception.OverQuota(
        overs=['per_volume_gigabytes'], quotas={'per_volume_gigabytes': 2})
    self.assertRaises(exception.VolumeSizeExceedsLimit,
                      api._extend, self.context, vol, 3)

    # Test scheduler path
    limit_check.side_effect = None
    reserve.side_effect = None
    reset_to_available()
    api.scheduler_rpcapi = mock.MagicMock()
    api.scheduler_rpcapi.extend_volume = mock.MagicMock()
    api._extend(self.context, vol, 3)
    expected_spec = {
        'volume_properties': vol,
        'volume_type': {},
        'volume_id': vol.id,
    }
    api.scheduler_rpcapi.extend_volume.assert_called_once_with(
        self.context, vol, 3, ["RESERVATION"], expected_spec)

    # clean up
    self.volume.delete_volume(self.context, vol)
def test_extend_volume_driver_not_initialized(self):
    """Manager extend raises DriverNotInitialized and flags the volume."""
    reservations = ['RESERVATION']
    vol = tests_utils.create_volume(self.context, size=2,
                                    status='available', host=CONF.host)
    self.volume.create_volume(self.context, vol)

    # Mark the driver as uninitialized before asking for the extend.
    self.volume.driver._initialized = False
    self.assertRaises(exception.DriverNotInitialized,
                      self.volume.extend_volume,
                      self.context, vol, 3, reservations)
    vol.refresh()
    self.assertEqual('error_extending', vol.status)

    # Undo the driver flag change and remove the volume.
    self.volume.driver._initialized = True
    self.volume.delete_volume(self.context, vol)
def _test_extend_volume_manager_fails_with_exception(self, volume):
    """Check a driver failure keeps size 2 and sets 'error_extending'.

    :param volume: volume object on which the failing extend is run
    """
    reservations = ['RESERVATION']
    driver_patch = mock.patch.object(self.volume.driver, 'extend_volume')
    with driver_patch as driver_extend:
        # Make the driver call blow up with a generic Cinder error.
        driver_extend.side_effect = exception.CinderException(
            'fake exception')
        volume['status'] = 'extending'
        self.volume.extend_volume(self.context, volume, '4', reservations)
        volume.refresh()
        self.assertEqual(2, volume.size)
        self.assertEqual('error_extending', volume.status)
@mock.patch('cinder.compute.API')
def _test_extend_volume_manager_successful(self, volume, nova_api):
    """Check a successful manager-level extend to size 4.

    :param volume: volume object to extend
    :param nova_api: mock injected by the ``cinder.compute.API`` patch
    """
    def fake_extend(volume, new_size):
        volume['size'] = new_size

    nova_extend = nova_api.return_value.extend_volume
    reservations = ['RESERVATION']
    status_before = volume.status

    with mock.patch.object(self.volume.driver,
                           'extend_volume') as driver_extend, \
            mock.patch.object(QUOTAS, 'commit') as commit:
        # NOTE(review): fake_extend is installed as the mock's
        # return_value, not its side_effect, so the mock hands the
        # function back instead of running it.
        driver_extend.return_value = fake_extend
        volume.status = 'extending'
        self.volume.extend_volume(self.context, volume, '4', reservations)
        volume.refresh()
        self.assertEqual(4, volume.size)
        self.assertEqual(status_before, volume.status)
        commit.assert_called_with(self.context, ['RESERVATION'],
                                  project_id=volume.project_id)
        if status_before == 'in-use':
            # Attached volumes must also notify nova about the new size.
            attached_instances = [
                attach.instance_uuid
                for attach in volume.volume_attachment]
            nova_extend.assert_called_with(
                self.context, attached_instances, volume.id)
def test_extend_volume_manager_available_fails_with_exception(self):
    """Run the failing-extend scenario on a volume with no attachment."""
    create_kwargs = dict(size=2, status='creating', host=CONF.host)
    vol = tests_utils.create_volume(self.context, **create_kwargs)
    self.volume.create_volume(self.context, vol)
    self._test_extend_volume_manager_fails_with_exception(vol)
    self.volume.delete_volume(self.context, vol)
def test_extend_volume_manager_available_successful(self):
    """Run the successful-extend scenario on a volume with no attachment."""
    create_kwargs = dict(size=2, status='creating', host=CONF.host)
    vol = tests_utils.create_volume(self.context, **create_kwargs)
    self.volume.create_volume(self.context, vol)
    self._test_extend_volume_manager_successful(vol)
    self.volume.delete_volume(self.context, vol)
def test_extend_volume_manager_in_use_fails_with_exception(self):
    """Run the failing-extend scenario on an attached volume."""
    vol = tests_utils.create_volume(self.context, size=2,
                                    status='creating', host=CONF.host)
    self.volume.create_volume(self.context, vol)

    # Record an attachment to a fake instance before extending.
    server_uuid = '12345678-1234-5678-1234-567812345678'
    attach_values = {'volume_id': vol.id, 'attached_host': 'fake-host'}
    attach_ref = db.volume_attach(self.context, attach_values)
    db.volume_attached(self.context, attach_ref.id, server_uuid,
                       'fake-host', 'vdb')
    vol.refresh()

    self._test_extend_volume_manager_fails_with_exception(vol)

    self.volume.detach_volume(self.context, vol.id, attach_ref.id)
    self.volume.delete_volume(self.context, vol)
def test_extend_volume_manager_in_use_successful(self):
    """Run the successful-extend scenario on an attached volume."""
    vol = tests_utils.create_volume(self.context, size=2,
                                    status='creating', host=CONF.host)
    self.volume.create_volume(self.context, vol)

    # Record an attachment to a fake instance before extending.
    server_uuid = '12345678-1234-5678-1234-567812345678'
    attach_values = {'volume_id': vol.id, 'attached_host': 'fake-host'}
    attach_ref = db.volume_attach(self.context, attach_values)
    db.volume_attached(self.context, attach_ref.id, server_uuid,
                       'fake-host', 'vdb')
    vol.refresh()

    self._test_extend_volume_manager_successful(vol)

    self.volume.detach_volume(self.context, vol.id, attach_ref.id)
    self.volume.delete_volume(self.context, vol)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.extend_volume')
def test_extend_volume_with_volume_type(self, mock_rpc_extend):
    """Check per-type gigabyte quota usage when extending a typed volume.

    A 100 GB volume of type 'type' is created and then extended to 200 GB
    through the API. The 'gigabytes_type' quota usage is expected to show
    100 in use after creation and 100 reserved after the extend request.
    """
    elevated = context.get_admin_context()
    project_id = self.context.project_id
    db.volume_type_create(elevated, {'name': 'type', 'extra_specs': {}})
    vol_type = db.volume_type_get_by_name(elevated, 'type')
    volume_api = cinder.volume.api.API()
    volume = volume_api.create(self.context, 100, 'name', 'description',
                               volume_type=vol_type)
    # A missing usage row is treated as zero so the assertion below fails
    # with a clear value mismatch instead of QuotaUsageNotFound.
    try:
        usage = db.quota_usage_get(elevated, project_id, 'gigabytes_type')
        volumes_in_use = usage.in_use
    except exception.QuotaUsageNotFound:
        volumes_in_use = 0
    self.assertEqual(100, volumes_in_use)
    db.volume_update(self.context, volume.id, {'status': 'available'})
    volume_api._extend(self.context, volume, 200)
    # NOTE(review): ``called_once_with`` is not a mock assertion method; on
    # a Mock it is only an auto-created attribute, so this line verifies
    # nothing. It was probably meant to be ``assert_called_once_with`` --
    # TODO confirm which RPC call is expected before making it a real check.
    mock_rpc_extend.called_once_with(self.context, volume, 200, mock.ANY)
    # Same fallback as above, this time for the reserved amount.
    try:
        usage = db.quota_usage_get(elevated, project_id, 'gigabytes_type')
        volumes_reserved = usage.reserved
    except exception.QuotaUsageNotFound:
        volumes_reserved = 0
    self.assertEqual(100, volumes_reserved)
def test_create_volume_from_sourcevol(self):
    """A volume cloned from a source volume ends up 'available'."""
    def noop_clone(volume, src_vref):
        pass

    # Replace the driver clone operation with a do-nothing stand-in.
    self.mock_object(self.volume.driver, 'create_cloned_volume', noop_clone)

    src = tests_utils.create_volume(self.context, **self.volume_params)
    self.volume.create_volume(self.context, src)

    clone = tests_utils.create_volume(self.context,
                                      source_volid=src['id'],
                                      **self.volume_params)
    self.volume.create_volume(self.context, clone)
    clone.refresh()
    self.assertEqual('available', clone.status)

    # Remove the clone first, then its source.
    for vol in (clone, src):
        self.volume.delete_volume(self.context, vol)
@mock.patch('cinder.volume.api.API.list_availability_zones',
            return_value=({'name': 'nova', 'available': True},
                          {'name': 'az2', 'available': True}))
def test_create_volume_from_sourcevol_fail_wrong_az(self, _mock_laz):
    """Cloning into an AZ other than the source volume's is rejected."""
    api = cinder.volume.api.API()
    src = tests_utils.create_volume(self.context,
                                    availability_zone='az2',
                                    **self.volume_params)
    self.volume.create_volume(self.context, src)
    src = db.volume_get(self.context, src['id'])

    # Keyword arguments shared by both create calls below.
    clone_kwargs = dict(size=1, name='fake_name',
                        description='fake_desc', source_volume=src)

    # Without an explicit AZ the clone takes the source volume's zone.
    clone = api.create(self.context, **clone_kwargs)
    self.assertEqual('az2', clone['availability_zone'])

    # Asking for a different zone than the source's must fail.
    self.assertRaises(exception.InvalidInput, api.create, self.context,
                      availability_zone='nova', **clone_kwargs)
@mock.patch('cinder.image.image_utils.qemu_img_info')
def test_create_volume_from_sourcevol_with_glance_metadata(
        self, mock_qemu_info):
    """Glance metadata keys shared by source and clone carry equal values."""
    def noop_clone(volume, src_vref):
        pass

    def load(volume_id):
        # Always read with a fresh admin context, as each lookup needs one.
        return db.volume_get(context.get_admin_context(), volume_id)

    self.mock_object(self.volume.driver, 'create_cloned_volume', noop_clone)
    qemu_info = imageutils.QemuImgInfo()
    qemu_info.virtual_size = '1073741824'
    mock_qemu_info.return_value = qemu_info

    src = self._create_volume_from_image()
    self.volume.create_volume(self.context, src)
    dst = tests_utils.create_volume(self.context,
                                    source_volid=src['id'],
                                    **self.volume_params)
    self.volume.create_volume(self.context, dst)
    self.assertEqual('available', load(dst['id']).status)

    src_meta = load(src['id']).volume_glance_metadata
    dst_meta = load(dst['id']).volume_glance_metadata
    # Compare values only for keys present on both sides.
    for src_item in src_meta:
        for dst_item in dst_meta:
            if dst_item.key != src_item.key:
                continue
            self.assertEqual(src_item.value, dst_item.value)

    self.volume.delete_volume(self.context, src)
    self.volume.delete_volume(self.context, dst)
def test_create_volume_from_sourcevol_failed_clone(self):
    """A failed clone leaves the source 'available' and the clone 'error'."""
    def failing_clone(volume, src_vref):
        # Flip the source row to 'error' in the DB, then fail the clone.
        db.volume_update(self.context, src_vref['id'], {'status': 'error'})
        raise exception.CinderException('fake exception')

    self.mock_object(self.volume.driver, 'create_cloned_volume',
                     failing_clone)

    src = tests_utils.create_volume(self.context, **self.volume_params)
    self.assertEqual('creating', src.status)
    self.volume.create_volume(self.context, src)
    self.assertEqual('available', src.status)

    dst = tests_utils.create_volume(self.context,
                                    source_volid=src['id'],
                                    **self.volume_params)
    self.assertEqual('creating', dst.status)
    self.assertRaises(exception.CinderException,
                      self.volume.create_volume, self.context, dst)

    # Source volume's status is still available and dst is set to error
    self.assertEqual('available', src.status)
    self.assertEqual('error', dst.status)

    for vol in (dst, src):
        self.volume.delete_volume(self.context, vol)
def test_clean_temporary_volume(self):
    """Exercise _clean_temporary_volume in three scenarios."""
    def fake_delete_volume(ctxt, volume):
        volume.destroy()

    def make_temp_volume():
        return tests_utils.create_volume(self.context, size=1,
                                         host=CONF.host)

    migrating = tests_utils.create_volume(self.context, size=1,
                                          host=CONF.host,
                                          migration_status='migrating')

    # 1. Only clean the db
    temp = make_temp_volume()
    self.volume._clean_temporary_volume(self.context, migrating, temp,
                                        clean_db_only=True)
    self.assertRaises(exception.VolumeNotFound,
                      db.volume_get, self.context, temp.id)

    # 2. Delete the backend storage
    temp = make_temp_volume()
    rpc_delete = mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume')
    with rpc_delete as mock_delete_volume:
        mock_delete_volume.side_effect = fake_delete_volume
        self.volume._clean_temporary_volume(self.context, migrating, temp,
                                            clean_db_only=False)
        self.assertRaises(exception.VolumeNotFound,
                          db.volume_get, self.context, temp.id)

    # 3. The migrated volume is not in migration: the new volume is kept
    #    and its migration_status is expected to be None.
    temp = make_temp_volume()
    migrating.migration_status = 'non-migrating'
    migrating.save()
    self.volume._clean_temporary_volume(self.context, migrating, temp)
    stored = db.volume_get(context.get_admin_context(), temp.id)
    self.assertIsNone(stored.migration_status)
def test_check_volume_filters_true(self):
    """The 'bootable' filter value 'TRUE' is converted to a truthy value."""
    api = cinder.volume.api.API()
    search_filters = {'bootable': 'TRUE'}
    # The dict is inspected afterwards, so the call must update it in place.
    api.check_volume_filters(search_filters)
    self.assertTrue(search_filters['bootable'])
def test_check_volume_filters_false(self):
    """The 'bootable' filter value 'false' is converted to False."""
    api = cinder.volume.api.API()
    search_filters = {'bootable': 'false'}
    # The dict is inspected afterwards, so the call must update it in place.
    api.check_volume_filters(search_filters)
    self.assertEqual(False, search_filters['bootable'])
def test_check_volume_filters_invalid(self):
    """An unrecognized 'bootable' filter value ends up truthy."""
    api = cinder.volume.api.API()
    search_filters = {'bootable': 'invalid'}
    # The dict is inspected afterwards, so the call must update it in place.
    api.check_volume_filters(search_filters)
    self.assertTrue(search_filters['bootable'])
def test_update_volume_readonly_flag(self):
    """Test volume readonly flag can be updated at API level.

    The flag may only be changed while the volume is 'available'; the
    new value is stored as the string 'False' in the single 'readonly'
    admin metadata entry.
    """
    # create a volume and assign to host
    volume = tests_utils.create_volume(self.context,
                                       admin_metadata={'readonly': 'True'},
                                       **self.volume_params)
    self.volume.create_volume(self.context, volume)
    volume.status = 'in-use'

    volume_api = cinder.volume.api.API()

    # Update fails when status != available
    self.assertRaises(exception.InvalidVolume,
                      volume_api.update_readonly_flag,
                      self.context,
                      volume,
                      False)

    volume.status = 'available'

    # works when volume in 'available' status
    volume_api.update_readonly_flag(self.context, volume, False)

    volume.refresh()
    self.assertEqual('available', volume.status)
    admin_metadata = volume.volume_admin_metadata
    self.assertEqual(1, len(admin_metadata))
    self.assertEqual('readonly', admin_metadata[0]['key'])
    self.assertEqual('False', admin_metadata[0]['value'])

    # clean up
    self.volume.delete_volume(self.context, volume)
def test_secure_file_operations_enabled(self):
    """Test secure file operations setting for base driver.

    Drivers that are not based on a network file system have no use for
    "secure_file_operations", so the driver method is expected to report
    False here.
    """
    self.assertFalse(self.volume.driver.secure_file_operations_enabled())
@mock.patch.object(driver.BaseVD, 'secure_file_operations_enabled')
def test_secure_file_operations_enabled_2(self, mock_secure):
    """The manager calls the driver method once and returns its True."""
    mock_secure.return_value = True
    volume_ref = tests_utils.create_volume(self.context)

    enabled = self.volume.secure_file_operations_enabled(self.context,
                                                         volume_ref)

    mock_secure.assert_called_once_with()
    self.assertTrue(enabled)
@mock.patch('cinder.volume.flows.common.make_pretty_name',
            new=mock.MagicMock())
@mock.patch('cinder.scheduler.rpcapi.SchedulerAPI.create_volume',
            return_value=None)
@mock.patch('cinder.volume.flows.manager.create_volume.'
            'CreateVolumeFromSpecTask.execute',
            side_effect=exception.DriverNotInitialized())
def test_create_volume_raise_rescheduled_exception(self, mock_execute,
                                                   mock_reschedule):
    """DriverNotInitialized propagates and the volume stays 'creating'.

    The scheduler ``create_volume`` mock is also expected to have been
    called.
    """
    vol = tests_utils.create_volume(self.context, **self.volume_params)
    vol_id = vol['id']
    create_spec = {'volume_properties': self.volume_params}
    retry_info = {'retry': {'num_attempts': 1, 'host': []}}

    self.assertRaises(exception.DriverNotInitialized,
                      self.volume.create_volume,
                      self.context, vol, create_spec, retry_info)

    self.assertTrue(mock_reschedule.called)
    stored = db.volume_get(context.get_admin_context(), vol_id)
    self.assertEqual('creating', stored['status'])
@mock.patch('cinder.volume.flows.manager.create_volume.'
            'CreateVolumeFromSpecTask.execute')
def test_create_volume_raise_unrescheduled_exception(self, mock_execute):
    """VolumeNotFound propagates and the volume is left in 'error'."""
    vol = tests_utils.create_volume(self.context, **self.volume_params)
    vol_id = vol['id']
    # The side effect needs the volume id, so it is set here rather than
    # in the decorator.
    mock_execute.side_effect = exception.VolumeNotFound(volume_id=vol_id)
    create_spec = {'volume_properties': self.volume_params}
    retry_info = {'retry': {'num_attempts': 1, 'host': []}}

    self.assertRaises(exception.VolumeNotFound,
                      self.volume.create_volume,
                      self.context, vol, create_spec, retry_info)

    stored = db.volume_get(context.get_admin_context(), vol_id)
    self.assertEqual('error', stored['status'])
def test_cascade_delete_volume_with_snapshots(self):
    """Cascade delete of a volume with a snapshot does not raise."""
    vol = tests_utils.create_volume(self.context, **self.volume_params)
    self.volume.create_volume(self.context, vol)
    snap = create_snapshot(vol['id'], size=vol['size'])
    self.volume.create_snapshot(self.context, snap)
    # Sanity check: the snapshot can be looked up before the delete.
    found = objects.Snapshot.get_by_id(self.context, snap.id)
    self.assertEqual(snap.id, found.id)

    vol['status'] = 'available'
    vol['host'] = 'fakehost'

    api = cinder.volume.api.API()
    api.delete(self.context, vol, cascade=True)
def test_cascade_delete_volume_with_snapshots_error(self):
    """Cascade delete is refused while a snapshot is still 'creating'."""
    vol = tests_utils.create_volume(self.context, **self.volume_params)
    self.volume.create_volume(self.context, vol)
    snap = create_snapshot(vol['id'], size=vol['size'])
    self.volume.create_snapshot(self.context, snap)
    # Sanity check: the snapshot can be looked up before the delete.
    found = objects.Snapshot.get_by_id(self.context, snap.id)
    self.assertEqual(snap.id, found.id)

    # Put the snapshot back into CREATING before attempting the delete.
    snap.update({'status': fields.SnapshotStatus.CREATING})
    snap.save()

    vol['status'] = 'available'
    vol['host'] = 'fakehost'

    api = cinder.volume.api.API()
    self.assertRaises(exception.InvalidVolume, api.delete,
                      self.context, vol, cascade=True)
def test_cascade_force_delete_volume_with_snapshots_error(self):
    """Forced cascade delete marks volume and snapshot as 'deleting'."""
    vol = tests_utils.create_volume(self.context, host='fakehost')
    snap = create_snapshot(vol.id, size=vol.size,
                           status=fields.SnapshotStatus.ERROR_DELETING)
    self.volume.create_snapshot(self.context, snap)

    api = cinder.volume.api.API()
    api.delete(self.context, vol, cascade=True, force=True)

    # Reload both objects to see the statuses written by the delete call.
    snap = objects.Snapshot.get_by_id(self.context, snap.id)
    self.assertEqual('deleting', snap.status)
    vol = objects.Volume.get_by_id(self.context, vol.id)
    self.assertEqual('deleting', vol.status)
def test_cascade_delete_volume_with_snapshots_in_other_project(self):
    """Cascade delete is refused when a snapshot is in another project."""
    vol = tests_utils.create_volume(self.user_context,
                                    **self.volume_params)
    # The snapshot is created under a different project id.
    snap = create_snapshot(vol['id'], size=vol['size'],
                           project_id=fake.PROJECT2_ID)
    self.volume.create_snapshot(self.context, snap)
    found = objects.Snapshot.get_by_id(self.context, snap.id)
    self.assertEqual(snap.id, found.id)

    vol['status'] = 'available'
    vol['host'] = 'fakehost'

    api = cinder.volume.api.API()
    self.assertRaises(exception.InvalidVolume, api.delete,
                      self.user_context, vol, cascade=True)
@mock.patch.object(driver.BaseVD, 'get_backup_device')
@mock.patch.object(driver.BaseVD, 'secure_file_operations_enabled')
def test_get_backup_device(self, mock_secure, mock_get_backup):
    """get_backup_device returns a dict built from the driver results."""
    vol = tests_utils.create_volume(self.context)
    backup = tests_utils.create_backup(self.context, vol['id'])
    mock_secure.return_value = False
    mock_get_backup.return_value = (vol, False)

    actual = self.volume.get_backup_device(self.context, backup)

    mock_get_backup.assert_called_once_with(self.context, backup)
    mock_secure.assert_called_once_with()
    expected = {
        'backup_device': vol,
        'secure_enabled': False,
        'is_snapshot': False,
    }
    self.assertEqual(expected, actual)
@mock.patch.object(driver.BaseVD, 'get_backup_device')
@mock.patch.object(driver.BaseVD, 'secure_file_operations_enabled')
def test_get_backup_device_want_objects(self, mock_secure,
                                        mock_get_backup):
    """With want_objects=True the result equals a BackupDeviceInfo."""
    vol = tests_utils.create_volume(self.context)
    backup = tests_utils.create_backup(self.context, vol['id'])
    mock_secure.return_value = False
    mock_get_backup.return_value = (vol, False)

    actual = self.volume.get_backup_device(self.context, backup,
                                           want_objects=True)

    mock_get_backup.assert_called_once_with(self.context, backup)
    mock_secure.assert_called_once_with()
    primitive = {
        'backup_device': vol,
        'secure_enabled': False,
        'is_snapshot': False,
    }
    expected = objects.BackupDeviceInfo.from_primitive(primitive,
                                                       self.context)
    self.assertEqual(expected, actual)
@mock.patch('cinder.tests.fake_driver.FakeLoggingVolumeDriver.'
            'SUPPORTS_ACTIVE_ACTIVE', True)
def test_set_resource_host_different(self):
    """_set_resource_host rewrites the volume host to the manager's."""
    mgr = vol_manager.VolumeManager(host='localhost-1@ceph',
                                    cluster='mycluster@ceph')
    # Same cluster, but the volume currently sits on another host.
    vol = tests_utils.create_volume(self.user_context,
                                    host='localhost-2@ceph#ceph',
                                    cluster_name='mycluster@ceph')
    mgr._set_resource_host(vol)
    vol.refresh()
    self.assertEqual('localhost-1@ceph#ceph', vol.host)
@mock.patch('cinder.tests.fake_driver.FakeLoggingVolumeDriver.'
            'SUPPORTS_ACTIVE_ACTIVE', True)
def test_set_resource_host_equal(self):
    """_set_resource_host does not save when the host already matches."""
    mgr = vol_manager.VolumeManager(host='localhost-1@ceph',
                                    cluster='mycluster@ceph')
    vol = tests_utils.create_volume(self.user_context,
                                    host='localhost-1@ceph#ceph',
                                    cluster_name='mycluster@ceph')
    save_patch = mock.patch.object(vol, 'save')
    with save_patch as save_mock:
        mgr._set_resource_host(vol)
        save_mock.assert_not_called()
def test_volume_attach_attaching(self):
    """volume_attached(mark_attached=False) leaves the volume attaching."""
    server_uuid = '12345678-1234-5678-1234-567812345678'
    vol = tests_utils.create_volume(self.context, **self.volume_params)
    attach_values = {'volume_id': vol['id'], 'attached_host': 'fake-host'}
    attach_ref = db.volume_attach(self.context, attach_values)
    db.volume_attached(self.context, attach_ref['id'], server_uuid,
                       'fake-host', 'vdb', mark_attached=False)

    api = cinder.volume.api.API()
    vol = api.get(self.context, vol['id'])
    # Both the volume status and its attach status must read 'attaching'.
    self.assertEqual("attaching", vol['status'])
    self.assertEqual("attaching", vol['attach_status'])
def test__append_volume_stats_with_pools(self):
    """_append_volume_stats adds allocated_capacity_gb to each pool.

    The manager's per-pool allocated capacity is expected to be merged
    into the matching pool entry of the driver-reported stats, leaving
    the other keys untouched.
    """
    manager = vol_manager.VolumeManager()
    manager.stats = {'pools': {'pool1': {'allocated_capacity_gb': 20},
                               'pool2': {'allocated_capacity_gb': 10}}}
    vol_stats = {'vendor_name': 'Open Source', 'pools': [
        {'pool_name': 'pool1', 'provisioned_capacity_gb': 31},
        {'pool_name': 'pool2', 'provisioned_capacity_gb': 21}]}
    manager._append_volume_stats(vol_stats)

    expected = {'vendor_name': 'Open Source', 'pools': [
        {'pool_name': 'pool1', 'provisioned_capacity_gb': 31,
         'allocated_capacity_gb': 20},
        {'pool_name': 'pool2', 'provisioned_capacity_gb': 21,
         'allocated_capacity_gb': 10}]}
    self.assertDictEqual(expected, vol_stats)
def test__append_volume_stats_no_pools(self):
    """Without pools in the stats, the single known pool's value is used."""
    mgr = vol_manager.VolumeManager()
    mgr.stats = {'pools': {'backend': {'allocated_capacity_gb': 20}}}
    reported = {'provisioned_capacity_gb': 30}

    mgr._append_volume_stats(reported)

    self.assertDictEqual(
        {'provisioned_capacity_gb': 30, 'allocated_capacity_gb': 20},
        reported)
def test__append_volume_stats_no_pools_no_volumes(self):
    """With no pools known, the top-level allocated value (0) is used."""
    mgr = vol_manager.VolumeManager()
    # This is what gets set on c-vol manager's init_host method
    mgr.stats = {'pools': {}, 'allocated_capacity_gb': 0}
    reported = {'provisioned_capacity_gb': 30}

    mgr._append_volume_stats(reported)

    self.assertDictEqual(
        {'provisioned_capacity_gb': 30, 'allocated_capacity_gb': 0},
        reported)
def test__append_volume_stats_driver_error(self):
    """A non-list 'pools' value in the stats raises ProgrammingError."""
    mgr = vol_manager.VolumeManager()
    malformed_stats = {'pools': 'bad_data'}
    self.assertRaises(exception.ProgrammingError,
                      mgr._append_volume_stats, malformed_stats)
class VolumeTestCaseLocks(base.BaseVolumeTestCase):
    # NOTE(review): presumably read by the base test case to decide whether
    # tooz coordination is mocked; False would leave real locking in place
    # for the lock-ordering tests in this class -- confirm in the base class.
    MOCK_TOOZ = False
def test_create_volume_from_volume_delete_lock_taken(self):
    """Clone creation started during a source delete fails afterwards.

    A create-from-volume is spawned in a greenthread from inside the
    source volume's delete operation; once the delete has finished, the
    create is expected to raise VolumeNotFound.
    """
    # create source volume
    src_vol = tests_utils.create_volume(self.context, **self.volume_params)
    src_vol_id = src_vol['id']

    # no lock
    self.volume.create_volume(self.context, src_vol)

    dst_vol = tests_utils.create_volume(self.context,
                                        source_volid=src_vol_id,
                                        **self.volume_params)

    orig_elevated = self.context.elevated

    # Greenthreads spawned by mock_elevated, collected for the final wait.
    gthreads = []

    def mock_elevated(*args, **kwargs):
        # unset mock so it is only called once
        self.mock_object(self.context, 'elevated', orig_elevated)

        # we expect this to block and then fail
        t = eventlet.spawn(self.volume.create_volume,
                           self.context,
                           volume=dst_vol,
                           request_spec={'source_volid': src_vol_id})
        gthreads.append(t)

        return orig_elevated(*args, **kwargs)

    # mock something from early on in the delete operation and within the
    # lock so that when we do the create we expect it to block.
    self.mock_object(self.context, 'elevated', mock_elevated)

    # locked
    self.volume.delete_volume(self.context, src_vol)

    # we expect the volume create to fail with the following err since the
    # source volume was deleted while the create was locked. Note that the
    # volume is still in the db since it was created by the test prior to
    # calling manager.create_volume.
    # sys.stderr is swapped for an in-memory buffer while waiting.
    with mock.patch('sys.stderr', new=six.StringIO()):
        self.assertRaises(exception.VolumeNotFound, gthreads[0].wait)
def test_create_volume_from_snapshot_delete_lock_taken(self):
    """Create-from-snapshot started during a snapshot delete fails.

    A create-from-snapshot is spawned in a greenthread from inside the
    snapshot's delete operation; once the delete has finished, the create
    is expected to raise SnapshotNotFound. The source volume is deleted
    afterwards and checked to be gone.
    """
    # create source volume
    src_vol = tests_utils.create_volume(self.context, **self.volume_params)

    # no lock
    self.volume.create_volume(self.context, src_vol)

    # create snapshot
    snap_id = create_snapshot(src_vol.id,
                              size=src_vol['size'])['id']
    snapshot_obj = objects.Snapshot.get_by_id(self.context, snap_id)
    # no lock
    self.volume.create_snapshot(self.context, snapshot_obj)

    # create vol from snapshot...
    dst_vol = tests_utils.create_volume(self.context,
                                        snapshot_id=snap_id,
                                        source_volid=src_vol.id,
                                        **self.volume_params)

    orig_elevated = self.context.elevated

    # Greenthreads spawned by mock_elevated, collected for the final wait.
    gthreads = []

    def mock_elevated(*args, **kwargs):
        # unset mock so it is only called once
        self.mock_object(self.context, 'elevated', orig_elevated)

        # We expect this to block and then fail
        t = eventlet.spawn(self.volume.create_volume, self.context,
                           volume=dst_vol,
                           request_spec={'snapshot_id': snap_id})
        gthreads.append(t)

        return orig_elevated(*args, **kwargs)

    # mock something from early on in the delete operation and within the
    # lock so that when we do the create we expect it to block.
    self.mock_object(self.context, 'elevated', mock_elevated)

    # locked
    self.volume.delete_snapshot(self.context, snapshot_obj)

    # we expect the volume create to fail with the following err since the
    # snapshot was deleted while the create was locked. Note that the
    # volume is still in the db since it was created by the test prior to
    # calling manager.create_volume.
    # sys.stderr is swapped for an in-memory buffer while waiting.
    with mock.patch('sys.stderr', new=six.StringIO()):
        self.assertRaises(exception.SnapshotNotFound, gthreads[0].wait)
    # locked
    self.volume.delete_volume(self.context, src_vol)
    # make sure it is gone
    self.assertRaises(exception.VolumeNotFound, db.volume_get,
                      self.context, src_vol.id)