# cinder/cinder/tests/unit/volume/test_connection.py
#
# 1351 lines
# 66 KiB
# Python

# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for Volume connection test cases."""
from unittest import mock
import ddt
from cinder import context
from cinder import db
from cinder import exception
from cinder.message import message_field
from cinder import objects
from cinder.objects import fields
from cinder.tests import fake_driver
from cinder.tests.unit.api.v2 import fakes as v2_fakes
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_volume
from cinder.tests.unit import utils as tests_utils
from cinder.tests.unit import volume as base
import cinder.volume
import cinder.volume.targets
import cinder.volume.targets.iscsi
@ddt.ddt
class DiscardFlagTestCase(base.BaseVolumeTestCase):
    """Check the 'discard' flag returned by initialize_connection()."""

    def setUp(self):
        super(DiscardFlagTestCase, self).setUp()
        # Every test in this class talks to a fully mocked driver.
        self.volume.driver = mock.MagicMock()
        default_type = v2_fakes.fake_default_type_get(fake.VOLUME_TYPE2_ID)
        db.volume_type_create(self.context, default_type)
        self.vol_type = db.volume_type_get_by_name(self.context,
                                                   'vol_type_name')

    @ddt.data(
        {'config_discard_flag': True,
         'driver_discard_flag': None,
         'expected_flag': True},
        {'config_discard_flag': False,
         'driver_discard_flag': None,
         'expected_flag': None},
        {'config_discard_flag': True,
         'driver_discard_flag': True,
         'expected_flag': True},
        {'config_discard_flag': False,
         'driver_discard_flag': True,
         'expected_flag': True},
        {'config_discard_flag': False,
         'driver_discard_flag': False,
         'expected_flag': False},
        {'config_discard_flag': None,
         'driver_discard_flag': True,
         'expected_flag': True},
        {'config_discard_flag': None,
         'driver_discard_flag': False,
         'expected_flag': False},
    )
    @ddt.unpack
    def test_initialize_connection_discard_flag(self,
                                                config_discard_flag,
                                                driver_discard_flag,
                                                expected_flag):
        """Combine the configured and driver-reported discard flags.

        :param config_discard_flag: value the mocked driver configuration
            returns for 'report_discard_supported'
        :param driver_discard_flag: 'discard' value the mocked driver puts
            in its connection data; None means the key is left out
        :param expected_flag: 'discard' value expected in the result
        """
        driver = self.volume.driver
        driver.create_export.return_value = None
        connector = {'ip': 'IP', 'initiator': 'INITIATOR'}

        data = {'access_mode': 'rw', 'encrypted': False}
        if driver_discard_flag is not None:
            data['discard'] = driver_discard_flag
        driver.initialize_connection.return_value = {
            'driver_volume_type': 'iscsi',
            'data': data,
        }

        # Only 'report_discard_supported' is answered; every other option
        # looks unset.
        driver.configuration.safe_get.side_effect = lambda key: (
            config_discard_flag if key == 'report_discard_supported'
            else None)

        with mock.patch.object(objects, 'Volume') as mock_vol:
            volume = tests_utils.create_volume(self.context)
            volume.volume_type_id = None
            mock_vol.get_by_id.return_value = volume

            conn_info = self.volume.initialize_connection(self.context,
                                                          volume,
                                                          connector)
            self.assertEqual(expected_flag,
                             conn_info['data'].get('discard'))
