Add support for default security group rule CRUDs

Change-Id: I1c18c2ec5eb4923e1ab8b3fc6199ef6f329b4a4d
This commit is contained in:
Slawek Kaplonski 2023-06-30 12:04:02 +02:00
parent 0fb1cae1a8
commit 7b99b57165
11 changed files with 1749 additions and 120 deletions

View File

@ -0,0 +1,11 @@
===========================
default security group rule
===========================
A **default security group rule** specifies the template of the security group
rules which will be used by neutron to create rules in every new security group.
Network v2
.. autoprogram-cliff:: openstack.network.v2
:command: default security group rule *

View File

@ -81,3 +81,103 @@ def str2dict(strdict):
key, sep, value = kv.partition(':') key, sep, value = kv.partition(':')
result[key] = value result[key] = value
return result return result
def format_security_group_rule_show(obj):
data = transform_compute_security_group_rule(obj)
return zip(*sorted(data.items()))
def format_network_port_range(rule):
# Display port range or ICMP type and code. For example:
# - ICMP type: 'type=3'
# - ICMP type and code: 'type=3:code=0'
# - ICMP code: Not supported
# - Matching port range: '443:443'
# - Different port range: '22:24'
# - Single port: '80:80'
# - No port range: ''
port_range = ''
if is_icmp_protocol(rule['protocol']):
if rule['port_range_min']:
port_range += 'type=' + str(rule['port_range_min'])
if rule['port_range_max']:
port_range += ':code=' + str(rule['port_range_max'])
elif rule['port_range_min'] or rule['port_range_max']:
port_range_min = str(rule['port_range_min'])
port_range_max = str(rule['port_range_max'])
if rule['port_range_min'] is None:
port_range_min = port_range_max
if rule['port_range_max'] is None:
port_range_max = port_range_min
port_range = port_range_min + ':' + port_range_max
return port_range
def format_remote_ip_prefix(rule):
remote_ip_prefix = rule['remote_ip_prefix']
if remote_ip_prefix is None:
ethertype = rule['ether_type']
if ethertype == 'IPv4':
remote_ip_prefix = '0.0.0.0/0'
elif ethertype == 'IPv6':
remote_ip_prefix = '::/0'
return remote_ip_prefix
def convert_ipvx_case(string):
if string.lower() == 'ipv4':
return 'IPv4'
if string.lower() == 'ipv6':
return 'IPv6'
return string
def is_icmp_protocol(protocol):
# NOTE(rtheis): Neutron has deprecated protocol icmpv6.
# However, while the OSC CLI doesn't document the protocol,
# the code must still handle it. In addition, handle both
# protocol names and numbers.
if protocol in ['icmp', 'icmpv6', 'ipv6-icmp', '1', '58']:
return True
else:
return False
def convert_to_lowercase(string):
return string.lower()
def get_protocol(parsed_args, default_protocol='any'):
protocol = default_protocol
if parsed_args.protocol is not None:
protocol = parsed_args.protocol
if hasattr(parsed_args, "proto") and parsed_args.proto is not None:
protocol = parsed_args.proto
if protocol == 'any':
protocol = None
return protocol
def get_ethertype(parsed_args, protocol):
ethertype = 'IPv4'
if parsed_args.ethertype is not None:
ethertype = parsed_args.ethertype
elif is_ipv6_protocol(protocol):
ethertype = 'IPv6'
return ethertype
def is_ipv6_protocol(protocol):
# NOTE(rtheis): Neutron has deprecated protocol icmpv6.
# However, while the OSC CLI doesn't document the protocol,
# the code must still handle it. In addition, handle both
# protocol names and numbers.
if (
protocol is not None
and protocol.startswith('ipv6-')
or protocol in ['icmpv6', '41', '43', '44', '58', '59', '60']
):
return True
else:
return False

View File

