221 lines
7.9 KiB

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Requirements handling."""
# This module has no IO at all, and none should be added.
import collections
import distutils.version
import packaging.specifiers
import pkg_resources
import re
# A header for the requirements file(s).
# TODO(lifeless): Remove this once constraints are in use.
'# The order of packages is significant, because pip processes '
'them in the order\n',
'# of appearance. Changing the order has an impact on the overall '
'# process, which may cause wedges in the gate later.\n',
def key_specifier(a):
weight = {'>=': 0, '>': 0,
'===': 1, '==': 1, '~=': 1, '!=': 1,
'<': 2, '<=': 2}
a = a._spec
return (weight[a[0]], distutils.version.LooseVersion(a[1]))
class Requirement(collections.namedtuple('Requirement',
['package', 'location', 'specifiers',
'markers', 'comment', 'extras'])):
def __new__(cls, package, location, specifiers, markers, comment,
return super(Requirement, cls).__new__(
cls, package, location, specifiers, markers, comment,
frozenset(extras or ()))
def to_line(self, marker_sep=';', line_prefix='', comment_prefix=' ',
comment_p = comment_prefix if self.package else ''
comment = (comment_p + self.comment if self.comment else '')
marker = marker_sep + self.markers if self.markers else ''
package = line_prefix + self.package if self.package else ''
location = self.location + '#egg=' if self.location else ''
extras = '[%s]' % ",".join(sorted(self.extras)) if self.extras else ''
specifiers = self.specifiers
if sort_specifiers:
_specifiers = packaging.specifiers.SpecifierSet(specifiers)
_specifiers = ['%s' % s for s in sorted(_specifiers,
specifiers = ','.join(_specifiers)
return '%s%s%s%s%s%s\n' % (location,
Requirements = collections.namedtuple('Requirements', ['reqs'])
url_re = re.compile(
def canonical_name(req_name):
"""Return the canonical form of req_name."""
return pkg_resources.safe_name(req_name).lower()
def parse(content, permit_urls=False):
return to_dict(to_reqs(content, permit_urls=permit_urls))
def parse_line(req_line, permit_urls=False):
"""Parse a single line of a requirements file.
requirements files here are a subset of pip requirements files: we don't
try to parse URL entries, or pip options like -f and -e. Those are not
permitted in global-requirements.txt. If encountered in a synchronised
file such as requirements.txt or test-requirements.txt, they are illegal
but currently preserved as-is.
They may of course be used by local test configurations, just not
committed into the OpenStack reference branches.
:param permit_urls: If True, urls are parsed into Requirement tuples.
By default they are not, because they cannot be reflected into
setuptools kwargs, and thus the default is conservative. When
urls are permitted, -e *may* be supplied at the start of the line.
end = len(req_line)
hash_pos = req_line.find('#')
if hash_pos < 0:
hash_pos = end
# Don't find urls that are in comments.
if '://' in req_line[:hash_pos]:
if permit_urls:
# We accept only a subset of urls here - they have to have an egg
# name so that we can tell what project its for without doing
# network access. Egg markers use a fragment, so we need to pull
# out url from the entire line.
m = url_re.match(req_line)
name ='name')
location ='url')
parse_start = m.end('name')
hash_pos = req_line[parse_start:].find('#')
if hash_pos < 0:
hash_pos = end
hash_pos = hash_pos + parse_start
# Trigger an early failure before we look for ':'
parse_start = 0
location = ''
semi_pos = req_line.find(';', parse_start, hash_pos)
colon_pos = req_line.find(':', parse_start, hash_pos)
marker_pos = max(semi_pos, colon_pos)
if marker_pos < 0:
marker_pos = hash_pos
markers = req_line[marker_pos + 1:hash_pos].strip()
if hash_pos != end:
comment = req_line[hash_pos:]
comment = ''
req_line = req_line[parse_start:marker_pos]
extras = ()
if parse_start:
# We parsed a url before
specifier = ''
elif req_line:
# Pulled out a requirement
parsed = pkg_resources.Requirement.parse(req_line)
name = parsed.project_name
extras = parsed.extras
specifier = str(parsed.specifier)
# Comments / blank lines etc.
name = ''
specifier = ''
return Requirement(name, location, specifier, markers, comment, extras)
def to_content(reqs, marker_sep=';', line_prefix='', prefix=True):
lines = []
if prefix:
lines += _REQS_HEADER
for req in reqs.reqs:
lines.append(req.to_line(marker_sep, line_prefix))
return u''.join(lines)
def to_dict(req_sequence):
reqs = dict()
for req, req_line in req_sequence:
if req is not None:
key = canonical_name(req.package)
reqs.setdefault(key, []).append((req, req_line))
return reqs
def _pass_through(req_line, permit_urls=False):
"""Identify unparsable lines."""
if permit_urls:
return (req_line.startswith('') or
return (req_line.startswith('') or
req_line.startswith('-e') or
def to_reqs(content, permit_urls=False):
for content_line in content.splitlines(True):
req_line = content_line.strip()
if _pass_through(req_line, permit_urls=permit_urls):
yield None, content_line
yield parse_line(req_line, permit_urls=permit_urls), content_line
def check_reqs_bounds_policy(global_reqs):
"""Check that the global requirement version specifiers match the policy.
The policy is defined as
* There needs to be exactly one lower bound (>=1.2 defined)
* There can be one or more excludes (!=1.2.1, !=1.2.2)
* TODO: Clarify (non-) existance of upper caps
for pkg_requirement in global_reqs.values():
req = pkg_requirement[0][0]
if req.package:
_specifiers = packaging.specifiers.SpecifierSet(req.specifiers)
lower_bound = set()
for spec in _specifiers:
if spec.operator == '>=':
if len(lower_bound):
yield ('Requirement %s should not include a >= specifier' %