Add mountable snapshots support to manila client

This patch adds support to manila client for the mountable snapshots
feature.

Co-Authored-By: Miriam Yumi <miriam.peixoto@fit-tecnologia.org.br>
Co-Authored-By: Alyson Rosa <alyson.rosa@fit-tecnologia.org.br>

Implements: blueprint manila-mountable-snapshots
Change-Id: I785a784bcae7cf3bcef4fa6c64ba28ee58328389
Depends-On: I65f398a05f82eef31ec317d70dfa101483b44b30
This commit is contained in:
tpsilva 2016-07-13 10:57:32 -03:00 committed by Rodrigo Barbieri
parent b4250866ea
commit c0fdf827b9
21 changed files with 1160 additions and 27 deletions

View File

@ -65,6 +65,8 @@ iniset $MANILACLIENT_CONF DEFAULT access_types_mapping "nfs:ip,cifs:user"
# Dummy driver is capable of running share migration tests # Dummy driver is capable of running share migration tests
iniset $MANILACLIENT_CONF DEFAULT run_migration_tests "True" iniset $MANILACLIENT_CONF DEFAULT run_migration_tests "True"
# Running mountable snapshot tests in dummy driver
iniset $MANILACLIENT_CONF DEFAULT run_mount_snapshot_tests "True"
# Create share network and use it for functional tests if required # Create share network and use it for functional tests if required
USE_SHARE_NETWORK=$(trueorfalse True USE_SHARE_NETWORK) USE_SHARE_NETWORK=$(trueorfalse True USE_SHARE_NETWORK)

View File

@ -27,7 +27,7 @@ from manilaclient import utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
MAX_VERSION = '2.31' MAX_VERSION = '2.32'
MIN_VERSION = '2.0' MIN_VERSION = '2.0'
DEPRECATED_VERSION = '1.0' DEPRECATED_VERSION = '1.0'
_VERSIONED_METHOD_MAP = {} _VERSIONED_METHOD_MAP = {}

View File

@ -160,6 +160,11 @@ share_opts = [
"Disable this feature if there is no more than one " "Disable this feature if there is no more than one "
"storage pool being tested or if used driver does not " "storage pool being tested or if used driver does not "
"support it."), "support it."),
cfg.BoolOpt("run_mount_snapshot_tests",
default=False,
help="Defines whether to run mountable snapshots tests or "
"not. Disable this feature if used driver doesn't "
"support it."),
] ]
# 2. Generate config # 2. Generate config

View File

