Adds volume snapshot functionality to shade
Adds volume snapshot create, delete, get, and list functions Change-Id: Ibe540f3b1932f1a703c98e580f98d0c7091c9622
This commit is contained in:
parent
7dff4f07e8
commit
fff354e64f
@ -1041,6 +1041,11 @@ class OpenStackCloud(object):
|
|||||||
return _utils._filter_list(
|
return _utils._filter_list(
|
||||||
volumes, name_or_id, filters, name_key='display_name')
|
volumes, name_or_id, filters, name_key='display_name')
|
||||||
|
|
||||||
|
def search_volume_snapshots(self, name_or_id=None, filters=None):
|
||||||
|
volumesnapshots = self.list_volume_snapshots()
|
||||||
|
return _utils._filter_list(
|
||||||
|
volumesnapshots, name_or_id, filters, name_key='display_name')
|
||||||
|
|
||||||
def search_flavors(self, name_or_id=None, filters=None):
|
def search_flavors(self, name_or_id=None, filters=None):
|
||||||
flavors = self.list_flavors()
|
flavors = self.list_flavors()
|
||||||
return _utils._filter_list(flavors, name_or_id, filters)
|
return _utils._filter_list(flavors, name_or_id, filters)
|
||||||
@ -2545,6 +2550,160 @@ class OpenStackCloud(object):
|
|||||||
)
|
)
|
||||||
return vol
|
return vol
|
||||||
|
|
||||||
|
def create_volume_snapshot(self, volume_id, force=False,
|
||||||
|
display_name=None, display_description=None,
|
||||||
|
wait=True, timeout=None):
|
||||||
|
"""Create a volume.
|
||||||
|
|
||||||
|
:param volume_id: the id of the volume to snapshot.
|
||||||
|
:param force: If set to True the snapshot will be created even if the
|
||||||
|
volume is attached to an instance, if False it will not
|
||||||
|
:param display_name: name of the snapshot, one will be generated if
|
||||||
|
one is not provided
|
||||||
|
:param display_description: description of the snapshot, one will be
|
||||||
|
one is not provided
|
||||||
|
:param wait: If true, waits for volume snapshot to be created.
|
||||||
|
:param timeout: Seconds to wait for volume snapshot creation. None is
|
||||||
|
forever.
|
||||||
|
|
||||||
|
:returns: The created volume object.
|
||||||
|
|
||||||
|
:raises: OpenStackCloudTimeout if wait time exceeded.
|
||||||
|
:raises: OpenStackCloudException on operation error.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
snapshot = self.manager.submitTask(
|
||||||
|
_tasks.VolumeSnapshotCreate(
|
||||||
|
volume_id=volume_id, force=force,
|
||||||
|
display_name=display_name,
|
||||||
|
display_description=display_description)
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
raise OpenStackCloudException(
|
||||||
|
"Error creating snapshot of volume %s: %s" % (volume_id, e)
|
||||||
|
)
|
||||||
|
|
||||||
|
snapshot = meta.obj_to_dict(snapshot)
|
||||||
|
|
||||||
|
if wait:
|
||||||
|
snapshot_id = snapshot['id']
|
||||||
|
for count in _utils._iterate_timeout(
|
||||||
|
timeout,
|
||||||
|
"Timeout waiting for the volume snapshot to be available."
|
||||||
|
):
|
||||||
|
snapshot = self.get_volume_snapshot_by_id(snapshot_id)
|
||||||
|
|
||||||
|
if snapshot['status'] == 'available':
|
||||||
|
break
|
||||||
|
|
||||||
|
if snapshot['status'] == 'error':
|
||||||
|
raise OpenStackCloudException(
|
||||||
|
"Error in creating volume, please check logs")
|
||||||
|
|
||||||
|
return snapshot
|
||||||
|
|
||||||
|
def get_volume_snapshot_by_id(self, snapshot_id):
|
||||||
|
"""Takes a snapshot_id and gets a dict of the snapshot
|
||||||
|
that maches that id.
|
||||||
|
|
||||||
|
Note: This is more efficient than get_volume_snapshot.
|
||||||
|
|
||||||
|
param: snapshot_id: ID of the volume snapshot.
|
||||||
|
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
snapshot = self.manager.submitTask(
|
||||||
|
_tasks.VolumeSnapshotGet(
|
||||||
|
snapshot_id=snapshot_id
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
raise OpenStackCloudException(
|
||||||
|
"Error getting snapshot %s: %s" % (snapshot_id, e)
|
||||||
|
)
|
||||||
|
|
||||||
|
return meta.obj_to_dict(snapshot)
|
||||||
|
|
||||||
|
def get_volume_snapshot(self, name_or_id, filters=None):
|
||||||
|
"""Get a volume by name or ID.
|
||||||
|
|
||||||
|
:param name_or_id: Name or ID of the volume snapshot.
|
||||||
|
:param dict filters:
|
||||||
|
A dictionary of meta data to use for further filtering. Elements
|
||||||
|
of this dictionary may, themselves, be dictionaries. Example::
|
||||||
|
|
||||||
|
{
|
||||||
|
'last_name': 'Smith',
|
||||||
|
'other': {
|
||||||
|
'gender': 'Female'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
:returns: A volume dict or None if no matching volume is
|
||||||
|
found.
|
||||||
|
|
||||||
|
"""
|
||||||
|
return _utils._get_entity(self.search_volume_snapshots, name_or_id,
|
||||||
|
filters)
|
||||||
|
|
||||||
|
def list_volume_snapshots(self, detailed=True, search_opts=None):
|
||||||
|
"""List all volume snapshots.
|
||||||
|
|
||||||
|
:returns: A list of volume snapshots dicts.
|
||||||
|
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return meta.obj_list_to_dict(
|
||||||
|
self.manager.submitTask(
|
||||||
|
_tasks.VolumeSnapshotList(detailed=detailed,
|
||||||
|
search_opts=search_opts)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
raise OpenStackCloudException(
|
||||||
|
"Error getting a list of snapshots: %s" % e
|
||||||
|
)
|
||||||
|
|
||||||
|
def delete_volume_snapshot(self, name_or_id=None, wait=False,
|
||||||
|
timeout=None):
|
||||||
|
"""Delete a volume snapshot.
|
||||||
|
|
||||||
|
:param name_or_id: Name or unique ID of the volume snapshot.
|
||||||
|
:param wait: If true, waits for volume snapshot to be deleted.
|
||||||
|
:param timeout: Seconds to wait for volume snapshot deletion. None is
|
||||||
|
forever.
|
||||||
|
|
||||||
|
:raises: OpenStackCloudTimeout if wait time exceeded.
|
||||||
|
:raises: OpenStackCloudException on operation error.
|
||||||
|
"""
|
||||||
|
|
||||||
|
volumesnapshot = self.get_volume_snapshot(name_or_id)
|
||||||
|
|
||||||
|
if not volumesnapshot:
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.manager.submitTask(
|
||||||
|
_tasks.VolumeSnapshotDelete(
|
||||||
|
snapshot=volumesnapshot['id']
|
||||||
|
)
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
raise OpenStackCloudException(
|
||||||
|
"Error in deleting volume snapshot: %s" % str(e))
|
||||||
|
|
||||||
|
if wait:
|
||||||
|
for count in _utils._iterate_timeout(
|
||||||
|
timeout,
|
||||||
|
"Timeout waiting for the volume snapshot to be deleted."):
|
||||||
|
if not self.get_volume_snapshot(volumesnapshot['id']):
|
||||||
|
break
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
def get_server_id(self, name_or_id):
|
def get_server_id(self, name_or_id):
|
||||||
server = self.get_server(name_or_id)
|
server = self.get_server(name_or_id)
|
||||||
if server:
|
if server:
|
||||||
|
@ -261,6 +261,26 @@ class VolumeAttach(task_manager.Task):
|
|||||||
return client.nova_client.volumes.create_server_volume(**self.args)
|
return client.nova_client.volumes.create_server_volume(**self.args)
|
||||||
|
|
||||||
|
|
||||||
|
class VolumeSnapshotCreate(task_manager.Task):
|
||||||
|
def main(self, client):
|
||||||
|
return client.cinder_client.volume_snapshots.create(**self.args)
|
||||||
|
|
||||||
|
|
||||||
|
class VolumeSnapshotGet(task_manager.Task):
|
||||||
|
def main(self, client):
|
||||||
|
return client.cinder_client.volume_snapshots.get(**self.args)
|
||||||
|
|
||||||
|
|
||||||
|
class VolumeSnapshotList(task_manager.Task):
|
||||||
|
def main(self, client):
|
||||||
|
return client.cinder_client.volume_snapshots.list(**self.args)
|
||||||
|
|
||||||
|
|
||||||
|
class VolumeSnapshotDelete(task_manager.Task):
|
||||||
|
def main(self, client):
|
||||||
|
return client.cinder_client.volume_snapshots.delete(**self.args)
|
||||||
|
|
||||||
|
|
||||||
class NeutronSecurityGroupList(task_manager.Task):
|
class NeutronSecurityGroupList(task_manager.Task):
|
||||||
def main(self, client):
|
def main(self, client):
|
||||||
return client.neutron_client.list_security_groups()
|
return client.neutron_client.list_security_groups()
|
||||||
|
@ -101,6 +101,14 @@ class FakeVolume(object):
|
|||||||
self.display_name = display_name
|
self.display_name = display_name
|
||||||
|
|
||||||
|
|
||||||
|
class FakeVolumeSnapshot(object):
|
||||||
|
def __init__(self, id, status, display_name, display_description):
|
||||||
|
self.id = id
|
||||||
|
self.status = status
|
||||||
|
self.display_name = display_name
|
||||||
|
self.display_description = display_description
|
||||||
|
|
||||||
|
|
||||||
class FakeMachine(object):
|
class FakeMachine(object):
|
||||||
def __init__(self, id, name=None, driver=None, driver_info=None,
|
def __init__(self, id, name=None, driver=None, driver_info=None,
|
||||||
chassis_uuid=None, instance_info=None, instance_uuid=None,
|
chassis_uuid=None, instance_info=None, instance_uuid=None,
|
||||||
|
66
shade/tests/functional/test_volume.py
Normal file
66
shade/tests/functional/test_volume.py
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
test_volume
|
||||||
|
----------------------------------
|
||||||
|
|
||||||
|
Functional tests for `shade` block storage methods.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from testtools import content
|
||||||
|
|
||||||
|
from shade import openstack_cloud
|
||||||
|
from shade.tests import base
|
||||||
|
|
||||||
|
|
||||||
|
class TestVolume(base.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestVolume, self).setUp()
|
||||||
|
self.cloud = openstack_cloud(cloud='devstack')
|
||||||
|
if not self.cloud.has_service('volume'):
|
||||||
|
self.skipTest('volume service not supported by cloud')
|
||||||
|
|
||||||
|
def test_volumes(self):
|
||||||
|
'''Test volume and snapshot functionality'''
|
||||||
|
volume_name = self.getUniqueString()
|
||||||
|
snapshot_name = self.getUniqueString()
|
||||||
|
self.addDetail('volume', content.text_content(volume_name))
|
||||||
|
self.addCleanup(self.cleanup, volume_name, snapshot_name)
|
||||||
|
volume = self.cloud.create_volume(display_name=volume_name, size=1)
|
||||||
|
snapshot = self.cloud.create_volume_snapshot(
|
||||||
|
volume['id'],
|
||||||
|
display_name=snapshot_name
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual([volume], self.cloud.list_volumes())
|
||||||
|
self.assertEqual([snapshot], self.cloud.list_volume_snapshots())
|
||||||
|
self.assertEqual(snapshot,
|
||||||
|
self.cloud.get_volume_snapshot(
|
||||||
|
snapshot['display_name']))
|
||||||
|
self.assertEqual(snapshot,
|
||||||
|
self.cloud.get_volume_snapshot_by_id(snapshot['id']))
|
||||||
|
|
||||||
|
self.cloud.delete_volume_snapshot(snapshot_name, wait=True)
|
||||||
|
self.cloud.delete_volume(volume_name)
|
||||||
|
|
||||||
|
def cleanup(self, volume_name, snapshot_name):
|
||||||
|
volume = self.cloud.get_volume(volume_name)
|
||||||
|
snapshot = self.cloud.get_volume_snapshot(snapshot_name)
|
||||||
|
if volume:
|
||||||
|
self.cloud.delete_volume(volume_name)
|
||||||
|
|
||||||
|
if snapshot:
|
||||||
|
self.cloud.delete_volume_snapshot(snapshot_name)
|
120
shade/tests/unit/test_create_volume_snapshot.py
Normal file
120
shade/tests/unit/test_create_volume_snapshot.py
Normal file
@ -0,0 +1,120 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
test_create_volume_snapshot
|
||||||
|
----------------------------------
|
||||||
|
|
||||||
|
Tests for the `create_volume_snapshot` command.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from mock import patch
|
||||||
|
import os_client_config
|
||||||
|
from shade import meta
|
||||||
|
from shade import OpenStackCloud
|
||||||
|
from shade.tests import base, fakes
|
||||||
|
from shade.exc import (OpenStackCloudException, OpenStackCloudTimeout)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCreateVolumeSnapshot(base.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestCreateVolumeSnapshot, self).setUp()
|
||||||
|
config = os_client_config.OpenStackConfig()
|
||||||
|
self.client = OpenStackCloud(
|
||||||
|
cloud_config=config.get_one_cloud(validate=False))
|
||||||
|
|
||||||
|
@patch.object(OpenStackCloud, 'cinder_client')
|
||||||
|
def test_create_volume_snapshot_wait(self, mock_cinder):
|
||||||
|
"""
|
||||||
|
Test that create_volume_snapshot with a wait returns the volume
|
||||||
|
snapshot when its status changes to "available".
|
||||||
|
"""
|
||||||
|
build_snapshot = fakes.FakeVolumeSnapshot('1234', 'creating',
|
||||||
|
'foo', 'derpysnapshot')
|
||||||
|
fake_snapshot = fakes.FakeVolumeSnapshot('1234', 'available',
|
||||||
|
'foo', 'derpysnapshot')
|
||||||
|
|
||||||
|
mock_cinder.volume_snapshots.create.return_value = build_snapshot
|
||||||
|
mock_cinder.volume_snapshots.get.return_value = fake_snapshot
|
||||||
|
mock_cinder.volume_snapshots.list.return_value = [
|
||||||
|
build_snapshot, fake_snapshot]
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
meta.obj_to_dict(fake_snapshot),
|
||||||
|
self.client.create_volume_snapshot(volume_id='1234',
|
||||||
|
wait=True)
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_cinder.volume_snapshots.create.assert_called_with(
|
||||||
|
display_description=None, display_name=None, force=False,
|
||||||
|
volume_id='1234'
|
||||||
|
)
|
||||||
|
mock_cinder.volume_snapshots.get.assert_called_with(
|
||||||
|
snapshot_id=meta.obj_to_dict(build_snapshot)['id']
|
||||||
|
)
|
||||||
|
|
||||||
|
@patch.object(OpenStackCloud, 'cinder_client')
|
||||||
|
def test_create_volume_snapshot_with_timeout(self, mock_cinder):
|
||||||
|
"""
|
||||||
|
Test that a timeout while waiting for the volume snapshot to create
|
||||||
|
raises an exception in create_volume_snapshot.
|
||||||
|
"""
|
||||||
|
build_snapshot = fakes.FakeVolumeSnapshot('1234', 'creating',
|
||||||
|
'foo', 'derpysnapshot')
|
||||||
|
|
||||||
|
mock_cinder.volume_snapshots.create.return_value = build_snapshot
|
||||||
|
mock_cinder.volume_snapshots.get.return_value = build_snapshot
|
||||||
|
mock_cinder.volume_snapshots.list.return_value = [build_snapshot]
|
||||||
|
|
||||||
|
self.assertRaises(
|
||||||
|
OpenStackCloudTimeout,
|
||||||
|
self.client.create_volume_snapshot, volume_id='1234',
|
||||||
|
wait=True, timeout=1)
|
||||||
|
|
||||||
|
mock_cinder.volume_snapshots.create.assert_called_with(
|
||||||
|
display_description=None, display_name=None, force=False,
|
||||||
|
volume_id='1234'
|
||||||
|
)
|
||||||
|
mock_cinder.volume_snapshots.get.assert_called_with(
|
||||||
|
snapshot_id=meta.obj_to_dict(build_snapshot)['id']
|
||||||
|
)
|
||||||
|
|
||||||
|
@patch.object(OpenStackCloud, 'cinder_client')
|
||||||
|
def test_create_volume_snapshot_with_error(self, mock_cinder):
|
||||||
|
"""
|
||||||
|
Test that a error status while waiting for the volume snapshot to
|
||||||
|
create raises an exception in create_volume_snapshot.
|
||||||
|
"""
|
||||||
|
build_snapshot = fakes.FakeVolumeSnapshot('1234', 'creating',
|
||||||
|
'bar', 'derpysnapshot')
|
||||||
|
error_snapshot = fakes.FakeVolumeSnapshot('1234', 'error',
|
||||||
|
'blah', 'derpysnapshot')
|
||||||
|
|
||||||
|
mock_cinder.volume_snapshots.create.return_value = build_snapshot
|
||||||
|
mock_cinder.volume_snapshots.get.return_value = error_snapshot
|
||||||
|
mock_cinder.volume_snapshots.list.return_value = [error_snapshot]
|
||||||
|
|
||||||
|
self.assertRaises(
|
||||||
|
OpenStackCloudException,
|
||||||
|
self.client.create_volume_snapshot, volume_id='1234',
|
||||||
|
wait=True, timeout=5)
|
||||||
|
|
||||||
|
mock_cinder.volume_snapshots.create.assert_called_with(
|
||||||
|
display_description=None, display_name=None, force=False,
|
||||||
|
volume_id='1234'
|
||||||
|
)
|
||||||
|
mock_cinder.volume_snapshots.get.assert_called_with(
|
||||||
|
snapshot_id=meta.obj_to_dict(build_snapshot)['id']
|
||||||
|
)
|
94
shade/tests/unit/test_delete_volume_snapshot.py
Normal file
94
shade/tests/unit/test_delete_volume_snapshot.py
Normal file
@ -0,0 +1,94 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
test_delete_volume_snapshot
|
||||||
|
----------------------------------
|
||||||
|
|
||||||
|
Tests for the `delete_volume_snapshot` command.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from mock import patch
|
||||||
|
import os_client_config
|
||||||
|
from shade import OpenStackCloud
|
||||||
|
from shade.tests import base, fakes
|
||||||
|
from shade.exc import (OpenStackCloudException, OpenStackCloudTimeout)
|
||||||
|
|
||||||
|
|
||||||
|
class TestDeleteVolumeSnapshot(base.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestDeleteVolumeSnapshot, self).setUp()
|
||||||
|
config = os_client_config.OpenStackConfig()
|
||||||
|
self.client = OpenStackCloud(
|
||||||
|
cloud_config=config.get_one_cloud(validate=False))
|
||||||
|
|
||||||
|
@patch.object(OpenStackCloud, 'cinder_client')
|
||||||
|
def test_delete_volume_snapshot(self, mock_cinder):
|
||||||
|
"""
|
||||||
|
Test that delete_volume_snapshot without a wait returns True instance
|
||||||
|
when the volume snapshot deletes.
|
||||||
|
"""
|
||||||
|
fake_snapshot = fakes.FakeVolumeSnapshot('1234', 'available',
|
||||||
|
'foo', 'derpysnapshot')
|
||||||
|
|
||||||
|
mock_cinder.volume_snapshots.list.return_value = [fake_snapshot]
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
True,
|
||||||
|
self.client.delete_volume_snapshot(name_or_id='1234', wait=False)
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_cinder.volume_snapshots.list.assert_called_with(detailed=True,
|
||||||
|
search_opts=None)
|
||||||
|
|
||||||
|
@patch.object(OpenStackCloud, 'cinder_client')
|
||||||
|
def test_delete_volume_snapshot_with_error(self, mock_cinder):
|
||||||
|
"""
|
||||||
|
Test that a exception while deleting a volume snapshot will cause an
|
||||||
|
OpenStackCloudException.
|
||||||
|
"""
|
||||||
|
fake_snapshot = fakes.FakeVolumeSnapshot('1234', 'available',
|
||||||
|
'foo', 'derpysnapshot')
|
||||||
|
|
||||||
|
mock_cinder.volume_snapshots.delete.side_effect = Exception(
|
||||||
|
"exception")
|
||||||
|
mock_cinder.volume_snapshots.list.return_value = [fake_snapshot]
|
||||||
|
|
||||||
|
self.assertRaises(
|
||||||
|
OpenStackCloudException,
|
||||||
|
self.client.delete_volume_snapshot, name_or_id='1234',
|
||||||
|
wait=True, timeout=1)
|
||||||
|
|
||||||
|
mock_cinder.volume_snapshots.delete.assert_called_with(
|
||||||
|
snapshot='1234')
|
||||||
|
|
||||||
|
@patch.object(OpenStackCloud, 'cinder_client')
|
||||||
|
def test_delete_volume_snapshot_with_timeout(self, mock_cinder):
|
||||||
|
"""
|
||||||
|
Test that a timeout while waiting for the volume snapshot to delete
|
||||||
|
raises an exception in delete_volume_snapshot.
|
||||||
|
"""
|
||||||
|
fake_snapshot = fakes.FakeVolumeSnapshot('1234', 'available',
|
||||||
|
'foo', 'derpysnapshot')
|
||||||
|
|
||||||
|
mock_cinder.volume_snapshots.list.return_value = [fake_snapshot]
|
||||||
|
|
||||||
|
self.assertRaises(
|
||||||
|
OpenStackCloudTimeout,
|
||||||
|
self.client.delete_volume_snapshot, name_or_id='1234',
|
||||||
|
wait=True, timeout=1)
|
||||||
|
|
||||||
|
mock_cinder.volume_snapshots.list.assert_called_with(detailed=True,
|
||||||
|
search_opts=None)
|
Loading…
Reference in New Issue
Block a user