# openstacksdk/openstack/tests/unit/block_storage/v3/test_proxy.py
#
# 696 lines
# 25 KiB
# Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from openstack.block_storage.v3 import _proxy
from openstack.block_storage.v3 import backup
from openstack.block_storage.v3 import capabilities
from openstack.block_storage.v3 import extension
from openstack.block_storage.v3 import group_type
from openstack.block_storage.v3 import limits
from openstack.block_storage.v3 import quota_set
from openstack.block_storage.v3 import resource_filter
from openstack.block_storage.v3 import snapshot
from openstack.block_storage.v3 import stats
from openstack.block_storage.v3 import type
from openstack.block_storage.v3 import volume
from openstack import resource
from openstack.tests.unit import test_proxy_base
class TestVolumeProxy(test_proxy_base.TestProxyBase):
    """Common base class for the block storage v3 proxy tests.

    Builds a fresh ``_proxy.Proxy`` around ``self.session`` before every
    test and exposes it as ``self.proxy``.
    """

    def setUp(self):
        # Zero-argument super() is the Python 3 idiom; this module is
        # already Python 3 only because it imports ``unittest.mock``.
        super().setUp()
        # NOTE(review): self.session is presumably prepared by the base
        # class setUp() -- it is not assigned anywhere in this module.
        self.proxy = _proxy.Proxy(self.session)
class TestVolume(TestVolumeProxy):
    """Proxy tests for volumes and other top-level block storage calls.

    Besides the volume get/find/list/create/delete and metadata calls,
    this class also exercises backend pools, limits, capabilities,
    resource filters, group types, extensions and ``wait_for_status``.
    Each test hands a proxy method to one of the ``verify_*`` / ``_verify``
    helpers inherited from the base class.
    """

    def test_volume_get(self):
        self.verify_get(self.proxy.get_volume, volume.Volume)

    def test_volume_find(self):
        self.verify_find(self.proxy.find_volume, volume.Volume)

    def test_volumes_detailed(self):
        # details=True is expected to switch the listing to the detail path.
        self.verify_list(self.proxy.volumes, volume.Volume,
                         method_kwargs={"details": True, "query": 1},
                         expected_kwargs={"query": 1,
                                          "base_path": "/volumes/detail"})

    def test_volumes_not_detailed(self):
        self.verify_list(self.proxy.volumes, volume.Volume,
                         method_kwargs={"details": False, "query": 1},
                         expected_kwargs={"query": 1})

    def test_volume_create_attrs(self):
        self.verify_create(self.proxy.create_volume, volume.Volume)

    def test_volume_delete(self):
        self.verify_delete(self.proxy.delete_volume, volume.Volume, False)

    def test_volume_delete_ignore(self):
        self.verify_delete(self.proxy.delete_volume, volume.Volume, True)

    def test_volume_delete_force(self):
        # force=True is expected to route through Volume.force_delete.
        self._verify(
            "openstack.block_storage.v3.volume.Volume.force_delete",
            self.proxy.delete_volume,
            method_args=["value"],
            method_kwargs={"force": True},
            expected_args=[self.proxy]
        )

    def test_backend_pools(self):
        self.verify_list(self.proxy.backend_pools, stats.Pools)

    def test_limits_get(self):
        self.verify_get(
            self.proxy.get_limits, limits.Limit,
            method_args=[],
            expected_kwargs={'requires_id': False})

    def test_capabilites_get(self):
        self.verify_get(self.proxy.get_capabilities, capabilities.Capabilities)

    def test_resource_filters(self):
        self.verify_list(self.proxy.resource_filters,
                         resource_filter.ResourceFilter)

    def test_group_type_get(self):
        self.verify_get(self.proxy.get_group_type, group_type.GroupType)

    def test_group_type_find(self):
        self.verify_find(self.proxy.find_group_type, group_type.GroupType)

    def test_group_types(self):
        self.verify_list(self.proxy.group_types, group_type.GroupType)

    def test_group_type_create(self):
        self.verify_create(self.proxy.create_group_type, group_type.GroupType)

    def test_group_type_delete(self):
        self.verify_delete(
            self.proxy.delete_group_type, group_type.GroupType, False)

    def test_group_type_delete_ignore(self):
        self.verify_delete(
            self.proxy.delete_group_type, group_type.GroupType, True)

    def test_group_type_update(self):
        self.verify_update(self.proxy.update_group_type, group_type.GroupType)

    def test_extensions(self):
        self.verify_list(self.proxy.extensions, extension.Extension)

    def test_get_volume_metadata(self):
        self._verify(
            "openstack.block_storage.v3.volume.Volume.fetch_metadata",
            self.proxy.get_volume_metadata,
            method_args=["value"],
            expected_args=[self.proxy],
            expected_result=volume.Volume(id="value", metadata={}))

    def test_set_volume_metadata(self):
        metadata = {"a": "1", "b": "2"}
        # Named volume_id (not ``id``) so the builtin is not shadowed.
        volume_id = "an_id"
        self._verify(
            "openstack.block_storage.v3.volume.Volume.set_metadata",
            self.proxy.set_volume_metadata,
            method_args=[volume_id],
            method_kwargs=metadata,
            method_result=volume.Volume.existing(
                id=volume_id, metadata=metadata),
            expected_args=[self.proxy],
            expected_kwargs={'metadata': metadata},
            expected_result=volume.Volume.existing(
                id=volume_id, metadata=metadata))

    def test_delete_volume_metadata(self):
        # The proxy receives a list of keys; the resource method is
        # expected to be called with the single key it contains.
        self._verify(
            "openstack.block_storage.v3.volume.Volume.delete_metadata_item",
            self.proxy.delete_volume_metadata,
            expected_result=None,
            method_args=["value", ["key"]],
            expected_args=[self.proxy, "key"])

    def test_volume_wait_for(self):
        value = volume.Volume(id='1234')
        self.verify_wait_for_status(
            self.proxy.wait_for_status,
            method_args=[value],
            expected_args=[self.proxy, value, 'available', ['error'], 2, 120])
