Merge "Uses jsonschema library to verify clean steps"

This commit is contained in:
Jenkins 2016-03-14 18:46:18 +00:00 committed by Gerrit Code Review
commit e346849ee6
2 changed files with 55 additions and 44 deletions

View File

@ -16,6 +16,7 @@
import ast
import datetime
import jsonschema
from oslo_config import cfg
from oslo_log import log
from oslo_utils import strutils
@ -46,6 +47,36 @@ CONF.import_opt('heartbeat_timeout', 'ironic.conductor.manager',
group='conductor')
LOG = log.getLogger(__name__)
_CLEAN_STEPS_SCHEMA = {
"$schema": "http://json-schema.org/schema#",
"title": "Clean steps schema",
"type": "array",
# list of clean steps
"items": {
"type": "object",
# args is optional
"required": ["interface", "step"],
"properties": {
"interface": {
"description": "driver interface",
"enum": list(conductor_utils.CLEANING_INTERFACE_PRIORITY)
# interface value must be one of the valid interfaces
},
"step": {
"description": "name of clean step",
"type": "string",
"minLength": 1
},
"args": {
"description": "additional args",
"type": "object",
"properties": {}
},
},
# interface, step and args are the only expected keys
"additionalProperties": False
}
}
# Vendor information for node's driver:
# key = driver name;
@ -520,44 +551,11 @@ def _check_clean_steps(clean_steps):
clean_steps parameter of :func:`NodeStatesController.provision`.
:raises: InvalidParameterValue if validation of clean steps fails.
"""
if not isinstance(clean_steps, list):
raise exception.InvalidParameterValue(_('clean_steps must be a '
'list of dictionaries.'))
all_errors = []
interfaces = conductor_utils.CLEANING_INTERFACE_PRIORITY.keys()
for step in clean_steps:
if not isinstance(step, dict):
all_errors.append(_('Clean step must be a dictionary; invalid '
'step: %(step)s.') % {'step': step})
continue
errors = []
unknown = set(step) - set(['interface', 'step', 'args'])
if unknown:
errors.append(_('Unrecognized keys %(keys)s')
% {'keys': ', '.join(unknown)})
for key in ['interface', 'step']:
if key not in step or not step[key]:
errors.append(_('Missing value for key "%(key)s"')
% {'key': key})
elif key == 'interface':
if step[key] not in interfaces:
errors.append(_('"interface" value must be one of '
'%(valid)s')
% {'valid': ', '.join(interfaces)})
args = step.get('args')
if args is not None and not isinstance(args, dict):
errors.append(_('"args" must be a dictionary'))
if errors:
errors.append(_('invalid step: %(step)s.') % {'step': step})
all_errors.append('; '.join(errors))
if all_errors:
raise exception.InvalidParameterValue(
_('Invalid clean_steps. %s') % ' '.join(all_errors))
try:
jsonschema.validate(clean_steps, _CLEAN_STEPS_SCHEMA)
except jsonschema.ValidationError as exc:
raise exception.InvalidParameterValue(_('Invalid clean_steps: %s') %
exc)
class Node(base.APIBase):

View File

@ -2345,19 +2345,20 @@ class TestCheckCleanSteps(base.TestCase):
def test__check_clean_steps_not_list(self):
clean_steps = {"step": "upgrade_firmware", "interface": "deploy"}
self.assertRaisesRegexp(exception.InvalidParameterValue,
'list',
"not of type 'array'",
api_node._check_clean_steps, clean_steps)
def test__check_clean_steps_step_not_dict(self):
clean_steps = ['clean step']
self.assertRaisesRegexp(exception.InvalidParameterValue,
'dictionary',
"not of type 'object'",
api_node._check_clean_steps, clean_steps)
def test__check_clean_steps_step_key_invalid(self):
clean_steps = [{"unknown": "upgrade_firmware", "interface": "deploy"}]
clean_steps = [{"step": "upgrade_firmware", "interface": "deploy",
"unknown": "upgrade_firmware"}]
self.assertRaisesRegexp(exception.InvalidParameterValue,
'Unrecognized',
'unexpected',
api_node._check_clean_steps, clean_steps)
def test__check_clean_steps_step_missing_interface(self):
@ -2366,16 +2367,28 @@ class TestCheckCleanSteps(base.TestCase):
'interface',
api_node._check_clean_steps, clean_steps)
def test__check_clean_steps_step_missing_step_key(self):
clean_steps = [{"interface": "deploy"}]
self.assertRaisesRegexp(exception.InvalidParameterValue,
'step',
api_node._check_clean_steps, clean_steps)
def test__check_clean_steps_step_missing_step_value(self):
clean_steps = [{"step": None, "interface": "deploy"}]
self.assertRaisesRegexp(exception.InvalidParameterValue,
'step',
"not of type 'string'",
api_node._check_clean_steps, clean_steps)
def test__check_clean_steps_step_min_length_step_value(self):
clean_steps = [{"step": "", "interface": "deploy"}]
self.assertRaisesRegexp(exception.InvalidParameterValue,
'is too short',
api_node._check_clean_steps, clean_steps)
def test__check_clean_steps_step_interface_value_invalid(self):
clean_steps = [{"step": "upgrade_firmware", "interface": "not"}]
self.assertRaisesRegexp(exception.InvalidParameterValue,
'"interface" value must be one of',
'is not one of',
api_node._check_clean_steps, clean_steps)
def test__check_clean_steps_step_args_value_invalid(self):