Browse Source

Merge "Remove mox from test-neutron-firewall"

changes/06/567206/1
Zuul 3 years ago
committed by Gerrit Code Review
parent
commit
0f1e2a0bc1
  1. 537
      heat/tests/openstack/neutron/test_neutron_firewall.py

537
heat/tests/openstack/neutron/test_neutron_firewall.py

@ -79,29 +79,18 @@ class FirewallTest(common.HeatTestCase):
def setUp(self):
super(FirewallTest, self).setUp()
self.m.StubOutWithMock(neutronclient.Client, 'create_firewall')
self.m.StubOutWithMock(neutronclient.Client, 'delete_firewall')
self.m.StubOutWithMock(neutronclient.Client, 'show_firewall')
self.m.StubOutWithMock(neutronclient.Client, 'update_firewall')
self.mockclient = mock.Mock(spec=neutronclient.Client)
self.patchobject(neutronclient, 'Client', return_value=self.mockclient)
self.patchobject(neutron.NeutronClientPlugin, 'has_extension',
return_value=True)
def create_firewall(self, value_specs=True):
snippet = template_format.parse(firewall_template)
self.mockclient.create_firewall.return_value = {
'firewall': {'id': '5678'}
}
if not value_specs:
del snippet['resources']['firewall']['properties']['value_specs']
neutronclient.Client.create_firewall({
'firewall': {
'name': 'test-firewall', 'admin_state_up': True,
'firewall_policy_id': 'policy-id', 'shared': True}}
).AndReturn({'firewall': {'id': '5678'}})
else:
neutronclient.Client.create_firewall({
'firewall': {
'name': 'test-firewall', 'admin_state_up': True,
'router_ids': ['router_1', 'router_2'],
'firewall_policy_id': 'policy-id', 'shared': True}}
).AndReturn({'firewall': {'id': '5678'}})
self.stack = utils.parse_stack(snippet)
resource_defns = self.stack.t.resource_definitions(self.stack)
@ -111,21 +100,28 @@ class FirewallTest(common.HeatTestCase):
def test_create(self):
rsrc = self.create_firewall()
neutronclient.Client.show_firewall('5678').AndReturn(
{'firewall': {'status': 'ACTIVE'}})
self.m.ReplayAll()
self.mockclient.show_firewall.return_value = {
'firewall': {'status': 'ACTIVE'}
}
scheduler.TaskRunner(rsrc.create)()
self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
self.mockclient.create_firewall.assert_called_once_with({
'firewall': {
'name': 'test-firewall', 'admin_state_up': True,
'router_ids': ['router_1', 'router_2'],
'firewall_policy_id': 'policy-id', 'shared': True
}
})
self.mockclient.show_firewall.assert_called_once_with('5678')
def test_create_failed_error_status(self):
cfg.CONF.set_override('action_retry_limit', 0)
rsrc = self.create_firewall()
neutronclient.Client.show_firewall('5678').AndReturn(
{'firewall': {'status': 'PENDING_CREATE'}})
neutronclient.Client.show_firewall('5678').AndReturn(
{'firewall': {'status': 'ERROR'}})
self.m.ReplayAll()
self.mockclient.show_firewall.side_effect = [
{'firewall': {'status': 'PENDING_CREATE'}},
{'firewall': {'status': 'ERROR'}},
]
error = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(rsrc.create))
@ -134,16 +130,19 @@ class FirewallTest(common.HeatTestCase):
'Went to status ERROR due to "Error in Firewall"',
six.text_type(error))
self.assertEqual((rsrc.CREATE, rsrc.FAILED), rsrc.state)
self.m.VerifyAll()
def test_create_failed(self):
neutronclient.Client.create_firewall({
self.mockclient.create_firewall.assert_called_once_with({
'firewall': {
'name': 'test-firewall', 'admin_state_up': True,
'router_ids': ['router_1', 'router_2'],
'firewall_policy_id': 'policy-id', 'shared': True}}
).AndRaise(exceptions.NeutronClientException())
self.m.ReplayAll()
'firewall_policy_id': 'policy-id', 'shared': True
}
})
self.mockclient.show_firewall.assert_called_with('5678')
def test_create_failed(self):
self.mockclient.create_firewall.side_effect = (
exceptions.NeutronClientException())
snippet = template_format.parse(firewall_template)
stack = utils.parse_stack(snippet)
@ -158,44 +157,67 @@ class FirewallTest(common.HeatTestCase):
'An unknown exception occurred.',
six.text_type(error))
self.assertEqual((rsrc.CREATE, rsrc.FAILED), rsrc.state)
self.m.VerifyAll()
self.mockclient.create_firewall.assert_called_once_with({
'firewall': {
'name': 'test-firewall', 'admin_state_up': True,
'router_ids': ['router_1', 'router_2'],
'firewall_policy_id': 'policy-id', 'shared': True
}
})
def test_delete(self):
rsrc = self.create_firewall()
neutronclient.Client.show_firewall('5678').AndReturn(
{'firewall': {'status': 'ACTIVE'}})
self.mockclient.show_firewall.side_effect = [
{'firewall': {'status': 'ACTIVE'}},
exceptions.NeutronClientException(status_code=404),
]
self.mockclient.delete_firewall.return_value = None
neutronclient.Client.delete_firewall('5678')
neutronclient.Client.show_firewall('5678').AndRaise(
exceptions.NeutronClientException(status_code=404))
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
scheduler.TaskRunner(rsrc.delete)()
self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
self.mockclient.create_firewall.assert_called_once_with({
'firewall': {
'name': 'test-firewall', 'admin_state_up': True,
'router_ids': ['router_1', 'router_2'],
'firewall_policy_id': 'policy-id', 'shared': True
}
})
self.mockclient.delete_firewall.assert_called_once_with('5678')
self.mockclient.show_firewall.assert_called_with('5678')
def test_delete_already_gone(self):
neutronclient.Client.delete_firewall('5678').AndRaise(
rsrc = self.create_firewall()
self.mockclient.show_firewall.return_value = {
'firewall': {'status': 'ACTIVE'}
}
self.mockclient.delete_firewall.side_effect = (
exceptions.NeutronClientException(status_code=404))
rsrc = self.create_firewall()
neutronclient.Client.show_firewall('5678').AndReturn(
{'firewall': {'status': 'ACTIVE'}})
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
scheduler.TaskRunner(rsrc.delete)()
self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
self.mockclient.create_firewall.assert_called_once_with({
'firewall': {
'name': 'test-firewall', 'admin_state_up': True,
'router_ids': ['router_1', 'router_2'],
'firewall_policy_id': 'policy-id', 'shared': True
}
})
self.mockclient.delete_firewall.assert_called_once_with('5678')
self.mockclient.show_firewall.assert_called_once_with('5678')
def test_delete_failed(self):
neutronclient.Client.delete_firewall('5678').AndRaise(
rsrc = self.create_firewall()
self.mockclient.show_firewall.return_value = {
'firewall': {'status': 'ACTIVE'}
}
self.mockclient.delete_firewall.side_effect = (
exceptions.NeutronClientException(status_code=400))
rsrc = self.create_firewall()
neutronclient.Client.show_firewall('5678').AndReturn(
{'firewall': {'status': 'ACTIVE'}})
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
error = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(rsrc.delete))
@ -204,45 +226,72 @@ class FirewallTest(common.HeatTestCase):
'An unknown exception occurred.',
six.text_type(error))
self.assertEqual((rsrc.DELETE, rsrc.FAILED), rsrc.state)
self.m.VerifyAll()
self.mockclient.create_firewall.assert_called_once_with({
'firewall': {
'name': 'test-firewall', 'admin_state_up': True,
'router_ids': ['router_1', 'router_2'],
'firewall_policy_id': 'policy-id', 'shared': True
}
})
self.mockclient.delete_firewall.assert_called_once_with('5678')
self.mockclient.show_firewall.assert_called_once_with('5678')
def test_attribute(self):
rsrc = self.create_firewall()
neutronclient.Client.show_firewall('5678').AndReturn(
{'firewall': {'status': 'ACTIVE'}})
neutronclient.Client.show_firewall('5678').MultipleTimes(
).AndReturn(
{'firewall': {'admin_state_up': True,
'firewall_policy_id': 'policy-id',
'shared': True}})
self.m.ReplayAll()
self.mockclient.show_firewall.return_value = {
'firewall': {
'status': 'ACTIVE',
'admin_state_up': True,
'firewall_policy_id': 'policy-id',
'shared': True,
}
}
scheduler.TaskRunner(rsrc.create)()
self.assertIs(True, rsrc.FnGetAtt('admin_state_up'))
self.assertEqual('This attribute is currently unsupported in neutron '
'firewall resource.', rsrc.FnGetAtt('shared'))
self.assertEqual('policy-id', rsrc.FnGetAtt('firewall_policy_id'))
self.m.VerifyAll()
self.mockclient.create_firewall.assert_called_once_with({
'firewall': {
'name': 'test-firewall', 'admin_state_up': True,
'router_ids': ['router_1', 'router_2'],
'firewall_policy_id': 'policy-id', 'shared': True
}
})
self.mockclient.show_firewall.assert_called_with('5678')
def test_attribute_failed(self):
rsrc = self.create_firewall()
neutronclient.Client.show_firewall('5678').AndReturn(
{'firewall': {'status': 'ACTIVE'}})
self.m.ReplayAll()
self.mockclient.show_firewall.return_value = {
'firewall': {'status': 'ACTIVE'}
}
scheduler.TaskRunner(rsrc.create)()
error = self.assertRaises(exception.InvalidTemplateAttribute,
rsrc.FnGetAtt, 'subnet_id')
self.assertEqual(
'The Referenced Attribute (firewall subnet_id) is '
'incorrect.', six.text_type(error))
self.m.VerifyAll()
self.mockclient.create_firewall.assert_called_once_with({
'firewall': {
'name': 'test-firewall', 'admin_state_up': True,
'router_ids': ['router_1', 'router_2'],
'firewall_policy_id': 'policy-id', 'shared': True
}
})
self.mockclient.show_firewall.assert_called_once_with('5678')
def test_update(self):
rsrc = self.create_firewall()
neutronclient.Client.show_firewall('5678').AndReturn(
{'firewall': {'status': 'ACTIVE'}})
neutronclient.Client.update_firewall(
'5678', {'firewall': {'admin_state_up': False}})
self.m.ReplayAll()
self.mockclient.show_firewall.return_value = {
'firewall': {'status': 'ACTIVE'}
}
self.mockclient.update_firewall.return_value = None
scheduler.TaskRunner(rsrc.create)()
props = self.fw_props.copy()
@ -250,16 +299,23 @@ class FirewallTest(common.HeatTestCase):
update_template = rsrc.t.freeze(properties=props)
scheduler.TaskRunner(rsrc.update, update_template)()
self.m.VerifyAll()
self.mockclient.create_firewall.assert_called_once_with({
'firewall': {
'name': 'test-firewall', 'admin_state_up': True,
'router_ids': ['router_1', 'router_2'],
'firewall_policy_id': 'policy-id', 'shared': True
}
})
self.mockclient.show_firewall.assert_called_once_with('5678')
self.mockclient.update_firewall.assert_called_once_with(
'5678', {'firewall': {'admin_state_up': False}})
def test_update_with_value_specs(self):
rsrc = self.create_firewall(value_specs=False)
neutronclient.Client.show_firewall('5678').AndReturn(
{'firewall': {'status': 'ACTIVE'}})
neutronclient.Client.update_firewall(
'5678', {'firewall': {'router_ids': ['router_1',
'router_2']}})
self.m.ReplayAll()
self.mockclient.show_firewall.return_value = {
'firewall': {'status': 'ACTIVE'}
}
scheduler.TaskRunner(rsrc.create)()
prop_diff = {
'value_specs': {
@ -270,11 +326,21 @@ class FirewallTest(common.HeatTestCase):
rsrc.type(),
prop_diff)
rsrc.handle_update(update_snippet, {}, prop_diff)
self.m.VerifyAll()
self.mockclient.create_firewall.assert_called_once_with({
'firewall': {
'name': 'test-firewall', 'admin_state_up': True,
'firewall_policy_id': 'policy-id', 'shared': True
}
})
self.mockclient.show_firewall.assert_called_once_with('5678')
self.mockclient.update_firewall.assert_called_once_with(
'5678', {'firewall': {'router_ids': ['router_1',
'router_2']}})
def test_get_live_state(self):
rsrc = self.create_firewall(value_specs=True)
rsrc.client().show_firewall = mock.Mock(return_value={
self.mockclient.show_firewall.return_value = {
'firewall': {
'status': 'ACTIVE',
'router_ids': ['router_1', 'router_2'],
@ -285,8 +351,8 @@ class FirewallTest(common.HeatTestCase):
'id': '11425cd4-41b6-4fd4-97aa-17629c63de61',
'description': ''
}
})
self.m.ReplayAll()
}
scheduler.TaskRunner(rsrc.create)()
reality = rsrc.get_live_state(rsrc.properties)
@ -301,26 +367,30 @@ class FirewallTest(common.HeatTestCase):
}
self.assertEqual(expected, reality)
self.m.VerifyAll()
self.mockclient.create_firewall.assert_called_once_with({
'firewall': {
'name': 'test-firewall', 'admin_state_up': True,
'router_ids': ['router_1', 'router_2'],
'firewall_policy_id': 'policy-id', 'shared': True
}
})
self.mockclient.show_firewall.assert_called_with('5678')
class FirewallPolicyTest(common.HeatTestCase):
def setUp(self):
super(FirewallPolicyTest, self).setUp()
self.m.StubOutWithMock(neutronclient.Client, 'create_firewall_policy')
self.m.StubOutWithMock(neutronclient.Client, 'delete_firewall_policy')
self.m.StubOutWithMock(neutronclient.Client, 'show_firewall_policy')
self.m.StubOutWithMock(neutronclient.Client, 'update_firewall_policy')
self.mockclient = mock.Mock(spec=neutronclient.Client)
self.patchobject(neutronclient, 'Client', return_value=self.mockclient)
self.patchobject(neutron.NeutronClientPlugin, 'has_extension',
return_value=True)
def create_firewall_policy(self):
neutronclient.Client.create_firewall_policy({
'firewall_policy': {
'name': 'test-firewall-policy', 'shared': True,
'audited': True, 'firewall_rules': ['rule-id-1', 'rule-id-2']}}
).AndReturn({'firewall_policy': {'id': '5678'}})
self.mockclient.create_firewall_policy.return_value = {
'firewall_policy': {'id': '5678'}
}
snippet = template_format.parse(firewall_policy_template)
self.stack = utils.parse_stack(snippet)
@ -331,18 +401,20 @@ class FirewallPolicyTest(common.HeatTestCase):
def test_create(self):
rsrc = self.create_firewall_policy()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
def test_create_failed(self):
neutronclient.Client.create_firewall_policy({
self.mockclient.create_firewall_policy.assert_called_once_with({
'firewall_policy': {
'name': 'test-firewall-policy', 'shared': True,
'audited': True, 'firewall_rules': ['rule-id-1', 'rule-id-2']}}
).AndRaise(exceptions.NeutronClientException())
self.m.ReplayAll()
'audited': True, 'firewall_rules': ['rule-id-1', 'rule-id-2']
}
})
def test_create_failed(self):
self.mockclient.create_firewall_policy.side_effect = (
exceptions.NeutronClientException())
snippet = template_format.parse(firewall_policy_template)
stack = utils.parse_stack(snippet)
@ -357,37 +429,56 @@ class FirewallPolicyTest(common.HeatTestCase):
'An unknown exception occurred.',
six.text_type(error))
self.assertEqual((rsrc.CREATE, rsrc.FAILED), rsrc.state)
self.m.VerifyAll()
self.mockclient.create_firewall_policy.assert_called_once_with({
'firewall_policy': {
'name': 'test-firewall-policy', 'shared': True,
'audited': True, 'firewall_rules': ['rule-id-1', 'rule-id-2']
}
})
def test_delete(self):
neutronclient.Client.delete_firewall_policy('5678')
neutronclient.Client.show_firewall_policy('5678').AndRaise(
rsrc = self.create_firewall_policy()
self.mockclient.delete_firewall_policy.return_value = None
self.mockclient.show_firewall_policy.side_effect = (
exceptions.NeutronClientException(status_code=404))
rsrc = self.create_firewall_policy()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
scheduler.TaskRunner(rsrc.delete)()
self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
self.mockclient.create_firewall_policy.assert_called_once_with({
'firewall_policy': {
'name': 'test-firewall-policy', 'shared': True,
'audited': True, 'firewall_rules': ['rule-id-1', 'rule-id-2']
}
})
self.mockclient.delete_firewall_policy.assert_called_once_with('5678')
self.mockclient.show_firewall_policy.assert_called_once_with('5678')
def test_delete_already_gone(self):
neutronclient.Client.delete_firewall_policy('5678').AndRaise(
rsrc = self.create_firewall_policy()
self.mockclient.delete_firewall_policy.side_effect = (
exceptions.NeutronClientException(status_code=404))
rsrc = self.create_firewall_policy()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
scheduler.TaskRunner(rsrc.delete)()
self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
self.mockclient.create_firewall_policy.assert_called_once_with({
'firewall_policy': {
'name': 'test-firewall-policy', 'shared': True,
'audited': True, 'firewall_rules': ['rule-id-1', 'rule-id-2']
}
})
self.mockclient.delete_firewall_policy.assert_called_once_with('5678')
self.mockclient.show_firewall_policy.assert_not_called()
def test_delete_failed(self):
neutronclient.Client.delete_firewall_policy('5678').AndRaise(
rsrc = self.create_firewall_policy()
self.mockclient.delete_firewall_policy.side_effect = (
exceptions.NeutronClientException(status_code=400))
rsrc = self.create_firewall_policy()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
error = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(rsrc.delete))
@ -396,35 +487,56 @@ class FirewallPolicyTest(common.HeatTestCase):
'An unknown exception occurred.',
six.text_type(error))
self.assertEqual((rsrc.DELETE, rsrc.FAILED), rsrc.state)
self.m.VerifyAll()
self.mockclient.create_firewall_policy.assert_called_once_with({
'firewall_policy': {
'name': 'test-firewall-policy', 'shared': True,
'audited': True, 'firewall_rules': ['rule-id-1', 'rule-id-2']
}
})
self.mockclient.delete_firewall_policy.assert_called_once_with('5678')
self.mockclient.show_firewall_policy.assert_not_called()
def test_attribute(self):
rsrc = self.create_firewall_policy()
neutronclient.Client.show_firewall_policy('5678').MultipleTimes(
).AndReturn(
{'firewall_policy': {'audited': True, 'shared': True}})
self.m.ReplayAll()
self.mockclient.show_firewall_policy.return_value = {
'firewall_policy': {'audited': True, 'shared': True}
}
scheduler.TaskRunner(rsrc.create)()
self.assertIs(True, rsrc.FnGetAtt('audited'))
self.assertIs(True, rsrc.FnGetAtt('shared'))
self.m.VerifyAll()
self.mockclient.create_firewall_policy.assert_called_once_with({
'firewall_policy': {
'name': 'test-firewall-policy', 'shared': True,
'audited': True, 'firewall_rules': ['rule-id-1', 'rule-id-2']
}
})
self.mockclient.show_firewall_policy.assert_called_with('5678')
def test_attribute_failed(self):
rsrc = self.create_firewall_policy()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
error = self.assertRaises(exception.InvalidTemplateAttribute,
rsrc.FnGetAtt, 'subnet_id')
self.assertEqual(
'The Referenced Attribute (firewall_policy subnet_id) is '
'incorrect.', six.text_type(error))
self.m.VerifyAll()
self.mockclient.create_firewall_policy.assert_called_once_with({
'firewall_policy': {
'name': 'test-firewall-policy', 'shared': True,
'audited': True, 'firewall_rules': ['rule-id-1', 'rule-id-2']
}
})
self.mockclient.show_firewall_policy.assert_not_called()
def test_update(self):
rsrc = self.create_firewall_policy()
neutronclient.Client.update_firewall_policy(
'5678', {'firewall_policy': {'firewall_rules': ['3', '4']}})
self.m.ReplayAll()
self.mockclient.update_firewall_policy.return_value = None
scheduler.TaskRunner(rsrc.create)()
props = self.tmpl['resources']['firewall_policy']['properties'].copy()
@ -432,27 +544,29 @@ class FirewallPolicyTest(common.HeatTestCase):
update_template = rsrc.t.freeze(properties=props)
scheduler.TaskRunner(rsrc.update, update_template)()
self.m.VerifyAll()
self.mockclient.create_firewall_policy.assert_called_once_with({
'firewall_policy': {
'name': 'test-firewall-policy', 'shared': True,
'audited': True, 'firewall_rules': ['rule-id-1', 'rule-id-2']
}
})
self.mockclient.update_firewall_policy.assert_called_once_with(
'5678', {'firewall_policy': {'firewall_rules': ['3', '4']}})
class FirewallRuleTest(common.HeatTestCase):
def setUp(self):
super(FirewallRuleTest, self).setUp()
self.m.StubOutWithMock(neutronclient.Client, 'create_firewall_rule')
self.m.StubOutWithMock(neutronclient.Client, 'delete_firewall_rule')
self.m.StubOutWithMock(neutronclient.Client, 'show_firewall_rule')
self.m.StubOutWithMock(neutronclient.Client, 'update_firewall_rule')
self.mockclient = mock.Mock(spec=neutronclient.Client)
self.patchobject(neutronclient, 'Client', return_value=self.mockclient)
self.patchobject(neutron.NeutronClientPlugin, 'has_extension',
return_value=True)
def create_firewall_rule(self):
neutronclient.Client.create_firewall_rule({
'firewall_rule': {
'name': 'test-firewall-rule', 'shared': True,
'action': 'allow', 'protocol': 'tcp', 'enabled': True,
'ip_version': "4"}}
).AndReturn({'firewall_rule': {'id': '5678'}})
self.mockclient.create_firewall_rule.return_value = {
'firewall_rule': {'id': '5678'}
}
snippet = template_format.parse(firewall_rule_template)
self.stack = utils.parse_stack(snippet)
@ -463,10 +577,17 @@ class FirewallRuleTest(common.HeatTestCase):
def test_create(self):
rsrc = self.create_firewall_rule()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
self.mockclient.create_firewall_rule.assert_called_once_with({
'firewall_rule': {
'name': 'test-firewall-rule', 'shared': True,
'action': 'allow', 'protocol': 'tcp', 'enabled': True,
'ip_version': "4"
}
})
def test_validate_failed_with_string_None_protocol(self):
snippet = template_format.parse(firewall_rule_template)
@ -479,13 +600,9 @@ class FirewallRuleTest(common.HeatTestCase):
self.assertRaises(exception.StackValidationFailed, rsrc.validate)
def test_create_with_protocol_any(self):
neutronclient.Client.create_firewall_rule({
'firewall_rule': {
'name': 'test-firewall-rule', 'shared': True,
'action': 'allow', 'protocol': None, 'enabled': True,
'ip_version': "4"}}
).AndReturn({'firewall_rule': {'id': '5678'}})
self.m.ReplayAll()
self.mockclient.create_firewall_rule.return_value = {
'firewall_rule': {'id': '5678'}
}
snippet = template_format.parse(firewall_rule_template)
snippet['resources']['firewall_rule']['properties']['protocol'] = 'any'
@ -494,16 +611,18 @@ class FirewallRuleTest(common.HeatTestCase):
scheduler.TaskRunner(rsrc.create)()
self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
def test_create_failed(self):
neutronclient.Client.create_firewall_rule({
self.mockclient.create_firewall_rule.assert_called_once_with({
'firewall_rule': {
'name': 'test-firewall-rule', 'shared': True,
'action': 'allow', 'protocol': 'tcp', 'enabled': True,
'ip_version': "4"}}
).AndRaise(exceptions.NeutronClientException())
self.m.ReplayAll()
'action': 'allow', 'protocol': None, 'enabled': True,
'ip_version': "4"
}
})
def test_create_failed(self):
self.mockclient.create_firewall_rule.side_effect = (
exceptions.NeutronClientException())
snippet = template_format.parse(firewall_rule_template)
stack = utils.parse_stack(snippet)
@ -518,37 +637,59 @@ class FirewallRuleTest(common.HeatTestCase):
'An unknown exception occurred.',
six.text_type(error))
self.assertEqual((rsrc.CREATE, rsrc.FAILED), rsrc.state)
self.m.VerifyAll()
self.mockclient.create_firewall_rule.assert_called_once_with({
'firewall_rule': {
'name': 'test-firewall-rule', 'shared': True,
'action': 'allow', 'protocol': 'tcp', 'enabled': True,
'ip_version': "4"
}
})
def test_delete(self):
neutronclient.Client.delete_firewall_rule('5678')
neutronclient.Client.show_firewall_rule('5678').AndRaise(
rsrc = self.create_firewall_rule()
self.mockclient.delete_firewall_rule.return_value = None
self.mockclient.show_firewall_rule.side_effect = (
exceptions.NeutronClientException(status_code=404))
rsrc = self.create_firewall_rule()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
scheduler.TaskRunner(rsrc.delete)()
self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
self.mockclient.create_firewall_rule.assert_called_once_with({
'firewall_rule': {
'name': 'test-firewall-rule', 'shared': True,
'action': 'allow', 'protocol': 'tcp', 'enabled': True,
'ip_version': "4"
}
})
self.mockclient.delete_firewall_rule.assert_called_once_with('5678')
self.mockclient.show_firewall_rule.assert_called_once_with('5678')
def test_delete_already_gone(self):
neutronclient.Client.delete_firewall_rule('5678').AndRaise(
rsrc = self.create_firewall_rule()
self.mockclient.delete_firewall_rule.side_effect = (
exceptions.NeutronClientException(status_code=404))
rsrc = self.create_firewall_rule()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
scheduler.TaskRunner(rsrc.delete)()
self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
self.m.VerifyAll()
self.mockclient.create_firewall_rule.assert_called_once_with({
'firewall_rule': {
'name': 'test-firewall-rule', 'shared': True,
'action': 'allow', 'protocol': 'tcp', 'enabled': True,
'ip_version': "4"
}
})
self.mockclient.delete_firewall_rule.assert_called_once_with('5678')
self.mockclient.show_firewall_rule.assert_not_called()
def test_delete_failed(self):
neutronclient.Client.delete_firewall_rule('5678').AndRaise(
rsrc = self.create_firewall_rule()
self.mockclient.delete_firewall_rule.side_effect = (
exceptions.NeutronClientException(status_code=400))
rsrc = self.create_firewall_rule()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
error = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(rsrc.delete))
@ -557,35 +698,59 @@ class FirewallRuleTest(common.HeatTestCase):
'An unknown exception occurred.',
six.text_type(error))
self.assertEqual((rsrc.DELETE, rsrc.FAILED), rsrc.state)
self.m.VerifyAll()
self.mockclient.create_firewall_rule.assert_called_once_with({
'firewall_rule': {
'name': 'test-firewall-rule', 'shared': True,
'action': 'allow', 'protocol': 'tcp', 'enabled': True,
'ip_version': "4"
}
})
self.mockclient.delete_firewall_rule.assert_called_once_with('5678')
self.mockclient.show_firewall_rule.assert_not_called()
def test_attribute(self):
rsrc = self.create_firewall_rule()
neutronclient.Client.show_firewall_rule('5678').MultipleTimes(
).AndReturn(
{'firewall_rule': {'protocol': 'tcp', 'shared': True}})
self.m.ReplayAll()
self.mockclient.show_firewall_rule.return_value = {
'firewall_rule': {'protocol': 'tcp', 'shared': True}
}
scheduler.TaskRunner(rsrc.create)()
self.assertEqual('tcp', rsrc.FnGetAtt('protocol'))
self.assertIs(True, rsrc.FnGetAtt('shared'))
self.m.VerifyAll()
self.mockclient.create_firewall_rule.assert_called_once_with({
'firewall_rule': {
'name': 'test-firewall-rule', 'shared': True,
'action': 'allow', 'protocol': 'tcp', 'enabled': True,
'ip_version': "4"
}
})
self.mockclient.show_firewall_rule.assert_called_with('5678')
def test_attribute_failed(self):
rsrc = self.create_firewall_rule()
self.m.ReplayAll()
scheduler.TaskRunner(rsrc.create)()
error = self.assertRaises(exception.InvalidTemplateAttribute,
rsrc.FnGetAtt, 'subnet_id')
self.assertEqual(
'The Referenced Attribute (firewall_rule subnet_id) is '
'incorrect.', six.text_type(error))
self.m.VerifyAll()
self.mockclient.create_firewall_rule.assert_called_once_with({
'firewall_rule': {
'name': 'test-firewall-rule', 'shared': True,
'action': 'allow', 'protocol': 'tcp', 'enabled': True,
'ip_version': "4"
}
})
self.mockclient.show_firewall_rule.assert_not_called()
def test_update(self):
rsrc = self.create_firewall_rule()
neutronclient.Client.update_firewall_rule(
'5678', {'firewall_rule': {'protocol': 'icmp'}})
self.m.ReplayAll()
self.mockclient.update_firewall_rule.return_value = None
scheduler.TaskRunner(rsrc.create)()
props = self.tmpl['resources']['firewall_rule']['properties'].copy()
@ -593,17 +758,33 @@ class FirewallRuleTest(common.HeatTestCase):
update_template = rsrc.t.freeze(properties=props)
scheduler.TaskRunner(rsrc.update, update_template)()
self.m.VerifyAll()
self.mockclient.create_firewall_rule.assert_called_once_with({
'firewall_rule': {
'name': 'test-firewall-rule', 'shared': True,
'action': 'allow', 'protocol': 'tcp', 'enabled': True,
'ip_version': "4"
}
})
self.mockclient.update_firewall_rule.assert_called_once_with(
'5678', {'firewall_rule': {'protocol': 'icmp'}})
def test_update_protocol_to_any(self):
rsrc = self.create_firewall_rule()
neutronclient.Client.update_firewall_rule(
'5678', {'firewall_rule': {'protocol': None}})
self.m.ReplayAll()
self.mockclient.update_firewall_rule.return_value = None
scheduler.TaskRunner(rsrc.create)()
# update to 'any' protocol
props = self.tmpl['resources']['firewall_rule']['properties'].copy()
props['protocol'] = 'any'
update_template = rsrc.t.freeze(properties=props)
scheduler.TaskRunner(rsrc.update, update_template)()
self.m.VerifyAll()
self.mockclient.create_firewall_rule.assert_called_once_with({
'firewall_rule': {
'name': 'test-firewall-rule', 'shared': True,
'action': 'allow', 'protocol': 'tcp', 'enabled': True,
'ip_version': "4"
}
})
self.mockclient.update_firewall_rule.assert_called_once_with(
'5678', {'firewall_rule': {'protocol': None}})
Loading…
Cancel
Save