144 lines
6.2 KiB
Python
144 lines
6.2 KiB
Python
# Copyright 2016 NEC Corporation. All rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from tempest.api.compute import base
|
|
from tempest.common import utils
|
|
from tempest import config
|
|
from tempest.lib.common.utils import data_utils
|
|
from tempest.lib import decorators
|
|
from tempest.lib import exceptions as lib_exc
|
|
|
|
CONF = config.CONF
|
|
|
|
|
|
class VolumesAdminNegativeTest(base.BaseV2ComputeAdminTest):
|
|
"""Negative tests of volume swapping"""
|
|
|
|
create_default_network = True
|
|
|
|
@classmethod
|
|
def skip_checks(cls):
|
|
super(VolumesAdminNegativeTest, cls).skip_checks()
|
|
if not CONF.service_available.cinder:
|
|
skip_msg = ("%s skipped as Cinder is not available" % cls.__name__)
|
|
raise cls.skipException(skip_msg)
|
|
|
|
@classmethod
|
|
def resource_setup(cls):
|
|
super(VolumesAdminNegativeTest, cls).resource_setup()
|
|
cls.server = cls.create_test_server(wait_until='ACTIVE')
|
|
|
|
@decorators.attr(type=['negative'])
|
|
@decorators.idempotent_id('309b5ecd-0585-4a7e-a36f-d2b2bf55259d')
|
|
def test_update_attached_volume_with_nonexistent_volume_in_uri(self):
|
|
"""Test swapping non existent volume should fail"""
|
|
volume = self.create_volume()
|
|
nonexistent_volume = data_utils.rand_uuid()
|
|
self.assertRaises(lib_exc.NotFound,
|
|
self.admin_servers_client.update_attached_volume,
|
|
self.server['id'], nonexistent_volume,
|
|
volumeId=volume['id'])
|
|
|
|
@decorators.related_bug('1629110', status_code=400)
|
|
@decorators.attr(type=['negative'])
|
|
@decorators.idempotent_id('7dcac15a-b107-46d3-a5f6-cb863f4e454a')
|
|
def test_update_attached_volume_with_nonexistent_volume_in_body(self):
|
|
"""Test swapping volume to a non existence volume should fail"""
|
|
volume = self.create_volume()
|
|
self.attach_volume(self.server, volume)
|
|
|
|
nonexistent_volume = data_utils.rand_uuid()
|
|
self.assertRaises(lib_exc.BadRequest,
|
|
self.admin_servers_client.update_attached_volume,
|
|
self.server['id'], volume['id'],
|
|
volumeId=nonexistent_volume)
|
|
|
|
|
|
class UpdateMultiattachVolumeNegativeTest(base.BaseV2ComputeAdminTest):
|
|
"""Negative tests of swapping volume attached to multiple servers
|
|
|
|
Negative tests of swapping volume attached to multiple servers with
|
|
compute microversion greater than 2.59 and volume microversion greater
|
|
than 3.26
|
|
"""
|
|
|
|
min_microversion = '2.60'
|
|
volume_min_microversion = '3.27'
|
|
|
|
@classmethod
|
|
def skip_checks(cls):
|
|
super(UpdateMultiattachVolumeNegativeTest, cls).skip_checks()
|
|
if not CONF.compute_feature_enabled.volume_multiattach:
|
|
raise cls.skipException('Volume multi-attach is not available.')
|
|
|
|
@decorators.attr(type=['negative'])
|
|
@decorators.idempotent_id('7576d497-b7c6-44bd-9cc5-c5b4e50fec71')
|
|
@utils.services('volume')
|
|
def test_multiattach_rw_volume_update_failure(self):
|
|
"""Test swapping volume attached to multi-servers with read-write mode
|
|
|
|
1. Create two volumes "vol1" and "vol2"
|
|
2. Create two instances "server1" and "server2"
|
|
3. Attach "vol1" to both of these instances
|
|
4. By default both of these attachments should have an attach_mode of
|
|
read-write, so trying to swap "vol1" to "vol2" should fail
|
|
5. Check "vol1" is still attached to both servers
|
|
6. Check "vol2" is not attached to any server
|
|
"""
|
|
# Create two multiattach capable volumes.
|
|
vol1 = self.create_volume(multiattach=True)
|
|
vol2 = self.create_volume(multiattach=True)
|
|
|
|
# Create two instances.
|
|
server1 = self.create_test_server(wait_until='ACTIVE')
|
|
server2 = self.create_test_server(wait_until='ACTIVE')
|
|
|
|
# Attach vol1 to both of these instances.
|
|
vol1_attachment1 = self.attach_volume(server1, vol1)
|
|
vol1_attachment2 = self.attach_volume(server2, vol1)
|
|
|
|
# Assert that we now have two attachments.
|
|
vol1 = self.volumes_client.show_volume(vol1['id'])['volume']
|
|
self.assertEqual(2, len(vol1['attachments']))
|
|
|
|
# By default both of these attachments should have an attach_mode of
|
|
# read-write, assert that here to ensure the following calls to update
|
|
# the volume will be rejected.
|
|
for volume_attachment in vol1['attachments']:
|
|
attachment_id = volume_attachment['attachment_id']
|
|
attachment = self.attachments_client.show_attachment(
|
|
attachment_id)['attachment']
|
|
self.assertEqual('rw', attachment['attach_mode'])
|
|
|
|
# Assert that a BadRequest is raised when we attempt to update volume1
|
|
# to volume2 on server1 or server2.
|
|
self.assertRaises(lib_exc.BadRequest,
|
|
self.admin_servers_client.update_attached_volume,
|
|
server1['id'], vol1['id'], volumeId=vol2['id'])
|
|
self.assertRaises(lib_exc.BadRequest,
|
|
self.admin_servers_client.update_attached_volume,
|
|
server2['id'], vol1['id'], volumeId=vol2['id'])
|
|
|
|
# Fetch the volume 1 to check the current attachments.
|
|
vol1 = self.volumes_client.show_volume(vol1['id'])['volume']
|
|
vol1_attachment_ids = [a['id'] for a in vol1['attachments']]
|
|
|
|
# Assert that volume 1 is still attached to both server 1 and 2.
|
|
self.assertIn(vol1_attachment1['id'], vol1_attachment_ids)
|
|
self.assertIn(vol1_attachment2['id'], vol1_attachment_ids)
|
|
|
|
# Assert that volume 2 has no attachments.
|
|
vol2 = self.volumes_client.show_volume(vol2['id'])['volume']
|
|
self.assertEqual([], vol2['attachments'])
|