Merge "Ignore irrelevant secgroup form field errors"

This commit is contained in:
Jenkins 2013-10-14 10:06:39 +00:00 committed by Gerrit Code Review
commit 678a5cb30a
3 changed files with 110 additions and 29 deletions

View File

@ -273,6 +273,10 @@ class AddRule(forms.SelfHandlingForm):
def clean(self):
cleaned_data = super(AddRule, self).clean()
def update_cleaned_data(key, value):
cleaned_data[key] = value
self.errors.pop(key, None)
rule_menu = cleaned_data.get('rule_menu')
port_or_range = cleaned_data.get("port_or_range")
remote = cleaned_data.get("remote")
@ -285,7 +289,7 @@ class AddRule(forms.SelfHandlingForm):
port = cleaned_data.get("port", None)
if rule_menu == 'icmp':
cleaned_data['ip_protocol'] = rule_menu
update_cleaned_data('ip_protocol', rule_menu)
if icmp_type is None:
msg = _('The ICMP type is invalid.')
raise ValidationError(msg)
@ -298,17 +302,21 @@ class AddRule(forms.SelfHandlingForm):
if icmp_code not in xrange(-1, 256):
msg = _('The ICMP code not in range (-1, 255)')
raise ValidationError(msg)
cleaned_data['from_port'] = icmp_type
cleaned_data['to_port'] = icmp_code
update_cleaned_data('from_port', icmp_type)
update_cleaned_data('to_port', icmp_code)
update_cleaned_data('port', None)
elif rule_menu == 'tcp' or rule_menu == 'udp':
cleaned_data['ip_protocol'] = rule_menu
update_cleaned_data('ip_protocol', rule_menu)
update_cleaned_data('icmp_code', None)
update_cleaned_data('icmp_type', None)
if port_or_range == "port":
cleaned_data["from_port"] = port
cleaned_data["to_port"] = port
update_cleaned_data('from_port', port)
update_cleaned_data('to_port', port)
if port is None:
msg = _('The specified port is invalid.')
raise ValidationError(msg)
else:
update_cleaned_data('port', None)
if from_port is None:
msg = _('The "from" port number is invalid.')
raise ValidationError(msg)
@ -337,9 +345,9 @@ class AddRule(forms.SelfHandlingForm):
cleaned_data['direction'] = 'ingress'
if remote == "cidr":
cleaned_data['security_group'] = None
update_cleaned_data('security_group', None)
else:
cleaned_data['cidr'] = None
update_cleaned_data('cidr', None)
# If cleaned_data does not contain cidr, cidr is already marked
# as invalid, so skip the further validation for cidr.

View File

@ -55,6 +55,30 @@ class SecurityGroupsViewTests(test.TestCase):
'security_groups:add_rule',
args=[sec_group.id])
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list',
'security_group_backend')})
def _add_security_group_rule_fixture(self, **kwargs):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_backend(
IsA(http.HttpRequest)).AndReturn(self.secgroup_backend)
api.network.security_group_rule_create(
IsA(http.HttpRequest),
kwargs.get('sec_group', sec_group.id),
kwargs.get('ingress', 'ingress'),
kwargs.get('ethertype', 'IPv4'),
kwargs.get('ip_protocol', rule.ip_protocol),
kwargs.get('from_port', int(rule.from_port)),
kwargs.get('to_port', int(rule.to_port)),
kwargs.get('cidr', rule.ip_range['cidr']),
kwargs.get('security_group', u'%s' % sec_group.id)).AndReturn(rule)
api.network.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
return sec_group, rule
@test.create_stubs({api.network: ('security_group_get',)})
def test_update_security_groups_get(self):
sec_group = self.security_groups.first()
@ -168,26 +192,9 @@ class SecurityGroupsViewTests(test.TestCase):
res = self.client.get(self.detail_url)
self.assertRedirectsNoFollow(res, INDEX_URL)
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list',
'security_group_backend')})
def test_detail_add_rule_cidr(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_backend(
IsA(http.HttpRequest)).AndReturn(self.secgroup_backend)
api.network.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id,
'ingress', 'IPv4',
rule.ip_protocol,
int(rule.from_port),
int(rule.to_port),
rule.ip_range['cidr'],
None).AndReturn(rule)
api.network.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
sec_group, rule = self._add_security_group_rule_fixture(
security_group=None)
self.mox.ReplayAll()
formData = {'method': 'AddRule',
@ -200,6 +207,72 @@ class SecurityGroupsViewTests(test.TestCase):
res = self.client.post(self.edit_url, formData)
self.assertRedirectsNoFollow(res, self.detail_url)
def test_detail_add_rule_cidr_with_invalid_unused_fields(self):
sec_group, rule = self._add_security_group_rule_fixture(
security_group=None)
self.mox.ReplayAll()
formData = {'method': 'AddRule',
'id': sec_group.id,
'port_or_range': 'port',
'port': rule.from_port,
'to_port': 'INVALID',
'from_port': 'INVALID',
'icmp_code': 'INVALID',
'icmp_type': 'INVALID',
'security_group': 'INVALID',
'ip_protocol': 'INVALID',
'rule_menu': rule.ip_protocol,
'cidr': rule.ip_range['cidr'],
'remote': 'cidr'}
res = self.client.post(self.edit_url, formData)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, self.detail_url)
def test_detail_add_rule_securitygroup_with_invalid_unused_fields(self):
sec_group, rule = self._add_security_group_rule_fixture(
cidr=None, ethertype='')
self.mox.ReplayAll()
formData = {'method': 'AddRule',
'id': sec_group.id,
'port_or_range': 'port',
'port': rule.from_port,
'to_port': 'INVALID',
'from_port': 'INVALID',
'icmp_code': 'INVALID',
'icmp_type': 'INVALID',
'security_group': sec_group.id,
'ip_protocol': 'INVALID',
'rule_menu': rule.ip_protocol,
'cidr': 'INVALID',
'remote': 'sg'}
res = self.client.post(self.edit_url, formData)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, self.detail_url)
def test_detail_add_rule_icmp_with_invalid_unused_fields(self):
sec_group, rule = self._add_security_group_rule_fixture(
ip_protocol='icmp', security_group=None)
self.mox.ReplayAll()
formData = {'method': 'AddRule',
'id': sec_group.id,
'port_or_range': 'port',
'port': 'INVALID',
'to_port': 'INVALID',
'from_port': 'INVALID',
'icmp_code': rule.to_port,
'icmp_type': rule.from_port,
'security_group': sec_group.id,
'ip_protocol': 'INVALID',
'rule_menu': 'icmp',
'cidr': rule.ip_range['cidr'],
'remote': 'cidr'}
res = self.client.post(self.edit_url, formData)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, self.detail_url)
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list',
'security_group_backend')})

View File

@ -69,7 +69,7 @@ def create_stubs(stubs_to_create={}):
def inner_stub_out(fn):
@wraps(fn)
def instance_stub_out(self):
def instance_stub_out(self, *args, **kwargs):
for key in stubs_to_create:
if not (isinstance(stubs_to_create[key], tuple) or
isinstance(stubs_to_create[key], list)):
@ -80,7 +80,7 @@ def create_stubs(stubs_to_create={}):
for value in stubs_to_create[key]:
self.mox.StubOutWithMock(key, value)
return fn(self)
return fn(self, *args, **kwargs)
return instance_stub_out
return inner_stub_out