class VolumeConnectionTestCase(base.BaseVolumeTestCase):
    """Tests for initialize_connection() on the volume manager and API.

    The QoS tests share two private helpers: ``_make_fake_volume`` builds
    the fake volume and primes the patched DB calls, and
    ``_assert_connection_qos`` patches the volume-type lookups and the
    driver, then checks the returned ``qos_specs`` for each consumer.
    """

    def setUp(self, *args, **kwargs):
        super(VolumeConnectionTestCase, self).setUp()
        db.volume_type_create(self.context,
                              v2_fakes.fake_default_type_get(
                                  fake.VOLUME_TYPE2_ID))
        self.vol_type = db.volume_type_get_by_name(self.context,
                                                   'vol_type_name')

    def _make_fake_volume(self, mock_volume_update, mock_volume_get,
                          mock_volume_admin_metadata_get, **extra_fields):
        """Build a fake volume object and prime the patched DB calls.

        :param mock_volume_update: mock replacing ``db.volume_update``
        :param mock_volume_get: mock replacing
            ``db.sqlalchemy.api.volume_get``
        :param mock_volume_admin_metadata_get: mock replacing
            ``db.volume_admin_metadata_get``
        :param extra_fields: extra volume fields, e.g. ``size``
        :returns: the object built by ``fake_volume.fake_volume_obj``
        """
        volume_fields = {
            'volume_type_id': fake.VOLUME_TYPE_ID,
            'name': 'fake_name',
            'host': 'fake_host',
            'id': fake.VOLUME_ID,
            'volume_admin_metadata': [
                {'key': 'fake-key', 'value': 'fake-value'}],
        }
        volume_fields.update(extra_fields)
        volume_obj = fake_volume.fake_volume_obj(self.context,
                                                 **volume_fields)
        mock_volume_get.return_value = volume_fields
        mock_volume_update.return_value = volume_fields
        mock_volume_admin_metadata_get.return_value = {
            'fake-key': 'fake-value'}
        return volume_obj

    def _assert_connection_qos(self, volume_obj, specs, mock_get_target,
                               expected_by_consumer):
        """Run initialize_connection() once per consumer and check QoS.

        :param volume_obj: volume passed to initialize_connection()
        :param specs: QoS specs dict reported for the volume type. The same
            dict object is reused for every call, so any in-place change
            made by the code under test carries over to later calls.
        :param mock_get_target: mock replacing
            ``ISCSITarget._get_target_chap_auth``
        :param expected_by_consumer: ordered ``(consumer, expected)``
            pairs; ``expected`` is the dict required in
            ``conn_info['data']['qos_specs']``, or None when that value
            must be None
        """
        connector = {'ip': 'IP', 'initiator': 'INITIATOR'}
        qos_values = {'consumer': None, 'specs': specs}
        with mock.patch.object(cinder.volume.volume_types,
                               'get_volume_type_qos_specs') as type_qos, \
                mock.patch.object(cinder.volume.volume_types,
                                  'get_volume_type_extra_specs'
                                  ) as type_extra_specs, \
                mock.patch.object(
                    cinder.tests.fake_driver.FakeLoggingVolumeDriver,
                    'initialize_connection') as driver_init:
            type_extra_specs.return_value = 'True'
            driver_init.return_value = {'data': {}}
            mock_get_target.return_value = None

            for consumer, expected in expected_by_consumer:
                qos_values['consumer'] = consumer
                type_qos.return_value = dict(qos_specs=qos_values)
                conn_info = self.volume.initialize_connection(
                    self.context, volume_obj, connector)
                actual = conn_info['data']['qos_specs']
                if expected is None:
                    self.assertIsNone(actual)
                else:
                    self.assertDictEqual(expected, actual)

    @mock.patch.object(cinder.volume.targets.iscsi.ISCSITarget,
                       '_get_target_chap_auth')
    @mock.patch.object(db, 'volume_admin_metadata_get')
    @mock.patch.object(db.sqlalchemy.api, 'volume_get')
    @mock.patch.object(db, 'volume_update')
    def test_initialize_connection_fetchqos(self,
                                            _mock_volume_update,
                                            _mock_volume_get,
                                            _mock_volume_admin_metadata_get,
                                            mock_get_target):
        """Make sure initialize_connection returns correct information."""
        fake_volume_obj = self._make_fake_volume(
            _mock_volume_update, _mock_volume_get,
            _mock_volume_admin_metadata_get)
        specs = {'key1': 'value1',
                 'key2': 'value2'}
        qos_specs_expected = {'key1': 'value1',
                              'key2': 'value2'}
        # initialize_connection() passes qos_specs that is designated to
        # be consumed by front-end or both front-end and back-end, and
        # skips qos_specs that is designated to be consumed by back-end
        # only.
        self._assert_connection_qos(
            fake_volume_obj, specs, mock_get_target,
            [('front-end', qos_specs_expected),
             ('both', qos_specs_expected),
             ('back-end', None)])

    @mock.patch.object(cinder.volume.targets.iscsi.ISCSITarget,
                       '_get_target_chap_auth')
    @mock.patch.object(db, 'volume_admin_metadata_get')
    @mock.patch.object(db.sqlalchemy.api, 'volume_get')
    @mock.patch.object(db, 'volume_update')
    def test_initialize_connection_qos_per_gb(self,
                                              _mock_volume_update,
                                              _mock_volume_get,
                                              _mock_volume_admin_metadata_get,
                                              mock_get_target):
        """QoS test with no minimum value."""
        fake_volume_obj = self._make_fake_volume(
            _mock_volume_update, _mock_volume_get,
            _mock_volume_admin_metadata_get, size=3)
        specs = {'write_iops_sec_per_gb': 30,
                 'read_iops_sec_per_gb': 7700,
                 'total_iops_sec_per_gb': 300000,
                 'read_bytes_sec_per_gb': 10,
                 'write_bytes_sec_per_gb': 40,
                 'total_bytes_sec_per_gb': 1048576}
        qos_specs_expected = {'write_iops_sec': 90,
                              'read_iops_sec': 23100,
                              'total_iops_sec': 900000,
                              'read_bytes_sec': 30,
                              'write_bytes_sec': 120,
                              'total_bytes_sec': 3145728}
        # initialize_connection() passes qos_specs that is designated to
        # be consumed by front-end or both front-end and back-end
        self._assert_connection_qos(
            fake_volume_obj, specs, mock_get_target,
            [('front-end', qos_specs_expected),
             ('both', qos_specs_expected)])

    @mock.patch.object(cinder.volume.targets.iscsi.ISCSITarget,
                       '_get_target_chap_auth')
    @mock.patch.object(db, 'volume_admin_metadata_get')
    @mock.patch.object(db.sqlalchemy.api, 'volume_get')
    @mock.patch.object(db, 'volume_update')
    def test_initialize_connection_qos_per_gb_with_min_small(
            self, _mock_volume_update, _mock_volume_get,
            _mock_volume_admin_metadata_get, mock_get_target):
        """QoS test when volume size results in using minimum."""
        fake_volume_obj = self._make_fake_volume(
            _mock_volume_update, _mock_volume_get,
            _mock_volume_admin_metadata_get, size=1)
        specs = {'write_iops_sec_per_gb_min': 800,
                 'write_iops_sec_per_gb': 30,
                 'read_iops_sec_per_gb_min': 23100,
                 'read_iops_sec_per_gb': 7700,
                 'total_iops_sec_per_gb_min': 900000,
                 'total_iops_sec_per_gb': 300000,
                 'total_iops_sec_max': 15000000,
                 'read_bytes_sec_per_gb_min': 30,
                 'read_bytes_sec_per_gb': 10,
                 'write_bytes_sec_per_gb_min': 120,
                 'write_bytes_sec_per_gb': 40,
                 'total_bytes_sec_per_gb_min': 3145728,
                 'total_bytes_sec_per_gb': 1048576}
        qos_specs_expected = {'write_iops_sec': 800,
                              'read_iops_sec': 23100,
                              'total_iops_sec': 900000,
                              'read_bytes_sec': 30,
                              'write_bytes_sec': 120,
                              'total_bytes_sec': 3145728}
        # initialize_connection() passes qos_specs that is designated to
        # be consumed by front-end or both front-end and back-end
        self._assert_connection_qos(
            fake_volume_obj, specs, mock_get_target,
            [('front-end', qos_specs_expected),
             ('both', qos_specs_expected)])

    @mock.patch.object(cinder.volume.targets.iscsi.ISCSITarget,
                       '_get_target_chap_auth')
    @mock.patch.object(db, 'volume_admin_metadata_get')
    @mock.patch.object(db.sqlalchemy.api, 'volume_get')
    @mock.patch.object(db, 'volume_update')
    def test_initialize_connection_qos_per_gb_with_min_large(
            self, _mock_volume_update, _mock_volume_get,
            _mock_volume_admin_metadata_get, mock_get_target):
        """QoS test when volume size results in using per-gb values."""
        fake_volume_obj = self._make_fake_volume(
            _mock_volume_update, _mock_volume_get,
            _mock_volume_admin_metadata_get, size=100)
        specs = {'write_iops_sec_per_gb_min': 800,
                 'write_iops_sec_per_gb': 30,
                 'read_iops_sec_per_gb_min': 23100,
                 'read_iops_sec_per_gb': 7700,
                 'total_iops_sec_per_gb_min': 900000,
                 'total_iops_sec_per_gb': 300000,
                 'total_iops_sec_max': 15000000,
                 'read_bytes_sec_per_gb_min': 30,
                 'read_bytes_sec_per_gb': 10,
                 'write_bytes_sec_per_gb_min': 120,
                 'write_bytes_sec_per_gb': 40,
                 'total_bytes_sec_per_gb_min': 3145728,
                 'total_bytes_sec_per_gb': 1048576}
        qos_specs_expected = {'write_iops_sec': 3000,
                              'read_iops_sec': 770000,
                              'total_iops_sec': 15000000,
                              'read_bytes_sec': 1000,
                              'write_bytes_sec': 4000,
                              'total_bytes_sec': 104857600}
        # initialize_connection() passes qos_specs that is designated to
        # be consumed by front-end or both front-end and back-end
        self._assert_connection_qos(
            fake_volume_obj, specs, mock_get_target,
            [('front-end', qos_specs_expected),
             ('both', qos_specs_expected)])

    @mock.patch.object(fake_driver.FakeLoggingVolumeDriver, 'create_export')
    def test_initialize_connection_export_failure(self,
                                                  _mock_create_export):
        """Test exception path for create_export failure."""
        volume = tests_utils.create_volume(
            self.context, admin_metadata={'fake-key': 'fake-value'},
            **self.volume_params)
        _mock_create_export.side_effect = exception.CinderException

        connector = {'ip': 'IP', 'initiator': 'INITIATOR'}

        self.assertRaises(exception.VolumeBackendAPIException,
                          self.volume.initialize_connection,
                          self.context, volume, connector)

    def test_initialize_connection_maintenance(self):
        """Test initialize connection in maintenance."""
        test_meta1 = {'fake_key1': 'fake_value1', 'fake_key2': 'fake_value2'}
        volume = tests_utils.create_volume(self.context, metadata=test_meta1,
                                           **self.volume_params)
        volume['status'] = 'maintenance'
        volume_api = cinder.volume.api.API()
        self.assertRaises(exception.InvalidVolume,
                          volume_api.initialize_connection,
                          self.context,
                          volume,
                          None)