class TestVolumeActions(TestVolumeProxy):
    """Proxy tests for the action-style calls on a volume.

    Every test patches one method of the ``Volume`` resource class and
    checks, through the inherited ``_verify`` helper, which arguments the
    matching proxy method forwards to it.
    """

    # Dotted path of the resource class whose methods are patched below.
    _VOLUME_CLS = "openstack.block_storage.v3.volume.Volume"

    def _verify_volume_call(self, resource_method, proxy_method, **checks):
        """Run ``self._verify`` against ``Volume.<resource_method>``.

        :param resource_method: Name of the ``Volume`` method to patch.
        :param proxy_method: Bound proxy method under test.
        :param checks: Forwarded unchanged as keyword arguments to
            ``self._verify`` (``method_args``, ``expected_args``, ...).
        """
        target = self._VOLUME_CLS + "." + resource_method
        self._verify(target, proxy_method, **checks)

    def test_volume_extend(self):
        self._verify_volume_call(
            "extend",
            self.proxy.extend_volume,
            method_args=["value", "new-size"],
            expected_args=[self.proxy, "new-size"],
        )

    def test_volume_set_readonly_no_argument(self):
        # Without an explicit flag the resource call should receive True.
        self._verify_volume_call(
            "set_readonly",
            self.proxy.set_volume_readonly,
            method_args=["value"],
            expected_args=[self.proxy, True],
        )

    def test_volume_set_readonly_false(self):
        self._verify_volume_call(
            "set_readonly",
            self.proxy.set_volume_readonly,
            method_args=["value", False],
            expected_args=[self.proxy, False],
        )

    def test_volume_set_bootable(self):
        self._verify_volume_call(
            "set_bootable_status",
            self.proxy.set_volume_bootable_status,
            method_args=["value", True],
            expected_args=[self.proxy, True],
        )

    def test_volume_reset_volume_status(self):
        self._verify_volume_call(
            "reset_status",
            self.proxy.reset_volume_status,
            method_args=["value", '1', '2', '3'],
            expected_args=[self.proxy, '1', '2', '3'],
        )

    def test_volume_revert_to_snapshot(self):
        self._verify_volume_call(
            "revert_to_snapshot",
            self.proxy.revert_volume_to_snapshot,
            method_args=["value", '1'],
            expected_args=[self.proxy, '1'],
        )

    def test_attach_instance(self):
        self._verify_volume_call(
            "attach",
            self.proxy.attach_volume,
            method_args=["value", '1'],
            method_kwargs={'instance': '2'},
            expected_args=[self.proxy, '1', '2', None],
        )

    def test_attach_host(self):
        self._verify_volume_call(
            "attach",
            self.proxy.attach_volume,
            method_args=["value", '1'],
            method_kwargs={'host_name': '3'},
            expected_args=[self.proxy, '1', None, '3'],
        )

    def test_detach_defaults(self):
        self._verify_volume_call(
            "detach",
            self.proxy.detach_volume,
            method_args=["value", '1'],
            expected_args=[self.proxy, '1', False, None],
        )

    def test_detach_force(self):
        self._verify_volume_call(
            "detach",
            self.proxy.detach_volume,
            method_args=["value", '1', True, {'a': 'b'}],
            expected_args=[self.proxy, '1', True, {'a': 'b'}],
        )

    def test_unmanage(self):
        self._verify_volume_call(
            "unmanage",
            self.proxy.unmanage_volume,
            method_args=["value"],
            expected_args=[self.proxy],
        )

    def test_migrate_default(self):
        self._verify_volume_call(
            "migrate",
            self.proxy.migrate_volume,
            method_args=["value", '1'],
            expected_args=[self.proxy, '1', False, False, None],
        )

    def test_migrate_nondefault(self):
        self._verify_volume_call(
            "migrate",
            self.proxy.migrate_volume,
            method_args=["value", '1', True, True],
            expected_args=[self.proxy, '1', True, True, None],
        )

    def test_migrate_cluster(self):
        self._verify_volume_call(
            "migrate",
            self.proxy.migrate_volume,
            method_args=["value"],
            method_kwargs={'cluster': '3'},
            expected_args=[self.proxy, None, False, False, '3'],
        )

    def test_complete_migration(self):
        self._verify_volume_call(
            "complete_migration",
            self.proxy.complete_volume_migration,
            method_args=["value", '1'],
            expected_args=[self.proxy, "1", False],
        )

    def test_complete_migration_error(self):
        self._verify_volume_call(
            "complete_migration",
            self.proxy.complete_volume_migration,
            method_args=["value", "1", True],
            expected_args=[self.proxy, "1", True],
        )

    def test_upload_to_image(self):
        # Only the image name is supplied; the remaining options are
        # expected to reach the resource call with these values.
        forwarded = {
            "force": False,
            "disk_format": None,
            "container_format": None,
            "visibility": None,
            "protected": None,
        }
        self._verify_volume_call(
            "upload_to_image",
            self.proxy.upload_volume_to_image,
            method_args=["value", "1"],
            expected_args=[self.proxy, "1"],
            expected_kwargs=forwarded,
        )

    def test_upload_to_image_extended(self):
        supplied = {
            "disk_format": "2",
            "container_format": "3",
            "visibility": "4",
            "protected": "5",
        }
        forwarded = {
            "force": False,
            "disk_format": "2",
            "container_format": "3",
            "visibility": "4",
            "protected": "5",
        }
        self._verify_volume_call(
            "upload_to_image",
            self.proxy.upload_volume_to_image,
            method_args=["value", "1"],
            method_kwargs=supplied,
            expected_args=[self.proxy, "1"],
            expected_kwargs=forwarded,
        )

    def test_reserve(self):
        self._verify_volume_call(
            "reserve",
            self.proxy.reserve_volume,
            method_args=["value"],
            expected_args=[self.proxy],
        )

    def test_unreserve(self):
        self._verify_volume_call(
            "unreserve",
            self.proxy.unreserve_volume,
            method_args=["value"],
            expected_args=[self.proxy],
        )

    def test_begin_detaching(self):
        self._verify_volume_call(
            "begin_detaching",
            self.proxy.begin_volume_detaching,
            method_args=["value"],
            expected_args=[self.proxy],
        )

    def test_abort_detaching(self):
        self._verify_volume_call(
            "abort_detaching",
            self.proxy.abort_volume_detaching,
            method_args=["value"],
            expected_args=[self.proxy],
        )

    def test_init_attachment(self):
        self._verify_volume_call(
            "init_attachment",
            self.proxy.init_volume_attachment,
            method_args=["value", "1"],
            expected_args=[self.proxy, "1"],
        )

    def test_terminate_attachment(self):
        self._verify_volume_call(
            "terminate_attachment",
            self.proxy.terminate_volume_attachment,
            method_args=["value", "1"],
            expected_args=[self.proxy, "1"],
        )