@ -173,8 +173,8 @@ class BaseTestCase(base.ClientTestBase):
def create_share_type(cls, name=None, driver_handles_share_servers=True, def create_share_type(cls, name=None, driver_handles_share_servers=True,
snapshot_support=None, snapshot_support=None,
create_share_from_snapshot=None, create_share_from_snapshot=None,
revert_to_snapshot=None, is_public=True, revert_to_snapshot=None, mount_snapshot=None,
client=None, cleanup_in_class=True, is_public=True, client=None, cleanup_in_class=True,
microversion=None, extra_specs=None): microversion=None, extra_specs=None):
if client is None: if client is None:
client = cls.get_admin_client() client = cls.get_admin_client()
@ -186,7 +186,8 @@ class BaseTestCase(base.ClientTestBase):
microversion=microversion, microversion=microversion,
extra_specs=extra_specs, extra_specs=extra_specs,
create_share_from_snapshot=create_share_from_snapshot, create_share_from_snapshot=create_share_from_snapshot,
revert_to_snapshot=revert_to_snapshot revert_to_snapshot=revert_to_snapshot,
mount_snapshot=mount_snapshot,
) )
resource = { resource = {
"type": "share_type", "type": "share_type",

View File

@ -166,8 +166,8 @@ class ManilaCLIClient(base.CLIClient):
def create_share_type(self, name=None, driver_handles_share_servers=True, def create_share_type(self, name=None, driver_handles_share_servers=True,
snapshot_support=None, snapshot_support=None,
create_share_from_snapshot=None, create_share_from_snapshot=None,
revert_to_snapshot=None, is_public=True, revert_to_snapshot=None, mount_snapshot=None,
microversion=None, extra_specs=None): is_public=True, microversion=None, extra_specs=None):
"""Creates share type. """Creates share type.
:param name: text -- name of share type to use, if not set then :param name: text -- name of share type to use, if not set then
@ -183,6 +183,7 @@ class ManilaCLIClient(base.CLIClient):
alias. Default is None. alias. Default is None.
:param revert_to_snapshot: -- boolean or its string alias. Default is :param revert_to_snapshot: -- boolean or its string alias. Default is
None. None.
:param mount_snapshot: -- boolean or its string alias. Default is None.
""" """
if name is None: if name is None:
name = data_utils.rand_name('manilaclient_functional_test') name = data_utils.rand_name('manilaclient_functional_test')
@ -213,6 +214,12 @@ class ManilaCLIClient(base.CLIClient):
revert_to_snapshot) revert_to_snapshot)
cmd += (" --revert-to-snapshot-support " + revert_to_snapshot) cmd += (" --revert-to-snapshot-support " + revert_to_snapshot)
if mount_snapshot is not None:
if not isinstance(mount_snapshot, six.string_types):
mount_snapshot = six.text_type(
mount_snapshot)
cmd += (" --mount-snapshot-support " + mount_snapshot)
if extra_specs is not None: if extra_specs is not None:
extra_spec_str = '' extra_spec_str = ''
for k, v in extra_specs.items(): for k, v in extra_specs.items():
@ -827,6 +834,42 @@ class ManilaCLIClient(base.CLIClient):
snapshot = output_parser.details(snapshot_raw) snapshot = output_parser.details(snapshot_raw)
return snapshot return snapshot
@not_found_wrapper
def list_snapshot_export_locations(self, snapshot, columns=None,
microversion=None):
"""List snapshot export locations.
:param snapshot: str -- Name or ID of a snapshot.
:param columns: str -- comma separated string of columns.
Example, "--columns uuid,path".
:param microversion: API microversion to be used for request.
"""
cmd = "snapshot-export-location-list %s" % snapshot
if columns is not None:
cmd += " --columns " + columns
export_locations_raw = self.manila(cmd, microversion=microversion)
export_locations = utils.listing(export_locations_raw)
return export_locations
@not_found_wrapper
@forbidden_wrapper
def list_snapshot_instance_export_locations(self, snapshot_instance,
columns=None,
microversion=None):
"""List snapshot instance export locations.
:param snapshot_instance: str -- Name or ID of a snapshot instance.
:param columns: str -- comma separated string of columns.
Example, "--columns uuid,path".
:param microversion: API microversion to be used for request.
"""
cmd = "snapshot-instance-export-location-list %s" % snapshot_instance
if columns is not None:
cmd += " --columns " + columns
export_locations_raw = self.manila(cmd, microversion=microversion)
export_locations = utils.listing(export_locations_raw)
return export_locations
@not_found_wrapper @not_found_wrapper
@forbidden_wrapper @forbidden_wrapper
def delete_snapshot(self, snapshot, microversion=None): def delete_snapshot(self, snapshot, microversion=None):
@ -910,26 +953,55 @@ class ManilaCLIClient(base.CLIClient):
raise tempest_lib_exc.TimeoutException(message) raise tempest_lib_exc.TimeoutException(message)
@not_found_wrapper @not_found_wrapper
def list_access(self, share_id, columns=None, microversion=None): def list_access(self, entity_id, columns=None, microversion=None,
is_snapshot=False):
"""Returns list of access rules for a share. """Returns list of access rules for a share.
:param share_id: str -- Name or ID of a share. :param entity_id: str -- Name or ID of a share or snapshot.
:param columns: comma separated string of columns. :param columns: comma separated string of columns.
Example, "--columns access_type,access_to" Example, "--columns access_type,access_to"
:param is_snapshot: Boolean value to determine if should list
access of a share or snapshot.
""" """
cmd = 'access-list %s ' % share_id if is_snapshot:
cmd = 'snapshot-access-list %s ' % entity_id
else:
cmd = 'access-list %s ' % entity_id
if columns is not None: if columns is not None:
cmd += ' --columns ' + columns cmd += ' --columns ' + columns
access_list_raw = self.manila(cmd, microversion=microversion) access_list_raw = self.manila(cmd, microversion=microversion)
return output_parser.listing(access_list_raw) return output_parser.listing(access_list_raw)
@not_found_wrapper @not_found_wrapper
def get_access(self, share_id, access_id, microversion=None): def get_access(self, share_id, access_id, microversion=None,
for access in self.list_access(share_id, microversion=microversion): is_snapshot=False):
for access in self.list_access(share_id, microversion=microversion,
is_snapshot=is_snapshot):
if access['id'] == access_id: if access['id'] == access_id:
return access return access
raise tempest_lib_exc.NotFound() raise tempest_lib_exc.NotFound()
@not_found_wrapper
def snapshot_access_allow(self, snapshot_id, access_type, access_to,
microversion=None):
raw_access = self.manila(
'snapshot-access-allow %(id)s %(type)s %(access_to)s' % {
'id': snapshot_id,
'type': access_type,
'access_to': access_to,
},
microversion=microversion)
return output_parser.details(raw_access)
@not_found_wrapper
def snapshot_access_deny(self, snapshot_id, access_id, microversion=None):
return self.manila(
'snapshot-access-deny %(share_id)s %(access_id)s' % {
'share_id': snapshot_id,
'access_id': access_id,
},
microversion=microversion)
@not_found_wrapper @not_found_wrapper
def access_allow(self, share_id, access_type, access_to, access_level, def access_allow(self, share_id, access_type, access_to, access_level,
microversion=None): microversion=None):
@ -954,15 +1026,17 @@ class ManilaCLIClient(base.CLIClient):
microversion=microversion) microversion=microversion)
def wait_for_access_rule_status(self, share_id, access_id, state='active', def wait_for_access_rule_status(self, share_id, access_id, state='active',
microversion=None): microversion=None, is_snapshot=False):
access = self.get_access( access = self.get_access(
share_id, access_id, microversion=microversion) share_id, access_id, microversion=microversion,
is_snapshot=is_snapshot)
start = int(time.time()) start = int(time.time())
while access['state'] != state: while access['state'] != state:
time.sleep(self.build_interval) time.sleep(self.build_interval)
access = self.get_access( access = self.get_access(
share_id, access_id, microversion=microversion) share_id, access_id, microversion=microversion,
is_snapshot=is_snapshot)
if access['state'] == state: if access['state'] == state:
return return
@ -979,10 +1053,11 @@ class ManilaCLIClient(base.CLIClient):
raise tempest_lib_exc.TimeoutException(message) raise tempest_lib_exc.TimeoutException(message)
def wait_for_access_rule_deletion(self, share_id, access_id, def wait_for_access_rule_deletion(self, share_id, access_id,
microversion=None): microversion=None, is_snapshot=False):
try: try:
access = self.get_access( access = self.get_access(
share_id, access_id, microversion=microversion) share_id, access_id, microversion=microversion,
is_snapshot=is_snapshot)
except tempest_lib_exc.NotFound: except tempest_lib_exc.NotFound:
return return
@ -991,7 +1066,8 @@ class ManilaCLIClient(base.CLIClient):
time.sleep(self.build_interval) time.sleep(self.build_interval)
try: try:
access = self.get_access( access = self.get_access(
share_id, access_id, microversion=microversion) share_id, access_id, microversion=microversion,
is_snapshot=is_snapshot)
except tempest_lib_exc.NotFound: except tempest_lib_exc.NotFound:
return return
@ -1002,7 +1078,8 @@ class ManilaCLIClient(base.CLIClient):
if int(time.time()) - start >= self.build_timeout: if int(time.time()) - start >= self.build_timeout:
message = ( message = (
"Access rule %(access)s failed to reach deleted state " "Access rule %(access)s failed to reach deleted state "
"within the required time (%s s)." % self.build_timeout) "within the required time (%(timeout)s s)." %
{"access": access_id, "timeout": self.build_timeout})
raise tempest_lib_exc.TimeoutException(message) raise tempest_lib_exc.TimeoutException(message)
def reset_task_state(self, share_id, state, version=None): def reset_task_state(self, share_id, state, version=None):
@ -1136,6 +1213,40 @@ class ManilaCLIClient(base.CLIClient):
export_locations = utils.listing(export_locations_raw) export_locations = utils.listing(export_locations_raw)
return export_locations return export_locations
@not_found_wrapper
def get_snapshot_export_location(self, snapshot, export_location_uuid,
microversion=None):
"""Returns an export location by snapshot and its UUID.
:param snapshot: str -- Name or ID of a snapshot.
:param export_location_uuid: str -- UUID of an export location.
:param microversion: API microversion to be used for request.
"""
snapshot_raw = self.manila(
'snapshot-export-location-show %(snapshot)s %(el_uuid)s' % {
'snapshot': snapshot,
'el_uuid': export_location_uuid,
},
microversion=microversion)
snapshot = output_parser.details(snapshot_raw)
return snapshot
@not_found_wrapper
def get_snapshot_instance_export_location(
self, snapshot, export_location_uuid, microversion=None):
"""Returns an export location by snapshot instance and its UUID.
:param snapshot: str -- Name or ID of a snapshot instance.
:param export_location_uuid: str -- UUID of an export location.
:param microversion: API microversion to be used for request.
"""
snapshot_raw = self.manila(
'snapshot-instance-export-location-show %(snapshot)s %(el_uuid)s'
% {'snapshot': snapshot, 'el_uuid': export_location_uuid},
microversion=microversion)
snapshot = output_parser.details(snapshot_raw)
return snapshot
@not_found_wrapper @not_found_wrapper
def get_share_export_location(self, share, export_location_uuid, def get_share_export_location(self, share, export_location_uuid,
microversion=None): microversion=None):

View File