@ddt.ddt
class VolumeAttachDetachTestCase(base.BaseVolumeTestCase):
    """Attach and detach tests run against the volume manager."""

    def setUp(self):
        super(VolumeAttachDetachTestCase, self).setUp()
        self.patch('cinder.volume.volume_utils.clear_volume', autospec=True)
        # A second request context built from the fake user and project.
        self.user_context = context.RequestContext(
            user_id=fake.USER_ID, project_id=fake.PROJECT_ID)
        default_type = v2_fakes.fake_default_type_get(fake.VOLUME_TYPE2_ID)
        db.volume_type_create(self.context, default_type)
        self.vol_type = db.volume_type_get_by_name(self.context,
                                                   'vol_type_name')
@ddt.data(False, True)
def test_run_attach_detach_volume_for_instance(self, volume_object):
    """Attach a volume to an instance, then detach and delete it.

    :param volume_object: when True the volume object is passed to the
        attach/detach calls through ``volume=``; when False None is
        passed there and only the volume id identifies the volume
    """
    mountpoint = "/dev/sdf"
    instance_uuid = '12345678-1234-5678-1234-567812345678'
    volume = tests_utils.create_volume(self.user_context,
                                       **self.volume_params)
    with volume.obj_as_admin():
        volume.admin_metadata['readonly'] = True
        volume.save()
    volume_id = volume.id
    self.volume.create_volume(self.user_context, volume=volume)

    volume_passed = None
    if volume_object:
        volume_passed = volume

    # Attach twice with identical arguments; both calls must yield the
    # same attachment id.
    attach_args = (self.user_context, volume_id, instance_uuid, None,
                   mountpoint, 'ro')
    attachment = self.volume.attach_volume(*attach_args,
                                           volume=volume_passed)
    attachment2 = self.volume.attach_volume(*attach_args,
                                            volume=volume_passed)
    self.assertEqual(attachment.id, attachment2.id)

    vol = objects.Volume.get_by_id(self.context, volume_id)
    self.assertEqual("in-use", vol.status)
    self.assertEqual(fields.VolumeAttachStatus.ATTACHED,
                     attachment.attach_status)
    self.assertEqual(mountpoint, attachment.mountpoint)
    self.assertEqual(instance_uuid, attachment.instance_uuid)
    self.assertIsNone(attachment.attached_host)

    admin_metadata = vol.volume_admin_metadata
    self.assertEqual(2, len(admin_metadata))
    stored = {entry['key']: entry['value'] for entry in admin_metadata}
    self.assertDictEqual(dict(readonly='True', attached_mode='ro'), stored)

    connector = {'initiator': 'iqn.2012-07.org.fake:01'}
    if not volume_object:
        volume = vol
    conn_info = self.volume.initialize_connection(self.context,
                                                  volume, connector)
    self.assertEqual('ro', conn_info['data']['access_mode'])

    # Deleting is refused while the volume is attached.
    self.assertRaises(exception.VolumeAttached,
                      self.volume.delete_volume,
                      self.context,
                      volume=volume)

    self.volume.detach_volume(self.context, volume_id, attachment.id,
                              volume=volume_passed)
    vol = objects.Volume.get_by_id(self.context, volume_id)
    self.assertEqual('available', vol.status)

    self.volume.delete_volume(self.context, volume)
    self.assertRaises(exception.VolumeNotFound,
                      db.volume_get,
                      self.context,
                      volume_id)
@mock.patch('cinder.volume.manager.LOG', mock.Mock())
def test_initialize_connection(self):
    """A failing volume.save() raises ExportFailure and removes the export."""
    failing_save = mock.Mock(side_effect=Exception)
    volume = mock.Mock(save=failing_save)
    connector = mock.Mock()
    with mock.patch.object(self.volume, 'driver') as driver_mock:
        self.assertRaises(exception.ExportFailure,
                          self.volume.initialize_connection,
                          self.context, volume, connector)

    driver_mock.remove_export.assert_called_once_with(mock.ANY, volume)
def test_run_attach_detach_2volumes_for_instance(self):
    """Attach two volumes to one instance, then detach and delete both."""
    instance_uuid = '12345678-1234-5678-1234-567812345678'

    # First volume: read-only, attached in 'ro' mode.
    mountpoint1 = "/dev/vdc"
    volume1 = tests_utils.create_volume(
        self.context, admin_metadata={'readonly': 'True'},
        **self.volume_params)
    volume1_id = volume1['id']
    self.volume.create_volume(self.context, volume1)
    attachment = self.volume.attach_volume(
        self.context, volume1_id, instance_uuid, None, mountpoint1, 'ro')

    vol1 = db.volume_get(context.get_admin_context(), volume1_id)
    self.assertEqual("in-use", vol1['status'])
    self.assertEqual('attached', attachment['attach_status'])
    self.assertEqual(mountpoint1, attachment['mountpoint'])
    self.assertEqual(instance_uuid, attachment['instance_uuid'])
    self.assertIsNone(attachment['attached_host'])

    admin_metadata = vol1['volume_admin_metadata']
    self.assertEqual(2, len(admin_metadata))
    stored = {entry['key']: entry['value'] for entry in admin_metadata}
    self.assertDictEqual(dict(readonly='True', attached_mode='ro'), stored)

    connector = {'initiator': 'iqn.2012-07.org.fake:01'}
    conn_info = self.volume.initialize_connection(self.context,
                                                  volume1, connector)
    self.assertEqual('ro', conn_info['data']['access_mode'])
    self.assertRaises(exception.VolumeAttached,
                      self.volume.delete_volume,
                      self.context,
                      volume1)

    # Second volume: writable, attached in 'rw' mode.
    mountpoint2 = "/dev/vdd"
    volume2 = tests_utils.create_volume(
        self.context, admin_metadata={'readonly': 'False'},
        **self.volume_params)
    volume2_id = volume2['id']
    self.volume.create_volume(self.context, volume2)
    attachment2 = self.volume.attach_volume(
        self.context, volume2_id, instance_uuid, None, mountpoint2, 'rw')

    vol2 = db.volume_get(context.get_admin_context(), volume2_id)
    self.assertEqual("in-use", vol2['status'])
    self.assertEqual('attached', attachment2['attach_status'])
    self.assertEqual(mountpoint2, attachment2['mountpoint'])
    self.assertEqual(instance_uuid, attachment2['instance_uuid'])
    self.assertIsNone(attachment2['attached_host'])

    admin_metadata = vol2['volume_admin_metadata']
    self.assertEqual(2, len(admin_metadata))
    stored = {entry['key']: entry['value'] for entry in admin_metadata}
    self.assertDictEqual(dict(readonly='False', attached_mode='rw'),
                         stored)

    connector = {'initiator': 'iqn.2012-07.org.fake:02'}
    conn_info = self.volume.initialize_connection(self.context,
                                                  volume2, connector)
    self.assertEqual('rw', conn_info['data']['access_mode'])
    self.assertRaises(exception.VolumeAttached,
                      self.volume.delete_volume,
                      self.context,
                      volume2)

    # Detach and delete the first volume, then the second.
    self.volume.detach_volume(self.context, volume1_id, attachment['id'])
    vol1 = db.volume_get(self.context, volume1_id)
    self.assertEqual('available', vol1['status'])
    self.volume.delete_volume(self.context, volume1)
    self.assertRaises(exception.VolumeNotFound,
                      db.volume_get,
                      self.context,
                      volume1_id)

    self.volume.detach_volume(self.context, volume2_id, attachment2['id'])
    vol2 = db.volume_get(self.context, volume2_id)
    self.assertEqual('available', vol2['status'])
    self.volume.delete_volume(self.context, volume2)
    self.assertRaises(exception.VolumeNotFound,
                      db.volume_get,
                      self.context,
                      volume2_id)
