Merge "Validate AdvEnum & return an InvalidInput on error"

This commit is contained in:
Jenkins 2014-11-24 12:51:07 +00:00 committed by Gerrit Code Review
commit 1c88f83f82
3 changed files with 159 additions and 5 deletions

View File

@ -165,8 +165,12 @@ class AdvEnum(wtypes.wsproperty):
return self._default
def _set(self, parent, value):
if self.datatype.validate(value):
setattr(parent, self._name, value)
try:
if self.datatype.validate(value):
setattr(parent, self._name, value)
except ValueError as e:
raise wsme.exc.InvalidInput(self._name.replace('_advenum_', '', 1),
value, e)
class CronType(wtypes.UserType):

View File

@ -532,8 +532,158 @@ class TestAlarms(v2.FunctionalTest,
'statistic': 'magic',
}
}
self.post_json('/alarms', params=json, expect_errors=True, status=400,
headers=self.auth_headers)
resp = self.post_json('/alarms', params=json, expect_errors=True,
status=400, headers=self.auth_headers)
expected_err_msg = ("Invalid input for field/attribute"
" statistic."
" Value: 'magic'."
" Value should be one of:"
" count, max, sum, avg, min")
self.assertEqual(expected_err_msg,
resp.json['error_message']['faultstring'])
alarms = list(self.alarm_conn.get_alarms())
self.assertEqual(4, len(alarms))
def test_post_invalid_alarm_input_state(self):
json = {
'name': 'alarm1',
'state': 'bad_state',
'type': 'threshold',
'threshold_rule': {
'meter_name': 'ameter',
'comparison_operator': 'gt',
'threshold': 50.0
}
}
resp = self.post_json('/alarms', params=json, expect_errors=True,
status=400, headers=self.auth_headers)
expected_err_msg = ("Invalid input for field/attribute state."
" Value: 'bad_state'."
" Value should be one of:"
" alarm, ok, insufficient data")
self.assertEqual(expected_err_msg,
resp.json['error_message']['faultstring'])
alarms = list(self.alarm_conn.get_alarms())
self.assertEqual(4, len(alarms))
def test_post_invalid_alarm_input_comparison_operator(self):
json = {
'name': 'alarm2',
'state': 'ok',
'type': 'threshold',
'threshold_rule': {
'meter_name': 'ameter',
'comparison_operator': 'bad_co',
'threshold': 50.0
}
}
resp = self.post_json('/alarms', params=json, expect_errors=True,
status=400, headers=self.auth_headers)
expected_err_msg = ("Invalid input for field/attribute"
" comparison_operator."
" Value: 'bad_co'."
" Value should be one of:"
" gt, lt, ne, ge, le, eq")
self.assertEqual(expected_err_msg,
resp.json['error_message']['faultstring'])
alarms = list(self.alarm_conn.get_alarms())
self.assertEqual(4, len(alarms))
def test_post_invalid_alarm_input_type(self):
json = {
'name': 'alarm3',
'state': 'ok',
'type': 'bad_type',
'threshold_rule': {
'meter_name': 'ameter',
'comparison_operator': 'gt',
'threshold': 50.0
}
}
resp = self.post_json('/alarms', params=json, expect_errors=True,
status=400, headers=self.auth_headers)
expected_err_msg = ("Invalid input for field/attribute"
" type."
" Value: 'bad_type'."
" Value should be one of:"
" threshold, combination")
self.assertEqual(expected_err_msg,
resp.json['error_message']['faultstring'])
alarms = list(self.alarm_conn.get_alarms())
self.assertEqual(4, len(alarms))
def test_post_invalid_alarm_input_enabled_str(self):
json = {
'name': 'alarm5',
'enabled': 'bad_enabled',
'state': 'ok',
'type': 'threshold',
'threshold_rule': {
'meter_name': 'ameter',
'comparison_operator': 'gt',
'threshold': 50.0
}
}
resp = self.post_json('/alarms', params=json, expect_errors=True,
status=400, headers=self.auth_headers)
expected_err_msg = ("Invalid input for field/attribute"
" enabled."
" Value: 'bad_enabled'."
" Wrong type. Expected '<type 'bool'>',"
" got '<type 'str'>'")
self.assertEqual(expected_err_msg,
resp.json['error_message']['faultstring'])
alarms = list(self.alarm_conn.get_alarms())
self.assertEqual(4, len(alarms))
def test_post_invalid_alarm_input_enabled_int(self):
json = {
'name': 'alarm6',
'enabled': 0,
'state': 'ok',
'type': 'threshold',
'threshold_rule': {
'meter_name': 'ameter',
'comparison_operator': 'gt',
'threshold': 50.0
}
}
resp = self.post_json('/alarms', params=json, expect_errors=True,
status=400, headers=self.auth_headers)
expected_err_msg = ("Invalid input for field/attribute"
" enabled."
" Value: '0'."
" Wrong type. Expected '<type 'bool'>',"
" got '<type 'int'>'")
self.assertEqual(expected_err_msg,
resp.json['error_message']['faultstring'])
alarms = list(self.alarm_conn.get_alarms())
self.assertEqual(4, len(alarms))
def test_post_invalid_combination_alarm_input_operator(self):
json = {
'enabled': False,
'name': 'alarm6',
'state': 'ok',
'type': 'combination',
'ok_actions': ['http://something/ok'],
'alarm_actions': ['http://something/alarm'],
'insufficient_data_actions': ['http://something/no'],
'repeat_actions': True,
'combination_rule': {
'alarm_ids': ['a',
'b'],
'operator': 'bad_operator',
}
}
resp = self.post_json('/alarms', params=json, expect_errors=True,
status=400, headers=self.auth_headers)
expected_err_msg = ("Invalid input for field/attribute"
" operator."
" Value: 'bad_operator'."
" Value should be one of: and, or")
self.assertEqual(expected_err_msg,
resp.json['error_message']['faultstring'])
alarms = list(self.alarm_conn.get_alarms())
self.assertEqual(4, len(alarms))

View File

@ -32,4 +32,4 @@ class TestWsmeCustomType(base.BaseTestCase):
obj = dummybase(ae="one")
self.assertEqual("one", obj.ae)
self.assertRaises(ValueError, dummybase, ae="not exists")
self.assertRaises(wsme.exc.InvalidInput, dummybase, ae="not exists")