OpenStack Block Storage (Cinder)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

3456 lines
152 KiB

# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for Volume Code."""
import datetime
import enum
import io
import time
from unittest import mock
import castellan
from castellan.common import exception as castellan_exception
from castellan import key_manager
import ddt
import eventlet
import os_brick.initiator.connectors.iscsi
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_utils import imageutils
from taskflow.engines.action_engine import engine
from cinder.api import common
from cinder import context
from cinder import coordination
from cinder import db
from cinder import exception
from cinder.message import message_field
from cinder import objects
from cinder.objects import fields
from cinder.policies import volumes as vol_policy
from cinder import quota
from cinder.tests import fake_driver
from cinder.tests.unit.api.v2 import fakes as v2_fakes
from cinder.tests.unit import conf_fixture
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder.tests.unit.keymgr import fake as fake_keymgr
from cinder.tests.unit import utils as tests_utils
from cinder.tests.unit import volume as base
from cinder import utils
import cinder.volume
from cinder.volume import driver
from cinder.volume import manager as vol_manager
from cinder.volume import rpcapi as volume_rpcapi
import cinder.volume.targets.tgt
from cinder.volume import volume_types
# Module-level shortcuts to the global configuration and quota engine.
CONF = cfg.CONF
QUOTAS = quota.QUOTAS
# Class path stored as the 'provider' of the encrypted volume types that
# the tests below create.
ENCRYPTION_PROVIDER = 'nova.volume.encryptors.cryptsetup.CryptsetupEncryptor'
# Single string option; its consumers are not visible in this part of the
# file.
fake_opt = [cfg.StrOpt('fake_opt1', default='fake', help='fake opts')]
def create_snapshot(volume_id, size=1, metadata=None, ctxt=None,
                    **kwargs):
    """Create and persist a snapshot object for the given volume.

    :param volume_id: ID stored in the snapshot's ``volume_id`` field.
    :param size: value stored in the snapshot's ``volume_size`` field.
    :param metadata: snapshot metadata; any falsy value becomes ``{}``.
    :param ctxt: request context; an admin context is used when omitted.
    :param kwargs: extra field values, applied after the defaults set
                   here so they take precedence.
    :returns: the ``objects.Snapshot`` after ``create()`` was called.
    """
    snap = objects.Snapshot(ctxt or context.get_admin_context())
    snap.volume_size = size
    snap.user_id = fake.USER_ID
    snap.project_id = fake.PROJECT_ID
    snap.volume_id = volume_id
    snap.status = fields.SnapshotStatus.CREATING
    # Metadata is always set: a falsy value is normalised to an empty
    # dict, so no "is not None" guard is needed.
    snap.metadata = metadata or {}
    snap.update(kwargs)
    snap.create()
    return snap
class KeyObject(object):
    """Simple object whose ``get_encoded`` returns the bytes ``b'asdf'``."""

    def get_encoded(self):
        """Return the fixed value ``'asdf'`` as UTF-8 encoded bytes."""
        return b"asdf"
class KeyObject2(object):
    """Simple object whose ``get_encoded`` returns the bytes ``b'qwert'``."""

    def get_encoded(self):
        """Return the fixed value ``'qwert'`` as UTF-8 encoded bytes."""
        return b"qwert"
@ddt.ddt
class VolumeTestCase(base.BaseVolumeTestCase):
def setUp(self):
    """Prepare contexts, defaults and volume types used by the tests."""
    super(VolumeTestCase, self).setUp()
    # clear_volume is replaced by an autospec'd mock for every test.
    self.patch('cinder.volume.volume_utils.clear_volume', autospec=True)
    self.expected_status = 'available'
    self.service_id = 1
    self.user_context = context.RequestContext(
        user_id=fake.USER_ID, project_id=fake.PROJECT_ID)

    # Register the fake default type, then keep the '__DEFAULT__' record.
    admin_ctxt = context.get_admin_context()
    default_type = v2_fakes.fake_default_type_get(
        id=fake.VOLUME_TYPE2_ID)
    db.volume_type_create(admin_ctxt, default_type)
    self.vol_type = db.volume_type_get_by_name(admin_ctxt, '__DEFAULT__')
    self._setup_volume_types()
def _create_volume(self, context, **kwargs):
    """Create a test volume that uses the default volume type.

    :param context: request context handed to the test utility.
    :param kwargs: additional arguments forwarded unchanged.
    :returns: whatever ``tests_utils.create_volume`` returns.
    """
    default_type_id = volume_types.get_default_volume_type()['id']
    return tests_utils.create_volume(
        context, volume_type_id=default_type_id, **kwargs)
@mock.patch('cinder.objects.service.Service.get_minimum_rpc_version')
@mock.patch('cinder.objects.service.Service.get_minimum_obj_version')
@mock.patch('cinder.rpc.LAST_RPC_VERSIONS', {'cinder-scheduler': '1.3'})
@mock.patch('cinder.rpc.LAST_OBJ_VERSIONS', {'cinder-scheduler': '1.4'})
def test_reset(self, get_min_obj, get_min_rpc):
    """Check that reset() re-caps the scheduler RPC client versions."""
    manager = vol_manager.VolumeManager()

    # Before the reset the caps equal the patched LAST_*_VERSIONS values.
    rpcapi = manager.scheduler_rpcapi
    self.assertEqual('1.3', rpcapi.client.version_cap)
    self.assertEqual('1.4', rpcapi.client.serializer._base.version_cap)

    get_min_obj.return_value = objects.base.OBJ_VERSIONS.get_current()
    manager.reset()

    # Look the rpcapi up again after the reset; the caps must now equal
    # the values returned by the mocked minimum-version lookups.
    rpcapi = manager.scheduler_rpcapi
    obj_serializer = rpcapi.client.serializer._base
    self.assertEqual(get_min_rpc.return_value, rpcapi.client.version_cap)
    self.assertEqual(get_min_obj.return_value, obj_serializer.version_cap)
    self.assertIsNone(obj_serializer.manifest)
@mock.patch('oslo_utils.importutils.import_object')
def test_backend_availability_zone(self, mock_import_object):
    """The manager's AZ equals the driver's backend_availability_zone."""
    # NOTE(smcginnis): This isn't really the best place for this test,
    # but we don't currently have a pure VolumeManager test class. So
    # until we create a good suite for that class, putting here with
    # other tests that use VolumeManager.
    backend_opts = {'backend_availability_zone': 'caerbannog'}

    driver_mock = mock.Mock()
    # Options that are not in backend_opts resolve to None.
    driver_mock.configuration.safe_get.side_effect = (
        lambda option: backend_opts.get(option))
    driver_mock.configuration.extra_capabilities = 'null'
    # Whatever is imported, hand back the mocked driver.
    mock_import_object.side_effect = (
        lambda *args, **kwargs: driver_mock)

    manager = vol_manager.VolumeManager(volume_driver=driver_mock)
    self.assertIsNotNone(manager)
    self.assertEqual(backend_opts['backend_availability_zone'],
                     manager.availability_zone)
@mock.patch('cinder.volume.manager.VolumeManager._append_volume_stats',
            mock.Mock())
@mock.patch.object(vol_manager.VolumeManager,
                   'update_service_capabilities')
def test_report_filter_goodness_function(self, mock_update):
    """Reported stats include the driver's filter/goodness functions."""
    filter_func = "myFilterFunction"
    goodness_func = "myGoodnessFunction"
    manager = vol_manager.VolumeManager()
    manager.driver.set_initialized()

    driver_stats = {'name': 'cinder-volumes',
                    'storage_protocol': 'iSCSI'}
    # Built as a separate dict up front, so later changes to the stats
    # returned by the driver mock cannot leak into the expectation.
    expected = dict(driver_stats,
                    cacheable=True,
                    filter_function=filter_func,
                    goodness_function=goodness_func)

    with mock.patch.object(manager.driver,
                           'get_volume_stats') as m_get_stats, \
            mock.patch.object(manager.driver,
                              'get_goodness_function') as m_goodness, \
            mock.patch.object(manager.driver,
                              'get_filter_function') as m_filter:
        m_get_stats.return_value = driver_stats
        m_filter.return_value = filter_func
        m_goodness.return_value = goodness_func
        manager._report_driver_status(context.get_admin_context())
        self.assertTrue(m_get_stats.called)
        mock_update.assert_called_once_with(expected)
def test_is_working(self):
    """is_working() reflects the driver's ``_initialized`` flag."""
    manager = self.volume
    # By default we have driver mocked to be initialized...
    self.assertTrue(manager.is_working())
    # ...lets switch it and check again!
    manager.driver._initialized = False
    self.assertFalse(manager.is_working())
def _create_min_max_size_dict(self, min_size, max_size):
    """Return a dict holding the min/max size keys of a volume type.

    :param min_size: value stored under ``volume_types.MIN_SIZE_KEY``.
    :param max_size: value stored under ``volume_types.MAX_SIZE_KEY``.
    """
    size_limits = {}
    size_limits[volume_types.MIN_SIZE_KEY] = min_size
    size_limits[volume_types.MAX_SIZE_KEY] = max_size
    return size_limits
def _setup_volume_types(self):
    """Create the 'limit' (min 2, max 4) and 'unsized' volume types.

    The created DB records are kept on ``self.sized_vol_type`` and
    ``self.unsized_vol_type``.
    """
    sized_name = 'limit'
    db.volume_type_create(
        self.context,
        {'name': sized_name,
         'extra_specs': self._create_min_max_size_dict(2, 4)})
    self.sized_vol_type = db.volume_type_get_by_name(self.context,
                                                     sized_name)

    unsized_name = 'unsized'
    db.volume_type_create(context.get_admin_context(),
                          {'name': unsized_name, 'extra_specs': {}})
    self.unsized_vol_type = db.volume_type_get_by_name(self.context,
                                                       unsized_name)
@mock.patch('cinder.tests.unit.fake_notifier.FakeNotifier._notify')
@mock.patch.object(QUOTAS, 'rollback')
@mock.patch.object(QUOTAS, 'commit')
@mock.patch.object(QUOTAS, 'reserve')
def test_create_driver_not_initialized(self, reserve, commit, rollback,
                                       mock_notify):
    """create_volume raises and errors the volume if driver is not ready.

    Patch decorators are applied bottom-up, so they are listed in the
    reverse order of the mock parameters; this way ``reserve`` really is
    the mock for ``QUOTAS.reserve`` and ``rollback`` the one for
    ``QUOTAS.rollback``.
    """
    self.volume.driver._initialized = False

    def fake_reserve(context, expire=None, project_id=None, **deltas):
        return ["RESERVATION"]

    def fake_commit_and_rollback(context, reservations, project_id=None):
        pass

    # side_effect makes the mocks run the fakes; return_value would only
    # hand back the function objects themselves.
    reserve.side_effect = fake_reserve
    commit.side_effect = fake_commit_and_rollback
    rollback.side_effect = fake_commit_and_rollback

    volume = tests_utils.create_volume(
        self.context,
        availability_zone=CONF.storage_availability_zone,
        **self.volume_params)
    volume_id = volume['id']
    self.assertIsNone(volume['encryption_key_id'])
    mock_notify.assert_not_called()

    self.assertRaises(exception.DriverNotInitialized,
                      self.volume.create_volume, self.context, volume)

    volume = db.volume_get(context.get_admin_context(), volume_id)
    self.assertEqual("error", volume.status)
    db.volume_destroy(context.get_admin_context(), volume_id)