def test_detach_invalid_attachment_id(self):
    """Detach with an unknown attachment id and check the volume status.

    The status must stay 'available' for an unattached volume and
    'in-use' for an attached one.
    """
    attachment_id = "notfoundid"
    volume = tests_utils.create_volume(self.context,
                                       admin_metadata={'readonly': 'True'},
                                       multiattach=False,
                                       **self.volume_params)

    # No attachment exists yet.
    self.volume.detach_volume(self.context, volume['id'], attachment_id)
    volume = db.volume_get(self.context, volume['id'])
    self.assertEqual('available', volume['status'])

    # Attach the volume, then try the same unknown id again.
    instance_uuid = '12345678-1234-5678-1234-567812345678'
    attached_host = 'fake_host'
    mountpoint = '/dev/fake'
    tests_utils.attach_volume(self.context, volume['id'], instance_uuid,
                              attached_host, mountpoint)
    self.volume.detach_volume(self.context, volume['id'], attachment_id)
    volume = db.volume_get(self.context, volume['id'])
    self.assertEqual('in-use', volume['status'])
def test_detach_no_attachments(self):
    """Detach a 'detaching' volume that has no attachments.

    The volume is expected to end up 'available'.
    """
    self.volume_params['status'] = 'detaching'
    volume = tests_utils.create_volume(
        self.context,
        admin_metadata={'readonly': 'True'},
        multiattach=False,
        **self.volume_params)

    self.volume.detach_volume(self.context, volume['id'])

    volume = db.volume_get(self.context, volume['id'])
    self.assertEqual('available', volume['status'])
def test_run_attach_detach_volume_for_instance_no_attachment_id(self):
    """Detach a multiattach volume with and without an attachment id.

    With two attachments, detaching without an id raises InvalidVolume.
    With one attachment, detaching without an id succeeds.
    """
    mountpoint = "/dev/sdf"
    instance_uuid = '12345678-1234-5678-1234-567812345678'
    instance_uuid_2 = '12345678-4321-8765-4321-567812345678'
    volume = tests_utils.create_volume(
        self.context,
        admin_metadata={'readonly': 'True'},
        multiattach=True,
        **self.volume_params)
    volume_id = volume['id']
    self.volume.create_volume(self.context, volume)

    # First attachment.
    attachment = self.volume.attach_volume(
        self.context, volume_id, instance_uuid, None, mountpoint, 'ro')
    vol = db.volume_get(context.get_admin_context(), volume_id)
    self.assertEqual('in-use', vol['status'])
    self.assertEqual(fields.VolumeAttachStatus.ATTACHED,
                     attachment['attach_status'])
    self.assertEqual(mountpoint, attachment['mountpoint'])
    self.assertEqual(instance_uuid, attachment['instance_uuid'])
    self.assertIsNone(attachment['attached_host'])

    admin_metadata = vol['volume_admin_metadata']
    self.assertEqual(2, len(admin_metadata))
    stored = {entry['key']: entry['value'] for entry in admin_metadata}
    self.assertDictEqual(dict(readonly='True', attached_mode='ro'), stored)

    # Second attachment, to another instance.
    attachment2 = self.volume.attach_volume(
        self.context, volume_id, instance_uuid_2, None, mountpoint, 'ro')

    connector = {'initiator': 'iqn.2012-07.org.fake:01'}
    conn_info = self.volume.initialize_connection(self.context,
                                                  volume, connector)
    self.assertEqual('ro', conn_info['data']['access_mode'])
    self.assertRaises(exception.VolumeAttached,
                      self.volume.delete_volume,
                      self.context,
                      volume)

    # Two attachments exist, so a detach without an id is rejected.
    self.assertRaises(exception.InvalidVolume,
                      self.volume.detach_volume,
                      self.context, volume_id)

    self.volume.detach_volume(self.context, volume_id, attachment['id'])
    vol = db.volume_get(self.context, volume_id)
    self.assertEqual('in-use', vol['status'])

    self.volume.detach_volume(self.context, volume_id, attachment2['id'])
    vol = db.volume_get(self.context, volume_id)
    self.assertEqual('available', vol['status'])

    # With a single attachment the id may be left out.
    attachment = self.volume.attach_volume(
        self.context, volume_id, instance_uuid, None, mountpoint, 'ro')
    vol = db.volume_get(self.context, volume_id)
    self.assertEqual('in-use', vol['status'])
    self.volume.detach_volume(self.context, volume_id)
    vol = db.volume_get(self.context, volume_id)
    self.assertEqual('available', vol['status'])

    self.volume.delete_volume(self.context, volume)
    self.assertRaises(exception.VolumeNotFound,
                      db.volume_get,
                      self.context,
                      volume_id)
def test_run_attach_detach_multiattach_volume_for_instances(self):
    """Attach a multiattach volume to two instances, then detach both."""
    mountpoint = "/dev/sdf"
    instance_uuid = '12345678-1234-5678-1234-567812345678'
    volume = tests_utils.create_volume(
        self.context,
        admin_metadata={'readonly': 'True'},
        multiattach=True,
        **self.volume_params)
    volume_id = volume['id']
    self.volume.create_volume(self.context, volume)

    # Attachment to the first instance.
    attachment = self.volume.attach_volume(
        self.context, volume_id, instance_uuid, None, mountpoint, 'ro')
    vol = db.volume_get(context.get_admin_context(), volume_id)
    self.assertEqual('in-use', vol['status'])
    self.assertTrue(vol['multiattach'])
    self.assertEqual(fields.VolumeAttachStatus.ATTACHED,
                     attachment['attach_status'])
    self.assertEqual(mountpoint, attachment['mountpoint'])
    self.assertEqual(instance_uuid, attachment['instance_uuid'])
    self.assertIsNone(attachment['attached_host'])

    admin_metadata = vol['volume_admin_metadata']
    self.assertEqual(2, len(admin_metadata))
    stored = {entry['key']: entry['value'] for entry in admin_metadata}
    self.assertDictEqual(dict(readonly='True', attached_mode='ro'), stored)

    connector = {'initiator': 'iqn.2012-07.org.fake:01'}
    conn_info = self.volume.initialize_connection(self.context,
                                                  volume, connector)
    self.assertEqual('ro', conn_info['data']['access_mode'])

    # Attachment to the second instance.
    instance2_uuid = '12345678-1234-5678-1234-567812345000'
    mountpoint2 = "/dev/sdx"
    attachment2 = self.volume.attach_volume(
        self.context, volume_id, instance2_uuid, None, mountpoint2, 'ro')
    vol = db.volume_get(context.get_admin_context(), volume_id)
    self.assertEqual('in-use', vol['status'])
    self.assertTrue(vol['multiattach'])
    self.assertEqual(fields.VolumeAttachStatus.ATTACHED,
                     attachment2['attach_status'])
    self.assertEqual(mountpoint2, attachment2['mountpoint'])
    self.assertEqual(instance2_uuid, attachment2['instance_uuid'])
    self.assertIsNone(attachment2['attached_host'])
    self.assertNotEqual(attachment, attachment2)

    self.assertRaises(exception.VolumeAttached,
                      self.volume.delete_volume,
                      self.context,
                      volume)

    # After the first detach the volume is still in use and undeletable.
    self.volume.detach_volume(self.context, volume_id, attachment['id'])
    vol = db.volume_get(self.context, volume_id)
    self.assertEqual('in-use', vol['status'])
    self.assertRaises(exception.VolumeAttached,
                      self.volume.delete_volume,
                      self.context,
                      volume)

    self.volume.detach_volume(self.context, volume_id, attachment2['id'])
    vol = db.volume_get(self.context, volume_id)
    self.assertEqual('available', vol['status'])

    self.volume.delete_volume(self.context, volume)
    self.assertRaises(exception.VolumeNotFound,
                      db.volume_get,
                      self.context,
                      volume_id)
