OSC Implementation for Share Networks

Add the following OSC commands:

 share network create
 share network list
 share network show
 share network set
 share network unset
 share network delete

Partially-implements: bp openstack-client-support
Change-Id: I6ddb2fec1d9421bfa8b11c82da5379ada80f610b
Co-Authored-By: Vida Haririan <vhariria@redhat.com>
Signed-off-by: Goutham Pacha Ravi <gouthampravi@gmail.com>
This commit is contained in:
Goutham Pacha Ravi 2021-08-18 09:05:22 -07:00
parent cc440609bd
commit d40205a2f2
15 changed files with 1570 additions and 36 deletions

View File

@ -61,6 +61,13 @@ share access rules
.. autoprogram-cliff:: openstack.share.v2
:command: share access *
==============
share networks
==============
.. autoprogram-cliff:: openstack.share.v2
:command: share network *
===========
share types
===========

View File

@ -261,19 +261,18 @@ def exit(msg=''):
sys.exit(1)
def transform_export_locations_to_string_view(export_locations):
export_locations_string_view = ''
replica_export_location_ignored_keys = (
'replica_state', 'availability_zone', 'share_replica_id')
for el in export_locations:
if hasattr(el, '_info'):
export_locations_dict = el._info
def convert_dict_list_to_string(data, ignored_keys=None):
ignored_keys = ignored_keys or []
if not isinstance(data, list):
data = [data]
data_string = ''
for datum in data:
if hasattr(datum, '_info'):
datum_dict = datum._info
else:
export_locations_dict = el
for k, v in export_locations_dict.items():
# NOTE(gouthamr): We don't want to show replica related info
# twice in the output, so ignore those.
if k not in replica_export_location_ignored_keys:
export_locations_string_view += '\n%(k)s = %(v)s' % {
datum_dict = datum
for k, v in datum_dict.items():
if k not in ignored_keys:
data_string += '\n%(k)s = %(v)s' % {
'k': k, 'v': v}
return export_locations_string_view
return data_string

View File

@ -555,9 +555,12 @@ class ShowShare(command.ShowOne):
parsed_args.share)
export_locations = share_client.share_export_locations.list(share_obj)
export_locations = (
cliutils.transform_export_locations_to_string_view(
export_locations))
export_locations = cliutils.convert_dict_list_to_string(
export_locations,
ignored_keys=['replica_state',
'availability_zone',
'share_replica_id']
)
data = share_obj._info
data['export_locations'] = export_locations

View File

