293 lines
7.9 KiB
Python
293 lines
7.9 KiB
Python
import collections
|
|
import json
|
|
import re
|
|
|
|
from dcos import util
|
|
from dcos.errors import DCOSException
|
|
|
|
logger = util.get_logger(__name__)
|
|
|
|
|
|
def parse_json_item(json_item, schema):
|
|
"""Parse the json item (optionally based on a schema).
|
|
|
|
:param json_item: A JSON item in the form 'key=value'
|
|
:type json_item: str
|
|
:param schema: The JSON schema to use for parsing
|
|
:type schema: dict | None
|
|
:returns: A tuple for the parsed JSON item
|
|
:rtype: (str, any) where any is one of str, int, float, bool, list or dict
|
|
"""
|
|
|
|
terms = json_item.split('=', 1)
|
|
if len(terms) != 2:
|
|
raise DCOSException('{!r} is not a valid json-item'.format(json_item))
|
|
|
|
# Check that it is a valid key in our jsonschema
|
|
key = terms[0]
|
|
|
|
# Use the schema if we have it else, guess the type
|
|
if schema:
|
|
value = parse_json_value(key, terms[1], schema)
|
|
else:
|
|
value = _find_type(clean_value(terms[1]))
|
|
|
|
return (json.dumps(key), value)
|
|
|
|
|
|
def parse_json_value(key, value, schema):
|
|
"""Parse the json value based on a schema.
|
|
|
|
:param key: the key property
|
|
:type key: str
|
|
:param value: the value of property
|
|
:type value: str
|
|
:param schema: The JSON schema to use for parsing
|
|
:type schema: dict
|
|
:returns: parsed value
|
|
:rtype: str | int | float | bool | list | dict
|
|
"""
|
|
|
|
value_type = find_parser(key, schema)
|
|
return value_type(value)
|
|
|
|
|
|
def find_parser(key, schema):
|
|
"""
|
|
:param key: JSON field
|
|
:type key: str
|
|
:param schema: The JSON schema to use
|
|
:type schema: dict
|
|
:returns: A callable capable of parsing a string to its type
|
|
:rtype: ValueTypeParser
|
|
"""
|
|
|
|
key_schema = schema['properties'].get(key)
|
|
if key_schema is None:
|
|
keys = '\n'.join(schema['properties'].keys())
|
|
raise DCOSException(
|
|
'Property {!r} is invalid - '
|
|
'possible properties are: \n{}'.format(key, keys))
|
|
else:
|
|
return ValueTypeParser(key_schema)
|
|
|
|
|
|
class ValueTypeParser(object):
|
|
"""Callable for parsing a string against a known JSON type.
|
|
|
|
:param schema: The JSON type as a schema
|
|
:type schema: dict
|
|
"""
|
|
|
|
def __init__(self, schema):
|
|
self.schema = schema
|
|
|
|
def __call__(self, value):
|
|
"""
|
|
:param value: String to try and parse
|
|
:type value: str
|
|
:returns: The parse value
|
|
:rtype: str | int | float | bool | list | dict
|
|
"""
|
|
|
|
value = clean_value(value)
|
|
|
|
if self.schema['type'] == 'string':
|
|
if self.schema.get('format') == 'uri':
|
|
return _parse_url(value)
|
|
else:
|
|
return _parse_string(value)
|
|
elif self.schema['type'] == 'object':
|
|
return _parse_object(value)
|
|
elif self.schema['type'] == 'number':
|
|
return _parse_number(value)
|
|
elif self.schema['type'] == 'integer':
|
|
return _parse_integer(value)
|
|
elif self.schema['type'] == 'boolean':
|
|
return _parse_boolean(value)
|
|
elif self.schema['type'] == 'array':
|
|
return _parse_array(value)
|
|
else:
|
|
raise DCOSException('Unknown type {!r}'.format(self._value_type))
|
|
|
|
|
|
def clean_value(value):
|
|
"""
|
|
:param value: String to try and clean
|
|
:type value: str
|
|
:returns: The cleaned string
|
|
:rtype: str
|
|
"""
|
|
|
|
if len(value) > 1 and value.startswith('"') and value.endswith('"'):
|
|
return value[1:-1]
|
|
elif len(value) > 1 and value.startswith("'") and value.endswith("'"):
|
|
return value[1:-1]
|
|
else:
|
|
return value
|
|
|
|
|
|
def _find_type(value):
|
|
"""Find the correct type of the value
|
|
|
|
:param value: value to parse
|
|
:type value: str
|
|
:returns: The parsed value
|
|
:rtype: int|float|
|
|
"""
|
|
to_try = [_parse_integer, _parse_number, _parse_boolean, _parse_array,
|
|
_parse_url, _parse_object, _parse_string]
|
|
while len(to_try) > 0:
|
|
try:
|
|
return to_try.pop(0)(value)
|
|
except DCOSException:
|
|
pass
|
|
|
|
raise DCOSException(
|
|
'Unable to parse {!r} as a JSON object'.format(value))
|
|
|
|
|
|
def _parse_string(value):
|
|
"""
|
|
:param value: The string to parse
|
|
:type value: str
|
|
:returns: The parsed value
|
|
:rtype: str
|
|
"""
|
|
|
|
return None if value == 'null' else value
|
|
|
|
|
|
def _parse_object(value):
|
|
"""
|
|
:param value: The string to parse
|
|
:type value: str
|
|
:returns: The parsed value
|
|
:rtype: dict
|
|
"""
|
|
|
|
try:
|
|
json_object = json.loads(value)
|
|
if json_object is None or isinstance(json_object, collections.Mapping):
|
|
return json_object
|
|
else:
|
|
raise DCOSException(
|
|
'Unable to parse {!r} as a JSON object'.format(value))
|
|
except ValueError as error:
|
|
logger.exception('Error parsing value as a JSON object')
|
|
|
|
msg = 'Unable to parse {!r} as a JSON object: {}'.format(value, error)
|
|
raise DCOSException(msg)
|
|
|
|
|
|
def _parse_number(value):
|
|
"""
|
|
:param value: The string to parse
|
|
:type value: str
|
|
:returns: The parsed value
|
|
:rtype: float
|
|
"""
|
|
|
|
try:
|
|
return None if value == 'null' else float(value)
|
|
except ValueError as error:
|
|
logger.exception('Error parsing value as a JSON number')
|
|
|
|
msg = 'Unable to parse {!r} as a float: {}'.format(value, error)
|
|
raise DCOSException(msg)
|
|
|
|
|
|
def _parse_integer(value):
|
|
"""
|
|
:param value: The string to parse
|
|
:type value: str
|
|
:returns: The parsed value
|
|
:rtype: int
|
|
"""
|
|
|
|
try:
|
|
return None if value == 'null' else int(value)
|
|
except ValueError as error:
|
|
logger.exception('Error parsing value as a JSON integer')
|
|
|
|
msg = 'Unable to parse {!r} as an int: {}'.format(value, error)
|
|
raise DCOSException(msg)
|
|
|
|
|
|
def _parse_boolean(value):
|
|
"""
|
|
:param value: The string to parse
|
|
:type value: str
|
|
:returns: The parsed value
|
|
:rtype: bool
|
|
"""
|
|
|
|
try:
|
|
boolean = json.loads(value.lower())
|
|
if boolean is None or isinstance(boolean, bool):
|
|
return boolean
|
|
else:
|
|
raise DCOSException(
|
|
'Unable to parse {!r} as a boolean'.format(value))
|
|
except ValueError as error:
|
|
logger.exception('Error parsing value as a JSON boolean')
|
|
|
|
msg = 'Unable to parse {!r} as a boolean: {}'.format(value, error)
|
|
raise DCOSException(msg)
|
|
|
|
|
|
def _parse_array(value):
|
|
"""
|
|
:param value: The string to parse
|
|
:type value: str
|
|
:returns: The parsed value
|
|
:rtype: list
|
|
"""
|
|
|
|
try:
|
|
array = json.loads(value)
|
|
if array is None or isinstance(array, collections.Sequence):
|
|
return array
|
|
else:
|
|
raise DCOSException(
|
|
'Unable to parse {!r} as an array'.format(value))
|
|
except ValueError as error:
|
|
logger.exception('Error parsing value as a JSON array')
|
|
|
|
msg = 'Unable to parse {!r} as an array: {}'.format(value, error)
|
|
raise DCOSException(msg)
|
|
|
|
|
|
def _parse_url(value):
|
|
"""
|
|
:param value: The url to parse
|
|
:type url: str
|
|
:returns: The parsed value
|
|
:rtype: str
|
|
"""
|
|
|
|
scheme_pattern = r'^(?P<scheme>(?:(?:https?)://))'
|
|
domain_pattern = (
|
|
r'(?P<hostname>(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.?)+'
|
|
'(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)?|') # domain,
|
|
|
|
value_regex = re.match(
|
|
scheme_pattern + # http:// or https://
|
|
r'(([^:])+(:[^:]+)?@){0,1}' + # auth credentials
|
|
domain_pattern +
|
|
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}))' # or ip
|
|
r'(?P<port>(?::\d+))?' # port
|
|
r'(?P<path>(?:/?|[/?]\S+))$', # resource path
|
|
value, re.IGNORECASE)
|
|
|
|
if value_regex is None:
|
|
scheme_match = re.match(scheme_pattern, value, re.IGNORECASE)
|
|
if scheme_match is None:
|
|
logger.debug("Defaulting URL to https scheme")
|
|
return "https://" + value
|
|
else:
|
|
raise DCOSException(
|
|
'Unable to parse {!r} as a url'.format(value))
|
|
else:
|
|
return value
|