@ -90,7 +90,7 @@ class ShareTypesReadWriteTest(base.BaseTestCase):
self.skip_if_microversion_not_supported('2.0') self.skip_if_microversion_not_supported('2.0')
self._test_create_delete_share_type( self._test_create_delete_share_type(
'2.0', is_public, dhss, spec_snapshot_support, '2.0', is_public, dhss, spec_snapshot_support,
None, None, extra_specs) None, None, None, extra_specs)
@ddt.data(*unit_test_types.get_valid_type_create_data_2_24()) @ddt.data(*unit_test_types.get_valid_type_create_data_2_24())
@ddt.unpack @ddt.unpack
@ -101,7 +101,7 @@ class ShareTypesReadWriteTest(base.BaseTestCase):
self.skip_if_microversion_not_supported('2.24') self.skip_if_microversion_not_supported('2.24')
self._test_create_delete_share_type( self._test_create_delete_share_type(
'2.24', is_public, dhss, spec_snapshot_support, '2.24', is_public, dhss, spec_snapshot_support,
spec_create_share_from_snapshot, None, extra_specs) spec_create_share_from_snapshot, None, None, extra_specs)
@ddt.data(*unit_test_types.get_valid_type_create_data_2_27()) @ddt.data(*unit_test_types.get_valid_type_create_data_2_27())
@ddt.unpack @ddt.unpack
@ -114,12 +114,13 @@ class ShareTypesReadWriteTest(base.BaseTestCase):
self._test_create_delete_share_type( self._test_create_delete_share_type(
'2.27', is_public, dhss, spec_snapshot_support, '2.27', is_public, dhss, spec_snapshot_support,
spec_create_share_from_snapshot, spec_revert_to_snapshot_support, spec_create_share_from_snapshot, spec_revert_to_snapshot_support,
extra_specs) None, extra_specs)
def _test_create_delete_share_type(self, microversion, is_public, dhss, def _test_create_delete_share_type(self, microversion, is_public, dhss,
spec_snapshot_support, spec_snapshot_support,
spec_create_share_from_snapshot, spec_create_share_from_snapshot,
spec_revert_to_snapshot_support, spec_revert_to_snapshot_support,
spec_mount_snapshot_support,
extra_specs): extra_specs):
share_type_name = data_utils.rand_name('manilaclient_functional_test') share_type_name = data_utils.rand_name('manilaclient_functional_test')
@ -134,6 +135,7 @@ class ShareTypesReadWriteTest(base.BaseTestCase):
snapshot_support=spec_snapshot_support, snapshot_support=spec_snapshot_support,
create_share_from_snapshot=spec_create_share_from_snapshot, create_share_from_snapshot=spec_create_share_from_snapshot,
revert_to_snapshot=spec_revert_to_snapshot_support, revert_to_snapshot=spec_revert_to_snapshot_support,
mount_snapshot=spec_mount_snapshot_support,
is_public=is_public, is_public=is_public,
microversion=microversion, microversion=microversion,
extra_specs=extra_specs) extra_specs=extra_specs)
@ -183,6 +185,11 @@ class ShareTypesReadWriteTest(base.BaseTestCase):
('{} : {}'.format( ('{} : {}'.format(
'revert_to_snapshot_support', 'revert_to_snapshot_support',
spec_revert_to_snapshot_support)).strip()) spec_revert_to_snapshot_support)).strip())
if spec_mount_snapshot_support is not None:
expected_extra_specs.append(
('{} : {}'.format(
'mount_snapshot_support',
spec_mount_snapshot_support)).strip())
# Verify optional extra specs # Verify optional extra specs
optional_extra_specs = share_type['optional_extra_specs'] optional_extra_specs = share_type['optional_extra_specs']

View File

@ -0,0 +1,171 @@
# Copyright (c) 2017 Hitachi Data Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib import exceptions as tempest_lib_exc
import testtools
from manilaclient import config
from manilaclient.tests.functional import base
from manilaclient.tests.functional import utils
CONF = config.CONF
@testtools.skipUnless(CONF.run_snapshot_tests and
CONF.run_mount_snapshot_tests,
"Snapshots or mountable snapshots tests are disabled.")
@utils.skip_if_microversion_not_supported('2.32')
class SnapshotAccessReadBase(base.BaseTestCase):
protocol = None
@classmethod
def setUpClass(cls):
super(SnapshotAccessReadBase, cls).setUpClass()
if cls.protocol not in CONF.enable_protocols:
message = "%s tests are disabled." % cls.protocol
raise cls.skipException(message)
cls.access_types = CONF.access_types_mapping.get(
cls.protocol, '').split(' ')
if not cls.access_types:
raise cls.skipException("No access types were provided for %s "
"snapshot access tests." % cls.protocol)
cls.share = cls.create_share(share_protocol=cls.protocol,
public=True,
cleanup_in_class=True,
client=cls.get_user_client())
int_range = range(0, 10)
cls.access_to = {
'ip': ['99.88.77.%d' % i for i in int_range],
'user': ['foo_user_%d' % i for i in int_range],
'cert': ['tenant_%d.example.com' % i for i in int_range],
}
def _test_create_list_access_rule_for_snapshot(self, snapshot_id):
access = []
access_type = self.access_types[0]
for i in range(5):
access_ = self.user_client.snapshot_access_allow(
snapshot_id, access_type,
self.access_to[access_type][i])
access.append(access_)
return access
def test_create_list_access_rule_for_snapshot(self):
snapshot = self.create_snapshot(share=self.share['id'],
client=self.get_user_client(),
cleanup_in_class=False)
access = self._test_create_list_access_rule_for_snapshot(
snapshot['id'])
access_list = self.user_client.list_access(
snapshot['id'], is_snapshot=True)
for i in range(5):
self.assertIn(access[i]['id'],
[access_list[j]['id'] for j in range(5)])
self.assertIn(access[i]['access_type'],
[access_list[j]['access_type'] for j in range(5)])
self.assertIn(access[i]['access_to'],
[access_list[j]['access_to'] for j in range(5)])
self.assertIsNotNone(access_list[i]['access_type'])
self.assertIsNotNone(access_list[i]['access_to'])
def test_create_list_access_rule_for_snapshot_select_column(self):
snapshot = self.create_snapshot(share=self.share['id'],
client=self.get_user_client(),
cleanup_in_class=False)
self._test_create_list_access_rule_for_snapshot(snapshot['id'])
access_list = self.user_client.list_access(
snapshot['id'], columns="access_type,access_to", is_snapshot=True)
self.assertTrue(any(x['Access_Type'] is not None for x in access_list))
self.assertTrue(any(x['Access_To'] is not None for x in access_list))
def _create_delete_access_rule(self, snapshot_id, access_type, access_to):
if access_type not in self.access_types:
raise self.skipException(
"'%(access_type)s' access rules is disabled for protocol "
"'%(protocol)s'." % {"access_type": access_type,
"protocol": self.protocol})
access = self.user_client.snapshot_access_allow(
snapshot_id, access_type, access_to)
self.assertEqual(access_type, access.get('access_type'))
self.assertEqual(access_to.replace('\\\\', '\\'),
access.get('access_to'))
self.user_client.wait_for_access_rule_status(
snapshot_id, access['id'], is_snapshot=True)
self.user_client.snapshot_access_deny(snapshot_id, access['id'])
self.user_client.wait_for_access_rule_deletion(
snapshot_id, access['id'], is_snapshot=True)
self.assertRaises(tempest_lib_exc.NotFound,
self.user_client.get_access, snapshot_id,
access['id'], is_snapshot=True)
def test_create_delete_snapshot_ip_access_rule(self):
snapshot = self.create_snapshot(share=self.share['id'],
client=self.get_user_client(),
cleanup_in_class=False)
self._create_delete_access_rule(
snapshot['id'], 'ip', self.access_to['ip'][0])
def test_create_delete_snapshot_user_access_rule(self):
snapshot = self.create_snapshot(share=self.share['id'],
client=self.get_user_client(),
cleanup_in_class=False)
self._create_delete_access_rule(
snapshot['id'], 'user', CONF.username_for_user_rules)
def test_create_delete_snapshot_cert_access_rule(self):
snapshot = self.create_snapshot(share=self.share['id'],
client=self.get_user_client(),
cleanup_in_class=False)
self._create_delete_access_rule(
snapshot['id'], 'cert', self.access_to['cert'][0])
class NFSSnapshotAccessTest(SnapshotAccessReadBase):
protocol = 'nfs'
class CIFSSnapshotAccessTest(SnapshotAccessReadBase):
protocol = 'cifs'
class GlusterFSSnapshotAccessTest(SnapshotAccessReadBase):
protocol = 'glusterfs'
class HDFSSnapshotAccessTest(SnapshotAccessReadBase):
protocol = 'hdfs'
def load_tests(loader, tests, _):
result = []
for test_case in tests:
if type(test_case._tests[0]) is SnapshotAccessReadBase:
continue
result.append(test_case)
return loader.suiteClass(result)

