[OSC] Implement Share Replica Commands

In this patch we add the following openstack share commands:
share replica create
share replica delete
share replica list
share replica show
share replica set
share replica promote
share replica resync

Partially-implements: bp openstack-client-support
Change-Id: I8d0542faf56e6022e3b265957132f00ce063dcd6
This commit is contained in:
Maari Tamm 2020-11-29 09:33:32 +00:00
parent 5220acf12e
commit df3388677c
6 changed files with 992 additions and 0 deletions

View File

@ -81,3 +81,10 @@ user messages
.. autoprogram-cliff:: openstack.share.v2
:command: share message *
==============
share replicas
==============
.. autoprogram-cliff:: openstack.share.v2
:command: share replica *

View File

@ -80,3 +80,11 @@ def extract_extra_specs(extra_specs, specs_to_add):
else:
extra_specs[key] = value
return extra_specs
def format_column_headers(columns):
column_headers = []
for column in columns:
column_headers.append(
column.replace('_', ' ').title().replace('Id', 'ID'))
return column_headers

View File

@ -0,0 +1,329 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from osc_lib.command import command
from osc_lib import exceptions
from osc_lib import utils as osc_utils
from manilaclient.common._i18n import _
from manilaclient.osc import utils
LOG = logging.getLogger(__name__)
class CreateShareReplica(command.ShowOne):
"""Create a share replica."""
_description = _("Create a replica of the given share")
def get_parser(self, prog_name):
parser = super(CreateShareReplica, self).get_parser(prog_name)
parser.add_argument(
"share",
metavar="<share>",
help=_("Name or ID of the share to replicate.")
)
parser.add_argument(
'--availability-zone',
metavar='<availability-zone>',
default=None,
help=_('Availability zone in which the replica should be created.')
)
parser.add_argument(
'--wait',
action='store_true',
default=False,
help=_('Wait for replica creation')
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
share = osc_utils.find_resource(share_client.shares,
parsed_args.share)
share_replica = share_client.share_replicas.create(
share,
availability_zone=parsed_args.availability_zone
)
if parsed_args.wait:
if not osc_utils.wait_for_status(
status_f=share_client.share_replicas.get,
res_id=share_replica.id,
success_status=['available']
):
LOG.error(_("ERROR: Share replica is in error state."))
share_replica = osc_utils.find_resource(
share_client.share_replicas,
share_replica.id)
return self.dict2columns(share_replica._info)
class DeleteShareReplica(command.Command):
"""Delete one or more share replicas."""
_description = _("Delete one or more share replicas")
def get_parser(self, prog_name):
parser = super(DeleteShareReplica, self).get_parser(prog_name)
parser.add_argument(
"replica",
metavar="<replica>",
nargs="+",
help=_("Name or ID of the replica(s) to delete")
)
parser.add_argument(
"--force",
action='store_true',
default=False,
help=_("Attempt to force delete a replica on its backend. "
"Using this option will purge the replica from Manila "
"even if it is not cleaned up on the backend. ")
)
parser.add_argument(
"--wait",
action='store_true',
default=False,
help=_("Wait for share replica deletion")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
result = 0
for replica in parsed_args.replica:
try:
replica_obj = osc_utils.find_resource(
share_client.share_replicas,
replica)
share_client.share_replicas.delete(
replica_obj,
force=parsed_args.force)
if parsed_args.wait:
if not osc_utils.wait_for_delete(
manager=share_client.share_replicas,
res_id=replica_obj.id):
result += 1
except Exception as e:
result += 1
LOG.error(_(
"Failed to delete a share replica with "
"name or ID '%(replica)s': %(e)s"),
{'replica': replica, 'e': e})
if result > 0:
total = len(parsed_args.replica)
msg = (_("%(result)s of %(total)s replicas failed "
"to delete.") % {'result': result, 'total': total})
raise exceptions.CommandError(msg)
class ListShareReplica(command.Lister):
"""List share replicas."""
_description = _("List share replicas")
def get_parser(self, prog_name):
parser = super(ListShareReplica, self).get_parser(prog_name)
parser.add_argument(
"--share",
metavar="<share>",
default=None,
help=_("Name or ID of the share to list replicas for.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
share = None
if parsed_args.share:
share = osc_utils.find_resource(
share_client.shares,
parsed_args.share)
replicas = share_client.share_replicas.list(share=share)
columns = [
'id',
'status',
'replica_state',
'share_id',
'host',
'availability_zone',
'updated_at',
]
column_headers = utils.format_column_headers(columns)
data = (osc_utils.get_dict_properties(
replica._info, columns) for replica in replicas)
return (column_headers, data)
class ShowShareReplica(command.ShowOne):
"""Show share replica."""
_description = _("Show details about a replica")
def get_parser(self, prog_name):
parser = super(ShowShareReplica, self).get_parser(prog_name)
parser.add_argument(
"replica",
metavar="<replica>",
help=_("ID of the share replica.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
replica = osc_utils.find_resource(
share_client.share_replicas,
parsed_args.replica)
return self.dict2columns(replica._info)
class SetShareReplica(command.Command):
"""Set share replica"""
_description = _("Explicitly set share replica status and/or "
"replica-state")
def get_parser(self, prog_name):
parser = super(SetShareReplica, self).get_parser(prog_name)
parser.add_argument(
"replica",
metavar="<replica>",
help=_("ID of the share replica to modify.")
)
parser.add_argument(
"--replica-state",
metavar="<replica-state>",
choices=['in_sync', 'out_of_sync', 'active', 'error'],
help=_("Indicate which replica_state to assign the replica. "
"Options include in_sync, out_of_sync, active and error.")
)
parser.add_argument(
"--status",
metavar="<status>",
choices=['available', 'error', 'creating', 'deleting',
'error_deleting'],
help=_("Indicate which status to assign the replica. Options "
"include available, error, creating, deleting and "
"error_deleting.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
result = 0
replica = osc_utils.find_resource(
share_client.share_replicas,
parsed_args.replica)
if parsed_args.replica_state:
try:
share_client.share_replicas.reset_replica_state(
replica,
parsed_args.replica_state
)
except Exception as e:
result += 1
LOG.error(_(
"Failed to set replica_state "
"'%(replica_state)s': %(exception)s"),
{'replica_state': parsed_args.replica_state,
'exception': e})
if parsed_args.status:
try:
share_client.share_replicas.reset_state(
replica,
parsed_args.status
)
except Exception as e:
result += 1
LOG.error(_(
"Failed to set status '%(status)s': %(exception)s"),
{'status': parsed_args.status, 'exception': e})
if not parsed_args.replica_state and not parsed_args.status:
raise exceptions.CommandError(_(
"Nothing to set. Please define "
"either '--replica_state' or '--status'."))
if result > 0:
raise exceptions.CommandError(_("One or more of the "
"set operations failed"))
class PromoteShareReplica(command.Command):
"""Promote share replica"""
_description = _("Promote specified replica to 'active' "
"replica_state.")
def get_parser(self, prog_name):
parser = super(PromoteShareReplica, self).get_parser(prog_name)
parser.add_argument(
"replica",
metavar="<replica>",
help=_("ID of the share replica.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
replica = osc_utils.find_resource(
share_client.share_replicas,
parsed_args.replica)
try:
share_client.share_replicas.promote(replica)
except Exception as e:
raise exceptions.CommandError(_(
"Failed to promote replica to 'active': %s" % (e)))
class ResyncShareReplica(command.Command):
"""Resync share replica"""
_description = _("Attempt to update the share replica with its "
"'active' mirror.")
def get_parser(self, prog_name):
parser = super(ResyncShareReplica, self).get_parser(prog_name)
parser.add_argument(
"replica",
metavar="<replica>",
help=_("ID of the share replica to resync.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
replica = osc_utils.find_resource(
share_client.share_replicas,
parsed_args.replica)
try:
share_client.share_replicas.resync(replica)
except Exception as e:
raise exceptions.CommandError(_(
"Failed to resync share replica: %s" % (e)))

View File

@ -36,6 +36,7 @@ class FakeShareClient(object):
self.quotas = mock.Mock()
self.share_snapshots = mock.Mock()
self.share_snapshot_export_locations = mock.Mock()
self.share_replicas = mock.Mock()
self.shares.resource_class = osc_fakes.FakeResource(None, {})
self.share_export_locations = mock.Mock()
self.share_export_locations.resource_class = (
@ -592,3 +593,59 @@ class FakeMessage(object):
FakeMessage.create_one_message(attrs))
return messages
class FakeShareReplica(object):
"""Fake a share replica"""
@staticmethod
def create_one_replica(attrs=None, methods=None):
"""Create a fake share replica
:param Dictionary attrs:
A dictionary with all attributes
:return:
A FakeResource object, with project_id, resource and so on
"""
attrs = attrs or {}
methods = methods or {}
share_replica = {
'availability_zone': None,
'cast_rules_to_readonly': True,
'created_at': datetime.datetime.now().isoformat(),
'host': None,
'id': 'replica-id-' + uuid.uuid4().hex,
'replica_state': None,
'share_id': 'share-id-' + uuid.uuid4().hex,
'share_network_id': None,
'share_server_id': None,
'status': None,
'updated_at': None
}
share_replica.update(attrs)
share_replica = osc_fakes.FakeResource(info=copy.deepcopy(
share_replica),
methods=methods,
loaded=True)
return share_replica
@staticmethod
def create_share_replicas(attrs=None, count=2):
"""Create multiple fake replicas.
:param Dictionary attrs:
A dictionary with all attributes
:param Integer count:
The number of share types to be faked
:return:
A list of FakeResource objects
"""
share_replicas = []
for n in range(0, count):
share_replicas.append(
FakeShareReplica.create_one_replica(attrs))
return share_replicas

View File

@ -0,0 +1,584 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from unittest import mock
from osc_lib import exceptions
from osc_lib import utils as oscutils
from manilaclient.osc import utils
from manilaclient.osc.v2 import share_replicas as osc_share_replicas
from manilaclient.tests.unit.osc import osc_utils
from manilaclient.tests.unit.osc.v2 import fakes as manila_fakes
class TestShareReplica(manila_fakes.TestShare):
def setUp(self):
super(TestShareReplica, self).setUp()
self.shares_mock = self.app.client_manager.share.shares
self.shares_mock.reset_mock()
self.replicas_mock = self.app.client_manager.share.share_replicas
self.replicas_mock.reset_mock()
class TestShareReplicaCreate(TestShareReplica):
def setUp(self):
super(TestShareReplicaCreate, self).setUp()
self.share = manila_fakes.FakeShare.create_one_share()
self.shares_mock.get.return_value = self.share
self.share_replica = (
manila_fakes.FakeShareReplica.create_one_replica(
attrs={
'availability_zone': 'manila-zone-1',
'status': 'available'}
))
self.replicas_mock.create.return_value = self.share_replica
self.replicas_mock.get.return_value = self.share_replica
self.cmd = osc_share_replicas.CreateShareReplica(self.app, None)
self.data = tuple(self.share_replica._info.values())
self.columns = tuple(self.share_replica._info.keys())
def test_share_replica_create_missing_args(self):
arglist = []
verifylist = []
self.assertRaises(
osc_utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_share_replica_create(self):
arglist = [
self.share.id
]
verifylist = [
('share', self.share.id)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.replicas_mock.create.assert_called_with(
self.share,
availability_zone=None
)
self.assertCountEqual(self.columns, columns)
self.assertCountEqual(self.data, data)
def test_share_replica_create_az(self):
arglist = [
self.share.id,
'--availability-zone', self.share.availability_zone
]
verifylist = [
('share', self.share.id),
('availability_zone', self.share.availability_zone)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.replicas_mock.create.assert_called_with(
self.share,
availability_zone=self.share.availability_zone
)
self.assertCountEqual(self.columns, columns)
self.assertCountEqual(self.data, data)
def test_share_replica_create_wait(self):
arglist = [
self.share.id,
'--wait'
]
verifylist = [
('share', self.share.id),
('wait', True)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.replicas_mock.create.assert_called_with(
self.share,
availability_zone=None
)
self.replicas_mock.get.assert_called_with(self.share_replica.id)
self.assertCountEqual(self.columns, columns)
self.assertCountEqual(self.data, data)
@mock.patch('manilaclient.osc.v2.share_replicas.LOG')
def test_share_replica_create_wait_exception(self, mock_logger):
arglist = [
self.share.id,
'--wait'
]
verifylist = [
('share', self.share.id),
('wait', True)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with mock.patch('osc_lib.utils.wait_for_status', return_value=False):
columns, data = self.cmd.take_action(parsed_args)
self.replicas_mock.create.assert_called_with(
self.share,
availability_zone=None
)
mock_logger.error.assert_called_with(
"ERROR: Share replica is in error state.")
self.replicas_mock.get.assert_called_with(self.share_replica.id)
self.assertCountEqual(self.columns, columns)
self.assertCountEqual(self.data, data)
class TestShareReplicaDelete(TestShareReplica):
def setUp(self):
super(TestShareReplicaDelete, self).setUp()
self.share_replica = (
manila_fakes.FakeShareReplica.create_one_replica())
self.replicas_mock.get.return_value = self.share_replica
self.cmd = osc_share_replicas.DeleteShareReplica(self.app, None)
def test_share_replica_delete_missing_args(self):
arglist = []
verifylist = []
self.assertRaises(osc_utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_share_replica_delete(self):
arglist = [
self.share_replica.id
]
verifylist = [
('replica', [self.share_replica.id])
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.replicas_mock.delete.assert_called_with(
self.share_replica,
force=False)
self.assertIsNone(result)
def test_share_replica_delete_force(self):
arglist = [
self.share_replica.id,
'--force'
]
verifylist = [
('replica', [self.share_replica.id]),
('force', True)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.replicas_mock.delete.assert_called_with(
self.share_replica,
force=True)
self.assertIsNone(result)
def test_share_replica_delete_multiple(self):
share_replicas = (
manila_fakes.FakeShareReplica.create_share_replicas(
count=2))
arglist = [
share_replicas[0].id,
share_replicas[1].id
]
verifylist = [
('replica', [share_replicas[0].id, share_replicas[1].id])
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.assertEqual(self.replicas_mock.delete.call_count,
len(share_replicas))
self.assertIsNone(result)
def test_share_snapshot_delete_exception(self):
arglist = [
self.share_replica.id
]
verifylist = [
('replica', [self.share_replica.id])
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.replicas_mock.delete.side_effect = exceptions.CommandError()
self.assertRaises(exceptions.CommandError,
self.cmd.take_action,
parsed_args)
def test_share_replica_delete_wait(self):
arglist = [
self.share_replica.id,
'--wait'
]
verifylist = [
('replica', [self.share_replica.id]),
('wait', True)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with mock.patch('osc_lib.utils.wait_for_delete', return_value=True):
result = self.cmd.take_action(parsed_args)
self.replicas_mock.delete.assert_called_with(
self.share_replica,
force=False)
self.replicas_mock.get.assert_called_with(self.share_replica.id)
self.assertIsNone(result)
def test_share_replica_delete_wait_exception(self):
arglist = [
self.share_replica.id,
'--wait'
]
verifylist = [
('replica', [self.share_replica.id]),
('wait', True)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with mock.patch('osc_lib.utils.wait_for_delete', return_value=False):
self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args
)
class TestShareReplicaList(TestShareReplica):
columns = [
'id',
'status',
'replica_state',
'share_id',
'host',
'availability_zone',
'updated_at',
]
column_headers = utils.format_column_headers(columns)
def setUp(self):
super(TestShareReplicaList, self).setUp()
self.share = manila_fakes.FakeShare.create_one_share()
self.shares_mock.get.return_value = self.share
self.replicas_list = (
manila_fakes.FakeShareReplica.create_share_replicas(
count=2))
self.replicas_mock.list.return_value = self.replicas_list
self.values = (oscutils.get_dict_properties(
i._info, self.columns) for i in self.replicas_list)
self.cmd = osc_share_replicas.ListShareReplica(self.app, None)
def test_share_replica_list(self):
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.replicas_mock.list.assert_called_with(share=None)
self.assertEqual(self.column_headers, columns)
self.assertEqual(list(self.values), list(data))
def test_share_replica_list_for_share(self):
arglist = [
'--share', self.share.id
]
verifylist = [
('share', self.share.id)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.replicas_mock.list.assert_called_with(share=self.share)
self.assertEqual(self.column_headers, columns)
self.assertEqual(list(self.values), list(data))
class TestShareReplicaShow(TestShareReplica):
def setUp(self):
super(TestShareReplicaShow, self).setUp()
self.share_replica = (
manila_fakes.FakeShareReplica.create_one_replica()
)
self.replicas_mock.get.return_value = self.share_replica
self.cmd = osc_share_replicas.ShowShareReplica(self.app, None)
self.data = tuple(self.share_replica._info.values())
self.columns = tuple(self.share_replica._info.keys())
def test_share_replica_show_missing_args(self):
arglist = []
verifylist = []
self.assertRaises(
osc_utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_share_replica_show(self):
arglist = [
self.share_replica.id
]
verifylist = [
('replica', self.share_replica.id)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.replicas_mock.get.assert_called_with(
self.share_replica.id
)
self.assertCountEqual(self.columns, columns)
self.assertCountEqual(self.data, data)
class TestShareReplicaSet(TestShareReplica):
def setUp(self):
super(TestShareReplicaSet, self).setUp()
self.share_replica = (
manila_fakes.FakeShareReplica.create_one_replica()
)
self.replicas_mock.get.return_value = self.share_replica
self.cmd = osc_share_replicas.SetShareReplica(self.app, None)
def test_share_replica_set_replica_state(self):
new_replica_state = 'in_sync'
arglist = [
self.share_replica.id,
'--replica-state', new_replica_state
]
verifylist = [
('replica', self.share_replica.id),
('replica_state', new_replica_state)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.replicas_mock.reset_replica_state.assert_called_with(
self.share_replica,
new_replica_state)
self.assertIsNone(result)
def test_share_replica_set_replica_state_exception(self):
new_replica_state = 'in_sync'
arglist = [
self.share_replica.id,
'--replica-state', new_replica_state
]
verifylist = [
('replica', self.share_replica.id),
('replica_state', new_replica_state)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.replicas_mock.reset_replica_state.side_effect = Exception()
self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args)
def test_share_replica_set_status(self):
new_status = 'available'
arglist = [
self.share_replica.id,
'--status', new_status
]
verifylist = [
('replica', self.share_replica.id),
('status', new_status)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.replicas_mock.reset_state.assert_called_with(
self.share_replica,
new_status)
self.assertIsNone(result)
def test_share_replica_set_status_exception(self):
new_status = 'available'
arglist = [
self.share_replica.id,
'--status', new_status
]
verifylist = [
('replica', self.share_replica.id),
('status', new_status)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.replicas_mock.reset_state.side_effect = Exception()
self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args)
def test_share_replica_set_nothing_defined(self):
arglist = [
self.share_replica.id,
]
verifylist = [
('replica', self.share_replica.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args)
class TestShareReplicaPromote(TestShareReplica):
def setUp(self):
super(TestShareReplicaPromote, self).setUp()
self.share_replica = (
manila_fakes.FakeShareReplica.create_one_replica()
)
self.replicas_mock.get.return_value = self.share_replica
self.cmd = osc_share_replicas.PromoteShareReplica(
self.app, None)
def test_share_replica_promote(self):
arglist = [
self.share_replica.id,
]
verifylist = [
('replica', self.share_replica.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.replicas_mock.promote.assert_called_with(
self.share_replica)
self.assertIsNone(result)
def test_share_replica_promote_exception(self):
arglist = [
self.share_replica.id,
]
verifylist = [
('replica', self.share_replica.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.replicas_mock.promote.side_effect = Exception()
self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args)
class TestShareReplicaResync(TestShareReplica):
def setUp(self):
super(TestShareReplicaResync, self).setUp()
self.share_replica = (
manila_fakes.FakeShareReplica.create_one_replica()
)
self.replicas_mock.get.return_value = self.share_replica
self.cmd = osc_share_replicas.ResyncShareReplica(
self.app, None)
def test_share_replica_resync(self):
arglist = [
self.share_replica.id,
]
verifylist = [
('replica', self.share_replica.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.replicas_mock.resync.assert_called_with(
self.share_replica)
self.assertIsNone(result)
def test_share_replica_resync_exception(self):
arglist = [
self.share_replica.id,
]
verifylist = [
('replica', self.share_replica.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.replicas_mock.resync.side_effect = Exception()
self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args)

View File

@ -82,6 +82,13 @@ openstack.share.v2 =
share_message_delete = manilaclient.osc.v2.messages:DeleteMessage
share_message_list = manilaclient.osc.v2.messages:ListMessage
share_message_show = manilaclient.osc.v2.messages:ShowMessage
share_replica_create = manilaclient.osc.v2.share_replicas:CreateShareReplica
share_replica_delete = manilaclient.osc.v2.share_replicas:DeleteShareReplica
share_replica_list = manilaclient.osc.v2.share_replicas:ListShareReplica
share_replica_show = manilaclient.osc.v2.share_replicas:ShowShareReplica
share_replica_set = manilaclient.osc.v2.share_replicas:SetShareReplica
share_replica_promote = manilaclient.osc.v2.share_replicas:PromoteShareReplica
share_replica_resync = manilaclient.osc.v2.share_replicas:ResyncShareReplica
[coverage:run]
omit = manilaclient/tests/*