def test_run_attach_twice_multiattach_volume_for_instances(self):
    """Attach a multiattach volume twice to the same instance.

    The second attach uses a different mountpoint, but the attachment it
    returns must still report the first mountpoint.
    """
    mountpoint = "/dev/sdf"
    instance_uuid = '12345678-1234-5678-1234-567812345699'
    volume = tests_utils.create_volume(
        self.context,
        admin_metadata={'readonly': 'True'},
        multiattach=True,
        **self.volume_params)
    volume_id = volume['id']
    self.volume.create_volume(self.context, volume)

    attachment = self.volume.attach_volume(
        self.context, volume_id, instance_uuid, None, mountpoint, 'ro')
    vol = db.volume_get(context.get_admin_context(), volume_id)
    self.assertEqual('in-use', vol['status'])
    self.assertTrue(vol['multiattach'])
    self.assertEqual(fields.VolumeAttachStatus.ATTACHED,
                     attachment['attach_status'])
    self.assertEqual(mountpoint, attachment['mountpoint'])
    self.assertEqual(instance_uuid, attachment['instance_uuid'])
    self.assertIsNone(attachment['attached_host'])

    admin_metadata = vol['volume_admin_metadata']
    self.assertEqual(2, len(admin_metadata))
    stored = {entry['key']: entry['value'] for entry in admin_metadata}
    self.assertDictEqual(dict(readonly='True', attached_mode='ro'), stored)

    connector = {'initiator': 'iqn.2012-07.org.fake:01'}
    conn_info = self.volume.initialize_connection(self.context,
                                                  volume, connector)
    self.assertEqual('ro', conn_info['data']['access_mode'])

    # Same instance again, different mountpoint.
    mountpoint2 = "/dev/sdx"
    attachment2 = self.volume.attach_volume(
        self.context, volume_id, instance_uuid, None, mountpoint2, 'ro')
    vol = db.volume_get(context.get_admin_context(), volume_id)
    self.assertEqual('in-use', vol['status'])
    self.assertTrue(vol['multiattach'])
    self.assertEqual('attached', attachment2['attach_status'])
    self.assertEqual(mountpoint, attachment2['mountpoint'])
    self.assertEqual(instance_uuid, attachment2['instance_uuid'])
    self.assertIsNone(attachment2['attached_host'])

    self.assertRaises(exception.VolumeAttached,
                      self.volume.delete_volume,
                      self.context,
                      volume)
def test_attach_detach_not_multiattach_volume_for_instances(self):
    """Make sure volume can't be attached to more than one instance."""
    mountpoint = "/dev/sdf"
    instance_uuid = '12345678-1234-5678-1234-567812345678'
    volume = tests_utils.create_volume(
        self.context,
        admin_metadata={'readonly': 'True'},
        multiattach=False,
        **self.volume_params)
    volume_id = volume['id']
    self.volume.create_volume(self.context, volume)

    attachment = self.volume.attach_volume(
        self.context, volume_id, instance_uuid, None, mountpoint, 'ro')
    vol = db.volume_get(context.get_admin_context(), volume_id)
    self.assertEqual('in-use', vol['status'])
    self.assertFalse(vol['multiattach'])
    self.assertEqual(fields.VolumeAttachStatus.ATTACHED,
                     attachment['attach_status'])
    self.assertEqual(mountpoint, attachment['mountpoint'])
    self.assertEqual(instance_uuid, attachment['instance_uuid'])
    self.assertIsNone(attachment['attached_host'])

    admin_metadata = vol['volume_admin_metadata']
    self.assertEqual(2, len(admin_metadata))
    stored = {entry['key']: entry['value'] for entry in admin_metadata}
    self.assertDictEqual(dict(readonly='True', attached_mode='ro'), stored)

    connector = {'initiator': 'iqn.2012-07.org.fake:01'}
    conn_info = self.volume.initialize_connection(self.context,
                                                  volume, connector)
    self.assertEqual('ro', conn_info['data']['access_mode'])

    # Attaching to a second instance is rejected.
    instance2_uuid = '12345678-1234-5678-1234-567812345000'
    mountpoint2 = "/dev/sdx"
    self.assertRaises(exception.InvalidVolume,
                      self.volume.attach_volume,
                      self.context,
                      volume_id,
                      instance2_uuid,
                      None,
                      mountpoint2, 'ro')

    self.assertRaises(exception.VolumeAttached,
                      self.volume.delete_volume,
                      self.context,
                      volume)

    self.volume.detach_volume(self.context, volume_id, attachment['id'])
    vol = db.volume_get(self.context, volume_id)
    self.assertEqual('available', vol['status'])

    self.volume.delete_volume(self.context, volume)
    self.assertRaises(exception.VolumeNotFound,
                      db.volume_get,
                      self.context,
                      volume_id)
def test_run_attach_detach_volume_for_host(self):
    """Attach a volume to a host, then detach and delete it."""
    mountpoint = "/dev/sdf"
    volume = tests_utils.create_volume(
        self.context,
        admin_metadata={'readonly': 'False'},
        **self.volume_params)
    volume_id = volume['id']
    self.volume.create_volume(self.context, volume)

    # The instance uuid is None; the volume is attached to a host name.
    attachment = self.volume.attach_volume(
        self.context, volume_id, None, 'fake_host', mountpoint, 'rw')
    vol = db.volume_get(context.get_admin_context(), volume_id)
    self.assertEqual('in-use', vol['status'])
    self.assertEqual(fields.VolumeAttachStatus.ATTACHED,
                     attachment['attach_status'])
    self.assertEqual(mountpoint, attachment['mountpoint'])
    self.assertIsNone(attachment['instance_uuid'])
    # sanitized, conforms to RFC-952 and RFC-1123 specs.
    self.assertEqual('fake-host', attachment['attached_host'])

    admin_metadata = vol['volume_admin_metadata']
    self.assertEqual(2, len(admin_metadata))
    stored = {entry['key']: entry['value'] for entry in admin_metadata}
    self.assertDictEqual(dict(readonly='False', attached_mode='rw'),
                         stored)

    connector = {'initiator': 'iqn.2012-07.org.fake:01'}
    conn_info = self.volume.initialize_connection(self.context,
                                                  volume, connector)
    self.assertEqual('rw', conn_info['data']['access_mode'])

    self.assertRaises(exception.VolumeAttached,
                      self.volume.delete_volume,
                      self.context,
                      volume)

    self.volume.detach_volume(self.context, volume_id, attachment['id'])
    vol = db.volume_get(self.context, volume_id)
    self.assertEqual("available", vol['status'])

    self.volume.delete_volume(self.context, volume)
    self.assertRaises(exception.VolumeNotFound,
                      db.volume_get,
                      self.context,
                      volume_id)
