[RPM] Use yum library for packages comparison

Closes-bug: #1587032

Change-Id: I8c857edb839a441b71864d915ef4a814463ed81b
This commit is contained in:
Sergey Kulanov 2016-05-30 23:03:32 +03:00
parent 2bd9a9415d
commit 07079a0b55
5 changed files with 62 additions and 205 deletions

View File

@ -19,6 +19,7 @@
import copy
import multiprocessing
import os
import rpmUtils
import shutil
import tempfile
@ -30,10 +31,10 @@ from packetary.drivers.base import RepositoryDriverBase
from packetary.library.checksum import composite as checksum_composite
from packetary.library.streams import GzipDecompress
from packetary.library import utils
from packetary.objects.base import ComparableObject
from packetary.objects import FileChecksum
from packetary.objects import Package
from packetary.objects import PackageRelation
from packetary.objects import PackageVersion
from packetary.objects import Repository
from packetary.objects import VersionRange
from packetary.schemas import RPM_REPO_SCHEMA
@ -86,6 +87,53 @@ class CreaterepoCallBack(object):
pass
class RpmPackageVersion(ComparableObject):
"""The Package version."""
__slots__ = ["epoch", "version", "release"]
def __init__(self, epoch, version, release=None):
self.epoch = epoch
self.version = version
self.release = release or ''
@classmethod
def from_string(cls, text):
"""Constructs from string.
:param text: the version in format '[{epoch-}]-{version}-{release}'
"""
(epoch, version, release) = rpmUtils.miscutils.stringToVersion(text)
return cls(epoch, version, release)
def cmp(self, other):
if not isinstance(other, RpmPackageVersion):
other = RpmPackageVersion.from_string(str(other))
return rpmUtils.miscutils.compareEVR(
(self.epoch, self.version, self.release),
(other.epoch, other.version, other.release)
)
def __eq__(self, other):
if other is self:
return True
return self.cmp(other) == 0
def __str__(self):
if self.release:
return "{0}:{1}-{2}".format(
self.epoch,
self.version,
self.release,
)
else:
return "{0}:{1}".format(
self.epoch,
self.version,
)
class RpmRepositoryDriver(RepositoryDriverBase):
def get_repository_data_schema(self):
return RPM_REPO_SCHEMA
@ -194,7 +242,7 @@ class RpmRepositoryDriver(RepositoryDriverBase):
return Package(
repository=repository,
name=hdr["name"],
version=PackageVersion(
version=RpmPackageVersion(
hdr['epoch'], hdr['version'], hdr['release']
),
filesize=int(hdr['size']),
@ -381,7 +429,7 @@ class RpmRepositoryDriver(RepositoryDriverBase):
return [
PackageRelation(
x[0], VersionRange(
_OPERATORS_MAPPING[x[1]], x[1] and PackageVersion(*x[2])
_OPERATORS_MAPPING[x[1]], x[1] and RpmPackageVersion(*x[2])
)
)
for x in relations
@ -416,11 +464,11 @@ class RpmRepositoryDriver(RepositoryDriverBase):
"""Gets the package version from attributes.
:param attrs: the relation tag attributes
:return: the PackageVersion object
:return: the RpmPackageVersion object
"""
return PackageVersion(
attrs.get("epoch", 0),
return RpmPackageVersion(
attrs.get("epoch", "0"),
attrs.get("ver", "0.0"),
attrs.get("rel")
)

View File

@ -21,7 +21,6 @@ from packetary.objects.package import FileChecksum
from packetary.objects.package import Package
from packetary.objects.package_relation import PackageRelation
from packetary.objects.package_relation import VersionRange
from packetary.objects.package_version import PackageVersion
from packetary.objects.packages_forest import PackagesForest
from packetary.objects.packages_tree import PackagesTree
from packetary.objects.repository import Repository
@ -34,7 +33,6 @@ __all__ = [
"PackageRelation",
"PackagesForest",
"PackagesTree",
"PackageVersion",
"Repository",
"VersionRange",
]

View File

@ -1,154 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from packetary.objects.base import ComparableObject
class PackageVersion(ComparableObject):
"""The Package version."""
__slots__ = ["epoch", "version", "release"]
def __init__(self, epoch, version, release=None):
self.epoch = int(epoch or 0)
self.version = tuple(version.split('.'))
if release:
self.release = tuple(release.split('.'))
else:
self.release = None
@classmethod
def from_string(cls, text):
"""Constructs from string.
:param text: the version in format '[{epoch-}]-{version}-{release}'
"""
pos1 = text.find(':')
if pos1 != -1:
epoch = text[0:pos1]
else:
epoch = 0
pos1 += 1
pos2 = text.find('-', pos1)
if pos2 != -1:
version = text[pos1: pos2]
release = text[pos2 + 1:]
else:
version = text[pos1:]
release = None
return cls(epoch, version, release)
def cmp(self, other):
if not isinstance(other, PackageVersion):
other = PackageVersion.from_string(str(other))
if not isinstance(other, PackageVersion):
raise TypeError
if self.epoch < other.epoch:
return -1
if self.epoch > other.epoch:
return 1
res = self._cmp_version_part(self.version, other.version)
if res != 0:
return res
if self.release and other.release:
return self._cmp_version_part(self.release, other.release)
return 0
def __eq__(self, other):
if other is self:
return True
return self.cmp(other) == 0
def __str__(self):
if self.release:
return "{0}:{1}-{2}".format(
self.epoch,
".".join(str(x) for x in self.version),
".".join(str(x) for x in self.release)
)
else:
return "{0}:{1}".format(
self.epoch,
".".join(str(x) for x in self.version),
)
@classmethod
def _order(cls, x):
"""Return an integer value for character x"""
if x.isdigit():
return int(x) + 1
if x.isalpha():
return ord(x)
return ord(x) + 256
@classmethod
def _cmp_version_string(cls, version1, version2):
"""Compares two versions as string."""
la = [cls._order(x) for x in version1]
lb = [cls._order(x) for x in version2]
while la or lb:
a = 0
b = 0
if la:
a = la.pop(0)
if lb:
b = lb.pop(0)
# handle the tilde separator, which is ~ = (int) 382
# _order("~") == 382
# if both versions have tilde then let's continue
if a == b == 382:
continue
if a == 382:
return -1
if b == 382:
return 1
if a < b:
return -1
elif a > b:
return 1
return 0
@classmethod
def _cmp_version_part(cls, version1, version2):
"""Compares two versions."""
ver1_it = iter(version1)
ver2_it = iter(version2)
while True:
v1 = next(ver1_it, None)
v2 = next(ver2_it, None)
if v1 is None or v2 is None:
if v1 is not None:
return 1
if v2 is not None:
return -1
return 0
if v1.isdigit() and v2.isdigit():
a = int(v1)
b = int(v2)
if a < b:
return -1
if a > b:
return 1
else:
r = cls._cmp_version_string(v1, v2)
if r != 0:
return r

View File

@ -20,7 +20,6 @@ import copy
import six
from packetary.objects import PackageRelation
from packetary.objects import PackageVersion
from packetary.objects import VersionRange
@ -215,44 +214,3 @@ class TestVersionRange(TestObjectBase):
def test_intersection_is_typesafe(self):
with self.assertRaises(TypeError):
VersionRange("=", 1).has_intersection(("=", 1))
class TestPackageVersion(base.TestCase):
def test_get_from_string(self):
ver = PackageVersion.from_string("1.0-22")
self.assertEqual(0, ver.epoch)
self.assertEqual(('1', '0'), ver.version)
self.assertEqual(('22',), ver.release)
ver2 = PackageVersion.from_string("1:11.0-2")
self.assertEqual(1, ver2.epoch)
self.assertEqual(('11', '0'), ver2.version)
self.assertEqual(('2',), ver2.release)
ver3 = PackageVersion.from_string("11.0")
self.assertEqual(0, ver3.epoch)
self.assertEqual(('11', '0'), ver3.version)
self.assertIsNone(ver3.release)
def test_compare(self):
ver1 = PackageVersion.from_string("6.3-31.5")
ver2 = PackageVersion.from_string("13.9-16.12")
ver3 = PackageVersion.from_string("13.9")
ver4 = PackageVersion.from_string("1:13.9")
ver5 = PackageVersion.from_string("1:8.0.0~b3")
ver6 = PackageVersion.from_string("1:8.0.0")
ver7 = PackageVersion.from_string("1:8.0.0~a1")
self.assertLess(ver1, ver2)
self.assertGreater(ver2, ver1)
self.assertEqual(ver1, ver1)
self.assertLess(ver1, "6.3-40")
self.assertGreater(ver1, "6.3-31.4a")
self.assertEqual(ver2, ver3)
self.assertGreater(ver4, ver3)
self.assertGreater(ver4, ver2)
# test tilda in versioning
# rpmdev-vercmp 1:8.0.0~b3 1:8.0.0
# 1:8.0.0~b3 < 1:8.0.0
# 1:8.0.0~b3 > 1:8.0.0~a1
self.assertGreater(ver6, ver5)
self.assertGreater(ver5, ver7)

View File

@ -43,8 +43,10 @@ class TestRpmDriver(base.TestCase):
@classmethod
def setUpClass(cls):
sys.modules["createrepo"] = mock.MagicMock()
sys.modules["rpmUtils"] = mock.MagicMock()
from packetary.drivers import rpm_driver
cls.createrepo = rpm_driver.createrepo = mock.MagicMock()
cls.rpmUtils = rpm_driver.rpmUtils = mock.MagicMock()
super(TestRpmDriver, cls).setUpClass()
cls.driver = rpm_driver.RpmRepositoryDriver()
@ -52,6 +54,7 @@ class TestRpmDriver(base.TestCase):
def setUp(self):
self.createrepo.reset_mock()
self.rpmUtils.reset_mock()
self.connection = mock.MagicMock()
def configure_streams(self, groups_gzipped=True):
@ -127,6 +130,10 @@ class TestRpmDriver(base.TestCase):
self.assertEqual(2, len(packages))
package = packages[0]
self.assertEqual("test1", package.name)
self.rpmUtils.miscutils.stringToVersion.return_value = (
"0", "1.1.1.1", "1.el7"
)
self.rpmUtils.miscutils.compareEVR.return_value = 0
self.assertEqual("1.1.1.1-1.el7", package.version)
self.assertEqual(100, package.filesize)
self.assertEqual(
@ -366,7 +373,7 @@ class TestRpmDriver(base.TestCase):
rpm_mock.returnFileEntries.return_value = ["/usr/bin/test"]
self.createrepo.yumbased.YumLocalPackage.return_value = rpm_mock
rpm_mock.returnLocalHeader.return_value = {
"name": "Test", "epoch": 1, "version": "1.2.3", "release": "1",
"name": "Test", "epoch": "1", "version": "1.2.3", "release": "1",
"size": "10", "group": "Group"
}
repo = gen_repository("Test", url="file:///repo/os/x86_64/")