At least part of it anyway. Rather than waiting to split the lines and strip the non-dependency information (comments etc) from the file, we do it immediately on load. We also add a whole lot of type hints to the modified code, to help us reason about things a little easier. Change-Id: Ide4574c53123ebe20d28211b80b9c079d01dd5e0 Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
247 lines
7.7 KiB
Python
247 lines
7.7 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""Requirements handling."""
|
|
|
|
# This module has no IO at all, and none should be added.
|
|
|
|
import collections
|
|
import packaging.requirements
|
|
import packaging.specifiers
|
|
import packaging.utils
|
|
import packaging.version
|
|
import re
|
|
|
|
|
|
def key_specifier(a):
|
|
weight = {
|
|
'>=': 0,
|
|
'>': 0,
|
|
'===': 1,
|
|
'==': 1,
|
|
'~=': 1,
|
|
'!=': 1,
|
|
'<': 2,
|
|
'<=': 2,
|
|
}
|
|
a = a._spec
|
|
return (weight[a[0]], packaging.version.parse(a[1]))
|
|
|
|
|
|
class Requirement(
|
|
collections.namedtuple(
|
|
'Requirement',
|
|
['package', 'location', 'specifiers', 'markers', 'comment', 'extras'],
|
|
)
|
|
):
|
|
def __new__(
|
|
cls, package, location, specifiers, markers, comment, extras=None
|
|
):
|
|
return super().__new__(
|
|
cls,
|
|
package,
|
|
location,
|
|
specifiers,
|
|
markers,
|
|
comment,
|
|
frozenset(extras or ()),
|
|
)
|
|
|
|
def to_line(
|
|
self,
|
|
marker_sep=';',
|
|
line_prefix='',
|
|
comment_prefix=' ',
|
|
sort_specifiers=False,
|
|
):
|
|
comment_p = comment_prefix if self.package else ''
|
|
comment = comment_p + self.comment if self.comment else ''
|
|
marker = marker_sep + self.markers if self.markers else ''
|
|
package = line_prefix + self.package if self.package else ''
|
|
location = self.location + '#egg=' if self.location else ''
|
|
extras = (
|
|
'[{}]'.format(",".join(sorted(self.extras))) if self.extras else ''
|
|
)
|
|
specifiers = self.specifiers
|
|
if sort_specifiers:
|
|
_specifiers = packaging.specifiers.SpecifierSet(specifiers)
|
|
_specifiers = [
|
|
f'{s}' for s in sorted(_specifiers, key=key_specifier)
|
|
]
|
|
specifiers = ','.join(_specifiers)
|
|
return f'{location}{package}{extras}{specifiers}{marker}{comment}\n'
|
|
|
|
|
|
Requirements = collections.namedtuple('Requirements', ['reqs'])
|
|
|
|
|
|
url_re = re.compile(
|
|
r'^(?P<url>\s*(?:-e\s)?\s*(?:(?:[a-z]+\+)?(?:[a-z]+))://[^#]*)'
|
|
r'#egg=(?P<name>[-\.\w]+)'
|
|
)
|
|
|
|
|
|
def canonical_name(req_name):
|
|
"""Return the canonical form of req_name."""
|
|
return packaging.utils.canonicalize_name(req_name)
|
|
|
|
|
|
def parse(content, permit_urls=False):
|
|
return to_dict(to_reqs(content, permit_urls=permit_urls))
|
|
|
|
|
|
def parse_lines(lines, permit_urls=False):
|
|
return to_dict(to_req(line, permit_urls=permit_urls) for line in lines)
|
|
|
|
|
|
def parse_line(req_line, permit_urls=False):
|
|
"""Parse a single line of a requirements file.
|
|
|
|
requirements files here are a subset of pip requirements files: we don't
|
|
try to parse URL entries, or pip options like -f and -e. Those are not
|
|
permitted in global-requirements.txt. If encountered in a synchronised
|
|
file such as requirements.txt or test-requirements.txt, they are illegal
|
|
but currently preserved as-is.
|
|
|
|
They may of course be used by local test configurations, just not
|
|
committed into the OpenStack reference branches.
|
|
|
|
:param permit_urls: If True, urls are parsed into Requirement tuples.
|
|
By default they are not, because they cannot be reflected into
|
|
setuptools kwargs, and thus the default is conservative. When
|
|
urls are permitted, -e *may* be supplied at the start of the line.
|
|
"""
|
|
end = len(req_line)
|
|
hash_pos = req_line.find('#')
|
|
if hash_pos < 0:
|
|
hash_pos = end
|
|
# Don't find urls that are in comments.
|
|
if '://' in req_line[:hash_pos]:
|
|
if permit_urls:
|
|
# We accept only a subset of urls here - they have to have an egg
|
|
# name so that we can tell what project its for without doing
|
|
# network access. Egg markers use a fragment, so we need to pull
|
|
# out url from the entire line.
|
|
m = url_re.match(req_line)
|
|
name = m.group('name')
|
|
location = m.group('url')
|
|
parse_start = m.end('name')
|
|
hash_pos = req_line[parse_start:].find('#')
|
|
if hash_pos < 0:
|
|
hash_pos = end
|
|
else:
|
|
hash_pos = hash_pos + parse_start
|
|
else:
|
|
# Trigger an early failure before we look for ':'
|
|
packaging.requirements.Requirement(req_line)
|
|
else:
|
|
parse_start = 0
|
|
location = ''
|
|
semi_pos = req_line.find(';', parse_start, hash_pos)
|
|
colon_pos = req_line.find(':', parse_start, hash_pos)
|
|
marker_pos = max(semi_pos, colon_pos)
|
|
if marker_pos < 0:
|
|
marker_pos = hash_pos
|
|
markers = req_line[marker_pos + 1 : hash_pos].strip()
|
|
if hash_pos != end:
|
|
comment = req_line[hash_pos:]
|
|
else:
|
|
comment = ''
|
|
req_line = req_line[parse_start:marker_pos]
|
|
|
|
extras = ()
|
|
if parse_start:
|
|
# We parsed a url before
|
|
specifier = ''
|
|
elif req_line:
|
|
# Pulled out a requirement
|
|
parsed = packaging.requirements.Requirement(req_line)
|
|
name = parsed.name
|
|
extras = parsed.extras
|
|
specifier = str(parsed.specifier)
|
|
else:
|
|
# Comments / blank lines etc.
|
|
name = ''
|
|
specifier = ''
|
|
return Requirement(name, location, specifier, markers, comment, extras)
|
|
|
|
|
|
def to_content(reqs, marker_sep=';', line_prefix=''):
|
|
lines = []
|
|
for req in reqs.reqs:
|
|
lines.append(req.to_line(marker_sep, line_prefix))
|
|
return ''.join(lines)
|
|
|
|
|
|
def to_dict(req_sequence):
|
|
reqs = dict()
|
|
for req, req_line in req_sequence:
|
|
if req is not None:
|
|
key = canonical_name(req.package)
|
|
reqs.setdefault(key, []).append((req, req_line))
|
|
return reqs
|
|
|
|
|
|
def _pass_through(req_line, permit_urls=False):
|
|
"""Identify unparsable lines."""
|
|
if permit_urls:
|
|
return req_line.startswith(
|
|
'http://tarballs.openstack.org/'
|
|
) or req_line.startswith('-f')
|
|
else:
|
|
return (
|
|
req_line.startswith('http://tarballs.openstack.org/')
|
|
or req_line.startswith('-e')
|
|
or req_line.startswith('-f')
|
|
)
|
|
|
|
|
|
def to_req(line, permit_urls=False):
|
|
if _pass_through(line, permit_urls=permit_urls):
|
|
return (None, line)
|
|
else:
|
|
return (parse_line(line, permit_urls=permit_urls), line)
|
|
|
|
|
|
def to_reqs(content, permit_urls=False):
|
|
for content_line in content.splitlines(True):
|
|
req_line = content_line.strip()
|
|
|
|
# skip comments, blank lines
|
|
if req_line.startswith('#') or not req_line:
|
|
continue
|
|
|
|
yield to_req(req_line, permit_urls)
|
|
|
|
|
|
def check_reqs_bounds_policy(global_reqs):
|
|
"""Check that the global requirement version specifiers match the policy.
|
|
|
|
The policy is defined as
|
|
* There needs to be exactly one lower bound (>=1.2 defined)
|
|
* There can be one or more excludes (!=1.2.1, !=1.2.2)
|
|
* TODO: Clarify (non-) existance of upper caps
|
|
"""
|
|
|
|
for pkg_requirement in global_reqs.values():
|
|
req = pkg_requirement[0][0]
|
|
if req.package:
|
|
_specifiers = packaging.specifiers.SpecifierSet(req.specifiers)
|
|
lower_bound = set()
|
|
for spec in _specifiers:
|
|
if spec.operator == '>=':
|
|
lower_bound.add(spec)
|
|
if len(lower_bound):
|
|
yield (
|
|
f'Requirement {req.package} should not include a >= specifier'
|
|
)
|