c33b041c0c
With the introduction of Lowercase/Uppercase conversion functions in the utils.py, the following patch removes the dependancy of the current code on character casing. utils.convert_to_lowercase is called where the code expects the input to be in lower case while utils.convert_to_uppercase will be called where the code expects the input to be in CAPITAL casing only. Change-Id: I1c5c3c87f343fc2731469b9a0c38d278f6018a11
380 lines
14 KiB
Python
380 lines
14 KiB
Python
# Copyright 2012 OpenStack Foundation.
|
|
# All Rights Reserved
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
|
|
import argparse
|
|
|
|
from neutronclient._i18n import _
|
|
from neutronclient.common import exceptions
|
|
from neutronclient.common import utils
|
|
from neutronclient.neutron import v2_0 as neutronV20
|
|
|
|
|
|
def _get_remote(rule):
|
|
if rule['remote_ip_prefix']:
|
|
remote = '%s (CIDR)' % rule['remote_ip_prefix']
|
|
elif rule['remote_group_id']:
|
|
remote = '%s (group)' % rule['remote_group_id']
|
|
else:
|
|
remote = None
|
|
return remote
|
|
|
|
|
|
def _get_protocol_port(rule):
|
|
proto = rule['protocol']
|
|
port_min = rule['port_range_min']
|
|
port_max = rule['port_range_max']
|
|
if proto in ('tcp', 'udp'):
|
|
if (port_min and port_min == port_max):
|
|
protocol_port = '%s/%s' % (port_min, proto)
|
|
elif port_min:
|
|
protocol_port = '%s-%s/%s' % (port_min, port_max, proto)
|
|
else:
|
|
protocol_port = proto
|
|
elif proto == 'icmp':
|
|
icmp_opts = []
|
|
if port_min is not None:
|
|
icmp_opts.append('type:%s' % port_min)
|
|
if port_max is not None:
|
|
icmp_opts.append('code:%s' % port_max)
|
|
|
|
if icmp_opts:
|
|
protocol_port = 'icmp (%s)' % ', '.join(icmp_opts)
|
|
else:
|
|
protocol_port = 'icmp'
|
|
elif proto is not None:
|
|
# port_range_min/max are not recognized for protocol
|
|
# other than TCP, UDP and ICMP.
|
|
protocol_port = proto
|
|
else:
|
|
protocol_port = None
|
|
|
|
return protocol_port
|
|
|
|
|
|
def _format_sg_rule(rule):
|
|
formatted = []
|
|
for field in ['direction',
|
|
'ethertype',
|
|
('protocol_port', _get_protocol_port),
|
|
'remote_ip_prefix',
|
|
'remote_group_id']:
|
|
if isinstance(field, tuple):
|
|
field, get_method = field
|
|
data = get_method(rule)
|
|
else:
|
|
data = rule[field]
|
|
if not data:
|
|
continue
|
|
if field in ('remote_ip_prefix', 'remote_group_id'):
|
|
data = '%s: %s' % (field, data)
|
|
formatted.append(data)
|
|
return ', '.join(formatted)
|
|
|
|
|
|
def _format_sg_rules(secgroup):
|
|
try:
|
|
return '\n'.join(sorted([_format_sg_rule(rule) for rule
|
|
in secgroup['security_group_rules']]))
|
|
except Exception:
|
|
return ''
|
|
|
|
|
|
def generate_default_ethertype(protocol):
|
|
if protocol == 'icmpv6':
|
|
return 'IPv6'
|
|
return 'IPv4'
|
|
|
|
|
|
class ListSecurityGroup(neutronV20.ListCommand):
|
|
"""List security groups that belong to a given tenant."""
|
|
|
|
resource = 'security_group'
|
|
list_columns = ['id', 'name', 'security_group_rules']
|
|
_formatters = {'security_group_rules': _format_sg_rules}
|
|
pagination_support = True
|
|
sorting_support = True
|
|
|
|
|
|
class ShowSecurityGroup(neutronV20.ShowCommand):
|
|
"""Show information of a given security group."""
|
|
|
|
resource = 'security_group'
|
|
allow_names = True
|
|
json_indent = 5
|
|
|
|
|
|
class CreateSecurityGroup(neutronV20.CreateCommand):
|
|
"""Create a security group."""
|
|
|
|
resource = 'security_group'
|
|
|
|
def add_known_arguments(self, parser):
|
|
parser.add_argument(
|
|
'name', metavar='NAME',
|
|
help=_('Name of the security group to be created.'))
|
|
parser.add_argument(
|
|
'--description',
|
|
help=_('Description of the security group to be created.'))
|
|
|
|
def args2body(self, parsed_args):
|
|
body = {'name': parsed_args.name}
|
|
neutronV20.update_dict(parsed_args, body,
|
|
['description', 'tenant_id'])
|
|
return {'security_group': body}
|
|
|
|
|
|
class DeleteSecurityGroup(neutronV20.DeleteCommand):
|
|
"""Delete a given security group."""
|
|
|
|
resource = 'security_group'
|
|
allow_names = True
|
|
|
|
|
|
class UpdateSecurityGroup(neutronV20.UpdateCommand):
|
|
"""Update a given security group."""
|
|
|
|
resource = 'security_group'
|
|
|
|
def add_known_arguments(self, parser):
|
|
parser.add_argument(
|
|
'--name',
|
|
help=_('Updated name of the security group.'))
|
|
parser.add_argument(
|
|
'--description',
|
|
help=_('Updated description of the security group.'))
|
|
|
|
def args2body(self, parsed_args):
|
|
body = {}
|
|
neutronV20.update_dict(parsed_args, body,
|
|
['name', 'description'])
|
|
return {'security_group': body}
|
|
|
|
|
|
class ListSecurityGroupRule(neutronV20.ListCommand):
|
|
"""List security group rules that belong to a given tenant."""
|
|
|
|
resource = 'security_group_rule'
|
|
list_columns = ['id', 'security_group_id', 'direction',
|
|
'ethertype', 'port/protocol', 'remote']
|
|
# replace_rules: key is an attribute name in Neutron API and
|
|
# corresponding value is a display name shown by CLI.
|
|
replace_rules = {'security_group_id': 'security_group',
|
|
'remote_group_id': 'remote_group'}
|
|
digest_fields = {
|
|
# The entry 'protocol/port' is left deliberately for backwards
|
|
# compatibility.
|
|
'remote': {
|
|
'method': _get_remote,
|
|
'depends_on': ['remote_ip_prefix', 'remote_group_id']},
|
|
'port/protocol': {
|
|
'method': _get_protocol_port,
|
|
'depends_on': ['protocol', 'port_range_min', 'port_range_max']},
|
|
'protocol/port': {
|
|
'method': _get_protocol_port,
|
|
'depends_on': ['protocol', 'port_range_min', 'port_range_max']}}
|
|
pagination_support = True
|
|
sorting_support = True
|
|
|
|
def get_parser(self, prog_name):
|
|
parser = super(ListSecurityGroupRule, self).get_parser(prog_name)
|
|
parser.add_argument(
|
|
'--no-nameconv', action='store_true',
|
|
help=_('Do not convert security group ID to its name.'))
|
|
return parser
|
|
|
|
@staticmethod
|
|
def replace_columns(cols, rules, reverse=False):
|
|
if reverse:
|
|
rules = dict((rules[k], k) for k in rules.keys())
|
|
return [rules.get(col, col) for col in cols]
|
|
|
|
def get_required_fields(self, fields):
|
|
fields = self.replace_columns(fields, self.replace_rules, reverse=True)
|
|
for field, digest_fields in self.digest_fields.items():
|
|
if field in fields:
|
|
fields += digest_fields['depends_on']
|
|
fields.remove(field)
|
|
return fields
|
|
|
|
def retrieve_list(self, parsed_args):
|
|
parsed_args.fields = self.get_required_fields(parsed_args.fields)
|
|
return super(ListSecurityGroupRule, self).retrieve_list(parsed_args)
|
|
|
|
def _get_sg_name_dict(self, data, page_size, no_nameconv):
|
|
"""Get names of security groups referred in the retrieved rules.
|
|
|
|
:return: a dict from secgroup ID to secgroup name
|
|
"""
|
|
if no_nameconv:
|
|
return {}
|
|
neutron_client = self.get_client()
|
|
search_opts = {'fields': ['id', 'name']}
|
|
if self.pagination_support:
|
|
if page_size:
|
|
search_opts.update({'limit': page_size})
|
|
sec_group_ids = set()
|
|
for rule in data:
|
|
for key in self.replace_rules:
|
|
if rule.get(key):
|
|
sec_group_ids.add(rule[key])
|
|
sec_group_ids = list(sec_group_ids)
|
|
|
|
def _get_sec_group_list(sec_group_ids):
|
|
search_opts['id'] = sec_group_ids
|
|
return neutron_client.list_security_groups(
|
|
**search_opts).get('security_groups', [])
|
|
|
|
try:
|
|
secgroups = _get_sec_group_list(sec_group_ids)
|
|
except exceptions.RequestURITooLong as uri_len_exc:
|
|
# Length of a query filter on security group rule id
|
|
# id=<uuid>& (with len(uuid)=36)
|
|
sec_group_id_filter_len = 40
|
|
# The URI is too long because of too many sec_group_id filters
|
|
# Use the excess attribute of the exception to know how many
|
|
# sec_group_id filters can be inserted into a single request
|
|
sec_group_count = len(sec_group_ids)
|
|
max_size = ((sec_group_id_filter_len * sec_group_count) -
|
|
uri_len_exc.excess)
|
|
chunk_size = max_size // sec_group_id_filter_len
|
|
secgroups = []
|
|
for i in range(0, sec_group_count, chunk_size):
|
|
secgroups.extend(
|
|
_get_sec_group_list(sec_group_ids[i: i + chunk_size]))
|
|
|
|
return dict([(sg['id'], sg['name'])
|
|
for sg in secgroups if sg['name']])
|
|
|
|
@staticmethod
|
|
def _has_fields(rule, required_fields):
|
|
return all([key in rule for key in required_fields])
|
|
|
|
def extend_list(self, data, parsed_args):
|
|
sg_dict = self._get_sg_name_dict(data, parsed_args.page_size,
|
|
parsed_args.no_nameconv)
|
|
for rule in data:
|
|
# Replace security group UUID with its name.
|
|
for key in self.replace_rules:
|
|
if key in rule:
|
|
rule[key] = sg_dict.get(rule[key], rule[key])
|
|
for field, digest_rule in self.digest_fields.items():
|
|
if self._has_fields(rule, digest_rule['depends_on']):
|
|
rule[field] = digest_rule['method'](rule) or 'any'
|
|
|
|
def setup_columns(self, info, parsed_args):
|
|
# Translate the specified columns from the command line
|
|
# into field names used in "info".
|
|
parsed_args.columns = self.replace_columns(parsed_args.columns,
|
|
self.replace_rules,
|
|
reverse=True)
|
|
# NOTE(amotoki): 2nd element of the tuple returned by setup_columns()
|
|
# is a generator, so if you need to create a look using the generator
|
|
# object, you need to recreate a generator to show a list expectedly.
|
|
info = super(ListSecurityGroupRule, self).setup_columns(info,
|
|
parsed_args)
|
|
cols = info[0]
|
|
if not parsed_args.no_nameconv:
|
|
# Replace column names in the header line (in info[0])
|
|
cols = self.replace_columns(info[0], self.replace_rules)
|
|
parsed_args.columns = cols
|
|
return (cols, info[1])
|
|
|
|
|
|
class ShowSecurityGroupRule(neutronV20.ShowCommand):
|
|
"""Show information of a given security group rule."""
|
|
|
|
resource = 'security_group_rule'
|
|
allow_names = False
|
|
|
|
|
|
class CreateSecurityGroupRule(neutronV20.CreateCommand):
|
|
"""Create a security group rule."""
|
|
|
|
resource = 'security_group_rule'
|
|
|
|
def add_known_arguments(self, parser):
|
|
parser.add_argument(
|
|
'--description',
|
|
help=_('Description of security group rule.'))
|
|
parser.add_argument(
|
|
'security_group_id', metavar='SECURITY_GROUP',
|
|
help=_('ID or name of the security group to '
|
|
'which the rule is added.'))
|
|
parser.add_argument(
|
|
'--direction',
|
|
type=utils.convert_to_lowercase,
|
|
default='ingress', choices=['ingress', 'egress'],
|
|
help=_('Direction of traffic: ingress/egress.'))
|
|
parser.add_argument(
|
|
'--ethertype',
|
|
help=_('IPv4/IPv6'))
|
|
parser.add_argument(
|
|
'--protocol',
|
|
type=utils.convert_to_lowercase,
|
|
help=_('Protocol of packet. Allowed values are '
|
|
'[icmp, icmpv6, tcp, udp] and '
|
|
'integer representations [0-255].'))
|
|
parser.add_argument(
|
|
'--port-range-min',
|
|
help=_('Starting port range. For ICMP it is type.'))
|
|
parser.add_argument(
|
|
'--port_range_min',
|
|
help=argparse.SUPPRESS)
|
|
parser.add_argument(
|
|
'--port-range-max',
|
|
help=_('Ending port range. For ICMP it is code.'))
|
|
parser.add_argument(
|
|
'--port_range_max',
|
|
help=argparse.SUPPRESS)
|
|
parser.add_argument(
|
|
'--remote-ip-prefix',
|
|
help=_('CIDR to match on.'))
|
|
parser.add_argument(
|
|
'--remote_ip_prefix',
|
|
help=argparse.SUPPRESS)
|
|
parser.add_argument(
|
|
'--remote-group-id', metavar='REMOTE_GROUP',
|
|
help=_('ID or name of the remote security group '
|
|
'to which the rule is applied.'))
|
|
parser.add_argument(
|
|
'--remote_group_id',
|
|
help=argparse.SUPPRESS)
|
|
|
|
def args2body(self, parsed_args):
|
|
_security_group_id = neutronV20.find_resourceid_by_name_or_id(
|
|
self.get_client(), 'security_group', parsed_args.security_group_id)
|
|
body = {'security_group_id': _security_group_id,
|
|
'direction': parsed_args.direction,
|
|
'ethertype': parsed_args.ethertype or
|
|
generate_default_ethertype(parsed_args.protocol)}
|
|
neutronV20.update_dict(parsed_args, body,
|
|
['protocol', 'port_range_min', 'port_range_max',
|
|
'remote_ip_prefix', 'tenant_id',
|
|
'description'])
|
|
if parsed_args.remote_group_id:
|
|
_remote_group_id = neutronV20.find_resourceid_by_name_or_id(
|
|
self.get_client(), 'security_group',
|
|
parsed_args.remote_group_id)
|
|
body['remote_group_id'] = _remote_group_id
|
|
return {'security_group_rule': body}
|
|
|
|
|
|
class DeleteSecurityGroupRule(neutronV20.DeleteCommand):
|
|
"""Delete a given security group rule."""
|
|
|
|
resource = 'security_group_rule'
|
|
allow_names = False
|