def test_run_attach_detach_multiattach_volume_for_hosts(self):
    """Attach a multiattach volume to two hosts, then detach both.

    The volume must stay 'in-use' until the last attachment is
    removed, and only then become 'available' and deletable.
    """
    first_dev = "/dev/sdf"
    new_vol = tests_utils.create_volume(
        self.context,
        admin_metadata={'readonly': 'False'},
        multiattach=True,
        **self.volume_params)
    vol_id = new_vol['id']
    self.volume.create_volume(self.context, new_vol)

    # First host attachment.
    first_attach = self.volume.attach_volume(
        self.context, vol_id, None, 'fake_host', first_dev, 'rw')
    db_vol = db.volume_get(context.get_admin_context(), vol_id)
    self.assertEqual('in-use', db_vol['status'])
    self.assertTrue(db_vol['multiattach'])
    self.assertEqual(fields.VolumeAttachStatus.ATTACHED,
                     first_attach['attach_status'])
    self.assertEqual(first_dev, first_attach['mountpoint'])
    self.assertIsNone(first_attach['instance_uuid'])
    # The host name comes back sanitized, conforming to the RFC-952
    # and RFC-1123 specs.
    self.assertEqual('fake-host', first_attach['attached_host'])

    meta_rows = db_vol['volume_admin_metadata']
    self.assertEqual(2, len(meta_rows))
    wanted_meta = {'readonly': 'False', 'attached_mode': 'rw'}
    found_meta = {row['key']: row['value'] for row in meta_rows}
    self.assertDictEqual(wanted_meta, found_meta)

    initiator = {'initiator': 'iqn.2012-07.org.fake:01'}
    connection = self.volume.initialize_connection(
        self.context, new_vol, initiator)
    self.assertEqual('rw', connection['data']['access_mode'])

    # Second attachment, to a different host and mountpoint.
    second_dev = "/dev/sdx"
    second_attach = self.volume.attach_volume(
        self.context, vol_id, None, 'fake_host2', second_dev, 'rw')
    db_vol = db.volume_get(context.get_admin_context(), vol_id)
    self.assertEqual('in-use', db_vol['status'])
    self.assertEqual(fields.VolumeAttachStatus.ATTACHED,
                     second_attach['attach_status'])
    self.assertEqual(second_dev, second_attach['mountpoint'])
    self.assertIsNone(second_attach['instance_uuid'])
    # Sanitized host name again (RFC-952 / RFC-1123).
    self.assertEqual('fake-host2', second_attach['attached_host'])

    # Deleting must be refused while attachments exist.
    self.assertRaises(exception.VolumeAttached,
                      self.volume.delete_volume,
                      self.context, new_vol)

    # Dropping one of the two attachments keeps the volume in use.
    self.volume.detach_volume(self.context, vol_id, first_attach['id'])
    db_vol = db.volume_get(self.context, vol_id)
    self.assertEqual("in-use", db_vol['status'])

    # Dropping the last attachment frees the volume.
    self.volume.detach_volume(self.context, vol_id, second_attach['id'])
    db_vol = db.volume_get(self.context, vol_id)
    self.assertEqual("available", db_vol['status'])

    self.volume.delete_volume(self.context, new_vol)
    self.assertRaises(exception.VolumeNotFound,
                      db.volume_get,
                      self.context, vol_id)
def test_run_attach_twice_multiattach_volume_for_hosts(self):
    """Attach a multiattach volume twice to the same host.

    The second attach call targets the same host ('fake_host') with a
    different mountpoint. The attachment it returns is expected to
    still report the first mountpoint, and the volume must remain
    'in-use' and undeletable.
    """
    mountpoint = "/dev/sdf"
    volume = tests_utils.create_volume(
        self.context,
        admin_metadata={'readonly': 'False'},
        multiattach=True,
        **self.volume_params)
    volume_id = volume['id']
    self.volume.create_volume(self.context, volume)

    # First attach to the host.
    attachment = self.volume.attach_volume(self.context, volume_id, None,
                                           'fake_host', mountpoint, 'rw')
    vol = db.volume_get(context.get_admin_context(), volume_id)
    self.assertEqual('in-use', vol['status'])
    self.assertTrue(vol['multiattach'])
    self.assertEqual(fields.VolumeAttachStatus.ATTACHED,
                     attachment['attach_status'])
    self.assertEqual(mountpoint, attachment['mountpoint'])
    self.assertIsNone(attachment['instance_uuid'])
    # sanitized, conforms to RFC-952 and RFC-1123 specs.
    self.assertEqual('fake-host', attachment['attached_host'])

    # The admin metadata gains 'attached_mode' next to 'readonly'.
    admin_metadata = vol['volume_admin_metadata']
    self.assertEqual(2, len(admin_metadata))
    expected = dict(readonly='False', attached_mode='rw')
    ret = {item['key']: item['value'] for item in admin_metadata}
    self.assertDictEqual(expected, ret)

    connector = {'initiator': 'iqn.2012-07.org.fake:01'}
    conn_info = self.volume.initialize_connection(self.context,
                                                  volume, connector)
    self.assertEqual('rw', conn_info['data']['access_mode'])

    # Second attach: same host, different mountpoint.
    mountpoint2 = "/dev/sdx"
    attachment2 = self.volume.attach_volume(self.context, volume_id, None,
                                            'fake_host', mountpoint2,
                                            'rw')
    vol = db.volume_get(context.get_admin_context(), volume_id)
    self.assertEqual('in-use', vol['status'])
    self.assertEqual(fields.VolumeAttachStatus.ATTACHED,
                     attachment2['attach_status'])
    # Intentionally the FIRST mountpoint, not mountpoint2.
    # NOTE(review): presumably the existing attachment for this host is
    # returned instead of a new one being created - confirm against the
    # manager's attach_volume.
    self.assertEqual(mountpoint, attachment2['mountpoint'])
    self.assertIsNone(attachment2['instance_uuid'])

    # Deleting must be refused while the volume is attached.
    self.assertRaises(exception.VolumeAttached,
                      self.volume.delete_volume,
                      self.context,
                      volume)