View File

@ -0,0 +1,120 @@
# Copyright (c) 2017 Hitachi Data Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
from oslo_utils import uuidutils
import testtools
from manilaclient import config
from manilaclient.tests.functional import base
from manilaclient.tests.functional import utils
CONF = config.CONF
@ddt.ddt
@testtools.skipUnless(CONF.run_snapshot_tests and
CONF.run_mount_snapshot_tests,
"Snapshots or mountable snapshots tests are disabled.")
@utils.skip_if_microversion_not_supported('2.32')
class SnapshotInstanceExportLocationReadWriteTest(base.BaseTestCase):
@classmethod
def setUpClass(cls):
super(SnapshotInstanceExportLocationReadWriteTest, cls).setUpClass()
cls.share = cls.create_share(
client=cls.get_user_client(),
cleanup_in_class=True)
cls.snapshot = cls.create_snapshot(share=cls.share['id'],
client=cls.get_user_client(),
cleanup_in_class=True)
def test_get_snapshot_instance_export_location(self):
client = self.admin_client
snapshot_instances = client.list_snapshot_instances(
self.snapshot['id'])
self.assertGreater(len(snapshot_instances), 0)
self.assertIn('ID', snapshot_instances[0])
self.assertTrue(uuidutils.is_uuid_like(
snapshot_instances[0]['ID']))
snapshot_instance_id = snapshot_instances[0]['ID']
export_locations = client.list_snapshot_instance_export_locations(
snapshot_instance_id)
el = client.get_snapshot_instance_export_location(
snapshot_instance_id, export_locations[0]['ID'])
expected_keys = ['path', 'id', 'is_admin_only',
'share_snapshot_instance_id', 'updated_at',
'created_at']
for key in expected_keys:
self.assertIn(key, el)
for key, key_el in (
('ID', 'id'), ('Path', 'path'),
('Is Admin only', 'is_admin_only')):
self.assertEqual(export_locations[0][key], el[key_el])
self.assertTrue(uuidutils.is_uuid_like(
el['share_snapshot_instance_id']))
self.assertTrue(uuidutils.is_uuid_like(el['id']))
self.assertIn(el['is_admin_only'], ('True', 'False'))
def test_list_snapshot_instance_export_locations(self):
client = self.admin_client
snapshot_instances = client.list_snapshot_instances(
self.snapshot['id'])
self.assertGreater(len(snapshot_instances), 0)
self.assertIn('ID', snapshot_instances[0])
self.assertTrue(uuidutils.is_uuid_like(snapshot_instances[0]['ID']))
snapshot_instance_id = snapshot_instances[0]['ID']
export_locations = client.list_snapshot_instance_export_locations(
snapshot_instance_id)
self.assertGreater(len(export_locations), 0)
expected_keys = ('ID', 'Path', 'Is Admin only')
for el in export_locations:
for key in expected_keys:
self.assertIn(key, el)
self.assertTrue(uuidutils.is_uuid_like(el['ID']))
def test_list_snapshot_instance_export_locations_with_columns(self):
client = self.admin_client
snapshot_instances = client.list_snapshot_instances(
self.snapshot['id'])
self.assertGreater(len(snapshot_instances), 0)
self.assertIn('ID', snapshot_instances[0])
self.assertTrue(uuidutils.is_uuid_like(snapshot_instances[0]['ID']))
snapshot_instance_id = snapshot_instances[0]['ID']
export_locations = client.list_snapshot_instance_export_locations(
snapshot_instance_id, columns='id,path')
self.assertGreater(len(export_locations), 0)
expected_keys = ('Id', 'Path')
unexpected_keys = ('Updated At', 'Created At', 'Is Admin only')
for el in export_locations:
for key in expected_keys:
self.assertIn(key, el)
for key in unexpected_keys:
self.assertNotIn(key, el)
self.assertTrue(uuidutils.is_uuid_like(el['Id']))

View File

@ -0,0 +1,93 @@
# Copyright (c) 2017 Hitachi Data Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
from oslo_utils import uuidutils
import testtools
from manilaclient import config
from manilaclient.tests.functional import base
from manilaclient.tests.functional import utils
CONF = config.CONF
@ddt.ddt
@testtools.skipUnless(CONF.run_snapshot_tests and
CONF.run_mount_snapshot_tests,
"Snapshots or mountable snapshots tests are disabled.")
@utils.skip_if_microversion_not_supported('2.32')
class SnapshotExportLocationReadWriteTest(base.BaseTestCase):
@classmethod
def setUpClass(cls):
super(SnapshotExportLocationReadWriteTest, cls).setUpClass()
cls.share = cls.create_share(
client=cls.get_user_client(),
cleanup_in_class=True)
cls.snapshot = cls.create_snapshot(share=cls.share['id'],
client=cls.get_user_client(),
cleanup_in_class=True)
@ddt.data('admin', 'user')
def test_get_snapshot_export_location(self, role):
client = self.admin_client if role == 'admin' else self.user_client
export_locations = client.list_snapshot_export_locations(
self.snapshot['id'])
el = client.get_snapshot_export_location(
self.snapshot['id'], export_locations[0]['ID'])
expected_keys = ['path', 'id', 'updated_at', 'created_at']
if role == 'admin':
expected_keys.extend(['is_admin_only',
'share_snapshot_instance_id'])
self.assertTrue(uuidutils.is_uuid_like(
el['share_snapshot_instance_id']))
self.assertIn(el['is_admin_only'], ('True', 'False'))
self.assertTrue(uuidutils.is_uuid_like(el['id']))
for key in expected_keys:
self.assertIn(key, el)
@ddt.data('admin', 'user')
def test_list_snapshot_export_locations(self, role):
client = self.admin_client if role == 'admin' else self.user_client
export_locations = client.list_snapshot_export_locations(
self.snapshot['id'])
self.assertGreater(len(export_locations), 0)
expected_keys = ('ID', 'Path')
for el in export_locations:
for key in expected_keys:
self.assertIn(key, el)
self.assertTrue(uuidutils.is_uuid_like(el['ID']))
@ddt.data('admin', 'user')
def test_list_snapshot_export_locations_with_columns(self, role):
client = self.admin_client if role == 'admin' else self.user_client
export_locations = client.list_snapshot_export_locations(
self.snapshot['id'], columns='id,path')
self.assertGreater(len(export_locations), 0)
expected_keys = ('Id', 'Path')
unexpected_keys = ('Updated At', 'Created At')
for el in export_locations:
for key in expected_keys:
self.assertIn(key, el)
for key in unexpected_keys:
self.assertNotIn(key, el)
self.assertTrue(uuidutils.is_uuid_like(el['Id']))

View File

