Merge "Cleanup nova subcommands for security groups and rules"

This commit is contained in:
Jenkins 2013-05-21 07:56:30 +00:00 committed by Gerrit Code Review
commit 986d700c55
4 changed files with 102 additions and 22 deletions

@ -18,6 +18,7 @@ Security group rules interface (1.1 extension).
"""
from novaclient import base
from novaclient import exceptions
class SecurityGroupRule(base.Resource):
@ -34,7 +35,7 @@ class SecurityGroupRuleManager(base.Manager):
def create(self, parent_group_id, ip_protocol=None, from_port=None,
to_port=None, cidr=None, group_id=None):
"""
Create a security group
Create a security group rule
:param ip_protocol: IP protocol, one of 'tcp', 'udp' or 'icmp'
:param from_port: Source port
@ -43,6 +44,19 @@ class SecurityGroupRuleManager(base.Manager):
:param group_id: Security group id (int)
:param parent_group_id: Parent security group id (int)
"""
try:
from_port = int(from_port)
except (TypeError, ValueError):
raise exceptions.CommandError("From port must be an integer.")
try:
to_port = int(to_port)
except (TypeError, ValueError):
raise exceptions.CommandError("To port must be an integer.")
if ip_protocol.upper() not in ['TCP', 'UDP', 'ICMP']:
raise exceptions.CommandError("Ip protocol must be 'tcp', 'udp', "
"or 'icmp'.")
body = {"security_group_rule": {
"ip_protocol": ip_protocol,
"from_port": from_port,

@ -1878,10 +1878,18 @@ def _print_secgroup_rules(rules):
def _print_secgroups(secgroups):
utils.print_list(secgroups, ['Name', 'Description'])
utils.print_list(secgroups, ['Id', 'Name', 'Description'])
def _get_secgroup(cs, secgroup):
# Check secgroup is an ID
if utils.is_uuid_like(strutils.safe_encode(secgroup)):
try:
return cs.security_groups.get(secgroup)
except exceptions.NotFound:
pass
# Check secgroup as a name
match_found = False
for s in cs.security_groups.list():
encoding = (locale.getpreferredencoding() or
@ -1895,11 +1903,14 @@ def _get_secgroup(cs, secgroup):
raise exceptions.NoUniqueMatch(msg)
match_found = s
if match_found is False:
raise exceptions.CommandError("Secgroup %s not found" % secgroup)
raise exceptions.CommandError("Secgroup ID or name '%s' not found."
% secgroup)
return match_found
@utils.arg('secgroup', metavar='<secgroup>', help='ID of security group.')
@utils.arg('secgroup',
metavar='<secgroup>',
help='ID or name of security group.')
@utils.arg('ip_proto',
metavar='<ip-proto>',
help='IP protocol (icmp, tcp, udp).')
@ -1921,7 +1932,9 @@ def do_secgroup_add_rule(cs, args):
_print_secgroup_rules([rule])
@utils.arg('secgroup', metavar='<secgroup>', help='ID of security group.')
@utils.arg('secgroup',
metavar='<secgroup>',
help='ID or name of security group.')
@utils.arg('ip_proto',
metavar='<ip-proto>',
help='IP protocol (icmp, tcp, udp).')
@ -1934,7 +1947,6 @@ def do_secgroup_add_rule(cs, args):
@utils.arg('cidr', metavar='<cidr>', help='CIDR for address range.')
def do_secgroup_delete_rule(cs, args):
"""Delete a rule from a security group."""
secgroup = _get_secgroup(cs, args.secgroup)
for rule in secgroup.rules:
if (rule['ip_protocol'] and
@ -1942,6 +1954,7 @@ def do_secgroup_delete_rule(cs, args):
rule['from_port'] == int(args.from_port) and
rule['to_port'] == int(args.to_port) and
rule['ip_range']['cidr'] == args.cidr):
_print_secgroup_rules([rule])
return cs.security_group_rules.delete(rule['id'])
raise exceptions.CommandError("Rule not found")
@ -1952,13 +1965,18 @@ def do_secgroup_delete_rule(cs, args):
help='Description of security group.')
def do_secgroup_create(cs, args):
"""Create a security group."""
_print_secgroups([cs.security_groups.create(args.name, args.description)])
secgroup = cs.security_groups.create(args.name, args.description)
_print_secgroups([secgroup])
@utils.arg('secgroup', metavar='<secgroup>', help='Name of security group.')
@utils.arg('secgroup',
metavar='<secgroup>',
help='ID or name of security group.')
def do_secgroup_delete(cs, args):
"""Delete a security group."""
cs.security_groups.delete(_get_secgroup(cs, args.secgroup))
secgroup = _get_secgroup(cs, args.secgroup)
cs.security_groups.delete(secgroup)
_print_secgroups([secgroup])
@utils.arg('--all-tenants',
@ -1977,24 +1995,28 @@ def do_secgroup_delete(cs, args):
def do_secgroup_list(cs, args):
"""List security groups for the current tenant."""
search_opts = {'all_tenants': args.all_tenants}
columns = ['Name', 'Description']
columns = ['Id', 'Name', 'Description']
if args.all_tenants:
columns.append('Tenant_ID')
groups = cs.security_groups.list(search_opts=search_opts)
utils.print_list(groups, columns)
@utils.arg('secgroup', metavar='<secgroup>', help='Name of security group.')
@utils.arg('secgroup',
metavar='<secgroup>',
help='ID or name of security group.')
def do_secgroup_list_rules(cs, args):
"""List rules for a security group."""
secgroup = _get_secgroup(cs, args.secgroup)
_print_secgroup_rules(secgroup.rules)
@utils.arg('secgroup', metavar='<secgroup>', help='ID of security group.')
@utils.arg('secgroup',
metavar='<secgroup>',
help='ID or name of security group.')
@utils.arg('source_group',
metavar='<source-group>',
help='ID of source group.')
help='ID or name of source group.')
@utils.arg('ip_proto',
metavar='<ip-proto>',
help='IP protocol (icmp, tcp, udp).')
@ -2023,10 +2045,12 @@ def do_secgroup_add_group_rule(cs, args):
_print_secgroup_rules([rule])
@utils.arg('secgroup', metavar='<secgroup>', help='ID of security group.')
@utils.arg('secgroup',
metavar='<secgroup>',
help='ID or name of security group.')
@utils.arg('source_group',
metavar='<source-group>',
help='ID of source group.')
help='ID or name of source group.')
@utils.arg('ip_proto',
metavar='<ip-proto>',
help='IP protocol (icmp, tcp, udp).')

@ -1,3 +1,4 @@
from novaclient import exceptions
from novaclient.v1_1 import security_group_rules
from tests import utils
from tests.v1_1 import fakes
@ -11,7 +12,48 @@ class SecurityGroupRulesTest(utils.TestCase):
cs.security_group_rules.delete(1)
cs.assert_called('DELETE', '/os-security-group-rules/1')
def test_create_security_group(self):
sg = cs.security_group_rules.create(1)
cs.assert_called('POST', '/os-security-group-rules')
def test_create_security_group_rule(self):
sg = cs.security_group_rules.create(1, "tcp", 1, 65535, "10.0.0.0/16")
body = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": 1,
"to_port": 65535,
"cidr": "10.0.0.0/16",
"group_id": None,
"parent_group_id": 1,
}
}
cs.assert_called('POST', '/os-security-group-rules', body)
self.assertTrue(isinstance(sg, security_group_rules.SecurityGroupRule))
def test_create_security_group_group_rule(self):
sg = cs.security_group_rules.create(1, "tcp", 1, 65535, "10.0.0.0/16",
101)
body = {
"security_group_rule": {
"ip_protocol": "tcp",
"from_port": 1,
"to_port": 65535,
"cidr": "10.0.0.0/16",
"group_id": 101,
"parent_group_id": 1,
}
}
cs.assert_called('POST', '/os-security-group-rules', body)
self.assertTrue(isinstance(sg, security_group_rules.SecurityGroupRule))
def test_invalid_parameters_create(self):
self.assertRaises(exceptions.CommandError,
cs.security_group_rules.create, "secgrouprulecreate",
1, "invalid", 1, 65535, "10.0.0.0/16")
self.assertRaises(exceptions.CommandError,
cs.security_group_rules.create, "secgrouprulecreate",
1, "tcp", "invalid", 65535, "10.0.0.0/16")
self.assertRaises(exceptions.CommandError,
cs.security_group_rules.create, "secgrouprulecreate",
1, "tcp", 1, "invalid", "10.0.0.0/16")

@ -1229,9 +1229,9 @@ class ShellTest(utils.TestCase):
self.run_command('secgroup-add-rule test tcp 22 22 10.0.0.0/8')
self.assert_called('POST', '/os-security-group-rules',
{'security_group_rule':
{'from_port': '22',
{'from_port': 22,
'ip_protocol': 'tcp',
'to_port': '22',
'to_port': 22,
'parent_group_id': 1,
'cidr': '10.0.0.0/8',
'group_id': None}})
@ -1248,9 +1248,9 @@ class ShellTest(utils.TestCase):
self.run_command('secgroup-add-group-rule test test2 tcp 22 22')
self.assert_called('POST', '/os-security-group-rules',
{'security_group_rule':
{'from_port': '22',
{'from_port': 22,
'ip_protocol': 'TCP',
'to_port': '22',
'to_port': 22,
'parent_group_id': 1,
'cidr': None,
'group_id': 2}})