def test_create_driver_not_initialized_rescheduling(self):
    """A rescheduled create on an uninitialized driver is not counted."""
    self.volume.driver._initialized = False
    delete_mock = self.mock_object(self.volume.driver, 'delete_volume')
    volume = tests_utils.create_volume(
        self.context,
        availability_zone=CONF.storage_availability_zone,
        **self.volume_params)
    vol_id = volume['id']

    request_spec = {'volume_properties': self.volume_params}
    filter_properties = {'retry': {'num_attempts': 1, 'host': []}}
    self.assertRaises(exception.DriverNotInitialized,
                      self.volume.create_volume,
                      self.context, volume,
                      request_spec, filter_properties)

    # NOTE(dulek): Volume should be rescheduled as we passed request_spec
    # and filter_properties, assert that it wasn't counted in
    # allocated_capacity tracking.
    expected_pools = {'_pool0': {'allocated_capacity_gb': 0}}
    self.assertEqual(expected_pools, self.volume.stats['pools'])

    # NOTE(dulek): As we've rescheduled, make sure delete_volume was
    # called.
    self.assertTrue(delete_mock.called)
    db.volume_destroy(context.get_admin_context(), vol_id)
def test_create_non_cinder_exception_rescheduling(self):
    """A non-Cinder driver error with a request spec is not counted.

    The driver's create_volume raises ProcessExecutionError; the test
    expects it to propagate and the pool's allocated capacity to stay 0.
    """
    # Work on a copy so that removing 'host' does not modify the shared
    # self.volume_params dict.
    params = dict(self.volume_params)
    del params['host']
    volume = tests_utils.create_volume(
        self.context,
        availability_zone=CONF.storage_availability_zone,
        **params)
    volume_id = volume['id']
    with mock.patch.object(self.volume.driver, 'create_volume',
                           side_effect=processutils.ProcessExecutionError):
        self.assertRaises(processutils.ProcessExecutionError,
                          self.volume.create_volume,
                          self.context, volume,
                          {'volume_properties': params},
                          {'retry': {'num_attempts': 1, 'host': []}})
    # NOTE(dulek): Volume should be rescheduled as we passed request_spec
    # and filter_properties, assert that it wasn't counted in
    # allocated_capacity tracking.
    self.assertEqual({'_pool0': {'allocated_capacity_gb': 0}},
                     self.volume.stats['pools'])
    db.volume_destroy(context.get_admin_context(), volume_id)
@mock.patch('cinder.tests.unit.fake_notifier.FakeNotifier._notify')
@mock.patch.object(QUOTAS, 'rollback')
@mock.patch.object(QUOTAS, 'commit')
@mock.patch.object(QUOTAS, 'reserve')
def test_delete_driver_not_initialized(self, reserve, commit, rollback,
                                       mock_notify):
    """delete_volume raises and marks the volume 'error_deleting'.

    Exercised with the driver flagged as not initialized.
    """
    self.volume.driver._initialized = False

    def fake_reserve(context, expire=None, project_id=None, **deltas):
        return ["RESERVATION"]

    def fake_commit_and_rollback(context, reservations, project_id=None):
        pass

    # side_effect makes the mocks run the fakes; return_value would only
    # hand back the function objects themselves.
    reserve.side_effect = fake_reserve
    commit.side_effect = fake_commit_and_rollback
    rollback.side_effect = fake_commit_and_rollback

    volume = tests_utils.create_volume(
        self.context,
        availability_zone=CONF.storage_availability_zone,
        **self.volume_params)
    self.assertIsNone(volume['encryption_key_id'])
    mock_notify.assert_not_called()

    self.assertRaises(exception.DriverNotInitialized,
                      self.volume.delete_volume, self.context, volume)

    volume = objects.Volume.get_by_id(self.context, volume.id)
    self.assertEqual("error_deleting", volume.status)
    volume.destroy()
@mock.patch('cinder.tests.unit.fake_notifier.FakeNotifier._notify')
@mock.patch('cinder.quota.QUOTAS.rollback', new=mock.Mock())
@mock.patch('cinder.quota.QUOTAS.commit', new=mock.Mock())
@mock.patch('cinder.quota.QUOTAS.reserve', return_value=['RESERVATION'])
def test_create_delete_volume(self, _mock_reserve, mock_notify):
    """Test volume can be created and deleted.

    Also checks the emitted notifications and the allocated capacity
    recorded for '_pool0' after each step.
    """
    volume = tests_utils.create_volume(
        self.context,
        availability_zone=CONF.storage_availability_zone,
        **self.volume_params)
    volume_id = volume['id']

    mock_notify.assert_not_called()
    self.assertIsNone(volume['encryption_key_id'])

    self.volume.create_volume(self.context, volume)
    self.assert_notify_called(mock_notify,
                              (['INFO', 'volume.create.start'],
                               ['INFO', 'volume.create.end']),
                              any_order=True)
    self.assertEqual({'_pool0': {'allocated_capacity_gb': 1}},
                     self.volume.stats['pools'])

    self.volume.delete_volume(self.context, volume)
    # The deleted row is only visible with read_deleted='yes'.
    vol = db.volume_get(context.get_admin_context(read_deleted='yes'),
                        volume_id)
    # Expected value first, like the other assertions in this file.
    self.assertEqual('deleted', vol['status'])
    self.assert_notify_called(mock_notify,
                              (['INFO', 'volume.create.start'],
                               ['INFO', 'volume.create.end'],
                               ['INFO', 'volume.delete.start'],
                               ['INFO', 'volume.delete.end']),
                              any_order=True)
    self.assertEqual({'_pool0': {'allocated_capacity_gb': 0}},
                     self.volume.stats['pools'])

    self.assertRaises(exception.NotFound,
                      db.volume_get,
                      self.context,
                      volume_id)
def test_create_delete_volume_with_metadata(self):
    """Create a volume carrying metadata, then delete it again."""
    expected_meta = {'fake_key': 'fake_value'}
    volume = tests_utils.create_volume(self.context,
                                       metadata=expected_meta,
                                       **self.volume_params)
    vol_id = volume['id']
    manager = self.volume

    manager.create_volume(self.context, volume)
    self.assertEqual(expected_meta, volume.metadata)

    manager.delete_volume(self.context, volume)
    # Once deleted, the volume can no longer be looked up.
    self.assertRaises(exception.NotFound,
                      db.volume_get, self.context, vol_id)
def test_delete_volume_frozen(self):
    """Deleting a volume whose service is frozen raises InvalidInput."""
    frozen_service = tests_utils.create_service(self.context,
                                                {'frozen': True})
    volume = tests_utils.create_volume(self.context,
                                       host=frozen_service.host)
    self.assertRaises(exception.InvalidInput,
                      self.volume_api.delete,
                      self.context,
                      volume)
def test_delete_volume_another_cluster_fails(self):
    """Delete a volume with another host but the manager's cluster.

    The volume's host differs from CONF.host while its cluster_name
    equals the manager's cluster; after delete_volume the volume can no
    longer be found.
    """
    cluster = 'mycluster'
    self.volume.cluster = cluster
    other_host = CONF.host + 'fake'
    volume = tests_utils.create_volume(self.context,
                                       status='available',
                                       size=1,
                                       host=other_host,
                                       cluster_name=self.volume.cluster)
    self.volume.delete_volume(self.context, volume)
    self.assertRaises(exception.NotFound,
                      db.volume_get, self.context, volume.id)
@mock.patch('cinder.db.volume_metadata_update')
def test_create_volume_metadata(self, metadata_update):
    """create_volume_metadata stores user metadata through the DB API."""
    new_meta = {'fake_key': 'fake_value'}
    metadata_update.return_value = new_meta
    volume = tests_utils.create_volume(self.context, **self.volume_params)

    result = self.volume_api.create_volume_metadata(
        self.context, volume, new_meta)

    # The DB layer is called once, with delete=False and the user type.
    metadata_update.assert_called_once_with(
        self.context, volume.id, new_meta, False,
        common.METADATA_TYPES.user)
    self.assertEqual(new_meta, result)
@ddt.data('maintenance', 'uploading')
def test_create_volume_metadata_maintenance(self, status):
    """Creating metadata raises InvalidVolume for the given status."""
    volume = tests_utils.create_volume(self.context, **self.volume_params)
    volume['status'] = status
    new_meta = {'fake_key': 'fake_value'}
    self.assertRaises(exception.InvalidVolume,
                      self.volume_api.create_volume_metadata,
                      self.context, volume, new_meta)
def test_update_volume_metadata_with_metatype(self):
    """Test update volume metadata with different metadata type."""
    test_meta1 = {'fake_key1': 'fake_value1'}
    test_meta2 = {'fake_key1': 'fake_value2'}
    FAKE_METADATA_TYPE = enum.Enum('METADATA_TYPES', 'fake_type')
    volume = tests_utils.create_volume(self.context, metadata=test_meta1,
                                       **self.volume_params)
    self.volume.create_volume(self.context, volume)

    # In order: update the user metadata, create the image metadata and
    # finally update the image metadata associated with the volume.
    update_steps = (
        (test_meta2, common.METADATA_TYPES.user),
        (test_meta1, common.METADATA_TYPES.image),
        (test_meta2, common.METADATA_TYPES.image),
    )
    for new_meta, meta_type in update_steps:
        result_meta = self.volume_api.update_volume_metadata(
            self.context, volume, new_meta, False, meta_type)
        self.assertEqual(new_meta, result_meta)

    # update volume metadata with invalid metadata type.
    self.assertRaises(exception.InvalidMetadataType,
                      self.volume_api.update_volume_metadata,
                      self.context, volume, test_meta1, False,
                      FAKE_METADATA_TYPE.fake_type)
def test_update_volume_metadata_maintenance(self):
    """Test update volume metadata in maintenance."""
    meta = {'fake_key1': 'fake_value1'}
    fake_types = enum.Enum('METADATA_TYPES', 'fake_type')
    volume = tests_utils.create_volume(self.context, metadata=meta,
                                       **self.volume_params)
    volume['status'] = 'maintenance'
    # The test expects InvalidVolume here even though the metadata type
    # passed in is a made-up one.
    self.assertRaises(exception.InvalidVolume,
                      self.volume_api.update_volume_metadata,
                      self.context, volume, meta, False,
                      fake_types.fake_type)
@mock.patch('cinder.db.volume_update')
def test_update_with_ovo(self, volume_update):
    """Test update volume using oslo_versionedobject."""
    new_name = 'foobbar'
    changes = {'display_name': new_name}
    volume = tests_utils.create_volume(self.context, **self.volume_params)

    self.volume_api.update(self.context, volume, changes)

    # The change reaches the DB layer and is visible on the object.
    volume_update.assert_called_once_with(
        self.context, volume.id, changes)
    self.assertEqual(new_name, volume.display_name)
def test_delete_volume_metadata_with_metatype(self):
    """Test delete volume metadata with different metadata type."""
    both_keys = {'fake_key1': 'fake_value1', 'fake_key2': 'fake_value2'}
    first_key_only = {'fake_key1': 'fake_value1'}
    FAKE_METADATA_TYPE = enum.Enum('METADATA_TYPES', 'fake_type')
    volume = tests_utils.create_volume(self.context, metadata=both_keys,
                                       **self.volume_params)
    volume_id = volume['id']
    self.volume.create_volume(self.context, volume)

    # delete user metadata associated with the volume.
    self.volume_api.delete_volume_metadata(
        self.context, volume, 'fake_key2', common.METADATA_TYPES.user)
    self.assertEqual(first_key_only,
                     db.volume_metadata_get(self.context, volume_id))

    # create image metadata associated with the volume.
    result_meta = self.volume_api.update_volume_metadata(
        self.context, volume, both_keys, False,
        common.METADATA_TYPES.image)
    self.assertEqual(both_keys, result_meta)

    # delete image metadata associated with the volume.
    self.volume_api.delete_volume_metadata(
        self.context, volume, 'fake_key2', common.METADATA_TYPES.image)

    # parse the result to build the dict.
    rows = db.volume_glance_metadata_get(self.context, volume_id)
    remaining = {row['key']: row['value'] for row in rows}
    self.assertEqual(first_key_only, remaining)

    # delete volume metadata with invalid metadata type.
    self.assertRaises(exception.InvalidMetadataType,
                      self.volume_api.delete_volume_metadata,
                      self.context, volume, 'fake_key1',
                      FAKE_METADATA_TYPE.fake_type)
