Files
python-manilaclient/manilaclient/osc/v2/share_networks.py
denver-baraka 53ca6e9f87 Fix neutron validation in share network creation
The 'share network create' command previously
attempted to fetch the subnet directly, which
could lead to errors when the neuron network data
was invalid or missing.

This change updates the logic to properly handle
invalid neutron network or subnet IDs during share
network creation.

Test Plan:
- Added unit tests to verify valid and invalid
  neutron info.

Closes-Bug: #2051394
Change-Id: Ib233f02ad94326d5b8ffadf962fb911d417b024a
Signed-off-by: denver-baraka <denverbaraka@gmail.com>
Assisted-By: Copilot
2025-10-29 11:25:22 +03:00

719 lines
28 KiB
Python

# Copyright 2021 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from openstackclient.identity import common as identity_common
from osc_lib.cli import format_columns
from osc_lib.command import command
from osc_lib import exceptions
from osc_lib import utils as oscutils
from manilaclient import api_versions
from manilaclient.common._i18n import _
from manilaclient.common import cliutils
LOG = logging.getLogger(__name__)
class ListShareNetwork(command.Lister):
_description = _("List share networks")
def get_parser(self, prog_name):
parser = super(ListShareNetwork, self).get_parser(prog_name)
parser.add_argument(
'--name',
metavar="<share-network>",
help=_('Filter share networks by name')
)
parser.add_argument(
'--name~',
metavar="<share-network-name-pattern>",
help=_('Filter share networks by name-pattern. Available only '
'for microversion >= 2.36.')
)
parser.add_argument(
'--description',
metavar="<share-network-description>",
help=_('Filter share networks by description. Available '
'only for microversion >= 2.36')
)
parser.add_argument(
'--description~',
metavar="<share-network-description-pattern>",
help=_('Filter share networks by description-pattern. Available '
'only for microversion >= 2.36.')
)
parser.add_argument(
'--all-projects',
action='store_true',
default=False,
help=_('Include all projects (admin only)'),
)
parser.add_argument(
'--project',
metavar='<project>',
help=_('Filter share networks by project (name or ID) '
'(admin only)')
)
identity_common.add_project_domain_option_to_parser(parser)
parser.add_argument(
'--created-since',
metavar='<yyyy-mm-dd>',
help=_('Filter share networks by date they were created after. '
'The date can be in the format yyyy-mm-dd.')
)
parser.add_argument(
'--created-before',
metavar='<yyyy-mm-dd>',
help=_('Filter share networks by date they were created before. '
'The date can be in the format yyyy-mm-dd.')
)
parser.add_argument(
'--security-service',
metavar='<security-service>',
help=_('Filter share networks by the name or ID of a security '
'service attached to the network.')
)
parser.add_argument(
'--neutron-net-id',
metavar='<neutron-net-id>',
help=_('Filter share networks by the ID of a neutron network.')
)
parser.add_argument(
'--neutron-subnet-id',
metavar='<neutron-subnet-id>',
help=_('Filter share networks by the ID of a neutron sub network.')
)
parser.add_argument(
'--network-type',
metavar='<network-type>',
help=_('Filter share networks by the type of network. Examples '
'include "flat", "vlan", "vxlan", "geneve", etc.')
)
parser.add_argument(
'--segmentation-id',
metavar='<segmentation-id>',
help=_('Filter share networks by the segmentation ID of network. '
'Relevant only for segmented networks such as "vlan", '
'"vxlan", "geneve", etc.')
)
parser.add_argument(
'--cidr',
metavar='<X.X.X.X/X>',
help=_('Filter share networks by the CIDR of network.')
)
parser.add_argument(
'--ip-version',
metavar='4/6',
choices=['4', '6'],
help=_('Filter share networks by the IP Version of the network, '
'either 4 or 6.')
)
parser.add_argument(
'--detail',
action='store_true',
default=False,
help=_("List share networks with details")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
identity_client = self.app.client_manager.identity
columns = ['ID', 'Name']
if parsed_args.detail:
columns.extend([
'Status',
'Created At',
'Updated At',
'Description',
])
project_id = None
if parsed_args.project:
project_id = identity_common.find_project(
identity_client,
parsed_args.project,
parsed_args.project_domain).id
# set value of 'all_tenants' when using project option
all_tenants = bool(parsed_args.project) or parsed_args.all_projects
if parsed_args.all_projects:
columns.append('Project ID')
search_opts = {
'all_tenants': all_tenants,
'project_id': project_id,
'name': parsed_args.name,
'created_since': parsed_args.created_since,
'created_before': parsed_args.created_before,
'neutron_net_id': parsed_args.neutron_net_id,
'neutron_subnet_id': parsed_args.neutron_subnet_id,
'network_type': parsed_args.network_type,
'segmentation_id': parsed_args.segmentation_id,
'cidr': parsed_args.cidr,
'ip_version': parsed_args.ip_version,
'security_service': parsed_args.security_service,
}
if share_client.api_version >= api_versions.APIVersion("2.36"):
search_opts['name~'] = getattr(parsed_args, 'name~')
search_opts['description~'] = getattr(parsed_args, 'description~')
search_opts['description'] = parsed_args.description
elif (parsed_args.description or getattr(parsed_args, 'name~') or
getattr(parsed_args, 'description~')):
raise exceptions.CommandError(
"Pattern based filtering (name~, description~ and description)"
" is only available with manila API version >= 2.36")
data = share_client.share_networks.list(search_opts=search_opts)
return (
columns,
(oscutils.get_item_properties(s, columns) for s in data)
)
class ShowShareNetwork(command.ShowOne):
"""Display a share network"""
_description = _("Show details about a share network")
def get_parser(self, prog_name):
parser = super(ShowShareNetwork, self).get_parser(prog_name)
parser.add_argument(
"share_network",
metavar="<share-network>",
help=_("Name or ID of the share network to display")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
share_network = oscutils.find_resource(
share_client.share_networks,
parsed_args.share_network)
data = share_network._info
if 'share_network_subnets' not in data:
data['share_network_subnets'] = []
# Special mapping for columns to make the output easier to read:
# 'metadata' --> 'properties'
for ss in data['share_network_subnets']:
ss.update(
{
'properties':
format_columns.DictColumn(ss.pop('metadata', {})),
},
)
# Add security services information
security_services = share_client.security_services.list(
search_opts={'share_network_id': share_network.id}, detailed=False)
data['security_services'] = [
{
'security_service_name': ss.name,
'security_service_id': ss.id,
}
for ss in security_services
]
if parsed_args.formatter == 'table':
data['share_network_subnets'] = (
cliutils.convert_dict_list_to_string(
data['share_network_subnets'])
)
data['security_services'] = cliutils.convert_dict_list_to_string(
data['security_services']
)
data.pop('links', None)
return self.dict2columns(data)
class CreateShareNetwork(command.ShowOne):
"""Create a share network."""
_description = _("Create a share network")
def get_parser(self, prog_name):
parser = super(CreateShareNetwork, self).get_parser(prog_name)
parser.add_argument(
"--name",
metavar="<share-network>",
help=_("Add a name to the share network (Optional)")
)
parser.add_argument(
"--description",
metavar="<description>",
default=None,
help=_("Add a description to the share network (Optional).")
)
parser.add_argument(
"--neutron-net-id",
metavar="<neutron-net-id>",
default=None,
help=_("ID of the neutron network that must be associated with "
"the share network (Optional). The network specified will "
"be associated with the 'default' share network subnet, "
"unless 'availability-zone' is also specified.")
)
parser.add_argument(
"--neutron-subnet-id",
metavar="<neutron-subnet-id>",
default=None,
help=_("ID of the neutron sub-network that must be associated "
"with the share network (Optional). The subnet specified "
"will be associated with the 'default' share network "
"subnet, unless 'availability-zone' is also specified.")
)
parser.add_argument(
"--availability-zone",
metavar="<availability-zone>",
default=None,
help=_("Name or ID of the avalilability zone to assign the "
"specified network subnet parameters to. Must be used "
"in conjunction with 'neutron-net-id' and "
"'neutron-subnet-id'. Do not specify this parameter if "
"the network must be available across all availability "
"zones ('default' share network subnet). Available "
"only for microversion >= 2.51.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
neutron_client = getattr(self.app.client_manager, 'network', None)
neutron_net_id = parsed_args.neutron_net_id
neutron_subnet_id = parsed_args.neutron_subnet_id
if neutron_client and neutron_net_id:
try:
neutron_net_id = neutron_client.find_network(
neutron_net_id,
ignore_missing=False).id
except Exception:
raise exceptions.CommandError(
f"Neutron network '{parsed_args.neutron_net_id}'"
f" not found.")
if neutron_client and neutron_subnet_id:
try:
neutron_subnet_id = neutron_client.find_subnet(
neutron_subnet_id,
ignore_missing=False).id
except Exception:
raise exceptions.CommandError(
f"Neutron subnet '{parsed_args.neutron_subnet_id}'"
f" not found.")
availability_zone = None
if (parsed_args.availability_zone and
share_client.api_version < api_versions.APIVersion("2.51")):
raise exceptions.CommandError(
"Availability zone can be specified only with manila API "
"version >= 2.51")
elif parsed_args.availability_zone:
availability_zone = parsed_args.availability_zone
kwargs = {
"name": parsed_args.name,
"description": parsed_args.description,
"neutron_net_id": neutron_net_id,
"neutron_subnet_id": neutron_subnet_id,
}
if availability_zone:
kwargs['availability_zone'] = availability_zone
share_network = share_client.share_networks.create(**kwargs)
share_network_data = share_network._info
if 'share_network_subnets' not in share_network_data:
share_network_data['share_network_subnets'] = []
share_network_data.pop('links', None)
if parsed_args.formatter == 'table':
share_network_data['share_network_subnets'] = (
cliutils.convert_dict_list_to_string(
share_network_data['share_network_subnets'])
)
return self.dict2columns(share_network_data)
class DeleteShareNetwork(command.Command):
"""Delete one or more share networks"""
_description = _("Delete one or more share networks")
def get_parser(self, prog_name):
parser = super(DeleteShareNetwork, self).get_parser(prog_name)
parser.add_argument(
"share_network",
metavar="<share-network>",
nargs="+",
help=_("Name or ID of the share network(s) to delete")
)
parser.add_argument(
"--wait",
action='store_true',
default=False,
help=_("Wait for the share network(s) to be deleted")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
result = 0
for share_network in parsed_args.share_network:
try:
share_network_obj = oscutils.find_resource(
share_client.share_networks,
share_network)
share_client.share_networks.delete(
share_network_obj)
if parsed_args.wait:
if not oscutils.wait_for_delete(
manager=share_client.share_networks,
res_id=share_network_obj.id):
result += 1
except Exception as e:
result += 1
LOG.error(f"Failed to delete share network with "
f"name or ID {share_network}: {e}")
if result > 0:
total = len(parsed_args.share_network)
msg = f"{result} of {total} share networks failed to be deleted."
raise exceptions.CommandError(msg)
class SetShareNetwork(command.Command):
"""Set share network properties."""
_description = _("Set share network properties")
def get_parser(self, prog_name):
parser = super(SetShareNetwork, self).get_parser(prog_name)
parser.add_argument(
"share_network",
metavar="<share-network>",
help=_('Name or ID of the share network to set a property for')
)
parser.add_argument(
"--name",
metavar="<name>",
default=None,
help=_("Set a new name to the share network.")
)
parser.add_argument(
"--description",
metavar="<description>",
default=None,
help=_("Set a new description to the share network.")
)
parser.add_argument(
"--status",
metavar="<status>",
choices=['active', 'error', 'network_change'],
help=_("Assign a status to the share network (Admin only). "
"Options include : active, error, network_change. "
"Available only for microversion >= 2.63.")
)
parser.add_argument(
'--neutron-net-id',
metavar='<neutron-net-id>',
help=_("Update the neutron network associated with the default "
"share network subnet. If a default share network subnet "
"is not present or if the share network is in use, setting "
"this will fail.")
)
parser.add_argument(
'--neutron-subnet-id',
metavar='<neutron-subnet-id>',
help=_("Update the neutron subnetwork associated with the default "
"share network subnet. If a default share network subnet "
"is not present or if the share network is in use, setting "
"this will fail.")
)
parser.add_argument(
'--current-security-service',
metavar='<security-service>',
help=_("Name or ID of a security service that is currently "
"associated with a share network that must be replaced. "
"Replacing a security service is only available for "
"microversions >= 2.63.")
)
parser.add_argument(
'--new-security-service',
metavar='<security-service>',
help=_("Name or ID of a security service that must be associated "
"with the share network. When replacing a security "
"service, the current security service must also be "
"provided with the '--current-security-service' option. "
"Replacing a security service is only available for "
"microversions >= 2.63.")
)
parser.add_argument(
'--check-only',
action='store_true',
help=_("Run a dry-run of a security service replacement. "
"Available only for microversion >=2.63")
)
parser.add_argument(
'--restart-check',
action='store_true',
help=_("Restart a dry-run of a security service "
"replacement. Helpful when check results are "
"stale. Available only for microversion >=2.63.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
result = 0
share_network = oscutils.find_resource(share_client.share_networks,
parsed_args.share_network)
kwargs = {}
# some args are only for newer API micro-version
if share_client.api_version < api_versions.APIVersion("2.63"):
new_args = ('status', 'check_only', 'restart_check',
'current_security_service')
invalid_args = [
arg for arg in new_args
if getattr(parsed_args, arg) is not None
]
raise exceptions.CommandError(
f"Use of {' '.join(invalid_args)} is only available for API "
f"microversions >= 2.63.")
# If "--check-only" and "--restart-check" are used, a new security
# service option must be supplied
if parsed_args.check_only or parsed_args.restart_check:
if not parsed_args.new_security_service:
raise exceptions.CommandError(
_("Must provide new security service with --check-only "
"and --restart-check."))
if parsed_args.name is not None:
kwargs['name'] = parsed_args.name
if parsed_args.description is not None:
kwargs['description'] = parsed_args.description
if parsed_args.neutron_net_id is not None:
kwargs['neutron_net_id'] = parsed_args.neutron_net_id
if parsed_args.neutron_subnet_id is not None:
kwargs['neutron_subnet_id'] = parsed_args.neutron_subnet_id
if kwargs:
try:
share_client.share_networks.update(
share_network,
**kwargs
)
except Exception as e:
result += 1
LOG.error(f"Failed to set share network properties "
f"{kwargs}: {e}")
if parsed_args.status:
try:
share_client.share_networks.reset_state(
share_network,
parsed_args.status
)
except Exception as e:
result += 1
LOG.error(f"Failed to update share network status to "
f"{parsed_args.status}: {e}")
new_security_service = current_security_service = None
if parsed_args.new_security_service:
new_security_service = oscutils.find_resource(
share_client.security_services,
parsed_args.new_security_service)
if parsed_args.current_security_service:
current_security_service = oscutils.find_resource(
share_client.security_services,
parsed_args.current_security_service)
if new_security_service and not current_security_service:
try:
if parsed_args.check_only:
check_result = share_client.share_networks\
.add_security_service_check(
share_network, new_security_service,
reset_operation=parsed_args.restart_check)
is_compatible = check_result[1].get('compatible')
# NOTE(gouthamr): We're logging to the console here,
# because there's no need to print useful
# information beyond the result. Use of error
# logging is a hack, since other kinds of logs may
# not print to console by default.
if is_compatible is None:
LOG.error("Security service check has been "
"successfully initiated, please retry "
"after some time.")
elif is_compatible:
LOG.error(
f"Security service "
f"{parsed_args.new_security_service} can "
f"be added to share network "
f"{parsed_args.share_network}.")
else:
LOG.error(
f"Security service "
f"{parsed_args.new_security_service} cannot "
f"be added to share network "
f"{parsed_args.share_network}.")
else:
share_client.share_networks.add_security_service(
share_network, new_security_service)
except Exception as e:
result += 1
LOG.error(f"Failed to add security service "
f"{parsed_args.new_security_service} to "
f"share network {parsed_args.share_network}: {e}")
if new_security_service and current_security_service:
try:
if parsed_args.check_only:
check_result = share_client.share_networks.\
update_share_network_security_service_check(
share_network,
current_security_service,
new_security_service,
reset_operation=parsed_args.restart_check)
is_compatible = check_result[1].get('compatible')
# NOTE(gouthamr): We're logging to the console here,
# because there's no need to print useful
# information beyond the result. Use of error
# logging is a hack, since other kinds of logs may
# not print to console by default.
if is_compatible is None:
LOG.error("Security service check has been "
"successfully initiated, please retry "
"after some time.")
elif is_compatible:
LOG.error(
f"Security service "
f"{parsed_args.current_security_service} can "
f"be replaced with security service "
f"{parsed_args.new_security_service} on share "
f"network {parsed_args.share_network}.")
else:
LOG.error(
f"Security service "
f"{parsed_args.current_security_service} cannot "
f"be replaced with security service "
f"{parsed_args.new_security_service} on share "
f"network {parsed_args.share_network}.")
else:
share_client.share_networks.\
update_share_network_security_service(
share_network,
current_security_service,
new_security_service)
except Exception as e:
result += 1
LOG.error(f"Failed to update security service "
f"{parsed_args.current_security_service} on "
f"share network "
f"{parsed_args.share_network}: {e}")
if result > 0:
raise exceptions.CommandError(_("One or more of the "
"set operations failed"))
class UnsetShareNetwork(command.Command):
"""Unset a share network property."""
_description = _("Unset a share network property")
def get_parser(self, prog_name):
parser = super(UnsetShareNetwork, self).get_parser(prog_name)
parser.add_argument(
"share_network",
metavar="<share-network>",
help=_("Name or ID of the share network to unset a property of")
)
parser.add_argument(
"--name",
action='store_true',
help=_("Unset share network name."),
)
parser.add_argument(
"--description",
action='store_true',
help=_("Unset share network description."),
)
parser.add_argument(
"--security-service",
metavar="<security-service>",
help=_("Disassociate a security service from the share network. "
"This is only possible with unused share networks."),
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
share_network = oscutils.find_resource(
share_client.share_networks,
parsed_args.share_network)
security_service = None
if parsed_args.security_service:
security_service = oscutils.find_resource(
share_client.security_services,
parsed_args.security_service)
result = 0
kwargs = {}
if parsed_args.name:
# the SDK unsets name if it is an empty string
kwargs['name'] = ''
if parsed_args.description:
# the SDK unsets description if it is an empty string
kwargs['description'] = ''
if kwargs:
try:
share_client.share_networks.update(
share_network,
**kwargs
)
except Exception as e:
result += 1
LOG.error(f"Failed to unset share network properties "
f"{kwargs}: {e}")
if security_service:
try:
share_client.share_networks.remove_security_service(
share_network, security_service)
except Exception as e:
result += 1
LOG.error(f"Failed to dissociate security service"
f"{parsed_args.security_service} from "
f"{parsed_args.share_network}: {e}")
if result > 0:
raise exceptions.CommandError(_("One or more of the "
"unset operations failed"))