Cleanup nova subcommands for security groups and rules

The Changes include:
  - Sanity check for from_port, to_port, and IP protocol when adding
    a security rule
  - Print one more column 'Id' for security groups
  - If there are multiple security groups with the same name, the group
    can't be deleted unless an ID is specified. However, there is no code
    to search and delete security group by ID. So, _get_secgroup() will
    get group by ID if the input is like a UUID. If not found, then get
    group by name.
  - Some corresponding changes for help messages.
  - Test case changes after adding sanity check.

Change-Id: Ibd82d8404bdd64e4bca2f8b25756bfaff7b28194
Signed-off-by: Shane Wang <shane.wang@intel.com>
This commit is contained in:
Shane Wang 2013-05-16 14:30:22 +08:00
parent 64e43fde43
commit b1802a59bb
4 changed files with 102 additions and 22 deletions

View File

@ -18,6 +18,7 @@ Security group rules interface (1.1 extension).
""" """
from novaclient import base from novaclient import base
from novaclient import exceptions
class SecurityGroupRule(base.Resource): class SecurityGroupRule(base.Resource):
@ -34,7 +35,7 @@ class SecurityGroupRuleManager(base.ManagerWithFind):
def create(self, parent_group_id, ip_protocol=None, from_port=None, def create(self, parent_group_id, ip_protocol=None, from_port=None,
to_port=None, cidr=None, group_id=None): to_port=None, cidr=None, group_id=None):
""" """
Create a security group Create a security group rule
:param ip_protocol: IP protocol, one of 'tcp', 'udp' or 'icmp' :param ip_protocol: IP protocol, one of 'tcp', 'udp' or 'icmp'
:param from_port: Source port :param from_port: Source port
@ -43,6 +44,19 @@ class SecurityGroupRuleManager(base.ManagerWithFind):
:param group_id: Security group id (int) :param group_id: Security group id (int)
:param parent_group_id: Parent security group id (int) :param parent_group_id: Parent security group id (int)
""" """
try:
from_port = int(from_port)
except (TypeError, ValueError):
raise exceptions.CommandError("From port must be an integer.")
try:
to_port = int(to_port)
except (TypeError, ValueError):
raise exceptions.CommandError("To port must be an integer.")
if ip_protocol.upper() not in ['TCP', 'UDP', 'ICMP']:
raise exceptions.CommandError("Ip protocol must be 'tcp', 'udp', "
"or 'icmp'.")
body = {"security_group_rule": { body = {"security_group_rule": {
"ip_protocol": ip_protocol, "ip_protocol": ip_protocol,
"from_port": from_port, "from_port": from_port,

View File

@ -1878,10 +1878,18 @@ def _print_secgroup_rules(rules):
def _print_secgroups(secgroups): def _print_secgroups(secgroups):
utils.print_list(secgroups, ['Name', 'Description']) utils.print_list(secgroups, ['Id', 'Name', 'Description'])
def _get_secgroup(cs, secgroup): def _get_secgroup(cs, secgroup):
# Check secgroup is an ID
if utils.is_uuid_like(strutils.safe_encode(secgroup)):
try:
return cs.security_groups.get(secgroup)
except exceptions.NotFound:
pass
# Check secgroup as a name
match_found = False match_found = False
for s in cs.security_groups.list(): for s in cs.security_groups.list():
encoding = (locale.getpreferredencoding() or encoding = (locale.getpreferredencoding() or
@ -1895,11 +1903,14 @@ def _get_secgroup(cs, secgroup):
raise exceptions.NoUniqueMatch(msg) raise exceptions.NoUniqueMatch(msg)
match_found = s match_found = s
if match_found is False: if match_found is False:
raise exceptions.CommandError("Secgroup %s not found" % secgroup) raise exceptions.CommandError("Secgroup ID or name '%s' not found."
% secgroup)
return match_found return match_found
@utils.arg('secgroup', metavar='<secgroup>', help='ID of security group.') @utils.arg('secgroup',
metavar='<secgroup>',
help='ID or name of security group.')
@utils.arg('ip_proto', @utils.arg('ip_proto',
metavar='<ip-proto>', metavar='<ip-proto>',
help='IP protocol (icmp, tcp, udp).') help='IP protocol (icmp, tcp, udp).')
@ -1921,7 +1932,9 @@ def do_secgroup_add_rule(cs, args):
_print_secgroup_rules([rule]) _print_secgroup_rules([rule])
@utils.arg('secgroup', metavar='<secgroup>', help='ID of security group.') @utils.arg('secgroup',
metavar='<secgroup>',
help='ID or name of security group.')
@utils.arg('ip_proto', @utils.arg('ip_proto',
metavar='<ip-proto>', metavar='<ip-proto>',
help='IP protocol (icmp, tcp, udp).') help='IP protocol (icmp, tcp, udp).')
@ -1934,7 +1947,6 @@ def do_secgroup_add_rule(cs, args):
@utils.arg('cidr', metavar='<cidr>', help='CIDR for address range.') @utils.arg('cidr', metavar='<cidr>', help='CIDR for address range.')
def do_secgroup_delete_rule(cs, args): def do_secgroup_delete_rule(cs, args):
"""Delete a rule from a security group.""" """Delete a rule from a security group."""
secgroup = _get_secgroup(cs, args.secgroup) secgroup = _get_secgroup(cs, args.secgroup)
for rule in secgroup.rules: for rule in secgroup.rules:
if (rule['ip_protocol'] and if (rule['ip_protocol'] and
@ -1942,6 +1954,7 @@ def do_secgroup_delete_rule(cs, args):
rule['from_port'] == int(args.from_port) and rule['from_port'] == int(args.from_port) and
rule['to_port'] == int(args.to_port) and rule['to_port'] == int(args.to_port) and
rule['ip_range']['cidr'] == args.cidr): rule['ip_range']['cidr'] == args.cidr):
_print_secgroup_rules([rule])
return cs.security_group_rules.delete(rule['id']) return cs.security_group_rules.delete(rule['id'])
raise exceptions.CommandError("Rule not found") raise exceptions.CommandError("Rule not found")
@ -1952,13 +1965,18 @@ def do_secgroup_delete_rule(cs, args):
help='Description of security group.') help='Description of security group.')
def do_secgroup_create(cs, args): def do_secgroup_create(cs, args):
"""Create a security group.""" """Create a security group."""
_print_secgroups([cs.security_groups.create(args.name, args.description)]) secgroup = cs.security_groups.create(args.name, args.description)
_print_secgroups([secgroup])
@utils.arg('secgroup', metavar='<secgroup>', help='Name of security group.') @utils.arg('secgroup',
metavar='<secgroup>',
help='ID or name of security group.')
def do_secgroup_delete(cs, args): def do_secgroup_delete(cs, args):
"""Delete a security group.""" """Delete a security group."""
cs.security_groups.delete(_get_secgroup(cs, args.secgroup)) secgroup = _get_secgroup(cs, args.secgroup)
cs.security_groups.delete(secgroup)
_print_secgroups([secgroup])
@utils.arg('--all-tenants', @utils.arg('--all-tenants',
@ -1977,24 +1995,28 @@ def do_secgroup_delete(cs, args):
def do_secgroup_list(cs, args): def do_secgroup_list(cs, args):
"""List security groups for the current tenant.""" """List security groups for the current tenant."""
search_opts = {'all_tenants': args.all_tenants} search_opts = {'all_tenants': args.all_tenants}
columns = ['Name', 'Description'] columns = ['Id', 'Name', 'Description']
if args.all_tenants: if args.all_tenants:
columns.append('Tenant_ID') columns.append('Tenant_ID')
groups = cs.security_groups.list(search_opts=search_opts) groups = cs.security_groups.list(search_opts=search_opts)
utils.print_list(groups, columns) utils.print_list(groups, columns)
@utils.arg('secgroup', metavar='<secgroup>', help='Name of security group.') @utils.arg('secgroup',
metavar='<secgroup>',
help='ID or name of security group.')
def do_secgroup_list_rules(cs, args): def do_secgroup_list_rules(cs, args):
"""List rules for a security group.""" """List rules for a security group."""
secgroup = _get_secgroup(cs, args.secgroup) secgroup = _get_secgroup(cs, args.secgroup)
_print_secgroup_rules(secgroup.rules) _print_secgroup_rules(secgroup.rules)
@utils.arg('secgroup', metavar='<secgroup>', help='ID of security group.') @utils.arg('secgroup',
metavar='<secgroup>',
help='ID or name of security group.')
@utils.arg('source_group', @utils.arg('source_group',
metavar='<source-group>', metavar='<source-group>',
help='ID of source group.') help='ID or name of source group.')
@utils.arg('ip_proto', @utils.arg('ip_proto',
metavar='<ip-proto>', metavar='<ip-proto>',
help='IP protocol (icmp, tcp, udp).') help='IP protocol (icmp, tcp, udp).')
@ -2023,10 +2045,12 @@ def do_secgroup_add_group_rule(cs, args):
_print_secgroup_rules([rule]) _print_secgroup_rules([rule])
@utils.arg('secgroup', metavar='<secgroup>', help='ID of security group.') @utils.arg('secgroup',
metavar='<secgroup>',
help='ID or name of security group.')
@utils.arg('source_group', @utils.arg('source_group',
metavar='<source-group>', metavar='<source-group>',
help='ID of source group.') help='ID or name of source group.')
@utils.arg('ip_proto', @utils.arg('ip_proto',
metavar='<ip-proto>', metavar='<ip-proto>',
help='IP protocol (icmp, tcp, udp).') help='IP protocol (icmp, tcp, udp).')

View File

@ -1,3 +1,4 @@
from novaclient import exceptions
from novaclient.v1_1 import security_group_rules from novaclient.v1_1 import security_group_rules
from tests import utils from tests import utils
from tests.v1_1 import fakes from tests.v1_1 import fakes
@ -11,7 +12,48 @@ class SecurityGroupRulesTest(utils.TestCase):
cs.security_group_rules.delete(1) cs.security_group_rules.delete(1)
cs.assert_called('DELETE', '/os-security-group-rules/1') cs.assert_called('DELETE', '/os-security-group-rules/1')
def test_create_security_group(self): def test_create_security_group_rule(self):
sg = cs.security_group_rules.create(1) sg = cs.security_group_rules.create(1, "tcp", 1, 65535, "10.0.0.0/16")
cs.assert_called('POST', '/os-security-group-rules')
body = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": 1,
"to_port": 65535,
"cidr": "10.0.0.0/16",
"group_id": None,
"parent_group_id": 1,
}
}
cs.assert_called('POST', '/os-security-group-rules', body)
self.assertTrue(isinstance(sg, security_group_rules.SecurityGroupRule)) self.assertTrue(isinstance(sg, security_group_rules.SecurityGroupRule))
def test_create_security_group_group_rule(self):
sg = cs.security_group_rules.create(1, "tcp", 1, 65535, "10.0.0.0/16",
101)
body = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": 1,
"to_port": 65535,
"cidr": "10.0.0.0/16",
"group_id": 101,
"parent_group_id": 1,
}
}
cs.assert_called('POST', '/os-security-group-rules', body)
self.assertTrue(isinstance(sg, security_group_rules.SecurityGroupRule))
def test_invalid_parameters_create(self):
self.assertRaises(exceptions.CommandError,
cs.security_group_rules.create, "secgrouprulecreate",
1, "invalid", 1, 65535, "10.0.0.0/16")
self.assertRaises(exceptions.CommandError,
cs.security_group_rules.create, "secgrouprulecreate",
1, "tcp", "invalid", 65535, "10.0.0.0/16")
self.assertRaises(exceptions.CommandError,
cs.security_group_rules.create, "secgrouprulecreate",
1, "tcp", 1, "invalid", "10.0.0.0/16")

View File

@ -1229,9 +1229,9 @@ class ShellTest(utils.TestCase):
self.run_command('secgroup-add-rule test tcp 22 22 10.0.0.0/8') self.run_command('secgroup-add-rule test tcp 22 22 10.0.0.0/8')
self.assert_called('POST', '/os-security-group-rules', self.assert_called('POST', '/os-security-group-rules',
{'security_group_rule': {'security_group_rule':
{'from_port': '22', {'from_port': 22,
'ip_protocol': 'tcp', 'ip_protocol': 'tcp',
'to_port': '22', 'to_port': 22,
'parent_group_id': 1, 'parent_group_id': 1,
'cidr': '10.0.0.0/8', 'cidr': '10.0.0.0/8',
'group_id': None}}) 'group_id': None}})
@ -1248,9 +1248,9 @@ class ShellTest(utils.TestCase):
self.run_command('secgroup-add-group-rule test test2 tcp 22 22') self.run_command('secgroup-add-group-rule test test2 tcp 22 22')
self.assert_called('POST', '/os-security-group-rules', self.assert_called('POST', '/os-security-group-rules',
{'security_group_rule': {'security_group_rule':
{'from_port': '22', {'from_port': 22,
'ip_protocol': 'TCP', 'ip_protocol': 'TCP',
'to_port': '22', 'to_port': 22,
'parent_group_id': 1, 'parent_group_id': 1,
'cidr': None, 'cidr': None,
'group_id': 2}}) 'group_id': 2}})