def test_delete_volume_metadata_maintenance(self):
    """Deleting metadata of a volume in maintenance is rejected."""
    fake_types = enum.Enum('METADATA_TYPES', 'fake_type')
    meta = {'fake_key1': 'fake_value1', 'fake_key2': 'fake_value2'}
    volume = tests_utils.create_volume(self.context, metadata=meta,
                                       **self.volume_params)
    volume['status'] = 'maintenance'
    # The test expects InvalidVolume here even though the metadata type
    # passed in is a made-up one.
    self.assertRaises(exception.InvalidVolume,
                      self.volume_api.delete_volume_metadata,
                      self.context, volume, 'fake_key1',
                      fake_types.fake_type)
def test_accept_transfer_maintenance(self):
    """Accepting a transfer of a volume in maintenance is rejected."""
    meta = {'fake_key1': 'fake_value1', 'fake_key2': 'fake_value2'}
    volume = tests_utils.create_volume(self.context, metadata=meta,
                                       **self.volume_params)
    volume['status'] = 'maintenance'
    api = cinder.volume.api.API()
    self.assertRaises(exception.InvalidVolume,
                      api.accept_transfer,
                      self.context, volume, None, None)
@mock.patch.object(cinder.volume.api.API, 'list_availability_zones')
def test_create_volume_uses_default_availability_zone(self, mock_list_az):
    """Test setting availability_zone correctly during volume create."""
    mock_list_az.return_value = tuple(
        {'name': az_name, 'available': True}
        for az_name in ('az1', 'az2', 'default-az'))
    volume_api = cinder.volume.api.API()

    # First pass tests backwards compatibility: only
    # storage_availability_zone is set. The second pass additionally
    # sets default_availability_zone, which is then expected to be used.
    overrides = (('storage_availability_zone', 'az2'),
                 ('default_availability_zone', 'default-az'))
    for option, expected_az in overrides:
        self.override_config(option, expected_az)
        volume = volume_api.create(self.context,
                                   1,
                                   'name',
                                   'description',
                                   volume_type=self.vol_type)
        self.assertEqual(expected_az, volume['availability_zone'])
def test_create_volume_with_default_type_misconfigured(self):
    """Test volume creation with non-existent default volume type."""
    api = cinder.volume.api.API()
    # Point the default volume type at a name that does not exist.
    self.flags(default_volume_type='fake_type')
    # Creating a volume without an explicit type must then fail.
    self.assertRaises(exception.VolumeTypeDefaultMisconfiguredError,
                      api.create,
                      self.context, 1, 'name', 'description')
@mock.patch('cinder.quota.QUOTAS.rollback', new=mock.MagicMock())
@mock.patch('cinder.quota.QUOTAS.commit', new=mock.MagicMock())
@mock.patch('cinder.quota.QUOTAS.reserve', return_value=["RESERVATION"])
def test_create_volume_with_volume_type(self, _mock_reserve):
    """Create volumes with an explicit, the default and a custom type."""
    api = cinder.volume.api.API()
    admin_ctxt = context.get_admin_context

    # Explicitly passing the type looked up in setUp: no encryption key.
    volume = api.create(self.context, 1, 'name', 'description',
                        volume_type=self.vol_type)
    self.assertIsNone(volume['encryption_key_id'])

    # Without an explicit type the configured default type is used.
    default_name = conf_fixture.def_vol_type
    default_type = db.volume_type_get_by_name(admin_ctxt(), default_name)
    volume = api.create(self.context, 1, 'name', 'description')
    self.assertEqual(default_type.get('id'), volume['volume_type_id'])
    self.assertIsNone(volume['encryption_key_id'])

    # Create volume with specific volume type
    custom_name = 'test'
    db.volume_type_create(admin_ctxt(),
                          {'name': custom_name, 'extra_specs': {}})
    custom_type = db.volume_type_get_by_name(admin_ctxt(), custom_name)
    volume = api.create(self.context, 1, 'name', 'description',
                        volume_type=custom_type)
    self.assertEqual(custom_type.get('id'), volume['volume_type_id'])
@mock.patch('cinder.quota.QUOTAS.rollback', new=mock.MagicMock())
@mock.patch('cinder.quota.QUOTAS.commit', new=mock.MagicMock())
@mock.patch('cinder.quota.QUOTAS.reserve', return_value=["RESERVATION"])
def test_create_volume_with_volume_type_size_limits(self, _mock_reserve):
    """Test that volume type size limits are enforced."""
    api = cinder.volume.api.API()
    sized_type = self.sized_vol_type

    # Size 2 is accepted for the size-limited type.
    volume = api.create(self.context, 2, 'name', 'description',
                        volume_type=sized_type)
    self.assertEqual(sized_type['id'], volume['volume_type_id'])

    # Sizes 1 and 5 are rejected for the same type.
    for rejected_size in (1, 5):
        self.assertRaises(exception.InvalidInput,
                          api.create,
                          self.context,
                          rejected_size,
                          'name',
                          'description',
                          volume_type=sized_type)
def test_create_volume_with_multiattach_volume_type(self):
    """Test volume creation with multiattach volume type."""
    admin_ctxt = context.get_admin_context()
    api = cinder.volume.api.API()
    type_name = "multiattach-type"
    volume_types.create(admin_ctxt,
                        type_name,
                        {'multiattach': "<is> True"},
                        description="test-multiattach")
    multiattach_type = objects.VolumeType.get_by_name_or_id(admin_ctxt,
                                                            type_name)

    vol = api.create(self.context, 1, 'admin-vol', 'description',
                     volume_type=multiattach_type)

    self.assertEqual(multiattach_type['id'], vol['volume_type_id'])
    self.assertTrue(vol['multiattach'])
def test_create_volume_with_multiattach_flag(self):
    """Tests creating a volume with multiattach=True but no special type.

    This tests the pre 3.50 microversion behavior of being able to create
    a volume with the multiattach request parameter regardless of a
    multiattach-capable volume type.
    """
    api = cinder.volume.api.API()
    created = api.create(self.context,
                         1,
                         'name',
                         'description',
                         multiattach=True,
                         volume_type=self.vol_type)
    self.assertTrue(created.multiattach)
def _fail_multiattach_policy_authorize(self, policy):
    """Raise PolicyNotAuthorized only for the multiattach policy.

    Assigned as the ``side_effect`` of a mocked ``authorize`` by the
    not-authorized tests; any other policy passes and returns None.
    """
    if policy != vol_policy.MULTIATTACH_POLICY:
        return
    raise exception.PolicyNotAuthorized(action='Test')
def test_create_volume_with_multiattach_volume_type_not_authorized(self):
    """Test policy unauthorized create with multiattach volume type."""
    admin_ctxt = context.get_admin_context()
    api = cinder.volume.api.API()
    type_name = "multiattach-type"
    volume_types.create(admin_ctxt,
                        type_name,
                        {'multiattach': "<is> True"},
                        description="test-multiattach")
    multiattach_type = objects.VolumeType.get_by_name_or_id(admin_ctxt,
                                                            type_name)

    # Make authorize() reject just the multiattach policy.
    with mock.patch.object(self.context, 'authorize') as mock_auth:
        mock_auth.side_effect = self._fail_multiattach_policy_authorize
        self.assertRaises(exception.PolicyNotAuthorized,
                          api.create,
                          self.context, 1, 'admin-vol', 'description',
                          volume_type=multiattach_type)
def test_create_volume_with_multiattach_flag_not_authorized(self):
    """Test policy unauthorized create with multiattach flag."""
    api = cinder.volume.api.API()
    # Make authorize() reject just the multiattach policy.
    with mock.patch.object(self.context, 'authorize') as mock_auth:
        mock_auth.side_effect = self._fail_multiattach_policy_authorize
        self.assertRaises(exception.PolicyNotAuthorized,
                          api.create,
                          self.context,
                          1,
                          'name',
                          'description',
                          multiattach=True)
@mock.patch.object(key_manager, 'API', fake_keymgr.fake_api)
def test_create_volume_with_encrypted_volume_type_multiattach(self):
    """A type that is both encrypted and multiattach is rejected."""
    admin_ctxt = context.get_admin_context()
    type_id = '61298380-0c12-11e3-bfd6-4b48424183be'
    db.volume_type_create(
        admin_ctxt,
        {'id': type_id,
         'name': 'LUKS',
         'extra_specs': {'multiattach': '<is> True'}})
    encryption_spec = {'control_location': 'front-end',
                       'provider': ENCRYPTION_PROVIDER,
                       'cipher': 'aes-xts-plain64',
                       'key_size': 256}
    db.volume_type_encryption_create(admin_ctxt, type_id, encryption_spec)

    api = cinder.volume.api.API()
    luks_type = db.volume_type_get_by_name(admin_ctxt, 'LUKS')
    self.assertRaises(exception.InvalidVolume,
                      api.create,
                      self.context, 1, 'name', 'description',
                      volume_type=luks_type)
@ddt.data({'cipher': 'blowfish-cbc', 'algo': 'blowfish', 'length': 32},
          {'cipher': 'aes-xts-plain64', 'algo': 'aes', 'length': 256})
@ddt.unpack
@mock.patch.object(key_manager, 'API', fake_keymgr.fake_api)
def test_create_volume_with_encrypted_volume_types(
        self, cipher, algo, length):
    """An encrypted type yields a key matching its cipher and size.

    :param cipher: cipher configured on the volume type's encryption.
    :param algo: algorithm the created key is expected to report.
    :param length: key size in bits configured on the encryption.
    """
    admin_ctxt = context.get_admin_context()
    type_id = '61298380-0c12-11e3-bfd6-4b48424183be'
    db.volume_type_create(admin_ctxt, {'id': type_id, 'name': 'LUKS'})
    encryption_spec = {'control_location': 'front-end',
                       'provider': ENCRYPTION_PROVIDER,
                       'cipher': cipher,
                       'key_size': length}
    db.volume_type_encryption_create(admin_ctxt, type_id, encryption_spec)

    volume_api = cinder.volume.api.API()
    luks_type = db.volume_type_get_by_name(admin_ctxt, 'LUKS')
    volume = volume_api.create(self.context, 1, 'name', 'description',
                               volume_type=luks_type)

    # Named key_mgr so it does not shadow the module-level key_manager.
    key_mgr = volume_api.key_manager
    key = key_mgr.get(self.context, volume['encryption_key_id'])
    # get_encoded() returns bytes, hence the conversion to bits.
    self.assertEqual(length, len(key.get_encoded()) * 8)
    self.assertEqual(algo, key.algorithm)

    metadata = db.volume_encryption_metadata_get(self.context, volume.id)
    self.assertEqual(luks_type.get('id'), volume['volume_type_id'])
    self.assertEqual(cipher, metadata.get('cipher'))
    self.assertEqual(length, metadata.get('key_size'))
    self.assertIsNotNone(volume['encryption_key_id'])
def test_create_volume_with_provider_id(self):
    """The provider_id given at creation is kept by create_volume."""
    create_kwargs = dict(provider_id=fake.PROVIDER_ID,
                         **self.volume_params)
    volume = tests_utils.create_volume(self.context, **create_kwargs)
    self.volume.create_volume(self.context, volume)
    self.assertEqual(fake.PROVIDER_ID, volume['provider_id'])
def test_create_volume_with_admin_metadata(self):
    """admin_metadata returned by the driver ends up on the volume."""
    driver_update = {'admin_metadata': {'foo': 'bar'}}
    with mock.patch.object(self.volume.driver, 'create_volume',
                           return_value=driver_update):
        volume = tests_utils.create_volume(self.user_context)
        self.volume.create_volume(self.user_context, volume)
        self.assertEqual({'foo': 'bar'}, volume['admin_metadata'])
