Files
anvil/tools/multipip
Joshua Harlow d3e2377269 Add a way to test multipip against many samples
Add a test that can read from a data test yaml file
to test alot of multipip version checks at once, making
it easy to add new tests (and delete older ones) in a
relatively simpler manner.

- Move all current sample tests to this new format.
- Fix a bug in allowing variations of matching x<1, x<4
  as a good combination.

Change-Id: I66b806c6d1d804c0de4957f95dda93b0390f94c0
2014-03-28 14:28:01 -07:00

421 lines
14 KiB
Python
Executable File

#!/usr/bin/python
from __future__ import print_function
import collections
import itertools
import logging
import re
import sys
import argparse
import six
import pip.index
import pip.req
import pkg_resources
BAD_REQUIREMENTS = 2
INCOMPATIBLE_REQUIREMENTS = 3
logger = logging.getLogger()
class RequirementException(Exception):
pass
def create_parser():
parser = argparse.ArgumentParser()
parser.add_argument(
"-r", "--requirement",
dest="requirements",
nargs="*",
default=[],
metavar="<file>",
help="Install all the packages listed in the given requirements file")
parser.add_argument(
"requirement_specs",
nargs="*",
default=[],
metavar="<requirement specifier>",
help="Install specified package")
parser.add_argument(
# A regex to be used to skip requirements
"--skip-requirements-regex",
default="",
help=argparse.SUPPRESS)
parser.add_argument(
# The default version control system for editables, e.g. 'svn'
'--default-vcs',
dest='default_vcs',
default='',
help=argparse.SUPPRESS)
parser.add_argument(
"--debug", "-d",
action="store_true",
default=False,
help="Print debug information")
parser.add_argument(
"--ignore-packages",
nargs="*",
default=[],
metavar="<requirement specifier>",
help="Ignore listed packages")
return parser
def setup_logging(options):
level = logging.DEBUG if options.debug else logging.WARNING
handler = logging.StreamHandler(sys.stderr)
logger.addHandler(handler)
logger.setLevel(level)
def install_requirement_ensure_req_field(req):
if not hasattr(req, 'req') or not req.req:
# pip 0.8 or so
link = pip.index.Link(req.url)
name = link.egg_fragment
if not name:
raise Exception("Cannot find package name from `%s'" % req.url)
req.req = pkg_resources.Requirement.parse(name)
return req
def install_requirement_str(req):
return req.url or str(req.req)
def install_requirement_parse(line, comes_from):
line = line.strip()
if line.startswith('-e') or line.startswith('--editable'):
if line.startswith('-e'):
line = line[2:].strip()
else:
line = line[len('--editable'):].strip().lstrip('=')
req = pip.req.InstallRequirement.from_editable(
line, comes_from=comes_from)
else:
req = pip.req.InstallRequirement.from_line(line, comes_from)
return install_requirement_ensure_req_field(req)
def iter_combinations(elements, include_empty=False):
"""Iterates over all combinations of the given elements list."""
if include_empty:
start = 0
else:
start = 1
for i in range(start, len(elements) + 1):
for c in itertools.combinations(elements, i):
yield c
def conflict_scorer(versioned):
"""Scores a list of (op, version) tuples, a higher score means more
conflicts while a lower score means less conflicts.
"""
if len(versioned) == 1:
return 0
op_versions = collections.defaultdict(list)
for (op, version) in versioned:
op_versions[op].append(version)
score = 0
for version in sorted(op_versions.get("==", [])):
for (op, version2) in versioned:
if version != version2:
score += 1
if version == version2 and op != "==":
score += 1
for version in sorted(op_versions.get("!=", [])):
for (op, version2) in versioned:
if op in ["!=", ">", "<"]:
continue
if version2 == version:
score += 1
for version in sorted(op_versions.get(">", [])):
for (op, version2) in versioned:
if (op, version2) == (">", version):
continue
if op in ["<", "<="] and version2 <= version:
score += 1
elif op in ["==", ">="] and version2 == version:
score += 1
elif op == ">" and version2 < version:
score += 1
elif op == ">=" and version2 <= version:
score += 1
for version in sorted(op_versions.get(">=", [])):
for (op, version2) in versioned:
if (op, version2) == (">=", version):
continue
if op in ["<", "<="] and version2 < version:
score += 1
elif op in [">=", ">"] and version2 < version:
score += 1
for version in sorted(op_versions.get("<", [])):
for (op, version2) in versioned:
if (op, version2) == ("<", version):
continue
if op in [">", ">="] and version2 >= version:
score += 1
elif op in ["==", "<="] and version2 == version:
score += 1
elif op == "<" and version2 > version:
score += 1
elif op == "<=" and version2 >= version:
score += 1
for version in sorted(op_versions.get("<=", [])):
for (op, version2) in versioned:
if (op, version2) == ("<=", version):
continue
if op in [">", ">="] and version2 > version:
score += 1
elif op in ["<=", "<"] and version2 > version:
score += 1
return score
def find_best_match(versioned, counts, scorer_func):
"""Iterates over all combinations of the given version and comparator in
the provided lists and finds the one with the best score (closest to zero
with the maximum number of elements).
"""
scored = []
for combo in iter_combinations(versioned):
scored.append((combo, scorer_func(combo)))
if len(scored) == 0:
raise ValueError("No version combinations scored")
# Find the lowest score with the highest number of elements.
min_score = sys.maxint
for (combo, combo_score) in scored:
if combo_score < min_score:
min_score = combo_score
max_elems = -1
best_matches = []
for (combo, combo_score) in scored:
if min_score == combo_score:
if len(combo) > max_elems:
best_matches = [combo]
max_elems = len(combo)
if len(combo) == max_elems:
best_matches.append(combo)
if len(best_matches) == 1:
best_match = best_matches[0]
else:
# If equivalent scores, then select the one that has the most requests
# for its combinations over ones that have less requests.
best_score = -1
best_match = None
for match in best_matches:
match_score = 0
for combo in match:
match_score += counts.get(combo, 0)
if match_score > best_score:
best_score = match_score
best_match = match
incompatibles = set()
for (combo, combo_score) in scored:
for spec in combo:
if spec not in best_match:
incompatibles.add(spec)
return (best_match, incompatibles)
def best_match(req_key, req_list):
"""Attempts to find the versions which will work the best for the given
requirement specification list.
"""
def fetch_specs(req):
try:
specs = req.req.specs
except (AttributeError, TypeError):
specs = []
return tuple(specs)
all_specs = []
for req in req_list:
all_specs.extend(fetch_specs(req))
if not all_specs:
return (req_list, [])
def spec_sort(spec1, spec2):
(op1, version1) = spec1
(op2, version2) = spec2
c = cmp(version1, version2)
if c == 0:
c = cmp(op1, op2)
return c
def reform(specs, versions):
# Covert the parsed versions back into the string versions so that
# we can return that as matches (instead of the comparable versions).
cleaned_specs = []
for (op, version) in specs:
cleaned_specs.append((op, versions[version]))
# Try to see if any of the requirements that we had actually had this
# exact spec, if so then just return that as the requirement, if not
# create a requirement instead.
specs = tuple(cleaned_specs)
sources = []
for req in req_list:
if fetch_specs(req) == specs:
sources.append(req)
if not sources:
spec_pieces = []
for (op, version) in specs:
spec_pieces.append("%s%s" % (op, version))
spec = "%s%s" % (req_key, ",".join(spec_pieces))
sources.append(pip.req.InstallRequirement.from_line(spec, 'compiled'))
return sources
def reform_incompatibles(incompatible_specs, versions):
causes = []
for (op, version) in incompatible_specs:
matches = 0
version = versions[version]
for req in req_list:
matched = False
for (op2, version2) in fetch_specs(req):
if (op2, version2) == (op, version):
matched = True
break
if matched:
if req not in causes:
causes.append(req)
matches += 1
if not matches:
spec_pieces = "%s%s" % (op, version)
spec = "%s%s" % (req_key, spec_pieces)
causes.append(pip.req.InstallRequirement.from_line(spec,
"compiled conflict"))
return causes
versions = {}
versioned = set()
counts = collections.defaultdict(int)
for (op, version) in all_specs:
parsed_version = pkg_resources.parse_version(version)
versioned.add((op, parsed_version))
versions[parsed_version] = version
counts[(op, parsed_version)] += 1
versioned = list(sorted(versioned, cmp=spec_sort))
initial_score = conflict_scorer(versioned)
if initial_score == 0:
incompatibles = []
match = versioned
else:
match, incompatibles = find_best_match(versioned, counts, conflict_scorer)
return (reform(match, versions),
reform_incompatibles(incompatibles, versions))
def parse_requirements(options):
"""Parse package requirements from command line and files.
:return: tuple (all, ignored) of InstallRequirement
"""
all_requirements = {}
skip_match = None
if options.skip_requirements_regex:
skip_match = re.compile(options.skip_requirements_regex)
for req_spec in options.requirement_specs:
try:
req = install_requirement_parse(req_spec, "command line")
if skip_match and skip_match.search(req.req.key):
continue
all_requirements.setdefault(req.req.key, []).append(req)
except Exception as ex:
raise RequirementException("Cannot parse `%s': %s" % (req_spec, ex))
for filename in options.requirements:
try:
for req in pip.req.parse_requirements(filename):
req = install_requirement_ensure_req_field(req)
if skip_match and skip_match.search(req.req.key):
continue
all_requirements.setdefault(req.req.key, []).append(req)
except Exception as ex:
raise RequirementException("Cannot parse `%s': %s" % (filename, ex))
ignored_requirements = []
for req_spec in options.ignore_packages:
try:
req = install_requirement_parse(req_spec, "command line")
ignored_requirements.append(req)
except Exception as ex:
raise RequirementException("Cannot parse `%s': %s" % (req_spec, ex))
return (all_requirements, ignored_requirements)
def join_requirements(requirements, ignored_requirements):
skip_keys = set(pkg.req.key for pkg in ignored_requirements)
incompatibles = {}
joined_requirements = {}
for (req_key, req_list) in six.iteritems(requirements):
if req_key in skip_keys:
continue
req_matches, req_incompatibles = best_match(req_key, req_list)
joined_requirements[req_key] = req_matches
if req_incompatibles:
incompatibles[req_key] = req_incompatibles
return (joined_requirements, incompatibles)
def print_requirements(joined_requirements):
formatted_requirements = []
for req_key in sorted(six.iterkeys(joined_requirements)):
req = joined_requirements[req_key][0]
if req.url:
req = "%s#egg=%s" % (req.url, req.req)
else:
req = str(req.req)
formatted_requirements.append(req)
for req in formatted_requirements:
print(req)
def print_incompatibles(incompatibles, joined_requirements):
for req_key in sorted(six.iterkeys(incompatibles)):
req_incompatibles = incompatibles[req_key]
if not req_incompatibles:
continue
print("%s: incompatible requirements" % (req_key),
file=sys.stderr)
chosen_reqs = joined_requirements.get(req_key, [])
if chosen_reqs:
print("Choosing:", file=sys.stderr)
for chosen in chosen_reqs:
print("\t%s: %s" % (chosen.comes_from,
install_requirement_str(chosen)),
file=sys.stderr)
print("Conflicting:", file=sys.stderr)
for conflicting in req_incompatibles:
print("\t%s: %s" % (conflicting.comes_from,
install_requirement_str(conflicting)),
file=sys.stderr)
def main():
parser = create_parser()
options = parser.parse_args()
setup_logging(options)
try:
requirements, ignored_requirements = parse_requirements(options)
except RequirementException as ex:
logger.error("Requirement failure: %s", ex)
sys.exit(BAD_REQUIREMENTS)
else:
joined_requirements, incompatibles = join_requirements(requirements,
ignored_requirements)
print_incompatibles(incompatibles, joined_requirements)
print_requirements(joined_requirements)
if incompatibles:
sys.exit(INCOMPATIBLE_REQUIREMENTS)
if __name__ == "__main__":
main()