@ -58,6 +58,17 @@ def get_fake_export_location():
} }
def get_fake_snapshot_export_location():
return {
'uuid': 'foo_el_uuid',
'path': '/foo/el/path',
'share_snapshot_instance_id': 'foo_share_instance_id',
'is_admin_only': False,
'created_at': '2017-01-17T13:14:15Z',
'updated_at': '2017-01-17T14:15:16Z',
}
class FakeHTTPClient(fakes.FakeHTTPClient): class FakeHTTPClient(fakes.FakeHTTPClient):
def get_(self, **kw): def get_(self, **kw):
@ -259,6 +270,12 @@ class FakeHTTPClient(fakes.FakeHTTPClient):
assert body[action] is None assert body[action] is None
elif action in ('unmanage', ): elif action in ('unmanage', ):
assert body[action] is None assert body[action] is None
elif action in 'allow_access':
assert 'access_type' in body['allow_access']
assert 'access_to' in body['allow_access']
_body = {'snapshot_access': body['allow_access']}
elif action in 'deny_access':
assert 'access_id' in body['deny_access']
else: else:
raise AssertionError("Unexpected action: %s" % action) raise AssertionError("Unexpected action: %s" % action)
return (resp, {}, _body) return (resp, {}, _body)
@ -931,6 +948,34 @@ class FakeHTTPClient(fakes.FakeHTTPClient):
instances = {'snapshot_instance': self.fake_snapshot_instance} instances = {'snapshot_instance': self.fake_snapshot_instance}
return (200, {}, instances) return (200, {}, instances)
def get_snapshot_instances_1234_export_locations_fake_el_id(self, **kw):
return (200, {}, {'share_snapshot_export_location': {
'id': 'fake_id', 'path': '/fake_path'}})
def get_snapshots_1234_export_locations_fake_el_id(self, **kw):
return (200, {}, {'share_snapshot_export_location': {
'id': 'fake_id', 'path': '/fake_path'}})
def get_snapshot_instances_1234_export_locations(
self, **kw):
snapshot_export_location = {'share_snapshot_export_locations':
[get_fake_export_location()]}
return (200, {}, snapshot_export_location)
def get_snapshots_1234_export_locations(self):
snapshot_export_location = {'share_snapshot_export_locations':
[get_fake_export_location()]}
return (200, {}, snapshot_export_location)
def get_snapshots_1234_access_list(self, **kw):
access_list = {'snapshot_access_list': [{
'state': 'active',
'id': '1234',
'access_type': 'ip',
'access_to': '6.6.6.6'
}]}
return (200, {}, access_list)
def post_snapshot_instances_1234_action(self, body, **kw): def post_snapshot_instances_1234_action(self, body, **kw):
_body = None _body = None
resp = 202 resp = 202

View File

@ -0,0 +1,45 @@
# Copyright (c) 2017 Hitachi Data Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from manilaclient import extension
from manilaclient.tests.unit import utils
from manilaclient.tests.unit.v2 import fakes
from manilaclient.v2 import share_snapshot_export_locations
extensions = [
extension.Extension('share_snapshot_export_locations',
share_snapshot_export_locations),
]
cs = fakes.FakeClient(extensions=extensions)
class ShareSnapshotExportLocationsTest(utils.TestCase):
def test_list_snapshot(self):
snapshot_id = '1234'
cs.share_snapshot_export_locations.list(snapshot_id)
cs.assert_called(
'GET', '/snapshots/%s/export-locations' % snapshot_id)
def test_get_snapshot(self):
snapshot_id = '1234'
el_id = 'fake_el_id'
cs.share_snapshot_export_locations.get(el_id, snapshot_id)
cs.assert_called(
'GET',
('/snapshots/%(snapshot_id)s/export-locations/'
'%(el_id)s') % {
'snapshot_id': snapshot_id, 'el_id': el_id})

View File

@ -0,0 +1,48 @@
# Copyright (c) 2017 Hitachi Data Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from manilaclient import extension
from manilaclient.tests.unit import utils
from manilaclient.tests.unit.v2 import fakes
from manilaclient.v2 import share_snapshot_instance_export_locations
extensions = [
extension.Extension('share_snapshot_export_locations',
share_snapshot_instance_export_locations),
]
cs = fakes.FakeClient(extensions=extensions)
class ShareSnapshotInstanceExportLocationsTest(utils.TestCase):
def test_list_snapshot_instance(self):
snapshot_instance_id = '1234'
cs.share_snapshot_instance_export_locations.list(
snapshot_instance_id)
cs.assert_called(
'GET', '/snapshot-instances/%s/export-locations'
% snapshot_instance_id)
def test_get_snapshot_instance(self):
snapshot_instance_id = '1234'
el_id = 'fake_el_id'
cs.share_snapshot_instance_export_locations.get(
el_id, snapshot_instance_id)
cs.assert_called(
'GET',
('/snapshot-instances/%(snapshot_id)s/export-locations/'
'%(el_id)s') % {
'snapshot_id': snapshot_instance_id, 'el_id': el_id})

View File

@ -179,3 +179,38 @@ class ShareSnapshotsTest(utils.TestCase):
manager._action.assert_called_once_with("unmanage", snapshot) manager._action.assert_called_once_with("unmanage", snapshot)
self.assertEqual("fake", result) self.assertEqual("fake", result)
def test_allow_access(self):
snapshot = "fake_snapshot"
access_type = "fake_type"
access_to = "fake_to"
access = ("foo", {"snapshot_access": "fake"})
version = api_versions.APIVersion("2.32")
mock_microversion = mock.Mock(api_version=version)
manager = share_snapshots.ShareSnapshotManager(api=mock_microversion)
with mock.patch.object(manager, "_action",
mock.Mock(return_value=access)):
result = manager.allow(snapshot, access_type, access_to)
self.assertEqual("fake", result)
manager._action.assert_called_once_with(
"allow_access", snapshot,
{'access_type': access_type, 'access_to': access_to})
def test_deny_access(self):
snapshot = "fake_snapshot"
access_id = "fake_id"
version = api_versions.APIVersion("2.32")
mock_microversion = mock.Mock(api_version=version)
manager = share_snapshots.ShareSnapshotManager(api=mock_microversion)
with mock.patch.object(manager, "_action"):
manager.deny(snapshot, access_id)
manager._action.assert_called_once_with(
"deny_access", snapshot, {'access_id': access_id})
def test_access_list(self):
cs.share_snapshots.access_list(1234)
cs.assert_called('GET', '/snapshots/1234/access-list')

View File

