Unittests: refactor AWS::EC2::SecurityGroup tests

Refactor mocks for rule creation into separate helper methods.

Change-Id: Ia32639147f12e2a3026b2d8c3efb499d8cea6765
This commit is contained in:
Pavlo Shchelokovskyy 2014-09-10 23:27:29 +03:00
parent ec0a6720ab
commit 8156b2b0e0

View File

@ -169,7 +169,7 @@ Resources:
self.assertEqual(ref_id, rsrc.FnGetRefId())
self.assertEqual(metadata, dict(rsrc.metadata_get()))
def test_security_group_nova(self):
def stubout_nova_create_security_group(self):
#create script
self.mock_no_neutron()
nova.NovaClientPlugin._create().AndReturn(self.fc)
@ -196,27 +196,28 @@ Resources:
2, 'tcp', None, None, None, 1).AndReturn(None)
nova_sgr.SecurityGroupRuleManager.create(
2, 'icmp', None, None, None, '1').AndReturn(None)
return sg_name
# delete script
def stubout_nova_get_security_group(self, sg_name):
nova_sg.SecurityGroupManager.get(2).AndReturn(NovaSG(
id=2,
name=sg_name,
description='HTTP and SSH access',
description='',
rules=[{
"from_port": '22',
"from_port": 22,
"group": {},
"ip_protocol": "tcp",
"to_port": '22',
"to_port": 22,
"parent_group_id": 2,
"ip_range": {
"cidr": "0.0.0.0/0"
},
'id': 130
}, {
'from_port': '80',
'from_port': 80,
'group': {},
'ip_protocol': 'tcp',
'to_port': '80',
'to_port': 80,
'parent_group_id': 2,
'ip_range': {
'cidr': '0.0.0.0/0'
@ -246,207 +247,15 @@ Resources:
'id': 133
}]
))
def stubout_nova_delete_security_group_rules(self, sg_name):
self.stubout_nova_get_security_group(sg_name)
nova_sgr.SecurityGroupRuleManager.delete(130).AndReturn(None)
nova_sgr.SecurityGroupRuleManager.delete(131).AndReturn(None)
nova_sgr.SecurityGroupRuleManager.delete(132).AndReturn(None)
nova_sgr.SecurityGroupRuleManager.delete(133).AndReturn(None)
nova_sg.SecurityGroupManager.delete(2).AndReturn(None)
self.m.ReplayAll()
stack = self.create_stack(self.test_template_nova)
sg = stack['the_sg']
self.assertResourceState(sg, utils.PhysName('test_stack', 'the_sg'))
stack.delete()
self.m.VerifyAll()
def test_security_group_nova_bad_source_group(self):
#create script
self.mock_no_neutron()
nova.NovaClientPlugin._create().AndReturn(self.fc)
nova_sg.SecurityGroupManager.list().AndReturn([NovaSG(
id=1,
name='test',
description='FAKE_SECURITY_GROUP',
rules=[],
)])
sg_name = utils.PhysName('test_stack', 'the_sg')
nova_sg.SecurityGroupManager.create(
sg_name,
'HTTP and SSH access').AndReturn(NovaSG(
id=2,
name=sg_name,
description='HTTP and SSH access',
rules=[]))
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', '22', '22', '0.0.0.0/0', None).AndReturn(None)
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', '80', '80', '0.0.0.0/0', None).AndReturn(None)
# delete script
nova_sg.SecurityGroupManager.get(2).AndReturn(NovaSG(
id=2,
name=sg_name,
description='HTTP and SSH access',
rules=[{
"from_port": '22',
"group": {},
"ip_protocol": "tcp",
"to_port": '22',
"parent_group_id": 2,
"ip_range": {
"cidr": "0.0.0.0/0"
},
'id': 130
}, {
'from_port': '80',
'group': {},
'ip_protocol': 'tcp',
'to_port': '80',
'parent_group_id': 2,
'ip_range': {
'cidr': '0.0.0.0/0'
},
'id': 131
}]
))
nova_sgr.SecurityGroupRuleManager.delete(130).AndReturn(None)
nova_sgr.SecurityGroupRuleManager.delete(131).AndReturn(None)
nova_sg.SecurityGroupManager.delete(2).AndReturn(None)
self.m.ReplayAll()
stack = self.create_stack(self.test_template_nova_bad_source_group)
sg = stack['the_sg']
self.assertEqual(sg.FAILED, sg.status)
self.assertIn('not found', sg.status_reason)
stack.delete()
self.m.VerifyAll()
def test_security_group_client_exception(self):
#create script
self.mock_no_neutron()
nova.NovaClientPlugin._create().AndReturn(self.fc)
sg_name = utils.PhysName('test_stack', 'the_sg')
nova_sg.SecurityGroupManager.list().AndReturn([
NovaSG(
id=2,
name=sg_name,
description='HTTP and SSH access',
rules=[],
),
NovaSG(
id=1,
name='test',
description='FAKE_SECURITY_GROUP',
rules=[],
)
])
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', '22', '22', '0.0.0.0/0', None).AndRaise(
fakes.fake_exception(400, 'Rule already exists'))
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', '80', '80', '0.0.0.0/0', None).AndReturn(
fakes.fake_exception(400, 'Rule already exists'))
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', None, None, None, 1).AndReturn(
fakes.fake_exception(400, 'Rule already exists'))
nova_sgr.SecurityGroupRuleManager.create(
2, 'icmp', None, None, None, '1').AndReturn(
fakes.fake_exception(400, 'Rule already exists'))
# delete script
nova_sg.SecurityGroupManager.get(2).AndReturn(NovaSG(
id=2,
name=sg_name,
description='HTTP and SSH access',
rules=[{
"from_port": '22',
"group": {},
"ip_protocol": "tcp",
"to_port": '22',
"parent_group_id": 2,
"ip_range": {
"cidr": "0.0.0.0/0"
},
'id': 130
}, {
'from_port': '80',
'group': {},
'ip_protocol': 'tcp',
'to_port': '80',
'parent_group_id': 2,
'ip_range': {
'cidr': '0.0.0.0/0'
},
'id': 131
}, {
'from_port': None,
'group': {
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'name': 'test'
},
'ip_protocol': 'tcp',
'to_port': None,
'parent_group_id': 2,
'ip_range': {},
'id': 132
}, {
'from_port': None,
'group': {
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'name': 'test'
},
'ip_protocol': 'icmp',
'to_port': None,
'parent_group_id': 2,
'ip_range': {},
'id': 133
}]
))
nova_sgr.SecurityGroupRuleManager.delete(130).AndRaise(
fakes.fake_exception())
nova_sgr.SecurityGroupRuleManager.delete(131).AndRaise(
fakes.fake_exception())
nova_sgr.SecurityGroupRuleManager.delete(132).AndRaise(
fakes.fake_exception())
nova_sgr.SecurityGroupRuleManager.delete(133).AndRaise(
fakes.fake_exception())
nova_sg.SecurityGroupManager.delete(2).AndReturn(None)
nova_sg.SecurityGroupManager.get(2).AndRaise(
fakes.fake_exception())
self.m.ReplayAll()
stack = self.create_stack(self.test_template_nova)
sg = stack['the_sg']
self.assertResourceState(sg, utils.PhysName('test_stack', 'the_sg'))
scheduler.TaskRunner(sg.delete)()
sg.state_set(sg.CREATE, sg.COMPLETE, 'to delete again')
sg.resource_id = 2
stack.delete()
self.m.VerifyAll()
def test_security_group_nova_with_egress_rules(self):
self.mock_no_neutron()
t = template_format.parse(self.test_template_nova_with_egress)
stack = self.parse_stack(t)
sg = stack['the_sg']
self.assertRaises(exception.EgressRuleNotAllowed, sg.validate)
def test_security_group_neutron(self):
#create script
def stubout_neutron_create_security_group(self):
sg_name = utils.PhysName('test_stack', 'the_sg')
neutronclient.Client.create_security_group({
'security_group': {
@ -485,6 +294,11 @@ Resources:
}
})
neutronclient.Client.delete_security_group_rule('aaaa-1').AndReturn(
None)
neutronclient.Client.delete_security_group_rule('aaaa-2').AndReturn(
None)
neutronclient.Client.create_security_group_rule({
'security_group_rule': {
'direction': 'ingress',
@ -557,10 +371,6 @@ Resources:
'id': 'dddd'
}
})
neutronclient.Client.delete_security_group_rule('aaaa-1').AndReturn(
None)
neutronclient.Client.delete_security_group_rule('aaaa-2').AndReturn(
None)
neutronclient.Client.create_security_group_rule({
'security_group_rule': {
'direction': 'egress',
@ -610,7 +420,7 @@ Resources:
}
})
# delete script
def stubout_neutron_get_security_group(self):
neutronclient.Client.show_security_group('aaaa').AndReturn({
'security_group': {
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
@ -619,25 +429,25 @@ Resources:
'security_group_rules': [{
'direction': 'ingress',
'protocol': 'tcp',
'port_range_max': '22',
'port_range_max': 22,
'id': 'bbbb',
'ethertype': 'IPv4',
'security_group_id': 'aaaa',
'remote_group_id': None,
'remote_ip_prefix': '0.0.0.0/0',
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'port_range_min': '22'
'port_range_min': 22
}, {
'direction': 'ingress',
'protocol': 'tcp',
'port_range_max': '80',
'port_range_max': 80,
'id': 'cccc',
'ethertype': 'IPv4',
'security_group_id': 'aaaa',
'remote_group_id': None,
'remote_ip_prefix': '0.0.0.0/0',
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'port_range_min': '80'
'port_range_min': 80
}, {
'direction': 'ingress',
'protocol': 'tcp',
@ -652,14 +462,14 @@ Resources:
}, {
'direction': 'egress',
'protocol': 'tcp',
'port_range_max': '22',
'port_range_max': 22,
'id': 'eeee',
'ethertype': 'IPv4',
'security_group_id': 'aaaa',
'remote_group_id': None,
'remote_ip_prefix': '10.0.1.0/24',
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'port_range_min': '22'
'port_range_min': 22
}, {
'direction': 'egress',
'protocol': None,
@ -673,11 +483,223 @@ Resources:
'port_range_min': None
}],
'id': 'aaaa'}})
def stubout_neutron_delete_security_group_rules(self):
self.stubout_neutron_get_security_group()
neutronclient.Client.delete_security_group_rule('bbbb').AndReturn(None)
neutronclient.Client.delete_security_group_rule('cccc').AndReturn(None)
neutronclient.Client.delete_security_group_rule('dddd').AndReturn(None)
neutronclient.Client.delete_security_group_rule('eeee').AndReturn(None)
neutronclient.Client.delete_security_group_rule('ffff').AndReturn(None)
def test_security_group_nova(self):
#create script
#nova.NovaClientPlugin._create().AndReturn(self.fc)
sg_name = self.stubout_nova_create_security_group()
# delete script
self.stubout_nova_delete_security_group_rules(sg_name)
nova_sg.SecurityGroupManager.delete(2).AndReturn(None)
self.m.ReplayAll()
stack = self.create_stack(self.test_template_nova)
sg = stack['the_sg']
self.assertResourceState(sg, utils.PhysName('test_stack', 'the_sg'))
stack.delete()
self.m.VerifyAll()
def test_security_group_nova_bad_source_group(self):
#create script
self.mock_no_neutron()
nova.NovaClientPlugin._create().AndReturn(self.fc)
nova_sg.SecurityGroupManager.list().AndReturn([NovaSG(
id=1,
name='test',
description='FAKE_SECURITY_GROUP',
rules=[],
)])
sg_name = utils.PhysName('test_stack', 'the_sg')
nova_sg.SecurityGroupManager.create(
sg_name,
'HTTP and SSH access').AndReturn(NovaSG(
id=2,
name=sg_name,
description='HTTP and SSH access',
rules=[]))
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', '22', '22', '0.0.0.0/0', None).AndReturn(None)
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', '80', '80', '0.0.0.0/0', None).AndReturn(None)
# delete script
nova_sg.SecurityGroupManager.get(2).AndReturn(NovaSG(
id=2,
name=sg_name,
description='HTTP and SSH access',
rules=[{
"from_port": 22,
"group": {},
"ip_protocol": "tcp",
"to_port": 22,
"parent_group_id": 2,
"ip_range": {
"cidr": "0.0.0.0/0"
},
'id': 130
}, {
'from_port': 80,
'group': {},
'ip_protocol': 'tcp',
'to_port': 80,
'parent_group_id': 2,
'ip_range': {
'cidr': '0.0.0.0/0'
},
'id': 131
}]
))
nova_sgr.SecurityGroupRuleManager.delete(130).AndReturn(None)
nova_sgr.SecurityGroupRuleManager.delete(131).AndReturn(None)
nova_sg.SecurityGroupManager.delete(2).AndReturn(None)
self.m.ReplayAll()
stack = self.create_stack(self.test_template_nova_bad_source_group)
sg = stack['the_sg']
self.assertEqual(sg.FAILED, sg.status)
self.assertIn('not found', sg.status_reason)
stack.delete()
self.m.VerifyAll()
def test_security_group_nova_exception(self):
#create script
self.mock_no_neutron()
nova.NovaClientPlugin._create().AndReturn(self.fc)
sg_name = utils.PhysName('test_stack', 'the_sg')
nova_sg.SecurityGroupManager.list().AndReturn([
NovaSG(
id=2,
name=sg_name,
description='HTTP and SSH access',
rules=[],
),
NovaSG(
id=1,
name='test',
description='FAKE_SECURITY_GROUP',
rules=[],
)
])
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', '22', '22', '0.0.0.0/0', None).AndRaise(
fakes.fake_exception(400, 'Rule already exists'))
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', '80', '80', '0.0.0.0/0', None).AndReturn(
fakes.fake_exception(400, 'Rule already exists'))
nova_sgr.SecurityGroupRuleManager.create(
2, 'tcp', None, None, None, 1).AndReturn(
fakes.fake_exception(400, 'Rule already exists'))
nova_sgr.SecurityGroupRuleManager.create(
2, 'icmp', None, None, None, '1').AndReturn(
fakes.fake_exception(400, 'Rule already exists'))
# delete script
nova_sg.SecurityGroupManager.get(2).AndReturn(NovaSG(
id=2,
name=sg_name,
description='HTTP and SSH access',
rules=[{
"from_port": 22,
"group": {},
"ip_protocol": "tcp",
"to_port": 22,
"parent_group_id": 2,
"ip_range": {
"cidr": "0.0.0.0/0"
},
'id': 130
}, {
'from_port': 80,
'group': {},
'ip_protocol': 'tcp',
'to_port': 80,
'parent_group_id': 2,
'ip_range': {
'cidr': '0.0.0.0/0'
},
'id': 131
}, {
'from_port': None,
'group': {
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'name': 'test'
},
'ip_protocol': 'tcp',
'to_port': None,
'parent_group_id': 2,
'ip_range': {},
'id': 132
}, {
'from_port': None,
'group': {
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'name': 'test'
},
'ip_protocol': 'icmp',
'to_port': None,
'parent_group_id': 2,
'ip_range': {},
'id': 133
}]
))
nova_sgr.SecurityGroupRuleManager.delete(130).AndRaise(
fakes.fake_exception())
nova_sgr.SecurityGroupRuleManager.delete(131).AndRaise(
fakes.fake_exception())
nova_sgr.SecurityGroupRuleManager.delete(132).AndRaise(
fakes.fake_exception())
nova_sgr.SecurityGroupRuleManager.delete(133).AndRaise(
fakes.fake_exception())
nova_sg.SecurityGroupManager.delete(2).AndReturn(None)
nova_sg.SecurityGroupManager.get(2).AndRaise(
fakes.fake_exception())
self.m.ReplayAll()
stack = self.create_stack(self.test_template_nova)
sg = stack['the_sg']
self.assertResourceState(sg, utils.PhysName('test_stack', 'the_sg'))
scheduler.TaskRunner(sg.delete)()
sg.state_set(sg.CREATE, sg.COMPLETE, 'to delete again')
sg.resource_id = 2
stack.delete()
self.m.VerifyAll()
def test_security_group_nova_with_egress_rules(self):
self.mock_no_neutron()
t = template_format.parse(self.test_template_nova_with_egress)
stack = self.parse_stack(t)
sg = stack['the_sg']
self.assertRaises(exception.EgressRuleNotAllowed, sg.validate)
def test_security_group_neutron(self):
#create script
self.stubout_neutron_create_security_group()
# delete script
self.stubout_neutron_delete_security_group_rules()
neutronclient.Client.delete_security_group('aaaa').AndReturn(None)
self.m.ReplayAll()
@ -783,25 +805,25 @@ Resources:
'security_group_rules': [{
'direction': 'ingress',
'protocol': 'tcp',
'port_range_max': '22',
'port_range_max': 22,
'id': 'bbbb',
'ethertype': 'IPv4',
'security_group_id': 'aaaa',
'remote_group_id': None,
'remote_ip_prefix': '0.0.0.0/0',
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'port_range_min': '22'
'port_range_min': 22
}, {
'direction': 'ingress',
'protocol': 'tcp',
'port_range_max': '80',
'port_range_max': 80,
'id': 'cccc',
'ethertype': 'IPv4',
'security_group_id': 'aaaa',
'remote_group_id': None,
'remote_ip_prefix': '0.0.0.0/0',
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'port_range_min': '80'
'port_range_min': 80
}, {
'direction': 'ingress',
'protocol': 'tcp',
@ -816,14 +838,14 @@ Resources:
}, {
'direction': 'egress',
'protocol': 'tcp',
'port_range_max': '22',
'port_range_max': 22,
'id': 'eeee',
'ethertype': 'IPv4',
'security_group_id': 'aaaa',
'remote_group_id': None,
'remote_ip_prefix': '10.0.1.0/24',
'tenant_id': 'f18ca530cc05425e8bac0a5ff92f7e88',
'port_range_min': '22'
'port_range_min': 22
}, {
'direction': 'egress',
'protocol': None,