[OSC] Implement the second part of share snapshots commands

In this patch we add openstack commands for:
share snapshot adopt
share snapshot abandon
share snapshot export location list
share snapshot export location show
share snapshot access create
share snapshot access delete
share snapshot access list

Partially-implements bp openstack-client-support

Change-Id: I6d573655b7b28e0378df9917b597875acb12cca2
This commit is contained in:
Maari Tamm 2020-08-16 20:11:48 +00:00
parent bf3e7cb716
commit d832baa312
4 changed files with 842 additions and 0 deletions

View File

@ -425,3 +425,301 @@ class ListShareSnapshot(command.Lister):
return (columns,
(utils.get_item_properties(s, columns) for s in snapshots))
class AdoptShareSnapshot(command.ShowOne):
"""Adopt a share snapshot not handled by Manila (Admin only)."""
_description = _("Adopt a share snapshot")
def get_parser(self, prog_name):
parser = super(AdoptShareSnapshot, self).get_parser(prog_name)
parser.add_argument(
"share",
metavar="<share>",
help=_("Name or ID of the share that owns the snapshot "
"to be adopted.")
)
parser.add_argument(
"provider_location",
metavar="<provider-location>",
help=_("Provider location of the snapshot on the backend.")
)
parser.add_argument(
"--name",
metavar="<name>",
default=None,
help=_("Optional snapshot name (Default=None).")
)
parser.add_argument(
"--description",
metavar="<description>",
default=None,
help=_("Optional snapshot description (Default=None).")
)
parser.add_argument(
"--driver-option",
metavar="<key=value>",
default={},
action=parseractions.KeyValueAction,
help=_(
"Set driver options as key=value pairs."
"(repeat option to set multiple key=value pairs)")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
share = utils.find_resource(share_client.shares,
parsed_args.share)
snapshot = share_client.share_snapshots.manage(
share=share,
provider_location=parsed_args.provider_location,
driver_options=parsed_args.driver_option,
name=parsed_args.name,
description=parsed_args.description
)
snapshot._info.pop('links', None)
return self.dict2columns(snapshot._info)
class AbandonShareSnapshot(command.Command):
"""Abandon one or more share snapshots (Admin only)."""
_description = _("Abandon share snapshot(s)")
def get_parser(self, prog_name):
parser = super(AbandonShareSnapshot, self).get_parser(prog_name)
parser.add_argument(
"snapshot",
metavar="<snapshot>",
nargs='+',
help=_("Name or ID of the snapshot(s) to be abandoned.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
result = 0
for snapshot in parsed_args.snapshot:
snapshot_obj = utils.find_resource(
share_client.share_snapshots,
snapshot)
try:
share_client.share_snapshots.unmanage(snapshot_obj)
except Exception as e:
result += 1
LOG.error(_(
"Failed to abandon share snapshot with "
"name or ID '%(snapshot)s': %(e)s"),
{'snapshot': snapshot, 'e': e})
if result > 0:
total = len(parsed_args.snapshot)
msg = (_("%(result)s of %(total)s snapshots failed "
"to abandon.") % {'result': result, 'total': total})
raise exceptions.CommandError(msg)
class ShareSnapshotAccessAllow(command.ShowOne):
"""Allow read only access to a snapshot."""
_description = _("Allow access to a snapshot")
def get_parser(self, prog_name):
parser = super(ShareSnapshotAccessAllow, self).get_parser(prog_name)
parser.add_argument(
"snapshot",
metavar="<snapshot>",
help=_("Name or ID of the snapshot")
)
parser.add_argument(
'access_type',
metavar="<access_type>",
help=_('Access rule type (only "ip", "user" (user or group), '
'"cert" or "cephx" are supported).')
)
parser.add_argument(
'access_to',
metavar="<access_to>",
help=_('Value that defines access.')
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
snapshot_obj = utils.find_resource(
share_client.share_snapshots,
parsed_args.snapshot)
try:
snapshot_access = share_client.share_snapshots.allow(
snapshot=snapshot_obj,
access_type=parsed_args.access_type,
access_to=parsed_args.access_to
)
return self.dict2columns(snapshot_access)
except Exception as e:
raise exceptions.CommandError(
"Failed to create access to share snapshot "
"'%s': %s" % (snapshot_obj, e))
class ShareSnapshotAccessDeny(command.Command):
"""Delete access to a snapshot"""
_description = _(
"Delete access to a snapshot")
def get_parser(self, prog_name):
parser = super(ShareSnapshotAccessDeny, self).get_parser(prog_name)
parser.add_argument(
"snapshot",
metavar="<snapshot>",
help=_("Name or ID of the share snapshot to deny access to.")
)
parser.add_argument(
"id",
metavar="<id>",
nargs="+",
help=_("ID(s) of the access rule(s) to be deleted.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
result = 0
snapshot_obj = utils.find_resource(
share_client.share_snapshots,
parsed_args.snapshot)
for access_id in parsed_args.id:
try:
share_client.share_snapshots.deny(
snapshot=snapshot_obj,
id=access_id
)
except Exception as e:
result += 1
LOG.error(_(
"Failed to delete access to share snapshot "
"for an access rule with ID '%(access)s': %(e)s"),
{'access': access_id, 'e': e})
if result > 0:
total = len(parsed_args.id)
msg = (_("Failed to delete access to a share snapshot for "
"%(result)s out of %(total)s access rule ID's ")
% {'result': result, 'total': total})
raise exceptions.CommandError(msg)
class ShareSnapshotAccessList(command.Lister):
"""Show access list for a snapshot"""
_description = _(
"Show access list for a snapshot")
def get_parser(self, prog_name):
parser = super(ShareSnapshotAccessList, self).get_parser(prog_name)
parser.add_argument(
"snapshot",
metavar="<snapshot>",
help=_("Name or ID of the share snapshot to show access list for.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
snapshot_obj = utils.find_resource(
share_client.share_snapshots,
parsed_args.snapshot)
access_rules = share_client.share_snapshots.access_list(
snapshot_obj)
columns = [
"id",
"access_type",
"access_to",
"state"
]
return (columns,
(utils.get_item_properties(s, columns) for s in access_rules))
class ShareSnapshotListExportLocation(command.Lister):
"""List export locations of a given snapshot"""
_description = _(
"List export locations of a given snapshot")
def get_parser(self, prog_name):
parser = super(ShareSnapshotListExportLocation, self).get_parser(
prog_name)
parser.add_argument(
"snapshot",
metavar="<snapshot>",
help=_("Name or ID of the share snapshot.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
snapshot_obj = utils.find_resource(
share_client.share_snapshots,
parsed_args.snapshot)
export_locations = share_client.share_snapshot_export_locations.list(
snapshot=snapshot_obj)
columns = ["id", "path"]
return (
columns,
(utils.get_item_properties(s, columns) for s in export_locations))
class ShareSnapshotShowExportLocation(command.ShowOne):
"""Show export location of the share snapshot"""
_description = _(
"Show export location of the share snapshot")
def get_parser(self, prog_name):
parser = super(ShareSnapshotShowExportLocation, self).get_parser(
prog_name)
parser.add_argument(
"snapshot",
metavar="<snapshot>",
help=_("Name or ID of the share snapshot.")
)
parser.add_argument(
"export_location",
metavar="<export-location>",
help=_("ID of the share snapshot export location.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
snapshot_obj = utils.find_resource(
share_client.share_snapshots,
parsed_args.snapshot)
export_location = share_client.share_snapshot_export_locations.get(
export_location=parsed_args.export_location,
snapshot=snapshot_obj)
return self.dict2columns(export_location._info)

View File

@ -435,3 +435,101 @@ class FakeShareSnapshot(object):
FakeShareSnapshot.create_one_snapshot(attrs))
return share_snapshots
class FakeSnapshotAccessRule(object):
"""Fake one or more snapshot access rules"""
@staticmethod
def create_one_access_rule(attrs={}):
"""Create a fake snapshot access rule
:param Dictionary attrs:
A dictionary with all attributes
:return:
A FakeResource object, with project_id, resource and so on
"""
snapshot_access_rule = {
'access_to': 'demo',
'access_type': 'user',
'id': 'access_rule-id-' + uuid.uuid4().hex,
'state': 'queued_to_apply'
}
snapshot_access_rule.update(attrs)
snapshot_access_rule = osc_fakes.FakeResource(info=copy.deepcopy(
snapshot_access_rule),
loaded=True)
return snapshot_access_rule
@staticmethod
def create_access_rules(attrs={}, count=2):
"""Create multiple fake snapshots.
:param Dictionary attrs:
A dictionary with all attributes
:param Integer count:
The number of share types to be faked
:return:
A list of FakeResource objects
"""
access_rules = []
for n in range(0, count):
access_rules.append(
FakeSnapshotAccessRule.create_one_access_rule(attrs))
return access_rules
class FakeSnapshotExportLocation(object):
"""Fake one or more export locations"""
@staticmethod
def create_one_export_location(attrs=None):
"""Create a fake snapshot export location
:param Dictionary attrs:
A dictionary with all attributes
:return:
A FakeResource object, with project_id, resource and so on
"""
attrs = attrs or {}
snapshot_export_location_info = {
"created_at": 'time-' + uuid.uuid4().hex,
"id": "id-" + uuid.uuid4().hex,
"is_admin_only": False,
"links": [],
"path": "/path/to/fake/snapshot/snapshot",
"share_snapshot_instance_id": 'instance-id' + uuid.uuid4().hex,
"updated_at": 'time-' + uuid.uuid4().hex,
}
snapshot_export_location_info.update(attrs)
snapshot_export_location = osc_fakes.FakeResource(info=copy.deepcopy(
snapshot_export_location_info),
loaded=True)
return snapshot_export_location
@staticmethod
def create_export_locations(attrs={}, count=2):
"""Create multiple fake export locations.
:param Dictionary attrs:
A dictionary with all attributes
:param Integer count:
The number of share types to be faked
:return:
A list of FakeResource objects
"""
export_locations = []
for n in range(0, count):
export_locations.append(
FakeSnapshotExportLocation.create_one_export_location(
attrs))
return export_locations

View File

@ -55,6 +55,10 @@ class TestShareSnapshot(manila_fakes.TestShare):
self.app.client_manager.share.api_version = api_versions.APIVersion(
"2.51")
self.export_locations_mock = (
self.app.client_manager.share.share_snapshot_export_locations)
self.export_locations_mock.reset_mock()
class TestShareSnapshotCreate(TestShareSnapshot):
@ -630,3 +634,438 @@ class TestShareSnapshotList(TestShareSnapshot):
self.assertEqual(COLUMNS, columns)
self.assertEqual(list(values), list(data))
class TestShareSnapshotAdopt(TestShareSnapshot):
def setUp(self):
super(TestShareSnapshotAdopt, self).setUp()
self.share = manila_fakes.FakeShare.create_one_share()
self.shares_mock.get.return_value = self.share
self.share_snapshot = (
manila_fakes.FakeShareSnapshot.create_one_snapshot())
self.export_location = (
manila_fakes.FakeShareExportLocation.create_one_export_location())
self.snapshots_mock.manage.return_value = self.share_snapshot
self.cmd = osc_share_snapshots.AdoptShareSnapshot(self.app, None)
self.data = tuple(self.share_snapshot._info.values())
self.columns = tuple(self.share_snapshot._info.keys())
def test_share_snapshot_adopt_missing_args(self):
arglist = []
verifylist = []
self.assertRaises(osc_utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_snapshot_adopt(self):
arglist = [
self.share.id,
self.export_location.fake_path
]
verifylist = [
('share', self.share.id),
('provider_location', self.export_location.fake_path)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.snapshots_mock.manage.assert_called_with(
share=self.share,
provider_location=self.export_location.fake_path,
driver_options={},
name=None,
description=None
)
self.assertCountEqual(self.columns, columns)
self.assertCountEqual(self.data, data)
def test_snapshot_adopt_name(self):
name = 'name-' + uuid.uuid4().hex
arglist = [
self.share.id,
self.export_location.fake_path,
'--name', name,
]
verifylist = [
('share', self.share.id),
('provider_location', self.export_location.fake_path),
('name', name)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.snapshots_mock.manage.assert_called_with(
share=self.share,
provider_location=self.export_location.fake_path,
driver_options={},
name=name,
description=None
)
self.assertCountEqual(self.columns, columns)
self.assertCountEqual(self.data, data)
def test_snapshot_adopt_driver_option(self):
arglist = [
self.share.id,
self.export_location.fake_path,
'--driver-option', 'key1=value1',
'--driver-option', 'key2=value2'
]
verifylist = [
('share', self.share.id),
('provider_location', self.export_location.fake_path),
('driver_option', {'key1': 'value1', 'key2': 'value2'})
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.snapshots_mock.manage.assert_called_with(
share=self.share,
provider_location=self.export_location.fake_path,
driver_options={
'key1': 'value1',
'key2': 'value2'
},
name=None,
description=None
)
self.assertCountEqual(self.columns, columns)
self.assertCountEqual(self.data, data)
class TestShareSnapshotAbandon(TestShareSnapshot):
def setUp(self):
super(TestShareSnapshotAbandon, self).setUp()
self.share_snapshot = (
manila_fakes.FakeShareSnapshot.create_one_snapshot())
self.snapshots_mock.get.return_value = self.share_snapshot
self.cmd = osc_share_snapshots.AbandonShareSnapshot(self.app, None)
def test_share_snapshot_abandon_missing_args(self):
arglist = []
verifylist = []
self.assertRaises(osc_utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_share_snapshot_abandon(self):
arglist = [
self.share_snapshot.id
]
verifylist = [
('snapshot', [self.share_snapshot.id])
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.snapshots_mock.unmanage.assert_called_with(self.share_snapshot)
self.assertIsNone(result)
def test_share_snapshot_abandon_multiple(self):
share_snapshots = (
manila_fakes.FakeShareSnapshot.create_share_snapshots(
count=2))
arglist = [
share_snapshots[0].id,
share_snapshots[1].id
]
verifylist = [
('snapshot', [share_snapshots[0].id, share_snapshots[1].id])
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.assertEqual(self.snapshots_mock.unmanage.call_count,
len(share_snapshots))
self.assertIsNone(result)
class TestShareSnapshotAccessAllow(TestShareSnapshot):
def setUp(self):
super(TestShareSnapshotAccessAllow, self).setUp()
self.share_snapshot = (
manila_fakes.FakeShareSnapshot.create_one_snapshot())
self.snapshots_mock.get.return_value = self.share_snapshot
self.access_rule = (
manila_fakes.FakeSnapshotAccessRule.create_one_access_rule())
self.snapshots_mock.allow.return_value = self.access_rule._info
self.cmd = osc_share_snapshots.ShareSnapshotAccessAllow(
self.app, None)
def test_share_snapshot_access_allow(self):
arglist = [
self.share_snapshot.id,
'user',
'demo'
]
verifylist = [
('snapshot', self.share_snapshot.id),
('access_type', 'user'),
('access_to', 'demo')
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.snapshots_mock.allow.assert_called_with(
snapshot=self.share_snapshot,
access_type='user',
access_to='demo'
)
self.assertEqual(tuple(self.access_rule._info.keys()), columns)
self.assertCountEqual(self.access_rule._info.values(), data)
def test_share_snapshot_access_allow_exception(self):
arglist = [
self.share_snapshot.id,
'user',
'demo'
]
verifylist = [
('snapshot', self.share_snapshot.id),
('access_type', 'user'),
('access_to', 'demo')
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.snapshots_mock.allow.side_effect = exceptions.CommandError()
self.assertRaises(exceptions.CommandError,
self.cmd.take_action,
parsed_args)
class TestShareSnapshotAccessDeny(TestShareSnapshot):
def setUp(self):
super(TestShareSnapshotAccessDeny, self).setUp()
self.share_snapshot = (
manila_fakes.FakeShareSnapshot.create_one_snapshot())
self.snapshots_mock.get.return_value = self.share_snapshot
self.access_rule = (
manila_fakes.FakeSnapshotAccessRule.create_one_access_rule())
self.cmd = osc_share_snapshots.ShareSnapshotAccessDeny(
self.app, None)
def test_share_snapshot_access_deny(self):
arglist = [
self.share_snapshot.id,
self.access_rule.id,
]
verifylist = [
('snapshot', self.share_snapshot.id),
('id', [self.access_rule.id]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.snapshots_mock.deny.assert_called_with(
snapshot=self.share_snapshot,
id=self.access_rule.id
)
self.assertIsNone(result)
def test_share_snapshot_access_deny_multiple(self):
access_rules = (
manila_fakes.FakeSnapshotAccessRule.create_access_rules(
count=2))
arglist = [
self.share_snapshot.id,
access_rules[0].id,
access_rules[1].id,
]
verifylist = [
('snapshot', self.share_snapshot.id),
('id', [access_rules[0].id, access_rules[1].id]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.assertEqual(self.snapshots_mock.deny.call_count,
len(access_rules))
self.assertIsNone(result)
def test_share_snapshot_access_deny_exception(self):
arglist = [
self.share_snapshot.id,
self.access_rule.id,
]
verifylist = [
('snapshot', self.share_snapshot.id),
('id', [self.access_rule.id]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.snapshots_mock.deny.side_effect = exceptions.CommandError()
self.assertRaises(exceptions.CommandError,
self.cmd.take_action,
parsed_args)
class TestShareSnapshotAccessList(TestShareSnapshot):
access_rules_columns = [
'id',
'access_type',
'access_to',
'state',
]
def setUp(self):
super(TestShareSnapshotAccessList, self).setUp()
self.share_snapshot = (
manila_fakes.FakeShareSnapshot.create_one_snapshot())
self.snapshots_mock.get.return_value = self.share_snapshot
self.access_rules = (
manila_fakes.FakeSnapshotAccessRule.create_access_rules(
count=2))
self.snapshots_mock.access_list.return_value = self.access_rules
self.cmd = osc_share_snapshots.ShareSnapshotAccessList(
self.app, None)
self.values = (oscutils.get_dict_properties(
a._info, self.access_rules_columns) for a in self.access_rules)
def test_share_snapshot_access_list(self):
arglist = [
self.share_snapshot.id
]
verifylist = [
('snapshot', self.share_snapshot.id)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.snapshots_mock.access_list.assert_called_with(
self.share_snapshot)
self.assertEqual(self.access_rules_columns, columns)
self.assertCountEqual(self.values, data)
class TestShareSnapshotExportLocationList(TestShareSnapshot):
columns = ["id", "path"]
def setUp(self):
super(TestShareSnapshotExportLocationList, self).setUp()
self.share_snapshot = (
manila_fakes.FakeShareSnapshot.create_one_snapshot())
self.snapshots_mock.get.return_value = self.share_snapshot
self.export_locations = (
manila_fakes.FakeSnapshotExportLocation.create_export_locations()
)
self.export_locations_mock.list.return_value = self.export_locations
self.values = (oscutils.get_dict_properties(
e._info, self.columns) for e in self.export_locations)
self.cmd = osc_share_snapshots.ShareSnapshotListExportLocation(
self.app, None)
def test_snapshot_export_locations_list(self):
arglist = [
self.share_snapshot.id
]
verifylist = [
('snapshot', self.share_snapshot.id)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.export_locations_mock.list.assert_called_with(
snapshot=self.share_snapshot)
self.assertEqual(self.columns, columns)
self.assertCountEqual(self.values, data)
class TestShareSnapshotExportLocationShow(TestShareSnapshot):
def setUp(self):
super(TestShareSnapshotExportLocationShow, self).setUp()
self.share_snapshot = (
manila_fakes.FakeShareSnapshot.create_one_snapshot())
self.snapshots_mock.get.return_value = self.share_snapshot
self.export_location = (
manila_fakes.FakeSnapshotExportLocation.create_one_export_location() # noqa E501
)
self.export_locations_mock.get.return_value = self.export_location
self.cmd = osc_share_snapshots.ShareSnapshotShowExportLocation(
self.app, None)
def test_snapshot_export_locations_list(self):
arglist = [
self.share_snapshot.id,
self.export_location.id
]
verifylist = [
('snapshot', self.share_snapshot.id),
('export_location', self.export_location.id)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.export_locations_mock.get.assert_called_with(
export_location=self.export_location.id,
snapshot=self.share_snapshot)
self.assertEqual(tuple(self.export_location._info.keys()), columns)
self.assertCountEqual(self.export_location._info.values(), data)

View File

@ -68,6 +68,13 @@ openstack.share.v2 =
share_snapshot_set = manilaclient.osc.v2.share_snapshots:SetShareSnapshot
share_snapshot_unset = manilaclient.osc.v2.share_snapshots:UnsetShareSnapshot
share_snapshot_list = manilaclient.osc.v2.share_snapshots:ListShareSnapshot
share_snapshot_adopt = manilaclient.osc.v2.share_snapshots:AdoptShareSnapshot
share_snapshot_abandon = manilaclient.osc.v2.share_snapshots:AbandonShareSnapshot
share_snapshot_access_create = manilaclient.osc.v2.share_snapshots:ShareSnapshotAccessAllow
share_snapshot_access_delete = manilaclient.osc.v2.share_snapshots:ShareSnapshotAccessDeny
share_snapshot_access_list = manilaclient.osc.v2.share_snapshots:ShareSnapshotAccessList
share_snapshot_export_location_list = manilaclient.osc.v2.share_snapshots:ShareSnapshotListExportLocation
share_snapshot_export_location_show = manilaclient.osc.v2.share_snapshots:ShareSnapshotShowExportLocation
[coverage:run]
omit = manilaclient/tests/*