@ -455,6 +455,7 @@ class ShellTest(test_utils.TestCase):
'snapshot_support': True, 'snapshot_support': True,
'create_share_from_snapshot_support': True, 'create_share_from_snapshot_support': True,
'revert_to_snapshot_support': False, 'revert_to_snapshot_support': False,
'mount_snapshot_support': False,
}, },
'share_type_access:is_public': public 'share_type_access:is_public': public
} }
@ -836,6 +837,7 @@ class ShellTest(test_utils.TestCase):
"snapshot_support": True, "snapshot_support": True,
"create_share_from_snapshot_support": True, "create_share_from_snapshot_support": True,
"revert_to_snapshot_support": False, "revert_to_snapshot_support": False,
"mount_snapshot_support": False,
} }
} }
} }
@ -884,6 +886,7 @@ class ShellTest(test_utils.TestCase):
"snapshot_support": expected_bool, "snapshot_support": expected_bool,
"create_share_from_snapshot_support": True, "create_share_from_snapshot_support": True,
"revert_to_snapshot_support": False, "revert_to_snapshot_support": False,
"mount_snapshot_support": False,
"replication_type": replication_type, "replication_type": replication_type,
} }
} }
@ -913,6 +916,7 @@ class ShellTest(test_utils.TestCase):
"snapshot_support": True, "snapshot_support": True,
"create_share_from_snapshot_support": expected_bool, "create_share_from_snapshot_support": expected_bool,
"revert_to_snapshot_support": False, "revert_to_snapshot_support": False,
"mount_snapshot_support": False,
} }
} }
} }
@ -973,6 +977,43 @@ class ShellTest(test_utils.TestCase):
'type-create test false --revert-to-snapshot-support ' + value, 'type-create test false --revert-to-snapshot-support ' + value,
) )
@ddt.unpack
@ddt.data(
*([{'expected_bool': True, 'text': v}
for v in ('true', 'True', '1', 'TRUE', 'tRuE')] +
[{'expected_bool': False, 'text': v}
for v in ('false', 'False', '0', 'FALSE', 'fAlSe')])
)
def test_type_create_with_mount_snapshot_support(
self, expected_bool, text):
expected = {
"share_type": {
"name": "test",
"share_type_access:is_public": True,
"extra_specs": {
"driver_handles_share_servers": False,
"snapshot_support": True,
"create_share_from_snapshot_support": True,
"revert_to_snapshot_support": False,
"mount_snapshot_support": expected_bool,
}
}
}
self.run_command('type-create test false --snapshot-support true '
'--revert-to-snapshot-support false '
'--mount-snapshot-support ' + text)
self.assert_called('POST', '/types', body=expected)
@ddt.data('fake', 'FFFalse', 'trueee')
def test_type_create_invalid_mount_snapshot_support_value(self, value):
self.assertRaises(
exceptions.CommandError,
self.run_command,
'type-create test false --mount-snapshot-support ' + value,
)
@ddt.data('--is-public', '--is_public') @ddt.data('--is-public', '--is_public')
def test_update(self, alias): def test_update(self, alias):
# basic rename with positional arguments # basic rename with positional arguments
@ -1514,6 +1555,60 @@ class ShellTest(test_utils.TestCase):
mock.ANY, mock.ANY,
['Id', 'Access_Type']) ['Id', 'Access_Type'])
@mock.patch.object(cliutils, 'print_list', mock.Mock())
def test_snapshot_access_list(self):
self.run_command("snapshot-access-list 1234")
self.assert_called('GET', '/snapshots/1234/access-list')
cliutils.print_list.assert_called_with(
mock.ANY, ['id', 'access_type', 'access_to', 'state'])
@mock.patch.object(cliutils, 'print_dict', mock.Mock())
def test_snapshot_access_allow(self):
self.run_command("snapshot-access-allow 1234 ip 1.1.1.1")
self.assert_called('POST', '/snapshots/1234/action')
cliutils.print_dict.assert_called_with(
{'access_type': 'ip', 'access_to': '1.1.1.1'})
def test_snapshot_access_deny(self):
self.run_command("snapshot-access-deny 1234 fake_id")
self.assert_called('POST', '/snapshots/1234/action')
@mock.patch.object(cliutils, 'print_list', mock.Mock())
def test_snapshot_export_location_list(self):
self.run_command('snapshot-export-location-list 1234')
self.assert_called(
'GET', '/snapshots/1234/export-locations')
@mock.patch.object(cliutils, 'print_list', mock.Mock())
def test_snapshot_instance_export_location_list(self):
self.run_command('snapshot-instance-export-location-list 1234')
self.assert_called(
'GET', '/snapshot-instances/1234/export-locations')
@mock.patch.object(cliutils, 'print_dict', mock.Mock())
def test_snapshot_instance_export_location_show(self):
self.run_command('snapshot-instance-export-location-show 1234 '
'fake_el_id')
self.assert_called(
'GET', '/snapshot-instances/1234/export-locations/fake_el_id')
cliutils.print_dict.assert_called_once_with(
{'path': '/fake_path', 'id': 'fake_id'})
@mock.patch.object(cliutils, 'print_dict', mock.Mock())
def test_snapshot_export_location_show(self):
self.run_command('snapshot-export-location-show 1234 fake_el_id')
self.assert_called('GET',
'/snapshots/1234/export-locations/fake_el_id')
cliutils.print_dict.assert_called_once_with(
{'path': '/fake_path', 'id': 'fake_id'})
@mock.patch.object(cliutils, 'print_list', mock.Mock()) @mock.patch.object(cliutils, 'print_list', mock.Mock())
def test_security_service_list(self): def test_security_service_list(self):
self.run_command('security-service-list') self.run_command('security-service-list')
@ -2350,7 +2445,7 @@ class ShellTest(test_utils.TestCase):
@mock.patch.object(shell_v2, '_find_share_snapshot', mock.Mock()) @mock.patch.object(shell_v2, '_find_share_snapshot', mock.Mock())
def test_snapshot_instance_list_for_snapshot(self): def test_snapshot_instance_list_for_snapshot(self):
fsnapshot = type('FakeSansphot', (object,), fsnapshot = type('FakeSnapshot', (object,),
{'id': 'fake-snapshot-id'}) {'id': 'fake-snapshot-id'})
shell_v2._find_share_snapshot.return_value = fsnapshot shell_v2._find_share_snapshot.return_value = fsnapshot
cmd = 'snapshot-instance-list --snapshot %s' cmd = 'snapshot-instance-list --snapshot %s'
@ -2361,7 +2456,10 @@ class ShellTest(test_utils.TestCase):
def test_snapshot_instance_show(self): def test_snapshot_instance_show(self):
self.run_command('snapshot-instance-show 1234') self.run_command('snapshot-instance-show 1234')
self.assert_called('GET', '/snapshot-instances/1234') self.assert_called_anytime('GET', '/snapshot-instances/1234',
clear_callstack=False)
self.assert_called_anytime('GET',
'/snapshot-instances/1234/export-locations')
def test_snapshot_instance_reset_state(self): def test_snapshot_instance_reset_state(self):
self.run_command('snapshot-instance-reset-state 1234') self.run_command('snapshot-instance-reset-state 1234')

View File

@ -188,11 +188,12 @@ class TypesTest(utils.TestCase):
def _add_standard_extra_specs_to_dict(self, extra_specs, def _add_standard_extra_specs_to_dict(self, extra_specs,
create_from_snapshot=None, create_from_snapshot=None,
revert_to_snapshot=None): revert_to_snapshot=None,
mount_snapshot=None):
# Short-circuit checks to allow for extra specs to be (and remain) None # Short-circuit checks to allow for extra specs to be (and remain) None
if all(spec is None for spec in [ if all(spec is None for spec in [
create_from_snapshot, revert_to_snapshot]): create_from_snapshot, revert_to_snapshot, mount_snapshot]):
return extra_specs return extra_specs
extra_specs = extra_specs or {} extra_specs = extra_specs or {}
@ -203,6 +204,9 @@ class TypesTest(utils.TestCase):
if revert_to_snapshot is not None: if revert_to_snapshot is not None:
extra_specs['revert_to_snapshot_support'] = ( extra_specs['revert_to_snapshot_support'] = (
revert_to_snapshot) revert_to_snapshot)
if mount_snapshot is not None:
extra_specs['mount_snapshot_support'] = (
mount_snapshot)
return extra_specs return extra_specs

View File