@mock.patch.object(key_manager, 'API', new=fake_keymgr.fake_api)
def test_create_delete_volume_with_encrypted_volume_type(self):
    """Create an encrypted volume and request its deletion via the API.

    After the delete request the volume is expected to reach the
    'deleting' status; the record is then destroyed directly in the DB.
    """
    cipher = 'aes-xts-plain64'
    key_size = 256
    db.volume_type_create(self.context,
                          {'id': fake.VOLUME_TYPE_ID, 'name': 'LUKS'})
    db.volume_type_encryption_create(
        self.context, fake.VOLUME_TYPE_ID,
        {'control_location': 'front-end', 'provider': ENCRYPTION_PROVIDER,
         'cipher': cipher, 'key_size': key_size})
    db_vol_type = db.volume_type_get_by_name(self.context, 'LUKS')

    volume = self.volume_api.create(self.context,
                                    1,
                                    'name',
                                    'description',
                                    volume_type=db_vol_type)
    self.assertIsNotNone(volume.get('encryption_key_id', None))
    self.assertEqual(db_vol_type.get('id'), volume['volume_type_id'])

    volume['host'] = 'fake_host'
    volume['status'] = 'available'
    db.volume_update(self.context, volume['id'], {'status': 'available'})
    self.volume_api.delete(self.context, volume)

    volume = objects.Volume.get_by_id(self.context, volume.id)
    # Must wait for volume_api delete request to process enough to
    # change the volume status. The wait is bounded (20 polls of 0.5s)
    # so that a stuck status fails the assertion below instead of
    # hanging the whole test run.
    for _attempt in range(20):
        if volume.status != 'available':
            break
        time.sleep(0.5)
        volume.refresh()
    self.assertEqual('deleting', volume['status'])

    db.volume_destroy(self.context, volume['id'])
    self.assertRaises(exception.NotFound,
                      db.volume_get,
                      self.context,
                      volume['id'])
@mock.patch.object(key_manager, 'API', fake_keymgr.fake_api)
def test_delete_encrypted_volume_fail_deleting_key(self):
    """A failing key deletion puts the volume in 'error_deleting'."""
    db.volume_type_create(self.context,
                          {'id': fake.VOLUME_TYPE_ID, 'name': 'LUKS'})
    encryption_spec = {'control_location': 'front-end',
                       'provider': ENCRYPTION_PROVIDER,
                       'cipher': 'aes-xts-plain64',
                       'key_size': 256}
    db.volume_type_encryption_create(self.context, fake.VOLUME_TYPE_ID,
                                     encryption_spec)
    luks_type = db.volume_type_get_by_name(self.context, 'LUKS')

    volume = self.volume_api.create(self.context, 1, 'name',
                                    'description', volume_type=luks_type)
    vol_id = volume['id']
    volume['host'] = 'fake_host'
    volume['status'] = 'available'
    db.volume_update(self.context, vol_id, {'status': 'available'})

    # Any exception from the key manager's delete() must surface as
    # InvalidVolume from the API delete call.
    with mock.patch.object(self.volume_api.key_manager, 'delete',
                           side_effect=Exception):
        self.assertRaises(exception.InvalidVolume,
                          self.volume_api.delete,
                          self.context, volume)

    volume = objects.Volume.get_by_id(self.context, vol_id)
    self.assertEqual("error_deleting", volume.status)
    volume.destroy()
@mock.patch.object(key_manager, 'API', fake_keymgr.fake_api)
def test_delete_encrypted_volume_key_not_found(self):
    """A missing key does not stop the volume from being deleted."""
    db.volume_type_create(self.context,
                          {'id': fake.VOLUME_TYPE_ID, 'name': 'LUKS'})
    encryption_spec = {'control_location': 'front-end',
                       'provider': ENCRYPTION_PROVIDER,
                       'cipher': 'aes-xts-plain64',
                       'key_size': 256}
    db.volume_type_encryption_create(self.context, fake.VOLUME_TYPE_ID,
                                     encryption_spec)
    luks_type = db.volume_type_get_by_name(self.context, 'LUKS')

    volume = self.volume_api.create(self.context, 1, 'name',
                                    'description', volume_type=luks_type)
    vol_id = volume['id']
    volume['host'] = 'fake_host'
    volume['status'] = 'available'
    db.volume_update(self.context, vol_id, {'status': 'available'})

    # The key manager reports the key as not found; the API delete call
    # is expected to go through without raising.
    not_found = castellan_exception.ManagedObjectNotFoundError
    with mock.patch.object(self.volume_api.key_manager, 'delete',
                           side_effect=not_found):
        self.volume_api.delete(self.context, volume)

    volume = objects.Volume.get_by_id(self.context, vol_id)
    self.assertEqual("deleting", volume.status)
    volume.destroy()
def test_delete_busy_volume(self):
    """Test volume survives deletion if driver reports it as busy."""
    volume = tests_utils.create_volume(self.context, **self.volume_params)
    vol_id = volume['id']
    self.volume.create_volume(self.context, volume)

    busy_error = exception.VolumeIsBusy(volume_name='fake')
    with mock.patch.object(self.volume.driver, 'delete_volume',
                           side_effect=busy_error) as mock_del_vol:
        self.volume.delete_volume(self.context, volume)
        # The volume still exists and is back in 'available'.
        db_volume = db.volume_get(context.get_admin_context(), vol_id)
        self.assertEqual(vol_id, db_volume.id)
        self.assertEqual("available", db_volume.status)
        mock_del_vol.assert_called_once_with(volume)
def test_unmanage_encrypted_volume_fails(self):
    """delete_volume(unmanage_only=True) raises Invalid if encrypted."""
    encrypted_vol = tests_utils.create_volume(
        self.context,
        encryption_key_id=fake.ENCRYPTION_KEY_ID,
        **self.volume_params)
    self.volume.create_volume(self.context, encrypted_vol)

    fresh_manager = vol_manager.VolumeManager()
    self.assertRaises(exception.Invalid,
                      fresh_manager.delete_volume,
                      self.context,
                      encrypted_vol,
                      unmanage_only=True)

    # A regular delete through the test's own manager cleans up.
    self.volume.delete_volume(self.context, encrypted_vol)
def test_get_volume_different_tenant(self):
    """Test can't get volume of another tenant when viewable_admin_meta."""
    volume = tests_utils.create_volume(self.context, **self.volume_params)
    volume_id = volume['id']
    self.volume.create_volume(self.context, volume)

    # A non-admin context that belongs to a different project.
    foreign_ctxt = context.RequestContext('another_user_id',
                                          'another_project_id',
                                          is_admin=False)
    self.assertNotEqual(foreign_ctxt.project_id, self.context.project_id)

    api = cinder.volume.api.API()
    self.assertRaises(exception.VolumeNotFound, api.get,
                      foreign_ctxt, volume_id, viewable_admin_meta=True)

    # The creating context can still fetch the volume.
    fetched = api.get(self.context, volume_id)
    self.assertEqual(volume_id, fetched['id'])

    self.volume.delete_volume(self.context, volume)
def test_get_all_limit_bad_value(self):
    """Test value of 'limit' is numeric and >= 0"""
    api = cinder.volume.api.API()

    # A non-numeric value and a negative value must both be rejected.
    for bad_limit in ("A", "-1"):
        self.assertRaises(exception.InvalidInput,
                          api.get_all,
                          self.context,
                          limit=bad_limit)
def test_get_all_tenants_volume_list(self):
    """Validate when the volume list for all tenants is returned.

    Three calls are made against a patched DB layer:

    * ``all_tenants=0``: the per-project query is used.
    * ``all_tenants=1`` with a non-admin context: the per-project query
      is still used.
    * ``all_tenants=1`` with an admin context: the all-volumes query is
      used.
    """
    volume_api = cinder.volume.api.API()

    with mock.patch.object(volume_api.db,
                           'volume_get_all_by_project') as by_project, \
            mock.patch.object(volume_api.db,
                              'volume_get_all') as get_all:
        db_volume = {'volume_type_id': fake.VOLUME_TYPE_ID,
                     'name': 'fake_name',
                     'host': 'fake_host',
                     'id': fake.VOLUME_ID}
        volume = fake_volume.fake_db_volume(**db_volume)
        by_project.return_value = [volume]
        get_all.return_value = [volume]

        volume_api.get_all(self.context, filters={'all_tenants': '0'})
        self.assertTrue(by_project.called)

        # Clear the recorded calls through the mock API instead of
        # overwriting the ``called`` attribute by hand, which would leave
        # call_count/call_args stale. reset_mock() keeps return_value.
        by_project.reset_mock()

        self.context.is_admin = False
        volume_api.get_all(self.context, filters={'all_tenants': '1'})
        self.assertTrue(by_project.called)

        # check for volume list of all tenants
        self.context.is_admin = True
        volume_api.get_all(self.context, filters={'all_tenants': '1'})
        self.assertTrue(get_all.called)
def test_delete_volume_in_error_extending(self):
    """Test volume can be deleted in error_extending status."""
    volume = tests_utils.create_volume(self.context, **self.volume_params)
    self.volume.create_volume(self.context, volume)

    # Put the volume into 'error_extending' and delete it.
    stuck_status = {'status': 'error_extending'}
    db.volume_update(self.context, volume['id'], stuck_status)
    self.volume.delete_volume(self.context, volume)

    # The volume can no longer be looked up.
    self.assertRaises(exception.NotFound,
                      db.volume_get,
                      self.context,
                      volume['id'])
@mock.patch.object(db.sqlalchemy.api, 'volume_get',
                   side_effect=exception.VolumeNotFound(
                       volume_id='12345678-1234-5678-1234-567812345678'))
def test_delete_volume_not_found(self, mock_get_volume):
    """Test delete volume moves on if the volume does not exist."""
    missing_id = '12345678-1234-5678-1234-567812345678'
    ghost_volume = objects.Volume(self.context,
                                  status='available',
                                  id=missing_id)

    # The patched DB lookup raises VolumeNotFound; the delete is called
    # without expecting any exception to surface.
    self.volume.delete_volume(self.context, ghost_volume)

    self.assertTrue(mock_get_volume.called)
@mock.patch('cinder.volume.drivers.lvm.LVMVolumeDriver.'
            'create_volume_from_snapshot')
def test_create_volume_from_snapshot(self, mock_create_from_snap):
    """Test volume can be created from a snapshot."""
    # Source volume and a snapshot of it.
    source = tests_utils.create_volume(self.context, **self.volume_params)
    self.volume.create_volume(self.context, source)
    snapshot_id = create_snapshot(source['id'], size=source['size'])['id']
    snapshot = objects.Snapshot.get_by_id(self.context, snapshot_id)
    self.volume.create_snapshot(self.context, snapshot)

    # Destination volume built from that snapshot.
    destination = tests_utils.create_volume(self.context,
                                            snapshot_id=snapshot_id,
                                            **self.volume_params)
    self.volume.create_volume(self.context, destination)

    stored = db.volume_get(context.get_admin_context(), destination['id'])
    self.assertEqual(destination['id'], stored.id)
    stored = db.volume_get(context.get_admin_context(), destination['id'])
    self.assertEqual(snapshot_id, stored.snapshot_id)

    # Tear down in reverse order of creation.
    self.volume.delete_volume(self.context, destination)
    self.volume.delete_snapshot(self.context, snapshot)
    self.volume.delete_volume(self.context, source)
