anvil/tools/multipip
Alessio Ababilov 2cbb151b3c Handle strict version conflicts correctly
Handle version conflicts like 'x>1' and 'x<=1'.

Change-Id: I5cfab2d8b3428ad6c772931011f0cefd3eb74ab7
Fixes: bug #1208335
2013-08-08 12:33:50 +03:00

341 lines
11 KiB
Python
Executable File

#!/usr/bin/python
import argparse
import distutils.spawn
import logging
import re
import subprocess
import sys
import pip.index
import pip.req
import pkg_resources
BAD_REQUIREMENTS = 2
INCOMPATIBLE_REQUIREMENTS = 3
logger = logging.getLogger()
def create_parser():
parser = argparse.ArgumentParser()
parser.add_argument(
"-r", "--requirement",
dest="requirements",
nargs="*",
default=[],
metavar="<file>",
help="Install all the packages listed in the given requirements file")
parser.add_argument(
"requirement_specs",
nargs="*",
default=[],
metavar="<requirement specifier>",
help="Install specified package")
parser.add_argument(
# A regex to be used to skip requirements
"--skip-requirements-regex",
default="",
help=argparse.SUPPRESS)
parser.add_argument(
# The default version control system for editables, e.g. 'svn'
'--default-vcs',
dest='default_vcs',
default='',
help=argparse.SUPPRESS)
parser.add_argument(
"--debug", "-d",
action="store_true",
default=False,
help="Print debug information")
parser.add_argument(
"--ignore-installed", "-i",
action="store_true",
default=False,
help="Ignore installed packages")
parser.add_argument(
"--ignore-packages",
nargs="*",
default=[],
metavar="<requirement specifier>",
help="Ignore listed packages")
parser.add_argument(
"--frozen", "-f",
action="store_true",
default=False,
help="Make requirements meet installed packages (taken from pip freeze)")
pip_executable = (distutils.spawn.find_executable("pip") or
distutils.spawn.find_executable("pip-python"))
parser.add_argument(
"--pip",
metavar="<filename>",
default=pip_executable,
help="Full or short name of pip executable (default: %s)" %
pip_executable)
return parser
def setup_logging(options):
level = logging.DEBUG if options.debug else logging.WARNING
handler = logging.StreamHandler(sys.stderr)
logger.addHandler(handler)
logger.setLevel(level)
incompatibles = set()
joined_requirements = []
def install_requirement_ensure_req_field(req):
if not hasattr(req, 'req') or not req.req:
# pip 0.8 or so
link = pip.index.Link(req.url)
name = link.egg_fragment
if not name:
raise Exception("Cannot find package name from `%s'" % req.url)
req.req = pkg_resources.Requirement.parse(name)
return req
def install_requirement_str(req):
return req.url or str(req.req)
def install_requirement_parse(line, comes_from):
line = line.strip()
if line.startswith('-e') or line.startswith('--editable'):
if line.startswith('-e'):
line = line[2:].strip()
else:
line = line[len('--editable'):].strip().lstrip('=')
req = pip.req.InstallRequirement.from_editable(
line, comes_from=comes_from)
else:
req = pip.req.InstallRequirement.from_line(line, comes_from)
return install_requirement_ensure_req_field(req)
def incompatible_requirement(chosen, conflicting):
if chosen.req.key not in incompatibles:
incompatibles.add(chosen.req.key)
print >> sys.stderr, "%s: incompatible requirements" % chosen.req.key
print >> sys.stderr, "Choosing:"
print >> sys.stderr, ("\t%s: %s" %
(chosen.comes_from,
install_requirement_str(chosen)))
print >> sys.stderr, "Conflicting:"
print >> sys.stderr, ("\t%s: %s" %
(conflicting.comes_from,
install_requirement_str(conflicting)))
def parse_requirements(options):
"""Parse package requirements from command line and files.
:return: tuple (all, ignored) of InstallRequirement
"""
all_requirements = {}
skip_match = None
if options.skip_requirements_regex:
skip_match = re.compile(options.skip_requirements_regex)
for req_spec in options.requirement_specs:
try:
req = install_requirement_parse(req_spec, "command line")
if skip_match and skip_match.search(req.req.key):
continue
all_requirements.setdefault(req.req.key, []).append(req)
except Exception as ex:
logger.error("Cannot parse `%s': %s" % (req_spec, ex))
sys.exit(BAD_REQUIREMENTS)
for filename in options.requirements:
try:
for req in pip.req.parse_requirements(filename):
req = install_requirement_ensure_req_field(req)
if skip_match and skip_match.search(req.req.key):
continue
all_requirements.setdefault(req.req.key, []).append(req)
except Exception as ex:
logger.error("Cannot parse `%s': %s" % (filename, ex))
sys.exit(BAD_REQUIREMENTS)
ignored_requirements = []
for req_spec in options.ignore_packages:
try:
req = install_requirement_parse(req_spec, "command line")
ignored_requirements.append(req)
except Exception as ex:
logger.error("Cannot parse `%s': %s" % (req_spec, ex))
sys.exit(BAD_REQUIREMENTS)
return all_requirements, ignored_requirements
def installed_packages(options):
pip_cmdline = [
options.pip,
"freeze",
]
(package_list, _) = subprocess.Popen(
pip_cmdline, stdout=subprocess.PIPE).communicate()
pkg_list = []
for line in package_list.splitlines():
try:
pkg_list.append(install_requirement_parse(line, "pip freeze").req)
except Exception:
pass
return pkg_list
def join_one_requirement(req_list):
"""Join requirement list for one package together.
Possible returns:
* ==A - exact version (even when there are conflicts)
* >=?A,<=?B,(!=C)+ - line segment (no conflicts detected)
* >=?A,(!=C)+ - more than (also when conflicts detected)
:param:req_list list of pip.req.InstallRequirement
:return: pip.req.InstallRequirement
"""
if len(req_list) == 1:
return req_list[0]
lower_bound_str = None
lower_bound_version = None
upper_bound_str = None
upper_bound_version = None
conflicts = []
for req in req_list:
for spec in req.req.specs:
if spec[0] == "==":
return req
spec_str = "%s%s" % spec
if spec[0] == "!=":
conflicts.append(spec_str)
continue
version = pkg_resources.parse_version(spec[1])
# strict_check is < or >, not <= or >=
strict_check = len(spec[0]) == 1
if spec[0][0] == ">":
if (not lower_bound_version or (version > lower_bound_version) or
(strict_check and version == lower_bound_version)):
lower_bound_version = version
lower_bound_str = spec_str
else:
if (not upper_bound_version or (version < upper_bound_version) or
(strict_check and version == upper_bound_version)):
upper_bound_version = version
upper_bound_str = spec_str
req_key = req_list[0].req.key
if lower_bound_version and upper_bound_version:
if lower_bound_version > upper_bound_version:
upper_bound_str = None
if lower_bound_version == upper_bound_version:
if lower_bound_str[1] == "=" and upper_bound_str[1] == "=":
return pip.req.InstallRequirement.from_line(
"%s==%s" % (req_key, upper_bound_str[2:]),
"compiled")
else:
upper_bound_str = None
req_specs = []
if lower_bound_str:
req_specs.append(lower_bound_str)
if upper_bound_str:
req_specs.append(upper_bound_str)
req_specs.extend(conflicts)
return pip.req.InstallRequirement.from_line(
"%s%s" % (req_key, ",".join(req_specs)),
"compiled")
def join_requirements(options):
global joined_requirements
all_requirements, ignored_requirements = parse_requirements(options)
skip_keys = set(pkg.req.key for pkg in ignored_requirements)
installed_by_key = {}
installed_requrements = []
if options.ignore_installed or options.frozen:
installed_requrements = installed_packages(options)
if options.ignore_installed:
skip_keys |= set(pkg.key for pkg in installed_requrements)
if options.frozen:
installed_by_key = dict((pkg.key, pkg) for pkg in installed_requrements)
for req_key, req_list in all_requirements.iteritems():
if req_key in skip_keys:
continue
joined_req = join_one_requirement(req_list)
try:
installed_req = installed_by_key[req_key]
installed_version = installed_req.index[0][0]
except (KeyError, IndexError):
pass
else:
if installed_version not in joined_req.req:
frozen_req = pip.req.InstallRequirement.from_line(
"%s>=%s" % (installed_req.project_name,
installed_req.specs[0][1]),
"pip freeze")
incompatible_requirement(frozen_req, joined_req)
joined_req = frozen_req
joined_requirements.append(joined_req)
segment_ok = False
lower_version = None
lower_strict = False
exact_version = None
conflicts = []
for parsed, trans, op, ver in joined_req.req.index:
if op[0] == ">":
lower_version = parsed
lower_strict = len(op) == 1
elif op[0] == "<":
segment_ok = True
elif op[0] == "=":
exact_version = parsed
else:
conflicts.append(parsed)
if exact_version:
for req in req_list:
if exact_version not in req.req:
incompatible_requirement(joined_req, req)
else:
for req in req_list:
for parsed, trans, op, ver in req.req.index:
if op[0] == "=":
if parsed in conflicts:
incompatible_requirement(joined_req, req)
break
elif not segment_ok and op[0] == "<":
# analyse lower bound: x >= A or x > A
if (lower_version > parsed or (
lower_version == parsed and
(lower_strict or len(op) != 2))):
incompatible_requirement(joined_req, req)
break
def print_requirements():
formatted_requirements = []
for req in joined_requirements:
if req.url:
req = "%s#egg=%s" % (req.url, req.req)
else:
req = str(req.req)
formatted_requirements.append(req)
for req in sorted(formatted_requirements):
print req
def main():
parser = create_parser()
options = parser.parse_args()
setup_logging(options)
join_requirements(options)
print_requirements()
if incompatibles:
sys.exit(INCOMPATIBLE_REQUIREMENTS)
if __name__ == "__main__":
main()