@ -39,6 +39,8 @@ from manilaclient.v2 import share_instances
from manilaclient.v2 import share_networks from manilaclient.v2 import share_networks
from manilaclient.v2 import share_replicas from manilaclient.v2 import share_replicas
from manilaclient.v2 import share_servers from manilaclient.v2 import share_servers
from manilaclient.v2 import share_snapshot_export_locations
from manilaclient.v2 import share_snapshot_instance_export_locations
from manilaclient.v2 import share_snapshot_instances from manilaclient.v2 import share_snapshot_instances
from manilaclient.v2 import share_snapshots from manilaclient.v2 import share_snapshots
from manilaclient.v2 import share_type_access from manilaclient.v2 import share_type_access
@ -234,6 +236,12 @@ class Client(object):
self.share_snapshots = share_snapshots.ShareSnapshotManager(self) self.share_snapshots = share_snapshots.ShareSnapshotManager(self)
self.share_snapshot_instances = ( self.share_snapshot_instances = (
share_snapshot_instances.ShareSnapshotInstanceManager(self)) share_snapshot_instances.ShareSnapshotInstanceManager(self))
self.share_snapshot_export_locations = (
share_snapshot_export_locations.ShareSnapshotExportLocationManager(
self))
self.share_snapshot_instance_export_locations = (
share_snapshot_instance_export_locations.
ShareSnapshotInstanceExportLocationManager(self))
self.share_types = share_types.ShareTypeManager(self) self.share_types = share_types.ShareTypeManager(self)
self.share_type_access = share_type_access.ShareTypeAccessManager(self) self.share_type_access = share_type_access.ShareTypeAccessManager(self)

View File

@ -0,0 +1,55 @@
# Copyright (c) 2017 Hitachi Data Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
try:
from urllib import urlencode # noqa
except ImportError:
from urllib.parse import urlencode # noqa
from manilaclient import api_versions
from manilaclient import base
from manilaclient.common.apiclient import base as common_base
class ShareSnapshotExportLocation(common_base.Resource):
"""Represent an export location snapshot of a snapshot."""
def __repr__(self):
return "<ShareSnapshotExportLocation: %s>" % self.id
def __getitem__(self, key):
return self._info[key]
class ShareSnapshotExportLocationManager(base.ManagerWithFind):
"""Manage :class:`ShareSnapshotExportLocation` resources."""
resource_class = ShareSnapshotExportLocation
@api_versions.wraps("2.32")
def list(self, snapshot=None):
return self._list("/snapshots/%s/export-locations" %
common_base.getid(snapshot),
'share_snapshot_export_locations')
@api_versions.wraps("2.32")
def get(self, export_location, snapshot=None):
params = {
"snapshot_id": common_base.getid(snapshot),
"export_location_id": common_base.getid(export_location),
}
return self._get("/snapshots/%(snapshot_id)s/export-locations/"
"%(export_location_id)s" % params,
"share_snapshot_export_location")

View File

@ -0,0 +1,55 @@
# Copyright (c) 2017 Hitachi Data Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
try:
from urllib import urlencode # noqa
except ImportError:
from urllib.parse import urlencode # noqa
from manilaclient import api_versions
from manilaclient import base
from manilaclient.common.apiclient import base as common_base
class ShareSnapshotInstanceExportLocation(common_base.Resource):
"""Represent an export location from a snapshot instance."""
def __repr__(self):
return "<ShareSnapshotInstanceExportLocation: %s>" % self.id
def __getitem__(self, key):
return self._info[key]
class ShareSnapshotInstanceExportLocationManager(base.ManagerWithFind):
"""Manage :class:`ShareSnapshotInstanceExportLocation` resources."""
resource_class = ShareSnapshotInstanceExportLocation
@api_versions.wraps("2.32")
def list(self, snapshot_instance=None):
return self._list("/snapshot-instances/%s/export-locations" %
common_base.getid(snapshot_instance),
'share_snapshot_export_locations')
@api_versions.wraps("2.32")
def get(self, export_location, snapshot_instance=None):
params = {
"snapshot_instance_id": common_base.getid(snapshot_instance),
"export_location_id": common_base.getid(export_location),
}
return self._get("/snapshot-instances/%(snapshot_instance_id)s/"
"export-locations/%(export_location_id)s" % params,
"share_snapshot_export_location")

View File

@ -51,6 +51,17 @@ class ShareSnapshot(common_base.Resource):
"""Unmanage this snapshot.""" """Unmanage this snapshot."""
self.manager.unmanage(self) self.manager.unmanage(self)
def allow(self, access_type, access_to):
"""Allow access to a share snapshot."""
return self.manager.allow(self, access_type, access_to)
def deny(self, id):
"""Denies access to a share snapshot."""
return self.manager.deny(self, id)
def access_list(self):
return self.manager.access_list(self)
class ShareSnapshotManager(base.ManagerWithFind): class ShareSnapshotManager(base.ManagerWithFind):
"""Manage :class:`ShareSnapshot` resources.""" """Manage :class:`ShareSnapshot` resources."""
@ -202,8 +213,38 @@ class ShareSnapshotManager(base.ManagerWithFind):
def reset_state(self, snapshot, state): def reset_state(self, snapshot, state):
return self._do_reset_state(snapshot, state, "reset_status") return self._do_reset_state(snapshot, state, "reset_status")
def _do_allow(self, snapshot, access_type, access_to):
access_params = {
'access_type': access_type,
'access_to': access_to,
}
return self._action('allow_access', snapshot,
access_params)[1]['snapshot_access']
@api_versions.wraps("2.32")
def allow(self, snapshot, access_type, access_to):
return self._do_allow(snapshot, access_type, access_to)
def _do_deny(self, snapshot, id):
return self._action('deny_access', snapshot, {'access_id': id})
@api_versions.wraps("2.32")
def deny(self, snapshot, id):
return self._do_deny(snapshot, id)
def _do_access_list(self, snapshot):
snapshot_id = common_base.getid(snapshot)
access_list = self._list("/snapshots/%s/access-list" % snapshot_id,
'snapshot_access_list')
return access_list
@api_versions.wraps("2.32")
def access_list(self, snapshot):
return self._do_access_list(snapshot)
def _action(self, action, snapshot, info=None, **kwargs): def _action(self, action, snapshot, info=None, **kwargs):
"""Perform a snapshot 'action'.""" """Perform a snapshot 'action'."""
body = {action: info} body = {action: info}
self.run_hooks('modify_body_for_action', body, **kwargs) self.run_hooks('modify_body_for_action', body, **kwargs)
url = '/snapshots/%s/action' % common_base.getid(snapshot) url = '/snapshots/%s/action' % common_base.getid(snapshot)

View File