class TestBackup(TestVolumeProxy):
    """Proxy tests for the backup calls.

    Most tests first swap the proxy's connection for a mock whose
    ``has_service()`` returns ``True``; that setup lives in
    ``_mock_has_service`` instead of being repeated in every test.
    ``test_backup_delete_force`` and ``test_backup_reset`` deliberately
    run without it.
    """

    def _mock_has_service(self):
        """Give the proxy a mock connection reporting every service present.

        NOTE(review): presumably the backup proxy methods consult
        ``has_service`` before doing any work -- confirm against the proxy.
        """
        self.proxy._connection = mock.Mock()
        self.proxy._connection.has_service = mock.Mock(return_value=True)

    def test_backups_detailed(self):
        self._mock_has_service()
        self.verify_list(self.proxy.backups, backup.Backup,
                         method_kwargs={"details": True, "query": 1},
                         expected_kwargs={"query": 1,
                                          "base_path": "/backups/detail"})

    def test_backups_not_detailed(self):
        self._mock_has_service()
        self.verify_list(self.proxy.backups, backup.Backup,
                         method_kwargs={"details": False, "query": 1},
                         expected_kwargs={"query": 1})

    def test_backup_get(self):
        self._mock_has_service()
        self.verify_get(self.proxy.get_backup, backup.Backup)

    def test_backup_find(self):
        self._mock_has_service()
        self.verify_find(self.proxy.find_backup, backup.Backup)

    def test_backup_delete(self):
        self._mock_has_service()
        self.verify_delete(self.proxy.delete_backup, backup.Backup, False)

    def test_backup_delete_ignore(self):
        self._mock_has_service()
        self.verify_delete(self.proxy.delete_backup, backup.Backup, True)

    def test_backup_delete_force(self):
        # No has_service mock here, matching the original test.
        self._verify(
            "openstack.block_storage.v3.backup.Backup.force_delete",
            self.proxy.delete_backup,
            method_args=["value"],
            method_kwargs={"force": True},
            expected_args=[self.proxy]
        )

    def test_backup_create_attrs(self):
        self._mock_has_service()
        self.verify_create(self.proxy.create_backup, backup.Backup)

    def test_backup_restore(self):
        self._mock_has_service()
        self._verify(
            'openstack.block_storage.v3.backup.Backup.restore',
            self.proxy.restore_backup,
            method_args=['volume_id'],
            method_kwargs={'volume_id': 'vol_id', 'name': 'name'},
            expected_args=[self.proxy],
            expected_kwargs={'volume_id': 'vol_id', 'name': 'name'}
        )

    def test_backup_reset(self):
        # No has_service mock here, matching the original test.
        self._verify(
            "openstack.block_storage.v3.backup.Backup.reset",
            self.proxy.reset_backup,
            method_args=["value", "new_status"],
            expected_args=[self.proxy, "new_status"])