@mock.patch('cinder.volume.flows.api.create_volume.get_flow')
@mock.patch('cinder.objects.volume.Volume.get_by_id')
def test_create_volume_from_snapshot_with_types(
        self, _get_by_id, _get_flow):
    """Test volume create from snapshot with types including mismatch.

    Covers three cases: a requested type that differs from the snapshot's
    type (InvalidInput), a requested type when the snapshot has no type
    (InvalidInput), and a requested type equal to the snapshot's type
    (accepted).
    """
    volume_api = cinder.volume.api.API()

    foo_type = fake_volume.fake_volume_type_obj(
        self.context,
        id=fake.VOLUME_TYPE_ID,
        name='foo',
        extra_specs={'volume_backend_name': 'dev_1'})
    # Named 'biz' to match the other *_with_types tests; it was previously
    # created as a second type called 'foo', which made the two fixtures
    # indistinguishable by name.
    biz_type = fake_volume.fake_volume_type_obj(
        self.context,
        id=fake.VOLUME_TYPE2_ID,
        name='biz',
        extra_specs={'volume_backend_name': 'dev_2'})

    source_vol = fake_volume.fake_volume_obj(
        self.context,
        id=fake.VOLUME_ID,
        status='available',
        volume_size=10,
        volume_type_id=biz_type.id)
    source_vol.volume_type = biz_type
    snapshot = {'id': fake.SNAPSHOT_ID,
                'status': fields.SnapshotStatus.AVAILABLE,
                'volume_size': 10,
                'volume_type_id': biz_type.id}
    snapshot_obj = fake_snapshot.fake_snapshot_obj(self.context,
                                                   **snapshot)
    snapshot_obj.volume = source_vol

    # Make sure the case of specifying a type that
    # doesn't match the snapshots type fails
    self.assertRaises(exception.InvalidInput,
                      volume_api.create,
                      self.context,
                      size=1,
                      name='fake_name',
                      description='fake_desc',
                      volume_type=foo_type,
                      snapshot=snapshot_obj)

    # Make sure that trying to specify a type
    # when the snapshots type is None fails
    snapshot_obj.volume_type_id = None
    snapshot_obj.volume.volume_type_id = None
    snapshot_obj.volume.volume_type = None
    self.assertRaises(exception.InvalidInput,
                      volume_api.create,
                      self.context,
                      size=1,
                      name='fake_name',
                      description='fake_desc',
                      volume_type=foo_type,
                      snapshot=snapshot_obj)

    # With the snapshot switched to foo_type the same request is accepted.
    snapshot_obj.volume_type_id = foo_type.id
    snapshot_obj.volume.volume_type_id = foo_type.id
    snapshot_obj.volume.volume_type = foo_type
    volume_api.create(self.context, size=1, name='fake_name',
                      description='fake_desc', volume_type=foo_type,
                      snapshot=snapshot_obj)
@mock.patch('cinder.volume.flows.api.create_volume.get_flow')
@mock.patch('cinder.objects.volume.Volume.get_by_id')
def test_create_volume_from_source_with_types(
        self, _get_by_id, _get_flow):
    """Test volume create from source with types including mismatch."""
    api = cinder.volume.api.API()

    foo_type = fake_volume.fake_volume_type_obj(
        self.context,
        id=fake.VOLUME_TYPE_ID,
        name='foo',
        extra_specs={'volume_backend_name': 'dev_1'})
    biz_type = fake_volume.fake_volume_type_obj(
        self.context,
        id=fake.VOLUME_TYPE2_ID,
        name='biz',
        extra_specs={'volume_backend_name': 'dev_2'})
    origin = fake_volume.fake_volume_obj(
        self.context,
        id=fake.VOLUME_ID,
        status='available',
        volume_size=0,
        volume_type_id=biz_type.id)
    origin.volume_type = biz_type

    # Requesting foo_type while the source volume is biz_type fails.
    self.assertRaises(exception.InvalidInput,
                      api.create,
                      self.context,
                      size=1,
                      name='fake_name',
                      description='fake_desc',
                      volume_type=foo_type,
                      source_volume=origin)

    # Requesting a type while the source volume has none fails as well.
    origin.volume_type_id = None
    origin.volume_type = None
    self.assertRaises(exception.InvalidInput,
                      api.create,
                      self.context,
                      size=1,
                      name='fake_name',
                      description='fake_desc',
                      volume_type=foo_type,
                      source_volume=origin)

    # Requesting the source volume's own type is accepted.
    origin.volume_type_id = biz_type.id
    origin.volume_type = biz_type
    api.create(self.context,
               size=1,
               name='fake_name',
               description='fake_desc',
               volume_type=biz_type,
               source_volume=origin)
@mock.patch('cinder.volume.flows.api.create_volume.get_flow')
@mock.patch('cinder.objects.volume.Volume.get_by_id')
def test_create_volume_from_source_with_same_backend(
        self, _get_by_id, _get_flow):
    """Test volume create from source with type mismatch same backend."""
    api = cinder.volume.api.API()

    def _dev_1_type(type_id, type_name, created):
        # Both types share every attribute except id, name and created_at;
        # in particular they point at the same backend name.
        return fake_volume.fake_volume_type_obj(
            self.context,
            id=type_id,
            name=type_name,
            qos_specs_id=None,
            deleted=False,
            created_at=created,
            updated_at=None,
            extra_specs={'volume_backend_name': 'dev_1'},
            is_public=True,
            deleted_at=None,
            description=None)

    foo_type = _dev_1_type(
        fake.VOLUME_TYPE_ID, 'foo',
        datetime.datetime(2015, 5, 8, 0, 40, 5, 408232))
    biz_type = _dev_1_type(
        fake.VOLUME_TYPE2_ID, 'biz',
        datetime.datetime(2015, 5, 8, 0, 20, 5, 408232))

    origin = fake_volume.fake_volume_obj(
        self.context,
        id=fake.VOLUME_ID,
        status='available',
        volume_size=10,
        volume_type_id=biz_type.id)
    origin.volume_type = biz_type

    # Different type than the source volume; no exception is expected.
    api.create(self.context,
               size=1,
               name='fake_name',
               description='fake_desc',
               volume_type=foo_type,
               source_volume=origin)
@mock.patch('cinder.volume.flows.api.create_volume.get_flow')
@mock.patch('cinder.objects.volume.Volume.get_by_id')
def test_create_from_source_and_snap_only_one_backend(
        self, _get_by_id, _get_flow):
    """Test create from source and snap with type mismatch one backend."""
    api = cinder.volume.api.API()

    def _make_type(type_id, type_name, created, specs):
        # The two types differ only in id, name, created_at and extra specs.
        return fake_volume.fake_volume_type_obj(
            self.context,
            id=type_id,
            name=type_name,
            qos_specs_id=None,
            deleted=False,
            created_at=created,
            updated_at=None,
            extra_specs=specs,
            is_public=True,
            deleted_at=None,
            description=None)

    foo_type = _make_type(
        fake.VOLUME_TYPE_ID, 'foo',
        datetime.datetime(2015, 5, 8, 0, 40, 5, 408232),
        {'some_key': 3})
    biz_type = _make_type(
        fake.VOLUME_TYPE2_ID, 'biz',
        datetime.datetime(2015, 5, 8, 0, 20, 5, 408232),
        {'some_other_key': 4})

    origin = fake_volume.fake_volume_obj(
        self.context,
        id=fake.VOLUME_ID,
        status='available',
        volume_size=10,
        volume_type_id=biz_type.id)
    origin.volume_type = biz_type

    snapshot_fields = {'id': fake.SNAPSHOT_ID,
                       'status': fields.SnapshotStatus.AVAILABLE,
                       'volume_size': 10,
                       'volume_type_id': biz_type['id']}
    snapshot_obj = fake_snapshot.fake_snapshot_obj(self.context,
                                                   **snapshot_fields)
    snapshot_obj.volume = origin

    # The service listing is patched to return a single service entry.
    single_service = [{'host': 'foo',
                       'uuid': 'a3a593da-7f8d-4bb7-8b4c-f2bc1e0b4824'}]
    with mock.patch('cinder.db.service_get_all',
                    return_value=single_service), \
            mock.patch.object(api, 'list_availability_zones',
                              return_value={}):
        request = {'size': 1,
                   'name': 'fake_name',
                   'description': 'fake_desc',
                   'volume_type': foo_type}
        # Neither call is expected to raise.
        api.create(self.context, source_volume=origin, **request)
        api.create(self.context, snapshot=snapshot_obj, **request)
def _test_create_from_source_snapshot_encryptions(
        self, is_snapshot=False):
    """Check create() raises InvalidInput when encryption is reported changed.

    ``volume_types_encryption_changed`` is patched to return True. With
    ``is_snapshot`` False the request is made from a source volume,
    otherwise from a snapshot of that volume.
    """
    api = cinder.volume.api.API()
    foo_type = fake_volume.fake_volume_type_obj(
        self.context,
        id=fake.VOLUME_TYPE_ID,
        name='foo',
        extra_specs={'volume_backend_name': 'dev_1'})
    biz_type = fake_volume.fake_volume_type_obj(
        self.context,
        id=fake.VOLUME_TYPE2_ID,
        name='biz',
        extra_specs={'volume_backend_name': 'dev_1'})

    origin = fake_volume.fake_volume_obj(
        self.context,
        id=fake.VOLUME_ID,
        status='available',
        volume_size=1,
        volume_type_id=biz_type.id)
    origin.volume_type = biz_type

    snapshot_fields = {'id': fake.SNAPSHOT_ID,
                       'status': fields.SnapshotStatus.AVAILABLE,
                       'volume_size': 1,
                       'volume_type_id': biz_type['id']}
    snapshot_obj = fake_snapshot.fake_snapshot_obj(self.context,
                                                   **snapshot_fields)
    snapshot_obj.volume = origin

    # Exactly one of the two sources is handed to create().
    if is_snapshot:
        source_arg, snapshot_arg = None, snapshot_obj
    else:
        source_arg, snapshot_arg = origin, None

    with mock.patch.object(
            cinder.volume.volume_types,
            'volume_types_encryption_changed',
            return_value=True):
        self.assertRaises(exception.InvalidInput,
                          api.create,
                          self.context,
                          size=1,
                          name='fake_name',
                          description='fake_desc',
                          volume_type=foo_type,
                          source_volume=source_arg,
                          snapshot=snapshot_arg)
def test_create_from_source_encryption_changed(self):
    """Run the encryption-changed check using a source volume."""
    self._test_create_from_source_snapshot_encryptions(is_snapshot=False)
def test_create_from_snapshot_encryption_changed(self):
    """Run the encryption-changed check using a snapshot."""
    from_snapshot = True
    self._test_create_from_source_snapshot_encryptions(
        is_snapshot=from_snapshot)
def _mock_synchronized(self, name, *s_args, **s_kwargs):
    """Return a decorator that records lock/unlock markers around a call.

    The wrapped callable appends ``lock-<name>`` to ``self.called`` before
    running and ``unlock-<name>`` after it returns, then passes the return
    value through. The extra positional and keyword arguments are accepted
    and ignored.
    """
    def decorator(func):
        def recording_wrapper(*args, **kwargs):
            self.called.append('lock-%s' % (name))
            result = func(*args, **kwargs)
            # Only reached when func returns normally.
            self.called.append('unlock-%s' % (name))
            return result
        return recording_wrapper
    return decorator
def _fake_execute(self, *cmd, **kwargs):
    """Accept any command arguments, do nothing and return None."""
    return None
@mock.patch.object(coordination.Coordinator, 'get_lock')
@mock.patch.object(fake_driver.FakeLoggingVolumeDriver,
                   'create_volume_from_snapshot')
