AuthorizeSecurityGroupIngress now works.

This commit is contained in:
Soren Hansen
2010-09-09 15:13:04 +02:00
parent 0da3cec279
commit 886d31a409
2 changed files with 126 additions and 9 deletions

View File

@@ -214,14 +214,54 @@ class CloudController(object):
@rbac.allow('all')
def describe_security_groups(self, context, **kwargs):
groups = {'securityGroupSet':
[{ 'groupDescription': group.description,
'groupName' : group.name,
'ownerId': context.user.id } for group in db.security_group_get_by_user(context, context.user.id) ] }
[{ 'groupDescription': group.description,
'groupName' : group.name,
'ownerId': context.user.id } for group in \
db.security_group_get_by_user(context,
context.user.id) ] }
return groups
@rbac.allow('netadmin')
def authorize_security_group_ingress(self, context, group_name, **kwargs):
def authorize_security_group_ingress(self, context, group_name,
to_port=None, from_port=None,
ip_protocol=None, cidr_ip=None,
user_id=None,
source_security_group_name=None,
source_security_group_owner_id=None):
security_group = db.security_group_get_by_user_and_name(context,
context.user.id,
group_name)
values = { 'parent_security_group' : security_group.id }
# Aw, crap.
if source_security_group_name:
if source_security_group_owner_id:
other_user_id = source_security_group_owner_id
else:
other_user_id = context.user.id
foreign_security_group = \
db.security_group_get_by_user_and_name(context,
other_user_id,
source_security_group_name)
values['group_id'] = foreign_security_group.id
elif cidr_ip:
values['cidr'] = cidr_ip
else:
return { 'return': False }
if ip_protocol and from_port and to_port:
values['protocol'] = ip_protocol
values['from_port'] = from_port
values['to_port'] = to_port
else:
# If cidr based filtering, protocol and ports are mandatory
if 'cidr' in values:
print values
return None
security_group_rule = db.security_group_rule_create(context, values)
return True
@rbac.allow('netadmin')
@@ -234,6 +274,8 @@ class CloudController(object):
@rbac.allow('netadmin')
def delete_security_group(self, context, group_name, **kwargs):
security_group = db.security_group_get_by_user_and_name(context, context.user.id, group_name)
security_group.delete()
return True
@rbac.allow('projectmanager', 'sysadmin')

View File

@@ -233,20 +233,29 @@ class ApiEc2TestCase(test.BaseTestCase):
self.manager.delete_user(user)
def test_get_all_security_groups(self):
"""Test that operations on security groups stick"""
"""Test that we can retrieve security groups"""
self.expect_http()
self.mox.ReplayAll()
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") \
for x in range(random.randint(4, 8)))
user = self.manager.create_user('fake', 'fake', 'fake', admin=True)
project = self.manager.create_project('fake', 'fake', 'fake')
rv = self.ec2.get_all_security_groups()
self.assertEquals(len(rv), 1)
self.assertEquals(rv[0].name, 'default')
self.assertEquals(len(rv), 1)
self.assertEquals(rv[0].name, 'default')
self.manager.delete_project(project)
self.manager.delete_user(user)
def test_create_delete_security_group(self):
"""Test that we can create a security group"""
self.expect_http()
self.mox.ReplayAll()
user = self.manager.create_user('fake', 'fake', 'fake', admin=True)
project = self.manager.create_project('fake', 'fake', 'fake')
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") \
for x in range(random.randint(4, 8)))
self.ec2.create_security_group(security_group_name, 'test group')
@@ -257,5 +266,71 @@ class ApiEc2TestCase(test.BaseTestCase):
self.assertEquals(len(rv), 2)
self.assertTrue(security_group_name in [group.name for group in rv])
self.expect_http()
self.mox.ReplayAll()
self.ec2.delete_security_group(security_group_name)
self.manager.delete_project(project)
self.manager.delete_user(user)
def test_authorize_security_group_cidr(self):
"""Test that we can add rules to a security group"""
self.expect_http()
self.mox.ReplayAll()
user = self.manager.create_user('fake', 'fake', 'fake', admin=True)
project = self.manager.create_project('fake', 'fake', 'fake')
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") \
for x in range(random.randint(4, 8)))
group = self.ec2.create_security_group(security_group_name, 'test group')
self.expect_http()
self.mox.ReplayAll()
group.connection = self.ec2
group.authorize('tcp', 80, 80, '0.0.0.0/0')
self.expect_http()
self.mox.ReplayAll()
self.ec2.delete_security_group(security_group_name)
self.manager.delete_project(project)
self.manager.delete_user(user)
return
def test_authorize_security_group_foreign_group(self):
"""Test that we can grant another security group access to a security group"""
self.expect_http()
self.mox.ReplayAll()
user = self.manager.create_user('fake', 'fake', 'fake', admin=True)
project = self.manager.create_project('fake', 'fake', 'fake')
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") \
for x in range(random.randint(4, 8)))
group = self.ec2.create_security_group(security_group_name, 'test group')
self.expect_http()
self.mox.ReplayAll()
other_group = self.ec2.create_security_group('appserver', 'The application tier')
self.expect_http()
self.mox.ReplayAll()
group.connection = self.ec2
group.authorize(src_group=other_group)
self.expect_http()
self.mox.ReplayAll()
self.ec2.delete_security_group(security_group_name)
self.manager.delete_project(project)
self.manager.delete_user(user)
return