class TestSnapshot(TestVolumeProxy):
    """Proxy tests for the snapshot calls.

    Covers get/find/list/create/delete, status reset and update, and the
    snapshot metadata calls, using the ``verify_*`` / ``_verify`` helpers
    inherited from the base class.
    """

    def test_snapshot_get(self):
        self.verify_get(self.proxy.get_snapshot, snapshot.Snapshot)

    def test_snapshot_find(self):
        self.verify_find(self.proxy.find_snapshot, snapshot.Snapshot)

    def test_snapshots_detailed(self):
        # The detailed listing is checked against SnapshotDetail rather
        # than Snapshot, together with the detail base path.
        self.verify_list(self.proxy.snapshots, snapshot.SnapshotDetail,
                         method_kwargs={"details": True, "query": 1},
                         expected_kwargs={"query": 1,
                                          "base_path": "/snapshots/detail"})

    def test_snapshots_not_detailed(self):
        self.verify_list(self.proxy.snapshots, snapshot.Snapshot,
                         method_kwargs={"details": False, "query": 1},
                         expected_kwargs={"query": 1})

    def test_snapshot_create_attrs(self):
        self.verify_create(self.proxy.create_snapshot, snapshot.Snapshot)

    def test_snapshot_delete(self):
        self.verify_delete(self.proxy.delete_snapshot,
                           snapshot.Snapshot, False)

    def test_snapshot_delete_ignore(self):
        self.verify_delete(self.proxy.delete_snapshot,
                           snapshot.Snapshot, True)

    def test_snapshot_delete_force(self):
        self._verify(
            "openstack.block_storage.v3.snapshot.Snapshot.force_delete",
            self.proxy.delete_snapshot,
            method_args=["value"],
            method_kwargs={"force": True},
            expected_args=[self.proxy]
        )

    def test_reset(self):
        self._verify(
            "openstack.block_storage.v3.snapshot.Snapshot.reset",
            self.proxy.reset_snapshot,
            method_args=["value", "new_status"],
            expected_args=[self.proxy, "new_status"])

    def test_set_status(self):
        # With no progress argument the resource call should receive None.
        self._verify(
            "openstack.block_storage.v3.snapshot.Snapshot.set_status",
            self.proxy.set_snapshot_status,
            method_args=["value", "new_status"],
            expected_args=[self.proxy, "new_status", None])

    def test_set_status_percentage(self):
        self._verify(
            "openstack.block_storage.v3.snapshot.Snapshot.set_status",
            self.proxy.set_snapshot_status,
            method_args=["value", "new_status", "per"],
            expected_args=[self.proxy, "new_status", "per"])

    def test_get_snapshot_metadata(self):
        self._verify(
            "openstack.block_storage.v3.snapshot.Snapshot.fetch_metadata",
            self.proxy.get_snapshot_metadata,
            method_args=["value"],
            expected_args=[self.proxy],
            expected_result=snapshot.Snapshot(id="value", metadata={}))

    def test_set_snapshot_metadata(self):
        metadata = {"a": "1", "b": "2"}
        # Named snapshot_id (not ``id``) so the builtin is not shadowed.
        snapshot_id = "an_id"
        self._verify(
            "openstack.block_storage.v3.snapshot.Snapshot.set_metadata",
            self.proxy.set_snapshot_metadata,
            method_args=[snapshot_id],
            method_kwargs=metadata,
            method_result=snapshot.Snapshot.existing(
                id=snapshot_id, metadata=metadata),
            expected_args=[self.proxy],
            expected_kwargs={'metadata': metadata},
            expected_result=snapshot.Snapshot.existing(
                id=snapshot_id, metadata=metadata))

    def test_delete_snapshot_metadata(self):
        # The proxy receives a list of keys; the resource method is
        # expected to be called with the single key it contains.
        self._verify(
            "openstack.block_storage.v3.snapshot.Snapshot."
            "delete_metadata_item",
            self.proxy.delete_snapshot_metadata,
            expected_result=None,
            method_args=["value", ["key"]],
            expected_args=[self.proxy, "key"])