def test_create_volume_from_snapshot_check_locks(
        self, mock_lvm_create, mock_lock):
    """Check which coordination lock is requested at each step.

    ``Coordinator.get_lock`` is patched and, after every manager call that
    is marked "locked" below, the most recent lock request is compared with
    the expected ``<id>-delete_snapshot`` / ``<id>-delete_volume`` name.
    ``assert_called_with`` only inspects the latest call, so the order of
    the statements in this test matters.

    :param mock_lvm_create: patch of the fake driver's
        ``create_volume_from_snapshot``.
    :param mock_lock: patch of ``Coordinator.get_lock``.
    """
    orig_flow = engine.ActionEngine.run

    def mock_flow_run(*args, **kwargs):
        # ensure the lock has been taken
        # (snap_id is assigned further down, before this wrapper is
        # installed and run)
        mock_lock.assert_called_with('%s-delete_snapshot' % snap_id)
        # now proceed with the flow.
        ret = orig_flow(*args, **kwargs)
        return ret

    # create source volume
    src_vol = tests_utils.create_volume(self.context, **self.volume_params)

    # no lock
    self.volume.create_volume(self.context, src_vol)

    snap_id = create_snapshot(src_vol.id,
                              size=src_vol['size'])['id']
    snapshot_obj = objects.Snapshot.get_by_id(self.context, snap_id)
    # no lock
    self.volume.create_snapshot(self.context, snapshot_obj)

    dst_vol = tests_utils.create_volume(self.context,
                                        snapshot_id=snap_id,
                                        **self.volume_params)
    admin_ctxt = context.get_admin_context()

    # mock the flow runner so we can do some checks
    self.mock_object(engine.ActionEngine, 'run', mock_flow_run)

    # locked
    self.volume.create_volume(self.context, dst_vol,
                              request_spec={'snapshot_id': snap_id})
    mock_lock.assert_called_with('%s-delete_snapshot' % snap_id)
    self.assertEqual(dst_vol.id, db.volume_get(admin_ctxt, dst_vol.id).id)
    self.assertEqual(snap_id,
                     db.volume_get(admin_ctxt, dst_vol.id).snapshot_id)

    # locked
    self.volume.delete_volume(self.context, dst_vol)
    mock_lock.assert_called_with('%s-delete_volume' % dst_vol.id)

    # locked
    self.volume.delete_snapshot(self.context, snapshot_obj)
    mock_lock.assert_called_with('%s-delete_snapshot' % snap_id)

    # locked
    self.volume.delete_volume(self.context, src_vol)
    mock_lock.assert_called_with('%s-delete_volume' % src_vol.id)

    # the patched driver method was used to build dst_vol
    self.assertTrue(mock_lvm_create.called)
@mock.patch.object(coordination.Coordinator, 'get_lock')
def test_create_volume_from_volume_check_locks(self, mock_lock):
    """Check which coordination lock is requested when cloning a volume.

    ``Coordinator.get_lock`` is patched and, after every manager call that
    is marked "locked" below, the most recent lock request is compared with
    the expected ``<volume id>-delete_volume`` name. ``assert_called_with``
    only inspects the latest call, so the order of the statements in this
    test matters.

    :param mock_lock: patch of ``Coordinator.get_lock``.
    """
    # replace utils.execute with a no-op so no real commands are run
    self.mock_object(utils, 'execute', self._fake_execute)

    orig_flow = engine.ActionEngine.run

    def mock_flow_run(*args, **kwargs):
        # ensure the lock has been taken
        # (src_vol_id is assigned further down, before this wrapper is
        # installed and run)
        mock_lock.assert_called_with('%s-delete_volume' % src_vol_id)
        # now proceed with the flow.
        ret = orig_flow(*args, **kwargs)
        return ret

    # create source volume
    src_vol = tests_utils.create_volume(self.context, **self.volume_params)
    src_vol_id = src_vol['id']

    # no lock
    self.volume.create_volume(self.context, src_vol)
    self.assertEqual(0, mock_lock.call_count)

    dst_vol = tests_utils.create_volume(self.context,
                                        source_volid=src_vol_id,
                                        **self.volume_params)
    dst_vol_id = dst_vol['id']
    admin_ctxt = context.get_admin_context()

    # mock the flow runner so we can do some checks
    self.mock_object(engine.ActionEngine, 'run', mock_flow_run)

    # locked
    self.volume.create_volume(self.context, dst_vol,
                              request_spec={'source_volid': src_vol_id})
    mock_lock.assert_called_with('%s-delete_volume' % src_vol_id)
    self.assertEqual(dst_vol_id, db.volume_get(admin_ctxt, dst_vol_id).id)
    self.assertEqual(src_vol_id,
                     db.volume_get(admin_ctxt, dst_vol_id).source_volid)

    # locked
    self.volume.delete_volume(self.context, dst_vol)
    mock_lock.assert_called_with('%s-delete_volume' % dst_vol_id)

    # locked
    self.volume.delete_volume(self.context, src_vol)
    mock_lock.assert_called_with('%s-delete_volume' % src_vol_id)
def _raise_metadata_copy_failure(self, method, dst_vol):
    """Check create_volume fails cleanly when a metadata copy raises.

    :param method: name of the ``db`` function to patch so that it raises
        ``MetadataCopyFailure``.
    :param dst_vol: volume to create; it must end in 'error' status and is
        destroyed afterwards.
    """
    # MetadataCopyFailure exception will be raised if DB service is Down
    # while copying the volume glance metadata
    copy_failure = exception.MetadataCopyFailure(
        reason="Because of DB service down.")
    with mock.patch.object(db, method, side_effect=copy_failure):
        self.assertRaises(exception.MetadataCopyFailure,
                          self.volume.create_volume,
                          self.context,
                          dst_vol)

    # The failed create must leave the volume in 'error'.
    failed = db.volume_get(self.context, dst_vol.id)
    self.assertEqual('error', failed['status'])

    # Remove the destination volume again.
    db.volume_destroy(self.context, dst_vol.id)
@mock.patch('cinder.utils.execute')
def test_create_volume_from_volume_with_glance_volume_metadata_none(
        self, mock_execute):
    """Clone a bootable volume that has no glance metadata.

    The clone must become 'available' even though copying the glance
    metadata directly raises GlanceMetadataNotFound.
    """
    mock_execute.return_value = None

    # Source volume, flagged bootable after creation.
    source = tests_utils.create_volume(self.context, **self.volume_params)
    source_id = source['id']
    self.volume.create_volume(self.context, source)
    db.volume_update(self.context, source['id'], {'bootable': True})

    # Clone of the source volume.
    clone = tests_utils.create_volume(self.context,
                                      source_volid=source_id,
                                      **self.volume_params)
    self.volume.create_volume(self.context, clone)
    clone_id = clone['id']

    self.assertRaises(exception.GlanceMetadataNotFound,
                      db.volume_glance_metadata_copy_from_volume_to_volume,
                      self.context, source_id, clone_id)

    # The clone itself was created successfully.
    stored = db.volume_get(self.context, clone_id)
    self.assertEqual('available', stored['status'])

    # Remove both volumes.
    db.volume_destroy(self.context, source_id)
    db.volume_destroy(self.context, clone_id)
@mock.patch('cinder.utils.execute')
def test_create_volume_from_volume_raise_metadata_copy_failure(
        self, mock_execute):
    """Clone a bootable volume while the volume-to-volume copy fails."""
    mock_execute.return_value = None

    # Source volume, flagged bootable after creation.
    source = tests_utils.create_volume(self.context, **self.volume_params)
    source_id = source['id']
    self.volume.create_volume(self.context, source)
    db.volume_update(self.context, source['id'], {'bootable': True})

    # The helper creates the clone with the copy function patched to
    # raise, checks the resulting status and destroys the clone.
    clone = tests_utils.create_volume(self.context,
                                      source_volid=source_id,
                                      **self.volume_params)
    patched_copy = 'volume_glance_metadata_copy_from_volume_to_volume'
    self._raise_metadata_copy_failure(patched_copy, clone)

    # Remove the source volume.
    db.volume_destroy(self.context, source_id)
@mock.patch('cinder.utils.execute')
def test_create_volume_from_snapshot_raise_metadata_copy_failure(
        self, mock_execute):
    """Create from a snapshot while the copy-to-volume step fails."""
    mock_execute.return_value = None

    # Source volume, flagged bootable after creation.
    source = tests_utils.create_volume(self.context, **self.volume_params)
    source_id = source['id']
    self.volume.create_volume(self.context, source)
    db.volume_update(self.context, source['id'], {'bootable': True})

    # Snapshot of the source; it must be 'available' once created.
    snapshot_id = create_snapshot(source['id'])['id']
    snapshot = objects.Snapshot.get_by_id(self.context, snapshot_id)
    self.volume.create_snapshot(self.context, snapshot)
    self.assertEqual(fields.SnapshotStatus.AVAILABLE, snapshot.status)

    # The helper creates the new volume with the copy function patched to
    # raise, checks the resulting status and destroys that volume.
    from_snapshot = tests_utils.create_volume(self.context,
                                              snapshot_id=snapshot_id,
                                              **self.volume_params)
    patched_copy = 'volume_glance_metadata_copy_to_volume'
    self._raise_metadata_copy_failure(patched_copy, from_snapshot)

    # Remove the snapshot and the source volume.
    snapshot.destroy()
    db.volume_destroy(self.context, source_id)
@mock.patch('cinder.utils.execute')
def test_create_volume_from_snapshot_with_glance_volume_metadata_none(
        self, mock_execute):
    """Create from a snapshot of a bootable volume lacking glance metadata.

    The new volume must become 'available' even though copying the glance
    metadata directly raises GlanceMetadataNotFound.
    """
    mock_execute.return_value = None

    # Source volume, flagged bootable after creation.
    source = tests_utils.create_volume(self.context, **self.volume_params)
    source_id = source['id']
    self.volume.create_volume(self.context, source)
    db.volume_update(self.context, source['id'], {'bootable': True})
    source_row = db.volume_get(self.context, source_id)

    # Snapshot of the source; it must be 'available' once created.
    snapshot_id = create_snapshot(source_row['id'])['id']
    snapshot = objects.Snapshot.get_by_id(self.context, snapshot_id)
    self.volume.create_snapshot(self.context, snapshot)
    self.assertEqual(fields.SnapshotStatus.AVAILABLE, snapshot.status)

    # New volume built from the snapshot.
    from_snapshot = tests_utils.create_volume(self.context,
                                              snapshot_id=snapshot_id,
                                              **self.volume_params)
    self.volume.create_volume(self.context, from_snapshot)
    new_id = from_snapshot['id']

    self.assertRaises(exception.GlanceMetadataNotFound,
                      db.volume_glance_metadata_copy_to_volume,
                      self.context, new_id, snapshot_id)

    # The new volume itself was created successfully.
    stored = db.volume_get(self.context, new_id)
    self.assertEqual('available', stored['status'])

    # Remove the snapshot and both volumes.
    snapshot.destroy()
    db.volume_destroy(self.context, source_id)
    db.volume_destroy(self.context, new_id)
@ddt.data({'connector_class':
           os_brick.initiator.connectors.iscsi.ISCSIConnector,
           'rekey_supported': True,
           'already_encrypted': 'yes'},
          {'connector_class':
           os_brick.initiator.connectors.iscsi.ISCSIConnector,
           'rekey_supported': True,
           'already_encrypted': 'no'},
          {'connector_class':
           os_brick.initiator.connectors.rbd.RBDConnector,
           'rekey_supported': False,
           'already_encrypted': 'no'})
@ddt.unpack
@mock.patch('cinder.volume.volume_utils.delete_encryption_key')
@mock.patch('cinder.volume.flows.manager.create_volume.'
            'CreateVolumeFromSpecTask._setup_encryption_keys')