@ -225,6 +225,12 @@ def _find_share_snapshot(cs, snapshot):
def _print_share_snapshot(cs, snapshot): def _print_share_snapshot(cs, snapshot):
info = snapshot._info.copy() info = snapshot._info.copy()
info.pop('links', None) info.pop('links', None)
if info.get('export_locations'):
info['export_locations'] = (
_transform_export_locations_to_string_view(
info['export_locations']))
cliutils.print_dict(info) cliutils.print_dict(info)
@ -1198,6 +1204,27 @@ def do_access_allow(cs, args):
cliutils.print_dict(access) cliutils.print_dict(access)
@api_versions.wraps("2.32")
@cliutils.arg(
'snapshot',
metavar='<snapshot>',
help='Name or ID of the share snapshot to allow access to.')
@cliutils.arg(
'access_type',
metavar='<access_type>',
help='Access rule type (only "ip", "user"(user or group), "cert" or '
'"cephx" are supported).')
@cliutils.arg(
'access_to',
metavar='<access_to>',
help='Value that defines access.')
def do_snapshot_access_allow(cs, args):
"""Allow read only access to a snapshot."""
share_snapshot = _find_share_snapshot(cs, args.snapshot)
access = share_snapshot.allow(args.access_type, args.access_to)
cliutils.print_dict(access)
@cliutils.arg( @cliutils.arg(
'share', 'share',
metavar='<share>', metavar='<share>',
@ -1212,6 +1239,34 @@ def do_access_deny(cs, args):
share.deny(args.id) share.deny(args.id)
@api_versions.wraps("2.32")
@cliutils.arg(
'snapshot',
metavar='<snapshot>',
help='Name or ID of the share snapshot to deny access to.')
@cliutils.arg(
'id',
metavar='<id>',
nargs='+',
help='ID(s) of the access rule(s) to be deleted.')
def do_snapshot_access_deny(cs, args):
"""Deny access to a snapshot."""
failure_count = 0
snapshot = _find_share_snapshot(cs, args.snapshot)
for access_id in args.id:
try:
snapshot.deny(access_id)
except Exception as e:
failure_count += 1
print("Failed to remove rule %(access)s: %(reason)s."
% {'access': access_id, 'reason': e},
file=sys.stderr)
if failure_count == len(args.id):
raise exceptions.CommandError("Unable to delete any of the specified "
"snapshot rules.")
@api_versions.wraps("1.0", "2.20") @api_versions.wraps("1.0", "2.20")
@cliutils.arg( @cliutils.arg(
'share', 'share',
@ -1265,6 +1320,30 @@ def do_access_list(cs, args):
cliutils.print_list(access_list, list_of_keys) cliutils.print_list(access_list, list_of_keys)
@api_versions.wraps("2.32")
@cliutils.arg(
'snapshot',
metavar='<snapshot>',
help='Name or ID of the share snapshot to list access of.')
@cliutils.arg(
'--columns',
metavar='<columns>',
type=str,
default=None,
help='Comma separated list of columns to be displayed '
'e.g. --columns "access_type,access_to"')
def do_snapshot_access_list(cs, args):
"""Show access list for a snapshot."""
if args.columns is not None:
list_of_keys = _split_columns(columns=args.columns)
else:
list_of_keys = ['id', 'access_type', 'access_to', 'state']
snapshot = _find_share_snapshot(cs, args.snapshot)
access_list = snapshot.access_list()
cliutils.print_list(access_list, list_of_keys)
@cliutils.arg( @cliutils.arg(
'--all-tenants', '--all-tenants',
dest='all_tenants', dest='all_tenants',
@ -1727,9 +1806,105 @@ def do_snapshot_list(cs, args):
def do_snapshot_show(cs, args): def do_snapshot_show(cs, args):
"""Show details about a snapshot.""" """Show details about a snapshot."""
snapshot = _find_share_snapshot(cs, args.snapshot) snapshot = _find_share_snapshot(cs, args.snapshot)
export_locations = cs.share_snapshot_export_locations.list(
snapshot=snapshot)
snapshot._info['export_locations'] = export_locations
_print_share_snapshot(cs, snapshot) _print_share_snapshot(cs, snapshot)
@api_versions.wraps("2.32")
@cliutils.arg(
'snapshot',
metavar='<snapshot>',
help='Name or ID of the snapshot.')
@cliutils.arg(
'--columns',
metavar='<columns>',
type=str,
default=None,
help='Comma separated list of columns to be displayed '
'e.g. --columns "id,path"')
def do_snapshot_export_location_list(cs, args):
"""List export locations of a given snapshot."""
if args.columns is not None:
list_of_keys = _split_columns(columns=args.columns)
else:
list_of_keys = [
'ID',
'Path',
]
snapshot = _find_share_snapshot(cs, args.snapshot)
export_locations = cs.share_snapshot_export_locations.list(
snapshot)
cliutils.print_list(export_locations, list_of_keys)
@api_versions.wraps("2.32")
@cliutils.arg(
'instance',
metavar='<instance>',
help='Name or ID of the snapshot instance.')
@cliutils.arg(
'--columns',
metavar='<columns>',
type=str,
default=None,
help='Comma separated list of columns to be displayed '
'e.g. --columns "id,path,is_admin_only"')
def do_snapshot_instance_export_location_list(cs, args):
"""List export locations of a given snapshot instance."""
if args.columns is not None:
list_of_keys = _split_columns(columns=args.columns)
else:
list_of_keys = [
'ID',
'Path',
'Is Admin only',
]
instance = _find_share_snapshot_instance(cs, args.instance)
export_locations = cs.share_snapshot_instance_export_locations.list(
instance)
cliutils.print_list(export_locations, list_of_keys)
@api_versions.wraps("2.32")
@cliutils.arg(
'snapshot',
metavar='<snapshot>',
help='Name or ID of the snapshot.')
@cliutils.arg(
'export_location',
metavar='<export_location>',
help='ID of the share snapshot export location.')
def do_snapshot_export_location_show(cs, args):
"""Show export location of the share snapshot."""
snapshot = _find_share_snapshot(cs, args.snapshot)
export_location = cs.share_snapshot_export_locations.get(
args.export_location, snapshot)
view_data = export_location._info.copy()
cliutils.print_dict(view_data)
@api_versions.wraps("2.32")
@cliutils.arg(
'snapshot_instance',
metavar='<snapshot_instance>',
help='ID of the share snapshot instance.')
@cliutils.arg(
'export_location',
metavar='<export_location>',
help='ID of the share snapshot instance export location.')
def do_snapshot_instance_export_location_show(cs, args):
"""Show export location of the share instance snapshot."""
snapshot_instance = _find_share_snapshot_instance(cs,
args.snapshot_instance)
export_location = cs.share_snapshot_instance_export_locations.get(
args.export_location, snapshot_instance)
view_data = export_location._info.copy()
cliutils.print_dict(view_data)
@cliutils.arg( @cliutils.arg(
'share', 'share',
metavar='<share>', metavar='<share>',
@ -1944,7 +2119,10 @@ def do_snapshot_instance_show(cs, args):
"""Show details about a share snapshot instance.""" """Show details about a share snapshot instance."""
snapshot_instance = _find_share_snapshot_instance( snapshot_instance = _find_share_snapshot_instance(
cs, args.snapshot_instance) cs, args.snapshot_instance)
cliutils.print_dict(snapshot_instance._info) export_locations = (
cs.share_snapshot_instance_export_locations.list(snapshot_instance))
snapshot_instance._info['export_locations'] = export_locations
_print_share_snapshot(cs, snapshot_instance)
@cliutils.arg( @cliutils.arg(
@ -3268,6 +3446,13 @@ def do_extra_specs_list(cs, args):
action='single_alias', action='single_alias',
help="Boolean extra spec used for filtering of back ends by their " help="Boolean extra spec used for filtering of back ends by their "
"capability to revert shares to snapshots. (Default is False).") "capability to revert shares to snapshots. (Default is False).")
@cliutils.arg(
'--mount_snapshot_support',
'--mount-snapshot-support',
metavar='<mount_snapshot_support>',
action='single_alias',
help="Boolean extra spec used for filtering of back ends by their "
"capability to mount share snapshots. (Default is False).")
@cliutils.arg( @cliutils.arg(
'--extra-specs', '--extra-specs',
'--extra_specs', # alias '--extra_specs', # alias
@ -3312,6 +3497,7 @@ def do_type_create(cs, args):
'snapshot_support', 'snapshot_support',
'create_share_from_snapshot_support', 'create_share_from_snapshot_support',
'revert_to_snapshot_support', 'revert_to_snapshot_support',
'mount_snapshot_support'
) )
for key in boolean_keys: for key in boolean_keys:
value = getattr(args, key) value = getattr(args, key)

View File

@ -0,0 +1,3 @@
---
features:
- Added support for the mountable snapshots feature to manila client.