Refactor JSONSchema handling code

This to better support both Draft 3 and Draft 4 simultaneously.

Additionally, we fix some bugs in the API v1 Schemas.

Change-Id: Iead88f0fb1320ef4ad545a2b66a53b82424c4b14
This commit is contained in:
Kiall Mac Innes 2013-07-30 15:41:44 +01:00
parent 6a44de8ff9
commit 1c8428e7a9
17 changed files with 370 additions and 377 deletions

View File

@ -1,5 +1,5 @@
{ {
"id": "/schemas/domain", "id": "domain",
"$schema": "http://json-schema.org/draft-03/hyper-schema", "$schema": "http://json-schema.org/draft-03/hyper-schema",

View File

@ -1,5 +1,5 @@
{ {
"id": "/schemas/domains", "id": "domains",
"$schema": "http://json-schema.org/draft-03/hyper-schema", "$schema": "http://json-schema.org/draft-03/hyper-schema",
@ -11,7 +11,7 @@
"domains": { "domains": {
"type": "array", "type": "array",
"description": "Domains", "description": "Domains",
"items": {"$ref": "/schemas/domain"} "items": {"$ref": "domain#"}
} }
} }
} }

View File

@ -1,5 +1,5 @@
{ {
"id": "/schemas/fault", "id": "fault",
"$schema": "http://json-schema.org/draft-03/hyper-schema", "$schema": "http://json-schema.org/draft-03/hyper-schema",

View File

@ -1,5 +1,5 @@
{ {
"id": "/schemas/limits", "id": "limits",
"$schema": "http://json-schema.org/draft-03/hyper-schema", "$schema": "http://json-schema.org/draft-03/hyper-schema",

View File

@ -1,5 +1,5 @@
{ {
"id": "/schemas/record", "id": "record",
"$schema": "http://json-schema.org/draft-03/hyper-schema", "$schema": "http://json-schema.org/draft-03/hyper-schema",

View File

@ -1,5 +1,5 @@
{ {
"id": "/schemas/records", "id": "records",
"$schema": "http://json-schema.org/draft-03/hyper-schema", "$schema": "http://json-schema.org/draft-03/hyper-schema",
@ -11,7 +11,7 @@
"records": { "records": {
"type": "array", "type": "array",
"description": "Records", "description": "Records",
"items": {"$ref": "/schemas/record"} "items": {"$ref": "record#"}
} }
} }
} }

View File

@ -1,5 +1,5 @@
{ {
"id": "/schemas/server", "id": "server",
"$schema": "http://json-schema.org/draft-03/hyper-schema", "$schema": "http://json-schema.org/draft-03/hyper-schema",

View File

@ -1,5 +1,5 @@
{ {
"id": "/schemas/servers", "id": "servers",
"$schema": "http://json-schema.org/draft-03/hyper-schema", "$schema": "http://json-schema.org/draft-03/hyper-schema",
@ -11,7 +11,7 @@
"servers": { "servers": {
"type": "array", "type": "array",
"description": "Servers", "description": "Servers",
"items": {"$ref": "/schemas/server"} "items": {"$ref": "server#"}
} }
} }
} }

View File

@ -1,5 +1,5 @@
{ {
"id": "/schemas/tsigkey", "id": "tsigkey",
"$schema": "http://json-schema.org/draft-03/hyper-schema", "$schema": "http://json-schema.org/draft-03/hyper-schema",

View File

@ -1,5 +1,5 @@
{ {
"id": "/schemas/tsigkeys", "id": "tsigkeys",
"$schema": "http://json-schema.org/draft-03/hyper-schema", "$schema": "http://json-schema.org/draft-03/hyper-schema",
@ -11,7 +11,7 @@
"tsigkeys": { "tsigkeys": {
"type": "array", "type": "array",
"description": "TSIG Keys", "description": "TSIG Keys",
"items": {"$ref": "/schemas/tsigkey"} "items": {"$ref": "tsigkey#"}
} }
} }
} }

View File

@ -1,258 +0,0 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import jsonschema
import netaddr
import iso8601
from datetime import datetime
from designate.openstack.common import log as logging
from designate import exceptions
from designate import utils
LOG = logging.getLogger(__name__)
_RE_DOMAINNAME = r'^(?!.{255,})((?!\-)[A-Za-z0-9_\-]{1,63}(?<!\-)\.)+$'
_RE_HOSTNAME = r'^(?!.{255,})((^\*|(?!\-)[A-Za-z0-9_\-]{1,63})(?<!\-)\.)+$'
RESOLVER = jsonschema.RefResolver('/', {}, store={
'/schemas/domain': utils.load_schema('v1', 'domain'),
'/schemas/domains': utils.load_schema('v1', 'domains'),
'/schemas/fault': utils.load_schema('v1', 'fault'),
'/schemas/limits': utils.load_schema('v1', 'domains'),
'/schemas/record': utils.load_schema('v1', 'record'),
'/schemas/records': utils.load_schema('v1', 'records'),
'/schemas/server': utils.load_schema('v1', 'server'),
'/schemas/servers': utils.load_schema('v1', 'servers'),
'/schemas/tsigkey': utils.load_schema('v1', 'tsigkey'),
'/schemas/tsigkeys': utils.load_schema('v1', 'tsigkeys'),
})
class SchemaValidator(jsonschema.Draft3Validator):
def validate_type(self, types, instance, schema):
# NOTE(kiall): A datetime object is not a string, but is still valid.
if ('format' in schema and schema['format'] == 'date-time'
and isinstance(instance, datetime)):
return
errors = super(SchemaValidator, self).validate_type(types, instance,
schema)
for error in errors:
yield error
def validate_format(self, format, instance, schema):
if format == "date-time":
# ISO 8601 format
if self.is_type(instance, "string"):
try:
iso8601.parse_date(instance)
except Exception:
msg = "%s is not an ISO 8601 date" % (instance)
yield jsonschema.ValidationError(msg)
elif format == "date":
# YYYY-MM-DD
if self.is_type(instance, "string"):
# TODO(kiall): I'm sure there is a more accurate regex than
# this..
pattern = ('^[0-9]{4}-(((0[13578]|(10|12))-'
'(0[1-9]|[1-2][0-9]|3[0-1]))|'
'(02-(0[1-9]|[1-2][0-9]))|((0[469]|11)-'
'(0[1-9]|[1-2][0-9]|30)))$')
if not re.match(pattern, instance):
msg = "%s is not a date" % (instance)
yield jsonschema.ValidationError(msg)
elif format == "time":
# hh:mm:ss
if self.is_type(instance, "string"):
# TODO(kiall): I'm sure there is a more accurate regex than
# this..
pattern = "^(?:(?:([01]?\d|2[0-3]):)?([0-5]?\d):)?([0-5]?\d)$"
if not re.match(pattern, instance):
msg = "%s is not a time" % (instance)
yield jsonschema.ValidationError(msg)
pass
elif format == "email":
# A valid email address. We use the RFC1035 version of "valid"
if self.is_type(instance, "string"):
if instance.count('@') != 1:
msg = "%s is not an email" % (instance)
yield jsonschema.ValidationError(msg)
else:
rname = instance.replace('@', '.', 1)
if not re.match(_RE_DOMAINNAME, "%s." % rname):
msg = "%s is not an email" % (instance)
yield jsonschema.ValidationError(msg)
elif format == "ip-address":
# IPv4 Address
if self.is_type(instance, "string"):
try:
netaddr.IPAddress(instance, version=4)
except netaddr.AddrFormatError:
msg = "%s is not an IPv4 address" % (instance)
yield jsonschema.ValidationError(msg)
else:
if instance == '0.0.0.0': # RFC5735
msg = "%s is not an IPv4 address" % (instance)
yield jsonschema.ValidationError(msg)
# is it a dotted quad & all 4 fields <= 255
m = re.match('(\d+)\.(\d+)\.(\d+)\.(\d+)$', instance)
if not (m and (int(m.group(1)) <= 255 and
int(m.group(2)) <= 255 and
int(m.group(3)) <= 255 and
int(m.group(4)) <= 255)):
msg = "%s is not an IPv4 address" % (instance)
yield jsonschema.ValidationError(msg)
elif format == "ipv6":
# IPv6 Address
if self.is_type(instance, "string"):
try:
netaddr.IPAddress(instance, version=6)
except netaddr.AddrFormatError:
msg = "%s is not an IPv6 address" % (instance)
yield jsonschema.ValidationError(msg)
elif format == "host-name":
# A valid hostname
if self.is_type(instance, "string"):
if not re.match(_RE_HOSTNAME, instance):
msg = "%s is not a host name" % (instance)
yield jsonschema.ValidationError(msg)
elif format == "domain-name":
# A valid domainname
if self.is_type(instance, "string"):
if not re.match(_RE_DOMAINNAME, instance):
msg = "%s is not a domain name" % (instance)
yield jsonschema.ValidationError(msg)
def validate_anyOf(self, schemas, instance, schema):
for s in schemas:
if self.is_valid(instance, s):
return
else:
yield jsonschema.ValidationError(
"%r is not valid for any of listed schemas %r" %
(instance, schemas)
)
def validate_allOf(self, schemas, instance, schema):
for s in schemas:
if not self.is_valid(instance, s):
yield jsonschema.ValidationError(
"%r is not valid against %r" % (instance, s)
)
def validate_oneOf(self, schemas, instance, schema):
match = False
for s in schemas:
if self.is_valid(instance, s):
if match:
yield jsonschema.ValidationError(
"%r matches more than one schema in %r" %
(instance, schemas)
)
match = True
if not match:
yield jsonschema.ValidationError(
"%r is not valid for any of listed schemas %r" %
(instance, schemas)
)
class Schema(object):
def __init__(self, version, name):
self.raw_schema = utils.load_schema(version, name)
self.validator = SchemaValidator(self.raw_schema, resolver=RESOLVER)
@property
def schema(self):
return self.validator.schema
@property
def properties(self):
return self.schema['properties']
@property
def resolver(self):
return self.validator.resolver
@property
def links(self):
return self.schema['links']
@property
def raw(self):
return self.raw_schema
def validate(self, obj):
errors = []
for error in self.validator.iter_errors(obj):
errors.append({
'path': ".".join(reversed(error.path)),
'message': error.message,
'validator': error.validator
})
if len(errors) > 0:
raise exceptions.InvalidObject("Provided object does not match "
"schema", errors=errors)
def filter(self, instance, properties=None):
if not properties:
properties = self.properties
filtered = {}
for name, subschema in properties.items():
if 'type' in subschema and subschema['type'] == 'array':
subinstance = instance.get(name, None)
filtered[name] = self._filter_array(subinstance, subschema)
elif 'type' in subschema and subschema['type'] == 'object':
subinstance = instance.get(name, None)
properties = subschema['properties']
filtered[name] = self.filter(subinstance, properties)
else:
filtered[name] = instance.get(name, None)
return filtered
def _filter_array(self, instance, schema):
if 'items' in schema and isinstance(schema['items'], list):
# NOTE(kiall): We currently don't make use of this..
raise NotImplementedError()
elif 'items' in schema:
schema = schema['items']
if '$ref' in schema:
with self.resolver.resolving(schema['$ref']) as ischema:
schema = ischema
properties = schema['properties']
return [self.filter(i, properties) for i in instance]
elif 'properties' in schema:
schema = schema['properties']
with self.resolver.resolving(schema['$ref']) as ischema:
schema = ischema
return [self.filter(i, schema) for i in instance]
else:
raise NotImplementedError('Can\'t filter unknown array type')

View File

@ -0,0 +1,111 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import log as logging
from designate import exceptions
from designate import utils
from designate.schema import validators
from designate.schema import resolvers
LOG = logging.getLogger(__name__)
class Schema(object):
def __init__(self, version, name):
self.raw_schema = utils.load_schema(version, name)
self.resolver = resolvers.LocalResolver.from_schema(
version, self.raw_schema)
if version == 'v1':
self.validator = validators.Draft3Validator(
self.raw_schema, resolver=self.resolver)
else:
raise Exception('Unknown API version: %s' % version)
@property
def schema(self):
return self.validator.schema
@property
def properties(self):
return self.schema['properties']
@property
def links(self):
return self.schema['links']
@property
def raw(self):
return self.raw_schema
def validate(self, obj):
errors = []
for error in self.validator.iter_errors(obj):
errors.append({
'path': ".".join([str(x) for x in error.path]),
'message': error.message,
'validator': error.validator
})
if len(errors) > 0:
raise exceptions.InvalidObject("Provided object does not match "
"schema", errors=errors)
def filter(self, instance, properties=None):
if not properties:
properties = self.properties
filtered = {}
for name, subschema in properties.items():
if 'type' in subschema and subschema['type'] == 'array':
subinstance = instance.get(name, None)
filtered[name] = self._filter_array(subinstance, subschema)
elif 'type' in subschema and subschema['type'] == 'object':
subinstance = instance.get(name, None)
properties = subschema['properties']
filtered[name] = self.filter(subinstance, properties)
else:
filtered[name] = instance.get(name, None)
return filtered
def _filter_array(self, instance, schema):
if 'items' in schema and isinstance(schema['items'], list):
# NOTE(kiall): We currently don't make use of this..
raise NotImplementedError()
elif 'items' in schema:
schema = schema['items']
if '$ref' in schema:
with self.resolver.resolving(schema['$ref']) as ischema:
schema = ischema
properties = schema['properties']
return [self.filter(i, properties) for i in instance]
elif 'properties' in schema:
schema = schema['properties']
with self.resolver.resolving(schema['$ref']) as ischema:
schema = ischema
return [self.filter(i, schema) for i in instance]
else:
raise NotImplementedError('Can\'t filter unknown array type')

View File

@ -0,0 +1,87 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import jsonschema
import netaddr
from designate.openstack.common import log as logging
LOG = logging.getLogger(__name__)
RE_DOMAINNAME = r'^(?!.{255,})((?!\-)[A-Za-z0-9_\-]{1,63}(?<!\-)\.)+$'
RE_HOSTNAME = r'^(?!.{255,})((^\*|(?!\-)[A-Za-z0-9_\-]{1,63})(?<!\-)\.)+$'
@jsonschema._checks_drafts(draft3='ip-address', draft4='ipv4')
def is_ipv4(instance):
try:
address = netaddr.IPAddress(instance, version=4)
# netaddr happly accepts, and expands "127.0" into "127.0.0.0"
if str(address) != instance:
return False
except netaddr.AddrFormatError:
return False
if instance == '0.0.0.0': # RFC5735
return False
return True
@jsonschema._checks_drafts('ipv6')
def is_ipv6(instance):
try:
netaddr.IPAddress(instance, version=6)
except netaddr.AddrFormatError:
return False
return True
@jsonschema._checks_drafts(draft3="host-name", draft4="hostname")
def is_hostname(instance):
if not re.match(RE_HOSTNAME, instance):
return False
return True
@jsonschema._checks_drafts(draft3="domain-name", draft4="domainname")
def is_domainname(instance):
if not re.match(RE_DOMAINNAME, instance):
return False
return True
@jsonschema._checks_drafts("email")
def is_email(instance):
# A valid email address. We use the RFC1035 version of "valid".
if instance.count('@') != 1:
return False
rname = instance.replace('@', '.', 1)
if not re.match(RE_DOMAINNAME, "%s." % rname):
return False
return True
draft3_format_checker = jsonschema.FormatChecker(
jsonschema._draft_checkers["draft3"])
draft4_format_checker = jsonschema.FormatChecker(
jsonschema._draft_checkers["draft4"])

View File

@ -0,0 +1,38 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import jsonschema
from designate.openstack.common import log as logging
from designate import utils
LOG = logging.getLogger(__name__)
class LocalResolver(jsonschema.RefResolver):
def __init__(self, base_uri, referrer):
super(LocalResolver, self).__init__(base_uri, referrer, (), True)
self.api_version = None
@classmethod
def from_schema(cls, api_version, schema, *args, **kwargs):
resolver = cls(schema.get("id", ""), schema, *args, **kwargs)
resolver.api_version = api_version
return resolver
def resolve_remote(self, uri):
LOG.debug('Loading remote schema: %s', uri)
return utils.load_schema(self.api_version, uri)

View File

@ -0,0 +1,78 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import jsonschema
from designate.openstack.common import log as logging
from designate.schema import format
LOG = logging.getLogger(__name__)
class _Draft34CommonMixin(object):
def validate_type(self, types, instance, schema):
# NOTE(kiall): A datetime object is not a string, but is still valid.
if ('format' in schema and schema['format'] == 'date-time'
and isinstance(instance, datetime.datetime)):
return
errors = super(_Draft34CommonMixin, self).validate_type(
types, instance, schema)
for error in errors:
yield error
class Draft4Validator(_Draft34CommonMixin, jsonschema.Draft4Validator):
def __init__(self, schema, types=(), resolver=None, format_checker=None):
if format_checker is None:
format_checker = format.draft4_format_checker
super(Draft4Validator, self).__init__(schema, types, resolver,
format_checker)
class Draft3Validator(_Draft34CommonMixin, jsonschema.Draft3Validator):
def __init__(self, schema, types=(), resolver=None, format_checker=None):
if format_checker is None:
format_checker = format.draft3_format_checker
super(Draft3Validator, self).__init__(schema, types, resolver,
format_checker)
def validate_oneOf(self, oneOf, instance, schema):
# Backported from Draft4 to Draft3
subschemas = enumerate(oneOf)
all_errors = []
for index, subschema in subschemas:
errors = list(self.descend(instance, subschema, schema_path=index))
if not errors:
first_valid = subschema
break
all_errors.extend(errors)
else:
yield jsonschema.ValidationError(
"%r is not valid under any of the given schemas" % (instance,),
context=all_errors,
)
more_valid = [s for i, s in subschemas if self.is_valid(instance, s)]
if more_valid:
more_valid.append(first_valid)
reprs = ", ".join(repr(schema) for schema in more_valid)
yield jsonschema.ValidationError(
"%r is valid under each of %s" % (instance, reprs)
)

View File

@ -0,0 +1,24 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.tests import TestCase
from designate import schema
class TestSchema(TestCase):
def test_constructor(self):
domain = schema.Schema('v1', 'domain')
self.assertIsInstance(domain, schema.Schema)

View File

@ -1,6 +1,6 @@
# Copyright 2012 Managed I.T. # Copyright 2013 Hewlett-Packard Development Company, L.P.
# #
# Author: Kiall Mac Innes <kiall@managedit.ie> # Author: Kiall Mac Innes <kiall@hp.com>
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -13,29 +13,15 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import jsonschema
from datetime import datetime
from designate.openstack.common import log as logging
from designate.tests import TestCase from designate.tests import TestCase
from designate import schema from designate.openstack.common import log as logging
from designate.schema import format
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class TestSchemaValidator(TestCase): class SchemaFormatTest(TestCase):
def test_validate_format_ipaddress(self): def test_is_ipv4(self):
test_schema = {
"properties": {
"ipaddress": {
"type": "string",
"format": "ip-address",
"required": True
},
}
}
validator = schema.SchemaValidator(test_schema)
valid_ipaddresses = [ valid_ipaddresses = [
'0.0.0.1', '0.0.0.1',
'127.0.0.1', '127.0.0.1',
@ -59,27 +45,12 @@ class TestSchemaValidator(TestCase):
] ]
for ipaddress in valid_ipaddresses: for ipaddress in valid_ipaddresses:
LOG.debug('Expecting success for: %s' % ipaddress) self.assertTrue(format.is_ipv4(ipaddress))
validator.validate({'ipaddress': ipaddress})
for ipaddress in invalid_ipaddresses: for ipaddress in invalid_ipaddresses:
with self.assertRaises(jsonschema.ValidationError): self.assertFalse(format.is_ipv4(ipaddress))
LOG.debug('Expecting failure for: %s' % ipaddress)
validator.validate({'ipaddress': ipaddress})
def test_validate_format_hostname(self):
test_schema = {
"properties": {
"hostname": {
"type": "string",
"format": "host-name",
"required": True
},
}
}
validator = schema.SchemaValidator(test_schema)
def test_is_hostname(self):
valid_hostnames = [ valid_hostnames = [
'example.com.', 'example.com.',
'www.example.com.', 'www.example.com.',
@ -141,27 +112,12 @@ class TestSchemaValidator(TestCase):
] ]
for hostname in valid_hostnames: for hostname in valid_hostnames:
LOG.debug('Expecting success for: %s' % hostname) self.assertTrue(format.is_hostname(hostname))
validator.validate({'hostname': hostname})
for hostname in invalid_hostnames: for hostname in invalid_hostnames:
with self.assertRaises(jsonschema.ValidationError): self.assertFalse(format.is_hostname(hostname))
LOG.debug('Expecting failure for: %s' % hostname)
validator.validate({'hostname': hostname})
def test_validate_format_domainname(self):
test_schema = {
"properties": {
"domainname": {
"type": "string",
"format": "domain-name",
"required": True
},
}
}
validator = schema.SchemaValidator(test_schema)
def test_is_domainname(self):
valid_domainnames = [ valid_domainnames = [
'example.com.', 'example.com.',
'www.example.com.', 'www.example.com.',
@ -226,27 +182,12 @@ class TestSchemaValidator(TestCase):
] ]
for domainname in valid_domainnames: for domainname in valid_domainnames:
LOG.debug('Expecting success for: %s' % domainname) self.assertTrue(format.is_domainname(domainname))
validator.validate({'domainname': domainname})
for domainname in invalid_domainnames: for domainname in invalid_domainnames:
with self.assertRaises(jsonschema.ValidationError): self.assertFalse(format.is_domainname(domainname))
LOG.debug('Expecting failure for: %s' % domainname)
validator.validate({'domainname': domainname})
def test_validate_format_email(self):
test_schema = {
"properties": {
"email": {
"type": "string",
"format": "email",
"required": True
},
}
}
validator = schema.SchemaValidator(test_schema)
def test_is_email(self):
valid_emails = [ valid_emails = [
'user@example.com', 'user@example.com',
'user@emea.example.com', 'user@emea.example.com',
@ -289,36 +230,8 @@ class TestSchemaValidator(TestCase):
for email in valid_emails: for email in valid_emails:
LOG.debug('Expecting success for: %s' % email) LOG.debug('Expecting success for: %s' % email)
validator.validate({'email': email}) self.assertTrue(format.is_email(email))
for email in invalid_emails: for email in invalid_emails:
with self.assertRaises(jsonschema.ValidationError): LOG.debug('Expecting failure for: %s' % email)
LOG.debug('Expecting failure for: %s' % email) self.assertFalse(format.is_email(email))
validator.validate({'email': email})
def test_validate_format_datetime(self):
test_schema = {
"properties": {
"date_time": {
"type": "string",
"format": "date-time",
"required": True
},
}
}
validator = schema.SchemaValidator(test_schema)
valid_datetimes = [
datetime(2013, 1, 1)
]
for dt in valid_datetimes:
validator.validate({'date_time': dt})
class TestSchema(TestCase):
def test_constructor(self):
domain = schema.Schema('v1', 'domain')
self.assertIsInstance(domain, schema.Schema)