def test_run_attach_detach_not_multiattach_volume_for_hosts(self):
    """A non-multiattach volume cannot be attached to a second host.

    After a successful first attach, a second attach to another host
    must raise InvalidVolume and leave the first attachment and the
    volume status untouched.
    """
    dev_path = "/dev/sdf"
    new_vol = tests_utils.create_volume(
        self.context,
        admin_metadata={'readonly': 'False'},
        multiattach=False,
        **self.volume_params)
    vol_id = new_vol['id']
    self.volume.create_volume(self.context, new_vol)

    # First (and only legal) host attachment.
    host_attach = self.volume.attach_volume(
        self.context, vol_id, None, 'fake_host', dev_path, 'rw')
    db_vol = db.volume_get(context.get_admin_context(), vol_id)
    self.assertEqual('in-use', db_vol['status'])
    self.assertFalse(db_vol['multiattach'])
    self.assertEqual(fields.VolumeAttachStatus.ATTACHED,
                     host_attach['attach_status'])
    self.assertEqual(dev_path, host_attach['mountpoint'])
    self.assertIsNone(host_attach['instance_uuid'])
    # The host name comes back sanitized, conforming to the RFC-952
    # and RFC-1123 specs.
    self.assertEqual('fake-host', host_attach['attached_host'])

    meta_rows = db_vol['volume_admin_metadata']
    self.assertEqual(2, len(meta_rows))
    wanted_meta = {'readonly': 'False', 'attached_mode': 'rw'}
    found_meta = {row['key']: row['value'] for row in meta_rows}
    self.assertDictEqual(wanted_meta, found_meta)

    initiator = {'initiator': 'iqn.2012-07.org.fake:01'}
    connection = self.volume.initialize_connection(
        self.context, new_vol, initiator)
    self.assertEqual('rw', connection['data']['access_mode'])

    # Attaching to a second host must be rejected.
    other_dev = "/dev/sdx"
    second_attach_args = (self.context, vol_id, None,
                          'fake_host2', other_dev, 'rw')
    self.assertRaises(exception.InvalidVolume,
                      self.volume.attach_volume,
                      *second_attach_args)

    # The failed attempt must not have disturbed the first attachment.
    db_vol = db.volume_get(context.get_admin_context(), vol_id)
    self.assertEqual('in-use', db_vol['status'])
    self.assertEqual(fields.VolumeAttachStatus.ATTACHED,
                     host_attach['attach_status'])
    self.assertEqual(dev_path, host_attach['mountpoint'])
    self.assertIsNone(host_attach['instance_uuid'])
    # Sanitized host name again (RFC-952 / RFC-1123).
    self.assertEqual('fake-host', host_attach['attached_host'])

    # Deleting must be refused while the attachment exists.
    self.assertRaises(exception.VolumeAttached,
                      self.volume.delete_volume,
                      self.context, new_vol)

    self.volume.detach_volume(self.context, vol_id, host_attach['id'])
    db_vol = db.volume_get(self.context, vol_id)
    self.assertEqual('available', db_vol['status'])

    self.volume.delete_volume(self.context, new_vol)
    self.assertRaises(exception.VolumeNotFound,
                      db.volume_get,
                      self.context, vol_id)
def test_run_attach_detach_volume_with_attach_mode(self):
    """Attach a read-only volume in 'ro' mode to an instance and a host.

    The same attach / verify / detach cycle runs twice: first against
    an instance UUID, then against a host name. After each detach only
    the original 'readonly' admin metadata entry must remain.
    """
    server_uuid = '12345678-1234-5678-1234-567812345678'
    dev_path = "/dev/sdf"
    new_vol = tests_utils.create_volume(
        self.context,
        admin_metadata={'readonly': 'True'},
        **self.volume_params)
    vol_id = new_vol['id']
    # The status is set to 'available' directly in the database here.
    db.volume_update(self.context, vol_id, {'status': 'available'})

    wanted_meta = {'readonly': 'True', 'attached_mode': 'ro'}
    initiator = {'initiator': 'iqn.2012-07.org.fake:01'}

    # --- Round 1: attach to an instance (no host name). ---
    self.volume.attach_volume(
        self.context, vol_id, server_uuid, None, dev_path, 'ro')
    db_vol = db.volume_get(context.get_admin_context(), vol_id)
    current_attach = db_vol['volume_attachment'][0]
    self.assertEqual('in-use', db_vol['status'])
    self.assertEqual(fields.VolumeAttachStatus.ATTACHED,
                     db_vol['attach_status'])
    self.assertEqual(dev_path, current_attach['mountpoint'])
    self.assertEqual(server_uuid, current_attach['instance_uuid'])
    self.assertIsNone(current_attach['attached_host'])

    meta_rows = db_vol['volume_admin_metadata']
    self.assertEqual(2, len(meta_rows))
    found_meta = {row['key']: row['value'] for row in meta_rows}
    self.assertDictEqual(wanted_meta, found_meta)

    connection = self.volume.initialize_connection(
        self.context, new_vol, initiator)
    self.assertEqual('ro', connection['data']['access_mode'])

    self.volume.detach_volume(self.context, vol_id, current_attach['id'])
    db_vol = db.volume_get(self.context, vol_id)
    remaining = db_vol['volume_attachment']
    self.assertEqual('available', db_vol['status'])
    self.assertEqual(fields.VolumeAttachStatus.DETACHED,
                     db_vol['attach_status'])
    self.assertEqual([], remaining)
    # Only the original 'readonly' entry is left after the detach.
    meta_rows = db_vol['volume_admin_metadata']
    self.assertEqual(1, len(meta_rows))
    self.assertEqual('readonly', meta_rows[0]['key'])
    self.assertEqual('True', meta_rows[0]['value'])

    # --- Round 2: attach to a host (no instance UUID). ---
    self.volume.attach_volume(
        self.context, vol_id, None, 'fake_host', dev_path, 'ro')
    db_vol = db.volume_get(context.get_admin_context(), vol_id)
    current_attach = db_vol['volume_attachment'][0]
    self.assertEqual('in-use', db_vol['status'])
    self.assertEqual(fields.VolumeAttachStatus.ATTACHED,
                     db_vol['attach_status'])
    self.assertEqual(dev_path, current_attach['mountpoint'])
    self.assertIsNone(current_attach['instance_uuid'])
    self.assertEqual('fake-host', current_attach['attached_host'])

    meta_rows = db_vol['volume_admin_metadata']
    self.assertEqual(2, len(meta_rows))
    found_meta = {row['key']: row['value'] for row in meta_rows}
    self.assertDictEqual(wanted_meta, found_meta)

    connection = self.volume.initialize_connection(
        self.context, new_vol, initiator)
    self.assertEqual('ro', connection['data']['access_mode'])

    self.volume.detach_volume(self.context, vol_id, current_attach['id'])
    db_vol = db.volume_get(self.context, vol_id)
    remaining = db_vol['volume_attachment']
    self.assertEqual('available', db_vol['status'])
    self.assertEqual(fields.VolumeAttachStatus.DETACHED,
                     db_vol['attach_status'])
    self.assertEqual([], remaining)
    meta_rows = db_vol['volume_admin_metadata']
    self.assertEqual(1, len(meta_rows))
    self.assertEqual('readonly', meta_rows[0]['key'])
    self.assertEqual('True', meta_rows[0]['value'])

    # Finally the volume can be deleted and is gone from the DB.
    self.volume.delete_volume(self.context, new_vol)
    self.assertRaises(exception.VolumeNotFound,
                      db.volume_get,
                      self.context, vol_id)
