modified to conform to latest AWS EC2 API spec for authorize & revoke ingress params using the IpPermissions data structure, which nests lists of CIDR blocks (IpRanges) as well as lists of Group data
This commit is contained in:
commit
a1ab67f6ab
@ -269,25 +269,64 @@ class CloudTestCase(test.TestCase):
|
|||||||
delete = self.cloud.delete_security_group
|
delete = self.cloud.delete_security_group
|
||||||
self.assertRaises(exception.ApiError, delete, self.context)
|
self.assertRaises(exception.ApiError, delete, self.context)
|
||||||
|
|
||||||
def test_authorize_revoke_security_group_ingress(self):
|
def test_authorize_security_group_ingress(self):
|
||||||
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
|
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
|
||||||
sec = db.security_group_create(self.context, kwargs)
|
sec = db.security_group_create(self.context, kwargs)
|
||||||
authz = self.cloud.authorize_security_group_ingress
|
authz = self.cloud.authorize_security_group_ingress
|
||||||
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
|
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
|
||||||
authz(self.context, group_name=sec['name'], **kwargs)
|
self.assertTrue(authz(self.context, group_name=sec['name'], **kwargs))
|
||||||
|
|
||||||
|
def test_authorize_security_group_ingress_ip_permissions_ip_ranges(self):
|
||||||
|
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
|
||||||
|
sec = db.security_group_create(self.context, kwargs)
|
||||||
|
authz = self.cloud.authorize_security_group_ingress
|
||||||
|
kwargs = {'ip_permissions': [{'to_port': 81, 'from_port': 81,
|
||||||
|
'ip_ranges':
|
||||||
|
{'1': {'cidr_ip': u'0.0.0.0/0'},
|
||||||
|
'2': {'cidr_ip': u'10.10.10.10/32'}},
|
||||||
|
'ip_protocol': u'tcp'}]}
|
||||||
|
self.assertTrue(authz(self.context, group_name=sec['name'], **kwargs))
|
||||||
|
|
||||||
|
def test_authorize_security_group_ingress_ip_permissions_groups(self):
|
||||||
|
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
|
||||||
|
sec = db.security_group_create(self.context, kwargs)
|
||||||
|
authz = self.cloud.authorize_security_group_ingress
|
||||||
|
kwargs = {'ip_permissions': [{'to_port': 81, 'from_port': 81,
|
||||||
|
'ip_ranges':{'1': {'cidr_ip': u'0.0.0.0/0'},
|
||||||
|
'2': {'cidr_ip': u'10.10.10.10/32'}},
|
||||||
|
'groups': {'1': {'user_id': u'someuser',
|
||||||
|
'group_name': u'somegroup1'},
|
||||||
|
'2': {'user_id': u'someuser',
|
||||||
|
'group_name': u'othergroup2'}},
|
||||||
|
'ip_protocol': u'tcp'}]}
|
||||||
|
self.assertTrue(authz(self.context, group_name=sec['name'], **kwargs))
|
||||||
|
|
||||||
|
def test_revoke_security_group_ingress(self):
|
||||||
|
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
|
||||||
|
sec = db.security_group_create(self.context, kwargs)
|
||||||
|
authz = self.cloud.authorize_security_group_ingress
|
||||||
|
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
|
||||||
|
authz(self.context, group_id=sec['id'], **kwargs)
|
||||||
revoke = self.cloud.revoke_security_group_ingress
|
revoke = self.cloud.revoke_security_group_ingress
|
||||||
self.assertTrue(revoke(self.context, group_name=sec['name'], **kwargs))
|
self.assertTrue(revoke(self.context, group_name=sec['name'], **kwargs))
|
||||||
|
|
||||||
def test_authorize_revoke_security_group_ingress_by_id(self):
|
def test_revoke_security_group_ingress_by_id(self):
|
||||||
sec = db.security_group_create(self.context,
|
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
|
||||||
{'project_id': self.context.project_id,
|
sec = db.security_group_create(self.context, kwargs)
|
||||||
'name': 'test'})
|
|
||||||
authz = self.cloud.authorize_security_group_ingress
|
authz = self.cloud.authorize_security_group_ingress
|
||||||
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
|
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
|
||||||
authz(self.context, group_id=sec['id'], **kwargs)
|
authz(self.context, group_id=sec['id'], **kwargs)
|
||||||
revoke = self.cloud.revoke_security_group_ingress
|
revoke = self.cloud.revoke_security_group_ingress
|
||||||
self.assertTrue(revoke(self.context, group_id=sec['id'], **kwargs))
|
self.assertTrue(revoke(self.context, group_id=sec['id'], **kwargs))
|
||||||
|
|
||||||
|
def test_authorize_security_group_ingress_by_id(self):
|
||||||
|
sec = db.security_group_create(self.context,
|
||||||
|
{'project_id': self.context.project_id,
|
||||||
|
'name': 'test'})
|
||||||
|
authz = self.cloud.authorize_security_group_ingress
|
||||||
|
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
|
||||||
|
self.assertTrue(authz(self.context, group_id=sec['id'], **kwargs))
|
||||||
|
|
||||||
def test_authorize_security_group_ingress_missing_protocol_params(self):
|
def test_authorize_security_group_ingress_missing_protocol_params(self):
|
||||||
sec = db.security_group_create(self.context,
|
sec = db.security_group_create(self.context,
|
||||||
{'project_id': self.context.project_id,
|
{'project_id': self.context.project_id,
|
||||||
|
Loading…
Reference in New Issue
Block a user