class TestType(TestVolumeProxy):
    """Proxy tests for volume types.

    Covers the basic type calls, extra specs, private access and type
    encryption, using the ``verify_*`` / ``_verify`` helpers inherited
    from the base class.  ``type`` in this module refers to the imported
    ``openstack.block_storage.v3.type`` module, not the builtin.
    """

    def test_type_get(self):
        self.verify_get(self.proxy.get_type, type.Type)

    def test_type_find(self):
        self.verify_find(self.proxy.find_type, type.Type)

    def test_types(self):
        self.verify_list(self.proxy.types, type.Type)

    def test_type_create_attrs(self):
        self.verify_create(self.proxy.create_type, type.Type)

    def test_type_delete(self):
        self.verify_delete(self.proxy.delete_type, type.Type, False)

    def test_type_delete_ignore(self):
        self.verify_delete(self.proxy.delete_type, type.Type, True)

    def test_type_update(self):
        self.verify_update(self.proxy.update_type, type.Type)

    def test_type_extra_specs_update(self):
        extra_specs = {"a": "1", "b": "2"}
        # Named type_id (not ``id``) so the builtin is not shadowed.
        type_id = "an_id"
        self._verify(
            "openstack.block_storage.v3.type.Type.set_extra_specs",
            self.proxy.update_type_extra_specs,
            method_args=[type_id],
            method_kwargs=extra_specs,
            method_result=type.Type.existing(id=type_id,
                                             extra_specs=extra_specs),
            expected_args=[self.proxy],
            expected_kwargs=extra_specs,
            # The proxy is expected to hand back just the extra specs
            # mapping, not the whole Type returned by the resource call.
            expected_result=extra_specs)

    def test_type_extra_specs_delete(self):
        self._verify(
            "openstack.block_storage.v3.type.Type.delete_extra_specs",
            self.proxy.delete_type_extra_specs,
            expected_result=None,
            method_args=["value", "key"],
            expected_args=[self.proxy, "key"])

    def test_type_get_private_access(self):
        self._verify(
            "openstack.block_storage.v3.type.Type.get_private_access",
            self.proxy.get_type_access,
            method_args=["value"],
            expected_args=[self.proxy])

    def test_type_add_private_access(self):
        self._verify(
            "openstack.block_storage.v3.type.Type.add_private_access",
            self.proxy.add_type_access,
            method_args=["value", "a"],
            expected_args=[self.proxy, "a"])

    def test_type_remove_private_access(self):
        self._verify(
            "openstack.block_storage.v3.type.Type.remove_private_access",
            self.proxy.remove_type_access,
            method_args=["value", "a"],
            expected_args=[self.proxy, "a"])

    def test_type_encryption_get(self):
        # The positional type value is expected to be forwarded as the
        # volume_type_id keyword, with no positional arguments left.
        self.verify_get(
            self.proxy.get_type_encryption,
            type.TypeEncryption,
            method_args=['value'],
            expected_args=[],
            expected_kwargs={
                'volume_type_id': 'value',
                'requires_id': False
            })

    def test_type_encryption_create(self):
        self.verify_create(
            self.proxy.create_type_encryption,
            type.TypeEncryption,
            method_kwargs={'volume_type': 'id'},
            expected_kwargs={'volume_type_id': 'id'}
        )

    def test_type_encryption_update(self):
        self.verify_update(
            self.proxy.update_type_encryption, type.TypeEncryption)

    def test_type_encryption_delete(self):
        self.verify_delete(
            self.proxy.delete_type_encryption, type.TypeEncryption, False)

    def test_type_encryption_delete_ignore(self):
        self.verify_delete(
            self.proxy.delete_type_encryption, type.TypeEncryption, True)