@mock.patch('cinder.db.sqlalchemy.api.volume_encryption_metadata_get')
@mock.patch('cinder.image.image_utils.qemu_img_info')
@mock.patch('cinder.volume.driver.VolumeDriver._detach_volume')
@mock.patch('cinder.volume.driver.VolumeDriver._attach_volume')
@mock.patch('cinder.volume.volume_utils.brick_get_connector_properties')
@mock.patch('cinder.utils.execute')
def test_create_volume_from_volume_with_enc(
        self, mock_execute, mock_brick_gcp, mock_at, mock_det,
        mock_qemu_img_info, mock_enc_metadata_get, mock_setup_enc_keys,
        mock_del_enc_key, connector_class=None, rekey_supported=None,
        already_encrypted=None):
    """Clone an encrypted volume and check the expected rekey commands.

    The mock parameters map to the ``mock.patch`` decorators bottom-up:
    ``mock_execute`` is ``cinder.utils.execute``, ``mock_brick_gcp`` is
    ``brick_get_connector_properties``, ``mock_at`` / ``mock_det`` are the
    driver's ``_attach_volume`` / ``_detach_volume``, followed by
    ``qemu_img_info``, ``volume_encryption_metadata_get``,
    ``_setup_encryption_keys`` and ``delete_encryption_key``.

    :param connector_class: connector type placed in the fake attach info
        (supplied by ddt).
    :param rekey_supported: whether the rekey assertions are expected to
        hold for that connector (supplied by ddt).
    :param already_encrypted: the string 'yes' or 'no' (supplied by ddt).
    """
    # create source volume
    mock_execute.return_value = ('', '')
    mock_enc_metadata_get.return_value = {'cipher': 'aes-xts-plain64',
                                          'key_size': 256,
                                          'provider': 'luks'}
    # (source passphrase, new passphrase, new key id)
    mock_setup_enc_keys.return_value = (
        'qwert',
        'asdfg',
        fake.ENCRYPTION_KEY2_ID)

    params = {'status': 'creating',
              'size': 1,
              'host': CONF.host,
              'encryption_key_id': fake.ENCRYPTION_KEY_ID}

    src_vol = tests_utils.create_volume(self.context, **params)
    src_vol_id = src_vol['id']

    self.volume.create_volume(self.context, src_vol)
    db.volume_update(self.context,
                     src_vol['id'],
                     {'encryption_key_id': fake.ENCRYPTION_KEY_ID})

    # create volume from source volume

    params['encryption_key_id'] = fake.ENCRYPTION_KEY2_ID

    attach_info = {
        'connector': connector_class(None),
        'device': {'path': '/some/device/thing'}}
    mock_at.return_value = (attach_info, src_vol)

    img_info = imageutils.QemuImgInfo()
    # NOTE(review): already_encrypted is the string 'yes' or 'no', and both
    # are truthy, so this branch (and the luksChangeKey expectation below)
    # is taken for every ddt case while the luksFormat expectation is never
    # reached. Presumably ``already_encrypted == 'yes'`` was intended;
    # confirm against the rekey code before changing, since the luksFormat
    # expectations have never been exercised.
    if already_encrypted:
        # defaults to None when not encrypted
        img_info.encrypted = 'yes'
    img_info.file_format = 'raw'
    mock_qemu_img_info.return_value = img_info

    dst_vol = tests_utils.create_volume(self.context,
                                        source_volid=src_vol_id,
                                        **params)
    self.volume.create_volume(self.context, dst_vol)

    # ensure that status of volume is 'available'
    vol = db.volume_get(self.context, dst_vol['id'])
    self.assertEqual('available', vol['status'])

    # cleanup resource
    db.volume_destroy(self.context, src_vol_id)
    db.volume_destroy(self.context, dst_vol['id'])

    if rekey_supported:
        mock_setup_enc_keys.assert_called_once_with(
            mock.ANY,
            src_vol,
            {'key_size': 256,
             'provider': 'luks',
             'cipher': 'aes-xts-plain64'}
        )
        if already_encrypted:
            # both passphrases are fed on stdin, old one first
            mock_execute.assert_called_once_with(
                'cryptsetup', 'luksChangeKey',
                '/some/device/thing',
                '--force-password',
                log_errors=processutils.LOG_ALL_ERRORS,
                process_input='qwert\nasdfg\n',
                run_as_root=True)
        else:
            # only the new passphrase is fed on stdin
            mock_execute.assert_called_once_with(
                'cryptsetup', '--batch-mode', 'luksFormat',
                '--type', 'luks1',
                '--cipher', 'aes-xts-plain64', '--key-size', '256',
                '--key-file=-', '/some/device/thing',
                process_input='asdfg',
                run_as_root=True)
        mock_del_enc_key.assert_called_once_with(mock.ANY,  # context
                                                 mock.ANY,  # keymgr
                                                 fake.ENCRYPTION_KEY2_ID)
    else:
        mock_setup_enc_keys.assert_not_called()
        mock_execute.assert_not_called()
        mock_del_enc_key.assert_not_called()
    # the volume is attached and detached in every case
    mock_at.assert_called()
    mock_det.assert_called()
@mock.patch('cinder.db.sqlalchemy.api.volume_encryption_metadata_get')
def test_setup_encryption_keys(self, mock_enc_metadata_get):
    """Check _setup_encryption_keys returns two passphrases and a key id.

    Two keys are stored in a fake key manager; ``create_encryption_key``
    is patched to hand back the id of the second one. The returned source
    and new passphrases must differ and the returned key id must be the
    second key's id.

    :param mock_enc_metadata_get: patch of
        ``volume_encryption_metadata_get``.
    """
    key_mgr = fake_keymgr.fake_api()
    self.mock_object(castellan.key_manager, 'API', return_value=key_mgr)
    key_id = key_mgr.store(self.context, KeyObject())
    key2_id = key_mgr.store(self.context, KeyObject2())

    params = {'status': 'creating',
              'size': 1,
              'host': CONF.host,
              'encryption_key_id': key_id}
    vol = tests_utils.create_volume(self.context, **params)

    self.volume.create_volume(self.context, vol)
    db.volume_update(self.context,
                     vol['id'],
                     {'encryption_key_id': key_id})

    mock_enc_metadata_get.return_value = {'cipher': 'aes-xts-plain64',
                                          'key_size': 256,
                                          'provider': 'luks'}
    ctxt = context.get_admin_context()

    enc_info = {'encryption_key_id': key_id}
    task_cls = (
        cinder.volume.flows.manager.create_volume.CreateVolumeFromSpecTask)
    with mock.patch('cinder.volume.volume_utils.create_encryption_key',
                    return_value=key2_id):
        source_pass, new_pass, new_key_id = (
            task_cls._setup_encryption_keys(ctxt, vol, enc_info))

    self.assertNotEqual(source_pass, new_pass)
    # Expected value first, as everywhere else in this file, so a failure
    # message labels the two values correctly.
    self.assertEqual(key2_id, new_key_id)
@mock.patch.object(key_manager, 'API', fake_keymgr.fake_api)
def test_create_volume_from_snapshot_with_encryption(self):
    """Test volume can be created from a snapshot of an encrypted volume"""
    admin_ctxt = context.get_admin_context()
    luks_type_id = '61298380-0c12-11e3-bfd6-4b48424183be'
    encryption_spec = {'control_location': 'front-end',
                       'provider': ENCRYPTION_PROVIDER,
                       'cipher': 'aes-xts-plain64',
                       'key_size': 256}
    db.volume_type_create(admin_ctxt,
                          {'id': luks_type_id, 'name': 'LUKS'})
    db.volume_type_encryption_create(admin_ctxt,
                                     luks_type_id,
                                     encryption_spec)

    api = cinder.volume.api.API()
    luks_type = db.volume_type_get_by_name(context.get_admin_context(),
                                           'LUKS')

    # Encrypted source volume, forced to an attached-to-host, available
    # state directly in the DB.
    source = api.create(self.context,
                        1,
                        'name',
                        'description',
                        volume_type=luks_type)
    db.volume_update(self.context, source['id'],
                     {'host': 'fake_host@fake_backend',
                      'status': 'available'})
    source = objects.Volume.get_by_id(self.context, source['id'])

    snapshot = api.create_snapshot_force(self.context,
                                         source,
                                         'name',
                                         'description')
    # status must be available
    snapshot['status'] = fields.SnapshotStatus.AVAILABLE

    destination = api.create(self.context,
                             1,
                             'name',
                             'description',
                             snapshot=snapshot)
    stored = db.volume_get(context.get_admin_context(), destination['id'])
    self.assertEqual(destination['id'], stored.id)
    stored = db.volume_get(context.get_admin_context(), destination['id'])
    self.assertEqual(snapshot['id'], stored.snapshot_id)

    # ensure encryption keys match
    self.assertIsNotNone(source['encryption_key_id'])
    self.assertIsNotNone(destination['encryption_key_id'])

    km = api.key_manager  # must use *same* key manager
    source_key = km.get(self.context, source['encryption_key_id'])
    destination_key = km.get(self.context,
                             destination['encryption_key_id'])
    self.assertEqual(source_key, destination_key)
def test_create_volume_from_encrypted_volume(self):
    """Test volume can be created from an encrypted volume."""
    self.mock_object(key_manager, 'API', fake_keymgr.fake_api)
    api = cinder.volume.api.API()

    admin_ctxt = context.get_admin_context()
    luks_type_id = '61298380-0c12-11e3-bfd6-4b48424183be'
    encryption_spec = {'control_location': 'front-end',
                       'provider': ENCRYPTION_PROVIDER,
                       'cipher': 'aes-xts-plain64',
                       'key_size': 256}
    db.volume_type_create(admin_ctxt,
                          {'id': luks_type_id, 'name': 'LUKS'})
    db.volume_type_encryption_create(admin_ctxt,
                                     luks_type_id,
                                     encryption_spec)
    luks_type = db.volume_type_get_by_name(context.get_admin_context(),
                                           'LUKS')

    # Encrypted source volume, forced to 'available' directly in the DB.
    source = api.create(self.context,
                        1,
                        'name',
                        'description',
                        volume_type=luks_type)
    db.volume_update(self.context, source['id'],
                     {'host': 'fake_host@fake_backend',
                      'status': 'available'})
    source = objects.Volume.get_by_id(self.context, source['id'])

    clone = api.create(self.context,
                       1,
                       'name',
                       'description',
                       source_volume=source)
    stored = db.volume_get(context.get_admin_context(), clone['id'])
    self.assertEqual(clone['id'], stored.id)
    stored = db.volume_get(context.get_admin_context(), clone['id'])
    self.assertEqual(source['id'], stored.source_volid)

    # ensure encryption keys match
    self.assertIsNotNone(source['encryption_key_id'])
    self.assertIsNotNone(clone['encryption_key_id'])

    keys = api.key_manager  # must use *same* key manager
    source_key = keys.get(self.context, source['encryption_key_id'])
    clone_key = keys.get(self.context, clone['encryption_key_id'])
    self.assertEqual(source_key, clone_key)
def test_delete_invalid_status_fails(self):
    """API delete raises InvalidVolume for status 'invalid1234'."""
    self.volume_params.update(status='invalid1234')
    bad_volume = tests_utils.create_volume(self.context,
                                           **self.volume_params)

    api = cinder.volume.api.API()
    self.assertRaises(exception.InvalidVolume,
                      api.delete,
                      self.context,
                      bad_volume)
def test_create_volume_from_snapshot_fail_bad_size(self):
    """Test volume can't be created from snapshot with bad volume size."""
    api = cinder.volume.api.API()

    # The snapshot is 10 in size while the request below asks for 1.
    snapshot_obj = fake_snapshot.fake_snapshot_obj(
        self.context,
        id=fake.SNAPSHOT_ID,
        status=fields.SnapshotStatus.AVAILABLE,
        volume_size=10)

    self.assertRaises(exception.InvalidInput,
                      api.create,
                      self.context,
                      size=1,
                      name='fake_name',
                      description='fake_desc',
                      snapshot=snapshot_obj)
def test_create_volume_from_snapshot_fail_wrong_az(self):
    """Test volume can't be created from snapshot in a different az."""
    api = cinder.volume.api.API()

    def two_zones(enable_cache=False):
        # Both zones are reported as available.
        return ({'name': 'nova', 'available': True},
                {'name': 'az2', 'available': True})

    self.mock_object(api, 'list_availability_zones', two_zones)

    # Source volume (and therefore its snapshot) lives in 'az2'.
    source = tests_utils.create_volume(self.context,
                                       availability_zone='az2',
                                       **self.volume_params)
    self.volume.create_volume(self.context, source)
    snapshot = create_snapshot(source['id'])
    self.volume.create_snapshot(self.context, snapshot)

    # Without an explicit zone the new volume lands in the snapshot's zone.
    created = api.create(self.context,
                         size=1,
                         name='fake_name',
                         description='fake_desc',
                         snapshot=snapshot)
    self.assertEqual('az2', created['availability_zone'])

    # Explicitly asking for another zone is rejected.
    self.assertRaises(exception.InvalidInput,
                      api.create,
                      self.context,
                      size=1,
                      name='fake_name',
                      description='fake_desc',
                      snapshot=snapshot,
                      availability_zone='nova')
