Merge "Unittests: refactor AWS::EC2::SecurityGroup tests"

This commit is contained in:
Jenkins 2014-10-07 06:01:33 +00:00 committed by Gerrit Code Review
commit 9dbad001f6

View File

@ -169,7 +169,7 @@ Resources:
self.assertEqual(ref_id, rsrc.FnGetRefId()) self.assertEqual(ref_id, rsrc.FnGetRefId())
self.assertEqual(metadata, dict(rsrc.metadata_get())) self.assertEqual(metadata, dict(rsrc.metadata_get()))
def test_security_group_nova(self): def stubout_nova_create_security_group(self):
#create script #create script
self.mock_no_neutron() self.mock_no_neutron()
nova.NovaClientPlugin._create().AndReturn(self.fc) nova.NovaClientPlugin._create().AndReturn(self.fc)
@ -196,27 +196,28 @@ Resources:
2, 'tcp', None, None, None, 1).AndReturn(None) 2, 'tcp', None, None, None, 1).AndReturn(None)
nova_sgr.SecurityGroupRuleManager.create( nova_sgr.SecurityGroupRuleManager.create(
2, 'icmp', None, None, None, '1').AndReturn(None) 2, 'icmp', None, None, None, '1').AndReturn(None)
return sg_name
# delete script def stubout_nova_get_security_group(self, sg_name):
nova_sg.SecurityGroupManager.get(2).AndReturn(NovaSG( nova_sg.SecurityGroupManager.get(2).AndReturn(NovaSG(
id=2, id=2,
name=sg_name, name=sg_name,
description='HTTP and SSH access', description='',
rules=[{ rules=[{
"from_port": '22', "from_port": 22,
"group": {}, "group": {},
"ip_protocol": "tcp", "ip_protocol": "tcp",
"to_port": '22', "to_port": 22,
"parent_group_id": 2, "parent_group_id": 2,
"ip_range": { "ip_range": {
"cidr": "0.0.0.0/0" "cidr": "0.0.0.0/0"
}, },
'id': 130 'id': 130
}, { }, {
'from_port': '80', 'from_port': 80,
'group': {}, 'group': {},
'ip_protocol': 'tcp', 'ip_protocol': 'tcp',
'to_port': '80', 'to_port': 80,
'parent_group_id': 2, 'parent_group_id': 2,
'ip_range': { 'ip_range': {
'cidr': '0.0.0.0/0' 'cidr': '0.0.0.0/0'
@ -246,207 +247,15 @@ Resources:
'id': 133 'id': 133
}] }]
)) ))
def stubout_nova_delete_security_group_rules(self, sg_name):
self.stubout_nova_get_security_group(sg_name)
nova_sgr.SecurityGroupRuleManager.delete(130).AndReturn(None) nova_sgr.SecurityGroupRuleManager.delete(130).AndReturn(None)
nova_sgr.SecurityGroupRuleManager.delete(131).AndReturn(None) nova_sgr.SecurityGroupRuleManager.delete(131).AndReturn(None)
nova_sgr.SecurityGroupRuleManager.delete(132).AndReturn(None) nova_sgr.SecurityGroupRuleManager.delete(132).AndReturn(None)
nova_sgr.SecurityGroupRuleManager.delete(133).AndReturn(None) nova_sgr.SecurityGroupRuleManager.delete(133).AndReturn(None)
nova_sg.SecurityGroupManager.delete(2).AndReturn(None)
self.m.ReplayAll() def stubout_neutron_create_security_group(self):
stack = self.create_stack(self.test_template_nova)
sg = stack['the_sg']
self.assertResourceState(sg, utils.PhysName('test_stack', 'the_sg'))
stack.delete()
self.m.VerifyAll()
def test_security_group_nova_bad_source_group(self):
#create script
self.mock_no_neutron()
nova.NovaClientPlugin._create().AndReturn(self.fc)
nova_sg.SecurityGroupManager.list().AndReturn([NovaSG(
id=1,
name='test',
description='FAKE_SECURITY_GROUP',
rules=[],
)])
sg_name = utils.PhysName('test_stack', 'the_sg')
nova_sg.SecurityGroupManager.create(
sg_name,
'HTTP and SSH access').AndReturn(NovaSG(
id=2,
name=sg_name,
description='HTTP and SSH access',
rules=[]))
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', '22', '22', '0.0.0.0/0', None).AndReturn(None)
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', '80', '80', '0.0.0.0/0', None).AndReturn(None)
# delete script
nova_sg.SecurityGroupManager.get(2).AndReturn(NovaSG(
id=2,
name=sg_name,
description='HTTP and SSH access',
rules=[{
"from_port": '22',
"group": {},
"ip_protocol": "tcp",
"to_port": '22',
"parent_group_id": 2,
"ip_range": {
"cidr": "0.0.0.0/0"
},
'id': 130
}, {
'from_port': '80',
'group': {},
'ip_protocol': 'tcp',
'to_port': '80',
'parent_group_id': 2,
'ip_range': {
'cidr': '0.0.0.0/0'
},
'id': 131
}]
))
nova_sgr.SecurityGroupRuleManager.delete(130).AndReturn(None)
nova_sgr.SecurityGroupRuleManager.delete(131).AndReturn(None)
nova_sg.SecurityGroupManager.delete(2).AndReturn(None)
self.m.ReplayAll()
stack = self.create_stack(self.test_template_nova_bad_source_group)
sg = stack['the_sg']
self.assertEqual(sg.FAILED, sg.status)
self.assertIn('not found', sg.status_reason)
stack.delete()
self.m.VerifyAll()
def test_security_group_client_exception(self):
#create script
self.mock_no_neutron()
nova.NovaClientPlugin._create().AndReturn(self.fc)
sg_name = utils.PhysName('test_stack', 'the_sg')
nova_sg.SecurityGroupManager.list().AndReturn([
NovaSG(
id=2,
name=sg_name,
description='HTTP and SSH access',
rules=[],
),
NovaSG(
id=1,
name='test',
description='FAKE_SECURITY_GROUP',
rules=[],
)
])
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', '22', '22', '0.0.0.0/0', None).AndRaise(
fakes.fake_exception(400, 'Rule already exists'))
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', '80', '80', '0.0.0.0/0', None).AndReturn(
fakes.fake_exception(400, 'Rule already exists'))
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', None, None, None, 1).AndReturn(
fakes.fake_exception(400, 'Rule already exists'))
nova_sgr.SecurityGroupRuleManager.create(
2, 'icmp', None, None, None, '1').AndReturn(
fakes.fake_exception(400, 'Rule already exists'))
# delete script
nova_sg.SecurityGroupManager.get(2).AndReturn(NovaSG(
id=2,
name=sg_name,
description='HTTP and SSH access',
rules=[{
"from_port": '22',
"group": {},
"ip_protocol": "tcp",
"to_port": '22',
"parent_group_id": 2,
"ip_range": {
"cidr": "0.0.0.0/0"
},
'id': 130
}, {
'from_port': '80',
'group': {},
'ip_protocol': 'tcp',
'to_port': '80',
'parent_group_id': 2,
'ip_range': {
'cidr': '0.0.0.0/0'
},
'id': 131
}, {
'from_port': None,
'group': {
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'name': 'test'
},
'ip_protocol': 'tcp',
'to_port': None,
'parent_group_id': 2,
'ip_range': {},
'id': 132
}, {
'from_port': None,
'group': {
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'name': 'test'
},
'ip_protocol': 'icmp',
'to_port': None,
'parent_group_id': 2,
'ip_range': {},
'id': 133
}]
))
nova_sgr.SecurityGroupRuleManager.delete(130).AndRaise(
fakes.fake_exception())
nova_sgr.SecurityGroupRuleManager.delete(131).AndRaise(
fakes.fake_exception())
nova_sgr.SecurityGroupRuleManager.delete(132).AndRaise(
fakes.fake_exception())
nova_sgr.SecurityGroupRuleManager.delete(133).AndRaise(
fakes.fake_exception())
nova_sg.SecurityGroupManager.delete(2).AndReturn(None)
nova_sg.SecurityGroupManager.get(2).AndRaise(
fakes.fake_exception())
self.m.ReplayAll()
stack = self.create_stack(self.test_template_nova)
sg = stack['the_sg']
self.assertResourceState(sg, utils.PhysName('test_stack', 'the_sg'))
scheduler.TaskRunner(sg.delete)()
sg.state_set(sg.CREATE, sg.COMPLETE, 'to delete again')
sg.resource_id = 2
stack.delete()
self.m.VerifyAll()
def test_security_group_nova_with_egress_rules(self):
self.mock_no_neutron()
t = template_format.parse(self.test_template_nova_with_egress)
stack = self.parse_stack(t)
sg = stack['the_sg']
self.assertRaises(exception.EgressRuleNotAllowed, sg.validate)
def test_security_group_neutron(self):
#create script
sg_name = utils.PhysName('test_stack', 'the_sg') sg_name = utils.PhysName('test_stack', 'the_sg')
neutronclient.Client.create_security_group({ neutronclient.Client.create_security_group({
'security_group': { 'security_group': {
@ -485,6 +294,11 @@ Resources:
} }
}) })
neutronclient.Client.delete_security_group_rule('aaaa-1').AndReturn(
None)
neutronclient.Client.delete_security_group_rule('aaaa-2').AndReturn(
None)
neutronclient.Client.create_security_group_rule({ neutronclient.Client.create_security_group_rule({
'security_group_rule': { 'security_group_rule': {
'direction': 'ingress', 'direction': 'ingress',
@ -557,10 +371,6 @@ Resources:
'id': 'dddd' 'id': 'dddd'
} }
}) })
neutronclient.Client.delete_security_group_rule('aaaa-1').AndReturn(
None)
neutronclient.Client.delete_security_group_rule('aaaa-2').AndReturn(
None)
neutronclient.Client.create_security_group_rule({ neutronclient.Client.create_security_group_rule({
'security_group_rule': { 'security_group_rule': {
'direction': 'egress', 'direction': 'egress',
@ -610,7 +420,7 @@ Resources:
} }
}) })
# delete script def stubout_neutron_get_security_group(self):
neutronclient.Client.show_security_group('aaaa').AndReturn({ neutronclient.Client.show_security_group('aaaa').AndReturn({
'security_group': { 'security_group': {
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88', 'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
@ -619,25 +429,25 @@ Resources:
'security_group_rules': [{ 'security_group_rules': [{
'direction': 'ingress', 'direction': 'ingress',
'protocol': 'tcp', 'protocol': 'tcp',
'port_range_max': '22', 'port_range_max': 22,
'id': 'bbbb', 'id': 'bbbb',
'ethertype': 'IPv4', 'ethertype': 'IPv4',
'security_group_id': 'aaaa', 'security_group_id': 'aaaa',
'remote_group_id': None, 'remote_group_id': None,
'remote_ip_prefix': '0.0.0.0/0', 'remote_ip_prefix': '0.0.0.0/0',
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88', 'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'port_range_min': '22' 'port_range_min': 22
}, { }, {
'direction': 'ingress', 'direction': 'ingress',
'protocol': 'tcp', 'protocol': 'tcp',
'port_range_max': '80', 'port_range_max': 80,
'id': 'cccc', 'id': 'cccc',
'ethertype': 'IPv4', 'ethertype': 'IPv4',
'security_group_id': 'aaaa', 'security_group_id': 'aaaa',
'remote_group_id': None, 'remote_group_id': None,
'remote_ip_prefix': '0.0.0.0/0', 'remote_ip_prefix': '0.0.0.0/0',
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88', 'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'port_range_min': '80' 'port_range_min': 80
}, { }, {
'direction': 'ingress', 'direction': 'ingress',
'protocol': 'tcp', 'protocol': 'tcp',
@ -652,14 +462,14 @@ Resources:
}, { }, {
'direction': 'egress', 'direction': 'egress',
'protocol': 'tcp', 'protocol': 'tcp',
'port_range_max': '22', 'port_range_max': 22,
'id': 'eeee', 'id': 'eeee',
'ethertype': 'IPv4', 'ethertype': 'IPv4',
'security_group_id': 'aaaa', 'security_group_id': 'aaaa',
'remote_group_id': None, 'remote_group_id': None,
'remote_ip_prefix': '10.0.1.0/24', 'remote_ip_prefix': '10.0.1.0/24',
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88', 'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'port_range_min': '22' 'port_range_min': 22
}, { }, {
'direction': 'egress', 'direction': 'egress',
'protocol': None, 'protocol': None,
@ -673,11 +483,223 @@ Resources:
'port_range_min': None 'port_range_min': None
}], }],
'id': 'aaaa'}}) 'id': 'aaaa'}})
def stubout_neutron_delete_security_group_rules(self):
self.stubout_neutron_get_security_group()
neutronclient.Client.delete_security_group_rule('bbbb').AndReturn(None) neutronclient.Client.delete_security_group_rule('bbbb').AndReturn(None)
neutronclient.Client.delete_security_group_rule('cccc').AndReturn(None) neutronclient.Client.delete_security_group_rule('cccc').AndReturn(None)
neutronclient.Client.delete_security_group_rule('dddd').AndReturn(None) neutronclient.Client.delete_security_group_rule('dddd').AndReturn(None)
neutronclient.Client.delete_security_group_rule('eeee').AndReturn(None) neutronclient.Client.delete_security_group_rule('eeee').AndReturn(None)
neutronclient.Client.delete_security_group_rule('ffff').AndReturn(None) neutronclient.Client.delete_security_group_rule('ffff').AndReturn(None)
def test_security_group_nova(self):
#create script
#nova.NovaClientPlugin._create().AndReturn(self.fc)
sg_name = self.stubout_nova_create_security_group()
# delete script
self.stubout_nova_delete_security_group_rules(sg_name)
nova_sg.SecurityGroupManager.delete(2).AndReturn(None)
self.m.ReplayAll()
stack = self.create_stack(self.test_template_nova)
sg = stack['the_sg']
self.assertResourceState(sg, utils.PhysName('test_stack', 'the_sg'))
stack.delete()
self.m.VerifyAll()
def test_security_group_nova_bad_source_group(self):
#create script
self.mock_no_neutron()
nova.NovaClientPlugin._create().AndReturn(self.fc)
nova_sg.SecurityGroupManager.list().AndReturn([NovaSG(
id=1,
name='test',
description='FAKE_SECURITY_GROUP',
rules=[],
)])
sg_name = utils.PhysName('test_stack', 'the_sg')
nova_sg.SecurityGroupManager.create(
sg_name,
'HTTP and SSH access').AndReturn(NovaSG(
id=2,
name=sg_name,
description='HTTP and SSH access',
rules=[]))
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', '22', '22', '0.0.0.0/0', None).AndReturn(None)
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', '80', '80', '0.0.0.0/0', None).AndReturn(None)
# delete script
nova_sg.SecurityGroupManager.get(2).AndReturn(NovaSG(
id=2,
name=sg_name,
description='HTTP and SSH access',
rules=[{
"from_port": 22,
"group": {},
"ip_protocol": "tcp",
"to_port": 22,
"parent_group_id": 2,
"ip_range": {
"cidr": "0.0.0.0/0"
},
'id': 130
}, {
'from_port': 80,
'group': {},
'ip_protocol': 'tcp',
'to_port': 80,
'parent_group_id': 2,
'ip_range': {
'cidr': '0.0.0.0/0'
},
'id': 131
}]
))
nova_sgr.SecurityGroupRuleManager.delete(130).AndReturn(None)
nova_sgr.SecurityGroupRuleManager.delete(131).AndReturn(None)
nova_sg.SecurityGroupManager.delete(2).AndReturn(None)
self.m.ReplayAll()
stack = self.create_stack(self.test_template_nova_bad_source_group)
sg = stack['the_sg']
self.assertEqual(sg.FAILED, sg.status)
self.assertIn('not found', sg.status_reason)
stack.delete()
self.m.VerifyAll()
def test_security_group_nova_exception(self):
#create script
self.mock_no_neutron()
nova.NovaClientPlugin._create().AndReturn(self.fc)
sg_name = utils.PhysName('test_stack', 'the_sg')
nova_sg.SecurityGroupManager.list().AndReturn([
NovaSG(
id=2,
name=sg_name,
description='HTTP and SSH access',
rules=[],
),
NovaSG(
id=1,
name='test',
description='FAKE_SECURITY_GROUP',
rules=[],
)
])
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', '22', '22', '0.0.0.0/0', None).AndRaise(
fakes.fake_exception(400, 'Rule already exists'))
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', '80', '80', '0.0.0.0/0', None).AndReturn(
fakes.fake_exception(400, 'Rule already exists'))
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', None, None, None, 1).AndReturn(
fakes.fake_exception(400, 'Rule already exists'))
nova_sgr.SecurityGroupRuleManager.create(
2, 'icmp', None, None, None, '1').AndReturn(
fakes.fake_exception(400, 'Rule already exists'))
# delete script
nova_sg.SecurityGroupManager.get(2).AndReturn(NovaSG(
id=2,
name=sg_name,
description='HTTP and SSH access',
rules=[{
"from_port": 22,
"group": {},
"ip_protocol": "tcp",
"to_port": 22,
"parent_group_id": 2,
"ip_range": {
"cidr": "0.0.0.0/0"
},
'id': 130
}, {
'from_port': 80,
'group': {},
'ip_protocol': 'tcp',
'to_port': 80,
'parent_group_id': 2,
'ip_range': {
'cidr': '0.0.0.0/0'
},
'id': 131
}, {
'from_port': None,
'group': {
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'name': 'test'
},
'ip_protocol': 'tcp',
'to_port': None,
'parent_group_id': 2,
'ip_range': {},
'id': 132
}, {
'from_port': None,
'group': {
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'name': 'test'
},
'ip_protocol': 'icmp',
'to_port': None,
'parent_group_id': 2,
'ip_range': {},
'id': 133
}]
))
nova_sgr.SecurityGroupRuleManager.delete(130).AndRaise(
fakes.fake_exception())
nova_sgr.SecurityGroupRuleManager.delete(131).AndRaise(
fakes.fake_exception())
nova_sgr.SecurityGroupRuleManager.delete(132).AndRaise(
fakes.fake_exception())
nova_sgr.SecurityGroupRuleManager.delete(133).AndRaise(
fakes.fake_exception())
nova_sg.SecurityGroupManager.delete(2).AndReturn(None)
nova_sg.SecurityGroupManager.get(2).AndRaise(
fakes.fake_exception())
self.m.ReplayAll()
stack = self.create_stack(self.test_template_nova)
sg = stack['the_sg']
self.assertResourceState(sg, utils.PhysName('test_stack', 'the_sg'))
scheduler.TaskRunner(sg.delete)()
sg.state_set(sg.CREATE, sg.COMPLETE, 'to delete again')
sg.resource_id = 2
stack.delete()
self.m.VerifyAll()
def test_security_group_nova_with_egress_rules(self):
self.mock_no_neutron()
t = template_format.parse(self.test_template_nova_with_egress)
stack = self.parse_stack(t)
sg = stack['the_sg']
self.assertRaises(exception.EgressRuleNotAllowed, sg.validate)
def test_security_group_neutron(self):
#create script
self.stubout_neutron_create_security_group()
# delete script
self.stubout_neutron_delete_security_group_rules()
neutronclient.Client.delete_security_group('aaaa').AndReturn(None) neutronclient.Client.delete_security_group('aaaa').AndReturn(None)
self.m.ReplayAll() self.m.ReplayAll()
@ -783,25 +805,25 @@ Resources:
'security_group_rules': [{ 'security_group_rules': [{
'direction': 'ingress', 'direction': 'ingress',
'protocol': 'tcp', 'protocol': 'tcp',
'port_range_max': '22', 'port_range_max': 22,
'id': 'bbbb', 'id': 'bbbb',
'ethertype': 'IPv4', 'ethertype': 'IPv4',
'security_group_id': 'aaaa', 'security_group_id': 'aaaa',
'remote_group_id': None, 'remote_group_id': None,
'remote_ip_prefix': '0.0.0.0/0', 'remote_ip_prefix': '0.0.0.0/0',
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88', 'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'port_range_min': '22' 'port_range_min': 22
}, { }, {
'direction': 'ingress', 'direction': 'ingress',
'protocol': 'tcp', 'protocol': 'tcp',
'port_range_max': '80', 'port_range_max': 80,
'id': 'cccc', 'id': 'cccc',
'ethertype': 'IPv4', 'ethertype': 'IPv4',
'security_group_id': 'aaaa', 'security_group_id': 'aaaa',
'remote_group_id': None, 'remote_group_id': None,
'remote_ip_prefix': '0.0.0.0/0', 'remote_ip_prefix': '0.0.0.0/0',
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88', 'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'port_range_min': '80' 'port_range_min': 80
}, { }, {
'direction': 'ingress', 'direction': 'ingress',
'protocol': 'tcp', 'protocol': 'tcp',
@ -816,14 +838,14 @@ Resources:
}, { }, {
'direction': 'egress', 'direction': 'egress',
'protocol': 'tcp', 'protocol': 'tcp',
'port_range_max': '22', 'port_range_max': 22,
'id': 'eeee', 'id': 'eeee',
'ethertype': 'IPv4', 'ethertype': 'IPv4',
'security_group_id': 'aaaa', 'security_group_id': 'aaaa',
'remote_group_id': None, 'remote_group_id': None,
'remote_ip_prefix': '10.0.1.0/24', 'remote_ip_prefix': '10.0.1.0/24',
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88', 'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'port_range_min': '22' 'port_range_min': 22
}, { }, {
'direction': 'egress', 'direction': 'egress',
'protocol': None, 'protocol': None,