class TestQuota(TestVolumeProxy):
    """Proxy tests for the quota set calls.

    The get/defaults/revert tests patch the generic ``Resource.fetch`` and
    ``Resource.delete`` methods; the update test additionally patches
    ``Proxy._get_resource`` so the arguments used to build the quota set
    can be asserted.
    """

    def test_get(self):
        self._verify(
            'openstack.resource.Resource.fetch',
            self.proxy.get_quota_set,
            method_args=['prj'],
            expected_args=[self.proxy],
            expected_kwargs={
                'error_message': None,
                'requires_id': False,
                'usage': False,
            },
            method_result=quota_set.QuotaSet(),
            expected_result=quota_set.QuotaSet()
        )

    def test_get_query(self):
        self._verify(
            'openstack.resource.Resource.fetch',
            self.proxy.get_quota_set,
            method_args=['prj'],
            method_kwargs={
                'usage': True,
                'user_id': 'uid'
            },
            expected_args=[self.proxy],
            expected_kwargs={
                'error_message': None,
                'requires_id': False,
                'usage': True,
                'user_id': 'uid'
            }
        )

    def test_get_defaults(self):
        self._verify(
            'openstack.resource.Resource.fetch',
            self.proxy.get_quota_set_defaults,
            method_args=['prj'],
            expected_args=[self.proxy],
            expected_kwargs={
                'error_message': None,
                'requires_id': False,
                'base_path': '/os-quota-sets/defaults'
            }
        )

    def test_reset(self):
        self._verify(
            'openstack.resource.Resource.delete',
            self.proxy.revert_quota_set,
            method_args=['prj'],
            method_kwargs={'user_id': 'uid'},
            expected_args=[self.proxy],
            expected_kwargs={
                'user_id': 'uid'
            }
        )

    @mock.patch('openstack.proxy.Proxy._get_resource', autospec=True)
    def test_update(self, gr_mock):
        # _get_resource hands back a plain Resource.  Its commit() is
        # patched by the _verify call below, so nothing needs to be
        # attached to gr_mock itself (the former ``gr_mock.commit``
        # assignment set an attribute on the patched function, which no
        # code path reads).
        gr_mock.return_value = resource.Resource()
        self._verify(
            'openstack.resource.Resource.commit',
            self.proxy.update_quota_set,
            method_args=['qs'],
            method_kwargs={
                'query': {'user_id': 'uid'},
                'a': 'b',
            },
            expected_args=[self.proxy],
            expected_kwargs={
                'user_id': 'uid'
            }
        )
        # Non-query keyword arguments go to _get_resource as attributes;
        # the ``query`` dict is what reaches commit() above.
        gr_mock.assert_called_with(
            self.proxy,
            quota_set.QuotaSet,
            'qs', a='b'
        )