Refactor secret validation for redability

Mostly applied method extraction to the secret schema validation
function in order to have it more readable.

Change-Id: I1bf35adebc95aadfc4be03400ddd848625569e01
This commit is contained in:
Juan Antonio Osorio
2014-06-24 10:15:29 +03:00
parent 3695063004
commit cc9f511f54

View File

@@ -78,6 +78,29 @@ class ValidatorBase(object):
parent_schema)
return schema_name
def _assert_schema_is_valid(self, json_data, schema_name):
"""Assert that the JSON structure is valid according to the given
schema.
:raises: InvalidObject exception if the data is not schema compliant.
"""
try:
schema.validate(json_data, self.schema)
except schema.ValidationError as e:
raise exception.InvalidObject(schema=schema_name,
reason=e.message,
property=get_invalid_property(e))
def _assert_validity(self, valid_condition, schema_name, message,
property):
"""Assert that a certain condition is met.
:raises: InvalidObject exception if the condition is not met.
"""
if not valid_condition:
raise exception.InvalidObject(schema=schema_name, reason=message,
property=property)
class NewSecretValidator(ValidatorBase):
"""Validate a new secret."""
@@ -106,99 +129,43 @@ class NewSecretValidator(ValidatorBase):
}
def validate(self, json_data, parent_schema=None):
"""Validate the input JSON for the schema for secrets."""
schema_name = self._full_name(parent_schema)
self._assert_schema_is_valid(json_data, schema_name)
try:
schema.validate(json_data, self.schema)
except schema.ValidationError as e:
raise exception.InvalidObject(schema=schema_name,
reason=e.message,
property=get_invalid_property(e))
json_data['name'] = self._extract_name(json_data)
# Validate/normalize 'name'.
name = json_data.get('name', '').strip()
if not name:
name = None
json_data['name'] = name
# Validate/convert 'expiration' if provided.
expiration = self._extract_expiration(json_data, schema_name)
if expiration:
# Verify not already expired.
utcnow = timeutils.utcnow()
if expiration <= utcnow:
raise exception.InvalidObject(schema=schema_name,
reason=u._("'expiration' is "
"before current "
"time"),
property="expiration")
self._assert_expiration_is_valid(expiration, schema_name)
json_data['expiration'] = expiration
# Validate/convert 'payload' if provided.
if 'payload' in json_data:
content_type = json_data.get('payload_content_type')
if content_type is None:
raise exception.InvalidObject(
schema=schema_name,
reason=u._("If 'payload' is supplied, "
"'payload_content_type' must "
"also be supplied."),
property="payload_content_type"
)
if content_type.lower() not in mime_types.SUPPORTED:
raise exception.InvalidObject(
schema=schema_name,
reason=u._("payload_content_type is not one of "
"{0}").format(mime_types.SUPPORTED),
property="payload_content_type"
)
content_encoding = json_data.get('payload_content_encoding')
if content_type == 'application/octet-stream' and \
content_encoding is None:
raise exception.InvalidObject(
schema=schema_name,
reason=u._("payload_content_encoding must be specified "
"when payload_content_type is application/"
"octet-stream."),
property="payload_content_encoding"
)
if content_type.startswith('text/plain') and \
content_encoding is not None:
raise exception.InvalidObject(
schema=schema_name,
reason=u._("payload_content_encoding must not be "
"specified when payload_content_type is "
"text/plain"),
property="payload_content_encoding"
)
payload = json_data['payload']
if secret_too_big(payload):
raise exception.LimitExceeded()
payload = payload.strip()
if not payload:
raise exception.InvalidObject(schema=schema_name,
reason=u._("If 'payload' "
"specified, must "
"be non empty"),
property="payload")
self._validate_content_parameters(content_type, content_encoding,
schema_name)
payload = self._extract_payload(json_data)
self._assert_validity(payload, schema_name,
u._("If 'payload' specified, must be non "
"empty"),
"payload")
json_data['payload'] = payload
elif 'payload_content_type' in json_data and \
parent_schema is None:
raise exception.InvalidObject(
schema=schema_name,
reason=u._("payload must be provided "
"when payload_content_type is specified"),
property="payload"
)
elif 'payload_content_type' in json_data:
self._assert_validity(parent_schema is not None, schema_name,
u._("payload must be provided when "
"payload_content_type is specified"),
"payload")
return json_data
def _extract_name(self, json_data):
"""Extracts and returns the name from the JSON data."""
name = json_data.get('name', '').strip()
if not name:
return None
return name
def _extract_expiration(self, json_data, schema_name):
"""Extracts and returns the expiration date from the JSON data."""
expiration = None
@@ -209,13 +176,70 @@ class NewSecretValidator(ValidatorBase):
expiration = timeutils.normalize_time(expiration_tz)
except ValueError:
LOG.exception("Problem parsing expiration date")
raise exception.InvalidObject(schema=schema_name,
reason=u._("Invalid date "
"for 'expiration'"),
property="expiration")
raise exception.InvalidObject(
schema=schema_name,
reason=u._("Invalid date for 'expiration'"),
property="expiration")
return expiration
def _assert_expiration_is_valid(self, expiration, schema_name):
"""Asserts that the given expiration date is valid. Which means that
it should not be in the past.
"""
if expiration:
# Verify not already expired.
utcnow = timeutils.utcnow()
self._assert_validity(expiration > utcnow, schema_name,
u._("'expiration' is before current time"),
"expiration")
def _validate_content_parameters(self, content_type, content_encoding,
schema_name):
"""Check that the content_type, content_encoding and the parameters
that they affect are valid.
"""
self._assert_validity(
content_type is not None,
schema_name,
u._("If 'payload' is supplied, 'payload_content_type' must also "
"be supplied."),
"payload_content_type")
self._assert_validity(
content_type.lower() in mime_types.SUPPORTED,
schema_name,
u._("payload_content_type is not one of {0}").format(
mime_types.SUPPORTED),
"payload_content_type")
if content_type == 'application/octet-stream':
self._assert_validity(
content_encoding is not None,
schema_name,
u._("payload_content_encoding must be specified when "
"payload_content_type is application/octet-stream."),
"payload_content_encoding")
if content_type.startswith('text/plain'):
self._assert_validity(
content_encoding is None,
schema_name,
u._("payload_content_encoding must not be specified when "
"payload_content_type is text/plain"),
"payload_content_encoding")
def _extract_payload(self, json_data):
"""Extracts and returns the payload from the JSON data.
:raises: LimitExceeded if the payload is too big
"""
payload = json_data['payload']
if secret_too_big(payload):
raise exception.LimitExceeded()
return payload.strip()
class NewOrderValidator(ValidatorBase):
"""Validate a new order."""