Check that each of the IPv4 bytes are valid
This commit is contained in:
@@ -125,6 +125,8 @@ SUBAUTHORITY_MATCHER = re.compile((
|
||||
':?(?P<port>\d+)?$' # port
|
||||
).format(HOST_PATTERN))
|
||||
|
||||
IPv4_MATCHER = re.compile('^' + ipv4 + '$')
|
||||
|
||||
|
||||
# ####################
|
||||
# Path Matcher Section
|
||||
|
||||
@@ -17,8 +17,8 @@ from collections import namedtuple
|
||||
from .compat import to_str
|
||||
from .exceptions import InvalidAuthority
|
||||
from .misc import (
|
||||
FRAGMENT_MATCHER, PATH_MATCHER, QUERY_MATCHER, SCHEME_MATCHER,
|
||||
SUBAUTHORITY_MATCHER, URI_MATCHER, URI_COMPONENTS
|
||||
FRAGMENT_MATCHER, IPv4_MATCHER, PATH_MATCHER, QUERY_MATCHER,
|
||||
SCHEME_MATCHER, SUBAUTHORITY_MATCHER, URI_MATCHER, URI_COMPONENTS
|
||||
)
|
||||
from .normalizers import (
|
||||
encode_component, normalize_scheme, normalize_authority, normalize_path,
|
||||
@@ -29,14 +29,17 @@ from .normalizers import (
|
||||
class URIReference(namedtuple('URIReference', URI_COMPONENTS)):
|
||||
slots = ()
|
||||
|
||||
def __new__(cls, scheme, authority, path, query, fragment):
|
||||
return super(URIReference, cls).__new__(
|
||||
def __new__(cls, scheme, authority, path, query, fragment,
|
||||
encoding='utf-8'):
|
||||
ref = super(URIReference, cls).__new__(
|
||||
cls,
|
||||
scheme or None,
|
||||
authority or None,
|
||||
path or None,
|
||||
query or None,
|
||||
fragment or None)
|
||||
ref.encoding = encoding
|
||||
return ref
|
||||
|
||||
def __eq__(self, other):
|
||||
other_ref = other
|
||||
@@ -68,7 +71,8 @@ class URIReference(namedtuple('URIReference', URI_COMPONENTS)):
|
||||
return URIReference(split_uri['scheme'], split_uri['authority'],
|
||||
encode_component(split_uri['path'], encoding),
|
||||
encode_component(split_uri['query'], encoding),
|
||||
encode_component(split_uri['fragment'], encoding))
|
||||
encode_component(split_uri['fragment'], encoding),
|
||||
encoding)
|
||||
|
||||
def authority_info(self):
|
||||
"""Returns a dictionary with the ``userinfo``, ``host``, and ``port``.
|
||||
@@ -93,9 +97,20 @@ class URIReference(namedtuple('URIReference', URI_COMPONENTS)):
|
||||
# Reference, but it cannot be further parsed by our
|
||||
# SUBAUTHORITY_MATCHER. In this case it must not be a valid
|
||||
# authority.
|
||||
raise InvalidAuthority(self.authority)
|
||||
raise InvalidAuthority(self.authority.encode(self.encoding))
|
||||
|
||||
return match.groupdict()
|
||||
# We had a match, now let's ensure that it is actually a valid host
|
||||
# address if it is IPv4
|
||||
matches = match.groupdict()
|
||||
host = matches.get('host')
|
||||
|
||||
if (host and IPv4_MATCHER.match(host) and not
|
||||
valid_ipv4_host_address(host)):
|
||||
# If we have a host, it appears to be IPv4 and it does not have
|
||||
# valid bytes, it is an InvalidAuthority.
|
||||
raise InvalidAuthority(self.authority.encode(self.encoding))
|
||||
|
||||
return matches
|
||||
|
||||
@property
|
||||
def host(self):
|
||||
@@ -165,7 +180,23 @@ class URIReference(namedtuple('URIReference', URI_COMPONENTS)):
|
||||
:returns: ``True`` if the authority is valid. ``False`` otherwise.
|
||||
:rtype: bool
|
||||
"""
|
||||
return self._is_valid(self.authority, SUBAUTHORITY_MATCHER, require)
|
||||
try:
|
||||
self.authority_info()
|
||||
except InvalidAuthority:
|
||||
return False
|
||||
|
||||
is_valid = self._is_valid(self.authority,
|
||||
SUBAUTHORITY_MATCHER,
|
||||
require)
|
||||
|
||||
# Ensure that IPv4 addresses have valid bytes
|
||||
if is_valid and self.host and IPv4_MATCHER.match(self.host):
|
||||
return valid_ipv4_host_address(self.host)
|
||||
|
||||
# Perhaps the host didn't exist or if it did, it wasn't an IPv4-like
|
||||
# address. In either case, we want to rely on the `_is_valid` check,
|
||||
# so let's return that.
|
||||
return is_valid
|
||||
|
||||
def scheme_is_valid(self, require=False):
|
||||
"""Determines if the scheme component is valid.
|
||||
@@ -254,3 +285,18 @@ class URIReference(namedtuple('URIReference', URI_COMPONENTS)):
|
||||
if self.fragment:
|
||||
result_list.extend(['#', self.fragment])
|
||||
return ''.join(result_list)
|
||||
|
||||
|
||||
def valid_ipv4_host_address(host):
|
||||
# If the host exists, and it might be IPv4, check each byte in the
|
||||
# address.
|
||||
for byte in host.split('.'):
|
||||
byte_val = int(byte, base=10)
|
||||
if byte_val < 0 or byte_val > 255:
|
||||
# If the value is not in the correct range it is invalid.
|
||||
# Return False immediately
|
||||
return False
|
||||
|
||||
# If we haven't returned yet, the host existed, and appeared to be
|
||||
# IPv4, then it should be a valid IPv4
|
||||
return True
|
||||
|
||||
@@ -16,7 +16,8 @@ valid_hosts = [
|
||||
invalid_hosts = [
|
||||
'[FF02::3::5]', # IPv6 can only have one ::
|
||||
'[FADF:01]', # Not properly compacted (missing a :)
|
||||
'localhost:80:80:80' # Too many ports
|
||||
'localhost:80:80:80', # Too many ports
|
||||
'256.256.256.256' # Invalid IPv4 Address
|
||||
]
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user