def test_create_volume_with_invalid_exclusive_options(self):
    """Test volume create with multiple exclusive options fails."""
    api = cinder.volume.api.API()

    # Snapshot, image and source volume are all supplied at once.
    conflicting_sources = {'snapshot': fake.SNAPSHOT_ID,
                           'image_id': fake.IMAGE_ID,
                           'source_volume': fake.VOLUME_ID}
    self.assertRaises(exception.InvalidInput,
                      api.create,
                      self.context,
                      1,
                      'name',
                      'description',
                      **conflicting_sources)
def test_reserve_volume_success(self):
    """Reserving an 'available' volume moves it to 'attaching'."""
    volume = tests_utils.create_volume(self.context, status='available')
    api = cinder.volume.api.API()
    api.reserve_volume(self.context, volume)

    stored = db.volume_get(self.context, volume.id)
    self.assertEqual('attaching', stored.status)

    db.volume_destroy(self.context, volume.id)
def test_reserve_volume_in_attaching(self):
    """Reserving a volume in 'attaching' status must fail."""
    bad_status = 'attaching'
    self._test_reserve_volume_bad_status(bad_status)
def test_reserve_volume_in_maintenance(self):
    """Reserving a volume in 'maintenance' status must fail."""
    bad_status = 'maintenance'
    self._test_reserve_volume_bad_status(bad_status)
def _test_reserve_volume_bad_status(self, status):
    """Check reserve_volume raises InvalidVolume for the given status.

    :param status: status the volume is created with.
    """
    volume = tests_utils.create_volume(self.context, status=status)
    api = cinder.volume.api.API()
    self.assertRaises(exception.InvalidVolume,
                      api.reserve_volume,
                      self.context,
                      volume)
    # Remove the volume again.
    db.volume_destroy(self.context, volume.id)
def test_attachment_reserve_with_bootable_volume(self):
    """Reserve an attachment on a bootable, in-use, multiattach volume.

    The volume is created 'in-use', attached to an instance, then flagged
    multiattach and bootable; reserving again for the same instance must
    return an attachment whose attach_status is 'reserved'.
    """
    # test the private _attachment_reserve method with a bootable,
    # in-use, multiattach volume.
    instance_uuid = fake.UUID1
    volume = tests_utils.create_volume(self.context, status='in-use')
    tests_utils.attach_volume(self.context, volume.id, instance_uuid,
                              'attached_host', 'mountpoint', mode='rw')
    volume.multiattach = True
    volume.bootable = True

    attachment = self.volume_api._attachment_reserve(
        self.context, volume, instance_uuid)

    # Expected value first, as everywhere else in this file, so a failure
    # message labels the two values correctly.
    self.assertEqual('reserved', attachment.attach_status)
def test_attachment_reserve_conditional_update_attach_race(self):
    """Second instance loses the reserve race on a single-attach volume.

    Two instances race to attach the same multiattach=False volume. One
    updates the volume status to "reserved"; the other fails the
    conditional update, is then validated to not be the instance that is
    already attached, and that triggers a failure.
    """
    volume = tests_utils.create_volume(self.context)

    # Preconditions: not a multiattach volume, no existing attachments.
    self.assertFalse(volume.multiattach)
    self.assertEqual(0, len(volume.volume_attachment))

    reserve = self.volume_api._attachment_reserve

    # The first instance is OK and should move the volume status to
    # 'reserved'.
    reserve(self.context, volume, fake.UUID1)

    # A different instance on the same volume should fail.
    error = self.assertRaises(exception.InvalidVolume,
                              reserve,
                              self.context, volume, fake.UUID2)
    self.assertIn("status must be available or downloading", str(error))
def test_attachment_reserve_with_instance_uuid_error_volume(self):
    """Reserving an attachment on an 'error' volume is rejected.

    Creating an attachment (with an instance_uuid provided) on a volume
    that is not in 'available' or 'downloading' status will fail when the
    volume has no attachments, similar to how the volume reserve action
    works.
    """
    volume = tests_utils.create_volume(self.context, status='error')

    # Preconditions: not a multiattach volume, no existing attachments.
    self.assertFalse(volume.multiattach)
    self.assertEqual(0, len(volume.volume_attachment))

    # The attach attempt should fail based on the volume status.
    reserve = self.volume_api._attachment_reserve
    error = self.assertRaises(exception.InvalidVolume,
                              reserve,
                              self.context, volume, fake.UUID1)
    self.assertIn("status must be available or downloading", str(error))
def test_unreserve_volume_success_in_use(self):
    """An attached volume goes back to 'in-use' when unreserved."""
    volume = tests_utils.create_volume(self.context, status='attaching')
    # Give the volume an attachment before it is unreserved.
    tests_utils.attach_volume(self.context, volume.id, fake.INSTANCE_ID,
                              'attached_host', 'mountpoint', mode='rw')

    api = cinder.volume.api.API()
    api.unreserve_volume(self.context, volume)

    refreshed = db.volume_get(self.context, volume.id)
    self.assertEqual('in-use', refreshed.status)
def test_unreserve_volume_success_available(self):
    """An unattached volume goes back to 'available' when unreserved."""
    volume = tests_utils.create_volume(self.context, status='attaching')

    api = cinder.volume.api.API()
    api.unreserve_volume(self.context, volume)

    refreshed = db.volume_get(self.context, volume.id)
    self.assertEqual('available', refreshed.status)
def test_multi_node(self):
    # Placeholder: the body is empty, so nothing is exercised here yet.
    # TODO(termie): Figure out how to test with two nodes, each of them
    # having a different FLAG for storage_node. This will allow us to
    # test cross-node interactions.
    pass
def test_cannot_delete_volume_in_use(self):
    """A volume in 'in-use' status can't be deleted."""
    self._test_cannot_delete_volume(status='in-use')
def test_cannot_delete_volume_maintenance(self):
    """A volume in 'maintenance' status can't be deleted."""
    self._test_cannot_delete_volume(status='maintenance')
def _test_cannot_delete_volume(self, status):
    """Assert that deleting a volume in ``status`` raises InvalidVolume.

    :param status: volume status for which delete must be rejected
    """
    # Create a volume and assign it to the host.
    volume = tests_utils.create_volume(self.context, CONF.host,
                                       status=status)

    # The given status must make the delete request fail.
    delete = self.volume_api.delete
    self.assertRaises(exception.InvalidVolume,
                      delete, self.context, volume)

    # Clean up.
    self.volume.delete_volume(self.context, volume)
def test_force_delete_volume(self):
    """An 'error_deleting' volume can only be deleted with force."""
    # Create the volume in 'error_deleting' using the shared parameters.
    self.volume_params['status'] = 'error_deleting'
    volume = tests_utils.create_volume(self.context, **self.volume_params)

    delete = self.volume_api.delete
    # Without force the 'error_deleting' volume is rejected.
    self.assertRaises(exception.InvalidVolume,
                      delete, self.context, volume)

    # With force the delete request goes through ...
    delete(self.context, volume, force=True)

    # ... and the reloaded volume reports the 'deleting' status.
    admin_ctxt = context.get_admin_context()
    volume = objects.Volume.get_by_id(admin_ctxt, volume.id)
    self.assertEqual('deleting', volume.status)

    # Clean up.
    self.volume.delete_volume(self.context, volume)
def test_cannot_force_delete_attached_volume(self):
    """Force delete is rejected while the volume is attached."""
    attached = fields.VolumeAttachStatus.ATTACHED
    volume = tests_utils.create_volume(self.context, CONF.host,
                                       status='in-use',
                                       attach_status=attached)

    # Even with force=True the delete must raise InvalidVolume.
    self.assertRaises(exception.InvalidVolume,
                      self.volume_api.delete,
                      self.context, volume, force=True)

    db.volume_destroy(self.context, volume.id)
def test__revert_to_snapshot_generic_failed(self):
    """The temp volume is deleted when the generic revert's copy fails."""
    volume = tests_utils.create_volume(self.context, status='available')
    snapshot = tests_utils.create_snapshot(self.context, volume.id)

    # Build the patchers first; they are only applied inside the ``with``.
    driver = self.volume.driver
    temp_patch = mock.patch.object(
        driver, '_create_temp_volume_from_snapshot')
    delete_patch = mock.patch.object(driver, 'delete_volume')
    copy_patch = mock.patch.object(self.volume, '_copy_volume_data')

    with temp_patch as make_temp, delete_patch as driver_delete, \
            copy_patch as copy_data:
        temp_volume = tests_utils.create_volume(self.context,
                                                status='available')
        # The data copy blows up on its first call.
        copy_data.side_effect = [exception.VolumeDriverException('error')]
        make_temp.return_value = temp_volume

        self.assertRaises(exception.VolumeDriverException,
                          self.volume._revert_to_snapshot_generic,
                          self.context, volume, snapshot)

        copy_data.assert_called_once_with(
            self.context, temp_volume, volume)
        # The temporary volume must still be removed by the driver.
        driver_delete.assert_called_once_with(temp_volume)
def test__revert_to_snapshot_generic(self):
    """Generic revert copies from a temp volume and then deletes it."""
    volume = tests_utils.create_volume(self.context, status='available')
    snapshot = tests_utils.create_snapshot(self.context, volume.id)

    # Build the patchers first; they are only applied inside the ``with``.
    driver = self.volume.driver
    temp_patch = mock.patch.object(
        driver, '_create_temp_volume_from_snapshot')
    delete_patch = mock.patch.object(driver, 'delete_volume')
    copy_patch = mock.patch.object(self.volume, '_copy_volume_data')

    with temp_patch as make_temp, delete_patch as driver_delete, \
            copy_patch as copy_data:
        temp_volume = tests_utils.create_volume(self.context,
                                                status='available')
        make_temp.return_value = temp_volume

        self.volume._revert_to_snapshot_generic(
            self.context, volume, snapshot)

        copy_data.assert_called_once_with(
            self.context, temp_volume, volume)
        driver_delete.assert_called_once_with(temp_volume)
@ddt.data({'driver_error': True},
          {'driver_error': False})
@ddt.unpack
def test__revert_to_snapshot(self, driver_error):
    """Check the driver revert is tried and the generic one is a fallback.

    :param driver_error: when True the patched driver revert_to_snapshot
                         raises NotImplementedError and the generic
                         revert is expected to be called; when False the
                         driver revert returns None and the generic
                         revert must not be called
    """
    # Every patch is applied through the ``with`` statement below. A bare
    # mock.patch.object(...) call only builds a patcher and never starts
    # it, so no such standalone call is used here.
    with mock.patch.object(self.volume.driver,
                           'revert_to_snapshot') as driver_revert, \
            mock.patch.object(self.volume, '_notify_about_volume_usage'), \
            mock.patch.object(self.volume, '_notify_about_snapshot_usage'), \
            mock.patch.object(
                self.volume, '_revert_to_snapshot_generic') as generic_revert:
        if driver_error:
            driver_revert.side_effect = [NotImplementedError]
        else:
            driver_revert.return_value = None

        self.volume._revert_to_snapshot(self.context, {}, {})

        driver_revert.assert_called_once_with(self.context, {}, {})
        if driver_error:
            generic_revert.assert_called_once_with(self.context, {}, {})
        else:
            # A successful driver revert must not trigger the fallback.
            generic_revert.assert_not_called()
@ddt.data({},
{'has_snapshot': True},
{'use_temp_snapshot': True},
{'use_temp_snapshot': True, 'has_snapshot': True})
@ddt.unpack
def test_revert_to_snapshot(self, has_snapshot=False,
use_temp_snapshot=False):
fake_volume = tests_utils.create_volume(self.context,
status='reverting',
project_id='123',
size=2)
fake_snapshot = tests_utils.create_snapshot(self.context,
fake_volume['id'],
status='restoring',
volume_size=1)
with mock.patch.object(self.volume,
'_revert_to_snapshot') as _revert,\
mock.patch.object(self.volume,
'_create_backup_snapshot') as _create_snapshot,\