@ -0,0 +1,399 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Default Security Group Rule action implementations"""
import logging
from osc_lib.cli import parseractions
from osc_lib.command import command
from osc_lib import exceptions
from osc_lib import utils
from openstackclient.i18n import _
from openstackclient.network import common
from openstackclient.network import utils as network_utils
LOG = logging.getLogger(__name__)
def _get_columns(item):
column_map = {}
hidden_columns = ['location', 'name', 'revision_number']
return utils.get_osc_show_columns_for_sdk_resource(
item, column_map, hidden_columns
)
class CreateDefaultSecurityGroupRule(
command.ShowOne, common.NeutronCommandWithExtraArgs
):
_description = _("Create a new default security group rule")
def get_parser(self, prog_name):
parser = super().get_parser(prog_name)
parser.add_argument(
'--description',
metavar='<description>',
help=_("Set default security group rule description"),
)
parser.add_argument(
'--icmp-type',
metavar='<icmp-type>',
type=int,
help=_("ICMP type for ICMP IP protocols"),
)
parser.add_argument(
'--icmp-code',
metavar='<icmp-code>',
type=int,
help=_("ICMP code for ICMP IP protocols"),
)
direction_group = parser.add_mutually_exclusive_group()
direction_group.add_argument(
'--ingress',
action='store_true',
help=_("Rule will apply to incoming network traffic (default)"),
)
direction_group.add_argument(
'--egress',
action='store_true',
help=_("Rule will apply to outgoing network traffic"),
)
parser.add_argument(
'--ethertype',
metavar='<ethertype>',
choices=['IPv4', 'IPv6'],
type=network_utils.convert_ipvx_case,
help=_(
"Ethertype of network traffic "
"(IPv4, IPv6; default: based on IP protocol)"
),
)
remote_group = parser.add_mutually_exclusive_group()
remote_group.add_argument(
"--remote-ip",
metavar="<ip-address>",
help=_(
"Remote IP address block (may use CIDR notation; "
"default for IPv4 rule: 0.0.0.0/0, "
"default for IPv6 rule: ::/0)"
),
)
remote_group.add_argument(
"--remote-group",
metavar="<group>",
help=_("Remote security group (ID)"),
)
remote_group.add_argument(
"--remote-address-group",
metavar="<group>",
help=_("Remote address group (ID)"),
)
parser.add_argument(
'--dst-port',
metavar='<port-range>',
action=parseractions.RangeAction,
help=_(
"Destination port, may be a single port or a starting and "
"ending port range: 137:139. Required for IP protocols TCP "
"and UDP. Ignored for ICMP IP protocols."
),
)
parser.add_argument(
'--protocol',
metavar='<protocol>',
type=network_utils.convert_to_lowercase,
help=_(
"IP protocol (ah, dccp, egp, esp, gre, icmp, igmp, "
"ipv66-encap, ipv6-frag, ipv6-icmp, ipv6-nonxt, ipv6-opts, "
"ipv6-route, ospf, pgm, rsvp, sctp, tcp, udp, udplite, vrrp "
"and integer representations [0-255] or any; "
"default: any (all protocols))"
),
)
parser.add_argument(
'--for-default-sg',
action='store_true',
default=False,
help=_(
"Set this default security group rule to be used in all "
"default security groups created automatically for each "
"project"
),
)
parser.add_argument(
'--for-custom-sg',
action='store_true',
default=True,
help=_(
"Set this default security group rule to be used in all "
"custom security groups created manually by users"
),
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.sdk_connection.network
# Build the create attributes.
attrs = {}
attrs['protocol'] = network_utils.get_protocol(parsed_args)
if parsed_args.description is not None:
attrs['description'] = parsed_args.description
# NOTE: A direction must be specified and ingress
# is the default.
if parsed_args.ingress or not parsed_args.egress:
attrs['direction'] = 'ingress'
if parsed_args.egress:
attrs['direction'] = 'egress'
# NOTE(rtheis): Use ethertype specified else default based
# on IP protocol.
attrs['ethertype'] = network_utils.get_ethertype(
parsed_args, attrs['protocol']
)
# NOTE(rtheis): Validate the port range and ICMP type and code.
# It would be ideal if argparse could do this.
if parsed_args.dst_port and (
parsed_args.icmp_type or parsed_args.icmp_code
):
msg = _(
'Argument --dst-port not allowed with arguments '
'--icmp-type and --icmp-code'
)
raise exceptions.CommandError(msg)
if parsed_args.icmp_type is None and parsed_args.icmp_code is not None:
msg = _('Argument --icmp-type required with argument --icmp-code')
raise exceptions.CommandError(msg)
is_icmp_protocol = network_utils.is_icmp_protocol(attrs['protocol'])
if not is_icmp_protocol and (
parsed_args.icmp_type or parsed_args.icmp_code
):
msg = _(
'ICMP IP protocol required with arguments '
'--icmp-type and --icmp-code'
)
raise exceptions.CommandError(msg)
# NOTE(rtheis): For backwards compatibility, continue ignoring
# the destination port range when an ICMP IP protocol is specified.
if parsed_args.dst_port and not is_icmp_protocol:
attrs['port_range_min'] = parsed_args.dst_port[0]
attrs['port_range_max'] = parsed_args.dst_port[1]
if parsed_args.icmp_type is not None and parsed_args.icmp_type >= 0:
attrs['port_range_min'] = parsed_args.icmp_type
if parsed_args.icmp_code is not None and parsed_args.icmp_code >= 0:
attrs['port_range_max'] = parsed_args.icmp_code
if parsed_args.remote_group is not None:
attrs['remote_group_id'] = parsed_args.remote_group
elif parsed_args.remote_address_group is not None:
attrs['remote_address_group_id'] = parsed_args.remote_address_group
elif parsed_args.remote_ip is not None:
attrs['remote_ip_prefix'] = parsed_args.remote_ip
elif attrs['ethertype'] == 'IPv4':
attrs['remote_ip_prefix'] = '0.0.0.0/0'
elif attrs['ethertype'] == 'IPv6':
attrs['remote_ip_prefix'] = '::/0'
attrs['used_in_default_sg'] = parsed_args.for_default_sg
attrs['used_in_non_default_sg'] = parsed_args.for_custom_sg
attrs.update(
self._parse_extra_properties(parsed_args.extra_properties)
)
# Create and show the security group rule.
obj = client.create_default_security_group_rule(**attrs)
display_columns, columns = _get_columns(obj)
data = utils.get_item_properties(obj, columns)
return (display_columns, data)
class DeleteDefaultSecurityGroupRule(command.Command):
_description = _("Delete default security group rule(s)")
def get_parser(self, prog_name):
parser = super().get_parser(prog_name)
parser.add_argument(
'rule',
metavar='<rule>',
nargs="+",
help=_("Default security group rule(s) to delete (ID only)"),
)
return parser
def take_action(self, parsed_args):
result = 0
client = self.app.client_manager.sdk_connection.network
for r in parsed_args.rule:
try:
obj = client.find_default_security_group_rule(
r, ignore_missing=False
)
client.delete_default_security_group_rule(obj)
except Exception as e:
result += 1
LOG.error(
_(
"Failed to delete default SG rule with "
"ID '%(rule_id)s': %(e)s"
),
{'rule_id': r, 'e': e},
)
if result > 0:
total = len(parsed_args.rule)
msg = _(
"%(result)s of %(total)s default rules failed to delete."
) % {'result': result, 'total': total}
raise exceptions.CommandError(msg)
class ListDefaultSecurityGroupRule(command.Lister):
_description = _("List default security group rules")
def _format_network_security_group_rule(self, rule):
"""Transform the SDK DefaultSecurityGroupRule object to a dict
The SDK object gets in the way of reformatting columns...
Create port_range column from port_range_min and port_range_max
"""
rule = rule.to_dict()
rule['port_range'] = network_utils.format_network_port_range(rule)
rule['remote_ip_prefix'] = network_utils.format_remote_ip_prefix(rule)
return rule
def get_parser(self, prog_name):
parser = super().get_parser(prog_name)
parser.add_argument(
'--protocol',
metavar='<protocol>',
type=network_utils.convert_to_lowercase,
help=_(
"List rules by the IP protocol (ah, dhcp, egp, esp, gre, "
"icmp, igmp, ipv6-encap, ipv6-frag, ipv6-icmp, "
"ipv6-nonxt, ipv6-opts, ipv6-route, ospf, pgm, rsvp, "
"sctp, tcp, udp, udplite, vrrp and integer "
"representations [0-255] or any; "
"default: any (all protocols))"
),
)
parser.add_argument(
'--ethertype',
metavar='<ethertype>',
type=network_utils.convert_to_lowercase,
help=_("List default rules by the Ethertype (IPv4 or IPv6)"),
)
direction_group = parser.add_mutually_exclusive_group()
direction_group.add_argument(
'--ingress',
action='store_true',
help=_(
"List default rules which will be applied to incoming "
"network traffic"
),
)
direction_group.add_argument(
'--egress',
action='store_true',
help=_(
"List default rules which will be applied to outgoing "
"network traffic"
),
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.sdk_connection.network
column_headers = (
'ID',
'IP Protocol',
'Ethertype',
'IP Range',
'Port Range',
'Direction',
'Remote Security Group',
'Remote Address Group',
'Used in default Security Group',
'Used in custom Security Group',
)
columns = (
'id',
'protocol',
'ether_type',
'remote_ip_prefix',
'port_range',
'direction',
'remote_group_id',
'remote_address_group_id',
'used_in_default_sg',
'used_in_non_default_sg',
)
# Get the security group rules using the requested query.
query = {}
if parsed_args.ingress:
query['direction'] = 'ingress'
if parsed_args.egress:
query['direction'] = 'egress'
if parsed_args.protocol is not None:
query['protocol'] = parsed_args.protocol
rules = [
self._format_network_security_group_rule(r)
for r in client.default_security_group_rules(**query)
]
return (
column_headers,
(
utils.get_dict_properties(
s,
columns,
)
for s in rules
),
)
class ShowDefaultSecurityGroupRule(command.ShowOne):
_description = _("Display default security group rule details")
def get_parser(self, prog_name):
parser = super().get_parser(prog_name)
parser.add_argument(
'rule',
metavar="<rule>",
help=_("Default security group rule to display (ID only)"),
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.sdk_connection.network
obj = client.find_default_security_group_rule(
parsed_args.rule, ignore_missing=False
)
# necessary for old rules that have None in this field
if not obj['remote_ip_prefix']:
obj['remote_ip_prefix'] = network_utils.format_remote_ip_prefix(
obj
)
display_columns, columns = _get_columns(obj)
data = utils.get_item_properties(obj, columns)
return (display_columns, data)

View File

@ -28,48 +28,6 @@ from openstackclient.network import utils as network_utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def _format_security_group_rule_show(obj):
data = network_utils.transform_compute_security_group_rule(obj)
return zip(*sorted(data.items()))
def _format_network_port_range(rule):
# Display port range or ICMP type and code. For example:
# - ICMP type: 'type=3'
# - ICMP type and code: 'type=3:code=0'
# - ICMP code: Not supported
# - Matching port range: '443:443'
# - Different port range: '22:24'
# - Single port: '80:80'
# - No port range: ''
port_range = ''
if _is_icmp_protocol(rule['protocol']):
if rule['port_range_min']:
port_range += 'type=' + str(rule['port_range_min'])
if rule['port_range_max']:
port_range += ':code=' + str(rule['port_range_max'])
elif rule['port_range_min'] or rule['port_range_max']:
port_range_min = str(rule['port_range_min'])
port_range_max = str(rule['port_range_max'])
if rule['port_range_min'] is None:
port_range_min = port_range_max
if rule['port_range_max'] is None:
port_range_max = port_range_min
port_range = port_range_min + ':' + port_range_max
return port_range
def _format_remote_ip_prefix(rule):
remote_ip_prefix = rule['remote_ip_prefix']
if remote_ip_prefix is None:
ethertype = rule['ether_type']
if ethertype == 'IPv4':
remote_ip_prefix = '0.0.0.0/0'
elif ethertype == 'IPv6':
remote_ip_prefix = '::/0'
return remote_ip_prefix
def _get_columns(item): def _get_columns(item):
column_map = {} column_map = {}
hidden_columns = ['location', 'tenant_id'] hidden_columns = ['location', 'tenant_id']
@ -78,29 +36,6 @@ def _get_columns(item):
) )
def _convert_to_lowercase(string):
return string.lower()
def _convert_ipvx_case(string):
if string.lower() == 'ipv4':
return 'IPv4'
if string.lower() == 'ipv6':
return 'IPv6'
return string
def _is_icmp_protocol(protocol):
# NOTE(rtheis): Neutron has deprecated protocol icmpv6.
# However, while the OSC CLI doesn't document the protocol,
# the code must still handle it. In addition, handle both
# protocol names and numbers.
if protocol in ['icmp', 'icmpv6', 'ipv6-icmp', '1', '58']:
return True
else:
return False
# TODO(abhiraut): Use the SDK resource mapped attribute names once the # TODO(abhiraut): Use the SDK resource mapped attribute names once the
# OSC minimum requirements include SDK 1.0. # OSC minimum requirements include SDK 1.0.
class CreateSecurityGroupRule( class CreateSecurityGroupRule(
@ -188,7 +123,7 @@ class CreateSecurityGroupRule(
protocol_group.add_argument( protocol_group.add_argument(
'--protocol', '--protocol',
metavar='<protocol>', metavar='<protocol>',
type=_convert_to_lowercase, type=network_utils.convert_to_lowercase,
help=protocol_help, help=protocol_help,
**proto_choices **proto_choices
) )
@ -196,7 +131,7 @@ class CreateSecurityGroupRule(
protocol_group.add_argument( protocol_group.add_argument(
'--proto', '--proto',
metavar='<proto>', metavar='<proto>',
type=_convert_to_lowercase, type=network_utils.convert_to_lowercase,
help=argparse.SUPPRESS, help=argparse.SUPPRESS,
**proto_choices **proto_choices
) )
@ -246,7 +181,7 @@ class CreateSecurityGroupRule(
'--ethertype', '--ethertype',
metavar='<ethertype>', metavar='<ethertype>',
choices=['IPv4', 'IPv6'], choices=['IPv4', 'IPv6'],
type=_convert_ipvx_case, type=network_utils.convert_ipvx_case,
help=self.enhance_help_neutron( help=self.enhance_help_neutron(
_( _(
"Ethertype of network traffic " "Ethertype of network traffic "
@ -264,38 +199,6 @@ class CreateSecurityGroupRule(
) )
return parser return parser
def _get_protocol(self, parsed_args, default_protocol='any'):
protocol = default_protocol
if parsed_args.protocol is not None:
protocol = parsed_args.protocol
if parsed_args.proto is not None:
protocol = parsed_args.proto
if protocol == 'any':
protocol = None
return protocol
def _get_ethertype(self, parsed_args, protocol):
ethertype = 'IPv4'
if parsed_args.ethertype is not None:
ethertype = parsed_args.ethertype
elif self._is_ipv6_protocol(protocol):
ethertype = 'IPv6'
return ethertype
def _is_ipv6_protocol(self, protocol):
# NOTE(rtheis): Neutron has deprecated protocol icmpv6.
# However, while the OSC CLI doesn't document the protocol,
# the code must still handle it. In addition, handle both
# protocol names and numbers.
if (
protocol is not None
and protocol.startswith('ipv6-')
or protocol in ['icmpv6', '41', '43', '44', '58', '59', '60']
):
return True
else:
return False
def take_action_network(self, client, parsed_args): def take_action_network(self, client, parsed_args):
# Get the security group ID to hold the rule. # Get the security group ID to hold the rule.
security_group_id = client.find_security_group( security_group_id = client.find_security_group(
@ -304,7 +207,7 @@ class CreateSecurityGroupRule(
# Build the create attributes. # Build the create attributes.
attrs = {} attrs = {}
attrs['protocol'] = self._get_protocol(parsed_args) attrs['protocol'] = network_utils.get_protocol(parsed_args)
if parsed_args.description is not None: if parsed_args.description is not None:
attrs['description'] = parsed_args.description attrs['description'] = parsed_args.description
@ -318,7 +221,7 @@ class CreateSecurityGroupRule(
# NOTE(rtheis): Use ethertype specified else default based # NOTE(rtheis): Use ethertype specified else default based
# on IP protocol. # on IP protocol.
attrs['ethertype'] = self._get_ethertype( attrs['ethertype'] = network_utils.get_ethertype(
parsed_args, attrs['protocol'] parsed_args, attrs['protocol']
) )
@ -335,7 +238,7 @@ class CreateSecurityGroupRule(
if parsed_args.icmp_type is None and parsed_args.icmp_code is not None: if parsed_args.icmp_type is None and parsed_args.icmp_code is not None:
msg = _('Argument --icmp-type required with argument --icmp-code') msg = _('Argument --icmp-type required with argument --icmp-code')
raise exceptions.CommandError(msg) raise exceptions.CommandError(msg)
is_icmp_protocol = _is_icmp_protocol(attrs['protocol']) is_icmp_protocol = network_utils.is_icmp_protocol(attrs['protocol'])
if not is_icmp_protocol and ( if not is_icmp_protocol and (
parsed_args.icmp_type or parsed_args.icmp_code parsed_args.icmp_type or parsed_args.icmp_code
): ):
@ -390,7 +293,9 @@ class CreateSecurityGroupRule(
def take_action_compute(self, client, parsed_args): def take_action_compute(self, client, parsed_args):
group = client.api.security_group_find(parsed_args.group) group = client.api.security_group_find(parsed_args.group)
protocol = self._get_protocol(parsed_args, default_protocol='tcp') protocol = network_utils.get_protocol(
parsed_args, default_protocol='tcp'
)
if protocol == 'icmp': if protocol == 'icmp':
from_port, to_port = -1, -1 from_port, to_port = -1, -1
else: else:
@ -414,7 +319,7 @@ class CreateSecurityGroupRule(
remote_ip=remote_ip, remote_ip=remote_ip,
remote_group=parsed_args.remote_group, remote_group=parsed_args.remote_group,
) )
return _format_security_group_rule_show(obj) return network_utils.format_security_group_rule_show(obj)
class DeleteSecurityGroupRule(common.NetworkAndComputeDelete): class DeleteSecurityGroupRule(common.NetworkAndComputeDelete):
@ -451,8 +356,8 @@ class ListSecurityGroupRule(common.NetworkAndComputeLister):
Create port_range column from port_range_min and port_range_max Create port_range column from port_range_min and port_range_max
""" """
rule = rule.to_dict() rule = rule.to_dict()
rule['port_range'] = _format_network_port_range(rule) rule['port_range'] = network_utils.format_network_port_range(rule)
rule['remote_ip_prefix'] = _format_remote_ip_prefix(rule) rule['remote_ip_prefix'] = network_utils.format_remote_ip_prefix(rule)
return rule return rule
def update_parser_common(self, parser): def update_parser_common(self, parser):
@ -478,7 +383,7 @@ class ListSecurityGroupRule(common.NetworkAndComputeLister):
parser.add_argument( parser.add_argument(
'--protocol', '--protocol',
metavar='<protocol>', metavar='<protocol>',
type=_convert_to_lowercase, type=network_utils.convert_to_lowercase,
help=self.enhance_help_neutron( help=self.enhance_help_neutron(
_( _(
"List rules by the IP protocol (ah, dhcp, egp, esp, gre, " "List rules by the IP protocol (ah, dhcp, egp, esp, gre, "
@ -493,7 +398,7 @@ class ListSecurityGroupRule(common.NetworkAndComputeLister):
parser.add_argument( parser.add_argument(
'--ethertype', '--ethertype',
metavar='<ethertype>', metavar='<ethertype>',
type=_convert_to_lowercase, type=network_utils.convert_to_lowercase,
help=self.enhance_help_neutron( help=self.enhance_help_neutron(
_("List rules by the Ethertype (IPv4 or IPv6)") _("List rules by the Ethertype (IPv4 or IPv6)")
), ),
@ -677,7 +582,9 @@ class ShowSecurityGroupRule(common.NetworkAndComputeShowOne):
) )
# necessary for old rules that have None in this field # necessary for old rules that have None in this field
if not obj['remote_ip_prefix']: if not obj['remote_ip_prefix']:
obj['remote_ip_prefix'] = _format_remote_ip_prefix(obj) obj['remote_ip_prefix'] = network_utils.format_remote_ip_prefix(
obj
)
display_columns, columns = _get_columns(obj) display_columns, columns = _get_columns(obj)
data = utils.get_item_properties(obj, columns) data = utils.get_item_properties(obj, columns)
return (display_columns, data) return (display_columns, data)
@ -704,4 +611,4 @@ class ShowSecurityGroupRule(common.NetworkAndComputeShowOne):
raise exceptions.CommandError(msg) raise exceptions.CommandError(msg)
# NOTE(rtheis): Format security group rule # NOTE(rtheis): Format security group rule
return _format_security_group_rule_show(obj) return network_utils.format_security_group_rule_show(obj)

View File

@ -0,0 +1,69 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
from openstackclient.tests.functional.network.v2 import common
class SecurityGroupRuleTests(common.NetworkTests):
"""Functional tests for security group rule"""
def setUp(self):
super(SecurityGroupRuleTests, self).setUp()
# Nothing in this class works with Nova Network
if not self.haz_network:
self.skipTest("No Network service present")
if not self.is_extension_enabled("security-groups-default-rules"):
self.skipTest("No security-groups-default-rules extension present")
self.port = random.randint(1, 65535)
self.protocol = random.choice(["tcp", "udp"])
self.direction = random.choice(["ingress", "egress"])
# Create the default security group rule.
cmd_output = self.openstack(
'default security group rule create '
'--protocol %(protocol)s '
'--dst-port %(port)s:%(port)s '
'--%(direction)s --ethertype IPv4 '
% {
'protocol': self.protocol,
'port': self.port,
'direction': self.direction,
},
parse_output=True,
)
self.addCleanup(
self.openstack,
'default security group rule delete ' + cmd_output['id'],
)
self.DEFAULT_SG_RULE_ID = cmd_output['id']
def test_security_group_rule_list(self):
cmd_output = self.openstack(
'default security group rule list ',
parse_output=True,
)
self.assertIn(
self.DEFAULT_SG_RULE_ID, [rule['ID'] for rule in cmd_output]
)
def test_security_group_rule_show(self):
cmd_output = self.openstack(
'default security group rule show ' + self.DEFAULT_SG_RULE_ID,
parse_output=True,
)
self.assertEqual(self.DEFAULT_SG_RULE_ID, cmd_output['id'])
self.assertEqual(self.protocol, cmd_output['protocol'])
self.assertEqual(self.port, cmd_output['port_range_min'])
self.assertEqual(self.port, cmd_output['port_range_max'])
self.assertEqual(self.direction, cmd_output['direction'])

File diff suppressed because it is too large Load Diff

View File

@ -49,7 +49,7 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
( (
expected_columns, expected_columns,
expected_data, expected_data,
) = security_group_rule._format_security_group_rule_show( ) = network_utils.format_security_group_rule_show(
self._security_group_rule self._security_group_rule
) )
return expected_columns, expected_data return expected_columns, expected_data
@ -513,7 +513,7 @@ class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
# The security group rule to be shown. # The security group rule to be shown.
_security_group_rule = compute_fakes.create_one_security_group_rule() _security_group_rule = compute_fakes.create_one_security_group_rule()
columns, data = security_group_rule._format_security_group_rule_show( columns, data = network_utils.format_security_group_rule_show(
_security_group_rule _security_group_rule
) )

View File

@ -16,6 +16,7 @@ from unittest.mock import call
from osc_lib import exceptions from osc_lib import exceptions
from openstackclient.network import utils as network_utils
from openstackclient.network.v2 import security_group_rule from openstackclient.network.v2 import security_group_rule
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
from openstackclient.tests.unit.network.v2 import fakes as network_fakes from openstackclient.tests.unit.network.v2 import fakes as network_fakes
@ -1124,9 +1125,7 @@ class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
_security_group_rule.protocol, _security_group_rule.protocol,
_security_group_rule.ether_type, _security_group_rule.ether_type,
_security_group_rule.remote_ip_prefix, _security_group_rule.remote_ip_prefix,
security_group_rule._format_network_port_range( network_utils.format_network_port_range(_security_group_rule),
_security_group_rule
),
_security_group_rule.direction, _security_group_rule.direction,
_security_group_rule.remote_group_id, _security_group_rule.remote_group_id,
_security_group_rule.remote_address_group_id, _security_group_rule.remote_address_group_id,
@ -1138,9 +1137,7 @@ class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
_security_group_rule.protocol, _security_group_rule.protocol,
_security_group_rule.ether_type, _security_group_rule.ether_type,
_security_group_rule.remote_ip_prefix, _security_group_rule.remote_ip_prefix,
security_group_rule._format_network_port_range( network_utils.format_network_port_range(_security_group_rule),
_security_group_rule
),
_security_group_rule.direction, _security_group_rule.direction,
_security_group_rule.remote_group_id, _security_group_rule.remote_group_id,
_security_group_rule.remote_address_group_id, _security_group_rule.remote_address_group_id,

View File

@ -0,0 +1,8 @@
---
features:
- |
Add ``default security group rule create``, ``default security group rule
delete``, ``default security group rule list`` and ``default security group
rule show`` commands to support Neutron Default Security Group Rule CRUD
operations.
[Bug `1983053 <https://bugs.launchpad.net/bugs/1983053>`_]

View File

@ -7,7 +7,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0
cryptography>=2.7 # BSD/Apache-2.0 cryptography>=2.7 # BSD/Apache-2.0
cliff>=3.5.0 # Apache-2.0 cliff>=3.5.0 # Apache-2.0
iso8601>=0.1.11 # MIT iso8601>=0.1.11 # MIT
openstacksdk>=1.4.0 # Apache-2.0 openstacksdk>=2.0.0 # Apache-2.0
osc-lib>=2.3.0 # Apache-2.0 osc-lib>=2.3.0 # Apache-2.0
oslo.i18n>=3.15.3 # Apache-2.0 oslo.i18n>=3.15.3 # Apache-2.0
python-keystoneclient>=3.22.0 # Apache-2.0 python-keystoneclient>=3.22.0 # Apache-2.0

View File

@ -571,6 +571,11 @@ openstack.network.v2 =
security_group_rule_list = openstackclient.network.v2.security_group_rule:ListSecurityGroupRule security_group_rule_list = openstackclient.network.v2.security_group_rule:ListSecurityGroupRule
security_group_rule_show = openstackclient.network.v2.security_group_rule:ShowSecurityGroupRule security_group_rule_show = openstackclient.network.v2.security_group_rule:ShowSecurityGroupRule
default_security_group_rule_create = openstackclient.network.v2.default_security_group_rule:CreateDefaultSecurityGroupRule
default_security_group_rule_delete = openstackclient.network.v2.default_security_group_rule:DeleteDefaultSecurityGroupRule
default_security_group_rule_list = openstackclient.network.v2.default_security_group_rule:ListDefaultSecurityGroupRule
default_security_group_rule_show = openstackclient.network.v2.default_security_group_rule:ShowDefaultSecurityGroupRule
subnet_create = openstackclient.network.v2.subnet:CreateSubnet subnet_create = openstackclient.network.v2.subnet:CreateSubnet
subnet_delete = openstackclient.network.v2.subnet:DeleteSubnet subnet_delete = openstackclient.network.v2.subnet:DeleteSubnet
subnet_list = openstackclient.network.v2.subnet:ListSubnet subnet_list = openstackclient.network.v2.subnet:ListSubnet