@ -215,7 +215,7 @@ class ShareInstanceShow(command.ShowOne):
if parsed_args.formatter == 'table':
instance._info['export_locations'] = (
cliutils.transform_export_locations_to_string_view(
cliutils.convert_dict_list_to_string(
instance._info['export_locations']))
instance._info.pop('links', None)

View File

@ -0,0 +1,680 @@
# Copyright 2021 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from openstackclient.identity import common as identity_common
from osc_lib.command import command
from osc_lib import exceptions
from osc_lib import utils as oscutils
from manilaclient import api_versions
from manilaclient.common._i18n import _
from manilaclient.common import cliutils
LOG = logging.getLogger(__name__)
class ListShareNetwork(command.Lister):
_description = _("List share networks")
def get_parser(self, prog_name):
parser = super(ListShareNetwork, self).get_parser(prog_name)
parser.add_argument(
'--name',
metavar="<share-network>",
help=_('Filter share networks by name')
)
parser.add_argument(
'--name~',
metavar="<share-network-name-pattern>",
help=_('Filter share networks by name-pattern. Available only '
'for microversion >= 2.36.')
)
parser.add_argument(
'--description',
metavar="<share-network-description>",
help=_('Filter share networks by description. Available '
'only for microversion >= 2.36')
)
parser.add_argument(
'--description~',
metavar="<share-network-description-pattern>",
help=_('Filter share networks by description-pattern. Available '
'only for microversion >= 2.36.')
)
parser.add_argument(
'--all-projects',
action='store_true',
default=False,
help=_('Include all projects (admin only)'),
)
parser.add_argument(
'--project',
metavar='<project>',
help=_('Filter share networks by project (name or ID) '
'(admin only)')
)
identity_common.add_project_domain_option_to_parser(parser)
parser.add_argument(
'--created-since',
metavar='<yyyy-mm-dd>',
help=_('Filter share networks by date they were created after. '
'The date can be in the format yyyy-mm-dd.')
)
parser.add_argument(
'--created-before',
metavar='<yyyy-mm-dd>',
help=_('Filter share networks by date they were created before. '
'The date can be in the format yyyy-mm-dd.')
)
parser.add_argument(
'--security-service',
metavar='<security-service>',
help=_('Filter share networks by the name or ID of a security '
'service attached to the network.')
)
parser.add_argument(
'--neutron-net-id',
metavar='<neutron-net-id>',
help=_('Filter share networks by the ID of a neutron network.')
)
parser.add_argument(
'--neutron-subnet-id',
metavar='<neutron-subnet-id>',
help=_('Filter share networks by the ID of a neutron sub network.')
)
parser.add_argument(
'--network-type',
metavar='<network-type>',
help=_('Filter share networks by the type of network. Examples '
'include "flat", "vlan", "vxlan", "geneve", etc.')
)
parser.add_argument(
'--segmentation-id',
metavar='<segmentation-id>',
help=_('Filter share networks by the segmentation ID of network. '
'Relevant only for segmented networks such as "vlan", '
'"vxlan", "geneve", etc.')
)
parser.add_argument(
'--cidr',
metavar='<X.X.X.X/X>',
help=_('Filter share networks by the CIDR of network.')
)
parser.add_argument(
'--ip-version',
metavar='4/6',
choices=['4', '6'],
help=_('Filter share networks by the IP Version of the network, '
'either 4 or 6.')
)
parser.add_argument(
'--detail',
action='store_true',
default=False,
help=_("List share networks with details")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
identity_client = self.app.client_manager.identity
columns = ['ID', 'Name']
if parsed_args.detail:
columns.extend([
'Status',
'Created At',
'Updated At',
'Description',
])
project_id = None
if parsed_args.project:
project_id = identity_common.find_project(
identity_client,
parsed_args.project,
parsed_args.project_domain).id
# set value of 'all_tenants' when using project option
all_tenants = bool(parsed_args.project) or parsed_args.all_projects
if parsed_args.all_projects:
columns.append('Project ID')
search_opts = {
'all_tenants': all_tenants,
'project_id': project_id,
'name': parsed_args.name,
'created_since': parsed_args.created_since,
'created_before': parsed_args.created_before,
'neutron_net_id': parsed_args.neutron_net_id,
'neutron_subnet_id': parsed_args.neutron_subnet_id,
'network_type': parsed_args.network_type,
'segmentation_id': parsed_args.segmentation_id,
'cidr': parsed_args.cidr,
'ip_version': parsed_args.ip_version,
'security_service': parsed_args.security_service,
}
if share_client.api_version >= api_versions.APIVersion("2.36"):
search_opts['name~'] = getattr(parsed_args, 'name~')
search_opts['description~'] = getattr(parsed_args, 'description~')
search_opts['description'] = parsed_args.description
elif (parsed_args.description or getattr(parsed_args, 'name~') or
getattr(parsed_args, 'description~')):
raise exceptions.CommandError(
"Pattern based filtering (name~, description~ and description)"
" is only available with manila API version >= 2.36")
data = share_client.share_networks.list(search_opts=search_opts)
return (
columns,
(oscutils.get_item_properties(s, columns) for s in data)
)
class ShowShareNetwork(command.ShowOne):
"""Display a share network"""
_description = _("Show details about a share network")
def get_parser(self, prog_name):
parser = super(ShowShareNetwork, self).get_parser(prog_name)
parser.add_argument(
"share_network",
metavar="<share-network>",
help=_("Name or ID of the share network to display")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
share_network = oscutils.find_resource(
share_client.share_networks,
parsed_args.share_network)
data = share_network._info
# Add security services information
security_services = share_client.security_services.list(
search_opts={'share_network_id': share_network.id}, detailed=False)
data['security_services'] = [
{
'security_service_name': ss.name,
'security_service_id': ss.id,
}
for ss in security_services
]
if parsed_args.formatter == 'table':
data['share_network_subnets'] = (
cliutils.convert_dict_list_to_string(
data['share_network_subnets'])
)
data['security_services'] = cliutils.convert_dict_list_to_string(
data['security_services']
)
data.pop('links', None)
return self.dict2columns(data)
class CreateShareNetwork(command.ShowOne):
"""Create a share network."""
_description = _("Create a share network")
def get_parser(self, prog_name):
parser = super(CreateShareNetwork, self).get_parser(prog_name)
parser.add_argument(
"--name",
metavar="<share-network>",
help=_("Add a name to the share network (Optional)")
)
parser.add_argument(
"--description",
metavar="<description>",
default=None,
help=_("Add a description to the share network (Optional).")
)
parser.add_argument(
"--neutron-net-id",
metavar="<neutron-net-id>",
default=None,
help=_("ID of the neutron network that must be associated with "
"the share network (Optional). The network specified will "
"be associated with the 'default' share network subnet, "
"unless 'availability-zone' is also specified.")
)
parser.add_argument(
"--neutron-subnet-id",
metavar="<neutron-subnet-id>",
default=None,
help=_("ID of the neutron sub-network that must be associated "
"with the share network (Optional). The subnet specified "
"will be associated with the 'default' share network "
"subnet, unless 'availability-zone' is also specified.")
)
parser.add_argument(
"--availability-zone",
metavar="<availability-zone>",
default=None,
help=_("Name or ID of the avalilability zone to assign the "
"specified network subnet parameters to. Must be used "
"in conjunction with 'neutron-net-id' and "
"'neutron-subnet-id'. Do not specify this parameter if "
"the network must be available across all availability "
"zones ('default' share network subnet). Available "
"only for microversion >= 2.51.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
availability_zone = None
if (parsed_args.availability_zone and
share_client.api_version < api_versions.APIVersion("2.51")):
raise exceptions.CommandError(
"Availability zone can be specified only with manila API "
"version >= 2.51")
elif parsed_args.availability_zone:
availability_zone = oscutils.find_resource(
share_client.availability_zones,
parsed_args.availability_zone).name
share_network = share_client.share_networks.create(
name=parsed_args.name,
description=parsed_args.description,
neutron_net_id=parsed_args.neutron_net_id,
neutron_subnet_id=parsed_args.neutron_subnet_id,
availability_zone=availability_zone,
)
share_network_data = share_network._info
share_network_data.pop('links', None)
if parsed_args.formatter == 'table':
share_network_data['share_network_subnets'] = (
cliutils.convert_dict_list_to_string(
share_network_data['share_network_subnets'])
)
return self.dict2columns(share_network_data)
class DeleteShareNetwork(command.Command):
"""Delete one or more share networks"""
_description = _("Delete one or more share networks")
def get_parser(self, prog_name):
parser = super(DeleteShareNetwork, self).get_parser(prog_name)
parser.add_argument(
"share_network",
metavar="<share-network>",
nargs="+",
help=_("Name or ID of the share network(s) to delete")
)
parser.add_argument(
"--wait",
action='store_true',
default=False,
help=_("Wait for the share network(s) to be deleted")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
result = 0
for share_network in parsed_args.share_network:
try:
share_network_obj = oscutils.find_resource(
share_client.share_networks,
share_network)
share_client.share_networks.delete(
share_network_obj)
if parsed_args.wait:
if not oscutils.wait_for_delete(
manager=share_client.share_networks,
res_id=share_network_obj.id):
result += 1
except Exception as e:
result += 1
LOG.error(f"Failed to delete share network with "
f"name or ID {share_network}: {e}")
if result > 0:
total = len(parsed_args.share_network)
msg = f"{result} of {total} share networks failed to be deleted."
raise exceptions.CommandError(msg)
class SetShareNetwork(command.Command):
"""Set share network properties."""
_description = _("Set share network properties")
def get_parser(self, prog_name):
parser = super(SetShareNetwork, self).get_parser(prog_name)
parser.add_argument(
"share_network",
metavar="<share-network>",
help=_('Name or ID of the share network to set a property for')
)
parser.add_argument(
"--name",
metavar="<name>",
default=None,
help=_("Set a new name to the share network.")
)
parser.add_argument(
"--description",
metavar="<description>",
default=None,
help=_("Set a new description to the share network.")
)
parser.add_argument(
"--status",
metavar="<status>",
choices=['active', 'error', 'network_change'],
help=_("Assign a status to the share network (Admin only). "
"Options include : active, error, network_change. "
"Available only for microversion >= 2.63.")
)
parser.add_argument(
'--neutron-net-id',
metavar='<neutron-net-id>',
help=_("Update the neutron network associated with the default "
"share network subnet. If a default share network subnet "
"is not present or if the share network is in use, setting "
"this will fail.")
)
parser.add_argument(
'--neutron-subnet-id',
metavar='<neutron-subnet-id>',
help=_("Update the neutron subnetwork associated with the default "
"share network subnet. If a default share network subnet "
"is not present or if the share network is in use, setting "
"this will fail.")
)
parser.add_argument(
'--current-security-service',
metavar='<security-service>',
help=_("Name or ID of a security service that is currently "
"associated with a share network that must be replaced. "
"Replacing a security service is only available for "
"microversions >= 2.63.")
)
parser.add_argument(
'--new-security-service',
metavar='<security-service>',
help=_("Name or ID of a security service that must be associated "
"with the share network. When replacing a security "
"service, the current security service must also be "
"provided with the '--current-security-service' option. "
"Replacing a security service is only available for "
"microversions >= 2.63.")
)
parser.add_argument(
'--check-only',
action='store_true',
help=_("Run a dry-run of a security service replacement. "
"Available only for microversion >=2.63")
)
parser.add_argument(
'--restart-check',
action='store_true',
help=_("Restart a dry-run of a security service "
"replacement. Helpful when check results are "
"stale. Available only for microversion >=2.63.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
result = 0
share_network = oscutils.find_resource(share_client.share_networks,
parsed_args.share_network)
kwargs = {}
# some args are only for newer API micro-version
if share_client.api_version < api_versions.APIVersion("2.63"):
new_args = ('status', 'check_only', 'restart_check',
'current_security_service')
invalid_args = [
arg for arg in new_args
if getattr(parsed_args, arg) is not None
]
raise exceptions.CommandError(
f"Use of {' '.join(invalid_args)} is only available for API "
f"microversions >= 2.63.")
# If "--check-only" and "--restart-check" are used, a new security
# service option must be supplied
if parsed_args.check_only or parsed_args.restart_check:
if not parsed_args.new_security_service:
raise exceptions.CommandError(
_("Must provide new security service with --check-only "
"and --restart-check."))
if parsed_args.name is not None:
kwargs['name'] = parsed_args.name
if parsed_args.description is not None:
kwargs['description'] = parsed_args.description
if parsed_args.neutron_net_id is not None:
kwargs['neutron_net_id'] = parsed_args.neutron_net_id
if parsed_args.neutron_subnet_id is not None:
kwargs['neutron_subnet_id'] = parsed_args.neutron_subnet_id
if kwargs:
try:
share_client.share_networks.update(
share_network,
**kwargs
)
except Exception as e:
result += 1
LOG.error(f"Failed to set share network properties "
f"{kwargs}: {e}")
if parsed_args.status:
try:
share_client.share_networks.reset_state(
share_network,
parsed_args.status
)
except Exception as e:
result += 1
LOG.error(f"Failed to update share network status to "
f"{parsed_args.status}: {e}")
new_security_service = current_security_service = None
if parsed_args.new_security_service:
new_security_service = oscutils.find_resource(
share_client.security_services,
parsed_args.new_security_service)
if parsed_args.current_security_service:
current_security_service = oscutils.find_resource(
share_client.security_services,
parsed_args.current_security_service)
if new_security_service and not current_security_service:
try:
if parsed_args.check_only:
check_result = share_client.share_networks\
.add_security_service_check(
share_network, new_security_service,
reset_operation=parsed_args.restart_check)
is_compatible = check_result[1].get('compatible')
# NOTE(gouthamr): We're logging to the console here,
# because there's no need to print useful
# information beyond the result. Use of error
# logging is a hack, since other kinds of logs may
# not print to console by default.
if is_compatible is None:
LOG.error("Security service check has been "
"successfully initiated, please retry "
"after some time.")
elif is_compatible:
LOG.error(
f"Security service "
f"{parsed_args.new_security_service} can "
f"be added to share network "
f"{parsed_args.share_network}.")
else:
LOG.error(
f"Security service "
f"{parsed_args.new_security_service} cannot "
f"be added to share network "
f"{parsed_args.share_network}.")
else:
share_client.share_networks.add_security_service(
share_network, new_security_service)
except Exception as e:
result += 1
LOG.error(f"Failed to add security service "
f"{parsed_args.new_security_service} to "
f"share network {parsed_args.share_network}: {e}")
if new_security_service and current_security_service:
try:
if parsed_args.check_only:
check_result = share_client.share_networks.\
update_share_network_security_service_check(
share_network,
current_security_service,
new_security_service,
reset_operation=parsed_args.restart_check)
is_compatible = check_result[1].get('compatible')
# NOTE(gouthamr): We're logging to the console here,
# because there's no need to print useful
# information beyond the result. Use of error
# logging is a hack, since other kinds of logs may
# not print to console by default.
if is_compatible is None:
LOG.error("Security service check has been "
"successfully initiated, please retry "
"after some time.")
elif is_compatible:
LOG.error(
f"Security service "
f"{parsed_args.current_security_service} can "
f"be replaced with security service "
f"{parsed_args.new_security_service} on share "
f"network {parsed_args.share_network}.")
else:
LOG.error(
f"Security service "
f"{parsed_args.current_security_service} cannot "
f"be replaced with security service "
f"{parsed_args.new_security_service} on share "
f"network {parsed_args.share_network}.")
else:
share_client.share_networks.\
update_share_network_security_service(
share_network,
current_security_service,
new_security_service)
except Exception as e:
result += 1
LOG.error(f"Failed to update security service "
f"{parsed_args.current_security_service} on "
f"share network "
f"{parsed_args.share_network}: {e}")
if result > 0:
raise exceptions.CommandError(_("One or more of the "
"set operations failed"))
class UnsetShareNetwork(command.Command):
"""Unset a share network property."""
_description = _("Unset a share network property")
def get_parser(self, prog_name):
parser = super(UnsetShareNetwork, self).get_parser(prog_name)
parser.add_argument(
"share_network",
metavar="<share-network>",
help=_("Name or ID of the share network to unset a property of")
)
parser.add_argument(
"--name",
action='store_true',
help=_("Unset share network name."),
)
parser.add_argument(
"--description",
action='store_true',
help=_("Unset share network description."),
)
parser.add_argument(
"--security-service",
metavar="<security-service>",
help=_("Disassociate a security service from the share network. "
"This is only possible with unused share networks."),
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
share_network = oscutils.find_resource(
share_client.share_networks,
parsed_args.share_network)
security_service = None
if parsed_args.security_service:
security_service = oscutils.find_resource(
share_client.security_services,
parsed_args.security_service)
result = 0
kwargs = {}
if parsed_args.name:
# the SDK unsets name if it is an empty string
kwargs['name'] = ''
if parsed_args.description:
# the SDK unsets description if it is an empty string
kwargs['description'] = ''
if kwargs:
try:
share_client.share_networks.update(
share_network,
**kwargs
)
except Exception as e:
result += 1
LOG.error(f"Failed to unset share network properties "
f"{kwargs}: {e}")
if security_service:
try:
share_client.share_networks.remove_security_service(
share_network, security_service)
except Exception as e:
result += 1
LOG.error(f"Failed to dissociate security service"
f"{parsed_args.security_service} from "
f"{parsed_args.share_network}: {e}")
if result > 0:
raise exceptions.CommandError(_("One or more of the "
"unset operations failed"))

View File

@ -96,7 +96,7 @@ class ShowShareSnapshotInstance(command.ShowOne):
if parsed_args.formatter == 'table':
snapshot_instance._info['export_locations'] = (
cliutils.transform_export_locations_to_string_view(
cliutils.convert_dict_list_to_string(
snapshot_instance._info['export_locations']))
snapshot_instance._info.pop('links', None)

View File

@ -148,9 +148,9 @@ class ShowShareSnapshot(command.ShowOne):
export_locations = (
share_client.share_snapshot_export_locations.list(
share_snapshot))
export_locations = (
cliutils.transform_export_locations_to_string_view(
export_locations))
export_locations = cliutils.convert_dict_list_to_string(
export_locations, ignored_keys=['links'])
data = share_snapshot._info
data['export_locations'] = export_locations

View File

@ -39,6 +39,8 @@ class FakeShareClient(object):
self.share_snapshot_instances = mock.Mock()
self.share_replicas = mock.Mock()
self.share_replica_export_locations = mock.Mock()
self.share_networks = mock.Mock()
self.security_services = mock.Mock()
self.shares.resource_class = osc_fakes.FakeResource(None, {})
self.share_instance_export_locations = mock.Mock()
self.share_export_locations = mock.Mock()
@ -1000,3 +1002,72 @@ class FakeShareLimits(object):
share_limits = osc_fakes.FakeLimitsResource(
info=copy.deepcopy(share_limits), loaded=True)
return share_limits
class FakeShareNetwork(object):
"""Fake a share network"""
@staticmethod
def create_one_share_network(attrs=None, methods=None):
"""Create a fake share network
:param Dictionary attrs:
A dictionary with all attributes
:return:
A FakeResource object, with project_id, resource and so on
"""
attrs = attrs or {}
methods = methods or {}
share_network = {
'id': str(uuid.uuid4()),
'project_id': uuid.uuid4().hex,
'created_at': datetime.datetime.now().isoformat(),
'description': 'description-' + uuid.uuid4().hex,
'name': 'name-' + uuid.uuid4().hex,
"status": "active",
"security_service_update_support": True,
'share_network_subnets': [
{
'id': str(uuid.uuid4()),
"availability_zone": None,
"created_at": datetime.datetime.now().isoformat(),
"updated_at": datetime.datetime.now().isoformat(),
"segmentation_id": 1010,
"neutron_net_id": str(uuid.uuid4()),
"neutron_subnet_id": str(uuid.uuid4()),
"ip_version": 4,
"cidr": "10.0.0.0/24",
"network_type": "vlan",
"mtu": "1500",
"gateway": "10.0.0.1"
},
],
}
share_network.update(attrs)
share_network = osc_fakes.FakeResource(info=copy.deepcopy(
share_network),
methods=methods,
loaded=True)
return share_network
@staticmethod
def create_share_networks(attrs=None, count=2):
"""Create multiple fake share networks.
:param Dictionary attrs:
A dictionary with all attributes
:param Integer count:
The number of share networks to be faked
:return:
A list of FakeResource objects
"""
share_networks = []
for n in range(count):
share_networks.append(
FakeShareNetwork.create_one_share_network(attrs))
return share_networks

View File

@ -1030,8 +1030,8 @@ class TestShareShow(TestShare):
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
cliutils.transform_export_locations_to_string_view = mock.Mock()
cliutils.transform_export_locations_to_string_view.return_value = dict(
cliutils.convert_dict_list_to_string = mock.Mock()
cliutils.convert_dict_list_to_string.return_value = dict(
self._export_location)
columns, data = self.cmd.take_action(parsed_args)

View File

@ -292,8 +292,8 @@ class TestShareInstanceShow(TestShareInstance):
expected_columns += ('export_locations',)
expected_data_dic += (dict(self.export_locations[0]),)
cliutils.transform_export_locations_to_string_view = mock.Mock()
cliutils.transform_export_locations_to_string_view.return_value = dict(
cliutils.convert_dict_list_to_string = mock.Mock()
cliutils.convert_dict_list_to_string.return_value = dict(
self.export_locations[0])
arglist = [

View File

@ -0,0 +1,757 @@
# Copyright 2021 Red Hat Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from unittest import mock
import uuid
import ddt
from osc_lib import exceptions
from osc_lib import utils as oscutils
from manilaclient import api_versions
from manilaclient.osc.v2 import share_networks as osc_share_networks
from manilaclient.tests.unit.osc import osc_utils
from manilaclient.tests.unit.osc.v2 import fakes as manila_fakes
COLUMNS = ['ID', 'Name']
COLUMNS_DETAIL = [
'ID',
'Name',
'Status',
'Created At',
'Updated At',
'Description',
]
class TestShareNetwork(manila_fakes.TestShare):
def setUp(self):
super(TestShareNetwork, self).setUp()
self.share_networks_mock = self.app.client_manager.share.share_networks
self.share_networks_mock.reset_mock()
self.app.client_manager.share.api_version = api_versions.APIVersion(
api_versions.MAX_VERSION)
@ddt.ddt
class TestShareNetworkCreate(TestShareNetwork):
def setUp(self):
super(TestShareNetworkCreate, self).setUp()
self.share_network = (
manila_fakes.FakeShareNetwork.create_one_share_network())
self.share_networks_mock.create.return_value = self.share_network
self.cmd = osc_share_networks.CreateShareNetwork(self.app, None)
self.data = self.share_network._info.values()
self.columns = self.share_network._info.keys()
@ddt.data('table', 'yaml')
def test_share_network_create_formatter(self, formatter):
arglist = [
'-f', formatter,
]
verifylist = [
('formatter', formatter),
]
expected_data = self.share_network._info
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.share_networks_mock.create.assert_called_once_with(
name=None,
description=None,
neutron_net_id=None,
neutron_subnet_id=None,
availability_zone=None)
self.assertCountEqual(self.columns, columns)
self.assertCountEqual(expected_data.values(), data)
def test_share_network_create_with_args(self):
fake_neutron_net_id = str(uuid.uuid4())
fake_neutron_subnet_id = str(uuid.uuid4())
fake_az = mock.Mock()
fake_az.name = 'nova'
fake_az.id = str(uuid.uuid4())
arglist = [
'--name', 'zorilla-net',
'--description', 'fastest-backdoor-network-ever',
'--neutron-net-id', fake_neutron_net_id,
'--neutron-subnet-id', fake_neutron_subnet_id,
'--availability-zone', 'nova',
]
verifylist = [
('name', 'zorilla-net'),
('description', 'fastest-backdoor-network-ever'),
('neutron_net_id', fake_neutron_net_id),
('neutron_subnet_id', fake_neutron_subnet_id),
('availability_zone', 'nova'),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with mock.patch('osc_lib.utils.find_resource',
return_value=fake_az):
columns, data = self.cmd.take_action(parsed_args)
self.share_networks_mock.create.assert_called_once_with(
name='zorilla-net',
description='fastest-backdoor-network-ever',
neutron_net_id=fake_neutron_net_id,
neutron_subnet_id=fake_neutron_subnet_id,
availability_zone='nova',
)
self.assertCountEqual(self.columns, columns)
self.assertCountEqual(self.data, data)
@ddt.ddt
class TestShareNetworkDelete(TestShareNetwork):
def setUp(self):
super(TestShareNetworkDelete, self).setUp()
self.share_network = (
manila_fakes.FakeShareNetwork.create_one_share_network())
self.share_networks_mock.get.return_value = self.share_network
self.cmd = osc_share_networks.DeleteShareNetwork(self.app, None)
def test_share_network_delete_missing_args(self):
arglist = []
verifylist = []
self.assertRaises(osc_utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
@ddt.data(True, False)
def test_share_network_delete_with_wait(self, wait):
oscutils.wait_for_delete = mock.Mock(return_value=True)
share_networks = (
manila_fakes.FakeShareNetwork.create_share_networks(
count=2))
arglist = [
share_networks[0].id,
share_networks[1].name,
]
if wait:
arglist.append('--wait')
verifylist = [
('share_network', [share_networks[0].id, share_networks[1].name])
]
if wait:
verifylist.append(('wait', True))
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with mock.patch('osc_lib.utils.find_resource',
side_effect=share_networks):
result = self.cmd.take_action(parsed_args)
self.assertEqual(self.share_networks_mock.delete.call_count,
len(share_networks))
if wait:
oscutils.wait_for_delete.assert_has_calls([
mock.call(manager=self.share_networks_mock,
res_id=share_networks[0].id),
mock.call(manager=self.share_networks_mock,
res_id=share_networks[1].id)
])
self.assertIsNone(result)
def test_share_network_delete_exception(self):
arglist = [
self.share_network.id,
]
verifylist = [
('share_network', [self.share_network.id]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.share_networks_mock.delete.side_effect = exceptions.CommandError()
self.assertRaises(exceptions.CommandError,
self.cmd.take_action,
parsed_args)
def test_share_network_delete_wait_fails(self):
oscutils.wait_for_delete = mock.Mock(return_value=False)
arglist = [
self.share_network.id,
'--wait',
]
verifylist = [
('share_network', [self.share_network.id]),
('wait', True),
]
with mock.patch('osc_lib.utils.find_resource',
return_value=self.share_network):
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.assertRaises(exceptions.CommandError,
self.cmd.take_action,
parsed_args)
self.share_networks_mock.delete.assert_called_once_with(
self.share_network)
@ddt.ddt
class TestShareNetworkShow(TestShareNetwork):
def setUp(self):
super(TestShareNetworkShow, self).setUp()
self.share_network = (
manila_fakes.FakeShareNetwork.create_one_share_network())
self.share_networks_mock.get.return_value = self.share_network
self.security_services_mock = (
self.app.client_manager.share.security_services)
self.cmd = osc_share_networks.ShowShareNetwork(self.app, None)
self.data = self.share_network._info.values()
self.columns = self.share_network._info.keys()
def test_share_network_show_missing_args(self):
arglist = []
verifylist = []
self.assertRaises(osc_utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
@ddt.data('name', 'id')
def test_share_network_show_by(self, attr):
network_to_show = getattr(self.share_network, attr)
fake_security_service = mock.Mock()
fake_security_service.id = str(uuid.uuid4())
fake_security_service.name = 'security-service-%s' % uuid.uuid4().hex
self.security_services_mock.list = mock.Mock(
return_value=[fake_security_service])
arglist = [
network_to_show,
]
verifylist = [
('share_network', network_to_show),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with mock.patch('osc_lib.utils.find_resource',
return_value=self.share_network) as find_resource:
columns, data = self.cmd.take_action(parsed_args)
find_resource.assert_called_once_with(
self.share_networks_mock, network_to_show)
self.assertCountEqual(self.columns, columns)
self.assertCountEqual(self.data, data)
@ddt.ddt
class TestShareNetworkList(TestShareNetwork):
def setUp(self):
super(TestShareNetworkList, self).setUp()
self.share_networks = (
manila_fakes.FakeShareNetwork.create_share_networks(
count=2))
self.share_networks_list = oscutils.sort_items(self.share_networks,
'name:asc',
str)
self.share_networks_mock.list.return_value = self.share_networks_list
self.values = (oscutils.get_dict_properties(
s._info, COLUMNS) for s in self.share_networks_list)
self.expected_search_opts = {
'all_tenants': False,
'project_id': None,
'name': None,
'created_since': None,
'created_before': None,
'neutron_net_id': None,
'neutron_subnet_id': None,
'network_type': None,
'segmentation_id': None,
'cidr': None,
'ip_version': None,
'security_service': None,
'name~': None,
'description~': None,
'description': None,
}
self.cmd = osc_share_networks.ListShareNetwork(self.app, None)
@ddt.data(True, False)
def test_list_share_networks_with_search_opts(self, with_search_opts):
if with_search_opts:
arglist = [
'--name', 'foo',
'--ip-version', '4',
'--description~', 'foo-share-network',
]
verifylist = [
('name', 'foo'),
('ip_version', '4'),
('description~', 'foo-share-network'),
]
self.expected_search_opts.update({
'name': 'foo',
'ip_version': '4',
'description~': 'foo-share-network',
})
else:
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.share_networks_mock.list.assert_called_once_with(
search_opts=self.expected_search_opts)
self.assertEqual(COLUMNS, columns)
self.assertEqual(list(self.values), list(data))
def test_list_share_networks_all_projects(self):
all_tenants_list = COLUMNS.copy()
all_tenants_list.append('Project ID')
self.expected_search_opts.update({'all_tenants': True})
list_values = (oscutils.get_dict_properties(
s._info, all_tenants_list) for s in self.share_networks_list)
arglist = [
'--all-projects',
]
verifylist = [
('all_projects', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.share_networks_mock.list.assert_called_once_with(
search_opts=self.expected_search_opts)
self.assertEqual(all_tenants_list, columns)
self.assertEqual(list(list_values), list(data))
def test_list_share_networks_detail(self):
values = (oscutils.get_dict_properties(
s._info, COLUMNS_DETAIL) for s in self.share_networks_list)
arglist = [
'--detail',
]
verifylist = [
('detail', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.share_networks_mock.list.assert_called_once_with(
search_opts=self.expected_search_opts)
self.assertEqual(COLUMNS_DETAIL, columns)
self.assertEqual(list(values), list(data))
def test_list_share_networks_api_version_exception(self):
self.app.client_manager.share.api_version = api_versions.APIVersion(
"2.35")
arglist = [
'--description',
'Description',
]
verifylist = [
('description', 'Description'),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args)
@ddt.ddt
class TestShareNetworkUnset(TestShareNetwork):
def setUp(self):
super(TestShareNetworkUnset, self).setUp()
self.share_network = (
manila_fakes.FakeShareNetwork.create_one_share_network())
self.share_networks_mock.get.return_value = self.share_network
self.cmd = osc_share_networks.UnsetShareNetwork(self.app, None)
def test_unset_share_network_name(self):
arglist = [
self.share_network.id,
'--name',
]
verifylist = [
('share_network', self.share_network.id),
('name', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with mock.patch('osc_lib.utils.find_resource',
return_value=self.share_network):
result = self.cmd.take_action(parsed_args)
self.share_networks_mock.update.assert_called_once_with(
self.share_network, name='')
self.assertIsNone(result)
def test_unset_share_network_description(self):
arglist = [
self.share_network.id,
'--description',
]
verifylist = [
('share_network', self.share_network.id),
('description', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.share_networks_mock.update.assert_called_once_with(
self.share_network, description='')
self.assertIsNone(result)
@ddt.data('name', 'security_service')
def test_unset_share_network_exception_while_updating(self, attr):
arglist = [
self.share_network.id,
'--name',
'--security-service',
'fake-security-service',
]
verifylist = [
('share_network', self.share_network.id),
('name', True),
('security_service', 'fake-security-service'),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
if attr == 'name':
self.share_networks_mock.update.side_effect = Exception()
else:
self.share_networks_mock.remove_security_service.side_effect = (
Exception())
with mock.patch('osc_lib.utils.find_resource',
side_effect=[self.share_network,
'fake-security-service']):
self.assertRaises(exceptions.CommandError,
self.cmd.take_action,
parsed_args)
self.share_networks_mock.update.assert_called_once_with(
self.share_network, name='')
if attr == 'security_service':
self.share_networks_mock.remove_security_service\
.assert_called_once_with(self.share_network,
'fake-security-service')
def test_unset_share_network_security_service(self):
arglist = [
self.share_network.id,
'--security-service',
'fake-security-service',
]
verifylist = [
('share_network', self.share_network.id),
('security_service', 'fake-security-service'),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with mock.patch('osc_lib.utils.find_resource',
side_effect=[self.share_network,
'fake-security-service']):
result = self.cmd.take_action(parsed_args)
self.assertIsNone(result)
self.share_networks_mock.remove_security_service\
.assert_called_once_with(self.share_network,
'fake-security-service')
@ddt.ddt
class TestShareNetworkSet(TestShareNetwork):
def setUp(self):
super(TestShareNetworkSet, self).setUp()
self.share_network = (
manila_fakes.FakeShareNetwork.create_one_share_network())
self.share_networks_mock.get.return_value = self.share_network
self.cmd = osc_share_networks.SetShareNetwork(self.app, None)
@ddt.data({'status': 'error',
'current_security_service': str(uuid.uuid4()),
'check_only': True,
'restart_check': True},
{'status': None,
'current_security_service': str(uuid.uuid4()),
'check_only': True,
'restart_check': None},
{'status': None,
'current_security_service': str(uuid.uuid4()),
'check_only': True,
'restart_check': True},
)
@ddt.unpack
def test_set_share_network_api_version_exception(self,
status,
current_security_service,
check_only,
restart_check):
self.app.client_manager.share.api_version = api_versions.APIVersion(
"2.62")
arglist = [self.share_network.id]
verifylist = [('share_network', self.share_network.id)]
if status:
arglist.extend(['--status', status])
verifylist.append(('status', status))
if current_security_service:
arglist.extend(['--current-security-service',
current_security_service])
verifylist.append(('current_security_service',
current_security_service))
if check_only and restart_check:
arglist.extend(['--check-only', '--restart-check'])
verifylist.extend([('check_only', True), ('restart_check', True)])
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.assertRaises(
exceptions.CommandError,
self.cmd.take_action,
parsed_args)
def test_set_network_properties(self):
new_name = 'share-network-name-' + uuid.uuid4().hex
new_description = 'share-network-description-' + uuid.uuid4().hex
new_neutron_subnet_id = str(uuid.uuid4())
arglist = [
self.share_network.id,
'--name', new_name,
'--description', new_description,
'--neutron-subnet-id', new_neutron_subnet_id,
]
verifylist = [
('share_network', self.share_network.id),
('name', new_name),
('description', new_description),
('neutron_subnet_id', new_neutron_subnet_id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with mock.patch('osc_lib.utils.find_resource',
return_value=self.share_network):
result = self.cmd.take_action(parsed_args)
self.share_networks_mock.update.assert_called_once_with(
self.share_network,
name=parsed_args.name,
description=new_description,
neutron_subnet_id=new_neutron_subnet_id,
)
self.assertIsNone(result)
def test_set_share_network_status(self):
arglist = [
self.share_network.id,
'--status', 'error'
]
verifylist = [
('share_network', self.share_network.id),
('status', 'error')
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with mock.patch('osc_lib.utils.find_resource',
return_value=self.share_network):
result = self.cmd.take_action(parsed_args)
self.share_networks_mock.reset_state.assert_called_once_with(
self.share_network, parsed_args.status)
self.assertIsNone(result)
def test_set_network_update_exception(self):
share_network_name = 'share-network-name-' + uuid.uuid4().hex
arglist = [
self.share_network.id,
'--name', share_network_name
]
verifylist = [
('share_network', self.share_network.id),
('name', share_network_name)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.share_networks_mock.update.side_effect = Exception()
with mock.patch('osc_lib.utils.find_resource',
return_value=self.share_network):
self.assertRaises(exceptions.CommandError,
self.cmd.take_action,
parsed_args)
self.share_networks_mock.update.assert_called_once_with(
self.share_network, name=parsed_args.name)
def test_set_share_network_status_exception(self):
arglist = [
self.share_network.id,
'--status', 'error'
]
verifylist = [
('share_network', self.share_network.id),
('status', 'error')
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.share_networks_mock.reset_state.side_effect = Exception()
with mock.patch('osc_lib.utils.find_resource',
return_value=self.share_network):
self.assertRaises(exceptions.CommandError,
self.cmd.take_action,
parsed_args)
self.share_networks_mock.reset_state.assert_called_once_with(
self.share_network, parsed_args.status)
@ddt.data({'check_only': False, 'restart_check': False},
{'check_only': True, 'restart_check': True},
{'check_only': True, 'restart_check': False})
@ddt.unpack
def test_set_share_network_add_new_security_service_check_reset(
self, check_only, restart_check):
self.share_networks_mock .add_security_service_check = mock.Mock(
return_value=(200, {'compatible': True}))
arglist = [
self.share_network.id,
'--new-security-service', 'new-security-service-name',
]
verifylist = [
('share_network', self.share_network.id),
('new_security_service', 'new-security-service-name'),
]
if check_only:
arglist.append('--check-only')
verifylist.append(('check_only', True))
if restart_check:
arglist.append('--restart-check')
verifylist.append(('restart_check', True))
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with mock.patch('osc_lib.utils.find_resource',
side_effect=[self.share_network,
'new-security-service']):
result = self.cmd.take_action(parsed_args)
if check_only:
self.share_networks_mock.add_security_service_check\
.assert_called_once_with(self.share_network,
'new-security-service',
reset_operation=restart_check)
self.share_networks_mock.add_security_service.assert_not_called()
else:
self.share_networks_mock.add_security_service_check\
.assert_not_called()
self.share_networks_mock.add_security_service\
.assert_called_once_with(self.share_network,
'new-security-service')
self.assertIsNone(result)
@ddt.data({'check_only': False, 'restart_check': False},
{'check_only': True, 'restart_check': True},
{'check_only': True, 'restart_check': False})
@ddt.unpack
def test_set_share_network_update_security_service_check_reset(
self, check_only, restart_check):
self.share_networks_mock\
.update_share_network_security_service_check = mock.Mock(
return_value=(200, {'compatible': True}))
arglist = [
self.share_network.id,
'--new-security-service', 'new-security-service-name',
'--current-security-service', 'current-security-service-name'
]
verifylist = [
('share_network', self.share_network.id),
('new_security_service', 'new-security-service-name'),
('current_security_service', 'current-security-service-name'),
]
if check_only:
arglist.append('--check-only')
verifylist.append(('check_only', True))
if restart_check:
arglist.append('--restart-check')
verifylist.append(('restart_check', True))
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
with mock.patch('osc_lib.utils.find_resource',
side_effect=[self.share_network,
'new-security-service',
'current-security-service']):
result = self.cmd.take_action(parsed_args)
if check_only:
self.share_networks_mock\
.update_share_network_security_service_check\
.assert_called_once_with(self.share_network,
'current-security-service',
'new-security-service',
reset_operation=restart_check)
self.share_networks_mock.update_share_network_security_service\
.assert_not_called()
else:
self.share_networks_mock\
.update_share_network_security_service_check\
.assert_not_called()
self.share_networks_mock.update_share_network_security_service\
.assert_called_once_with(self.share_network,
'current-security-service',
'new-security-service')
self.assertIsNone(result)

View File

@ -156,7 +156,7 @@ class TestShareSnapshotInstanceShow(TestShareSnapshotInstance):
.ShowShareSnapshotInstance(self.app, None))
self.share_snapshot_instance._info['export_locations'] = (
cliutils.transform_export_locations_to_string_view(
cliutils.convert_dict_list_to_string(
self.share_snapshot_instances_el_list))
self.data = self.share_snapshot_instance._info.values()

View File

@ -283,8 +283,8 @@ class TestShareSnapshotShow(TestShareSnapshot):
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
cliutils.transform_export_locations_to_string_view = mock.Mock()
cliutils.transform_export_locations_to_string_view.return_value = (
cliutils.convert_dict_list_to_string = mock.Mock()
cliutils.convert_dict_list_to_string.return_value = (
self.export_location)
columns, data = self.cmd.take_action(parsed_args)

View File

@ -175,8 +175,12 @@ def _print_share(cs, share): # noqa
# +-------------------+--------------------------------------------+
if info.get('export_locations'):
info['export_locations'] = (
cliutils.transform_export_locations_to_string_view(
info['export_locations']))
cliutils.convert_dict_list_to_string(
info['export_locations'],
ignored_keys=['replica_state',
'availability_zone',
'share_replica_id'])
)
# No need to print both volume_type and share_type to CLI
if 'volume_type' in info and 'share_type' in info:
@ -229,8 +233,13 @@ def _print_share_instance(cs, instance): # noqa
info.pop('links', None)
if info.get('export_locations'):
info['export_locations'] = (
cliutils.transform_export_locations_to_string_view(
info['export_locations']))
cliutils.convert_dict_list_to_string(
info['export_locations'],
ignored_keys=['replica_state',
'availability_zone',
'share_replica_id'])
)
cliutils.print_dict(info)
@ -252,8 +261,10 @@ def _print_share_replica(cs, replica): # noqa
info.pop('links', None)
if info.get('export_locations'):
info['export_locations'] = (
cliutils.transform_export_locations_to_string_view(
info['export_locations']))
cliutils.convert_dict_list_to_string(
info['export_locations'],
ignored_keys=['replica_state', 'availability_zone',
'share_replica_id']))
cliutils.print_dict(info)
@ -303,7 +314,7 @@ def _print_share_snapshot(cs, snapshot):
if info.get('export_locations'):
info['export_locations'] = (
cliutils.transform_export_locations_to_string_view(
cliutils.convert_dict_list_to_string(
info['export_locations']))
cliutils.print_dict(info)

View File

@ -105,6 +105,12 @@ openstack.share.v2 =
share_instance_set = manilaclient.osc.v2.share_instances:ShareInstanceSet
share_instance_show = manilaclient.osc.v2.share_instances:ShareInstanceShow
share_limits_show = manilaclient.osc.v2.share_limits:ShareLimitsShow
share_network_list = manilaclient.osc.v2.share_networks:ListShareNetwork
share_network_show = manilaclient.osc.v2.share_networks:ShowShareNetwork
share_network_create = manilaclient.osc.v2.share_networks:CreateShareNetwork
share_network_delete = manilaclient.osc.v2.share_networks:DeleteShareNetwork
share_network_set = manilaclient.osc.v2.share_networks:SetShareNetwork
share_network_unset = manilaclient.osc.v2.share_networks:UnsetShareNetwork
[coverage:run]
omit = manilaclient/tests/*