after trunk merge
This commit is contained in:
@@ -187,6 +187,102 @@ class CloudTestCase(test.TestCase):
|
||||
sec['name'])
|
||||
db.security_group_destroy(self.context, sec['id'])
|
||||
|
||||
def test_describe_security_groups_by_id(self):
|
||||
sec = db.security_group_create(self.context,
|
||||
{'project_id': self.context.project_id,
|
||||
'name': 'test'})
|
||||
result = self.cloud.describe_security_groups(self.context,
|
||||
group_id=[sec['id']])
|
||||
self.assertEqual(len(result['securityGroupInfo']), 1)
|
||||
self.assertEqual(
|
||||
result['securityGroupInfo'][0]['groupName'],
|
||||
sec['name'])
|
||||
default = db.security_group_get_by_name(self.context,
|
||||
self.context.project_id,
|
||||
'default')
|
||||
result = self.cloud.describe_security_groups(self.context,
|
||||
group_id=[default['id']])
|
||||
self.assertEqual(len(result['securityGroupInfo']), 1)
|
||||
self.assertEqual(
|
||||
result['securityGroupInfo'][0]['groupName'],
|
||||
'default')
|
||||
db.security_group_destroy(self.context, sec['id'])
|
||||
|
||||
def test_create_delete_security_group(self):
|
||||
descript = 'test description'
|
||||
create = self.cloud.create_security_group
|
||||
result = create(self.context, 'testgrp', descript)
|
||||
group_descript = result['securityGroupSet'][0]['groupDescription']
|
||||
self.assertEqual(descript, group_descript)
|
||||
delete = self.cloud.delete_security_group
|
||||
self.assertTrue(delete(self.context, 'testgrp'))
|
||||
|
||||
def test_delete_security_group_by_id(self):
|
||||
sec = db.security_group_create(self.context,
|
||||
{'project_id': self.context.project_id,
|
||||
'name': 'test'})
|
||||
delete = self.cloud.delete_security_group
|
||||
self.assertTrue(delete(self.context, group_id=sec['id']))
|
||||
|
||||
def test_delete_security_group_with_bad_name(self):
|
||||
delete = self.cloud.delete_security_group
|
||||
notfound = exception.SecurityGroupNotFound
|
||||
self.assertRaises(notfound, delete, self.context, 'badname')
|
||||
|
||||
def test_delete_security_group_with_bad_group_id(self):
|
||||
delete = self.cloud.delete_security_group
|
||||
notfound = exception.SecurityGroupNotFound
|
||||
self.assertRaises(notfound, delete, self.context, group_id=999)
|
||||
|
||||
def test_delete_security_group_no_params(self):
|
||||
delete = self.cloud.delete_security_group
|
||||
self.assertRaises(exception.ApiError, delete, self.context)
|
||||
|
||||
def test_authorize_revoke_security_group_ingress(self):
|
||||
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
|
||||
sec = db.security_group_create(self.context, kwargs)
|
||||
authz = self.cloud.authorize_security_group_ingress
|
||||
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
|
||||
authz(self.context, group_name=sec['name'], **kwargs)
|
||||
revoke = self.cloud.revoke_security_group_ingress
|
||||
self.assertTrue(revoke(self.context, group_name=sec['name'], **kwargs))
|
||||
|
||||
def test_authorize_revoke_security_group_ingress_by_id(self):
|
||||
sec = db.security_group_create(self.context,
|
||||
{'project_id': self.context.project_id,
|
||||
'name': 'test'})
|
||||
authz = self.cloud.authorize_security_group_ingress
|
||||
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
|
||||
authz(self.context, group_id=sec['id'], **kwargs)
|
||||
revoke = self.cloud.revoke_security_group_ingress
|
||||
self.assertTrue(revoke(self.context, group_id=sec['id'], **kwargs))
|
||||
|
||||
def test_authorize_security_group_ingress_missing_protocol_params(self):
|
||||
sec = db.security_group_create(self.context,
|
||||
{'project_id': self.context.project_id,
|
||||
'name': 'test'})
|
||||
authz = self.cloud.authorize_security_group_ingress
|
||||
self.assertRaises(exception.ApiError, authz, self.context, 'test')
|
||||
|
||||
def test_authorize_security_group_ingress_missing_group_name_or_id(self):
|
||||
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
|
||||
authz = self.cloud.authorize_security_group_ingress
|
||||
self.assertRaises(exception.ApiError, authz, self.context, **kwargs)
|
||||
|
||||
def test_authorize_security_group_ingress_already_exists(self):
|
||||
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
|
||||
sec = db.security_group_create(self.context, kwargs)
|
||||
authz = self.cloud.authorize_security_group_ingress
|
||||
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
|
||||
authz(self.context, group_name=sec['name'], **kwargs)
|
||||
self.assertRaises(exception.ApiError, authz, self.context,
|
||||
group_name=sec['name'], **kwargs)
|
||||
|
||||
def test_revoke_security_group_ingress_missing_group_name_or_id(self):
|
||||
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
|
||||
revoke = self.cloud.revoke_security_group_ingress
|
||||
self.assertRaises(exception.ApiError, revoke, self.context, **kwargs)
|
||||
|
||||
def test_describe_volumes(self):
|
||||
"""Makes sure describe_volumes works and filters results."""
|
||||
vol1 = db.volume_create(self.context, {})
|
||||
|
||||
Reference in New Issue
Block a user