def test_run_manager_attach_detach_volume_with_wrong_attach_mode(self):
    """The manager rejects a 'rw' attach of a read-only volume.

    Both the instance attach and the host attach must raise
    InvalidVolumeAttachMode, leave the first listed attachment in the
    ERROR_ATTACHING state, and keep the volume itself DETACHED. The
    requested 'rw' mode is still expected in the admin metadata.
    """
    server_uuid = '12345678-1234-5678-1234-567812345678'
    dev_path = "/dev/sdf"
    new_vol = tests_utils.create_volume(
        self.context,
        admin_metadata={'readonly': 'True'},
        **self.volume_params)
    vol_id = new_vol['id']
    self.volume.create_volume(self.context, new_vol)

    wanted_meta = {'readonly': 'True', 'attached_mode': 'rw'}

    # --- Instance attach in read-write mode must fail. ---
    instance_attach_args = (self.context, vol_id, server_uuid,
                            None, dev_path, 'rw')
    self.assertRaises(exception.InvalidVolumeAttachMode,
                      self.volume.attach_volume,
                      *instance_attach_args)

    # A user message must have been created for the failure.
    self.volume.message_api.create.assert_called_once_with(
        self.context,
        message_field.Action.ATTACH_VOLUME,
        resource_uuid=new_vol['id'],
        exception=mock.ANY)

    failed_attach = objects.VolumeAttachmentList.get_all_by_volume_id(
        context.get_admin_context(), vol_id)[0]
    self.assertEqual(fields.VolumeAttachStatus.ERROR_ATTACHING,
                     failed_attach.attach_status)
    db_vol = db.volume_get(context.get_admin_context(), vol_id)
    self.assertEqual(fields.VolumeAttachStatus.DETACHED,
                     db_vol['attach_status'])

    meta_rows = db_vol['volume_admin_metadata']
    self.assertEqual(2, len(meta_rows))
    found_meta = {row['key']: row['value'] for row in meta_rows}
    self.assertDictEqual(wanted_meta, found_meta)

    # Put the volume back to 'available' before the second attempt.
    db.volume_update(self.context, vol_id, {'status': 'available'})

    # --- Host attach in read-write mode must fail as well. ---
    host_attach_args = (self.context, vol_id, None,
                        'fake_host', dev_path, 'rw')
    self.assertRaises(exception.InvalidVolumeAttachMode,
                      self.volume.attach_volume,
                      *host_attach_args)

    failed_attach = objects.VolumeAttachmentList.get_all_by_volume_id(
        context.get_admin_context(), vol_id)[0]
    self.assertEqual(fields.VolumeAttachStatus.ERROR_ATTACHING,
                     failed_attach.attach_status)
    db_vol = db.volume_get(context.get_admin_context(), vol_id)
    self.assertEqual(fields.VolumeAttachStatus.DETACHED,
                     db_vol['attach_status'])

    meta_rows = db_vol['volume_admin_metadata']
    self.assertEqual(2, len(meta_rows))
    found_meta = {row['key']: row['value'] for row in meta_rows}
    self.assertDictEqual(wanted_meta, found_meta)
def test_run_api_attach_detach_volume_with_wrong_attach_mode(self):
    """The volume API rejects a 'rw' attach of a read-only volume.

    Both the instance attach and the host attach must raise
    InvalidVolumeAttachMode. Afterwards the volume is still DETACHED
    and its admin metadata holds only the original 'readonly' entry.
    """
    server_uuid = '12345678-1234-5678-1234-567812345678'
    dev_path = "/dev/sdf"
    new_vol = tests_utils.create_volume(
        self.context,
        admin_metadata={'readonly': 'True'},
        **self.volume_params)
    vol_id = new_vol['id']
    self.volume.create_volume(self.context, new_vol)
    api = cinder.volume.api.API()

    # --- Instance attach in read-write mode must fail. ---
    instance_attach_args = (self.context, new_vol, server_uuid,
                            None, dev_path, 'rw')
    self.assertRaises(exception.InvalidVolumeAttachMode,
                      api.attach,
                      *instance_attach_args)
    db_vol = db.volume_get(context.get_admin_context(), vol_id)
    self.assertEqual(fields.VolumeAttachStatus.DETACHED,
                     db_vol['attach_status'])
    meta_rows = db_vol['volume_admin_metadata']
    self.assertEqual(1, len(meta_rows))
    self.assertEqual('readonly', meta_rows[0]['key'])
    self.assertEqual('True', meta_rows[0]['value'])

    # Put the volume back to 'available' before the second attempt.
    db.volume_update(self.context, vol_id, {'status': 'available'})

    # --- Host attach in read-write mode must fail as well. ---
    host_attach_args = (self.context, new_vol, None,
                        'fake_host', dev_path, 'rw')
    self.assertRaises(exception.InvalidVolumeAttachMode,
                      api.attach,
                      *host_attach_args)
    db_vol = db.volume_get(context.get_admin_context(), vol_id)
    self.assertEqual(fields.VolumeAttachStatus.DETACHED,
                     db_vol['attach_status'])
    meta_rows = db_vol['volume_admin_metadata']
    self.assertEqual(1, len(meta_rows))
    self.assertEqual('readonly', meta_rows[0]['key'])
    self.assertEqual('True', meta_rows[0]['value'])
def test_detach_volume_while_uploading_to_image_is_in_progress(self):
    """Detaching must not reset an 'uploading' volume to 'available'.

    Scenario: an instance booted from a volume with the 'Terminate on
    Delete' flag is deleted while the volume is being uploaded to an
    image. If detach set the status to 'available', the volume would
    then be deleted even though the upload is still in progress.
    """
    dev_path = "/dev/sdf"
    server_uuid = '12345678-1234-5678-1234-567812345678'

    # Create the volume and attach it to the instance.
    new_vol = tests_utils.create_volume(
        self.context,
        admin_metadata={'readonly': 'True'},
        **self.volume_params)
    vol_id = new_vol['id']
    self.volume.create_volume(self.context, new_vol)
    self.volume.attach_volume(
        self.context, vol_id, server_uuid, None, dev_path, 'ro')

    # Simulate an image upload in progress.
    db.volume_update(self.context, vol_id, {'status': 'uploading'})

    # Detach without naming a specific attachment.
    self.volume.detach_volume(self.context, vol_id)

    # The status must still be 'uploading', yet the volume is detached.
    db_vol = db.volume_get(self.context, vol_id)
    self.assertEqual("uploading", db_vol['status'])
    self.assertEqual(fields.VolumeAttachStatus.DETACHED,
                     db_vol['attach_status'])
def test_volume_attach_in_maintenance(self):
    """Attaching a volume in 'maintenance' status raises InvalidVolume."""
    user_meta = dict(fake_key1='fake_value1', fake_key2='fake_value2')
    new_vol = tests_utils.create_volume(
        self.context, metadata=user_meta, **self.volume_params)
    new_vol['status'] = 'maintenance'

    # None is passed for every attach argument after the volume.
    attach_args = (self.context, new_vol, None, None, None, None)
    self.assertRaises(exception.InvalidVolume,
                      self.volume_api.attach,
                      *attach_args)
@mock.patch('cinder.volume.api.API.attachment_deletion_allowed')
def test_volume_detach_in_maintenance(self, mock_attachment_deletion):
    """Detaching a volume in 'maintenance' status raises InvalidVolume.

    The patched attachment_deletion_allowed check must have been called
    exactly once, with the context, a None attachment and the volume.
    """
    user_meta = dict(fake_key1='fake_value1', fake_key2='fake_value2')
    new_vol = tests_utils.create_volume(
        self.context, metadata=user_meta, **self.volume_params)
    new_vol['status'] = 'maintenance'

    api = cinder.volume.api.API()
    self.assertRaises(exception.InvalidVolume,
                      api.detach,
                      self.context, new_vol, None)

    mock_attachment_deletion.assert_called_once_with(
        self.context, None, new_vol)